现代C++并发编程教程
  • README
  • 此处存放作业
  • 阅读须知
    • 基本概念
    • 使用线程
    • 共享数据
    • 同步操作
    • 内存模型与原子操作
    • 协程
    • 详细分析
      • std::thread 的构造-源码解析
      • std::scoped_lock 的源码实现与解析
      • std::async 与 std::future 源码解析
      • 线程池
  • image
    • 捐赠
Powered by GitBook
On this page
  • 等待事件或条件
  • 线程安全的队列
  • 使用条件变量实现后台提示音播放
  • 使用 future
  • 创建异步任务获取返回值
  • future 与 std::packaged_task
  • 使用 std::promise
  • future 的状态变化
  • 多个线程的等待 std::shared_future
  • 限时等待
  • 时钟
  • 时间段
  • 时间点
  • 异步任务执行
  • 背景介绍
  • 项目说明
  • 完整代码实现
  • 注意事项
  • 跨平台兼容性
  • 实践建议
  • C++20 信号量
  • C++20 闩与屏障
  • std::latch
  • std::barrier
  • 总结
  1. 阅读须知

同步操作

Previous共享数据Next内存模型与原子操作

Last updated 7 months ago

"同步操作"是指在计算机科学和信息技术中的一种操作方式,其中不同的任务或操作按顺序执行,一个操作完成后才能开始下一个操作。在多线程编程中,各个任务通常需要通过同步设施进行相互协调和等待,以确保数据的一致性和正确性。

本章的主要内容有:

  • 条件变量

  • std::future 等待异步任务

  • 在规定时间内等待

  • Qt 实现异步任务的示例

  • 其它 C++20 同步设施:信号量、闩与屏障

本章将讨论如何使用条件变量等待事件,介绍 future 等标准库设施用作同步操作,使用Qt+CMake 构建一个项目展示多线程的必要性,介绍 C++20 引入的新的同步设施。

等待事件或条件

假设你正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?

  1. 一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。

  2. 可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。

  3. 事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。

这和线程有什么关系呢?其实第一种方法就是在说”(busy waiting)”也称“自旋“。

bool flag = false;
std::mutex m;

void wait_for_flag(){
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        lk.lock();      // 2 上锁互斥量
    }
}

第二种方法就是加个延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠②前函数对互斥量解锁①,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。

void wait_for_flag(){
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠
        lk.lock();      // 3 上锁互斥量
    }
}

第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。

std::mutex mtx;
std::condition_variable cv;
bool arrived = false;

void wait_for_arrival() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, []{ return arrived; }); // 等待 arrived 变为 true
    std::cout << "到达目的地,可以下车了!" << std::endl;
}

void simulate_arrival() {
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟地铁到站,假设5秒后到达目的地
    {
        std::lock_guard<std::mutex> lck(mtx);
        arrived = true; // 设置条件变量为 true,表示到达目的地
    }
    cv.notify_one(); // 通知等待的线程
}
  • std::mutex mtx: 创建了一个互斥量,用于保护共享数据的访问,确保在多线程环境下的数据同步。

  • std::condition_variable cv: 创建了一个条件变量,用于线程间的同步,当条件不满足时,线程可以等待,直到条件满足时被唤醒。

  • bool arrived = false: 设置了一个标志位,表示是否到达目的地。

在 wait_for_arrival 函数中:

  1. std::unique_lock<std::mutex> lck(mtx): 使用互斥量创建了一个独占锁。

  2. cv.wait(lck, []{ return arrived; }): 阻塞当前线程,释放(unlock)锁,直到条件被满足。

  3. 一旦条件满足,即 arrived 变为 true,并且条件变量 cv 被唤醒(包括虚假唤醒),那么当前线程会重新获取锁(lock),并执行后续的操作。

在 simulate_arrival 函数中:

  1. std::this_thread::sleep_for(std::chrono::seconds(5)): 模拟地铁到站,暂停当前线程 5 秒。

  2. 设置 arrived 为 true,表示到达目的地。

  3. cv.notify_one(): 唤醒一个等待条件变量的线程。

这样,当 simulate_arrival 函数执行后,arrived 被设置为 true,并且通过 cv.notify_one() 唤醒了等待在条件变量上的线程,从而使得 wait_for_arrival 函数中的等待结束,可以执行后续的操作,即输出提示信息。


void wait(std::unique_lock<std::mutex>& lock);                 // 1

template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred); // 2

②等价于:

while (!pred())
    wait(lock);

条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。使用 C++ 标准库则没有这个烦恼了。

void wait(unique_lock<mutex>& _Lck) noexcept {
    _Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}

template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) {
    while (!_Pred()) {
        wait(_Lck);
    }
}

线程安全的队列

在本节中,我们介将绍一个更为复杂的示例,以巩固我们对条件变量的学习。为了实现一个线程安全的队列,我们需要考虑以下两个关键点:

  1. 当执行 push 操作时,需要确保没有其他线程正在执行 push 或 pop 操作;同样,在执行 pop 操作时,也需要确保没有其他线程正在执行 push 或 pop 操作。

  2. 当队列为空时,不应该执行 pop 操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop 操作时队列不为空。

基于以上思考,我们设计了一个名为 threadsafe_queue 的模板类,如下:

template<typename T>
class threadsafe_queue {
    mutable std::mutex m;              // 互斥量,用于保护队列操作的独占访问
    std::condition_variable data_cond; // 条件变量,用于在队列为空时等待
    std::queue<T> data_queue;          // 实际存储数据的队列
public:
    threadsafe_queue() {}
    void push(T new_value) {
        {
            std::lock_guard<std::mutex> lk { m };
            data_queue.push(new_value);
        }
        data_cond.notify_one();
    }
    // 从队列中弹出元素(阻塞直到队列不为空)
    void pop(T& value) {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
    // 从队列中弹出元素(阻塞直到队列不为空),并返回一个指向弹出元素的 shared_ptr
    std::shared_ptr<T> pop() {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        std::shared_ptr<T> res { std::make_shared<T>(data_queue.front()) };
        data_queue.pop();
        return res;
    }
    bool empty()const {
        std::lock_guard<std::mutex> lk (m);
        return data_queue.empty();
    }
};

请无视我们省略的构造、赋值、交换、try_xx 等操作。以上示例已经足够。

光写好了肯定不够,我们还得测试运行,我们可以写一个经典的:”生产者消费者模型“,也就是一个线程 push ”生产“,一个线程 pop ”消费“。

void producer(threadsafe_queue<int>& q) {
    for (int i = 0; i < 5; ++i) {
        q.push(i);
    }
}
void consumer(threadsafe_queue<int>& q) {
    for (int i = 0; i < 5; ++i) {
        int value{};
        q.pop(value);
    }
}

两个线程分别运行 producer 与 consumer,为了观测运行我们可以为 push 与 pop 中增加打印语句:

std::cout << "push:" << new_value << std::endl;
std::cout << "pop:" << value << std::endl;
push:0
pop:0
push:1
pop:1
push:2
push:3
push:4
pop:2
pop:3
pop:4

这很正常,到底哪个线程会抢到 CPU 时间片持续运行,是系统调度决定的,我们只需要保证一开始提到的两点就行了:

push 与 pop 都只能单独执行;当队列为空时,不执行 pop 操作。

我们可以给一个简单的示意图帮助你理解这段运行结果:

初始状态:队列为空
+---+---+---+---+---+

Producer 线程插入元素 0:
+---+---+---+---+---+
| 0 |   |   |   |   |

Consumer 线程弹出元素 0:
+---+---+---+---+---+
|   |   |   |   |   |

Producer 线程插入元素 1:
+---+---+---+---+---+
| 1 |   |   |   |   |

Consumer 线程弹出元素 1:
+---+---+---+---+---+
|   |   |   |   |   |

Producer 线程插入元素 2:
+---+---+---+---+---+
|   | 2 |   |   |   |

Producer 线程插入元素 3:
+---+---+---+---+---+
|   | 2 | 3 |   |   |

Producer 线程插入元素 4:
+---+---+---+---+---+
|   | 2 | 3 | 4 |   |

Consumer 线程弹出元素 2:
+---+---+---+---+---+
|   |   | 3 | 4 |   |

Consumer 线程弹出元素 3:
+---+---+---+---+---+
|   |   |   | 4 |   |

Consumer 线程弹出元素 4:
+---+---+---+---+---+
|   |   |   |   |   |

队列为空,所有元素已被弹出

到此,也就可以了。

使用条件变量实现后台提示音播放

一个常见的场景是:当你的软件完成了主要功能后,领导可能突然要求添加一些竞争对手产品的功能。比如领导看到了人家的设备跑起来总是有一些播报,说明当前的情况,执行的过程,或者报错了也会有提示音说明。于是就想让我们的程序也增加“语音提示”的功能。此时,你需要考虑如何在程序运行到不同状态时添加适当的语音播报,并且确保这些提示音的播放不会影响其他功能的正常运行。

为了不影响程序的流畅执行,提示音的播放显然不能占据业务线程的资源。我们需要额外启动一个线程来专门处理这个任务。

但是,大多数的提示音播放都是短暂且简单。如果每次播放提示音时都新建一个线程,且不说创建线程也需要大量时间,可能影响业务正常的执行任务的流程,就光是其频繁创建线程的开销也是不能接受的。


因此,更合理的方案是:在程序启动时,就启动一个专门用于播放提示音的线程。当没有需要播放的提示时,该线程会一直处于等待状态;一旦有提示音需要播放,线程就被唤醒,完成播放任务。

具体来说,我们可以通过条件变量来实现这一逻辑,核心是监控一个音频队列。我们可以封装一个类型,包含以下功能:

  • 一个成员函数在对象构造时就启动,使用条件变量监控队列是否为空,互斥量确保共享资源的同步。如果队列中有任务,就取出并播放提示音;如果队列为空,则线程保持阻塞状态,等待新的任务到来。

  • 提供一个外部函数,以供在需要播放提示音的时候调用它,向队列添加新的元素,该函数需要通过互斥量来保护数据一致性,并在成功添加任务后唤醒条件变量,通知播放线程执行任务。

这种设计通过合理利用条件变量和互斥量,不仅有效减少了 CPU 的无效开销,还能够确保主线程的顺畅运行。它不仅适用于提示音的播放,还能扩展用于其他类似的后台任务场景。

class AudioPlayer {
public:
    AudioPlayer() : stop{ false }, player_thread{ &AudioPlayer::playMusic, this }
    {}

    ~AudioPlayer() {
        // 等待队列中所有音乐播放完毕
        while (!audio_queue.empty()) {
            std::this_thread::sleep_for(50ms);
        }
        stop = true;
        cond.notify_all();
        if (player_thread.joinable()) {
            player_thread.join();
        }
    }

    void addAudioPath(const std::string& path) {
        std::lock_guard<std::mutex> lock{ mtx }; // 互斥量确保了同一时间不会有其它地方在操作共享资源(队列)
        audio_queue.push(path); // 为队列添加元素 表示有新的提示音需要播放
        cond.notify_one();      // 通知线程新的音频
    }

private:
    void playMusic() {
        while (!stop) {
            std::string path;
            {
                std::unique_lock<std::mutex> lock{ mtx };
                cond.wait(lock, [this] { return !audio_queue.empty() || stop; });

                if (audio_queue.empty()) return; // 防止在对象为空时析构出错

                path = audio_queue.front(); // 从队列中取出元素
                audio_queue.pop();          // 取出后就删除元素,表示此元素已被使用
            }

            if (!music.openFromFile(path)) {
                std::cerr << "无法加载音频文件: " << path << std::endl;
                continue;  // 继续播放下一个音频
            }

            music.play();

            // 等待音频播放完毕
            while (music.getStatus() == sf::SoundSource::Playing) {
                sf::sleep(sf::seconds(0.1f));  // sleep 避免忙等占用 CPU
            }
        }
    }

    std::atomic<bool> stop;              // 控制线程的停止与退出,
    std::thread player_thread;           // 后台执行音频任务的专用线程
    std::mutex mtx;                      // 保护共享资源
    std::condition_variable cond;        // 控制线程等待和唤醒,当有新任务时通知音频线程
    std::queue<std::string> audio_queue; // 音频任务队列,存储待播放的音频文件路径
    sf::Music music;                     // SFML 音频播放器,用于加载和播放音频文件
};

该代码实现了一个简单的后台音频播放类型,通过条件变量和互斥量确保播放线程 playMusic 只在只在有音频任务需要播放时工作(当外部通过调用 addAudioPath() 向队列添加播放任务时)。在没有任务时,线程保持等待状态,避免占用 CPU 资源影响主程序的运行。

注意

此外,关于提示音的播报,为了避免每次都手动添加路径,我们可以创建一个音频资源数组,便于使用:

static constexpr std::array soundResources{
    "./sound/01初始化失败.ogg",
    "./sound/02初始化成功.ogg",
    "./sound/03试剂不足,请添加.ogg",
    "./sound/04试剂已失效,请更新.ogg",
    "./sound/05清洗液不足,请添加.ogg",
    "./sound/06废液桶即将装满,请及时清空.ogg",
    "./sound/07废料箱即将装满,请及时清空.ogg",
    "./sound/08激发液A液不足,请添加.ogg",
    "./sound/09激发液B液不足,请添加.ogg",
    "./sound/10反应杯不足,请添加.ogg",
    "./sound/11检测全部完成.ogg"
};

为了提高代码的可读性,我们还可以使用一个枚举类型来表示音频资源的索引:

enum SoundIndex {
    InitializationFailed,
    InitializationSuccessful,
    ReagentInsufficient,
    ReagentExpired,
    CleaningAgentInsufficient,
    WasteBinAlmostFull,
    WasteContainerAlmostFull,
    LiquidAInsufficient,
    LiquidBInsufficient,
    ReactionCupInsufficient,
    DetectionCompleted,
    SoundCount // 总音频数量,用于计数
};
sudo apt-get install libflac-dev
sudo apt-get install libopenal-dev

使用 future

最简单有效的使用是,我们先前讲的 std::thread 在线程中执行任务是没有返回值的,这个问题就能使用 future 解决。

创建异步任务获取返回值

#include <iostream>
#include <thread>
#include <future> // 引入 future 头文件

int task(int n) {
    std::cout << "异步任务 ID: " << std::this_thread::get_id() << '\n';
    return n * n;
}

int main() {
    std::future<int> future = std::async(task, 10);
    std::cout << "main: " << std::this_thread::get_id() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // true
    std::cout << future.get() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // false
}
struct X{
    int operator()(int n)const{
        return n * n;
    }
};
struct Y{
    int f(int n)const{
        return n * n;
    }
};
void f(int& p) { std::cout << &p << '\n'; }

int main(){
    Y y;
    int n = 0;
    auto t1 = std::async(X{}, 10);
    auto t2 = std::async(&Y::f,&y,10);
    auto t3 = std::async([] {});         
    auto t4 = std::async(f, std::ref(n));
    std::cout << &n << '\n';
}
void f(const int& p) {}
void f2(int& p ){}

int n = 0;
std::async(f, n);   // OK! 可以通过编译,不过引用的并非是局部的n
std::async(f2, n);  // Error! 无法通过编译

我们来展示使用 std::move ,也就是移动传递参数并接受返回值:

struct move_only{
    move_only() { std::puts("默认构造"); }
    move_only(move_only&&)noexcept { std::puts("移动构造"); }
    move_only& operator=(move_only&&) noexcept {
        std::puts("移动赋值");
        return *this;
    }
    move_only(const move_only&) = delete;
};

move_only task(move_only x){
    std::cout << "异步任务 ID: " << std::this_thread::get_id() << '\n';
    return x;
}

int main(){
    move_only x;
    std::future<move_only> future = std::async(task, std::move(x));
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "main\n";
    move_only result = future.get();  // 等待异步任务执行完毕
}

如你所见,它支持只移动类型,我们将参数使用 std::move 传递,接收参数的时候直接调用 get 函数即可。


  1. std::launch::async 在不同线程上执行异步任务。

  2. std::launch::deferred 惰性求值,不创建线程,等待 future 对象调用 wait 或 get 成员函数的时候执行任务。

template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(launch _Psync, _Fty&& _Fnarg) {
     // construct associated asynchronous state object for the launch type
     switch (_Psync) { // select launch type
     case launch::deferred:
           return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
     case launch::async: // TRANSITION, fixed in vMajorNext, should create a new thread here
     default:
           return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
     }
}

且 _Task_async_state 会通过 ::Concurrency::create_task 从线程池中获取线程并执行任务返回包装对象。

简而言之,使用 std::async,只要不是 launch::deferred 策略,那么 MSVC STL 实现中都是必然在线程中执行任务。因为是线程池,所以执行新任务是否创建新线程,任务执行完毕线程是否立即销毁,不确定。

我们来展示一下:

void f(){
    std::cout << std::this_thread::get_id() << '\n';
}

int main(){
    std::cout << std::this_thread::get_id() << '\n';
    auto f1 = std::async(std::launch::deferred, f);
    f1.wait(); // 在 wait() 或 get() 调用时执行,不创建线程
    auto f2 = std::async(std::launch::async,f); // 创建线程执行异步任务
    auto f3 = std::async(std::launch::deferred | std::launch::async, f); // 实现选择的执行方式
}

其实到此基本就差不多了,我们再介绍两个常见问题即可:

  1. std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
    std::async(std::launch::async, []{ g(); }); // f() 完成前不开始

    如你所见,这并不能创建异步任务,它会阻塞,然后逐个执行。

  2. 被移动的 std::future 没有所有权,失去共享状态,不能调用 get、wait 成员函数。

    auto t = std::async([] {});
    std::future<void> future{ std::move(t) };
    t.wait();   // Error! 抛出异常

    如同没有线程资源所有权的 std::thread 对象调用 join() 一样错误,这是移动语义的基本语义逻辑。

future 与 std::packaged_task

通常它会和 std::future 一起使用,不过也可以单独使用,我们一步一步来:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
task(10, 2); // 执行传递的 lambda,但无法获取返回值

如果想要异步的获取返回值,我们需要在调用 operator() 之前,让它和 future 关联,然后使用 future.get(),也就是:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
std::future<double>future = task.get_future();
task(10, 2); // 此处执行任务
std::cout << future.get() << '\n'; // 不阻塞,此处获取返回值

先关联任务,再执行任务,当我们想要获取任务的返回值的时候,就 future.get() 即可。值得注意的是,任务并不会在线程中执行,想要在线程中执行异步任务,然后再获取返回值,我们可以这么做:

std::packaged_task<double(int, int)> task([](int a, int b){
    return std::pow(a, b);
});
std::future<double> future = task.get_future();
std::thread t{ std::move(task),10,2 }; // 任务在线程中执行
// todo.. 幻想还有许多耗时的代码
t.join();

std::cout << future.get() << '\n'; // 并不阻塞,获取任务返回值罢了

因为 task 本身是重载了 operator() 的,是可调用对象,自然可以传递给 std::thread 执行,以及传递调用参数。唯一需要注意的是我们使用了 std::move ,这是因为 std::packaged_task 只能移动,不能复制。


简而言之,其实 std::packaged_task 也就是一个“包装”类而已,它本身并没什么特殊的,老老实实执行我们传递的任务,且方便我们获取返回值罢了,明确这一点,那么一切都不成问题。

std::packaged_task 也可以在线程中传递,在需要的时候获取返回值,而非像上面那样将它自己作为可调用对象:

template<typename R, typename...Ts, typename...Args>
    requires std::invocable<std::packaged_task<R(Ts...)>&, Args...> 
void async_task(std::packaged_task<R(Ts...)>& task, Args&&...args) {
    // todo..
    task(std::forward<Args>(args)...);
}

int main() {
    std::packaged_task<int(int,int)> task([](int a,int b){
        return a + b;
    });
    
    int value = 50;
    std::future<int> future = task.get_future();
    // 创建一个线程来执行异步任务
    std::thread t{ [&] {async_task(task, value, value); } };
    std::cout << future.get() << '\n';
    t.join();
}

我们套了一个 lambda,这是因为函数模板不是函数,它并非具体类型,没办法直接被那样传递使用,只能包一层了。这只是一个简单的示例,展示可以使用 std::packaged_task 作函数形参,然后我们来传递任务进行异步调用等操作。

我们再将第二章实现的并行 sum 改成 std::package_task + std::future 的形式:

template<typename ForwardIt>
auto sum(ForwardIt first, ForwardIt last) {
    using value_type = std::iter_value_t<ForwardIt>;
    std::size_t num_threads = std::thread::hardware_concurrency();
    std::ptrdiff_t distance = std::distance(first, last);

    if (distance > 1024000) {
        // 计算每个线程处理的元素数量
        std::size_t chunk_size = distance / num_threads;
        std::size_t remainder = distance % num_threads;

        // 存储每个线程要执行的任务
        std::vector<std::packaged_task<value_type()>> tasks;
        // 和每一个任务进行关联的 future 用于获取返回值
        std::vector<std::future<value_type>> futures(num_threads);

        // 存储关联线程的线程对象
        std::vector<std::thread> threads;

        // 制作任务、与 future 关联、启动线程执行
        auto start = first;
        for (std::size_t i = 0; i < num_threads; ++i) {
            auto end = std::next(start, chunk_size + (i < remainder ? 1 : 0));
            tasks.emplace_back(std::packaged_task<value_type()>{[start, end, i] {
                return std::accumulate(start, end, value_type{});
            }});
            start = end; // 开始迭代器不断向前
            futures[i] = tasks[i].get_future(); // 任务与 std::future 关联
            threads.emplace_back(std::move(tasks[i]));
        }

        // 等待所有线程执行完毕
        for (auto& thread : threads)
            thread.join();

        // 汇总线程的计算结果
        value_type total_sum {};
        for (std::size_t i = 0; i < num_threads; ++i) {
            total_sum += futures[i].get();
        }
        return total_sum;
    }

    value_type total_sum = std::accumulate(first, last, value_type{});
    return total_sum;
}

相比于之前,其实不同无非是定义了 std::vector<std::packaged_task<value_type()>> tasks 与 std::vector<std::future<value_type>> futures ,然后在循环中制造任务插入容器,关联 future,再放到线程中执行。最后汇总的时候写一个循环,futures[i].get() 获取任务的返回值加起来即可。

到此,也就可以了。

使用 std::promise

// 计算函数,接受一个整数并返回它的平方
void calculate_square(std::promise<int> promiseObj, int num) {
    // 模拟一些计算
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 计算平方并设置值到 promise 中
    promiseObj.set_value(num * num);
}

// 创建一个 promise 对象,用于存储计算结果
std::promise<int> promise;

// 从 promise 获取 future 对象进行关联
std::future<int> future = promise.get_future();

// 启动一个线程进行计算
int num = 5;
std::thread t(calculate_square, std::move(promise), num);

// 阻塞,直到结果可用
int result = future.get();
std::cout << num << " 的平方是:" << result << std::endl;

t.join();

void throw_function(std::promise<int> prom) {
    try {
        throw std::runtime_error("一个异常");
    }
    catch (...) {
        prom.set_exception(std::current_exception());
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(throw_function, std::move(prom));

    try {
        std::cout << "等待线程执行,抛出异常并设置\n";
        fut.get();
    }
    catch (std::exception& e) {
        std::cerr << "来自线程的异常: " << e.what() << '\n';
    }
    t.join();
}
等待线程执行,抛出异常并设置
来自线程的异常: 一个异常

你可能对这段代码还有一些疑问:我们写的是 promise<int> ,但是却没有使用 set_value 设置值,你可能会想着再写一行 prom.set_value(0)?

简而言之,set_value 与 set_exception 二选一,如果先前调用了 set_value ,就不可再次调用 set_exception,反之亦然(不然就会抛出异常),示例如下:

void throw_function(std::promise<int> prom) {
    prom.set_value(100);
    try {
        throw std::runtime_error("一个异常");
    }
    catch (...) {
        try{
            // 共享状态的 promise 已存储值,调用 set_exception 产生异常
            prom.set_exception(std::current_exception());
        }catch (std::exception& e){
            std::cerr << "来自 set_exception 的异常: " << e.what() << '\n';
        }
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(throw_function, std::move(prom));
    
    std::cout << "等待线程执行,抛出异常并设置\n";
    std::cout << "值:" << fut.get() << '\n'; // 100

    t.join();
}
等待线程执行,抛出异常并设置
值:100
来自 set_exception 的异常: promise already satisfied

future 的状态变化

需要注意的是,future 是一次性的,所以你需要注意移动。并且,调用 get 函数后,future 对象也会失去共享状态。

  • 移动语义:这一点很好理解并且常见,因为移动操作标志着所有权的转移,意味着 future 不再拥有共享状态(如之前所提到)。get 和 wait 函数要求 future 对象拥有共享状态,否则会抛出异常。

  • 共享状态失效:调用 get 成员函数时,future 对象必须拥有共享状态,但调用完成后,它就会失去共享状态,不能再次调用 get。这是我们在本节需要特别讨论的内容。

std::future<void>future = std::async([] {});
std::cout << std::boolalpha << future.valid() << '\n'; // true
future.get();
std::cout << std::boolalpha << future.valid() << '\n'; // false
try {
    future.get(); // 抛出 future_errc::no_state 异常
}
catch (std::exception& e) {
    std::cerr << e.what() << '\n';
}
// std::future<void>
void get() {
    // block until ready then return or throw the stored exception
    future _Local{_STD move(*this)};
    _Local._Get_value();
}
// std::future<T>
_Ty get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return _STD move(_Local._Get_value());
}
// std::future<T&>
_Ty& get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return *_Local._Get_value();
}

如上所示,我们展示了 std::future 的所有特化中 get 成员函数的实现。注意到了吗?尽管我们可能不了解移动构造函数的具体实现,但根据通用的语义,可以看出 future _Local{_STD move(*this)}; 将当前对象的共享状态转移给了这个局部对象,而局部对象在函数结束时析构。这意味着当前对象失去共享状态,并且状态被完全销毁。

另外一提,std::future<T> 这个特化,它 return std::move 是为了支持只能移动的类型能够使用 get 返回值,参见前文的 move_only 类型。

如果需要进行多次 get 调用,可以考虑使用下文提到的 std::shared_future。

多个线程的等待 std::shared_future

之前的例子中我们一直使用 std::future,但 std::future 有一个局限:future 是一次性的,它的结果只能被一个线程获取。get() 成员函数只能调用一次,当结果被某个线程获取后,std::future 就无法再用于其他线程。

int task(){
    // todo..
    return 10;
}

void thread_functio(std::future<int>& fut){
    // todo..
    int result = fut.get();
    std::cout << result << '\n';
    // todo..
}

int main(){
    auto future = std::async(task); // 启动耗时的异步任务

    // 可能有多个线程都需要此任务的返回值,于是我们将与其关联的 future 对象的引入传入
    std::thread t{ thread_functio,std::ref(future) };
    std::thread t2{ thread_functio,std::ref(future) };
    t.join();
    t2.join();
}

可能有多个线程都需要耗时的异步任务的返回值,于是我们将与其关联的 future 对象的引入传给线程对象,让它能在需要的时候获取。

但是这存在个问题,future 是一次性的,只能被调用一次 get() 成员函数,所以以上代码存在问题。

此时就需要使用 std::shared_future 来替代 std::future 了。std::future 与 std::shared_future 的区别就如同 std::unique_ptr、std::shared_ptr 一样。

std::future 是只能移动的,其所有权可以在不同的对象中互相传递,但只有一个对象可以获得特定的同步结果。而 std::shared_future 是可复制的,多个对象可以指代同一个共享状态。

在多个线程中对同一个 std::shared_future 对象进行操作时(如果没有进行同步保护)存在条件竞争。而从多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future 对象副本进行访问,则是安全的。

std::string fetch_data() {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
    return "从网络获取的数据!";
}

int main() {
    std::future<std::string> future_data = std::async(std::launch::async, fetch_data);

    // // 转移共享状态,原来的 future 被清空  valid() == false
    std::shared_future<std::string> shared_future_data = future_data.share();

    // 第一个线程等待结果并访问数据
    std::thread thread1([&shared_future_data] {
        std::cout << "线程1:等待数据中..." << std::endl;
        shared_future_data.wait();
        std::cout << "线程1:收到数据:" << shared_future_data.get() << std::endl;
    });

    // 第二个线程等待结果并访问数据
    std::thread thread2([&shared_future_data] {
        std::cout << "线程2:等待数据中..." << std::endl;
        shared_future_data.wait();
        std::cout << "线程2:收到数据:" << shared_future_data.get() << std::endl;
    });

    thread1.join();
    thread2.join();
}

这段代码存在数据竞争,就如同我们先前所说:“在多个线程中对同一个 std::shared_future 对象进行操作时(如果没有进行同步保护)存在条件竞争”,它并没有提供线程安全的方式。而我们的 lambda 是按引用传递,也就是“同一个”进行操作了。可以改为:

std::string fetch_data() {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
    return "从网络获取的数据!";
}

int main() {
    std::future<std::string> future_data = std::async(std::launch::async, fetch_data);

    std::shared_future<std::string> shared_future_data = future_data.share();

    std::thread thread1([shared_future_data] {
        std::cout << "线程1:等待数据中..." << std::endl;
        shared_future_data.wait();
        std::cout << "线程1:收到数据:" << shared_future_data.get() << std::endl;
    });

    std::thread thread2([shared_future_data] {
        std::cout << "线程2:等待数据中..." << std::endl;
        shared_future_data.wait();
        std::cout << "线程2:收到数据:" << shared_future_data.get() << std::endl;
    });

    thread1.join();
    thread2.join();
}

这样访问的就都是 std::shared_future 的副本了,我们的 lambda 按复制捕获 std::shared_future 对象,每个线程都有一个 shared_future 的副本,这样不会有任何问题。这一点和 std::shared_ptr 类似。

std::promise<std::string> p;
std::shared_future<std::string> sf{ p.get_future() }; // 隐式转移所有权

就不需要再强调了。

限时等待

阻塞调用会将线程挂起一段(不确定的)时间,直到对应的事件发生。通常情况下,这样的方式很好,但是在一些情况下,需要限定线程等待的时间,因为无限期地等待事件发生可能会导致性能下降或资源浪费。一个常见的例子是在很多网络库中的 connect 函数,这个函数调用是阻塞的,但是也是限时的,一定时间内没有连接到服务器就不会继续阻塞了,会进行其它处理,比如抛出异常。

时钟

在 C++ 标准库中,时钟被视为时间信息的来源。C++ 定义了很多种时间类型,每种时钟类型都提供了四种不同的信息:

  • 当前时间

  • 时间类型

  • 时钟节拍

  • 稳定时钟

时钟节拍被指定为 1/x(x 在不同硬件上有不同的值)秒,这是由时间周期所决定。假设一个时钟一秒有 25 个节拍,因此一个周期为 std::ratio<1,25> 。当一个时钟的时钟节拍每 2.5 秒一次,周期就可以表示为 std::ratio<5,2>。

template<class Rep, class Period = std::ratio<1>>
class duration;

如你所见,它默认的时钟节拍是 1,这是一个很重要的类,标准库通过它定义了很多的时间类型,比如 std::chrono::minutes 是分钟类型,那么它的 Period 就是 std::ratio<60> ,因为一分钟等于 60 秒。

using minutes      = duration<int, ratio<60>>;
auto now = std::chrono::system_clock::now();
time_t now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Current time:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S\n");

auto now2 = std::chrono::steady_clock::now();
now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Current time:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S\n");

C++ 的时间库极其繁杂,主要在于类型之多,以及实现之复杂。根据我们的描述,了解基本构成、概念、使用,即可。

时间段

时间部分最简单的就是时间段,主要的内容就是我们上面讲的类模板 std::chrono::duration ,它用于对时间段进行处理。

它的第一个参数是类型表示,第二个参数就是先前提到的“节拍”,需要传递一个 std::ratio 类型,也就是一个时钟所用的秒数。

using nanoseconds  = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds      = duration<long long>;
using minutes      = duration<int, ratio<60>>;
using hours        = duration<int, ratio<3600>>;
// CXX20
using days   = duration<int, ratio_multiply<ratio<24>, hours::period>>;
using weeks  = duration<int, ratio_multiply<ratio<7>, days::period>>;
using years  = duration<int, ratio_multiply<ratio<146097, 400>, days::period>>;
using months = duration<int, ratio_divide<years::period, ratio<12>>>;

如果没有指明 duration 的第二个非类型模板参数,那么代表默认 std::ratio<1>,比如 seconds 也就是一秒。

using milli = ratio<1, 1000>; // 千分之一秒,也就是一毫秒了

并且为了方便使用,在 C++14 标准库增加了时间字面量,存在于 std::chrono_literals 命名空间中,让我们得以简单的使用:

using namespace std::chrono_literals;

auto one_nanosecond = 1ns;
auto one_microsecond = 1us;
auto one_millisecond = 1ms;
auto one_second = 1s;
auto one_minute = 1min;
auto one_hour = 1h;
std::chrono::milliseconds ms{ 3999 };
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);
std::cout << s.count() << '\n';

这里的结果是截断的,而不会进行所谓的四舍五入,3999 毫秒,也就是 3.999 秒最终的值是 3。

很多时候这并不是我们想要的,比如我们想要的其实是输出 3.999 秒,而不是 3 秒 或者 3999 毫秒。

seconds 是 duration<long long> 这意味着它无法接受浮点数,我们直接改成 duration<double> 即可:

std::chrono::duration<double> s = std::chrono::duration_cast<std::chrono::duration<double>>(ms);

当然了,这样写很冗余,并且这种形式的转换是可以直接隐式的,也就是其实我们可以直接:

std::chrono::duration<double> s = ms;

无需使用 duration_cast,可以直接隐式转换。

_EXPORT_STD template <class _Rep, class _Period = ratio<1>>
class duration;

基于时间段的等待都是由 std::chrono::duration<> 来完成。例如:等待一个 future 对象在 35 毫秒内变为就绪状态:

std::future<int> future = std::async([] {return 6; });
if (future.wait_for(35ms) == std::future_status::ready)
    std::cout << future.get() << '\n';

deferred

共享状态持有的函数正在延迟运行,结果将仅在明确请求时计算

ready

共享状态就绪

timeout

共享状态在经过指定的等待时间内仍未就绪

timeout 超时,也很好理解,那我们就提一下 deferred :

auto future = std::async(std::launch::deferred, []{});
if (future.wait_for(35ms) == std::future_status::deferred)
    std::cout << "future_status::deferred " << "正在延迟执行\n";
future.wait(); // 在 wait() 或 get() 调用时执行,不创建线程

时间点

template<
    class Clock,
    class Duration = typename Clock::duration
> class time_point;

如你所见,它的第二个模板参数是时间段,就是时间的间隔,其实也就可以理解为表示时间点的精度,默认是根据第一个参数时钟得到的,所以假设有类型:

std::chrono::time_point<std::chrono::system_clock>

那它等价于:

std::chrono::time_point<std::chrono::system_clock, std::chrono::system_clock::duration>

也就是说第二个参数的实际类型是:

std::chrono::duration<long long,std::ratio<1, 10000000>> //  // 100 nanoseconds

也就是说 std::chrono::time_point<std::chrono::system_clock> 的精度是 100 纳秒。

注意,这里的精度并非是实际的时间精度。时间和硬件系统等关系极大,以 windows 为例:

Windows 内核中的时间间隔计时器默认每隔 15.6 毫秒触发一次中断。因此,如果你使用基于系统时钟的计时方法,默认情况下精度约为 15.6 毫秒。不可能达到纳秒级别。

由于这个系统时钟的限制,那些基于系统时钟的 API(例如 Sleep()、WaitForSingleObject() 等)的最小睡眠时间默认就是 15.6 毫秒左右。

如:

std::this_thread::sleep_for(std::chrono::milliseconds(1));

不过我们也可以使用系统 API 调整系统时钟的精度,需要链接 windows 多媒体库 winmm.lib ,然后使用 API:

timeBeginPeriod(1); // 设置时钟精度为 1 毫秒
// todo..
timeEndPeriod(1);   // 恢复默认精度

同样的,时间点也支持加减以及比较操作。

std::chrono::steady_clock::now() + std::chrono::nanoseconds(500); // 500 纳秒之后的时间

可以减去一个时间点,结果是两个时间点的时间差。这对于代码块的计时是很有用的,如:

auto start = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::seconds(1));
auto end = std::chrono::steady_clock::now();

auto result = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << result.count() << '\n';

我们进行了一个显式的转换,最终输出的是以毫秒作为单位,有可能不会是 1000,没有这么精确。


等待条件变量满足条件——带超时功能

using namespace std::chrono_literals;

std::condition_variable cv;
bool done{};
std::mutex m;

bool wait_loop() {
    const auto timeout = std::chrono::steady_clock::now() + 500ms;
    std::unique_lock<std::mutex> lk{ m };
    while (!done) {
        if (cv.wait_until(lk, timeout) == std::cv_status::timeout) {
            std::cout << "超时 500ms\n";
            return false;
        }
    }
    return true;
}

_until 也就是等待到一个时间点,我们设置的是等待到当前时间往后 500 毫秒。如果超过了这个时间还没有被唤醒,那就打印超时,并退出循环,函数返回 false。

到此,时间点的知识也就足够了。

异步任务执行

在开发带有 UI 的程序时,主线程用于处理 UI 更新和用户交互,如果在主线程中执行耗时任务会导致界面卡顿。因此,需要使用异步任务来减轻主线程的压力。以下是一个使用 Qt 实现异步任务的示例,展示了如何在不阻塞 UI 线程的情况下执行耗时任务,并更新进度条。

背景介绍

在 Qt 中,GUI 控件通常只能在创建它们的线程中进行操作,因为它们是线程不安全的。我们可以使用 QMetaObject::invokeMethod 来跨线程调用主线程上的控件方法,从而在其他线程中安全地更新 UI 控件。以下代码示例展示了如何通过 QMetaObject::invokeMethod 确保 UI 控件的更新操作在主线程中执行。

void task(){
    future = std::async(std::launch::async, [=] {
        QMetaObject::invokeMethod(this, [this] {
            button->setEnabled(false);
            progressBar->setRange(0, 1000);
            button->setText("正在执行...");
        });
        for (int i = 0; i < 1000; ++i) {
            std::this_thread::sleep_for(10ms);
            QMetaObject::invokeMethod(this, [this, i] {
                progressBar->setValue(i);
            });
        }
        QMetaObject::invokeMethod(this, [this] {
            button->setText("start");
            button->setEnabled(true);
        });
    });
}

上面的代码创建了一个异步任务,并指明了执行策略。任务在线程中执行,不会阻塞 UI 线程。如果不这样做,界面将会卡顿(可以尝试将函数的第一行与最后一行注释掉以验证这一点)。

在启动进度条后,能够正常点击“测试”按钮并触发弹窗,说明 UI 没有被阻塞。相反,如果不使用线程,界面将会卡住,无法点击“测试”按钮或移动窗口。

项目说明

项目使用 Visual Studio + CMake,可以直接安装 Qt 插件后打开此项目。项目结构简单,所有界面与设置均通过代码控制,无需进行其他 UI 操作。只需关注 async_progress_bar.h、async_progress_bar.cpp 和 main.cpp 这三个文件,它们位于仓库的 code 文件夹中。

完整代码实现

class async_progress_bar : public QMainWindow{
    Q_OBJECT

public:
    async_progress_bar(QWidget *parent = nullptr);
    ~async_progress_bar();

    void task(){
        future = std::async(std::launch::async, [=] {
            QMetaObject::invokeMethod(this, [this] {
                // 这里显示的线程 ID 就是主线程,代表这些任务就是在主线程,即 UI 线程执行
                QMessageBox::information(nullptr, "线程ID", std::to_string(_Thrd_id()).c_str());
                button->setEnabled(false);
                progress_bar->setRange(0, 1000);
                button->setText("正在执行...");
            });
            for (int i = 0; i <= 1000; ++i) {
                std::this_thread::sleep_for(10ms);
                QMetaObject::invokeMethod(this, [this, i] {
                    progress_bar->setValue(i);
                });
            }
            QMetaObject::invokeMethod(this, [this] {
                button->setText("start");
                button->setEnabled(true);
            });
            // 不在 invokeMethod 中获取线程 ID,这里显示的是子线程的ID
            auto s = std::to_string(_Thrd_id());
            QMetaObject::invokeMethod(this, [=] {
                QMessageBox::information(nullptr, "线程ID", s.c_str());
            });
        });
    }
private:
    QString progress_bar_style =
        "QProgressBar {"
        "    border: 2px solid grey;"
        "    border-radius: 5px;"
        "    background-color: lightgrey;"
        "    text-align: center;"  // 文本居中
        "    color: #000000;"      // 文本颜色
        "}"
        "QProgressBar::chunk {"
        "    background-color: #7FFF00;"
        "    width: 10px;"         // 设置每个进度块的宽度
        "    font: bold 14px;"     // 设置进度条文本字体
        "}";
    QString button_style =
        "QPushButton {"
        "    text-align: center;"  // 文本居中
        "}";
    QProgressBar* progress_bar{};
    QPushButton* button{};
    QPushButton* button2{};
    Ui::async_progress_barClass ui{};
    std::future<void>future;
};
// 创建控件 设置布局、样式 连接信号
async_progress_bar::async_progress_bar(QWidget *parent)
    : QMainWindow{ parent }, progress_bar{ new QProgressBar(this) },
    button{ new QPushButton("start",this) },button2{ new QPushButton("测试",this) } {
    ui.setupUi(this);

    progress_bar->setStyleSheet(progress_bar_style);
    progress_bar->setRange(0, 1000);

    button->setMinimumSize(100, 50);
    button->setMaximumWidth(100);
    button->setStyleSheet(button_style);
    button->setSizePolicy(QSizePolicy::Minimum, QSizePolicy::Fixed);

    button2->setMinimumSize(100, 50);
    button2->setMaximumWidth(100);
    button2->setStyleSheet(button_style);
    button2->setSizePolicy(QSizePolicy::Minimum, QSizePolicy::Fixed);

    QVBoxLayout* layout = new QVBoxLayout;
    layout->addWidget(progress_bar);
    layout->addWidget(button, 0, Qt::AlignHCenter);
    layout->addWidget(button2, 0, Qt::AlignHCenter);
    // 设置窗口布局为垂直布局管理器
    centralWidget()->setLayout(layout);

    connect(button, &QPushButton::clicked, this, &async_progress_bar::task);
    connect(button2, &QPushButton::clicked, []{
        QMessageBox::information(nullptr, "测试", "没有卡界面!");
    });
}

注意事项

  • QMetaObject::invokeMethod 的 lambda 是在主线程运行的,通过显示的线程 ID 可以验证这一点。

  • 使用 std::async 的 std::launch::async 参数强制异步执行任务,以确保任务在新线程中运行。

跨平台兼容性

实践建议

这个例子其实很好的展示了多线程异步的作用,因为有 UI,所以很直观,毕竟如果你不用线程,那么不就卡界面了,用了就没事。

建议下载并运行此项目,通过实际操作理解代码效果。同时,可以尝试修改代码,观察不同情况下 UI 的响应情况,以加深对异步任务处理的理解。

C++20 信号量

信号量是一个非常轻量简单的同步设施,它维护一个计数,这个计数不能小于 0。信号量提供两种基本操作:释放(增加计数)和等待(减少计数)。如果当前信号量的计数值为 0,那么执行“等待”操作的线程将会一直阻塞,直到计数大于 0,也就是其它线程执行了“释放”操作。

binary_semaphore 只是 counting_semaphore 的一个特化别名:

using binary_semaphore = counting_semaphore<1>;

好了,我们举一个简单的例子来使用一下:

// 全局二元信号量对象
// 设置对象初始计数为 0
std::binary_semaphore smph_signal_main_to_thread{ 0 };
std::binary_semaphore smph_signal_thread_to_main{ 0 };

void thread_proc() {
    smph_signal_main_to_thread.acquire();
    std::cout << "[线程] 获得信号" << std::endl;

    std::this_thread::sleep_for(3s);

    std::cout << "[线程] 发送信号\n";
    smph_signal_thread_to_main.release();
}

int main() {
    std::jthread thr_worker{ thread_proc };

    std::cout << "[主] 发送信号\n";
    smph_signal_main_to_thread.release();

    smph_signal_thread_to_main.acquire();
    std::cout << "[主] 获得信号\n";
}
[主] 发送信号
[线程] 获得信号
[线程] 发送信号
[主] 获得信号

信号量常用于发信/提醒而非互斥,通过初始化该信号量为 0 从而阻塞尝试 acquire() 的接收者,直至提醒者通过调用 release(n) “发信”。在此方面可把信号量当作条件变量的替代品,通常它有更好的性能。

假设我们有一个 Web 服务器,它只能处理有限数量的并发请求。为了防止服务器过载,我们可以使用信号量来限制并发请求的数量。

// 定义一个信号量,最大并发数为 3
std::counting_semaphore<3> semaphore{ 3 };

void handle_request(int request_id) {
    // 请求到达,尝试获取信号量
    std::cout << "进入 handle_request 尝试获取信号量\n";

    semaphore.acquire();

    std::cout << "成功获取信号量\n";

    // 此处延时三秒可以方便测试,会看到先输出 3 个“成功获取信号量”,因为只有三个线程能成功调用 acquire,剩余的会被阻塞
    std::this_thread::sleep_for(3s);

    // 模拟处理时间
    std::random_device rd;
    std::mt19937 gen{ rd() };
    std::uniform_int_distribution<> dis(1, 5);
    int processing_time = dis(gen);
    std::this_thread::sleep_for(std::chrono::seconds(processing_time));

    std::cout << std::format("请求 {} 已被处理\n", request_id);

    semaphore.release(); 
}

int main() {
    // 模拟 10 个并发请求
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(handle_request, i);
    }
}

这段代码很简单,以至于我们可以在这里来再说一条概念:

  • binary_semaphore 是 std::counting_semaphore 的特化的别名,其 LeastMaxValue 为 1。

LeastMaxValue 是我们设置的非类型模板参数,意思是信号量维护的计数最大值。我们这段代码设置的是 3,也就是允许 3 个同时访问者。

牢记信号量的基本的概念不变,计数的值不能小于 0,如果当前信号量的计数值为 0,那么执行“等待”(acquire)操作的线程将会一直阻塞。明白这点,那么就都不存在问题。

通过这种方式,可以有效控制 Web 服务器处理并发请求的数量,防止服务器过载。

C++20 闩与屏障

闩 (latch) 与屏障 (barrier) 是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达。闩不能重复使用,而屏障则可以。

  • std::latch:单次使用的线程屏障

  • std::barrier:可复用的线程屏障

它们定义在标头 <latch> 与 <barrier>。

std::latch

“闩” ,中文语境一般说“门闩” 是指门背后用来关门的棍子。不过不用在意,在 C++ 中的意思就是先前说的:单次使用的线程屏障。

std::latch work_start{ 3 };

void work(){
    std::cout << "等待其它线程执行\n";
    work_start.wait(); // 等待计数为 0
    std::cout << "任务开始执行\n";
}

int main(){
    std::jthread thread{ work };
    std::this_thread::sleep_for(3s);
    std::cout << "休眠结束\n";
    work_start.count_down();  // 默认值是 1 减少计数 1
    work_start.count_down(2); // 传递参数 2 减少计数 2
}
等待其它线程执行
休眠结束
任务开始执行

在这个例子中,通过调用 wait 函数阻塞子线程,直到主线程调用 count_down 函数原子地将计数减至 0,从而解除阻塞。这个例子清楚地展示了 latch 的使用,其逻辑比信号量更简单。


由于 latch 的计数不可增加,它的使用通常非常简单,可以用来划分任务执行的工作区间。例如:

std::latch latch{ 10 };

void f(int id) {
    //todo.. 脑补任务
    std::this_thread::sleep_for(1s);
    std::cout << std::format("线程 {} 执行完任务,开始等待其它线程执行到此处\n", id);
    latch.arrive_and_wait();
    std::cout << std::format("线程 {} 彻底退出函数\n", id);
}

int main() {
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(f,i);
    }
}

必须等待所有线程执行到 latch.arrive_and_wait(); 将 latch 的计数减少至 0 才能继续往下执行。这个示例非常直观地展示了如何使用 latch 来划分任务执行的工作区间。

由于 latch 的功能受限,通常用于简单直接的需求,不少情况很多同步设施都能完成你的需求,在这个时候请考虑使用尽可能功能最少的那一个。

  • 使用功能尽可能少的设施有助于开发者阅读代码理解含义。如果使用的是一个功能丰富的设施,可能就无法直接猜测其意图。

std::barrier

上节我们学习了 std::latch ,本节内容也不会对你构成难度。

template< class CompletionFunction = /* 未指定 */ >
class barrier;

CompletionFunction - 函数对象类型。


std::barrier barrier{ 10,
    [n = 1]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n"; }
};

void f(int start, int end){
    for (int i = start; i <= end; ++i) {
        std::osyncstream{ std::cout } << i << ' '; 
        barrier.arrive_and_wait(); // 减少计数并等待 解除阻塞时就重置计数并调用函数对象
        
        std::this_thread::sleep_for(300ms);
    }
}

int main(){
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(f, i * 10 + 1, (i + 1) * 10);
    }
}
1 21 11 31 41 51 61 71 81 91    第1轮结束
12 2 22 32 42 52 62 72 92 82    第2轮结束
13 63 73 33 23 53 83 93 43 3    第3轮结束
14 44 24 34 94 74 64 4 84 54    第4轮结束
5 95 15 45 75 25 55 65 35 85    第5轮结束
6 46 16 26 56 96 86 66 76 36    第6轮结束
47 17 57 97 87 67 77 7 27 37    第7轮结束
38 8 28 78 68 88 98 58 18 48    第8轮结束
9 39 29 69 89 99 59 19 79 49    第9轮结束
30 40 70 10 90 50 60 20 80 100  第10轮结束

注意输出的规律,第一轮每个数字最后一位都是 1,第二轮每个数字最后一位都是 2……以此类推,因为我们分配给每个线程的输出任务就是如此,然后利用了屏障一轮一轮地打印。

arrive_and_wait 等价于 wait(arrive());。原子地将期待计数减少 1,然后在当前阶段的同步点阻塞直至运行当前阶段的阶段完成步骤。

arrive_and_wait() 会在期待计数减少至 0 时调用我们构造 barrier 对象时传入的 lambda 表达式,并解除所有在阶段同步点上阻塞的线程。之后重置期待计数为构造中指定的值。屏障的一个阶段就完成了。

  • 并发调用barrier 除了析构函数外的成员函数不会引起数据竞争。

虽然 std::cout 的 operator<< 调用是线程安全的,不会被打断,但多个 operator<< 的调用在多线程环境中可能会交错,导致输出结果混乱,使用 std::osyncstream 就可以解决这个问题。开发者可以尝试去除 std::osyncstream 直接使用 std::cout,效果会非常明显。


使用 arrive 或 arrive_and_wait 减少的都是当前屏障计数,我们称作“期待计数”。不管如何减少计数,当完成一个阶段,就重置期待计数为构造中指定的值了。

不用感到难以理解,我们来解释一下这个概念:

std::barrier barrier{ 4 }; // 初始化计数为 4 完成阶段重置计数也是 4
barrier.arrive_and_wait(); // 当前计数减 1,不影响之后重置计数 4
barrier.arrive_and_drop(); // 当前计数与重置之后的计数均减 1 完成阶段会重置计数为 3

arrive_and_drop 可以用来控制在需要的时候,让一些线程退出同步,如:

std::atomic_int active_threads{ 4 };
std::barrier barrier{ 4,
    [n = 1]() mutable noexcept {
        std::cout << "\t第" << n++ << "轮结束,活跃线程数: " << active_threads << '\n';
    }
};

void f(int thread_id) {
    for (int i = 1; i <= 5; ++i) {
        std::osyncstream{ std::cout } << "线程 " << thread_id << " 输出: " << i << '\n';
        if (i == 3 && thread_id == 2) {  // 假设线程 ID 为 2 的线程在完成第三轮同步后退出
            std::osyncstream{ std::cout } << "线程 " << thread_id << " 完成并退出\n";
            --active_threads; // 减少活跃线程数
            barrier.arrive_and_drop(); // 减少当前计数 1,并减少重置计数 1
            return;
        }
        barrier.arrive_and_wait(); // 减少计数并等待,解除阻塞时重置计数并调用函数对象
    }
}

int main() {
    std::vector<std::jthread> threads;
    for (int i = 1; i <= 4; ++i) {
        threads.emplace_back(f, i);
    }
}

初始线程有 4 个,线程 2 在执行了三轮同步便直接退出了,调用 arrive_and_drop 函数,下一个阶段的计数会重置为 3,也就是执行完第三轮同步后只有三个活跃线程继续执行。查看输出结果,非常的直观。

这样,arrive_and_drop 的作用就非常明显了,使用也十分的简单。


std::barrier barrier{ 1,[] {} };

按照标准规定,这行代码会产生一个编译错误。因为传入的函数对象它不是 noexcept 的。不过,在 gcc 与 clang(即 libstdc++ 和 libc++)均可以通过编译,这是因为它们没有进行相应的检测,存在缺陷,为了代码的可维护性开发者应遵守标准规定,确保传入的函数对象是 noexcept 的。

总结

在并发编程中,同步操作对于并发编程至关重要。如果没有同步,线程基本上就是独立的,因其任务之间的相关性,才可作为一个整体执行(比如第二章的并行求和)。本章讨论了多种用于同步操作的工具,包括条件变量、future、promise、package_task、信号量。同时,详细介绍了 C++ 时间库的知识,以使用并发支持库中的“限时等待”。还使用 CMake + Qt 构建了一个带有 UI 界面的示例,展示异步多线程的必要性。最后介绍了 C++20 引入的两种新的并发设施,信号量、闩与屏障。

在讨论了 C++ 中的高级工具之后,现在让我们来看看底层工具:C++ 内存模型与原子操作。

C++ 标准库对条件变量有两套实现: 和 ,这两个实现都包含在 头文件中。

condition_variable_any 类是 std::condition_variable 的泛化。相对于只在 std::unique_lock<std::mutex> 上工作的 std::condition_variable,condition_variable_any 能在任何满足要求的锁上工作,所以增加了 _any 后缀。显而易见,这种区分必然是 any 版更加通用但是却有更多的性能开销。所以通常首选 std::condition_variable。有特殊需求,才会考虑 std::condition_variable_any。

测试。更换为 std::condition_variable_any 效果。

条件变量的 成员函数有两个版本,以上代码使用的就是第二个版本,传入了一个。

第二个版本只是对第一个版本的包装,等待并判断谓词,会调用第一个版本的重载。这可以避免“”。

我们也可以简单看一下 MSVC STL 的:

可能的结果是:

我们引入 三方库进行声音播放,然后再自己进行上层封装。

其实这段代码还存在着一个初始化顺序导致的问题,见

需要注意的是 SFML不支持 .mp3 格式的音频文件,大家可以使用 ffmpeg 或者其它软件将音频转换为支持的格式。

如果是测试使用,不知道去哪生成这些语音播报,我们推荐 。

我们的代码也可以在 Linux 中运行,并且整体仅需 C++11 标准(除了 soundResources 数组)。 SFML 依赖于 和 这两个库。官网上的 windows 版本的 SFML 已包含这些依赖,但在 Linux 上需要用户自行下载并安装它们。如:

举个例子:我们在车站等车,你可能会做一些别的事情打发时间,比如学习、观看 的视频教程、玩手机等。不过,你始终在等待一件事情:车到站。

C++ 标准库将这种事件称为 。它用于处理线程中需要等待某个事件的情况,线程知道预期结果。等待的同时也可以执行其它的任务。

C++ 标准库有两种 future,都声明在 头文件中:独占的 、共享的 。它们的区别与 std::unique_ptr 和 std::shared_ptr 类似。std::future 只能与单个指定事件关联,而 std::shared_future 能关联多个事件。它们都是模板,它们的模板类型参数,就是其关联的事件(函数)的返回类型。当多个线程需要访问一个独立 future 对象时, 必须使用互斥量或类似同步机制进行保护。而多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future 对象副本进行访问,则是安全的。

假设需要执行一个耗时任务并获取其返回值,但是并不急切的需要它。那么就可以启动新线程计算,然而 std::thread 没提供直接从线程获取返回值的机制。所以我们可以使用 函数模板。

使用 std::async 启动一个异步任务,它会返回一个 std::future 对象,这个对象和任务关联,将持有最终计算出来的结果。当需要任务执行完的结果的时候,只需要调用 成员函数,就会阻塞直到 future 为就绪为止(即任务执行完毕),返回执行结果。 成员函数检查 future 当前是否关联共享状态,即是否当前关联任务。还未关联,或者任务已经执行完(调用了 get()、set()),都会返回 false。

测试。

与 std::thread 一样,std::async 支持任意对象,以及传递调用参数。包括支持使用 std::ref ,以及支持只能移动的类型。我们下面详细聊一下 std::async 参数传递的事。

测试。

如你所见,它支持所有对象,并且也是默认按值复制,必须使用 std::ref 才能传递引用。并且它和 std::thread 一样,内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以如果不使用 std::ref,这里 void f(int&) 就会导致编译错误,如果是 void f(const int&) 则可以通过编译,不过引用的不是我们传递的局部对象。

测试。

接下来我们聊 std::async 的,我们前面一直没有使用,其实就是在传递可调用对象与参数之前传递枚举值罢了:

而我们先前一直没有写明这个参数,是因为 std::async 函数模板有两个重载,不给出执行策略就是以:std::launch::async | std::launch::deferred 调用另一个重载版本(这一点中在中很明显),此策略表示由实现选择到底是否创建线程执行异步任务。典型情况是,如果系统资源充足,并且异步任务的执行不会导致性能问题,那么系统可能会选择在新线程中执行任务。但是,如果系统资源有限,或者延迟执行可以提高性能或节省资源,那么系统可能会选择延迟执行。

如果你阅读 的代码,会发现的确如此。

然而值得注意的是,在 MSVC STL 的实现中,launch::async | launch::deferred 与 launch::async 执行策略毫无区别,如下:

测试。

如果从 std::async 获得的 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的**将阻塞,直到到异步任务完成**。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。

类模板 包装任何目标(函数、lambda 表达式、bind 表达式或其它函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 对象访问的共享状态中。

它有 的重载,它会执行我们传递的对象,不过这个重载的返回类型是 void 没办法获取返回值。

测试。

测试。

测试。

测试。

类模板 用于存储一个值或一个异常,之后通过 std::promise 对象所创建的 对象异步获得。

测试。

我们在新线程中通过调用 函数设置 promise 的值,并在主线程中通过与其关联的 future 对象的 get() 成员函数获取这个值,如果promise的值还没有被设置,那么将阻塞当前线程,直到被设置为止。同样的 std::promise 只能移动,不可复制,所以我们使用了 std::move 进行传递。

除了 set_value() 函数外,std::promise 还有一个 成员函数,它接受一个 类型的参数,这个参数通常通过 获取,用于指示当前线程中抛出的异常。然后,std::future 对象通过 get() 函数获取这个异常,如果 promise 所在的函数有异常被抛出,则 std::future 对象会重新抛出这个异常,从而允许主线程捕获并处理它。

:

共享状态的 promise 已经存储值或者异常,再次调用 set_value(set_exception) 会抛出 异常,将错误码设置为 。这是因为 std::promise 对象只能是存储值或者异常其中一种,而无法共存。

:

测试。

这个问题在许多文档中没有明确说明,但通过阅读源码(),可以很清楚地理解:

std::promise 也同,它的 get_future() 成员函数一样可以用来构造 std::shared_future,虽然它的返回类型是 std::future,不过不影响,这是因为 std::shared_future 有一个 std::future<T>&& 参数的,转移 std::future 的所有权。

介绍两种指定超时的方式,一种是“时间段”,另一种是“时间点”,其实就是先前讲的 与 的区别。前者是需要指定等待一段时间(比如 10 毫秒)。而后者是指定等待到一个具体的时间点(比如到 2024-05-07T12:01:10.123)。多数函数都对两种超时方式进行处理。处理持续时间的函数以 _for 作为后缀,处理绝对时间的函数以 _until 作为后缀。

条件变量 std::condition_variable 的等待函数,也有两个超时的版本 和 。它们和我们先前讲的 wait 成员函数一样有两个重载,可以选择是否传递一个。它们相比于 wait 多了一个解除阻塞的可能,即:超过指定的时长或抵达指定的时间点。

在讲述它的使用细节之前,我们还是要来先聊一下 C++ 中的(chrono),指定时间的方式,它较为麻烦。我们分:时钟(clock)、时间段(duration)、*时间点(time point)*三个阶段稍微介绍一下。

当前时间可以通过静态成员函数 now 获取,例如, 会返回系统的当前时间。特定的时间点则可以通过 来指定。system_clock::now() 的返回类型就是 time_point。

类模板 表示时间间隔。

是一个分数类模板,它有两个非类型模板参数,也就是分子与分母,分母有默认实参 1,所以 std::ratio<1> 等价于 std::ratio<1,1>。

稳定时钟(Steady Clock)是指提供稳定、持续递增的时间流逝信息的时钟。它的特点是不受系统时间调整或变化的影响,即使在系统休眠或时钟调整的情况下,它也能保持稳定。在 C++ 标准库中, 就是一个稳定时钟。它通常用于测量时间间隔和性能计时等需要高精度和稳定性的场景。可以通过 is_steady 静态常量判断当前时钟是否是稳定时钟。

稳定时钟的主要优点在于,它可以提供相对于起始时间的稳定的递增时间,因此适用于需要保持时间顺序和不受系统时间变化影响的应用场景。相比之下,像 这样的系统时钟可能会受到系统时间调整或变化的影响,因此在某些情况下可能不适合对时间间隔进行精确测量。

不管使用哪种时钟获取时间,C++ 都提供了函数,可以将时间点转换为 类型的值:

标准库在 std::chrono 命名空间内为时间段提供了一系列的类型,它们都是通过 std::chrono::duration 定义的:

如上,是 MSVC STL 定义的,看似有一些没有使用 ratio 作为第二个参数,其实也还是别名罢了,:

当不要求截断值的情况下(时转换为秒时没问题的,但反过来不行)时间段有隐式转换,显式转换可以由 来完成。

另外我们用的 duration 都是省略了 ratio 的,其实默认类型就是 ratio<1>,代表一秒。参见源码:

时间库支持四则运算,可以对两个时间段进行加减乘除。时间段对象可以通过 成员函数获得计次数。例如 std::chrono::milliseconds{123}.count() 的结果就是 123。

: 等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。它的返回类型是一个枚举类 ,三个枚举项分别表示三种 future 状态。

时间点可用 来表示,第一个模板参数用来指定使用的时钟,第二个模板参数用来表示时间单位(std::chrono::duration<>)。时间点顾名思义就是时间中的一个点,在 C++ 中用于表达当前时间,先前提到的静态成员函数 now() 获取当前时间,它们的返回类型都是 std::chrono::time_point。

更多的问题参见都很直观。

测试。

测试。

C++11 的 std::this_thread::get_id() 返回的内部类 没办法直接转换为 unsigned int,我们就直接使用了 win32 的 API _Thrd_id() 了。如果您是 Linux 之类的环境,使用 接口 。

C++20 引入了信号量,对于那些熟悉操作系统或其它并发支持库的开发者来说,这个同步设施的概念应该不会感到陌生。源自操作系统,是一个古老而广泛应用的同步设施,在各种编程语言中都有自己的抽象实现。然而,C++ 标准库对其的支持却来得很晚,在 C++20 中才得以引入。

C++ 提供了两个信号量类型:std::counting_semaphore 与 std::binary_semaphore,定义在 中。

:

函数就是我们先前说的“等待”(原子地减少计数), 函数就是"释放"(原子地增加计数)。

测试。

counting_semaphore 是一个轻量同步原语,能控制对共享资源的访问。不同于 ,counting_semaphore 允许同一资源进行多个并发的访问,至少允许 LeastMaxValue 个同时访问者。

虽然说是说有 LeastMaxValue 可能不是最大,但是我们通常不用在意这个事情,中 max 函数就是直接返回 LeastMaxValue,将它视为信号量维护的计数最大值即可。

与信号量类似,屏障也是一种古老而广泛应用的同步机制。许多系统 API 提供了对屏障机制的支持,例如 POSIX 和 Win32。此外, 也提供了屏障机制来支持多线程编程。

latch 类维护着一个 类型的计数,且只能减少计数,无法增加计数。在创建对象的时候初始化计数器的值。线程可以阻塞,直到 latch 对象的计数减少到零。由于无法增加计数,这使得 latch 成为一种单次使用的屏障。

:

测试。

函数等价于:count_down(n); wait();。也就是减少计数 + 等待。这意味着

和 std::latch 最大的不同是,前者可以在阶段完成之后将计数重置为构造时传递的值,而后者只能减少计数。我们用一个非常简单直观的示例为你展示:

可能的:

另外你可能注意到我们使用了 ,它是 C++20 引入的,此处是确保输出流在多线程环境中同步,避免除数据竞争,而且将不以任何方式穿插或截断。

标准库还提供一个函数 可以改变重置的计数值:它将所有后继阶段的初始期待计数减少一,当前阶段的期待计数也减少一。

测试。

最后请注意,我们的 lambda 表达式必须声明为 noexcept ,因为 std::barrier 要求其函数对象类型必须是不抛出异常的。即要求 std::is_nothrow_invocable_v<_Completion_function&> 为 true,见 。

忙等待
std::condition_variable
std::condition_variable_any
<condition_variable>
可基本锁定(BasicLockable)
运行
相同
wait
谓词
虚假唤醒(spurious wakeup)
源码实现
运行
SFML
#27
网站
tts-vue
FLAC
OpenAL
下载
现代 C++ 模板教程
mq白
future
<future>
std::future
std::shared_future
std::async
get()
valid()
运行
可调用(Callable)
运行
可调用(Callable)
运行
执行策略
源码
libstdc++
源码
运行
std::future
析构函数
std::packaged_task
可调用(Callable)
std::future
operator()
可调用(Callable)
运行
运行
运行
运行
std::promise
std::future
运行
set_value()
set_exception()
std::exception_ptr
std::current_exception()
运行结果
std::future_error
promise_already_satisfied
运行结果
运行
MSVC STL
构造函数
std::this::thread::sleep_for
std::this_thread::sleep_until
wait_for
wait_until
谓词
时间库
std::chrono::system_clock::now()
time_point
std::chrono::duration
std::ratio
std::chrono::steady_clock
std::chrono::system_clock
time_t
别名
见
std::chrono::duration_cast<>
声明
count()
wait_for
std::future_status
std::chrono::time_point<>
源码
运行
运行
std::thread::id
POSIX
pthread_self()
信号量
<semaphore>
运行结果
acquire
release
运行
std::mutex
MSVC STL 的实现
OpenMP
std::ptrdiff_t
运行结果
运行
arrive_and_wait
std::barrier
运行结果
std::osyncstream
arrive_and_drop
运行
MSVC STL
进度条