// task.cpp
// (Source-viewer page residue: "Font size:" control label.)
// Task.cpp,v 1.5 2001/12/24 20:51:02 schmidt Exp
// Tutorial regarding a way to use ACE_Stream.
//
// written by bob mcwhirter (bob@netwrench.com)
#include <ace/Message_Block.h>
#include "Task.h"
Task::Task (const char * nameOfTask,
int numberOfThreads)
: d_numberOfThreads (numberOfThreads),
d_barrier (numberOfThreads)
{
// Just initialize our name, number of threads, and barrier.
ACE_OS::strcpy (d_nameOfTask, nameOfTask);
}
// Destructor: nothing to tear down here, it only traces that the Task
// object itself is going away.
Task::~Task (void)
{
  ACE_DEBUG ((LM_DEBUG,
              " (%P|%t) %s Task::~Task () -- once per Task\n",
              d_nameOfTask));
}
// Start the Task: spawn d_numberOfThreads threads through activate ().
// Per the original author's notes, the spawned threads run Task::svc ().
//
// arg  Unused.
//
// Returns whatever activate () returns.
int Task::open (void *arg)
{
  ACE_UNUSED_ARG (arg);

  ACE_DEBUG ((LM_DEBUG,
              " (%P|%t) %s Task::open () -- once per Task\n",
              d_nameOfTask));

  // THR_DETACHED is deliberately not requested: close () later wait ()s
  // for these threads to exit.
  const int spawnStatus = this->activate (THR_NEW_LWP, d_numberOfThreads);
  return spawnStatus;
}
// Entry point through which the Stream hands a message to this Task.
//
// The message is forwarded straight to putq (), so it lands in the
// Message_Queue that the service threads read from in svc ().
//
// message  Block to enqueue.
// timeout  Passed through to putq () unchanged.
//
// Returns the result of putq ().
int Task::put (ACE_Message_Block *message,
               ACE_Time_Value *timeout)
{
  const int queued = this->putq (message, timeout);
  return queued;
}
// Close hook, reached in two different situations that are told apart
// by `flags`:
//
//   flags == 1  The Module is closing the Task as a whole (once per Task).
//               A MB_HANGUP block is queued for the service threads and
//               we then wait () for all of them to exit.
//   otherwise   A single service thread has left svc () (once per thread).
//               There is no per-thread state to clean up in this Task.
//
// Returns 0 for the per-thread case; for the whole-Task case returns the
// result of wait (), or -1 if the hang-up block could not be allocated or
// queued.
int Task::close (u_long flags)
{
  if (flags != 1)
    {
      ACE_DEBUG ((LM_DEBUG,
                  " (%P|%t) %s Task::close () -- flags != 1 -- once per servicing thread\n",
                  d_nameOfTask));
      // This is where any mess left over by a service thread would be
      // cleaned up.  In this Task, there is nothing to do.
      return 0;
    }

  // The Module has asked to close the main Task.
  ACE_DEBUG ((LM_DEBUG,
              " (%P|%t) %s Task::close () -- flags == 1 -- once per Task\n",
              d_nameOfTask));

  // Build a Message_Block of type MB_HANGUP.  The lock_adapter () is
  // supplied so that the block is properly serialized between threads.
  ACE_Message_Block *hangupBlock = 0;
  ACE_NEW_RETURN (hangupBlock,
                  ACE_Message_Block (0,
                                     ACE_Message_Block::MB_HANGUP,
                                     0,
                                     0,
                                     0,
                                     Task::lock_adapter ()),
                  -1);

  // Hand the block itself to the Message_Queue for the service threads to
  // see.  We have no further use for it, so there is no need to
  // duplicate () it first.  If the enqueue fails the block is still ours
  // and must be released, otherwise it leaks.
  if (this->putq (hangupBlock) == -1)
    {
      // Log before releasing so that %p reports the errno left by putq ().
      ACE_ERROR ((LM_ERROR, "%p\n", "Task::close () putq"));
      hangupBlock->release ();
      return -1;
    }

  // Now wait () for the service threads to all exit.  This is where
  // using THR_DETACHED in the activate () call would come back to haunt
  // you.  The Stream waits until this returns before removing the next
  // Module/Task group, which gives an orderly shutdown of the Stream.
  return this->wait ();
}
int Task::svc (void)
{
// This is the function that our service threads run once they are spawned.
ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- once per servicing thread\n", d_nameOfTask));
// First, we wait until all of our peer service threads have arrived
// at this point also.
d_barrier.wait ();
ACE_Message_Block *messageBlock;
while (1) {
// And now we loop almost infinitely.
// getq () will block until a Message_Block is available to be read,
// or an error occurs.
if ( this->getq (messageBlock, 0) == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () getq"), -1);
}
if (messageBlock->msg_type () == ACE_Message_Block::MB_HANGUP) {
// If the Message_Block is of type MB_HANGUP, then we're being asked
// to shut down nicely.
ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- HANGUP block received\n", d_nameOfTask));
// So, we duplicate the Block, and put it back into the Message_Queue,
// in case there are some more peer service threads still running.
if (this->putq (messageBlock->duplicate ()) == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () putq"), -1);
}
// We release our copy of the Block.
messageBlock->release ();
// And we break out of the nearly infinitely loop, and
// head towards close () ourselves.
break;
}
// If we're here, then we've received a Message_Block that was
// not informing us to quit, so we're assuming it's a valid
// meaningful Block.
ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- Normal block received\n", d_nameOfTask));
// We grab the read-pointer from the Block, and display it through a DEBUG statement.
ACE_DEBUG ((LM_DEBUG, " (%P|%t) %s Task::svc () -- %s\n", d_nameOfTask, messageBlock->rd_ptr () ));
// We pretend that this takes to time to process the Block.
// If you're on a fast machine, you might have to raise this
// value to actually witness different threads handling
// blocks for each Task.
ACE_OS::sleep (ACE_Time_Value (0, 250));
// Since we're part of a Stream, we duplicate the Block, and
// send it on to the next Task.
if (put_next (messageBlock->duplicate ()) == -1) {
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::svc () put_next"), -1);
}
// And then we release our copy of it.
messageBlock->release ();
}
return 0;
}
const char *Task::nameOfTask (void) const
{
return d_nameOfTask;
}
// Accessor for the class-wide lock adapter; close () passes it to the
// MB_HANGUP Message_Block it creates.
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *Task::lock_adapter (void)
{
  return &Task::lock_adapter_;
}
// Static definition.
// Storage for the single lock adapter handed out by Task::lock_adapter ().
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> Task::lock_adapter_;
// Instantiate the ACE_Lock_Adapter template for ACE_SYNCH_MUTEX explicitly,
// using whichever mechanism (explicit instantiation or pragma) the build
// configuration's macros select; with neither macro defined nothing is
// emitted here.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Lock_Adapter <ACE_SYNCH_MUTEX>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Lock_Adapter <ACE_SYNCH_MUTEX>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
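// --- Source-viewer page residue below; not part of Task.cpp ---
// Keyboard shortcuts:
//   Copy code ............ Ctrl + C
//   Search code .......... Ctrl + F
//   Full-screen mode ..... F11
//   Toggle theme ......... Ctrl + Shift + D
//   Show shortcuts ....... ?
//   Increase font size ... Ctrl + =
//   Decrease font size ... Ctrl + -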