共享数据

本章节主要内容:

  • 多线程共享数据的问题

  • 使用互斥量保护共享数据

  • 保护共享数据的其它方案

  • 有关线程安全的其它问题

在上一章内容,我们对于线程的基本使用和管理,可以说已经比较了解了,甚至深入阅读了部分的 std::thread 源码。所以如果你好好学习了上一章,本章也完全不用担心。

我们本章,就要开始聊共享数据的那些事。

条件竞争

在多线程的情况下,每个线程都抢着完成自己的任务。在大多数情况下,即使会改变执行顺序,也是良性竞争,这是无所谓的。比如两个线程都要往标准输出输出一段字符,谁先谁后并不会有什么太大影响。

void f() { std::cout << "❤️\n"; }
void f2() { std::cout << "😢\n"; }

int main(){
    std::thread t{ f };
    std::thread t2{ f2 };
    t.join();
    t2.join();
}

std::cout 的 operator<< 调用是线程安全的,不会被打断。即:同步的 C++ 流保证是线程安全的(从多个线程输出的单独字符可能交错,但无数据竞争)

只有在涉及多线程读写相同共享数据的时候,才会导致“恶性的条件竞争”。

std::vector<int>v;

void f() { v.emplace_back(1); }
void f2() { v.erase(v.begin()); }

int main() {
    std::thread t{ f };
    std::thread t2{ f2 };
    t.join();
    t2.join();
    std::cout << v.size() << '\n';
}

比如这段代码就是典型的恶性条件竞争,两个线程共享一个 vector,并对它进行修改。可能导致许多问题,比如 f2 先执行,此时 vector 还没有元素,导致抛出异常。又或者 f 执行了一半,调用了 f2(),等等。

当然了,也有可能先执行 f,然后执行 f2,最后打印了 0,程序老老实实执行完毕。

但是我们显然不能寄希望于这种操作系统的调度。

而且即使不是一个添加元素,一个删除元素,全是 emplace_back 添加元素,也一样会有问题,由于 std::vector 不是线程安全的容器,因此当多个线程同时访问并修改 v 时,可能会发生未定义的行为。具体来说,当两个线程同时尝试向 v 中添加元素时,但是 emplace_back 函数却是可以被打断的,执行了一半,又去执行另一个线程。可能会导致数据竞争,从而引发未定义的结果。

当某个表达式的求值写入某个内存位置,而另一求值读或修改同一内存位置时,称这些表达式冲突拥有两个冲突的求值的程序就有数据竞争,除非

  • 两个求值都在同一线程上,或者在同一信号处理函数中执行,或

  • 两个冲突的求值都是原子操作(见 std::atomic),或

  • 一个冲突的求值发生早于 另一个(见 std::memory_order)

如果出现数据竞争,那么程序的行为未定义。

标量类型等都同理,有数据竞争未定义行为

int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // 未定义行为

使用互斥量

互斥量(Mutex),又称为互斥锁(或者直接被称作“锁”),是一种用来保护临界区的特殊对象,其相当于实现了一个公共的“标志位”。它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:

  1. 如果互斥量是锁定的,通常说某个特定的线程正持有这个锁。

  2. 如果没有线程持有这个互斥量,那么这个互斥量就处于解锁状态。


概念从来不是我们的重点,用一段对比代码为你直观的展示互斥量的作用:

void f() {
    std::cout << std::this_thread::get_id() << '\n';
}

int main() {
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < 10; ++i)
        threads.emplace_back(f);

    for (auto& thread : threads)
        thread.join();
}

这段代码你多次运行它会得到毫无规律和格式的结果,我们可以使用互斥量解决这个问题:

#include <mutex> // 必要标头
std::mutex m;

void f() {
    m.lock();
    std::cout << std::this_thread::get_id() << '\n';
    m.unlock();
}

int main() {
    std::vector<std::thread>threads;
    for (std::size_t i = 0; i < 10; ++i)
        threads.emplace_back(f);

    for (auto& thread : threads)
        thread.join();
}

当多个线程执行函数 f 的时候,只有一个线程能成功调用 lock() 给互斥量上锁,其他所有的线程 lock() 的调用将阻塞执行,直至获得锁。第一个调用 lock() 的线程得以继续往下执行,执行我们的 std::cout 输出语句,不会有任何其他的线程打断这个操作。直到线程执行 unlock(),就解锁了互斥量。

那么其他线程此时也就能再有一个成功调用 lock...

至于到底哪个线程才会成功调用,这个是由操作系统调度决定的。

看一遍描述就可以了,简而言之,被 lock()unlock() 包含在其中的代码是线程安全的,同一时间只有一个线程执行,不会被其它线程的执行所打断。

std::lock_guard

不过一般不推荐这样显式的 lock()unlock(),我们可以使用 C++11 标准库引入的“管理类” std::lock_guard

void f() {
    std::lock_guard<std::mutex> lc{ m };
    std::cout << std::this_thread::get_id() << '\n';
}

那么问题来了,std::lock_guard 是如何做到的呢?它是怎么实现的呢?首先顾名思义,这是一个“管理类”模板,用来管理互斥量的上锁与解锁,我们来看它在 MSVC STL 的实现:

_EXPORT_STD template <class _Mutex>
class _NODISCARD_LOCK lock_guard { // class with destructor that unlocks a mutex
public:
    using mutex_type = _Mutex;

    explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
        _MyMutex.lock();
    }

    lock_guard(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
        : _MyMutex(_Mtx) {} // construct but don't lock

    ~lock_guard() noexcept {
        _MyMutex.unlock();
    }

    lock_guard(const lock_guard&)            = delete;
    lock_guard& operator=(const lock_guard&) = delete;

private:
    _Mutex& _MyMutex;
};

这段代码极其简单,首先管理类,自然不可移动不可复制,我们定义复制构造与复制赋值为弃置函数,同时阻止了移动等函数的隐式定义。

它只保有一个私有数据成员,一个引用,用来引用互斥量。

构造函数中初始化这个引用,同时上锁,析构函数中解锁,这是一个非常典型的 RAII 式的管理。

同时它还提供一个有额外std::adopt_lock_t参数的构造函数 ,如果使用这个构造函数,则构造函数不会上锁。

所以有的时候你可能会看到一些这样的代码:

void f(){
    //code..
    {
        std::lock_guard<std::mutex> lc{ m };
        // 涉及共享资源的修改的代码...
    }
    //code..
}

使用 {} 创建了一个块作用域,限制了对象 lc 的生存期,进入作用域构造 lock_guard 的时候上锁(lock),离开作用域析构的时候解锁(unlock)。

  • 我们要尽可能的让互斥量上锁的粒度小,只用来确保必须的共享资源的线程安全。

“粒度”通常用于描述锁定的范围大小,较小的粒度意味着锁定的范围更小,因此有更好的性能和更少的竞争。

我们举一个例子:

std::mutex m;

void add_to_list(int n, std::list<int>& list) {
    std::vector<int> numbers(n + 1);
    std::iota(numbers.begin(), numbers.end(), 0);
    int sum = std::accumulate(numbers.begin(), numbers.end(), 0);

    {
        std::lock_guard<std::mutex> lc{ m };
        list.push_back(sum);
    }
}
void print_list(const std::list<int>& list){
    std::lock_guard<std::mutex> lc{ m };
    for(const auto& i : list){
        std::cout << i << ' ';
    }
    std::cout << '\n';
}
std::list<int> list;
std::thread t1{ add_to_list,i,std::ref(list) };
std::thread t2{ add_to_list,i,std::ref(list) };
std::thread t3{ print_list,std::cref(list) };
std::thread t4{ print_list,std::cref(list) };
t1.join();
t2.join();
t3.join();
t4.join();

完整代码测试

先看 add_to_list,只有 list.push_back(sum) 涉及到了对共享数据的修改,需要进行保护,我们用 {} 包起来了。

假设有线程 A、B执行函数 add_to_list() :线程 A 中的 numbers、sum 与线程 B 中的,不是同一个,希望大家分清楚,自然不存在数据竞争,也不需要上锁。线程 A、B执行完了前面求 0-n 的计算,只有一个线程能在 lock_guard 的构造函数中成功调用 lock() 给互斥量上锁。假设线程 A 成功调用 lock(),那么线程 B 的 lock() 调用就阻塞了,必须等待线程 A 执行完里面的代码,然后作用域结束,调用 lock_guard 的析构函数,解锁 unlock(),此时线程 B 就可以进去执行了,避免了数据竞争,不存在一个对象同时被多个线程修改。

函数 print_list() 就更简单了,打印 list,给整个函数上锁,同一时刻只能有一个线程执行。

我们的使用代码是多个线程执行这两个函数,两个函数共享了一个锁,这样确保了当执行函数 print_list() 打印的时候,list 的状态是确定的。打印函数 print_listadd_to_list 函数的修改操作同一时间只能有一个线程在执行。print_list() 不可能看到正在被add_to_list() 修改的 list。

至于到底哪个函数哪个线程会先执行,执行多少次,这些都由操作系统调度决定,也完全有可能连续 4 次都是执行函数 print_list 的线程成功调用 lock,会打印出了一样的值,这都很正常。


C++17 添加了一个新的特性,类模板实参推导std::lock_guard 可以根据传入的参数自行推导,而不需要写明模板类型参数:

std::mutex m;
std::lock_guard lc{ m }; // std::lock_guard<std::mutex>

并且 C++17 还引入了一个新的“管理类”:std::scoped_lock,它相较于 lock_guard的区别在于,它可以管理多个互斥量。不过对于处理一个互斥量的情况,它和 lock_guard 几乎完全相同。

std::mutex m;
std::scoped_lock lc{ m }; // std::scoped_lock<std::mutex>

我们在后续管理多个互斥量,会详细了解这个类。

try_lock

try_lock 是互斥量中的一种尝试上锁的方式。与常规的 lock 不同,try_lock 会尝试上锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回

它的返回类型是 bool ,如果上锁成功就返回 true,失败就返回 false

这种方法在多线程编程中很有用,特别是在需要保护临界区的同时,又不想线程因为等待锁而阻塞的情况下。

std::mutex mtx;

void thread_function(int id) {
    // 尝试加锁
    if (mtx.try_lock()) {
        std::cout << "线程:" << id << " 获得锁" << std::endl;
        // 临界区代码
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟临界区操作
        mtx.unlock(); // 解锁
        std::cout << "线程:" << id << " 释放锁" << std::endl;
    } else {
        std::cout << "线程:" << id << " 获取锁失败 处理步骤" << std::endl;
    }
}

如果有两个线程运行这段代码,必然有一个线程无法成功上锁,要走 else 的分支。

std::thread t1(thread_function, 1);
std::thread t2(thread_function, 2);

t1.join();
t2.join();

可能的运行结果

线程:1 获得锁
线程:2 获取锁失败 处理步骤
线程:1 释放锁

保护共享数据

互斥量主要也就是为了保护共享数据,上一节的使用互斥量也已经为各位展示了一些。

然而使用互斥量来保护共享数据也并不是在函数中加上一个 std::lock_guard 就万事大吉了。有的时候只需要一个指针或者引用,就能让这种保护形同虚设

class Data{
    int a{};
    std::string b{};
public:
    void do_something(){
        // 修改数据成员等...
    }
};

class Data_wrapper{
    Data data;
    std::mutex m;
public:
    template<class Func>
    void process_data(Func func){
        std::lock_guard<std::mutex> lc{m};
        func(data);  // 受保护数据传递给函数
    }
};

Data* p = nullptr;

void malicious_function(Data& protected_data){
    p = &protected_data; // 受保护的数据被传递
}

Data_wrapper d;

void foo(){
    d.process_data(malicious_function);  // 传递了一个恶意的函数
    p->do_something();                   // 在无保护的情况下访问保护数据
}

成员函数模板 process_data 看起来一点问题也没有,使用 std::lock_guard 对数据做了保护,但是调用方传递了 malicious_function 这样一个恶意的函数,使受保护数据传递给外部,可以在没有被互斥量保护的情况下调用 do_something()

我们传递的函数就不该是涉及外部副作用的,就应该是单纯的在受互斥量保护的情况下老老实实调用 do_something() 操作受保护的数据。

  • 简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设

process_data 的确算是没问题,用户非要做这些事情也是防不住的,我们只是告诉各位可能的情况。

死锁:问题与解决

试想一下,有一个玩具,这个玩具有两个部分,必须同时拿到两部分才能玩。比如一个遥控汽车,需要遥控器和玩具车才能玩。有两个小孩,他们都想玩这个玩具。当其中一个小孩拿到了遥控器和玩具车时,就可以尽情玩耍。当另一个小孩也想玩,他就得等待另一个小孩玩完才行。再试想,遥控器和玩具车被放在两个不同的地方,并且两个小孩都想要玩,并且一个拿到了遥控器,另一个拿到了玩具车。问题就出现了,除非其中一个孩子决定让另一个先玩,他把自己的那个部分给另一个小孩。但如果他们都不愿意,那么这个遥控汽车就谁都没有办法玩。

我们当然不在乎小孩抢玩具,我们要聊的是线程对锁的竞争:两个线程需要对它们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个线程的互斥量解锁。因为它们都在等待对方释放互斥量,没有线程工作。 这种情况就是死锁。

  • 多个互斥量才可能遇到死锁问题

避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁。反面示例

std::mutex m1,m2;
std::size_t n{};

void f(){
    std::lock_guard<std::mutex> lc1{ m1 };
    std::lock_guard<std::mutex> lc2{ m2 };
    ++n;
}
void f2() {
    std::lock_guard<std::mutex> lc1{ m2 };
    std::lock_guard<std::mutex> lc2{ m1 };
    ++n;
}

ff2 因为互斥量上锁顺序不同,就有死锁风险。函数 f 先锁定 m1,然后再尝试锁定 m2,而函数 f2 先锁定 m2 再锁定 m1 。如果两个线程同时运行,它们就可能会彼此等待对方释放其所需的锁,从而造成死锁。

简而言之,有可能函数 f 锁定了 m1,函数 f2 锁定了 m2,函数 f 要往下执行,给 m2 上锁,所以在等待 f2 解锁 m2,然而函数 f2 也在等待函数 f 解锁 m1 它才能往下执行。所以死锁。测试代码


但是有的时候即使固定锁顺序,依旧会产生问题。当有多个互斥量保护同一个类的对象时,对于相同类型的两个不同对象进行数据的交换操作,为了保证数据交换的正确性,就要避免其它线程修改,确保每个对象的互斥量都锁住自己要保护的区域。如果按照前面的的选择一个固定的顺序上锁解锁,则毫无意义,比如:

struct X{
    X(const std::string& str) :object{ str } {}

    friend void swap(X& lhs, X& rhs);
private:
    std::string object;
    std::mutex m;
};

void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::lock_guard<std::mutex> lock1{ lhs.m }; 
    std::lock_guard<std::mutex> lock2{ rhs.m }; 
    swap(lhs.object, rhs.object);
}

考虑用户调用的时候将参数交换,就会产生死锁

X a{ "🤣" }, b{ "😅" };
std::thread t{ [&] {swap(a, b); } };  // 1
std::thread t2{ [&] {swap(b, a); } }; // 2

1 执行的时候,先上锁 a 的互斥量,再上锁 b 的互斥量。

2 执行的时候,先上锁 b 的互斥量,再上锁 a 的互斥量。

完全可能线程 A 执行 1 的时候上锁了 a 的互斥量,线程 B 执行 2 上锁了 b 的互斥量。线程 A 往下执行需要上锁 b 的互斥量,线程 B 则要上锁 a 的互斥量执行完毕才能解锁,哪个都没办法往下执行,死锁测试代码

其实也就是回到了第一个示例的问题。

C++ 标准库有很多办法解决这个问题,可以使用 std::lock ,它能一次性锁住多个互斥量,并且没有死锁风险。修改 swap 代码后如下:

void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::lock(lhs.m, rhs.m);    // 给两个互斥量上锁
    std::lock_guard<std::mutex> lock1{ lhs.m,std::adopt_lock }; 
    std::lock_guard<std::mutex> lock2{ rhs.m,std::adopt_lock }; 
    swap(lhs.object, rhs.object);
}

因为前面已经使用了 std::lock 上锁,所以后的 std::lock_guard 构造都额外传递了一个 std::adopt_lock 参数,让其选择到不上锁的构造函数。函数退出也能正常解锁。

std::locklhs.mrhs.m 上锁时若抛出异常,则在重抛前对任何已锁的对象调用 unlock() 解锁,也就是 std::lock 要么将互斥量都上锁,要么一个都不锁。

C++17 新增了 std::scoped_lock ,提供此函数的 RAII 包装,通常它比裸调用 std::lock 更好。

所以我们前面的代码可以改写为:

void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::scoped_lock guard{ lhs.m,rhs.m };
    swap(lhs.object, rhs.object);
}

对此类有兴趣或任何疑问,建议阅读std::scoped_lock 的源码实现与解析

使用 std::scoped_lock 可以将所有 std::lock 替换掉,减少错误发生。

然而它们的帮助都是有限的,一切最终都是依靠开发者使用与管理。

死锁是多线程编程中令人相当头疼的问题,并且死锁经常是不可预见,甚至难以复现,因为在大部分时间里,程序都能正常完成工作。我们可以通过一些简单的规则,约束开发者的行为,帮助写出“无死锁”的代码

  • 避免嵌套锁

    线程获取一个锁时,就别再获取第二个锁。每个线程只持有一个锁,自然不会产生死锁。如果必须要获取多个锁,使用 std::lock

  • 避免在持有锁时调用外部代码

    这个建议是很简单的:因为代码是外部提供的,所以没办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。当写通用代码时(比如保护共享数据中的 Date 类)。这不是接口设计者可以处理的,只能寄希望于调用方传递的代码是能正常执行的。

  • 使用固定顺序获取锁

    如同第一个示例那样,固定的顺序上锁就不存在问题。

std::unique_lock 灵活的锁

std::unique_lock 是 C++11 引入的一种通用互斥包装器,它相比于 std::lock_guard 更加的灵活。当然,它也更加的复杂,尤其它还可以与我们下一章要讲的条件变量一起使用。使用它可以将之前使用 std::lock_guardswap 改写一下:

void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::unique_lock<std::mutex> lock1{ lhs.m, std::defer_lock };
    std::unique_lock<std::mutex> lock2{ rhs.m, std::defer_lock };
    std::lock(lock1, lock2);
    swap(lhs.object, rhs.object);
    ++n;
}

解释这段代码最简单的方式就是直接展示标准库的源码,首先,我们要了解 std::defer_lock 是“不获得互斥体的所有权”。没有所有权自然构造函数就不会上锁,但不止如此。我们还要先知道 std::unique_lock 保有的数据成员(都以 MSVC STL 为例):

private:
    _Mutex* _Pmtx = nullptr;
    bool _Owns    = false;

如你所见很简单,一个互斥量的指针,还有一个就是表示对象是否拥有互斥量所有权的 bool 类型的对象 _Owns 了。我们前面代码会调用构造函数:

unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
    : _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock

如你所见,只是初始化了数据成员而已,注意,这个构造函数没有给互斥量上锁,且 _Ownsfalse 表示没有互斥量所有权。并且 std::unique_lock 是有 lock()try_lock()unlock() 成员函数的,所以可以直接传递给 std::lock、 进行调用。这里还需要提一下 lock() 成员函数的代码:

void lock() { // lock the mutex
    _Validate();
    _Pmtx->lock();
    _Owns = true;
}

如你所见,正常上锁,并且把 _Owns 设置为 true,即表示当前对象拥有互斥量的所有权。那么接下来看析构函数:

~unique_lock() noexcept {
    if (_Owns) {
        _Pmtx->unlock();
    }
}

必须得是当前对象拥有互斥量的所有权析构函数才会调用 unlock() 解锁互斥量。我们的代码因为调用了 lock ,所以 _Owns 设置为 true ,函数结束对象析构的时候会解锁互斥量。


设计挺奇怪的对吧,这个所有权语义。其实上面的代码还不够简单直接,我们再举个例子:

std::mutex m;

int main() {
    std::unique_lock<std::mutex> lock{ m,std::adopt_lock };
    lock.lock();
}

这段代码运行会抛出异常,原因很简单,因为 std::adopt_lock 只是不上锁,但是有所有权,即 _Owns 设置为 true 了,当运行 lock() 成员函数的时候,调用了 _Validate() 进行检测,也就是:

void _Validate() const { // check if the mutex can be locked
    if (!_Pmtx) {
        _Throw_system_error(errc::operation_not_permitted);
    }

    if (_Owns) {
        _Throw_system_error(errc::resource_deadlock_would_occur);
    }
}

