使用线程

在标准 C++ 中,std::thread 可以指代线程,本章中,使用线程也就是代表使用 std::thread (C++20 std::jthread)。

本章围绕着它们来讲解。

Hello World

在我们初学 C++ 的时候应该都写过这样一段代码:

#include <iostream>

int main(){
    std::cout << "Hello World!" << std::endl;
}

这段代码将"Hello World!"写入到标准输出流,换行并刷新

我们可以启动一个线程来做这件事情:

#include <iostream>
#include <thread>  // 引入线程支持头文件

void hello(){     // 定义一个函数用作打印任务
    std::cout << "Hello World" << std::endl;
}

int main(){
    std::thread t{ hello };
    t.join();
}

std::thread t{ hello }; 创建了一个线程对象 t,将 hello 作为它的可调用(Callable)对象,在新线程中执行。线程对象关联了一个线程资源,我们无需手动控制,在线程对象构造成功,就自动在新线程开始执行函数 hello

t.join(); 等待线程对象 t 关联的线程执行完毕,否则将一直阻塞。这里的调用是必须的,否则 std::thread 的析构函数将调用 std::terminate() 无法正确析构。

这是因为我们创建线程对象 t 的时候就关联了一个活跃的线程,调用 join() 就是确保线程对象关联的线程已经执行完毕,然后会修改对象的状态,让 std::thread::joinable() 返回 false,表示线程对象目前没有关联活跃线程。std::thread 的析构函数,正是通过 joinable() 判断线程对象目前是否有关联活跃线程,如果为 true,那么就当做有关联活跃线程,会调用 std::terminate()


如你所见,std::thread 高度封装,其成员函数也很少,我们可以轻易的创建线程执行任务,不过,它的用法也还远不止如此,我们慢慢介绍。

当前环境支持并发线程数

使用 hardware_concurrency 函数可以获得我们当前硬件支持的并发线程数量,它是 std::thread 的静态成员函数。

#include <iostream>
#include <thread>
 
int main(){
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << "支持 " << n << " 个并发线程。\n";
}

本节其实是要普及一下计算机常识,一些古老的书籍比如 csapp 应该也会提到“超线程技术”。

英特尔® 超线程技术是一项硬件创新,允许在每个内核上运行多个线程。更多的线程意味着可以并行完成更多的工作。

AMD 超线程技术被称为 SMT(Simultaneous Multi-Threading),它与英特尔的技术实现有所不同,不过使用类似。

举个例子:一款 4 核心 8 线程的 CPU,这里的 8 线程其实是指所谓的逻辑处理器,也意味着这颗 CPU 最多可并行执行 8 个任务。

我们的 hardware_concurrency() 获取的值自然也会是 8

当然了,都 2024 年了,我们还得考虑一个问题:“ 英特尔从 12 代酷睿开始,为其处理器引入了全新的“大小核”混合设计架构”。

比如我的 CPU i7 13700H 它是 14 核心,20 线程,有 8 个能效核,6 个性能核。不过我们说了,物理核心这个通常不看重,hardware_concurrency() 输出的值会为 20。

  • 在进行多线程编程时,我们可以参考此值来确定创建的线程数量,以更好地利用当前硬件,从而提升程序性能。


我们可以举个简单的例子运用这个值:

template<typename ForwardIt>
auto sum(ForwardIt first, ForwardIt last){
    using value_type = std::iter_value_t<ForwardIt>;
    std::size_t num_threads = std::thread::hardware_concurrency();
    std::ptrdiff_t distance = std::distance(first, last);

    if(distance > 1024000){
        // 计算每个线程处理的元素数量
        std::size_t chunk_size = distance / num_threads;
        std::size_t remainder = distance % num_threads;

        // 存储每个线程的结果
        std::vector<value_type> results { num_threads };

        // 存储关联线程的线程对象
        std::vector<std::thread> threads;

        // 创建并启动线程
        auto start = first;
        for (std::size_t i = 0; i < num_threads; ++i) {
            auto end = std::next(start, chunk_size + (i < remainder ? 1 : 0));
            threads.emplace_back([start, end, &results, i] {
                results[i] = std::accumulate(start, end, value_type{});
            });
            start = end; // 开始迭代器不断向前
        }

        // 等待所有线程执行完毕
        for (auto& thread : threads)
            thread.join();

        // 汇总线程的计算结果
        value_type total_sum = std::accumulate(results.begin(), results.end(), value_type{});
        return total_sum;
    }

    value_type total_sum = std::accumulate(first, last, value_type{});
    return total_sum;
}

运行测试。

我们写了这样一个求和函数 sum,接受两个迭代器计算它们范围中对象的和。

我们先获取了迭代器所指向的值的类型,定义了一个别名 value_type,我们这里使用到的 std::iter_value_t 是 C++20 引入的,返回类型推导是 C++14 引入。如果希望代码可以在 C++11 的环境运行也可以自行修改为:

template<typename ForwardIt>
typename std::iterator_traits<ForwardIt>::value_type sum(ForwardIt first, ForwardIt last);

运行测试。

num_threads 是当前硬件支持的并发线程的值。std::distance 用来计算 first 到 last 的距离,也就是我们要进行求和的元素个数了。

我们这里的设计比较简单,毕竟是初学,所以只对元素个数大于 1024000 的进行多线程求和,而小于这个值的则直接使用标准库函数 std::accumulate 求和即可。

多线程求和只需要介绍三个地方

  1. chunk_size 是每个线程分配的任务,但是这是可能有余数的,比如 10 个任务分配三个线程,必然余 1。但是我们也需要执行这个任务,所以还定义了一个对象 remainder ,它存储的就是余数。

  2. auto end = std::next(start, chunk_size + (i < remainder ? 1 : 0)); 这行代码是获取当前线程的执行范围,其实也就是要 chunk_size 再加上我们的余数 remainder 。这里写了一个三目运算符是为了进行分配任务,比如:

    假设有 3 个线程执行,并且余数是 2。那么,每个线程的处理情况如下:

    • i = 0 时,由于 0 < 2,所以这个线程会多分配一个元素。

    • i = 1 时,同样因为 1 < 2,这个线程也会多分配一个元素。

    • i = 2 时,由于 2 >= 2,所以这个线程只处理平均数量的元素。

    这确保了剩余的 2 个元素被分配给了前两个线程,而第三个线程只处理了平均数量的元素。这样就确保了所有的元素都被正确地分配给了各个线程进行处理。

  3. auto start = first; 在创建线程执行之前先定义了一个开始迭代器。在传递给线程执行的lambda表达式中,最后一行是:start = end; 这是为了让迭代器一直向前。

由于求和不涉及数据竞争之类的问题,所以我们甚至可以在刚讲完 Hello World 就手搓了一个“并行求和”的简单的模板函数。主要的难度其实在于对 C++ 的熟悉程度,而非对线程类 std::thread 的使用了,这里反而是最简单的,无非是用容器存储线程对象管理,最后进行 join() 罢了。

本节代码只是为了学习,而且只是百万数据通常没必要多线程,上亿的话差不多。如果你需要多线程求和,可以使用 C++17 引入的求和算法 std::reduce 并指明执行策略。它的效率接近我们实现的 sum 的两倍,当前环境核心越多数据越多,和单线程效率差距越明显。

线程管理

在 C++ 标准库中,没有直接管理线程的机制,只能通过对象关联线程后,通过该对象来管理线程。类 std::thread 的对象就是指代线程的对象,而我们本节说的“线程管理”,其实也就是指管理 std::thread 对象。

启动新线程

使用 C++ 线程库启动线程,就是构造 std::thread 对象。

当然了,如果是默认构造之类的,那么 std::thread 线程对象没有关联线程的,自然也不会启动线程执行任务。

std::thread t; //  构造不表示线程的新 std::thread 对象

我们上一节的示例是传递了一个函数给 std::thread 对象,函数会在新线程中执行。std::thread 支持的形式还有很多,只要是可调用(Callable)对象即可,比如重载了 operator() 的类对象(也可以直接叫函数对象)。

class Task{
public:
    void operator()()const {
        std::cout << "operator()()const\n";
    }
};

我们显然没办法直接像函数使用函数名一样,使用“类名”,函数名可以隐式转换到指向它的函数指针,而类名可不会直接变成对象,我们想使用 Task 自然就得构造对象了

std::thread t{ Task{} };
t.join();

直接创建临时对象即可,可以简化代码并避免引入不必要的局部对象。

不过有件事情需要注意,当我们使用函数对象用于构造 std::thread 的时候,如果你传入的是一个临时对象,且使用的都是 “()”小括号初始化,那么编译器会将此语法解析为函数声明

std::thread t( Task() ); // 函数声明

这被编译器解析为函数声明,是一个返回类型为 std::thread,函数名为 t,接受一个返回 Task 的空参的函数指针类型,也就是 Task(*)()

之所以我们看着抽象是因为这里的形参是无名的,且写了个函数类型。

我们用一个简单的示例为你展示:

void h(int(int));         //#1 声明
void h(int (*p)(int)){}   //#2 定义

即使我还没有为你讲述概念,我相信你也发现了,#1 和 #2 的区别无非是,#1 省略了形参的名称,还有它的形参是函数类型而不是函数指针类型,没有 *

在确定每个形参的类型后,类型是 “T 的数组”或某个函数类型 T 的形参会调整为具有类型“指向 T 的指针”文档

显然,int(int) 是一个函数类型,它被调整为了一个指向这个函数类型的指针类型。

那么回到我们最初的:

std::thread t( Task() );                    // #1 函数声明
std::thread t( Task (*p)() ){ return {}; }  // #2 函数定义

#2我们写出了函数形参名称 p,再将函数类型写成函数指针类型,事实上完全等价。我相信,这样,也就足够了。

所以总而言之,建议使用 {} 进行初始化,这是好习惯,大多数时候它是合适的。

C++11 引入的 Lambda 表达式,同样可以作为构造 std::thread 的参数,因为 Lambda 本身就是生成了一个函数对象,它自身就是类类型

#include <iostream>
#include <thread>

int main(){
    std::thread thread{ [] {std::cout << "Hello World!\n"; } };
    thread.join();
}

启动线程后(也就是构造 std::thread 对象)我们必须在线程对象的生存期结束之前,即 std::thread::~thread 调用之前,决定它的执行策略,是 join()(合并)还是 detach()(分离)。

我们先前使用的就是 join(),我们聊一下 detach(),当 std::thread 线程对象调用了 detach(),那么就是线程对象放弃了对线程资源的所有权,不再管理此线程,允许此线程独立的运行,在线程退出时释放所有分配的资源。

放弃了对线程资源的所有权,也就是线程对象没有关联活跃线程了,此时 joinable 为 false

在单线程的代码中,对象销毁之后再去访问,会产生未定义行为,多线程增加了这个问题发生的几率。

比如函数结束,那么函数局部对象的生存期都已经结束了,都被销毁了,此时线程函数还持有函数局部对象的指针或引用。

#include <iostream>
#include <thread>

struct func {
    int& m_i;
    func(int& i) :m_i{ i } {}
    void operator()(int n)const {
        for (int i = 0; i <= n; ++i) {
            m_i += i;           // 可能悬空引用
        }
    }
};

int main(){
    int n = 0;
    std::thread my_thread{ func{n},100 };
    my_thread.detach();        // 分离,不等待线程结束
}                              // 分离的线程可能还在运行
  1. 主线程(main)创建局部对象 n、创建线程对象 my_thread 启动线程,执行任务 func{n},局部对象 n 的引用被子线程持有。传入 100 用于调用 func 的 operator(int)。

  2. my_thread.detach();,joinable() 为 false。线程分离,线程对象不再持有线程资源,线程独立的运行。

  3. 主线程不等待,此时分离的子线程可能没有执行完毕,但是主线程(main)已经结束,局部对象 n 生存期结束,被销毁,而此时子线程还持有它的引用,访问悬空引用,造成未定义行为。my_thread 已经没有关联线程资源,正常析构,没有问题。

解决方法很简单,将 detach() 替换为 join()。

通常非常不推荐使用 detach(),因为程序员必须确保所有创建的线程正常退出,释放所有获取的资源并执行其它必要的清理操作。这意味着通过调用 detach() 放弃线程的所有权不是一种选择,因此 join 应该在所有场景中使用。 一些老式特殊情况不聊。

另外提示一下,也不要想着 detach() 之后,再次调用 join()

my_thread.detach();
// todo..
my_thread.join();
// 函数结束

认为这样可以确保被分离的线程在这里阻塞执行完?

我们前面聊的很清楚了,detach() 是线程分离,线程对象放弃了线程资源的所有权,此时我们的 my_thread 它现在根本没有关联任何线程。调用 join() 是:“阻塞当前线程直至 *this 所标识的线程结束其执行”,我们的线程对象都没有线程,阻塞什么?执行什么呢?

简单点说,必须是 std::thread 的 joinable() 为 true 即线程对象有活跃线程,才能调用 join() 和 detach()。

顺带的,我们还得处理线程运行后的异常问题,举个例子:你在一个函数中构造了一个 std::thread 对象,线程开始执行,函数继续执行下面别的代码,但是如果抛出了异常呢?下面我的 join() 就会被跳过

std::thread my_thread{func{n},10};
//todo.. 抛出异常的代码
my_thread.join();

避免程序被抛出的异常所终止,在异常处理过程中调用 join(),从而避免线程对象析构产生问题。

struct func; // 复用之前
void f(){
    int n = 0;
    std::thread t{ func{n},10 };
    try{
        // todo.. 一些当前线程可能抛出异常的代码
        f2();
    }
    catch (...){
        t.join(); // 1
        throw;
    }
    t.join();    // 2
}

我知道你可能有很多疑问,我们既然 catch 接住了异常,为什么还要 throw?以及为什么我们要两个 join()?

这两个问题其实也算一个问题,如果代码里抛出了异常,就会跳转到 catch 的代码中,执行 join() 确保线程正常执行完成,线程对象可以正常析构。然而此时我们必须再次 throw 抛出异常,因为你要是不抛出,那么你不是还得执行一个 t.join()?显然逻辑不对,自然抛出。

至于这个函数产生的异常,由调用方进行处理,我们只是确保函数 f 中创建的线程正常执行完成,其局部对象正常析构释放。测试代码

我知道你可能会想到:“我在 try 块中最后一行写一个 t.join() ,这样如果前面的代码没有抛出异常,就能正常的调用 join(),如果抛出了异常,那就调用 catch 中的 t.join() 根本不需要最外部 2 那里的 join(),也不需要再次 throw 抛出异常”

void f(){
    int n = 0;
    std::thread t{ func{n},10 };
    try{
        // todo.. 一些当前线程可能抛出异常的代码
        f2();
        t.join(); // try 最后一行调用 join()
    }
    catch (...){
        t.join(); // 如果抛出异常,就在 这里调用 join()
    }
}

你是否觉得这样也可以?也没问题?简单的测试运行的确没问题。

但是这是不对的,你要注意我们的注释:“一些当前线程可能抛出异常的代码”,而不是 f2(),我们的 try catch 只是为了让线程对象关联的线程得以正确执行完毕,以及线程对象正确析构。并没有处理什么其他的东西,不掩盖错误,try 块中的代码抛出了异常, catch 接住了,我们理所应当再次抛出。

RAII

资源获取即初始化”(RAII,Resource Acquisition Is Initialization)。

简单的说是:构造函数申请资源,析构函数释放资源,让对象的生命周期和资源绑定。当异常抛出时,C++ 会自动调用对象的析构函数。

我们可以提供一个类,在析构函数中使用 join() 确保线程执行完成,线程对象正常析构。

class thread_guard{
    std::thread& m_t;
public:
    explicit thread_guard(std::thread& t) :m_t{ t } {}
    ~thread_guard(){
        std::puts("析构");     // 打印日志 不用在乎
        if (m_t.joinable()) { // 线程对象当前关联了活跃线程
            m_t.join();
        }
    }
    thread_guard(const thread_guard&) = delete;
    thread_guard& operator=(const thread_guard&) = delete;
};
void f(){
    int n = 0;
    std::thread t{ func{n},10 };
    thread_guard g(t);
    f2(); // 可能抛出异常
}

函数 f 执行完毕,局部对象就要逆序销毁了。因此,thread_guard 对象 g 是第一个被销毁的,调用析构函数即使函数 f2() 抛出了一个异常,这个销毁依然会发生(前提是你捕获了这个异常)。这确保了线程对象 t 所关联的线程正常的执行完毕以及线程对象的正常析构。测试代码

如果异常被抛出但未被捕获那么就会调用 std::terminate。是否对未捕获的异常进行任何栈回溯由实现定义。(简单的说就是不一定会调用析构)

我们的测试代码是捕获了异常的,为了观测,看到它一定打印“析构”。

在 thread_guard 的析构函数中,我们要判断 std::thread 线程对象现在是否有关联的活跃线程,如果有,我们才会执行 join(),阻塞当前线程直到线程对象关联的线程执行完毕。如果不想等待线程结束可以使用 detach() ,但是这让 std::thread 对象失去了线程资源的所有权,难以掌控,具体如何,看情况分析。

复制赋值和复制构造定义为 =delete 可以防止编译器隐式生成,同时会阻止移动构造函数和移动赋值运算符的隐式定义。这样的话,对 thread_guard 对象进行复制或赋值等操作会引发一个编译错误。

不允许这些操作主要在于:这是个管理类,而且顾名思义,它就应该只是单纯的管理线程对象仅此而已,只保有一个引用,单纯的做好 RAII 的事情就行,允许其他操作没有价值。

严格来说其实这里倒也不算 RAII,因为 thread_guard 的构造函数其实并没有申请资源,只是保有了线程对象的引用,在析构的时候进行了 join() 。

传递参数

向可调用对象传递参数很简单,我们前面也都写了,只需要将这些参数作为 std::thread 的构造参数即可。需要注意的是,这些参数会复制到新线程的内存空间中,即使函数中的参数是引用,依然实际是复制

void f(int, const int& a);

int n = 1;
std::thread t{ f, 3, n };

线程对象 t 的构造没有问题,可以通过编译,但是这个 n 实际上并没有按引用传递,而是按值复制的。我们可以打印地址来验证我们的猜想。

void f(int, const int& a) { // a 并非引用了局部对象 n
    std::cout << &a << '\n'; 
}

int main() {
    int n = 1;
    std::cout << &n << '\n';
    std::thread t{ f, 3, n };
    t.join();
}

运行代码,打印的地址截然不同。

可以通过编译,但通常这不符合我们的需求,因为我们的函数中的参数是引用,我们自然希望能引用调用方传递的参数,而不是复制。如果我们的 f 的形参类型不是 const 的引用,则会产生一个编译错误

想要解决这个问题很简单,我们可以使用标准库的设施 std::refstd::cref 函数模板。

void f(int, int& a) {
    std::cout << &a << '\n'; 
}

int main() {
    int n = 1;
    std::cout << &n << '\n';
    std::thread t { f, 3, std::ref(n) };
    t.join();
}

运行代码,打印地址完全相同。

我们来解释一下,“ref” 其实就是 “reference”(引用)的缩写,意思也很简单,返回“引用”,当然了,不是真的返回引用,它们返回一个包装类 std::reference_wrapper,顾名思义,这个类就是包装引用对象类模板,将对象包装,可以隐式转换为被包装对象的引用。

cref”呢?,这个“c”就是“const”,就是返回了 std::reference_wrapper<const T>。我们不详细介绍他们的实现,你简单认为reference_wrapper可以隐式转换为被包装对象的引用即可,

int n = 0;
std::reference_wrapper<int> r = std::ref(n);
int& p = r; // r 隐式转换为 n 的引用 此时 p 引用的就是 n
int n = 0;
std::reference_wrapper<const int> r = std::cref(n);
const int& p = r; // r 隐式转换为 n 的 const 的引用 此时 p 引用的就是 n

