DataX ElasticSearchWriter
1 快速介绍
数据导入elasticsearch的插件
2 实现原理
使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch
3 功能说明
3.1 配置样例
job.json
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
...
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type":"integer", "array":true},
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
]
}
}
}
]
}
}
3.2 参数说明
-
endpoint
-
描述:ElasticSearch的连接地址
-
必选:是
-
默认值:无
-
accessId
-
描述:http auth中的user
-
必选:否
-
默认值:空
-
accessKey
-
描述:http auth中的password
-
必选:否
-
默认值:空
-
index
-
描述:elasticsearch中的index名
-
必选:是
-
默认值:无
-
type
-
描述:elasticsearch中index的type名
-
必选:否
-
默认值:index名
-
cleanup
-
描述:是否删除原表
-
必选:否
-
默认值:false
-
batchSize
-
描述:每次批量数据的条数
-
必选:否
-
默认值:1000
-
trySize
-
描述:失败后重试的次数
-
必选:否
-
默认值:30
-
timeout
-
描述:客户端超时时间
-
必选:否
-
默认值:600000
-
discovery
-
描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。
-
必选:否
-
默认值:false
-
compression
-
描述:http请求,开启压缩
-
必选:否
-
默认值:true
-
multiThread
-
描述:http请求,是否有多线程
-
必选:否
-
默认值:true
-
ignoreWriteError
-
描述:忽略写入错误,不重试,继续写入
-
必选:否
-
默认值:false
-
ignoreParseError
-
描述:忽略解析数据格式错误,继续写入
-
必选:否
-
默认值:true
-
alias
-
描述:数据导入完成后写入别名
-
必选:否
-
默认值:无
-
aliasMode
-
描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
-
必选:否
-
默认值:append
-
settings
-
描述:创建index时候的settings, 与elasticsearch官方相同
-
必选:否
-
默认值:无
-
splitter
-
描述:如果插入数据是array,就使用指定分隔符
-
必选:否
-
默认值:-,-
-
column
-
描述:elasticsearch所支持的字段类型,样例中包含了全部
-
必选:是
-
dynamic
-
描述: 不使用datax的mappings,使用es自己的自动mappings
-
必选: 否
-
默认值: false
4 性能报告
4.1 环境准备
- 总数据量 1kw条数据, 每条0.1kb
- 1个shard, 0个replica
- 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升
4.1.1 输入数据类型(streamreader)
{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},
4.1.2 输出数据类型(eswriter)
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}
4.1.2 机器参数
- cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
- mem: 128G
- net: 千兆双网卡
4.1.3 DataX jvm 参数
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
4.2 测试报告
通道数 | 批量提交行数 | DataX速度(Rec/s) | DataX流量(MB/s) |
---|---|---|---|
4 | 256 | 11013 | 0.828 |
4 | 1024 | 19417 | 1.43 |
4 | 4096 | 23923 | 1.76 |
4 | 8172 | 24449 | 1.80 |
8 | 256 | 21459 | 1.58 |
8 | 1024 | 37037 | 2.72 |
8 | 4096 | 45454 | 3.34 |
8 | 8172 | 45871 | 3.37 |
16 | 1024 | 67567 | 4.96 |
16 | 4096 | 78125 | 5.74 |
16 | 8172 | 77519 | 5.69 |
32 | 1024 | 94339 | 6.93 |
32 | 4096 | 96153 | 7.06 |
64 | 1024 | 91743 | 6.74 |
4.3 测试总结
- 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
- 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展
5 约束限制
- 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
- 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)