满足第二个 if,因为 _Ownstrue 所以抛出异常,别的标准库也都有类似设计。很诡异的设计对吧,正常。除非我们写成:

lock.mutex()->lock();

也就是说 std::unique_lock 要想调用 lock() 成员函数,必须是当前没有所有权

所以正常的用法其实是,先上锁了互斥量,然后传递 std::adopt_lock 构造 std::unique_lock 对象表示拥有互斥量的所有权,即可在析构的时候正常解锁。如下:

std::mutex m;

int main() {
    m.lock();
    std::unique_lock<std::mutex> lock { m,std::adopt_lock };
}

简而言之:

  • 使用 std::defer_lock 构造函数不上锁,要求构造之后上锁

  • 使用 std::adopt_lock 构造函数不上锁,要求在构造之前互斥量上锁

  • 默认构造会上锁,要求构造函数之前和构造函数之后都不能再次上锁


我们前面提到了 std::unique_lock 更加灵活,那么灵活在哪?很简单,它拥有 lock()unlock() 成员函数,所以我们能写出如下代码:

void f() {
    //code..
    
    std::unique_lock<std::mutex> lock{ m };

    // 涉及共享资源的修改的代码...

    lock.unlock(); // 解锁并释放所有权,析构函数不会再 unlock()

    //code..
}

而不是像之前 std::lock_guard 一样使用 {}

另外再聊一聊开销吧,其实倒也还好,多了一个 bool ,内存对齐,x64 环境也就是 16 字节。这都不是最重要的,主要是复杂性和需求,通常建议优先 std::lock_guard,当它无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock

在不同作用域传递互斥量

首先我们要明白,互斥量满足互斥体 (Mutex)的要求,不可复制不可移动。所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用罢了。可以利用各种类来进行传递,比如前面提到的 std::unique_lock

std::unique_lock 可以获取互斥量的所有权,而互斥量的所有权可以通过移动操作转移给其他的 std::unique_lock 对象。有些时候,这种转移(就是调用移动构造)是自动发生的,比如当函数返回 std::unique_lock 对象。另一种情况就是得显式使用 std::move

请勿对移动语义和转移所有权抱有错误的幻想,我们说的无非是调用 std::unique_lock 的移动构造罢了:

_NODISCARD_CTOR_LOCK unique_lock(unique_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) {
    _Other._Pmtx = nullptr;
    _Other._Owns = false;
}

将数据成员赋给新对象,原来的置空,这就是所谓的 所有权”转移,切勿被词语迷惑。

std::unique_lock 是只能移动不可复制的类,它移动即标志其管理的互斥量的所有权转移了。

一种可能的使用是允许函数去锁住一个互斥量,并将互斥量的所有权转移到调用者上,所以调用者可以在这个锁保护的范围内执行代码。

std::unique_lock<std::mutex>get_lock(){
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> lk{ some_mutex };
    return lk;

}
void process_data(){
    std::unique_lock<std::mutex> lk{ get_lock() };
    // 执行一些任务...
}

return lk 这里会调用移动构造,将互斥量的所有权转移给调用方, process_data 函数结束的时候会解锁互斥量。

我相信你可能对 extern std::mutex some_mutex 有疑问,其实不用感到奇怪,这是一个互斥量的声明,可能别的翻译单元(或 dll 等)有它的定义,成功链接上。我们前面也说了:“所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用罢了”,所以要特别注意互斥量的生存期

extern 说明符只能搭配变量声明和函数声明(除了类成员或函数形参)。它指定外部链接,而且技术上不影响存储期,但它不能用来定义自动存储期的对象,故所有 extern 对象都具有静态或线程存储期

如果你简单写一个 std::mutex some_mutex 那么函数 process_data 中的 lk 会持有一个悬垂指针。

举一个使用 extern std::mutex 的完整运行示例。当然,其实理论上你 new std::mutex 也是完全可行...... 🤣🤣

std::unique_lock 是灵活的,同样允许在对象销毁之前就解锁互斥量,调用 unlock() 成员函数即可,不再强调。

保护共享数据的初始化过程

保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销

我们不想为各位介绍其它乱七八糟的各种保护初始化的方式,我们只介绍三种:双检锁(错误)使用 std::call_once静态局部变量初始化在 C++11 是线程安全

  1. 双检锁(错误)线程不安全

    void f(){
        if(!ptr){      // 1
            std::lock_guard<std::mutex> lk{ m };
            if(!ptr){  // 2
                ptr.reset(new some);  // 3
            }
        }
        ptr->do_something();  // 4
    }

    ① 是查看指针是否为空,空才需要初始化,才需要获取锁。指针为空,当获取锁后会再检查一次指针②(这就是双重检查),避免另一线程在第一次检查后再做初始化,并且让当前线程获取锁。

    然而这显然没用,因为有潜在的条件竞争。未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步,因此就会产生条件竞争。

    简而言之:一个线程知道另一个线程已经在执行③,但是此时还没有创建 some 对象,而只是分配内存对指针写入。那么这个线程在①的时候就不会进入,直接执行了 ptr->do_something()④,得不到正确的结果,因为对象还没构造。

    如果你觉得难以理解,那就记住 ptr.reset(new some); 并非是不可打断不可交换的固定指令。

    这种错误写法在一些单例中也非常的常见。如果你的同事或上司写出此代码,一般不建议指出,因为不见得你能教会他们,不要“没事找事”,只要不影响自己即可。

  2. C++ 标准委员会也认为处理此问题很重要,所以标准库提供了 std::call_oncestd::once_flag 来处理这种情况。比起锁住互斥量并显式检查指针,每个线程只需要使用 std::call_once 就可以。使用 std::call_once 比显式使用互斥量消耗的资源更少,特别是当初始化完成之后

    std::shared_ptr<some>ptr;
    std::mutex m;
    std::once_flag resource_flag;
    
    void init_resource(){
        ptr.reset(new some);
    }
    
    void foo(){
        std::call_once(resource_flag, init_resource); // 线程安全的一次初始化
        ptr->do_something();
    }

    以上代码 std::once_flag 对象是全局命名空间作用域声明,如果你有需要,它也可以是类的成员。用于搭配 std::call_once 使用,保证线程安全的一次初始化。std::call_once 只需要接受可调用 (Callable)对象即可,也不要求一定是函数。

    初始化”,那自然是只有一次。但是 std::call_once 也有一些例外情况(比如异常)会让传入的可调用对象被多次调用,即“多次”初始化:

    std::once_flag flag;
    int n = 0;
    
    void f(){
        std::call_once(flag, [] {
            ++n;
            std::cout << "第" << n << "次调用\n";
            throw std::runtime_error("异常");
        });
    }
    
    int main(){
        try{
            f();
        }
        catch (std::exception&){}
        
        try{
            f();
        }
        catch (std::exception&){}
    }

    测试链接。正常情况会保证传入的可调用对象只调用一次,即初始化只有一次。异常之类的是例外。

    这种行为很合理,因为异常代表操作失败,需要进行回溯和重置状态,符合语义和设计。

  3. 静态局部变量初始化在 C++11 是线程安全

    class my_class;
    inline my_class& get_my_class_instance(){
        static my_class instance;  // 线程安全的初始化过程 初始化严格发生一次
        return instance;
    }

    即使多个线程同时访问 get_my_class_instance 函数,也只有一个线程会执行 instance 的初始化,其它线程会等待初始化完成。这种实现方式是线程安全的,不用担心数据竞争。此方式也在单例中多见,被称作“Meyers Singleton”单例,是简单合理的做法。


其实还有不少其他的做法或者反例,但是觉得没必要再聊了,这些已经足够了,再多下去就冗余了。且本文不是详尽的文档,而是“教程”。

保护不常更新的数据结构

试想一下,你有一个数据结构存储了用户的设置信息,每次用户打开程序的时候,都要进行读取,且运行时很多地方都依赖这个数据结构需要读取,所以为了效率,我们使用了多线程读写。这个数据结构很少进行改变,而我们知道,多线程读取,是没有数据竞争的,是安全的,但是有些时候又不可避免的有修改和读取都要工作的时候,所以依然必须得使用互斥量进行保护。

然而使用 std::mutex 的开销是过大的,它不管有没有发生数据竞争(也就是就算全是读的情况)也必须是老老实实上锁解锁,只有一个线程可以运行。如果你学过其它语言或者操作系统,相信这个时候就已经想到了:“读写锁”。

C++ 标准库自然为我们提供了: std::shared_timed_mutex(C++14)、 std::shared_mutex(C++17)。它们的区别简单来说,前者支持更多的操作方式,后者有更高的性能优势。

std::shared_mutex 同样支持 std::lock_guardstd::unique_lock。和 std::mutex 做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用 std::shared_lock<std::shared_mutex> 获取访问权,多个线程可以一起读取。

class Settings {
private:
    std::map<std::string, std::string> data_;
    mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现

public:
    void set(const std::string& key, const std::string& value) {
        std::lock_guard<std::shared_mutex> lock{ mutex_ };
        data_[key] = value;
    }

    std::string get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        auto it = data_.find(key);
        return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
    }
};

完整代码测试链接。标准输出可能交错,但无数据竞争。

std::shared_timed_mutex 具有 std::shared_mutex 的所有功能,并且额外支持超时功能。所以以上代码可以随意更换这两个互斥量。

std::recursive_mutex

线程对已经上锁的 std::mutex 再次上锁是错误的,这是未定义行为。然而在某些情况下,一个线程会尝试在释放一个互斥量前多次获取,所以提供了std::recursive_mutex

std::recursive_mutex 是 C++ 标准库提供的一种互斥量类型,它允许同一线程多次锁定同一个互斥量,而不会造成死锁。当同一线程多次对同一个 std::recursive_mutex 进行锁定时,只有在解锁与锁定次数相匹配时,互斥量才会真正释放。但它并不影响不同线程对同一个互斥量进行锁定的情况。不同线程对同一个互斥量进行锁定时,会按照互斥量的规则进行阻塞

#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex mtx;

void recursive_function(int count) {
    // 递归函数,每次递归都会锁定互斥量
    mtx.lock();
    std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
    if (count > 0) {
        recursive_function(count - 1); // 递归调用
    }
    mtx.unlock(); // 解锁互斥量
}

int main() {
    std::thread t1(recursive_function, 3);
    std::thread t2(recursive_function, 2);

    t1.join();
    t2.join();
}

