DataX ElasticSearchWriter


1 快速介绍

数据导入elasticsearch的插件

2 实现原理

使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch

3 功能说明

3.1 配置样例

job.json

{
  "job": {
    "setting": {
        "speed": {
            "channel": 1
        }
    },
    "content": [
      {
        "reader": {
          ...
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://xxx:9999",
            "accessId": "xxxx",
            "accessKey": "xxxx",
            "index": "test-1",
            "type": "default",
            "cleanup": true,
            "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
            "discovery": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {"name": "pk", "type": "id"},
              { "name": "col_ip","type": "ip" },
              { "name": "col_double","type": "double" },
              { "name": "col_long","type": "long" },
              { "name": "col_integer","type": "integer" },
              { "name": "col_keyword", "type": "keyword" },
              { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
              { "name": "col_geo_point", "type": "geo_point" },
              { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
              { "name": "col_nested1", "type": "nested" },
              { "name": "col_nested2", "type": "nested" },
              { "name": "col_object1", "type": "object" },
              { "name": "col_object2", "type": "object" },
              { "name": "col_integer_array", "type":"integer", "array":true},
              { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
            ]
          }
        }
      }
    ]
  }
}

3.2 参数说明

  • endpoint

  • 描述:ElasticSearch的连接地址

  • 必选:是

  • 默认值:无

  • accessId

  • 描述:http auth中的user

  • 必选:否

  • 默认值:空

  • accessKey

  • 描述:http auth中的password

  • 必选:否

  • 默认值:空

  • index

  • 描述:elasticsearch中的index名

  • 必选:是

  • 默认值:无

  • type

  • 描述:elasticsearch中index的type名

  • 必选:否

  • 默认值:index名

  • cleanup

  • 描述:是否删除原表

  • 必选:否

  • 默认值:false

  • batchSize

  • 描述:每次批量数据的条数

  • 必选:否

  • 默认值:1000

  • trySize

  • 描述:失败后重试的次数

  • 必选:否

  • 默认值:30

  • timeout

  • 描述:客户端超时时间

  • 必选:否

  • 默认值:600000

  • discovery

  • 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。

  • 必选:否

  • 默认值:false

  • compression

  • 描述:http请求,开启压缩

  • 必选:否

  • 默认值:true

  • multiThread

  • 描述:http请求,是否有多线程

  • 必选:否

  • 默认值:true

  • ignoreWriteError

  • 描述:忽略写入错误,不重试,继续写入

  • 必选:否

  • 默认值:false

  • ignoreParseError

  • 描述:忽略解析数据格式错误,继续写入

  • 必选:否

  • 默认值:true

  • alias

  • 描述:数据导入完成后写入别名

  • 必选:否

  • 默认值:无

  • aliasMode

  • 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)

  • 必选:否

  • 默认值:append

  • settings

  • 描述:创建index时候的settings, 与elasticsearch官方相同

  • 必选:否

  • 默认值:无

  • splitter

  • 描述:如果插入数据是array,就使用指定分隔符

  • 必选:否

  • 默认值:-,-

  • column

  • 描述:elasticsearch所支持的字段类型,样例中包含了全部

  • 必选:是

  • dynamic

  • 描述: 不使用datax的mappings,使用es自己的自动mappings

  • 必选: 否

  • 默认值: false

4 性能报告

4.1 环境准备

  • 总数据量 1kw条数据, 每条0.1kb
  • 1个shard, 0个replica
  • 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升

4.1.1 输入数据类型(streamreader)

{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},

4.1.2 输出数据类型(eswriter)

{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}

4.1.2 机器参数

  1. cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
  2. mem: 128G
  3. net: 千兆双网卡

4.1.3 DataX jvm 参数

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

4.2 测试报告

通道数 批量提交行数 DataX速度(Rec/s) DataX流量(MB/s)
4 256 11013 0.828
4 1024 19417 1.43
4 4096 23923 1.76
4 8172 24449 1.80
8 256 21459 1.58
8 1024 37037 2.72
8 4096 45454 3.34
8 8172 45871 3.37
16 1024 67567 4.96
16 4096 78125 5.74
16 8172 77519 5.69
32 1024 94339 6.93
32 4096 96153 7.06
64 1024 91743 6.74

4.3 测试总结

  • 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
  • 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展

5 约束限制

  • 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)