之前寫過一篇文章叫《寫了一段高端C++代碼》,這篇文章的背景和它完全相同,我這里再復(fù)述一遍。
背景:
在音視頻方向中,線程分為普通線程和GL線程(OpenGL線程),GL線程中可以執(zhí)行OpenGL相關(guān)的語句,做一些圖像渲染的工作,也可以理解為所有GL語句都要在GL線程中執(zhí)行;而在普通線程中,只能執(zhí)行那些我們平時經(jīng)常接觸的普通語句。
在具體項(xiàng)目開發(fā)中會有些需求:在普通線程中突然想要執(zhí)行某些必須要在GL線程下執(zhí)行的任務(wù)(比如某些初始化工作,釋放某些GL相關(guān)的對象),執(zhí)行完此任務(wù)后又繼續(xù)執(zhí)行自己的任務(wù),像在同一個線程執(zhí)行一樣:
void func() {
task1();
task2(); // 需要在GL線程執(zhí)行
task3();
}
分析:
這里有個關(guān)鍵點(diǎn):task3()一定要等到task2()執(zhí)行完畢后才可執(zhí)行,但是由于task2()是被拋到了其他線程運(yùn)行,沒有起到阻塞執(zhí)行的效果。
怎么能達(dá)到目的呢?可以這樣使用條件變量:
void task2() {
...
notify();
}
void func() {
task1();
task2(); // 需要在GL線程執(zhí)行
wait();
task3();
}
普通線程在task2()后使用wait()阻塞線程,待GL線程中的任務(wù)執(zhí)行完后使用notity()打斷普通線程的阻塞,可達(dá)到順序執(zhí)行的目的。
但這樣非常麻煩,而且不通用,代碼還相當(dāng)難看。
在之前的文章里我使用C++的future封裝了一套函數(shù),可以方便的跨線程阻塞調(diào)度某個任務(wù)執(zhí)行,然而我還有個項(xiàng)目是使用純C語言開發(fā)的,沒有了C++的future,要完成類似需求就比較困難,但是,困難也得搞阿,于是有了下面的代碼。
先看下如何函數(shù)的設(shè)計:
typedef struct Dispatcher Dispatcher;
typedef void (*DispatcherFunc)(void* arg);
/**
* @brief 創(chuàng)建一個實(shí)例,內(nèi)部會常駐一個GL線程,外部可以將某些任務(wù)丟到此線程里執(zhí)行,可以選擇是否阻塞執(zhí)行
*/
Dispatcher* Dispatcher_create();
/**
* @brief 銷毀實(shí)例
*/
void Dispatcher_destroy(Dispatcher** dispatcher_p);
/**
* @brief 利用此函數(shù)將任務(wù)丟到線程里執(zhí)行
*
* @param dispatcher 實(shí)例
* @param func 要執(zhí)行的函數(shù)
* @param arg 函數(shù)參數(shù)
* @param block 選擇是否阻塞執(zhí)行
*/
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);
主要功能就在最后一個函數(shù),該函數(shù)可以選擇是否阻塞執(zhí)行,這里大家可以思考一下,如何實(shí)現(xiàn)阻塞的需求?條件變量是肯定的,但是如何保證條件變量等待的是當(dāng)前事件呢?還是直接看代碼實(shí)現(xiàn)吧:
#include <stdatomic.h>
#include "libctools.h"
#include "gldispatch.h"
typedef struct Task {
DispatcherFunc func;
void* arg;
Mutex* mutex;
Cond* cond;
bool is_continue;
bool is_block;
} Task;
struct Dispatcher {
Thread* thread_id;
Thread _thread_id;
Mutex* mutex;
Cond* cond;
List* task_queue;
atomic_bool is_interrupt;
};
static void taskDestroy(Task* task) {
if (task->mutex) {
mutex_destroyp(&task->mutex);
}
if (task->cond) {
cond_destroyp(&task->cond);
}
free((void*)task);
}
static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) {
Task* task = (Task*)calloc(1, sizeof(Task));
if (!task) return NULL;
task->func = func;
task->arg = arg;
task->is_block = is_block;
if (is_block) {
task->mutex = mutex_create();
task->cond = cond_create();
}
return task;
}
static int Dispatcher_process(void* arg) {
log_info("%s start", __func__);
Dispatcher* self = (Dispatcher*)arg;
while (!atomic_load(&self->is_interrupt)) {
mutex_lock(self->mutex);
while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) {
cond_wait(self->cond, self->mutex);
}
ListNode* node = list_pop_front(self->task_queue);
mutex_unlock(self->mutex);
if (atomic_load(&self->is_interrupt)) {
break;
}
if (node) {
Task* task = (Task*)node->val;
task->func(task->arg);
if (task->is_block) {
mutex_lock(task->mutex);
task->is_continue = true;
cond_signal(task->cond);
mutex_unlock(task->mutex);
} else {
taskDestroy(task);
}
free(node);
}
}
log_info("%s stop", __func__);
return 0;
}
static void Dispatcher_start(Dispatcher* dispatcher) {
dispatcher->thread_id =
thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");
}
static void Dispatcher_stop(Dispatcher* dispatcher) {
atomic_store(&dispatcher->is_interrupt, true);
cond_signal(dispatcher->cond);
list_clear(dispatcher->task_queue, true);
if (dispatcher->thread_id) {
thread_wait(dispatcher->thread_id, NULL);
}
}
Dispatcher* Dispatcher_create() {
Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher));
if (!self) return NULL;
self->mutex = mutex_create();
self->cond = cond_create();
self->task_queue = list_create();
self->task_queue->free_func = (void (*)(int64_t))taskDestroy;
Dispatcher_start(self);
return self;
}
void Dispatcher_destroy(Dispatcher** dispatcher_p) {
if (NULL == dispatcher_p || NULL == *dispatcher_p) return;
Dispatcher* self = *dispatcher_p;
Dispatcher_stop(self);
mutex_destroyp(&self->mutex);
cond_destroyp(&self->cond);
list_destroy(self->task_queue);
freep((void**)dispatcher_p);
}
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) {
if (!dispatcher) return;
Task* task = taskCreate(func, arg, block);
if (task) {
ListNode* node = list_node_new((int64_t)task);
mutex_lock(dispatcher->mutex);
list_push_back(dispatcher->task_queue, node);
cond_signal(dispatcher->cond);
mutex_unlock(dispatcher->mutex);
if (task->is_block) {
mutex_lock(task->mutex);
while (!task->is_continue) {
cond_wait(task->cond, task->mutex);
}
mutex_unlock(task->mutex);
taskDestroy(task);
}
}
}
使用方式:
void func1(void* arg) {
print("hello func1");
}
void func2(void* arg) {
print("hello func2");
}
int main() {
Dispatcher* dispatcher = Dispatcher_create();
Dispatcher_run(dispatcher, func1, NULL, true);
Dispatcher_run(dispatcher, func2, NULL, true);
Dispatcher_destroy(&dispatcher);
return 0;
}
tips:
代碼中的log、mutex、cond、thread、list都是二次封裝的函數(shù),功能無非就是log、加解鎖、條件變量、創(chuàng)建線程以及C語言的鏈表。這里就不貼出他們的實(shí)現(xiàn)了,大家可以自己實(shí)現(xiàn)一套,當(dāng)作個小練習(xí),不難。
代碼我也就不過多介紹了,相信有點(diǎn)水平的朋友都能看懂,有問題可以留言!