?? nt.cc
字號:
// Package : omnithread// omnithread/nt.cc Created : 6/95 tjr//// Copyright (C) 1995-1999 AT&T Laboratories Cambridge//// This file is part of the omnithread library//// The omnithread library is free software; you can redistribute it and/or// modify it under the terms of the GNU Library General Public// License as published by the Free Software Foundation; either// version 2 of the License, or (at your option) any later version.//// This library is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU// Library General Public License for more details.//// You should have received a copy of the GNU Library General Public// License along with this library; if not, write to the Free// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA // 02111-1307, USA////// Implementation of OMNI thread abstraction for NT threads//#ifdef HAVE_CONFIG_H#include <config.h>#endif#include <stdlib.h>#include <errno.h>#include <omnithread.h>#include <process.h>#define DB(x) // x //#include <iostream.h> or #include <iostream> if DB is on.static void get_time_now(unsigned long* abs_sec, unsigned long* abs_nsec);/////////////////////////////////////////////////////////////////////////////// Mutex/////////////////////////////////////////////////////////////////////////////omni_mutex::omni_mutex(void){ InitializeCriticalSection(&crit);}omni_mutex::~omni_mutex(void){ DeleteCriticalSection(&crit);}/////////////////////////////////////////////////////////////////////////////// Condition variable///////////////////////////////////////////////////////////////////////////////// Condition variables are tricky to implement using NT synchronisation// primitives, since none of them have the atomic "release mutex and wait to be// signalled" which is central to the idea of a condition variable. To get// around this the solution is to record which threads are waiting and// explicitly wake up those threads.//// Here we implement a condition variable using a list of waiting threads// (protected by a critical section), and a per-thread semaphore (which// actually only needs to be a binary semaphore).//// To wait on the cv, a thread puts itself on the list of waiting threads for// that cv, then releases the mutex and waits on its own personal semaphore. A// signalling thread simply takes a thread from the head of the list and kicks// that thread's semaphore. Broadcast is simply implemented by kicking the// semaphore of each waiting thread.//// The only other tricky part comes when a thread gets a timeout from a timed// wait on its semaphore. Between returning with a timeout from the wait and// entering the critical section, a signalling thread could get in, kick the// waiting thread's semaphore and remove it from the list. If this happens,// the waiting thread's semaphore is now out of step so it needs resetting, and// the thread should indicate that it was signalled rather than that it timed// out.//// It is possible that the thread calling wait or timedwait is not a// omni_thread. In this case we have to provide a temporary data structure,// i.e. for the duration of the call, for the thread to link itself on the// list of waiting threads. _internal_omni_thread_dummy provides such// a data structure and _internal_omni_thread_helper is a helper class to// deal with this special case for wait() and timedwait(). Once created,// the _internal_omni_thread_dummy is cached for use by the next wait() or// timedwait() call from a non-omni_thread. This is probably worth doing// because creating a Semaphore is quite heavy weight.class _internal_omni_thread_helper;class _internal_omni_thread_dummy : public omni_thread {public: inline _internal_omni_thread_dummy() : next(0) { } inline ~_internal_omni_thread_dummy() { } friend class _internal_omni_thread_helper;private: _internal_omni_thread_dummy* next;};class _internal_omni_thread_helper {public: inline _internal_omni_thread_helper() { d = 0; t = omni_thread::self(); if (!t) { omni_mutex_lock sync(cachelock); if (cache) { d = cache; cache = cache->next; } else { d = new _internal_omni_thread_dummy; } t = d; } } inline ~_internal_omni_thread_helper() { if (d) { omni_mutex_lock sync(cachelock); d->next = cache; cache = d; } } inline operator omni_thread* () { return t; } inline omni_thread* operator->() { return t; } static _internal_omni_thread_dummy* cache; static omni_mutex cachelock;private: _internal_omni_thread_dummy* d; omni_thread* t;};_internal_omni_thread_dummy* _internal_omni_thread_helper::cache = 0;omni_mutex _internal_omni_thread_helper::cachelock;omni_condition::omni_condition(omni_mutex* m) : mutex(m){ InitializeCriticalSection(&crit); waiting_head = waiting_tail = NULL;}omni_condition::~omni_condition(void){ DeleteCriticalSection(&crit); DB( if (waiting_head != NULL) { cerr << "omni_condition::~omni_condition: list of waiting threads " << "is not empty\n"; } )}voidomni_condition::wait(void){ _internal_omni_thread_helper me; EnterCriticalSection(&crit); me->cond_next = NULL; me->cond_prev = waiting_tail; if (waiting_head == NULL) waiting_head = me; else waiting_tail->cond_next = me; waiting_tail = me; me->cond_waiting = TRUE; LeaveCriticalSection(&crit); mutex->unlock(); DWORD result = WaitForSingleObject(me->cond_semaphore, INFINITE); mutex->lock(); if (result != WAIT_OBJECT_0) throw omni_thread_fatal(GetLastError());}intomni_condition::timedwait(unsigned long abs_sec, unsigned long abs_nsec){ _internal_omni_thread_helper me; EnterCriticalSection(&crit); me->cond_next = NULL; me->cond_prev = waiting_tail; if (waiting_head == NULL) waiting_head = me; else waiting_tail->cond_next = me; waiting_tail = me; me->cond_waiting = TRUE; LeaveCriticalSection(&crit); mutex->unlock(); unsigned long now_sec, now_nsec; get_time_now(&now_sec, &now_nsec); DWORD timeout; if ((abs_sec <= now_sec) && ((abs_sec < now_sec) || (abs_nsec < now_nsec))) timeout = 0; else { timeout = (abs_sec-now_sec) * 1000; if( abs_nsec < now_nsec ) timeout -= (now_nsec-abs_nsec) / 1000000; else timeout += (abs_nsec-now_nsec) / 1000000; } DWORD result = WaitForSingleObject(me->cond_semaphore, timeout); if (result == WAIT_TIMEOUT) { EnterCriticalSection(&crit); if (me->cond_waiting) { if (me->cond_prev != NULL) me->cond_prev->cond_next = me->cond_next; else waiting_head = me->cond_next; if (me->cond_next != NULL) me->cond_next->cond_prev = me->cond_prev; else waiting_tail = me->cond_prev; me->cond_waiting = FALSE; LeaveCriticalSection(&crit); mutex->lock(); return 0; } // // We timed out but another thread still signalled us. Wait for // the semaphore (it _must_ have been signalled) to decrement it // again. Return that we were signalled, not that we timed out. // LeaveCriticalSection(&crit); result = WaitForSingleObject(me->cond_semaphore, INFINITE); } if (result != WAIT_OBJECT_0) throw omni_thread_fatal(GetLastError()); mutex->lock(); return 1;}voidomni_condition::signal(void){ EnterCriticalSection(&crit); if (waiting_head != NULL) { omni_thread* t = waiting_head; waiting_head = t->cond_next; if (waiting_head == NULL) waiting_tail = NULL; else waiting_head->cond_prev = NULL; t->cond_waiting = FALSE; if (!ReleaseSemaphore(t->cond_semaphore, 1, NULL)) { int rc = GetLastError(); LeaveCriticalSection(&crit); throw omni_thread_fatal(rc); } } LeaveCriticalSection(&crit);}voidomni_condition::broadcast(void){ EnterCriticalSection(&crit); while (waiting_head != NULL) { omni_thread* t = waiting_head; waiting_head = t->cond_next; if (waiting_head == NULL) waiting_tail = NULL; else waiting_head->cond_prev = NULL; t->cond_waiting = FALSE; if (!ReleaseSemaphore(t->cond_semaphore, 1, NULL)) { int rc = GetLastError(); LeaveCriticalSection(&crit); throw omni_thread_fatal(rc); } } LeaveCriticalSection(&crit);}/////////////////////////////////////////////////////////////////////////////// Counting semaphore/////////////////////////////////////////////////////////////////////////////#define SEMAPHORE_MAX 0x7fffffffomni_semaphore::omni_semaphore(unsigned int initial){ nt_sem = CreateSemaphore(NULL, initial, SEMAPHORE_MAX, NULL); if (nt_sem == NULL) { DB( cerr << "omni_semaphore::omni_semaphore: CreateSemaphore error " << GetLastError() << endl ); throw omni_thread_fatal(GetLastError()); }}omni_semaphore::~omni_semaphore(void){ if (!CloseHandle(nt_sem)) { DB( cerr << "omni_semaphore::~omni_semaphore: CloseHandle error " << GetLastError() << endl ); throw omni_thread_fatal(GetLastError()); }}voidomni_semaphore::wait(void){ if (WaitForSingleObject(nt_sem, INFINITE) != WAIT_OBJECT_0) throw omni_thread_fatal(GetLastError());}intomni_semaphore::trywait(void){ switch (WaitForSingleObject(nt_sem, 0)) { case WAIT_OBJECT_0: return 1; case WAIT_TIMEOUT: return 0; } throw omni_thread_fatal(GetLastError()); return 0; /* keep msvc++ happy */}voidomni_semaphore::post(void){ if (!ReleaseSemaphore(nt_sem, 1, NULL)) throw omni_thread_fatal(GetLastError());}/////////////////////////////////////////////////////////////////////////////// Thread///////////////////////////////////////////////////////////////////////////////// Static variables//omni_mutex* omni_thread::next_id_mutex;int omni_thread::next_id = 0;static DWORD self_tls_index;static unsigned int stack_size = 0;//// Initialisation function (gets called before any user code).//static int& count() { static int the_count = 0; return the_count;}omni_thread::init_t::init_t(void){ if (count()++ != 0) // only do it once however many objects get created. return; DB(cerr << "omni_thread::init: NT implementation initialising\n"); self_tls_index = TlsAlloc(); if (self_tls_index == 0xffffffff) throw omni_thread_fatal(GetLastError()); next_id_mutex = new omni_mutex; // // Create object for this (i.e. initial) thread. // omni_thread* t = new omni_thread; t->_state = STATE_RUNNING; if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &t->handle, 0, FALSE, DUPLICATE_SAME_ACCESS)) throw omni_thread_fatal(GetLastError()); t->nt_id = GetCurrentThreadId(); DB(cerr << "initial thread " << t->id() << " NT thread id " << t->nt_id << endl); if (!TlsSetValue(self_tls_index, (LPVOID)t)) throw omni_thread_fatal(GetLastError()); if (!SetThreadPriority(t->handle, nt_priority(PRIORITY_NORMAL))) throw omni_thread_fatal(GetLastError());}omni_thread::init_t::~init_t(void){ if (--count() != 0) return; omni_thread* self = omni_thread::self(); if (!self) return; TlsSetValue(self_tls_index, (LPVOID)0); delete self; delete next_id_mutex; TlsFree(self_tls_index);}//// Wrapper for thread creation.//extern "C" #ifndef __BCPLUSPLUS__unsigned __stdcall#elsevoid _USERENTRY#endifomni_thread_wrapper(void* ptr){ omni_thread* me = (omni_thread*)ptr;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -