whale-code Logo
石家庄鲸码互联网科技
← 返回博客列表

Logstash配置Elasticsearch集群之间增量同步

Logstash 配置:将数据从一个 Elasticsearch 同步到另一个 Elasticsearch

Logstash配置信息如下:

input {
  elasticsearch {
    hosts => ["http://source-es-node1:9200"]  # 源ES集群地址
    index => "source_index*"                # 要同步的索引名,支持通配符
    query => '{ "query": { "match_all": {} } }'  # 查询条件,默认匹配所有文档
    scroll => "5m"                          # 滚动时间
    size => 1000                            # 每次批量获取的文档数
    docinfo => true                         # 保留文档元数据
    schedule => "* * * * *"                 # 定时任务,每分钟执行一次(持续同步)
    # schedule => "@once"                   # 只执行一次(全量同步)
  }
}

filter {
  # 这里可以添加任何需要的过滤处理
  # 例如移除某些字段、重命名字段等
  
  # 示例:移除某些不需要的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  elasticsearch {
    hosts => ["http://target-es-node1:9200"]  # 目标ES集群地址
    index => "%{[@metadata][_index]}"      # 保持与源索引相同的名称
    document_id => "%{[@metadata][_id]}"   # 保持与源文档相同的ID
    action => "index"                      # 默认操作是索引文档
    # 如果目标集群需要认证
    user => "elastic"
    password => "your_password"
    # 如果使用HTTPS
    ssl => true
    cacert => "/path/to/cert.pem"
  }
  
  # 可选:输出到控制台用于调试
  stdout {
    codec => rubydebug
  }
}

使用说明

将上述配置保存为 es-to-es.conf 文件

运行 Logstash 指定该配置文件:

bin/logstash -f es-to-es.conf

高级配置选项

增量同步:基于时间戳只同步新增或修改的文档,直接修改query参数即可

input {
  elasticsearch {
    # ...其他配置...
    query => '{
      "query": {
        "range": {
          "@timestamp": {
            "gte": "now-1h"  # 只同步最近1小时的数据
          }
        }
      }
    }'
  }
}