searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

线程池原理

2023-04-11 01:21:35
60
0

author: 覃育龙
update:2023/3/16

本文主要讲移植入mysql中的线程池的原理

MySQL中的连接管理

连接和线程管理相关的类

mysql中连接和线程管理相关的代码主要在mysql/sql/conn_handler中,其中主要涉及的类包括:

  • Connection_handler_manager:单例类,用来管理连接处理器;
  • Connection_handler:连接处理的抽象类,具体实现由其子类实现;
  • Per_thread_connection_handler:继承了Connection_handler,每一个连接用单独的线程处理,默认为该实现,通过thread_handling参数可以设置;
  • One_thread_connection_handler:继承了Connection_handler,所有连接用同一个线程处理;
  • Plugin_connection_handler:继承了Connection_handler,支持由Plugin具体实现的handler,例如线程池;
  • Connection_acceptor:一个模版类,以模版方式支持不同的监听实现(三个listener)。并且提供一个死循环,用以监听连接;
  • Mysqld_socket_listener:实现以Socket的方式监听客户端的连接事件,并且支持TCP socket和Unix socket,即对应的TCP_socket和Unix_socket;
  • Shared_mem_listener:通过共享内存的监听方式(windows下);
  • Named_pipe_listener:通过命名管道来监听和接收客户请求(windows下);
  • Channel_info:连接信道的抽象类,具体实现有Channel_info_local_socket和Channel_info_tcpip_socket,
  • Channel_info_local_socket:与本地方式与服务器进行交互;
  • Channel_info_tcpip_socket:以TCP/IP方式与服务器进行交互;
  • TCP_socket:TCP socket,与MySQL服务不同机器的连接访问;
  • Unix_socket:Unix socket,与MySQL服务相同主机的连接访问;

其类图关系如下:

其中,三个Listener将作为Connection_acceptor模版类作为具体的Listener。
简单来说就是通过listener监听请求,并创建连接Channel_Info的具体类,然后通过单例Connection_handler_manager指派给具体的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)进行调度。

连接的流程

连接管理器的初始化(Connection_handler_manager)

Connection_handler_manager这个单例类初始化时会根据server启动参数或配置文件来初始化,根据thread_handling来初始化connection_handler
其中Per_thread_connection_handler无论mysqld使用哪种方式均会被初始化

> mysqld_main()
  > init_common_variables() // 初始化变量
    > get_options() // 获取server启动参数
      > Connection_handler_manager::init() {// 初始化连接管理的单例类
        Per_thread_connection_handler::init()
        Connection_handler *connection_handler = nullptr;
        switch (Connection_handler_manager::thread_handling) {
          case SCHEDULER_ONE_THREAD_PER_CONNECTION:
            connection_handler = new (std::nothrow) Per_thread_connection_handler();
            break;
          case SCHEDULER_NO_THREADS:
            connection_handler = new (std::nothrow) One_thread_connection_handler();
            break;
          case SCHEDULER_THREAD_POOL:
            connection_handler = new (std::nothrow) Thread_pool_connection_handler();
            break;
          default:
            assert(false);
        }

        m_instance =
            new (std::nothrow) Connection_handler_manager(connection_handler);
      }

监听处理循环

mysqld主函数开启后,进入Connection_acceptor::connection_event_loop,在windows系统上是由三个线程分别调用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一个模板类,分别有3种实现)

//sql/conn_handler/connection_acceptor.h
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != nullptr) mgr->process_new_connection(channel_info);
    }
  }

该函数作用就是通过监听客户请求listen_for_connection_event(),然后处理请求m_listener->process_new_connection()
其中通过connection_events_loop_aborted()来判断是否被终止

Mysqld_socket_listener的处理

初始化

通过Mysqld_socket_listener::setup_listener()来进行初始化工作。
Mysqld_socket_listener具体监听的列表存储在m_socket_vector中,由setup_listener()来完成初始化。
在m_socket_vector插入的顺序为: admin socket-> tcp socket-> unix socket -> event socket。 所以说linster会监听多个socket,最后才是普通的socket。

获取请求
Mysqld_socket_listener::listen_for_connection_event() {
  #ifdef HAVE_POLL
    int retval = poll();
  #else
    int retval = select();
  // 获取一个新的连接请求
  const Listen_socket *listen_socket = get_listen_socket();
  ...
  // 接受请求
  accept_connection(listen_socket->m_socket, &onnect_sock)
  // 构造Channel_info
  Channel_info *channel_info = nullptr;
  if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
    channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
  else
    channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
  return channel_info;
}

通过Mysqld_socket_listener::listen_for_connection_event()来监听请求,有poll与select两种方式。

在监听到有新请求之后,调用Mysqld_socket_listener::get_listen_socket()获取监听到新请求的socket,具体逻辑如下:

  • 首先检查是否有admin的请求,有则返回admin的socket(首先处理admin socket)
  • 返回第一个有请求的socket
接受请求

获取到有效socket后,进入accept_connection()->mysql_socket_accept调用accept接收请求。
然后构造一个Channel_info,Channel_info_local_socket(localhost登录)或Channel_info_tcpip_socket(ip登录),并作为listen_for_connection_event()的返回值,所有与连接的相关的信息都保存在Channel_info中。

处理新连接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}

1.首先判断是否超过连接上限
其代码逻辑如下:

  • 首先检查服务是否已经停止,然后check_and_incr_conn_count增加连接计数,如果超过连接上限,那么将拒绝连接,并关闭连接;
  • 将连接传入Connection_handler::add_connection()中

实际上mysql会运行max_connections + 1个连接,当连接数满时,为super用户保留最后一个连接

2.执行add_connection()
最终add_connetion()的执行mysql中配置的线程处理方式决定,有三种:

  • one-thread-per-connection: 每个连接一个线程,线程关闭后线程会等待,直到下次新的连接后复用该线程。
  • no-threads:在一个工作线程中处理的所有连接。
  • pool-of-threads: 线程池方式
one-thread-per-connection方式

先查看是否有空闲线程,如果有则直接用空闲线程处理;否则将创建一个新的线程来处理新连接。

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
  ...
  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

check_idle_thread_and_enqueue_connection 做的事情就是检查当前是否有空闲状态的线程;有的话,则将channel_info加入到队列中,并向空闲线程发送信号;否则创建新线程,并执行handle_connection函数。

handle_connection函数的逻辑如下:

  • 首先是初始化线程需要的内存;
  • 然后创建一个THD对象init_new_thd;
  • 创建/或重用psi对象,并加到thd对象;
  • 将thd对象加到thd manager中;
  • 调用thd_prepare_connection做验证;
  • do_command(thd)处理query
  • end_connection,关闭一个已经建立的连接,
  • close_connection,disconnect关闭当前会话的vio;
  • 释放资源thd中的资源
  • 阻塞等待下一个连接,block_until_new_connection(),监听接收前面提到的check_idle_thread_and_enqueue_connection发过来的信号

线程池

线程池的总览

主要数据结构

connection_t

连接结构体,一个连接/事件对应一个connection_t,一个connection_t对应一个THD

struct connection_t {
  THD *thd;                       // THD
  thread_group_t *thread_group;   // 该连接所在的线程组
  connection_t *next_in_queue;    // 用于connection_queue_t
  connection_t **prev_in_queue;   // 用于connection_queue_t
  ulonglong abs_wait_timeout;     // 连接时间,用于超时关闭
  bool logged_in;                 // 是否登入
  bool bound_to_poll_descriptor;  // 是否获取到poolfd
  bool waiting;                   // 是否在等待
  uint tickets;                   // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高优先级队列中的票数
};

thread_group_t

线程组结构体,在该线程池的框架中,最多有128(MAX_THREAD_GROUPS)个线程组

struct alignas(128) thread_group_t {
  mysql_mutex_t mutex;
  connection_queue_t queue;           // 待处理连接队列
  connection_queue_t high_prio_queue; // 高优先级待处理连接队列
  worker_list_t waiting_threads;      // 等待线程队列,线程每次执行完后会进入睡眠状态等待下次唤醒工作
  worker_thread_t *listener;          // 监听线程,负责监听命令事件,事件队列为空时也会充当工作线程
  pthread_attr_t *pthread_attr;       // 线程属性
  int pollfd;                         // poll文件描述符
  int thread_count;                   // 线程数
  int active_thread_count;            // 活跃线程数
  int connection_count;               // 连接数
  int waiting_thread_count;           // 活跃连接数
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 // io事件数
  int queue_event_count;              // 队列中已处理事件数量
  ulonglong last_thread_creation_time;// 上一个线程创建的时间
  int shutdown_pipe[2];               // 管道通信
  bool shutdown;                      // 关闭标志
  bool stalled;                       // 停滞标志
  char padding[328];                  // 填充
};

waiting_threads: 等待线程队列,线程每次执行完后会将线程添加到等待列表的头部并等待。重要的是要把线加在头部而不是尾部,因为它确保了LIFO唤醒顺序(热缓存,工作不活动超时)

worker_thread_t

工作线程结构体,一个结构体对应一个worker

struct worker_thread_t {
  ulonglong event_count; /* number of request handled by this thread */
  thread_group_t *thread_group;   // 所在线程组
  worker_thread_t *next_in_list;  // 用于worker_list_t
  worker_thread_t **prev_in_list; // 用于worker_list_t
  mysql_cond_t cond;              // 用于唤醒线程的信号量
  bool woken;                     // 是否被唤醒
};

pool_timer_t

用于所有线程组的一个全局计时器

struct pool_timer_t {
  mysql_mutex_t mutex;                    // 锁
  mysql_cond_t cond;                      // 信号量
  std::atomic<uint64> current_microtime;  // 当前时间,微秒
  std::atomic<uint64> next_timeout_check; // 下一次检查的事件
  int tick_interval;                      // 节拍
  bool shutdown;                          // 关闭标志
};

线程池初始化

线程池的初始化在tp_init()中

//sql/threadpool_unix.cc
bool tp_init() {
  DBUG_ENTER("tp_init");
  threadpool_started = true;

  for (uint i = 0; i < array_elements(all_groups); i++) {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  tp_set_threadpool_size(threadpool_size);
  ...

  pool_timer.tick_interval = threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(false);
}

在tp_init()主要初始化工作:

  1. 初始化所有(MAX_THREAD_GROUPS 128)的线程组, 初始化每个线程组的属性和状态等信息
  2. 初始化threadpool_size个线程组的pollfd
  3. 启动start_timer定时器线程

处理新连接

将新连接加入线程池

下面是线程池从add_connetion开始,将新连接加入到线程池中的过程

> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
  THD *const thd = channel_info->create_thd(); // 创建THD
  connection_t *const connection = alloc_connection(thd); // 初始化connection_t
  ...
  // 初始化thd的一些信息
  thd->set_new_thread_id();
  thd->start_utime = my_micro_time();
  thd->scheduler = &tp_event_functions;
  Global_THD_manager::get_instance()->add_thd(thd);
  thd->event_scheduler.data = connection;
  // 分配一个线程组
  thread_group_t *group = &all_groups[thd->thread_id() % group_count];
  // 将new connetion添加到工作队列,实际登录操作由worker完成
  queue_put(group, connection);
  > queue_put(thread_group_t *thread_group, connection_t *connection) {
    connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
    thread_group->queue.push_back(connection); // 加入工作队列
    // 如果活跃线程为0则创建或唤醒一个工作线程
    wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
    > wake_or_create_thread() {
      // 若唤醒成功则退出
      // 唤醒过程就是在waiting_threads中找第一个线程,然后给它发送信号量
      if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
      // 1. 当前线程组线程数大于连接数,则不用创建,返回
      // 2. 如果活跃线程为0或为admin_connection连接,则创建直接工作线程
      // 3. 如果time_since_last_thread_created大于限制创建的时间则创建
      create_worker(thread_group, admin_connection) {
        // 创建工作线程 worker_main
       > err = mysql_thread_create(key_worker_thread, &thread_id,
                thread_group->pthread_attr, worker_main,
                thread_group);
      }
    }
  }
}

工作线程(worker_main)

worker_main主要工作内容:

  1. get_event: 阻塞的从当前线程组中获取一个的待处理事件(connection with pending event)
  2. handle_event:处理该连接
worker_main()
static void *worker_main(void *param) {
  /* Init per-thread structure */
  worker_thread_t this_thread;
  mysql_cond_init(key_worker_cond, &this_thread.cond);
  this_thread.thread_group = thread_group;
  this_thread.event_count = 0;
  /* Run event loop */
  for (;;) {
    connection_t *connection;
    struct timespec ts;
    set_timespec(&ts, threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection) break; // 如果超时已过期或关闭,则返回NULL,即退出线程
    this_thread.event_count++;
    handle_event(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  ...
  my_thread_end();
  return nullptr;
}

工作线程的退出的情况(get_event返回空):

  • 线程组被关闭
  • 检测到too_many_busy_threads,与参数thread_pool_oversubscribe有关(限制其小于1 + thread_pool_oversubscribe)
  • 睡眠时mysql_cond_timedwait超时或出错
  • 睡眠时mysql_cond_wait出错
get_event():从队列或socket拿取一个事件
  1. oversubscribed表示是否系统CUP负载太高,已防止同时运行太多线程;与参数threadpool_oversubscribe有关
  2. 高优先级事件:与参数thread_pool_high_prio_mode有关
  3. 工作线程的睡眠状态:在get_event()中,停在current_thread->cond信号上(mysql_cond_timedwait/mysql_cond_wait)

get_event()主要流程概述:

  1. 非oversubscribed时
    • 从队列中取一个连接/事件并返回,没取到则继续
    • 检查是否有监听线程,没有则变成监听线程;有则继续
    • 从连接线程组socket中取一个连接或事件
      • 没取到连接,进入睡眠状态
      • 拿到连接,如果是高优先级事件则直接返回
      • 否则判断是否有太多忙线程,忙将事件放入队列并进入睡眠状态;否则返回连接
  1. oversubscribed时
    • 检查是否有监听线程,没有则变成监听线程;有则继续
    • 进入睡眠状态
listener():监听线程

监听线程主要工作:

  • (优先)队列中有事件时,常驻;负责获取socket中的事件并将它们分发给工作线程(推入队列中)
  • (优先)队列中无事件时,即空闲时:讲N-1个事件推入队列,返回第一个事件,最终get_event()会返回并由handle_event()处理
handle_event():实际处理连接

handle_event()主要工作:

  1. 未登录状态
    • 进行threadpool_add_connection,进行一些初始化、登录等工作(thd_prepare_connection)
    • 处理的是从add_conntion被放入队列中的新连接
  2. 已登录状态
    • 进行threadpool_process_request,最后进入do_command进行处理事务
    • 处理的是get_event或listener(队列为空时)从socket获取的事件
static void handle_event(connection_t *connection) {
  ...
  if (!connection->logged_in) {
    err = threadpool_add_connection(connection->thd);
    connection->logged_in = true;
  } else {
    err = threadpool_process_request(connection->thd);
  }
  ...
  set_wait_timeout(connection);
}

对于threadpool_process_request函数:
函数中的循环与thread-per-connections的逻辑类似,但是这个函数的目标是执行单个QUERY,所以正常情况下,下面的循环只会执行一次
对于SSL的连接,可能会执行多次,由vio->has_data()判断数据是否结束

/**
 Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
  thread_attach(thd);
  ...
  for (;;) {
    Vio *vio;
    thd_set_net_read_write(thd, 0);

    if ((retval = do_command(thd)) != 0) goto end;

    if (!thd_connection_alive(thd)) {
      retval = 1;
      goto end;
    }

    vio = thd->get_protocol_classic()->get_vio();
    if (!vio->has_data(vio)) {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      thd_set_net_read_write(thd, 1);
      goto end;
    }
    if (!thd->m_server_idle) {
      MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
      MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
      thd->m_server_idle = true;
    }
  }

end:
  ...
  return retval;
}

计时器线程(timer_thread)

计时器线程主要工作:

  1. 检查每个线程组是否停滞(stall)
  2. 检查每个连接(THD)是否超时,超时则kill掉
static void *timer_thread(void *param) noexcept {
  ...
  for (;;) {
    ...
    if (err == ETIMEDOUT) {
      timer->current_microtime.store(my_microsecond_getsystime(),
                                     std::memory_order_relaxed);

      /* Check stalls in thread groups */
      for (size_t i = 0; i < array_elements(all_groups); i++) {
        if (all_groups[i].connection_count) check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
          timer->current_microtime.load(std::memory_order_relaxed))
        timeout_check(timer);
    }
  }
  ...
}

check_stall流程:

  1. 检测listener是否存在
    • 如果不存在,检查是否由io事件被弹出
    • 如果没有,表明listener被阻塞,创建一个工作线程(而后它会变成listener)并退出
  2. 重置io_event_count
  3. 判断是否线程组是否stall:
    • thread_group->queue_event_count表示从队列中被处理事件的数量,每次会被重置
    • 如果自从上一次check_stall到现在仍然为0且队列和优先队列不为空,则确定stall
    • 如果确定stall了,则设置thread_group->stalled,唤醒,或创建新工作线程
    • 当一个事件从队列中出去,则表明恢复正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
  ...
  // 如果listener不存在,则创建一个工作线程(而后它会变成listener)
  if (!thread_group->listener && !thread_group->io_event_count) {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }
  // 重置io_event_count
  thread_group->io_event_count = 0;
  // queue_event_count为0且队列和优先队列不为空,确定stall
  if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
    thread_group->stalled = true;
    wake_or_create_thread(thread_group);
  }
  // check完后重置queue_event_count
  thread_group->queue_event_count = 0;

  mysql_mutex_unlock(&thread_group->mutex);
}

线程池退出

调用tp_end(), 回收线程组资源,停止定时器线程,最终会等待所有线程结束后才会退出

线程池相关参数

系统变量

变量

说明

thread_handling

pool-of-threads开启线程池模式

threadpool_idle_timeout

用于限制空闲线程退出前应该等待的时间

threadpool_high_prio_mode

用于对全局或每个连接的高优先级调度提供更细粒度的控制

threadpool_high_prio_tickets

控制高优先级队列策略,为每个新连接分配的票数,以进入高优先级队列。将此变量设置为0将禁用高优先级队列

threadpool_max_threads

用于限制线程池中的最大线程数

threadpool_oversubscribe

该参数的值越高,可以同时运行的线程就越多,如果该值低于3,则可能导致更多的睡眠和唤醒

threadpool_size

用于定义同时可以使用CPU的线程数

threadpool_stall_limit

认定为stall的时间,当达到此限制时,线程池将唤醒或创建另一个线程

状态变量

变量

说明

Threadpool_idle_threads

显示线程池中空闲线程的数量

Threadpool_threads

显示线程池中的线程数。

参考文档:

MySQL源码阅读2-连接与线程管理
线程池移植原理文档
percona线程池文档

0条评论
0 / 1000
qinyl
6文章数
0粉丝数
qinyl
6 文章 | 0 粉丝
qinyl
6文章数
0粉丝数
qinyl
6 文章 | 0 粉丝
原创

