?? peer.cpp
字號:
event->header_.connection_id_,
event->header_.len_,
data_received + header_received));
if (Options::instance ()->enabled (Options::VERBOSE))
ACE_DEBUG ((LM_DEBUG,
"data_ = %*s\n",
event->header_.len_ - 2,
event->data_));
return data_received + header_received;
}
}
// Receive various types of input (e.g., Peer event from the gatewayd,
// as well as stdio).
int
Peer_Handler::handle_input (ACE_HANDLE sd)
{
ACE_DEBUG ((LM_DEBUG,
"in handle_input, sd = %d\n",
sd));
if (sd == ACE_STDIN) // Handle event from stdin.
return this->transmit_stdin ();
else
// Perform the appropriate action depending on the state we are
// in.
return (this->*do_action_) ();
}
// Action that receives our connection id from the Gateway.
int
Peer_Handler::await_connection_id (void)
{
ssize_t n = this->peer ().recv (&this->connection_id_,
sizeof this->connection_id_);
if (n != sizeof this->connection_id_)
{
if (n == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"gatewayd has closed down unexpectedly\n"),
-1);
else
ACE_ERROR_RETURN ((LM_ERROR,
"%p, bytes received on handle %d = %d\n",
"recv",
this->get_handle (),
n),
-1);
}
else
{
this->connection_id_ = ntohl (this->connection_id_);
ACE_DEBUG ((LM_DEBUG,
"assigned connection id %d\n",
this->connection_id_));
}
// Subscribe for events if we're a Consumer.
if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
this->subscribe ();
// No need to disconnect by timeout.
ACE_Reactor::instance ()->cancel_timer(this);
// Transition to the action that waits for Peer events.
this->do_action_ = &Peer_Handler::await_events;
// Reset standard input.
ACE_OS::rewind (stdin);
// Call register_stdin_handler only once, until the stdin-thread
// closed which caused by transmit_stdin error.
if (first_time_)
{
// Register this handler to receive test events on stdin.
if (ACE_Event_Handler::register_stdin_handler
(this,
ACE_Reactor::instance (),
ACE_Thread_Manager::instance ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
"register_stdin_handler"),
-1);
// Next time in await_connection_id(), I'll don't call
// register_stdin_handler().
first_time_ = 0;
}
return 0;
}
int
Peer_Handler::subscribe (void)
{
ACE_Message_Block *mb;
ACE_NEW_RETURN (mb,
ACE_Message_Block (sizeof (Event)),
-1);
Subscription *subscription =
(Subscription *) ((Event *) mb->rd_ptr ())->data_;
subscription->connection_id_ =
Options::instance ()->connection_id ();
return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
}
// Action that receives events from the Gateway.
int
Peer_Handler::await_events (void)
{
ACE_Message_Block *mb = 0;
ssize_t n = this->recv (mb);
switch (n)
{
case 0:
ACE_ERROR_RETURN ((LM_ERROR,
"gatewayd has closed down\n"),
-1);
/* NOTREACHED */
case -1:
if (errno == EWOULDBLOCK)
// A short-read, we'll come back and finish it up later on!
return 0;
else
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"recv"),
-1);
/* NOTREACHED */
default:
{
// We got a valid event, so let's process it now! At the
// moment, we just print out the event contents...
Event *event = (Event *) mb->rd_ptr ();
this->total_bytes_ += mb->length ();
ACE_DEBUG ((LM_DEBUG,
"route id = %d, cur len = %d, total len = %d\n",
event->header_.connection_id_,
event->header_.len_,
this->total_bytes_));
if (Options::instance ()->enabled (Options::VERBOSE))
ACE_DEBUG ((LM_DEBUG,
"data_ = %*s\n",
event->header_.len_ - 2,
event->data_));
mb->release ();
return 0;
}
}
}
// Periodically send events via ACE_Reactor timer mechanism.
int
Peer_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
// Shut down the handler.
return this->handle_close ();
}
Peer_Handler::~Peer_Handler (void)
{
// Shut down the handler.
this->handle_close ();
}
// Handle shutdown of the Peer object.
int
Peer_Handler::handle_close (ACE_HANDLE,
ACE_Reactor_Mask)
{
if (this->get_handle () != ACE_INVALID_HANDLE)
{
ACE_DEBUG ((LM_DEBUG,
"shutting down Peer on handle %d\n",
this->get_handle ()));
ACE_Reactor_Mask mask =
ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;
// Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
// removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL>
// instructs the ACE_Reactor *not* to call <handle_close>, which
// would otherwise lead to infinite recursion!).
ACE_Reactor::instance ()->remove_handler
(ACE_STDIN, mask);
// Deregister this handler with the ACE_Reactor.
if (ACE_Reactor::instance ()->remove_handler
(this, mask) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"handle = %d: %p\n",
this->get_handle (),
"remove_handler"),
-1);
// Close down the peer.
this->peer ().close ();
}
return 0;
}
int
Peer_Acceptor::start (u_short port)
{
// This object only gets allocated once and is just recycled
// forever.
ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);
this->addr_.set (port);
ACE_DEBUG ((LM_DEBUG,
"opening acceptor at port %d\n",
port));
// Call down to the <Acceptor::open> method.
if (this->inherited::open (this->addr_) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"open"),
-1);
else if (this->acceptor ().get_local_addr (this->addr_) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"get_local_addr"),
-1);
else
ACE_DEBUG ((LM_DEBUG,
"accepting at port %d\n",
this->addr_.get_port_number ()));
return 0;
}
Peer_Acceptor::Peer_Acceptor (void)
: peer_handler_ (0)
{
}
int
Peer_Acceptor::close (void)
{
// Will trigger a delete.
if (this->peer_handler_ != 0)
this->peer_handler_->destroy ();
// Close down the base class.
return this->inherited::close ();
}
// Note how this method just passes back the pre-allocated
// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
// one each time!
int
Peer_Acceptor::make_svc_handler (Peer_Handler *&sh)
{
sh = this->peer_handler_;
return 0;
}
int
Peer_Connector::open_connector (Peer_Handler *&peer_handler,
u_short port)
{
// This object only gets allocated once and is just recycled
// forever.
ACE_NEW_RETURN (peer_handler,
Peer_Handler,
-1);
ACE_INET_Addr addr (port,
Options::instance ()->connector_host ());
ACE_DEBUG ((LM_DEBUG,
"connecting to %s:%d\n",
addr.get_host_name (),
addr.get_port_number ()));
if (this->connect (peer_handler, addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"connect"),
-1);
else
ACE_DEBUG ((LM_DEBUG,
"connected to %s:%d\n",
addr.get_host_name (),
addr.get_port_number ()));
return 0;
}
int
Peer_Connector::open (ACE_Reactor *, int)
{
this->supplier_peer_handler_ = 0;
this->consumer_peer_handler_ = 0;
if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
&& this->open_connector (this->supplier_peer_handler_,
Options::instance ()->supplier_connector_port ()) == -1)
return -1;
if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)
&& this->open_connector (this->consumer_peer_handler_,
Options::instance ()->consumer_connector_port ()) == -1)
return -1;
return 0;
}
int
Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
if (signum != SIGPIPE)
{
// Shut down the main event loop.
ACE_DEBUG((LM_NOTICE, "Exit case signal\n")); // Why do I exit?
ACE_Reactor::end_event_loop();
}
return 0;
}
// Returns information on the currently active service.
int
Peer_Factory::info (char **strp, size_t length) const
{
char buf[BUFSIZ];
char consumer_addr_str[BUFSIZ];
char supplier_addr_str[BUFSIZ];
ACE_INET_Addr addr;
if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)
return -1;
else if (addr.addr_to_string (consumer_addr_str,
sizeof addr) == -1)
return -1;
else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)
return -1;
else if (addr.addr_to_string (supplier_addr_str,
sizeof addr) == -1)
return -1;
ACE_OS::sprintf (buf,
"%s\t C:%s|S:%s/%s %s",
"peerd",
consumer_addr_str,
supplier_addr_str,
"tcp",
"# Gateway traffic generator and data sink\n");
if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
return -1;
else
ACE_OS::strncpy (*strp, buf, length);
return ACE_OS::strlen (buf);
}
// Hook called by the explicit dynamic linking facility to terminate
// the peer.
int
Peer_Factory::fini (void)
{
this->consumer_acceptor_.close ();
this->supplier_acceptor_.close ();
return 0;
}
// Hook called by the explicit dynamic linking facility to initialize
// the peer.
int
Peer_Factory::init (int argc, char *argv[])
{
Options::instance ()->parse_args (argc, argv);
ACE_Sig_Set sig_set;
sig_set.sig_add (SIGINT);
sig_set.sig_add (SIGQUIT);
sig_set.sig_add (SIGPIPE);
// Register ourselves to receive signals so we can shut down
// gracefully.
if (ACE_Reactor::instance ()->register_handler (sig_set,
this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"register_handler"),
-1);
if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
&& this->supplier_acceptor_.start
(Options::instance ()->supplier_acceptor_port ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"Acceptor::open"),
-1);
else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
&& this->consumer_acceptor_.start
(Options::instance ()->consumer_acceptor_port ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"Acceptor::open"),
-1);
else if (this->connector_.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"Connector::open"),
-1);
return 0;
}
// The following is a "Factory" used by the <ACE_Service_Config> and
// svc.conf file to dynamically initialize the <Peer_Acceptor> and
// <Peer_Connector>.
ACE_SVC_FACTORY_DEFINE (Peer_Factory)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
template class ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;
template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>;
template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *>;
template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
template class ACE_Svc_Tuple<Peer_Handler>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>
#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *, ACE_SYNCH_RW_MUTEX>
#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Peer_Handler> *>
#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
#pragma instantiate ACE_Svc_Tuple<Peer_Handler>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -