DataX ODPSReader

DataX ODPSReader 1 快速介绍 ODPSReader 实现了从 ODPS读取数据的功能,有关ODPS请参看(https://help.aliyun.com/document_detail/27800.html?spm=5176.doc27803.6.101.NxCIgY)。 在底层实现上,ODPSReader 根据你配置的 源头项目 / 表 / 分区 / 表字段 等信息,通过 Tunnel 从 ODPS 系统中读取数据。 注意 1、如果你需要使用ODPSReader/Writer插件,由于 AccessId/AccessKey 解密的需要,请务必使用 JDK 1.6.32 及以上版本。JDK 安装事项,请联系 PE 处理 2、ODPSReader 不是通过 ODPS SQL (select ... from ... where ... )来抽取数据的 3、注意区分你要读取的表是线上环境还是线下环境 4、目前 DataX3 依赖的 SDK 版本是: <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core-internal</artifactId> <version>0.13.2</version> </dependency> 2 实现原理 ODPSReader 支持读取分区表、非分区表,不支持读取虚拟视图。当要读取分区表时,需要指定出具体的分区配置,比如读取 t0 表,其分区为 pt=1,ds=hangzhou 那么你需要在配置中配置该值。当要读取非分区表时,你不能提供分区配置。表字段可以依序指定全部列,也可以指定部分列,或者调整列顺序,或者指定常量字段,但是表字段中不能指定分区列(分区列不是表字段)。 注意:要特别注意 odpsServer、project、table、accessId、accessKey 的配置,因为直接影响到是否能够加载到你需要读取数据的表。很多权限问题都出现在这里。 3 功能说明 3.1 配置样例 这里使用一份读出 ODPS 数据然后打印到屏幕的配置样板。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "odpsreader", "parameter": { "accessId": "accessId", "accessKey": "accessKey", "project": "targetProjectName", "table": "tableName", "partition": [ "pt=1,ds=hangzhou" ], "column": [ "customer_id", "nickname" ], "packageAuthorizedProject": "yourCurrentProjectName", "splitMode": "record", "odpsServer": "http://xxx/api", "tunnelServer": "http://dt....

February 2, 2021

DataX ODPS写入

DataX ODPS写入 1 快速介绍 ODPSWriter插件用于实现往ODPS插入或者更新数据,主要提供给etl开发同学将业务数据导入odps,适合于TB,GB数量级的数据传输,如果需要传输PB量级的数据,请选择dt task工具 ; 2 实现原理 在底层实现上,ODPSWriter是通过DT Tunnel写入ODPS系统的,有关ODPS的更多技术细节请参看 ODPS主站 https://data.aliyun.com/product/odps 和ODPS产品文档 https://help.aliyun.com/product/27797.html 目前 DataX3 依赖的 SDK 版本是: <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core-internal</artifactId> <version>0.13.2</version> </dependency> 注意: 如果你需要使用ODPSReader/Writer插件,请务必使用JDK 1.6-32及以上版本 使用java -version查看Java版本号 3 功能说明 3.1 配置样例 这里使用一份从内存产生到ODPS导入的数据。 { "job": { "setting": { "speed": { "byte": 1048576 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "value": "DataX", "type": "string" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 100000 } }, "writer": { "name": "odpswriter", "parameter": { "project": "chinan_test", "table": "odps_write_test00_partitioned", "partition": "school=SiChuan-School,class=1", "column": [ "id", "name" ], "accessId": "xxx", "accessKey": "xxxx", "truncate": true, "odpsServer": "http://sxxx/api", "tunnelServer": "http://xxx", "accountType": "aliyun" } } } ] } } 3....

February 2, 2021

DataX OracleWriter

DataX OracleWriter 1 快速介绍 OracleWriter 插件实现了写入数据到 Oracle 主库的目的表的功能。在底层实现上, OracleWriter 通过 JDBC 连接远程 Oracle 数据库,并执行相应的 insert into … sql 语句将数据写入 Oracle,内部会分批次提交入库。 OracleWriter 面向ETL开发工程师,他们使用 OracleWriter 从数仓导入数据到 Oracle。同时 OracleWriter 亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 OracleWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL语句 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 注意: 1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 2.OracleWriter和MysqlWriter不同,不支持配置writeMode参数。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 Oracle 导入的数据。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "oraclewriter", "parameter": { "username": "root", "password": "root", "column": [ "id", "name" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]", "table": [ "test" ] } ] } } } ] } } 3....

February 2, 2021

DataX OSSReader 说明

DataX OSSReader 说明 1 快速介绍 OSSReader提供了读取OSS数据存储的能力。在底层实现上,OSSReader使用OSS官方Java SDK获取OSS数据,并转换为DataX传输协议传递给Writer。 OSS 产品介绍, 参看[阿里云OSS Portal] OSS Java SDK, 参看[阿里云OSS Java SDK] 2 功能与限制 OSSReader实现了从OSS读取数据并转为DataX协议的功能,OSS本身是无结构化数据存储,对于DataX而言,OSSReader实现上类比TxtFileReader,有诸多相似之处。目前OSSReader支持功能如下: 支持且仅支持读取TXT的文件,且要求TXT中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量 支持递归读取、支持文件名过滤。 支持文本压缩,现有压缩格式为zip、gzip、bzip2。注意,一个压缩包不允许多文件打包压缩。 多个object可以支持并发读取。 我们暂时不能做到: 单个Object(File)支持多线程并发读取,这里涉及到单个Object内部切分算法。二期考虑支持。 单个Object在压缩情况下,从技术上无法支持多线程并发读取。 3 功能说明 3.1 配置样例 { "job": { "setting": {}, "content": [ { "reader": { "name": "ossreader", "parameter": { "endpoint": "http://oss.aliyuncs.com", "accessId": "", "accessKey": "", "bucket": "myBucket", "object": [ "bazhen/*" ], "column": [ { "type": "long", "index": 0 }, { "type": "string", "value": "alibaba" }, { "type": "date", "index": 1, "format": "yyyy-MM-dd" } ], "encoding": "UTF-8", "fieldDelimiter": "\t", "compress": "gzip" } }, "writer": {} } ] } } 3.2 参数说明 endpoint 描述:OSS Server的EndPoint地址,例如http://oss....

February 2, 2021

DataX OSSWriter 说明

DataX OSSWriter 说明 1 快速介绍 OSSWriter提供了向OSS写入类CSV格式的一个或者多个表文件。 写入OSS内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。 OSS 产品介绍, 参看[阿里云OSS Portal] OSS Java SDK, 参看[阿里云OSS Java SDK] 2 功能与限制 OSSWriter实现了从DataX协议转为OSS中的TXT文件功能,OSS本身是无结构化数据存储,OSSWriter需要在如下几个方面增加: 支持且仅支持写入 TXT的文件,且要求TXT中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 暂时不支持文本压缩。 支持多线程写入,每个线程写入不同子文件。 文件支持滚动,当文件大于某个size值或者行数值,文件需要切换。 [暂不支持] 我们不能做到: 单个文件不能支持并发写入。 3 功能说明 3.1 配置样例 { "job": { "setting": {}, "content": [ { "reader": { }, "writer": { "name": "osswriter", "parameter": { "endpoint": "http://oss.aliyuncs.com", "accessId": "", "accessKey": "", "bucket": "myBucket", "object": "cdo/datax", "encoding": "UTF-8", "fieldDelimiter": ",", "writeMode": "truncate|append|nonConflict" } } } ] } } 3.2 参数说明 endpoint 描述:OSS Server的EndPoint地址,例如http://oss.aliyuncs.com。 必选:是 默认值:无 accessId 描述:OSS的accessId 必选:是 默认值:无 accessKey 描述:OSS的accessKey 必选:是 默认值:无 bucket 描述:OSS的bucket 必选:是 默认值:无 object 描述:OSSWriter写入的文件名,OSS使用文件名模拟目录的实现。 使用"object": “datax”,写入object以datax开头,后缀添加随机字符串。 使用"object": “cdo/datax”,写入的object以cdo/datax开头,后缀随机添加字符串,/作为OSS模拟目录的分隔符。 必选:是 默认值:无 writeMode...

February 2, 2021

DataX PostgresqlWriter

DataX PostgresqlWriter 1 快速介绍 PostgresqlWriter插件实现了写入数据到 PostgreSQL主库目的表的功能。在底层实现上,PostgresqlWriter通过JDBC连接远程 PostgreSQL 数据库,并执行相应的 insert into … sql 语句将数据写入 PostgreSQL,内部会分批次提交入库。 PostgresqlWriter面向ETL开发工程师,他们使用PostgresqlWriter从数仓导入数据到PostgreSQL。同时 PostgresqlWriter亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 PostgresqlWriter通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL插入语句 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 注意: 1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 2. PostgresqlWriter和MysqlWriter不同,不支持配置writeMode参数。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 PostgresqlWriter导入的数据。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column" : [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "postgresqlwriter", "parameter": { "username": "xx", "password": "xx", "column": [ "id", "name" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:postgresql://127....

February 2, 2021

DataX SqlServerWriter

DataX SqlServerWriter 1 快速介绍 SqlServerWriter 插件实现了写入数据到 SqlServer 库的目的表的功能。在底层实现上, SqlServerWriter 通过 JDBC 连接远程 SqlServer 数据库,并执行相应的 insert into … sql 语句将数据写入 SqlServer,内部会分批次提交入库。 SqlServerWriter 面向ETL开发工程师,他们使用 SqlServerWriter 从数仓导入数据到 SqlServer。同时 SqlServerWriter 亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 SqlServerWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL语句 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 注意: 1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。 2.SqlServerWriter和MysqlWriter不同,不支持配置writeMode参数。 3 功能说明 3.1 配置样例 这里使用一份从内存产生到 SqlServer 导入的数据。 { "job": { "setting": { "speed": { "channel": 5 } }, "content": [ { "reader": {}, "writer": { "name": "sqlserverwriter", "parameter": { "username": "root", "password": "root", "column": [ "db_id", "db_type", "db_ip", "db_port", "db_role", "db_name", "db_username", "db_password", "db_modify_time", "db_modify_user", "db_description", "db_tddl_info" ], "connection": [ { "table": [ "db_info_for_writer" ], "jdbcUrl": "jdbc:sqlserver://[HOST_NAME]:PORT;DatabaseName=[DATABASE_NAME]" } ], "preSql": [ "delete from @table where db_id = -1;" ], "postSql": [ "update @table set db_modify_time = now() where db_id = 1;" ] } } } ] } } 3....

February 2, 2021

DataX Transformer

DataX Transformer Transformer定义 在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。 运行模型 UDF手册 dx_substr 参数:3个 第一个参数:字段编号,对应record中第几个字段。 第二个参数:字段值的开始位置。 第三个参数:目标字段长度。 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer) 举例: dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe" dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test" dx_pad 参数:4个 第一个参数:字段编号,对应record中第几个字段。 第二个参数:“l”,“r”, 指示是在头进行pad,还是尾进行pad。 第三个参数:目标字段长度。 第四个参数:需要pad的字符。 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符 举例: dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz dx_replace 参数:4个 第一个参数:字段编号,对应record中第几个字段。 第二个参数:字段值的开始位置。 第三个参数:需要替换的字段长度。 第四个参数:需要替换的字符串。 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer) 举例: dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est" dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****" dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。) 参数: 第一个参数:字段编号,对应record中第几个字段。 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <= 第三个参数:正则表达式(java正则表达式)、值。 返回: 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果. like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。 , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。 举例: dx_filter(1,"like","dataTest") dx_filter(1,">=","10") dx_groovy 参数。 第一个参数: groovy code 第二个参数(列表或者为空):extraPackage 备注: dx_groovy只能调用一次。不能多次调用。 groovy code中支持java....

February 2, 2021

DataX TxtFileReader 说明

DataX TxtFileReader 说明 1 快速介绍 TxtFileReader提供了读取本地文件系统数据存储的能力。在底层实现上,TxtFileReader获取本地文件数据,并转换为DataX传输协议传递给Writer。 本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。 2 功能与限制 TxtFileReader实现了从本地文件读取数据并转为DataX协议的功能,本地文件本身是无结构化数据存储,对于DataX而言,TxtFileReader实现上类比OSSReader,有诸多相似之处。目前TxtFileReader支持功能如下: 支持且仅支持读取TXT的文件,且要求TXT中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量 支持递归读取、支持文件名过滤。 支持文本压缩,现有压缩格式为zip、gzip、bzip2。 多个File可以支持并发读取。 我们暂时不能做到: 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。 单个File在压缩情况下,从技术上无法支持多线程并发读取。 3 功能说明 3.1 配置样例 { "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": ["/home/haiwei.luo/case00/data"], "encoding": "UTF-8", "column": [ { "index": 0, "type": "long" }, { "index": 1, "type": "boolean" }, { "index": 2, "type": "double" }, { "index": 3, "type": "string" }, { "index": 4, "type": "date", "format": "yyyy.MM.dd" } ], "fieldDelimiter": "," } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/home/haiwei.luo/case00/result", "fileName": "luohw", "writeMode": "truncate", "format": "yyyy-MM-dd" } } } ] } } 3....

February 2, 2021

DataX TxtFileWriter 说明

DataX TxtFileWriter 说明 1 快速介绍 TxtFileWriter提供了向本地文件写入类CSV格式的一个或者多个表文件。TxtFileWriter服务的用户主要在于DataX开发、测试同学。 写入本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。 2 功能与限制 TxtFileWriter实现了从DataX协议转为本地TXT文件功能,本地文件本身是无结构化数据存储,TxtFileWriter如下几个方面约定: 支持且仅支持写入 TXT的文件,且要求TXT中shema为一张二维表。 支持类CSV格式文件,自定义分隔符。 支持文本压缩,现有压缩格式为gzip、bzip2。 支持多线程写入,每个线程写入不同子文件。 文件支持滚动,当文件大于某个size值或者行数值,文件需要切换。 [暂不支持] 我们不能做到: 单个文件不能支持并发写入。 3 功能说明 3.1 配置样例 { "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "txtfilereader", "parameter": { "path": ["/home/haiwei.luo/case00/data"], "encoding": "UTF-8", "column": [ { "index": 0, "type": "long" }, { "index": 1, "type": "boolean" }, { "index": 2, "type": "double" }, { "index": 3, "type": "string" }, { "index": 4, "type": "date", "format": "yyyy.MM.dd" } ], "fieldDelimiter": "," } }, "writer": { "name": "txtfilewriter", "parameter": { "path": "/home/haiwei.luo/case00/result", "fileName": "luohw", "writeMode": "truncate", "dateFormat": "yyyy-MM-dd" } } } ] } } 3....

February 2, 2021