searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

binlog 复制流程分析

2023-10-26 08:22:24
16
0

BINLOG复制主流程

 

 

1)Slave 在接收到start slave cmd 后 ,先后启动IO线程和 SQL线程,触发复制流程。
2)IO线程 首先建立mysql connect, 接着发送 COM_REGISTER_SLAVE , COM_BINLOG_DUMP( COM_BINLOG_DUMP_GTID).开启了master binlog sender 流程。
3)master sender流程,初始化start logfile and pos, 打开logfile,开始循环读入binlog file 中的event ,并发送给slave;如果没有新的事件则等待,直到新事件出现或者新文件出现;过程中主要是与mysql_bin_log 对象交互,mysql_bin_log 写入新事件触发update_cond 信号。同时mysql_bin_log 提供查询最新的pos 和logfile 的接口。在等待新事件过程中会定时发送心跳事件。
4)IO 线程在发生完 DUMP请求后 进入循环流程:等待从网络中读取事件,读到event 后进行各个类型的检查和事务检查,如果通过则写入到relay log files 中。循环过程中出错,触发重连,如果是严重错误或者重连超出次数后退出IO线程。
5) SQL线程 启动后,初始化worker 线程(在开启并行复制的情况下),然后初始化applier_reader. 并打开reader. 接着接入循环流程,阻塞在 reader.read_next_event,直到 relaylog file有新事件产生,然后apply event。 apply event 过程中, 如果是在MTS 模式下并且事件是可以并行处理, 仅仅是把event 写入worker 的队列,其他情况都是直接在SQL线程直接apply event。
 
 

Master端实现

 

master端的实现主要由Binlog_sender. 

/**
  The major logic of dump thread is implemented in this class. It sends
  required binlog events to clients according to their requests.
*/
class Binlog_sender {
  typedef Basic_binlog_file_reader<Binlog_ifile, Binlog_event_data_istream,
                                   Binlog_event_object_istream, Event_allocator>
      File_reader;

 public:
  Binlog_sender(THD *thd, const char *start_file, my_off_t start_pos,
                Gtid_set *exclude_gtids, uint32 flag);
  // 启动发送流程              
  void run();
private:  
  /* Requested start binlog file and position */
  const char *m_start_file;
  my_off_t m_start_pos;
  /* The binlog file it is reading */
  LOG_INFO m_linfo;
  
  // 心跳间隔
  std::chrono::nanoseconds m_heartbeat_period;
  std::chrono::nanoseconds m_last_event_sent_ts;
  // 启动发送一个binlog file,直到轮转下一个binlog
  int send_binlog(File_reader *reader, my_off_t start_pos);
  
  // 发送event,before end_pos
  int send_events(File_reader *reader, my_off_t end_pos);
  // 获取当前binlog 最后的位置, 可能会wait_for_events
  int get_binlog_end_pos(File_reader *reader, my_off_t *end_pos);
  // 读取一个事件
  int read_event(File_reader *reader, uchar **event_ptr, uint32 *event_len);
  // 发送一个事件
  int send_packet();
  // 等待一个新的事件,wait for  update_cond or timeout ,
  int wait_new_events(my_off_t log_pos);
  
  

 

每一个从库Start slave 后,在主库对应启动一个binlog_sender实例,来实时发送binlog事件. 其流程实现如下:
// COM_BINLOG_DUMP | COM_BINLOG_DUMP_GTID
com_binlog_dump | com_binlog_dump_gtid    
  |--  check_global_access
  |--  kill_zombie_dump_threads
  |--  mysql_binlog_send
       |-- Binlog_sender sender
       |-- sender.run  
          |-- init    
          |-- get start Logfile
          |-- while {
            |--   reader.open(logfile)
            |--   send_binlog(reader,start_pos)
                  |-- send_format_description_event
                  |-- while{
                     |-- get_binlog_end_pos(&end_pos) 
                         |-- read_pos = reader->position()
                         |-- end_pos = mysql_bin_log.get_binlog_end_pos();
                         |-- if read_pos==end_pos 循环等待新的event,直到换文件或者新event写入。
                              |-- 等待过程中 send_heartbeat_event 发生心跳
                         
                     |-- send_events(end_pos)  
                         |-- 循环发送read_pos 到 end_pos 的event.
                            |-- read_event
                            |-- before_send_hook
                            |-- send_packet
                            |-- after_send_hook        
                  |--   }
            |--   logfile=mysql_bin_log.find_next_log() 
          |-- }
  |-- unregister_replica  


SLAVE 端的实现

在Slave 端主要是由IO 线程和 SQL 线程来完成复制工作。 启动流程如下:
 
//rpl_replica.cc
start_slave_cmd
|-- start_slave(thd)
   |-- for mi in channel_map:
      |-- start_slave(thd,mi) 
         |-- start_slave_threads(mi)
            |-- start_slave_thread(handle_slave_io)  // 启动IO 线程
            |-- start_slave_thread(handle_slave_sql) // 启动SQL线程

 

IO线程核心实现

RUN_HOOK(binlog_relay_io, thread_start,.)
mysql = mysql_init(nullptr)  
safe_connect(thd, mysql, mi)  // 建立连接

connected: 重连后入口
register_slave_on_master(mysql, mi, &suppress_warnings)  // send COM_REGISTER_SLAVE
request_dump   // send  COM_BINLOG_DUMP_GTID or  COM_BINLOG_DUMP
while  !io_slave_killed(thd, mi)
  |-- read_event(mysql, &rpl, mi, &suppress_warnings)  // 从链接中读取一个完整数据包
  |-- RUN_HOOK(binlog_relay_io, after_read_event..)
  |-- queue_event(mi, event_buf, event_len)  // 检查binlog事件,不同事件有不同的检查逻辑,有效事件输出到 relay_log文件
  |-- RUN_HOOK(binlog_relay_io, after_queue_event..)
  |-- thd->mem_root->ClearForReuse();

 

IO线程核心工作是从master库中拉取binlog ,输出到relay_log 文件中;其过程中发生出错会尝试重连来恢复流程, 严重错误或者超过重连次数阈值则结束IO线程。
 
 
SQL线程核心实现
Relay_log_info *rli = ((Master_info *)arg)->rli;
Rpl_applier_reader applier_reader(rli);

init_replica_thread(thd, SLAVE_THD_SQL);  // 初始化SQL 线程资源

// MTS  并行复制模式下启动多个worker
slave_start_workers(rli, rli->opt_replica_parallel_workers,&mts_inited)
|-- rli->gaq = new Slave_committed_queue(rli->checkpoint_group, n); // 创建 queue 
|-- for (uint i = 0; i < n; i++)
    |-- slave_start_single_worker(rli, i)   // 启动worker 线程

applier_reader.open(&errmsg)
while (!main_loop_error && !sql_slave_killed(thd, rli)) 
|--  ev = applier_reader.read_next_event();
|--  exec_relay_log_event(thd, rli, &applier_reader, ev);  
    |--  apply_event_and_update_pos // 直接调用ev的do_apply_event或者写入worker队列进行异步处理(MTS ).
 
SQL 线程核心工作是读取relay_log ,执行逻辑日志的回放。 如果了开启MTS (并行复制模式),SQL线程则把事件分配给具体的WORKER来执行并行回放。
 
SLAVE端复制回放使用到了两个重要的对象,Master_info 和 Relay_log_info 。 其中Master_info 
记录当前复制source的信息,包括 log_name,log_pos, master_host,master_port等等信息, Relay_log_info 主要记录relay_log 的回放进度,worker信息,回放延迟等等。 两者都会持久化存储下来,可以通过库表查看 mysql.slave_master_info 和mysql.slave_relay_log_info 看到具体的复制信息。
 
 
复制的消息格式
复制流程主要使用到如下的消息:
1) COM_BINLOG_DUMP_GTID 消息格式
2 byte             flags
4 byte             server-id
4 byte             binlog-filename-len
filenamesize       binlog-filename
8 byte             binlog-pos
4 byte             gtid_set_encoded_size  
gitdEncodeSize     gtid_set_encoded_data

2) COM_BINLOG_DUMP 消息格式

 

4 byte       start-position
2 byte       flags
4 byte       server-id
filenamesize       binlog-filename

 3)  COM_REGISTER_SLAVE 消息格式

4 byte             server-id
1~2 byte           report_host_len
report_host_len    report_host
1~2 byte           report_user_len
report_user_len    report_user
1~2 byte           report_pass_len
report_pass_len    report_pass
2 byte             report_port
4 byte             0   // Fake rpl_recovery_rank
4 byte             0   // master_id ,master will fill

4) binlog_event 消息格式 

主库向从库发生binlog_event 消息,直接使用存储在binlog中的格式,从库获取到后可以直接转成binlog事件对象。
 
0条评论
0 / 1000
tbLu
7文章数
0粉丝数
tbLu
7 文章 | 0 粉丝
原创

binlog 复制流程分析

2023-10-26 08:22:24
16
0

BINLOG复制主流程

 

 

1)Slave 在接收到start slave cmd 后 ,先后启动IO线程和 SQL线程,触发复制流程。
2)IO线程 首先建立mysql connect, 接着发送 COM_REGISTER_SLAVE , COM_BINLOG_DUMP( COM_BINLOG_DUMP_GTID).开启了master binlog sender 流程。
3)master sender流程,初始化start logfile and pos, 打开logfile,开始循环读入binlog file 中的event ,并发送给slave;如果没有新的事件则等待,直到新事件出现或者新文件出现;过程中主要是与mysql_bin_log 对象交互,mysql_bin_log 写入新事件触发update_cond 信号。同时mysql_bin_log 提供查询最新的pos 和logfile 的接口。在等待新事件过程中会定时发送心跳事件。
4)IO 线程在发生完 DUMP请求后 进入循环流程:等待从网络中读取事件,读到event 后进行各个类型的检查和事务检查,如果通过则写入到relay log files 中。循环过程中出错,触发重连,如果是严重错误或者重连超出次数后退出IO线程。
5) SQL线程 启动后,初始化worker 线程(在开启并行复制的情况下),然后初始化applier_reader. 并打开reader. 接着接入循环流程,阻塞在 reader.read_next_event,直到 relaylog file有新事件产生,然后apply event。 apply event 过程中, 如果是在MTS 模式下并且事件是可以并行处理, 仅仅是把event 写入worker 的队列,其他情况都是直接在SQL线程直接apply event。
 
 

Master端实现

 

master端的实现主要由Binlog_sender. 

/**
  The major logic of dump thread is implemented in this class. It sends
  required binlog events to clients according to their requests.
*/
class Binlog_sender {
  typedef Basic_binlog_file_reader<Binlog_ifile, Binlog_event_data_istream,
                                   Binlog_event_object_istream, Event_allocator>
      File_reader;

 public:
  Binlog_sender(THD *thd, const char *start_file, my_off_t start_pos,
                Gtid_set *exclude_gtids, uint32 flag);
  // 启动发送流程              
  void run();
private:  
  /* Requested start binlog file and position */
  const char *m_start_file;
  my_off_t m_start_pos;
  /* The binlog file it is reading */
  LOG_INFO m_linfo;
  
  // 心跳间隔
  std::chrono::nanoseconds m_heartbeat_period;
  std::chrono::nanoseconds m_last_event_sent_ts;
  // 启动发送一个binlog file,直到轮转下一个binlog
  int send_binlog(File_reader *reader, my_off_t start_pos);
  
  // 发送event,before end_pos
  int send_events(File_reader *reader, my_off_t end_pos);
  // 获取当前binlog 最后的位置, 可能会wait_for_events
  int get_binlog_end_pos(File_reader *reader, my_off_t *end_pos);
  // 读取一个事件
  int read_event(File_reader *reader, uchar **event_ptr, uint32 *event_len);
  // 发送一个事件
  int send_packet();
  // 等待一个新的事件,wait for  update_cond or timeout ,
  int wait_new_events(my_off_t log_pos);
  
  

 

每一个从库Start slave 后,在主库对应启动一个binlog_sender实例,来实时发送binlog事件. 其流程实现如下:
// COM_BINLOG_DUMP | COM_BINLOG_DUMP_GTID
com_binlog_dump | com_binlog_dump_gtid    
  |--  check_global_access
  |--  kill_zombie_dump_threads
  |--  mysql_binlog_send
       |-- Binlog_sender sender
       |-- sender.run  
          |-- init    
          |-- get start Logfile
          |-- while {
            |--   reader.open(logfile)
            |--   send_binlog(reader,start_pos)
                  |-- send_format_description_event
                  |-- while{
                     |-- get_binlog_end_pos(&end_pos) 
                         |-- read_pos = reader->position()
                         |-- end_pos = mysql_bin_log.get_binlog_end_pos();
                         |-- if read_pos==end_pos 循环等待新的event,直到换文件或者新event写入。
                              |-- 等待过程中 send_heartbeat_event 发生心跳
                         
                     |-- send_events(end_pos)  
                         |-- 循环发送read_pos 到 end_pos 的event.
                            |-- read_event
                            |-- before_send_hook
                            |-- send_packet
                            |-- after_send_hook        
                  |--   }
            |--   logfile=mysql_bin_log.find_next_log() 
          |-- }
  |-- unregister_replica  


SLAVE 端的实现

在Slave 端主要是由IO 线程和 SQL 线程来完成复制工作。 启动流程如下:
 
//rpl_replica.cc
start_slave_cmd
|-- start_slave(thd)
   |-- for mi in channel_map:
      |-- start_slave(thd,mi) 
         |-- start_slave_threads(mi)
            |-- start_slave_thread(handle_slave_io)  // 启动IO 线程
            |-- start_slave_thread(handle_slave_sql) // 启动SQL线程

 

IO线程核心实现

RUN_HOOK(binlog_relay_io, thread_start,.)
mysql = mysql_init(nullptr)  
safe_connect(thd, mysql, mi)  // 建立连接

connected: 重连后入口
register_slave_on_master(mysql, mi, &suppress_warnings)  // send COM_REGISTER_SLAVE
request_dump   // send  COM_BINLOG_DUMP_GTID or  COM_BINLOG_DUMP
while  !io_slave_killed(thd, mi)
  |-- read_event(mysql, &rpl, mi, &suppress_warnings)  // 从链接中读取一个完整数据包
  |-- RUN_HOOK(binlog_relay_io, after_read_event..)
  |-- queue_event(mi, event_buf, event_len)  // 检查binlog事件,不同事件有不同的检查逻辑,有效事件输出到 relay_log文件
  |-- RUN_HOOK(binlog_relay_io, after_queue_event..)
  |-- thd->mem_root->ClearForReuse();

 

IO线程核心工作是从master库中拉取binlog ,输出到relay_log 文件中;其过程中发生出错会尝试重连来恢复流程, 严重错误或者超过重连次数阈值则结束IO线程。
 
 
SQL线程核心实现
Relay_log_info *rli = ((Master_info *)arg)->rli;
Rpl_applier_reader applier_reader(rli);

init_replica_thread(thd, SLAVE_THD_SQL);  // 初始化SQL 线程资源

// MTS  并行复制模式下启动多个worker
slave_start_workers(rli, rli->opt_replica_parallel_workers,&mts_inited)
|-- rli->gaq = new Slave_committed_queue(rli->checkpoint_group, n); // 创建 queue 
|-- for (uint i = 0; i < n; i++)
    |-- slave_start_single_worker(rli, i)   // 启动worker 线程

applier_reader.open(&errmsg)
while (!main_loop_error && !sql_slave_killed(thd, rli)) 
|--  ev = applier_reader.read_next_event();
|--  exec_relay_log_event(thd, rli, &applier_reader, ev);  
    |--  apply_event_and_update_pos // 直接调用ev的do_apply_event或者写入worker队列进行异步处理(MTS ).
 
SQL 线程核心工作是读取relay_log ,执行逻辑日志的回放。 如果了开启MTS (并行复制模式),SQL线程则把事件分配给具体的WORKER来执行并行回放。
 
SLAVE端复制回放使用到了两个重要的对象,Master_info 和 Relay_log_info 。 其中Master_info 
记录当前复制source的信息,包括 log_name,log_pos, master_host,master_port等等信息, Relay_log_info 主要记录relay_log 的回放进度,worker信息,回放延迟等等。 两者都会持久化存储下来,可以通过库表查看 mysql.slave_master_info 和mysql.slave_relay_log_info 看到具体的复制信息。
 
 
复制的消息格式
复制流程主要使用到如下的消息:
1) COM_BINLOG_DUMP_GTID 消息格式
2 byte             flags
4 byte             server-id
4 byte             binlog-filename-len
filenamesize       binlog-filename
8 byte             binlog-pos
4 byte             gtid_set_encoded_size  
gitdEncodeSize     gtid_set_encoded_data

2) COM_BINLOG_DUMP 消息格式

 

4 byte       start-position
2 byte       flags
4 byte       server-id
filenamesize       binlog-filename

 3)  COM_REGISTER_SLAVE 消息格式

4 byte             server-id
1~2 byte           report_host_len
report_host_len    report_host
1~2 byte           report_user_len
report_user_len    report_user
1~2 byte           report_pass_len
report_pass_len    report_pass
2 byte             report_port
4 byte             0   // Fake rpl_recovery_rank
4 byte             0   // master_id ,master will fill

4) binlog_event 消息格式 

主库向从库发生binlog_event 消息,直接使用存储在binlog中的格式,从库获取到后可以直接转成binlog事件对象。
 
文章来自个人专栏
皮皮鲁的专栏
7 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0