Why Concurrent Programming
Concurrency and Parallelism

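Concurrency: if several queues take turns using a single coffee machine, the behavior is concurrent. Parallelism: if there are several coffee machines, so that several queues can be served at the same time, the behavior is parallel.
Processes and Threads
A process is a program that is being executed on a computer. The process was once the basic unit of operation in time-sharing systems. In process-oriented systems (such as early UNIX, and Linux 2.4 and earlier), the process is the basic execution entity of a program. A thread is the smallest unit that the operating system can schedule for execution. It is contained within a process and is the actual unit of work inside the process. -- Wikipedia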


Performance of Concurrent Systems
C++ and Concurrent Programming
By comparison, Java has included a multithreading model since JDK 1.0.

Compilers and the C++ Standard
For GCC's support of C++ features, see C++ Standards Support in GCC. For Clang's, see C++ Support in Clang.
The GCC versions required for each C++ standard are as follows:

The Clang versions required for each C++ standard are as follows:

g++ -std=c++17 your_file.cpp -o your_program
Test Environment
git clone https://github.com/paulQuei/cpp-concurrency.git
For details of each compiler's support for individual C++ features, see C++ compiler support.
./make_all.sh
macOS
brew install gcc
brew install tbb
export tbb_path=/usr/local/Cellar/tbb/2019_U8/
./make_all.sh
brew upgrade gcc
Ubuntu
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt install gcc-9 g++-9
libtbb2_2019~U8-1_amd64.deb libtbb-dev_2019~U8-1_amd64.deb
sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb
sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb
Threads
Creating a Thread
// 01_hello_thread.cpp
#include <iostream>
#include <thread> // ①
using namespace std; // ②
void hello() { // ③
cout << "Hello World from new thread." << endl;
}
int main() {
thread t(hello); // ④
t.join(); // ⑤
return 0;
}
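① To use the threading interface, include the <thread> header.
② To keep declarations short, all code in this article uses using namespace std;.
③ The entry point of the new thread is an ordinary function; there is nothing special about it.
④ A thread is created by constructing a thread object and specifying its entry function. Unlike with an ordinary object, constructing it makes the standard library create a new operating-system thread and run the entry function on that thread.
⑤ join is explained below.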
// 02_lambda_thread.cpp
#include <iostream>
#include <thread>
using namespace std;
int main() {
thread t([] {
cout << "Hello World from lambda thread." << endl;
});
t.join();
return 0;
}
To avoid repetition, the code below omits #include directives and using declarations unless they are needed.
// 03_thread_argument.cpp
void hello(string name) {
cout << "Welcome to " << name << endl;
}
int main() {
thread t(hello, "https://paul.pub");
t.join();
return 0;
}
join and detach
Main APIs

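Consider this question: in the code examples above, when is each thread object destroyed?
join: calling it blocks the current thread until the target thread has finished. The target thread may well have finished before join is called, and that is fine. If the target thread's task is very time-consuming, think carefully about whether the main thread should wait for it, because doing so can leave the main thread stuck.
detach: detach turns the target thread into a daemon thread. After detach, the target thread runs independently, and destroying the corresponding thread object does not affect its execution. You also can no longer communicate with it through that object.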
Managing the Current Thread
Main APIs

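- yield: typically used when a thread has finished its main work and wants to give up the processor to other tasks.
- get_id: returns the id of the current thread, which can be used to tell threads apart.
- sleep_for: suspends the current thread for a given duration.
- sleep_until: similar to sleep_for, but takes a specific point in time as its argument.
Both sleep functions are built on the chrono API, which is not covered here for reasons of space.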
// 04_thread_self_manage.cpp
void print_time() {
auto now = chrono::system_clock::now();
auto in_time_t = chrono::system_clock::to_time_t(now);
std::stringstream ss;
ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
cout << "now is: " << ss.str() << endl;
}
void sleep_thread() {
this_thread::sleep_for(chrono::seconds(3));
cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;
}
void loop_thread() {
for (int i = 0; i < 10; i++) {
cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
}
}
int main() {
print_time();
thread t1(sleep_thread);
thread t2(loop_thread);
t1.join();
t2.detach();
print_time();
return 0;
}
now is: 2019-10-13 10:17:48
[thread-0x70000cdda000] print: 0
[thread-0x70000cdda000] print: 1
[thread-0x70000cdda000] print: 2
[thread-0x70000cdda000] print: 3
[thread-0x70000cdda000] print: 4
[thread-0x70000cdda000] print: 5
[thread-0x70000cdda000] print: 6
[thread-0x70000cdda000] print: 7
[thread-0x70000cdda000] print: 8
[thread-0x70000cdda000] print: 9
[thread-0x70000cd57000] is waking up
now is: 2019-10-13 10:17:51
Calling Once
Main APIs
// 05_call_once.cpp
void init() {
cout << "Initializing..." << endl;
// Do something...
}
void worker(once_flag* flag) {
call_once(*flag, init);
}
int main() {
once_flag flag;
thread t1(worker, &flag);
thread t2(worker, &flag);
thread t3(worker, &flag);
t1.join();
t2.join();
t3.join();
return 0;
}
Consider this question: why is once_flag flag created in main? Would it work to declare a once_flag directly inside worker and use it there? Why or why not?
Concurrent Tasks
// 06_naive_multithread.cpp
static const int MAX = 10e8; // ①
static double sum = 0; // ②
void worker(int min, int max) { // ③
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
void serial_task(int min, int max) { // ④
auto start_time = chrono::steady_clock::now();
sum = 0;
worker(min, max); // use the parameters instead of the hard-coded 0 and MAX
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Serial task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
① The data range is given by a constant so that it is easy to adjust.
② A global variable stores the result.
③ A task function computes the values.
④ serial_task measures how long the computation takes.
Serial task finish, 6406 ms consumed, Result: 2.10819e+13
// 06_naive_multithread.cpp
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency(); // ①
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) { // ②
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(worker, min, range)); // ③
min = range + 1;
}
for (auto& t : threads) {
t.join(); // ④
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
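① thread::hardware_concurrency() returns how many threads the current hardware can run in parallel (it returns 0 if this cannot be determined).
② The number of threads is chosen according to the processor.
③ Each thread runs the worker function on the slice of data assigned to it.
④ Wait for every thread to finish.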
hardware_concurrency: 16
Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12
In fact, most CPUs today have more than one level of cache.


Race Conditions and Critical Sections
Mutexes and Locks
mutex
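Each thread's work consists of two parts:
- independently processing the data assigned to it
- merging its result into the overall total
Main APIs


| API | Description |
|---|---|
| lock | Locks the mutex; blocks if it is unavailable |
| try_lock | Tries to lock the mutex; returns immediately if it is unavailable |
| unlock | Unlocks the mutex |

- Timeouts: timed_mutex, recursive_timed_mutex and shared_timed_mutex all have "timed" in their names, which means they support timeouts. They provide try_lock_for and try_lock_until, which take a timeout duration and a time point respectively. If the lock cannot be acquired within the timeout, the call returns false instead of waiting any longer.
- Reentrancy: recursive_mutex and recursive_timed_mutex have "recursive" in their names. A reentrant (recursive) mutex can be locked multiple times by the same thread, and must be unlocked the same number of times. This avoids some unnecessary deadlocks.
- Sharing: shared_timed_mutex and shared_mutex support shared ownership. Such a mutex effectively provides two locks: a shared lock and an exclusive lock. Once a thread holds the exclusive lock, no other thread can acquire either the exclusive lock or the shared lock. If a thread holds the shared lock, other threads cannot acquire the exclusive lock but can still acquire the shared lock. The exclusive lock is used through the same interface as other mutexes. The shared lock can be held by multiple threads at the same time, and its interface is listed in the table below. Shared locks are typically used for the readers-writers problem.

| API | Description |
|---|---|
| lock_shared | Acquires the shared lock; blocks if it cannot be acquired |
| try_lock_shared | Tries to acquire the shared lock; returns immediately if it is unavailable |
| unlock_shared | Releases the shared lock |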
// 07_mutex_lock.cpp
static const int MAX = 10e8;
static double sum = 0;
static mutex exclusive;
void concurrent_worker(int min, int max) {
for (int i = min; i <= max; i++) {
exclusive.lock(); // ①
sum += sqrt(i);
exclusive.unlock(); // ②
}
}
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency();
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) {
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(concurrent_worker, min, range)); // ③
min = range + 1;
}
for (int i = 0; i < threads.size(); i++) {
threads[i].join();
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
① Lock before accessing the shared data.
② Unlock once the access is complete.
③ Use the locking version in the threads.
hardware_concurrency: 16
Concurrent task finish, 74232 ms consumed, Result: 2.10819e+13
// 08_improved_mutex_lock.cpp
void concurrent_worker(int min, int max) {
double tmp_sum = 0;
for (int i = min; i <= max; i++) {
tmp_sum += sqrt(i); // ①
}
exclusive.lock(); // ②
sum += tmp_sum;
exclusive.unlock();
}
① A local variable holds the partial result of the current thread.
② The lock is taken only when adding that partial result to the total.
hardware_concurrency: 16
Concurrent task finish, 451 ms consumed, Result: 2.10819e+13
In general, a lock should be held for only the minimum possible time needed to perform the required operations. --《C++ Concurrency in Action》
Deadlock

// 09_deadlock_bank_transfer.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
mMoney += amount;
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
mutex* getLock() {
return &mMoneyLock;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
};
// 09_deadlock_bank_transfer.cpp
class Bank {
public:
void addAccount(Account* account) {
mAccounts.insert(account);
}
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock_guard guardA(*accountA->getLock()); // ①
lock_guard guardB(*accountB->getLock());
if (amount > accountA->getMoney()) { // ②
return false;
}
accountA->changeMoney(-amount); // ③
accountB->changeMoney(amount);
return true;
}
double totalMoney() const {
double sum = 0;
for (auto a : mAccounts) {
sum += a->getMoney();
}
return sum;
}
private:
set<Account*> mAccounts;
};
① For thread safety, the lock of each account is acquired before the account is modified.
② Check whether the source account has enough money; if not, the transfer fails.
③ Perform the transfer.
// 09_deadlock_bank_transfer.cpp
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
} else {
cout << "Transfer failed, "
<< accountA->getName() << " has only $" << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
}
}
}
// 09_deadlock_bank_transfer.cpp
int main() {
Account a("Paul", 100);
Account b("Moira", 100);
Bank aBank;
aBank.addAccount(&a);
aBank.addAccount(&b);
thread t1(randomTransfer, &aBank, &a, &b);
thread t2(randomTransfer, &aBank, &b, &a);
t1.join();
t2.join();
return 0;
}
...
Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $34.7581, but 66.3208 required
Transfer failed, Moira has only $34.7581, but
Transfer 93.191 from 53.9176 required
Transfer 60.6146 from Moira to Paul, Bank totalMoney: 200
Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney:
Transfer failed, Moira has only $17.6041, but 18.1186 required
Transfer failed, Moira has only $17.6041, but 18.893 required
Transfer failed, Moira has only $17.6041, but 34.7078 required
Transfer failed, Moira has only $17.6041, but 33.9569 required
Transfer 12.7899 from 200
Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $63.9373, but 80.9038 required
Transfer 50.933 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $13.0043, but 30.2056 required
Transfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200
Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required

// 10_improved_bank_transfer.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
sCoutLock.lock();
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
sCoutLock.unlock();
} else {
sCoutLock.lock();
cout << "Transfer failed, "
<< accountA->getName() << " has only " << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
sCoutLock.unlock();
}
}
}
Consider the two pairs of lock and unlock calls. Why not lock once right under while(true) and unlock once at the end of the loop body?
Generic Locking Algorithms
Main APIs

// 10_improved_bank_transfer.cpp
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock(*accountA->getLock(), *accountB->getLock()); // ①
lock_guard lockA(*accountA->getLock(), adopt_lock); // ②
lock_guard lockB(*accountB->getLock(), adopt_lock); // ③
if (amount > accountA->getMoney()) {
return false;
}
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
return true;
}
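① The lock function acquires both locks, and the standard library guarantees that acquiring them this way does not deadlock.
② lock_guard is covered in detail below. For now it is enough to know that it keeps the mutex locked for the lifetime of the lock_guard object. lockA is created so that the lock is released when transferMoney returns.
③ lockB serves the same purpose. Note that adopt_lock is passed to both. It tells lock_guard that the mutex is already locked and must not be locked again. Without adopt_lock, the same thread would lock the mutex a second time, which for std::mutex is undefined behavior.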
...
Transfer failed, Paul has only $1.76243, but 17.5974 required
Transfer failed, Paul has only $1.76243, but 59.2104 required
Transfer failed, Paul has only $1.76243, but 49.6379 required
Transfer failed, Paul has only $1.76243, but 63.6373 required
Transfer failed, Paul has only $1.76243, but 51.8742 required
Transfer failed, Paul has only $1.76243, but 50.0081 required
Transfer failed, Paul has only $1.76243, but 86.1041 required
Transfer failed, Paul has only $1.76243, but 51.3278 required
Transfer failed, Paul has only $1.76243, but 66.5754 required
Transfer failed, Paul has only $1.76243, but 32.1867 required
Transfer failed, Paul has only $1.76243, but 62.0039 required
Transfer failed, Paul has only $1.76243, but 98.7819 required
Transfer failed, Paul has only $1.76243, but 27.046 required
Transfer failed, Paul has only $1.76243, but 62.9155 required
Transfer 98.8478 from Moira to Paul, Bank totalMoney: 200
Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200
Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200
Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $10.0142, but 61.3033 required
Transfer failed, Moira has only $10.0142, but 24.5595 required
...
Generic Mutex Management
Main APIs

// https://en.cppreference.com/w/cpp/thread/lock_guard
int g_i = 0;
std::mutex g_i_mutex; // ①
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex); // ②
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// ③
}
int main()
{
std::cout << "main: " << g_i << '\n';
std::thread t1(safe_increment); // ④
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << "main: " << g_i << '\n';
}
① The global mutex g_i_mutex protects the global variable g_i.
② This function is designed to be called from multiple threads, so it must be protected by the mutex. Instead of calling lock directly, it uses a lock_guard to lock the mutex.
③ When the function returns, the local variable std::lock_guard<std::mutex> lock is destroyed, which releases the mutex.
④ The function is called from multiple threads.
RAII
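RAII guarantees that the resource is available to any function that may access the object. It also guarantees that all resources are released when the lifetime of their controlling object ends, in reverse order of acquisition. Likewise, if resource acquisition fails (the constructor exits with an exception), all resources acquired by every fully-constructed member and base-class subobject are released in reverse order of initialization. This leverages core language features to eliminate resource leaks and guarantee exception safety.
RAII can be summarized as follows. Encapsulate each resource into a class, where:
- the constructor acquires the resource and establishes all class invariants, or throws an exception if that cannot be done;
- the destructor releases the resource and never throws exceptions.
Always use the resource via an instance of an RAII class that either:
- has automatic storage duration or temporary lifetime itself, or
- has a lifetime that is bounded by the lifetime of an automatic or temporary object.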
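// Three alternative ways to lock both accounts; each avoids deadlock and unlocks at scope exit.
// Option 1: lock both mutexes with std::lock, then let lock_guard adopt them
lock(*accountA->getLock(), *accountB->getLock());
lock_guard lockA(*accountA->getLock(), adopt_lock);
lock_guard lockB(*accountB->getLock(), adopt_lock);
// Option 2: create unique_locks without locking (defer_lock), then lock the unique_locks
unique_lock lockA(*accountA->getLock(), defer_lock);
unique_lock lockB(*accountB->getLock(), defer_lock);
lock(lockA, lockB); // passing the unique_locks lets them own the mutexes and unlock them
// Option 3 (C++17): scoped_lock locks all the given mutexes at once
scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());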
Condition Variables

| API | Since | Description |
|---|---|---|
| condition_variable | C++11 | A condition variable associated with std::unique_lock |
| condition_variable_any | C++11 | A condition variable associated with any lock type |
| notify_all_at_thread_exit | C++11 | Schedules a call to notify_all to happen when this thread has completely finished |
| cv_status | C++11 | Lists the possible results of a timed wait on a condition variable |

In the classic producer-consumer problem, the producer and the consumer share a work area of limited size.
- The producer keeps putting data into the work area. When the work area is full, it stops and waits for the consumer to consume some data, then continues.
- The consumer keeps taking data out of the work area. When all the data has been consumed, it also stops and waits for the producer to put new data in.
// 11_bank_transfer_wait_notify.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
unique_lock lock(mMoneyLock); // ②
mConditionVar.wait(lock, [this, amount] { // ③
return mMoney + amount > 0; // ④
});
mMoney += amount;
mConditionVar.notify_all(); // ⑤
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
condition_variable mConditionVar; // ①
};
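① A condition variable is declared here for coordination between threads.
② unique_lock is used here because it works together with the condition variable, which needs to unlock and re-lock the mutex.
③ This is the key part: waiting on the condition variable. The lambda that follows is evaluated to check whether the condition holds. If it does, execution continues. If not, the mutex is unlocked and the current thread waits. Unlocking is essential, because only then can other threads acquire the mutex. The predicate form also re-checks the condition after spurious wakeups.
④ This is the condition being waited for. If you are not familiar with lambda expressions, please look them up or read my earlier article.
⑤ This is also important. After the balance changes, we must notify all other threads waiting on the condition variable. All threads blocked in wait are woken, try to reacquire the lock (only one can hold it at a time) and check their condition again. Besides notify_all there is notify_one, which wakes only one waiting thread. Together, wait and notify are the tools threads use to cooperate.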
// 11_bank_transfer_wait_notify.cpp
void Bank::transferMoney(Account* accountA, Account* accountB, double amount) {
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
}
// 11_bank_transfer_wait_notify.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
{
lock_guard guard(sCoutLock);
cout << "Try to Transfer " << randomMoney
<< " from " << accountA->getName() << "(" << accountA->getMoney()
<< ") to " << accountB->getName() << "(" << accountB->getMoney()
<< "), Bank totalMoney: " << bank->totalMoney() << endl;
}
bank->transferMoney(accountA, accountB, randomMoney);
}
}
...
Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200
Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200
Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200
Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200
Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200
Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200
Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200
Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200
Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002
Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200
Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200
Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200
...
future

async
// 12_async_task.cpp
static const int MAX = 10e8;
static double sum = 0;
void worker(int min, int max) {
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
int main() {
sum = 0;
auto f1 = async(worker, 0, MAX);
cout << "Async task triggered" << endl;
f1.wait();
cout << "Async task finish, result: " << sum << endl << endl;
}
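Here async(worker, 0, MAX) starts the task asynchronously and returns a future object. A future holds the result of an asynchronous task. Futures are explained in more detail in the packaged_task example later; in this example the future is only used to wait for the task to finish. f1.wait() blocks until the asynchronous task has completed.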
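async also accepts a launch policy as its first argument:
- launch::async: runs the task asynchronously on a new thread.
- launch::deferred: runs the task on the calling thread only when its result is first requested, i.e. lazy evaluation.
If no policy is given, the default is launch::async | launch::deferred, and the implementation chooses between them.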
// 12_async_task.cpp
int main() {
double result = 0;
cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl;
auto f2 = async(launch::async, [&result]() {
cout << "Lambda task in thread: " << this_thread::get_id() << endl;
for (int i = 0; i <= MAX; i++) {
result += sqrt(i);
}
});
f2.wait();
cout << "Async task with lambda finish, result: " << result << endl << endl;
return 0;
}
Async task with lambda triggered, thread: 0x11290d5c0
Lambda task in thread: 0x700007aa1000
Async task with lambda finish, result: 2.10819e+13
// 12_async_task.cpp
class Worker {
public:
Worker(int min, int max): mMin(min), mMax(max) {} // ①
double work() { // ②
mResult = 0;
for (int i = mMin; i <= mMax; i++) {
mResult += sqrt(i);
}
return mResult;
}
double getResult() {
return mResult;
}
private:
int mMin;
int mMax;
double mResult;
};
int main() {
Worker w(0, MAX);
cout << "Task in class triggered" << endl;
auto f3 = async(&Worker::work, &w); // ③
f3.wait();
cout << "Task in class finish, result: " << w.getResult() << endl << endl;
return 0;
}
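① The task is described by a class that encapsulates the task introduced earlier. It holds the task's input parameters and its output result.
② work contains the main logic of the task.
③ The task is run through async by specifying the member function and the object it is called on. Note the &w, which passes a pointer to the object. Without &, async would run work on a copy of w, and w.getResult() would not see the result.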
packaged_task
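If you are familiar with design patterns, you probably know the Command pattern. packaged_task works in a similar way: it wraps a callable into an object that can be passed around and invoked later.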
// 13_packaged_task.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
double concurrent_task(int min, int max) {
vector<future<double>> results; // ①
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) { // ②
packaged_task<double(int, int)> task(concurrent_worker); // ③
results.push_back(task.get_future()); // ④
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range); // ⑤
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get(); // ⑥
}
return sum;
}
int main() {
auto start_time = chrono::steady_clock::now();
double r = concurrent_task(0, MAX);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl;
return 0;
}
① First, create a collection to hold the future objects, which are used to retrieve the results of the tasks.
② As before, the number of threads depends on the CPU.
③ Wrap the task in a packaged_task. Because concurrent_worker is now wrapped as a task, its return value cannot be obtained directly; it must be obtained through a future.
④ Get the future associated with the task and store it in the collection.
⑤ Run the task on a new thread, passing the required arguments.
⑥ Go through the collection of futures and add up the result of each task. r.get() returns the value that concurrent_worker returned in that task.
promise and future
// 14_promise_future.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
void concurrent_task(int min, int max, promise<double>* result) { // ①
vector<future<double>> results;
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) {
packaged_task<double(int, int)> task(concurrent_worker);
results.push_back(task.get_future());
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range);
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get();
}
result->set_value(sum); // ②
cout << "concurrent_task finish" << endl;
}
int main() {
auto start_time = chrono::steady_clock::now();
promise<double> sum; // ③
concurrent_task(0, MAX, &sum);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed." << endl;
cout << "Result: " << sum.get_future().get() << endl; // ④
return 0;
}
① concurrent_task no longer returns the result directly. Instead, it takes a promise object that will hold the result.
② When the computation is finished, the total is set on the promise. As soon as set_value is called, the associated future becomes ready.
③ main creates a promise to hold the result and passes it to concurrent_task as a pointer.
④ The result is obtained through sum.get_future().get(). As explained in ②, once set_value has been called, the associated future is ready.

// 15_parallel_algorithm.cpp
void generateRandomData(vector<double>& collection, int size) {
random_device rd;
mt19937 mt(rd());
uniform_real_distribution<double> dist(1.0, 100.0);
for (int i = 0; i < size; i++) {
collection.push_back(dist(mt));
}
}
int main() {
vector<double> collection;
generateRandomData(collection, 10e6); // ①
vector<double> copy1(collection); // ②
vector<double> copy2(collection);
vector<double> copy3(collection);
auto time1 = chrono::steady_clock::now(); // ③
sort(execution::seq, copy1.begin(), copy1.end()); // ④
auto time2 = chrono::steady_clock::now();
auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count();
cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤
auto time3 = chrono::steady_clock::now();
sort(execution::par, copy2.begin(),copy2.end()); // ⑥
auto time4 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count();
cout << "Parallel sort consuming " << duration << "ms." << endl;
auto time5 = chrono::steady_clock::now();
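sort(execution::par_unseq, copy3.begin(), copy3.end()); // ⑦ copy3: the original listing sorted copy2 again here (already-sorted data), and the par_unseq timing below was measured that way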
auto time6 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count();
cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;
}
① A function generates 10,000,000 (10e6) random numbers.
② The data is copied three times for later use.
③ The same sort algorithm is then called with three different execution policies, and the start and end time of each call is recorded.
④ The first call uses std::execution::seq.
⑤ The time taken by this run is printed.
⑥ The second call uses std::execution::par.
⑦ The third call uses std::execution::par_unseq.
Sequenced sort consuming 4464ms.
Parallel sort consuming 459ms.
Parallel unsequenced sort consuming 168ms.