C++多线程实现
C++11支持语言级别的多线程编程,可以跨平台运行,支持windows/linux/mac等。
主要涉及:
- thread/mutex/condition_variable
- lock_quard/unique_lock
- automic:原子类型,基于CAS操作的原子类型,线程安全的
- sleep_for
C++的thread本质上还是调用系统支持的函数,windows(createThread)、linux(pthread_create)进行多线程。
初识多线程
- 如何创建启动一个线程?
thread
来创建一个线程对象,需要线程所需要的线程函数和参数;线程自动开启。 - 子线程如何结束?子线程函数运行完成,线程就结束了。
- 主线程如何处理子线程?
join
和detach
方法。
#include <iostream>
#include <string>
#include<thread>
void threadHandle1()
{
// 让子线程睡眠两秒,this_thread获取当前线程,chrono计时的函数
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "hello,thread1" << std::endl;
}
void threadHandle2(int time)
{
// 让子线程睡眠两秒,this_thread获取当前线程,chrono计时的函数
std::this_thread::sleep_for(std::chrono::seconds(time));
std::cout << "hello,thread2" << std::endl;
}
int main()
{
// 创建了一个线程对象t1,传入一个线程函数,新线程就开始运行了
std::thread t1(threadHandle1);
// join是子线程等待主线程结束,主线程继续往下执行,
// detach则是分离线程,子线程和主线程无关联,可以独立运行,等主线程结束,整个程序结束,所有子线程都自动结束了
// 传入参数的情况
std::thread t2(threadHandle2, 2);
t1.join();
t2.join();
std::cout << "main thread hello" << std::endl;
return 0;
}
mutex和lock
多线程程序中可能会出现,竞态条件:多线程程序执行的结果是一致的,不会随着CPU对线程不同的调用顺序,而产生不同的运行结果。所以需要引入互斥锁来防止多个线程之间的对资源的访问正确性。
初识mutex
互斥锁,使用lock
和unlock
函数完成进程互斥,将会导致程序中断将会导致mutex的内存释放问题。具体如下所示:
// 模拟车站卖票的程序
#include <iostream>
#include <string>
#include<thread>
#include<list>
#include<mutex>
using namespace std;
// 一共有tickCount张票
int tickCount = 10;
// 定义全局互斥锁
mutex mtx;
void sellTicket(int index)
{
// mtx.lock(); //1、这样就只会存在一个窗口在卖票,因为while循环只支持一个线程访问
while(tickCount>0)
{
/*
直接使用mutex.lock()和mutex.unlock()将会出现一个问题,
当函数还未unlock时就因为程序中间return或者error结束后,
导致mutex资源无法释放的问题。
*/
mtx.lock(); //2、仅在临界区代码段 -> 原子操作 -> 线程间互斥操作 -> mutex
// 在锁里面加判断是防止当一个进程1面临tickCount=1时,还为-1成功,
// 另一个线程2进入while循环,只是在mtx.lock阻塞了,等进程1-1成功后,
// 其实进程2获取到的tickCount已经由1->0,但是还是会进行tickCount--
// 导致了最终卖出-1张票
if(tickCount>0)
{
cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl;
tickCount--;
}
mtx.unlock(); //2、
this_thread::sleep_for(chrono::milliseconds(2));
}
// mtx.unlock(); // 1、
}
/*
多线程程序:
竞态条件:多线程程序执行的结果是一致的,不会随着CPU对线程不同的调用顺序,而产生不同的运行结果。
*/
int main()
{
list<thread> tlist;
int thread_num = 3;
for (int i = 0; i < thread_num;i++)
{
tlist.push_back(thread(sellTicket, i));
}
for (thread &t : tlist)
{
t.join();
}
return 0;
}
所以进一步引入lock_guard
(不可能用在函数参数传递或者返回过程中,也不能赋值使用,只能用于简单的加锁解锁的临界代码段当中)和unique_lock
(一般用于进程通信,和condition_varible
联合使用)在作用域结束后自动析构,类似于智能指针。如下代码所示
void sellTicket2(int index)
{
while (tickCount > 0)
{
// mtx.lock();
{
// lock_guard函数删除了拷贝构造函数和操作符=重载,类似于scoped_ptr,但保留了构造函数
lock_guard<mutex> lock(mtx); //在这个局部作用域中,程序结束后自动析构,如果中间return了也会析构
if (tickCount > 0)
{
cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl;
tickCount--;
}
}
// mtx.unlock();
this_thread::sleep_for(chrono::milliseconds(2));
}
}
void sellTicket3(int index)
{
while (tickCount > 0)
{
// mtx.lock();
{
// lock_guard函数删除了拷贝构造函数和操作符=重载,类似于scoped_ptr,但保留了构造函数
// lock_guard<mutex> lock(mtx); // 在这个局部作用域中,程序结束后自动析构,如果中间return了也会析构
unique_lock<mutex> temp_lock(mtx); // 类似于unique_ptr,虽然删除了拷贝构造函数和操作符=重载,但是扩展了右值引用
if (tickCount > 0)
{
cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl;
tickCount--;
}
}
// mtx.unlock();
this_thread::sleep_for(chrono::milliseconds(2));
}
}
unique_lock
和condition_variable
使用连用:
mutex mtx;
condition_variable cv;
unique_lock<mutex> lck(mtx);
cv.wait(lck); //1、wait的作用使线程进入等待状态;2、lck.unlock可以把mtx给释放掉
// 通知cv上等待的线程,条件成立了,可以往下运行了
//其他在cv上等待的线程,收到通知,从等待状态->阻塞状态->获取互斥锁->线程执行
cv.notify_all();
线程同步通信
多线程编程存在的问题:
- 线程间的互斥;防止资源的访问出现问题。竞态条件 -> 临界区代码段 -> 原子操作 -> 互斥锁mutex(lock_guard、unique_lock)/强两级的无锁实现CAS
- 线程间的同步通信。生产者和消费者线程模型。
互斥
多线程执行共享变量的这段代码可能会导致竞争状态,因此我们将此段代码称为临界区(criticalsection),它是执行共享资源的代码片段,一定不能给多线程同时执行。
所以我们希望这段代码是互斥(mutualexclusion)的,也就说执行临界区(criticalsection)代码段的只能有一个线程,其他线程阻塞等待,达到排队效果。
互斥并不只是针对多线程的竞争条件,同时还可用于多进程,避免共享资源混乱。
同步
互斥解决了「多进程/线程」对临界区使用的问题,但是它没有解决「多进程/线程」协同工作的问题
我们都知道在多线程里,每个线程一定是顺序执行的,它们各自独立,以不可预知的速度向前推进,但有时候我们希望多个线程能密切合作,以实现一个共同的任务。
所谓同步,就是「多进程/线程间」在一些关键点上可能需要互相等待与互通消息,这种相互制约的等待与互通信息称为「进程/线程」同步。
生产者、消费者线程通信
这里涉及到两个线程之间的通信,生产者和消费者线程互相告知,lock_guard
无法实现进程间通信这样复杂的工作,所以使用unique_lock
和condition_variable
进行搭配使用可以实现进程间的通信。代码如下所示:
#include <iostream>
#include <string>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
using namespace std;
// 定义互斥锁,用户线程间互斥
mutex mtx;
// 定义条件变量,用于线程间的同步通信
condition_variable cv;
// 最常见的问题就是消费者线程消费的更快,生产者线程还没生产出来就开始消费了
class Queue
{
public:
void put(int val)
{
// lock_guard<mutex> lock(mtx);
unique_lock<mutex> lck(mtx);
while(!que.empty())
{
// que不为空,生产者应该通知消费者去消费,消费完了在生产
// 生产者进程应该进入阻塞状态,并把mtx互斥锁
cv.wait(lck);
}
que.push(val);
/*
notify_one:通知另外的一个线程
notify_all:通知另外的所有线程
*/
// 通知其他的所有线程,生产了一个物品,可以进行消费了,
// 其他线程得到该通知就会从 等待状态 -> 阻塞状态 -> 获取互斥锁才能继续执行。
cv.notify_all();
cout << "生产者 生产:" << val << "号物品" << endl;
}
int get()
{
// lock_guard<mutex> lock(mtx);
unique_lock<mutex> lck(mtx);
while(que.empty())
{
// 消费者发现que是空的,通知生产者线程生产物品
// 进入等待状态,把互斥锁mutex进行释放
cv.wait(lck);
}
int val = que.front();
que.pop();
cv.notify_all(); //消费完了,通知其他线程进行生产
cout << "消费者 消费:" << val << "号物品" << endl;
return val;
}
private:
queue<int> que;
};
void producer(Queue* que)
{
for (int i = 0; i <= 10;i++)
{
que->put(i);
this_thread::sleep_for(chrono::milliseconds(100));
}
}
void consumer(Queue* que)
{
for (int i = 0; i <= 10; i++)
{
que->get();
this_thread::sleep_for(chrono::milliseconds(100));
}
}
int main()
{
Queue que;
thread t1(producer, &que);
thread t2(consumer,&que);
t1.join();
t2.join();
return 0;
}
CAS操作
互斥锁是比较重的,临界区代码做的事情如果很复杂,互斥锁使用便很麻烦。但是使用CAS来实现某些代码操作的原子特性便是足够了,CAS是无锁的。使用的头文件为atomic
,其实本质上也就是将某些类型设置为原子类型变量,导致只有一个线程可以独立使用。如下示例所示:
#include <iostream>
#include <string>
#include<atomic>
#include<list>
#include<thread>
using namespace std;
/*
使用lock_guard实现临界代码段的互斥访问
lock_guard<mutex> lock(mtx);
Count++;
*/
volatile std::atomic_bool isReady = {false};
volatile std::atomic_int number = {0};
void task()
{
while(!isReady)
{
// 让线程让出当前的CPU时间片,等待下一次调度
this_thread::yield();
}
for (int i = 0; i < 100;i++)
{
number++;
}
}
int main()
{
list<thread> tlist;
for (int i = 0; i < 10;i++)
{
tlist.push_back(thread(task));
}
// 让主线程睡眠三秒
this_thread::sleep_for(chrono::seconds(3));
cout << "number = " << number << endl;
isReady = true;
cout << "number = " << number << endl;
for(thread &t:tlist)
{
t.join();
}
cout << "number = " << number << endl;
return 0;
}
/*output::
number = 0
number = 1000
number = 1000
*/