summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp548
1 files changed, 0 insertions, 548 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
deleted file mode 100644
index 3204b371ee7..00000000000
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ /dev/null
@@ -1,548 +0,0 @@
-// $Id$
-
-#define ACE_BUILD_SVC_DLL
-
-#include "Connection_Handler_Connector.h"
-#include "Event_Channel.h"
-
-ACE_RCSID(Gateway, Event_Channel, "$Id$")
-
-Event_Channel::~Event_Channel (void)
-{
-}
-
-Event_Channel::Event_Channel (void)
- : supplier_acceptor_ (*this, 'S'),
- consumer_acceptor_ (*this, 'C')
-{
-}
-
-int
-Event_Channel::compute_performance_statistics (void)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- // If we've got a <ACE_Thread_Manager> then use it to suspend all
- // the threads. This will enable us to get an accurate count.
-
- if (Options::instance ()->threading_strategy ()
- != Options::REACTIVE)
- {
- if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
- }
-
- size_t total_bytes_in = 0;
- size_t total_bytes_out = 0;
-
- // Iterate through the connection map summing up the number of bytes
- // sent/received.
-
- for (CONNECTION_MAP_ENTRY *me = 0;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Connection_Handler *connection_handler = me->int_id_;
-
- if (connection_handler->connection_role () == 'C')
- total_bytes_out += connection_handler->total_bytes ();
- else // connection_handler->connection_role () == 'S'
- total_bytes_in += connection_handler->total_bytes ();
- }
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- Options::instance ()->performance_window (),
- total_bytes_in,
- total_bytes_out));
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 /
- (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 /
- (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
-
- // Resume all the threads again.
-
- if (Options::instance ()->threading_strategy ()
- != Options::REACTIVE)
- {
- if (ACE_Thread_Manager::instance ()->resume_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
- }
-
- return 0;
-}
-
-int
-Event_Channel::handle_timeout (const ACE_Time_Value &,
- const void *)
-{
- // This is called periodically to compute performance statistics.
- return this->compute_performance_statistics ();
-}
-
-int
-Event_Channel::put (ACE_Message_Block *event,
- ACE_Time_Value *)
-{
- // We got a valid event, so determine its type, which is stored in
- // the first of the two <ACE_Message_Block>s, which are chained
- // together by <ACE::recv>.
- Event_Key *event_key = (Event_Key *) event->rd_ptr ();
-
- // Skip over the address portion and get the data, which is in the
- // second <ACE_Message_Block>.
- ACE_Message_Block *data = event->cont ();
-
- switch (event_key->type_)
- {
- case ROUTING_EVENT:
- this->routing_event (event_key,
- data);
- break;
- case SUBSCRIPTION_EVENT:
- this->subscription_event (data);
- break;
- }
-
- // Release the memory in the message block.
- event->release ();
- return 0;
-}
-
-void
-Event_Channel::subscription_event (ACE_Message_Block *data)
-{
- Event *event = (Event *) data->rd_ptr ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) received a subscription with %d bytes from connection id %d\n",
- event->header_.len_,
- event->header_.connection_id_));
- Subscription *subscription = (Subscription *) event->data_;
- // Convert the subscription into host byte order so that we can
- // access it directly without having to repeatedly muck with it...
- subscription->decode ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) connection_id_ = %d, total_consumers_ = %d\n",
- subscription->connection_id_,
- subscription->total_consumers_));
-
- for (ACE_INT32 i = 0;
- i < subscription->total_consumers_;
- i++)
- ACE_DEBUG ((LM_DEBUG,
- "(%t) consumers_[%d] = %d\n",
- i,
- subscription->consumers_[i]));
-
-}
-
-void
-Event_Channel::routing_event (Event_Key *forwarding_address,
- ACE_Message_Block *data)
-{
- Consumer_Dispatch_Set *dispatch_set = 0;
-
- // Initialize the <dispatch_set> to points to the set of Consumers
- // associated with this forwarding address.
-
- if (this->efd_.find (*forwarding_address,
- dispatch_set) == -1)
- // Failure.
- ACE_ERROR ((LM_DEBUG,
- "(%t) find failed on connection id = %d, type = %d\n",
- forwarding_address->connection_id_,
- forwarding_address->type_));
- else
- {
- // Check to see if there are any consumers.
- if (dispatch_set->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active consumers for this event currently\n"));
-
- else // There are consumers, so forward the event.
- {
- // Initialize the interator.
- Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
-
- // At this point, we should assign a thread-safe locking
- // strategy to the <ACE_Message_Block> is we're running in a
- // multi-threaded configuration.
- data->locking_strategy (Options::instance ()->locking_strategy ());
-
- for (Connection_Handler **connection_handler = 0;
- dsi.next (connection_handler) != 0;
- dsi.advance ())
- {
- // Only process active connection_handlers.
- if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)
- {
- // Duplicate the event portion via reference
- // counting.
- ACE_Message_Block *dup_msg = data->duplicate ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n",
- (*connection_handler)->connection_id ()));
-
- if ((*connection_handler)->put (dup_msg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR,
- "(%t) %p transmission error to peer %d\n",
- "put",
- (*connection_handler)->connection_id ()));
-
- // We are responsible for releasing an
- // ACE_Message_Block if failures occur.
- dup_msg->release ();
- }
- }
- }
- }
- }
-}
-
-int
-Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler)
-{
- ACE_Synch_Options synch_options;
-
- if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
- synch_options = ACE_Synch_Options::asynch;
- else
- synch_options = ACE_Synch_Options::synch;
-
- return this->connector_.initiate_connection (connection_handler,
- synch_options);
-}
-
-int
-Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
-{
- int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
- int socket_queue_size = Options::instance ()->socket_queue_size ();
-
- if (socket_queue_size > 0)
- if (connection_handler->peer ().set_option (SOL_SOCKET,
- option,
- &socket_queue_size,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
-
- connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
-
- // Our state is now "established."
- connection_handler->state (Connection_Handler::ESTABLISHED);
-
- // Restart the timeout to 1.
- connection_handler->timeout (1);
-
- ACE_INT32 id = htonl (connection_handler->connection_id ());
-
- // Send the connection id to the peerd.
-
- ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "peer has closed down unexpectedly" : "send"),
- -1);
- return 0;
-}
-
-// Restart connection (blocking_semantics dicates whether we restart
-// synchronously or asynchronously).
-
-int
-Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
-{
- // Skip over proxies with deactivated handles.
- if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
- // Make sure to close down peer to reclaim descriptor.
- connection_handler->peer ().close ();
-
- if (connection_handler->state () != Connection_Handler::DISCONNECTING)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Connection_Handler %d\n",
- connection_handler->connection_id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Reactor::instance ()->schedule_timer
- (connection_handler, 0, connection_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
-}
-
-// Initiate active connections with the Consumer and Supplier Peers.
-
-void
-Event_Channel::initiate_connector (void)
-{
- if (Options::instance ()->enabled
- (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
- {
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- // Iterate through the Consumer Map connecting all the
- // Connection_Handlers.
-
- for (CONNECTION_MAP_ENTRY *me = 0;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Connection_Handler *connection_handler = me->int_id_;
-
- if (this->initiate_connection_connection (connection_handler) == -1)
- continue; // Failures are handled elsewhere...
- }
- }
-}
-
-// Initiate passive acceptor to wait for Consumer and Supplier Peers
-// to accept.
-
-int
-Event_Channel::initiate_acceptors (void)
-{
- if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
- && this->consumer_acceptor_.open
- (Options::instance ()->consumer_acceptor_port (),
- ACE_Reactor::instance (),
- Options::instance ()->blocking_semantics ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "cannot register acceptor"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "accepting Consumers at %d\n",
- Options::instance ()->consumer_acceptor_port ()));
-
- if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
- && this->supplier_acceptor_.open
- (Options::instance ()->supplier_acceptor_port (),
- ACE_Reactor::instance (),
- Options::instance ()->blocking_semantics ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "cannot register acceptor"),
- -1);
- else
- ACE_DEBUG ((LM_DEBUG,
- "accepting Suppliers at %d\n",
- Options::instance ()->supplier_acceptor_port ()));
-
- return 0;
-}
-
-// This method gracefully shuts down all the Handlers in the
-// Connection_Handler Connection Map.
-
-int
-Event_Channel::close (u_long)
-{
- if (Options::instance ()->threading_strategy () != Options::REACTIVE)
- {
- if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) %p\n",
- "suspend_all"),
- -1);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) suspending all threads\n"));
- }
-
- // First tell everyone that the spaceship is here...
- {
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- // Iterate over all the handlers and shut them down.
-
- for (CONNECTION_MAP_ENTRY *me;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Connection_Handler *connection_handler = me->int_id_;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) closing down connection %d\n",
- connection_handler->connection_id ()));
-
- // Mark Connection_Handler as DISCONNECTING so we don't try to
- // reconnect...
- connection_handler->state (Connection_Handler::DISCONNECTING);
- }
- }
-
- // Close down the connector
- this->connector_.close ();
-
- // Close down the supplier acceptor.
- this->supplier_acceptor_.close ();
-
- // Close down the consumer acceptor.
- this->consumer_acceptor_.close ();
-
- // Now tell everyone that it is now time to commit suicide.
- {
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
-
- for (CONNECTION_MAP_ENTRY *me;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Connection_Handler *connection_handler = me->int_id_;
-
- // Deallocate Connection_Handler resources.
- connection_handler->destroy (); // Will trigger a delete.
- }
- }
-
- return 0;
-}
-
-int
-Event_Channel::find_proxy (ACE_INT32 connection_id,
- Connection_Handler *&connection_handler)
-{
- return this->connection_map_.find (connection_id,
- connection_handler);
-}
-
-int
-Event_Channel::bind_proxy (Connection_Handler *connection_handler)
-{
- int result = this->connection_map_.bind (connection_handler->connection_id (),
- connection_handler);
-
- switch (result)
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) bind failed for connection %d\n",
- connection_handler->connection_id ()),
- -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) duplicate connection %d, already bound\n",
- connection_handler->connection_id ()),
- -1);
- /* NOTREACHED */
- case 0:
- // Success.
- return 0;
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_DEBUG,
- "(%t) invalid result %d\n",
- result),
- -1);
- /* NOTREACHED */
- }
-
- ACE_NOTREACHED (return 0);
-}
-
-int
-Event_Channel::subscribe (const Event_Key &event_addr,
- Consumer_Dispatch_Set *cds)
-{
- int result = this->efd_.bind (event_addr, cds);
-
- // Bind with consumer map, keyed by peer address.
- switch (result)
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) bind failed for connection %d\n",
- event_addr.connection_id_),
- -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_ERROR_RETURN ((LM_DEBUG,
- "(%t) duplicate consumer map entry %d, "
- "already bound\n",
- event_addr.connection_id_),
- -1);
- /* NOTREACHED */
- case 0:
- // Success.
- return 0;
- default:
- ACE_ERROR_RETURN ((LM_DEBUG,
- "(%t) invalid result %d\n",
- result),
- -1);
- /* NOTREACHED */
- }
-
- ACE_NOTREACHED (return 0);
-}
-
-int
-Event_Channel::open (void *)
-{
- // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it.
- ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
- ACE_UNUSED_ARG (sig);
-
- // Actively initiate Peer connections.
- this->initiate_connector ();
-
- // Passively initiate Peer acceptor.
- if (this->initiate_acceptors () == -1)
- return -1;
-
- // If we're not running reactively, then we need to make sure that
- // <ACE_Message_Block> reference counting operations are
- // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is
- // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.
- if (Options::instance ()->threading_strategy ()
- != Options::REACTIVE)
- {
- ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;
-
- ACE_NEW_RETURN (la,
- ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
- -1);
-
- Options::instance ()->locking_strategy (la);
- }
-
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;
-template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;
-template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
-template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
-template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
-template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
-template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>
-#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>
-#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */