CassandraReader 插件文档

CassandraReader 插件文档 1 快速介绍 CassandraReader插件实现了从Cassandra读取数据。在底层实现上,CassandraReader通过datastax的java driver连接Cassandra实例,并执行相应的cql语句将数据从cassandra中SELECT出来。 2 实现原理 简而言之,CassandraReader通过java driver连接到Cassandra实例,并根据用户配置的信息生成查询SELECT CQL语句,然后发送到Cassandra,并将该CQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column的信息,CassandraReader将其拼接为CQL语句发送到Cassandra。 3 功能说明 3.1 配置样例 配置一个从Cassandra同步抽取数据到本地的作业: { "job": { "setting": { "speed": { "channel": 3 } }, "content": [ { "reader": { "name": "cassandrareader", "parameter": { "host": "localhost", "port": 9042, "useSSL": false, "keyspace": "test", "table": "datax_src", "column": [ "textCol", "blobCol", "writetime(blobCol)", "boolCol", "smallintCol", "tinyintCol", "intCol", "bigintCol", "varintCol", "floatCol", "doubleCol", "decimalCol", "dateCol", "timeCol", "timeStampCol", "uuidCol", "inetCol", "durationCol", "listCol", "mapCol", "setCol" "tupleCol" "udtCol", ] } }, "writer": { "name": "streamwriter", "parameter": { "print":true } } } ] } } 3.2 参数说明 host 描述:Cassandra连接点的域名或ip,多个node之间用逗号分隔。 必选:是 默认值:无 port 描述:Cassandra端口。 必选:是 默认值:9042 username 描述:数据源的用户名 必选:否 默认值:无 password...

February 2, 2021

CassandraWriter 插件文档

CassandraWriter 插件文档 1 快速介绍 CassandraWriter插件实现了向Cassandra写入数据。在底层实现上,CassandraWriter通过datastax的java driver连接Cassandra实例,并执行相应的cql语句将数据写入cassandra中。 2 实现原理 简而言之,CassandraWriter通过java driver连接到Cassandra实例,并根据用户配置的信息生成INSERT CQL语句,然后发送到Cassandra。 对于用户配置Table、Column的信息,CassandraReader将其拼接为CQL语句发送到Cassandra。 3 功能说明 3.1 配置样例 配置一个从内存产生到Cassandra导入的作业: { "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ {"value":"name","type": "string"}, {"value":"false","type":"bool"}, {"value":"1988-08-08 08:08:08","type":"date"}, {"value":"addr","type":"bytes"}, {"value":1.234,"type":"double"}, {"value":12345678,"type":"long"}, {"value":2.345,"type":"double"}, {"value":3456789,"type":"long"}, {"value":"4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5","type":"string"}, {"value":"value","type":"bytes"}, {"value":"-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148","type":"string"}, ], "sliceRecordCount": 10000000 } }, "writer": { "name": "cassandrawriter", "parameter": { "host": "localhost", "port": 9042, "useSSL": false, "keyspace": "stresscql", "table": "dst", "batchSize":10, "column": [ "name", "choice", "date", "address", "dbl", "lval", "fval", "ival", "uid", "value", "listval" ] } } } ] } } 3.2 参数说明 host 描述:Cassandra连接点的域名或ip,多个node之间用逗号分隔。 必选:是 默认值:无 port...

February 2, 2021

DataX

DataX DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 DataX 商业版本 阿里云DataWorks数据集成是DataX团队在阿里云上的商业化产品,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。2020年更新实时同步能力,2020年更新实时同步能力,支持10+种数据源的读写任意组合。提供MySQL,Oracle等多种数据源到阿里云MaxCompute,Hologres等大数据引擎的一键全增量同步解决方案。 https://www.aliyun.com/product/bigdata/ide Features DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。 DataX详细介绍 请参考:DataX-Introduction Quick Start Download DataX下载地址 请点击:Quick Start Support Data Channels DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南 类型 数据源 Reader(读) Writer(写) 文档 RDBMS 关系型数据库 MySQL √ √ 读 、写 Oracle √ √ 读 、写 SQLServer √ √ 读 、写 PostgreSQL √ √ 读 、写 DRDS √ √ 读 、写 通用RDBMS(支持所有关系型数据库) √ √ 读 、写 阿里云数仓数据存储 ODPS √ √ 读 、写 ADS √ 写 OSS √ √ 读 、写 OCS √ √ 读 、写 NoSQL数据存储 OTS √ √ 读 、写 Hbase0.94 √ √ 读 、写 Hbase1.1 √ √ 读 、写 Phoenix4.x √ √ 读 、写 Phoenix5....

February 2, 2021

DataX

DataX DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。 Features DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。 System Requirements Linux JDK(1.8以上,推荐1.8) Python(推荐Python2.6.X) Apache Maven 3.x (Compile DataX) Quick Start 工具部署 方法一、直接下载DataX工具包:DataX下载地址 下载后解压至本地某个目录,进入bin目录,即可运行同步作业: $ cd {YOUR_DATAX_HOME}/bin $ python datax.py {YOUR_JOB.json} 自检脚本: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json 方法二、下载DataX源码,自己编译:DataX源码 (1)、下载DataX源码: $ git clone git@github.com:alibaba/DataX.git (2)、通过maven打包: $ cd {DataX_source_code_home} $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true 打包成功,日志显示如下: [INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------- [INFO] Total time: 08:12 min [INFO] Finished at: 2015-12-13T16:26:48+08:00 [INFO] Final Memory: 133M/960M [INFO] ----------------------------------------------------------------- 打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下: $ cd {DataX_source_code_home} $ ls ./target/datax/datax/ bin conf job lib log log_perf plugin script tmp 配置示例:从stream读取数据并打印到控制台 第一步、创建作业的配置文件(json格式) 可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}...

February 2, 2021

datax 3.0 教程

DataXDataX ADB PG WriterDataX ADS写入CassandraReader 插件文档CassandraWriter 插件文档Readme.mdDataX插件开发宝典DrdsReader 插件文档DataX DRDSWriterREADME.mdDataX ElasticSearchWriterDataX FtpReader 说明DataX FtpWriter 说明DataX GDBReaderDataX GDBWriterHbase094XReader & Hbase11XReader 插件文档Hbase094XWriter & Hbase11XWriter 插件文档Hbase094XReader & Hbase11XReader 插件文档hbase11xsqlreader 插件文档HBase11xsqlwriter插件文档Hbase094XWriter & Hbase11XWriter 插件文档hbase20xsqlreader 插件文档HBase20xsqlwriter插件文档DataX HdfsReader 插件文档DataX HdfsWriter 插件文档阿里云开源离线同步工具DataX3.0介绍KingbaseesReader 插件文档DataX KingbaseesWriterdatax-kudu-plugindatax-kudu-pluginsDatax MongoDBReaderDatax MongoDBWriterMysqlReader 插件文档DataX MysqlWriterDataX OCSWriter 适用memcached客户端写入ocsDataX ODPSReaderDataX ODPS写入OpenTSDBReader 插件文档OracleReader 插件文档DataX OracleWriterDataX OSSReader 说明DataX OSSWriter 说明OTSReader 插件文档TableStore增量数据导出通道:TableStoreStreamReaderOTSWriter 插件文档PostgresqlReader 插件文档DataX PostgresqlWriterRDBMSReader 插件文档RDBMSWriter 插件文档SqlServerReader 插件文档DataX SqlServerWriterDataX TransformerTSDBReader 插件文档TSDBWriter 插件文档DataX TxtFileReader 说明DataX TxtFileWriter 说明DataX

February 2, 2021

DataX ADB PG Writer

DataX ADB PG Writer 1 快速介绍 AdbpgWriter 插件实现了写入数据到 ABD PG版数据库的功能。在底层实现上,AdbpgWriter 插件会先缓存需要写入的数据,当缓存的 数据量达到 commitSize 时,插件会通过 JDBC 连接远程 ADB PG版 数据库,并执行 COPY 命令将数据写入 ADB PG 数据库。 AdbpgWriter 可以作为数据迁移工具为用户提供服务。 2 实现原理 AdbpgWriter 通过 DataX 框架获取 Reader 生成的协议数据,首先会将数据缓存,当缓存的数据量达到commitSize时,插件根据你配置生成相应的COPY语句,执行 COPY命令将数据写入ADB PG数据库中。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 AdbpgWriter导入的数据 { "job": { "setting": { "speed": { "channel": 32 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ] }, "sliceRecordCount": 1000 }, "writer": { "name": "adbpgwriter", "parameter": { "username": "username", "password": "password", "host": "host", "port": "1234", "database": "database", "schema": "schema", "table": "table", "preSql": ["delete * from table"], "postSql": ["select * from table"], "column": ["*"] } } } ] } } 3....

February 2, 2021

DataX ADS写入

DataX ADS写入 1 快速介绍 欢迎ADS加入DataX生态圈!ADSWriter插件实现了其他数据源向ADS写入功能,现有DataX所有的数据源均可以无缝接入ADS,实现数据快速导入ADS。 ADS写入预计支持两种实现方式: ADSWriter 支持向ODPS中转落地导入ADS方式,优点在于当数据量较大时(>1KW),可以以较快速度进行导入,缺点引入了ODPS作为落地中转,因此牵涉三方系统(DataX、ADS、ODPS)鉴权认证。 ADSWriter 同时支持向ADS直接写入的方式,优点在于小批量数据写入能够较快完成(<1KW),缺点在于大数据导入较慢。 注意: 如果从ODPS导入数据到ADS,请用户提前在源ODPS的Project中授权ADS Build账号具有读取你源表ODPS的权限,同时,ODPS源表创建人和ADS写入属于同一个阿里云账号。 如果从非ODPS导入数据到ADS,请用户提前在目的端ADS空间授权ADS Build账号具备Load data权限。 以上涉及ADS Build账号请联系ADS管理员提供。 2 实现原理 ADS写入预计支持两种实现方式: 2.1 Load模式 DataX 将数据导入ADS为当前导入任务分配的ADS项目表,随后DataX通知ADS完成数据加载。该类数据导入方式实际上是写ADS完成数据同步,由于ADS是分布式存储集群,因此该通道吞吐量较大,可以支持TB级别数据导入。 DataX底层得到明文的 jdbc://host:port/dbname + username + password + table, 以此连接ADS, 执行show grants; 前置检查该用户是否有ADS中目标表的Load Data或者更高的权限。注意,此时ADSWriter使用用户填写的ADS用户名+密码信息完成登录鉴权工作。 检查通过后,通过ADS中目标表的元数据反向生成ODPS DDL,在ODPS中间project中,以ADSWriter的账户建立ODPS表(非分区表,生命周期设为1-2Day), 并调用ODPSWriter把数据源的数据写入该ODPS表中。 注意,这里需要使用中转ODPS的账号AK向中转ODPS写入数据。 写入完成后,以中转ODPS账号连接ADS,发起Load Data From ‘odps://中转project/中转table/’ [overwrite] into adsdb.adstable [partition (xx,xx=xx)]; 这个命令返回一个Job ID需要记录。 注意,此时ADS使用自己的Build账号访问中转ODPS,因此需要中转ODPS对这个Build账号提前开放读取权限。 连接ADS一分钟一次轮询执行 select state from information_schema.job_instances where job_id like ‘$Job ID’,查询状态,注意这个第一个一分钟可能查不到状态记录。 Success或者Fail后返回给用户,然后删除中转ODPS表,任务结束。 上述流程是从其他非ODPS数据源导入ADS流程,对于ODPS导入ADS流程使用如下流程: 2.2 Insert模式 DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。该类数据导入方式写入吞吐量较小,不适合大批量数据写入。有如下注意点: ADSWriter使用JDBC连接直连ADS,并只使用了JDBC Statement进行数据插入。ADS不支持PreparedStatement,故ADSWriter只能单行多线程进行写入。 ADSWriter支持筛选部分列,列换序等功能,即用户可以填写列。 考虑到ADS负载问题,建议ADSWriter Insert模式建议用户使用TPS限流,最高在1W TPS。 ADSWriter在所有Task完成写入任务后,Job Post单例执行flush工作,保证数据在ADS整体更新。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到ADS,使用Load模式进行导入的数据。 { "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "value": "DataX", "type": "string" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 100000 } }, "writer": { "name": "adswriter", "parameter": { "odps": { "accessId": "xxx", "accessKey": "xxx", "account": "xxx@aliyun....

February 2, 2021

DataX DRDSWriter

DataX DRDSWriter 1 快速介绍 DRDSWriter 插件实现了写入数据到 DRDS 的目的表的功能。在底层实现上, DRDSWriter 通过 JDBC 连接远程 DRDS 数据库的 Proxy,并执行相应的 replace into … 的 sql 语句将数据写入 DRDS,特别注意执行的 Sql 语句是 replace into,为了避免数据重复写入,需要你的表具备主键或者唯一性索引(Unique Key)。 DRDSWriter 面向ETL开发工程师,他们使用 DRDSWriter 从数仓导入数据到 DRDS。同时 DRDSWriter 亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 DRDSWriter 通过 DataX 框架获取 Reader 生成的协议数据,通过 replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 DRDS。DRDSWriter 累积一定数据,提交给 DRDS 的 Proxy,该 Proxy 内部决定数据是写入一张还是多张表以及多张表写入时如何路由数据。 注意:整个任务至少需要具备 replace into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 DRDS 导入的数据。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "drdswriter", "parameter": { "writeMode": "insert", "username": "root", "password": "root", "column": [ "id", "name" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:mysql://127....

February 2, 2021

DataX ElasticSearchWriter

DataX ElasticSearchWriter 1 快速介绍 数据导入elasticsearch的插件 2 实现原理 使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch 3 功能说明 3.1 配置样例 job.json { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { ... }, "writer": { "name": "elasticsearchwriter", "parameter": { "endpoint": "http://xxx:9999", "accessId": "xxxx", "accessKey": "xxxx", "index": "test-1", "type": "default", "cleanup": true, "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}}, "discovery": false, "batchSize": 1000, "splitter": ",", "column": [ {"name": "pk", "type": "id"}, { "name": "col_ip","type": "ip" }, { "name": "col_double","type": "double" }, { "name": "col_long","type": "long" }, { "name": "col_integer","type": "integer" }, { "name": "col_keyword", "type": "keyword" }, { "name": "col_text", "type": "text", "analyzer": "ik_max_word"}, { "name": "col_geo_point", "type": "geo_point" }, { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}, { "name": "col_nested1", "type": "nested" }, { "name": "col_nested2", "type": "nested" }, { "name": "col_object1", "type": "object" }, { "name": "col_object2", "type": "object" }, { "name": "col_integer_array", "type":"integer", "array":true}, { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"} ] } } } ] } } 3....

February 2, 2021

DataX FtpReader 说明

DataX FtpReader 说明 1 快速介绍 FtpReader提供了读取远程FTP文件系统数据存储的能力。在底层实现上,FtpReader获取远程FTP文件数据,并转换为DataX传输协议传递给Writer。 本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。 2 功能与限制 FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能,远程FTP文件本身是无结构化数据存储,对于DataX而言,FtpReader实现上类比TxtFileReader,有诸多相似之处。目前FtpReader支持功能如下: 支持且仅支持读取TXT的文件,且要求TXT中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量 支持递归读取、支持文件名过滤。 支持文本压缩,现有压缩格式为zip、gzip、bzip2。 多个File可以支持并发读取。 我们暂时不能做到: 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。 单个File在压缩情况下,从技术上无法支持多线程并发读取。 3 功能说明 3.1 配置样例 { "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "ftpreader", "parameter": { "protocol": "sftp", "host": "127.0.0.1", "port": 22, "username": "xx", "password": "xxx", "path": [ "/home/hanfa.shf/ftpReaderTest/data" ], "column": [ { "index": 0, "type": "long" }, { "index": 1, "type": "boolean" }, { "index": 2, "type": "double" }, { "index": 3, "type": "string" }, { "index": 4, "type": "date", "format": "yyyy.MM.dd" } ], "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "ftpWriter", "parameter": { "path": "/home/hanfa....

February 2, 2021