summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp498
1 files changed, 235 insertions, 263 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index d146ddfb362..02f2cd465f8 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -2,38 +2,35 @@
// $Id$
#define ACE_BUILD_SVC_DLL
-#include "ace/Get_Opt.h"
-#include "Config_Files.h"
#include "Proxy_Handler_Connector.h"
#include "Event_Channel.h"
-#if !defined (ACE_EVENT_CHANNEL_C)
-#define ACE_EVENT_CHANNEL_C
+ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
+ : performance_window_ (0),
+ blocking_semantics_ (ACE_NONBLOCK),
+ socket_queue_size_ (0)
+{
+}
-template <class SH, class CH>
-ACE_Event_Channel<SH, CH>::~ACE_Event_Channel (void)
+ACE_Event_Channel::~ACE_Event_Channel (void)
{
}
-template <class SH, class CH>
-ACE_Event_Channel<SH, CH>::ACE_Event_Channel (void)
- : connection_config_file_ ("connection_config"),
- consumer_config_file_ ("consumer_config"),
- active_connector_role_ (1),
- performance_window_ (0),
- blocking_semantics_ (ACE_NONBLOCK),
- debug_ (0),
- connector_ (0),
- socket_queue_size_ (0)
+ACE_Event_Channel::ACE_Event_Channel (void)
+{
+}
+
+ACE_Event_Channel_Options &
+ACE_Event_Channel::options (void)
{
+ return this->options_;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
- const void *)
+ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
+ const void *)
{
ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// If we've got a ACE_Thread Manager then use it to suspend all the
// threads. This will enable us to get an accurate count.
@@ -47,17 +44,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
- // Iterate through the consumer map connecting all the Proxy_Handlers.
+ // Iterate through the connection map summing up the number of bytes
+ // sent/received.
for (CONNECTION_MAP_ENTRY *me = 0;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
Proxy_Handler *proxy_handler = me->int_id_;
- if (proxy_handler->direction () == 'C')
+ if (proxy_handler->proxy_role () == 'C')
total_bytes_out += proxy_handler->total_bytes ();
- else // proxy_handler->direction () == 'S'
+ else // proxy_handler->proxy_role () == 'S'
total_bytes_in += proxy_handler->total_bytes ();
}
@@ -74,13 +72,13 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
(float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
#else
ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- this->performance_window_,
- total_bytes_in,
+ this->options ().performance_window_,
+ total_bytes_in,
total_bytes_out));
ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))));
+ (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))));
+ (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
#endif /* ACE_NLOGGING */
#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
@@ -95,31 +93,177 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
return 0;
}
+ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
+ ACE_Time_Value *)
+{
+ // We got a valid event, so determine its virtual forwarding
+ // address, which is stored in the first of the two event blocks,
+ // which are chained together by this->recv().
+
+ Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr ();
+
+ // Skip over the address portion and get the data.
+ ACE_Message_Block *data = forward_addr->cont ();
+
+ // <dispatch_set> points to the set of Consumers associated with
+ // this forwarding address.
+ Consumer_Dispatch_Set *dispatch_set = 0;
+
+ if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
+ // Failure.
+ ACE_ERROR ((LM_DEBUG,
+ "(%t) find failed on conn id = %d, logical id = %d, type = %d\n",
+ forwarding_addr->conn_id_,
+ forwarding_addr->supplier_id_,
+ forwarding_addr->type_));
+ else
+ {
+ // Check to see if there are any consumers.
+ if (dispatch_set->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active consumers for this event currently\n"));
+
+ else // There are consumers, so forward the event.
+ {
+ Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
+
+ // At this point, we should assign a thread-safe locking
+ // strategy to the Message_Block is we're running in a
+ // multi-threaded configuration.
+ // data->locking_strategy (MB_Locking_Strategy::instance ());
+
+ for (Proxy_Handler **proxy_handler = 0;
+ dsi.next (proxy_handler) != 0;
+ dsi.advance ())
+ {
+ // Only process active proxy_handlers.
+ if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
+ {
+ // Duplicate the event portion via reference
+ // counting.
+ ACE_Message_Block *dup_msg = data->duplicate ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
+ (*proxy_handler)->id ()));
+
+ if ((*proxy_handler)->put (dup_msg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
+ "put", (*proxy_handler)->id ()));
+
+ // We are responsible for releasing an
+ // ACE_Message_Block if failures occur.
+ dup_msg->release ();
+ }
+ }
+ }
+ }
+ }
+
+ // Release the memory in the message block.
+ forward_addr->release ();
+ return 0;
+}
+
+ACE_Event_Channel::svc (void)
+{
+ return 0;
+}
+
+int
+ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler,
+ ACE_Synch_Options &synch_options)
+{
+ return this->connector_.initiate_connection (proxy_handler,
+ synch_options);
+}
+
+int
+ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
+{
+ int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
+ int socket_queue_size = this->options ().socket_queue_size_;
+
+ if (proxy_handler->peer ().set_option (SOL_SOCKET,
+ option,
+ &socket_queue_size,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+
+ proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ());
+
+ // Our state is now "established."
+ proxy_handler->state (Proxy_Handler::ESTABLISHED);
+
+ // Restart the timeout to 1.
+ proxy_handler->timeout (1);
+
+ ACE_INT32 id = htonl (proxy_handler->id ());
+
+ // Send the connection id to the peerd.
+
+ ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ n == 0 ? "peer has closed down unexpectedly" : "send"),
+ -1);
+}
+
+// Restart connection (blocking_semantics dicates whether we restart
+// synchronously or asynchronously).
+
+int
+ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
+{
+ // Skip over deactivated descriptors.
+ if (proxy_handler->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ proxy_handler->peer ().close ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Proxy_Handler %d\n",
+ proxy_handler->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (proxy_handler, 0, proxy_handler->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ return 0;
+}
+
// Initiate connections with the Consumer and Supplier Peers.
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::initiate_connections (void)
+ACE_Event_Channel::initiate_connections (void)
{
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
ACE_Synch_Options synch_options;
- if (this->blocking_semantics_ == ACE_NONBLOCK)
+ if (this->options ().blocking_semantics_ == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
- // Iterate through the Consumer Map connecting all the Proxy_Handlers.
+ // Iterate through the Consumer Map connecting all the
+ // Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
Proxy_Handler *proxy_handler = me->int_id_;
- if (this->connector_->initiate_connection
+ if (this->initiate_proxy_connection
(proxy_handler, synch_options) == -1)
- continue;
+ continue; // Failures are handled elsewhere...
}
return 0;
@@ -128,8 +272,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
// This method gracefully shuts down all the Handlers in the
// Proxy_Handler Connection Map.
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::close (void)
+ACE_Event_Channel::close (u_long)
{
#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
@@ -147,7 +290,7 @@ ACE_Event_Channel<SH, CH>::close (void)
{
Proxy_Handler *proxy_handler = me->int_id_;
- ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n",
proxy_handler->id ()));
if (proxy_handler->state () != Proxy_Handler::IDLE)
@@ -159,247 +302,76 @@ ACE_Event_Channel<SH, CH>::close (void)
proxy_handler->destroy (); // Will trigger a delete.
}
- // Free up the resources allocated dynamically by the ACE_Connector.
- delete this->connector_;
return 0;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::open (int argc, char *argv[])
+int
+ACE_Event_Channel::find_proxy (ACE_INT32 conn_id,
+ Proxy_Handler *&proxy_handler)
{
- this->parse_args (argc, argv);
-
- ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1);
-
- // Ignore SIPPIPE so each Consumer_Proxy can handle it.
- ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
-
- if (this->active_connector_role_)
- {
- // Parse the connection configuration file.
- this->parse_connection_config_file ();
-
- // Parse the consumer map config file and build the consumer map.
- this->parse_consumer_config_file ();
-
- // Initiate connections with the peers.
- this->initiate_connections ();
- }
-
- // If this->performance_window_ > 0 start a timer.
-
- if (this->performance_window_ > 0)
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->performance_window_) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
- else
- ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
- this->performance_window_));
- }
-
- return 0;
+ return this->connection_map_.find (conn_id, proxy_handler);
}
-// Parse and build the connection table.
-
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
+int
+ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
{
- // File that contains the consumer map configuration information.
- Connection_Config_File_Parser connection_file;
- Connection_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (connection_file.open (this->connection_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
-
- // Read config file one line at a time.
- while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
+ switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler))
{
- file_empty = 0;
-
- if (this->debug_)
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
- "direction = %c, max retry timeout = %d, local port = %d\n",
- entry.conn_id_,
- entry.host_,
- entry.remote_poconsumer_,
- entry.direction_,
- entry.max_retry_delay_,
- entry.local_poconsumer_));
-
- Proxy_Handler *proxy_handler = 0;
-
- // The next few lines of code are dependent on whether we are
- // making an Supplier_Proxy or an Consumer_Proxy.
-
- if (entry.direction_ == 'C') // Configure a Consumer_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- CONSUMER_HANDLER (&this->efd_,
- this->connector_,
- ACE_Service_Config::thr_mgr (),
- this->socket_queue_size_),
- -1);
- else /* direction == 'S' */ // Configure a Supplier_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- SUPPLIER_HANDLER (&this->efd_,
- this->connector_,
- ACE_Service_Config::thr_mgr (),
- this->socket_queue_size_),
- -1);
-
- // The following code is common to both Supplier_Proxys_ and
- // Consumer_Proxys.
-
- // Initialize the routing entry's peer addressing info.
- proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
- ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_);
-
- // Initialize max timeout.
- proxy_handler->max_timeout (entry.max_retry_delay_);
-
- // Try to bind the new Proxy_Handler to the connection ID.
- switch (this->connection_map_.bind (entry.conn_id_, proxy_handler))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG,
- "(%t) duplicate connection %d, already bound\n",
- entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) bind failed for connection %d\n",
+ proxy_handler->id ()), -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) duplicate connection %d, already bound\n",
+ proxy_handler->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
}
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: connection proxy_handler configuration file was empty\n"));
- return 0;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void)
+int
+ACE_Event_Channel::subscribe (const Event_Key &event_addr,
+ Consumer_Dispatch_Set *cds)
{
- // File that contains the consumer map configuration information.
- Consumer_Config_File_Parser consumer_file;
- Consumer_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (consumer_file.open (this->consumer_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
-
- // Read config file line at a time.
- while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
+ // Bind with consumer map, keyed by peer address.
+ switch (this->efd_.bind (event_addr, cds))
{
- file_empty = 0;
-
- if (this->debug_)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
- "number of destinations = %d\n",
- entry.conn_id_,
- entry.supplier_id_,
- entry.type_,
- entry.total_destinations_));
- for (int i = 0; i < entry.total_destinations_; i++)
- ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- Dispatch_Set *dispatch_set;
- ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1);
-
- Event_Addr event_addr (entry.conn_id_,
- entry.supplier_id_,
- entry.type_);
-
- // Add the destinations to the Routing Entry.
- for (int i = 0; i < entry.total_destinations_; i++)
- {
- Proxy_Handler *proxy_handler = 0;
-
- // Lookup destination and add to Dispatch_Set set if found.
- if (this->connection_map_.find (entry.destinations_[i],
- proxy_handler) != -1)
- dispatch_set->insert (proxy_handler);
- else
- ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- // Bind with consumer map, keyed by peer address.
- switch (this->efd_.bind (event_addr, dispatch_set))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
- "already bound\n", entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
+ event_addr.conn_id_), -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
+ "already bound\n", event_addr.conn_id_), -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
}
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: consumer map configuration file was empty\n"));
- return 0;
}
-// Parse the "command-line" arguments and set the corresponding flags.
-
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_args (int argc, char *argv[])
+ACE_Event_Channel::open (void *)
{
- ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
+ // Ignore SIPPIPE so each Consumer_Proxy can handle it.
+ ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
- for (int c; (c = get_opt ()) != -1; )
+#if 0
+ // If this->performance_window_ > 0 start a timer.
+
+ if (this->options ().performance_window_ > 0)
{
- switch (c)
- {
- case 'b': // Use blocking connection establishment.
- this->blocking_semantics_ = 0;
- break;
- case 'c':
- this->connection_config_file_ = get_opt.optarg;
- break;
- case 'd':
- this->debug_ = 1;
- break;
- case 'p':
- // We are not playing the active Connector role.
- this->active_connector_role_ = 0;
- break;
- case 'q':
- this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'r':
- this->consumer_config_file_ = get_opt.optarg;
- break;
- case 'w': // Time performance for a designated amount of time.
- this->performance_window_ = ACE_OS::atoi (get_opt.optarg);
- // Use blocking connection semantics so that we get accurate
- // timings (since all connections start at once).
- this->blocking_semantics_ = 0;
- break;
- default:
- break;
- }
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (this, 0, this->options ().performance_window_) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
+ this->options ().performance_window_)));
}
+#endif
+
return 0;
}
-
-#endif /* ACE_EVENT_CHANNEL_C */