
How to Use the INFINI Migration Feature #

Introduction #

This article describes how to use INFINI Console and INFINI Gateway to migrate Elasticsearch index data.

Prerequisites #

  • Download and install the latest INFINI Console (version 0.9.0-1036 or later)
  • Download and install the latest INFINI Gateway (version 1.12.0-915 or later)
  • Two Elasticsearch clusters

Gateway Migration Configuration #

After downloading and extracting the gateway, the default configuration file is gateway.yml, with the following content:

#the env section used for setup default settings, it can be overwritten by system environments.
#eg: PROD_ES_ENDPOINT=http://192.168.3.185:9200 LOGGING_ES_ENDPOINT=http://192.168.3.185:9201  ./bin/gateway
env: #use $[[env.LOGGING_ES_ENDPOINT]] in config instead
  LOGGING_ES_ENDPOINT: https://localhost:9200
  LOGGING_ES_USER: admin
  LOGGING_ES_PASS: admin
  PROD_ES_ENDPOINT: https://localhost:9200
  PROD_ES_USER: admin
  PROD_ES_PASS: admin
  GW_BINDING: "0.0.0.0:8000"
  API_BINDING: "0.0.0.0:2900"

path.data: data
path.logs: log
path.configs: config # directory of additional gateway configurations

configs.auto_reload: false # set true to auto reload gateway configurations

# Gateway internal stats collecting
stats:
  enabled: true
  # save stats under path.data
  persist: true
  # disable stats operations cache
  no_buffer: true
  # stats operations cache size
  buffer_size: 1000
  # stats operations cache flush interval
  flush_interval_ms: 1000

# Statsd integration
statsd:
  enabled: false
  host: localhost
  port: 8125
  namespace: "gateway."
  protocol: "udp"
  # flush interval
  interval_in_seconds: 1

##json logging layout
#log.format: '{"timestamp":"%UTCDateT%UTCTime","level":"%Level","message":"%EscapedMsg","file":"%File:%Line","func":"%FuncShort"}%n'

#system api
api:
  enabled: true
  network:
    binding: $[[env.API_BINDING]]

##elasticsearch servers
elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - $[[env.PROD_ES_ENDPOINT]]
    discovery:
      enabled: false
    basic_auth:
      username: $[[env.PROD_ES_USER]]
      password: $[[env.PROD_ES_PASS]]
    traffic_control.max_bytes_per_node: 1010485760
    metadata_cache_enabled: false # Whether to cache the cluster info in memory cache
  - name: logging-server
    enabled: true
    endpoints:
      - $[[env.LOGGING_ES_ENDPOINT]]
    basic_auth:
      username: $[[env.LOGGING_ES_USER]]
      password: $[[env.LOGGING_ES_PASS]]
    discovery:
      enabled: false

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 10000
    network:
      binding: $[[env.GW_BINDING]]
#  tls:
#   enabled: true #auto generate certs, cert_file and key_file are optional
#   cert_file: /data/gateway/cert/elasticsearch.pem
#   key_file: /data/gateway/cert/elasticsearch.key
#   skip_insecure_verify: false

router:
  - name: my_router
    default_flow: default_flow
    tracing_flow: logging_flow
    rules:
      - method:
          - "*"
        pattern:
          - "/_bulk"
          - "/{any_index}/_bulk"
        flow:
          - async_bulk_flow

flow:
  - name: default_flow
    filter:
      - elasticsearch:
          elasticsearch: prod
          max_connection_per_node: 1000
  #    - http: ##another general option to proxy requests
  #        schema: "http" #https or http
  #        max_idle_conn_duration: "900s"
  #        skip_failure_host: false
  #        hosts:
  #          - "localhost:9200"
  - name: logging_flow
    filter:
      - logging:
          queue_name: request_logging
          max_request_body_size: 4096
          max_response_body_size: 4096
  #        when: #>1s or none-200 requests will be logged
  #         or:
  #           - not:
  #               or:
  #                 - equals:
  #                     _ctx.request.path: "/favicon.ico"
  #                 - in:
  #                     _ctx.response.status: [404,200,201]
  #                 - suffix:
  #                     _ctx.request.path: ".js"
  #                 - suffix:
  #                     _ctx.request.path: ".css"
  #           - range:
  #               _ctx.elapsed.gte: 1000
  - name: async_bulk_flow
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          # Options: cluster,node,index,shard
          # NOTE: node/shard level requires elasticsearch cluster info
          level: node
          partition_size: 10 #extra partition within level
          #shards: [1,3,5,7,9,11,13] #filter shards to ingest for node or shard level
          continue_metadata_missing: true # If true, will continue to execute following processors if cluster info missing (level: node, shard)
          fix_null_id: true
      - elasticsearch:
          elasticsearch: prod
          max_connection_per_node: 1000
#    - http: #fallback for non-bulk requests
#        schema: "http" #https or http
#        hosts:
#          - localhost:9200

##background jobs
pipeline:
  - name: pipeline_logging_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "logging"
          idle_timeout_in_seconds: 1
          elasticsearch: "logging-server"
          index_name: ".infini_logs"
          output_queue:
            name: "gateway-pipeline-logs"
            label:
              tag: "pipeline_logging"
          worker_size: 1
          bulk_size_in_kb: 1
  - name: ingest_pipeline_logging
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 1
            batch_size_in_docs: 1
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
            tag: "pipeline_logging"
  ## system logging and metrics
  - name: async_messages_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "bulk_result_messages"
          elasticsearch: "logging-server"
          index_name: ".infini_async_bulk_results"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "bulk_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: metrics_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "metrics"
          elasticsearch: "logging-server"
          index_name: ".infini_metrics"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "metrics"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: request_logging_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: ingest_merged_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          num_of_slices: 3 #runtime slicing
          bulk:
            compress: false
            batch_size_in_mb: 10
            batch_size_in_docs: 500
            #remove_duplicated_newlines: true
            invalid_queue: "invalid_request"
            response_handle:
              bulk_result_message_queue: "system_failure_messages"
              max_request_body_size: 10240
              max_response_body_size: 10240
              save_success_results: false
              max_error_details_count: 5
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["logging-server"]

  ##async way to ingest bulk requests handled by async_bulk_flow
  - name: async_ingest_bulk_requests
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 1000
          num_of_slices: 2 #runtime slice
          max_worker_size: 200
          idle_timeout_in_seconds: 10
          bulk:
            compress: false
            batch_size_in_mb: 20
            batch_size_in_docs: 5000
            invalid_queue: "bulk_invalid_requests"
            dead_letter_queue: "bulk_dead_requests"
            response_handle:
              bulk_result_message_queue: "bulk_result_messages"
              max_request_body_size: 1024
              max_response_body_size: 1024
              save_success_results: true
              max_error_details_count: 5
              retry_rules:
                default: true
                retry_429: true
                retry_4xx: false
                denied:
                  status: []
                  keyword:
                    - illegal_state_exception
          consumer:
            fetch_max_messages: 100
            eof_retry_delay_in_ms: 500
          queue_selector:
            labels:
              type: bulk_reshuffle

##metrics
metrics:
  enabled: true
  queue: metrics
  logging_queue: logging
  instance:
    enabled: true
  network:
    enabled: true
    summary: true
    sockets: true

##diskqueue
disk_queue:
  prepare_files_to_read: true
  #max_bytes_per_file: 20971520
  eof_retry_delay_in_ms: 500
  cleanup_files_on_init: false
  retention:
    max_num_of_local_files: 20 # automatically cleanup consumed files
  compress:
    segment:
      enabled: true
    delete_after_compress: true # trigger cleanup after compression.
    idle_threshold: 20 # max number of uncompressed consumed files to preserve.
#  upload_to_s3: true
#  s3:
#    server: my_blob_store #name defined in s3 servers
#    location: your_location
#    bucket: your_bucket_name

##s3 servers
#s3:
#  my_blob_store: #name of s3 server
#    endpoint: "localhost:9021"
#    access_key: "your_access_key"
#    access_secret: "your_access_secret"
#    token: "your_token"
#    skip_insecure_verify: true
#    ssl: true

## badger kv storage configuration
badger:
  single_bucket_mode: true
  path: "" # defaults to {path.data}/gateway/node/{nodeID}/badger/
  memory_mode: false # don't persist data to disk
  sync_writes: false # flush to disk on every write
  mem_table_size: 10485760
  num_mem_tables: 1
  # lsm tuning options
  value_log_max_entries: 1000000
  value_log_file_size: 536870912
  value_threshold: 1048576
  num_level0_tables: 1
  num_level0_tables_stall: 2

## elasticsearch module global configurations
elastic:
  # elasticsearch for gateway's system info storage
  elasticsearch: prod
  enabled: true
  remote_configs: false
  health_check:
    enabled: true
    interval: 30s
  availability_check:
    enabled: true
    interval: 30s
  metadata_refresh:
    enabled: true
    interval: 60s
  cluster_settings_check:
    enabled: false
    interval: 60s

In most cases you only need to adjust the Elasticsearch cluster addresses and authentication credentials. Here we modify the env section as follows:

env: #use $[[env.LOGGING_ES_ENDPOINT]] in config instead
  LOGGING_ES_ENDPOINT: https://192.168.3.12:9212
  LOGGING_ES_USER: admin
  LOGGING_ES_PASS: admin
  PROD_ES_ENDPOINT:  https://192.168.3.12:9212
  PROD_ES_USER: admin
  PROD_ES_PASS: admin
  GW_BINDING: "0.0.0.0:8000"
  API_BINDING: "0.0.0.0:2900"

Note that LOGGING_ES_ENDPOINT points to the Elasticsearch cluster that logs are written to, and it must be the same cluster that INFINI Console is configured to use as its system cluster.
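
With the env values above, the logging-server entry in the elasticsearch section of gateway.yml effectively resolves to the following; its endpoint is the cluster that must match Console's system cluster:

elasticsearch:
  - name: logging-server
    enabled: true
    endpoints:
      - https://192.168.3.12:9212 # must be the same cluster Console uses as its system cluster
    basic_auth:
      username: admin
      password: admin
    discovery:
      enabled: false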

Start the Gateway #

./gateway-xxx-xxx -config gateway.yml

Register the Gateway #

INFINI Gateway acts as the executor of the migration task, so the gateway instance must be registered in Console beforehand; it will be needed later when creating the migration task. In INFINI Console, click Resources > Gateway Management in the left-hand menu, then click the New button to register a new instance, as shown below:

Enter the gateway's address. Note that the gateway's API uses port 2900 by default, so here we enter 192.168.3.12:2900, then click Next.
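
Port 2900 comes from the api section of gateway.yml shown earlier, which binds the gateway's system API through the API_BINDING environment value:

#system api
api:
  enabled: true
  network:
    binding: $[[env.API_BINDING]] # API_BINDING: "0.0.0.0:2900"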

Click Next to complete the gateway registration.

Register the Source and Target Clusters #

In INFINI Console, click Resources > Cluster Management in the left-hand menu, then click Register Cluster, and register the source cluster es-v5616 and the target cluster es-v710 in turn, as shown below:

If the Elasticsearch cluster requires authentication, fill in the credentials, then click Next.

Confirm that the cluster information is correct, then click Next.

At this point the source cluster is registered. The steps for registering the target cluster es-v710 are the same, so they are not repeated here.

Create a Migration Task #

In INFINI Console, click Data Tools > Data Migration in the left-hand menu, then click the New button to create a migration task, as shown below:

Configure the Migration Clusters #

Select the cluster es-v5616 from the source cluster list and the cluster es-v710 from the target cluster list.

Configure the Indices to Migrate #

Click the button to select the indices to migrate, as shown below:

Here we select the index test, then click Confirm.

The test index contains two types, so the system automatically splits it into two target indices, one per type.
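
Conceptually, the split looks like the sketch below. This is an illustration only, not an actual configuration file, and the type names doc and doc1 are assumptions inferred from the target index names test-doc and test-doc1 that appear later in this tutorial:

# Illustrative sketch only: one multi-type source index is split into per-type target indices
source_index: test
split_by_type:
  - source_type: doc   # assumed type name, inferred from the target index "test-doc"
    target_index: test-doc
  - source_type: doc1  # assumed type name, inferred from the target index "test-doc1"
    target_index: test-doc1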

On the right side of the table you can set the target index name and document type; adjust them as needed. After selecting the indices, click Next to initialize the indices, as shown below:

Click to expand and you will see the mappings and settings panels, as shown in the figure. The left side of the mappings panel shows the mappings of the source index; click the button in the middle to copy them to the right, then click Auto Optimize to apply automatic (compatibility) optimization. Once configured, click Start to initialize the mappings and settings; if nothing is configured here, this step is skipped automatically.
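
To illustrate the kind of compatibility adjustment involved when carrying a 5.x mapping over to 7.x (whether applied by Auto Optimize or edited by hand): the per-type mapping level is removed, and any legacy string field has to become text/keyword. The snippets below are a sketch, shown in YAML form for brevity (actual mappings are JSON), and the field title is hypothetical:

# 5.x source mapping: fields live under a type ("doc"); "string" is a legacy
# field type kept from indices originally created on 2.x
mappings:
  doc:
    properties:
      title:
        type: string

# 7.x-compatible target mapping: typeless, "text" with a "keyword" sub-field
mappings:
  properties:
    title:
      type: text
      fields:
        keyword:
          type: keyword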

If the index settings and mappings have already been initialized by other means, you can simply click Next to skip this step.

After the index initialization is complete, click Next to configure the data range and partitioning for the migration task, as shown below:

Configure the Data Range #

If you only need to migrate a subset of the data, you can set a data range here. Since we are doing a full migration, we leave it unset.

Configure Data Partitioning #

If an index holds a very large amount of data, you can configure data partitioning. Partitioning splits the data into segments based on the chosen field and partition step, and each segment is migrated as a separate sub-task. This way, if migrating one segment fails, only that sub-task needs to be rerun.

Partitioning currently supports splitting by date fields and by numeric fields. As shown above, we choose the date field now_widh_format for partitioning and set the partition step to 5 minutes (5m). Clicking the Preview button shows that this setting yields 8 partitions (partitions with 0 documents will not generate sub-tasks). Once the preview confirms the partition settings are correct, click Save to close and save the partition settings, then click Next to configure the run settings.
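
Conceptually, each partition is one slice of the chosen field's value range, and each non-empty slice becomes one sub-task. The sketch below is purely illustrative (not the task's actual configuration format), and the timestamps are made up for the example:

# Illustrative only: a 5m step over a date field yields consecutive 5-minute slices
partition:
  field: now_widh_format
  step: 5m
slices:
  - { gte: "2023-03-29T10:00:00", lt: "2023-03-29T10:05:00" } # sub-task 1
  - { gte: "2023-03-29T10:05:00", lt: "2023-03-29T10:10:00" } # sub-task 2
  # ... slices containing 0 documents are dropped and do not become sub-tasks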

Run Settings #

In most cases the default settings can be used. For the execution node, select the previously registered gateway instance Nebula, then click Create Task.

Start the Migration Task #

After the migration task is created successfully, you will see the task list, as shown below:

The most recent entry is the task we just created. Click Start in the actions column on the right side of the table to start it.

Before starting the task, if the indices being migrated involve ILM, confirm that the relevant index templates and ILM rollover aliases have been set up on the target cluster.
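
For reference, an index is usually tied to ILM through standard Elasticsearch index settings like the ones below (shown here in YAML form; the policy and alias names are hypothetical). These settings normally come from an index template, which is why the template and the rollover alias must already exist on the target cluster:

# Standard Elasticsearch ILM-related index settings, typically provided by an index template
index:
  lifecycle:
    name: my-ilm-policy             # hypothetical ILM policy name
    rollover_alias: my-write-alias  # hypothetical rollover alias; must exist on the target cluster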

Click the Start button to launch the migration task.

View the Migration Task Progress #

After the task starts successfully, click Details to open the task detail page and check its execution status. After clicking the Refresh button to enable auto-refresh, we can see the task details change as follows:

In the figure, a blue square means a sub-task (partition task) is running, while a gray square means it has not started yet.

In the figure above, some squares have turned green, meaning those sub-tasks (partition tasks) have finished migrating their data; the migration progress of index test-doc is 100%, and that of index test-doc1 is 21.11%.

In the figure above, all squares have turned green and the migration progress of every index is 100%, which means the data migration is complete. If a square turns red during migration, an error occurred; click View Log in that square's progress details to inspect the error log and locate the cause.

Summary #

With the INFINI data migration feature, Elasticsearch data can be migrated across versions with ease, and the current migration progress can be monitored at a glance.