Logstash 是一个开源的数据处理管道工具,广泛用于数据收集、处理和传输。它通常作为“ELK Stack”(Elasticsearch、Logstash、Kibana、Beats)的一个核心组件,用于处理结构化和非结构化数据。
自建Logstash可以实现将源Elasticsearch实例(如天翼云、自建或第三方Elasticsearch实例)中的数据迁移至天翼云Elasticsearch实例。在升级实例版本、实例架构调整、或跨区域的实例数据迁移时,可以选择使用Logstash迁移源Elasticsearch实例数据,推荐使用Logstash 7.10.2版本。
Logstash的方式迁移数据支持跨大版本,且迁移方式灵活,下表是支持的集群版本:
源\目标 | Elasticsearch7.10.2 | OpenSearch2.9.0 |
---|---|---|
Elasticsearch6.x | √ | √ |
Elasticsearch7.x小于7.10.2 | √ | √ |
Elasticsearch7.x大于7.10.2 | √ | √ |
Elasticsearch8.x | √ | √ |
本文以自建Elasticsearch 7.10.2版本迁移至天翼云Elasticsearch实例为例子。
前提条件
- 已经创建天翼云Elasticsearch实例。
- 已经申请天翼云云服务器且自行部署Logstash或者在其他服务器部署Logstash(Logstash OSS版本推荐7.10.2版本)。
- 自建Logstash能够访问目的Elasticsearch实例以及需要迁移的源Elasticsearch实例。
Logstash工作模型
Logstash工作模型核心部分为三部分:输入(Input)、过滤器(Filter)、输出(Output),按照配置管道文件的顺序对数据进行提取、处理转换、输出。
1.输入(Input):Logstash支持多种数据输入源,如文件、数据库、消息队列以及Elasticsearch等。在我们的场景中,源Elasticsearch实例就是输入数据源。Logstash会批量提取源Elasticsearch实例中的数据。
2.过滤器(Filter):过滤器是可选的,用于对输入数据进行实时处理和转换。它提供了一些强大的插件,可以对数据进行解析、变换、裁剪或其他操作。在我们的场景中,可以选择是否使用过滤器来处理迁移中的数据,例如删除源数据中不需要迁移的字段等操作。
3.输出(Output):Logstash的输出插件负责将处理后的数据写入到目标位置,这可以是文件、数据库、消息队列,或者像本例中的天翼云Elasticsearch实例。
迁移必备的信息
- 源实例(天翼云、自建或第三方Elasticsearch实例)访问地址、用户名以及密码。
- 目的实例(天翼云Elasticsearch实例)访问地址、用户名以及密码。
- 提前在目的实例创建好源实例中的待迁移索引的索引结构。
测试Elasticsearch实例服务正常
curl http://{ip}:{port}
分别将ip和port替换为源实例以及目的实例的实际ip地址和端口号。
使用自建Logstash全量迁移数据
例如在Logstash目录下创建一个管道文件es-es.conf,写入下面的配置:
input{
elasticsearch{
# 源Elasticsearch实例的访问地址
hosts => ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问源Elasticsearch实例的用户名和密码,如无安全机制可不配置
user => "*********"
password => "********"
# 配置源实例中待迁移的索引,可以使用通配符
index => "index1, index2"
# 查询Elasticsearch实例包含元数据
docinfo => true
# 使用多个切片提高吞吐量,合理值的范围从2到大约8,一般不要超过索引分片数
slices => 2
# Logstahs每次查询Elasticsearch实例返回的最大数据条数
size => 1000
}
}
# 在此对数据进行处理
filter {
mutate {
# 移除logstash增加的字段
remove_field => ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目标Elasticsearch实例的访问地址
hosts => ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问目标Elasticsearch实例的用户名和密码,如无安全机制可不配置
user => "********"
password => "********"
# 配置目标实例的索引,如下配置时和源实例保持一致
index => "%{[@metadata][_index]}"
document_id => "%{[@metadata][_id]}"
}
}
使用下面的命令检查Logstash管道文件的配置是否正确。
./bin/logstash -tf es-es.conf
Result:OK即检查通过,语法没有问题。检查通过后使用下面的命令启动Logstash管道。
./bin/logstash -f es-es.conf
完成迁移后对比迁移前后索引结构、索引中数据条数、索引存储等指标,保证数据全部迁移完毕。
使用自建Logstash增量迁移数据
增量迁移数据和全量迁移数据类似,区别在于增量迁移需要待迁移的索引中有增量字段。
增量迁移数据的Logstash管道配置文件和全量迁移数据不同在于需要配置具体的’query’。例如,有一个索引中有一个时间字段'created_at',类型为'date',可能的值例如,"2024-10-11T12:34:56Z"。
那么需要添加如下配置,‘query’参数的内容是Elasticsearch实例查询的语法,例如:
query => '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'
需要根据具体索引结构来修改。
整个管道文件完成的配置如下:
input{
elasticsearch{
# 源Elasticsearch实例的访问地址
hosts => ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问源Elasticsearch实例的用户名和密码,如无安全机制可不配置
user => "*********"
password => "********"
# 配置源实例中待迁移的索引,可以使用通配符
index => "index1, index2"
query => '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'
# 查询Elasticsearch实例包含元数据
docinfo => true
# 使用多个切片提高吞吐量,合理值的范围从2到大约8,一般不要超过索引分片数
slices => 2
# Logstahs每次查询Elasticsearch实例返回的最大数据条数
size => 1000
}
}
# 在此对数据进行处理
filter {
mutate {
# 移除logstash增加的字段
remove_field => ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目标Elasticsearch实例的访问地址
hosts => ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问目标Elasticsearch实例的用户名和密码,如无安全机制可不配置
user => "********"
password => "********"
# 配置目标实例的索引,如下配置时和源实例保持一致
index => "%{[@metadata][_index]}"
document_id => "%{[@metadata][_id]}"
}
}
检查并启动Logstash即可完成增量迁移。