线程池原理

2023-04-11 01:21:35
60
0

author: 覃育龙
update:2023/3/16

本文主要讲移植入mysql中的线程池的原理

MySQL中的连接管理

连接和线程管理相关的类

mysql中连接和线程管理相关的代码主要在mysql/sql/conn_handler中,其中主要涉及的类包括:

  • Connection_handler_manager:单例类,用来管理连接处理器;
  • Connection_handler:连接处理的抽象类,具体实现由其子类实现;
  • Per_thread_connection_handler:继承了Connection_handler,每一个连接用单独的线程处理,默认为该实现,通过thread_handling参数可以设置;
  • One_thread_connection_handler:继承了Connection_handler,所有连接用同一个线程处理;
  • Plugin_connection_handler:继承了Connection_handler,支持由Plugin具体实现的handler,例如线程池;
  • Connection_acceptor:一个模版类,以模版方式支持不同的监听实现(三个listener)。并且提供一个死循环,用以监听连接;
  • Mysqld_socket_listener:实现以Socket的方式监听客户端的连接事件,并且支持TCP socket和Unix socket,即对应的TCP_socket和Unix_socket;
  • Shared_mem_listener:通过共享内存的监听方式(windows下);
  • Named_pipe_listener:通过命名管道来监听和接收客户请求(windows下);
  • Channel_info:连接信道的抽象类,具体实现有Channel_info_local_socket和Channel_info_tcpip_socket,
  • Channel_info_local_socket:与本地方式与服务器进行交互;
  • Channel_info_tcpip_socket:以TCP/IP方式与服务器进行交互;
  • TCP_socket:TCP socket,与MySQL服务不同机器的连接访问;
  • Unix_socket:Unix socket,与MySQL服务相同主机的连接访问;

其类图关系如下:

其中,三个Listener将作为Connection_acceptor模版类作为具体的Listener。
简单来说就是通过listener监听请求,并创建连接Channel_Info的具体类,然后通过单例Connection_handler_manager指派给具体的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)进行调度。

连接的流程

连接管理器的初始化(Connection_handler_manager)

Connection_handler_manager这个单例类初始化时会根据server启动参数或配置文件来初始化,根据thread_handling来初始化connection_handler
其中Per_thread_connection_handler无论mysqld使用哪种方式均会被初始化

> mysqld_main()
  > init_common_variables() // 初始化变量
    > get_options() // 获取server启动参数
      > Connection_handler_manager::init() {// 初始化连接管理的单例类
        Per_thread_connection_handler::init()
        Connection_handler *connection_handler = nullptr;
        switch (Connection_handler_manager::thread_handling) {
          case SCHEDULER_ONE_THREAD_PER_CONNECTION:
            connection_handler = new (std::nothrow) Per_thread_connection_handler();
            break;
          case SCHEDULER_NO_THREADS:
            connection_handler = new (std::nothrow) One_thread_connection_handler();
            break;
          case SCHEDULER_THREAD_POOL:
            connection_handler = new (std::nothrow) Thread_pool_connection_handler();
            break;
          default:
            assert(false);
        }

        m_instance =
            new (std::nothrow) Connection_handler_manager(connection_handler);
      }

监听处理循环

mysqld主函数开启后,进入Connection_acceptor::connection_event_loop,在windows系统上是由三个线程分别调用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一个模板类,分别有3种实现)

//sql/conn_handler/connection_acceptor.h
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != nullptr) mgr->process_new_connection(channel_info);
    }
  }

该函数作用就是通过监听客户请求listen_for_connection_event(),然后处理请求m_listener->process_new_connection()
其中通过connection_events_loop_aborted()来判断是否被终止

Mysqld_socket_listener的处理

初始化

通过Mysqld_socket_listener::setup_listener()来进行初始化工作。
Mysqld_socket_listener具体监听的列表存储在m_socket_vector中,由setup_listener()来完成初始化。
在m_socket_vector插入的顺序为: admin socket-> tcp socket-> unix socket -> event socket。 所以说linster会监听多个socket,最后才是普通的socket。

获取请求
Mysqld_socket_listener::listen_for_connection_event() {
  #ifdef HAVE_POLL
    int retval = poll();
  #else
    int retval = select();
  // 获取一个新的连接请求
  const Listen_socket *listen_socket = get_listen_socket();
  ...
  // 接受请求
  accept_connection(listen_socket->m_socket, &onnect_sock)
  // 构造Channel_info
  Channel_info *channel_info = nullptr;
  if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
    channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
  else
    channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
  return channel_info;
}

通过Mysqld_socket_listener::listen_for_connection_event()来监听请求,有poll与select两种方式。

