hbase20xsqlreader 插件文档

hbase20xsqlreader 插件文档 1 快速介绍 hbase20xsqlreader插件实现了从Phoenix(HBase SQL)读取数据,对应版本为HBase2.X和Phoenix5.X。 2 实现原理 简而言之,hbase20xsqlreader通过Phoenix轻客户端去连接Phoenix QueryServer,并根据用户配置信息生成查询SELECT 语句,然后发送到QueryServer读取HBase数据,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,最终传递给下游Writer处理。 3 功能说明 3.1 配置样例 配置一个从Phoenix同步抽取数据到本地的作业: { "job": { "content": [ { "reader": { "name": "hbase20xsqlreader", //指定插件为hbase20xsqlreader "parameter": { "queryServerAddress": "http://127.0.0.1:8765", //填写连接Phoenix QueryServer地址 "serialization": "PROTOBUF", //QueryServer序列化格式 "table": "TEST", //读取表名 "column": ["ID", "NAME"], //所要读取列名 "splitKey": "ID" //切分列,必须是表主键 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": "3" } } } } 3.2 参数说明 queryServerAddress 描述:hbase20xsqlreader需要通过Phoenix轻客户端去连接Phoenix QueryServer,因此这里需要填写对应QueryServer地址。 增强版/Lindorm 用户若需透传user, password参数,可以在queryServerAddress后增加对应可选属性. 格式参考:http://127.0.0.1:8765;user=root;password=root 必选:是 默认值:无 serialization 描述:QueryServer使用的序列化协议 必选:否 默认值:PROTOBUF table 描述:所要读取表名 必选:是 默认值:无 schema 描述:表所在的schema 必选:否 默认值:无 column 描述:填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列。 必选: 否 默认值:全部列 splitKey...

February 2, 2021

HBase20xsqlwriter插件文档

HBase20xsqlwriter插件文档 1. 快速介绍 HBase20xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功能。Phoenix因为对rowkey做了数据编码,所以,直接使用HBaseAPI进行写入会面临手工数据转换的问题,麻烦且易错。本插件提供了SQL方式直接向Phoenix表写入数据。 在底层实现上,通过Phoenix QueryServer的轻客户端驱动,执行UPSERT语句向Phoenix写入数据。 1.1 支持的功能 支持带索引的表的数据导入,可以同步更新所有的索引表 1.2 限制 要求版本为Phoenix5.x及HBase2.x 仅支持通过Phoenix QeuryServer导入数据,因此您Phoenix必须启动QueryServer服务才能使用本插件 不支持清空已有表数据 仅支持通过phoenix创建的表,不支持原生HBase表 不支持带时间戳的数据导入 2. 实现原理 通过Phoenix轻客户端,连接Phoenix QueryServer服务,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以,可以同步更新索引表。 3. 配置说明 3.1 配置样例 { "job": { "entry": { "jvm": "-Xms2048m -Xmx2048m" }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": "/Users/shf/workplace/datax_test/hbase20xsqlwriter/txt/normal.txt", "charset": "UTF-8", "column": [ { "index": 0, "type": "String" }, { "index": 1, "type": "string" }, { "index": 2, "type": "string" }, { "index": 3, "type": "string" } ], "fieldDelimiter": "," } }, "writer": { "name": "hbase20xsqlwriter", "parameter": { "batchSize": "100", "column": [ "UID", "TS", "EVENTID", "CONTENT" ], "queryServerAddress": "http://127.0.0.1:8765", "nullMode": "skip", "table": "目标hbase表名,大小写有关" } } } ], "setting": { "speed": { "channel": 5 } } } } 3....

February 2, 2021

KingbaseesReader 插件文档

KingbaseesReader 插件文档 1 快速介绍 KingbaseesReader插件实现了从KingbaseES读取数据。在底层实现上,KingbaseesReader通过JDBC连接远程KingbaseES数据库,并执行相应的sql语句将数据从KingbaseES库中SELECT出来。 2 实现原理 简而言之,KingbaseesReader通过JDBC连接器连接到远程的KingbaseES数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程KingbaseES数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,KingbaseesReader将其拼接为SQL语句发送到KingbaseES数据库;对于用户配置querySql信息,KingbaseesReader直接将其发送到KingbaseES数据库。 3 功能说明 3.1 配置样例 配置一个从KingbaseES数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte": 1048576 }, //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "content": [ { "reader": { "name": "kingbaseesreader", "parameter": { // 数据库连接用户名 "username": "xx", // 数据库连接密码 "password": "xx", "column": [ "id","name" ], //切分主键 "splitPk": "id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:kingbase8://host:port/database" ] } ] } }, "writer": { //writer类型 "name": "streamwriter", //是否打印内容 "parameter": { "print":true, } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { "speed": 1048576 }, "content": [ { "reader": { "name": "kingbaseesreader", "parameter": { "username": "xx", "password": "xx", "where": "", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:kingbase8://host:port/database", "jdbc:kingbase8://host:port/database" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } } 3....

February 2, 2021

MysqlReader 插件文档

MysqlReader 插件文档 1 快速介绍 MysqlReader插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。 不同于其他关系型数据库,MysqlReader不支持FetchSize. 2 实现原理 简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。 3 功能说明 3.1 配置样例 配置一个从Mysql数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": [ "id", "name" ], "splitPk": "db_id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print":true } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:mysql://bad_ip:3306/database", "jdbc:mysql://127....

February 2, 2021

OpenTSDBReader 插件文档

OpenTSDBReader 插件文档 1 快速介绍 OpenTSDBReader 插件实现了从 OpenTSDB 读取数据。OpenTSDB 是主要由 Yahoo 维护的、可扩展的、分布式时序数据库,与阿里巴巴自研 TSDB 的关系与区别详见阿里云官网:《相比 OpenTSDB 优势》 2 实现原理 在底层实现上,OpenTSDBReader 通过 HTTP 请求链接到 OpenTSDB 实例,利用 /api/config 接口获取到其底层存储 HBase 的连接信息,再利用 AsyncHBase 框架连接 HBase,通过 Scan 的方式将数据点扫描出来。整个同步的过程通过 metric 和时间段进行切分,即某个 metric 在某一个小时内的数据迁移,组合成一个迁移 Task。 3 功能说明 3.1 配置样例 配置一个从 OpenTSDB 数据库同步抽取数据到本地的作业: { "job": { "content": [ { "reader": { "name": "opentsdbreader", "parameter": { "endpoint": "http://localhost:4242", "column": [ "m" ], "beginDateTime": "2019-01-01 00:00:00", "endDateTime": "2019-01-01 03:00:00" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } } } 3.2 参数说明 name 描述:本插件的名称 必选:是 默认值:opentsdbreader parameter...

February 2, 2021

OracleReader 插件文档

OracleReader 插件文档 1 快速介绍 OracleReader插件实现了从Oracle读取数据。在底层实现上,OracleReader通过JDBC连接远程Oracle数据库,并执行相应的sql语句将数据从Oracle库中SELECT出来。 2 实现原理 简而言之,OracleReader通过JDBC连接器连接到远程的Oracle数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程Oracle数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,OracleReader将其拼接为SQL语句发送到Oracle数据库;对于用户配置querySql信息,Oracle直接将其发送到Oracle数据库。 3 功能说明 3.1 配置样例 配置一个从Oracle数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { //设置传输速度 byte/s 尽量逼近这个速度但是不高于它. // channel 表示通道数量,byte表示通道速度,如果单通道速度1MB,配置byte为1048576表示一个channel "byte": 1048576 }, //出错限制 "errorLimit": { //先选择record "record": 0, //百分比 1表示100% "percentage": 0.02 } }, "content": [ { "reader": { "name": "oraclereader", "parameter": { // 数据库连接用户名 "username": "root", // 数据库连接密码 "password": "root", "column": [ "id","name" ], //切分主键 "splitPk": "db_id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]" ] } ] } }, "writer": { //writer类型 "name": "streamwriter", // 是否打印内容 "parameter": { "print": true } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": { "name": "oraclereader", "parameter": { "username": "root", "password": "root", "where": "", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10" ], "jdbcUrl": [ "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "visible": false, "encoding": "UTF-8" } } } ] } } 3....

February 2, 2021

OTSReader 插件文档

OTSReader 插件文档 1 快速介绍 OTSReader插件实现了从OTS读取数据,并可以通过用户指定抽取数据范围可方便的实现数据增量抽取的需求。目前支持三种抽取方式: 全表抽取 范围抽取 指定分片抽取 OTS是构建在阿里云飞天分布式系统之上的 NoSQL数据库服务,提供海量结构化数据的存储和实时访问。OTS 以实例和表的形式组织数据,通过数据分片和负载均衡技术,实现规模上的无缝扩展。 2 实现原理 简而言之,OTSReader通过OTS官方Java SDK连接到OTS服务端,获取并按照DataX官方协议标准转为DataX字段信息传递给下游Writer端。 OTSReader会根据OTS的表范围,按照Datax并发的数目N,将范围等分为N份Task。每个Task都会有一个OTSReader线程来执行。 3 功能说明 3.1 配置样例 配置一个从OTS全表同步抽取数据到本地的作业: { "job": { "setting": { }, "content": [ { "reader": { "name": "otsreader", "parameter": { /* ----------- 必填 --------------*/ "endpoint":"", "accessId":"", "accessKey":"", "instanceName":"", // 导出数据表的表名 "table":"", // 需要导出的列名,支持重复列和常量列,区分大小写 // 常量列:类型支持STRING,INT,DOUBLE,BOOL和BINARY // 备注:BINARY需要通过Base64转换为对应的字符串传入插件 "column":[ {"name":"col1"}, // 普通列 {"name":"col2"}, // 普通列 {"name":"col3"}, // 普通列 {"type":"STRING", "value" : "bazhen"}, // 常量列(字符串) {"type":"INT", "value" : ""}, // 常量列(整形) {"type":"DOUBLE", "value" : ""}, // 常量列(浮点) {"type":"BOOL", "value" : ""}, // 常量列(布尔) {"type":"BINARY", "value" : "Base64(bin)"} // 常量列(二进制),使用Base64编码完成 ], "range":{ // 导出数据的起始范围 // 支持INF_MIN, INF_MAX, STRING, INT "begin":[ {"type":"INF_MIN"}, ], // 导出数据的结束范围 // 支持INF_MIN, INF_MAX, STRING, INT "end":[ {"type":"INF_MAX"}, ] } } }, "writer": {} } ] } } 配置一个定义抽取范围的OTSReader: { "job": { "setting": { "speed": { "byte":10485760 }, "errorLimit":0....

February 2, 2021

OTSWriter 插件文档

OTSWriter 插件文档 1 快速介绍 OTSWriter插件实现了向OTS写入数据,目前支持三种写入方式: PutRow,对应于OTS API PutRow,插入数据到指定的行,如果该行不存在,则新增一行;若该行存在,则覆盖原有行。 UpdateRow,对应于OTS API UpdateRow,更新指定行的数据,如果该行不存在,则新增一行;若该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。 DeleteRow,对应于OTS API DeleteRow,删除指定行的数据。 OTS是构建在阿里云飞天分布式系统之上的 NoSQL数据库服务,提供海量结构化数据的存储和实时访问。OTS 以实例和表的形式组织数据,通过数据分片和负载均衡技术,实现规模上的无缝扩展。 2 实现原理 简而言之,OTSWriter通过OTS官方Java SDK连接到OTS服务端,并通过SDK写入OTS服务端。OTSWriter本身对于写入过程做了很多优化,包括写入超时重试、异常写入重试、批量提交等Feature。 3 功能说明 3.1 配置样例 配置一个写入OTS作业: { "job": { "setting": { }, "content": [ { "reader": {}, "writer": { "name": "otswriter", "parameter": { "endpoint":"", "accessId":"", "accessKey":"", "instanceName":"", // 导出数据表的表名 "table":"", // Writer支持不同类型之间进行相互转换 // 如下类型转换不支持: // ================================ // int -> binary // double -> bool, binary // bool -> binary // bytes -> int, double, bool // ================================ // 需要导入的PK列名,区分大小写 // 类型支持:STRING,INT // 1. 支持类型转换,注意类型转换时的精度丢失 // 2. 顺序不要求和表的Meta一致 "primaryKey" : [ {"name":"pk1", "type":"string"}, {"name":"pk2", "type":"int"} ], // 需要导入的列名,区分大小写 // 类型支持STRING,INT,DOUBLE,BOOL和BINARY "column" : [ {"name":"col2", "type":"INT"}, {"name":"col3", "type":"STRING"}, {"name":"col4", "type":"STRING"}, {"name":"col5", "type":"BINARY"}, {"name":"col6", "type":"DOUBLE"} ], // 写入OTS的方式 // PutRow : 等同于OTS API中PutRow操作,检查条件是ignore // UpdateRow : 等同于OTS API中UpdateRow操作,检查条件是ignore // DeleteRow: 等同于OTS API中DeleteRow操作,检查条件是ignore "writeMode" : "PutRow" } } } ] } } 3....

February 2, 2021

PostgresqlReader 插件文档

PostgresqlReader 插件文档 1 快速介绍 PostgresqlReader插件实现了从PostgreSQL读取数据。在底层实现上,PostgresqlReader通过JDBC连接远程PostgreSQL数据库,并执行相应的sql语句将数据从PostgreSQL库中SELECT出来。 2 实现原理 简而言之,PostgresqlReader通过JDBC连接器连接到远程的PostgreSQL数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程PostgreSQL数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,PostgresqlReader将其拼接为SQL语句发送到PostgreSQL数据库;对于用户配置querySql信息,PostgresqlReader直接将其发送到PostgreSQL数据库。 3 功能说明 3.1 配置样例 配置一个从PostgreSQL数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte": 1048576 }, //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "content": [ { "reader": { "name": "postgresqlreader", "parameter": { // 数据库连接用户名 "username": "xx", // 数据库连接密码 "password": "xx", "column": [ "id","name" ], //切分主键 "splitPk": "id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:postgresql://host:port/database" ] } ] } }, "writer": { //writer类型 "name": "streamwriter", //是否打印内容 "parameter": { "print":true, } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { "speed": 1048576 }, "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "username": "xx", "password": "xx", "where": "", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:postgresql://host:port/database", "jdbc:postgresql://host:port/database" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } } 3....

February 2, 2021

RDBMSReader 插件文档

RDBMSReader 插件文档 1 快速介绍 RDBMSReader插件实现了从RDBMS读取数据。在底层实现上,RDBMSReader通过JDBC连接远程RDBMS数据库,并执行相应的sql语句将数据从RDBMS库中SELECT出来。目前支持达梦、db2、PPAS、Sybase数据库的读取。RDBMSReader是一个通用的关系数据库读插件,您可以通过注册数据库驱动等方式增加任意多样的关系数据库读支持。 2 实现原理 简而言之,RDBMSReader通过JDBC连接器连接到远程的RDBMS数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程RDBMS数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,RDBMSReader将其拼接为SQL语句发送到RDBMS数据库;对于用户配置querySql信息,RDBMS直接将其发送到RDBMS数据库。 3 功能说明 3.1 配置样例 配置一个从RDBMS数据库同步抽取数据作业: { "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "rdbmsreader", "parameter": { "username": "xxx", "password": "xxx", "column": [ "id", "name" ], "splitPk": "pk", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:dm://ip:port/database" ] } ], "fetchSize": 1024, "where": "1 = 1" } }, "writer": { "name": "streamwriter", "parameter": { "print": true } } } ] } } 配置一个自定义SQL的数据库同步任务到ODPS的作业: { "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0, "percentage": 0....

February 2, 2021