RDBMSWriter 插件文档

RDBMSWriter 插件文档 1 快速介绍 RDBMSWriter 插件实现了写入数据到 RDBMS 主库的目的表的功能。在底层实现上, RDBMSWriter 通过 JDBC 连接远程 RDBMS 数据库,并执行相应的 insert into … 的 sql 语句将数据写入 RDBMS。 RDBMSWriter是一个通用的关系数据库写插件,您可以通过注册数据库驱动等方式增加任意多样的关系数据库写支持。 RDBMSWriter 面向ETL开发工程师,他们使用 RDBMSWriter 从数仓导入数据到 RDBMS。同时 RDBMSWriter 亦可以作为数据迁移工具为DBA等用户提供服务。 2 实现原理 RDBMSWriter 通过 DataX 框架获取 Reader 生成的协议数据,RDBMSWriter 通过 JDBC 连接远程 RDBMS 数据库,并执行相应的 insert into … 的 sql 语句将数据写入 RDBMS。 3 功能说明 3.1 配置样例 配置一个写入RDBMS的作业。 { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "value": "DataX", "type": "string" }, { "value": 19880808, "type": "long" }, { "value": "1988-08-08 08:08:08", "type": "date" }, { "value": true, "type": "bool" }, { "value": "test", "type": "bytes" } ], "sliceRecordCount": 1000 } }, "writer": { "name": "rdbmswriter", "parameter": { "connection": [ { "jdbcUrl": "jdbc:dm://ip:port/database", "table": [ "table" ] } ], "username": "username", "password": "password", "table": "table", "column": [ "*" ], "preSql": [ "delete from XXX;" ] } } } ] } } 3....

February 2, 2021

Readme.md

some script here.

February 2, 2021

README.md

本插件仅在Elasticsearch 5.x上测试

February 2, 2021

SqlServerReader 插件文档

SqlServerReader 插件文档 1 快速介绍 SqlServerReader插件实现了从SqlServer读取数据。在底层实现上,SqlServerReader通过JDBC连接远程SqlServer数据库,并执行相应的sql语句将数据从SqlServer库中SELECT出来。 2 实现原理 简而言之,SqlServerReader通过JDBC连接器连接到远程的SqlServer数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程SqlServer数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 对于用户配置Table、Column、Where的信息,SqlServerReader将其拼接为SQL语句发送到SqlServer数据库;对于用户配置querySql信息,SqlServer直接将其发送到SqlServer数据库。 3 功能说明 3.1 配置样例 配置一个从SqlServer数据库同步抽取数据到本地的作业: { "job": { "setting": { "speed": { "byte": 1048576 } }, "content": [ { "reader": { "name": "sqlserverreader", "parameter": { // 数据库连接用户名 "username": "root", // 数据库连接密码 "password": "root", "column": [ "id" ], "splitPk": "db_id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:sqlserver://localhost:3433;DatabaseName=dbname" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": true, "encoding": "UTF-8" } } } ] } } 配置一个自定义SQL的数据库同步任务到本地内容的作业: { "job": { "setting": { "speed": 1048576 }, "content": [ { "reader": { "name": "sqlserverreader", "parameter": { "username": "root", "password": "root", "where": "", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:sqlserver://bad_ip:3433;DatabaseName=dbname", "jdbc:sqlserver://127....

February 2, 2021

TableStore增量数据导出通道:TableStoreStreamReader

TableStore增量数据导出通道:TableStoreStreamReader 快速介绍 TableStoreStreamReader插件主要用于TableStore的增量数据导出,增量数据可以看作操作日志,除了数据本身外还附有操作信息。 与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。 使用插件前必须确保表上已经开启Stream功能,可以在建表的时候指定开启,或者使用SDK的UpdateTable接口开启。 开启Stream的方法: SyncClient client = new SyncClient("", "", "", ""); 1. 建表的时候开启: CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta); createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时 client.createTable(createTableRequest); 2. 如果建表时未开启,可以通过UpdateTable开启: UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName"); updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); client.updateTable(updateTableRequest); 实现原理 首先用户使用SDK的UpdateTable功能,指定开启Stream并设置过期时间,即开启了增量功能。 开启后,TableStore服务端就会将用户的操作日志额外保存起来, 每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。 TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来,增量插件也是通过TableStore SDK的接口获取到增量数据的,并将 增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。 Reader的配置模版: "reader": { "name" : "otsstreamreader", "parameter" : { "endpoint" : "", "accessId" : "", "accessKey" : "", "instanceName" : "", //dataTable即需要导出数据的表。 "dataTable" : "", //statusTable是Reader用于保存状态的表,若该表不存在,Reader会自动创建该表。 //一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。 "statusTable" : "TableStoreStreamReaderStatusTable", //增量数据的时间范围(左闭右开)的左边界。 "startTimestampMillis" : "", //增量数据的时间范围(左闭右开)的右边界。 "endTimestampMillis" : "", //采云间调度只支持天级别,所以提供该配置,作用与startTimestampMillis和endTimestampMillis类似。 "date": "", //是否导出时序信息。 "isExportSequenceInfo": true, //从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。 "maxRetries" : 30 } } 参数说明 名称 说明 类型 必选 endpoint TableStoreServer的Endpoint地址。 String 是 accessId 用于访问TableStore服务的accessId。 String 是 accessKey 用于访问TableStore服务的accessKey。 String 是 instanceName TableStore的实例名称。 String 是 dataTable 需要导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。 String 是 statusTable Reader插件用于记录状态的表的名称,这些状态可用于减少对非目标范围内的数据的扫描,从而加快导出速度。 1....

February 2, 2021

TSDBReader 插件文档

TSDBReader 插件文档 1 快速介绍 TSDBReader 插件实现了从阿里云 TSDB 读取数据。阿里云时间序列数据库 ( Time Series Database , 简称 TSDB) 是一种集时序数据高效读写,压缩存储,实时计算能力为一体的数据库服务,可广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。详见 TSDB 的阿里云官网。 2 实现原理 在底层实现上,TSDBReader 通过 HTTP 请求链接到 阿里云 TSDB 实例,利用 /api/query 或者 /api/mquery 接口将数据点扫描出来(更多细节详见:时序数据库 TSDB - HTTP API 概览)。而整个同步的过程,是通过时间线和查询时间线范围进行切分。 3 功能说明 3.1 配置样例 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以时序数据的格式输出: 时序数据样例: {"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1} { "job": { "content": [ { "reader": { "name": "tsdbreader", "parameter": { "sinkDbType": "TSDB", "endpoint": "http://localhost:8242", "column": [ "m" ], "splitIntervalMs": 60000, "beginDateTime": "2019-01-01 00:00:00", "endDateTime": "2019-01-01 01:00:00" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 3 } } } } 配置一个从 阿里云 TSDB 数据库同步抽取数据到本地的作业,并以关系型数据的格式输出: 关系型数据样例:...

February 2, 2021

TSDBWriter 插件文档

TSDBWriter 插件文档 1 快速介绍 TSDBWriter 插件实现了将数据点写入到阿里巴巴自研 TSDB 数据库中(后续简称 TSDB)。 时间序列数据库(Time Series Database , 简称 TSDB)是一种高性能,低成本,稳定可靠的在线时序数据库服务;提供高效读写,高压缩比存储、时序数据插值及聚合计算,广泛应用于物联网(IoT)设备监控系统 ,企业能源管理系统(EMS),生产安全监控系统,电力检测系统等行业场景。 TSDB 提供百万级时序数据秒级写入,高压缩比低成本存储、预降采样、插值、多维聚合计算,查询结果可视化功能;解决由于设备采集点数量巨大,数据采集频率高,造成的存储成本高,写入和查询分析效率低的问题。更多关于 TSDB 的介绍,详见阿里云 TSDB 官网。 2 实现原理 通过 HTTP 连接 TSDB 实例,并通过 /api/put 接口将数据点写入。关于写入接口详见 TSDB 的接口说明文档。 3 功能说明 3.1 配置样例 配置一个从 OpenTSDB 数据库同步抽取数据到 TSDB: { "job": { "content": [ { "reader": { "name": "opentsdbreader", "parameter": { "endpoint": "http://localhost:4242", "column": [ "m" ], "startTime": "2019-01-01 00:00:00", "endTime": "2019-01-01 03:00:00" } }, "writer": { "name": "tsdbhttpwriter", "parameter": { "endpoint": "http://localhost:8242" } } } ], "setting": { "speed": { "channel": 1 } } } } 3.2 参数说明 name 描述:本插件的名称 必选:是 默认值:tsdbhttpwriter parameter endpoint 描述:TSDB 的 HTTP 连接地址 必选:是 格式:http://IP:Port 默认值:无 batchSize...

February 2, 2021

阿里云开源离线同步工具DataX3.0介绍

阿里云开源离线同步工具DataX3.0介绍 一. DataX3.0概览 ​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。 当前使用现状 DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。 此前已经开源DataX1.0版本,此次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX 二、DataX3.0框架设计 DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。 Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。 Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。 Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。 三. DataX3.0插件体系 ​ 经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下: 类型 数据源 Reader(读) Writer(写) 文档 RDBMS 关系型数据库 MySQL √ √ 读 、写 Oracle √ √ 读 、写 SQLServer √ √ 读 、写 PostgreSQL √ √ 读 、写 DRDS √ √ 读 、写 达梦 √ √ 读 、写 通用RDBMS(支持所有关系型数据库) √ √ 读 、写 阿里云数仓数据存储 ODPS √ √ 读 、写 ADS √ 写 OSS √ √ 读 、写 OCS √ √ 读 、写 NoSQL数据存储 OTS √ √ 读 、写 Hbase0.94 √ √ 读 、写 Hbase1.1 √ √ 读 、写 MongoDB √ √ 读 、写 Hive √ √ 读 、写 无结构化数据存储 TxtFile √ √ 读 、写 FTP √ √ 读 、写 HDFS √ √ 读 、写 Elasticsearch √ 写 DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南...

February 2, 2021