内存模型与原子操作

  • 内存模型定义了多线程程序中,读写操作如何在不同线程之间可见,以及这些操作在何种顺序下执行。内存模型确保程序的行为在并发环境下是可预测的。

  • 原子操作即不可分割的操作。系统的所有线程,不可能观察到原子操作完成了一半。

最基础的概念就是如此,这里不再过多赘述,后续还会详细展开内存模型的问题。

原子操作

int a = 0;
void f(){
    ++a;
}

显然,++a 是非原子操作,也就是说在多线程中可能会被另一个线程观察到只完成一半。

  1. 线程 A 和线程 B 同时开始修改变量 a 的值。

  2. 线程 A 对 a 执行递增操作,但还未完成。

  3. 在线程 A 完成递增操作之前,线程 B 也执行了递增操作。

  4. 线程 C 读取 a 的值。

线程 C 到底读取到多少不确定,a 的值是多少也不确定。显然,这构成了数据竞争,出现了未定义行为

在之前的内容中,我们讲述了使用很多同步设施,如互斥量,来保护共享资源。

std::mutex m;
void f() {
    std::lock_guard<std::mutex> lc{ m };
    ++a;
}

通过互斥量的保护,即使 ++a 本身不是原子操作,逻辑上也可视为原子操作。互斥量确保了对共享资源的读写是线程安全的,避免了数据竞争问题。

不过这显然不是我们的重点。我们想要的是一种原子类型,它的所有操作都直接是原子的,不需要额外的同步设施进行保护。C++11 引入了原子类型 std::atomic,在下节我们会详细讲解。

原子类型 std::atomic

标准原子类型定义在头文件 <atomic> 中。这些类型的操作都是原子的,语言定义中只有这些类型的操作是原子的,虽然也可以用互斥量来模拟原子操作(见上文)。

标准原子类型的实现通常包括一个 is_lock_free() 成员函数,允许用户查询特定原子类型的操作是否是通过直接的原子指令实现(返回 true),还是通过锁来实现(返回 false)。

如果一个线程写入原子对象,同时另一线程从它读取,那么行为有良好定义(数据竞争的细节见内存模型)。

原子操作可以在一些时候代替互斥量,来进行同步操作,也能带来更高的性能。但是如果它的内部使用互斥量实现,那么不可能有性能的提升。

在 C++17 中,所有原子类型都有一个 static constexpr 的数据成员 is_always_lock_free 。如果当前环境上的原子类型 X 是无锁类型,那么 X::is_always_lock_free 将返回 true 。例如:

std::atomic<int>::is_always_lock_free // true 或 false

标准库还提供了一组宏 ATOMIC_xxx_LOCK_FREE ,在编译时对各种整数原子类型是否无锁进行判断。

// (C++11 起)
#define ATOMIC_BOOL_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR16_T_LOCK_FREE /* 未指定 */
#define ATOMIC_CHAR32_T_LOCK_FREE /* 未指定 */
#define ATOMIC_WCHAR_T_LOCK_FREE  /* 未指定 */
#define ATOMIC_SHORT_LOCK_FREE    /* 未指定 */
#define ATOMIC_INT_LOCK_FREE      /* 未指定 */
#define ATOMIC_LONG_LOCK_FREE     /* 未指定 */
#define ATOMIC_LLONG_LOCK_FREE    /* 未指定 */
#define ATOMIC_POINTER_LOCK_FREE  /* 未指定 */
// (C++20 起)
#define ATOMIC_CHAR8_T_LOCK_FREE  /* 未指定 */
  • 对于一定有锁的内建原子类型是 0;

  • 对于有时无锁的内建原子类型是 1;

  • 对于一定无锁的内建原子类型是 2。

我们可以使用这些宏来对代码进行编译时的优化和检查,以确保在特定平台上原子操作的性能。例如,如果我们知道某些操作在目标平台上是无锁的,那么我们可以利用这一点进行性能优化。如果这些操作在目标平台上是有锁的,我们可能会选择其它同步机制。

// 检查 std::atomic<int> 是否总是无锁
if constexpr(std::atomic<int>::is_always_lock_free) {
    std::cout << "当前环境 std::atomic<int> 始终是无锁" << std::endl;
}
else {
    std::cout << "当前环境 std::atomic<int> 并不总是无锁" << std::endl;
}

// 使用 ATOMIC_INT_LOCK_FREE 宏进行编译时检查
#if ATOMIC_INT_LOCK_FREE == 2
    std::cout << "int 类型的原子操作一定无锁的。" << std::endl;
#elif ATOMIC_INT_LOCK_FREE == 1
    std::cout << "int 类型的原子操作有时是无锁的。" << std::endl;
#else
    std::cout << "int 类型的原子操作一定有锁的。" << std::endl;
#endif

运行测试。

如你所见,我们写了一个简单的示例,展示了如何使用 C++17 的静态数据成员 is_always_lock_free 和预处理宏来让程序执行不同的代码。

因为 is_always_lock_free 是编译期常量,所以我们可以使用 C++17 引入的 constexpr if ,它可以在编译阶段进行决策,避免了运行时的判断开销,提高了性能。

宏则更是简单了,最基本的预处理器判断,在预处理阶段就选择编译合适的代码。

在实际应用中,如果一个类型的原子操作总是无锁的,我们可以更放心地在性能关键的代码路径中使用它。例如,在高频交易系统、实时系统或者其它需要高并发性能的场景中,无锁的原子操作可以显著减少锁的开销和竞争,提高系统的吞吐量和响应时间。

另一方面,如果发现某些原子类型在目标平台上是有锁的,我们可以考虑以下优化策略:

  1. 使用不同的数据结构:有时可以通过改变数据结构来避免对原子操作的依赖。

  2. 减少原子操作的频率:通过批处理等技术,减少对原子操作的调用次数。

  3. 使用更高效的同步机制:在一些情况下,其它同步机制(如读写锁)可能比原子操作更高效。

当然,其实很多时候根本没这种性能的担忧,我们很多时候使用原子对象只是为了简单方便,比如 std::atomic<bool> 表示状态、std::atomic<int> 进行计数等。即使它们是用了锁,那也是封装好了的,起码用着方便,而不需要在代码中引入额外的互斥量来保护,更加简洁。这也是很正常的需求,各位不但要考虑程序的性能,同时也要考虑代码的简洁性、易用性。即使使用原子类型无法带来效率的提升,那也没有负提升。


除了直接使用 std::atomic 模板外,也可以使用原子类型的别名。这个数量非常之多,见 MSVC STL

对于标准内建类型的别名,就是在原子类型的类型名前面加上 atomic_ 的前缀:atomic_T。不过 signed 缩写 sunsigned 缩写 ulong long 缩写 llong

using atomic_char   = atomic<char>;
using atomic_schar  = atomic<signed char>;
using atomic_uchar  = atomic<unsigned char>;
using atomic_short  = atomic<short>;
using atomic_ushort = atomic<unsigned short>;
using atomic_int    = atomic<int>;
using atomic_uint   = atomic<unsigned int>;
using atomic_long   = atomic<long>;
using atomic_ulong  = atomic<unsigned long>;
using atomic_llong  = atomic<long long>;
using atomic_ullong = atomic<unsigned long long>;

通常 std::atomic 对象不可进行复制、移动、赋值,因为它们的复制构造复制赋值运算符被定义为弃置的。不过可以隐式转换成对应的内置类型,因为它有转换函数

atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
operator T() const noexcept;

可以使用 load()store()exchange()compare_exchange_weak()compare_exchange_strong() 等成员函数对 std::atomic 进行操作。如果是整数类型的特化,还支持 ++--+=-=&=|=^=fetch_addfetch_sub 等操作方式。在后面详细的展开使用。

std::atomic 类模板不仅只能使用标准库为我们定义的特化类型,我们也完全可以自定义类型创建对应的原子对象。不过因为是通用模板,操作仅限 load()store()exchange()compare_exchange_weak()compare_exchange_strong(),以及一个转换函数。

模板 std::atomic 可用任何满足可复制构造 (CopyConstructible)可复制赋值 (CopyAssignable)可平凡复制 (TriviallyCopyable)类型 T 实例化。

struct trivial_type {
    int x{};
    float y{};

    trivial_type() {}

    trivial_type(int a, float b) : x{ a }, y{ b } {}

    trivial_type(const trivial_type& other) = default;

    trivial_type& operator=(const trivial_type& other) = default;

    ~trivial_type() = default;
};

验证自己的类型是否满足 std::atomic 要求,我们可以使用静态断言

static_assert(std::is_trivially_copyable<trivial_type>::value, "");
static_assert(std::is_copy_constructible<trivial_type>::value, "");
static_assert(std::is_move_constructible<trivial_type>::value, "");
static_assert(std::is_copy_assignable<trivial_type>::value, "");
static_assert(std::is_move_assignable<trivial_type>::value, "");

程序能通过编译即代表满足要求。如果不满足要求,静态断言求值中第一个表达式求值为 false,则编译错误。显然我们的类型满足要求,我们可以尝试使用一下它:

// 创建一个 std::atomic<trivial_type> 对象
std::atomic<trivial_type> atomic_my_type { trivial_type{ 10, 20.5f } };

// 使用 store 和 load 操作来设置和获取值
trivial_type new_value{ 30, 40.5f };
atomic_my_type.store(new_value);

trivial_type loadedValue = atomic_my_type.load();
std::cout << "x: " << loadedValue.x << ", y: " << loadedValue.y << std::endl;

// 使用 exchange 操作
trivial_type exchanged_value = atomic_my_type.exchange(trivial_type{ 50, 60.5f });
std::cout << "交换前的 x: " << exchanged_value.x
          << ", 交换前的 y: " << exchanged_value.y << std::endl;
std::cout << "交换后的 x: " << atomic_my_type.load().x
          << ", 交换后的 y: " << atomic_my_type.load().y << std::endl;

运行测试。

没有问题,不过其实我们的 trivial_type 直接改成:

struct trivial_type {
    int x;
    float y;
};

运行测试。

也是完全可以的,满足要求。先前只是为了展示一下显式写明的情况。


原子类型的每个操作函数,都有一个内存序参数,这个参数可以用来指定执行顺序,在后面的内容会详细讲述,现在只需要知道操作分为三类:

  1. Store 操作(存储操作):可选的内存序包括 memory_order_relaxedmemory_order_releasememory_order_seq_cst

  2. Load 操作(加载操作):可选的内存序包括 memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_seq_cst

  3. Read-modify-write(读-改-写)操作:可选的内存序包括 memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst

本节主要广泛介绍 std::atomic,而未展开具体使用。在后续章节中,我们将更详细地讨论一些版本,如 std::atomic<bool>,并介绍其成员函数和使用方法。

最后强调一下:任何 std::atomic 类型,初始化不是原子操作

std::atomic_flag

std::atomic_flag 是最简单的原子类型,这个类型的对象可以在两个状态间切换:设置(true)和清除(false)。它很简单,通常只是用作构建一些库设施,不会单独使用或直接面向普通开发者。

在 C++20 之前,std::atomic_flag 类型的对象需要以 ATOMIC_FLAG_INIT 初始化,可以确保此时对象处于 "清除"(false)状态。

std::atomic_flag f = ATOMIC_FLAG_INIT;

C++20std::atomic_flag 的默认构造函数保证对象为“清除”(false)状态,就不再需要使用 ATOMIC_FLAG_INIT

ATOMIC_FLAG_INIT 其实并不是什么复杂的东西,它在不同的标准库实现中只是简单的初始化:在 MSVC STL 它只是一个 {},在 libstdc++libc++ 它只是一个 { 0 }。也就是说我们可以这样初始化:

std::atomic_flag f ATOMIC_FLAG_INIT;
std::atomic_flag f2 = {};
std::atomic_flag f3{};
std::atomic_flag f4{ 0 };

使用 ATOMIC_FLAG_INIT 宏只是为了统一,我们知道即可。

当标志对象已初始化,它只能做三件事情:销毁、清除、设置。这些操作对应的函数分别是:

  1. clear() (清除):将标志对象的状态原子地更改为清除(false)

  2. test_and_set(测试并设置):将标志对象的状态原子地更改为设置(true),并返回它先前保有的值。

  3. 销毁:对象的生命周期结束时,自动调用析构函数进行销毁操作。

每个操作都可以指定内存顺序。clear() 是一个“读-改-写”操作,可以应用任何内存顺序。默认的内存顺序是 memory_order_seq_cst。例如:

f.clear(std::memory_order_release);
bool r = f.test_and_set();
  1. f 的状态原子地更改为清除(false),指明 memory_order_release 内存序。

  2. f 的状态原子地更改为设置(true),并返回它先前保有的值给 r。使用默认的 memory_order_seq_cst 内存序。

不用着急,这里还不是详细展开聊内存序的时候。

std::atomic_flag 不可复制不可移动不可赋值。这不是 std::atomic_flag 特有的,而是所有原子类型共有的属性。原子类型的所有操作都是原子的,而赋值和复制涉及两个对象,破坏了操作的原子性。复制构造和复制赋值会先读取第一个对象的值,然后再写入另一个对象。对于两个独立的对象,这里实际上有两个独立的操作,合并这两个操作无法保证其原子性。因此,这些操作是不被允许的。

有限的特性使得 std::atomic_flag 非常适合用作制作自旋锁

自旋锁可以理解为一种忙等锁,因为它在等待锁的过程中不会主动放弃 CPU,而是持续检查锁的状态。

与此相对,std::mutex 互斥量是一种睡眠锁。当线程请求锁(lock())而未能获取时,它会放弃 CPU 时间片,让其他线程得以执行,从而有效利用系统资源。

从性能上看,自旋锁的响应更快,但是睡眠锁更加节省资源,高效。

class spinlock_mutex {
    std::atomic_flag flag{};
public:
    spinlock_mutex()noexcept = default;
    void lock()noexcept {
        while (flag.test_and_set(std::memory_order_acquire));
    }

    void unlock()noexcept {
        flag.clear(std::memory_order_release);
    }
};

我们可以简单的使用测试一下,它是有效的:

spinlock_mutex m;

void f(){
    std::lock_guard<spinlock_mutex> lc{ m };
    std::cout << "😅😅" << "❤️❤️\n";
}

运行测试。

稍微聊一下原理,我们的 spinlock_mutex 对象中存储的 flag 对象在默认构造时是清除 (false) 状态。在 lock() 函数中调用 test_and_set 函数,它是原子的,只有一个线程能成功调用并将 flag 的状态原子地更改为设置 (true),并返回它先前的值 (false)。此时,该线程成功获取了锁,退出循环。

flag 对象的状态为设置 (true) 时,其它线程调用 test_and_set 函数会返回 true,导致它们继续在循环中自旋,无法退出。直到先前持有锁的线程调用 unlock() 函数,将 flag 对象的状态原子地更改为清除 (false) 状态。此时,等待的线程中会有一个线程成功调用 test_and_set 返回 false,然后退出循环,成功获取锁。

值得注意的是,我们只是稍微的讲一下使用 std::atomic_flag 实现自旋锁。不过并不推荐各位在实践中使用它,具体可参见 Linus Torvalds文章。其中有一段话说得很直接:

  • 我再说一遍:不要在用户空间中使用自旋锁,除非你真的知道自己在做什么。请注意,你知道自己在做什么的可能性基本上为零。 I repeat: do not use spinlocks in user space, unless you actually know what you're doing. And be aware that the likelihood that you know what you are doing is basically nil.

然后就是推荐使用 std::mutexpthread_mutex ,比自旋好的多。

std::atomic_flag 的局限性太强,甚至不能当普通的 bool 标志那样使用。一般最好使用 std::atomic<bool>,下节,我们来使用它。

std::atomic<bool>

std::atomic<bool> 是最基本的整数原子类型 ,它相较于 std::atomic_flag 提供了更加完善的布尔标志。虽然同样不可复制不可移动,但可以使用非原子的 bool 类型进行构造,初始化为 true 或 false,并且能从非原子的 bool 对象赋值给 std::atomic<bool>

std::atomic<bool> b{ true };
b = false;

不过这个 operator= 不同于通常情况,赋值操作 b = false 返回一个普通的 bool 值。

这个行为不仅仅适用于std::atomic<bool>,而是适用于所有std::atomic类型。

如果原子变量的赋值操作返回了一个引用,那么依赖这个结果的代码需要显式地进行加载(load),以确保数据的正确性。例如:

std::atomic<bool>b {true};
auto& ref = (b = false);  // 假设返回 atomic 引用
bool flag = ref.load();   // 那就必须显式调用 load() 加载

通过返回非原子值进行赋值,可以避免多余的加载(load)过程,得到实际存储的值。

std::atomic<bool> b{ true };
bool new_value = (b = false);  // new_value 将是 false

使用 store 原子的替换当前对象的值,远好于 std::atomic_flagclear()test_and_set() 也可以换为更加通用常见的 exchange,它可以原子的使用新的值替换已经存储的值,并返回旧值。

获取 std::atomic<bool> 的值有两种方式,调用 load() 函数,或者隐式转换

store 是一个存储操作、load 是一个加载操作exchange 是一个“读-改-写”操作:

std::atomic<bool> b;
bool x = b.load(std::memory_order_acquire);
b.store(true);
x = b.exchange(false, std::memory_order_acq_rel);

std::atomic<bool> 提供多个“读-改-写”的操作,exchange 只是其中之一。它还提供了一种存储方式:当前值与预期一致时,存储新值。

