同步操作

"同步操作"是指在计算机科学和信息技术中的一种操作方式,其中不同的任务或操作按顺序执行,一个操作完成后才能开始下一个操作。在多线程编程中,各个任务通常需要通过同步操作进行相互协调和等待,以确保数据的一致性正确性

本章的主要内容有:

  • 条件变量

  • std::future 等待异步任务

  • 在规定时间内等待

  • Qt 实现异步任务的示例

  • 其它 C++20 同步设施:信号量、闩与屏障

本章将讨论如何使用条件变量等待事件,介绍 future 等标准库设施用作同步操作,使用Qt+CMake 构建一个项目展示多线程的必要性,介绍 C++20 引入的新的同步设施。

等待事件或条件

假设你正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?

  1. 一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。

  2. 可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。

  3. 事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。

这和线程有什么关系呢?其实第一种方法就是在说”忙等待(busy waiting)”也称“自旋“。

bool flag = false;
std::mutex m;

void wait_for_flag(){
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        lk.lock();      // 2 上锁互斥量
    }
}

第二种方法就是加个延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠②前函数对互斥量解锁①,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。

void wait_for_flag(){
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠
        lk.lock();      // 3 上锁互斥量
    }
}

第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。

C++ 标准库对条件变量有两套实现:std::condition_variablestd::condition_variable_any,这两个实现都包含在 <condition_variable> 头文件中。

condition_variable_any 类是 std::condition_variable 的泛化。相对于只在 std::unique_lock<std::mutex> 上工作的 std::condition_variablecondition_variable_any 能在任何满足可基本锁定(BasicLockable)要求的锁上工作,所以增加了 _any 后缀。显而易见,这种区分必然是 any更加通用但是却有更多的性能开销。所以通常首选 std::condition_variable。有特殊需求,才会考虑 std::condition_variable_any

std::mutex mtx;
std::condition_variable cv;
bool arrived = false;

void wait_for_arrival() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, []{ return arrived; }); // 等待 arrived 变为 true
    std::cout << "到达目的地,可以下车了!" << std::endl;
}

void simulate_arrival() {
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟地铁到站,假设5秒后到达目的地
    {
        std::lock_guard<std::mutex> lck(mtx);
        arrived = true; // 设置条件变量为 true,表示到达目的地
    }
    cv.notify_one(); // 通知等待的线程
}

运行测试。更换为 std::condition_variable_any 效果相同

  • std::mutex mtx: 创建了一个互斥量,用于保护共享数据的访问,确保在多线程环境下的数据同步。

  • std::condition_variable cv: 创建了一个条件变量,用于线程间的同步,当条件不满足时,线程可以等待,直到条件满足时被唤醒。

  • bool arrived = false: 设置了一个标志位,表示是否到达目的地。

wait_for_arrival 函数中:

  1. std::unique_lock<std::mutex> lck(mtx): 使用互斥量创建了一个独占锁。

  2. cv.wait(lck, []{ return arrived; }): 阻塞当前线程,释放(unlock)锁,直到条件被满足。

  3. 一旦条件满足,即 arrived 变为 true,并且条件变量 cv唤醒(包括虚假唤醒),那么当前线程会重新获取锁(lock),并执行后续的操作。

simulate_arrival 函数中:

  1. std::this_thread::sleep_for(std::chrono::seconds(5)): 模拟地铁到站,暂停当前线程 5 秒。

  2. 设置 arrived 为 true,表示到达目的地。

  3. cv.notify_one(): 唤醒一个等待条件变量的线程。

这样,当 simulate_arrival 函数执行后,arrived 被设置为 true,并且通过 cv.notify_one() 唤醒了等待在条件变量上的线程,从而使得 wait_for_arrival 函数中的等待结束,可以执行后续的操作,即输出提示信息。


条件变量的 wait 成员函数有两个版本,以上代码使用的就是第二个版本,传入了一个谓词

void wait(std::unique_lock<std::mutex>& lock);                 // 1

template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred); // 2

②等价于:

while (!pred())
    wait(lock);

第二个版本只是对第一个版本的包装,等待并判断谓词,会调用第一个版本的重载。这可以避免“虚假唤醒(spurious wakeup)”。

条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。使用 C++ 标准库则没有这个烦恼了。

我们也可以简单看一下 MSVC STL 的源码实现

void wait(unique_lock<mutex>& _Lck) noexcept {
    _Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}

template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) {
    while (!_Pred()) {
        wait(_Lck);
    }
}

线程安全的队列

在本节中,我们介绍了一个更为复杂的示例,以巩固我们对条件变量的学习。为了实现一个线程安全的队列,我们需要考虑以下两个关键点:

  1. 当执行 push 操作时,需要确保没有其他线程正在执行 pushpop 操作;同样,在执行 pop 操作时,也需要确保没有其他线程正在执行 pushpop 操作。

  2. 当队列为时,不应该执行 pop 操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop 操作时队列不为空。

基于以上思考,我们设计了一个名为 threadsafe_queue 的模板类,如下:

template<typename T>
class threadsafe_queue {
    mutable std::mutex m;              // 互斥量,用于保护队列操作的独占访问
    std::condition_variable data_cond; // 条件变量,用于在队列为空时等待
    std::queue<T> data_queue;          // 实际存储数据的队列
public:
    threadsafe_queue() {}
    void push(T new_value) {
        {
            std::lock_guard<std::mutex> lk { m };
            data_queue.push(new_value);
        }
        data_cond.notify_one();
    }
    // 从队列中弹出元素(阻塞直到队列不为空)
    void pop(T& value) {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
    // 从队列中弹出元素(阻塞直到队列不为空),并返回一个指向弹出元素的 shared_ptr
    std::shared_ptr<T> pop() {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        std::shared_ptr<T>res { std::make_shared<T>(data_queue.front()) };
        data_queue.pop();
        return res;
    }
    bool empty()const {
        std::lock_guard<std::mutex> lk (m);
        return data_queue.empty();
    }
};

请无视我们省略的构造、赋值、交换、try_xx 等操作。以上示例已经足够。

光写好了肯定不够,我们还得测试运行,我们可以写一个经典的:”生产者消费者模型“,也就是一个线程 push生产“,一个线程 pop消费“。

void producer(threadsafe_queue<int>& q) {
    for (int i = 0; i < 5; ++i) {
        q.push(i);
    }
}
void consumer(threadsafe_queue<int>& q) {
    for (int i = 0; i < 5; ++i) {
        int value{};
        q.pop(value);
    }
}

两个线程分别运行 producerconsumer,为了观测运行我们可以为 pushpop 中增加打印语句:

std::cout << "push:" << new_value << std::endl;
std::cout << "pop:" << value << std::endl;

可能运行结果是:

push:0
pop:0
push:1
pop:1
push:2
push:3
push:4
pop:2
pop:3
pop:4

这很正常,到底哪个线程会抢到 CPU 时间片持续运行,是系统调度决定的,我们只需要保证一开始提到的两点就行了:

pushpop 都只能单独执行;当队列为时,不执行 pop 操作。

我们可以给一个简单的示意图帮助你理解这段运行结果:

初始状态:队列为空
+---+---+---+---+---+

Producer 线程插入元素 0:
+---+---+---+---+---+
| 0 |   |   |   |   |

Consumer 线程弹出元素 0:
+---+---+---+---+---+
|   |   |   |   |   |

Producer 线程插入元素 1:
+---+---+---+---+---+
| 1 |   |   |   |   |

Consumer 线程弹出元素 1:
+---+---+---+---+---+
|   |   |   |   |   |

Producer 线程插入元素 2:
+---+---+---+---+---+
|   | 2 |   |   |   |

Producer 线程插入元素 3:
+---+---+---+---+---+
|   | 2 | 3 |   |   |

Producer 线程插入元素 4:
+---+---+---+---+---+
|   | 2 | 3 | 4 |   |

Consumer 线程弹出元素 2:
+---+---+---+---+---+
|   |   | 3 | 4 |   |

Consumer 线程弹出元素 3:
+---+---+---+---+---+
|   |   |   | 4 |   |

Consumer 线程弹出元素 4:
+---+---+---+---+---+
|   |   |   |   |   |

队列为空,所有元素已被弹出

到此,也就可以了。

使用 future

举个例子:我们在车站等车,你可能会做一些别的事情打发时间,比如学习现代 C++ 模板教程、观看 mq白 的视频教程、玩手机等。不过,你始终在等待一件事情:车到站

C++ 标准库将这种事件称为 future。它用于处理线程中需要等待某个事件的情况,线程知道预期结果。等待的同时也可以执行其它的任务。

C++ 标准库有两种 future,都声明在 <future> 头文件中:独占的 std::future 、共享的 std::shared_future。它们的区别与 std::unique_ptrstd::shared_ptr 类似。std::future 只能与单个指定事件关联,而 std::shared_future 能关联多个事件。它们都是模板,它们的模板类型参数,就是其关联的事件(函数)的返回类型。当多个线程需要访问一个独立 future 对象时, 必须使用互斥量或类似同步机制进行保护。而多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future 对象副本进行访问,则是安全的。

最简单有效的使用是,我们先前讲的 std::thread 在线程中执行任务是没有返回值的,这个问题就能使用 future 解决。

创建异步任务获取返回值

假设需要执行一个耗时任务并获取其返回值,但是并不急切的需要它。那么就可以启动新线程计算,然而 std::thread 没提供直接从线程获取返回值的机制。所以我们可以使用 std::async 函数模板。

使用 std::async 启动一个异步任务,它会返回一个 std::future 对象,这个对象和任务关联,将持有最终计算出来的结果。当需要任务执行完的结果的时候,只需要调用 get() 成员函数,就会阻塞直到 future 为就绪为止(即任务执行完毕),返回执行结果。valid() 成员函数检查 future 当前是否关联共享状态,即是否当前关联任务。还未关联,或者任务已经执行完(调用了 get()、set()),都会返回 false

#include <iostream>
#include <thread>
#include <future> // 引入 future 头文件

int task(int n) {
    std::cout << "异步任务 ID: " << std::this_thread::get_id() << '\n';
    return n * n;
}

int main() {
    std::future<int> future = std::async(task, 10);
    std::cout << "main: " << std::this_thread::get_id() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // true
    std::cout << future.get() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // false
}

运行测试。

std::thread 一样,std::async 支持任意可调用(Callable)对象,以及传递调用参数。包括支持使用 std::ref ,以及支持只能移动的类型。我们下面详细聊一下 std::async 参数传递的事。

struct X{
    int operator()(int n)const{
        return n * n;
    }
};
struct Y{
    int f(int n)const{
        return n * n;
    }
};
void f(int& p) { std::cout << &p << '\n'; }

int main(){
    Y y;
    int n = 0;
    auto t1 = std::async(X{}, 10);
    auto t2 = std::async(&Y::f,&y,10);
    auto t3 = std::async([] {});         
    auto t4 = std::async(f, std::ref(n));
    std::cout << &n << '\n';
}

运行测试。

如你所见,它支持所有可调用(Callable)对象,并且也是默认按值复制,必须使用 std::ref 才能传递引用。并且它和 std::thread 一样,内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以如果不使用 std::ref,这里 void f(int&) 就会导致编译错误,如果是 void f(const int&) 则可以通过编译,不过引用的不是我们传递的局部对象。

void f(const int& p) {}
void f2(int& p ){}

int n = 0;
std::async(f, n);   // OK! 可以通过编译,不过引用的并非是局部的n
std::async(f2, n);  // Error! 无法通过编译

我们来展示使用 std::move ,也就是移动传递参数并接受返回值:

struct move_only{
    move_only() { std::puts("默认构造"); }
    move_only(move_only&&)noexcept { std::puts("移动构造"); }
    move_only& operator=(move_only&&) noexcept {
        std::puts("移动赋值");
        return *this;
    }
    move_only(const move_only&) = delete;
};

move_only task(move_only x){
    std::cout << "异步任务 ID: " << std::this_thread::get_id() << '\n';
    return x;
}

int main(){
    move_only x;
    std::future<move_only> future = std::async(task, std::move(x));
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "main\n";
    move_only result = future.get();  // 等待异步任务执行完毕
}

运行测试。

如你所见,它支持只移动类型,我们将参数使用 std::move 传递,接收参数的时候直接调用 get 函数即可。


接下来我们聊 std::async执行策略,我们前面一直没有使用,其实就是在传递可调用对象与参数之前传递枚举值罢了:

  1. std::launch::async 在不同线程上执行异步任务。

  2. std::launch::deferred 惰性求值,不创建线程,等待 future 对象调用 waitget 成员函数的时候执行任务。

而我们先前一直没有写明这个参数,是因为 std::async 函数模板有两个重载,不给出执行策略就是以:std::launch::async | std::launch::deferred 调用另一个重载版本(这一点中在源码中很明显),此策略表示由实现选择到底是否创建线程执行异步任务。典型情况是,如果系统资源充足,并且异步任务的执行不会导致性能问题,那么系统可能会选择在新线程中执行任务。但是,如果系统资源有限,或者延迟执行可以提高性能或节省资源,那么系统可能会选择延迟执行。

如果你阅读 libstdc++ 的代码,会发现的确如此。

然而值得注意的是,在 MSVC STL 的实现中,launch::async | launch::deferredlaunch::async 执行策略毫无区别,源码如下:

template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(launch _Psync, _Fty&& _Fnarg) {
     // construct associated asynchronous state object for the launch type
     switch (_Psync) { // select launch type
     case launch::deferred:
           return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
     case launch::async: // TRANSITION, fixed in vMajorNext, should create a new thread here
     default:
           return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
     }
}

_Task_async_state 会通过 ::Concurrency::create_task 从线程池中获取线程并执行任务返回包装对象。

简而言之,使用 std::async,只要不是 launch::deferred 策略,那么 MSVC STL 实现中都是必然在线程中执行任务。因为是线程池,所以执行新任务是否创建新线程,任务执行完毕线程是否立即销毁,不确定

我们来展示一下:

void f(){
    std::cout << std::this_thread::get_id() << '\n';
}

int main(){
    std::cout << std::this_thread::get_id() << '\n';
    auto f1 = std::async(std::launch::deferred, f);
    f1.wait(); // 在 wait() 或 get() 调用时执行,不创建线程
    auto f2 = std::async(std::launch::async,f); // 创建线程执行异步任务
    auto f3 = std::async(std::launch::deferred | std::launch::async, f); // 实现选择的执行方式
}

运行测试。


其实到此基本就差不多了,我们再介绍两个常见问题即可:

  1. 如果从 std::async 获得的 std::future 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的**析构函数将阻塞,直到到异步任务完成**。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。

    std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
    std::async(std::launch::async, []{ g(); }); // f() 完成前不开始

    如你所见,这并不能创建异步任务,它会阻塞,然后逐个执行。

  2. 被移动的 std::future 没有所有权,失去共享状态,不能调用 getwait 成员函数。

    auto t = std::async([] {});
    std::future<void> future{ std::move(t) };
    t.wait();   // Error! 抛出异常

    如同没有线程资源所有权的 std::thread 对象调用 join() 一样错误,这是移动语义的基本语义逻辑。

futurestd::packaged_task

类模板 std::packaged_task 包装任何可调用(Callable)目标(函数、lambda 表达式、bind 表达式或其它函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。

通常它会和 std::future 一起使用,不过也可以单独使用,我们一步一步来:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
task(10, 2); // 执行传递的 lambda,但无法获取返回值

它有 operator() 的重载,它会执行我们传递的可调用(Callable)对象,不过这个重载的返回类型是 void 没办法获取返回值

如果想要异步的获取返回值,我们需要在调用 operator() 之前,让它和 future 关联,然后使用 future.get(),也就是:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
std::future<double>future = task.get_future();
task(10, 2); // 此处执行任务
std::cout << future.get() << '\n'; // 不阻塞,此处获取返回值

运行测试。

先关联任务,再执行任务,当我们想要获取任务的返回值的时候,就 future.get() 即可。值得注意的是,任务并不会在线程中执行,想要在线程中执行异步任务,然后再获取返回值,我们可以这么做:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
std::future<double> future = task.get_future();
std::thread t{ std::move(task),10,2 }; // 任务在线程中执行
// todo.. 幻想还有许多耗时的代码
t.join();

std::cout << future.get() << '\n'; // 并不阻塞,获取任务返回值罢了

运行测试。

因为 task 本身是重载了 operator() 的,是可调用对象,自然可以传递给 std::thread 执行,以及传递调用参数。唯一需要注意的是我们使用了 std::move ,这是因为 std::packaged_task 只能移动,不能复制。


简而言之,其实 std::packaged_task 也就是一个“包装”类而已,它本身并没什么特殊的,老老实实执行我们传递的任务,且方便我们获取返回值罢了,明确这一点,那么一切都不成问题。

std::packaged_task 也可以在线程中传递,在需要的时候获取返回值,而非像上面那样将它自己作为可调用对象:

template<typename R, typename...Ts, typename...Args>
    requires std::invocable<std::packaged_task<R(Ts...)>&, Args...> 
void async_task(std::packaged_task<R(Ts...)>& task, Args&&...args) {
    // todo..
    task(std::forward<Args>(args)...);
}

int main() {
    std::packaged_task<int(int,int)> task([](int a,int b){
        return a + b;
    });
    
    int value = 50;
    std::future<int> future = task.get_future();
    // 创建一个线程来执行异步任务
    std::thread t{ [&] {async_task(task, value, value); } };
    std::cout << future.get() << '\n';
    t.join();
}

运行测试。

我们套了一个 lambda,这是因为函数模板不是函数,它并非具体类型,没办法直接被那样传递使用,只能包一层了。这只是一个简单的示例,展示可以使用 std::packaged_task 作函数形参,然后我们来传递任务进行异步调用等操作。

我们再将第二章实现的并行 sum 改成 std::package_task + std::future 的形式:

template<typename ForwardIt>
auto sum(ForwardIt first, ForwardIt last) {
    using value_type = std::iter_value_t<ForwardIt>;
    std::size_t num_threads = std::thread::hardware_concurrency();
    std::ptrdiff_t distance = std::distance(first, last);

    if (distance > 1024000) {
        // 计算每个线程处理的元素数量
        std::size_t chunk_size = distance / num_threads;
        std::size_t remainder = distance % num_threads;

        // 存储每个线程要执行的任务
        std::vector<std::packaged_task<value_type()>>tasks;
        // 和每一个任务进行关联的 future 用于获取返回值
        std::vector<std::future<value_type>>futures(num_threads);

        // 存储关联线程的线程对象
        std::vector<std::thread> threads;

        // 制作任务、与 future 关联、启动线程执行
        auto start = first;
        for (std::size_t i = 0; i < num_threads; ++i) {
            auto end = std::next(start, chunk_size + (i < remainder ? 1 : 0));
            tasks.emplace_back(std::packaged_task<value_type()>{[start, end, i] {
                return std::accumulate(start, end, value_type{});
            }});
            start = end; // 开始迭代器不断向前
            futures[i] = tasks[i].get_future(); // 任务与 std::future 关联
            threads.emplace_back(std::move(tasks[i]));
        }

        // 等待所有线程执行完毕
        for (auto& thread : threads)
            thread.join();

        // 汇总线程的计算结果
        value_type total_sum {};
        for (std::size_t i = 0; i < num_threads; ++i) {
            total_sum += futures[i].get();
        }
        return total_sum;
    }

    value_type total_sum = std::accumulate(first, last, value_type{});
    return total_sum;
}

运行测试。

相比于之前,其实不同无非是定义了 std::vector<std::packaged_task<value_type()>> tasksstd::vector<std::future<value_type>> futures ,然后在循环中制造任务插入容器,关联 tuple,再放到线程中执行。最后汇总的时候写一个循环,futures[i].get() 获取任务的返回值加起来即可。

到此,也就可以了。

使用 std::promise

类模板 std::promise 用于存储一个值或一个异常,之后通过 std::promise 对象所创建的 std::future 对象异步获得。

// 计算函数,接受一个整数并返回它的平方
void calculate_square(std::promise<int> promiseObj, int num) {
    // 模拟一些计算
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 计算平方并设置值到 promise 中
    promiseObj.set_value(num * num);
}

// 创建一个 promise 对象,用于存储计算结果
std::promise<int> promise;

// 从 promise 获取 future 对象进行关联
std::future<int> future = promise.get_future();

// 启动一个线程进行计算
int num = 5;
std::thread t(calculate_square, std::move(promise), num);

// 阻塞,直到结果可用
int result = future.get();
std::cout << num << " 的平方是:" << result << std::endl;

t.join();

运行测试。

我们在新线程中通过调用 set_value() 函数设置 promise 的值,并在主线程中通过与其关联的 future 对象的 get() 成员函数获取这个值,如果promise的值还没有被设置,那么将阻塞当前线程,直到被设置为止。同样的 std::promise 只能移动,不可复制,所以我们使用了 std::move 进行传递。


除了 set_value() 函数外,std::promise 还有一个 set_exception() 成员函数,它接受一个 std::exception_ptr 类型的参数,这个参数通常通过 std::current_exception() 获取,用于指示当前线程中抛出的异常。然后,std::future 对象通过 get() 函数获取这个异常,如果 promise 所在的函数有异常被抛出,则 std::future 对象会重新抛出这个异常,从而允许主线程捕获并处理它。

void throw_function(std::promise<int> prom) {
    try {
        throw std::runtime_error("一个异常");
    }
    catch (...) {
        prom.set_exception(std::current_exception());
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(throw_function, std::move(prom));

    try {
        std::cout << "等待线程执行,抛出异常并设置\n";
        fut.get();
    }
    catch (std::exception& e) {
        std::cerr << "来自线程的异常: " << e.what() << '\n';
    }
    t.join();
}

运行结果

等待线程执行,抛出异常并设置
来自线程的异常: 一个异常

你可能对这段代码还有一些疑问:我们写的是 promised<int> ,但是却没有使用 set_value 设置值,你可能会想着再写一行 prom.set_value(0)

共享状态的 promise 已经存储值或者异常,再次调用 set_valueset_exception) 会抛出 std::future_error 异常,将错误码设置为 promise_already_satisfied。这是因为 std::promise 对象只能是存储值或者异常其中一种,而无法共存

简而言之,set_valueset_exception 二选一,如果先前调用了 set_value ,就不可再次调用 set_exception,反之亦然(不然就会抛出异常),示例如下:

void throw_function(std::promise<int> prom) {
    prom.set_value(100);
    try {
        throw std::runtime_error("一个异常");
    }
    catch (...) {
        try{
            // 共享状态的 promise 已存储值,调用 set_exception 产生异常
            prom.set_exception(std::current_exception());
        }catch (std::exception& e){
            std::cerr << "来自 set_exception 的异常: " << e.what() << '\n';
        }
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(throw_function, std::move(prom));
    
    std::cout << "等待线程执行,抛出异常并设置\n";
    std::cout << "值:" << fut.get() << '\n'; // 100

    t.join();
}

运行结果

等待线程执行,抛出异常并设置
值:100
来自 set_exception 的异常: promise already satisfied

future 的状态变化

需要注意的是,future 是一次性的,所以你需要注意移动。并且,调用 get 函数后,future 对象也会失去共享状态

  • 移动语义:这一点很好理解并且常见,因为移动操作标志着所有权的转移,意味着 future 不再拥有共享状态(如之前所提到)。getwait 函数要求 future 对象拥有共享状态,否则会抛出异常。

  • 共享状态失效:调用 get 成员函数时,future 对象必须拥有共享状态,但调用完成后,它就会失去共享状态,不能再次调用 get。这是我们在本节需要特别讨论的内容。

std::future<void>future = std::async([] {});
std::cout << std::boolalpha << future.valid() << '\n'; // true
future.get();
std::cout << std::boolalpha << future.valid() << '\n'; // false
try {
    future.get(); // 抛出 future_errc::no_state 异常
}
catch (std::exception& e) {
    std::cerr << e.what() << '\n';
}

运行测试。

这个问题在许多文档中没有明确说明,但通过阅读源码(MSVC STL),可以很清楚地理解:

// std::future<void>
void get() {
    // block until ready then return or throw the stored exception
    future _Local{_STD move(*this)};
    _Local._Get_value();
}
// std::future<T>
_Ty get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return _STD move(_Local._Get_value());
}
// std::future<T&>
_Ty& get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return *_Local._Get_value();