BINLOG复制主流程
1)Slave 在接收到start slave cmd 后 ,先后启动IO线程和 SQL线程,触发复制流程。
2)IO线程 首先建立mysql connect, 接着发送 COM_REGISTER_SLAVE , COM_BINLOG_DUMP( COM_BINLOG_DUMP_GTID).开启了master binlog sender 流程。
3)master sender流程,初始化start logfile and pos, 打开logfile,开始循环读入binlog file 中的event ,并发送给slave;如果没有新的事件则等待,直到新事件出现或者新文件出现;过程中主要是与mysql_bin_log 对象交互,mysql_bin_log 写入新事件触发update_cond 信号。同时mysql_bin_log 提供查询最新的pos 和logfile 的接口。在等待新事件过程中会定时发送心跳事件。
4)IO 线程在发生完 DUMP请求后 进入循环流程:等待从网络中读取事件,读到event 后进行各个类型的检查和事务检查,如果通过则写入到relay log files 中。循环过程中出错,触发重连,如果是严重错误或者重连超出次数后退出IO线程。
5) SQL线程 启动后,初始化worker 线程(在开启并行复制的情况下),然后初始化applier_reader. 并打开reader. 接着接入循环流程,阻塞在 reader.read_next_event,直到 relaylog file有新事件产生,然后apply event。 apply event 过程中, 如果是在MTS 模式下并且事件是可以并行处理, 仅仅是把event 写入worker 的队列,其他情况都是直接在SQL线程直接apply event。
Master端实现
master端的实现主要由Binlog_sender.
/**
The major logic of dump thread is implemented in this class. It sends
required binlog events to clients according to their requests.
*/
class Binlog_sender {
typedef Basic_binlog_file_reader<Binlog_ifile, Binlog_event_data_istream,
Binlog_event_object_istream, Event_allocator>
File_reader;
public:
Binlog_sender(THD *thd, const char *start_file, my_off_t start_pos,
Gtid_set *exclude_gtids, uint32 flag);
// 启动发送流程
void run();
private:
/* Requested start binlog file and position */
const char *m_start_file;
my_off_t m_start_pos;
/* The binlog file it is reading */
LOG_INFO m_linfo;
// 心跳间隔
std::chrono::nanoseconds m_heartbeat_period;
std::chrono::nanoseconds m_last_event_sent_ts;
// 启动发送一个binlog file,直到轮转下一个binlog
int send_binlog(File_reader *reader, my_off_t start_pos);
// 发送event,before end_pos
int send_events(File_reader *reader, my_off_t end_pos);
// 获取当前binlog 最后的位置, 可能会wait_for_events
int get_binlog_end_pos(File_reader *reader, my_off_t *end_pos);
// 读取一个事件
int read_event(File_reader *reader, uchar **event_ptr, uint32 *event_len);
// 发送一个事件
int send_packet();
// 等待一个新的事件,wait for update_cond or timeout ,
int wait_new_events(my_off_t log_pos);
每一个从库Start slave 后,在主库对应启动一个binlog_sender实例,来实时发送binlog事件. 其流程实现如下:
// COM_BINLOG_DUMP | COM_BINLOG_DUMP_GTID
com_binlog_dump | com_binlog_dump_gtid
|-- check_global_access
|-- kill_zombie_dump_threads
|-- mysql_binlog_send
|-- Binlog_sender sender
|-- sender.run
|-- init
|-- get start Logfile
|-- while {
|-- reader.open(logfile)
|-- send_binlog(reader,start_pos)
|-- send_format_description_event
|-- while{
|-- get_binlog_end_pos(&end_pos)
|-- read_pos = reader->position()
|-- end_pos = mysql_bin_log.get_binlog_end_pos();
|-- if read_pos==end_pos 循环等待新的event,直到换文件或者新event写入。
|-- 等待过程中 send_heartbeat_event 发生心跳
|-- send_events(end_pos)
|-- 循环发送read_pos 到 end_pos 的event.
|-- read_event
|-- before_send_hook
|-- send_packet
|-- after_send_hook
|-- }
|-- logfile=mysql_bin_log.find_next_log()
|-- }
|-- unregister_replica
SLAVE 端的实现
在Slave 端主要是由IO 线程和 SQL 线程来完成复制工作。 启动流程如下:
//rpl_replica.cc
start_slave_cmd
|-- start_slave(thd)
|-- for mi in channel_map:
|-- start_slave(thd,mi)
|-- start_slave_threads(mi)
|-- start_slave_thread(handle_slave_io) // 启动IO 线程
|-- start_slave_thread(handle_slave_sql) // 启动SQL线程
IO线程核心实现
RUN_HOOK(binlog_relay_io, thread_start,.)
mysql = mysql_init(nullptr)
safe_connect(thd, mysql, mi) // 建立连接
connected: 重连后入口
register_slave_on_master(mysql, mi, &suppress_warnings) // send COM_REGISTER_SLAVE
request_dump // send COM_BINLOG_DUMP_GTID or COM_BINLOG_DUMP
while !io_slave_killed(thd, mi)
|-- read_event(mysql, &rpl, mi, &suppress_warnings) // 从链接中读取一个完整数据包
|-- RUN_HOOK(binlog_relay_io, after_read_event..)
|-- queue_event(mi, event_buf, event_len) // 检查binlog事件,不同事件有不同的检查逻辑,有效事件输出到 relay_log文件
|-- RUN_HOOK(binlog_relay_io, after_queue_event..)
|-- thd->mem_root->ClearForReuse();
IO线程核心工作是从master库中拉取binlog ,输出到relay_log 文件中;其过程中发生出错会尝试重连来恢复流程, 严重错误或者超过重连次数阈值则结束IO线程。
SQL线程核心实现
Relay_log_info *rli = ((Master_info *)arg)->rli;
Rpl_applier_reader applier_reader(rli);
init_replica_thread(thd, SLAVE_THD_SQL); // 初始化SQL 线程资源
// MTS 并行复制模式下启动多个worker
slave_start_workers(rli, rli->opt_replica_parallel_workers,&mts_inited)
|-- rli->gaq = new Slave_committed_queue(rli->checkpoint_group, n); // 创建 queue
|-- for (uint i = 0; i < n; i++)
|-- slave_start_single_worker(rli, i) // 启动worker 线程
applier_reader.open(&errmsg)
while (!main_loop_error && !sql_slave_killed(thd, rli))
|-- ev = applier_reader.read_next_event();
|-- exec_relay_log_event(thd, rli, &applier_reader, ev);
|-- apply_event_and_update_pos // 直接调用ev的do_apply_event或者写入worker队列进行异步处理(MTS ).
SQL 线程核心工作是读取relay_log ,执行逻辑日志的回放。 如果了开启MTS (并行复制模式),SQL线程则把事件分配给具体的WORKER来执行并行回放。
SLAVE端复制回放使用到了两个重要的对象,Master_info 和 Relay_log_info 。 其中Master_info
复制的消息格式
复制流程主要使用到如下的消息:
记录当前复制source的信息,包括 log_name,log_pos, master_host,master_port等等信息, Relay_log_info 主要记录relay_log 的回放进度,worker信息,回放延迟等等。 两者都会持久化存储下来,可以通过库表查看 mysql.slave_master_info 和mysql.slave_relay_log_info 看到具体的复制信息。
1) COM_BINLOG_DUMP_GTID 消息格式
2 byte flags
4 byte server-id
4 byte binlog-filename-len
filenamesize binlog-filename
8 byte binlog-pos
4 byte gtid_set_encoded_size
gitdEncodeSize gtid_set_encoded_data
2) COM_BINLOG_DUMP 消息格式
4 byte start-position
2 byte flags
4 byte server-id
filenamesize binlog-filename
3) COM_REGISTER_SLAVE 消息格式
4 byte server-id
1~2 byte report_host_len
report_host_len report_host
1~2 byte report_user_len
report_user_len report_user
1~2 byte report_pass_len
report_pass_len report_pass
2 byte report_port
4 byte 0 // Fake rpl_recovery_rank
4 byte 0 // master_id ,master will fill
4) binlog_event 消息格式
主库向从库发生binlog_event 消息,直接使用存储在binlog中的格式,从库获取到后可以直接转成binlog事件对象。