运行测试。

  • lock:线程可以在递归互斥体上重复调用 lock。在线程调用 unlock 匹配次数后,所有权才会得到释放

  • unlock:若所有权层数为 1(此线程对 lock() 的调用恰好比 unlock() 多一次 )则解锁互斥体,否则将所有权层数减少 1。

我们重点的强调了一下这两个成员函数的这个概念,其实也很简单,总而言之就是 unlock 必须和 lock 的调用次数一样,才会真正解锁互斥量。

同样的,我们也可以使用 std::lock_guardstd::unique_lock 帮我们管理 std::recursive_mutex,而非显式调用 lockunlock

void recursive_function(int count) {
    std::lock_guard<std::recursive_mutex> lc{ mtx };
    std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
    if (count > 0) {
        recursive_function(count - 1);
    }
}

运行测试。

newdelete 是线程安全的吗?

如果你的标准达到 C++11,要求下列函数是线程安全的:

所以以下函数在多线程运行是线程安全的:

void f(){
    T* p = new T{};
    delete p;
}

内存分配、释放操作是线程安全,构造和析构不涉及共享资源。而局部对象 p 对于每个线程来说是独立的。换句话说,每个线程都有其自己的 p 对象实例,因此它们不会共享同一个对象,自然没有数据竞争。

如果 p 是全局对象(或者外部的,只要可被多个线程读写),多个线程同时对其进行访问和修改时,就可能会导致数据竞争和未定义行为。因此,确保全局对象的线程安全访问通常需要额外的同步措施,比如互斥量或原子操作。

T* p = nullptr;
void f(){
    p = new T{}; // 存在数据竞争
    delete p;
}

即使 p 是局部对象,如果构造函数(析构同理)涉及读写共享资源,那么一样存在数据竞争,需要进行额外的同步措施进行保护。

int n = 1;

struct X{
    X(int v){
        ::n += v;
    }
};

void f(){
    X* p = new X{ 1 }; // 存在数据竞争
    delete p;
}

一个直观的展示是,我们可以在构造函数中使用 std::cout,看到无序的输出,例子


值得注意的是,如果是自己重载 operator newoperator delete 替换了库的全局版本,那么它的线程安全就要我们来保证。

// 全局的 new 运算符,替换了库的版本
void* operator new  (std::size_t count){
    return ::operator new(count); 
}

以上代码是线程安全的,因为 C++11 保证了 new 运算符的库版本,即 ::operator new 是线程安全的,我们直接调用它自然不成问题。如果你需要更多的操作,就得使用互斥量之类的方式保护了。


总而言之,new 表达式线程安全要考虑三方面:operator new、构造函数、修改指针。

delete 表达式线程安全考虑两方面:operator delete、析构函数。

C++ 只保证了 operator newoperator delete 这两个方面的线程安全(不包括用户定义的),其它方面就得自己保证了。前面的内容也都提到了。

线程存储期

线程存储期(也有人喜欢称作“线程局部存储”)的概念源自操作系统,是一种非常古老的机制,广泛应用于各种编程语言。线程存储期的对象在线程开始时分配,并在线程结束时释放。每个线程拥有自己独立的对象实例,互不干扰。在 C++11中,引入了thread_local关键字,用于声明具有线程存储期的对象。不少开发者喜欢直接将声明为线程存储期的对象称为:线程变量;也与全局变量CPU 变量,在一起讨论。

以下是一段示例代码,展示了 thread_local 关键字的使用:

int global_counter = 0;
thread_local int thread_local_counter = 0;

void print_counters(){
    std::cout << "global:" << global_counter++ << '\n';
    std::cout << "thread_local:" << thread_local_counter++ << '\n';
}

int main(){
    std::thread{ print_counters }.join();
    std::thread{ print_counters }.join();
}

运行结果

global:0
thread_local:0
global:1
thread_local:0

这段代码很好的展示了 thread_local 关键字的使用以及它的作用。每一个线程都有独立的 thread_local_counter 对象,它们不是同一个。


我知道你会有问题:“那么 C++11 之前呢?”那时开发者通常使用 POSIX 线程(Pthreads)或 Win32 线程的接口,或者依赖各家编译器的扩展。例如:

  • POSIX:使用 pthread_key_t 和相关的函数( pthread_key_createpthread_setspecificpthread_getspecificpthread_key_delete)来管理线程局部存储。

  • Win32:使用 TLS(Thread Local Storage)机制,通过函数 TlsAllocTlsSetValueTlsGetValueTlsFree 来实现线程局部存储。

  • GCC:使用 __thread

  • MSVC:使用 __declspec(thread)

POSIX 与 Win32 接口的就不再介绍了,有兴趣参见我们的链接即可。我们就拿先前的代码改成使用 GCC 与 MSVC 的编译器扩展即可。

__thread int thread_local_counter = 0;           // GCC
__declspec(thread) int thread_local_counter = 0; // MSVC

MSVC 无法使用 GCC 的编译器扩展,GCC 也肯定无法使用 MSVC 的扩展,不过 Clang 编译器可以,它支持 __thread__declspec(thread) 两种。Clang 默认情况就支持 GCC 的编译器扩展,如果要支持 MSVC,需要设置 -fms-extensions 编译选项。

要注意的是,这些扩展并不是标准的 C++ 语言特性,它们的跨平台性和可移植性较差,我们应当使用 C++ 标准的 thread_local

