summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-02 09:05:39 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-02 09:05:39 +0000
commit1c44106287219a05ddbff09df4574b90777040ae (patch)
tree1d371fe6828480e7cecdcb75a8887a2b4bb83f53
parentfbcfcdb6ff9975a9e2152be0a5dc7e28a32635fc (diff)
downloadATCD-1c44106287219a05ddbff09df4574b90777040ae.tar.gz
foo
-rw-r--r--ChangeLog-96b15
-rw-r--r--ChangeLog-97a86
-rw-r--r--Makefile1
-rw-r--r--README1
-rw-r--r--ace/ACE.cpp6
-rw-r--r--ace/Acceptor.cpp18
-rw-r--r--ace/Event_Handler.cpp8
-rw-r--r--ace/Event_Handler.h22
-rw-r--r--ace/Message_Block.cpp13
-rw-r--r--ace/Message_Block.h6
-rw-r--r--ace/OS.h12
-rw-r--r--ace/Reactor.cpp11
-rw-r--r--ace/Service_Manager.cpp4
-rw-r--r--ace/Svc_Handler.cpp4
-rw-r--r--ace/Task.cpp18
-rw-r--r--ace/Task.h10
-rw-r--r--ace/Task.i29
-rw-r--r--apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp593
-rw-r--r--apps/Gateway/Gateway/Concrete_Proxy_Handlers.h125
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp36
-rw-r--r--apps/Gateway/Gateway/Config_Files.h29
-rw-r--r--apps/Gateway/Gateway/Event.h6
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp163
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h57
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.h6
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp223
-rw-r--r--apps/Gateway/Gateway/Makefile353
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.cpp443
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.h81
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp9
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Acceptor.h40
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.cpp2
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.h3
-rw-r--r--apps/Gateway/Gateway/consumer_config9
-rw-r--r--apps/Gateway/Gateway/gatewayd.cpp39
-rw-r--r--apps/Gateway/Gateway/proxy_config12
-rw-r--r--apps/Gateway/Gateway/svc.conf2
-rw-r--r--apps/Gateway/Peer/Makefile33
-rw-r--r--apps/Gateway/Peer/peerd.cpp49
-rw-r--r--apps/Gateway/README67
-rw-r--r--apps/Orbix-Examples/Event_Comm/Consumer/Input_Handler.cpp8
-rw-r--r--apps/Orbix-Examples/Event_Comm/Supplier/Input_Handler.cpp4
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp138
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.h49
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp11
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.h12
-rw-r--r--examples/ASX/Event_Server/Event_Server/Makefile117
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.cpp45
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.h93
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.i8
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp347
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.h142
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp133
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.h59
-rw-r--r--examples/ASX/Event_Server/Event_Server/event_server.cpp84
-rw-r--r--examples/ASX/Event_Server/README46
-rw-r--r--examples/ASX/Event_Server/Transceiver/transceiver.cpp122
-rw-r--r--examples/Connection/non_blocking/CPP-connector.cpp44
-rw-r--r--examples/Logger/Acceptor-server/server_loggerd.cpp2
-rw-r--r--examples/Logger/simple-server/Logging_Acceptor.cpp2
-rw-r--r--examples/Logger/simple-server/server_loggerd.cpp6
-rw-r--r--examples/Reactor/Ntalker/ntalker.cpp7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i5
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp71
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h10
-rw-r--r--netsvcs/lib/Logging_Strategy.cpp31
-rw-r--r--netsvcs/servers/main.cpp72
75 files changed, 2401 insertions, 1949 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index 64de3de46b5..0527c3e1a49 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,18 +1,3 @@
-Wed Jan 1 00:10:47 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
-
- * apps/Gateway/Gateway: Moved all of the configuration file
- parsing logic *outside* of the Event_Channel into the Gateway
- class so that we wouldn't have unnecessary dependencies.
-
- * apps/Gateway/Gateway: Redesigned the Gateway so that the
- Proxy_Handlers (i.e., the Consumer_Proxy and Supplier_Proxy)
- most of their work to the Event_Channel. This "lightweight
- proxy" design is an improvement since it is now possible to
- emulate the COS Event Channel semantics within the Event_Channel
- "kernel."
-
- * Happy new year!
-
Tue Dec 31 18:27:50 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
* ace/Log_Msg.cpp (log): Added a test so that if we're
diff --git a/ChangeLog-97a b/ChangeLog-97a
new file mode 100644
index 00000000000..fa53d88624c
--- /dev/null
+++ b/ChangeLog-97a
@@ -0,0 +1,86 @@
+Thu Jan 2 00:42:21 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * examples/ASX/Event_Server: Completely rewrote and retested the
+ ACE Event Server example. The new code is *much* easier to
+ understand, has many more comments, is more robust, and compiles
+ much faster since I removed many of the templates.
+
+ * examples/ASX/Event_Server/Tranceiver/tranceiver.cpp: Fixed the
+ tranceiver so that it shuts down correctly when the Event_Server
+ exits.
+
+ * examples/Connection/non_blocking/CPP-connector.cpp: Fixed a
+ problem where we were trying to select() on ACE_STDIN on Win32.
+ Naturally, this doesn't work, so we used the
+ ACE::register_stdin_handler() to fix this. Thanks to
+ Samuel_Bercovici <Samuel_Bercovici_at_EFT__AD2@mail.icomverse.com>
+ for reporting this.
+
+ * examples/ASX/Event_Server/Event_Server/Options: Changed the
+ Options class to be a Singleton...
+
+ * ace/Task.h: Added "const" method qualifier to accessor methods
+ like is_reader() and is_writer().
+
+Wed Jan 1 00:10:47 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/Message_Block: Added a new static release() method that
+ behaves like the non-static method release(), except that it
+ checks if <mb> is 0. This is similar to CORBA::release(), which
+ is useful if you want to eliminate lots of checks for NULL
+ pointers before calling release() on them.
+
+ * Updated all places in ACE where we use
+ ACE_Event_Handler::READ_MASK when we should be using
+ ACE_Event_Handler::ACCEPT_MASK.
+
+ * examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp:
+ Changed the name of the acceptor class from Handle_Thr_Stream to
+ Handle_Thr_Acceptor, which is more accurate.
+
+ * ace/Reactor: Since we've now got an
+ ACE_Event_Handler::ACCEPT_MASK (intended primarily to accept
+ connections asynchronously using Win32 overlapped I/O) we can
+ now use this with the ACE_Reactor, as well. In particular, any
+ time that we are registering a "passive-mode" socket acceptor we
+ can now specify ACE_Event_Handler::ACCEPT_MASK. This is much
+ more intuitive than using the READ_MASK (which never made any
+ sense...). The ACE_Reactor now treats the ACCEPT_MASK as a
+ READ_MASK internally, so you can still use READ_MASK if you'd
+ like (i.e., this change doesn't break any existing code).
+
+ * ace/Event_Handler: Changed "get_priority" and "set_priority" to
+ simply "priority(void)" and "priority(int)" to be more
+ consistent with other parts of ACE.
+
+ * apps/Gateway/Gateway: Updated the Gateway so that the
+ concurrency strategies can now be specified on the
+ command-line (or in the svc.conf file), rather than being
+ determined at compile-time. This is much more flexible.
+ See the ./apps/Gateway/Gateway/README file for details.
+
+ * apps/Gateway/Gateway: Verified that all the multi-threading
+ strategies still work.
+
+ * ace/Message_Block.cpp (ACE_Message_Block): Make sure that we
+ "release" the continuation field, rather than delete it, since
+ its reference count may be > 1.
+
+ * ace/Log_Msg.cpp (log): Change the abort_prog logic a bit so that
+ we always print a message to stderr if we're exiting,
+ regardless... Thanks to David Levine for pointing this out.
+
+ * apps/Gateway/Gateway: Moved all of the configuration file
+ parsing logic *outside* of the Event_Channel into the Gateway
+ class so that we wouldn't have unnecessary dependencies.
+
+ * apps/Gateway/Gateway: Redesigned the Gateway so that the
+ Proxy_Handlers (i.e., the Consumer_Proxy and Supplier_Proxy)
+ most of their work to the Event_Channel. This "lightweight
+ proxy" design is an improvement since it is now possible to
+ emulate the COS Event Channel semantics within the Event_Channel
+ "kernel."
+
+ * Happy new year! Let's start a new ChangeLog to celebrate the
+ new year.
+
diff --git a/Makefile b/Makefile
index d1e5a1297f1..854665febda 100644
--- a/Makefile
+++ b/Makefile
@@ -52,6 +52,7 @@ RELEASE_FILES = ACE_wrappers/ACE-categories \
ACE_wrappers/ACE-install.sh \
ACE_wrappers/BIBLIOGRAPHY \
ACE_wrappers/ChangeLog \
+ ACE_wrappers/ChangeLog-97a \
ACE_wrappers/ChangeLog-96b \
ACE_wrappers/ChangeLog-96a \
ACE_wrappers/ChangeLog-95 \
diff --git a/README b/README
index 756747c5f20..21f495eeb92 100644
--- a/README
+++ b/README
@@ -671,6 +671,7 @@ John Cosby <John.D.Cosby@cpmx.saic.com>
Wayne Vucenic <wvucenic@netgate.net>
Harry Gunnarsson <hg@carmenta.se>
James CE Johnson <jcej@lads.com>
+Samuel_Bercovici <Samuel_Bercovici_at_EFT__AD2@mail.icomverse.com>
I would particularly like to thank Paul Stephenson, who worked with me
at Ericsson and is now at ObjectSpace. Paul devised the recursive
diff --git a/ace/ACE.cpp b/ace/ACE.cpp
index 207dd5c805e..dc3177ed5c4 100644
--- a/ace/ACE.cpp
+++ b/ace/ACE.cpp
@@ -19,9 +19,9 @@ ACE::register_stdin_handler (ACE_Event_Handler *eh,
return thr_mgr->spawn (&ACE::read_adapter, eh, flags);
#else
// Keep compilers happy.
- flags = flags;
- thr_mgr = thr_mgr;
- reactor = reactor;
+ ACE_UNUSED_ARG (flags);
+ ACE_UNUSED_ARG (thr_mgr);
+ ACE_UNUSED_ARG (reactor);
return reactor->register_handler (ACE_STDIN, eh, ACE_Event_Handler::READ_MASK);
#endif /* ACE_WIN32 */
}
diff --git a/ace/Acceptor.cpp b/ace/Acceptor.cpp
index 10cd3306258..de394450352 100644
--- a/ace/Acceptor.cpp
+++ b/ace/Acceptor.cpp
@@ -68,7 +68,7 @@ ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::open
return -1;
return this->reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
// Simple constructor.
@@ -169,7 +169,7 @@ ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_close (ACE_HANDLE,
// accept_strategy_...
this->reactor_->remove_handler
- (handle, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
+ (handle, ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL);
// Shut down the listen socket to recycle the handles.
if (this->peer_acceptor_.close () == -1)
@@ -438,7 +438,7 @@ ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::open
this->scheduling_strategy_ = sch_s;
return this->reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
// Simple constructor.
@@ -520,7 +520,7 @@ ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_close (ACE_HANDL
// accept_strategy_...
this->reactor ()->remove_handler
- (handle, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
+ (handle, ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL);
// Set the Reactor to 0 so that we don't try to close down
// again.
@@ -711,7 +711,7 @@ ACE_Oneshot_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_close (ACE_HANDLE
if (this->reactor ())
this->reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
+ (this, ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL);
if (this->peer_acceptor_.close () == -1)
ACE_ERROR ((LM_ERROR, "close\n"));
@@ -731,7 +731,7 @@ ACE_Oneshot_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_timeout
// Since we aren't necessarily registered with the Reactor, don't
// bother to check the return value here...
if (this->reactor ())
- this->reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ this->reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK);
return 0;
}
@@ -763,8 +763,8 @@ ACE_Oneshot_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::register_handler
*tv) == 0)
return -1;
else
- return this->reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK);
+ return this->reactor ()->register_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
}
@@ -869,7 +869,7 @@ ACE_Oneshot_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_input (ACE_HANDLE
if (this->shared_accept (this->svc_handler_, 0, 0, this->restart_) == -1)
result = -1;
if (this->reactor () && this->reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK | ACE_Event_Handler::DONT_CALL) == -1)
result = -1;
return result;
}
diff --git a/ace/Event_Handler.cpp b/ace/Event_Handler.cpp
index ef2fbdb9b64..2b9ec440911 100644
--- a/ace/Event_Handler.cpp
+++ b/ace/Event_Handler.cpp
@@ -46,18 +46,18 @@ ACE_Event_Handler::set_handle (ACE_HANDLE)
// Gets the priority of this handler.
int
-ACE_Event_Handler::get_priority (void) const
+ACE_Event_Handler::priority (void) const
{
- ACE_TRACE ("ACE_Event_Handler::get_priority");
+ ACE_TRACE ("ACE_Event_Handler::priority");
return this->priority_;
}
// Sets the priority
void
-ACE_Event_Handler::set_priority (int priority)
+ACE_Event_Handler::priority (int priority)
{
- ACE_TRACE ("ACE_Event_Handler::set_priority");
+ ACE_TRACE ("ACE_Event_Handler::priority");
this->priority_ = priority;
}
diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h
index f414d930ed4..78166e921d4 100644
--- a/ace/Event_Handler.h
+++ b/ace/Event_Handler.h
@@ -43,20 +43,20 @@ public:
{
LO_PRIORITY = 0,
HI_PRIORITY = 10,
- NULL_MASK = 0,
+ NULL_MASK = 0,
#if defined (ACE_USE_POLL)
- READ_MASK = POLLIN,
- WRITE_MASK = POLLOUT,
+ READ_MASK = POLLIN,
+ WRITE_MASK = POLLOUT,
EXCEPT_MASK = POLLPRI,
#else /* USE SELECT */
- READ_MASK = 0x1,
- WRITE_MASK = 0x4,
+ READ_MASK = 0x1,
+ WRITE_MASK = 0x4,
EXCEPT_MASK = 0x2,
#endif /* ACE_USE_POLL */
ACCEPT_MASK = 0x8,
- ALL_EVENTS_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK,
- RWE_MASK = ALL_EVENTS_MASK,
- DONT_CALL = 0x100
+ ALL_EVENTS_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK,
+ RWE_MASK = ALL_EVENTS_MASK,
+ DONT_CALL = 0x100
};
virtual ~ACE_Event_Handler (void);
@@ -70,9 +70,9 @@ public:
// Set the I/O handle.
// = Priority runs from MIN_PRIORITY (which is the "lowest priority") to MAX_PRIORITY (which is the "highest priority").
- virtual int get_priority (void) const;
+ virtual int priority (void) const;
// Get the priority of the Event_Handler.
- virtual void set_priority (int priority);
+ virtual void priority (int priority);
// Set the priority of the Event_Handler.
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
@@ -89,7 +89,7 @@ public:
const void *arg = 0);
// Called when timer expires.
- virtual int handle_close (ACE_HANDLE fd,
+ virtual int handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask close_mask);
// Called when object is removed from the ACE_Reactor
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index d58e13a8c03..df4dc918cc6 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -114,7 +114,7 @@ ACE_Message_Block::~ACE_Message_Block (void)
delete [] this->base_;
}
if (this->cont_)
- delete this->cont_;
+ this->cont_->release ();
this->prev_ = 0;
this->next_ = 0;
}
@@ -316,7 +316,16 @@ ACE_Message_Block::release (void)
return result;
}
-ACE_INLINE ACE_Message_Block *
+/* static */ ACE_Message_Block *
+ACE_Message_Block::release (ACE_Message_Block *mb)
+{
+ if (mb)
+ return mb->release ();
+ else
+ return 0;
+}
+
+ACE_Message_Block *
ACE_Message_Block::duplicate (void)
{
ACE_TRACE ("ACE_Message_Block::duplicate");
diff --git a/ace/Message_Block.h b/ace/Message_Block.h
index 30852f4dc8c..ba1a760b2d1 100644
--- a/ace/Message_Block.h
+++ b/ace/Message_Block.h
@@ -165,6 +165,12 @@ public:
// <this> and return 0. Behavior is undefined if reference count <
// 0.
+ static ACE_Message_Block *release (ACE_Message_Block *mb);
+ // This behaves like the non-static method <release>, except that it
+ // checks if <mb> is 0. This is similar to <CORBA::release>, which
+ // is useful if you want to eliminate lots of checks for NULL
+ // pointers before calling <release> on them. Returns <mb>.
+
// = Operations on Message data
int copy (const char *buf, size_t n);
diff --git a/ace/OS.h b/ace/OS.h
index 8b5c5342be5..0490acd87f7 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -64,9 +64,17 @@
#define ACE_DEFAULT_THR_LOGGING_SERVER_PORT 10008
#define ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR "10008"
+// Used for the gateway server.
+#define ACE_DEFAULT_GATEWAY_SERVER_PORT 10009
+#define ACE_DEFAULT_GATEWAY_SERVER_PORT_STR "10009"
+
+// Used for the peer server.
+#define ACE_DEFAULT_PEER_SERVER_PORT 10010
+#define ACE_DEFAULT_PEER_SERVER_PORT_STR "10010"
+
// Used for the time server.
-#define ACE_DEFAULT_TIME_SERVER_PORT 10010
-#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10010"
+#define ACE_DEFAULT_TIME_SERVER_PORT 10011
+#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10011"
#define ACE_DEFAULT_TIME_SERVER_STR "ACE_TS_TIME"
#define ACE_DEFAULT_SERVER_HOST "localhost"
diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp
index c7a629d028b..335f8879edc 100644
--- a/ace/Reactor.cpp
+++ b/ace/Reactor.cpp
@@ -625,6 +625,7 @@ ACE_Reactor_Notify::handle_input (ACE_HANDLE handle)
switch (buffer.mask_)
{
case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::WRITE_MASK:
@@ -1093,7 +1094,7 @@ ACE_Reactor::bit_ops (ACE_HANDLE handle,
ACE_Sig_Guard sb; // Block out all signals until method returns.
ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
- u_long omask = ACE_Event_Handler::NULL_MASK;
+ u_long omask = ACE_Event_Handler::NULL_MASK;
switch (ops)
{
@@ -1116,11 +1117,12 @@ ACE_Reactor::bit_ops (ACE_HANDLE handle,
// The following code is rather subtle... Note that if we are
// doing a ACE_Reactor::SET_MASK then if the bit is not enabled
// in the mask we need to clear the bit from the ACE_Handle_Set.
- // On the other hand, f we are doing a ACE_Reactor::CLR_MASK or
+ // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
// a ACE_Reactor::ADD_MASK we just carry out the operations
// specified by the mask.
- if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK))
+ if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
+ || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
{
(rd.*ptmf) (handle);
ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
@@ -1189,7 +1191,8 @@ ACE_Reactor::handler_i (ACE_HANDLE handle,
return -1;
else
{
- if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
+ if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
+ || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
&& this->rd_handle_mask_.is_set (handle) == 0)
return -1;
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)
diff --git a/ace/Service_Manager.cpp b/ace/Service_Manager.cpp
index 834396dff1d..49564b0e30a 100644
--- a/ace/Service_Manager.cpp
+++ b/ace/Service_Manager.cpp
@@ -102,7 +102,7 @@ ACE_Service_Manager::init (int argc, char *argv[])
if (this->open (local_addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -119,7 +119,7 @@ ACE_Service_Manager::fini (void)
{
ACE_TRACE ("ACE_Service_Manager::fini");
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_HANDLE
diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp
index 67855016c1a..33a507d7e4b 100644
--- a/ace/Svc_Handler.cpp
+++ b/ace/Svc_Handler.cpp
@@ -124,8 +124,8 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::open (void *)
buf, this->peer_.get_handle ()));
#endif /* DEBUGGING */
if (this->reactor ()
- && this->reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ && this->reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p",
"unable to register client handler"), -1);
return 0;
diff --git a/ace/Task.cpp b/ace/Task.cpp
index a7626f52983..c93fb7e4822 100644
--- a/ace/Task.cpp
+++ b/ace/Task.cpp
@@ -123,24 +123,6 @@ ACE_Task_Base::ACE_Task_Base (ACE_Thread_Manager *thr_man)
{
}
-// Get the current group id.
-int
-ACE_Task_Base::grp_id (void)
-{
- ACE_TRACE ("ACE_Task_Base::grp_id");
- ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
- return this->grp_id_;
-}
-
-// Set the current group id.
-void
-ACE_Task_Base::grp_id (int id)
-{
- ACE_TRACE ("ACE_Task_Base::grp_id");
- ACE_MT (ACE_GUARD (ACE_Thread_Mutex, ace_mon, this->lock_));
- this->grp_id_ = id;
-}
-
// Suspend a task.
int
ACE_Task_Base::suspend (void)
diff --git a/ace/Task.h b/ace/Task.h
index 8e6f4752f27..9975f765220 100644
--- a/ace/Task.h
+++ b/ace/Task.h
@@ -103,25 +103,25 @@ public:
virtual int resume (void);
// Resume a suspended task.
- int grp_id (void);
+ int grp_id (void) const;
// Get the current group id.
void grp_id (int);
// Set the current group id.
- ACE_Thread_Manager *thr_mgr (void);
+ ACE_Thread_Manager *thr_mgr (void) const;
// Gets the thread manager associated with this Task.
void thr_mgr (ACE_Thread_Manager *);
// Set the thread manager associated with this Task.
- int is_reader (void);
+ int is_reader (void) const;
// True if queue is a reader, else false.
- int is_writer (void);
+ int is_writer (void) const;
// True if queue is a writer, else false.
- size_t thr_count (void);
+ size_t thr_count (void) const;
// Returns the number of threads currently running within a task.
// If we're a passive object this value is 0, else it's > 0.
diff --git a/ace/Task.i b/ace/Task.i
index 7e682513761..798606a68ae 100644
--- a/ace/Task.i
+++ b/ace/Task.i
@@ -4,7 +4,7 @@
// Task.i
ACE_INLINE ACE_Thread_Manager *
-ACE_Task_Base::thr_mgr (void)
+ACE_Task_Base::thr_mgr (void) const
{
ACE_TRACE ("ACE_Task_Base::thr_mgr");
return this->thr_mgr_;
@@ -19,10 +19,10 @@ ACE_Task_Base::thr_mgr (ACE_Thread_Manager *thr_mgr)
// Return the count of the current number of threads.
ACE_INLINE size_t
-ACE_Task_Base::thr_count (void)
+ACE_Task_Base::thr_count (void) const
{
ACE_TRACE ("ACE_Task_Base::thr_count");
- ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0));
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, (ACE_Thread_Mutex &) this->lock_, 0));
return this->thr_count_;
}
@@ -39,14 +39,14 @@ ACE_Task_Base::thr_count_dec (void)
}
ACE_INLINE int
-ACE_Task_Base::is_reader (void)
+ACE_Task_Base::is_reader (void) const
{
ACE_TRACE ("ACE_Task_Base::is_reader");
return (ACE_BIT_ENABLED (this->flags_, ACE_Task_Flags::ACE_READER));
}
ACE_INLINE int
-ACE_Task_Base::is_writer (void)
+ACE_Task_Base::is_writer (void) const
{
ACE_TRACE ("ACE_Task_Base::is_writer");
return (ACE_BIT_DISABLED (this->flags_, ACE_Task_Flags::ACE_READER));
@@ -87,3 +87,22 @@ ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *)
ACE_TRACE ("ACE_Task_Base::put");
return 0;
}
+
+// Get the current group id.
+ACE_INLINE int
+ACE_Task_Base::grp_id (void) const
+{
+ ACE_TRACE ("ACE_Task_Base::grp_id");
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, (ACE_Thread_Mutex &) this->lock_, -1));
+ return this->grp_id_;
+}
+
+// Set the current group id.
+ACE_INLINE void
+ACE_Task_Base::grp_id (int id)
+{
+ ACE_TRACE ("ACE_Task_Base::grp_id");
+ ACE_MT (ACE_GUARD (ACE_Thread_Mutex, ace_mon, this->lock_));
+ this->grp_id_ = id;
+}
+
diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp
new file mode 100644
index 00000000000..e250dccf173
--- /dev/null
+++ b/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp
@@ -0,0 +1,593 @@
+// $Id$
+
+#include "Event_Channel.h"
+#include "Concrete_Proxy_Handlers.h"
+
+Consumer_Proxy::Consumer_Proxy (const Proxy_Config_Info &pci)
+ : Proxy_Handler (pci)
+{
+ this->proxy_role_ = 'C';
+ this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
+}
+
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method simply marks the Proxy_Handler as having
+// failed so that handle_close () can reconnect.
+
+int
+Consumer_Proxy::handle_input (ACE_HANDLE)
+{
+ char buf[1];
+
+ this->state (Proxy_Handler::FAILED);
+
+ switch (this->peer ().recv (buf, sizeof buf))
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ }
+}
+
+// Perform a non-blocking put() of event. If we are unable to send
+// the entire event the remainder is re-queued at the *front* of the
+// Event_List.
+
+int
+Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
+{
+ // Try to send the event. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // Event_List and ask the ACE_Reactor to inform us (via
+ // handle_output()) when it is possible to try again.
+
+ ssize_t n = this->send (event);
+
+ if (n == -1)
+ {
+ // Things have gone wrong, let's try to close down and set up a
+ // new reconnection by calling handle_close().
+ this->state (Proxy_Handler::FAILED);
+ this->handle_close ();
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing activated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+ // ACE_Queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
+
+ // Tell ACE_Reactor to call us back when we can send again.
+ else if (ACE_Service_Config::reactor ()->schedule_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+ssize_t
+Consumer_Proxy::send (ACE_Message_Block *event)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending %d bytes to Consumer %d\n",
+ event->length (), this->id ()));
+
+ ssize_t len = event->length ();
+ ssize_t n = this->peer ().send (event->rd_ptr (), len);
+
+ if (n <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < len)
+ // Re-adjust pointer to skip over the part we did send.
+ event->rd_ptr (n);
+ else // if (n == length)
+ {
+ // The whole event is sent, we now decrement the reference count
+ // (which deletes itself with it reaches 0.
+ event->release ();
+ errno = 0;
+ }
+ this->total_bytes (n);
+ return n;
+}
+
+// Finish sending an event when flow control conditions abate.
+// This method is automatically called by the ACE_Reactor.
+
+int
+Consumer_Proxy::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *event = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in handle_output on handle %d\n",
+ this->get_handle ()));
+ // The list had better not be empty, otherwise there's a bug!
+
+ if (this->msg_queue ()->dequeue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (event))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ // Didn't write everything this time, come back later...
+ break;
+
+ case -1:
+ // We are responsible for releasing an ACE_Message_Block if
+ // failures occur.
+ event->release ();
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
+
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+
+ // If we succeed in writing the entire event (or we did not
+ // fail due to EWOULDBLOCK) then check if there are more
+ // events on the Message_Queue. If there aren't, tell the
+ // ACE_Reactor not to notify us anymore (at least until
+ // there are new events queued up).
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing deactivated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+
+ if (ACE_Service_Config::reactor ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
+ }
+ }
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
+ return 0;
+}
+
+// Send an event to a Consumer (may queue if necessary).
+
+int
+Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the event *without* blocking!
+ return this->nonblk_put (event);
+ else
+ // If we have queued up events due to flow control then just
+ // enqueue and return.
+ return this->msg_queue ()->enqueue_tail
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+Supplier_Proxy::Supplier_Proxy (const Proxy_Config_Info &pci)
+ : msg_frag_ (0),
+ Proxy_Handler (pci)
+{
+ this->proxy_role_ = 'S';
+ this->msg_queue ()->high_water_mark (0);
+}
+
+// Receive an Event from a Supplier. Handles fragmentation.
+//
+// The event returned from recv consists of two parts:
+//
+// 1. The Address part, contains the "virtual" routing id.
+//
+// 2. The Data part, which contains the actual data to be forwarded.
+//
+// The reason for having two parts is to shield the higher layers
+// of software from knowledge of the event structure.
+
+int
+Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
+{
+ if (this->msg_frag_ == 0)
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ Event *event = (Event *) this->msg_frag_->rd_ptr ();
+ ssize_t header_received = 0;
+
+ const ssize_t HEADER_SIZE = sizeof (Event_Header);
+ ssize_t header_bytes_left_to_read =
+ HEADER_SIZE - this->msg_frag_->length ();
+
+ if (header_bytes_left_to_read > 0)
+ {
+ header_received = this->peer ().recv
+ (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
+
+ if (header_received == -1 /* error */
+ || header_received == 0 /* EOF */)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "Recv error during header read "));
+ ACE_DEBUG ((LM_DEBUG,
+ "attempted to read %d\n",
+ header_bytes_left_to_read));
+ this->msg_frag_ = this->msg_frag_->release ();
+ return header_received;
+ }
+
+ // Bump the write pointer by the amount read.
+ this->msg_frag_->wr_ptr (header_received);
+
+ // At this point we may or may not have the ENTIRE header.
+ if (this->msg_frag_->length () < HEADER_SIZE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Partial header received: only %d bytes\n",
+ this->msg_frag_->length ()));
+ // Notify the caller that we didn't get an entire event.
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
+ // Convert the header into host byte order so that we can access
+ // it directly without having to repeatedly muck with it...
+ event->header_.decode ();
+
+ if (event->header_.len_ > sizeof event->data_)
+ {
+ // This data_ payload is too big!
+ errno = EINVAL;
+ ACE_DEBUG ((LM_DEBUG,
+ "Data payload is too big (%d bytes)\n",
+ event->header_.len_));
+ return -1;
+ }
+
+ }
+
+ // At this point there is a complete, valid header in Event. Now we
+ // need to get the event payload. Due to incomplete reads this may
+ // not be the first time we've read in a fragment for this message.
+ // We account for this here. Note that the first time in here
+ // msg_frag_->wr_ptr() will point to event->data_. Every time we do
+ // a successful fragment read, we advance wr_ptr(). Therefore, by
+ // subtracting how much we've already read from the
+ // event->header_.len_ we complete the data_bytes_left_to_read...
+
+ ssize_t data_bytes_left_to_read =
+ ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
+
+ ssize_t data_received =
+ this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
+
+ // Try to receive the remainder of the event.
+
+ switch (data_received)
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // This might happen if only the header came through.
+ return -1;
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ this->msg_frag_ = this->msg_frag_->release ();
+ return 0;
+
+ default:
+ // Set the write pointer at 1 past the end of the event.
+ this->msg_frag_->wr_ptr (data_received);
+
+ if (data_received != data_bytes_left_to_read)
+ {
+ errno = EWOULDBLOCK;
+ // Inform caller that we didn't get the whole event.
+ return -1;
+ }
+ else
+ {
+ // Set the read pointer to the beginning of the event.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ // Allocate an event forwarding header and chain the data
+ // portion onto its continuation field.
+ forward_addr = new ACE_Message_Block (sizeof (Event_Key),
+ ACE_Message_Block::MB_PROTO,
+ this->msg_frag_);
+ if (forward_addr == 0)
+ {
+ this->msg_frag_ = this->msg_frag_->release ();
+ errno = ENOMEM;
+ return -1;
+ }
+
+ Event_Key event_addr (this->id (),
+ event->header_.supplier_id_,
+ event->header_.type_);
+ // Copy the forwarding address from the Event_Key into
+ // forward_addr.
+ forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
+
+ // Reset the pointer to indicate we've got an entire event.
+ this->msg_frag_ = 0;
+ }
+
+ this->total_bytes (data_received + header_received);
+#if defined (VERBOSE)
+ ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s",
+ event_addr.proxy_id_, event->header_.supplier_id_, event->header_.len_,
+ event->header_.len_, event->data_));
+#else
+ ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
+ event->header_.supplier_id_, event->header_.len_, data_received + header_received));
+#endif /* VERBOSE */
+
+ // Encode before returning so that we can set things out in
+ // network byte order.
+ event->header_.encode ();
+ return data_received + header_received;
+ }
+}
+
+// Receive various types of input (e.g., Peer event from the
+// gatewayd, as well as stdio).
+
+int
+Supplier_Proxy::handle_input (ACE_HANDLE)
+{
+ ACE_Message_Block *forward_addr = 0;
+
+ switch (this->recv (forward_addr))
+ {
+ case 0:
+ // Note that a peer should never initiate a shutdown by closing
+ // the connection. Instead, it should reconnect.
+ this->state (Proxy_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else // A weird problem occurred, shut down and start again.
+ {
+ this->state (Proxy_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n",
+ "Peer has failed unexpectedly",
+ this->id ()), -1);
+ }
+ /* NOTREACHED */
+ default:
+ return this->forward (forward_addr);
+ }
+}
+
+// Forward an event to its appropriate Consumer(s). This delegates to
+// the <ACE_Event_Channel> to do the actual forwarding.
+
+int
+Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
+{
+ return this->event_channel_.put (forward_addr);
+}
+
+#if defined (ACE_HAS_THREADS)
+Thr_Consumer_Proxy::Thr_Consumer_Proxy (const Proxy_Config_Info &pci)
+ : Consumer_Proxy (pci)
+{
+}
+
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method marks the Proxy_Handler as having failed
+// and deactivates the ACE_Message_Queue (to wake up the thread
+// blocked on <dequeue_head> in svc()).
+// Thr_Output_Handler::handle_close () will eventually try to
+// reconnect...
+
+int
+Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
+{
+ // Call down to the <Consumer_Proxy> to handle this first.
+ this->Consumer_Proxy::handle_input (h);
+
+ ACE_Service_Config::reactor ()->remove_handler
+ (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+ return 0;
+}
+
+// Initialize the threaded Consumer_Proxy object and spawn a new
+// thread.
+
+int
+Thr_Consumer_Proxy::open (void *)
+{
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+
+ // Register ourselves to receive input events (which indicate that
+ // the Consumer has shut down unexpectedly).
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_MT_SYNCH>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // events to Consumers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Queue up an event for transmission (must not block since
+// Supplier_Proxys may be single-threaded).
+
+int
+Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ // Perform non-blocking enqueue.
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Transmit events to the peer (note simplification resulting from
+// threads...)
+
+int
+Thr_Consumer_Proxy::svc (void)
+{
+
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Thr_Consumer_Proxy's handle = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread it is OK to block on
+ // output.
+
+ for (ACE_Message_Block *mb = 0;
+ this->msg_queue ()->dequeue_head (mb) != -1;
+ )
+ {
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
+ }
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+
+ ACE_OS::sleep (tv);
+ }
+ }
+
+ return 0;
+}
+
+Thr_Supplier_Proxy::Thr_Supplier_Proxy (const Proxy_Config_Info &pci)
+ : Supplier_Proxy (pci)
+{
+}
+
+int
+Thr_Supplier_Proxy::open (void *)
+{
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_MT_SYNCH>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // events to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Receive events from a Peer in a separate thread (note reuse of
+// existing code!).
+
+int
+Thr_Supplier_Proxy::svc (void)
+{
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Thr_Supplier_Proxy's handle = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread and processes events
+ // for one connection it is OK to call down to the
+ // <Supplier_Proxy::handle_input> method, which blocks on input.
+
+ while (this->Supplier_Proxy::handle_input () != -1)
+ continue;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h b/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h
new file mode 100644
index 00000000000..30f06b7d7c7
--- /dev/null
+++ b/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h
@@ -0,0 +1,125 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Concrete_Proxy_Handlers.h
+//
+// = DESCRIPTION
+// These are all the subclasses of Proxy_Handler that define the
+// appropriate threaded/reactive Consumer/Supplier behavior.
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_CONCRETE_PROXY_HANDLER)
+#define _CONCRETE_PROXY_HANDLER
+
+#include "Proxy_Handler.h"
+
+class Supplier_Proxy : public Proxy_Handler
+ // = TITLE
+ // Handles reception of Events from Suppliers
+ //
+ // = DESCRIPTION
+ // Performs framing and error checking.
+{
+public:
+ // = Initialization method.
+ Supplier_Proxy (const Proxy_Config_Info &);
+
+protected:
+ // = All the following methods are upcalls, so they can be protected.
+
+ virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
+ // Receive and process peer events.
+
+ virtual int recv (ACE_Message_Block *&);
+ // Receive an event from a Supplier.
+
+ int forward (ACE_Message_Block *event);
+ // Forward the <event> to its appropriate Consumer. This delegates
+ // to the <ACE_Event_Channel> to do the actual forwarding.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragment to handle non-blocking recv's from
+ // Suppliers.
+};
+
+class Consumer_Proxy : public Proxy_Handler
+ // = TITLE
+ // Handles transmission of events to Consumers.
+ //
+ // = DESCRIPTION
+ // Performs queueing and error checking. Uses a single-threaded
+ // Reactive approach to handle flow control.
+{
+public:
+ // = Initialization method.
+ Consumer_Proxy (const Proxy_Config_Info &);
+
+ virtual int put (ACE_Message_Block *event,
+ ACE_Time_Value * = 0);
+ // Send an event to a Consumer (may be queued if necessary).
+
+protected:
+ // = We'll allow up to 16 megabytes to be queued per-output proxy.
+ enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
+
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending event when flow control conditions abate.
+
+ int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking put().
+
+ virtual ssize_t send (ACE_Message_Block *);
+ // Send an event to a Consumer.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process shutdowns from a Consumer.
+};
+
+class Thr_Consumer_Proxy : public Consumer_Proxy
+ // = TITLE
+ // Runs each Output Proxy_Handler in a separate thread.
+{
+public:
+ Thr_Consumer_Proxy (const Proxy_Config_Info &);
+
+ virtual int open (void *);
+ // Initialize the threaded Consumer_Proxy object and spawn a new
+ // thread.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send a message to a peer.
+
+protected:
+ virtual int handle_input (ACE_HANDLE);
+ // Called when Peer shutdown unexpectedly.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+
+class Thr_Supplier_Proxy : public Supplier_Proxy
+ // = TITLE
+ // Runs each Input Proxy_Handler in a separate thread.
+{
+public:
+ Thr_Supplier_Proxy (const Proxy_Config_Info &pci);
+
+ virtual int open (void *);
+ // Initialize the object and spawn a new thread.
+
+protected:
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+
+#endif /* _CONCRETE_PROXY_HANDLER */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index 5b95dc4fbf0..feb4d72f010 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -8,7 +8,7 @@
typedef FP::Return_Type FP_RETURN_TYPE;
FP_RETURN_TYPE
-Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
+Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry,
int &line_number)
{
FP_RETURN_TYPE read_result;
@@ -18,7 +18,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
// Ignore comments, check for EOF and EOLINE if this succeeds, we
// have our connection id.
- while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS)
+ while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
{
if (read_result == FP::EOFILE)
return FP::EOFILE;
@@ -27,7 +27,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
line_number++;
}
- // Get the logical id.
+ // Get the supplier id.
if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS)
return read_result;
@@ -49,7 +49,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
}
FP_RETURN_TYPE
-Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
+Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
int &line_number)
{
char buf[BUFSIZ];
@@ -59,7 +59,7 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
// Ignore comments, check for EOF and EOLINE
// if this succeeds, we have our connection id
- while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS)
+ while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
{
if (read_result == FP::EOFILE)
return FP::EOFILE;
@@ -87,7 +87,7 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
entry.proxy_role_ = buf[0];
// Get the max retry delay.
- if ((read_result = this->getint (entry.max_retry_delay_)) != FP::SUCCESS)
+ if ((read_result = this->getint (entry.max_retry_timeout_)) != FP::SUCCESS)
return read_result;
// Get the local port number.
@@ -96,6 +96,14 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
else
entry.local_port_ = (u_short) port;
+ ACE_INT32 priority;
+
+ // Get the priority
+ if ((read_result = this->getint (priority)) != FP::SUCCESS)
+ return read_result;
+ else
+ entry.priority_ = priority;
+
return FP::SUCCESS;
}
@@ -108,8 +116,8 @@ int main (int argc, char *argv[])
exit (1);
}
FP_RETURN_TYPE result;
- Connection_Config_File_Entry entry;
- Connection_Config_File_Parser CCfile;
+ Proxy_Config_Info entry;
+ Proxy_Config_File_Parser CCfile;
CCfile.open (argv[1]);
@@ -125,12 +133,12 @@ int main (int argc, char *argv[])
cerr << "Error at line " << line_number << endl;
else
printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n",
- entry.conn_id_, entry.host_, entry.remote_port_, entry.proxy_role_,
- entry.max_retry_delay_, entry.transform_, entry.local_port_);
+ entry.proxy_id_, entry.host_, entry.remote_port_, entry.proxy_role_,
+ entry.max_retry_timeout_, entry.transform_, entry.local_port_);
}
CCfile.close();
- Consumer_Config_File_Entry entry;
+ Consumer_Config_Info entry;
Consumer_Config_File_Parser file;
file.open (argv[2]);
@@ -147,7 +155,7 @@ int main (int argc, char *argv[])
else
{
printf ("%d\t%d\t%d\t%d\t",
- entry.conn_id_, entry.supplier_id_, entry.type_);
+ entry.proxy_id_, entry.supplier_id_, entry.type_);
while (--entry.total_consumers_ >= 0)
printf ("%d,", entry.consumers_[entry.total_consumers_]);
printf ("\n");
@@ -160,6 +168,6 @@ int main (int argc, char *argv[])
#endif /* DEBUGGING */
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class File_Parser<Connection_Config_File_Entry>;
-template class File_Parser<Consumer_Config_File_Entry>;
+template class File_Parser<Proxy_Config_Info>;
+template class File_Parser<Consumer_Config_Info>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index eae0248eb8c..e7cd99505a2 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -20,12 +20,15 @@
#include "ace/OS.h"
#include "File_Parser.h"
-class Connection_Config_File_Entry
+// Forward declaration.
+class ACE_Event_Channel;
+
+class Proxy_Config_Info
// = TITLE
// Stores connection configuration information.
{
public:
- ACE_INT32 conn_id_;
+ ACE_INT32 proxy_id_;
// Connection id for this Proxy_Handler.
char host_[BUFSIZ];
@@ -37,23 +40,31 @@ public:
char proxy_role_;
// 'S' (supplier) or 'C' (consumer).
- ACE_INT32 max_retry_delay_;
+ ACE_INT32 max_retry_timeout_;
// Maximum amount of time to wait for reconnecting.
u_short local_port_;
// Our local port number.
+
+ ACE_INT32 priority_;
+ // Priority by which different Consumers and Suppliers should be
+ // serviced.
+
+ ACE_Event_Channel *event_channel_;
+ // We just need a place to store this until we can pass it along
+ // when creating a Proxy_Handler.
};
-class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry>
+class Proxy_Config_File_Parser : public File_Parser<Proxy_Config_Info>
// = TITLE
// Parser for the Proxy_Handler Connection file.
{
public:
virtual FP::Return_Type
- read_entry (Connection_Config_File_Entry &entry, int &line_number);
+ read_entry (Proxy_Config_Info &entry, int &line_number);
};
-class Consumer_Config_File_Entry
+class Consumer_Config_Info
// = TITLE
// Stores the information in a Consumer Map entry.
{
@@ -62,7 +73,7 @@ public:
MAX_CONSUMERS = 1000 // Total number of multicast consumers.
};
- ACE_INT32 conn_id_;
+ ACE_INT32 proxy_id_;
// Connection id for this proxy.
ACE_INT32 supplier_id_;
@@ -78,13 +89,13 @@ public:
// Total number of these consumers.
};
-class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry>
+class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_Info>
// = TITLE
// Parser for the Consumer Map file.
{
public:
virtual FP::Return_Type
- read_entry (Consumer_Config_File_Entry &entry, int &line_number);
+ read_entry (Consumer_Config_Info &entry, int &line_number);
};
#endif /* _CONFIG_FILES */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
index 5e288edf910..467102392b1 100644
--- a/apps/Gateway/Gateway/Event.h
+++ b/apps/Gateway/Gateway/Event.h
@@ -36,18 +36,18 @@ public:
Event_Key (ACE_INT32 cid = -1,
u_char sid = 0,
u_char type = 0)
- : conn_id_ (cid),
+ : proxy_id_ (cid),
supplier_id_ (sid),
type_ (type) {}
int operator== (const Event_Key &event_addr) const
{
- return this->conn_id_ == event_addr.conn_id_
+ return this->proxy_id_ == event_addr.proxy_id_
&& this->supplier_id_ == event_addr.supplier_id_
&& this->type_ == event_addr.type_;
}
- ACE_INT32 conn_id_;
+ ACE_INT32 proxy_id_;
// Unique connection identifier that denotes a particular
// Proxy_Handler.
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 02f2cd465f8..532523956a9 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -8,7 +8,11 @@
ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
: performance_window_ (0),
blocking_semantics_ (ACE_NONBLOCK),
- socket_queue_size_ (0)
+ socket_queue_size_ (0),
+ threading_strategy_ (REACTIVE),
+ acceptor_port_ (ACE_DEFAULT_GATEWAY_SERVER_PORT),
+ connector_role_ (0),
+ acceptor_role_ (0)
{
}
@@ -26,20 +30,21 @@ ACE_Event_Channel::options (void)
return this->options_;
}
-ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
- const void *)
+int
+ACE_Event_Channel::compute_performance_statistics (void)
{
ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+ PROXY_MAP_ITERATOR cmi (this->proxy_map_);
// If we've got a ACE_Thread Manager then use it to suspend all the
// threads. This will enable us to get an accurate count.
-#if defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)
- if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
+ if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ {
+ if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
+ }
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
@@ -47,7 +52,7 @@ ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
// Iterate through the connection map summing up the number of bytes
// sent/received.
- for (CONNECTION_MAP_ENTRY *me = 0;
+ for (PROXY_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
@@ -60,7 +65,8 @@ ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
}
#if defined (ACE_NLOGGING)
- ACE_OS::fprintf (stderr, "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
+ ACE_OS::fprintf (stderr,
+ "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
performance_window_,
total_bytes_in,
total_bytes_out);
@@ -81,29 +87,36 @@ ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
(float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
#endif /* ACE_NLOGGING */
-#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
-
// Resume all the threads again.
- if (ACE_Service_Config::thr_mgr ()->resume_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
+ if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ {
+ if (ACE_Service_Config::thr_mgr ()->resume_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "resume_all"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
+ }
+}
- return 0;
+ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ return this->collection_performance_statistics ();
}
-ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
+// This method forwards the <event> to Consumer that have registered
+// to receive it.
+
+ACE_Event_Channel::put (ACE_Message_Block *event,
ACE_Time_Value *)
{
// We got a valid event, so determine its virtual forwarding
// address, which is stored in the first of the two event blocks,
// which are chained together by this->recv().
- Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr ();
+ Event_Key *forwarding_addr = (Event_Key *) event->rd_ptr ();
// Skip over the address portion and get the data.
- ACE_Message_Block *data = forward_addr->cont ();
+ ACE_Message_Block *data = event->cont ();
// <dispatch_set> points to the set of Consumers associated with
// this forwarding address.
@@ -112,8 +125,8 @@ ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
// Failure.
ACE_ERROR ((LM_DEBUG,
- "(%t) find failed on conn id = %d, logical id = %d, type = %d\n",
- forwarding_addr->conn_id_,
+ "(%t) find failed on conn id = %d, supplier id = %d, type = %d\n",
+ forwarding_addr->proxy_id_,
forwarding_addr->supplier_id_,
forwarding_addr->type_));
else
@@ -143,7 +156,7 @@ ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
// counting.
ACE_Message_Block *dup_msg = data->duplicate ();
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n",
(*proxy_handler)->id ()));
if ((*proxy_handler)->put (dup_msg) == -1)
@@ -165,7 +178,7 @@ ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
}
// Release the memory in the message block.
- forward_addr->release ();
+ event->release ();
return 0;
}
@@ -183,6 +196,11 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler,
}
int
+ACE_Event_Channel::initiate_proxy_accept (void)
+{
+}
+
+int
ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
{
int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
@@ -220,30 +238,34 @@ ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
int
ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
{
- // Skip over deactivated descriptors.
+ // Skip over proxies with deactivated handles.
if (proxy_handler->get_handle () != ACE_INVALID_HANDLE)
{
// Make sure to close down peer to reclaim descriptor.
proxy_handler->peer ().close ();
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Proxy_Handler %d\n",
- proxy_handler->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer
- (proxy_handler, 0, proxy_handler->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
+ if (proxy_handler->state () != Proxy_Handler::DISCONNECTING)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Proxy_Handler %d\n",
+ proxy_handler->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (proxy_handler, 0, proxy_handler->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
}
return 0;
}
-// Initiate connections with the Consumer and Supplier Peers.
+// Initiate active connections with the Consumer and Supplier Peers.
-ACE_Event_Channel::initiate_connections (void)
+void
+ACE_Event_Channel::initiate_connector (void)
{
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+ PROXY_MAP_ITERATOR cmi (this->proxy_map_);
ACE_Synch_Options synch_options;
@@ -255,7 +277,7 @@ ACE_Event_Channel::initiate_connections (void)
// Iterate through the Consumer Map connecting all the
// Proxy_Handlers.
- for (CONNECTION_MAP_ENTRY *me = 0;
+ for (PROXY_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
@@ -265,8 +287,18 @@ ACE_Event_Channel::initiate_connections (void)
(proxy_handler, synch_options) == -1)
continue; // Failures are handled elsewhere...
}
+}
- return 0;
+// Initiate passive acceptor to wait for Consumer and Supplier Peers
+// to accept.
+
+void
+ACE_Event_Channel::initiate_acceptor (void)
+{
+ if (ACE_Service_Config::reactor ()->register_handler
+ (&this->acceptor_, ACE_Event_Handler::ACCEPT_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "cannot register acceptor"));
}
// This method gracefully shuts down all the Handlers in the
@@ -274,17 +306,18 @@ ACE_Event_Channel::initiate_connections (void)
ACE_Event_Channel::close (u_long)
{
-#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
- if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
-#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
+ if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ {
+ if (ACE_Service_Config::thr_mgr ()->suspend_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
+ }
- CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+ PROXY_MAP_ITERATOR cmi (this->proxy_map_);
// Iterate over all the handlers and shut them down.
- for (CONNECTION_MAP_ENTRY *me;
+ for (PROXY_MAP_ENTRY *me;
cmi.next (me) != 0;
cmi.advance ())
{
@@ -293,10 +326,9 @@ ACE_Event_Channel::close (u_long)
ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n",
proxy_handler->id ()));
- if (proxy_handler->state () != Proxy_Handler::IDLE)
- // Mark Proxy_Handler as DISCONNECTING so we don't try to
- // reconnect...
- proxy_handler->state (Proxy_Handler::DISCONNECTING);
+ // Mark Proxy_Handler as DISCONNECTING so we don't try to
+ // reconnect...
+ proxy_handler->state (Proxy_Handler::DISCONNECTING);
// Deallocate Proxy_Handler resources.
proxy_handler->destroy (); // Will trigger a delete.
@@ -306,16 +338,16 @@ ACE_Event_Channel::close (u_long)
}
int
-ACE_Event_Channel::find_proxy (ACE_INT32 conn_id,
+ACE_Event_Channel::find_proxy (ACE_INT32 proxy_id,
Proxy_Handler *&proxy_handler)
{
- return this->connection_map_.find (conn_id, proxy_handler);
+ return this->proxy_map_.find (proxy_id, proxy_handler);
}
int
ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
{
- switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler))
+ switch (this->proxy_map_.bind (proxy_handler->id (), proxy_handler))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -342,11 +374,11 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr,
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- event_addr.conn_id_), -1);
+ event_addr.proxy_id_), -1);
/* NOTREACHED */
case 1: // Oops, found a duplicate!
ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
- "already bound\n", event_addr.conn_id_), -1);
+ "already bound\n", event_addr.proxy_id_), -1);
/* NOTREACHED */
case 0:
// Success.
@@ -359,19 +391,12 @@ ACE_Event_Channel::open (void *)
// Ignore SIPPIPE so each Consumer_Proxy can handle it.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
-#if 0
- // If this->performance_window_ > 0 start a timer.
-
- if (this->options ().performance_window_ > 0)
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->options ().performance_window_) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
- else
- ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
- this->options ().performance_window_)));
- }
-#endif
+ if (this->connector_role_)
+ // Actively initiate Peer connections.
+ this->initiate_connector ();
+ if (this->acceptor_role_)
+ // Passively initiate Peer acceptor.
+ this->initiate_acceptor ();
return 0;
}
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 1ecf468addf..513fa36e1f9 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -18,6 +18,11 @@
#define ACE_EVENT_CHANNEL
#include "Proxy_Handler_Connector.h"
+#include "Proxy_Handler_Acceptor.h"
+#include "Consumer_Dispatch_Set.h"
+#include "Event_Forwarding_Discriminator.h"
+
+typedef ACE_Null_Mutex MAP_MUTEX;
class ACE_Svc_Export ACE_Event_Channel_Options
// = TITLE
@@ -36,9 +41,28 @@ public:
int socket_queue_size_;
// Size of the socket queue (0 means "use default").
+
+ enum
+ {
+ REACTIVE = 0,
+ OUTPUT_MT = 1,
+ INPUT_MT = 2
+ };
+
+ u_long threading_strategy_;
+ // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT.
+
+ u_short acceptor_port_;
+ // Port used to accept connections from Peers.
+
+ int connector_role_;
+ // Enabled if we are playing the role of the Connector.
+
+ int acceptor_role_;
+ // Enabled if we are playing the role of the Connector.
};
-class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<SYNCH_STRATEGY>
+class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<ACE_MT_SYNCH>
// = TITLE
// Define a generic Event_Channel.
//
@@ -68,10 +92,10 @@ public:
// Reinitiate a connection asynchronously when the Peer fails.
int bind_proxy (Proxy_Handler *);
- // Bind the <Proxy_Handler> to the <connection_map_>.
+ // Bind the <Proxy_Handler> to the <proxy_map_>.
- int find_proxy (ACE_INT32 conn_id, Proxy_Handler *&);
- // Locate the <Proxy_Handler> with <conn_id>.
+ int find_proxy (ACE_INT32 proxy_id, Proxy_Handler *&);
+ // Locate the <Proxy_Handler> with <proxy_id>.
int subscribe (const Event_Key &event_addr,
Consumer_Dispatch_Set *cds);
@@ -85,8 +109,11 @@ public:
ACE_Event_Channel_Options &options (void);
// Points to the Event_Channel options.
- int initiate_connections (void);
- // Initiate connections to the peers.
+ void initiate_connector (void);
+ // Actively initiate connections to the Peers.
+
+ void initiate_acceptor (void);
+ // Passively initiate Peer acceptor.
private:
virtual int svc (void);
@@ -95,24 +122,26 @@ private:
int parse_args (int argc, char *argv[]);
// Parse the command-line arguments.
+ int compute_performance_statistics (void);
+ // Perform timer-based performance profiling.
+
virtual int handle_timeout (const ACE_Time_Value &,
const void *arg);
- // Perform timer-based performance profiling.
+ // Periodically callback to perform timer-based performance
+ // profiling.
Proxy_Handler_Connector connector_;
// Used to establish the connections actively.
- // Proxy_Handler_Acceptor acceptor_;
+ Proxy_Handler_Acceptor acceptor_;
// Used to establish the connections passively.
// = Make life easier by defining typedefs.
- // Note that Proxy_Handler is assumed to the base class of
- // SUPPLIER_PROXY and CONSUMER_PROXY.
- typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP;
- typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
- typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY;
+ typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> PROXY_MAP;
+ typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> PROXY_MAP_ITERATOR;
+ typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> PROXY_MAP_ENTRY;
- CONNECTION_MAP connection_map_;
+ PROXY_MAP proxy_map_;
// Table that maps Connection IDs to Proxy_Handler *'s.
Event_Forwarding_Discriminator efd_;
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
index 9b7531c1f46..6c35776c104 100644
--- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
+++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
@@ -18,7 +18,7 @@
#define _CONSUMER_MAP_H
#include "ace/Map_Manager.h"
-#include "Concurrency_Strategies.h"
+#include "ace/Synch.h"
#include "Event.h"
#include "Consumer_Dispatch_Set.h"
@@ -39,7 +39,7 @@ public:
// Break any association of EXID.
public:
- ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_;
+ ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_;
// Map that associates Event Addrs (external ids) with Consumer_Dispatch_Set *'s
// <internal IDs>.
};
@@ -54,7 +54,7 @@ public:
int advance (void);
private:
- ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_iter_;
+ ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_iter_;
// Map we are iterating over.
};
#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 4ff09aed1b7..054a06282aa 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -15,24 +15,25 @@ class Gateway : public ACE_Service_Object
// This implementation uses the <ACE_Event_Channel> as the basis
// for the <Gateway> routing.
{
-public:
+protected:
// = Service configurator hooks.
virtual int init (int argc, char *argv[]);
// Perform initialization.
virtual int fini (void);
- // Perform termination.
+ // Perform termination when unlinked dynamically.
virtual int info (char **, size_t) const;
// Return info about this service.
- int parse_connection_config_file (void);
- // Parse the connection configuration file.
+ // = Configuration methods.
+ int parse_proxy_config_file (void);
+ // Parse the proxy configuration file.
int parse_consumer_config_file (void);
// Parse the consumer configuration file.
-protected:
+ // = Lifecycle management methods.
int handle_input (ACE_HANDLE);
// Shut down the Gateway when input comes in from the controlling
// console.
@@ -44,17 +45,18 @@ protected:
// Parse gateway configuration arguments obtained from svc.conf
// file.
- char connection_config_file_[MAXPATHLEN + 1];
+ char proxy_config_file_[MAXPATHLEN + 1];
// Name of the connection configuration file.
char consumer_config_file_[MAXPATHLEN + 1];
// Name of the consumer map configuration file.
ACE_Event_Channel event_channel_;
- // The Event Channel routes events from Supplier(s) to Consumer(s).
+ // The Event Channel routes events from Supplier(s) to Consumer(s)
+ // using <Supplier_Proxy> and <Consumer_Proxy> objects.
- int active_connector_role_;
- // Enabled if we are playing the role of the active Connector.
+ Proxy_Handler_Factory proxy_handler_factory_;
+ // Creates the appropriate type of <Proxy_Handlers>.
int debug_;
// Are we debugging?
@@ -91,41 +93,64 @@ Gateway::handle_input (ACE_HANDLE h)
int
Gateway::parse_args (int argc, char *argv[])
{
- ACE_OS::strcpy (this->connection_config_file_, "connection_config");
+ // Assign defaults.
+ ACE_OS::strcpy (this->proxy_config_file_, "proxy_config");
ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
- this->active_connector_role_ = 1;
this->debug_ = 0;
- ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
+ ACE_Get_Opt get_opt (argc, argv, "abC:cdP:pq:t:w:", 0);
- for (int c; (c = get_opt ()) != -1; )
+ for (int c; (c = get_opt ()) != EOF; )
{
switch (c)
{
+ case 'a': // We are (also?) playing the Acceptor role.
+ this->event_channel_.options ().acceptor_role_ = 1;
+ break;
+
case 'b': // Use blocking connection establishment.
this->event_channel_.options ().blocking_semantics_ = 0;
break;
- case 'c':
- ACE_OS::strncpy (this->connection_config_file_,
+ case 'C': // Use a different proxy config filename.
+ ACE_OS::strncpy (this->proxy_config_file_,
get_opt.optarg,
- sizeof this->connection_config_file_);
+ sizeof this->proxy_config_file_);
break;
- case 'd':
- this->debug_ = 1;
- break;
- case 'p':
- // We are not playing the active Connector role.
- this->active_connector_role_ = 0;
+ case 'c': // We are (also?) playing the Connector role.
+ this->event_channel_.options ().connector_role_ = 1;
break;
- case 'q':
- this->event_channel_.options ().socket_queue_size_ =
- ACE_OS::atoi (get_opt.optarg);
+ case 'd': // We are debugging.
+ this->debug_ = 1;
break;
- case 'r':
+ case 'P': // Use a different consumer config filename.
ACE_OS::strncpy (this->consumer_config_file_,
get_opt.optarg,
sizeof this->consumer_config_file_);
break;
+ case 'p': // Use a different acceptor port.
+ this->event_channel_.options ().acceptor_port_ =
+ ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'q': // Use a different socket queue size.
+ this->event_channel_.options ().socket_queue_size_ =
+ ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 't': // Use a different threading strategy.
+ {
+ for (char *flag = ACE_OS::strtok (get_opt.optarg, "|");
+ flag != 0;
+ flag = ACE_OS::strtok (0, "|"))
+ {
+ if (ACE_OS::strcmp (flag, "OUTPUT_MT") == 0)
+ ACE_SET_BITS (this->event_channel_.options ().threading_strategy_,
+ ACE_Event_Channel_Options::OUTPUT_MT);
+ else if (ACE_OS::strcmp (flag, "INPUT_MT") == 0)
+ ACE_SET_BITS (this->event_channel_.options ().threading_strategy_,
+ ACE_Event_Channel_Options::INPUT_MT);
+ }
+
+ break;
+ }
case 'w': // Time performance for a designated amount of time.
this->event_channel_.options ().performance_window_ =
ACE_OS::atoi (get_opt.optarg);
@@ -143,10 +168,6 @@ Gateway::parse_args (int argc, char *argv[])
int
Gateway::init (int argc, char *argv[])
{
- // Initialize the Event_Channel.
- if (this->event_channel_.open () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
-
// Parse the "command-line" arguments.
this->parse_args (argc, argv);
@@ -165,30 +186,40 @@ Gateway::init (int argc, char *argv[])
(0, this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- if (this->active_connector_role_)
+ // If this->performance_window_ > 0 start a timer.
+
+ if (this->event_channel_.options ().performance_window_ > 0)
{
- // Parse the connection configuration file.
- this->parse_connection_config_file ();
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (&this->event_channel_, 0,
+ this->event_channel_.options ().performance_window_) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
+ this->event_channel_.options ().performance_window_));
+ }
- // Parse the consumer map config file and build the consumer
- // map.
- this->parse_consumer_config_file ();
+ if (this->event_channel_.options ().connector_role_)
+ {
+ // Parse the proxy configuration file.
+ this->parse_proxy_config_file ();
- // Initiate connections with the peers.
- this->event_channel_.initiate_connections ();
+ // Parse the consumer config file and build the event forwarding
+ // discriminator.
+ this->parse_consumer_config_file ();
}
- return 0;
+ // Initialize the Event_Channel.
+ return this->event_channel_.open ();
}
-// This method is automatically called when the gateway is shutdown.
-// It closes down the Event Channel.
+// This method is automatically called when the Gateway is shutdown.
int
Gateway::fini (void)
{
- this->event_channel_.close ();
- return 0;
+ // Close down the event channel.
+ return this->event_channel_.close ();
}
// Returns information on the currently active service.
@@ -208,61 +239,49 @@ Gateway::info (char **strp, size_t length) const
return ACE_OS::strlen (buf);
}
-// Parse and build the connection table.
+// Parse and build the proxy table.
int
-Gateway::parse_connection_config_file (void)
+Gateway::parse_proxy_config_file (void)
{
- // File that contains the consumer map configuration information.
- Connection_Config_File_Parser connection_file;
- Connection_Config_File_Entry entry;
+ // File that contains the proxy configuration information.
+ Proxy_Config_File_Parser proxy_file;
int file_empty = 1;
int line_number = 0;
- if (connection_file.open (this->connection_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
+ if (proxy_file.open (this->proxy_config_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ this->proxy_config_file_),
+ -1);
// Read config file one line at a time.
- while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
+ for (Proxy_Config_Info pci;
+ proxy_file.read_entry (pci, line_number) != FP::EOFILE;
+ )
{
file_empty = 0;
if (this->debug_)
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
- "proxy role = %c, max retry timeout = %d, local port = %d\n",
- entry.conn_id_,
- entry.host_,
- entry.remote_port_,
- entry.proxy_role_,
- entry.max_retry_delay_,
- entry.local_port_));
-
- Proxy_Handler *proxy_handler = 0;
-
- // Initialize the entry's peer addressing info.
-
- ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_);
- ACE_INET_Addr local_addr (entry.local_port_);
-
- // The next few lines of code are dependent on whether we are
- // making an Supplier_Proxy or an Consumer_Proxy.
-
- if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- CONSUMER_PROXY (this->event_channel_, remote_addr,
- local_addr, entry.conn_id_),
- -1);
- else // proxy_role == 'S', so configure a Supplier_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- SUPPLIER_PROXY (this->event_channel_, remote_addr,
- local_addr, entry.conn_id_),
- -1);
-
- // The following code is common to both Supplier_Proxys_ and
- // Consumer_Proxys.
-
- // Initialize max timeout.
- proxy_handler->max_timeout (entry.max_retry_delay_);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) conn id = %d, host = %s, remote port = %d, proxy role = %c, "
+ "max retry timeout = %d, local port = %d, priority = %d\n",
+ pci.proxy_id_,
+ pci.host_,
+ pci.remote_port_,
+ pci.proxy_role_,
+ pci.max_retry_timeout_,
+ pci.local_port_,
+ pci.priority_));
+
+ pci.event_channel_ = &this->event_channel_;
+
+ // Create the appropriate type of Proxy.
+ Proxy_Handler *proxy_handler =
+ this->proxy_handler_factory_.make_proxy_handler (pci);
+
+ if (proxy_handler == 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "make_proxy_handler"), -1);
// Bind the new Proxy_Handler to the connection ID.
this->event_channel_.bind_proxy (proxy_handler);
@@ -279,7 +298,6 @@ Gateway::parse_consumer_config_file (void)
{
// File that contains the consumer event forwarding information.
Consumer_Config_File_Parser consumer_file;
- Consumer_Config_File_Entry entry;
int file_empty = 1;
int line_number = 0;
@@ -287,43 +305,46 @@ Gateway::parse_consumer_config_file (void)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
// Read config file line at a time.
- while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
+ for (Consumer_Config_Info cci;
+ consumer_file.read_entry (cci, line_number) != FP::EOFILE);
+ )
{
file_empty = 0;
if (this->debug_)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
+ ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, supplier id = %d, payload = %d, "
"number of consumers = %d\n",
- entry.conn_id_,
- entry.supplier_id_,
- entry.type_,
- entry.total_consumers_));
- for (int i = 0; i < entry.total_consumers_; i++)
+ cci.proxy_id_,
+ cci.supplier_id_,
+ cci.type_,
+ cci.total_consumers_));
+
+ for (int i = 0; i < cci.total_consumers_; i++)
ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
- i, entry.consumers_[i]));
+ i, cci.consumers_[i]));
}
Consumer_Dispatch_Set *dispatch_set;
ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
- Event_Key event_addr (entry.conn_id_,
- entry.supplier_id_,
- entry.type_);
+ Event_Key event_addr (cci.proxy_id_,
+ cci.supplier_id_,
+ cci.type_);
// Add the Consumers to the Dispatch_Set.
- for (int i = 0; i < entry.total_consumers_; i++)
+ for (int i = 0; i < cci.total_consumers_; i++)
{
Proxy_Handler *proxy_handler = 0;
// Lookup destination and add to Consumer_Dispatch_Set set
// if found.
- if (this->event_channel_.find_proxy (entry.consumers_[i],
+ if (this->event_channel_.find_proxy (cci.consumers_[i],
proxy_handler) != -1)
dispatch_set->insert (proxy_handler);
else
ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
- i, entry.consumers_[i]));
+ i, cci.consumers_[i]));
}
this->event_channel_.subscribe (event_addr, dispatch_set);
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
index c3ae8dffe4d..0f1c3766b56 100644
--- a/apps/Gateway/Gateway/Makefile
+++ b/apps/Gateway/Gateway/Makefile
@@ -12,14 +12,15 @@ BIN = gatewayd
LIB = libGateway.a
SHLIB = libGateway.so
-FILES = Config_Files \
+FILES = Concrete_Proxy_Handlers \
+ Config_Files \
File_Parser \
Gateway \
Event_Channel \
Event_Forwarding_Discriminator \
Proxy_Handler \
- Proxy_Handler_Connector \
- Thr_Proxy_Handler
+ Proxy_Handler_Acceptor \
+ Proxy_Handler_Connector
LSRC = $(addsuffix .cpp,$(FILES))
LOBJ = $(addsuffix .o,$(FILES))
@@ -48,11 +49,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# Local targets
#----------------------------------------------------------------------------
-# Default behavior is to use single-threading. See the README
-# file for information on how to configure this with multiple
-# strategies for threading the input and output channels.
-# DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
-
#----------------------------------------------------------------------------
# Dependencies
#----------------------------------------------------------------------------
@@ -86,143 +82,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/ACE.i \
$(WRAPPER_ROOT)/ace/Log_Record.i \
File_Parser.h
-.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- Event_Channel.h Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h Gateway.h
-.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
- $(WRAPPER_ROOT)/ace/Get_Opt.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Config_Files.h File_Parser.h Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h Event_Channel.h
.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \
Event_Forwarding_Discriminator.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
@@ -237,7 +96,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- Concurrency_Strategies.h \
$(WRAPPER_ROOT)/ace/Synch.h \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
@@ -247,208 +105,5 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Event_Handler.h \
Event.h Consumer_Dispatch_Set.h \
$(WRAPPER_ROOT)/ace/Set.h
-.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Consumer_Dispatch_Set.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h
-.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Proxy_Handler.h Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Consumer_Dispatch_Set.h
-.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \
- Proxy_Handler.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
- $(WRAPPER_ROOT)/ace/Addr.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.h \
- $(WRAPPER_ROOT)/ace/IPC_SAP.i \
- $(WRAPPER_ROOT)/ace/SOCK.i \
- $(WRAPPER_ROOT)/ace/SOCK_IO.i \
- $(WRAPPER_ROOT)/ace/INET_Addr.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Strategies_T.h \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
- Event_Forwarding_Discriminator.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- Concurrency_Strategies.h Event.h Consumer_Dispatch_Set.h \
- Proxy_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.i
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp
index 2f161c171f6..cbfed1760d3 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Proxy_Handler.cpp
@@ -1,6 +1,7 @@
// $Id$
#include "Event_Channel.h"
+#include "Concrete_Proxy_Handlers.h"
void
Proxy_Handler::id (ACE_INT32 id)
@@ -28,19 +29,18 @@ Proxy_Handler::total_bytes (size_t bytes)
this->total_bytes_ += bytes;
}
-Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : remote_addr_ (remote_addr),
- local_addr_ (local_addr),
- id_ (conn_id),
+Proxy_Handler::Proxy_Handler (const Proxy_Config_Info &pci)
+ : remote_addr_ (pci.remote_port_, pci.host_),
+ local_addr_ (pci.local_port_),
+ id_ (pci.proxy_id_),
total_bytes_ (0),
state_ (Proxy_Handler::IDLE),
timeout_ (1),
- max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
- event_channel_ (ec)
+ max_timeout_ (pci.max_retry_timeout_),
+ event_channel_ (*pci.event_channel_)
{
+ // Set the priority of the Proxy.
+ this->priority (int (pci.priority_));
}
// Set the proxy_role.
@@ -123,8 +123,9 @@ int
Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down Proxy_Handler %d on handle %d\n",
- this->id (), this->get_handle ()));
+ "(%t) shutting down %s Proxy_Handler %d on handle %d\n",
+ this->proxy_role () == 'C' ? "Consumer" : "Supplier",
+ this->id (), this->get_handle ()));
// Restart the connection, if possible.
return this->event_channel_.reinitiate_proxy_connection (this);
@@ -144,7 +145,8 @@ Proxy_Handler::state (Proxy_Handler::State s)
int
Proxy_Handler::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) %s Proxy_Handler's handle = %d\n",
+ this->proxy_role () == 'C' ? "Consumer" : "Supplier",
this->peer ().get_handle ()));
// Turn on non-blocking I/O.
@@ -183,395 +185,54 @@ Proxy_Handler::local_addr (void)
return this->local_addr_;
}
-Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : Proxy_Handler (ec, remote_addr, local_addr, conn_id)
-{
- this->proxy_role_ = 'C';
- this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
-}
-
-// This method should be called only when the Consumer shuts down
-// unexpectedly. This method simply marks the Proxy_Handler as having
-// failed so that handle_close () can reconnect.
-
-int
-Consumer_Proxy::handle_input (ACE_HANDLE)
-{
- char buf[1];
-
- this->state (Proxy_Handler::FAILED);
-
- switch (this->peer ().recv (buf, sizeof buf))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case 0:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- }
-}
+// Make the appropriate type of <Proxy_Handler> (i.e.,
+// <Consumer_Proxy>, <Supplier_Proxy>, <Thr_Consumer_Proxy>, or
+// <Thr_Supplier_Proxy>).
-// Perform a non-blocking put() of event. If we are unable to send
-// the entire event the remainder is re-queued at the *front* of the
-// Event_List.
-
-int
-Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
+Proxy_Handler *
+Proxy_Handler_Factory::make_proxy_handler (const Proxy_Config_Info &pci)
{
- // Try to send the event. If we don't send it all (e.g., due to
- // flow control), then re-queue the remainder at the head of the
- // Event_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
+ Proxy_Handler *proxy_handler = 0;
- ssize_t n = this->send (event);
+ // The next few lines of code are dependent on whether we are making
+ // a threaded/reactive Supplier_Proxy/Consumer_Proxy.
- if (n == -1)
+ if (pci.proxy_role_ == 'C') // Configure a Consumer_Proxy.
{
- // Things have gone wrong, let's try to close down and set up a
- // new reconnection by calling handle_close().
- this->state (Proxy_Handler::FAILED);
- this->handle_close ();
- return -1;
- }
- else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
- // ACE_Queue in *front* of the list to preserve order.
- if (this->msg_queue ()->enqueue_head
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
-
- // Tell ACE_Reactor to call us back when we can send again.
- else if (ACE_Service_Config::reactor ()->schedule_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
- return 0;
- }
- else
- return n;
-}
-
-ssize_t
-Consumer_Proxy::send (ACE_Message_Block *event)
-{
- ssize_t len = event->length ();
- ssize_t n = this->peer ().send (event->rd_ptr (), len);
-
- if (n <= 0)
- return errno == EWOULDBLOCK ? 0 : n;
- else if (n < len)
- // Re-adjust pointer to skip over the part we did send.
- event->rd_ptr (n);
- else // if (n == length)
- {
- // The whole event is sent, we now decrement the reference count
- // (which deletes itself with it reaches 0.
- event->release ();
- errno = 0;
- }
- this->total_bytes (n);
- return n;
-}
-
-// Finish sending an event when flow control conditions abate.
-// This method is automatically called by the ACE_Reactor.
-
-int
-Consumer_Proxy::handle_output (ACE_HANDLE)
-{
- ACE_Message_Block *event = 0;
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in handle_output on handle %d\n",
- this->get_handle ()));
- // The list had better not be empty, otherwise there's a bug!
-
- if (this->msg_queue ()->dequeue_head
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
- {
- switch (this->nonblk_put (event))
- {
- case 0: // Partial send.
- ACE_ASSERT (errno == EWOULDBLOCK);
- // Didn't write everything this time, come back later...
- break;
-
- case -1:
- // We are responsible for releasing an ACE_Message_Block if
- // failures occur.
- event->release ();
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
-
- /* FALLTHROUGH */
- default: // Sent the whole thing.
-
- // If we succeed in writing the entire event (or we did not
- // fail due to EWOULDBLOCK) then check if there are more
- // events on the Message_Queue. If there aren't, tell the
- // ACE_Reactor not to notify us anymore (at least until
- // there are new events queued up).
-
- if (this->msg_queue ()->is_empty ())
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing deactivated on handle %d to routing id %d\n",
- this->get_handle (), this->id ()));
-
-
- if (ACE_Service_Config::reactor ()->cancel_wakeup
- (this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
- }
- }
- }
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
- return 0;
-}
-
-// Send an event to a Consumer (may queue if necessary).
-
-int
-Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
-{
- if (this->msg_queue ()->is_empty ())
- // Try to send the event *without* blocking!
- return this->nonblk_put (event);
- else
- // If we have queued up events due to flow control then just
- // enqueue and return.
- return this->msg_queue ()->enqueue_tail
- (event, (ACE_Time_Value *) &ACE_Time_Value::zero);
-}
-
-Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id)
- : msg_frag_ (0),
- Proxy_Handler (ec, remote_addr, local_addr, conn_id)
-{
- this->proxy_role_ = 'S';
- this->msg_queue ()->high_water_mark (0);
-}
-
-// Receive an Event from a Supplier. Handles fragmentation.
-//
-// The event returned from recv consists of two parts:
-//
-// 1. The Address part, contains the "virtual" routing id.
-//
-// 2. The Data part, which contains the actual data to be forwarded.
-//
-// The reason for having two parts is to shield the higher layers
-// of software from knowledge of the event structure.
-
-int
-Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
-{
- if (this->msg_frag_ == 0)
- // No existing fragment...
- ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Event)),
- -1);
-
- Event *event = (Event *) this->msg_frag_->rd_ptr ();
- ssize_t header_received = 0;
-
- const ssize_t HEADER_SIZE = sizeof (Event_Header);
- ssize_t header_bytes_left_to_read =
- HEADER_SIZE - this->msg_frag_->length ();
-
- if (header_bytes_left_to_read > 0)
- {
- header_received = this->peer ().recv
- (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
-
- if (header_received == -1 /* error */
- || header_received == 0 /* EOF */)
- {
- ACE_ERROR ((LM_ERROR, "%p\n",
- "Recv error during header read "));
- ACE_DEBUG ((LM_DEBUG,
- "attempted to read %d\n",
- header_bytes_left_to_read));
- this->msg_frag_ = this->msg_frag_->release ();
- return header_received;
- }
-
- // Bump the write pointer by the amount read.
- this->msg_frag_->wr_ptr (header_received);
-
- // At this point we may or may not have the ENTIRE header.
- if (this->msg_frag_->length () < HEADER_SIZE)
- {
- ACE_DEBUG ((LM_DEBUG,
- "Partial header received: only %d bytes\n",
- this->msg_frag_->length ()));
- // Notify the caller that we didn't get an entire event.
- errno = EWOULDBLOCK;
- return -1;
- }
-
- // Convert the header into host byte order so that we can access
- // it directly without having to repeatedly muck with it...
- event->header_.decode ();
-
- if (event->header_.len_ > sizeof event->data_)
- {
- // This data_ payload is too big!
- errno = EINVAL;
- ACE_DEBUG ((LM_DEBUG,
- "Data payload is too big (%d bytes)\n",
- event->header_.len_));
- return -1;
- }
-
- }
-
- // At this point there is a complete, valid header in Event. Now we
- // need to get the event payload. Due to incomplete reads this may
- // not be the first time we've read in a fragment for this message.
- // We account for this here. Note that the first time in here
- // msg_frag_->wr_ptr() will point to event->data_. Every time we do
- // a successful fragment read, we advance wr_ptr(). Therefore, by
- // subtracting how much we've already read from the
- // event->header_.len_ we complete the data_bytes_left_to_read...
-
- ssize_t data_bytes_left_to_read =
- ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
-
- ssize_t data_received =
- this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
-
- // Try to receive the remainder of the event.
-
- switch (data_received)
- {
- case -1:
- if (errno == EWOULDBLOCK)
- // This might happen if only the header came through.
- return -1;
+#if defined (ACE_HAS_THREADS)
+ // Create a threaded Consumer_Proxy.
+ if (ACE_BIT_ENABLED (pci.event_channel_->options ().threading_strategy_,
+ ACE_Event_Channel_Options::OUTPUT_MT))
+ ACE_NEW_RETURN (proxy_handler,
+ Thr_Consumer_Proxy (pci),
+ 0);
+
+ // Create a reactive Consumer_Proxy.
else
- /* FALLTHROUGH */;
-
- case 0: // Premature EOF.
- this->msg_frag_ = this->msg_frag_->release ();
- return 0;
-
- default:
- // Set the write pointer at 1 past the end of the event.
- this->msg_frag_->wr_ptr (data_received);
-
- if (data_received != data_bytes_left_to_read)
- {
- errno = EWOULDBLOCK;
- // Inform caller that we didn't get the whole event.
- return -1;
- }
- else
- {
- // Set the read pointer to the beginning of the event.
- this->msg_frag_->rd_ptr (this->msg_frag_->base ());
-
- // Allocate an event forwarding header and chain the data
- // portion onto its continuation field.
- forward_addr = new ACE_Message_Block (sizeof (Event_Key),
- ACE_Message_Block::MB_PROTO,
- this->msg_frag_);
- if (forward_addr == 0)
- {
- this->msg_frag_ = this->msg_frag_->release ();
- errno = ENOMEM;
- return -1;
- }
-
- Event_Key event_addr (this->id (),
- event->header_.supplier_id_,
- event->header_.type_);
- // Copy the forwarding address from the Event_Key into
- // forward_addr.
- forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
-
- // Reset the pointer to indicate we've got an entire event.
- this->msg_frag_ = 0;
- }
-
- this->total_bytes (data_received + header_received);
-#if defined (VERBOSE)
- ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s",
- event_addr.conn_id_, event->header_.supplier_id_, event->header_.len_,
- event->header_.len_, event->data_));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_, event->header_.len_, data_received + header_received));
-#endif /* VERBOSE */
-
- // Encode before returning so that we can set things out in
- // network byte order.
- event->header_.encode ();
- return data_received + header_received;
+#endif /* ACE_HAS_THREADS */
+ ACE_NEW_RETURN (proxy_handler,
+ Consumer_Proxy (pci),
+ 0);
}
-}
-
-// Receive various types of input (e.g., Peer event from the
-// gatewayd, as well as stdio).
-
-int
-Supplier_Proxy::handle_input (ACE_HANDLE)
-{
- ACE_Message_Block *forward_addr = 0;
-
- switch (this->recv (forward_addr))
+ else // proxy_role == 'S', so configure a Supplier_Proxy.
{
- case 0:
- // Note that a peer should never initiate a shutdown by closing
- // the connection. Instead, it should reconnect.
- this->state (Proxy_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n",
- this->id ()), -1);
- /* NOTREACHED */
- case -1:
- if (errno == EWOULDBLOCK)
- // A short-read, we'll come back and finish it up later on!
- return 0;
- else // A weird problem occurred, shut down and start again.
- {
- this->state (Proxy_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n",
- "Peer has failed unexpectedly",
- this->id ()), -1);
- }
- /* NOTREACHED */
- default:
- return this->forward (forward_addr);
+#if defined (ACE_HAS_THREADS)
+ // Create a threaded Supplier_Proxy.
+ if (ACE_BIT_ENABLED (pci.event_channel_->options ().threading_strategy_,
+ ACE_Event_Channel_Options::INPUT_MT))
+ ACE_NEW_RETURN (proxy_handler,
+ Thr_Supplier_Proxy (pci),
+ 0);
+
+ // Create a reactive Supplier_Proxy.
+ else
+#endif /* ACE_HAS_THREAD */
+ ACE_NEW_RETURN (proxy_handler,
+ Supplier_Proxy (pci),
+ 0);
}
-}
-// Forward an event to its appropriate Consumer(s). This delegates to
-// the <ACE_Event_Channel> to do the actual forwarding.
-
-int
-Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
-{
- return this->event_channel_.put (forward_addr);
+ return proxy_handler;
}
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h
index ffce18d1c71..e75795bd340 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.h
+++ b/apps/Gateway/Gateway/Proxy_Handler.h
@@ -20,27 +20,22 @@
#include "ace/Service_Config.h"
#include "ace/SOCK_Connector.h"
#include "ace/Svc_Handler.h"
-#include "Event_Forwarding_Discriminator.h"
-#include "Consumer_Dispatch_Set.h"
+#include "Config_Files.h"
#include "Event.h"
// Forward declaration.
-class Proxy_Handler_Connector;
class ACE_Event_Channel;
-class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
+class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
// = TITLE
- // Proxy_Handler contains info about connection state and addressing.
+ // Proxy_Handler contains info about connection state and addressing.
//
// = DESCRIPTION
// The Proxy_Handler classes process events sent to the Event
// Channel from Suppliers and forward them to Consumers.
{
public:
- Proxy_Handler (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
+ Proxy_Handler (const Proxy_Config_Info &);
virtual int open (void * = 0);
// Initialize and activate a single-threaded Proxy_Handler (called by
@@ -131,72 +126,18 @@ protected:
// the events from Consumers and Suppliers.
};
-class Supplier_Proxy : public Proxy_Handler
+class Proxy_Handler_Factory : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
// = TITLE
- // Handles reception of Events from Suppliers
+ // Creates the appropriate type of <Proxy_Handler>
//
// = DESCRIPTION
- // Performs framing and error checking.
+ // <Proxy_Handler>s can include <Consumer_Proxy>,
+ // <Supplier_Proxy>, <Thr_Consumer_Proxy>, or
+ // <Thr_Supplier_Proxy>).
{
public:
- // = Initialization method.
- Supplier_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
-protected:
- // = All the following methods are upcalls, so they can be protected.
-
- virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- // Receive and process peer events.
-
- virtual int recv (ACE_Message_Block *&);
- // Receive an event from a Supplier.
-
- int forward (ACE_Message_Block *event);
- // Forward the <event> to its appropriate Consumer. This delegates
- // to the <ACE_Event_Channel> to do the actual forwarding.
-
- ACE_Message_Block *msg_frag_;
- // Keep track of event fragment to handle non-blocking recv's from
- // Suppliers.
-};
-
-class Consumer_Proxy : public Proxy_Handler
- // = TITLE
- // Handles transmission of events to Consumers.
- //
- // = DESCRIPTION
- // Performs queueing and error checking. Uses a single-threaded
- // Reactive approach to handle flow control.
-{
-public:
- // = Initialization method.
- Consumer_Proxy (ACE_Event_Channel &,
- const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 conn_id);
-
- virtual int put (ACE_Message_Block *event,
- ACE_Time_Value * = 0);
- // Send an event to a Consumer (may be queued if necessary).
-
-protected:
- // = We'll allow up to 16 megabytes to be queued per-output proxy.
- enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
-
- virtual int handle_output (ACE_HANDLE);
- // Finish sending event when flow control conditions abate.
-
- int nonblk_put (ACE_Message_Block *mb);
- // Perform a non-blocking put().
-
- virtual ssize_t send (ACE_Message_Block *);
- // Send an event to a Consumer.
-
- virtual int handle_input (ACE_HANDLE);
- // Receive and process shutdowns from a Consumer.
+ Proxy_Handler *make_proxy_handler (const Proxy_Config_Info &);
+ // Make the appropriate type of <Proxy_Handler>.
};
#endif /* _PROXY_HANDLER */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp
new file mode 100644
index 00000000000..05b597ad0ba
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp
@@ -0,0 +1,9 @@
+// $Id$
+
+#include "Event_Channel.h"
+#include "Proxy_Handler_Acceptor.h"
+
+Proxy_Handler_Acceptor::Proxy_Handler_Acceptor (ACE_Event_Channel &ec)
+ : event_channel_ (ec)
+{
+}
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h
new file mode 100644
index 00000000000..179dc0e38d3
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h
@@ -0,0 +1,40 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Proxy_Handler_acceptor.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_PROXY_HANDLER_ACCEPTOR)
+#define _PROXY_HANDLER_ACCEPTOR
+
+#include "ace/Acceptor.h"
+#include "ace/SOCK_Acceptor.h"
+#include "Proxy_Handler.h"
+
+// Forward declaration
+class ACE_Event_Channel;
+
+class Proxy_Handler_Acceptor : public ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR>
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Proxy_Handler object to do the dirty work...
+{
+public:
+ Proxy_Handler_Acceptor (ACE_Event_Channel &);
+
+protected:
+ ACE_Event_Channel &event_channel_;
+ // Reference to the event channel.
+};
+
+#endif /* _PROXY_HANDLER_ACCEPTOR */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
index dc18eca8500..ba2e5162238 100644
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
+++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
@@ -1,6 +1,6 @@
-#include "Proxy_Handler_Connector.h"
// $Id$
+#include "Proxy_Handler_Connector.h"
Proxy_Handler_Connector::Proxy_Handler_Connector (void)
{
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Proxy_Handler_Connector.h
index 3baea75934a..c53579f9ec4 100644
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.h
+++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.h
@@ -18,7 +18,8 @@
#define _IO_HANDLER_CONNECTOR
#include "ace/Connector.h"
-#include "Thr_Proxy_Handler.h"
+#include "ace/SOCK_Connector.h"
+#include "Proxy_Handler.h"
class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>
// = TITLE
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
index 58884340e61..fa3e63a0b26 100644
--- a/apps/Gateway/Gateway/consumer_config
+++ b/apps/Gateway/Gateway/consumer_config
@@ -1,8 +1,7 @@
# Consumer configuration file
# Conn ID Supplier ID Type Consumers
# ------- ----------- ------- ------------
-# 1 1 0 3,4,5
- 1 1 0 3
- 3 1 0 3
-# 4 1 0 4
-# 5 1 0 5
+ 1 1 0 3,4
+ 2 2 0 3
+# 3 3 0 4
+# 4 4 0 5
diff --git a/apps/Gateway/Gateway/gatewayd.cpp b/apps/Gateway/Gateway/gatewayd.cpp
index b0af5f7cace..babb6c80350 100644
--- a/apps/Gateway/Gateway/gatewayd.cpp
+++ b/apps/Gateway/Gateway/gatewayd.cpp
@@ -1,11 +1,28 @@
-// Main driver program for the Gateway. This file is completely
// $Id$
+// Main driver program for the Gateway. This file is completely
// generic code due to the ACE Service Configurator framework!
#include "ace/Service_Config.h"
#include "Gateway.h"
+class Service_Ptr
+ // = TITLE
+ // Holds the <ACE_Service_Object> * until we're done.
+{
+public:
+ Service_Ptr (ACE_Service_Object *so)
+ : service_object_ (so) {}
+
+ ~Service_Ptr (void) { this->service_object_->fini (); }
+
+ ACE_Service_Object *operator-> () { return this->service_object_; }
+
+private:
+ ACE_Service_Object *service_object_;
+ // Holds the service object until we're done.
+};
+
int
main (int argc, char *argv[])
{
@@ -15,19 +32,27 @@ main (int argc, char *argv[])
{
if (errno != ENOENT)
ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
- else // Use static binding.
+ else // Use static linking.
{
- ACE_Service_Object *so = ACE_SVC_INVOKE (Gateway);
+ Service_Ptr sp = ACE_SVC_INVOKE (Gateway);
- if (so->init (argc - 1, argv + 1) == -1)
+ if (sp->init (argc - 1, argv + 1) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
+
+ // Run forever, performing the configured services until we are
+ // shut down by a SIGINT/SIGQUIT signal.
+
+ daemon.run_reactor_event_loop ();
+
+ // Destructors of Service_Ptr's automagically call fini().
}
}
+ else // Use dynamic linking.
- // Run forever, performing the configured services until we are shut
- // down by a SIGINT/SIGQUIT signal.
+ // Run forever, performing the configured services until we are
+ // shut down by a SIGINT/SIGQUIT signal.
- daemon.run_reactor_event_loop ();
+ daemon.run_reactor_event_loop ();
return 0;
}
diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config
new file mode 100644
index 00000000000..2f26c2c430b
--- /dev/null
+++ b/apps/Gateway/Gateway/proxy_config
@@ -0,0 +1,12 @@
+# Connection configuration file.
+# Conn Host Remote Proxy Max Retry Local Priority
+# ID Port Role Timeout Port
+# ---- -------- ------ ------ ---------- ----- --------
+ 1 merengue.cs 10002 S 32 0 1
+ 2 flamenco.cs 10002 S 32 0 1
+ 3 mambo.cs 10002 C 32 0 1
+ 4 lambada.cs 10002 C 32 0 1
+# 5 lambada.cs 10002 C 32 0 1
+# 6 tango.cs 10002 C 32 0 1
+# 7 tango.cs 5001 S 32 0 1
+# 8 tango.cs 5002 C 32 0 1
diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf
index db7a9d04ad5..c822713287b 100644
--- a/apps/Gateway/Gateway/svc.conf
+++ b/apps/Gateway/Gateway/svc.conf
@@ -1,3 +1,3 @@
#static Svc_Manager "-d -p 2913"
-dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c connection_config -f consumer_config"
+dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-d -c -P proxy_config -C consumer_config"
diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile
index 3176a62b455..46c265a1df9 100644
--- a/apps/Gateway/Peer/Makefile
+++ b/apps/Gateway/Peer/Makefile
@@ -38,6 +38,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# Local targets
#----------------------------------------------------------------------------
+INCLDIRS += -I../Gateway
+
#----------------------------------------------------------------------------
# Dependencies
#----------------------------------------------------------------------------
@@ -59,7 +61,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- Peer.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -72,20 +73,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -99,18 +92,26 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
$(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- Event.h
+ ../Gateway/Event.h
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp
index ab59567fc08..b3afee295e9 100644
--- a/apps/Gateway/Peer/peerd.cpp
+++ b/apps/Gateway/Peer/peerd.cpp
@@ -6,6 +6,23 @@
#include "ace/Service_Config.h"
#include "Peer.h"
+class Service_Ptr
+ // = TITLE
+ // Holds the <ACE_Service_Object> * until we're done.
+{
+public:
+ Service_Ptr (ACE_Service_Object *so)
+ : service_object_ (so) {}
+
+ ~Service_Ptr (void) { this->service_object_->fini (); }
+
+ ACE_Service_Object *operator-> () { return this->service_object_; }
+
+private:
+ ACE_Service_Object *service_object_;
+ // Holds the service object until we're done.
+};
+
int
main (int argc, char *argv[])
{
@@ -15,30 +32,28 @@ main (int argc, char *argv[])
{
if (errno != ENOENT)
ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
- else // Use static binding.
+ else // Use static linking.
{
- ACE_Service_Object *so = ACE_SVC_INVOKE (Peer_Acceptor);
+ Service_Ptr sp = ACE_SVC_INVOKE (Peer_Acceptor);
- if (so->init (argc - 1, argv + 1) == -1)
+ if (sp->init (argc - 1, argv + 1) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
- }
- }
-
- // Create an adapter to end the event loop.
- ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+
+ // Run forever, performing the configured services until we
+ // are shut down by a SIGINT/SIGQUIT signal.
- ACE_Sig_Set sig_set;
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
+ daemon.run_reactor_event_loop ();
- // Register ourselves to receive SIGINT and SIGQUIT so we can shut
- // down gracefully via signals.
- ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
+ // Destructors of Service_Ptr's automagically call fini().
- // Run forever, performing the configured services until we are shut
- // down by a SIGINT/SIGQUIT signal.
+ }
+ }
+ else // Use dynamic linking.
- daemon.run_reactor_event_loop ();
+ // Run forever, performing the configured services until we are shut
+ // down by a SIGINT/SIGQUIT signal.
+
+ daemon.run_reactor_event_loop ();
return 0;
}
diff --git a/apps/Gateway/README b/apps/Gateway/README
index ffd7e52bdf4..ce0c97856be 100644
--- a/apps/Gateway/README
+++ b/apps/Gateway/README
@@ -1,9 +1,10 @@
OVERVIEW
-This directory contains source code for a prototype application-level
-gateway implemented with ACE. This prototype was developed in my
-cs422 OS class at Washington University. It illustrates the use of
-Event Channels to forward events from Suppliers to Consumers in a
+This directory contains source code for an application-level
+Communication Gateway implemented with ACE. This prototype was
+developed in my cs422 OS class at Washington University in 1994. The
+Gateway has recently been updated to illustrate the use of ACE Event
+Channels, which forward events from Suppliers to Consumers in a
distributed system.
You can get a paper that explains the patterns used in this
@@ -17,33 +18,33 @@ There are 2 directories:
Gateway
- -- The application Gateway, which must be started *after* all
- the Peers described below). This process reads the
- connection_config and consumer_config files:
+ -- The application Gateway, which generally should be started
+ after all the Peers described below. This process reads the
+ proxy_config and consumer_config files:
- 1. The connection_config file is used to establish the "physical
- configuration." It tells the Gateway what connections
- to establish with particular hosts using particular
- ports.
+ 1. The proxy_config file is used to establish the "physical
+ configuration" of the Consumer and Supplier proxies. It
+ tells the Gateway what connections to establish with
+ particular hosts using particular ports.
2. The consumer_config file is used to establish the "logical
configuration." It tells the Gateway how to forward
- data coming from "sources" to the appropriate
- "destinations."
+ data coming from Suppliers to the appropriate
+ Consumers.
Peer
- -- The test driver programs that must be started *before* the
- Gateway. To do anything interesting you'll need at least
- two Peers: one to supply events and one to consume events.
- In the configuration files, these two types of Peers are
- designated as follows:
+ -- The test driver programs, which generally should be started
+ before the Gateway process. To do anything interesting you'll
+ need at least two Peers: one to supply events and one to consume
+ events. In the configuration files, these two types of Peers
+ are designated as follows:
1. Supplier Peers (designated by an 'S' in the Gateway's
- connection_config configuration file). These Peers are
+ proxy_config configuration file). These Peers are
"suppliers" of events to the Gateway.
2. Consumer Peers (designated by an 'C' in the Gateway's
- connection_config file). These Peers are "consumers" of
+ proxy_config file). These Peers are "consumers" of
events forwarded by the Gateway (forwarding is based on
the settings in the consumer_config configuration file).
@@ -54,18 +55,27 @@ To run the tests do the following:
1. Compile everything (i.e., first compile the ACE libraries, then
compile the the Gateway directories).
-2. Edit the consumer_config and connection_config files as discussed
- above to indicate the desired physical and logical mappings.
+2. Edit the consumer_config and proxy_config files as discussed
+ above to indicate the desired physical and logical mappings
+ for Consumers and Suppliers.
3. Start up the Peers (peerd). You can start up as many as you
- like, as per the connection_config file, but you'll need at least
- two (one to supply and one to consume). I typically start up each
+ like, as per the proxy_config file, but you'll need at least
+ two (i.e., one Supplier and Consumer). I typically start up each
Peer in a different window on a different machine. The Peers
- should print out some diagnostic info and then block awaiting
+ will print out some diagnostic info and then block awaiting
connections from the Gateway.
4. Start up the Gateway (gatewayd). This will print out a bunch of
- events as it reads the config files and connects to all the Peers.
+ messages as it reads the config files and connects to all the Peers.
+ By default, the Gateway is purely reactive, i.e., it handles
+ Consumers and Suppliers in the same thread of control. However,
+ if you give the '-t OUTPUT_MT' option the Gateway will handle all
+ Consumers in separate threads. If you give the '-t INPUT_MT' option
+ the Gateway will handle all Suppliers in separate threads. If you
+ give the '-t INPUT_MT|OUTPUT_MT' option both Consumers and Suppliers
+ will be handled in the separate threads.
+
Assuming everything works, then all the Peers will be connected.
If some of the Peers aren't set up correctly then the Gateway will
use an exponential backoff algorithm to attempt to reestablish
@@ -82,8 +92,9 @@ To run the tests do the following:
trying to reestablish the connection using the same exponential
backoff algorithm it used for the initial connection establishment.
-7. When you want to terminate a Gateway, just type ^C and the process
- will shut down gracefully.
+7. When you want to terminate a Gateway, just type ^C or type any
+ characters in the ./gatewayd window and the process will shut down
+ gracefully.
Please let me know if there are any questions.
diff --git a/apps/Orbix-Examples/Event_Comm/Consumer/Input_Handler.cpp b/apps/Orbix-Examples/Event_Comm/Consumer/Input_Handler.cpp
index 004b54d6fd5..a695277a216 100644
--- a/apps/Orbix-Examples/Event_Comm/Consumer/Input_Handler.cpp
+++ b/apps/Orbix-Examples/Event_Comm/Consumer/Input_Handler.cpp
@@ -52,8 +52,8 @@ Input_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
} ENDTRY;
}
// Don't execute a callback here otherwise we'll recurse indefinitely!
- if (ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK
- | ACE_Event_Handler::DONT_CALL) == -1)
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", "remove_handler"));
// *Must* be allocated dyanmically!
@@ -67,8 +67,8 @@ Input_Handler::Input_Handler (Notification_Receiver_Handler *ch,
handle_ (handle),
consumer_initiated_shutdown_ (0)
{
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR ((LM_ERROR, "Input_Handler::Input_Handler\n"));
}
diff --git a/apps/Orbix-Examples/Event_Comm/Supplier/Input_Handler.cpp b/apps/Orbix-Examples/Event_Comm/Supplier/Input_Handler.cpp
index 0769ecfcd69..8305d6293bc 100644
--- a/apps/Orbix-Examples/Event_Comm/Supplier/Input_Handler.cpp
+++ b/apps/Orbix-Examples/Event_Comm/Supplier/Input_Handler.cpp
@@ -24,8 +24,8 @@ Input_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
} ENDTRY;
// Don't execute a callback here otherwise we'll recurse indefinitely!
- if (ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK
- | ACE_Event_Handler::DONT_CALL) == -1)
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", "remove_handler"));
// *Must* be allocated dyanmically!
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
index 41b5dcb6ea6..67c61bca9b4 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
@@ -1,31 +1,10 @@
-
// $Id$
#include "Consumer_Router.h"
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
-
-typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
-
-int
-Consumer_Handler::open (void *a)
-{
- CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
- this->router_task_ = af->router ();
- return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
-}
-
-Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
- : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
-{
-}
-
-// Create a new handler that will interact with a consumer and point
-// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
-
-Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
- : CONSUMER_ROUTER (tm)
+Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
{
}
@@ -34,93 +13,122 @@ Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
int
Consumer_Router::open (void *)
{
- assert (this->is_reader ());
-
- char *argv[4];
-
- argv[0] = (char *) this->name ();
- argv[1] = "-p";
- argv[2] = options.consumer_port ();
- argv[3] = 0;
-
- if (this->init (2, &argv[1]) == -1)
- return -1;
+ if (this->is_writer ())
+ {
+ // Set the Peer_Router_Context to point back to us so that if
+ // any Consumer's "accidentally" send us data we'll be able to
+ // handle it.
+ this->context ()->peer_router (this);
+
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
+ }
+ else // if (this->is_reader ())
- // Make this an active object.
- return this->activate (options.t_flags ());
+ // Nothing to do since this side is primarily used to transmit to
+ // Consumers, rather than receive.
+ return 0;
}
int
Consumer_Router::close (u_long)
{
- assert (this->is_reader ());
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router\n"));
- this->peer_map_.close ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
return 0;
}
-// Handle incoming messages in a separate thread.
+// Handle incoming messages in a separate thread.
int
Consumer_Router::svc (void)
{
- ACE_Thread_Control tc (this->thr_mgr ());
- ACE_Message_Block *mb = 0;
+ assert (this->is_writer ());
- assert (this->is_reader ());
+ ACE_Thread_Control tc (this->thr_mgr ());
- ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Consumer_Router\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) starting svc in Consumer_Router\n"));
- while (this->getq (mb) >= 0)
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
{
- ACE_DEBUG ((LM_DEBUG, "Consumer_Router is routing via send_peers\n"));
- if (this->send_peers (mb) == -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n"));
+
+ // Pass this message down to the next Module's writer Task.
+ if (this->put_next (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) send_peers failed in Consumer_Router\n"),
-1);
}
- ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Consumer_Router\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) stopping svc in Consumer_Router\n"));
return 0;
// Note the implicit ACE_OS::thr_exit() via destructor.
}
-// Send a MESSAGE_BLOCK to the supplier(s).
+// Send a <Message_Block> to the supplier(s).
int
-Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Consumer_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
- assert (this->is_reader ());
+ // Perform the necessary control operations before passing
+ // the message down the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
- else
- // Queue up the message, which will be processed by
- // Consumer_Router::svc().
+
+ // If we're the reader side then we're responsible for broadcasting
+ // messages to Consumers.
+
+ else if (this->is_reader ())
+ {
+ if (this->context ()->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Consumer_Router\n"),
+ -1);
+ else
+ return 0;
+ }
+ else // if (this->is_writer ())
+
+ // Queue up the message to processed by Consumer_Router::svc().
+ // Since we don't expect to be getting many of these messages, we
+ // queue them up and run them in a separate thread to avoid taxing
+ // the main thread.
return this->putq (mb);
}
-
-// Return information about the Client_Router ACE_Module.
+// Return information about the Client_Router ACE_Module.
int
Consumer_Router::info (char **strp, size_t length) const
{
- char buf[BUFSIZ];
+ char buf[BUFSIZ];
ACE_INET_Addr addr;
const char *mod_name = this->name ();
- ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
- if (sa.get_local_addr (addr) == -1)
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s",
- mod_name, addr.get_port_number (), "tcp",
- "# consumer router\n");
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
+ mod_name, addr.get_port_number (), "tcp",
+ "# consumer router", this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
@@ -128,5 +136,3 @@ Consumer_Router::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
index 1fb9f4e40f2..cdaf7090b97 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
@@ -1,46 +1,49 @@
/* -*- C++ -*- */
// $Id$
-/* The interface between one or more consumers and an Event Server ACE_Stream */
-
#if !defined (_CONSUMER_ROUTER_H)
#define _CONSUMER_ROUTER_H
-#include "ace/Thread_Manager.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/UPIPE_Acceptor.h"
#include "ace/Svc_Handler.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
-
-class Consumer_Handler; /* Forward declaration... */
-
-typedef ACE_HANDLE CONSUMER_KEY;
-
-typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER;
-
-class Consumer_Handler : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>
-{
-public:
- Consumer_Handler (ACE_Thread_Manager *tm = 0);
- virtual int open (void *);
-};
-
-class Consumer_Router : public CONSUMER_ROUTER
+class Consumer_Router : public Peer_Router
+ // = TITLE
+ // Provides the interface between one or more Consumers and the
+ // Event Server ACE_Stream.
{
public:
- Consumer_Router (ACE_Thread_Manager *thr_manager);
+ Consumer_Router (Peer_Router_Context *prc);
+ // Initialization method.
protected:
- /* ACE_Task hooks. */
+ // = ACE_Task hooks.
+
+ // All of these methods are called via base class pointers by the
+ // ACE Stream apparatus. Therefore, we can put them in the
+ // protected section.
+
virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the Consumer_Handler to pass a message to the Router.
+ // The Router queues up this message, which is then processed in the
+ // <svc> method in a separate thread.
+
virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
+
+ // = Dynamic linking hooks.
- /* Dynamic linking hooks */
virtual int info (char **info_string, size_t length) const;
+ // Returns information about this service.
};
-#endif /* ACE_HAS_THREADS */
+
#endif /* _CONSUMER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
index 5a63224eb51..e413c16acce 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
@@ -1,8 +1,7 @@
-#include "Event_Analyzer.h"
// $Id$
-
-#if defined (ACE_HAS_THREADS)
+#include "Options.h"
+#include "Event_Analyzer.h"
int
Event_Analyzer::open (void *)
@@ -35,6 +34,10 @@ Event_Analyzer::control (ACE_Message_Block *mb)
int
Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) passing through Event_Analyser::put() (%s)\n",
+ this->is_reader () ? "reader" : "writer"));
+
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
this->control (mb);
@@ -64,5 +67,3 @@ Event_Analyzer::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
index 498e91d476e..3eb012c6add 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -1,8 +1,6 @@
/* -*- C++ -*- */
// $Id$
-/* Signal router */
-
#if !defined (_EVENT_ANALYZER_H)
#define _EVENT_ANALYZER_H
@@ -11,16 +9,19 @@
#include "ace/Task.h"
#include "ace/Synch.h"
-#if defined (ACE_HAS_THREADS)
-
class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // This is a "no-op" class that just forwards all its message
+ // blocks onto its neighboring Module in the Stream. In a real
+ // application these tasks would be where the Stream processing
+ // would go.
{
public:
virtual int open (void *a = 0);
virtual int close (u_long flags = 0);
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- /* Dynamic linking hooks */
+ // Dynamic linking hooks.
virtual int init (int argc, char *argv[]);
virtual int fini (void);
virtual int info (char **info_string, size_t length) const;
@@ -29,5 +30,4 @@ private:
virtual int control (ACE_Message_Block *);
};
-#endif /* ACE_HAS_THREADS */
#endif /* _EVENT_ANALYZER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Makefile b/examples/ASX/Event_Server/Event_Server/Makefile
index c94314b19fd..15c448561a1 100644
--- a/examples/ASX/Event_Server/Event_Server/Makefile
+++ b/examples/ASX/Event_Server/Event_Server/Makefile
@@ -120,21 +120,21 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Mem_Map.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
Peer_Router.h \
$(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
Options.h \
$(WRAPPER_ROOT)/ace/Profile_Timer.h \
@@ -174,10 +174,35 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp Consumer_Router.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -189,17 +214,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
$(WRAPPER_ROOT)/ace/Addr.h \
$(WRAPPER_ROOT)/ace/IPC_SAP.h \
$(WRAPPER_ROOT)/ace/IPC_SAP.i \
@@ -214,6 +228,13 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Message_Block.h \
$(WRAPPER_ROOT)/ace/Malloc.h \
$(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Set.h \
@@ -222,8 +243,23 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/SPIPE.h \
$(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
$(WRAPPER_ROOT)/ace/SPIPE.i \
@@ -234,21 +270,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
Peer_Router.h \
$(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
Options.h \
@@ -278,20 +301,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -305,6 +320,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Get_Opt.h Options.h \
$(WRAPPER_ROOT)/ace/Profile_Timer.h \
@@ -314,9 +340,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h
diff --git a/examples/ASX/Event_Server/Event_Server/Options.cpp b/examples/ASX/Event_Server/Event_Server/Options.cpp
index 087d8ed57c5..1255bbecf6a 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Options.cpp
@@ -6,20 +6,30 @@
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
+/* static */
+Options *Options::instance_ = 0;
+
+Options *
+Options::instance (void)
+{
+ if (Options::instance_ == 0)
+ Options::instance_ = new Options;
+
+ return Options::instance_;
+}
Options::Options (void)
- : thr_count_ (4),
- t_flags_ (THR_DETACHED),
- high_water_mark_ (8 * 1024),
- low_water_mark_ (1024),
- message_size_ (128),
- initial_queue_length_ (0),
- iterations_ (100000),
- debugging_ (0),
- verbosity_ (0),
- consumer_port_ ("-p " ACE_ITOA (10000)),
- supplier_port_ ("-p " ACE_ITOA (10001))
+ : thr_count_ (4),
+ t_flags_ (THR_DETACHED),
+ high_water_mark_ (8 * 1024),
+ low_water_mark_ (1024),
+ message_size_ (128),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ debugging_ (0),
+ verbosity_ (0),
+ consumer_port_ (ACE_DEFAULT_SERVER_PORT),
+ supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1)
{
}
@@ -36,7 +46,7 @@ void Options::print_results (void)
this->itimer_.elapsed_time (et);
this->itimer_.get_rusage (rusage);
- if (options.verbose ())
+ if (this->verbose ())
{
#if defined (ACE_HAS_PRUSAGE_T)
ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ());
@@ -86,9 +96,6 @@ void Options::print_results (void)
#endif /* ACE_WIN32 */
}
-/* Manages the options */
-Options options;
-
void
Options::parse_args (int argc, char *argv[])
{
@@ -104,7 +111,7 @@ Options::parse_args (int argc, char *argv[])
this->t_flags (THR_BOUND);
break;
case 'c':
- this->consumer_port (get_opt.optarg);
+ this->consumer_port (ACE_OS::atoi (get_opt.optarg));
break;
case 'd':
this->debugging_ = 1;
@@ -128,7 +135,7 @@ Options::parse_args (int argc, char *argv[])
this->t_flags (THR_NEW_LWP);
break;
case 's':
- this->supplier_port (get_opt.optarg);
+ this->supplier_port (ACE_OS::atoi (get_opt.optarg));
break;
case 'T':
if (ACE_OS::strcasecmp (get_opt.optarg, "ON") == 0)
@@ -182,5 +189,3 @@ Options::parse_args (int argc, char *argv[])
(this->t_flags () & THR_BOUND) != 0,
(this->t_flags () & THR_NEW_LWP) != 0);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h
index 5a7a541b835..596b2b48cc3 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.h
+++ b/examples/ASX/Event_Server/Event_Server/Options.h
@@ -14,61 +14,88 @@
class Options
{
public:
- Options (void);
- ~Options (void);
+ static Options *instance (void);
+
void parse_args (int argc, char *argv[]);
- void stop_timer (void);
- void start_timer (void);
+ void stop_timer (void);
+ void start_timer (void);
- void thr_count (size_t count);
+ void thr_count (size_t count);
size_t thr_count (void);
- void initial_queue_length (size_t length);
+ void initial_queue_length (size_t length);
size_t initial_queue_length (void);
- void high_water_mark (size_t size);
+ void high_water_mark (size_t size);
size_t high_water_mark (void);
- void low_water_mark (size_t size);
+ void low_water_mark (size_t size);
size_t low_water_mark (void);
- void message_size (size_t size);
+ void message_size (size_t size);
size_t message_size (void);
- void iterations (size_t n);
+ void iterations (size_t n);
size_t iterations (void);
- void t_flags (long flag);
- long t_flags (void);
+ void t_flags (long flag);
+ long t_flags (void);
- void supplier_port (char *port);
- char *supplier_port (void);
+ void supplier_port (u_short port);
+ u_short supplier_port (void);
- void consumer_port (char *port);
- char *consumer_port (void);
+ void consumer_port (u_short port);
+ u_short consumer_port (void);
- int debug (void);
- int verbose (void);
+ int debug (void);
+ int verbose (void);
- void print_results (void);
+ void print_results (void);
private:
- ACE_Profile_Timer itimer_; /* Time the process */
- size_t thr_count_; /* Number of threads to spawn */
- long t_flags_; /* Flags to thr_create() */
- size_t high_water_mark_; /* ACE_Task high water mark */
- size_t low_water_mark_; /* ACE_Task low water mark */
- size_t message_size_; /* Size of a message */
- size_t initial_queue_length_; /* Initial number of items in the queue */
- size_t iterations_; /* Number of iterations to run the test program */
- int debugging_; /* Extra debugging info */
- int verbosity_; /* Extra verbose messages */
- char *consumer_port_; /* Port that the Consumer_Router is using */
- char *supplier_port_; /* Port that the Supplier_Router is using */
-};
+ Options (void);
+ ~Options (void);
+
+ ACE_Profile_Timer itimer_;
+ // Time the process.
+
+ size_t thr_count_;
+ // Number of threads to spawn.
+
+ long t_flags_;
+ // Flags to thr_create().
+
+ size_t high_water_mark_;
+ // ACE_Task high water mark.
-extern Options options;
+ size_t low_water_mark_;
+ // ACE_Task low water mark.
+
+ size_t message_size_;
+ // Size of a message.
+
+ size_t initial_queue_length_;
+ // Initial number of items in the queue.
+
+ size_t iterations_;
+ // Number of iterations to run the test program.
+
+ int debugging_;
+ // Extra debugging info.
+
+ int verbosity_;
+ // Extra verbose messages.
+
+ u_short consumer_port_;
+ // Port that the Consumer_Router is using.
+
+ u_short supplier_port_;
+ // Port that the Supplier_Router is using.
+
+ static Options *instance_;
+ // Static Singleton.
+};
#include "Options.i"
#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.i b/examples/ASX/Event_Server/Event_Server/Options.i
index 518a309f6cb..b3ce4e2e807 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.i
+++ b/examples/ASX/Event_Server/Event_Server/Options.i
@@ -4,24 +4,24 @@
/* Option manager for ustreams */
inline void
-Options::supplier_port (char *port)
+Options::supplier_port (u_short port)
{
this->supplier_port_ = port;
}
-inline char *
+inline u_short
Options::supplier_port (void)
{
return this->supplier_port_;
}
inline void
-Options::consumer_port (char *port)
+Options::consumer_port (u_short port)
{
this->consumer_port_ = port;
}
-inline char *
+inline u_short
Options::consumer_port (void)
{
return this->consumer_port_;
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
index 38e2977140d..1cbaa77b83b 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -1,70 +1,148 @@
-#if !defined (_PEER_ROUTER_C)
// $Id$
+#if !defined (_PEER_ROUTER_C)
#define _PEER_ROUTER_C
#include "ace/Service_Config.h"
#include "ace/Get_Opt.h"
-
#include "Options.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
+int
+Peer_Router_Context::send_peers (ACE_Message_Block *mb)
+{
+ PEER_ITERATOR map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+
+ // Skip past the header and get the message to send.
+ ACE_Message_Block *data_block = mb->cont ();
+
+ // "Multicast" the data to *all* the registered peers.
-// Define some short-hand macros to deal with verbose templates
-// names...
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) sending to peer via handle %d\n",
+ ss->ext_id_));
+ iterations++;
+ // Increment reference count before sending since the
+ // Peer_Handler might be running in its own thread of control.
+ bytes += ss->int_id_->put (data_block->duplicate ());
+ }
-#define PH PEER_HANDLER
-#define PA PEER_ACCEPTOR
-#define PAD PEER_ADDR
-#define PK PEER_KEY
-#define PM PEER_MAP
+ mb->release ();
+ return bytes == 0 ? 0 : bytes / iterations;
+}
-template <class PH, class PK> int
-Acceptor_Factory<PH, PK>::init (int argc, char *argv[])
+int
+Peer_Router_Context::unbind_peer (ROUTING_KEY key)
{
- ACE_Get_Opt get_opt (argc, argv, "dp:", 0);
- ACE_INET_Addr addr;
-
- for (int c; (c = get_opt ()) != -1; )
- switch (c)
- {
- case 'p':
- addr.set (ACE_OS::atoi (get_opt.optarg));
- break;
- case 'd':
- break;
- default:
- break;
- }
-
- if (this->open (addr, ACE_Service_Config::reactor ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- return 0;
+ return this->peer_map_.unbind (key);
}
-template <class PH, class PK>
-Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
- : pr_ (pr)
+void
+Peer_Router_Context::release (void)
{
+ this->reference_count_--;
+ if (this->reference_count_ == 0)
+ delete this;
}
-template <class PH, class PK> Peer_Router<PH, PK> *
-Acceptor_Factory<PH, PK>::router (void)
-{
- return this->pr_;
+int
+Peer_Router_Context::bind_peer (ROUTING_KEY key,
+ Peer_Handler *peer_handler)
+{
+ return this->peer_map_.bind (key, peer_handler);
}
-template <class ROUTER, class KEY>
-Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
- : inherited (tm)
+Peer_Router_Context::Peer_Router_Context (u_short port)
+ : reference_count_ (2) // 1 Consumer + 1 Supplier
{
+ // Perform initializations.
+
+ if (this->open (ACE_INET_Addr (port)) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "Acceptor::open"));
+
+ else if (this->peer_map_.open () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "Map_Manager::open"));
+
+ else
+ {
+ ACE_INET_Addr addr;
+
+ if (this->acceptor().get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) initializing on port = %d, handle = %d, this = %u\n",
+ addr.get_port_number (),
+ this->acceptor().get_handle (),
+ this));
+ else
+ ACE_ERROR ((LM_ERROR,
+ "%p\n", "get_local_addr"));
+ }
+}
+
+Peer_Router_Context::~Peer_Router_Context (void)
+{
+ // Free up the handle and close down the listening socket.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing down Peer_Router_Context"));
+
+ // Close down the Acceptor and take ourselves out of the Reactor.
+ this->handle_close ();
+
+ PEER_ITERATOR map_iter = this->peer_map_;
+
+ // Make sure to take all the handles out of the map.
+
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing down peer on handle %d\n",
+ ss->ext_id_));
+
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (ss->ext_id_, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) p\n", "remove_handle"));
+ }
+
+ // Close down the map.
+ this->peer_map_.close ();
}
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::svc (void)
+Peer_Router *
+Peer_Router_Context::peer_router (void)
{
+ return this->peer_router_;
+}
+
+void
+Peer_Router_Context::peer_router (Peer_Router *pr)
+{
+ this->peer_router_ = pr;
+}
+
+Peer_Handler *
+Peer_Router_Context::make_svc_handler (void)
+{
+ return new Peer_Handler (this);
+}
+
+Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
+ : prc_ (prc)
+{
+}
+
#if 0
+Peer_Handler::svc (void)
+{
ACE_Thread_Control thread_control (tm);
ACE_Message_Block *db, *hb;
@@ -74,13 +152,13 @@ Peer_Handler<ROUTER, KEY>::svc (void)
for (;;)
{
db = new Message_Block (BUFSIZ);
- hb = new Message_Block (sizeof (KEY), Message_Block::MB_PROTO, db);
+ hb = new Message_Block (sizeof (ROUTING_KEY), Message_Block::MB_PROTO, db);
if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1)
LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1);
else if (n == 0) // Client has closed down the connection.
{
- if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ if (this->prc_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1);
LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n"));
return -1; // We do not need to be deregistered by reactor
@@ -94,130 +172,133 @@ Peer_Handler<ROUTER, KEY>::svc (void)
*(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
hb->wr_ptr (sizeof (long));
- if (this->router_task_->reply (hb) == -1)
+ if (this->prc_->peer_router ()->reply (hb) == -1)
{
- cout << "Peer_Handler.svc : router_task->reply failed" << endl ;
+ cout << "Peer_Handler.svc : peer_router->reply failed" << endl ;
return -1;
}
}
}
return 0;
-#else
- return -1;
-#endif
}
+#endif
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
+int
+Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
- return this->peer ().send_n (mb->rd_ptr (), mb->length ());
+ return this->peer ().send_n (mb->rd_ptr (),
+ mb->length ());
}
-// Create a new handler and point its ROUTER_TASK_ data member to the
-// corresponding router. Note that this router is extracted out of
-// the Acceptor_Factory * that is passed in via the
-// ACE_Acceptor::handle_input() method.
+// Initialize a newly connected handler.
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::open (void *a)
+int
+Peer_Handler::open (void *)
{
char buf[BUFSIZ], *p = buf;
- if (this->router_task_->info (&p, sizeof buf) != -1)
- ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n",
- buf, this->get_handle (), a));
+ if (this->prc_->peer_router ()->info (&p, sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, handle = %d\n",
+ buf, this->get_handle ()));
else
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
#if 0
- if (this->activate (options.t_flags ()) == -1)
+ if (this->activate (Options::instance ()->t_flags ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
#endif
ACE_DEBUG ((LM_DEBUG,
- "Peer_Handler::open registering with Reactor for handle_input\n"));
+ "(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
+ // Register with the Reactor to receive messages from our Peer.
if (ACE_Service_Config::reactor ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
- else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
+
+ // Insert outselves into the routing map.
+ else if (this->prc_->bind_peer (this->get_handle (), this) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);
+
+ // Turn off non-blocking I/O in case it was turned on.
else if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable non-blocking I/O"), -1);
- return 0;
+ else
+ return 0;
}
-// Receive a message from a supplier.
+// Receive a message from a Peer.
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
+int
+Peer_Handler::handle_input (ACE_HANDLE h)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
+ ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on handle %d\n", h));
ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
- ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY),
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
ACE_Message_Block::MB_PROTO, db);
- int n;
- if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ // Check for memory failures.
+ if (db == 0 || hb == 0)
+ {
+ delete hb;
+ delete db;
+ errno = ENOMEM;
+ return -1;
+ }
+
+ ssize_t n = this->peer ().recv (db->rd_ptr (), db->size ());
+
+ if (n == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
else if (n == 0) // Client has closed down the connection.
{
- if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ if (this->prc_->unbind_peer (this->get_handle ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down handle %d\n", h));
return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
}
- else // Transform incoming buffer into a Message and pass downstream.
+ else
{
+ // Transform incoming buffer into a Message.
+
+ // First, increment the write pointer to the end of the newly
+ // read data block.
db->wr_ptr (n);
- *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment.
- hb->wr_ptr (sizeof (ACE_HANDLE));
- return this->router_task_->reply (hb) == -1 ? -1 : 0;
- }
-}
-template <class PH, class PK>
-Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
- : ACE_Task<ACE_MT_SYNCH> (tm)
-{
-}
+ // Second, copy the "address" into the header block.
+ *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
-template <class PH, class PK> int
-Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
-{
- PEER_ITERATOR map_iter = this->peer_map_;
- int bytes = 0;
- int iterations = 0;
- ACE_Message_Block *data_block = mb->cont ();
-
- for (ACE_Map_Entry<PK, PH *> *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (options.debug ())
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_));
+ // Third, update the write pointer in the header block.
+ hb->wr_ptr (sizeof (ACE_HANDLE));
- iterations++;
- bytes += ss->int_id_->put (data_block);
+ // Finally, pass the message through the stream. Note that we
+ // use <Task::put> here because this gives the method at *our*
+ // level in the stream a chance to do something with the message
+ // before it is sent up the other side. For instance, if we
+ // receive messages in the Supplier_Router, it will just call
+ // <put_next> and send them up the stream to the Consumer_Router
+ // (which broadcasts them to consumers). However, if we receive
+ // messages in the Consumer_Router, it will reply to the
+ // Consumer with an error since it's not correct for Consumers
+ // to send messages.
+ return this->prc_->peer_router ()->put (hb) == -1 ? -1 : 0;
}
-
- delete mb;
- return bytes == 0 ? 0 : bytes / iterations;
}
-template <class PH, class PK>
-Peer_Router<PH, PK>::~Peer_Router (void)
+Peer_Router::Peer_Router (Peer_Router_Context *prc)
+ : prc_ (prc)
{
+
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::fini (void)
+Peer_Router_Context *
+Peer_Router::context (void) const
{
- delete this->acceptor_;
- return 0;
+ return this->prc_;
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
+int
+Peer_Router::control (ACE_Message_Block *mb)
{
ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
@@ -234,46 +315,4 @@ Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
return 0;
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::unbind_peer (PK key)
-{
- return this->peer_map_.unbind (key);
-}
-
-template <class PH, class PK> int
-Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
-{
- PH *peer_handler = (PH *) ph;
- return this->peer_map_.bind (key, peer_handler);
-}
-
-template <class PH, class PK> int
-Peer_Router<PH, PK>::init (int argc, char *argv[])
-{
- this->acceptor_ = new ACCEPTOR (this);
-
- if (this->acceptor_->init (argc, argv) == -1
- || this->peer_map_.open () == -1)
- return -1;
- else
- {
- ACE_INET_Addr addr;
- ACE_SOCK_Acceptor &acceptor = this->acceptor_->acceptor();
-
- if (acceptor.get_local_addr (addr) != -1)
- ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, port = %d, fd = %d, this = %u\n",
- this->name (), addr.get_port_number (),
- acceptor.get_handle (), this));
- else
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1);
- }
- return 0;
-}
-
-#undef PH
-#undef PA
-#undef PAD
-#undef PK
-#undef PM
-#endif /* ACE_HAS_THREADS */
#endif /* _PEER_ROUTER_C */
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
index 299c534f72c..32d7e85906d 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
@@ -1,121 +1,133 @@
/* -*- C++ -*- */
// $Id$
-
#if !defined (_PEER_ROUTER_H)
#define _PEER_ROUTER_H
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
-#include "ace/Thread_Manager.h"
#include "ace/Map_Manager.h"
-#if defined (ACE_HAS_THREADS)
+// Type of search key for CONSUMER_MAP
+typedef ACE_HANDLE ROUTING_KEY;
-// Forward declaration.
-template <class PEER_HANDLER, class KEY>
+// Forward declarations.
class Peer_Router;
+class Peer_Router_Context;
-template <class PEER_HANDLER, class KEY>
-class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_SOCK_ACCEPTOR>
- // = TITLE
- // Creates <PEER_HANDLERs>, which route events between peers.
-{
-public:
- Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr);
- Peer_Router<PEER_HANDLER, KEY> *router (void);
-
- int init (int argc, char *argv[]);
- // Initialize the acceptor when it's linked dynamically.
-
-private:
- Peer_Router<PEER_HANDLER, KEY> *pr_;
-};
-
-template <class ROUTER, class KEY>
class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
// = TITLE
- // Receive input from a Peer.
+ // Receive input from a Peer and forward to the appropriate
+ // <Peer_Router>.
{
public:
- Peer_Handler (ACE_Thread_Manager * = 0);
+ Peer_Handler (Peer_Router_Context * = 0);
+ // Initialization method.
virtual int open (void * = 0);
- // Called by the ACE_Acceptor::handle_input() to activate this object.
+ // Called by the ACE_Acceptor::handle_input() to activate this
+ // object.
virtual int handle_input (ACE_HANDLE);
// Receive input from the peer.
virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Send output to a peer.
+ // Send output to a peer.
protected:
- typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
-
- ROUTER *router_task_;
- // Pointer to write task.
-
-private:
- virtual int svc (void);
- // Don't need this method here...
+ Peer_Router_Context *prc_;
+ // Pointer to router context.
};
-template <class PEER_HANDLER, class PEER_KEY>
-class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
// = TITLE
- // This abstract base class provides mechanisms for routing
- // messages to/from a ACE_Stream from/to one or more peers (which
- // are typically running on remote hosts).
+ // Defines state and behavior shared between both Tasks in a
+ // Peer_Router Module.
//
// = DESCRIPTION
- // A subclass of Peer_Router overrides the open(), close(), and
- // put() methods in order to specialize the behavior of the router
- // to meet application-specific requirements.
+ // This class also serves as an Acceptor, which creates
+ // Peer_Handlers when Peers connect.
{
public:
// = Initialization and termination methods.
- Peer_Router (ACE_Thread_Manager * = 0);
- ~Peer_Router (void);
-
- typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER;
+ Peer_Router_Context (u_short port);
- virtual int unbind_peer (PEER_KEY);
- // Remove a PEER_HANDLER from the PEER_MAP.
+ virtual int unbind_peer (ROUTING_KEY);
+ // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds
+ // to the <ROUTING_KEY>.
- virtual int bind_peer (PEER_KEY, HANDLER *);
- // Add a PEER_HANDLER to the PEER_MAP
+ virtual int bind_peer (ROUTING_KEY, Peer_Handler *);
+ // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the
+ // <ROUTING_KEY>.
int send_peers (ACE_Message_Block *mb);
- // Send the message block to the peer(s).
+ // Send the <ACE_Message_Block> to the peer(s).
-protected:
- virtual int control (ACE_Message_Block *);
- // Handle control messages arriving from adjacent Modules.
+ Peer_Handler *make_svc_handler (void);
+ // Create a new <Peer_Handler> for each connection.
+
+ // = Set/Get Router Task.
+ Peer_Router *peer_router ();
+ void peer_router (Peer_Router *);
+
+ void release (void);
+ // Decrement the reference count and delete <this> when count == 0;
+
+private:
+ Peer_Router *peer_router_;
+ // Pointer to the <Peer_Router> that we are accepting for.
// = Useful typedefs
- typedef ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_MAP;
- typedef ACE_Map_Iterator<PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_ITERATOR;
+ typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_MAP;
+ typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_ITERATOR;
+ typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> PEER_ENTRY;
PEER_MAP peer_map_;
// Map used to keep track of active peers.
- // = Dynamic linking initialization hooks inherited from ACE_Task
- virtual int init (int argc, char *argv[]);
- virtual int fini (void);
+ int reference_count_;
+ // Keep track of when we can delete ourselves.
- typedef Acceptor_Factory<PEER_HANDLER, PEER_KEY> ACCEPTOR;
+ ~Peer_Router_Context (void);
+ // Private to ensure dynamic allocation.
+};
- ACCEPTOR *acceptor_;
- // Factory for accepting new PEER_HANDLERs.
+class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // This abstract base class provides mechanisms for routing
+ // messages to/from a ACE_Stream from/to one or more peers (which
+ // are typically running on remote hosts).
+ //
+ // = DESCRIPTION
+ // A subclass of Peer_Router overrides the open(), close(), and
+ // put() methods in order to specialize the behavior of the router
+ // to meet application-specific requirements.
+{
+protected:
+ Peer_Router (Peer_Router_Context *prc);
+ // Initialization method.
+
+ virtual int control (ACE_Message_Block *);
+ // Handle control messages arriving from adjacent Modules.
+
+ Peer_Router_Context *context (void) const;
+ // Returns the routing context.
+
+ typedef ACE_Task<ACE_MT_SYNCH> inherited;
+ // Helpful typedef.
private:
+ Peer_Router_Context *prc_;
+ // Reference to the context shared by the writer and reader Tasks in
+ // the Consumer and Supplier Modules.
+
// = Prevent copies and pass-by-value.
- Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
- void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
+ Peer_Router (const Peer_Router &) {}
+ void operator= (const Peer_Router &) {}
};
#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "Peer_Router.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-#endif /* ACE_HAS_THREADS */
+
#endif /* _PEER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
index f9450fd99e1..a9f4ebb8521 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -1,33 +1,9 @@
-#include "Supplier_Router.h"
// $Id$
+#include "Supplier_Router.h"
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
-
-typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
-
-int
-Supplier_Handler::open (void *a)
-{
- SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
- this->router_task_ = af->router ();
- return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
-}
-
-Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
- : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
-{
-}
-
-// Create a new router and associate it with the REACTOR parameter.
-
-Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
- : SUPPLIER_ROUTER (tm)
-{
-}
-
-// Handle incoming messages in a separate thread.
+// Handle outgoing messages in a separate thread.
int
Supplier_Router::svc (void)
@@ -35,14 +11,19 @@ Supplier_Router::svc (void)
assert (this->is_writer ());
ACE_Thread_Control tc (this->thr_mgr ());
- ACE_Message_Block *mb = 0;
ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
- while (this->getq (mb) >= 0)
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
{
- ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n"));
- if (this->send_peers (mb) == -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Supplier_Router is forwarding a message via send_peers\n"));
+
+ // Broadcast the message to the Suppliers.
+
+ if (this->context ()->send_peers (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) send_peers failed in Supplier_Router\n"),
-1);
@@ -54,25 +35,26 @@ Supplier_Router::svc (void)
// destructor.
}
-// Initialize the Router.
+Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+}
+
+// Initialize the Supplier Router.
int
Supplier_Router::open (void *)
{
- assert (this->is_writer ());
-
- char *argv[4];
-
- argv[0] = (char *) this->name ();
- argv[1] = "-p";
- argv[2] = options.supplier_port ();
- argv[3] = 0;
-
- if (this->init (2, &argv[1]) == -1)
- return -1;
-
- // Make this an active object.
- return this->activate (options.t_flags ());
+ if (this->is_reader ())
+ // Set the Peer_Router_Context to point back to us so that all the
+ // Peer_Handler's <put> their incoming <Message_Blocks> to our
+ // reader Task.
+ this->context ()->peer_router (this);
+
+ else // if (this->is_writer ()
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
}
// Close down the router.
@@ -80,31 +62,53 @@ Supplier_Router::open (void *)
int
Supplier_Router::close (u_long)
{
- assert (this->is_writer ());
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n"));
- this->peer_map_.close ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
return 0;
}
-// Send a MESSAGE_BLOCK to the supplier(s).
+// Send an <ACE_Message_Block> to the supplier(s).
int
-Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Supplier_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
- assert (this->is_writer ());
+ // Perform the necessary control operations before passing
+ // the message up the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
- else
- // Queue up the message, which will be processed by
- // Supplier_Router::svc().
- return this->putq (mb);
+
+ // If we're the reader then we are responsible for pass messages up
+ // to the next Module's writer Task.
+
+ else if (this->is_reader ())
+ return this->put_next (mb);
+ else // if (this->is_writer ())
+ {
+ // Someone is trying to write to the Supplier. In this
+ // implementation this is considered an error. However, we'll
+ // just go ahead and forward the message to the Supplier (who
+ // hopefully is prepared to receive it).
+ ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n"));
+
+ // Queue up the message to processed by Supplier_Router::svc().
+ // Since we don't expect to be getting many of these messages,
+ // we queue them up and run them in a separate thread to avoid
+ // taxing the main thread.
+ return this->putq (mb);
+ }
}
// Return information about the Supplier_Router ACE_Module.
@@ -112,17 +116,16 @@ Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
int
Supplier_Router::info (char **strp, size_t length) const
{
- char buf[BUFSIZ];
- ACE_INET_Addr addr;
+ char buf[BUFSIZ];
+ ACE_INET_Addr addr;
const char *mod_name = this->name ();
- ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
- if (sa.get_local_addr (addr) == -1)
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s",
- mod_name, addr.get_port_number (), "tcp",
- "# supplier router\n");
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
+ mod_name, addr.get_port_number (), "tcp",
+ "# supplier router", this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
@@ -130,5 +133,3 @@ Supplier_Router::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
index 23da0812781..42d067a6454 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
@@ -1,8 +1,6 @@
/* -*- C++ -*- */
// $Id$
-/* The interface between a supplier and an Event Service ACE_Stream */
-
#if !defined (_SUPPLIER_ROUTER_H)
#define _SUPPLIER_ROUTER_H
@@ -12,40 +10,49 @@
#include "ace/Svc_Handler.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
-
-/* Forward declaration */
-class Supplier_Handler;
-
-/* Type of search key for SUPPLIER_MAP */
-typedef ACE_HANDLE SUPPLIER_KEY;
-
-/* Instantiated type for routing messages to suppliers */
-
-typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER;
-
-class Supplier_Handler : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>
+class Supplier_Router : public Peer_Router
+ // = TITLE
+ // Provides the interface between one or more Suppliers and the
+ // Event Server ACE_Stream.
+ //
+ // = DESCRIPTION
+ // When used on the "reader" side of a Stream, this Router Task
+ // simply forwards all messages up the stream. When used on the
+ // "writer" side, this Router Task queues up outgoing messages
+ // to suppliers and sends them in a separate thread. The reason
+ // for this is that it's really an "error" for a
+ // <Supplier_Router> to send messages to Suppliers, so we don't
+ // expect this to happen very much. when it does we use a
+ // separate thread to avoid taxing the main thread.
{
public:
- Supplier_Handler (ACE_Thread_Manager *tm = 0);
- virtual int open (void *);
-};
-
-class Supplier_Router : public SUPPLIER_ROUTER
-{
-public:
- Supplier_Router (ACE_Thread_Manager *);
+ Supplier_Router (Peer_Router_Context *prc);
+ // Initialization method.
protected:
- /* ACE_Task hooks. */
+ // = ACE_Task hooks.
+
+ // All of these methods are called via base class pointers by the
+ // ACE Stream apparatus. Therefore, we can put them in the
+ // protected section.
+
virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the <SUPPLIER_HANDLER> to pass a message to the Router.
+ // The Router queues up this message, which is then processed in the
+ // <svc> method in a separate thread.
+
virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
- /* Dynamic linking hooks inherited from Peer_Router */
virtual int info (char **info_string, size_t length) const;
+ // Dynamic linking hook.
};
-#endif /* ACE_HAS_THREADS */
#endif /* _SUPPLIER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp
index 1e111a69fb9..d40f82b987c 100644
--- a/examples/ASX/Event_Server/Event_Server/event_server.cpp
+++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp
@@ -2,7 +2,6 @@
// Test the event server.
-
#include "ace/Stream.h"
#include "ace/Service_Config.h"
#include "Options.h"
@@ -10,14 +9,12 @@
#include "Event_Analyzer.h"
#include "Supplier_Router.h"
-#if defined (ACE_HAS_THREADS)
-
typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
-// Handle SIGINT and terminate the entire application.
-
class Quit_Handler : public ACE_Sig_Adapter
+ // = TITLE
+ // Handle SIGINT and terminate the entire application.
{
public:
Quit_Handler (void);
@@ -41,9 +38,9 @@ Quit_Handler::Quit_Handler (void)
int
Quit_Handler::handle_input (ACE_HANDLE)
{
- options.stop_timer ();
+ Options::instance ()->stop_timer ();
ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
- options.print_results ();
+ Options::instance ()->print_results ();
ACE_Service_Config::end_reactor_event_loop ();
return 0;
@@ -54,8 +51,7 @@ main (int argc, char *argv[])
{
ACE_Service_Config daemon;
- options.parse_args (argc, argv);
-
+ Options::instance ()->parse_args (argc, argv);
{
// Primary ACE_Stream for EVENT_SERVER application.
MT_Stream event_server;
@@ -63,49 +59,72 @@ main (int argc, char *argv[])
// Enable graceful shutdowns...
Quit_Handler quit_handler;
- // Create the Supplier Router module.
+ Peer_Router_Context *src;
+ // Create the Supplier_Router's routing context, which contains
+ // context shared by both the write-side and read-side of the
+ // Supplier_Router Module.
+ ACE_NEW_RETURN (src,
+ Peer_Router_Context (Options::instance ()->supplier_port ()),
+ -1);
- MT_Module *sr = new MT_Module ("Supplier_Router",
- new Supplier_Router (ACE_Service_Config::thr_mgr ()));
+ MT_Module *srm = 0;
+ // Create the Supplier Router module.
+ ACE_NEW_RETURN (srm, MT_Module
+ ("Supplier_Router",
+ new Supplier_Router (src),
+ new Supplier_Router (src)),
+ -1);
+ MT_Module *eam = 0;
// Create the Event Analyzer module.
-
- MT_Module *ea = new MT_Module ("Event_Analyzer",
- new Event_Analyzer,
- new Event_Analyzer);
-
+ ACE_NEW_RETURN (eam, MT_Module
+ ("Event_Analyzer",
+ new Event_Analyzer,
+ new Event_Analyzer),
+ -1);
+
+ Peer_Router_Context *crc;
+ // Create the Consumer_Router's routing context, which contains
+ // context shared by both the write-side and read-side of the
+ // Consumer_Router Module.
+ ACE_NEW_RETURN (crc,
+ Peer_Router_Context (Options::instance ()->consumer_port ()),
+ -1);
+
+ MT_Module *crm = 0;
// Create the Consumer Router module.
-
- MT_Module *cr = new MT_Module ("Consumer_Router",
- 0, // 0 triggers the creation of a ACE_Thru_Task...
- new Consumer_Router (ACE_Service_Config::thr_mgr ()));
+ ACE_NEW_RETURN (crm, MT_Module
+ ("Consumer_Router",
+ new Consumer_Router (crc),
+ new Consumer_Router (crc)),
+ -1);
// Push the Modules onto the event_server stream.
- if (event_server.push (sr) == -1)
+ if (event_server.push (srm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
- if (event_server.push (ea) == -1)
+ if (event_server.push (eam) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);
- if (event_server.push (cr) == -1)
+ if (event_server.push (crm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);
// Set the high and low water marks appropriately.
- int wm = options.low_water_mark ();
+ int wm = Options::instance ()->low_water_mark ();
if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);
- wm = options.high_water_mark ();
+ wm = Options::instance ()->high_water_mark ();
if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);
- options.start_timer ();
+ Options::instance ()->start_timer ();
- // Perform the main event loop waiting for the user to type ^C or to
- // enter a line on the ACE_STDIN.
+ // Perform the main event loop waiting for the user to type ^C or
+ // to enter a line on the ACE_STDIN.
daemon.run_reactor_event_loop ();
// The destructor of event_server will close down the stream and
@@ -117,10 +136,3 @@ main (int argc, char *argv[])
ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
return 0;
}
-#else
-int
-main (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1);
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/README b/examples/ASX/Event_Server/README
index f97e767cdd8..8e1342fd7bc 100644
--- a/examples/ASX/Event_Server/README
+++ b/examples/ASX/Event_Server/README
@@ -1,13 +1,18 @@
The subdirectory illustrates a number of the ACE ASX framework
-features using an ACE_Stream application called the Event Server. The
-Event Server works as follows:
+features using an ACE_Stream application called the Event Server. For
+more information on the design and use of the ACE ASX framework please
+see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
+http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz
+
+The Event Server example works as follows:
1. When the ./Event_Server/event_server executable is run it
creates two SOCK_Acceptors, which listen for and accept
- incoming connections from consumers and suppliers.
+ incoming connections from Consumers and Suppliers.
-2. The ./Event_Server/Transceiver/transceiver application may be
- started multiple times. Each call should be either:
+2. The ./Event_Server/Transceiver/transceiver application plays
+ the role of Consumer and Supplier. It can be started multiple
+ times. Each call should be either:
% transceiver -p XYZ -h hostname
@@ -15,24 +20,25 @@ Event Server works as follows:
% transceiver -p ABC -h hostname
- where XYZ and ABC are the consumer port and supplier port,
- respectively, on the event server and "hostname" is the name of the
- machine the event_server is running. I typically open up multiple
- windows.
+ where XYZ and ABC are the Consumer listening port and the Supplier
+ listening port, respectively, on the event server and "hostname" is
+ the name of the machine the event_server is running. I typically
+ run the Consumer(s) and Supplier(s) in different windows.
-3. Once the consumer(s) and supplier(s) are connected, you can type
- data from any supplier windows. This data will be routed
+3. Once the Consumer(s) and Supplier(s) are connected, you can type
+ data from any Supplier window. This data will be routed
through the Modules/Tasks in an event_server's Stream and
- be forwarded to the consumer(s).
+ be forwarded to the Consumer(s). Note that you can also
+ send messages from the Consumer(s) to Supplier(s), but the
+ Event Server will warn you about this since it's not really
+ kosher...
4. When you want to shut down the tranceivers or event server
- just type ^C (which generates a SIGINT).
+ just type ^C (which generates a SIGINT) or type any input
+ in the window running the Event Server.
-What makes this example particularly interesting is that
-once you've got the hang of this basic architecture, you can
-"push" new filtering Modules onto the event_server Stream
- and modify the application's behavior.
+What makes this example particularly interesting is that once you've
+got the hang of this basic architecture, you can "push" new filtering
+Modules onto the event_server Stream and modify the application's
+behavior transparently.
-For more information on the design and use of the ACE ASX framework
-please see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
-http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz
diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
index 37335b8209f..673ec0a0e93 100644
--- a/examples/ASX/Event_Server/Transceiver/transceiver.cpp
+++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
@@ -1,16 +1,62 @@
-// Test program for the event transceiver. This program can play the
// $Id$
+// Test program for the event transceiver. This program can play the
// role of either Consumer or Supplier. You can terminate this
// program by typing ^C....
-
#include "ace/Service_Config.h"
#include "ace/Connector.h"
#include "ace/SOCK_Connector.h"
#include "ace/Get_Opt.h"
-#if defined (ACE_HAS_THREADS)
+// Port number of event server.
+static u_short port_number;
+
+// Name of event server.
+static char *host_name;
+
+// Are we playing the Consumer ('C') or Supplier ('S') role?
+static char role = 'S';
+
+// Handle the command-line arguments.
+
+static void
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "Ch:p:S");
+
+ port_number = ACE_DEFAULT_SERVER_PORT;
+ host_name = ACE_DEFAULT_SERVER_HOST;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'C':
+ role = c;
+ break;
+ case 'h':
+ host_name = get_opt.optarg;
+ break;
+ case 'p':
+ port_number = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'S':
+ role = c;
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR,
+ "usage: %n [-p portnum] [-h host_name]\n%a", 1));
+ /* NOTREACHED */
+ break;
+ }
+
+ // Increment by 1 if we're the supplier to mirror the default
+ // behavior of the Event_Server (which sets the Consumer port to
+ // ACE_DEFAULT_SERVER_PORT and the Supplier port to
+ // ACE_DEFAULT_SERVER_PORT + 1).
+ if (role == 'S' && port_number == ACE_DEFAULT_SERVER_PORT)
+ port_number++;
+}
class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
// = TITLE
@@ -34,6 +80,9 @@ public:
virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
// Close down via SIGINT.
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
+ // Close down the event loop.
+
private:
int receiver (void);
// Reads data from socket and writes to ACE_STDOUT.
@@ -42,6 +91,14 @@ private:
// Writes data from ACE_STDIN to socket.
};
+int
+Event_Transceiver::handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask)
+{
+ ACE_Service_Config::end_reactor_event_loop ();
+ return 0;
+}
+
// Close down via SIGINT.
int
@@ -71,8 +128,7 @@ int
Event_Transceiver::open (void *)
{
if (ACE_Service_Config::reactor ()->register_handler
- (this->peer ().get_handle (),
- this,
+ (this->peer ().get_handle (), this,
ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
else if (ACE::register_stdin_handler (this,
@@ -95,7 +151,8 @@ Event_Transceiver::handle_input (ACE_HANDLE handle)
int
Event_Transceiver::forwarder (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver forwarder\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s forwarder\n",
+ role == 'C' ? "Consumer" : "Supplier"));
char buf[BUFSIZ];
ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
@@ -104,14 +161,16 @@ Event_Transceiver::forwarder (void)
if (n <= 0 || this->peer ().send_n (buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver forwarder\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s forwarder\n",
+ role == 'C' ? "Consumer" : "Supplier"));
return result;
}
int
Event_Transceiver::receiver (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver receiver\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s receiver\n",
+ role == 'C' ? "Consumer" : "Supplier"));
char buf[BUFSIZ];
@@ -121,43 +180,11 @@ Event_Transceiver::receiver (void)
if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver receiver\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s receiver\n",
+ role == 'C' ? "Consumer" : "Supplier"));
return result;
}
-// Port number of event server.
-static u_short port_number;
-
-// Name of event server.
-static char *host_name;
-
-// Handle the command-line arguments.
-
-static void
-parse_args (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "h:p:");
-
- port_number = ACE_DEFAULT_SERVER_PORT;
- host_name = ACE_DEFAULT_SERVER_HOST;
-
- for (int c; (c = get_opt ()) != -1; )
- switch (c)
- {
- case 'h':
- host_name = get_opt.optarg;
- break;
- case 'p':
- port_number = ACE_OS::atoi (get_opt.optarg);
- break;
- default:
- ACE_ERROR ((LM_ERROR,
- "usage: %n [-p portnum] [-h host_name]\n%a", 1));
- /* NOTREACHED */
- break;
- }
-}
-
int
main (int argc, char *argv[])
{
@@ -169,7 +196,8 @@ main (int argc, char *argv[])
ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
Event_Transceiver transceiver;
- if (connector.connect (&transceiver, ACE_INET_Addr (port_number, host_name)) == -1)
+ if (connector.connect (&transceiver,
+ ACE_INET_Addr (port_number, host_name)) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1);
// Run event loop until either the event server shuts down or we get
@@ -177,11 +205,3 @@ main (int argc, char *argv[])
ACE_Service_Config::run_reactor_event_loop ();
return 0;
}
-#else
-int
-main (void)
-{
- ACE_ERROR ((LM_ERROR, "test not defined for this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/Connection/non_blocking/CPP-connector.cpp b/examples/Connection/non_blocking/CPP-connector.cpp
index b2628abfa52..130692028f2 100644
--- a/examples/Connection/non_blocking/CPP-connector.cpp
+++ b/examples/Connection/non_blocking/CPP-connector.cpp
@@ -1,9 +1,8 @@
-#if !defined (CPP_CONNECTOR_C)
// $Id$
+#if !defined (CPP_CONNECTOR_C)
#define CPP_CONNECTOR_C
-
#include "CPP-connector.h"
#define PR_ST_1 ACE_PEER_STREAM_1
@@ -27,7 +26,8 @@ Peer_Handler<PR_ST_2>::open (void *)
this->action_ = &Peer_Handler<PR_ST_2>::connected;
if (this->reactor ())
- this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK);
+ this->reactor ()->register_handler
+ (this, ACE_Event_Handler::WRITE_MASK);
else
{
while (this->connected () != -1)
@@ -48,10 +48,11 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::disconnecting (void)
{
char buf[BUFSIZ];
- int n;
+ ssize_t n = this->peer ().recv (buf, sizeof buf);
- if ((n = this->peer ().recv (buf, sizeof buf)) > 0)
+ if (n > 0)
ACE_OS::write (ACE_STDOUT, buf, n);
+
this->action_ = &Peer_Handler<PR_ST_2>::idle;
return -1;
}
@@ -67,12 +68,12 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::connected (void)
{
char buf[BUFSIZ];
- int n;
ACE_DEBUG ((LM_DEBUG, "please enter input..: "));
- if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0
- && this->peer ().send_n (buf, n) != n)
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+
+ if (n > 0 && this->peer ().send_n (buf, n) != n)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "write failed"), -1);
else if (n == 0) /* Explicitly close the connection. */
{
@@ -89,13 +90,12 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::stdio (void)
{
char buf[BUFSIZ];
- int n;
- ACE_DEBUG ((LM_DEBUG, "stdio!\n"));
-
- ACE_DEBUG ((LM_DEBUG, "please enter input..: "));
+ ACE_DEBUG ((LM_DEBUG, "stdio!\nplease enter input..: "));
- if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0)
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+
+ if (n > 0)
{
ACE_OS::write (ACE_STDOUT, buf, n);
return 0;
@@ -125,13 +125,20 @@ Peer_Handler<PR_ST_2>::handle_close (ACE_HANDLE,
ACE_Reactor_Mask mask)
{
ACE_DEBUG ((LM_DEBUG, "closing down (%d)\n", mask));
+
+ // When the socket closes down, then we'll switch over to reading
+ // from stdin and writing to stdout.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
{
this->action_ = &Peer_Handler<PR_ST_2>::stdio;
this->peer ().close ();
ACE_OS::rewind (stdin);
- return this->reactor () && this->reactor ()->register_handler
- (ACE_STDIN, this, ACE_Event_Handler::READ_MASK);
+
+ if (this->reactor ())
+ return ACE::register_stdin_handler
+ (this, this->reactor (), ACE_Service_Config::thr_mgr ());
+ else
+ return 0;
}
else if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK))
delete this;
@@ -167,8 +174,11 @@ IPC_Client<SH, PR_CO_2>::init (int argc, char *argv[])
this->inherited::open (ACE_Service_Config::reactor ());
char *r_addr = argc > 1 ? argv[1] :
- ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST, ACE_DEFAULT_SERVER_PORT_STR);
- ACE_Time_Value timeout (argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_TIMEOUT);
+ ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST,
+ ACE_DEFAULT_SERVER_PORT_STR);
+ ACE_Time_Value timeout (argc > 2
+ ? ACE_OS::atoi (argv[2])
+ : ACE_DEFAULT_TIMEOUT);
char *l_addr = argc > 3 ? argv[3] : ACE_DEFAULT_LOCAL_PORT_STR;
// Handle signals through the ACE_Reactor.
diff --git a/examples/Logger/Acceptor-server/server_loggerd.cpp b/examples/Logger/Acceptor-server/server_loggerd.cpp
index 20522d9168c..89ed3a859a7 100644
--- a/examples/Logger/Acceptor-server/server_loggerd.cpp
+++ b/examples/Logger/Acceptor-server/server_loggerd.cpp
@@ -242,7 +242,7 @@ main (int argc, char *argv[])
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (REACTOR::instance ()->register_handler
- (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1)
+ (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
diff --git a/examples/Logger/simple-server/Logging_Acceptor.cpp b/examples/Logger/simple-server/Logging_Acceptor.cpp
index 641ed58256c..c971f3537e9 100644
--- a/examples/Logger/simple-server/Logging_Acceptor.cpp
+++ b/examples/Logger/simple-server/Logging_Acceptor.cpp
@@ -34,7 +34,7 @@ Logging_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
Logging_Acceptor::~Logging_Acceptor (void)
{
this->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::READ_MASK);
+ ACE_Event_Handler::ACCEPT_MASK);
}
// Returns underlying device descriptor.
diff --git a/examples/Logger/simple-server/server_loggerd.cpp b/examples/Logger/simple-server/server_loggerd.cpp
index 2d65a8b56db..dedb8366289 100644
--- a/examples/Logger/simple-server/server_loggerd.cpp
+++ b/examples/Logger/simple-server/server_loggerd.cpp
@@ -1,9 +1,9 @@
-// This server daemon collects, formats, and displays logging
// $Id$
+// This server daemon collects, formats, and displays logging
// information forwarded from client daemons running on other hosts in
// the network.
-
+//
// In addition, it also illustrates how the ACE_Reactor framework is
// used.
@@ -47,7 +47,7 @@ main (int argc, char *argv[])
if (peer_acceptor.open (addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (REACTOR::instance ()->register_handler
- (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1)
+ (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
// Run forever, performing logging service.
diff --git a/examples/Reactor/Ntalker/ntalker.cpp b/examples/Reactor/Ntalker/ntalker.cpp
index 3234c201048..f81a2c75376 100644
--- a/examples/Reactor/Ntalker/ntalker.cpp
+++ b/examples/Reactor/Ntalker/ntalker.cpp
@@ -115,10 +115,9 @@ Handle_Events::Handle_Events (u_short udp_port,
if (this->mcast_.subscribe (sockmc_addr, 1, interface) == -1)
ACE_OS::perror ("can't subscribe to multicast group"), ACE_OS::exit (1);
- // disable loopbacks
-
-// if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 )
-// ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1);
+ // Disable loopbacks.
+ // if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 )
+ // ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1);
this->handle_set_.set_bit (0);
this->handle_set_.set_bit (this->mcast_.get_handle ());
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
index cbd5ffdb04b..306222534d7 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
@@ -62,7 +62,7 @@ Handle_Broadcast::init (int argc, char *argv[])
if (this->open (sba) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -71,7 +71,7 @@ ACE_INLINE int
Handle_Broadcast::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
index 8b631a6f252..7ebf45ba2f3 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
@@ -62,7 +62,7 @@ Handle_L_CODgram::init (int argc, char *argv[])
if (this->open (sucd) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -72,7 +72,7 @@ ACE_INLINE int
Handle_L_CODgram::fini(void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE ACE_HANDLE
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
index e3ecb36ffd9..8946947f4fd 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
@@ -61,7 +61,7 @@ Handle_L_Dgram::init (int argc, char *argv[])
if (this->open (sudg) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -69,7 +69,7 @@ Handle_L_Dgram::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_Dgram::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
index f8044d312d9..8b667cb6681 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
@@ -55,7 +55,7 @@ Handle_L_FIFO::init (int argc, char *argv[])
if (this->open (rendezvous_fifo) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -63,7 +63,7 @@ Handle_L_FIFO::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_FIFO::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
index ff59a5ffd6b..048f66cbcff 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
#include "ace/Get_Opt.h"
ACE_INLINE
@@ -78,7 +77,7 @@ Handle_L_Pipe::init (int argc, char *argv[])
if (this->open (sup) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -88,7 +87,7 @@ ACE_INLINE int
Handle_L_Pipe::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
index 9bb576deccf..e129c26521c 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
@@ -60,8 +60,8 @@ Handle_L_SPIPE::init (int argc, char *argv[])
susp.set (rendezvous);
if (this->open (susp) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -69,7 +69,8 @@ Handle_L_SPIPE::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_SPIPE::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
index 60dbe34fb47..3fd2c09e2a7 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
@@ -71,7 +71,7 @@ Handle_L_Stream::init (int argc, char *argv[])
if (this->open (sus) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -81,7 +81,7 @@ ACE_INLINE int
Handle_L_Stream::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE ACE_HANDLE
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
index c683f7bbab6..aaf11ceacb0 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
@@ -57,7 +57,7 @@ Handle_R_Dgram::init (int argc, char *argv[])
if (this->open (sidg) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -66,7 +66,7 @@ ACE_INLINE int
Handle_R_Dgram::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
index 8c4439f8c0f..aa17b7ca160 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
@@ -58,8 +58,8 @@ Handle_R_Stream::init (int argc, char *argv[])
if (this->open (sis) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -67,7 +67,8 @@ Handle_R_Stream::init (int argc, char *argv[])
ACE_INLINE int
Handle_R_Stream::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
index 94b41367a5c..f7e2d98f69a 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
@@ -1,6 +1,6 @@
-#if !defined (ACE_HANDLE_THR_STREAM_C)
// $Id$
+#if !defined (ACE_HANDLE_THR_STREAM_C)
#define ACE_HANDLE_THR_STREAM_C
#include "ace/Get_Opt.h"
@@ -13,31 +13,20 @@
#include "Handle_Thr_Stream.i"
#endif /* __ACE_INLINE__ */
-// Shorthand names.
-#define SH SVC_HANDLER
-#define PR_AC_1 ACE_PEER_ACCEPTOR_1
-#define PR_AC_2 ACE_PEER_ACCEPTOR_2
-#define PR_ST_1 ACE_PEER_STREAM_1
-#define PR_ST_2 ACE_PEER_STREAM_2
-
template <class SH, PR_AC_1>
-Handle_Thr_Stream<SH, PR_AC_2>::~Handle_Thr_Stream (void)
+Handle_Thr_Acceptor<SH, PR_AC_2>::~Handle_Thr_Acceptor (void)
{
}
template <class SH, PR_AC_1>
-Handle_Thr_Stream<SH, PR_AC_2>::Handle_Thr_Stream (void)
-#if defined (ACE_HAS_THREADS)
+Handle_Thr_Acceptor<SH, PR_AC_2>::Handle_Thr_Acceptor (void)
: thr_flags_ (THR_DETACHED | THR_NEW_LWP)
-#else
- : thr_flags_ (0)
-#endif /* ACE_HAS_THREADS */
{
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp,
- size_t length) const
+Handle_Thr_Acceptor<SH, PR_AC_2>::info (char **strp,
+ size_t length) const
{
char buf[BUFSIZ];
ACE_INET_Addr sa;
@@ -56,7 +45,7 @@ Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp,
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
+Handle_Thr_Acceptor<SH, PR_AC_2>::init (int argc, char *argv[])
{
ACE_INET_Addr local_addr (inherited::DEFAULT_PORT_);
int n_threads = ACE_DEFAULT_THREADS;
@@ -64,19 +53,17 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
ACE_Get_Opt get_opt (argc, argv, "p:t:", 0);
for (int c; (c = get_opt ()) != -1; )
- {
- switch (c)
- {
- case 'p':
- local_addr.set (ACE_OS::atoi (get_opt.optarg));
- break;
- case 't':
- n_threads = ACE_OS::atoi (get_opt.optarg);
- break;
- default:
- break;
- }
- }
+ switch (c)
+ {
+ case 'p':
+ local_addr.set (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 't':
+ n_threads = ACE_OS::atoi (get_opt.optarg);
+ break;
+ default:
+ break;
+ }
// Initialize the threading strategy.
if (this->thr_strategy_.open (&this->thr_mgr_,
@@ -94,10 +81,10 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::fini (void)
+Handle_Thr_Acceptor<SH, PR_AC_2>::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
template <PR_ST_1>
@@ -112,7 +99,7 @@ CLI_Stream<PR_ST_2>::close (u_long)
ACE_DEBUG ((LM_DEBUG, "(%t) client stream object closing down\n"));
this->peer ().close ();
- /* Must be allocated dynamically! */
+ // Must be allocated dynamically!
delete this;
return 0;
}
@@ -159,12 +146,6 @@ CLI_Stream<PR_ST_2>::svc (void)
return 0;
}
-#undef SH
-#undef PR_AC_1
-#undef PR_AC_2
-#undef PR_ST_1
-#undef PR_ST_2
-
//----------------------------------------
#if defined (ACE_HAS_TLI)
@@ -181,19 +162,19 @@ CLI_Stream<PR_ST_2>::svc (void)
#include "ace/INET_Addr.h"
typedef CLI_Stream <THR_STREAM> CLI_STREAM;
-typedef Handle_Thr_Stream<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_STREAM;
+typedef Handle_Thr_Acceptor<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_ACCEPTOR;
-/* Static class variables */
+// Static class variables.
-u_short HANDLE_THR_STREAM::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT;
+u_short HANDLE_THR_ACCEPTOR::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT;
-/* Service object */
-HANDLE_THR_STREAM remote_thr_stream;
+// Service object.
+HANDLE_THR_ACCEPTOR remote_thr_stream;
ACE_Service_Object_Type rts (&remote_thr_stream, "Remote_Thr_Stream");
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
template class CLI_Stream<THR_STREAM>;
-template class Handle_Thr_Stream<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>;
+template class Handle_Thr_Acceptor<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
#endif /* ACE_HAS_THREADS */
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
index 9c12d872f47..be17d2d0338 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
@@ -1,7 +1,7 @@
/* -*- C++ -*- */
// $Id$
-/* Handle connections from remote INET connections. */
+// Handle connections from remote INET connections.
#if !defined (_HANDLE_THR_STREAM_H)
#define _HANDLE_THR_STREAM_H
@@ -12,12 +12,12 @@
#if defined (ACE_HAS_THREADS)
template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1>
-class Handle_Thr_Stream : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>
+class Handle_Thr_Acceptor : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>
{
public:
// = Initialization and termination.
- Handle_Thr_Stream (void);
- ~Handle_Thr_Stream (void);
+ Handle_Thr_Acceptor (void);
+ ~Handle_Thr_Acceptor (void);
// = Dynamic linking hooks.
virtual int init (int argc, char *argv[]);
@@ -25,7 +25,7 @@ public:
virtual int fini (void);
private:
- typedef Handle_Thr_Stream<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited;
+ typedef Handle_Thr_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited;
static u_short DEFAULT_PORT_;
diff --git a/netsvcs/lib/Logging_Strategy.cpp b/netsvcs/lib/Logging_Strategy.cpp
index a0cddc36176..877feb8ead0 100644
--- a/netsvcs/lib/Logging_Strategy.cpp
+++ b/netsvcs/lib/Logging_Strategy.cpp
@@ -37,25 +37,20 @@ private:
void
ACE_Logging_Strategy::tokenize (char *flag_string)
{
- char *flag;
- if ((flag = ACE_OS::strtok (flag_string, "|")) != NULL)
+ for (char *flag = ACE_OS::strtok (flag_string, "|");
+ flag != 0;
+ flag = ACE_OS::strtok (0, "|"))
{
- while (flag)
- {
- if (ACE_OS::strcmp (flag, "STDERR") == 0)
- ACE_SET_BITS (this->flags_, ACE_Log_Msg::STDERR);
- else if (ACE_OS::strcmp (flag, "LOGGER") == 0)
- ACE_SET_BITS (this->flags_, ACE_Log_Msg::LOGGER);
- else if (ACE_OS::strcmp (flag, "OSTREAM") == 0)
- ACE_SET_BITS (this->flags_, ACE_Log_Msg::OSTREAM);
- else if (ACE_OS::strcmp (flag, "VERBOSE") == 0)
- ACE_SET_BITS (this->flags_, ACE_Log_Msg::VERBOSE);
- else if (ACE_OS::strcmp (flag, "SILENT") == 0)
- ACE_SET_BITS (this->flags_, ACE_Log_Msg::SILENT);
-
- // Get the next flag
- flag = ACE_OS::strtok(0, "|");
- }
+ if (ACE_OS::strcmp (flag, "STDERR") == 0)
+ ACE_SET_BITS (this->flags_, ACE_Log_Msg::STDERR);
+ else if (ACE_OS::strcmp (flag, "LOGGER") == 0)
+ ACE_SET_BITS (this->flags_, ACE_Log_Msg::LOGGER);
+ else if (ACE_OS::strcmp (flag, "OSTREAM") == 0)
+ ACE_SET_BITS (this->flags_, ACE_Log_Msg::OSTREAM);
+ else if (ACE_OS::strcmp (flag, "VERBOSE") == 0)
+ ACE_SET_BITS (this->flags_, ACE_Log_Msg::VERBOSE);
+ else if (ACE_OS::strcmp (flag, "SILENT") == 0)
+ ACE_SET_BITS (this->flags_, ACE_Log_Msg::SILENT);
}
}
diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp
index 130b7f1ab30..b2f614c1567 100644
--- a/netsvcs/servers/main.cpp
+++ b/netsvcs/servers/main.cpp
@@ -9,31 +9,56 @@
#include "Server_Logging_Handler.h"
#include "Logging_Strategy.h"
+class Service_Ptr
+{
+public:
+ Service_Ptr (ACE_Service_Object *so)
+ : service_object_ (so) {}
+
+ ~Service_Ptr (void) { this->service_object_->fini (); }
+
+ ACE_Service_Object *operator-> () { return this->service_object_; }
+
+private:
+ ACE_Service_Object *service_object_;
+ // Holds the service object until we're done.
+};
+
int
main (int argc, char *argv[])
{
ACE_Service_Config daemon;
+ // Create an adapter to end the event loop.
+ ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+
+ ACE_Sig_Set sig_set;
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register ourselves to receive SIGINT and SIGQUIT so we can shut
+ // down gracefully via signals.
+ ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
+
// Try to link in the svc.conf entries dynamically.
if (daemon.open (argc, argv) == -1)
{
if (errno != ENOENT)
ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
- else // Use static binding.
+ else // Use static linking.
{
char *l_argv[3];
- ACE_Service_Object *so;
l_argv[0] = "-p " ACE_DEFAULT_NAME_SERVER_PORT_STR;
l_argv[1] = 0;
- so = ACE_SVC_INVOKE (ACE_Name_Acceptor);
+ Service_Ptr sp_1 = ACE_SVC_INVOKE (ACE_Name_Acceptor);
- if (so->init (1, l_argv) == -1)
+ if (sp_1->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Name_Service", 1));
l_argv[0] = "-p " ACE_DEFAULT_TIME_SERVER_PORT_STR;
l_argv[1] = 0;
- so = ACE_SVC_INVOKE (ACE_TS_Server_Acceptor);
+ Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Server_Acceptor);
if (so->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Server_Acceptor", 1));
@@ -41,49 +66,46 @@ main (int argc, char *argv[])
l_argv[0] = argv[0];
l_argv[1] = "-p 10011";
l_argv[2] = 0;
- so = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor);
+ Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor);
- if (so->init (2, l_argv) == -1)
+ if (sp_2->init (2, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Clerk_Processor", 1));
l_argv[0] = "-p " ACE_DEFAULT_TOKEN_SERVER_PORT_STR;
l_argv[1] = 0;
- so = ACE_SVC_INVOKE (ACE_Token_Acceptor);
+ Service_Ptr sp_3 = ACE_SVC_INVOKE (ACE_Token_Acceptor);
- if (so->init (1, l_argv) == -1)
+ if (sp_3->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Token_Service", 1));
l_argv[0] = "-p " ACE_DEFAULT_LOGGING_SERVER_PORT_STR;
l_argv[1] = 0;
- so = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor);
+ Service_Ptr sp_4 = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor);
- if (so->init (1, l_argv) == -1)
+ if (sp_4->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Logging_Service", 1));
l_argv[0] = "-p " ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR;
l_argv[1] = 0;
- so = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor);
+ Service_Ptr sp_5 = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor);
- if (so->init (1, l_argv) == -1)
+ if (sp_5->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Thr_Logging_Service", 1));
}
- }
- // Create an adapter to end the event loop.
- ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+ // Run forever, performing the configured services until we are shut
+ // down by a SIGINT/SIGQUIT signal.
- ACE_Sig_Set sig_set;
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
+ daemon.run_reactor_event_loop ();
- // Register ourselves to receive SIGINT and SIGQUIT so we can shut
- // down gracefully via signals.
- ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
+ // Destructors of Service_Ptr's automagically call fini().
+ }
+ else // Use dynamic linking.
- // Run forever, performing the configured services until we are shut
- // down by a SIGINT/SIGQUIT signal.
+ // Run forever, performing the configured services until we are shut
+ // down by a SIGINT/SIGQUIT signal.
- daemon.run_reactor_event_loop ();
+ daemon.run_reactor_event_loop ();
return 0;
}