C语言线程解池解读和实现01
在说到并发,池式组件的时候,最先想到的肯定是线程池。那线程池的原理是什么呢,又是如何工作的呢?这篇文章告诉你答案。
知识梳理
- 什么是线程池
线程池就是维护和管理一定数量线程的池式组件。有提高CPU工 作效率的作用 - 为什么需要线程池
通俗来说,如果我们有一个IO十分耗时但是我们又是单线程的那么我们的线程将会阻塞,等待这个IO执行完之后才会继续执行。这样就会十分耗时。 - 线程池有哪些组件
- 任务队列
- 一定数量的线程
- 锁(保证线程安全)
- 线程池是如歌管理线程的
- 有任务:执行任务
- 没有任务:CPU休眠
头文件解读
我们先来看一下头文件吧:
#ifndef THRDPOOL_H_
#define THRDPOOL_H_
typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void*);
#ifdef __cplusplus
extern "C"
{
#endif
thrdpool_t *thrdpool_create(int thrd_count);
void thrdpool_treminater(thrdpool_t *pool);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
#ifdef __cplusplus
}
#endif
#endif
代码解读:
作为一个组件,或者说一个库,我们并不希望别人可以看到我们内部是如何实现的,所以我们告诉用户如何使用即可。thrdpool_create
用来初始化一个线程池,参数是线程数量。thrdpool_terminater
用来停止线程池。thrdpool_post
用来抛出任务,即在哪个线程池,执行哪个函数,参数是什么。thrdpool_waitdone
检测线程是不是都执行完。
注意,虽然我们不希望用户看到我们的实现,但是要告诉用户我们库的使用规范,就是最前面的两个typedef
。他告诉用户我们线程池的对象类型和传入任务的规范。
由于我们支持C++使用我们库,所以我们加上#ifdef __cplusplus extern “C”
。就是说如果是C++,那我们就采用C规则编译这个文件。
数据结构解读
队列
typedef struct task_s
{
void *next;
handler_pt func;//对应函数
void *arg;//参数
} task_t;
typedef struct task_queue_s
{
void *head;//头指针
void **tail;//尾指针
int block;//标志
spinlock_t lock;//回旋锁
pthread_mutex_t mutex;//互斥锁
pthread_cond_t cond;//条件
} task_queue_t;
代码解读:
这里我们先看一下我们队列结构的示意图:
我们使用一个链式结构。将所有task连接在一起,然后有一个manager来管理这些task。看了这张图结合上面的代码就可以很清楚的了解到我们队列的结构。
池
typedef struct thrdpool_s
{
task_queue_t *task_queue;//任务队列
atomic_int quit;//标志
uint32_t thrd_count;//池内线程数量
pthread_t *threads;//线程数组
} thrdpool_t;
代码解读:
这里没有什么好说的。其中标志的意思就是:如果为0,正常运行,如果为1阻塞。它是一个原子变量,所以具有线程安全。由于就这一个变量,没有设计复杂的操作,所以没有使用锁的必要,我们用原子变量即可解决。
代码实现
初始化队列
static task_queue_t* __taskqueue_create()
{
int ret;
task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue));
if(queue)
{
ret = pthread_mutex_init(&queue->mutex);
if(ret == 0)
{
ret = pthread_cond_init(&queue->cond);
if(ret == 0)
{
spinlock_init(&queue->lock);
queue->head = NULL;
queue->tail = &queue->head;
queue->block = 1;
return queue;
}
pthread_mutex_destroy(&queue->mutex);
}
free(queue);
}
return NULL;
}
代码解读:
没有什么特别好说的,这里就是申请一块内存,然后对其中的结构体成员进行初始化。如果全部初始化成功就返回指针。主要注意的是,我们的block要初始化为0,就是默认在阻塞状态,因为刚刚初始化,队列里面没有任何任务。
移除阻塞
static int __nonblock(task_queue_t *queue)
{
pthread_mutex_lock(&queue->mutex);
queue->block = 0;
pthread_mutex_unlock(&queue->mutex);
pthread_cond_broadcast(&queue->cond);
}
代码解读:
在对task_queue
内部变量操作的时候,必须使用锁来保证线程安全。将标志为阻塞的block
置0,解除阻塞。然后广播,唤醒其他线程。
插入任务
static inline __add_task(task_queue_t *queue, void *task)
{
void **link = (void**)task;
*link = NULL;
spinlock_lock(&queue->lock);
*queue->tail = link;
queue->tail = link;
spinlock_unlock(&queue->lock);
pthread_cond_signal(&queue->cond);
}
代码解读:
首先,我们让task->next
指向NULL。然后,在加锁的情况下,对队列进行尾插。最后唤醒一个线程获取任务。这里需要解释一下我们的写法,为什么可以这样写。事实上,在内核中,队列都是这样的一级指针头+二级指针尾的写法,原理请看图:
从指正管理长度的角度来看void*
就是管理了整个node,但是**void
就是管理指向内存的后面8个字节(64位操作系统)即*next
;
所以*queue->tail 就是 queue->tail->next
;同理*link 就是 *next
。
删除任务
static task_t* __pop_task(task_queue_t *queue)
{
spinlock_lock(&queue->lock);
if(queue->head == NULL)
return NULL;
task_t *task;
void **link = (void**)queue->head;
queue->head = *link;
if(queue->head == NULL)
queue->tail = &queue->head;
spinlock_unlock(&queue->lock);
}
代码讲解:
我们将最前面的task取出,然后head指针指向后面一个。
获取任务
static void* __get_task(task_queue_t *queue)
{
task_t *task;
pthread_mutex_lock(&queue->mutex);
while(task = __pop_task(queue) == NULL)
{
if(queue->block == 0)
{
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
pthread_cond_wait(&queue->cond, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
代码解读:
我们调用上面写的__pop_task
来获取队列最前面的task。如果队列是非阻塞的,那就直接返回。如果队列是阻塞的(刚刚初始化完),那我们就要进入休眠,等待__add_task
里面的唤醒。然后返回获取到的task。
队列的销毁
static void __destroy_task_queue(task_queue_t *queue)
{
task_t *task;
while(task = __pop_task(queue))
{
free(task);
}
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->cond);
spinlock_destroy(&queue->lock);
free(queue);
}
代码解读:
释放全部资源销毁即可。
线程池工作
static void* __thrdpool_work(void *arg)
{
thrdpool_t *thrdpool = (thrdpool_t*)arg;
task_t *task;
void *cxt;
if(atomic_load(&thrdpool->quit) == 0)
{
task = (task_t*)__get_task(thrdpool->task_queue);
handler_pt func = task->func;
cxt = task->arg;
free(task);
func(cxt);
}
return NULL;
}
代码解读:
线程停止
static void __thrdpool_terminater(thrdpool_t *pool)
{
atomic_store(&pool->quit, 1);
__nonblock(pool->task_queue);
int i = 0;
for(i; i < pool->thrd_count; i++)
{
pthread_join(pool->threads[i], NULL);
}
}
代码解读:
首先让线程池阻塞,然后执行完当前所有线程的任务。
创建线程池
static int __thrdpool_create(thrdpool_t *pool, int thrd_num)
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if(ret == 0)
{
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num);
if(pool->threads)
{
int i = 0;
for(i; i < thrd_num; i++)
{
if(pthread_create(&pool->threads[i], &attr, __thrdpool_work, pool) != 0)
{
break;
}
}
pool->thrd_count = i;
if(thrd_num == i)
return 0;
__thrdpool_terminater(pool);
free(pool->threads);
}
ret = -1;
}
return ret;
}
代码解读:
主要就是在堆上开辟空间,然后使用循环批量创建线程。就是要注意一点一点来,如果资源创建失败需要及时销毁资源并且返回。
接口
void thrdpool_treminater(thrdpool_t *pool)
{
atomic_store(&pool->quit, 1);
__nonblock(pool->task_queue);
}
thrdpool_t *thrdpool_create(int thrd_count)
{
thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));
if(pool)
{
task_queue_t *task = __taskqueue_create();
if(task)
{
pool->task_queue = task;
int ret = __thrdpool_create(pool, thrd_count);
if(ret == 0)
{
return pool;
}
__destroy_task_queue(pool->task_queue);
}
free(pool);
}
return NULL;
}
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg)
{
task_t *task = (task_t*)malloc(sizeof(task_t));
if(atomic_load(&pool->quit) == 1)
{
return -1;
}
task->arg = arg;
task->func = func;
__add_task(pool->task_queue, task);
return 0;
}
void thrdpool_waitdone(thrdpool_t *pool)
{
int i = 0;
for(i; i < pool->thrd_count; i++)
{
pthread_join(pool->threads[i], NULL);
}
__destroy_task_queue(pool->task_queue);
free(pool->threads);
free(pool);
}
代码解读:
这里的接口是提供给客户使用的。这里需要注意waitdone
和terminater
的区别。一个是完全停止线程池(带有销毁功能),一个是不销毁,只是暂停。