datax-kudu-plugin
datax-kudu-plugin datax kudu的writer插件 仅在kudu11进行过测试
datax-kudu-plugin datax kudu的writer插件 仅在kudu11进行过测试
datax-kudu-plugins datax kudu的writer插件 eg: { "name": "kuduwriter", "parameter": { "kuduConfig": { "kudu.master_addresses": "***", "timeout": 60000, "sessionTimeout": 60000 }, "table": "", "replicaCount": 3, "truncate": false, "writeMode": "upsert", "partition": { "range": { "column1": [ { "lower": "2020-08-25", "upper": "2020-08-26" }, { "lower": "2020-08-26", "upper": "2020-08-27" }, { "lower": "2020-08-27", "upper": "2020-08-28" } ] }, "hash": { "column": [ "column1" ], "number": 3 } }, "column": [ { "index": 0, "name": "c1", "type": "string", "primaryKey": true }, { "index": 1, "name": "c2", "type": "string", "compress": "DEFAULT_COMPRESSION", "encoding": "AUTO_ENCODING", "comment": "注解xxxx" } ], "batchSize": 1024, "bufferSize": 2048, "skipFail": false, "encoding": "UTF-8" } } 必须参数:...
DataX插件开发宝典 本文面向DataX插件开发人员,尝试尽可能全面地阐述开发一个DataX插件所经过的历程,力求消除开发者的困惑,让插件开发变得简单。 一、开发之前 路走对了,就不怕远。✓ 路走远了,就不管对不对。✕ 当你打开这篇文档,想必已经不用在此解释什么是DataX了。那下一个问题便是: DataX为什么要使用插件机制? 从设计之初,DataX就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式: 插件只需关心数据的读取或者写入本身。 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。 作为插件开发人员,则需要关注两个问题: 数据源本身的读写数据正确性。 如何与框架沟通、合理正确地使用框架。 开工前需要想明白的问题 就插件本身而言,希望在您动手coding之前,能够回答我们列举的这些问题,不然路走远了发现没走对,就尴尬了。 二、插件视角看框架 逻辑执行模型 插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念: Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。 Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。 TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。 简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。 物理执行模型 框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式: Standalone: 单进程运行,没有外部依赖。 Local: 单进程运行,统计信息、错误信息汇报到集中存储。 Distrubuted: 分布式多进程运行,依赖DataX Service服务。 当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。 当JobContainer和TaskGroupContainer运行在同一个进程内时,就是单机模式(Standalone和Local);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。 是不是很简单? 编程接口 那么,Job和Task的逻辑应是怎么对应到具体的代码中的? 首先,插件的入口类必须扩展Reader或Writer抽象类,并且实现分别实现Job和Task两个内部抽象类,Job和Task的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例: public class SomeReader extends Reader { public static class Job extends Reader.Job { @Override public void init() { } @Override public void prepare() { } @Override public List<Configuration> split(int adviceNumber) { return null; } @Override public void post() { } @Override public void destroy() { } } public static class Task extends Reader....
DrdsReader 插件文档 1 快速介绍 DrdsReader插件实现了从DRDS(分布式RDS)读取数据。在底层实现上,DrdsReader通过JDBC连接远程DRDS数据库,并执行相应的sql语句将数据从DRDS库中SELECT出来。 DRDS的插件目前DataX只适配了Mysql引擎的场景,DRDS对于DataX而言,就是一套分布式Mysql数据库,并且大部分通信协议遵守Mysql使用场景。 2 实现原理 简而言之,DrdsReader通过JDBC连接器连接到远程的DRDS数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程DRDS数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,DrdsReader将其拼接为SQL语句发送到DRDS数据库。不同于普通的Mysql数据库,DRDS作为分布式数据库系统,无法适配所有Mysql的协议,包括复杂的Join等语句,DRDS暂时无法支持。 3 功能说明 3.1 配置样例 配置一个从DRDS数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte": 1048576 } //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "content": [ { "reader": { "name": "drdsReader", "parameter": { // 数据库连接用户名 "username": "root", // 数据库连接密码 "password": "root", "column": [ "id","name" ], "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] } ] } }, "writer": { //writer类型 "name": "streamwriter", //是否打印内容 "parameter": { "print":true, } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { }, "content": [ { "reader": { "name": "drdsreader", "parameter": { "username": "root", "password": "root", "where": "", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:drds://localhost:3306/database"] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": false, "encoding": "UTF-8" } } } ] } } 3....
Hbase094XReader & Hbase11XReader 插件文档 1 快速介绍 HbaseReader 插件实现了从 Hbase中读取数据。在底层实现上,HbaseReader 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey 范围内的数据,并将读取的数据使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。 1.1支持的功能 1、目前HbaseReader支持的Hbase版本有:Hbase0.94.x和Hbase1.1.x。 若您的hbase版本为Hbase0.94.x,reader端的插件请选择:hbase094xreader,即: "reader": { "name": "hbase094xreader" } 若您的hbase版本为Hbase1.1.x,reader端的插件请选择:hbase11xreader,即: "reader": { "name": "hbase11xreader" } 2、目前HbaseReader支持两模式读取:normal 模式、multiVersionFixedColumn模式; normal 模式:把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如: hbase(main):017:0> scan ‘users’ ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0580 seconds...
Hbase094XReader & Hbase11XReader 插件文档 1 快速介绍 HbaseReader 插件实现了从 Hbase中读取数据。在底层实现上,HbaseReader 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey 范围内的数据,并将读取的数据使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。 1.1支持的功能 1、目前HbaseReader支持的Hbase版本有:Hbase0.94.x和Hbase1.1.x。 若您的hbase版本为Hbase0.94.x,reader端的插件请选择:hbase094xreader,即: "reader": { "name": "hbase094xreader" } 若您的hbase版本为Hbase1.1.x,reader端的插件请选择:hbase11xreader,即: "reader": { "name": "hbase11xreader" } 2、目前HbaseReader支持两模式读取:normal 模式、multiVersionFixedColumn模式; normal 模式:把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如: hbase(main):017:0> scan ‘users’ ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0580 seconds...
Hbase094XWriter & Hbase11XWriter 插件文档 1 快速介绍 HbaseWriter 插件实现了从向Hbase中写取数据。在底层实现上,HbaseWriter 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 put 方式写入Hbase。 1.1支持功能 1、目前HbaseWriter支持的Hbase版本有:Hbase0.94.x和Hbase1.1.x。 若您的hbase版本为Hbase0.94.x,writer端的插件请选择:hbase094xwriter,即: "writer": { "name": "hbase094xwriter" } 若您的hbase版本为Hbase1.1.x,writer端的插件请选择:hbase11xwriter,即: "writer": { "name": "hbase11xwriter" } 2、目前HbaseWriter支持源端多个字段拼接作为hbase 表的 rowkey,具体配置参考:rowkeyColumn配置; 3、写入hbase的时间戳(版本)支持:用当前时间作为版本,指定源端列作为版本,指定一个时间 三种方式作为版本; 4、HbaseWriter中有一个必填配置项是:hbaseConfig,需要你联系 HBase PE,将hbase-site.xml 中与连接 HBase 相关的配置项提取出来,以 json 格式填入,同时可以补充更多HBase client的配置来优化与服务器的交互。 如:hbase-site.xml的配置内容如下 <configuration> <property> <name>hbase.rootdir</name> <value>hdfs://ip:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>***</value> </property> </configuration> 转换后的json为: "hbaseConfig": { "hbase.rootdir": "hdfs: //ip: 9000/hbase", "hbase.cluster.distributed": "true", "hbase.zookeeper.quorum": "***" } 1.2 限制 1、目前只支持源端为横表写入,不支持竖表(源端读出的为四元组: rowKey,family:qualifier,timestamp,value)模式的数据写入;本期目标主要是替换DataX2中的habsewriter,下次迭代考虑支持。 2、目前不支持写入hbase前清空表数据,若需要清空数据请联系HBase PE 2 实现原理 简而言之,HbaseWriter 通过 HBase 的 Java 客户端,通过 HTable, Put等 API,将从上游Reader读取的数据写入HBase你hbase11xwriter与hbase094xwriter的主要不同在于API的调用不同,Hbase1.1.x废弃了很多Hbase0.94.x的api。 3 功能说明 3.1 配置样例 配置一个从本地写入hbase1.1.x的作业: { "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": "/Users/shf/workplace/datax_test/hbase11xwriter/txt/normal....
Hbase094XWriter & Hbase11XWriter 插件文档 1 快速介绍 HbaseWriter 插件实现了从向Hbase中写取数据。在底层实现上,HbaseWriter 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 put 方式写入Hbase。 1.1支持功能 1、目前HbaseWriter支持的Hbase版本有:Hbase0.94.x和Hbase1.1.x。 若您的hbase版本为Hbase0.94.x,writer端的插件请选择:hbase094xwriter,即: "writer": { "name": "hbase094xwriter" } 若您的hbase版本为Hbase1.1.x,writer端的插件请选择:hbase11xwriter,即: "writer": { "name": "hbase11xwriter" } 2、目前HbaseWriter支持源端多个字段拼接作为hbase 表的 rowkey,具体配置参考:rowkeyColumn配置; 3、写入hbase的时间戳(版本)支持:用当前时间作为版本,指定源端列作为版本,指定一个时间 三种方式作为版本; 1.2 限制 1、目前只支持源端为横表写入,不支持竖表(源端读出的为四元组: rowKey,family:qualifier,timestamp,value)模式的数据写入;本期目标主要是替换DataX2中的habsewriter,下次迭代考虑支持。 2、目前不支持写入hbase前清空表数据,若需要清空数据请联系HBase PE 2 实现原理 简而言之,HbaseWriter 通过 HBase 的 Java 客户端,通过 HTable, Put等 API,将从上游Reader读取的数据写入HBase你hbase11xwriter与hbase094xwriter的主要不同在于API的调用不同,Hbase1.1.x废弃了很多Hbase0.94.x的api。 3 功能说明 3.1 配置样例 配置一个从本地写入hbase1.1.x的作业: { "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": "/Users/shf/workplace/datax_test/hbase11xwriter/txt/normal.txt", "charset": "UTF-8", "column": [ { "index": 0, "type": "String" }, { "index": 1, "type": "string" }, { "index": 2, "type": "string" }, { "index": 3, "type": "string" }, { "index": 4, "type": "string" }, { "index": 5, "type": "string" }, { "index": 6, "type": "string" } ], "fieldDelimiter": "," } }, "writer": { "name": "hbase11xwriter", "parameter": { "hbaseConfig": { "hbase....
hbase11xsqlreader 插件文档 1 快速介绍 hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实现上,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并执行相应的sql语句将数据从Phoenix库中SELECT出来。 2 实现原理 简而言之,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并根据用户配置的信息生成查询SELECT 语句,然后发送到HBase集群,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 hbase11xsqlreader 插件文档 1 快速介绍 hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实现上,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并执行相应的sql语句将数据从Phoenix库中SELECT出来。 2 实现原理 简而言之,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并根据用户配置的信息生成查询SELECT 语句,然后发送到HBase集群,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 3 功能说明 3.1 配置样例 配置一个从Phoenix同步抽取数据到本地的作业: { "job": { "setting": { "speed": { //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. "byte":10485760 }, //出错限制 "errorLimit": { //出错的record条数上限,当大于该值即报错。 "record": 0, //出错的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "content": [ { "reader": { //指定插件为hbase11xsqlreader "name": "hbase11xsqlreader", "parameter": { //填写连接Phoenix的hbase集群zk地址 "hbaseConfig": { "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" }, //填写要读取的phoenix的表名 "table": "US_POPULATION", //填写要读取的列名,不填读取所有列 "column": [ ] } }, "writer": { //writer类型 "name": "streamwriter", //是否打印内容 "parameter": { "print":true, "encoding": "UTF-8" } } } ] } } 3.2 参数说明 hbaseConfig 描述:hbase11xsqlreader需要通过Phoenix客户端去连接hbase集群,因此这里需要填写对应hbase集群的zkurl地址,注意不要添加2181。 必选:是 默认值:无 table 描述:编写Phoenix中的表名,如果有namespace,该值设置为’namespace.tablename' 必选:是 默认值:无 column...
HBase11xsqlwriter插件文档 1. 快速介绍 HBase11xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功能。Phoenix因为对rowkey做了数据编码,所以,直接使用HBaseAPI进行写入会面临手工数据转换的问题,麻烦且易错。本插件提供了单间的SQL表的数据导入方式。 在底层实现上,通过Phoenix的JDBC驱动,执行UPSERT语句向hbase写入数据。 1.1 支持的功能 支持带索引的表的数据导入,可以同步更新所有的索引表 1.2 限制 仅支持1.x系列的hbase 仅支持通过phoenix创建的表,不支持原生HBase表 不支持带时间戳的数据导入 2. 实现原理 通过Phoenix的JDBC驱动,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以,可以同步更新索引表。 3. 配置说明 3.1 配置样例 { "job": { "entry": { "jvm": "-Xms2048m -Xmx2048m" }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": "/Users/shf/workplace/datax_test/hbase11xsqlwriter/txt/normal.txt", "charset": "UTF-8", "column": [ { "index": 0, "type": "String" }, { "index": 1, "type": "string" }, { "index": 2, "type": "string" }, { "index": 3, "type": "string" } ], "fieldDelimiter": "," } }, "writer": { "name": "hbase11xsqlwriter", "parameter": { "batchSize": "256", "column": [ "UID", "TS", "EVENTID", "CONTENT" ], "hbaseConfig": { "hbase.zookeeper.quorum": "目标hbase集群的ZK服务器地址,向PE咨询", "zookeeper.znode.parent": "目标hbase集群的znode,向PE咨询" }, "nullMode": "skip", "table": "目标hbase表名,大小写有关" } } } ], "setting": { "speed": { "channel": 5 } } } } 3....