如果对他们的实现感兴趣,可以观看视频


以上代码void f(int, int&) 如果不使用 std::ref 并不会和前面 void f(int, const int&) 一样只是多了复制,而是会产生编译错误,这是因为 std::thread 内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以产生编译错误。

struct move_only {
    move_only() { std::puts("默认构造"); }
    move_only(const move_only&) = delete;
    move_only(move_only&&)noexcept {
        std::puts("移动构造");
    }
};

void f(move_only){}

int main(){
    move_only obj;
    std::thread t{ f,std::move(obj) };
    t.join();
}

运行测试。

没有 std::ref 自然是会保有一个副本,所以有两次移动构造,一次是被 std::thread 构造函数中初始化副本,一次是调用函数 f

如果还有不理解,不用担心,记住,这一切的问题都会在后面的 std::thread 的构造-源码解析 解释清楚。


成员函数指针也是可调用(Callable)的 ,可以传递给 std::thread 作为构造参数,让其关联的线程执行成员函数。

struct X{
    void task_run(int)const;
};

 X x;
 int n = 0;
 std::thread t{ &X::task_run,&x,n };
 t.join();

传入成员函数指针、与其配合使用的对象、调用成员函数的参数,构造线程对象 t,启动线程。

如果你是第一次见到成员指针,那么我们稍微聊一下,&X::task_run 是一个整体,它们构成了成员指针,&类名::非静态成员

成员指针必须和对象一起使用,这是唯一标准用法,成员指针不可以转换到函数指针单独使用,即使是非静态成员函数没有使用任何数据成员。

我们还可以使用模板函数 std::bind与成员指针一起使用

std::thread t{ std::bind(&X::task_run, &x ,n) };

不过需要注意,std::bind 也是默认按值复制的,即使我们的成员函数形参类型为引用:

struct X {
    void task_run(int& a)const{
        std::cout << &a << '\n';
    }
};

X x;
int n = 0;
std::cout << &n << '\n';
std::thread t{ std::bind(&X::task_run,&x,n) };
t.join();

除非给参数 n 加上 std::ref,就是按引用传递了:

std::thread t{ std::bind(&X::task_run,&x,std::ref(n)) };

void f(const std::string&);
std::thread t{ f,"hello" };

代码创建了一个调用 f("hello") 的线程。注意,函数 f 实际需要的是一个 std::string 类型的对象作为参数,但这里使用的是字符串字面量,我们要明白“A的引用只能引用A,或者以任何形式转换到A”,字符串字面量的类型是 const char[N] ,它会退化成指向它的const char* 指针,被线程对象保存。在调用 f 的时候,这个指针可以通过 std::string 的转换构造函数,构造出一个临时的 std::string 对象,就能成功调用。

字符串字面量具有静态存储期,指向它的指针这当然没问题了,不用担心生存期的问题,但是如果是指向“动态”对象的指针,就要特别注意了:

void f(const std::string&);
void test(){
    char buffer[1024]{};
    //todo.. code
    std::thread t{ f,buffer };
    t.detach();
}

以上代码可能导致一些问题,buffer 是一个数组对象,作为 std::thread 构造参数的传递的时候会decay-copy (确保实参在按值传递时会退化) 隐式转换为了指向这个数组的指针

我们要特别强调,std::thread 构造是代表“启动线程”,而不是调用我们传递的可调用对象。

std::thread 的构造函数中调用了创建线程的函数(windows 下可能为 _beginthreadex),它将我们传入的参数,f、buffer ,传递给这个函数,在新线程中执行函数 f。也就是说,调用和执行 f(buffer) 并不是说要在 std::thread 的构造函数中,而是在创建的新线程中,具体什么时候执行,取决于操作系统的调度,所以完全有可能函数 test 先执行完,而新线程此时还没有进行 f(buffer) 的调用,转换为std::string,那么 buffer 指针就悬空了,会导致问题。解决方案:

  1. detach() 替换为 join()

    void f(const std::string&);
    void test(){
        char buffer[1024]{};
        //todo.. code
        std::thread t{ f,buffer };
        t.join();
    }
  2. 显式将 buffer 转换为 std::string

    void f(const std::string&);
    void test(){
        char buffer[1024]{};
        //todo.. code
        std::thread t{ f,std::string(buffer) };
        t.detach();
    }

std::this_thread

这个命名空间包含了管理当前线程的函数。

  1. yield 建议实现重新调度各执行线程。

  2. get_id 返回当前线程 id。

  3. sleep_for 使当前线程停止执行指定时间。

  4. sleep_until 使当前线程执行停止到指定的时间点。

