searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

mysql半同步复制分析

2023-04-14 09:40:59
25
0

用户线程事务提交

 //事务刷盘及提交
 //第一阶段,将缓存刷到binlog
 //第二阶段,提交所有事务

ordered_commit()
   ...
   //一阶段flush binlog后,通过RUN_HOOK调用repl_semi_report_binlog_update
   if (RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos)))
  repl_semi_report_binlog_update()
    //传入文件名和位置(文件中所能看的最大事务)
    //如果是未初始化,将文件名和位置更新
    //如果已初始化,比较位置大小。若log_file更大,将文件名和位置更新
    repl_semisync.writeTranxInBinlog(log_file,log_pos)
    //比较文件和位置
    ActiveTranx::compare(log_file_name, log_file_pos,commit_file_name_, commit_file_pos_)
    //插入一个节点,位于事务链表的末端,并生成一个哈希值给节点
    active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)
  ...
 //判断binlog是否发送到从库并应答,并阻塞等待
  sync_error=call_after_sync_hook(commit_queue)
   //通过RUN_HOOK调用repl_semi_report_binlog_sync
   RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
  repl_semi_report_binlog_sync(Binlog_storage_param *param,const char *log_file,my_off_t log_pos)
    repl_semisync.commitTrx(log_file, log_pos);
   ...
   while(is_on()){
     //比较返回的文件,cmp>=0则退出循环
     int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                       trx_wait_binlog_name, trx_wait_binlog_pos);
     //等待ack节点信号
     wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
     //没有在等待时间收到回复,则关闭半同步
     if(wait_result!=0)
       witch_off()
   }

插件调用的机制

RUN_HOOK宏定义

    // Run `hook` on the delegate of `group` (group ## _delegate).
    // Evaluates to 0 when the delegate has no registered observers, otherwise
    // to the delegate method's return value. The trailing backslashes are
    // required: a #define ends at the first unescaped newline.
    #define RUN_HOOK(group, hook, args)             \
      (group ##_delegate->is_empty() ?              \
       0 : group ##_delegate->hook args)


    // Call site in ordered_commit() right after the binlog flush stage.
    RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos))

宏展开后(group ##_delegate 拼接为 binlog_storage_delegate)

    binlog_storage_after_flush_delegate->is_empty()?0:binlog_storage_delegate->after_flush(thd, file_name_ptr, flush_end_pos)

进入 Binlog_storage_delegate::after_flush 的实现:

    /**
      Invoke the after_flush callback of every registered binlog storage
      observer (for the semi-sync master: repl_semi_report_binlog_update).

      @param thd       session whose server_id is passed to the observers
      @param log_file  binlog file name that was just flushed
      @param log_pos   flush end position in log_file

      @return ret as left by FOREACH_OBSERVER; it starts at 0 and the macro
              may change it when an observer call fails.
    */
    int Binlog_storage_delegate::after_flush(THD *thd,
                                             const char *log_file,
                                             my_off_t log_pos)
    {
      Binlog_storage_param param;
      param.server_id= thd->server_id;

      int ret= 0;
      FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
      // Falling off the end of a non-void function is undefined behavior;
      // hand the observer result back to RUN_HOOK's caller.
      return ret;
    }

把宏展开

    // Excerpt of the FOREACH_OBSERVER macro body; elided lines are shown as
    // ".." / "...".
    // NOTE(review): the real definition is a "#define" with "\" line
    // continuations; both were lost when this was copied.
    define FOREACH_OBSERVER(r, f, thd, args)
    ..
   // Iterate over every Observer_info produced by observer_info_iter();
   // the loop ends when the iterator yields NULL.
   Observer_info_iterator iter= observer_info_iter();
   Observer_info *info= iter++;
   for (; info; info= iter++)
   {
    ...
    // `f` is a function-pointer member of the Observer struct: call it only
    // if the plugin set it. The elided branch runs when the call returns
    // non-zero (presumably the failure path that sets r — confirm).
    if (((Observer *)info->observer)->f &&
    ((Observer *)info->observer)->f args)
    ...
   }

info使用observer_info_iter()赋值

// Build an iterator over this delegate's observer_info_list. Callers such as
// FOREACH_OBSERVER advance it with operator++ until it yields NULL.
Delegate::Observer_info_iterator Delegate::observer_info_iter()
{
  Observer_info_iterator over_observers(observer_info_list);
  return over_observers;
}

再看Observer

class Binlog_storage_delegate
   :public Delegate {
 public:
 ...
 typedef Binlog_storage_observer Observer;
}


Binlog_storage_observer保存了函数的指针

// Table of callbacks a plugin registers for binlog storage events.
// NOTE(review): plugins fill this table with positional initializers
// (see storage_observer), so the member order is presumably part of the
// plugin interface — do not reorder.
typedef struct Binlog_storage_observer {
   uint32 len;  // size of this struct; plugins set it to sizeof(Binlog_storage_observer)
   // Reached through RUN_HOOK(binlog_storage, after_flush, ...) after the binlog flush.
   int (*after_flush)(Binlog_storage_param *param,
                      const char *log_file, my_off_t log_pos);
   // Reached through RUN_HOOK(binlog_storage, after_sync, ...) from call_after_sync_hook().
   int (*after_sync)(Binlog_storage_param *param,
                      const char *log_file, my_off_t log_pos);
 } Binlog_storage_observer;


初始化函数中通过register_binlog_storage_observer添加观察者

/**
  Init hook of the semi-sync master plugin. It sets up the PSI keys and the
  thread-local dump key, initializes the master object and the ACK receiver,
  and then registers the transaction, binlog-storage and binlog-transmit
  observers.

  @return 0 on success, or 1 as soon as a step fails (later steps are skipped).
  NOTE(review): observers registered before a failing step are not
  unregistered here.
*/
static int semi_sync_master_plugin_init(void *p)
{
#ifdef HAVE_PSI_INTERFACE
  init_semisync_psi_keys();
#endif

  my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);

  // || short-circuits, so the steps run in order and stop at the first failure.
  const bool failed= repl_semisync.initObject() ||
                     ack_receiver.init() ||
                     register_trans_observer(&trans_observer, p) ||
                     register_binlog_storage_observer(&storage_observer, p) ||
                     register_binlog_transmit_observer(&transmit_observer, p);
  return failed ? 1 : 0;
}

storage_observer初始化

// Binlog storage callbacks of the semi-sync master, registered in
// semi_sync_master_plugin_init(). The initializers are positional and must
// follow the member order of Binlog_storage_observer.
Binlog_storage_observer storage_observer = {
  sizeof(Binlog_storage_observer), // len

  repl_semi_report_binlog_update, // after_flush
  repl_semi_report_binlog_sync,   // after_sync
};

主库ack应答线程

 // Enabling semi-sync replication on the master starts the ACK thread
 fix_rpl_semi_sync_master_enabled()
 // Start the ACK receiver
 ack_receiver.start()
 // Create the receiver thread (entry point: ack_receive_handler)
 mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,&attr, ack_receive_handler, this)
  // Thread body
 Ack_receiver::run()
 // Listen on the replica sockets (presumably waits until one has data — confirm)
 ret= listener.listen_on_sockets()
 // Process each reply packet that was received
 repl_semisync.reportReplyPacket(slave_obj.server_id, net.read_pos, len);
 handleAck(server_id, log_file_name, log_file_pos)

Ack存放的数据结构

 /// Latest binlog position acknowledged by one replica.
 struct AckInfo
 {
   int server_id;                 // replica that sent the ACK
   char binlog_name[FN_REFLEN];   // acknowledged binlog file name
   unsigned long long binlog_pos; // acknowledged position in binlog_name
   // ... (remaining members elided)
 };

 /// Per-replica AckInfo storage (m_ack_array with m_size slots). handleAck()
 /// uses it when rpl_semi_sync_master_wait_for_slave_count != 1.
 class AckContainer : public Trace
 {
 public:
   AckContainer() : m_ack_array(NULL), m_size(0), m_empty_slot(0) {}
   // ... (remaining members elided)
 };

Ack_receiver::run()线程运行函数

 

// Main loop of Ack_receiver::run() (excerpt). It listens on the replica
// sockets, then drains every reply packet from each active socket.
while (1) {
    ... 
    ret = listener.listen_on_sockets();
    ... 
    i = 0;
    while (i < listener.number_of_slave_sockets() && m_status == ST_UP) {
        if (listener.is_socket_active(i)) {
            Slave slave_obj = listener.get_slave_obj(i);
            ulong len;
            net.compress = slave_obj.net_compress;

            // Keep reading while this socket still has data and the receiver
            // is not being stopped.
            // NOTE(review): binding `net` to slave_obj's vio is presumably
            // done in an elided line — confirm.
            do {
                net_clear(&net, 0);
                len = my_net_read(
                    &net);  // returns the packet length; multi-part packets are read as one, compressed packets are decompressed
                if (likely(len != packet_error))
                    repl_semisync.reportReplyPacket(slave_obj.server_id,
                                                    net.read_pos, len);
                else if (net.last_errno == ER_NET_READ_ERROR)
                    listener.clear_socket_info(i);  // drop this replica's socket on read error
            } while (net.vio->has_data(net.vio) && m_status == ST_UP);
        }
        i++;
    }
}

repl_semisync.reportReplyPacket()解析并处理packet

/**
  Parse a semi-sync reply packet from a replica and pass the acknowledged
  (binlog file, position) to handleAck(). This is an excerpt; elided lines
  are shown as "...".

  Fields read here: the position via uint8korr() at REPLY_BINLOG_POS_OFFSET,
  and the binlog file name, which runs from REPLY_BINLOG_NAME_OFFSET to the
  end of the packet.
*/
int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar*packet,
                             ulong packet_len)
 {
   ...
   char log_file_name[FN_REFLEN+1];
   my_off_t log_file_pos;
   ulong log_file_len = 0;
   ...
   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
   // The name length is everything after the name offset. If packet_len were
   // smaller than REPLY_BINLOG_NAME_OFFSET, this unsigned subtraction would
   // wrap to a huge value, which the length check below rejects.
   log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
   if (unlikely(log_file_len >= FN_REFLEN))
   {
     sql_print_error("Read semi-sync reply binlog file length too large");
     goto l_end;
   }
   // strncpy() adds no terminator when it copies exactly log_file_len
   // non-NUL bytes, so terminate explicitly.
   strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
   log_file_name[log_file_len] = 0;

    // NOTE(review): the (ulong) cast may truncate the position where ulong is 32-bit.
    if (trace_level_ & kTraceDetail)
     sql_print_information("%s: Got reply(%s, %lu) from server %u",
                           kWho, log_file_name, (ulong)log_file_pos, server_id);

    handleAck(server_id, log_file_name, log_file_pos);
   ...
 }

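handleAck()收到ack后:若只需等待一个从库(rpl_semi_sync_master_wait_for_slave_count == 1),直接调用reportReplyBinlog();否则先将ack插入ack_container_,再用insert()返回的ack(非空时)调用reportReplyBinlog()。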

void handleAck(int server_id, const char *log_file_name,
                  my_off_t log_file_pos)
   {
     lock();
     if (rpl_semi_sync_master_wait_for_slave_count == 1)
       reportReplyBinlog(log_file_name, log_file_pos);
     else
     {
       const AckInfo *ackinfo= NULL;
    //多个从库则返回小于log_file_pos的、最小的ack
       ackinfo= ack_container_.insert(server_id, log_file_name, log_file_pos);
       if (ackinfo != NULL)
         reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos);
     }
     unlock();
   }

reportReplyBinlog()更新从库应答的binlog位置,并唤醒等待该位置的用户线程

/**
  Advance the master's acknowledged binlog position and wake the sessions
  waiting for it. This is an excerpt: elided lines are shown as ".."/"...",
  and the function's closing brace is not included.

  @param log_file_name  binlog file acknowledged by the replica(s)
  @param log_file_pos   acknowledged position in log_file_name
*/
void ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_name,
                                            my_off_t log_file_pos)
 {
   bool  need_copy_send_pos = true;
   ...
   // If semi-sync is currently off, try to switch it back on.
   // NOTE(review): try_switch_on() presumably re-enables it once the reply
   // position has caught up with the largest position seen by committing
   // transactions (commit_file_name_/commit_file_pos_) — confirm.
   if (!is_on())
     try_switch_on(log_file_name, log_file_pos);
   // Ignore a reply that is older than the one already recorded.
   if (reply_file_name_inited_)
   {
     cmp = ActiveTranx::compare(log_file_name, log_file_pos,
                                reply_file_name_, reply_file_pos_);

    if (cmp < 0)
     {
       need_copy_send_pos = false;
     }
   }
   // Record the new (first or newer) acknowledged position.
   if (need_copy_send_pos)
   {
     strncpy(reply_file_name_, log_file_name, sizeof(reply_file_name_) - 1);
     reply_file_name_[sizeof(reply_file_name_) - 1]= '\0';
     reply_file_pos_ = log_file_pos;
     reply_file_name_inited_ = true;
     ...
   }
   if (rpl_semi_sync_master_wait_sessions > 0)
   {
  // Waiters can be released once the acknowledged position has reached
  // the position sessions are waiting for (wait_file_name_/wait_file_pos_).
     cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                wait_file_name_, wait_file_pos_);
     if (cmp >= 0)
     {
       can_release_threads = true;
       wait_file_name_inited_ = false;
     }
   }
   ..
   // Wake the waiting sessions up to the acknowledged position.
   // NOTE(review): confirm whether signal_waiting_sessions_up_to() treats
   // the bound as inclusive.
   if (can_release_threads)
   {
  ...
     active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
   }
0条评论
0 / 1000
meng321
5文章数
0粉丝数
meng321
5 文章 | 0 粉丝
原创

mysql半同步复制分析

2023-04-14 09:40:59
25
0

用户线程事务提交

 //事务刷盘及提交
 //第一阶段,将缓存刷到binlog
 //第二阶段,提交所有事务

ordered_commit()
   ...
   //一阶段flush binlog后,通过RUN_HOOK调用repl_semi_report_binlog_update
   if (RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos)))
  repl_semi_report_binlog_update()
    //传入文件名和位置(文件中所能看的最大事务)
    //如果是未初始化,将文件名和位置更新
    //如果已初始化,比较位置大小。若log_file更大,将文件名和位置更新
    repl_semisync.writeTranxInBinlog(log_file,log_pos)
    //比较文件和位置
    ActiveTranx::compare(log_file_name, log_file_pos,commit_file_name_, commit_file_pos_)
    //插入一个节点,位于事务链表的末端,并生成一个哈希值给节点
    active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)
  ...
 //判断binlog是否发送到从库并应答,并阻塞等待
  sync_error=call_after_sync_hook(commit_queue)
   //通过RUN_HOOK调用repl_semi_report_binlog_sync
   RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
  repl_semi_report_binlog_sync(Binlog_storage_param *param,const char *log_file,my_off_t log_pos)
    repl_semisync.commitTrx(log_file, log_pos);
   ...
   while(is_on()){
     //比较返回的文件,cmp>=0则退出循环
     int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                       trx_wait_binlog_name, trx_wait_binlog_pos);
     //等待ack节点信号
     wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
     //没有在等待时间收到回复,则关闭半同步
     if(wait_result!=0)
       witch_off()
   }

插件调用的机制

RUN_HOOK宏定义

    // Run `hook` on the delegate of `group` (group ## _delegate).
    // Evaluates to 0 when the delegate has no registered observers, otherwise
    // to the delegate method's return value. The trailing backslashes are
    // required: a #define ends at the first unescaped newline.
    #define RUN_HOOK(group, hook, args)             \
      (group ##_delegate->is_empty() ?              \
       0 : group ##_delegate->hook args)


    // Call site in ordered_commit() right after the binlog flush stage.
    RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos))

宏展开后(group ##_delegate 拼接为 binlog_storage_delegate)

    binlog_storage_after_flush_delegate->is_empty()?0:binlog_storage_delegate->after_flush(thd, file_name_ptr, flush_end_pos)

进入 Binlog_storage_delegate::after_flush 的实现:

    /**
      Invoke the after_flush callback of every registered binlog storage
      observer (for the semi-sync master: repl_semi_report_binlog_update).

      @param thd       session whose server_id is passed to the observers
      @param log_file  binlog file name that was just flushed
      @param log_pos   flush end position in log_file

      @return ret as left by FOREACH_OBSERVER; it starts at 0 and the macro
              may change it when an observer call fails.
    */
    int Binlog_storage_delegate::after_flush(THD *thd,
                                             const char *log_file,
                                             my_off_t log_pos)
    {
      Binlog_storage_param param;
      param.server_id= thd->server_id;

      int ret= 0;
      FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
      // Falling off the end of a non-void function is undefined behavior;
      // hand the observer result back to RUN_HOOK's caller.
      return ret;
    }

把宏展开

    // Excerpt of the FOREACH_OBSERVER macro body; elided lines are shown as
    // ".." / "...".
    // NOTE(review): the real definition is a "#define" with "\" line
    // continuations; both were lost when this was copied.
    define FOREACH_OBSERVER(r, f, thd, args)
    ..
   // Iterate over every Observer_info produced by observer_info_iter();
   // the loop ends when the iterator yields NULL.
   Observer_info_iterator iter= observer_info_iter();
   Observer_info *info= iter++;
   for (; info; info= iter++)
   {
    ...
    // `f` is a function-pointer member of the Observer struct: call it only
    // if the plugin set it. The elided branch runs when the call returns
    // non-zero (presumably the failure path that sets r — confirm).
    if (((Observer *)info->observer)->f &&
    ((Observer *)info->observer)->f args)
    ...
   }

info使用observer_info_iter()赋值

// Build an iterator over this delegate's observer_info_list. Callers such as
// FOREACH_OBSERVER advance it with operator++ until it yields NULL.
Delegate::Observer_info_iterator Delegate::observer_info_iter()
{
  Observer_info_iterator over_observers(observer_info_list);
  return over_observers;
}

再看Observer

class Binlog_storage_delegate
   :public Delegate {
 public:
 ...
 typedef Binlog_storage_observer Observer;
}


Binlog_storage_observer保存了函数的指针

// Table of callbacks a plugin registers for binlog storage events.
// NOTE(review): plugins fill this table with positional initializers
// (see storage_observer), so the member order is presumably part of the
// plugin interface — do not reorder.
typedef struct Binlog_storage_observer {
   uint32 len;  // size of this struct; plugins set it to sizeof(Binlog_storage_observer)
   // Reached through RUN_HOOK(binlog_storage, after_flush, ...) after the binlog flush.
   int (*after_flush)(Binlog_storage_param *param,
                      const char *log_file, my_off_t log_pos);
   // Reached through RUN_HOOK(binlog_storage, after_sync, ...) from call_after_sync_hook().
   int (*after_sync)(Binlog_storage_param *param,
                      const char *log_file, my_off_t log_pos);
 } Binlog_storage_observer;


初始化函数中通过register_binlog_storage_observer添加观察者

/**
  Init hook of the semi-sync master plugin. It sets up the PSI keys and the
  thread-local dump key, initializes the master object and the ACK receiver,
  and then registers the transaction, binlog-storage and binlog-transmit
  observers.

  @return 0 on success, or 1 as soon as a step fails (later steps are skipped).
  NOTE(review): observers registered before a failing step are not
  unregistered here.
*/
static int semi_sync_master_plugin_init(void *p)
{
#ifdef HAVE_PSI_INTERFACE
  init_semisync_psi_keys();
#endif

  my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);

  // || short-circuits, so the steps run in order and stop at the first failure.
  const bool failed= repl_semisync.initObject() ||
                     ack_receiver.init() ||
                     register_trans_observer(&trans_observer, p) ||
                     register_binlog_storage_observer(&storage_observer, p) ||
                     register_binlog_transmit_observer(&transmit_observer, p);
  return failed ? 1 : 0;
}

storage_observer初始化

// Binlog storage callbacks of the semi-sync master, registered in
// semi_sync_master_plugin_init(). The initializers are positional and must
// follow the member order of Binlog_storage_observer.
Binlog_storage_observer storage_observer = {
  sizeof(Binlog_storage_observer), // len

  repl_semi_report_binlog_update, // after_flush
  repl_semi_report_binlog_sync,   // after_sync
};

主库ack应答线程

 // Enabling semi-sync replication on the master starts the ACK thread
 fix_rpl_semi_sync_master_enabled()
 // Start the ACK receiver
 ack_receiver.start()
 // Create the receiver thread (entry point: ack_receive_handler)
 mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,&attr, ack_receive_handler, this)
  // Thread body
 Ack_receiver::run()
 // Listen on the replica sockets (presumably waits until one has data — confirm)
 ret= listener.listen_on_sockets()
 // Process each reply packet that was received
 repl_semisync.reportReplyPacket(slave_obj.server_id, net.read_pos, len);
 handleAck(server_id, log_file_name, log_file_pos)

Ack存放的数据结构

 /// Latest binlog position acknowledged by one replica.
 struct AckInfo
 {
   int server_id;                 // replica that sent the ACK
   char binlog_name[FN_REFLEN];   // acknowledged binlog file name
   unsigned long long binlog_pos; // acknowledged position in binlog_name
   // ... (remaining members elided)
 };

 /// Per-replica AckInfo storage (m_ack_array with m_size slots). handleAck()
 /// uses it when rpl_semi_sync_master_wait_for_slave_count != 1.
 class AckContainer : public Trace
 {
 public:
   AckContainer() : m_ack_array(NULL), m_size(0), m_empty_slot(0) {}
   // ... (remaining members elided)
 };

Ack_receiver::run()线程运行函数

 

// Main loop of Ack_receiver::run() (excerpt). It listens on the replica
// sockets, then drains every reply packet from each active socket.
while (1) {
    ... 
    ret = listener.listen_on_sockets();
    ... 
    i = 0;
    while (i < listener.number_of_slave_sockets() && m_status == ST_UP) {
        if (listener.is_socket_active(i)) {
            Slave slave_obj = listener.get_slave_obj(i);
            ulong len;
            net.compress = slave_obj.net_compress;

            // Keep reading while this socket still has data and the receiver
            // is not being stopped.
            // NOTE(review): binding `net` to slave_obj's vio is presumably
            // done in an elided line — confirm.
            do {
                net_clear(&net, 0);
                len = my_net_read(
                    &net);  // returns the packet length; multi-part packets are read as one, compressed packets are decompressed
                if (likely(len != packet_error))
                    repl_semisync.reportReplyPacket(slave_obj.server_id,
                                                    net.read_pos, len);
                else if (net.last_errno == ER_NET_READ_ERROR)
                    listener.clear_socket_info(i);  // drop this replica's socket on read error
            } while (net.vio->has_data(net.vio) && m_status == ST_UP);
        }
        i++;
    }
}

repl_semisync.reportReplyPacket()解析并处理packet

/**
  Parse a semi-sync reply packet from a replica and pass the acknowledged
  (binlog file, position) to handleAck(). This is an excerpt; elided lines
  are shown as "...".

  Fields read here: the position via uint8korr() at REPLY_BINLOG_POS_OFFSET,
  and the binlog file name, which runs from REPLY_BINLOG_NAME_OFFSET to the
  end of the packet.
*/
int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar*packet,
                             ulong packet_len)
 {
   ...
   char log_file_name[FN_REFLEN+1];
   my_off_t log_file_pos;
   ulong log_file_len = 0;
   ...
   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
   // The name length is everything after the name offset. If packet_len were
   // smaller than REPLY_BINLOG_NAME_OFFSET, this unsigned subtraction would
   // wrap to a huge value, which the length check below rejects.
   log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
   if (unlikely(log_file_len >= FN_REFLEN))
   {
     sql_print_error("Read semi-sync reply binlog file length too large");
     goto l_end;
   }
   // strncpy() adds no terminator when it copies exactly log_file_len
   // non-NUL bytes, so terminate explicitly.
   strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
   log_file_name[log_file_len] = 0;

    // NOTE(review): the (ulong) cast may truncate the position where ulong is 32-bit.
    if (trace_level_ & kTraceDetail)
     sql_print_information("%s: Got reply(%s, %lu) from server %u",
                           kWho, log_file_name, (ulong)log_file_pos, server_id);

    handleAck(server_id, log_file_name, log_file_pos);
   ...
 }

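handleAck()收到ack后:若只需等待一个从库(rpl_semi_sync_master_wait_for_slave_count == 1),直接调用reportReplyBinlog();否则先将ack插入ack_container_,再用insert()返回的ack(非空时)调用reportReplyBinlog()。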

void handleAck(int server_id, const char *log_file_name,
                  my_off_t log_file_pos)
   {
     lock();
     if (rpl_semi_sync_master_wait_for_slave_count == 1)
       reportReplyBinlog(log_file_name, log_file_pos);
     else
     {
       const AckInfo *ackinfo= NULL;
    //多个从库则返回小于log_file_pos的、最小的ack
       ackinfo= ack_container_.insert(server_id, log_file_name, log_file_pos);
       if (ackinfo != NULL)
         reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos);
     }
     unlock();
   }

reportReplyBinlog()更新从库应答的binlog位置,并唤醒等待该位置的用户线程

/**
  Advance the master's acknowledged binlog position and wake the sessions
  waiting for it. This is an excerpt: elided lines are shown as ".."/"...",
  and the function's closing brace is not included.

  @param log_file_name  binlog file acknowledged by the replica(s)
  @param log_file_pos   acknowledged position in log_file_name
*/
void ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_name,
                                            my_off_t log_file_pos)
 {
   bool  need_copy_send_pos = true;
   ...
   // If semi-sync is currently off, try to switch it back on.
   // NOTE(review): try_switch_on() presumably re-enables it once the reply
   // position has caught up with the largest position seen by committing
   // transactions (commit_file_name_/commit_file_pos_) — confirm.
   if (!is_on())
     try_switch_on(log_file_name, log_file_pos);
   // Ignore a reply that is older than the one already recorded.
   if (reply_file_name_inited_)
   {
     cmp = ActiveTranx::compare(log_file_name, log_file_pos,
                                reply_file_name_, reply_file_pos_);

    if (cmp < 0)
     {
       need_copy_send_pos = false;
     }
   }
   // Record the new (first or newer) acknowledged position.
   if (need_copy_send_pos)
   {
     strncpy(reply_file_name_, log_file_name, sizeof(reply_file_name_) - 1);
     reply_file_name_[sizeof(reply_file_name_) - 1]= '\0';
     reply_file_pos_ = log_file_pos;
     reply_file_name_inited_ = true;
     ...
   }
   if (rpl_semi_sync_master_wait_sessions > 0)
   {
  // Waiters can be released once the acknowledged position has reached
  // the position sessions are waiting for (wait_file_name_/wait_file_pos_).
     cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                wait_file_name_, wait_file_pos_);
     if (cmp >= 0)
     {
       can_release_threads = true;
       wait_file_name_inited_ = false;
     }
   }
   ..
   // Wake the waiting sessions up to the acknowledged position.
   // NOTE(review): confirm whether signal_waiting_sessions_up_to() treats
   // the bound as inclusive.
   if (can_release_threads)
   {
  ...
     active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
   }
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0