?? outputq.cpp
字號:
if (S_OK == m_hr) {
if (m_bBatchExact) {
SendAnyway();
}
m_pPin->NewSegment(tStart, tStop, dRate);
}
} else {
if (m_hr == S_OK) {
//
// we need to queue the new segment to appear in order in the
// data, but we need to pass parameters to it. Rather than
// take the hit of wrapping every single sample so we can tell
// special ones apart, we queue special pointers to indicate
// special packets, and we guarantee (by holding the
// critical section) that the packet immediately following a
// NEW_SEGMENT value is a NewSegmentPacket containing the
// parameters.
NewSegmentPacket * ppack = new NewSegmentPacket;
if (ppack == NULL) {
return;
}
ppack->tStart = tStart;
ppack->tStop = tStop;
ppack->dRate = dRate;
CAutoLock lck(this);
QueueSample(NEW_SEGMENT);
QueueSample( (IMediaSample*) ppack);
NotifyThread();
}
}
}
//
// End of Stream is queued to output device
//
void COutputQueue::EOS()
{
CAutoLock lck(this);
if (!IsQueued()) {
if (m_bBatchExact) {
SendAnyway();
}
if (m_hr == S_OK) {
DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
m_bFlushed = FALSE;
HRESULT hr = m_pPin->EndOfStream();
if (FAILED(hr)) {
DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
}
}
} else {
if (m_hr == S_OK) {
m_bFlushed = FALSE;
QueueSample(EOS_PACKET);
NotifyThread();
}
}
}
//
// Flush all the samples in the queue
//
void COutputQueue::BeginFlush()
{
if (IsQueued()) {
{
CAutoLock lck(this);
// block receives -- we assume this is done by the
// filter in which we are a component
// discard all queued data
m_bFlushing = TRUE;
// Make sure we discard all samples from now on
if (m_hr == S_OK) {
m_hr = S_FALSE;
}
// Optimize so we don't keep calling downstream all the time
if (m_bFlushed && m_bFlushingOpt) {
return;
}
// Make sure we really wait for the flush to complete
m_evFlushComplete.Reset();
NotifyThread();
}
// pass this downstream
m_pPin->BeginFlush();
} else {
// pass downstream first to avoid deadlocks
m_pPin->BeginFlush();
CAutoLock lck(this);
// discard all queued data
m_bFlushing = TRUE;
// Make sure we discard all samples from now on
if (m_hr == S_OK) {
m_hr = S_FALSE;
}
}
}
//
// leave flush mode - pass this downstream
void COutputQueue::EndFlush()
{
{
CAutoLock lck(this);
ASSERT(m_bFlushing);
if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
m_bFlushing = FALSE;
m_hr = S_OK;
return;
}
}
// sync with pushing thread -- done in BeginFlush
// ensure no more data to go downstream -- done in BeginFlush
//
// Because we are synching here there is no need to hold the critical
// section (in fact we'd deadlock if we did!)
if (IsQueued()) {
m_evFlushComplete.Wait();
} else {
FreeSamples();
}
// Be daring - the caller has guaranteed no samples will arrive
// before EndFlush() returns
m_bFlushing = FALSE;
m_bFlushed = TRUE;
// call EndFlush on downstream pins
m_pPin->EndFlush();
m_hr = S_OK;
}
// COutputQueue::QueueSample
//
// private method to Send a sample to the output queue
// The critical section MUST be held when this is called
void COutputQueue::QueueSample(IMediaSample *pSample)
{
if (NULL == m_List->AddTail(pSample)) {
if (!IsSpecialSample(pSample)) {
pSample->Release();
}
}
}
//
// COutputQueue::Receive()
//
// Send a single sample by the multiple sample route
// (NOTE - this could be optimized if necessary)
//
// On return the sample will have been Release()'d
//
HRESULT COutputQueue::Receive(IMediaSample *pSample)
{
LONG nProcessed;
return ReceiveMultiple(&pSample, 1, &nProcessed);
}
//
// COutputQueue::ReceiveMultiple()
//
// Send a set of samples to the downstream pin
//
// ppSamples - array of samples
// nSamples - how many
// nSamplesProcessed - How many were processed
//
// On return all samples will have been Release()'d
//
HRESULT COutputQueue::ReceiveMultiple (
IMediaSample **ppSamples,
long nSamples,
long *nSamplesProcessed)
{
CAutoLock lck(this);
// Either call directly or queue up the samples
if (!IsQueued()) {
// If we already had a bad return code then just return
if (S_OK != m_hr) {
// If we've never received anything since the last Flush()
// and the sticky return code is not S_OK we must be
// flushing
// ((!A || B) is equivalent to A implies B)
ASSERT(!m_bFlushed || m_bFlushing);
// We're supposed to Release() them anyway!
*nSamplesProcessed = 0;
for (int i = 0; i < nSamples; i++) {
DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
nSamples, m_hr));
ppSamples[i]->Release();
}
return m_hr;
}
//
// If we're flushing the sticky return code should be S_FALSE
//
ASSERT(!m_bFlushing);
m_bFlushed = FALSE;
ASSERT(m_nBatched < m_lBatchSize);
ASSERT(m_nBatched == 0 || m_bBatchExact);
// Loop processing the samples in batches
LONG iLost = 0;
for (long iDone = 0;
iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
) {
//pragma message (REMIND("Implement threshold scheme"))
ASSERT(m_nBatched < m_lBatchSize);
if (iDone < nSamples) {
m_ppSamples[m_nBatched++] = ppSamples[iDone++];
}
if (m_nBatched == m_lBatchSize ||
nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
LONG nDone;
DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
m_nBatched));
if (m_hr == S_OK) {
m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
m_nBatched,
&nDone);
} else {
nDone = 0;
}
iLost += m_nBatched - nDone;
for (LONG i = 0; i < m_nBatched; i++) {
m_ppSamples[i]->Release();
}
m_nBatched = 0;
}
}
*nSamplesProcessed = iDone - iLost;
if (*nSamplesProcessed < 0) {
*nSamplesProcessed = 0;
}
return m_hr;
} else {
/* We're sending to our thread */
if (m_hr != S_OK) {
*nSamplesProcessed = 0;
DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
nSamples, m_hr));
for (int i = 0; i < nSamples; i++) {
ppSamples[i]->Release();
}
return m_hr;
}
m_bFlushed = FALSE;
for (long i = 0; i < nSamples; i++) {
QueueSample(ppSamples[i]);
}
*nSamplesProcessed = nSamples;
if (!m_bBatchExact ||
m_nBatched + m_List->GetCount() >= m_lBatchSize) {
NotifyThread();
}
return S_OK;
}
}
// Get ready for new data - cancels sticky m_hr
void COutputQueue::Reset()
{
if (!IsQueued()) {
m_hr = S_OK;
} else {
CAutoLock lck(this);
QueueSample(RESET_PACKET);
NotifyThread();
m_evFlushComplete.Wait();
}
}
// Remove and Release() all queued and Batched samples
void COutputQueue::FreeSamples()
{
CAutoLock lck(this);
if (IsQueued()) {
while (TRUE) {
IMediaSample *pSample = m_List->RemoveHead();
// inform derived class we took something off the queue
if (m_hEventPop) {
//DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
SetEvent(m_hEventPop);
}
if (pSample == NULL) {
break;
}
if (!IsSpecialSample(pSample)) {
pSample->Release();
} else {
if (pSample == NEW_SEGMENT) {
// Free NEW_SEGMENT packet
NewSegmentPacket *ppacket =
(NewSegmentPacket *) m_List->RemoveHead();
// inform derived class we took something off the queue
if (m_hEventPop) {
//DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
SetEvent(m_hEventPop);
}
ASSERT(ppacket != NULL);
delete ppacket;
}
}
}
}
for (int i = 0; i < m_nBatched; i++) {
m_ppSamples[i]->Release();
}
m_nBatched = 0;
}
// Notify the thread if there is something to do
//
// The critical section MUST be held when this is called
void COutputQueue::NotifyThread()
{
// Optimize - no need to signal if it's not waiting
ASSERT(IsQueued());
if (m_lWaiting) {
ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
m_lWaiting = 0;
}
}
// See if there's any work to do
// Returns
// TRUE if there is nothing on the queue and nothing in the batch
// and all data has been sent
// FALSE otherwise
//
BOOL COutputQueue::IsIdle()
{
CAutoLock lck(this);
// We're idle if
// there is no thread (!IsQueued()) OR
// the thread is waiting for more work (m_lWaiting != 0)
// AND
// there's nothing in the current batch (m_nBatched == 0)
if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
return FALSE;
} else {
// If we're idle it shouldn't be possible for there
// to be anything on the work queue
ASSERT(!IsQueued() || m_List->GetCount() == 0);
return TRUE;
}
}
void COutputQueue::SetPopEvent(HANDLE hEvent)
{
m_hEventPop = hEvent;
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -