Requirements:
1. Python 2.7
2. Redis
3. MySQL, with binlog-format=row configured
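If you are not sure whether the MySQL server already writes row-based binlogs, the following sketch checks it from Python. It assumes the pymysql package is installed and reuses the local credentials shown in the configs below; adjust host, user and password to your own server.

# coding=utf-8
# Check that MySQL writes row-based binlogs (sketch; assumes pymysql and
# the example credentials used elsewhere in this document).
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", passwd="123456")
try:
    with conn.cursor() as cur:
        cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
        print(cur.fetchone())  # expect ('binlog_format', 'ROW')
        cur.execute("SHOW VARIABLES LIKE 'log_bin'")
        print(cur.fetchone())  # expect ('log_bin', 'ON')
finally:
    conn.close()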
Installation
pip install mysqlsmom
Full synchronization
# Create a config file for full synchronization
$ mom new test_mom/init_config.py -t init --force
# Edit the config file
$ vim ./test_mom/init_config.py  # adjust the settings by following the comments
# Start the sync
$ mom run -c ./test_mom/init_config.py
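Once the full sync finishes, you can spot-check the target index. Below is a minimal sketch using the requests library; the index name demo is only an example here, so substitute whatever index your init_config.py writes to.

# coding=utf-8
# Count documents in the target index after a full sync (sketch; assumes the
# requests library and an Elasticsearch node on 127.0.0.1:9200).
import requests

resp = requests.get("http://127.0.0.1:9200/demo/_count")
print(resp.json())  # e.g. {"count": 1234, ...}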
Incremental synchronization
Three files need to be configured:
test_mom
├── binlog_config.py  # main config file
├── my_filters.py     # custom filters, referenced under "watched"
└── my_handlers.py    # custom handlers, referenced under "pipeline"
Create the config file
mom new test_mom/binlog_config.py -t binlog --force
1. binlog_config.py
# coding=utf-8
STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__
# Sync BULK_SIZE rows to Elasticsearch per batch; defaults to 1 if unset
BULK_SIZE = 1
BINLOG_CONNECTION = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}
# Redis stores the position of the last sync and related state
REDIS = {
"host": "127.0.0.1",
"port": 6379,
"db": 0,
# "password": "password", # 不需要密码则注释或删掉该行
}
NODES = [{"host": "127.0.0.1", "port": 9200}]
TASKS = [
{
"stream": {
"database": "demo",
"table": "student"
},
"jobs": [{
"actions": ["insert", "update"],
"watched": {
"filter_display": {}
},
"pipeline": [
{"only_fields": {"fields": ["id", "name", "age"]}},
{"change_name": {"key": "name", "prefix": "hot-"}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
},
{
"actions": ["delete"],
"pipeline": [
# {"only_fields": {"fields": ["id", "name", "age"]}},
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "demo",
"type": "student",
"nodes": NODES
}
}
}
]
}
]
CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"
2. Custom handler: my_handlers.py
# -*- coding: utf-8 -*-
import copy

def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    # return the data dict so the next step in the pipeline can keep processing it
    return new_row
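Putting the first job's pipeline together, here is a standalone sketch of how one row might be transformed before it reaches Elasticsearch. The only_fields and set_id functions below are hypothetical stand-ins for the built-in processors, written only to illustrate the data flow; change_name is the handler defined above (the sketch assumes it is run from the test_mom directory so the import resolves).

# -*- coding: utf-8 -*-
# Standalone illustration of the pipeline in binlog_config.py; not mysqlsmom internals.
from my_handlers import change_name

def only_fields(row, fields):
    # assumed behaviour of the built-in processor: keep only the listed columns
    return dict((k, v) for k, v in row.items() if k in fields)

def set_id(row, field):
    # assumed behaviour of the built-in processor: choose the Elasticsearch _id
    row["_id"] = row[field]
    return row

row = {"id": 1, "name": "Tom", "age": 10, "display": 1}
row = only_fields(row, fields=["id", "name", "age"])
row = change_name(row, key="name", prefix="hot-")
row = set_id(row, field="id")
print(row)  # e.g. {'age': 10, '_id': 1, 'id': 1, 'name': 'hot-Tom'}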
3. Custom filter: my_filters.py
# -*- coding: utf-8 -*-
def filter_display(event):
    # return True to keep the row, False to discard it
    return event["values"]["display"] == 1
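The event handed to a filter carries the new column values under the "values" key, as filter_display assumes above. A quick illustration (the event layout here is a simplified assumption, not the full binlog event; run it from the test_mom directory so the import resolves):

# -*- coding: utf-8 -*-
# Simplified illustration of how filter_display decides whether a row is kept.
from my_filters import filter_display

event = {"values": {"id": 1, "name": "Tom", "age": 10, "display": 0}}
print(filter_display(event))  # False -> the row is dropped before the pipeline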
Start
mom run -c test_mom/binlog_config.py
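The incremental sync keeps its position in the Redis database configured above. The exact key names are internal to mysqlsmom, but you can confirm that state is being written with a quick scan, sketched here with the redis-py client and the REDIS settings shown earlier:

# coding=utf-8
# List whatever keys the sync has written to the configured Redis db (sketch).
import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
for key in r.scan_iter():
    print(key)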