它们之中最常用的是 get_id,其次是 sleep_for,再然后 yieldsleep_until 较少。

  • 使用 get_id 打印主线程和子线程的 ID。

    int main() {
        std::cout << std::this_thread::get_id() << '\n';
    
        std::thread t{ [] {
            std::cout << std::this_thread::get_id() << '\n';
        } };
        t.join();
    }
  • 使用 sleep_for 延时。当 Sleep 之类的就行,但是它需要接受的参数不同,是 std::chrono 命名空间中的时间对象。

    int main() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    主线程延时 3 秒,这个传入了一个临时对象 seconds ,它是模板 std::chrono::duration 的别名,以及还有很多其他的时间类型,都基于这个类。说实话挺麻烦的,如果您支持 C++14,建议使用时间字面量,在 std::chrono_literals 命名空间中。我们可以改成下面这样:

    using namespace std::chrono_literals;
    
    int main() {
        std::this_thread::sleep_for(3s);
    }

    简单直观。

  • yield 减少 CPU 的占用。

    while (!isDone()){
        std::this_thread::yield();
    }

    线程需要等待某个操作完成,如果你直接用一个循环不断判断这个操作是否完成就会使得这个线程占满 CPU 时间,这会造成资源浪费。此时可以判断操作是否完成,如果还没完成就调用 yield 交出 CPU 时间片让其他线程执行,过一会儿再来判断是否完成,这样这个线程占用 CPU 时间会大大减少。

  • 使用 sleep_until 让当前线程延迟到具体的时间。我们延时 5 秒就是。

    int main() {
        // 获取当前时间点
        auto now = std::chrono::system_clock::now();
    
        // 设置要等待的时间点为当前时间点之后的5秒
        auto wakeup_time = now + 5s;
    
        // 输出当前时间
        auto now_time = std::chrono::system_clock::to_time_t(now);
        std::cout << "Current time:\t\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;
    
        // 输出等待的时间点
        auto wakeup_time_time = std::chrono::system_clock::to_time_t(wakeup_time);
        std::cout << "Waiting until:\t\t" << std::put_time(std::localtime(&wakeup_time_time), "%H:%M:%S") << std::endl;
    
        // 等待到指定的时间点
        std::this_thread::sleep_until(wakeup_time);
    
        // 输出等待结束后的时间
        now = std::chrono::system_clock::now();
        now_time = std::chrono::system_clock::to_time_t(now);
        std::cout << "Time after waiting:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;
    }

    sleep_until 本身设置使用很简单,是打印时间格式、设置时区麻烦。运行结果

介绍了一下 std::this_thread 命名空间中的四个函数的基本用法,我们后续会经常看到这些函数的使用,不用着急。

std::thread 转移所有权

传入可调用对象以及参数,构造 std::thread 对象,启动线程,而线程对象拥有了线程的所有权,线程是一种系统资源,所以可称作“线程资源”。

std::thread 不可复制。两个 std::thread 对象不可表示一个线程,std::thread 对线程资源是独占所有权。而移动操作可以将一个 std::thread 对象的线程资源所有权转移给另一个 std::thread 对象。

int main() {
    std::thread t{ [] {
        std::cout << std::this_thread::get_id() << '\n';
    } };
    std::cout << t.joinable() << '\n'; // 线程对象 t 当前关联了活跃线程 打印 1
    std::thread t2{ std::move(t) };    // 将 t 的线程资源的所有权移交给 t2
    std::cout << t.joinable() << '\n'; // 线程对象 t 当前没有关联活跃线程 打印 0
    //t.join(); // Error! t 没有线程资源
    t2.join();  // t2 当前持有线程资源
}

这段代码通过移动构造转移了线程对象 t 的线程资源所有权到 t2,这里虽然有两个 std::thread 对象,但是从始至终只有一个线程资源,让持有线程资源的 t2 对象最后调用 join() 阻塞让其线程执行完毕。tt2 都能正常析构。

我们还可以使用移动赋值来转移线程资源的所有权:

int main() {
    std::thread t;      // 默认构造,没有关联活跃线程
    std::cout << t.joinable() << '\n'; // 0
    std::thread t2{ [] {} };
    t = std::move(t2); // 转移线程资源的所有权到 t
    std::cout << t.joinable() << '\n'; // 1
    t.join();
    
    t2 = std::thread([] {});
    t2.join();
}

我们只需要介绍 t2 = std::thread([] {}) ,临时对象是右值表达式,不用调用 std::move,这里相当于是将临时的 std::thread 对象所持有的线程资源转移给 t2t2 再调用 join() 正常析构。

函数返回 std::thread 对象:

std::thread f(){
    std::thread t{ [] {} };
    return t;
}

int main(){
    std::thread rt = f();
    rt.join();
}

这段代码可以通过编译,你是否感到奇怪?我们在函数 f 中创建了一个局部的 std::thread 对象,启动线程,然后返回它。

这里的 return t 重载决议选择到了移动构造,将 t 线程资源的所有权转移给函数调用 f() 返回的临时 std::thread 对象中,然后这个临时对象再用来初始化 rt ,临时对象是右值表达式,这里一样选择到移动构造,将临时对象的线程资源所有权移交给 rt。此时 rt 具有线程资源的所有权,由它调用 join() 正常析构。

如果标准达到 C++17,强制的复制消除(RVO)保证这里少一次移动构造的开销(临时对象初始化 rt 的这次)。

所有权也可以在函数内部传递

void f(std::thread t){
    t.join();
}

int main(){
    std::thread t{ [] {} };
    f(std::move(t));
    f(std::thread{ [] {} });
}

std::move 将 t 转换为了一个右值表达式,初始化函数f 形参 t,选择到了移动构造转移线程资源的所有权,在函数中调用 t.join() 后正常析构。std::thread{ [] {} } 构造了一个临时对象,本身就是右值表达式,初始化函数f 形参 t,移动构造转移线程资源的所有权到 tt.join() 后正常析构。

本节内容总体来说是很简单的,如果你有些地方无法理解,那只有一种可能,“对移动语义不了解”,不过这也不是问题,在后续我们详细介绍 std::thread 构造函数的源码即可,不用着急。

我们上一个大节讲解了线程管理,也就是 std::thread 的使用,其中的重中之重就是它的构造,传递参数。我们用源码实现为各位从头讲解。

了解其实现,才能更好的使用它,同时也能解释其使用与学习中的各种问题。如:

  • 如何做到的默认按值复制?

  • 为什么需要 std::ref

  • 如何支持只能移动的对象?

  • 如何做到接受任意可调用对象?

  • 如何创建的线程?

  • 传递参数一节中的:“std::thread 内部会将保有的参数副本转换为右值表达式进行传递”到底是如何做到的?

当你看完 std::thread 的构造-源码解析 后,可以再回过头来问问自己是否能够回答这些问题。

实现 joining_thread

这个类和 std::thread 的区别就是析构函数会自动 join 。如果您好好的学习了上一节的内容,阅读了 std::thread 的源码,以下内容不会对您构成任何的难度。

我们存储一个 std::thread 作为底层数据成员,稍微注意一下构造函数和赋值运算符的实现即可。

class joining_thread {
    std::thread t;
public:
    joining_thread()noexcept = default;
    template<typename Callable, typename... Args>
    explicit joining_thread(Callable&& func, Args&&...args) :
        t{ std::forward<Callable>(func), std::forward<Args>(args)... } {}
    explicit joining_thread(std::thread t_)noexcept : t{ std::move(t_) } {}
    joining_thread(joining_thread&& other)noexcept : t{ std::move(other.t) } {}

    joining_thread& operator=(std::thread&& other)noexcept {
        if (joinable()) { // 如果当前有活跃线程,那就先执行完
            join();
        }
        t = std::move(other);
        return *this;
    }
    ~joining_thread() {
        if (joinable()) {
            join();
        }
    }
    void swap(joining_thread& other)noexcept {
        t.swap(other.t);
    }
    std::thread::id get_id()const noexcept {
        return t.get_id();
    }
    bool joinable()const noexcept {
        return t.joinable();
    }
    void join() {
        t.join();
    }
    void detach() {
        t.detach();
    }
    std::thread& data()noexcept {
        return t;
    }
    const std::thread& data()const noexcept {
        return t;
    }
};

简单使用一下:

int main(){
    std::cout << std::this_thread::get_id() << '\n';
    joining_thread thread{[]{
            std::cout << std::this_thread::get_id() << '\n';
    } };
    joining_thread thread2{ std::move(thread) };
}

使用容器管理线程对象,等待线程执行结束

void do_work(std::size_t id){
    std::cout << id << '\n';
}

int main(){
    std::vector<std::thread>threads;
    for (std::size_t i = 0; i < 10; ++i){
        threads.emplace_back(do_work, i); // 产生线程
    }
    for(auto& thread:threads){
        thread.join();                   // 对每个线程对象调用 join()
    }
}

运行测试

线程对象代表了线程,管理线程对象也就是管理线程,这个 vector 对象管理 10 个线程,保证他们的执行、退出。

使用我们这节实现的 joining_thread 则不需要最后的循环 join()

int main(){
    std::vector<joining_thread>threads;
    for (std::size_t i = 0; i < 10; ++i){
        threads.emplace_back(do_work, i);
    }
}

运行测试

如果你自己编译了这些代码,相信你注意到了,打印的是乱序的,没什么规律,而且重复运行的结果还不一样,这是正常现象。多线程执行就是如此,无序且操作可能被打断。使用互斥量可以解决这些问题,这也就是下一章节的内容了。

C++20 std::jthread

std::jthread 相比于 C++11 引入的 std::thread,只是多了两个功能:

  1. RAII 管理:在析构时自动调用 join()

  2. 线程停止功能:线程的取消/停止。

零开销原则

我知道你肯定有疑问,为什么 C++20 不直接为 std::thread 增加这两个功能,而是创造一个新的线程类型呢?

这就是 C++ 的设计哲学,零开销原则你不需要为你没有用到的(特性)付出额外的开销

std::jthread 的通常实现就是单纯的保有 std::thread + std::stop_source 这两个数据成员:

thread _Impl;
stop_source _Ssource;

MSVC STLlibstdc++libc++ 均是如此。

stop_source 通常占 8 字节,先前 std::thread 源码解析详细聊过其不同标准库对其保有的成员不同,简单来说也就是 64 位环境,大小为 16 或者 8。也就是 sizeof(std::jthread) 的值相比 std::thread 会多 8 ,为 2416