这种操作叫做“比较/交换”,它的形式表现为 compare_exchange_weak()compare_exchang_strong()

  • compare_exchange_weak:尝试将原子对象的当前值与预期值进行比较,如果相等则将其更新为新值并返回 true;否则,将原子对象的值加载进 expected(进行加载操作)并返回 false此操作可能会由于某些硬件的特性而出现假失败,需要在循环中重试

    std::atomic<bool> flag{ false };
    bool expected = false;
    
    while (!flag.compare_exchange_weak(expected, true));

    运行测试。

    返回 false 即代表出现了假失败,因此需要在循环中重试。。

  • compare_exchange_strong:类似于 compare_exchange_weak但不会出现假失败,因此不需要重试。适用于需要确保操作成功的场合。

    std::atomic<bool> flag{ false };
    bool expected = false;
    
    void try_set_flag() {
        // 尝试将 flag 设置为 true,如果当前值为 false
        if (flag.compare_exchange_strong(expected, true)) {
            std::cout << "flag 为 false,设为 true。\n";
        }
        else {
            std::cout << "flag 为 true, expected 设为 true。\n";
        }
    }

    运行测试。

    假设有两个线程运行 try_set_flag 函数,那么第一个线程调用 compare_exchange_strong 将原子对象 flag 设置为 true。第二个线程调用 compare_exchange_strong,当前原子对象的值为 true,而 expectedfalse,不相等,将原子对象的值设置给 expected。此时 flagexpected 均为 true

exchange 的另一个不同是,compare_exchange_weakcompare_exchange_strong 允许指定成功和失败情况下的内存序。这意味着你可以根据成功或失败的情况,为原子操作指定不同的内存序。

std::atomic<bool> data{ false };
bool expected = false;

// 成功时的内存序为 memory_order_release,失败时的内存序为 memory_order_acquire
if (data.compare_exchange_weak(expected, true,
    std::memory_order_release, std::memory_order_acquire)) {
    // 操作成功
}
else {
    // 操作失败
}

另一个简单的原子类型是特化的原子指针,即:std::atomic<T*>,下一节我们来看看它是如何工作的。

std::atomic<T*>

std::atomic<T*> 是一个原子指针类型,T 是指针所指向的对象类型。操作是针对 T 类型的指针进行的。虽然 std::atomic<T*> 不能被拷贝和移动,但它可以通过符合类型的指针进行构造和赋值。

std::atomic<T*> 拥有以下成员函数:

  • load():以原子方式读取指针值。

  • store():以原子方式存储指针值。

  • exchange():以原子方式交换指针值。

  • compare_exchange_weak()compare_exchange_strong():以原子方式比较并交换指针值。

这些函数接受并返回的类型都是 T*。此外,std::atomic<T*> 还提供了以下操作:

  • fetch_add:以原子方式增加指针的值。(p.fetch_add(1) 会将指针 p 向前移动一个元素,并返回操作前的指针值)

  • fetch_sub:以原子方式减少指针的值。返回操作前的指针值。

  • operator+=operator-=:以原子方式增加或减少指针的值。返回操作前的指针值。

这些操作确保在多线程环境下进行安全的指针操作,避免数据竞争和并发问题。

使用示例如下:

struct Foo {};

Foo array[5]{};
std::atomic<Foo*> p{ array };

// p 加 2,并返回原始值
Foo* x = p.fetch_add(2);
assert(x == array);
assert(p.load() == &array[2]);

// p 减 1,并返回原始值
x = (p -= 1);
assert(x == &array[1]);
assert(p.load() == &array[1]);

// 函数也允许内存序作为给定函数的参数
p.fetch_add(3, std::memory_order_release);

这个特化十分简单,我们无需过多赘述。

std::atomic<std::shared_ptr>

在前文中,我们多次提到 std::shared_ptr

第四章中提到:多个线程能在不同的 shared_ptr 对象上调用所有成员函数(包含复制构造函数与复制赋值)而不附加同步,即使这些实例是同一对象的副本且共享所有权也是如此。若多个执行线程访问同一 shared_ptr 对象而不同步,且任一线程使用 shared_ptr 的非 const 成员函数,则将出现数据竞争;std::atomic<shared_ptr> 能用于避免数据竞争。文档

一个在互联网上非常热门的八股问题是:std::shared_ptr 是不是线程安全?

显然,它并不是完全线程安全的,尽管在多线程环境中有很大的保证,但这还不够。在 C++20 中,原子模板 std::atomic 引入了一个偏特化版本 std::atomic<std::shared_ptr> 允许用户原子地操纵 shared_ptr 对象。因为它是 std::atomic 的特化版本,即使我们还没有深入讲述它,也能知道它是原子类型,这意味着它的所有操作都是原子操作

若多个执行线程不同步地同时访问同一 std::shared_ptr 对象,且任何这些访问使用了 shared_ptr 的非 const 成员函数,则将出现数据竞争除非通过 std::atomic<std::shared_ptr> 的实例进行所有访问

class Data {
public:
    Data(int value = 0) : value_(value) {}
    int get_value() const { return value_; }
    void set_value(int new_value) { value_ = new_value; }
private:
    int value_;
};

auto data = std::make_shared<Data>();

void writer(){
    for (int i = 0; i < 10; ++i) {
        std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
        data.swap(new_data); // 调用非 const 成员函数
        std::this_thread::sleep_for(100ms);
    }
}

void reader(){
    for (int i = 0; i < 10; ++i) {
        if (data) {
            std::cout << "读取线程值: " << data->get_value() << std::endl;
        }
        else {
            std::cout << "没有读取到数据" << std::endl;
        }
        std::this_thread::sleep_for(100ms);
    }
}

int main(){
    std::thread writer_thread{ writer };
    std::thread reader_thread{ reader };

    writer_thread.join();
    reader_thread.join();
}

运行测试。

以上这段代码是典型的线程不安全,它满足:

  1. 多个线程不同步地同时访问同一 std::shared_ptr 对象

  2. 任一线程使用 shared_ptr 的非 const 成员函数

那么为什么呢?为什么满足这些概念就是线程不安全呢?为了理解这些概念,首先需要了解 shared_ptr 的内部实现:

shared_ptr 的通常实现只保有两个指针

  • 指向底层元素的指针(get()) 所返回的指针)

  • 指向控制块 的指针

控制块是一个动态分配的对象,其中包含:

  • 指向被管理对象的指针或被管理对象本身

  • 删除器(类型擦除)

  • 分配器(类型擦除)

  • 持有被管理对象的 shared_ptr 的数量

  • 涉及被管理对象的 weak_ptr 的数量

控制块是线程安全的,这意味着多个线程可以安全地操作引用计数和访问管理对象,即使这些 shared_ptr 实例是同一对象的副本且共享所有权也是如此。因此,多个线程可以安全地创建、销毁和复制 shared_ptr 对象,因为这些操作仅影响控制块中的引用计数。

然而,shared_ptr 对象实例本身并不是线程安全的。shared_ptr 对象实例包含一个指向控制块的指针和一个指向底层元素的指针。这两个指针的操作在多个线程中并没有同步机制。因此,如果多个线程同时访问同一个 shared_ptr 对象实例并调用非 const 成员函数(如 resetoperator=),这些操作会导致对这些指针的并发修改,进而引发数据竞争。

如果不是同一 shared_ptr 对象,每个线程读写的指针也不是同一个,控制块又是线程安全的,那么自然不存在数据竞争,可以安全的调用所有成员函数。


使用 std::atomic<shared_ptr> 修改:

std::atomic<std::shared_ptr<Data>> data = std::make_shared<Data>();

void writer() {
    for (int i = 0; i < 10; ++i) {
        std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
        data.store(new_data); // 原子地替换所保有的值
        std::this_thread::sleep_for(10ms);
    }
}

void reader() {
    for (int i = 0; i < 10; ++i) {
        if (auto sp = data.load()) {
            std::cout << "读取线程值: " << sp->get_value() << std::endl;
        }
        else {
            std::cout << "没有读取到数据" << std::endl;
        }
        std::this_thread::sleep_for(10ms);
    }
}

很显然,这是线程安全的,store 是原子操作,而 sp->get_value() 只是个读取操作。

我知道,你肯定会想着:能不能调用 load() 成员函数原子地返回底层的 std::shared_ptr 再调用 swap 成员函数?

可以,但是没有意义,因为 load() 成员函数返回的是底层 std::shared_ptr副本,也就是一个临时对象。对这个临时对象调用 swap 并不会改变 data 本身的值,因此这种操作没有实际意义,尽管这不会引发数据竞争(因为是副本)。

由于我们没有对读写操作进行同步,只是确保了操作的线程安全,所以多次运行时可能会看到一些无序的打印,这是正常的。

不过事实上 std::atomic<std::shared_ptr> 的功能相当有限,单看它提供的修改接口(=storeloadexchang)就能明白。如果要操作其保护的共享指针指向的资源还是得 load() 获取底层共享指针的副本。此时再进行操作时就得考虑 std::shared_ptr 本身在多线程的支持了。


在使用 std::atomic<std::shared_ptr> 的时候,我们要注意第三章中关于共享数据的一句话:

切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设

原子类型也有类似的问题,以下是一个例子:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
*ptr.load() = 100;
  1. 调用 load() 成员函数,原子地返回底层共享指针的副本 std::shared_ptr

  2. 解引用,等价 *get(),返回了 int&

  3. 直接修改这个引用所指向的资源。

在第一步时,已经脱离了 std::atomic 的保护,第二步就获取了被保护的数据的引用,第三步进行了修改,这导致了数据竞争。当然了,这种做法非常的愚蠢,只是为了表示,所谓的线程安全,也是要靠开发者的正确使用

正确的用法如下:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
std::atomic_ref<int> ref{ *ptr.load() };
ref = 100; // 原子地赋 100 给被引用的对象

