用户线程事务提交
//事务刷盘及提交
//第一阶段,将缓存刷到binlog
//第二阶段,提交所有事务
ordered_commit()
...
//一阶段flush binlog后,通过RUN_HOOK调用repl_semi_report_binlog_update
if (RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos)))
repl_semi_report_binlog_update()
//传入文件名和位置(文件中所能看的最大事务)
//如果是未初始化,将文件名和位置更新
//如果已初始化,比较位置大小。若log_file更大,将文件名和位置更新
repl_semisync.writeTranxInBinlog(log_file,log_pos)
//比较文件和位置
ActiveTranx::compare(log_file_name, log_file_pos,commit_file_name_, commit_file_pos_)
//插入一个节点,位于事务链表的末端,并生成一个哈希值给节点
active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)
...
//判断binlog是否发送到从库并应答,并阻塞等待
sync_error=call_after_sync_hook(commit_queue)
//通过RUN_HOOK调用repl_semi_report_binlog_sync
if (RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
repl_semi_report_binlog_sync(Binlog_storage_param *param,const char *log_file,my_off_t log_pos)
repl_semisync.commitTrx(log_file, log_pos);
...
while(is_on()){
//比较返回的文件,cmp>=0则退出循环
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
trx_wait_binlog_name, trx_wait_binlog_pos);
//等待ack节点信号
wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
//没有在等待时间收到回复,则关闭半同步
if(wait_result!=0)
switch_off()
}
插件调用的机制
RUN_HOOK宏定义
// RUN_HOOK(group, hook, args)
//   group - hook group name; token-pasted with "_delegate" to name the
//           delegate pointer (e.g. binlog_storage -> binlog_storage_delegate).
//   hook  - member function to invoke on that delegate.
//   args  - parenthesized argument list, passed through verbatim.
// Evaluates to 0 without calling the hook when the delegate reports
// is_empty(); otherwise evaluates to the hook's return value.
// The trailing backslashes are required: without them the macro body is
// empty and the following lines are stray tokens.
#define RUN_HOOK(group, hook, args)             \
  (group ##_delegate->is_empty() ?              \
   0 : group ##_delegate->hook args)
把
RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos))
展开
binlog_storage_delegate->is_empty()?0:binlog_storage_delegate->after_flush(thd, file_name_ptr, flush_end_pos)
转到
/**
  Run the after_flush callback of the registered binlog-storage observers.

  @param thd       session whose server_id is copied into the hook parameter
  @param log_file  binlog file name forwarded to each observer
  @param log_pos   binlog position forwarded to each observer

  @return the value left in `ret` by FOREACH_OBSERVER (0 if it was never
          changed)
*/
int Binlog_storage_delegate::after_flush(THD *thd,
                                         const char *log_file,
                                         my_off_t log_pos)
{
  Binlog_storage_param param;
  param.server_id= thd->server_id;

  int ret= 0;
  // Observers receive a pointer to `param` followed by the file/position.
  FOREACH_OBSERVER(ret, after_flush, thd, (&param, log_file, log_pos));
  // Propagate the observers' result; falling off the end of an int
  // function would be undefined behaviour.
  return ret;
}
把宏展开
#define FOREACH_OBSERVER(r, f, thd, args)
..
Observer_info_iterator iter= observer_info_iter();
Observer_info *info= iter++;
for (; info; info= iter++)
{
...
if (((Observer *)info->observer)->f &&
((Observer *)info->observer)->f args)
...
}
info使用observer_info_iter()赋值
/**
  Create an iterator constructed from this delegate's observer_info_list.

  @return the newly constructed Observer_info_iterator
*/
Delegate::Observer_info_iterator Delegate::observer_info_iter()
{
  Observer_info_iterator observers(observer_info_list);
  return observers;
}
再看Observer
// Delegate for the binlog_storage hook group (excerpt; other members elided).
class Binlog_storage_delegate
:public Delegate {
public:
...
// Inside this class `Observer` names Binlog_storage_observer, so the
// `((Observer *)info->observer)->f` cast in FOREACH_OBSERVER reaches that
// struct's function pointers.
typedef Binlog_storage_observer Observer;
}
Binlog_storage_observer保存了函数的指针
// Callback table for the binlog_storage hook group: one function pointer
// per hook, each taking the hook parameter plus a binlog file name and
// position and returning int.
typedef struct Binlog_storage_observer {
// The semi-sync master plugin fills this with sizeof(Binlog_storage_observer).
uint32 len;
// Callback for the after_flush hook.
int (*after_flush)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos);
// Callback for the after_sync hook.
int (*after_sync)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos);
} Binlog_storage_observer;
初始化函数中通过register_binlog_storage_observer添加观察者
/**
  Plugin initialisation entry point for the semi-sync master.

  Creates the THR_RPL_SEMI_SYNC_DUMP thread-local key, then performs the
  setup steps in order: repl_semisync.initObject(), ack_receiver.init(),
  and registration of the transaction, binlog-storage and binlog-transmit
  observers. The first step that reports failure (non-zero / true) stops
  the sequence.

  @param p  opaque plugin handle forwarded to the register_* calls
  @return 0 when every step succeeded, 1 otherwise
*/
static int semi_sync_master_plugin_init(void *p)
{
#ifdef HAVE_PSI_INTERFACE
  init_semisync_psi_keys();
#endif

  my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);

  // `||` short-circuits, so later steps are skipped after the first failure,
  // exactly like a chain of early returns.
  const bool setup_failed=
    repl_semisync.initObject() ||
    ack_receiver.init() ||
    register_trans_observer(&trans_observer, p) ||
    register_binlog_storage_observer(&storage_observer, p) ||
    register_binlog_transmit_observer(&transmit_observer, p);

  return setup_failed ? 1 : 0;
}
storage_observer初始化
// Callback table that semi_sync_master_plugin_init passes to
// register_binlog_storage_observer(). Initializers are positional, so they
// must follow the member order of Binlog_storage_observer.
Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // after_flush
repl_semi_report_binlog_sync, // after_sync
};
主库ack应答线程
//启动半同步复制线程
fix_rpl_semi_sync_master_enabled()
//启动ack_receiver
ack_receiver.start()
//创建线程
mysql_thread_create(key_ss_thread_Ack_receiver_thread, &m_pid,&attr, ack_receive_handler, this)
//运行
Ack_receiver::run()
//获取连接,
ret= listener.listen_on_sockets()
//处理收到的数据
repl_semisync.reportReplyPacket(slave_obj.server_id, net.read_pos, len);
handleAck(server_id, log_file_name, log_file_pos)
Ack存放的数据结构
// One recorded ACK (excerpt; remaining members elided): the server id it is
// stored under and the binlog file name / position it carries.
struct AckInfo
{
int server_id;
// Binlog file name, stored in a fixed FN_REFLEN-byte buffer.
char binlog_name[FN_REFLEN];
// Binlog position paired with binlog_name.
unsigned long long binlog_pos;
...
}
// Container of ACK records (excerpt; remaining members elided).
class AckContainer : public Trace
{
public:
// Starts empty: no ack array allocated, size 0, empty-slot index 0.
AckContainer() : m_ack_array(NULL), m_size(0), m_empty_slot(0) {}
...
}
Ack_receiver::run()线程运行函数
// Main loop of the ACK receiver thread (excerpt; elided parts marked "...").
// Each pass calls listener.listen_on_sockets(), then visits every slave
// socket for as long as m_status stays ST_UP.
while (1) {
...
ret = listener.listen_on_sockets();
...
i = 0;
while (i < listener.number_of_slave_sockets() && m_status == ST_UP) {
// Only sockets the listener reports as active are read.
if (listener.is_socket_active(i)) {
Slave slave_obj = listener.get_slave_obj(i);
ulong len;
// Use this slave's compression setting for the read below.
net.compress = slave_obj.net_compress;
// Keep reading while the vio still has data and the receiver is up.
do {
net_clear(&net, 0);
len = my_net_read(
&net); // returns the packet length; multiple packets are merged when read, and compressed packets are decompressed
// Successful read: hand the raw packet to the semi-sync master.
if (likely(len != packet_error))
repl_semisync.reportReplyPacket(slave_obj.server_id,
net.read_pos, len);
// Read failed with ER_NET_READ_ERROR: clear this socket's info.
else if (net.last_errno == ER_NET_READ_ERROR)
listener.clear_socket_info(i);
} while (net.vio->has_data(net.vio) && m_status == ST_UP);
}
i++;
}
}
repl_semisync.reportReplyPacket()解析并处理packet
// Parse a semi-sync reply packet and pass the acknowledged binlog
// coordinates to handleAck() (excerpt; elided parts marked "...").
//   server_id  - id of the server the packet came from
//   packet     - raw packet bytes
//   packet_len - number of bytes in packet
int ReplSemiSyncMaster::reportReplyPacket(uint32 server_id, const uchar*packet,
ulong packet_len)
{
...
// One extra byte beyond FN_REFLEN for the terminating NUL.
char log_file_name[FN_REFLEN+1];
my_off_t log_file_pos;
ulong log_file_len = 0;
...
// Position is read with uint8korr at REPLY_BINLOG_POS_OFFSET.
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
// The file name is everything from REPLY_BINLOG_NAME_OFFSET to the end.
log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
// Reject names that would not fit; the l_end label is in the elided part.
if (unlikely(log_file_len >= FN_REFLEN))
{
sql_print_error("Read semi-sync reply binlog file length too large");
goto l_end;
}
// strncpy copies exactly log_file_len bytes, so terminate explicitly.
strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
log_file_name[log_file_len] = 0;
// Logged only when the kTraceDetail bit is set in trace_level_.
if (trace_level_ & kTraceDetail)
sql_print_information("%s: Got reply(%s, %lu) from server %u",
kWho, log_file_name, (ulong)log_file_pos, server_id);
handleAck(server_id, log_file_name, log_file_pos);
...
}
handleAck()收到ack后,更新ack_array。
void handleAck(int server_id, const char *log_file_name,
my_off_t log_file_pos)
{
lock();
if (rpl_semi_sync_master_wait_for_slave_count == 1)
reportReplyBinlog(log_file_name, log_file_pos);
else
{
const AckInfo *ackinfo= NULL;
//多个从库则返回小于log_file_pos的、最小的ack
ackinfo= ack_container_.insert(server_id, log_file_name, log_file_pos);
if (ackinfo != NULL)
reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos);
}
unlock();
}
reportReplyBinlog()通知线程
void ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_name,
my_off_t log_file_pos)
{
bool need_copy_send_pos = true;
...
//半同步关闭时,尝试开启半同步模式。开启条件:收到的文件位置大小大于提交事务能看到的最大位置
if (!is_on())
try_switch_on(log_file_name, log_file_pos);
//比较,判断条件同上
if (reply_file_name_inited_)
{
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
reply_file_name_, reply_file_pos_);
if (cmp < 0)
{
need_copy_send_pos = false;
}
}
//更新
if (need_copy_send_pos)
{
strncpy(reply_file_name_, log_file_name, sizeof(reply_file_name_) - 1);
reply_file_name_[sizeof(reply_file_name_) - 1]= '\0';
reply_file_pos_ = log_file_pos;
reply_file_name_inited_ = true;
...
}
if (rpl_semi_sync_master_wait_sessions > 0)
{
//比较收到的文件与等待从库应答的文件大小
cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
wait_file_name_, wait_file_pos_);
if (cmp >= 0)
{
can_release_threads = true;
wait_file_name_inited_ = false;
}
}
..
//激活等待的线程,只要偏移量小于从库应答的binlog
if (can_release_threads)
{
...
active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
}