?? erl_async.c
字號:
/*** Asyncronous*/#ifdef HAVE_CONFIG_H# include "config.h"#endif#include "sys.h"#include "erl_sys_driver.h"#include "global.h"#include "erl_threads.h"typedef struct _erl_async { struct _erl_async* next; struct _erl_async* prev; DE_Handle* hndl; /* The DE_Handle is needed when port is gone */ Eterm port; long async_id; void* async_data; ErlDrvPDL pdl; void (*async_invoke)(void*); void (*async_free)(void*);} ErlAsync;typedef struct { erts_mtx_t mtx; erts_cnd_t cv; erts_tid_t thr; int len;#ifndef ERTS_SMP int hndl;#endif ErlAsync* head; ErlAsync* tail;#ifdef ERTS_ENABLE_LOCK_CHECK int no;#endif} AsyncQueue;static erts_smp_spinlock_t async_id_lock;static long async_id = 0;#ifndef ERTS_SMPerts_mtx_t async_ready_mtx;static ErlAsync* async_ready_list = NULL;#endif/*** Initialize worker threads (if supported)*//* Detach from driver */static void async_detach(DE_Handle* dh){ /* XXX:PaN what should happen here? we want to unload the driver or??? */ return;}#ifdef USE_THREADSstatic AsyncQueue* async_q;static void* async_main(void*);static void async_add(ErlAsync*, AsyncQueue*);int init_async(int hndl){ erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; AsyncQueue* q; int i; thr_opts.detached = 0; thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size;#ifndef ERTS_SMP erts_mtx_init(&async_ready_mtx, "async_ready"); async_ready_list = NULL;#endif async_id = 0; erts_smp_spinlock_init(&async_id_lock, "async_id"); async_q = q = (AsyncQueue*) (erts_async_max_threads ? erts_alloc(ERTS_ALC_T_ASYNC_Q, erts_async_max_threads * sizeof(AsyncQueue)) : NULL); for (i = 0; i < erts_async_max_threads; i++) { q->head = NULL; q->tail = NULL; q->len = 0;#ifndef ERTS_SMP q->hndl = hndl;#endif#ifdef ERTS_ENABLE_LOCK_CHECK q->no = i;#endif erts_mtx_init(&q->mtx, "asyncq"); erts_cnd_init(&q->cv); erts_thr_create(&q->thr, async_main, (void*)q, &thr_opts); q++; } return 0;}int exit_async(){ int i; /* terminate threads */ for (i = 0; i < erts_async_max_threads; i++) { ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync)); a->port = NIL; async_add(a, &async_q[i]); } for (i = 0; i < erts_async_max_threads; i++) { erts_thr_join(async_q[i].thr, NULL); erts_mtx_destroy(&async_q[i].mtx); erts_cnd_destroy(&async_q[i].cv); }#ifndef ERTS_SMP erts_mtx_destroy(&async_ready_mtx);#endif if (async_q) erts_free(ERTS_ALC_T_ASYNC_Q, (void *) async_q); return 0;}static void async_add(ErlAsync* a, AsyncQueue* q){ /* XXX:PaN Is this still necessary when ports lock drivers? */ if (is_internal_port(a->port)) { ERTS_LC_ASSERT(erts_drvportid2port(a->port)); /* make sure the driver will stay around */ driver_lock_driver(internal_port_index(a->port)); } erts_mtx_lock(&q->mtx); if (q->len == 0) { q->head = a; q->tail = a; q->len = 1; erts_cnd_signal(&q->cv); } else { /* no need to signal (since the worker is working) */ a->next = q->head; q->head->prev = a; q->head = a; q->len++; } erts_mtx_unlock(&q->mtx);}static voidprepare_for_block(void *vmtxp){ erts_mtx_unlock((erts_mtx_t *) vmtxp);}static voidresume_after_block(void *vmtxp){ erts_mtx_lock((erts_mtx_t *) vmtxp);}static ErlAsync* async_get(AsyncQueue* q){ ErlAsync* a; erts_mtx_lock(&q->mtx); erts_smp_activity_change(ERTS_ACTIVITY_IO, ERTS_ACTIVITY_WAIT, prepare_for_block, resume_after_block, (void *) &q->mtx); while((a = q->tail) == NULL) { erts_cnd_wait(&q->cv, &q->mtx); } erts_smp_activity_change(ERTS_ACTIVITY_WAIT, ERTS_ACTIVITY_IO, prepare_for_block, resume_after_block, (void *) &q->mtx);#ifdef ERTS_SMP ASSERT(a && q->tail == a);#endif if (q->head == q->tail) { q->head = q->tail = NULL; q->len = 0; } else { q->tail->prev->next = NULL; q->tail = q->tail->prev; q->len--; } erts_mtx_unlock(&q->mtx); return a;}static int async_del(long id){ int i; /* scan all queue for an entry with async_id == 'id' */ for (i = 0; i < erts_async_max_threads; i++) { ErlAsync* a; erts_mtx_lock(&async_q[i].mtx); a = async_q[i].head; while(a != NULL) { if (a->async_id == id) { if (a->prev != NULL) a->prev->next = a->next; else async_q[i].head = a->next; if (a->next != NULL) a->next->prev = a->prev; else async_q[i].tail = a->prev; async_q[i].len--; erts_mtx_unlock(&async_q[i].mtx); if (a->async_free != NULL) a->async_free(a->async_data); async_detach(a->hndl); erts_free(ERTS_ALC_T_ASYNC, a); return 1; } } erts_mtx_unlock(&async_q[i].mtx); } return 0;}static void* async_main(void* arg){ AsyncQueue* q = (AsyncQueue*) arg;#ifdef ERTS_ENABLE_LOCK_CHECK { char buf[27]; erts_snprintf(&buf[0], 27, "async %d", q->no); erts_lc_set_thread_name(&buf[0]); }#endif#ifdef ERTS_SMP erts_register_blockable_thread();#endif erts_smp_activity_begin(ERTS_ACTIVITY_IO, NULL, NULL, NULL); while(1) { ErlAsync* a = async_get(q); if (a->port == NIL) { /* TIME TO DIE SIGNAL */ erts_free(ERTS_ALC_T_ASYNC, (void *) a); break; } else { (*a->async_invoke)(a->async_data); /* Major problem if the code for async_invoke or async_free is removed during a blocking operation */#ifdef ERTS_SMP { Port *p; p = erts_id2port_sflgs(a->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); if (p) { if (async_ready(p, a->async_data)) { if (a->async_free) (*a->async_free)(a->async_data); } async_detach(a->hndl); erts_port_release(p); } if (a->pdl) { driver_pdl_dec_refc(a->pdl); } erts_free(ERTS_ALC_T_ASYNC, (void *) a); }#else if (a->pdl) { driver_pdl_dec_refc(a->pdl); } erts_mtx_lock(&async_ready_mtx); a->next = async_ready_list; async_ready_list = a; erts_mtx_unlock(&async_ready_mtx); sys_async_ready(q->hndl);#endif } } erts_smp_activity_end(ERTS_ACTIVITY_IO, NULL, NULL, NULL); return NULL;}#endif#ifndef ERTS_SMPint check_async_ready(){ ErlAsync* a; int count = 0; erts_mtx_lock(&async_ready_mtx); a = async_ready_list; async_ready_list = NULL; erts_mtx_unlock(&async_ready_mtx); while(a != NULL) { ErlAsync* a_next = a->next; /* Every port not dead */ Port *p = erts_id2port_sflgs(a->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); if (p != NULL) { count++; if (async_ready(p, a->async_data)) { if (a->async_free != NULL) (*a->async_free)(a->async_data); } async_detach(a->hndl); erts_port_release(p); } erts_free(ERTS_ALC_T_ASYNC, (void *) a); a = a_next; } return count;}#endif/*** Schedule async_invoke on a worker thread** NOTE will be syncrounous when threads are unsupported** return values:** 0 completed ** -1 error** N handle value (used with async_cancel)** arguments:** ix driver index ** key pointer to secedule queue (NULL means round robin)** async_invoke function to run in thread** async_data data to pass to invoke function** async_free function for relase async_data in case of failure*/long driver_async(ErlDrvPort ix, unsigned int* key, void (*async_invoke)(void*), void* async_data, void (*async_free)(void*)){ ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync)); Port* prt = erts_drvport2port(ix); long id; unsigned int qix; if (!prt) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); a->next = NULL; a->prev = NULL; a->hndl = (DE_Handle*)prt->drv_ptr->handle; a->port = prt->id; a->pdl = NULL; a->async_data = async_data; a->async_invoke = async_invoke; a->async_free = async_free; erts_smp_spin_lock(&async_id_lock); async_id = (async_id + 1) & 0x7fffffff; if (async_id == 0) async_id++; id = async_id; erts_smp_spin_unlock(&async_id_lock); a->async_id = id; if (key == NULL) { qix = (erts_async_max_threads > 0) ? (id % erts_async_max_threads) : 0; } else { qix = (erts_async_max_threads > 0) ? (*key % erts_async_max_threads) : 0; *key = qix; }#ifdef USE_THREADS if (erts_async_max_threads > 0) { if (prt->port_data_lock) { driver_pdl_inc_refc(prt->port_data_lock); a->pdl = prt->port_data_lock; } async_add(a, &async_q[qix]); return id; }#endif (*a->async_invoke)(a->async_data); if (async_ready(prt, a->async_data)) { if (a->async_free != NULL) (*a->async_free)(a->async_data); } erts_free(ERTS_ALC_T_ASYNC, (void *) a); return id;}int driver_async_cancel(unsigned int id){#ifdef USE_THREADS if (erts_async_max_threads > 0) return async_del(id);#endif return 0;}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -