開源項目Workflow中有個重要的基礎模塊:
代碼僅300行的C語言線程池。
本文會伴隨源碼分析,而邏輯完備、對稱無差別的特點于第3部分開始
歡迎跳閱, 或直接到Github主頁上圍觀代碼????
https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c
???? 0 - Workflow的thrdpool
作為目前Github上炙手可熱的異步調度引擎
Workflow有一個大招是:
計算通信融為一體。
而計算的核心:Executor調度器,
就是基于這個線程池實現的。
可以說,一個通用而高效的線程池,
是我們寫C/C++代碼時離不開的基礎模塊。
thrdpool代碼位置在src/kernel/,
不僅可以直接拿來使用,
同時也適合閱讀學習。
而更重要的,秉承Workflow項目本身
一貫的嚴謹極簡的作風,
這個thrdpool代碼極致簡潔,
實現邏輯上亦非常完備,
結構精巧,處處嚴謹,
復雜的并發處理依然可以對稱無差別,
不得不讓我驚嘆:妙啊!!!????
你可能會很好奇,
線程池還能寫出什么別致的新思路嗎?
先列出一些,你們細品:
特點1
:創建完線程池后,無需記錄任何線程id或對象,線程池可以通過一個等一個的方式優雅地去結束所有線程;???? 也就是說,每一個線程都是對等的 特點2
:線程任務可以由另一個線程任務調起;甚至線程池正在被銷毀時也可以提交下一個任務;(這很重要,因為線程本身很可能是不知道線程池的狀態的;???? 即,每一個任務也是對等的 特點3
:同理,線程任務也可以銷毀這個線程池;(非常完整~???? 每一種行為也是對等的,包括destroy
我真的迫不及待為大家深層解讀一下,
這個我愿稱之為“邏輯完備”的線程池。
???? 1 - 前置知識
第一部分我先梳理一些基本內容梳理,
有基礎的小伙伴可以直接跳過。
如果有不準確的地方,歡迎大家指正交流~
Question: 為什么需要線程池?
其實思路不僅對線程池,
對任何有限資源的調度管理都是類似的。
我們知道,
通過pthread或者std::thread創建線程,
就可以實現多線程并發執行我們的代碼。
但是CPU的核數是固定的,
所以真正并行執行的最大值也是固定的,
過多的線程除了頻繁創建產生overhead以外,
還會導致對系統資源進行爭搶,
這些都是不必要的浪費。
因此我們可以管理有限個線程,
循環且合理地利用它們。??
那么線程池一般包含哪些內容呢?
首先是管理若干個工具人線程; 其次是管理交給線程去執行的任務,這個一般會有一個隊列; 再然后線程之間需要一些同步機制,比如mutex、condition等; 最后就是各線程池實現上自身需要的其他內容了;
接下來我們看看Workflow的thrdpool是怎么做的。
???? 2 - 代碼概覽
以下共7步常用思路,
足以讓我們把代碼飛快過一遍。
第1步:先看頭文件,有什么接口。
我們打開thrdpool.h
,只需關注這三個:
// 創建線程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把任務交給線程池的入口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// 銷毀線程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool);
第2步:接口上有什么數據結構。
即,我們如何描述一個交給線程池的任務。
struct thrdpool_task
{
void (*routine)(void *); // 函數指針
void *context; // 上下文
};
第3步:再看實現.c,內部數據結構。
struct __thrdpool
{
struct list_head task_queue;// 任務隊列
size_t nthreads; // 線程個數
size_t stacksize; // 構造線程時的參數
pthread_t tid; // 運行期間記錄的是個zero值
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_key_t key;
pthread_cond_t *terminate;
};
沒有一個多余,每一個成員都很到位:
tid:線程id,整個線程池只有一個,它不會奇怪地去記錄任何一個線程的id,這樣就不完美了????,它平時運行的時候是空值,退出的時候,它是用來實現鏈式等待的關鍵。 mutex 和 cond是常見的線程間同步的工具,其中這個cond是用來給生產者和消費者去操作任務隊列用的。 key:是線程池的key,然后會賦予給每個由線程池創建的線程作為他們的thread local,用于區分這個線程是否是線程池創建的。 一個pthread_cond_t *terminate,這有兩個用途:不僅是退出時的標記位 ,而且還是調用退出的那個人要等待的condition。
以上各個成員的用途,
好像說了,又好像沒說,????
是因為幾乎每一個成員都值得深挖一下,
所以我們記住它們,
后面看代碼的時候就會豁然開朗!????
第4步:接口都調用了什么核心函數。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t *pool;
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
// 去掉了其他代碼,但是注意到剛才的tid和terminate的賦值
memset(&pool->tid, 0, sizeof (pthread_t));
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
...
這里可以看到__thrdpool_create_threads()
里邊
最關鍵的,就是循環創建nthreads個線程。
while (pool->nthreads < nthreads)
{
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
...
第5步:略讀核心函數的功能。
所以我們在上一步知道了,
每個線程執行的是__thrdpool_routine()
不難想象,它會不停從隊列拿任務出來執行:
static void *__thrdpool_routine(void *arg)
{
...
while (1)
{
// 1. 從隊列里拿一個任務出來,沒有就等待
pthread_mutex_lock(&pool->mutex);
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
// 2. 線程池結束的標志位,記住它,先跳過
if (pool->terminate)
break;
// 3. 如果能走到這里,恭喜你,拿到了任務~
entry = list_entry(*pos, struct __thrdpool_task_entry, list);
list_del(*pos);
// 4. 先解鎖
pthread_mutex_unlock(&pool->mutex);
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
// 5. 再執行
task_routine(task_context);
// 6. 這里也先記住它,意思是線程池里的線程可以銷毀線程池
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
... // 后面還有魔法,留下一章解讀~~~
第6步:把函數之間的關系聯系起來。
剛才看到的__thrdpool_routine()
就是線程的核心函數了,
它可以和誰關聯起來呢?
可以和接口thrdpool_schedule()
關聯上
我們說過,線程池上有個隊列管理任務:
每個執行routine的線程,都是消費者 每個發起schedule的線程,都是生產者
我們已經看過消費者了,來看看生產者的代碼:
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
entry->task = *task;
pthread_mutex_lock(&pool->mutex);
// 添加到隊列里
list_add_tail(&entry->list, &pool->task_queue);
// 叫醒在等待的線程
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
說到這里,特點2
就非常清晰了:
開篇說的特點2
是說,
”線程任務可以由另一個線程任務調起”。
只要對隊列的管理做得好,
顯然消費者所執行的函數里也可以生產
第7步:看其他情況的處理
對于線程池來說就是比如銷毀的情況。
接口thrdpool_destroy()
實現非常簡單:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
...
// 1. 內部會設置pool->terminate,并叫醒所有等在隊列拿任務的線程
__thrdpool_terminate(in_pool, pool);
// 2. 把隊列里還沒有執行的任務都拿出來,通過pending返回給用戶
list_for_each_safe(pos, tmp, &pool->task_queue)
{
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
if (pending)
pending(&entry->task);
... // 后面就是銷毀各種內存,同樣有魔法~
在退出的時候,
我們那些已經提交但是還沒有被執行的任務
是絕對不能就這么扔掉了的,
于是我們可以傳入一個pending()
函數,
上層可以做自己的回收、回調、
或任何保證上層邏輯完備的事情。
設計的完整性,無處不在。
接下來我們就可以跟著我們的核心問題,
針對性地看看每個特點都是怎么實現的。
???? 3 - 特點1: 一個等待一個的優雅退出
這里提出一個問題:
線程池要退出,如何結束所有線程?
一般線程池的實現都是
需要記錄下所有的線程id,
或者thread對象,
以便于我們去用join方法等待它們結束。
不嚴格地用join收拾干凈會有什么問題?
最直觀的,模塊退出時很可能會報內存泄漏
但是我們剛才看,
pool里并沒有記錄所有的tid呀?
正如開篇說的,
pool上只有一個tid,而且還是個空的值。
而特點1
正是Workflow thrdpool的答案:
無需記錄所有線程,我可以讓線程挨個自動退出、且一個等待下一個,最終達到調用完thrdpool_destroy()后內存回收干凈的目的。
這里先給一個簡單的圖,
假設發起destroy的人是main線程,
我們如何做到一個等一個退出:
步驟如下:
線程的退出,由thrdpool_destroy()設置pool->terminate開始。 我們每個線程,在while(1) 里會第一時間發現terminate,線程池要退出了,然后會break出這個while循環。 注意這個時候,還持有著mutex鎖,我們拿出pool上唯一的那個tid,放到我的臨時變量,我會根據拿出來的值做不同的處理。且我會把我自己的tid放上去,然后再解mutex鎖。 那么很顯然,第一個從pool上拿tid的人,會發現這是個0值,就可以直接結束了,不用負責等待任何其他人,但我在完全結束之前需要有人負責等待我的結束,所以我會把我的id放上去。 而如果發現自己從pool里拿到的tid不是0值,說明我要負責join上一個人,并且把我的tid放上去,讓下一個人負責我。 最后的那個人,是那個發現pool->nthreads為0的人,那么我就可以通過這個terminate(它本身是個condition)去通知發起destroy的人。 最后發起者就可以退了。????
是不是非常有意思!!!
非常優雅的做法!!!????
所以我們會發現,
其實大家不太需要知道太多信息,
只需要知道我要負責的上一個人。
當然每一步都是非常嚴謹的,
結合剛才跳過的第一段魔法????感受一下:
static void *__thrdpool_routine(void *arg)
{
while (1)
{ // 1.注意這里還持有鎖
pthread_mutex_lock(&pool->mutex);
... // 等著隊列拿任務出來
// 2. 這既是標識位,也是發起銷毀的那個人所等待的condition
if (pool->terminate)
break;
... // 執行拿到的任務
}
/* One thread joins another. Don't need to keep all thread IDs. */
// 3. 把線程池上記錄的那個tid拿下來,我來負責上一人
tid = pool->tid;
// 4. 把我自己記錄到線程池上,下一個人來負責我
pool->tid = pthread_self();
// 5. 每個人都減1,最后一個人負責叫醒發起detroy的人
if (--pool->nthreads == 0)
pthread_cond_signal(pool->terminate);
// 6. 這里可以解鎖進行等待了
pthread_mutex_unlock(&pool->mutex);
// 7. 只有第一個人拿到0值
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
// 8. 只要不0值,我就要負責等上一個結束才能退
pthread_join(tid, NULL);
return NULL; // 9. 退出,干干凈凈~
}
???? 4 - 特點2:線程任務可以由另一個線程任務調起
在第二部分我們看過源碼,
只要隊列管理得好,
線程任務里提交下一個任務是完全OK的。
這很合理。????
那么問題來了,特點1
又說,我們每個線程,
是不需要知道太多線程池的狀態和信息的。
而線程池的銷毀是個過程,
如果在這個過程間提交任務會怎么樣呢?
因此特點2
的一個重要解讀是:
線程池被銷毀時也可以提交下一個任務。
而且剛才提過,
還沒有被執行的任務,
可以通過我們傳入的pending()函數拿回來。
簡單看看銷毀時的嚴謹做法:
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex);
// 1. 加鎖設置標志位,之后的添加任務不會被執行,但可以pending拿到
pool->terminate = &term;
// 2. 廣播所有等待的消費者
pthread_cond_broadcast(&pool->cond);
if (in_pool) // 3. 這里的魔法等下講>_<~
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
// 4. 如果還有線程沒有退完,我會等,注意這里是while
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 5.同樣地等待打算退出的上一個人
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL);
}
???? 5 - 特點3:同樣可以在線程任務里銷毀這個線程池
既然線程任務可以做任何事情,
理論上,線程任務也可以銷毀線程池?
作為一個邏輯完備的線程池,
大膽一點,
我們把問號去掉。
而且,銷毀并不會結束當前任務,
它會等這個任務執行完。
想象一下,剛才的__thrdpool_routine(),
while(1)里拿出來的那個任務,
做的事情竟然是發起thrdpool_destroy()...
把上面的圖大膽改一下:????
如果發起銷毀的人,
是我們自己內部的線程,
那么我們就不是等n個,而是等n-1,
少了一個外部線程等待我們。
如何實現才能讓這些邏輯都完美融合呢?
我們把剛才跳過的三段魔法串起來看看。
???? 第一段魔法,銷毀的發起者。
如果發起銷毀的人是線程池內部的線程,
那么它具有較強的自我管理意識
(因為前面說了,會等它這個任務執行完)
而我們可以放心大膽地pthread_detach,
無需任何人join它等待它結束。
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
...
// 每個由線程池創建的線程都設置了一個key,由此判斷是否是in_pool
if (in_pool)
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
???? 第二段魔法:線程池誰來free?
一定是發起銷毀的那個人。
所以這里用in_pool來判斷是否是外部的人:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
// 已經調用完第一段,且挨個pending(未執行的task)了
// 銷毀其他內部分配的內存
...
// 如果不是內部線程發起的銷毀,要負責回收線程池內存
if (!in_pool)
free(pool);
}
那現在不是main線程發起的銷毀呢?
發起的銷毀的那個內部線程,
怎么能保證我可以在最后關頭
把所有資源回收干凈、調free(pool)、
最后功成身退呢?
在前面閱讀源碼第5步,其實我們看過,
__thrdpool_routine()里有free的地方。
于是現在三段魔法終于串起來了。
???? 第三段魔法:嚴謹的并發。
static void *__thrdpool_routine(void *arg)
{
while (1)
{ // ...
task_routine(task_context); // 如果routine里做的事情,是銷毀線程池...
// 注意這個時候,其他內存都已經被destroy里清掉了,萬萬不可以再用什么mutex、cond
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
...
非常重要的一點,
由于并發,我們是不知道誰先操作的。
假設我們稍微改一改這個順序,
就又是另一番邏輯。
比如我作為一個內部線程,
在routine()里調用destroy()期間,
發現還有線程沒有執行完,
我就要等在我的terminate上,
待最后看到nthreads==0的那個人叫醒我。
然后,我的代碼繼續執行,
函數棧就會從destroy()回到routine(),
也就是上面那幾行,
再然后就可以free(pool);
由于這時候我已經放飛自我detach了,
于是一切順利結束。
你看,無論如何都可以完美地銷毀線程池:
并發是復雜多變的,代碼是簡潔統一的
是不是太妙了!
我寫到這里已經要感動哭了!????
???? 6 - 簡單的用法
這個線程池只有兩個文件:thrdpool.h
和 thrdpool.c
,
而且只依賴內核的數據結構list.h
。
我們把它拿出來玩,自己寫一段代碼:
void my_routine(void *context)
{
// 我們要執行的函數
printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
}
void my_pending(const struct thrdpool_task *task)
{
// 線程池銷毀后,沒執行的任務會到這里
printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););
}
int main()
{
thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 創建
struct thrdpool_task task;
unsigned long long i;
for (i = 0; i < 5; i++)
{
task.routine = &my_routine;
task.context = reinterpret_cast<void *>(i);
thrdpool_schedule(&task, thrd_pool); // 調用
}
getchar(); // 卡住主線程,按回車繼續
thrdpool_destroy(&my_pending, thrd_pool); // 結束
return 0;
}
再打印幾行log,直接編譯就可以跑起來:
簡單程度堪比大一上學期C語言作業。????
???? 7 - 并發與結構之美
最后談談感受。
看完之后我很后悔為什么沒有早點看
為什么不早點就可以獲得知識的感覺,
并且在淺層看懂之際,
我知道自己肯定沒有完全理解到里邊的精髓,
畢竟我不能深刻地理解到
設計者當時對并發的構思和模型上的選擇。
我只能說,
沒有十多年頂級的系統調用和并發編程的功底
寫不出這樣的代碼,
沒有極致的審美與對品控的偏執
也寫不出這樣的代碼。
并發編程有很多說道,
就正如退出這個這么簡單的事情,
想要做到退出時回收干凈卻很難。
如果說你寫業務邏輯自己管線程,
退出什么的sleep(1)都無所謂,
但如果說做框架的人
不能把自己的框架做得完美無暇邏輯自洽
就難免讓人感覺差點意思。
而這個thrdpool,它作為一個線程池,
是如此的邏輯完備,
用最對稱簡潔的方式去面對復雜的并發。
再次讓我深深地感到震撼:
我們身邊那些原始的、底層的、基礎的代碼,
還有很多新思路,
還可以寫得如此美。
Workflow項目源碼地址:
https://github.com/sogou/workflow
????【閱讀原文】可以查看~