summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Event_Channel.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-02 01:04:38 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-02 01:04:38 +0000
commit895521b8424ba7eac3dcd246338d9211bfbf1332 (patch)
treeb41e7a860bd9ad0f95ec4d7a330044b5c345c785 /apps/Gateway/Gateway/Event_Channel.cpp
parentf708a7b22cb6e8c8733ff3cc517b25e626f3dd39 (diff)
downloadATCD-895521b8424ba7eac3dcd246338d9211bfbf1332.tar.gz
*** empty log message ***
Diffstat (limited to 'apps/Gateway/Gateway/Event_Channel.cpp')
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp138
1 files changed, 74 insertions, 64 deletions
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 55e1f769af2..0e05f5ef67c 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -5,39 +5,16 @@
#include "Proxy_Handler_Connector.h"
#include "Event_Channel.h"
-ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
- : locking_strategy_ (0),
- performance_window_ (0),
- blocking_semantics_ (ACE_NONBLOCK),
- socket_queue_size_ (0),
- threading_strategy_ (REACTIVE),
- acceptor_port_ (ACE_DEFAULT_GATEWAY_SERVER_PORT),
- connector_role_ (0),
- acceptor_role_ (0),
- verbose_ (0)
-{
-}
-
-ACE_Event_Channel_Options::~ACE_Event_Channel_Options (void)
-{
- delete this->locking_strategy_;
-}
-
ACE_Event_Channel::~ACE_Event_Channel (void)
{
}
ACE_Event_Channel::ACE_Event_Channel (void)
- : acceptor_ (*this)
+ : supplier_acceptor_ (*this),
+ consumer_acceptor_ (*this)
{
}
-ACE_Event_Channel_Options &
-ACE_Event_Channel::options (void)
-{
- return this->options_;
-}
-
int
ACE_Event_Channel::compute_performance_statistics (void)
{
@@ -47,7 +24,8 @@ ACE_Event_Channel::compute_performance_statistics (void)
// If we've got a ACE_Thread Manager then use it to suspend all the
// threads. This will enable us to get an accurate count.
- if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
@@ -86,18 +64,23 @@ ACE_Event_Channel::compute_performance_statistics (void)
(float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
#else
ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- this->options ().performance_window_,
+ Options::instance ()->performance_window (),
total_bytes_in,
total_bytes_out));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) %f Mbits/sec received.\n",
+ (float) (total_bytes_in * 8 /
+ (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) %f Mbits/sec sent.\n",
+ (float) (total_bytes_out * 8 /
+ (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
#endif /* ACE_NLOGGING */
// Resume all the threads again.
- if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->resume_all () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
@@ -205,7 +188,7 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler)
{
ACE_Synch_Options synch_options;
- if (this->options ().blocking_semantics_ == ACE_NONBLOCK)
+ if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
@@ -218,7 +201,7 @@ int
ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
{
int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
- int socket_queue_size = this->options ().socket_queue_size_;
+ int socket_queue_size = Options::instance ()->socket_queue_size ();
if (socket_queue_size > 0)
if (proxy_handler->peer ().set_option (SOL_SOCKET,
@@ -281,19 +264,22 @@ ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
void
ACE_Event_Channel::initiate_connector (void)
{
- PROXY_MAP_ITERATOR cmi (this->proxy_map_);
+ if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
+ {
+ PROXY_MAP_ITERATOR cmi (this->proxy_map_);
- // Iterate through the Consumer Map connecting all the
- // Proxy_Handlers.
+ // Iterate through the Consumer Map connecting all the
+ // Proxy_Handlers.
- for (PROXY_MAP_ENTRY *me = 0;
- cmi.next (me) != 0;
- cmi.advance ())
- {
- Proxy_Handler *proxy_handler = me->int_id_;
+ for (PROXY_MAP_ENTRY *me = 0;
+ cmi.next (me) != 0;
+ cmi.advance ())
+ {
+ Proxy_Handler *proxy_handler = me->int_id_;
- if (this->initiate_proxy_connection (proxy_handler) == -1)
- continue; // Failures are handled elsewhere...
+ if (this->initiate_proxy_connection (proxy_handler) == -1)
+ continue; // Failures are handled elsewhere...
+ }
}
}
@@ -301,11 +287,21 @@ ACE_Event_Channel::initiate_connector (void)
// to accept.
void
-ACE_Event_Channel::initiate_acceptor (void)
+ACE_Event_Channel::initiate_acceptors (void)
{
- if (this->acceptor_.open (this->options ().acceptor_port_,
- ACE_Reactor::instance (),
- this->options ().blocking_semantics_) == -1)
+ if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
+ && this->consumer_acceptor_.open
+ (Options::instance ()->consumer_acceptor_port (),
+ ACE_Reactor::instance (),
+ Options::instance ()->blocking_semantics ()) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "cannot register acceptor"));
+
+ if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
+ && this->supplier_acceptor_.open
+ (Options::instance ()->supplier_acceptor_port (),
+ ACE_Reactor::instance (),
+ Options::instance ()->blocking_semantics ()) == -1)
ACE_ERROR ((LM_ERROR, "%p\n",
"cannot register acceptor"));
}
@@ -316,14 +312,15 @@ ACE_Event_Channel::initiate_acceptor (void)
int
ACE_Event_Channel::close (u_long)
{
- if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
}
- // Tell everyone that the spaceship is here.
+ // First tell everyone that the spaceship is here...
{
PROXY_MAP_ITERATOR cmi (this->proxy_map_);
@@ -345,12 +342,15 @@ ACE_Event_Channel::close (u_long)
}
// Close down the connector
- connector_.close ();
+ this->connector_.close ();
- // Close down the acceptor
- acceptor_.close ();
+ // Close down the supplier acceptor.
+ this->supplier_acceptor_.close ();
- // Tell everyone that it is now time to commit suicide.
+ // Close down the consumer acceptor.
+ this->consumer_acceptor_.close ();
+
+ // Now tell everyone that it is now time to commit suicide.
{
PROXY_MAP_ITERATOR cmi (this->proxy_map_);
@@ -395,11 +395,14 @@ ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
case 0:
// Success.
return 0;
+ /* NOTREACHED */
default:
ACE_ERROR_RETURN ((LM_DEBUG,
"(%t) invalid result %d\n", result), -1);
/* NOTREACHED */
}
+
+ return 0;
}
int
@@ -429,6 +432,8 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr,
"(%t) invalid result %d\n", result), -1);
/* NOTREACHED */
}
+
+ return 0;
}
int
@@ -438,22 +443,27 @@ ACE_Event_Channel::open (void *)
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
ACE_UNUSED_ARG (sig);
- if (this->options ().connector_role_)
- // Actively initiate Peer connections.
- this->initiate_connector ();
+ // Actively initiate Peer connections.
+ this->initiate_connector ();
- if (this->options ().acceptor_role_)
- // Passively initiate Peer acceptor.
- this->initiate_acceptor ();
+ // Passively initiate Peer acceptor.
+ this->initiate_acceptors ();
// If we're not running reactively, then we need to make sure that
// <ACE_Message_Block> reference counting operations are
// thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is
// parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.
- if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
- ACE_NEW_RETURN (this->options ().locking_strategy_,
- ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
- -1);
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
+ {
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;
+
+ ACE_NEW_RETURN (la,
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
+ -1);
+
+ Options::instance ()->locking_strategy (la);
+ }
return 0;
}