在监听到有新请求之后,调用Mysqld_socket_listener::get_listen_socket()获取监听到新请求的socket,具体逻辑如下:

  • 首先检查是否有admin的请求,有则返回admin的socket(首先处理admin socket)
  • 返回第一个有请求的socket
接受请求

获取到有效socket后,进入accept_connection()->mysql_socket_accept调用accept接收请求。
然后构造一个Channel_info,Channel_info_local_socket(localhost登录)或Channel_info_tcpip_socket(ip登录),并作为listen_for_connection_event()的返回值,所有与连接的相关的信息都保存在Channel_info中。

处理新连接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}

1.首先判断是否超过连接上限
其代码逻辑如下:

  • 首先检查服务是否已经停止,然后check_and_incr_conn_count增加连接计数,如果超过连接上限,那么将拒绝连接,并关闭连接;
  • 将连接传入Connection_handler::add_connection()中

实际上mysql会运行max_connections + 1个连接,当连接数满时,为super用户保留最后一个连接

2.执行add_connection()
最终add_connetion()的执行mysql中配置的线程处理方式决定,有三种:

  • one-thread-per-connection: 每个连接一个线程,线程关闭后线程会等待,直到下次新的连接后复用该线程。
  • no-threads:在一个工作线程中处理的所有连接。
  • pool-of-threads: 线程池方式
one-thread-per-connection方式

先查看是否有空闲线程,如果有则直接用空闲线程处理;否则将创建一个新的线程来处理新连接。

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
  ...
  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

check_idle_thread_and_enqueue_connection 做的事情就是检查当前是否有空闲状态的线程;有的话,则将channel_info加入到队列中,并向空闲线程发送信号;否则创建新线程,并执行handle_connection函数。

handle_connection函数的逻辑如下:

  • 首先是初始化线程需要的内存;
  • 然后创建一个THD对象init_new_thd;
  • 创建/或重用psi对象,并加到thd对象;
  • 将thd对象加到thd manager中;
  • 调用thd_prepare_connection做验证;
  • do_command(thd)处理query
  • end_connection,关闭一个已经建立的连接,
  • close_connection,disconnect关闭当前会话的vio;
  • 释放资源thd中的资源
  • 阻塞等待下一个连接,block_until_new_connection(),监听接收前面提到的check_idle_thread_and_enqueue_connection发过来的信号

线程池

线程池的总览

主要数据结构

connection_t

连接结构体,一个连接/事件对应一个connection_t,一个connection_t对应一个THD

struct connection_t {
  THD *thd;                       // THD
  thread_group_t *thread_group;   // 该连接所在的线程组
  connection_t *next_in_queue;    // 用于connection_queue_t
  connection_t **prev_in_queue;   // 用于connection_queue_t
  ulonglong abs_wait_timeout;     // 连接时间,用于超时关闭
  bool logged_in;                 // 是否登入
  bool bound_to_poll_descriptor;  // 是否获取到poolfd
  bool waiting;                   // 是否在等待
  uint tickets;                   // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高优先级队列中的票数
};

thread_group_t

线程组结构体,在该线程池的框架中,最多有128(MAX_THREAD_GROUPS)个线程组

struct alignas(128) thread_group_t {
  mysql_mutex_t mutex;
  connection_queue_t queue;           // 待处理连接队列
  connection_queue_t high_prio_queue; // 高优先级待处理连接队列
  worker_list_t waiting_threads;      // 等待线程队列,线程每次执行完后会进入睡眠状态等待下次唤醒工作
  worker_thread_t *listener;          // 监听线程,负责监听命令事件,事件队列为空时也会充当工作线程
  pthread_attr_t *pthread_attr;       // 线程属性
  int pollfd;                         // poll文件描述符
  int thread_count;                   // 线程数
  int active_thread_count;            // 活跃线程数
  int connection_count;               // 连接数
  int waiting_thread_count;           // 活跃连接数
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 // io事件数
  int queue_event_count;              // 队列中已处理事件数量
  ulonglong last_thread_creation_time;// 上一个线程创建的时间
  int shutdown_pipe[2];               // 管道通信
  bool shutdown;                      // 关闭标志
  bool stalled;                       // 停滞标志
  char padding[328];                  // 填充
};

waiting_threads: 等待线程队列,线程每次执行完后会将线程添加到等待列表的头部并等待。重要的是要把线加在头部而不是尾部,因为它确保了LIFO唤醒顺序(热缓存,工作不活动超时)

worker_thread_t

工作线程结构体,一个结构体对应一个worker

struct worker_thread_t {
  ulonglong event_count; /* number of request handled by this thread */
  thread_group_t *thread_group;   // 所在线程组
  worker_thread_t *next_in_list;  // 用于worker_list_t
  worker_thread_t **prev_in_list; // 用于worker_list_t
  mysql_cond_t cond;              // 用于唤醒线程的信号量
  bool woken;                     // 是否被唤醒
};

pool_timer_t

用于所有线程组的一个全局计时器

