summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp106
1 files changed, 52 insertions, 54 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 815755216c7..d146ddfb362 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -1,9 +1,10 @@
/* -*- C++ -*- */
// $Id$
+#define ACE_BUILD_SVC_DLL
#include "ace/Get_Opt.h"
#include "Config_Files.h"
-#include "IO_Handler_Connector.h"
+#include "Proxy_Handler_Connector.h"
#include "Event_Channel.h"
#if !defined (ACE_EVENT_CHANNEL_C)
@@ -46,18 +47,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
- // Iterate through the consumer map connecting all the IO_Handlers.
+ // Iterate through the consumer map connecting all the Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
cti.next (me) != 0;
cti.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
- if (io_handler->direction () == 'C')
- total_bytes_out += io_handler->total_bytes ();
- else // io_handler->direction () == 'S'
- total_bytes_in += io_handler->total_bytes ();
+ if (proxy_handler->direction () == 'C')
+ total_bytes_out += proxy_handler->total_bytes ();
+ else // proxy_handler->direction () == 'S'
+ total_bytes_in += proxy_handler->total_bytes ();
}
#if defined (ACE_NLOGGING)
@@ -108,16 +109,16 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
else
synch_options = ACE_Synch_Options::synch;
- // Iterate through the Consumer Map connecting all the IO_Handlers.
+ // Iterate through the Consumer Map connecting all the Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
cti.next (me) != 0;
cti.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
if (this->connector_->initiate_connection
- (io_handler, synch_options) == -1)
+ (proxy_handler, synch_options) == -1)
continue;
}
@@ -125,7 +126,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
}
// This method gracefully shuts down all the Handlers in the
-// IO_Handler Connection Map.
+// Proxy_Handler Connection Map.
template <class SH, class CH> int
ACE_Event_Channel<SH, CH>::close (void)
@@ -136,26 +137,26 @@ ACE_Event_Channel<SH, CH>::close (void)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// Iterate over all the handlers and shut them down.
for (CONNECTION_MAP_ENTRY *me;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n",
- io_handler->id ()));
+ proxy_handler->id ()));
- if (io_handler->state () != IO_Handler::IDLE)
- // Mark IO_Handler as DISCONNECTING so we don't try to
+ if (proxy_handler->state () != Proxy_Handler::IDLE)
+ // Mark Proxy_Handler as DISCONNECTING so we don't try to
// reconnect...
- io_handler->state (IO_Handler::DISCONNECTING);
+ proxy_handler->state (Proxy_Handler::DISCONNECTING);
- // Deallocate IO_Handler resources.
- io_handler->destroy (); // Will trigger a delete.
+ // Deallocate Proxy_Handler resources.
+ proxy_handler->destroy (); // Will trigger a delete.
}
// Free up the resources allocated dynamically by the ACE_Connector.
@@ -168,7 +169,10 @@ ACE_Event_Channel<SH, CH>::open (int argc, char *argv[])
{
this->parse_args (argc, argv);
- ACE_NEW_RETURN (this->connector_, IO_Handler_Connector (), -1);
+ ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1);
+
+ // Ignore SIPPIPE so each Consumer_Proxy can handle it.
+ ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
if (this->active_connector_role_)
{
@@ -226,38 +230,38 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
entry.max_retry_delay_,
entry.local_poconsumer_));
- IO_Handler *io_handler = 0;
+ Proxy_Handler *proxy_handler = 0;
// The next few lines of code are dependent on whether we are
- // making an Supplier_Handler or an Consumer_Handler.
+ // making an Supplier_Proxy or an Consumer_Proxy.
- if (entry.direction_ == 'C') // Configure a Consumer_Handler.
- ACE_NEW_RETURN (io_handler,
- CONSUMER_HANDLER (&this->consumer_map_,
+ if (entry.direction_ == 'C') // Configure a Consumer_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ CONSUMER_HANDLER (&this->efd_,
this->connector_,
ACE_Service_Config::thr_mgr (),
this->socket_queue_size_),
-1);
- else /* direction == 'S' */ // Configure a Supplier_Handler.
- ACE_NEW_RETURN (io_handler,
- SUPPLIER_HANDLER (&this->consumer_map_,
+ else /* direction == 'S' */ // Configure a Supplier_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ SUPPLIER_HANDLER (&this->efd_,
this->connector_,
ACE_Service_Config::thr_mgr (),
this->socket_queue_size_),
-1);
- // The following code is common to both Supplier_Handlers_ and
- // Consumer_Handlers.
+ // The following code is common to both Supplier_Proxys_ and
+ // Consumer_Proxys.
// Initialize the routing entry's peer addressing info.
- io_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
+ proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_);
// Initialize max timeout.
- io_handler->max_timeout (entry.max_retry_delay_);
+ proxy_handler->max_timeout (entry.max_retry_delay_);
- // Try to bind the new IO_Handler to the connection ID.
- switch (this->connection_map_.bind (entry.conn_id_, io_handler))
+ // Try to bind the new Proxy_Handler to the connection ID.
+ switch (this->connection_map_.bind (entry.conn_id_, proxy_handler))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -277,7 +281,7 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
if (file_empty)
ACE_ERROR ((LM_WARNING,
- "warning: connection io_handler configuration file was empty\n"));
+ "warning: connection proxy_handler configuration file was empty\n"));
return 0;
}
@@ -303,43 +307,37 @@ ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void)
ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
"number of destinations = %d\n",
entry.conn_id_,
- entry.logical_id_,
- entry.payload_type_,
+ entry.supplier_id_,
+ entry.type_,
entry.total_destinations_));
for (int i = 0; i < entry.total_destinations_; i++)
ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
i, entry.destinations_[i]));
}
- Consumer_Entry *re;
- ACE_NEW_RETURN (re, Consumer_Entry, -1);
-
- Consumer_Entry::ENTRY_SET *io_handler_set;
- ACE_NEW_RETURN (io_handler_set, Consumer_Entry::ENTRY_SET, -1);
+ Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1);
Event_Addr event_addr (entry.conn_id_,
- entry.logical_id_,
- entry.payload_type_);
+ entry.supplier_id_,
+ entry.type_);
// Add the destinations to the Routing Entry.
for (int i = 0; i < entry.total_destinations_; i++)
{
- IO_Handler *io_handler = 0;
+ Proxy_Handler *proxy_handler = 0;
- // Lookup destination and add to Consumer_Entry set if found.
+ // Lookup destination and add to Dispatch_Set set if found.
if (this->connection_map_.find (entry.destinations_[i],
- io_handler) != -1)
- io_handler_set->insert (io_handler);
+ proxy_handler) != -1)
+ dispatch_set->insert (proxy_handler);
else
ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
i, entry.destinations_[i]));
}
- // Attach set of destination io_handlers to routing entry.
- re->destinations (io_handler_set);
-
// Bind with consumer map, keyed by peer address.
- switch (this->consumer_map_.bind (event_addr, re))
+ switch (this->efd_.bind (event_addr, dispatch_set))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",