通过使用 std::atomic_ref 我们得以确保在修改共享资源时保持操作的原子性,从而避免了数据竞争。


最后再来稍微聊一聊提供的 waitnotify_onenotify_all 成员函数。这并非是 std::atomic<shared_ptr> 专属,C++20 以后任何 atomic 的特化都拥有这些成员函数,使用起来也都十分的简单,我们这里用一个简单的例子为你展示一下:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>();

void wait_for_wake_up(){
    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 阻塞,等待更新唤醒\n";

    // 等待 ptr 变为其它值
    ptr.wait(ptr.load());

    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 已被唤醒\n";
}

void wake_up(){
    std::this_thread::sleep_for(5s);

    // 更新值并唤醒
    ptr.store(std::make_shared<int>(10));
    ptr.notify_one();
}

内存次序

前言

事实上我们在前面就用到了不少的内存次序,只不过一直没详细展开讲解。

在开始学习之前,我们需要强调一些基本的认识:

  1. 内存次序是非常底层知识:对于普通开发者来说,了解内存次序并非硬性需求。如果您主要关注业务开发,可以直接跳过本节内容。如果您对内存次序感兴趣,则需要注意其复杂性和难以观察的特性,这将使学习过程具有一定挑战性。

  2. 内存次序错误的使用难以察觉:即使通过多次(数以万计)运行也难以发现。这是因为许多内存次序问题是由于极端的、少见的情况下的竞争条件引起的,而这些情况很难被重现。此外,即使程序在某些平台上运行正常,也不能保证它在其他平台上也能表现良好,因为不同的 CPU 和编译器可能对内存操作的顺序有不同的处理(例如 x86 架构内存模型:Total Store Order (TSO),是比较严格的内存模型)。因此,开发者必须依赖自己的知识和经验,以及可能的测试和调试技术,来发现和解决内存次序错误。

错误难以被我们观察到的原因其实可以简单的说:

  • CPU 与编译器不是神经病,没有好处不会闲的没事给你指令重排


  • 编译器重排:编译器在编译代码时,为了提高性能,可以按照一定规则重新安排代码的执行顺序。例如,可以将不相关的指令重排,使得 CPU 流水线更加高效地执行指令。编译器优化需要遵守一个“如同规则(as-if rule)”,即不可改变可观察的副作用。

  • CPU 重排:CPU 在运行程序时,也会对指令进行重排,以提高执行效率,减少等待时间。这种重排通常遵循一些硬件层面的优化规则,如内存访问的优化。

你们可能还有疑问:“单线程能不能指令重排?”

CPU 的指令重排必须遵循一定的规则,以确保程序的可观察副作用不受影响。对于单线程程序,CPU 会保证外部行为的一致性。对于多线程程序,需要开发者使用同步原语来显式地控制内存操作的顺序和可见性,确保多线程环境下的正确性。而标准库中提供的原子对象的原子操作,还可以设置内存次序。

那有没有可能:

  • end 重排到 start 前面了!指令重排了!

这也就是前面说的,把 CPU 与编译器当神经病。各位写代码难道还要考虑下面这段,会不会指令重排导致先输出 end 吗?这显然不现实。

print("start"); // 1
print("end");   // 2 

不禁止就是有可能,但是我们无需在乎,就算真的 CPU 将 end 重排到 start 前面了,也得在可观测行为发生前回溯了。所以我一直在强调,这些东西,我们无需在意

好了,到此,基本认识也就足够了,以上的示例更多的是泛指,知道其表达的意思就好,这些还是简单直接且符合直觉的。

可见

可见 是 C++ 多线程并发编程中的一个重要概念,它描述了一个线程中的数据修改对其他线程的可见程度。具体来说,如果线程 A 对变量 x 进行了修改,那么其他线程 B 是否能够看到线程 A 对 x 的修改,就涉及到可见的问题。

在讨论多线程的内存模型和执行顺序时,虽然经常会提到 CPU 重排、编译器优化、缓存等底层细节,但真正核心的概念是可见,而不是这些底层实现细节。

C++ 标准中的可见

  • 如果线程 A 对变量 x 进行了修改,而线程 B 能够读取到线程 A 对 x 的修改,那么我们说线程 B 能看到线程 A 对 x 的修改。也就是说,线程 A 的修改对线程 B 是可见的。

C++ 标准通过内存序(memory order)来定义如何确保这种可见,而不必直接关心底层的 CPU 和编译器的具体行为。内存序提供了操作之间的顺序关系,确保即使存在 CPU 重排、编译器优化或缓存问题,线程也能正确地看到其他线程对共享数据的修改。

例如,通过使用合适的内存序(如 memory_order_release 和 memory_order_acquire),可以确保线程 A 的写操作在其他线程 B 中是可见的,从而避免数据竞争问题。

总结:

  • 可见 关注的是线程之间的数据一致性,而不是底层的实现细节。

  • 使用 C++ 的内存序机制可以确保数据修改的可见,而不必过多关注具体的 CPU 和编译器行为。

这种描述方式可以帮助更清楚地理解和描述多线程并发编程中如何通过 C++ 标准的内存模型来确保线程之间的数据一致性,而无需太多关注底层细节。


我知道各位肯定有疑问,我们大多数时候写多线程代码都从来没使用过内存序,一般都是互斥量、条件变量等高级同步设施,这没有可见性的问题吗?

没有,这些设施自动确保数据的可见性。例如: std::mutexunlock() 保证:

  • 此操作同步于任何后继的取得同一互斥体所有权的锁定操作。

也就是 unlock() 同步于 lock()

同步于”:操作 A 的完成会确保操作 B 在其之后的执行中,能够看到操作 A 所做的所有修改。

也就是说:

  • std::mutexunlock() 操作同步于任何随后的 lock() 操作。这意味着,线程在调用 unlock() 时,对共享数据的修改会对之后调用 lock() 的线程可见

std::memory_order

std::memory_order 是一个枚举类型,用来指定原子操作的内存顺序,影响这些操作的行为。

typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;

// C++20 起则为:

enum class memory_order : /* 未指明 */ {
    relaxed, consume, acquire, release, acq_rel, seq_cst
};
inline constexpr memory_order memory_order_relaxed = memory_order::relaxed;
inline constexpr memory_order memory_order_consume = memory_order::consume;
inline constexpr memory_order memory_order_acquire = memory_order::acquire;
inline constexpr memory_order memory_order_release = memory_order::release;
inline constexpr memory_order memory_order_acq_rel = memory_order::acq_rel;
inline constexpr memory_order memory_order_seq_cst = memory_order::seq_cst;

这 6 个常量,每一个常量都表示不同的内存次序。

大体来说我们可以将它们分为三类。

  1. memory_order_relaxed 宽松定序:不是定序约束,仅对此操作要求原子性

  2. memory_order_seq_cst 序列一致定序,这是库中所有原子操作的默认行为,也是最严格的内存次序,是绝对安全的。

剩下的就是第三类。

其它概念

x86ARM 的内存模型:强一致性与弱一致性

内存模型是软件与实现之间的一种约定契约。它定义了在多线程或并发环境中,如何对内存操作的顺序和一致性进行规范,以确保程序的正确性和可靠性。

C++ 标准为我们定义了 C++ 标准内存模型,使我们能够无需关心底层硬件环境就编写出跨平台的应用程序。不过,了解底层硬件架构的内存模型对扩展知识面和深入理解编程细节也非常有帮助。

最经典与常见的两种 CPU 指令集架构就是:x86ARM

  • x86 架构:是一种复杂指令集计算(CISC)架构,因其强大的性能被广泛应用于桌面电脑、笔记本电脑和服务器中。x86 架构采用的是 TSO(Total Store Order)内存一致性模型,是一种强一致性模型简化了多线程编程中的内存同步问题(后文中会提到)。

  • ARM 架构:是一种精简指令集计算(RISC)架构,因其高能效和低功耗特点广泛应用于移动设备、嵌入式系统和物联网设备中。ARM 架构采用的是弱序内存模型weakly-ordered memory),允许更灵活的内存优化,但这需要程序员使用内存屏障等机制来确保正确性。

这两种架构在设计理念和应用领域上存在显著差异,这也是它们在不同应用场景中表现出色的原因。

如果你从事嵌入式系统或者学术研究等,可能也听说过 RISC-V 架构,它目前在国内的应用也逐渐增多。

RISC-V 是一种开源的精简指令集计算(RISC)架构,旨在提供一种高效、模块化且开放的指令集。与 x86 和 ARM 架构不同,RISC-V 的设计目标是简化指令集,同时保持高度的灵活性和扩展性。它在内存模型方面也有自己独特的特性。

RISC-V 采用的也是弱序内存模型(weakly-ordered memory model),这与 x86 的强一致性模型(TSO)和 ARM 的弱一致性模型有所不同。你可能会有疑问:

  • ARMRISC-V 都是弱序内存模型,为什么不同?

各位一定要区分,这种强弱其实也只是一种分类而已,不同的指令集架构大多都还是有所不同的,并不会完全一样。例如: x86 的 TSO(Total Store Order)是强一致性模型的一种,但并不是所有强一致性模型都是 TSO。

宽松定序

释放-获取定序

释放-消费定序

序列一致定序

volatile 的关系

Last updated