struct pool_timer_t {
  mysql_mutex_t mutex;                    // 锁
  mysql_cond_t cond;                      // 信号量
  std::atomic<uint64> current_microtime;  // 当前时间,微秒
  std::atomic<uint64> next_timeout_check; // 下一次检查的事件
  int tick_interval;                      // 节拍
  bool shutdown;                          // 关闭标志
};

线程池初始化

线程池的初始化在tp_init()中

//sql/threadpool_unix.cc
bool tp_init() {
  DBUG_ENTER("tp_init");
  threadpool_started = true;

  for (uint i = 0; i < array_elements(all_groups); i++) {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  tp_set_threadpool_size(threadpool_size);
  ...

  pool_timer.tick_interval = threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(false);
}

在tp_init()主要初始化工作:

  1. 初始化所有(MAX_THREAD_GROUPS 128)的线程组, 初始化每个线程组的属性和状态等信息
  2. 初始化threadpool_size个线程组的pollfd
  3. 启动start_timer定时器线程

处理新连接

将新连接加入线程池

下面是线程池从add_connetion开始,将新连接加入到线程池中的过程

> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
  THD *const thd = channel_info->create_thd(); // 创建THD
  connection_t *const connection = alloc_connection(thd); // 初始化connection_t
  ...
  // 初始化thd的一些信息
  thd->set_new_thread_id();
  thd->start_utime = my_micro_time();
  thd->scheduler = &tp_event_functions;
  Global_THD_manager::get_instance()->add_thd(thd);
  thd->event_scheduler.data = connection;
  // 分配一个线程组
  thread_group_t *group = &all_groups[thd->thread_id() % group_count];
  // 将new connetion添加到工作队列,实际登录操作由worker完成
  queue_put(group, connection);
  > queue_put(thread_group_t *thread_group, connection_t *connection) {
    connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
    thread_group->queue.push_back(connection); // 加入工作队列
    // 如果活跃线程为0则创建或唤醒一个工作线程
    wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
    > wake_or_create_thread() {
      // 若唤醒成功则退出
      // 唤醒过程就是在waiting_threads中找第一个线程,然后给它发送信号量
      if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
      // 1. 当前线程组线程数大于连接数,则不用创建,返回
      // 2. 如果活跃线程为0或为admin_connection连接,则创建直接工作线程
      // 3. 如果time_since_last_thread_created大于限制创建的时间则创建
      create_worker(thread_group, admin_connection) {
        // 创建工作线程 worker_main
       > err = mysql_thread_create(key_worker_thread, &thread_id,
                thread_group->pthread_attr, worker_main,
                thread_group);
      }
    }
  }
}

工作线程(worker_main)

worker_main主要工作内容:

  1. get_event: 阻塞的从当前线程组中获取一个的待处理事件(connection with pending event)
  2. handle_event:处理该连接
worker_main()
static void *worker_main(void *param) {
  /* Init per-thread structure */
  worker_thread_t this_thread;
  mysql_cond_init(key_worker_cond, &this_thread.cond);
  this_thread.thread_group = thread_group;
  this_thread.event_count = 0;
  /* Run event loop */
  for (;;) {
    connection_t *connection;
    struct timespec ts;
    set_timespec(&ts, threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection) break; // 如果超时已过期或关闭,则返回NULL,即退出线程
    this_thread.event_count++;
    handle_event(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  ...
  my_thread_end();
  return nullptr;
}

工作线程的退出的情况(get_event返回空):

  • 线程组被关闭
  • 检测到too_many_busy_threads,与参数thread_pool_oversubscribe有关(限制其小于1 + thread_pool_oversubscribe)
  • 睡眠时mysql_cond_timedwait超时或出错
  • 睡眠时mysql_cond_wait出错
get_event():从队列或socket拿取一个事件
  1. oversubscribed表示是否系统CUP负载太高,已防止同时运行太多线程;与参数threadpool_oversubscribe有关
  2. 高优先级事件:与参数thread_pool_high_prio_mode有关
  3. 工作线程的睡眠状态:在get_event()中,停在current_thread->cond信号上(mysql_cond_timedwait/mysql_cond_wait)

get_event()主要流程概述:

  1. 非oversubscribed时
    • 从队列中取一个连接/事件并返回,没取到则继续
    • 检查是否有监听线程,没有则变成监听线程;有则继续
    • 从连接线程组socket中取一个连接或事件
      • 没取到连接,进入睡眠状态
      • 拿到连接,如果是高优先级事件则直接返回
      • 否则判断是否有太多忙线程,忙将事件放入队列并进入睡眠状态;否则返回连接
  1. oversubscribed时
    • 检查是否有监听线程,没有则变成监听线程;有则继续
    • 进入睡眠状态
listener():监听线程

监听线程主要工作:

  • (优先)队列中有事件时,常驻;负责获取socket中的事件并将它们分发给工作线程(推入队列中)
  • (优先)队列中无事件时,即空闲时:讲N-1个事件推入队列,返回第一个事件,最终get_event()会返回并由handle_event()处理
handle_event():实际处理连接

handle_event()主要工作:

  1. 未登录状态
    • 进行threadpool_add_connection,进行一些初始化、登录等工作(thd_prepare_connection)
    • 处理的是从add_conntion被放入队列中的新连接
  2. 已登录状态
    • 进行threadpool_process_request,最后进入do_command进行处理事务
    • 处理的是get_event或listener(队列为空时)从socket获取的事件
static void handle_event(connection_t *connection) {
  ...
  if (!connection->logged_in) {
    err = threadpool_add_connection(connection->thd);
    connection->logged_in = true;
  } else {
    err = threadpool_process_request(connection->thd);
  }
  ...
  set_wait_timeout(connection);
}

对于threadpool_process_request函数:
函数中的循环与thread-per-connections的逻辑类似,但是这个函数的目标是执行单个QUERY,所以正常情况下,下面的循环只会执行一次
对于SSL的连接,可能会执行多次,由vio->has_data()判断数据是否结束

/**
 Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
  thread_attach(thd);
  ...
  for (;;) {
    Vio *vio;
    thd_set_net_read_write(thd, 0);

    if ((retval = do_command(thd)) != 0) goto end;

    if (!thd_connection_alive(thd)) {
      retval = 1;
      goto end;
    }

    vio = thd->get_protocol_classic()->get_vio();
    if (!vio->has_data(vio)) {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      thd_set_net_read_write(thd, 1);
      goto end;
    }
    if (!thd->m_server_idle) {
      MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
      MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
      thd->m_server_idle = true;
    }
  }

end:
  ...
  return retval;
}

计时器线程(timer_thread)

计时器线程主要工作:

  1. 检查每个线程组是否停滞(stall)
  2. 检查每个连接(THD)是否超时,超时则kill掉
static void *timer_thread(void *param) noexcept {
  ...
  for (;;) {
    ...
    if (err == ETIMEDOUT) {
      timer->current_microtime.store(my_microsecond_getsystime(),
                                     std::memory_order_relaxed);

      /* Check stalls in thread groups */
      for (size_t i = 0; i < array_elements(all_groups); i++) {
        if (all_groups[i].connection_count) check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
          timer->current_microtime.load(std::memory_order_relaxed))
        timeout_check(timer);
    }
  }
  ...
}

check_stall流程:

  1. 检测listener是否存在
    • 如果不存在,检查是否由io事件被弹出
    • 如果没有,表明listener被阻塞,创建一个工作线程(而后它会变成listener)并退出
  2. 重置io_event_count
  3. 判断是否线程组是否stall:
    • thread_group->queue_event_count表示从队列中被处理事件的数量,每次会被重置
    • 如果自从上一次check_stall到现在仍然为0且队列和优先队列不为空,则确定stall
    • 如果确定stall了,则设置thread_group->stalled,唤醒,或创建新工作线程
    • 当一个事件从队列中出去,则表明恢复正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
  ...
  // 如果listener不存在,则创建一个工作线程(而后它会变成listener)
  if (!thread_group->listener && !thread_group->io_event_count) {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }
  // 重置io_event_count
  thread_group->io_event_count = 0;
  // queue_event_count为0且队列和优先队列不为空,确定stall
  if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
    thread_group->stalled = true;
    wake_or_create_thread(thread_group);
  }
  // check完后重置queue_event_count
  thread_group->queue_event_count = 0;

  mysql_mutex_unlock(&thread_group->mutex);
}

线程池退出

调用tp_end(), 回收线程组资源,停止定时器线程,最终会等待所有线程结束后才会退出

线程池相关参数

系统变量

变量

说明

thread_handling

pool-of-threads开启线程池模式

threadpool_idle_timeout

用于限制空闲线程退出前应该等待的时间

threadpool_high_prio_mode

用于对全局或每个连接的高优先级调度提供更细粒度的控制

threadpool_high_prio_tickets

控制高优先级队列策略,为每个新连接分配的票数,以进入高优先级队列。将此变量设置为0将禁用高优先级队列

threadpool_max_threads

用于限制线程池中的最大线程数

threadpool_oversubscribe

该参数的值越高,可以同时运行的线程就越多,如果该值低于3,则可能导致更多的睡眠和唤醒

threadpool_size

用于定义同时可以使用CPU的线程数

threadpool_stall_limit

认定为stall的时间,当达到此限制时,线程池将唤醒或创建另一个线程

状态变量

变量

说明

Threadpool_idle_threads

显示线程池中空闲线程的数量

Threadpool_threads

显示线程池中的线程数。

参考文档:

MySQL源码阅读2-连接与线程管理
线程池移植原理文档
percona线程池文档

文章来自个人专栏
文章
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
0