author: 覃育龙
update:2023/3/16
本文主要讲移植入mysql中的线程池的原理
MySQL中的连接管理
连接和线程管理相关的类
mysql中连接和线程管理相关的代码主要在mysql/sql/conn_handler中,其中主要涉及的类包括:
- Connection_handler_manager:单例类,用来管理连接处理器;
- Connection_handler:连接处理的抽象类,具体实现由其子类实现;
- Per_thread_connection_handler:继承了Connection_handler,每一个连接用单独的线程处理,默认为该实现,通过thread_handling参数可以设置;
- One_thread_connection_handler:继承了Connection_handler,所有连接用同一个线程处理;
- Plugin_connection_handler:继承了Connection_handler,支持由Plugin具体实现的handler,例如线程池;
- Connection_acceptor:一个模版类,以模版方式支持不同的监听实现(三个listener)。并且提供一个死循环,用以监听连接;
- Mysqld_socket_listener:实现以Socket的方式监听客户端的连接事件,并且支持TCP socket和Unix socket,即对应的TCP_socket和Unix_socket;
- Shared_mem_listener:通过共享内存的监听方式(windows下);
- Named_pipe_listener:通过命名管道来监听和接收客户请求(windows下);
- Channel_info:连接信道的抽象类,具体实现有Channel_info_local_socket和Channel_info_tcpip_socket,
- Channel_info_local_socket:与本地方式与服务器进行交互;
- Channel_info_tcpip_socket:以TCP/IP方式与服务器进行交互;
- TCP_socket:TCP socket,与MySQL服务不同机器的连接访问;
- Unix_socket:Unix socket,与MySQL服务相同主机的连接访问;
其类图关系如下:
其中,三个Listener将作为Connection_acceptor模版类作为具体的Listener。
简单来说就是通过listener监听请求,并创建连接Channel_Info的具体类,然后通过单例Connection_handler_manager指派给具体的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)进行调度。
连接的流程
连接管理器的初始化(Connection_handler_manager)
Connection_handler_manager这个单例类初始化时会根据server启动参数或配置文件来初始化,根据thread_handling来初始化connection_handler
其中Per_thread_connection_handler无论mysqld使用哪种方式均会被初始化
> mysqld_main()
> init_common_variables() // 初始化变量
> get_options() // 获取server启动参数
> Connection_handler_manager::init() {// 初始化连接管理的单例类
Per_thread_connection_handler::init()
Connection_handler *connection_handler = nullptr;
switch (Connection_handler_manager::thread_handling) {
case SCHEDULER_ONE_THREAD_PER_CONNECTION:
connection_handler = new (std::nothrow) Per_thread_connection_handler();
break;
case SCHEDULER_NO_THREADS:
connection_handler = new (std::nothrow) One_thread_connection_handler();
break;
case SCHEDULER_THREAD_POOL:
connection_handler = new (std::nothrow) Thread_pool_connection_handler();
break;
default:
assert(false);
}
m_instance =
new (std::nothrow) Connection_handler_manager(connection_handler);
}
监听处理循环
mysqld主函数开启后,进入Connection_acceptor::connection_event_loop,在windows系统上是由三个线程分别调用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一个模板类,分别有3种实现)
//sql/conn_handler/connection_acceptor.h
void connection_event_loop() {
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
while (!connection_events_loop_aborted()) {
Channel_info *channel_info = m_listener->listen_for_connection_event();
if (channel_info != nullptr) mgr->process_new_connection(channel_info);
}
}
该函数作用就是通过监听客户请求listen_for_connection_event(),然后处理请求m_listener->process_new_connection()
其中通过connection_events_loop_aborted()来判断是否被终止
Mysqld_socket_listener的处理
初始化
通过Mysqld_socket_listener::setup_listener()来进行初始化工作。
Mysqld_socket_listener具体监听的列表存储在m_socket_vector中,由setup_listener()来完成初始化。
在m_socket_vector插入的顺序为: admin socket-> tcp socket-> unix socket -> event socket。 所以说linster会监听多个socket,最后才是普通的socket。
获取请求
Mysqld_socket_listener::listen_for_connection_event() {
#ifdef HAVE_POLL
int retval = poll();
#else
int retval = select();
// 获取一个新的连接请求
const Listen_socket *listen_socket = get_listen_socket();
...
// 接受请求
accept_connection(listen_socket->m_socket, &onnect_sock)
// 构造Channel_info
Channel_info *channel_info = nullptr;
if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
else
channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
return channel_info;
}
通过Mysqld_socket_listener::listen_for_connection_event()来监听请求,有poll与select两种方式。
在监听到有新请求之后,调用Mysqld_socket_listener::get_listen_socket()获取监听到新请求的socket,具体逻辑如下:
- 首先检查是否有admin的请求,有则返回admin的socket(首先处理admin socket)
- 返回第一个有请求的socket
接受请求
获取到有效socket后,进入accept_connection()->mysql_socket_accept调用accept接收请求。
然后构造一个Channel_info,Channel_info_local_socket(localhost登录)或Channel_info_tcpip_socket(ip登录),并作为listen_for_connection_event()的返回值,所有与连接的相关的信息都保存在Channel_info中。
处理新连接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
if (connection_events_loop_aborted() ||
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
delete channel_info;
return;
}
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}
1.首先判断是否超过连接上限
其代码逻辑如下:
- 首先检查服务是否已经停止,然后check_and_incr_conn_count增加连接计数,如果超过连接上限,那么将拒绝连接,并关闭连接;
- 将连接传入Connection_handler::add_connection()中
实际上mysql会运行max_connections + 1个连接,当连接数满时,为super用户保留最后一个连接
2.执行add_connection()
最终add_connetion()的执行mysql中配置的线程处理方式决定,有三种:
- one-thread-per-connection: 每个连接一个线程,线程关闭后线程会等待,直到下次新的连接后复用该线程。
- no-threads:在一个工作线程中处理的所有连接。
- pool-of-threads: 线程池方式
one-thread-per-connection方式
先查看是否有空闲线程,如果有则直接用空闲线程处理;否则将创建一个新的线程来处理新连接。
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
int error = 0;
my_thread_handle id;
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;
/*
There are no idle threads avaliable to take up the new
connection. Create a new thread to handle the connection
*/
channel_info->set_prior_thr_create_utime();
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
...
Global_THD_manager::get_instance()->inc_thread_created();
DBUG_PRINT("info", ("Thread created"));
return false;
}
check_idle_thread_and_enqueue_connection 做的事情就是检查当前是否有空闲状态的线程;有的话,则将channel_info加入到队列中,并向空闲线程发送信号;否则创建新线程,并执行handle_connection函数。
handle_connection函数的逻辑如下:
- 首先是初始化线程需要的内存;
- 然后创建一个THD对象init_new_thd;
- 创建/或重用psi对象,并加到thd对象;
- 将thd对象加到thd manager中;
- 调用thd_prepare_connection做验证;
- do_command(thd)处理query
- end_connection,关闭一个已经建立的连接,
- close_connection,disconnect关闭当前会话的vio;
- 释放资源thd中的资源
- 阻塞等待下一个连接,block_until_new_connection(),监听接收前面提到的check_idle_thread_and_enqueue_connection发过来的信号
线程池
线程池的总览
主要数据结构
connection_t
连接结构体,一个连接/事件对应一个connection_t,一个connection_t对应一个THD
struct connection_t {
THD *thd; // THD
thread_group_t *thread_group; // 该连接所在的线程组
connection_t *next_in_queue; // 用于connection_queue_t
connection_t **prev_in_queue; // 用于connection_queue_t
ulonglong abs_wait_timeout; // 连接时间,用于超时关闭
bool logged_in; // 是否登入
bool bound_to_poll_descriptor; // 是否获取到poolfd
bool waiting; // 是否在等待
uint tickets; // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高优先级队列中的票数
};
thread_group_t
线程组结构体,在该线程池的框架中,最多有128(MAX_THREAD_GROUPS)个线程组
struct alignas(128) thread_group_t {
mysql_mutex_t mutex;
connection_queue_t queue; // 待处理连接队列
connection_queue_t high_prio_queue; // 高优先级待处理连接队列
worker_list_t waiting_threads; // 等待线程队列,线程每次执行完后会进入睡眠状态等待下次唤醒工作
worker_thread_t *listener; // 监听线程,负责监听命令事件,事件队列为空时也会充当工作线程
pthread_attr_t *pthread_attr; // 线程属性
int pollfd; // poll文件描述符
int thread_count; // 线程数
int active_thread_count; // 活跃线程数
int connection_count; // 连接数
int waiting_thread_count; // 活跃连接数
/* Stats for the deadlock detection timer routine.*/
int io_event_count; // io事件数
int queue_event_count; // 队列中已处理事件数量
ulonglong last_thread_creation_time;// 上一个线程创建的时间
int shutdown_pipe[2]; // 管道通信
bool shutdown; // 关闭标志
bool stalled; // 停滞标志
char padding[328]; // 填充
};
waiting_threads: 等待线程队列,线程每次执行完后会将线程添加到等待列表的头部并等待。重要的是要把线加在头部而不是尾部,因为它确保了LIFO唤醒顺序(热缓存,工作不活动超时)
worker_thread_t
工作线程结构体,一个结构体对应一个worker
struct worker_thread_t {
ulonglong event_count; /* number of request handled by this thread */
thread_group_t *thread_group; // 所在线程组
worker_thread_t *next_in_list; // 用于worker_list_t
worker_thread_t **prev_in_list; // 用于worker_list_t
mysql_cond_t cond; // 用于唤醒线程的信号量
bool woken; // 是否被唤醒
};
pool_timer_t
用于所有线程组的一个全局计时器
struct pool_timer_t {
mysql_mutex_t mutex; // 锁
mysql_cond_t cond; // 信号量
std::atomic<uint64> current_microtime; // 当前时间,微秒
std::atomic<uint64> next_timeout_check; // 下一次检查的事件
int tick_interval; // 节拍
bool shutdown; // 关闭标志
};
线程池初始化
线程池的初始化在tp_init()中
//sql/threadpool_unix.cc
bool tp_init() {
DBUG_ENTER("tp_init");
threadpool_started = true;
for (uint i = 0; i < array_elements(all_groups); i++) {
thread_group_init(&all_groups[i], get_connection_attrib());
}
tp_set_threadpool_size(threadpool_size);
...
pool_timer.tick_interval = threadpool_stall_limit;
start_timer(&pool_timer);
DBUG_RETURN(false);
}
在tp_init()主要初始化工作:
- 初始化所有(MAX_THREAD_GROUPS 128)的线程组, 初始化每个线程组的属性和状态等信息
- 初始化threadpool_size个线程组的pollfd
- 启动start_timer定时器线程
处理新连接
将新连接加入线程池
下面是线程池从add_connetion开始,将新连接加入到线程池中的过程
> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
THD *const thd = channel_info->create_thd(); // 创建THD
connection_t *const connection = alloc_connection(thd); // 初始化connection_t
...
// 初始化thd的一些信息
thd->set_new_thread_id();
thd->start_utime = my_micro_time();
thd->scheduler = &tp_event_functions;
Global_THD_manager::get_instance()->add_thd(thd);
thd->event_scheduler.data = connection;
// 分配一个线程组
thread_group_t *group = &all_groups[thd->thread_id() % group_count];
// 将new connetion添加到工作队列,实际登录操作由worker完成
queue_put(group, connection);
> queue_put(thread_group_t *thread_group, connection_t *connection) {
connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
thread_group->queue.push_back(connection); // 加入工作队列
// 如果活跃线程为0则创建或唤醒一个工作线程
wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
> wake_or_create_thread() {
// 若唤醒成功则退出
// 唤醒过程就是在waiting_threads中找第一个线程,然后给它发送信号量
if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
// 1. 当前线程组线程数大于连接数,则不用创建,返回
// 2. 如果活跃线程为0或为admin_connection连接,则创建直接工作线程
// 3. 如果time_since_last_thread_created大于限制创建的时间则创建
create_worker(thread_group, admin_connection) {
// 创建工作线程 worker_main
> err = mysql_thread_create(key_worker_thread, &thread_id,
thread_group->pthread_attr, worker_main,
thread_group);
}
}
}
}
工作线程(worker_main)
worker_main主要工作内容:
- get_event: 阻塞的从当前线程组中获取一个的待处理事件(connection with pending event)
- handle_event:处理该连接
worker_main()
static void *worker_main(void *param) {
/* Init per-thread structure */
worker_thread_t this_thread;
mysql_cond_init(key_worker_cond, &this_thread.cond);
this_thread.thread_group = thread_group;
this_thread.event_count = 0;
/* Run event loop */
for (;;) {
connection_t *connection;
struct timespec ts;
set_timespec(&ts, threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts);
if (!connection) break; // 如果超时已过期或关闭,则返回NULL,即退出线程
this_thread.event_count++;
handle_event(connection);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
...
my_thread_end();
return nullptr;
}
工作线程的退出的情况(get_event返回空):
- 线程组被关闭
- 检测到too_many_busy_threads,与参数thread_pool_oversubscribe有关(限制其小于1 + thread_pool_oversubscribe)
- 睡眠时mysql_cond_timedwait超时或出错
- 睡眠时mysql_cond_wait出错
get_event():从队列或socket拿取一个事件
- oversubscribed表示是否系统CUP负载太高,已防止同时运行太多线程;与参数threadpool_oversubscribe有关
- 高优先级事件:与参数thread_pool_high_prio_mode有关
- 工作线程的睡眠状态:在get_event()中,停在current_thread->cond信号上(mysql_cond_timedwait/mysql_cond_wait)
get_event()主要流程概述:
- 非oversubscribed时
- 从队列中取一个连接/事件并返回,没取到则继续
- 检查是否有监听线程,没有则变成监听线程;有则继续
- 从连接线程组socket中取一个连接或事件
- 没取到连接,进入睡眠状态
- 拿到连接,如果是高优先级事件则直接返回
- 否则判断是否有太多忙线程,忙将事件放入队列并进入睡眠状态;否则返回连接
- oversubscribed时
- 检查是否有监听线程,没有则变成监听线程;有则继续
- 进入睡眠状态
listener():监听线程
监听线程主要工作:
- (优先)队列中有事件时,常驻;负责获取socket中的事件并将它们分发给工作线程(推入队列中)
- (优先)队列中无事件时,即空闲时:讲N-1个事件推入队列,返回第一个事件,最终get_event()会返回并由handle_event()处理
handle_event():实际处理连接
handle_event()主要工作:
- 未登录状态
- 进行threadpool_add_connection,进行一些初始化、登录等工作(thd_prepare_connection)
- 处理的是从add_conntion被放入队列中的新连接
- 已登录状态
- 进行threadpool_process_request,最后进入do_command进行处理事务
- 处理的是get_event或listener(队列为空时)从socket获取的事件
static void handle_event(connection_t *connection) {
...
if (!connection->logged_in) {
err = threadpool_add_connection(connection->thd);
connection->logged_in = true;
} else {
err = threadpool_process_request(connection->thd);
}
...
set_wait_timeout(connection);
}
对于threadpool_process_request函数:
函数中的循环与thread-per-connections的逻辑类似,但是这个函数的目标是执行单个QUERY,所以正常情况下,下面的循环只会执行一次
对于SSL的连接,可能会执行多次,由vio->has_data()判断数据是否结束
/**
Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
thread_attach(thd);
...
for (;;) {
Vio *vio;
thd_set_net_read_write(thd, 0);
if ((retval = do_command(thd)) != 0) goto end;
if (!thd_connection_alive(thd)) {
retval = 1;
goto end;
}
vio = thd->get_protocol_classic()->get_vio();
if (!vio->has_data(vio)) {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
thd_set_net_read_write(thd, 1);
goto end;
}
if (!thd->m_server_idle) {
MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
thd->m_server_idle = true;
}
}
end:
...
return retval;
}
计时器线程(timer_thread)
计时器线程主要工作:
- 检查每个线程组是否停滞(stall)
- 检查每个连接(THD)是否超时,超时则kill掉
static void *timer_thread(void *param) noexcept {
...
for (;;) {
...
if (err == ETIMEDOUT) {
timer->current_microtime.store(my_microsecond_getsystime(),
std::memory_order_relaxed);
/* Check stalls in thread groups */
for (size_t i = 0; i < array_elements(all_groups); i++) {
if (all_groups[i].connection_count) check_stall(&all_groups[i]);
}
/* Check if any client exceeded wait_timeout */
if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
timer->current_microtime.load(std::memory_order_relaxed))
timeout_check(timer);
}
}
...
}
check_stall流程:
- 检测listener是否存在
- 如果不存在,检查是否由io事件被弹出
- 如果没有,表明listener被阻塞,创建一个工作线程(而后它会变成listener)并退出
- 重置io_event_count
- 判断是否线程组是否stall:
- thread_group->queue_event_count表示从队列中被处理事件的数量,每次会被重置
- 如果自从上一次check_stall到现在仍然为0且队列和优先队列不为空,则确定stall
- 如果确定stall了,则设置thread_group->stalled,唤醒,或创建新工作线程
- 当一个事件从队列中出去,则表明恢复正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
...
// 如果listener不存在,则创建一个工作线程(而后它会变成listener)
if (!thread_group->listener && !thread_group->io_event_count) {
wake_or_create_thread(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
return;
}
// 重置io_event_count
thread_group->io_event_count = 0;
// queue_event_count为0且队列和优先队列不为空,确定stall
if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
thread_group->stalled = true;
wake_or_create_thread(thread_group);
}
// check完后重置queue_event_count
thread_group->queue_event_count = 0;
mysql_mutex_unlock(&thread_group->mutex);
}
线程池退出
调用tp_end(), 回收线程组资源,停止定时器线程,最终会等待所有线程结束后才会退出
线程池相关参数
系统变量
变量 |
说明 |
thread_handling |
pool-of-threads开启线程池模式 |
threadpool_idle_timeout |
用于限制空闲线程退出前应该等待的时间 |
threadpool_high_prio_mode |
用于对全局或每个连接的高优先级调度提供更细粒度的控制 |
threadpool_high_prio_tickets |
控制高优先级队列策略,为每个新连接分配的票数,以进入高优先级队列。将此变量设置为0将禁用高优先级队列 |
threadpool_max_threads |
用于限制线程池中的最大线程数 |
threadpool_oversubscribe |
该参数的值越高,可以同时运行的线程就越多,如果该值低于3,则可能导致更多的睡眠和唤醒 |
threadpool_size |
用于定义同时可以使用CPU的线程数 |
threadpool_stall_limit |
认定为stall的时间,当达到此限制时,线程池将唤醒或创建另一个线程 |
状态变量
变量 |
说明 |
Threadpool_idle_threads |
显示线程池中空闲线程的数量 |
Threadpool_threads |
显示线程池中的线程数。 |
参考文档:
MySQL源码阅读2-连接与线程管理
线程池移植原理文档
percona线程池文档