// ?? cbroadcast.cpp
// 字號:
#include "CMcl.h"
#include <stdio.h>
#include <assert.h>
class CBroadcastChannel {
private:
enum { MAX_CLIENTS = 32 };
typedef struct {
DWORD dwClientMask;
BYTE abData[1];
} SampleHeader;
private:
DWORD m_dwMaxSamples;
DWORD m_cbDataSize;
DWORD m_cbSampleSize;
SampleHeader *m_pSample;
DWORD m_dwNextWrite;
DWORD m_adwNextRead[MAX_CLIENTS];
DWORD m_dwClientMask;
CMclSemaphore m_csBuffersEmpty;
CMclSemaphoreAutoPtr m_apcsBuffersFull[MAX_CLIENTS];
CMclMutex m_cmUpdateGuard;
public:
CBroadcastChannel( DWORD dwMaxSamples, DWORD cbDataSize) :
m_csBuffersEmpty( dwMaxSamples, dwMaxSamples),
m_cmUpdateGuard() {
m_dwMaxSamples = dwMaxSamples;
m_cbDataSize = cbDataSize;
m_cbSampleSize = cbDataSize + sizeof(SampleHeader) - 1;
m_pSample = (SampleHeader *) new BYTE[m_cbSampleSize * m_dwMaxSamples];
ZeroMemory( m_pSample, m_cbSampleSize * m_dwMaxSamples);
m_dwClientMask = 0;
m_dwNextWrite = 0;
ZeroMemory( m_adwNextRead, sizeof(DWORD) * MAX_CLIENTS);
};
DWORD Register(void) {
// grab the guard mutex...
CMclAutoLock lock(m_cmUpdateGuard);
// find the next available ID,
// MAX_CLIENTS ID's are available, at each bit position
// of a DWORD...
for (DWORD dwMask = 1, dwId = 0; dwMask; dwMask <<= 1, dwId++) {
if (!(dwMask & m_dwClientMask)) {
// mark the bit in the client mask...
m_dwClientMask |= dwMask;
// allocate a semaphore...
m_apcsBuffersFull[dwId] = new CMclSemaphore( 0, m_dwMaxSamples);
// reading will start with the NEXT sample written
// into the broadcast channel...
m_adwNextRead[dwId] = m_dwNextWrite;
// return the id...
return dwId;
}
}
// return 0xFFFFFFFF if we are servicing
// the maximum number of clients...
return 0xFFFFFFFF;
};
void Unregister(DWORD dwId) {
// grab the guard mutex...
CMclAutoLock lock(m_cmUpdateGuard);
// remove this client from all of the samples...
DWORD dwSample;
DWORD dwReleaseCount = 0;
DWORD dwMask = 1 << dwId;
for (dwSample = 0; dwSample < m_dwMaxSamples; dwSample++) {
SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * dwSample);
if (pNextSample->dwClientMask & dwMask) {
pNextSample->dwClientMask &= ~dwMask;
if (pNextSample->dwClientMask == 0) {
// this sample has no waiting clients
// and can be freed...
dwReleaseCount++;
}
}
}
// release the semaphore equal to the number
// of samples freed...
if (dwReleaseCount > 0) {
m_csBuffersEmpty.Release(dwReleaseCount);
}
// zero out the bit for this client ID...
m_dwClientMask &= ~dwMask;
};
BOOL ReadSample( DWORD dwId, LPVOID pSample) {
// compute the semaphore index from the id
// and wait until some buffers are ready to be read...
m_apcsBuffersFull[dwId]->Wait(INFINITE);
// read the next sample for this client...
DWORD dwSample = m_adwNextRead[dwId];
SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * dwSample);
DWORD dwMask = 1 << dwId;
if (pNextSample->dwClientMask & dwMask) {
// found a sample, copy out the data...
// this sample can be read by multiple
// clients at the same time...
CopyMemory( pSample, pNextSample->abData, m_cbDataSize);
// now we need to modify and check the client
// mask for this same, so we must acquire the
// guard mutex...
m_cmUpdateGuard.Wait(INFINITE);
// mark the sample as read by this client...
pNextSample->dwClientMask &= ~dwMask;
// release this sample if everyone has read it...
if (pNextSample->dwClientMask == 0) {
m_csBuffersEmpty.Release(1);
}
// increment the read pointer...
m_adwNextRead[dwId] = (m_adwNextRead[dwId] + 1) % m_dwMaxSamples;
// release the guard mutex...
m_cmUpdateGuard.Release();
// return success...
return TRUE;
}
// something went wrong...
return FALSE;
};
BOOL WriteSample( LPVOID pSample) {
// wait for a empty sample buffer to become available...
// we will be modifying the sample so we
// need to acquire the guard mutex as well...
m_csBuffersEmpty.WaitForTwo( m_cmUpdateGuard, TRUE, INFINITE);
// only broadcast this sample if there are clients...
BOOL bSampleWritten;
if (m_dwClientMask) {
// compute the pointer to the next sample...
SampleHeader *pNextSample = (SampleHeader *) ((LPBYTE)m_pSample + m_cbSampleSize * m_dwNextWrite);
// set all the current client bits...
pNextSample->dwClientMask = m_dwClientMask;
// copy the data into the sample...
CopyMemory( pNextSample->abData, pSample, m_cbDataSize);
// increment the write index...
m_dwNextWrite = (m_dwNextWrite + 1) % m_dwMaxSamples;
// increase the semaphore counts of all the readers...
DWORD dwIndex;
DWORD dwMask;
for (dwIndex = 0, dwMask = 1; dwIndex < MAX_CLIENTS; dwMask <<= 1, dwIndex++) {
if (m_dwClientMask & dwMask)
m_apcsBuffersFull[dwIndex]->Release(1);
}
bSampleWritten = TRUE;
}
else {
// throw away the sample, release the buffer...
m_csBuffersEmpty.Release(1);
bSampleWritten = FALSE;
}
// release the guard mutex...
m_cmUpdateGuard.Release();
// return status indicates if sample was written...
return bSampleWritten;
};
};
// MultiConsumerHandler
//
// Thread handler for one consumer of a CBroadcastChannel. The constructor
// creates a CMclThread bound to this handler; ThreadHandlerProc() registers
// with the channel, reads samples until Stop() is called, then unregisters.
class MultiConsumerHandler : public CMclThreadHandler {
private:
    CMclThreadAutoPtr m_apThread;
    // NOTE(review): written by Stop() on the caller's thread and polled by
    // ThreadHandlerProc() on the consumer thread with no synchronization;
    // consider an interlocked/atomic flag.
    BOOL m_bRun;
    CBroadcastChannel *m_pBroadcast;
    DWORD m_dwId;       // channel client ID, 0xFFFFFFFF while not registered

public:
    // pBroadcast - channel to read from; it must outlive this handler's
    //              thread, the handler only stores the pointer.
    MultiConsumerHandler( CBroadcastChannel *pBroadcast) {
        m_bRun = TRUE;
        m_pBroadcast = pBroadcast;
        // give the ID a defined value before the thread can touch it...
        m_dwId = 0xFFFFFFFF;
        // create the thread last, once every member it uses is set...
        m_apThread = new CMclThread(this);
    }

    // Ask the consumer loop to finish and wait for the thread to end.
    void Stop(void) {
        m_bRun = FALSE;
        m_apThread->Wait(INFINITE);
    }

    // Thread body. Returns 0 after a normal run, 1 when the channel had
    // no free client ID.
    unsigned ThreadHandlerProc(void) {
        // register with the broadcast channel
        // before we read from it...
        m_dwId = m_pBroadcast->Register();
        if (m_dwId == 0xFFFFFFFF) {
            // Register() hands out no ID when the channel is full; the
            // failure value must not be used as a client ID...
            printf( "Consumer could not register with the broadcast channel.\n");
            return 1;
        }

        // DWORD is unsigned, so print it as an unsigned long...
        printf( "Consumer #%lu Started.\n", (unsigned long) m_dwId);
        while (m_bRun) {
            DWORD dwSample;
            BOOL bStatus = m_pBroadcast->ReadSample( m_dwId, &dwSample);
            assert(bStatus);
            (void) bStatus;     // only used by the assert above
            // consumers with higher IDs read more slowly...
            Sleep(m_dwId*100);
        }

        // unregister when we are done with the
        // broadcast channel...
        m_pBroadcast->Unregister(m_dwId);
        printf( "Consumer #%lu Stopped.\n", (unsigned long) m_dwId);
        return 0;
    }
};
class ProducerHandler : public CMclThreadHandler {
private:
CMclThreadAutoPtr m_apThread;
BOOL m_bRun;
CBroadcastChannel *m_pBroadcast;
public:
ProducerHandler( CBroadcastChannel *pBroadcast) {
m_bRun = TRUE;
m_pBroadcast = pBroadcast;
m_apThread = new CMclThread(this);
};
void Stop(void) {
m_bRun = FALSE;
m_apThread->Wait(INFINITE);
};
unsigned ThreadHandlerProc(void) {
printf( "Producer Started.\n");
DWORD dwData = 0;
while (m_bRun) {
// write data into the broadcast channel
// until we are told to stop...
m_pBroadcast->WriteSample( &dwData);
dwData++;
}
printf( "Producer Stopped.\n");
return 0;
}
};
// define some constants for the main() function...
// NUMBER_CONSUMERS: how many MultiConsumerHandler objects main() creates.
#define NUMBER_CONSUMERS 8
// NUMBER_CHANNEL_BUFFERS: sample-buffer count main() passes to the
// CBroadcastChannel constructor.
#define NUMBER_CHANNEL_BUFFERS 4
int main( int argc, char *argv[]) {
CBroadcastChannel broadcast( NUMBER_CHANNEL_BUFFERS, sizeof(DWORD));
ProducerHandler *pProducer;
MultiConsumerHandler *pConsumer[NUMBER_CONSUMERS];
// create the producer...
printf("Creating the Producer...\n");
pProducer = new ProducerHandler( &broadcast);
// create nConsumers consumers...
int i;
for (i = 0; i < NUMBER_CONSUMERS; i++) {
printf( "Creating Consumer #%d...\n", i);
pConsumer[i] = new MultiConsumerHandler( &broadcast);
}
// let everything run for a little while...
Sleep(2000);
// stop the consumers...
for (i = 0; i < NUMBER_CONSUMERS; i++) {
printf( "Stopping Consumer #%d...\n", i);
pConsumer[i]->Stop();
delete pConsumer[i];
}
// stop the producer...
printf("Stopping the Producer...\n");
pProducer->Stop();
delete pProducer;
// all done...
printf("Exiting.\n");
return 0;
}
// ?? 快捷鍵說明
// 復制代碼
// Ctrl + C
// 搜索代碼
// Ctrl + F
// 全屏模式
// F11
// 切換主題
// Ctrl + Shift + D
// 顯示快捷鍵
// ?
// 增大字號
// Ctrl + =
// 減小字號
// Ctrl + -