引入 std::jthread 符合零开销原则,它通过创建新的类型提供了更多的功能,而没有影响到原来 std::thread 的性能和内存占用。

线程停止

第一个功能很简单,不用赘述,我们直接聊这个所谓的“线程停止”就好。

首先要明确,C++ 的 std::jthread 提供的线程停止功能并不同于常见的 POSIX 函数 pthread_cancelpthread_cancel 是一种发送取消请求的函数,但并不是强制性的线程终止方式。目标线程的可取消性状态和类型决定了取消何时生效。当取消被执行时,进行清理和终止线程。

std::jthread 所谓的线程停止只是一种基于用户代码的控制机制,而不是一种与操作系统系统有关系的线程终止。使用 std::stop_sourcestd::stop_token 提供了一种优雅地请求线程停止的方式,但实际上停止的决定和实现都由用户代码来完成

using namespace std::literals::chrono_literals;

void f(std::stop_token stop_token, int value){
    while (!stop_token.stop_requested()){ // 检查是否已经收到停止请求
        std::cout << value++ << ' ' << std::flush;
        std::this_thread::sleep_for(200ms);
    }
    std::cout << std::endl;
}

int main(){
    std::jthread thread{ f, 1 }; // 打印 1..15 大约 3 秒
    std::this_thread::sleep_for(3s);
    // jthread 的析构函数调用 request_stop() 和 join()。
}

运行测试。截止目前(2024/5/29 clang19 未发布) libc++ 不完全支持 std::jthread,建议使用 clang 的开发者链接 libstdc++MSVC STL 进行编译。如果非要使用 libc++,可以添加 -fexperimental-library 编译选项,启用不稳定库功能和实验库功能。这样,我们的这段代码就可以通过编译。

std::jthread 提供了三个成员函数进行所谓的线程停止

  • get_stop_source:返回与 jthread 对象关联的 std::stop_source,允许从外部请求线程停止。

  • get_stop_token:返回与 jthread 对象停止状态关联的 std::stop_token,允许检查是否有停止请求。

  • request_stop:请求线程停止。

上面这段代码并未出现这三个函数的任何一个调用,不过在 jthread 的析构函数中,会调用 request_stop 请求线程停止。

void _Try_cancel_and_join() noexcept {
    if (_Impl.joinable()) {
        _Ssource.request_stop();
        _Impl.join();
    }
}
~jthread() {
    _Try_cancel_and_join();
}

至于 std::jthread thread{ f, 1 }; 函数 f 的 std::stop_token 的形参是谁传递的?其实就是线程对象自己调用 get_token() 传递的 ,源码一眼便可发现:

template <class _Fn, class... _Args, enable_if_t<!is_same_v<remove_cvref_t<_Fn>, jthread>, int> = 0>
_NODISCARD_CTOR_JTHREAD explicit jthread(_Fn&& _Fx, _Args&&... _Ax) {
    if constexpr (is_invocable_v<decay_t<_Fn>, stop_token, decay_t<_Args>...>) {
        _Impl._Start(_STD forward<_Fn>(_Fx), _Ssource.get_token(), _STD forward<_Args>(_Ax)...);
    } else {
        _Impl._Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
    }
}

也就是说虽然最初的那段代码看似什么都没调用,但是实际什么都调用了。这所谓的线程停止,其实简单来说,有点像外部给线程传递信号一样。


std::stop_source

  • 这是一个可以发出停止请求的类型。当你调用 stop_sourcerequest_stop() 方法时,它会设置内部的停止状态为“已请求停止”。

  • 任何持有与这个 stop_source 关联的 std::stop_token 对象都能检查到这个停止请求。

std::stop_token

  • 这是一个可以检查停止请求的类型。线程内部可以定期检查 stop_token 是否收到了停止请求。

  • 通过调用 stop_token.stop_requested(),线程可以检测到停止状态是否已被设置为“已请求停止”。

总结

零开销原则应当很好理解。我们本节的难点只在于使用到了一些 MSVC STL 的源码实现来配合理解,其主要在于“线程停止”。线程停止设施你会感觉是一种类似与外部与线程进行某种信号通信的设施,std::stop_sourcestd::stop_token 都与线程对象关联,然后来管理函数到底如何执行。

我们并没有举很多的例子,我们觉得这一个小例子所牵扯到的内容也就足够了,关键在于理解其设计与概念。

总结

本章节的内容围绕着:“使用线程”,也就是"使用 std::thread"展开, std::thread 是我们学习 C++ 并发支持库的重中之重,在最后谈起了 C++20 引入的 std::jthread ,它的使用与概念也非常的简单。本章的内容在市面上并不算少见,但是却是少有的准确与完善。即使你早已学习乃至使用 C++ 标准库进行多线程编程,我相信本章也一定可以让你收获良多。

并且如果是第一次学习本章的内容,能会有一些难以理解的地方。建议你多思考、多记忆,并在以后反复查看和实践。

我尽量以简单通俗的方式进行讲解。学完本章后,你可能还无法在实际环境利用多线程提升程序效率,至少还需要学习到使用互斥量来保护共享数据,才能实际应用多线程编程。

Last updated