亚洲欧美第一页_禁久久精品乱码_粉嫩av一区二区三区免费野_久草精品视频

蟲蟲首頁| 資源下載| 資源專輯| 精品軟件
登錄| 注冊

您現在的位置是:首頁 > 技術閱讀 >  C++并發編程(C++11到C++17)

C++并發編程(C++11到C++17)

時間:2024-02-12

置頂/星標公眾號????,硬核文章第一時間送達!


為什么要并發編程


大型的軟件項目常常包含非常多的任務需要處理。例如:對于大量數據的數據流處理,或者是包含復雜GUI界面的應用程序。如果將所有的任務都以串行的方式執行,則整個系統的效率將會非常低下,應用程序的用戶體驗會非常的差。

另一方面,自上個世紀六七十年代英特爾創始人之一 Gordon Moore 提出 摩爾定義 以來,CPU頻率以每18個月翻一番的指數速度增長。但這一增長在最近的十年已經基本停滯,大家會發現曾經有過一段時間CPU的頻率從3G到達4G,但在這之后就停滯不前了。因此最近的新款CPU也基本上都是3G左右的頻率。相應的,CPU以更多核的形式在增長。目前的Intel i7有8核的版本,Xeon處理器達到了28核。并且,最近幾年手機上使用的CPU也基本上是4核或者8核的了。

由此,掌握并發編程技術,利用多處理器來提升軟件項目的性能將是軟件工程師的一項基本技能。

本文以C++語言為例,講解如何進行并發編程。并盡可能涉及C++11,C++14以及C++17中的主要內容。

并發與并行


并發(Concurrent)與并行(Parallel)都是很常見的術語。

Erlang之父Joe Armstrong曾經以人們使用咖啡機的場景為例描述了這兩個術語。如下圖所示:


  • 并發:如果多個隊列可以交替使用某臺咖啡機,則這一行為就是并發的。
  • 并行:如果存在多臺咖啡機可以被多個隊列交替使用,則就是并行。

這里隊列中的每個人類比于計算機的任務,咖啡機類比于計算機處理器。因此:并發和并行都是在多任務的環境下的討論。

更嚴格的來說:如果一個系統支持多個動作同時存在,那么這個系統就是一個并發系統。如果這個系統還支持多個動作(物理時間上)同時執行,那么這個系統就是一個并行系統。

你可能已經看出,“并行”其實是“并發”的子集。它們的區別在于是否具有多個處理器。如果存在多個處理器同時執行多個線程,就是并行。

在不考慮處理器數量的情況下,我們統稱之為“并發”。

進程與線程


進程與線程是操作系統的基本概念。無論是桌面系統:MacOS,Linux,Windows,還是移動操作系統:Android,iOS,都存在進程和線程的概念。

進程(英語:process),是指計算機中已運行的程序。進程為曾經是分時系統的基本運作單位。在面向進程設計的系統(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執行實體;
線程(英語:thread)是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。
-- 維基百科

關于這兩個概念在任何一本操作系統書上都可以找到定義。網上也有很多文章對它們進行了解釋。因此這里不再贅述,這里僅僅提及一下它們與編程的關系。

對于絕大部分編程語言或者編程環境來說,我們所寫的程序都會在一個進程中運行。一個進程至少會包含一個線程。這個線程我們通常稱之為主線程。

在默認的情況下,我們寫的代碼都是在進程的主線程中運行,除非開發者在程序中創建了新的線程。

不同編程語言的線程環境會不一樣,Java語言在很早就支持了多線程接口。(Java程序在Java虛擬機中運行,虛擬機通常還會包含自己特有的線程,例如垃圾回收線程。)。而對于JavaScript這樣的語言來說,它就沒有多線程的概念。

當我們只有一個處理器時,所有的進程或線程會分時占用這個處理器。但如果系統中存在多個處理器時,則就可能有多個任務并行的運行在不同的處理器上。

下面兩幅圖以不同顏色的矩形代表不同的任務(可能是進程,也可能是線程)來描述它們可能在處理器上執行的順序。

下圖是單核處理器的情況:


下面是四核處理器的情況:


任務會在何時占有處理器,通常是由操作系統的調度策略決定的。在《Android系統上的進程管理:進程的調度》一文中,我們介紹過Linux的調度策略。

當我們在開發跨平臺的軟件時,我們不應當對調度策略做任何假設,而應該抱有“系統可能以任意順序來調度我的任務”這樣的想法。

并發系統的性能


開發并發系統最主要的動機就是提升系統性能(事實上,這是以增加復雜度為代價的)。

但我們需要知道,單純的使用多線程并不一定能提升系統性能(當然,也并非線程越多系統的性能就越好)。從上面的兩幅圖我們就可以直觀的感受到:線程(任務)的數量要根據具體的處理器數量來決定。假設只有一個處理器,那么劃分太多線程可能會適得其反。因為很多時間都花在任務切換上了。

因此,在設計并發系統之前,一方面我們需要做好對于硬件性能的了解,另一方面需要對我們的任務有足夠的認識。

關于這一點,你可能需要了解一下阿姆達爾定律了。對于這個定律,簡單來說:我們想要預先意識到那些任務是可以并行的,那些是無法并行的。只有明確了任務的性質,才能有的放矢的進行優化。這個定律告訴了我們將系統并行之后性能收益的上限。

關于阿姆達爾定律在Linux系統監測工具sysstat介紹一文中已經介紹過,因此這里不再贅述。

C++與并發編程


前面我們已經了解到,并非所有的語言都提供了多線程的環境。

即便是C++語言,直到C++11標準之前,也是沒有多線程支持的。在這種情況下,Linux/Unix平臺下的開發者通常會使用POSIX Threads,Windows上的開發者也會有相應的接口。但很明顯,這些API都只針對特定的操作系統平臺,可移植性較差。如果要同時支持Linux和Windows系統,你可能要寫兩套代碼。

相較而言,Java自JDK 1.0就包含了多線程模型。

這個狀態在C++ 11標準發布之后得到了改變。并且,在C++ 14和C++ 17標準中又對并發編程機制進行了增強。

下圖是最近幾個版本的C++標準特性的線路圖。


編譯器與C++標準


編譯器對于語言特性的支持是逐步完成的。想要使用特定的特性你需要相應版本的編譯器。

  • GCC對于C++特性的支持請參見這里:C++ Standards Support in GCC。
  • Clang對于C++特性的支持請參見這里:C++ Support in Clang。

下面兩個表格列出了C++標準和相應編譯器的版本對照:

  • C++標準與相應的GCC版本要求如下:
  • C++標準與相應的Clang版本要求如下:

默認情況下編譯器是以較低的標準來進行編譯的,如果希望使用新的標準,你需要通過編譯參數-std=c++xx告知編譯器,例如:

g++ -std=c++17 your_file.cpp -o your_program

測試環境


本文的源碼可以到下載我的github上獲取,地址:paulQuei/cpp-concurrency。
你可以直接通過下面這條命令獲取源碼:

git clone https://github.com/paulQuei/cpp-concurrency.git

源碼下載之后,你可以通過任何文本編輯器瀏覽源碼。如果希望編譯和運行程序,你還需要按照下面的內容來準備環境。

本文中的源碼使用cmake編譯,只有cmake 3.8以上的版本才支持C++ 17,所以你需要安裝這個或者更新版本的cmake。

另外,截止目前(2019年10月)為止,clang編譯器還不支持并行算法。

但是gcc-9是支持的。因此想要編譯和運行這部分代碼,你需要安裝gcc 9.0或更新的版本。并且,gcc-9還要依賴Intel Threading Building Blocks才能使用并行算法以及<execution>頭文件。

具體的安裝方法見下文。

具體編譯器對于C++特性支持的情況請參見這里:C++ compiler support。

安裝好之后運行根目錄下的下面這個命令即可:
 

./make_all.sh

它會完成所有的編譯工作。

本文的源碼在下面兩個環境中經過測試,環境的準備方法如下。

MacOS


在Mac上,我使用brew工具安裝gcc以及tbb庫。

考慮到其他人與我的環境可能會有所差異,所以需要手動告知tbb庫的安裝路徑。
讀者需要執行下面這些命令來準備環境:


rew install gccbrew insbtall tbb
export tbb_path=/usr/local/Cellar/tbb/2019_U8/./make_all.sh

注意,請通過運行g++-9命令以確認gcc的版本是否正確,如果版本較低,則需要通過brew命令將其升級到新版本:

brew upgrade gcc

Ubuntu


Ubuntu上,通過下面的命令安裝gcc-9。

sudo add-apt-repository ppa:ubuntu-toolchain-r/testsudo apt-get updatesudo apt install gcc-9 g++-9

但安裝tbb庫就有些麻煩了。這是因為Ubuntu 16.04默認關聯的版本是較低的,直接安裝是無法使用的。我們需要安裝更新的版本。
聯網安裝的方式步驟繁瑣,所以可以通過下載包的方式進行安裝,我已經將這需要的兩個文件放到的這里:

  • libtbb2_2019~U8-1_amd64.deb
  • libtbb-dev_2019~U8-1_amd64.deb

如果需要,你可以下載后通過apt命令安裝即可:

sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb

線程


創建線程


創建線程非常的簡單的,下面就是一個使用了多線程的Hello World示例:

// 01_hello_thread.cpp
#include <iostream>#include <thread> // ①
using namespace std; // ②
void hello() { // ③ cout << "Hello World from new thread." << endl;}
int main() { thread t(hello); // ④ t.join(); // ⑤
return 0;}

對于這段代碼說明如下:

  1. 為了使用多線程的接口,我們需要#include <thread>頭文件。
  2. 為了簡化聲明,本文中的代碼都將using namespace std;。
  3. 新建線程的入口是一個普通的函數,它并沒有什么特別的地方。
  4. 創建線程的方式就是構造一個thread對象,并指定入口函數。與普通對象不一樣的是,此時編譯器便會為我們創建一個新的操作系統線程,并在新的線程中執行我們的入口函數。
  5. 關于join函數在下文中講解。

thread可以和callable類型一起工作,因此如果你熟悉lambda表達式,你可以直接用它來寫線程的邏輯,像這樣:

// 02_lambda_thread.cpp
#include <iostream>#include <thread>
using namespace std;
int main() { thread t([] { cout << "Hello World from lambda thread." << endl; });
t.join();
return 0;}

為了減少不必要的重復,若無必要,下文中的代碼將不貼出include指令以及using聲明。

當然,你可以傳遞參數給入口函數,像下面這樣:

// 03_thread_argument.cpp
void hello(string name) { cout << "Welcome to " << name << endl;}
int main() { thread t(hello, "https://paul.pub"); t.join();
return 0;}

不過需要注意的是,參數是以拷貝的形式進行傳遞的。因此對于拷貝耗時的對象你可能需要傳遞指針或者引用類型作為參數。但是,如果是傳遞指針或者引用,你還需要考慮參數對象的生命周期。因為線程的運行長度很可能會超過參數的生命周期(見下文detach),這個時候如果線程還在訪問一個已經被銷毀的對象就會出現問題。

join與detach


  • 主要API

一旦啟動線程之后,我們必須決定是要等待直接它結束(通過join),還是讓它獨立運行(通過detach),我們必須二者選其一。如果在thread對象銷毀的時候我們還沒有做決定,則thread對象在析構函數出將調用std::terminate()從而導致我們的進程異常退出。

請思考在上面的代碼示例中,thread對象在何時會銷毀。

需要注意的是:在我們做決定的時候,很可能線程已經執行完了(例如上面的示例中線程的邏輯僅僅是一句打印,執行時間會很短)。新的線程創建之后,究竟是新的線程先執行,還是當前線程的下一條語句先執行這是不確定的,因為這是由操作系統的調度策略決定的。不過這不要緊,我們只要在thread對象銷毀前做決定即可。

  • join:調用此接口時,當前線程會一直阻塞,直到目標線程執行完成(當然,很可能目標線程在此處調用之前就已經執行完成了,不過這不要緊)。因此,如果目標線程的任務非常耗時,你就要考慮好是否需要在主線程上等待它了,因此這很可能會導致主線程卡住。
  • detach:detach是讓目標線程成為守護線程(daemon threads)。一旦detach之后,目標線程將獨立執行,即便其對應的thread對象銷毀也不影響線程的執行。并且,你無法再與之通信。

對于這兩個接口,都必須是可執行的線程才有意義。你可以通過joinable()接口查詢是否可以對它們進行join或者detach。

管理當前線程


  • 主要API

上面是一些在線程內部使用的API,它們用來對當前線程做一些控制。

  • yield 通常用在自己的主要任務已經完成的時候,此時希望讓出處理器給其他任務使用。
  • get_id 返回當前線程的id,可以以此來標識不同的線程。
  • sleep_for 是讓當前線程停止一段時間。
  • sleep_until 和sleep_for類似,但是是以具體的時間點為參數。這兩個API都以chrono API(由于篇幅所限,這里不展開這方面內容)為基礎。

下面是一個代碼示例:

// 04_thread_self_manage.cpp
void print_time() { auto now = chrono::system_clock::now(); auto in_time_t = chrono::system_clock::to_time_t(now);
std::stringstream ss; ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X"); cout << "now is: " << ss.str() << endl;}
void sleep_thread() { this_thread::sleep_for(chrono::seconds(3)); cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;}
void loop_thread() { for (int i = 0; i < 10; i++) { cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl; }}
int main() { print_time();
thread t1(sleep_thread); thread t2(loop_thread);
t1.join(); t2.detach();
print_time(); return 0;}

這段代碼應該還是比較容易理解的,這里創建了兩個線程。它們都會有一些輸出,其中一個會先停止3秒鐘,然后再輸出。主線程調用join會一直卡住等待它運行結束。
這段程序的輸出如下:

now is: 2019-10-13 10:17:48[thread-0x70000cdda000] print: 0[thread-0x70000cdda000] print: 1[thread-0x70000cdda000] print: 2[thread-0x70000cdda000] print: 3[thread-0x70000cdda000] print: 4[thread-0x70000cdda000] print: 5[thread-0x70000cdda000] print: 6[thread-0x70000cdda000] print: 7[thread-0x70000cdda000] print: 8[thread-0x70000cdda000] print: 9[thread-0x70000cd57000] is waking upnow is: 2019-10-13 10:17:51

一次調用


  • 主要API

在一些情況下,我們有些任務需要執行一次,并且我們只希望它執行一次,例如資源的初始化任務。這個時候就可以用到上面的接口。這個接口會保證,即便在多線程的環境下,相應的函數也只會調用一次。

下面就是一個示例:有三個線程都會使用init函數,但是只會有一個線程真正執行它。

// 05_call_once.cpp
void init() { cout << "Initialing..." << endl; // Do something...}
void worker(once_flag* flag) { call_once(*flag, init);}
int main() { once_flag flag;
thread t1(worker, &flag); thread t2(worker, &flag); thread t3(worker, &flag);
t1.join(); t2.join(); t3.join();
return 0;}

我們無法確定具體是哪一個線程會執行init。而事實上,我們也不關心,因為只要有某個線程完成這個初始化工作就可以了。

請思考一下,為什么要在main函數中創建once_flag flag。如果是在worker函數中直接聲明一個once_flag并使用行不行?為什么?


并發任務


下面以一個并發任務為示例講解如何引入多線程。

任務示例:現在假設我們需要計算某個范圍內所有自然數的平方根之和,例如[1, 10e8]。

在單線程模型下,我們的代碼可能是這樣的:

// 06_naive_multithread.cpp
static const int MAX = 10e8; // ①static double sum = 0; // ②
void worker(int min, int max) { // ③ for (int i = min; i <= max; i++) { sum += sqrt(i); }}
void serial_task(int min, int max) { // ④ auto start_time = chrono::steady_clock::now(); sum = 0; worker(0, MAX); auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Serail task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這段代碼說明如下:

  1. 通過一個常量指定數據范圍,這個是為了方便調整。
  2. 通過一個全局變量來存儲結果。
  3. 通過一個任務函數來計算值。
  4. 統計任務的執行時間。

這段程序輸出如下:

Serail task finish, 6406 ms consumed, Result: 2.10819e+13

很顯然,上面單線程的做法性能太差了。我們的任務完全是可以并發執行的。并且任務很容易劃分。

下面我們就嘗試以多線程的方式來改造原先的程序。

改造后的程序如下:

// 06_naive_multithread.cpp
void concurrent_task(int min, int max) { auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency(); // ① cout << "hardware_concurrency: " << concurrent_count << endl; vector<thread> threads; min = 0; sum = 0; for (int t = 0; t < concurrent_count; t++) { // ② int range = max / concurrent_count * (t + 1); threads.push_back(thread(worker, min, range)); // ③ min = range + 1; } for (auto& t : threads) { t.join(); // ④ }
auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這段代碼說明如下:

  1. thread::hardware_concurrency()可以獲取到當前硬件支持多少個線程并行執行。
  2. 根據處理器的情況決定線程的數量。
  3. 對于每一個線程都通過worker函數來完成任務,并劃分一部分數據給它處理。
  4. 等待每一個線程執行結束。

很好,似乎很簡單就完成了并發的改造。然后我們運行一下這個程序:

hardware_concurrency: 16Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12

很抱歉,我們會發現這里的性能并沒有明顯的提升。更嚴重的是,這里的結果是錯誤的。

要搞清楚為什么結果不正確我們需要更多的背景知識。

我們知道,對于現代的處理器來說,為了加速處理的速度,每個處理器都會有自己的高速緩存(Cache),這個高速緩存是與每個處理器相對應的,如下圖所示:

事實上,目前大部分CPU的緩存已經不只一層。


處理器在進行計算的時候,高速緩存會參與其中,例如數據的讀和寫。而高速緩存和系統主存(Memory)是有可能存在不一致的。即:某個結果計算后保存在處理器的高速緩存中了,但是沒有同步到主存中,此時這個值對于其他處理器就是不可見的。

事情還遠不止這么簡單。我們對于全局變量值的修改:sum += sqrt(i);這條語句,它并非是原子的。它其實是很多條指令的組合才能完成。假設在某個設備上,這條語句通過下面這幾個步驟來完成。它們的時序可能如下所示:


在時間點a的時候,所有線程對于sum變量的值是一致的。

但是在時間點b之后,thread3上已經對sum進行了賦值。而這個時候其他幾個線程也同時在其他處理器上使用了這個值,那么這個時候它們所使用的值就是舊的(錯誤的)。最后得到的結果也自然是錯的。

競爭條件與臨界區


當多個進程或者線程同時訪問共享數據時,只要有一個任務會修改數據,那么就可能會發生問題。此時結果依賴于這些任務執行的相對時間,這種場景稱為競爭條件(race condition)。

訪問共享數據的代碼片段稱之為臨界區(critical section)。具體到上面這個示例,臨界區就是讀寫sum變量的地方。

要避免競爭條件,就需要對臨界區進行數據保護。

很自然的,現在我們能夠理解發生競爭條件是因為這些線程在同時訪問共享數據,其中有些線程的改動沒有讓其他線程知道,導致其他線程在錯誤的基礎上進行處理,結果自然也就是錯誤的。

那么,如果一次只讓一個線程訪問共享數據,訪問完了再讓其他線程接著訪問,這樣就可以避免問題的發生了。

接下來介紹的API提供的就是這樣的功能。

互斥體與鎖


mutex


開發并發系統的目的主要是為了提升性能:將任務分散到多個線程,然后在不同的處理器上同時執行。這些分散開來的線程通常會包含兩類任務:

  1. 獨立的對于劃分給自己的數據的處理
  2. 對于處理結果的匯總

其中第1項任務因為每個線程是獨立的,不存在競爭條件的問題。而第2項任務,由于所有線程都可能往總結果(例如上面的sum變量)匯總,這就需要做保護了。在某一個具體的時刻,只應當有一個線程更新總結果,即:保證每個線程對于共享數據的訪問是“互斥”的。mutex 就提供了這樣的功能。

mutex是mutual exclusion(互斥)的簡寫。

  • 主要API

很明顯,在這些類中,mutex是最基礎的API。其他類都是在它的基礎上的改進。所以這些類都提供了下面三個方法,并且它們的功能是一樣的:

| 方法| 說明 |
| lock|鎖定互斥體,如果不可用,則阻塞 |
| try_lock |嘗試鎖定互斥體,如果不可用,直接返回 |
|unlock | 解鎖互斥體|

這三個方法提供了基礎的鎖定和解除鎖定的功能。使用lock意味著你有很強的意愿一定要獲取到互斥體,而使用try_lock則是進行一次嘗試。這意味著如果失敗了,你通常還有其他的路徑可以走。

在這些基礎功能之上,其他的類分別在下面三個方面進行了擴展:

  • 超時:
    timed_mutex,
    recursive_timed_mutex,
    shared_timed_mutex
    名稱都帶有timed,這意味著它們都支持超時功能。它們都提供了try_lock_for和try_lock_until方法,這兩個方法分別可以指定超時的時間長度和時間點。如果在超時的時間范圍內沒有能獲取到鎖,則直接返回,不再繼續等待。
  • 可重入:
    recursive_mutex和recursive_timed_mutex的名稱都帶有recursive。可重入或者叫做可遞歸,是指在同一個線程中,同一把鎖可以鎖定多次。這就避免了一些不必要的死鎖。
  • 共享:
    shared_timed_mutex和shared_mutex提供了共享功能。對于這類互斥體,實際上是提供了兩把鎖:一把是共享鎖,一把是互斥鎖。一旦某個線程獲取了互斥鎖,任何其他線程都無法再獲取互斥鎖和共享鎖;但是如果有某個線程獲取到了共享鎖,其他線程無法再獲取到互斥鎖,但是還有獲取到共享鎖。這里互斥鎖的使用和其他的互斥體接口和功能一樣。而共享鎖可以同時被多個線程同時獲取到(使用共享鎖的接口見下面的表格)。共享鎖通常用在讀者寫者模型上。

使用共享鎖的接口如下:

| 方法| 說明 |
|lock_shared | 獲取互斥體的共享鎖,如果無法獲取則阻塞 |
| try_lock_shared| 嘗試獲取共享鎖,如果不可用,直接返回 |
| unlock_shared| 解鎖共享鎖 |

接下來,我們就借助剛學到的mutex來改造我們的并發系統,改造后的程序如下:

// 07_mutex_lock.cpp
static const int MAX = 10e8;static double sum = 0;
static mutex exclusive;
void concurrent_worker(int min, int max) { for (int i = min; i <= max; i++) { exclusive.lock(); // ① sum += sqrt(i); exclusive.unlock(); // ② }}
void concurrent_task(int min, int max) { auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency(); cout << "hardware_concurrency: " << concurrent_count << endl; vector<thread> threads; min = 0; sum = 0; for (int t = 0; t < concurrent_count; t++) { int range = max / concurrent_count * (t + 1); threads.push_back(thread(concurrent_worker, min, range)); // ③ min = range + 1; } for (int i = 0; i < threads.size(); i++) { threads[i].join(); }
auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;}

這里只有三個地方需要關注:

  1. 在訪問共享數據之前加鎖
  2. 訪問完成之后解鎖
  3. 在多線程中使用帶鎖的版本

執行之后結果輸出如下:

hardware_concurrency: 16Concurrent task finish, 74232 ms consumed, Result: 2.10819e+13

這下結果是對了,但是我們卻發現這個版本比原先單線程的版本性能還要差很多。這是為什么?

這是因為加鎖和解鎖是有代價的,這里計算最耗時的地方在鎖里面,每次只能有一個線程串行執行,相比于單線程模型,它不但是串行的,還增加了鎖的負擔,因此就更慢了。

這就是為什么前面說多線程系統會增加系統的復雜度,而且并非多線程系統一定就有更好的性能。

不過,對于這里的問題是可以改進的。我們仔細思考一下:我們劃分給每個線程的數據其實是獨立的,對于數據的處理是耗時的,但其實這部分邏輯每個線程可以單獨處理,沒必要加鎖。只有在最后匯總數據的時候進行一次鎖保護就可以了。

于是我們改造concurrent_worker,像下面這樣:

// 08_improved_mutex_lock.cpp
void concurrent_worker(int min, int max) { double tmp_sum = 0; for (int i = min; i <= max; i++) { tmp_sum += sqrt(i); // ① } exclusive.lock(); // ② sum += tmp_sum; exclusive.unlock();}

這段代碼的改變在于兩處:

  1. 通過一個局部變量保存當前線程的處理結果
  2. 在匯總總結過的時候進行鎖保護

運行一下改進后的程序,其結果輸出如下:

hardware_concurrency: 16Concurrent task finish, 451 ms consumed, Result: 2.10819e+13

可以看到,性能一下就提升了好多倍。我們終于體驗到多線程帶來的好處了。

我們用鎖的粒度(granularity)來描述鎖的范圍。細粒度(fine-grained)是指鎖保護較小的范圍,粗粒度(coarse-grained)是指鎖保護較大的范圍。出于性能的考慮,我們應該保證鎖的粒度盡可能的細。并且,不應該在獲取鎖的范圍內執行耗時的操作,例如執行IO。如果是耗時的運算,也應該盡可能的移到鎖的外面。

In general, a lock should be held for only the minimum possible time needed to perform the required operations.
--《C++ Concurrency in Action》


死鎖


死鎖是并發系統很常見的一類問題。

死鎖是指:兩個或以上的運算單元,每一方都在等待其他方釋放資源,但是所有方都不愿意釋放資源。結果是沒有任何一方能繼續推進下去,于是整個系統無法再繼續運轉。

死鎖在現實中也很常見,例如:兩個孩子分別拿著玩具的一半然后哭著要從對方手里得到另外一半玩具,但是誰都不肯讓步。

在成年人的世界里也會發生類似的情況,例如下面這個交通狀況:


下面我們來看一個編程示例。

現在假設我們在開發一個銀行的系統,這個系統包含了轉賬的功能。

首先我們創建一個Account類來描述銀行賬號。由于這僅僅是一個演示使用的代碼,所以我們希望代碼足夠的簡單。Account類僅僅包含名稱和金額兩個字段。

另外,為了支持并發,這個類包含了一個mutex對象,用來保護賬號金額,在讀寫賬號金額時需要先加鎖保護。

// 09_deadlock_bank_transfer.cpp
class Account {public: Account(string name, double money): mName(name), mMoney(money) {};
public: void changeMoney(double amount) { mMoney += amount; } string getName() { return mName; } double getMoney() { return mMoney; } mutex* getLock() { return &mMoneyLock; }
private: string mName; double mMoney; mutex mMoneyLock;};

Account類很簡單,我想就不用多做說明了。

接下來,我們再創建一個描述銀行的Bank類。

// 09_deadlock_bank_transfer.cpp
class Bank {public: void addAccount(Account* account) { mAccounts.insert(account); }
bool transferMoney(Account* accountA, Account* accountB, double amount) { lock_guard guardA(*accountA->getLock()); // ① lock_guard guardB(*accountB->getLock());
if (amount > accountA->getMoney()) { // ② return false; }
accountA->changeMoney(-amount); // ③ accountB->changeMoney(amount); return true; }
double totalMoney() const { double sum = 0; for (auto a : mAccounts) { sum += a->getMoney(); } return sum; }
private: set<Account*> mAccounts;};

銀行類中記錄了所有的賬號,并且提供了一個方法用來查詢整個銀行的總金額。
這其中,我們最主要要關注轉賬的實現:transferMoney。該方法的幾個關鍵點如下:

  1. 為了保證線程安全,在修改每個賬號之前,需要獲取相應的鎖。
  2. 判斷轉出賬戶金額是否足夠,如果不夠此次轉賬失敗。
  3. 進行轉賬。

有了銀行和賬戶結構之后就可以開發轉賬系統了,同樣的,由于是為了演示所用,我們的轉賬系統也會盡可能的簡單:

// 09_deadlock_bank_transfer.cpp
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; } else { cout << "Transfer failed, " << accountA->getName() << " has only $" << accountA->getMoney() << ", but " << randomMoney << " required" << endl; } }}

這里每次生成一個隨機數,然后通過銀行進行轉賬。

最后我們在main函數中創建兩個線程,互相在兩個賬號之間來回轉賬:

// 09_deadlock_bank_transfer.cpp
int main() { Account a("Paul", 100); Account b("Moira", 100);
Bank aBank; aBank.addAccount(&a); aBank.addAccount(&b);
thread t1(randomTransfer, &aBank, &a, &b); thread t2(randomTransfer, &aBank, &b, &a);
t1.join(); t2.join();
return 0;}

至此,我們的銀行轉賬系統就開發完成了。然后編譯并運行,其結果可能像下面這樣:

...Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $34.7581, but 66.3208 requiredTransfer failed, Moira has only $34.7581, but Transfer 93.191 from 53.9176 requiredTransfer 60.6146 from Moira to Paul, Bank totalMoney: 200Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney: Transfer failed, Moira has only $17.6041, but 18.1186 requiredTransfer failed, Moira has only $17.6041, but 18.893 requiredTransfer failed, Moira has only $17.6041, but 34.7078 requiredTransfer failed, Moira has only $17.6041, but 33.9569 requiredTransfer 12.7899 from 200Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $63.9373, but 80.9038 requiredTransfer 50.933 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $13.0043, but 30.2056 requiredTransfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required

如果你運行了這個程序,你會發現很快它就卡住不動了。為什么?

因為發生了死鎖。

我們仔細思考一下這兩個線程的邏輯:這兩個線程可能會同時獲取其中一個賬號的鎖,然后又想獲取另外一個賬號的鎖,此時就發生了死鎖。如下圖所示:


當然,發生死鎖的原因遠不止上面這一種情況。如果兩個線程互相join就可能發生死鎖。還有在一個線程中對一個不可重入的互斥體(例如mutex而非recursive_mutex)多次加鎖也會死鎖。

你可能會覺得,我可不會這么傻,寫出這樣的代碼。但實際上,很多時候是由于代碼的深層次嵌套導致了死鎖的發生,由于調用關系的復雜導致發現這類問題并不容易。

如果仔細看一下上面的輸出,我們會發現還有另外一個問題:這里的輸出是亂的。兩個線程的輸出混雜在一起了。究其原因也很容易理解:兩個線程可能會同時輸出,沒有做好隔離。

下面我們就來逐步解決上面的問題。

對于輸出混亂的問題很好解決,專門用一把鎖來保護輸出邏輯即可:

// 10_improved_bank_transfer.cpp
mutex sCoutLock;void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; if (bank->transferMoney(accountA, accountB, randomMoney)) { sCoutLock.lock(); cout << "Transfer " << randomMoney << " from " << accountA->getName() << " to " << accountB->getName() << ", Bank totalMoney: " << bank->totalMoney() << endl; sCoutLock.unlock(); } else { sCoutLock.lock(); cout << "Transfer failed, " << accountA->getName() << " has only " << accountA->getMoney() << ", but " << randomMoney << " required" << endl; sCoutLock.unlock(); } }}

請思考一下兩處lock和unlock調用,并考慮為什么不在while(true)下面寫一次整體的加鎖和解鎖。


通用鎖定算法


  • 主要API
要避免死鎖,需要仔細的思考和設計業務邏輯。

有一個比較簡單的原則可以避免死鎖,即:對所有的鎖進行排序,每次一定要按照順序來獲取鎖,不允許亂序。例如:要獲取某個玩具,一定要先拿到鎖A,再拿到鎖B,才能玩玩具。這樣就不會死鎖了。

這個原則雖然簡單,但卻不容易遵守。因為數據常常是分散在很多地方的。

不過好消息是,C++ 11標準中為我們提供了一些工具來避免因為多把鎖而導致的死鎖。我們只要直接調用這些接口就可以了。這個就是上面提到的兩個函數。它們都支持傳入多個Lockable對象。

接下來我們用它來改造之前死鎖的轉賬系統:

// 10_improved_bank_transfer.cpp
bool transferMoney(Account* accountA, Account* accountB, double amount) { lock(*accountA->getLock(), *accountB->getLock()); // ① lock_guard lockA(*accountA->getLock(), adopt_lock); // ② lock_guard lockB(*accountB->getLock(), adopt_lock); // ③
if (amount > accountA->getMoney()) { return false; }
accountA->changeMoney(-amount); accountB->changeMoney(amount); return true;}

這里只改動了3行代碼。

  1. 這里通過lock函數來獲取兩把鎖,標準庫的實現會保證不會發生死鎖。
  2. lock_guard在下面我們還會詳細介紹。這里只要知道它會在自身對象生命周期的范圍內鎖定互斥體即可。創建lock_guard的目的是為了在transferMoney結束的時候釋放鎖,lockB也是一樣。但需要注意的是,這里傳遞了 adopt_lock表示:現在是已經獲取到互斥體了的狀態了,不用再次加鎖(如果不加adopt_lock就是二次鎖定了)。

運行一下這個改造后的程序,其輸出如下所示:

...Transfer failed, Paul has only $1.76243, but 17.5974 requiredTransfer failed, Paul has only $1.76243, but 59.2104 requiredTransfer failed, Paul has only $1.76243, but 49.6379 requiredTransfer failed, Paul has only $1.76243, but 63.6373 requiredTransfer failed, Paul has only $1.76243, but 51.8742 requiredTransfer failed, Paul has only $1.76243, but 50.0081 requiredTransfer failed, Paul has only $1.76243, but 86.1041 requiredTransfer failed, Paul has only $1.76243, but 51.3278 requiredTransfer failed, Paul has only $1.76243, but 66.5754 requiredTransfer failed, Paul has only $1.76243, but 32.1867 requiredTransfer failed, Paul has only $1.76243, but 62.0039 requiredTransfer failed, Paul has only $1.76243, but 98.7819 requiredTransfer failed, Paul has only $1.76243, but 27.046 requiredTransfer failed, Paul has only $1.76243, but 62.9155 requiredTransfer 98.8478 from Moira to Paul, Bank totalMoney: 200Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200Transfer failed, Moira has only $10.0142, but 61.3033 requiredTransfer failed, Moira has only $10.0142, but 24.5595 required...

現在這個轉賬程序會一直運行下去,不會再死鎖了。輸出也是正常的了。

通用互斥管理


  • 主要API

互斥體(mutex相關類)提供了對于資源的保護功能,但是手動的鎖定(調用lock或者try_lock)和解鎖(調用unlock)互斥體是要耗費比較大的精力的,我們需要精心考慮和設計代碼才行。因為我們需要保證,在任何情況下,解鎖要和加鎖配對,因為假設出現一條路徑導致獲取鎖之后沒有正常釋放,就會影響整個系統。如果考慮方法還可以會拋出異常,這樣的代碼寫起來會很費勁。

鑒于這個原因,標準庫就提供了上面的這些API。它們都使用了叫做RAII的編程技巧,來簡化我們手動加鎖和解鎖的“體力活”。

請看下面的例子

// https://en.cppreference.com/w/cpp/thread/lock_guard
#include <thread>#include <mutex>#include <iostream> int g_i = 0;std::mutex g_i_mutex; // ① void safe_increment(){ std::lock_guard<std::mutex> lock(g_i_mutex); // ② ++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n'; // ③} int main(){ std::cout << "main: " << g_i << '\n'; std::thread t1(safe_increment); // ④ std::thread t2(safe_increment); t1.join(); t2.join(); std::cout << "main: " << g_i << '\n';}

這段代碼中:

  1. 全局的互斥體g_i_mutex用來保護全局變量g_i
  2. 這是一個設計為可以被多線程環境使用的方法。因此需要通過互斥體來進行保護。這里沒有調用lock方法,而是直接使用lock_guard來鎖定互斥體。
  3. 在方法結束的時候,局部變量std::lock_guard<std::mutex> lock會被銷毀,它對互斥體的鎖定也就解除了。
  4. 在多個線程中使用這個方法。


RAII


上面的幾個類(lock_guard,unique_lock,shared_lock,scoped_lock)都使用了一個叫做RAII的編程技巧。

RAII全稱是Resource Acquisition Is Initialization,直譯過來就是:資源獲取即初始化。

RAII是一種C++編程技術,它將必須在使用前請求的資源(例如:分配的堆內存、執行線程、打開的套接字、打開的文件、鎖定的互斥體、磁盤空間、數據庫連接等——任何存在受限供給中的事物)的生命周期與一個對象的生存周期相綁定。

RAII保證資源可用于任何會訪問該對象的函數。它亦保證所有資源在其控制對象的生存期結束時,以獲取順序的逆序釋放。類似地,若資源獲取失敗(構造函數以異常退出),則為已構造完成的對象和基類子對象所獲取的所有資源,會以初始化順序的逆序釋放。這有效地利用了語言特性以消除內存泄漏并保證異常安全。

RAII 可總結如下:

  • 將每個資源封裝入一個類,其中:
    • 構造函數請求資源,并建立所有類不變式,或在它無法完成時拋出異常,
    • 析構函數釋放資源并決不拋出異常;
  • 始終經由 RAII 類的實例使用滿足要求的資源,該資源
    • 自身擁有自動存儲期或臨時生存期,或
    • 具有與自動或臨時對象的生存期綁定的生存期

回想一下上文中的transferMoney方法中的三行代碼:

lock(*accountA->getLock(), *accountB->getLock());lock_guard lockA(*accountA->getLock(), adopt_lock);lock_guard lockB(*accountB->getLock(), adopt_lock);

如果使用unique_lock這三行代碼還有一種等價的寫法:

unique_lock lockA(*accountA->getLock(), defer_lock);unique_lock lockB(*accountB->getLock(), defer_lock);lock(*accountA->getLock(), *accountB->getLock());

請注意這里lock方法的調用位置。這里先定義unique_lock指定了defer_lock,因此實際沒有鎖定互斥體,而是到第三行才進行鎖定。

最后,借助scoped_lock,我們可以將三行代碼合成一行,這種寫法也是等價的。

scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());

scoped_lock會在其生命周期范圍內鎖定互斥體,銷毀的時候解鎖。同時,它可以鎖定多個互斥體,并且避免死鎖。

目前,只還有shared_lock我們沒有提到。它與其他幾個類的區別在于:它是以共享的方式鎖定互斥體。

條件變量


| API | C++標準 | 說明 |
| condition_variable | C++ 11 | 提供與 std::unique_lock 關聯的條件變量 |
| condition_variable_any | C++ 11 |提供與任何鎖類型關聯的條件變量 |
| notify_all_at_thread_exit |C++ 11 | 安排到在此線程完全結束時對 notify_all 的調用 |
| cv_status | C++ 11 |列出條件變量上定時等待的可能結果 |

至此,我們還有一個地方可以改進。那就是:轉賬金額不足的時候,程序直接返回了false。這很難說是一個好的策略。因為,即便雖然當前賬號金額不足以轉賬,但只要別的賬號又轉賬進來之后,當前這個轉賬操作也許就可以繼續執行了。

這在很多業務中是很常見的一個需求:每一次操作都要正確執行,如果條件不滿足就停下來等待,直到條件滿足之后再繼續。而不是直接返回。

條件變量提供了一個可以讓多個線程間同步協作的功能。這對于生產者-消費者模型很有意義。在這個模型下:

  • 生產者和消費者共享一個工作區。這個區間的大小是有限的。
  • 生產者總是產生數據放入工作區中,當工作區滿了。它就停下來等消費者消費一部分數據,然后繼續工作。
  • 消費者總是從工作區中拿出數據使用。當工作區中的數據全部被消費空了之后,它也會停下來等待生產者往工作區中放入新的數據。

從上面可以看到,無論是生產者還是消費者,當它們工作的條件不滿足時,它們并不是直接報錯返回,而是停下來等待,直到條件滿足。

下面我們就借助于條件變量,再次改造之前的銀行轉賬系統。

這個改造主要在于賬號類。我們重點是要調整changeMoney方法。

// 11_bank_transfer_wait_notify.cpp
class Account {public: Account(string name, double money): mName(name), mMoney(money) {};
public: void changeMoney(double amount) { unique_lock lock(mMoneyLock); // ② mConditionVar.wait(lock, [this, amount] { // ③ return mMoney + amount > 0; // ④ }); mMoney += amount; mConditionVar.notify_all(); // ⑤ }
string getName() { return mName; }
double getMoney() { return mMoney; }
private: string mName; double mMoney; mutex mMoneyLock; condition_variable mConditionVar; // ①};

這幾處改動說明如下:

  1. 這里聲明了一個條件變量,用來在多個線程之間協作。
  2. 這里使用的是unique_lock,這是為了與條件變量相配合。因為條件變量會解鎖和重新鎖定互斥體。
  3. 這里是比較重要的一個地方:通過條件變量進行等待。此時:會通過后面的lambda表達式判斷條件是否滿足。如果滿足則繼續;如果不滿足,則此處會解鎖互斥體,并讓當前線程等待。解鎖這一點非常重要,因為只有這樣,才能讓其他線程獲取互斥體。
  4. 這里是條件變量等待的條件。如果你不熟悉lambda表達式,請自行網上學習,或者閱讀我之前寫的文章。
  5. 此處也很重要。當金額發生變動之后,我們需要通知所有在條件變量上等待的其他線程。此時所有調用wait線程都會再次喚醒,然后嘗試獲取鎖(當然,只有一個能獲取到)并再次判斷條件是否滿足。除了notify_all還有notify_one,它只通知一個等待的線程。wait和notify就構成了線程間互相協作的工具。

請注意:wait和notify_all雖然是寫在一個函數中的,但是在運行時它們是在多線程環境中執行的,因此對于這段代碼,需要能夠從不同線程的角度去思考代碼的邏輯。這也是開發并發系統比較難的地方。

有了上面的改動之后,銀行的轉賬方法實現起來就很簡單了,不用再考慮數據保護的問題了:

// 11_bank_transfer_wait_notify.cpp
void Bank::transferMoney(Account* accountA, Account* accountB, double amount) { accountA->changeMoney(-amount); accountB->changeMoney(amount);}

當然,轉賬邏輯也會變得簡單,不用再管轉賬失敗的情況發生。

// 11_bank_transfer_wait_notify.cpp
mutex sCoutLock;void randomTransfer(Bank* bank, Account* accountA, Account* accountB) { while(true) { double randomMoney = ((double)rand() / RAND_MAX) * 100; { lock_guard guard(sCoutLock); cout << "Try to Transfer " << randomMoney << " from " << accountA->getName() << "(" << accountA->getMoney() << ") to " << accountB->getName() << "(" << accountB->getMoney() << "), Bank totalMoney: " << bank->totalMoney() << endl; } bank->transferMoney(accountA, accountB, randomMoney); }}

修改完之后的程序運行輸出如下:

...Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200...
這下比之前都要好了。

但是細心的讀者會發現,Bank totalMoney的輸出有時候是200,有時候不是。但不管怎樣,即便這一次不是,下一次又是了。關于這一點,請讀者自行思考一下為什么,以及如何改進。

future


這一小節中,我們來熟悉更多的可以在并發環境中使用的工具,它們都位于<future>頭文件中。

async


很多語言都提供了異步的機制。異步使得耗時的操作不影響當前主線程的執行流。

在C++11中,async便是完成這樣的功能的。下面是一個代碼示例:

// 12_async_task.cpp
static const int MAX = 10e8;static double sum = 0;
void worker(int min, int max) { for (int i = min; i <= max; i++) { sum += sqrt(i); }}
int main() { sum = 0; auto f1 = async(worker, 0, MAX); cout << "Async task triggered" << endl; f1.wait(); cout << "Async task finish, result: " << sum << endl << endl;}

這仍然是我們之前熟悉的例子。這里有兩個地方需要說明:

  1. 這里以異步的方式啟動了任務。它會返回一個future對象。future用來存儲異步任務的執行結果,關于future我們在后面packaged_task的例子中再詳細說明。在這個例子中我們僅僅用它來等待任務執行完成。
  2. 此處是等待異步任務執行完成。

需要注意的是,默認情況下,async是啟動一個新的線程,還是以同步的方式(不啟動新的線程)運行任務,這一點標準是沒有指定的,由具體的編譯器決定。如果希望一定要以新的線程來異步執行任務,可以通過launch::async來明確說明。launch中有兩個常量:

  • async:運行新線程,以異步執行任務。
  • deferred:調用方線程上第一次請求其結果時才執行任務,即惰性求值。

除了通過函數來指定異步任務,還可以lambda表達式的方式來指定。如下所示:

// 12_async_task.cpp
int main() {
double result = 0; cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl; auto f2 = async(launch::async, [&result]() { cout << "Lambda task in thread: " << this_thread::get_id() << endl; for (int i = 0; i <= MAX; i++) { result += sqrt(i); } }); f2.wait(); cout << "Async task with lambda finish, result: " << result << endl << endl; return 0;}

在上面這段代碼中,我們使用一個lambda表達式來編寫異步任務的邏輯,并通過launch::async明確指定要通過獨立的線程來執行任務,同時我們打印出了線程的id。

這段代碼輸出如下:

Async task with lambda triggered, thread: 0x11290d5c0Lambda task in thread: 0x700007aa1000Async task with lambda finish, result: 2.10819e+13


對于面向對象編程來說,很多時候肯定希望以對象的方法來指定異步任務。下面是一個示例:

// 12_async_task.cpp
class Worker {public: Worker(int min, int max): mMin(min), mMax(max) {} // ① double work() { // ② mResult = 0; for (int i = mMin; i <= mMax; i++) { mResult += sqrt(i); } return mResult; } double getResult() { return mResult; }
private: int mMin; int mMax; double mResult;};
int main() { Worker w(0, MAX); cout << "Task in class triggered" << endl; auto f3 = async(&Worker::work, &w); // ③ f3.wait(); cout << "Task in class finish, result: " << w.getResult() << endl << endl;
return 0;}

這段代碼有三處需要說明:

  1. 這里通過一個類來描述任務。這個類是對前面提到的任務的封裝。它包含了任務的輸入參數,和輸出結果。
  2. work函數是任務的主體邏輯。
  3. 通過async執行任務:這里指定了具體的任務函數以及相應的對象。請注意這里是&w,因此傳遞的是對象的指針。如果不寫&將傳入w對象的臨時復制。


packaged_task


在一些業務中,我們可能會有很多的任務需要調度。這時我們常常會設計出任務隊列和線程池的結構。此時,就可以使用packaged_task來包裝任務。

如果你了解設計模式,你應該會知道命令模式。

packaged_task綁定到一個函數或者可調用對象上。當它被調用時,它就會調用其綁定的函數或者可調用對象。并且,可以通過與之相關聯的future來獲取任務的結果。調度程序只需要處理packaged_task,而非各個函數。

packaged_task對象是一個可調用對象,它可以被封裝成一個std::fucntion,或者作為線程函數傳遞給std::thread,或者直接調用。

下面是一個代碼示例:

// 13_packaged_task.cpp
double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum;}
double concurrent_task(int min, int max) { vector<future<double>> results; // ①
unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { // ② packaged_task<double(int, int)> task(concurrent_worker); // ③ results.push_back(task.get_future()); // ④
int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); // ⑤ t.detach();
min = range + 1; }
cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); ⑥ } return sum;}
int main() { auto start_time = chrono::steady_clock::now();
double r = concurrent_task(0, MAX);
auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl; return 0;}

在這段代碼中:

  1. 首先創建一個集合來存儲future對象。我們將用它來獲取任務的結果。
  2. 同樣的,根據CPU的情況來創建線程的數量。
  3. 將任務包裝成packaged_task。請注意,由于concurrent_worker被包裝成了任務,我們無法直接獲取它的return值。而是要通過future對象來獲取。
  4. 獲取任務關聯的future對象,并將其存入集合中。
  5. 通過一個新的線程來執行任務,并傳入需要的參數。
  6. 通過future集合,逐個獲取每個任務的計算結果,將其累加。這里r.get()獲取到的就是每個任務中concurrent_worker的返回值。

為了簡單起見,這里的示例只使用了我們熟悉的例子和結構。但在實際上的工程中,調用關系通常更復雜,你可以借助于packaged_task將任務組裝成隊列,然后通過線程池的方式進行調度:


promise與future


在上面的例子中,concurrent_task的結果是通過return返回的。但在一些時候,我們可能不能這么做:在得到任務結果之后,可能還有一些事情需要繼續處理,例如清理工作。

這個時候,就可以將promise與future配對使用。這樣就可以將返回結果和任務結束兩個事情分開。

下面是對上面代碼示例的改寫:

// 14_promise_future.cpp
double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum;}
void concurrent_task(int min, int max, promise<double>* result) { // ① vector<future<double>> results;
unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { packaged_task<double(int, int)> task(concurrent_worker); results.push_back(task.get_future());
int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); t.detach();
min = range + 1; }
cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); } result->set_value(sum); // ② cout << "concurrent_task finish" << endl;}
int main() { auto start_time = chrono::steady_clock::now();
promise<double> sum; // ③ concurrent_task(0, MAX, &sum);
auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed." << endl; cout << "Result: " << sum.get_future().get() << endl; // ④ return 0;}

這段代碼和上面的示例在很大程度上是一樣的。只有小部分內容做了改動:

  1. concurrent_task不再直接返回計算結果,而是增加了一個promise對象來存放結果。
  2. 在任務計算完成之后,將總結過設置到promise對象上。一旦這里調用了set_value,其相關聯的future對象就會就緒。
  3. 這里是在main中創建一個promoise來存放結果,并以指針的形式傳遞進concurrent_task中。
  4. 通過sum.get_future().get()來獲取結果。第2點中已經說了:一旦調用了set_value,其相關聯的future對象就會就緒。

需要注意的是,future對象只有被一個線程獲取值。并且在調用get()之后,就沒有可以獲取的值了。如果從多個線程調用get()會出現數據競爭,其結果是未定義的。

如果真的需要在多個線程中獲取future的結果,可以使用shared_future。

并行算法

從C++17開始。<algorithm>和<numeric> 頭文件的中的很多算法都添加了一個新的參數:sequenced_policy。

借助這個參數,開發者可以直接使用這些算法的并行版本,不用再自己創建并發系統和劃分數據來調度這些算法。

sequenced_policy可能的取值有三種,它們的說明如下:



注意:本文的前面已經提到,目前clang編譯器還不支持這個功能。因此想要編譯這部分代碼,你需要使用gcc 9.0或更高版本,同時還需要安裝Intel Threading Building Blocks。

下面還是通過一個示例來進行說明:

// 15_parallel_algorithm.cpp

void generateRandomData(vector<double>& collection, int size) { random_device rd; mt19937 mt(rd()); uniform_real_distribution<double> dist(1.0, 100.0); for (int i = 0; i < size; i++) { collection.push_back(dist(mt)); }}



int main() { vector<double> collection; generateRandomData(collection, 10e6); // ①

vector<double> copy1(collection); // ② vector<double> copy2(collection); vector<double> copy3(collection);

auto time1 = chrono::steady_clock::now(); // ③ sort(execution::seq, copy1.begin(), copy1.end()); // ④ auto time2 = chrono::steady_clock::now(); auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count(); cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤

auto time3 = chrono::steady_clock::now(); sort(execution::par, copy2.begin(),copy2.end()); // ⑥ auto time4 = chrono::steady_clock::now(); duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count(); cout << "Parallel sort consuming " << duration << "ms." << endl;

auto time5 = chrono::steady_clock::now(); sort(execution::par_unseq, copy2.begin(),copy2.end()); // ⑦ auto time6 = chrono::steady_clock::now(); duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count(); cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;}

這段代碼很簡單:

  1. 通過一個函數生成1000,000個隨機數。

  2. 將數據拷貝3份,以備使用。

  3. 接下來將通過三個不同的parallel_policy參數來調用同樣的sort算法。每次調用記錄開始和結束的時間。

  4. 第一次調用使用std::execution::seq參數。

  5. 輸出本次測試所使用的時間。

  6. 第二次調用使用std::execution::par參數。

  7. 第三次調用使用std::execution::par_unseq參數。


該程序的輸出如下:

Sequenced sort consuming 4464ms.Parallel sort consuming 459ms.Parallel unsequenced sort consuming 168ms.

可以看到,性能最好的和最差的相差了超過26倍。

結束語

在本篇文章中,我們介紹了C++語言中新增的并發編程API。雖然這部分內容已經不少(大部分人很難一次性搞懂所有這些內容,包括我自己),但實際上還有一個很重要的話題我們沒有觸及,那就是“內存模型”。

C++內存模型是C++11標準中最重要的特性之一。它是多線程環境能夠可靠工作的基礎。考慮到這部分內容還需要比較多的篇幅來說明,因此我們會在下一篇文章中繼續討論。

C++學習資料免費獲取方法:關注程序喵大人,后臺回復“程序喵”即可免費獲取40萬字C++進階獨家學習資料。





往期推薦


1、少寫點
if-else吧,它的效率有多低你知道嗎?
2、年度原創好文匯總
3、全網首發!!C++20新特性全在這一張圖里了
4
他來了,他來了,C+
+17新特性精華都在這了
5、一文讓你搞懂設計模式
6、C++11新特性,所有知識點都在這了!
主站蜘蛛池模板: 古蔺县| 上虞市| 奉节县| 芒康县| 普兰店市| 九龙县| 海南省| 阿图什市| 龙游县| 江津市| 怀来县| 吴忠市| 灵台县| 七台河市| 南乐县| 斗六市| 英吉沙县| 合江县| 宝鸡市| 苏尼特右旗| 苍山县| 耿马| 汶川县| 会理县| 鄢陵县| 和顺县| 右玉县| 杭锦旗| 丰宁| 外汇| 黄陵县| 绥芬河市| 博野县| 扎囊县| 苍溪县| 鄱阳县| 抚顺市| 忻州市| 乐东| 大荔县| 明光市|