了解其它 API 以及编译器扩展有助于理解历史上线程存储期的演进。同时扩展知识面。

注意事项

需要注意的是,在 MSVC 的实现中,如果 std::async 策略为 launch::async ,但却并不是每次都创建一个新的线程,而是从线程池获取线程。这意味着无法保证线程局部变量在任务完成时会被销毁。如果线程被回收并用于新的 std::async 调用,则旧的线程局部变量仍然存在。因此,建议不要将线程局部变量与 std::async 一起使用文档

虽然还没有讲 std::async ,不过还是可以注意一下这个问题,我们用一个简单的示例为你展示:

int n = 0;

struct X {
 ~X() { ++n; }
};

thread_local X x{};

void use_thread_local_x() {
 // 如果不写此弃值表达式,那么在 Gcc 与 Clang 编译此代码,会优化掉 x
 (void)x;
}

int main() {
 std::cout << "使用 std::thread: \n";
 std::thread{ use_thread_local_x }.join();
 std::cout << n << '\n';

 std::cout << "使用 std::async: \n";
 std::async(std::launch::async, use_thread_local_x);
 std::cout << n << '\n';
}

在不同编译器上的输出结果:

  • Linux/Windows GCC、Clang:会输出 12,因为线程变量 x 在每个任务中被正确销毁析构。

  • Windows MSVC:会输出 11,因为线程被线程池复用,线程依然活跃,线程变量 x 也就还未释放。

CPU 变量

CPU 变量的概念很好理解。就像线程变量为每个线程提供独立的对象实例,互不干扰一样,CPU 变量也是如此。在创建 CPU 变量时,系统上的每个 CPU 都会获得该变量的一个副本。

在 Linux 内核中,从 2. 版本开始引入了 Per-CPU 变量(Per-CPU variables)功能。Per-CPU 变量是为每个处理器单独分配的变量副本,旨在减少多处理器访问共享数据时的同步开销,提升性能。每个处理器只访问自己的变量副本,不需要进行同步操作,避免了数据竞争,增强了并行处理能力。

在 Windows 内核中,没有直接对应的 Per-CPU 变量机制。

本节是偏向概念的认识,而非实际进行内核编程,C++ 语言层面也并未提供此抽象。理解 CPU 变量的概念对于系统编程和内核开发非常重要。这些概念在面试和技术讨论中常常出现,掌握这些知识不仅有助于应对面试问题,也能提升对多处理器系统性能优化的理解。

局部、全局、线程、CPU 变量的对比与使用

在并发编程中,不同的变量有不同的使用场景和特点。以下是局部变量、全局变量、线程变量、CPU变量的对比:

局部变量(不考虑静态局部)

  • 它拥有自动存储期,随作用域开始分配,结束时释放。每个线程、每次调用都有独立实例,完全独立,几乎无需同步。

全局变量

  • 拥有 静态(static) 存储期,它的存储在程序开始时分配,并在程序结束时解分配;且它在主函数执行之前进行初始化

线程变量

  • 拥有 线程(thread) 存储期。它的存储在线程开始时分配,并在线程结束时解分配。每个线程拥有它自身的对象实例。只有声明为 thread_local 的对象拥有此存储期(不考虑非标准用法)。它的初始化需要考虑局部与非局部两种情况:

    • 非局部变量:所有具有线程局部存储期的非局部变量的初始化,会作为线程启动的一部分进行,并按顺序早于线程函数的执行开始。

    • 静态局部变量:控制流首次经过它的声明时才会被初始化(除非它被零初始化常量初始化)。在其后所有的调用中,声明都会被跳过。

CPU变量

  • 它在标准 C++ 中无对应抽象实现,是操作系统内核功能。它主要依赖于当前系统内核来进行使用,也无法跨平台。基本概念与线程变量类似:CPU 变量是为每个处理器单独分配的变量副本。


  • 局部变量适合临时数据,作用域结束自动释放,几乎无需同步。

  • 全局变量适合整个程序的共享状态,但需要使用同步设施进行保护。

  • 线程变量适合线程的独立状态,通常无需同步。

  • CPU 变量的使用是少见的,主要用于内核开发和追求极致性能的高并发场景,减少 CPU 同步开销。

总而言之,结合实际使用即可,把这四种变量拿出来进行对比,增进理解,加深印象。

总结

本章讨论了多线程的共享数据引发的恶性条件竞争会带来的问题。并说明了可以使用互斥量(std::mutex)保护共享数据,并且要注意互斥量上锁的“粒度”。C++标准库提供了很多工具,包括管理互斥量的管理类(std::lock_guard),但是互斥量只能解决它能解决的问题,并且它有自己的问题(死锁)。同时我们讲述了一些避免死锁的方法和技术。还讲了一下互斥量所有权转移。然后讨论了面对不同情况保护共享数据的不同方式,使用 std::call_once() 保护共享数据的初始化过程,使用读写锁(std::shared_mutex)保护不常更新的数据结构。以及特殊情况可能用到的互斥量 recursive_mutex,有些人可能喜欢称作:递归锁。然后聊了一下 newdelete 运算符的库函数实际是线程安全的。最后介绍了一下线程存储期、CPU 变量,和各种变量进行了一个对比。

下一章,我们将开始讲述同步操作,会使用到 Futures条件变量等设施。

Last updated