DataX FtpWriter 说明

DataX FtpWriter 说明 1 快速介绍 FtpWriter提供了向远程FTP文件写入CSV格式的一个或者多个文件,在底层实现上,FtpWriter将DataX传输协议下的数据转换为csv格式,并使用FTP相关的网络协议写出到远程FTP服务器。 写入FTP文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。 2 功能与限制 FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结构化数据存储,FtpWriter如下几个方面约定: 支持且仅支持写入文本类型(不支持BLOB如视频数据)的文件,且要求文本中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 写出时不支持文本压缩。 支持多线程写入,每个线程写入不同子文件。 我们不能做到: 单个文件不能支持并发写入。 3 功能说明 3.1 配置样例 { "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": {}, "writer": { "name": "ftpwriter", "parameter": { "protocol": "sftp", "host": "***", "port": 22, "username": "xxx", "password": "xxx", "timeout": "60000", "connectPattern": "PASV", "path": "/tmp/data/", "fileName": "yixiao", "writeMode": "truncate|append|nonConflict", "fieldDelimiter": ",", "encoding": "UTF-8", "nullFormat": "null", "dateFormat": "yyyy-MM-dd", "fileFormat": "csv", "suffix": ".csv", "header": [] } } } ] } } 3.2 参数说明 protocol 描述:ftp服务器协议,目前支持传输协议有ftp和sftp。 必选:是 默认值:无 host 描述:ftp服务器地址。 必选:是 默认值:无 port 描述:ftp服务器端口。 必选:否 默认值:若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 timeout 描述:连接ftp服务器连接超时时间,单位毫秒。 必选:否 默认值:60000(1分钟)...

February 2, 2021

DataX GDBReader

