"同步操作"是指在计算机科学和信息技术中的一种操作方式,其中不同的任务或操作按顺序执行,一个操作完成后才能开始下一个操作。在多线程编程中,各个任务通常需要通过同步操作进行相互协调和等待 ,以确保数据的一致性 和正确性 。
本章的主要内容有:
本章将讨论如何使用条件变量等待事件,介绍 future 等标准库设施用作同步操作,使用Qt+CMake 构建一个项目展示多线程的必要性,介绍 C++20 引入的新的同步设施。
等待事件或条件
假设你正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?
一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。
可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。
事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。
这和线程有什么关系呢?其实第一种方法就是在说”忙等待 (busy waiting)”也称“自旋 “。
Copy bool flag = false ;
std :: mutex m;
void wait_for_flag (){
std :: unique_lock < std :: mutex > lk{ m };
while ( ! flag){
lk . unlock (); // 1 解锁互斥量
lk . lock (); // 2 上锁互斥量
}
}
第二种方法就是加个延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠②前函数对互斥量解锁①,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。
Copy void wait_for_flag (){
std :: unique_lock < std :: mutex > lk{ m };
while ( ! flag){
lk . unlock (); // 1 解锁互斥量
std :: this_thread :: sleep_for (std :: chrono :: milliseconds ( 100 )); // 2 休眠
lk . lock (); // 3 上锁互斥量
}
}
第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。
C++ 标准库对条件变量有两套实现:std::condition_variable
和 std::condition_variable_any
,这两个实现都包含在 <condition_variable>
头文件中。
condition_variable_any
类是 std::condition_variable
的泛化。相对于只在 std::unique_lock<std::mutex>
上工作的 std::condition_variable
,condition_variable_any
能在任何满足可基本锁定(BasicLockable) 要求的锁上工作,所以增加了 _any
后缀。显而易见,这种区分必然是 any
版更加通用但是却有更多的性能开销 。所以通常首选 std::condition_variable
。有特殊需求,才会考虑 std::condition_variable_any
。
Copy std :: mutex mtx;
std :: condition_variable cv;
bool arrived = false ;
void wait_for_arrival () {
std :: unique_lock < std :: mutex > lck (mtx);
cv . wait (lck , []{ return arrived; }); // 等待 arrived 变为 true
std :: cout << "到达目的地,可以下车了!" << std :: endl;
}
void simulate_arrival () {
std :: this_thread :: sleep_for (std :: chrono :: seconds ( 5 )); // 模拟地铁到站,假设5秒后到达目的地
{
std :: lock_guard < std :: mutex > lck (mtx);
arrived = true ; // 设置条件变量为 true,表示到达目的地
}
cv . notify_one (); // 通知等待的线程
}
运行 测试。更换为 std::condition_variable_any
效果相同 。
std::mutex mtx
: 创建了一个互斥量,用于保护共享数据的访问,确保在多线程环境下的数据同步。
std::condition_variable cv
: 创建了一个条件变量,用于线程间的同步,当条件不满足时,线程可以等待,直到条件满足时被唤醒。
bool arrived = false
: 设置了一个标志位,表示是否到达目的地。
在 wait_for_arrival
函数中:
std::unique_lock<std::mutex> lck(mtx)
: 使用互斥量创建了一个独占锁。
cv.wait(lck, []{ return arrived; })
: 阻塞当前线程,释放(unlock)锁,直到条件被满足。
一旦条件满足,即 arrived
变为 true,并且条件变量 cv
被唤醒 (包括虚假唤醒 ),那么当前线程会重新获取锁(lock),并执行后续的操作。
在 simulate_arrival
函数中:
std::this_thread::sleep_for(std::chrono::seconds(5))
: 模拟地铁到站,暂停当前线程 5 秒。
设置 arrived
为 true,表示到达目的地。
cv.notify_one()
: 唤醒一个等待条件变量的线程。
这样,当 simulate_arrival
函数执行后,arrived
被设置为 true,并且通过 cv.notify_one()
唤醒了等待在条件变量上的线程,从而使得 wait_for_arrival
函数中的等待结束,可以执行后续的操作,即输出提示信息。
条件变量的 wait
成员函数有两个版本,以上代码使用的就是第二个版本,传入了一个谓词 。
Copy void wait (std :: unique_lock <std :: mutex > & lock); // 1
template < class Predicate >
void wait (std :: unique_lock <std :: mutex > & lock , Predicate pred); // 2
②等价于:
Copy while ( ! pred ())
wait (lock);
第二个版本只是对第一个版本的包装 ,等待并判断谓词,会调用第一个版本的重载。这可以避免“虚假唤醒(spurious wakeup) ”。
条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。使用 C++ 标准库则没有这个烦恼了。
我们也可以简单看一下 MSVC STL 的源码实现 :
Copy void wait ( unique_lock < mutex > & _Lck) noexcept {
_Cnd_wait ( _Mycnd () , _Lck . mutex () -> _Mymtx ());
}
template < class _Predicate >
void wait ( unique_lock < mutex > & _Lck , _Predicate _Pred) {
while ( ! _Pred ()) {
wait (_Lck);
}
}
线程安全的队列
在本节中,我们介绍了一个更为复杂的示例,以巩固我们对条件变量的学习。为了实现一个线程安全的队列,我们需要考虑以下两个关键点:
当执行 push
操作时,需要确保没有其他线程正在执行 push
或 pop
操作;同样,在执行 pop
操作时,也需要确保没有其他线程正在执行 push
或 pop
操作。
当队列为空 时,不应该执行 pop
操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop
操作时队列不为空。
基于以上思考,我们设计了一个名为 threadsafe_queue
的模板类,如下:
Copy template < typename T >
class threadsafe_queue {
mutable std :: mutex m; // 互斥量,用于保护队列操作的独占访问
std :: condition_variable data_cond; // 条件变量,用于在队列为空时等待
std :: queue < T > data_queue; // 实际存储数据的队列
public :
threadsafe_queue () {}
void push ( T new_value) {
{
std :: lock_guard < std :: mutex > lk { m };
data_queue . push (new_value);
}
data_cond . notify_one ();
}
// 从队列中弹出元素(阻塞直到队列不为空)
void pop ( T & value) {
std :: unique_lock < std :: mutex > lk{ m };
data_cond . wait (lk , [ this ] { return ! data_queue . empty (); });
value = data_queue . front ();
data_queue . pop ();
}
// 从队列中弹出元素(阻塞直到队列不为空),并返回一个指向弹出元素的 shared_ptr
std :: shared_ptr < T > pop () {
std :: unique_lock < std :: mutex > lk{ m };
data_cond . wait (lk , [ this ] { return ! data_queue . empty (); });
std :: shared_ptr < T > res { std :: make_shared < T >( data_queue . front ()) };
data_queue . pop ();
return res;
}
bool empty () const {
std :: lock_guard < std :: mutex > lk (m);
return data_queue . empty ();
}
};
请无视我们省略的构造、赋值、交换、try_xx
等操作。以上示例已经足够。
光写好了肯定不够,我们还得测试运行,我们可以写一个经典的:”生产者消费者模型 “,也就是一个线程 push
”生产 “,一个线程 pop
”消费 “。
Copy void producer ( threadsafe_queue < int > & q) {
for ( int i = 0 ; i < 5 ; ++ i) {
q . push (i);
}
}
void consumer ( threadsafe_queue < int > & q) {
for ( int i = 0 ; i < 5 ; ++ i) {
int value{};
q . pop (value);
}
}
两个线程分别运行 producer
与 consumer
,为了观测运行我们可以为 push
与 pop
中增加打印语句:
Copy std :: cout << "push:" << new_value << std :: endl;
std :: cout << "pop:" << value << std :: endl;
可能 的运行 结果是:
Copy push:0
pop:0
push:1
pop:1
push:2
push:3
push:4
pop:2
pop:3
pop:4
这很正常,到底哪个线程会抢到 CPU 时间片持续运行,是系统调度决定的,我们只需要保证一开始提到的两点就行了:
push
与 pop
都只能单独执行;当队列为空 时,不执行 pop
操作。
我们可以给一个简单的示意图帮助你理解这段运行结果:
Copy 初始状态:队列为空
+---+---+---+---+---+
Producer 线程插入元素 0:
+---+---+---+---+---+
| 0 | | | | |
Consumer 线程弹出元素 0:
+---+---+---+---+---+
| | | | | |
Producer 线程插入元素 1:
+---+---+---+---+---+
| 1 | | | | |
Consumer 线程弹出元素 1:
+---+---+---+---+---+
| | | | | |
Producer 线程插入元素 2:
+---+---+---+---+---+
| | 2 | | | |
Producer 线程插入元素 3:
+---+---+---+---+---+
| | 2 | 3 | | |
Producer 线程插入元素 4:
+---+---+---+---+---+
| | 2 | 3 | 4 | |
Consumer 线程弹出元素 2:
+---+---+---+---+---+
| | | 3 | 4 | |
Consumer 线程弹出元素 3:
+---+---+---+---+---+
| | | | 4 | |
Consumer 线程弹出元素 4:
+---+---+---+---+---+
| | | | | |
队列为空,所有元素已被弹出
到此,也就可以了。
使用 future
举个例子:我们在车站等车,你可能会做一些别的事情打发时间,比如学习现代 C++ 模板教程 、观看 mq白 的视频教程、玩手机等。不过,你始终在等待一件事情:车到站 。
C++ 标准库将这种事件称为 future 。它用于处理线程中需要等待某个事件的情况,线程知道预期结果。等待的同时也可以执行其它的任务。
C++ 标准库有两种 future,都声明在 <future>
头文件中:独占的 std::future
、共享的 std::shared_future
。它们的区别与 std::unique_ptr
和 std::shared_ptr
类似。std::future
只能与单个 指定事件关联,而 std::shared_future
能关联多个 事件。它们都是模板,它们的模板类型参数,就是其关联的事件(函数)的返回类型。当多个线程需要访问一个独立 future 对象时, 必须使用互斥量或类似同步机制进行保护。而多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future
对象副本进行访问,则是安全的。
最简单有效的使用是,我们先前讲的 std::thread
在线程中执行任务是没有返回值的,这个问题就能使用 future 解决。
创建异步任务获取返回值
假设需要执行一个耗时任务并获取其返回值,但是并不急切的需要它。那么就可以启动新线程计算,然而 std::thread
没提供直接从线程获取返回值的机制。所以我们可以使用 std::async
函数模板。
使用 std::async
启动一个异步任务,它会返回一个 std::future
对象,这个对象和任务关联,将持有最终计算出来的结果。当需要任务执行完的结果的时候,只需要调用 get()
成员函数,就会阻塞直到 future
为就绪为止(即任务执行完毕),返回执行结果。valid()
成员函数检查 future 当前是否关联共享状态,即是否当前关联任务。还未关联,或者任务已经执行完(调用了 get()、set()),都会返回 false
。
Copy #include <iostream>
#include <thread>
#include <future> // 引入 future 头文件
int task ( int n) {
std :: cout << "异步任务 ID: " << std :: this_thread :: get_id () << '\n' ;
return n * n;
}
int main () {
std :: future <int> future = std :: async (task , 10 );
std :: cout << "main: " << std :: this_thread :: get_id () << '\n' ;
std :: cout << std :: boolalpha << future . valid () << '\n' ; // true
std :: cout << future . get () << '\n' ;
std :: cout << std :: boolalpha << future . valid () << '\n' ; // false
}
运行 测试。
与 std::thread
一样,std::async
支持任意可调用(Callable) 对象,以及传递调用参数。包括支持使用 std::ref
,以及支持只能移动的类型 。我们下面详细聊一下 std::async
参数传递的事。
Copy struct X {
int operator ()( int n) const {
return n * n;
}
};
struct Y {
int f ( int n) const {
return n * n;
}
};
void f ( int& p) { std :: cout << & p << '\n' ; }
int main (){
Y y;
int n = 0 ;
auto t1 = std :: async (X{} , 10 );
auto t2 = std :: async ( & Y :: f , & y , 10 );
auto t3 = std :: async ([] {});
auto t4 = std :: async (f , std :: ref (n));
std :: cout << & n << '\n' ;
}
运行 测试。
如你所见,它支持所有可调用(Callable) 对象,并且也是默认按值复制,必须使用 std::ref
才能传递引用。并且它和 std::thread
一样,内部会将保有的参数副本转换为右值表达式进行传递 ,这是为了那些只支持移动的类型 ,左值引用没办法引用右值表达式,所以如果不使用 std::ref
,这里 void f(int&)
就会导致编译错误,如果是 void f(const int&)
则可以通过编译,不过引用的不是我们传递的局部对象。
Copy void f ( const int& p) {}
void f2 ( int& p ){}
int n = 0 ;
std :: async (f , n); // OK! 可以通过编译,不过引用的并非是局部的n
std :: async (f2 , n); // Error! 无法通过编译
我们来展示使用 std::move
,也就是移动传递参数并接受返回值:
Copy struct move_only {
move_only () { std :: puts ( "默认构造" ); }
move_only ( move_only && ) noexcept { std :: puts ( "移动构造" ); }
move_only & operator =( move_only && ) noexcept {
std :: puts ( "移动赋值" );
return * this ;
}
move_only ( const move_only & ) = delete ;
};
move_only task ( move_only x){
std :: cout << "异步任务 ID: " << std :: this_thread :: get_id () << '\n' ;
return x;
}
int main (){
move_only x;
std :: future < move_only > future = std :: async (task , std :: move (x));
std :: this_thread :: sleep_for (std :: chrono :: seconds ( 1 ));
std :: cout << "main\n" ;
move_only result = future . get (); // 等待异步任务执行完毕
}
运行 测试。
如你所见,它支持只移动类型 ,我们将参数使用 std::move
传递,接收参数的时候直接调用 get
函数即可。
接下来我们聊 std::async
的执行策略 ,我们前面一直没有使用,其实就是在传递可调用对象与参数之前传递枚举值罢了:
std::launch::async
在不同线程上 执行异步任务。
std::launch::deferred
惰性求值,不创建线程 ,等待 future
对象调用 wait
或 get
成员函数的时候执行任务。
而我们先前一直没有写明这个参数,是因为 std::async
函数模板有两个重载 ,不给出执行策略就是以:std::launch::async | std::launch::deferred
调用另一个重载版本(这一点中在源码 中很明显),此策略表示由实现选择到底是否创建线程执行异步任务。典型情况是,如果系统资源充足,并且异步任务的执行不会导致性能问题,那么系统可能会选择在新线程中执行任务。但是,如果系统资源有限,或者延迟执行可以提高性能或节省资源,那么系统可能会选择延迟执行。
如果你阅读 libstdc++
的代码,会发现的确如此。
然而值得注意的是,在 MSVC STL 的实现中,launch::async | launch::deferred
与 launch::async
执行策略毫无区别,源码 如下:
Copy template < class _Ret , class _Fty >
_Associated_state < typename _P_arg_type< _Ret > :: type > * _Get_associated_state ( launch _Psync , _Fty && _Fnarg) {
// construct associated asynchronous state object for the launch type
switch (_Psync) { // select launch type
case launch :: deferred :
return new _Deferred_async_state < _Ret >(_STD forward < _Fty >(_Fnarg));
case launch :: async : // TRANSITION, fixed in vMajorNext, should create a new thread here
default :
return new _Task_async_state < _Ret >(_STD forward < _Fty >(_Fnarg));
}
}
且 _Task_async_state
会通过 ::Concurrency::create_task
从线程池中获取线程并执行任务返回包装对象。
简而言之,使用 std::async
,只要不是 launch::deferred
策略,那么 MSVC STL 实现中都是必然在线程中执行任务。因为是线程池,所以执行新任务是否创建新线程,任务执行完毕线程是否立即销毁,不确定 。
我们来展示一下:
Copy void f (){
std :: cout << std :: this_thread :: get_id () << '\n' ;
}
int main (){
std :: cout << std :: this_thread :: get_id () << '\n' ;
auto f1 = std :: async (std :: launch :: deferred , f);
f1 . wait (); // 在 wait() 或 get() 调用时执行,不创建线程
auto f2 = std :: async (std :: launch :: async , f); // 创建线程执行异步任务
auto f3 = std :: async (std :: launch :: deferred | std :: launch :: async , f); // 实现选择的执行方式
}
运行 测试。
其实到此基本就差不多了,我们再介绍两个常见问题即可:
如果从 std::async
获得的 std::future
没有被移动或绑定到引用,那么在完整表达式结尾, std::future
的**析构函数 将阻塞,直到到异步任务完成**。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。
Copy std :: async (std :: launch :: async , []{ f (); }); // 临时量的析构函数等待 f()
std :: async (std :: launch :: async , []{ g (); }); // f() 完成前不开始
如你所见,这并不能创建异步任务,它会阻塞,然后逐个执行。
被移动的 std::future
没有所有权,失去共享状态,不能调用 get
、wait
成员函数。
Copy auto t = std :: async ([] {});
std :: future <void> future{ std :: move (t) };
t . wait (); // Error! 抛出异常
如同没有线程资源所有权的 std::thread
对象调用 join()
一样错误,这是移动语义的基本语义逻辑。
future
与 std::packaged_task
类模板 std::packaged_task
包装任何可调用(Callable) 目标(函数、lambda 表达式、bind 表达式或其它函数对象),使得能异步 调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。
通常它会和 std::future
一起使用,不过也可以单独使用,我们一步一步来:
Copy std :: packaged_task < double ( int , int )> task ([]( int a , int b){
return std :: pow (a , b);
});
task ( 10 , 2 ); // 执行传递的 lambda,但无法获取返回值
它有 operator()
的重载,它会执行我们传递的可调用(Callable) 对象,不过这个重载的返回类型是 void
没办法获取返回值 。
如果想要异步的获取返回值,我们需要在调用 operator()
之前,让它和 future 关联,然后使用 future.get()
,也就是:
Copy std :: packaged_task < double ( int , int )> task ([]( int a , int b){
return std :: pow (a , b);
});
std :: future <double> future = task . get_future ();
task ( 10 , 2 ); // 此处执行任务
std :: cout << future . get () << '\n' ; // 不阻塞,此处获取返回值
运行 测试。
先关联任务,再执行任务,当我们想要获取任务的返回值的时候,就 future.get()
即可。值得注意的是,任务并不会在线程中执行,想要在线程中执行异步任务,然后再获取返回值,我们可以这么做:
Copy std :: packaged_task < double ( int , int )> task ([]( int a , int b){
return std :: pow (a , b);
});
std :: future <double> future = task . get_future ();
std :: thread t{ std :: move (task) , 10 , 2 }; // 任务在线程中执行
// todo.. 幻想还有许多耗时的代码
t . join ();
std :: cout << future . get () << '\n' ; // 并不阻塞,获取任务返回值罢了
运行 测试。
因为 task
本身是重载了 operator()
的,是可调用对象,自然可以传递给 std::thread
执行,以及传递调用参数。唯一需要注意的是我们使用了 std::move
,这是因为 std::packaged_task
只能移动,不能复制。
简而言之,其实 std::packaged_task
也就是一个“包装”类而已,它本身并没什么特殊的,老老实实执行我们传递的任务,且方便我们获取返回值罢了,明确这一点,那么一切都不成问题。
std::packaged_task
也可以在线程中传递,在需要的时候获取返回值,而非像上面那样将它自己作为可调用对象:
Copy template < typename R , typename ... Ts , typename ... Args >
requires std :: invocable < std :: packaged_task < R (Ts...) >& , Args... >
void async_task (std :: packaged_task < R (Ts...)> & task , Args && ...args) {
// todo..
task (std :: forward < Args >(args)...);
}
int main () {
std :: packaged_task <int ( int , int ) > task ([]( int a , int b){
return a + b;
});
int value = 50 ;
std :: future <int> future = task . get_future ();
// 创建一个线程来执行异步任务
std :: thread t{ [ & ] { async_task (task , value , value); } };
std :: cout << future . get () << '\n' ;
t . join ();
}
运行 测试。
我们套了一个 lambda,这是因为函数模板不是函数,它并非具体类型,没办法直接被那样传递使用,只能包一层了。这只是一个简单的示例,展示可以使用 std::packaged_task
作函数形参,然后我们来传递任务进行异步调用等操作。
我们再将第二章实现的并行 sum
改成 std::package_task
+ std::future
的形式:
Copy template < typename ForwardIt >
auto sum ( ForwardIt first , ForwardIt last) {
using value_type = std :: iter_value_t < ForwardIt >;
std :: size_t num_threads = std :: thread :: hardware_concurrency ();
std :: ptrdiff_t distance = std :: distance (first , last);
if (distance > 1024000 ) {
// 计算每个线程处理的元素数量
std :: size_t chunk_size = distance / num_threads;
std :: size_t remainder = distance % num_threads;
// 存储每个线程要执行的任务
std :: vector < std :: packaged_task < value_type () >> tasks;
// 和每一个任务进行关联的 future 用于获取返回值
std :: vector < std :: future < value_type >> futures (num_threads);
// 存储关联线程的线程对象
std :: vector < std :: thread > threads;
// 制作任务、与 future 关联、启动线程执行
auto start = first;
for (std :: size_t i = 0 ; i < num_threads; ++ i) {
auto end = std :: next (start , chunk_size + (i < remainder ? 1 : 0 ));
tasks . emplace_back (std :: packaged_task < value_type () > {[start , end , i] {
return std :: accumulate (start , end , value_type{});
}});
start = end; // 开始迭代器不断向前
futures [i] = tasks [i] . get_future (); // 任务与 std::future 关联
threads . emplace_back (std :: move ( tasks [i]));
}
// 等待所有线程执行完毕
for ( auto& thread : threads)
thread . join ();
// 汇总线程的计算结果
value_type total_sum {};
for (std :: size_t i = 0 ; i < num_threads; ++ i) {
total_sum += futures [i] . get ();
}
return total_sum;
}
value_type total_sum = std :: accumulate (first , last , value_type{});
return total_sum;
}
运行 测试。
相比于之前,其实不同无非是定义了 std::vector<std::packaged_task<value_type()>> tasks
与 std::vector<std::future<value_type>> futures
,然后在循环中制造任务插入容器,关联 tuple,再放到线程中执行。最后汇总的时候写一个循环,futures[i].get()
获取任务的返回值加起来即可。
到此,也就可以了。
使用 std::promise
类模板 std::promise
用于存储一个值或一个异常,之后通过 std::promise
对象所创建的 std::future 对象异步获得。
Copy // 计算函数,接受一个整数并返回它的平方
void calculate_square (std :: promise < int > promiseObj , int num) {
// 模拟一些计算
std :: this_thread :: sleep_for (std :: chrono :: seconds ( 1 ));
// 计算平方并设置值到 promise 中
promiseObj . set_value (num * num);
}
// 创建一个 promise 对象,用于存储计算结果
std :: promise <int> promise;
// 从 promise 获取 future 对象进行关联
std :: future <int> future = promise . get_future ();
// 启动一个线程进行计算
int num = 5 ;
std :: thread t ( calculate_square , std :: move (promise) , num );
// 阻塞,直到结果可用
int result = future . get ();
std :: cout << num << " 的平方是:" << result << std :: endl;
t . join ();
运行 测试。
我们在新线程中通过调用 set_value()
函数设置 promise
的值,并在主线程中通过与其关联的 future 对象的 get()
成员函数获取这个值,如果promise
的值还没有被设置,那么将阻塞当前线程,直到被设置为止。同样的 std::promise
只能移动 ,不可复制,所以我们使用了 std::move
进行传递。
除了 set_value()
函数外,std::promise
还有一个 set_exception()
成员函数,它接受一个 std::exception_ptr
类型的参数,这个参数通常通过 std::current_exception()
获取,用于指示当前线程中抛出的异常。然后,std::future
对象通过 get()
函数获取这个异常,如果 promise
所在的函数有异常被抛出,则 std::future
对象会重新抛出这个异常,从而允许主线程捕获并处理它。
Copy void throw_function (std :: promise < int > prom) {
try {
throw std :: runtime_error ( "一个异常" );
}
catch (...) {
prom . set_exception (std :: current_exception ());
}
}
int main () {
std :: promise <int> prom;
std :: future <int> fut = prom . get_future ();
std :: thread t (throw_function , std :: move (prom));
try {
std :: cout << "等待线程执行,抛出异常并设置\n" ;
fut . get ();
}
catch (std :: exception & e) {
std :: cerr << "来自线程的异常: " << e . what () << '\n' ;
}
t . join ();
}
运行结果 :
Copy 等待线程执行,抛出异常并设置
来自线程的异常: 一个异常
你可能对这段代码还有一些疑问:我们写的是 promised<int>
,但是却没有使用 set_value
设置值,你可能会想着再写一行 prom.set_value(0)
?
共享状态的 promise 已经存储值或者异常,再次调用 set_value
(set_exception
) 会抛出 std::future_error 异常,将错误码设置为 promise_already_satisfied
。这是因为 std::promise
对象只能是存储值或者异常其中一种,而无法共存 。
简而言之,set_value
与 set_exception
二选一,如果先前调用了 set_value
,就不可再次调用 set_exception
,反之亦然(不然就会抛出异常),示例如下:
Copy void throw_function (std :: promise < int > prom) {
prom . set_value ( 100 );
try {
throw std :: runtime_error ( "一个异常" );
}
catch (...) {
try {
// 共享状态的 promise 已存储值,调用 set_exception 产生异常
prom . set_exception (std :: current_exception ());
} catch (std :: exception & e){
std :: cerr << "来自 set_exception 的异常: " << e . what () << '\n' ;
}
}
}
int main () {
std :: promise <int> prom;
std :: future <int> fut = prom . get_future ();
std :: thread t (throw_function , std :: move (prom));
std :: cout << "等待线程执行,抛出异常并设置\n" ;
std :: cout << "值:" << fut . get () << '\n' ; // 100
t . join ();
}
运行结果 :
Copy 等待线程执行,抛出异常并设置
值:100
来自 set_exception 的异常: promise already satisfied
future 的状态变化
需要注意的是,future 是一次性的 ,所以你需要注意移动。并且,调用 get
函数后,future 对象也会失去共享状态 。
移动语义 :这一点很好理解并且常见,因为移动操作标志着所有权的转移 ,意味着 future
不再拥有共享状态(如之前所提到)。get
和 wait
函数要求 future
对象拥有共享状态,否则会抛出异常。
共享状态失效 :调用 get
成员函数时,future
对象必须拥有共享状态,但调用完成后,它就会失去共享状态 ,不能再次调用 get
。这是我们在本节需要特别讨论的内容。
Copy std :: future <void> future = std :: async ([] {});
std :: cout << std :: boolalpha << future . valid () << '\n' ; // true
future . get ();
std :: cout << std :: boolalpha << future . valid () << '\n' ; // false
try {
future . get (); // 抛出 future_errc::no_state 异常
}
catch (std :: exception & e) {
std :: cerr << e . what () << '\n' ;
}
运行 测试。
这个问题在许多文档中没有明确说明,但通过阅读源码(MSVC STL ),可以很清楚地理解:
Copy // std::future<void>
void get () {
// block until ready then return or throw the stored exception
future _Local{_STD move ( * this )};
_Local . _Get_value ();
}
// std::future<T>
_Ty get () {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move ( * this )};
return _STD move ( _Local . _Get_value ());
}
// std::future<T&>
_Ty & get () {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move ( * this )};
return * _Local . _Get_value ();