置頂/星標公眾號????,硬核文章第一時間送達!
為什么要并發編程
并發與并行

并發:如果多個隊列可以交替使用某臺咖啡機,則這一行為就是并發的。 并行:如果存在多臺咖啡機可以被多個隊列交替使用,則就是并行。
進程與線程
進程(英語:process),是指計算機中已運行的程序。進程為曾經是分時系統的基本運作單位。在面向進程設計的系統(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執行實體; 線程(英語:thread)是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。 -- 維基百科


并發系統的性能
C++與并發編程
相較而言,Java自JDK 1.0就包含了多線程模型。

編譯器與C++標準
GCC對于C++特性的支持請參見這里:C++ Standards Support in GCC。 Clang對于C++特性的支持請參見這里:C++ Support in Clang。
C++標準與相應的GCC版本要求如下:

C++標準與相應的Clang版本要求如下:

g++ -std=c++17 your_file.cpp -o your_program
測試環境
git clone https://github.com/paulQuei/cpp-concurrency.git
具體編譯器對于C++特性支持的情況請參見這里:C++ compiler support。
./make_all.sh
MacOS
rew install gcc
brew insbtall tbb
export tbb_path=/usr/local/Cellar/tbb/2019_U8/
./make_all.sh
brew upgrade gcc
Ubuntu
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt install gcc-9 g++-9
libtbb2_2019~U8-1_amd64.deb libtbb-dev_2019~U8-1_amd64.deb
sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb
sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb
線程
創建線程
// 01_hello_thread.cpp
#include <iostream>
#include <thread> // ①
using namespace std; // ②
void hello() { // ③
cout << "Hello World from new thread." << endl;
}
int main() {
thread t(hello); // ④
t.join(); // ⑤
return 0;
}
為了使用多線程的接口,我們需要#include <thread>頭文件。 為了簡化聲明,本文中的代碼都將using namespace std;。 新建線程的入口是一個普通的函數,它并沒有什么特別的地方。 創建線程的方式就是構造一個thread對象,并指定入口函數。與普通對象不一樣的是,此時編譯器便會為我們創建一個新的操作系統線程,并在新的線程中執行我們的入口函數。 關于join函數在下文中講解。
// 02_lambda_thread.cpp
#include <iostream>
#include <thread>
using namespace std;
int main() {
thread t([] {
cout << "Hello World from lambda thread." << endl;
});
t.join();
return 0;
}
為了減少不必要的重復,若無必要,下文中的代碼將不貼出include指令以及using聲明。
// 03_thread_argument.cpp
void hello(string name) {
cout << "Welcome to " << name << endl;
}
int main() {
thread t(hello, "https://paul.pub");
t.join();
return 0;
}
join與detach
主要API

請思考在上面的代碼示例中,thread對象在何時會銷毀。
join:調用此接口時,當前線程會一直阻塞,直到目標線程執行完成(當然,很可能目標線程在此處調用之前就已經執行完成了,不過這不要緊)。因此,如果目標線程的任務非常耗時,你就要考慮好是否需要在主線程上等待它了,因此這很可能會導致主線程卡住。 detach:detach是讓目標線程成為守護線程(daemon threads)。一旦detach之后,目標線程將獨立執行,即便其對應的thread對象銷毀也不影響線程的執行。并且,你無法再與之通信。
管理當前線程
主要API

yield 通常用在自己的主要任務已經完成的時候,此時希望讓出處理器給其他任務使用。 get_id 返回當前線程的id,可以以此來標識不同的線程。 sleep_for 是讓當前線程停止一段時間。 sleep_until 和sleep_for類似,但是是以具體的時間點為參數。這兩個API都以chrono API(由于篇幅所限,這里不展開這方面內容)為基礎。
// 04_thread_self_manage.cpp
void print_time() {
auto now = chrono::system_clock::now();
auto in_time_t = chrono::system_clock::to_time_t(now);
std::stringstream ss;
ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
cout << "now is: " << ss.str() << endl;
}
void sleep_thread() {
this_thread::sleep_for(chrono::seconds(3));
cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;
}
void loop_thread() {
for (int i = 0; i < 10; i++) {
cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
}
}
int main() {
print_time();
thread t1(sleep_thread);
thread t2(loop_thread);
t1.join();
t2.detach();
print_time();
return 0;
}
now is: 2019-10-13 10:17:48
[thread-0x70000cdda000] print: 0
[thread-0x70000cdda000] print: 1
[thread-0x70000cdda000] print: 2
[thread-0x70000cdda000] print: 3
[thread-0x70000cdda000] print: 4
[thread-0x70000cdda000] print: 5
[thread-0x70000cdda000] print: 6
[thread-0x70000cdda000] print: 7
[thread-0x70000cdda000] print: 8
[thread-0x70000cdda000] print: 9
[thread-0x70000cd57000] is waking up
now is: 2019-10-13 10:17:51
一次調用
主要API
// 05_call_once.cpp
void init() {
cout << "Initialing..." << endl;
// Do something...
}
void worker(once_flag* flag) {
call_once(*flag, init);
}
int main() {
once_flag flag;
thread t1(worker, &flag);
thread t2(worker, &flag);
thread t3(worker, &flag);
t1.join();
t2.join();
t3.join();
return 0;
}
請思考一下,為什么要在main函數中創建once_flag flag。如果是在worker函數中直接聲明一個once_flag并使用行不行?為什么?
并發任務
// 06_naive_multithread.cpp
static const int MAX = 10e8; // ①
static double sum = 0; // ②
void worker(int min, int max) { // ③
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
void serial_task(int min, int max) { // ④
auto start_time = chrono::steady_clock::now();
sum = 0;
worker(0, MAX);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Serail task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
通過一個常量指定數據范圍,這個是為了方便調整。 通過一個全局變量來存儲結果。 通過一個任務函數來計算值。 統計任務的執行時間。
Serail task finish, 6406 ms consumed, Result: 2.10819e+13
// 06_naive_multithread.cpp
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency(); // ①
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) { // ②
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(worker, min, range)); // ③
min = range + 1;
}
for (auto& t : threads) {
t.join(); // ④
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
thread::hardware_concurrency()可以獲取到當前硬件支持多少個線程并行執行。 根據處理器的情況決定線程的數量。 對于每一個線程都通過worker函數來完成任務,并劃分一部分數據給它處理。 等待每一個線程執行結束。
hardware_concurrency: 16
Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12
事實上,目前大部分CPU的緩存已經不只一層。


競爭條件與臨界區
互斥體與鎖
mutex
獨立的對于劃分給自己的數據的處理 對于處理結果的匯總
主要API

| lock|鎖定互斥體,如果不可用,則阻塞 |
| try_lock |嘗試鎖定互斥體,如果不可用,直接返回 |
|unlock | 解鎖互斥體|
超時: timed_mutex, recursive_timed_mutex, shared_timed_mutex 名稱都帶有timed,這意味著它們都支持超時功能。它們都提供了try_lock_for和try_lock_until方法,這兩個方法分別可以指定超時的時間長度和時間點。如果在超時的時間范圍內沒有能獲取到鎖,則直接返回,不再繼續等待。 可重入: recursive_mutex和recursive_timed_mutex的名稱都帶有recursive。可重入或者叫做可遞歸,是指在同一個線程中,同一把鎖可以鎖定多次。這就避免了一些不必要的死鎖。 共享: shared_timed_mutex和shared_mutex提供了共享功能。對于這類互斥體,實際上是提供了兩把鎖:一把是共享鎖,一把是互斥鎖。一旦某個線程獲取了互斥鎖,任何其他線程都無法再獲取互斥鎖和共享鎖;但是如果有某個線程獲取到了共享鎖,其他線程無法再獲取到互斥鎖,但是還有獲取到共享鎖。這里互斥鎖的使用和其他的互斥體接口和功能一樣。而共享鎖可以同時被多個線程同時獲取到(使用共享鎖的接口見下面的表格)。共享鎖通常用在讀者寫者模型上。
|lock_shared | 獲取互斥體的共享鎖,如果無法獲取則阻塞 |
| try_lock_shared| 嘗試獲取共享鎖,如果不可用,直接返回 |
| unlock_shared| 解鎖共享鎖 |
// 07_mutex_lock.cpp
static const int MAX = 10e8;
static double sum = 0;
static mutex exclusive;
void concurrent_worker(int min, int max) {
for (int i = min; i <= max; i++) {
exclusive.lock(); // ①
sum += sqrt(i);
exclusive.unlock(); // ②
}
}
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency();
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) {
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(concurrent_worker, min, range)); // ③
min = range + 1;
}
for (int i = 0; i < threads.size(); i++) {
threads[i].join();
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
在訪問共享數據之前加鎖 訪問完成之后解鎖 在多線程中使用帶鎖的版本
hardware_concurrency: 16
Concurrent task finish, 74232 ms consumed, Result: 2.10819e+13
// 08_improved_mutex_lock.cpp
void concurrent_worker(int min, int max) {
double tmp_sum = 0;
for (int i = min; i <= max; i++) {
tmp_sum += sqrt(i); // ①
}
exclusive.lock(); // ②
sum += tmp_sum;
exclusive.unlock();
}
通過一個局部變量保存當前線程的處理結果 在匯總總結過的時候進行鎖保護
hardware_concurrency: 16
Concurrent task finish, 451 ms consumed, Result: 2.10819e+13
In general, a lock should be held for only the minimum possible time needed to perform the required operations. --《C++ Concurrency in Action》
死鎖

// 09_deadlock_bank_transfer.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
mMoney += amount;
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
mutex* getLock() {
return &mMoneyLock;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
};
// 09_deadlock_bank_transfer.cpp
class Bank {
public:
void addAccount(Account* account) {
mAccounts.insert(account);
}
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock_guard guardA(*accountA->getLock()); // ①
lock_guard guardB(*accountB->getLock());
if (amount > accountA->getMoney()) { // ②
return false;
}
accountA->changeMoney(-amount); // ③
accountB->changeMoney(amount);
return true;
}
double totalMoney() const {
double sum = 0;
for (auto a : mAccounts) {
sum += a->getMoney();
}
return sum;
}
private:
set<Account*> mAccounts;
};
為了保證線程安全,在修改每個賬號之前,需要獲取相應的鎖。 判斷轉出賬戶金額是否足夠,如果不夠此次轉賬失敗。 進行轉賬。
// 09_deadlock_bank_transfer.cpp
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
} else {
cout << "Transfer failed, "
<< accountA->getName() << " has only $" << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
}
}
}
// 09_deadlock_bank_transfer.cpp
int main() {
Account a("Paul", 100);
Account b("Moira", 100);
Bank aBank;
aBank.addAccount(&a);
aBank.addAccount(&b);
thread t1(randomTransfer, &aBank, &a, &b);
thread t2(randomTransfer, &aBank, &b, &a);
t1.join();
t2.join();
return 0;
}
...
Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $34.7581, but 66.3208 required
Transfer failed, Moira has only $34.7581, but
Transfer 93.191 from 53.9176 required
Transfer 60.6146 from Moira to Paul, Bank totalMoney: 200
Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney:
Transfer failed, Moira has only $17.6041, but 18.1186 required
Transfer failed, Moira has only $17.6041, but 18.893 required
Transfer failed, Moira has only $17.6041, but 34.7078 required
Transfer failed, Moira has only $17.6041, but 33.9569 required
Transfer 12.7899 from 200
Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $63.9373, but 80.9038 required
Transfer 50.933 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $13.0043, but 30.2056 required
Transfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200
Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required

// 10_improved_bank_transfer.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
sCoutLock.lock();
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
sCoutLock.unlock();
} else {
sCoutLock.lock();
cout << "Transfer failed, "
<< accountA->getName() << " has only " << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
sCoutLock.unlock();
}
}
}
請思考一下兩處lock和unlock調用,并考慮為什么不在while(true)下面寫一次整體的加鎖和解鎖。
通用鎖定算法
主要API

// 10_improved_bank_transfer.cpp
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock(*accountA->getLock(), *accountB->getLock()); // ①
lock_guard lockA(*accountA->getLock(), adopt_lock); // ②
lock_guard lockB(*accountB->getLock(), adopt_lock); // ③
if (amount > accountA->getMoney()) {
return false;
}
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
return true;
}
這里通過lock函數來獲取兩把鎖,標準庫的實現會保證不會發生死鎖。 lock_guard在下面我們還會詳細介紹。這里只要知道它會在自身對象生命周期的范圍內鎖定互斥體即可。創建lock_guard的目的是為了在transferMoney結束的時候釋放鎖,lockB也是一樣。但需要注意的是,這里傳遞了 adopt_lock表示:現在是已經獲取到互斥體了的狀態了,不用再次加鎖(如果不加adopt_lock就是二次鎖定了)。
...
Transfer failed, Paul has only $1.76243, but 17.5974 required
Transfer failed, Paul has only $1.76243, but 59.2104 required
Transfer failed, Paul has only $1.76243, but 49.6379 required
Transfer failed, Paul has only $1.76243, but 63.6373 required
Transfer failed, Paul has only $1.76243, but 51.8742 required
Transfer failed, Paul has only $1.76243, but 50.0081 required
Transfer failed, Paul has only $1.76243, but 86.1041 required
Transfer failed, Paul has only $1.76243, but 51.3278 required
Transfer failed, Paul has only $1.76243, but 66.5754 required
Transfer failed, Paul has only $1.76243, but 32.1867 required
Transfer failed, Paul has only $1.76243, but 62.0039 required
Transfer failed, Paul has only $1.76243, but 98.7819 required
Transfer failed, Paul has only $1.76243, but 27.046 required
Transfer failed, Paul has only $1.76243, but 62.9155 required
Transfer 98.8478 from Moira to Paul, Bank totalMoney: 200
Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200
Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200
Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $10.0142, but 61.3033 required
Transfer failed, Moira has only $10.0142, but 24.5595 required
...
通用互斥管理
主要API

// https://en.cppreference.com/w/cpp/thread/lock_guard
int g_i = 0;
std::mutex g_i_mutex; // ①
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex); // ②
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// ③
}
int main()
{
std::cout << "main: " << g_i << '\n';
std::thread t1(safe_increment); // ④
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << "main: " << g_i << '\n';
}
全局的互斥體g_i_mutex用來保護全局變量g_i 這是一個設計為可以被多線程環境使用的方法。因此需要通過互斥體來進行保護。這里沒有調用lock方法,而是直接使用lock_guard來鎖定互斥體。 在方法結束的時候,局部變量std::lock_guard<std::mutex> lock會被銷毀,它對互斥體的鎖定也就解除了。 在多個線程中使用這個方法。
RAII
RAII保證資源可用于任何會訪問該對象的函數。它亦保證所有資源在其控制對象的生存期結束時,以獲取順序的逆序釋放。類似地,若資源獲取失敗(構造函數以異常退出),則為已構造完成的對象和基類子對象所獲取的所有資源,會以初始化順序的逆序釋放。這有效地利用了語言特性以消除內存泄漏并保證異常安全。
將每個資源封裝入一個類,其中: 構造函數請求資源,并建立所有類不變式,或在它無法完成時拋出異常, 析構函數釋放資源并決不拋出異常; 始終經由 RAII 類的實例使用滿足要求的資源,該資源 自身擁有自動存儲期或臨時生存期,或 具有與自動或臨時對象的生存期綁定的生存期
lock(*accountA->getLock(), *accountB->getLock());
lock_guard lockA(*accountA->getLock(), adopt_lock);
lock_guard lockB(*accountB->getLock(), adopt_lock);
unique_lock lockA(*accountA->getLock(), defer_lock);
unique_lock lockB(*accountB->getLock(), defer_lock);
lock(*accountA->getLock(), *accountB->getLock());
scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());
條件變量
| condition_variable | C++ 11 | 提供與 std::unique_lock 關聯的條件變量 |
| condition_variable_any | C++ 11 |提供與任何鎖類型關聯的條件變量 |
| notify_all_at_thread_exit |C++ 11 | 安排到在此線程完全結束時對 notify_all 的調用 |
| cv_status | C++ 11 |列出條件變量上定時等待的可能結果 |
生產者和消費者共享一個工作區。這個區間的大小是有限的。 生產者總是產生數據放入工作區中,當工作區滿了。它就停下來等消費者消費一部分數據,然后繼續工作。 消費者總是從工作區中拿出數據使用。當工作區中的數據全部被消費空了之后,它也會停下來等待生產者往工作區中放入新的數據。
// 11_bank_transfer_wait_notify.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
unique_lock lock(mMoneyLock); // ②
mConditionVar.wait(lock, [this, amount] { // ③
return mMoney + amount > 0; // ④
});
mMoney += amount;
mConditionVar.notify_all(); // ⑤
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
condition_variable mConditionVar; // ①
};
這里聲明了一個條件變量,用來在多個線程之間協作。 這里使用的是unique_lock,這是為了與條件變量相配合。因為條件變量會解鎖和重新鎖定互斥體。 這里是比較重要的一個地方:通過條件變量進行等待。此時:會通過后面的lambda表達式判斷條件是否滿足。如果滿足則繼續;如果不滿足,則此處會解鎖互斥體,并讓當前線程等待。解鎖這一點非常重要,因為只有這樣,才能讓其他線程獲取互斥體。 這里是條件變量等待的條件。如果你不熟悉lambda表達式,請自行網上學習,或者閱讀我之前寫的文章。 此處也很重要。當金額發生變動之后,我們需要通知所有在條件變量上等待的其他線程。此時所有調用wait線程都會再次喚醒,然后嘗試獲取鎖(當然,只有一個能獲取到)并再次判斷條件是否滿足。除了notify_all還有notify_one,它只通知一個等待的線程。wait和notify就構成了線程間互相協作的工具。
// 11_bank_transfer_wait_notify.cpp
void Bank::transferMoney(Account* accountA, Account* accountB, double amount) {
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
}
// 11_bank_transfer_wait_notify.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
{
lock_guard guard(sCoutLock);
cout << "Try to Transfer " << randomMoney
<< " from " << accountA->getName() << "(" << accountA->getMoney()
<< ") to " << accountB->getName() << "(" << accountB->getMoney()
<< "), Bank totalMoney: " << bank->totalMoney() << endl;
}
bank->transferMoney(accountA, accountB, randomMoney);
}
}
...
Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200
Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200
Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200
Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200
Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200
Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200
Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200
Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200
Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002
Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200
Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200
Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200
...
future

async
// 12_async_task.cpp
static const int MAX = 10e8;
static double sum = 0;
void worker(int min, int max) {
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
int main() {
sum = 0;
auto f1 = async(worker, 0, MAX);
cout << "Async task triggered" << endl;
f1.wait();
cout << "Async task finish, result: " << sum << endl << endl;
}
這里以異步的方式啟動了任務。它會返回一個future對象。future用來存儲異步任務的執行結果,關于future我們在后面packaged_task的例子中再詳細說明。在這個例子中我們僅僅用它來等待任務執行完成。 此處是等待異步任務執行完成。
async:運行新線程,以異步執行任務。 deferred:調用方線程上第一次請求其結果時才執行任務,即惰性求值。
// 12_async_task.cpp
int main() {
double result = 0;
cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl;
auto f2 = async(launch::async, [&result]() {
cout << "Lambda task in thread: " << this_thread::get_id() << endl;
for (int i = 0; i <= MAX; i++) {
result += sqrt(i);
}
});
f2.wait();
cout << "Async task with lambda finish, result: " << result << endl << endl;
return 0;
}
Async task with lambda triggered, thread: 0x11290d5c0
Lambda task in thread: 0x700007aa1000
Async task with lambda finish, result: 2.10819e+13
// 12_async_task.cpp
class Worker {
public:
Worker(int min, int max): mMin(min), mMax(max) {} // ①
double work() { // ②
mResult = 0;
for (int i = mMin; i <= mMax; i++) {
mResult += sqrt(i);
}
return mResult;
}
double getResult() {
return mResult;
}
private:
int mMin;
int mMax;
double mResult;
};
int main() {
Worker w(0, MAX);
cout << "Task in class triggered" << endl;
auto f3 = async(&Worker::work, &w); // ③
f3.wait();
cout << "Task in class finish, result: " << w.getResult() << endl << endl;
return 0;
}
這里通過一個類來描述任務。這個類是對前面提到的任務的封裝。它包含了任務的輸入參數,和輸出結果。 work函數是任務的主體邏輯。 通過async執行任務:這里指定了具體的任務函數以及相應的對象。請注意這里是&w,因此傳遞的是對象的指針。如果不寫&將傳入w對象的臨時復制。
packaged_task
如果你了解設計模式,你應該會知道命令模式。
// 13_packaged_task.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
double concurrent_task(int min, int max) {
vector<future<double>> results; // ①
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) { // ②
packaged_task<double(int, int)> task(concurrent_worker); // ③
results.push_back(task.get_future()); // ④
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range); // ⑤
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get(); ⑥
}
return sum;
}
int main() {
auto start_time = chrono::steady_clock::now();
double r = concurrent_task(0, MAX);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl;
return 0;
}
首先創建一個集合來存儲future對象。我們將用它來獲取任務的結果。 同樣的,根據CPU的情況來創建線程的數量。 將任務包裝成packaged_task。請注意,由于concurrent_worker被包裝成了任務,我們無法直接獲取它的return值。而是要通過future對象來獲取。 獲取任務關聯的future對象,并將其存入集合中。 通過一個新的線程來執行任務,并傳入需要的參數。 通過future集合,逐個獲取每個任務的計算結果,將其累加。這里r.get()獲取到的就是每個任務中concurrent_worker的返回值。
promise與future
// 14_promise_future.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
void concurrent_task(int min, int max, promise<double>* result) { // ①
vector<future<double>> results;
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) {
packaged_task<double(int, int)> task(concurrent_worker);
results.push_back(task.get_future());
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range);
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get();
}
result->set_value(sum); // ②
cout << "concurrent_task finish" << endl;
}
int main() {
auto start_time = chrono::steady_clock::now();
promise<double> sum; // ③
concurrent_task(0, MAX, &sum);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed." << endl;
cout << "Result: " << sum.get_future().get() << endl; // ④
return 0;
}
concurrent_task不再直接返回計算結果,而是增加了一個promise對象來存放結果。 在任務計算完成之后,將總結過設置到promise對象上。一旦這里調用了set_value,其相關聯的future對象就會就緒。 這里是在main中創建一個promoise來存放結果,并以指針的形式傳遞進concurrent_task中。 通過sum.get_future().get()來獲取結果。第2點中已經說了:一旦調用了set_value,其相關聯的future對象就會就緒。

// 15_parallel_algorithm.cpp
void generateRandomData(vector<double>& collection, int size) {
random_device rd;
mt19937 mt(rd());
uniform_real_distribution<double> dist(1.0, 100.0);
for (int i = 0; i < size; i++) {
collection.push_back(dist(mt));
}
}
int main() {
vector<double> collection;
generateRandomData(collection, 10e6); // ①
vector<double> copy1(collection); // ②
vector<double> copy2(collection);
vector<double> copy3(collection);
auto time1 = chrono::steady_clock::now(); // ③
sort(execution::seq, copy1.begin(), copy1.end()); // ④
auto time2 = chrono::steady_clock::now();
auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count();
cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤
auto time3 = chrono::steady_clock::now();
sort(execution::par, copy2.begin(),copy2.end()); // ⑥
auto time4 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count();
cout << "Parallel sort consuming " << duration << "ms." << endl;
auto time5 = chrono::steady_clock::now();
sort(execution::par_unseq, copy2.begin(),copy2.end()); // ⑦
auto time6 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count();
cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;
}
通過一個函數生成1000,000個隨機數。
將數據拷貝3份,以備使用。
接下來將通過三個不同的parallel_policy參數來調用同樣的sort算法。每次調用記錄開始和結束的時間。
第一次調用使用std::execution::seq參數。
輸出本次測試所使用的時間。
第二次調用使用std::execution::par參數。
第三次調用使用std::execution::par_unseq參數。
Sequenced sort consuming 4464ms.
Parallel sort consuming 459ms.
Parallel unsequenced sort consuming 168ms.
C++學習資料免費獲取方法:關注程序喵大人,后臺回復“程序喵”即可免費獲取40萬字C++進階獨家學習資料。
往期推薦