DataX GDBReader 1. 快速介绍 GDBReader插件实现读取GDB实例数据的功能,通过Gremlin Client连接远程GDB实例,按配置提供的label生成查询DSL,遍历点或边数据,包括属性数据,并将数据写入到Record中给到Writer使用。 2. 实现原理 GDBReader使用Gremlin Client连接GDB实例,按label分不同Task取点或边数据。 单个Task中按label遍历点或边的id,再切分范围分多次请求查询点或边和属性数据,最后将点或边数据根据配置转换成指定格式记录发送给下游写插件。 GDBReader按label切分多个Task并发,同一个label的数据批量异步获取来加快读取速度。如果配置读取的label列表为空,任务启动前会从GDB查询所有label再切分Task。 3. 功能说明 GDB中点和边不同,读取需要区分点和边点配置。 3.1 点配置样例 { "job": { "setting": { "speed": { "channel": 1 } "errorLimit": { "record": 1 } }, "content": [ { "reader": { "name": "gdbreader", "parameter": { "host": "10.218.145.24", "port": 8182, "username": "***", "password": "***", "fetchBatchSize": 100, "rangeSplitSize": 1000, "labelType": "VERTEX", "labels": ["label1", "label2"], "column": [ { "name": "id", "type": "string", "columnType": "primaryKey" }, { "name": "label", "type": "string", "columnType": "primaryLabel" }, { "name": "age", "type": "int", "columnType": "vertexProperty" } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": true } } } ] } } 3....

February 2, 2021

DataX GDBWriter

DataX GDBWriter 1 快速介绍 GDBWriter插件实现了写入数据到GDB实例的功能。GDBWriter通过Gremlin Client连接远程GDB实例,获取Reader的数据,生成写入DSL语句,将数据写入到GDB。 2 实现原理 GDBWriter通过DataX框架获取Reader生成的协议数据,使用g.addV/E(GDB___label).property(id, GDB___id).property(GDB___PK1, GDB___PV1)...语句写入数据到GDB实例。 可以配置Gremlin Client工作在session模式,由客户端控制事务,在一次事务中实现多个记录的批量写入。 3 功能说明 因为GDB中点和边的配置不同,导入时需要区分点和边的配置。 3.1 点配置样例 这里是一份从内存生成点数据导入GDB实例的配置 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "random": "1,100", "type": "double" }, { "random": "1000,1200", "type": "long" }, { "random": "60,64", "type": "string" }, { "random": "100,1000", "type": "long" }, { "random": "32,48", "type": "string" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "gdbwriter", "parameter": { "host": "gdb-endpoint", "port": 8182, "username": "root", "password": "***", "writeMode": "INSERT", "labelType": "VERTEX", "label": "#{1}", "idTransRule": "none", "session": true, "maxRecordsInBatch": 64, "column": [ { "name": "id", "value": "#{0}", "type": "string", "columnType": "primaryKey" }, { "name": "vertex_propKey", "value": "#{2}", "type": "string", "columnType": "vertexSetProperty" }, { "name": "vertex_propKey", "value": "#{3}", "type": "long", "columnType": "vertexSetProperty" }, { "name": "vertex_propKey2", "value": "#{4}", "type": "string", "columnType": "vertexProperty" } ] } } } ] } } 3....

February 2, 2021

DataX HdfsReader 插件文档

DataX HdfsReader 插件文档 1 快速介绍 HdfsReader提供了读取分布式文件系统数据存储的能力。在底层实现上,HdfsReader获取分布式文件系统上文件的数据,并转换为DataX传输协议传递给Writer。 目前HdfsReader支持的文件格式有textfile(text)、orcfile(orc)、rcfile(rc)、sequence file(seq)和普通逻辑二维表(csv)类型格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。 HdfsReader需要Jdk1.7及以上版本的支持。 2 功能与限制 HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,HdfsReader实现上类比TxtFileReader,有诸多相似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。HdfsReader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前HdfsReader支持的功能如下: 支持textfile、orcfile、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量 支持递归读取、支持正则表达式("*“和”?")。 支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。 多个File可以支持并发读取。 支持sequence file数据压缩,目前支持lzo压缩方式。 csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。 目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试; 支持kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效) 我们暂时不能做到: 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。 目前还不支持hdfs HA; 3 功能说明 3.1 配置样例 { "job": { "setting": { "speed": { "channel": 3 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/user/hive/warehouse/mytable01/*", "defaultFS": "hdfs://xxx:port", "column": [ { "index": 0, "type": "long" }, { "index": 1, "type": "boolean" }, { "type": "string", "value": "hello" }, { "index": 2, "type": "double" } ], "fileType": "orc", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "streamwriter", "parameter": { "print": true } } } ] } } 3....

February 2, 2021

DataX HdfsWriter 插件文档

DataX HdfsWriter 插件文档 1 快速介绍 HdfsWriter提供向HDFS文件系统指定路径中写入TEXTFile文件和ORCFile文件,文件内容可与hive中表关联。 2 功能与限制 (1)、目前HdfsWriter仅支持textfile和orcfile两种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表; (2)、由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入; (3)、目前仅支持与以下Hive数据类型: 数值型:TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE 字符串类型:STRING,VARCHAR,CHAR 布尔类型:BOOLEAN 时间类型:DATE,TIMESTAMP 目前不支持:decimal、binary、arrays、maps、structs、union类型; (4)、对于Hive分区表目前仅支持一次写入单个分区; (5)、对于textfile需用户保证写入hdfs文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入hdfs数据与Hive表字段关联; (6)、HdfsWriter实现过程是:首先根据用户指定的path,创建一个hdfs文件系统上不存在的临时目录,创建规则:path_随机;然后将读取的文件写入这个临时目录;全部写入后再将这个临时目录下的文件移动到用户指定目录(在创建文件时保证文件名不重复); 最后删除临时目录。如果在中间过程发生网络中断等情况造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录。 (7)、目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试; (8)、目前HdfsWriter支持Kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效) 3 功能说明 3.1 配置样例 { "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"], "encoding": "UTF-8", "column": [ { "index": 0, "type": "long" }, { "index": 1, "type": "long" }, { "index": 2, "type": "long" }, { "index": 3, "type": "long" }, { "index": 4, "type": "DOUBLE" }, { "index": 5, "type": "DOUBLE" }, { "index": 6, "type": "STRING" }, { "index": 7, "type": "STRING" }, { "index": 8, "type": "STRING" }, { "index": 9, "type": "BOOLEAN" }, { "index": 10, "type": "date" }, { "index": 11, "type": "date" } ], "fieldDelimiter": "\t" } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://xxx:port", "fileType": "orc", "path": "/user/hive/warehouse/writerorc....

February 2, 2021

DataX KingbaseesWriter

DataX KingbaseesWriter 1 快速介绍 KingbaseesWriter插件实现了写入数据到 KingbaseES主库目的表的功能。在底层实现上,KingbaseesWriter通过JDBC连接远程 KingbaseES 数据库,并执行相应的 insert into … sql 语句将数据写入 KingbaseES,内部会分批次提交入库。 KingbaseesWriter面向ETL开发工程师,他们使用KingbaseesWriter从数仓导入数据到KingbaseES。同时 KingbaseesWriter亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 KingbaseesWriter通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL插入语句 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 注意: 1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 2. KingbaseesWriter和MysqlWriter不同,不支持配置writeMode参数。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 KingbaseesWriter导入的数据。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "kingbaseeswriter", "parameter": { "username": "xx", "password": "xx", "column": [ "id", "name" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:kingbase8://127....

February 2, 2021

Datax MongoDBReader

Datax MongoDBReader 1 快速介绍 MongoDBReader 插件利用 MongoDB 的java客户端MongoClient进行MongoDB的读操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到document级别,配合上MongoDB强大的索引功能,基本可以达到高性能的读取MongoDB的需求。 2 实现原理 MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的JOB程序按照指定的规则对MongoDB中的数据进行分片,并行读取,然后将MongoDB支持的类型通过逐一判断转换成Datax支持的类型。 3 功能说明 该示例从ODPS读一份数据到MongoDB。 { "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["127.0.0.1:27017"], "userName": "", "userPassword": "", "dbName": "tag_per_data", "collectionName": "tag_data12", "column": [ { "name": "unique_id", "type": "string" }, { "name": "sid", "type": "string" }, { "name": "user_id", "type": "string" }, { "name": "auction_id", "type": "string" }, { "name": "content_type", "type": "string" }, { "name": "pool_type", "type": "string" }, { "name": "frontcat_id", "type": "Array", "spliter": "" }, { "name": "categoryid", "type": "Array", "spliter": "" }, { "name": "gmt_create", "type": "string" }, { "name": "taglist", "type": "Array", "spliter": " " }, { "name": "property", "type": "string" }, { "name": "scorea", "type": "int" }, { "name": "scoreb", "type": "int" }, { "name": "scorec", "type": "int" } ] } }, "writer": { "name": "odpswriter", "parameter": { "project": "tb_ai_recommendation", "table": "jianying_tag_datax_read_test01", "column": [ "unique_id", "sid", "user_id", "auction_id", "content_type", "pool_type", "frontcat_id", "categoryid", "gmt_create", "taglist", "property", "scorea", "scoreb" ], "accessId": "**************", "accessKey": "********************", "truncate": true, "odpsServer": "xxx/api", "tunnelServer": "xxx", "accountType": "aliyun" } } } ] } } 4 参数说明 address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】 userName:MongoDB的用户名。【选填】 userPassword: MongoDB的密码。【选填】 collectionName: MonogoDB的集合名。【必填】 column:MongoDB的文档列名。【必填】 name:Column的名字。【必填】 type:Column的类型。【选填】 splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】 query: MongoDB的额外查询条件。【选填】 5 类型转换 DataX 内部类型 MongoDB 数据类型 Long int, Long Double double String string, array Date date Boolean boolean Bytes bytes 6 性能报告 7 测试报告

February 2, 2021

Datax MongoDBWriter

Datax MongoDBWriter 1 快速介绍 MongoDBWriter 插件利用 MongoDB 的java客户端MongoClient进行MongoDB的写操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到document级别,配合上MongoDB强大的索引功能,基本可以满足数据源向MongoDB写入数据的需求,针对数据更新的需求,通过配置业务主键的方式也可以实现。 2 实现原理 MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持的类型通过逐一判断转换成MongoDB支持的类型。其中一个值得指出的点就是Datax本身不支持数组类型,但是MongoDB支持数组类型,并且数组类型的索引还是蛮强大的。为了使用MongoDB的数组类型,则可以通过参数的特殊配置,将字符串可以转换成MongoDB中的数组。类型转换之后,就可以依托于Datax框架并行的写入MongoDB。 3 功能说明 该示例从ODPS读一份数据到MongoDB。 { "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "odpsreader", "parameter": { "accessId": "********", "accessKey": "*********", "project": "tb_ai_recommendation", "table": "jianying_tag_datax_test", "column": [ "unique_id", "sid", "user_id", "auction_id", "content_type", "pool_type", "frontcat_id", "categoryid", "gmt_create", "taglist", "property", "scorea", "scoreb" ], "splitMode": "record", "odpsServer": "http://xxx/api" } }, "writer": { "name": "mongodbwriter", "parameter": { "address": [ "127.0.0.1:27017" ], "userName": "", "userPassword": "", "dbName": "tag_per_data", "collectionName": "tag_data", "column": [ { "name": "unique_id", "type": "string" }, { "name": "sid", "type": "string" }, { "name": "user_id", "type": "string" }, { "name": "auction_id", "type": "string" }, { "name": "content_type", "type": "string" }, { "name": "pool_type", "type": "string" }, { "name": "frontcat_id", "type": "Array", "splitter": " " }, { "name": "categoryid", "type": "Array", "splitter": " " }, { "name": "gmt_create", "type": "string" }, { "name": "taglist", "type": "Array", "splitter": " " }, { "name": "property", "type": "string" }, { "name": "scorea", "type": "int" }, { "name": "scoreb", "type": "int" }, { "name": "scorec", "type": "int" } ], "upsertInfo": { "isUpsert": "true", "upsertKey": "unique_id" } } } } ] } } 4 参数说明 address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】 userName:MongoDB的用户名。【选填】 userPassword: MongoDB的密码。【选填】 collectionName: MonogoDB的集合名。【必填】 column:MongoDB的文档列名。【必填】 name:Column的名字。【必填】 type:Column的类型。【选填】 splitter:特殊分隔符,当且仅当要处理的字符串要用分隔符分隔为字符数组时,才使用这个参数,通过这个参数指定的分隔符,将字符串分隔存储到MongoDB的数组中。【选填】 upsertInfo:指定了传输数据时更新的信息。【选填】 isUpsert:当设置为true时,表示针对相同的upsertKey做更新操作。【选填】 upsertKey:upsertKey指定了没行记录的业务主键。用来做更新时使用。【选填】 5 类型转换 DataX 内部类型 MongoDB 数据类型 Long int, Long Double double String string, array Date date Boolean boolean Bytes bytes 6 性能报告 7 测试报告

February 2, 2021

DataX MysqlWriter

DataX MysqlWriter 1 快速介绍 MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。在底层实现上, MysqlWriter 通过 JDBC 连接远程 Mysql 数据库,并执行相应的 insert into … 或者 ( replace into …) 的 sql 语句将数据写入 Mysql,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。 MysqlWriter 面向ETL开发工程师,他们使用 MysqlWriter 从数仓导入数据到 Mysql。同时 MysqlWriter 亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 或者 replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。出于性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。 注意:目的表所在数据库必须是主库才能写入数据;整个任务至少需要具备 insert/replace into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 Mysql 导入的数据。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "insert", "username": "root", "password": "root", "column": [ "id", "name" ], "session": [ "set session sql_mode='ANSI'" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:mysql://127....

February 2, 2021

DataX OCSWriter 适用memcached客户端写入ocs

DataX OCSWriter 适用memcached客户端写入ocs 1 快速介绍 1.1 OCS简介 开放缓存服务( Open Cache Service,简称OCS)是基于内存的缓存服务,支持海量小数据的高速访问。OCS可以极大缓解对后端存储的压力,提高网站或应用的响应速度。OCS支持Key-Value的数据结构,兼容Memcached协议的客户端都可与OCS通信。 OCS 支持即开即用的方式快速部署;对于动态Web、APP应用,可通过缓存服务减轻对数据库的压力,从而提高网站整体的响应速度。 与本地MemCache相同之处在于OCS兼容Memcached协议,与用户环境兼容,可直接用于OCS服务 不同之处在于硬件和数据部署在云端,有完善的基础设施、网络安全保障、系统维护服务。所有的这些服务,都不需要投资,只需根据使用量进行付费即可。 1.2 OCSWriter简介 OCSWriter是DataX实现的,基于Memcached协议的数据写入OCS通道。 2 功能说明 2.1 配置样例 这里使用一份从内存产生的数据导入到OCS。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "ocswriter", "parameter": { "proxy": "xxxx", "port": "11211", "userName": "user", "password": "******", "writeMode": "set|add|replace|append|prepend", "writeFormat": "text|binary", "fieldDelimiter": "\u0001", "expireTime": 1000, "indexes": "0,2", "batchSize": 1000 } } } ] } } 2....

February 2, 2021