Logstash 是一个开源的数据处理管道工具,广泛用于数据收集、处理和传输。它通常作为“ELK Stack”(Elasticsearch、Logstash、Kibana、Beats)的一个核心组件,用于处理结构化和非结构化数据。
天翼云Logstash可以实现将源Elasticsearch实例(如天翼云、自建或第三方Elasticsearch实例)中的数据迁移至天翼云Elasticsearch实例。在升级实例版本、实例架构调整、或跨区域的实例数据迁移时,可以选择使用天翼云Logstash迁移源Elasticsearch实例数据。
Logstash的方式迁移数据支持跨大版本,且迁移方式灵活,下表是支持的集群版本:
源\目标 | Elasticsearch7.10.2 | OpenSearch2.19.1 |
---|---|---|
Elasticsearch6.x | √ | √ |
Elasticsearch7.x小于7.10.2 | √ | √ |
Elasticsearch7.x大于7.10.2 | √ | √ |
Elasticsearch8.x | √ | √ |
本文以自建Elasticsearch 7.10.2版本迁移至天翼云Elasticsearch实例为例子。
前提条件
已经创建天翼云Elasticsearch实例。
已经在创建的Elasticsearch实例中加装了Logstash实例。
加装Logstash能够通过内网或公网访问需要迁移的源Elasticsearch实例。
Logstash工作模型
Logstash工作模型核心部分为三部分:输入(Input)、过滤器(Filter)、输出(Output),按照配置管道文件的顺序对数据进行提取、处理转换、输出。
1.输入(Input):Logstash支持多种数据输入源,如文件、数据库、消息队列以及Elasticsearch等。在我们的场景中,源Elasticsearch实例就是输入数据源。Logstash会批量提取源Elasticsearch实例中的数据。
2.过滤器(Filter):过滤器是可选的,用于对输入数据进行实时处理和转换。它提供了一些强大的插件,可以对数据进行解析、变换、裁剪或其他操作。在我们的场景中,可以选择是否使用过滤器来处理迁移中的数据,例如删除源数据中不需要迁移的字段等操作。
3.输出(Output):Logstash的输出插件负责将处理后的数据写入到目标位置,这可以是文件、数据库、消息队列,或者像本例中的天翼云Elasticsearch实例。
迁移必备的信息
源实例(天翼云、自建或第三方Elasticsearch实例)访问地址、用户名以及密码。
目的实例(天翼云Elasticsearch实例)访问地址、用户名以及密码。
提前在目的实例创建好源实例中的待迁移索引的索引结构。
测试Elasticsearch实例服务正常
curl http://{ip}:{port}
分别将ip和port替换为源实例以及目的实例的实际ip地址和端口号。
使用自建Logstash全量迁移数据
例如进入“Logstash实例管理”界面,在左侧导航栏选择“管道管理”,进入管道管理页面。选择新建管道,在Logstash管道管理下创建一个管道名称为test的管道,写入下面的配置:
input{
elasticsearch{
# 源Elasticsearch实例的访问地址
hosts ==> ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问源Elasticsearch实例的用户名和密码,如无安全机制可不配置
user ==> "*********"
password ==> "********"
# 配置源实例中待迁移的索引,可以使用通配符
index ==> "index1, index2"
# 查询Elasticsearch实例包含元数据
docinfo ==> true
# 使用多个切片提高吞吐量,合理值的范围从2到大约8,一般不要超过索引分片数
slices ==> 2
# Logstahs每次查询Elasticsearch实例返回的最大数据条数
size ==> 1000
}
}
# 在此对数据进行处理
filter {
mutate {
# 移除logstash增加的字段
remove_field ==> ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目标Elasticsearch实例的访问地址
hosts ==> ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问目标Elasticsearch实例的用户名和密码,如无安全机制可不配置
user ==> "********"
password ==> "********"
# 配置目标实例的索引,如下配置时和源实例保持一致
index ==> "%{[@metadata][_index]}"
document_id ==> "%{[@metadata][_id]}"
}
}
点击保存并部署完成管道注册,通过预检验后,会进行全量数据迁移。通过日志和目的实例中的索引可以观察是否数据正常迁移。
待迁移完成后,选择停止管道,取消注册。
完成迁移后对比迁移前后索引结构、索引中数据条数、索引存储等指标,保证数据全部迁移完毕。
使用加装的Logstash增量迁移数据
增量迁移数据和全量迁移数据类似,区别在于增量迁移需要待迁移的索引中有增量字段。
增量迁移数据的Logstash管道配置文件和全量迁移数据不同在于需要配置具体的’query’。例如,有一个索引中有一个时间字段'created_at',类型为'date',可能的值例如,"2024-10-11T12:34:56Z"。
那么需要添加如下配置,‘query’参数的内容是Elasticsearch实例查询的语法,例如:
query ==> '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'
需要根据具体索引结构来修改。
整个管道文件完成的配置如下:
input{
elasticsearch{
# 源Elasticsearch实例的访问地址
hosts ==> ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问源Elasticsearch实例的用户名和密码,如无安全机制可不配置
user ==> "*********"
password ==> "********"
# 配置源实例中待迁移的索引,可以使用通配符
index ==> "index1, index2"
query ==> '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'
# 查询Elasticsearch实例包含元数据
docinfo ==> true
# 使用多个切片提高吞吐量,合理值的范围从2到大约8,一般不要超过索引分片数
slices ==> 2
# Logstahs每次查询Elasticsearch实例返回的最大数据条数
size ==> 1000
}
}
# 在此对数据进行处理
filter {
mutate {
# 移除logstash增加的字段
remove_field ==> ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目标Elasticsearch实例的访问地址
hosts ==> ["http://{ip}:{port}", "http://{ip}:{port}", "http://{ip}:{port}"]
# 访问目标Elasticsearch实例的用户名和密码,如无安全机制可不配置
user ==> "********"
password ==> "********"
# 配置目标实例的索引,如下配置时和源实例保持一致
index ==> "%{[@metadata][_index]}"
document_id ==> "%{[@metadata][_id]}"
}
}
和全量迁移一样,保存并部署完成管道注册,即可完成增量迁移。