summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-02 09:05:39 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-02 09:05:39 +0000
commit1c44106287219a05ddbff09df4574b90777040ae (patch)
tree1d371fe6828480e7cecdcb75a8887a2b4bb83f53 /examples
parentfbcfcdb6ff9975a9e2152be0a5dc7e28a32635fc (diff)
downloadATCD-1c44106287219a05ddbff09df4574b90777040ae.tar.gz
foo
Diffstat (limited to 'examples')
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp138
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.h49
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp11
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.h12
-rw-r--r--examples/ASX/Event_Server/Event_Server/Makefile117
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.cpp45
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.h93
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.i8
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp347
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.h142
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp133
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.h59
-rw-r--r--examples/ASX/Event_Server/Event_Server/event_server.cpp84
-rw-r--r--examples/ASX/Event_Server/README46
-rw-r--r--examples/ASX/Event_Server/Transceiver/transceiver.cpp122
-rw-r--r--examples/Connection/non_blocking/CPP-connector.cpp44
-rw-r--r--examples/Logger/Acceptor-server/server_loggerd.cpp2
-rw-r--r--examples/Logger/simple-server/Logging_Acceptor.cpp2
-rw-r--r--examples/Logger/simple-server/server_loggerd.cpp6
-rw-r--r--examples/Reactor/Ntalker/ntalker.cpp7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i5
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i4
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i7
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp71
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h10
31 files changed, 872 insertions, 719 deletions
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
index 41b5dcb6ea6..67c61bca9b4 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
@@ -1,31 +1,10 @@
-
// $Id$
#include "Consumer_Router.h"
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
-
-typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY;
-
-int
-Consumer_Handler::open (void *a)
-{
- CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a;
- this->router_task_ = af->router ();
- return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a);
-}
-
-Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm)
- : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm)
-{
-}
-
-// Create a new handler that will interact with a consumer and point
-// its ROUTER_TASK_ data member to the CONSUMER_ROUTER.
-
-Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
- : CONSUMER_ROUTER (tm)
+Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
{
}
@@ -34,93 +13,122 @@ Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm)
int
Consumer_Router::open (void *)
{
- assert (this->is_reader ());
-
- char *argv[4];
-
- argv[0] = (char *) this->name ();
- argv[1] = "-p";
- argv[2] = options.consumer_port ();
- argv[3] = 0;
-
- if (this->init (2, &argv[1]) == -1)
- return -1;
+ if (this->is_writer ())
+ {
+ // Set the Peer_Router_Context to point back to us so that if
+ // any Consumer's "accidentally" send us data we'll be able to
+ // handle it.
+ this->context ()->peer_router (this);
+
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
+ }
+ else // if (this->is_reader ())
- // Make this an active object.
- return this->activate (options.t_flags ());
+ // Nothing to do since this side is primarily used to transmit to
+ // Consumers, rather than receive.
+ return 0;
}
int
Consumer_Router::close (u_long)
{
- assert (this->is_reader ());
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router\n"));
- this->peer_map_.close ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
return 0;
}
-// Handle incoming messages in a separate thread.
+// Handle incoming messages in a separate thread.
int
Consumer_Router::svc (void)
{
- ACE_Thread_Control tc (this->thr_mgr ());
- ACE_Message_Block *mb = 0;
+ assert (this->is_writer ());
- assert (this->is_reader ());
+ ACE_Thread_Control tc (this->thr_mgr ());
- ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Consumer_Router\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) starting svc in Consumer_Router\n"));
- while (this->getq (mb) >= 0)
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
{
- ACE_DEBUG ((LM_DEBUG, "Consumer_Router is routing via send_peers\n"));
- if (this->send_peers (mb) == -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n"));
+
+ // Pass this message down to the next Module's writer Task.
+ if (this->put_next (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) send_peers failed in Consumer_Router\n"),
-1);
}
- ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Consumer_Router\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) stopping svc in Consumer_Router\n"));
return 0;
// Note the implicit ACE_OS::thr_exit() via destructor.
}
-// Send a MESSAGE_BLOCK to the supplier(s).
+// Send a <Message_Block> to the supplier(s).
int
-Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Consumer_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
- assert (this->is_reader ());
+ // Perform the necessary control operations before passing
+ // the message down the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
- else
- // Queue up the message, which will be processed by
- // Consumer_Router::svc().
+
+ // If we're the reader side then we're responsible for broadcasting
+ // messages to Consumers.
+
+ else if (this->is_reader ())
+ {
+ if (this->context ()->send_peers (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) send_peers failed in Consumer_Router\n"),
+ -1);
+ else
+ return 0;
+ }
+ else // if (this->is_writer ())
+
+ // Queue up the message to processed by Consumer_Router::svc().
+ // Since we don't expect to be getting many of these messages, we
+ // queue them up and run them in a separate thread to avoid taxing
+ // the main thread.
return this->putq (mb);
}
-
-// Return information about the Client_Router ACE_Module.
+// Return information about the Client_Router ACE_Module.
int
Consumer_Router::info (char **strp, size_t length) const
{
- char buf[BUFSIZ];
+ char buf[BUFSIZ];
ACE_INET_Addr addr;
const char *mod_name = this->name ();
- ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
- if (sa.get_local_addr (addr) == -1)
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s",
- mod_name, addr.get_port_number (), "tcp",
- "# consumer router\n");
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
+ mod_name, addr.get_port_number (), "tcp",
+ "# consumer router", this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
@@ -128,5 +136,3 @@ Consumer_Router::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
index 1fb9f4e40f2..cdaf7090b97 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
@@ -1,46 +1,49 @@
/* -*- C++ -*- */
// $Id$
-/* The interface between one or more consumers and an Event Server ACE_Stream */
-
#if !defined (_CONSUMER_ROUTER_H)
#define _CONSUMER_ROUTER_H
-#include "ace/Thread_Manager.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/UPIPE_Acceptor.h"
#include "ace/Svc_Handler.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
-
-class Consumer_Handler; /* Forward declaration... */
-
-typedef ACE_HANDLE CONSUMER_KEY;
-
-typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER;
-
-class Consumer_Handler : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>
-{
-public:
- Consumer_Handler (ACE_Thread_Manager *tm = 0);
- virtual int open (void *);
-};
-
-class Consumer_Router : public CONSUMER_ROUTER
+class Consumer_Router : public Peer_Router
+ // = TITLE
+ // Provides the interface between one or more Consumers and the
+ // Event Server ACE_Stream.
{
public:
- Consumer_Router (ACE_Thread_Manager *thr_manager);
+ Consumer_Router (Peer_Router_Context *prc);
+ // Initialization method.
protected:
- /* ACE_Task hooks. */
+ // = ACE_Task hooks.
+
+ // All of these methods are called via base class pointers by the
+ // ACE Stream apparatus. Therefore, we can put them in the
+ // protected section.
+
virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the Consumer_Handler to pass a message to the Router.
+ // The Router queues up this message, which is then processed in the
+ // <svc> method in a separate thread.
+
virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
+
+ // = Dynamic linking hooks.
- /* Dynamic linking hooks */
virtual int info (char **info_string, size_t length) const;
+ // Returns information about this service.
};
-#endif /* ACE_HAS_THREADS */
+
#endif /* _CONSUMER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
index 5a63224eb51..e413c16acce 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
@@ -1,8 +1,7 @@
-#include "Event_Analyzer.h"
// $Id$
-
-#if defined (ACE_HAS_THREADS)
+#include "Options.h"
+#include "Event_Analyzer.h"
int
Event_Analyzer::open (void *)
@@ -35,6 +34,10 @@ Event_Analyzer::control (ACE_Message_Block *mb)
int
Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG, "(%t) passing through Event_Analyser::put() (%s)\n",
+ this->is_reader () ? "reader" : "writer"));
+
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
this->control (mb);
@@ -64,5 +67,3 @@ Event_Analyzer::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
index 498e91d476e..3eb012c6add 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -1,8 +1,6 @@
/* -*- C++ -*- */
// $Id$
-/* Signal router */
-
#if !defined (_EVENT_ANALYZER_H)
#define _EVENT_ANALYZER_H
@@ -11,16 +9,19 @@
#include "ace/Task.h"
#include "ace/Synch.h"
-#if defined (ACE_HAS_THREADS)
-
class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // This is a "no-op" class that just forwards all its message
+ // blocks onto its neighboring Module in the Stream. In a real
+ // application these tasks would be where the Stream processing
+ // would go.
{
public:
virtual int open (void *a = 0);
virtual int close (u_long flags = 0);
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- /* Dynamic linking hooks */
+ // Dynamic linking hooks.
virtual int init (int argc, char *argv[]);
virtual int fini (void);
virtual int info (char **info_string, size_t length) const;
@@ -29,5 +30,4 @@ private:
virtual int control (ACE_Message_Block *);
};
-#endif /* ACE_HAS_THREADS */
#endif /* _EVENT_ANALYZER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Makefile b/examples/ASX/Event_Server/Event_Server/Makefile
index c94314b19fd..15c448561a1 100644
--- a/examples/ASX/Event_Server/Event_Server/Makefile
+++ b/examples/ASX/Event_Server/Event_Server/Makefile
@@ -120,21 +120,21 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Mem_Map.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
Peer_Router.h \
$(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
Options.h \
$(WRAPPER_ROOT)/ace/Profile_Timer.h \
@@ -174,10 +174,35 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h
.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp Consumer_Router.h \
- $(WRAPPER_ROOT)/ace/Thread_Manager.h \
- $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -189,17 +214,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Synch.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
- $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
- $(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Event_Handler.h \
- $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
- $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
- $(WRAPPER_ROOT)/ace/SOCK_IO.h \
- $(WRAPPER_ROOT)/ace/SOCK.h \
$(WRAPPER_ROOT)/ace/Addr.h \
$(WRAPPER_ROOT)/ace/IPC_SAP.h \
$(WRAPPER_ROOT)/ace/IPC_SAP.i \
@@ -214,6 +228,13 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Message_Block.h \
$(WRAPPER_ROOT)/ace/Malloc.h \
$(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Set.h \
@@ -222,8 +243,23 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/SPIPE.h \
$(WRAPPER_ROOT)/ace/SPIPE_Addr.h \
$(WRAPPER_ROOT)/ace/SPIPE.i \
@@ -234,21 +270,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
Peer_Router.h \
$(WRAPPER_ROOT)/ace/Acceptor.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
Options.h \
@@ -278,20 +301,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -305,6 +320,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Get_Opt.h Options.h \
$(WRAPPER_ROOT)/ace/Profile_Timer.h \
@@ -314,9 +340,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h
diff --git a/examples/ASX/Event_Server/Event_Server/Options.cpp b/examples/ASX/Event_Server/Event_Server/Options.cpp
index 087d8ed57c5..1255bbecf6a 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Options.cpp
@@ -6,20 +6,30 @@
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
+/* static */
+Options *Options::instance_ = 0;
+
+Options *
+Options::instance (void)
+{
+ if (Options::instance_ == 0)
+ Options::instance_ = new Options;
+
+ return Options::instance_;
+}
Options::Options (void)
- : thr_count_ (4),
- t_flags_ (THR_DETACHED),
- high_water_mark_ (8 * 1024),
- low_water_mark_ (1024),
- message_size_ (128),
- initial_queue_length_ (0),
- iterations_ (100000),
- debugging_ (0),
- verbosity_ (0),
- consumer_port_ ("-p " ACE_ITOA (10000)),
- supplier_port_ ("-p " ACE_ITOA (10001))
+ : thr_count_ (4),
+ t_flags_ (THR_DETACHED),
+ high_water_mark_ (8 * 1024),
+ low_water_mark_ (1024),
+ message_size_ (128),
+ initial_queue_length_ (0),
+ iterations_ (100000),
+ debugging_ (0),
+ verbosity_ (0),
+ consumer_port_ (ACE_DEFAULT_SERVER_PORT),
+ supplier_port_ (ACE_DEFAULT_SERVER_PORT + 1)
{
}
@@ -36,7 +46,7 @@ void Options::print_results (void)
this->itimer_.elapsed_time (et);
this->itimer_.get_rusage (rusage);
- if (options.verbose ())
+ if (this->verbose ())
{
#if defined (ACE_HAS_PRUSAGE_T)
ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ());
@@ -86,9 +96,6 @@ void Options::print_results (void)
#endif /* ACE_WIN32 */
}
-/* Manages the options */
-Options options;
-
void
Options::parse_args (int argc, char *argv[])
{
@@ -104,7 +111,7 @@ Options::parse_args (int argc, char *argv[])
this->t_flags (THR_BOUND);
break;
case 'c':
- this->consumer_port (get_opt.optarg);
+ this->consumer_port (ACE_OS::atoi (get_opt.optarg));
break;
case 'd':
this->debugging_ = 1;
@@ -128,7 +135,7 @@ Options::parse_args (int argc, char *argv[])
this->t_flags (THR_NEW_LWP);
break;
case 's':
- this->supplier_port (get_opt.optarg);
+ this->supplier_port (ACE_OS::atoi (get_opt.optarg));
break;
case 'T':
if (ACE_OS::strcasecmp (get_opt.optarg, "ON") == 0)
@@ -182,5 +189,3 @@ Options::parse_args (int argc, char *argv[])
(this->t_flags () & THR_BOUND) != 0,
(this->t_flags () & THR_NEW_LWP) != 0);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h
index 5a7a541b835..596b2b48cc3 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.h
+++ b/examples/ASX/Event_Server/Event_Server/Options.h
@@ -14,61 +14,88 @@
class Options
{
public:
- Options (void);
- ~Options (void);
+ static Options *instance (void);
+
void parse_args (int argc, char *argv[]);
- void stop_timer (void);
- void start_timer (void);
+ void stop_timer (void);
+ void start_timer (void);
- void thr_count (size_t count);
+ void thr_count (size_t count);
size_t thr_count (void);
- void initial_queue_length (size_t length);
+ void initial_queue_length (size_t length);
size_t initial_queue_length (void);
- void high_water_mark (size_t size);
+ void high_water_mark (size_t size);
size_t high_water_mark (void);
- void low_water_mark (size_t size);
+ void low_water_mark (size_t size);
size_t low_water_mark (void);
- void message_size (size_t size);
+ void message_size (size_t size);
size_t message_size (void);
- void iterations (size_t n);
+ void iterations (size_t n);
size_t iterations (void);
- void t_flags (long flag);
- long t_flags (void);
+ void t_flags (long flag);
+ long t_flags (void);
- void supplier_port (char *port);
- char *supplier_port (void);
+ void supplier_port (u_short port);
+ u_short supplier_port (void);
- void consumer_port (char *port);
- char *consumer_port (void);
+ void consumer_port (u_short port);
+ u_short consumer_port (void);
- int debug (void);
- int verbose (void);
+ int debug (void);
+ int verbose (void);
- void print_results (void);
+ void print_results (void);
private:
- ACE_Profile_Timer itimer_; /* Time the process */
- size_t thr_count_; /* Number of threads to spawn */
- long t_flags_; /* Flags to thr_create() */
- size_t high_water_mark_; /* ACE_Task high water mark */
- size_t low_water_mark_; /* ACE_Task low water mark */
- size_t message_size_; /* Size of a message */
- size_t initial_queue_length_; /* Initial number of items in the queue */
- size_t iterations_; /* Number of iterations to run the test program */
- int debugging_; /* Extra debugging info */
- int verbosity_; /* Extra verbose messages */
- char *consumer_port_; /* Port that the Consumer_Router is using */
- char *supplier_port_; /* Port that the Supplier_Router is using */
-};
+ Options (void);
+ ~Options (void);
+
+ ACE_Profile_Timer itimer_;
+ // Time the process.
+
+ size_t thr_count_;
+ // Number of threads to spawn.
+
+ long t_flags_;
+ // Flags to thr_create().
+
+ size_t high_water_mark_;
+ // ACE_Task high water mark.
-extern Options options;
+ size_t low_water_mark_;
+ // ACE_Task low water mark.
+
+ size_t message_size_;
+ // Size of a message.
+
+ size_t initial_queue_length_;
+ // Initial number of items in the queue.
+
+ size_t iterations_;
+ // Number of iterations to run the test program.
+
+ int debugging_;
+ // Extra debugging info.
+
+ int verbosity_;
+ // Extra verbose messages.
+
+ u_short consumer_port_;
+ // Port that the Consumer_Router is using.
+
+ u_short supplier_port_;
+ // Port that the Supplier_Router is using.
+
+ static Options *instance_;
+ // Static Singleton.
+};
#include "Options.i"
#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.i b/examples/ASX/Event_Server/Event_Server/Options.i
index 518a309f6cb..b3ce4e2e807 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.i
+++ b/examples/ASX/Event_Server/Event_Server/Options.i
@@ -4,24 +4,24 @@
/* Option manager for ustreams */
inline void
-Options::supplier_port (char *port)
+Options::supplier_port (u_short port)
{
this->supplier_port_ = port;
}
-inline char *
+inline u_short
Options::supplier_port (void)
{
return this->supplier_port_;
}
inline void
-Options::consumer_port (char *port)
+Options::consumer_port (u_short port)
{
this->consumer_port_ = port;
}
-inline char *
+inline u_short
Options::consumer_port (void)
{
return this->consumer_port_;
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
index 38e2977140d..1cbaa77b83b 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -1,70 +1,148 @@
-#if !defined (_PEER_ROUTER_C)
// $Id$
+#if !defined (_PEER_ROUTER_C)
#define _PEER_ROUTER_C
#include "ace/Service_Config.h"
#include "ace/Get_Opt.h"
-
#include "Options.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
+int
+Peer_Router_Context::send_peers (ACE_Message_Block *mb)
+{
+ PEER_ITERATOR map_iter = this->peer_map_;
+ int bytes = 0;
+ int iterations = 0;
+
+ // Skip past the header and get the message to send.
+ ACE_Message_Block *data_block = mb->cont ();
+
+ // "Multicast" the data to *all* the registered peers.
-// Define some short-hand macros to deal with verbose templates
-// names...
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) sending to peer via handle %d\n",
+ ss->ext_id_));
+ iterations++;
+ // Increment reference count before sending since the
+ // Peer_Handler might be running in its own thread of control.
+ bytes += ss->int_id_->put (data_block->duplicate ());
+ }
-#define PH PEER_HANDLER
-#define PA PEER_ACCEPTOR
-#define PAD PEER_ADDR
-#define PK PEER_KEY
-#define PM PEER_MAP
+ mb->release ();
+ return bytes == 0 ? 0 : bytes / iterations;
+}
-template <class PH, class PK> int
-Acceptor_Factory<PH, PK>::init (int argc, char *argv[])
+int
+Peer_Router_Context::unbind_peer (ROUTING_KEY key)
{
- ACE_Get_Opt get_opt (argc, argv, "dp:", 0);
- ACE_INET_Addr addr;
-
- for (int c; (c = get_opt ()) != -1; )
- switch (c)
- {
- case 'p':
- addr.set (ACE_OS::atoi (get_opt.optarg));
- break;
- case 'd':
- break;
- default:
- break;
- }
-
- if (this->open (addr, ACE_Service_Config::reactor ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- return 0;
+ return this->peer_map_.unbind (key);
}
-template <class PH, class PK>
-Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr)
- : pr_ (pr)
+void
+Peer_Router_Context::release (void)
{
+ this->reference_count_--;
+ if (this->reference_count_ == 0)
+ delete this;
}
-template <class PH, class PK> Peer_Router<PH, PK> *
-Acceptor_Factory<PH, PK>::router (void)
-{
- return this->pr_;
+int
+Peer_Router_Context::bind_peer (ROUTING_KEY key,
+ Peer_Handler *peer_handler)
+{
+ return this->peer_map_.bind (key, peer_handler);
}
-template <class ROUTER, class KEY>
-Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm)
- : inherited (tm)
+Peer_Router_Context::Peer_Router_Context (u_short port)
+ : reference_count_ (2) // 1 Consumer + 1 Supplier
{
+ // Perform initializations.
+
+ if (this->open (ACE_INET_Addr (port)) == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "Acceptor::open"));
+
+ else if (this->peer_map_.open () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "Map_Manager::open"));
+
+ else
+ {
+ ACE_INET_Addr addr;
+
+ if (this->acceptor().get_local_addr (addr) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) initializing on port = %d, handle = %d, this = %u\n",
+ addr.get_port_number (),
+ this->acceptor().get_handle (),
+ this));
+ else
+ ACE_ERROR ((LM_ERROR,
+ "%p\n", "get_local_addr"));
+ }
+}
+
+Peer_Router_Context::~Peer_Router_Context (void)
+{
+ // Free up the handle and close down the listening socket.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing down Peer_Router_Context"));
+
+ // Close down the Acceptor and take ourselves out of the Reactor.
+ this->handle_close ();
+
+ PEER_ITERATOR map_iter = this->peer_map_;
+
+ // Make sure to take all the handles out of the map.
+
+ for (PEER_ENTRY *ss = 0;
+ map_iter.next (ss) != 0;
+ map_iter.advance ())
+ {
+ if (Options::instance ()->debug ())
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing down peer on handle %d\n",
+ ss->ext_id_));
+
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (ss->ext_id_, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) p\n", "remove_handle"));
+ }
+
+ // Close down the map.
+ this->peer_map_.close ();
}
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::svc (void)
+Peer_Router *
+Peer_Router_Context::peer_router (void)
{
+ return this->peer_router_;
+}
+
+void
+Peer_Router_Context::peer_router (Peer_Router *pr)
+{
+ this->peer_router_ = pr;
+}
+
+Peer_Handler *
+Peer_Router_Context::make_svc_handler (void)
+{
+ return new Peer_Handler (this);
+}
+
+Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
+ : prc_ (prc)
+{
+}
+
#if 0
+Peer_Handler::svc (void)
+{
ACE_Thread_Control thread_control (tm);
ACE_Message_Block *db, *hb;
@@ -74,13 +152,13 @@ Peer_Handler<ROUTER, KEY>::svc (void)
for (;;)
{
db = new Message_Block (BUFSIZ);
- hb = new Message_Block (sizeof (KEY), Message_Block::MB_PROTO, db);
+ hb = new Message_Block (sizeof (ROUTING_KEY), Message_Block::MB_PROTO, db);
if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1)
LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1);
else if (n == 0) // Client has closed down the connection.
{
- if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ if (this->prc_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1);
LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n"));
return -1; // We do not need to be deregistered by reactor
@@ -94,130 +172,133 @@ Peer_Handler<ROUTER, KEY>::svc (void)
*(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
hb->wr_ptr (sizeof (long));
- if (this->router_task_->reply (hb) == -1)
+ if (this->prc_->peer_router ()->reply (hb) == -1)
{
- cout << "Peer_Handler.svc : router_task->reply failed" << endl ;
+ cout << "Peer_Handler.svc : peer_router->reply failed" << endl ;
return -1;
}
}
}
return 0;
-#else
- return -1;
-#endif
}
+#endif
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *)
+int
+Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
- return this->peer ().send_n (mb->rd_ptr (), mb->length ());
+ return this->peer ().send_n (mb->rd_ptr (),
+ mb->length ());
}
-// Create a new handler and point its ROUTER_TASK_ data member to the
-// corresponding router. Note that this router is extracted out of
-// the Acceptor_Factory * that is passed in via the
-// ACE_Acceptor::handle_input() method.
+// Initialize a newly connected handler.
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::open (void *a)
+int
+Peer_Handler::open (void *)
{
char buf[BUFSIZ], *p = buf;
- if (this->router_task_->info (&p, sizeof buf) != -1)
- ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n",
- buf, this->get_handle (), a));
+ if (this->prc_->peer_router ()->info (&p, sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, handle = %d\n",
+ buf, this->get_handle ()));
else
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
#if 0
- if (this->activate (options.t_flags ()) == -1)
+ if (this->activate (Options::instance ()->t_flags ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
#endif
ACE_DEBUG ((LM_DEBUG,
- "Peer_Handler::open registering with Reactor for handle_input\n"));
+ "(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
+ // Register with the Reactor to receive messages from our Peer.
if (ACE_Service_Config::reactor ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
- else if (this->router_task_->bind_peer (this->get_handle (), this) == -1)
+
+ // Insert outselves into the routing map.
+ else if (this->prc_->bind_peer (this->get_handle (), this) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);
+
+ // Turn off non-blocking I/O in case it was turned on.
else if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable non-blocking I/O"), -1);
- return 0;
+ else
+ return 0;
}
-// Receive a message from a supplier.
+// Receive a message from a Peer.
-template <class ROUTER, class KEY> int
-Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
+int
+Peer_Handler::handle_input (ACE_HANDLE h)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
+ ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on handle %d\n", h));
ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
- ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY),
+ ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
ACE_Message_Block::MB_PROTO, db);
- int n;
- if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1)
+ // Check for memory failures.
+ if (db == 0 || hb == 0)
+ {
+ delete hb;
+ delete db;
+ errno = ENOMEM;
+ return -1;
+ }
+
+ ssize_t n = this->peer ().recv (db->rd_ptr (), db->size ());
+
+ if (n == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
else if (n == 0) // Client has closed down the connection.
{
- if (this->router_task_->unbind_peer (this->get_handle ()) == -1)
+ if (this->prc_->unbind_peer (this->get_handle ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h));
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down handle %d\n", h));
return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
}
- else // Transform incoming buffer into a Message and pass downstream.
+ else
{
+ // Transform incoming buffer into a Message.
+
+ // First, increment the write pointer to the end of the newly
+ // read data block.
db->wr_ptr (n);
- *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment.
- hb->wr_ptr (sizeof (ACE_HANDLE));
- return this->router_task_->reply (hb) == -1 ? -1 : 0;
- }
-}
-template <class PH, class PK>
-Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm)
- : ACE_Task<ACE_MT_SYNCH> (tm)
-{
-}
+ // Second, copy the "address" into the header block.
+ *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
-template <class PH, class PK> int
-Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
-{
- PEER_ITERATOR map_iter = this->peer_map_;
- int bytes = 0;
- int iterations = 0;
- ACE_Message_Block *data_block = mb->cont ();
-
- for (ACE_Map_Entry<PK, PH *> *ss = 0;
- map_iter.next (ss) != 0;
- map_iter.advance ())
- {
- if (options.debug ())
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_));
+ // Third, update the write pointer in the header block.
+ hb->wr_ptr (sizeof (ACE_HANDLE));
- iterations++;
- bytes += ss->int_id_->put (data_block);
+ // Finally, pass the message through the stream. Note that we
+ // use <Task::put> here because this gives the method at *our*
+ // level in the stream a chance to do something with the message
+ // before it is sent up the other side. For instance, if we
+ // receive messages in the Supplier_Router, it will just call
+ // <put_next> and send them up the stream to the Consumer_Router
+ // (which broadcasts them to consumers). However, if we receive
+ // messages in the Consumer_Router, it will reply to the
+ // Consumer with an error since it's not correct for Consumers
+ // to send messages.
+ return this->prc_->peer_router ()->put (hb) == -1 ? -1 : 0;
}
-
- delete mb;
- return bytes == 0 ? 0 : bytes / iterations;
}
-template <class PH, class PK>
-Peer_Router<PH, PK>::~Peer_Router (void)
+Peer_Router::Peer_Router (Peer_Router_Context *prc)
+ : prc_ (prc)
{
+
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::fini (void)
+Peer_Router_Context *
+Peer_Router::context (void) const
{
- delete this->acceptor_;
- return 0;
+ return this->prc_;
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
+int
+Peer_Router::control (ACE_Message_Block *mb)
{
ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr ();
ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command;
@@ -234,46 +315,4 @@ Peer_Router<PH, PK>::control (ACE_Message_Block *mb)
return 0;
}
-template <class PH, class PK> int
-Peer_Router<PH, PK>::unbind_peer (PK key)
-{
- return this->peer_map_.unbind (key);
-}
-
-template <class PH, class PK> int
-Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph)
-{
- PH *peer_handler = (PH *) ph;
- return this->peer_map_.bind (key, peer_handler);
-}
-
-template <class PH, class PK> int
-Peer_Router<PH, PK>::init (int argc, char *argv[])
-{
- this->acceptor_ = new ACCEPTOR (this);
-
- if (this->acceptor_->init (argc, argv) == -1
- || this->peer_map_.open () == -1)
- return -1;
- else
- {
- ACE_INET_Addr addr;
- ACE_SOCK_Acceptor &acceptor = this->acceptor_->acceptor();
-
- if (acceptor.get_local_addr (addr) != -1)
- ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, port = %d, fd = %d, this = %u\n",
- this->name (), addr.get_port_number (),
- acceptor.get_handle (), this));
- else
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1);
- }
- return 0;
-}
-
-#undef PH
-#undef PA
-#undef PAD
-#undef PK
-#undef PM
-#endif /* ACE_HAS_THREADS */
#endif /* _PEER_ROUTER_C */
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
index 299c534f72c..32d7e85906d 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
@@ -1,121 +1,133 @@
/* -*- C++ -*- */
// $Id$
-
#if !defined (_PEER_ROUTER_H)
#define _PEER_ROUTER_H
#include "ace/Acceptor.h"
#include "ace/SOCK_Acceptor.h"
-#include "ace/Thread_Manager.h"
#include "ace/Map_Manager.h"
-#if defined (ACE_HAS_THREADS)
+// Type of search key for CONSUMER_MAP
+typedef ACE_HANDLE ROUTING_KEY;
-// Forward declaration.
-template <class PEER_HANDLER, class KEY>
+// Forward declarations.
class Peer_Router;
+class Peer_Router_Context;
-template <class PEER_HANDLER, class KEY>
-class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_SOCK_ACCEPTOR>
- // = TITLE
- // Creates <PEER_HANDLERs>, which route events between peers.
-{
-public:
- Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr);
- Peer_Router<PEER_HANDLER, KEY> *router (void);
-
- int init (int argc, char *argv[]);
- // Initialize the acceptor when it's linked dynamically.
-
-private:
- Peer_Router<PEER_HANDLER, KEY> *pr_;
-};
-
-template <class ROUTER, class KEY>
class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
// = TITLE
- // Receive input from a Peer.
+ // Receive input from a Peer and forward to the appropriate
+ // <Peer_Router>.
{
public:
- Peer_Handler (ACE_Thread_Manager * = 0);
+ Peer_Handler (Peer_Router_Context * = 0);
+ // Initialization method.
virtual int open (void * = 0);
- // Called by the ACE_Acceptor::handle_input() to activate this object.
+ // Called by the ACE_Acceptor::handle_input() to activate this
+ // object.
virtual int handle_input (ACE_HANDLE);
// Receive input from the peer.
virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Send output to a peer.
+ // Send output to a peer.
protected:
- typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited;
-
- ROUTER *router_task_;
- // Pointer to write task.
-
-private:
- virtual int svc (void);
- // Don't need this method here...
+ Peer_Router_Context *prc_;
+ // Pointer to router context.
};
-template <class PEER_HANDLER, class PEER_KEY>
-class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
// = TITLE
- // This abstract base class provides mechanisms for routing
- // messages to/from a ACE_Stream from/to one or more peers (which
- // are typically running on remote hosts).
+ // Defines state and behavior shared between both Tasks in a
+ // Peer_Router Module.
//
// = DESCRIPTION
- // A subclass of Peer_Router overrides the open(), close(), and
- // put() methods in order to specialize the behavior of the router
- // to meet application-specific requirements.
+ // This class also serves as an Acceptor, which creates
+ // Peer_Handlers when Peers connect.
{
public:
// = Initialization and termination methods.
- Peer_Router (ACE_Thread_Manager * = 0);
- ~Peer_Router (void);
-
- typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER;
+ Peer_Router_Context (u_short port);
- virtual int unbind_peer (PEER_KEY);
- // Remove a PEER_HANDLER from the PEER_MAP.
+ virtual int unbind_peer (ROUTING_KEY);
+ // Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds
+ // to the <ROUTING_KEY>.
- virtual int bind_peer (PEER_KEY, HANDLER *);
- // Add a PEER_HANDLER to the PEER_MAP
+ virtual int bind_peer (ROUTING_KEY, Peer_Handler *);
+ // Add a <Peer_Handler> to the <PEER_MAP> that's associated with the
+ // <ROUTING_KEY>.
int send_peers (ACE_Message_Block *mb);
- // Send the message block to the peer(s).
+ // Send the <ACE_Message_Block> to the peer(s).
-protected:
- virtual int control (ACE_Message_Block *);
- // Handle control messages arriving from adjacent Modules.
+ Peer_Handler *make_svc_handler (void);
+ // Create a new <Peer_Handler> for each connection.
+
+ // = Set/Get Router Task.
+ Peer_Router *peer_router ();
+ void peer_router (Peer_Router *);
+
+ void release (void);
+ // Decrement the reference count and delete <this> when count == 0;
+
+private:
+ Peer_Router *peer_router_;
+ // Pointer to the <Peer_Router> that we are accepting for.
// = Useful typedefs
- typedef ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_MAP;
- typedef ACE_Map_Iterator<PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_ITERATOR;
+ typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_MAP;
+ typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_RW_Mutex> PEER_ITERATOR;
+ typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> PEER_ENTRY;
PEER_MAP peer_map_;
// Map used to keep track of active peers.
- // = Dynamic linking initialization hooks inherited from ACE_Task
- virtual int init (int argc, char *argv[]);
- virtual int fini (void);
+ int reference_count_;
+ // Keep track of when we can delete ourselves.
- typedef Acceptor_Factory<PEER_HANDLER, PEER_KEY> ACCEPTOR;
+ ~Peer_Router_Context (void);
+ // Private to ensure dynamic allocation.
+};
- ACCEPTOR *acceptor_;
- // Factory for accepting new PEER_HANDLERs.
+class Peer_Router : public ACE_Task<ACE_MT_SYNCH>
+ // = TITLE
+ // This abstract base class provides mechanisms for routing
+ // messages to/from a ACE_Stream from/to one or more peers (which
+ // are typically running on remote hosts).
+ //
+ // = DESCRIPTION
+ // A subclass of Peer_Router overrides the open(), close(), and
+ // put() methods in order to specialize the behavior of the router
+ // to meet application-specific requirements.
+{
+protected:
+ Peer_Router (Peer_Router_Context *prc);
+ // Initialization method.
+
+ virtual int control (ACE_Message_Block *);
+ // Handle control messages arriving from adjacent Modules.
+
+ Peer_Router_Context *context (void) const;
+ // Returns the routing context.
+
+ typedef ACE_Task<ACE_MT_SYNCH> inherited;
+ // Helpful typedef.
private:
+ Peer_Router_Context *prc_;
+ // Reference to the context shared by the writer and reader Tasks in
+ // the Consumer and Supplier Modules.
+
// = Prevent copies and pass-by-value.
- Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
- void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {}
+ Peer_Router (const Peer_Router &) {}
+ void operator= (const Peer_Router &) {}
};
#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
#include "Peer_Router.cpp"
#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-#endif /* ACE_HAS_THREADS */
+
#endif /* _PEER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
index f9450fd99e1..a9f4ebb8521 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -1,33 +1,9 @@
-#include "Supplier_Router.h"
// $Id$
+#include "Supplier_Router.h"
#include "Options.h"
-#if defined (ACE_HAS_THREADS)
-
-typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY;
-
-int
-Supplier_Handler::open (void *a)
-{
- SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a;
- this->router_task_ = af->router ();
- return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a);
-}
-
-Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm)
- : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm)
-{
-}
-
-// Create a new router and associate it with the REACTOR parameter.
-
-Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm)
- : SUPPLIER_ROUTER (tm)
-{
-}
-
-// Handle incoming messages in a separate thread.
+// Handle outgoing messages in a separate thread.
int
Supplier_Router::svc (void)
@@ -35,14 +11,19 @@ Supplier_Router::svc (void)
assert (this->is_writer ());
ACE_Thread_Control tc (this->thr_mgr ());
- ACE_Message_Block *mb = 0;
ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n"));
- while (this->getq (mb) >= 0)
+ for (ACE_Message_Block *mb = 0;
+ this->getq (mb) >= 0;
+ )
{
- ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n"));
- if (this->send_peers (mb) == -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning: Supplier_Router is forwarding a message via send_peers\n"));
+
+ // Broadcast the message to the Suppliers.
+
+ if (this->context ()->send_peers (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) send_peers failed in Supplier_Router\n"),
-1);
@@ -54,25 +35,26 @@ Supplier_Router::svc (void)
// destructor.
}
-// Initialize the Router.
+Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
+ : Peer_Router (prc)
+{
+}
+
+// Initialize the Supplier Router.
int
Supplier_Router::open (void *)
{
- assert (this->is_writer ());
-
- char *argv[4];
-
- argv[0] = (char *) this->name ();
- argv[1] = "-p";
- argv[2] = options.supplier_port ();
- argv[3] = 0;
-
- if (this->init (2, &argv[1]) == -1)
- return -1;
-
- // Make this an active object.
- return this->activate (options.t_flags ());
+ if (this->is_reader ())
+ // Set the Peer_Router_Context to point back to us so that all the
+ // Peer_Handler's <put> their incoming <Message_Blocks> to our
+ // reader Task.
+ this->context ()->peer_router (this);
+
+ else // if (this->is_writer ()
+ // Make this an active object to handle the error cases in a
+ // separate thread.
+ return this->activate (Options::instance ()->t_flags ());
}
// Close down the router.
@@ -80,31 +62,53 @@ Supplier_Router::open (void *)
int
Supplier_Router::close (u_long)
{
- assert (this->is_writer ());
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n"));
- this->peer_map_.close ();
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n",
+ this->is_reader () ? "reader" : "writer"));
- // Inform the thread to shut down.
- this->msg_queue ()->deactivate ();
+ if (this->is_writer ())
+ // Inform the thread to shut down.
+ this->msg_queue ()->deactivate ();
+
+ // Both writer and reader call release(), so the context knows when
+ // to clean itself up.
+ this->context ()->release ();
return 0;
}
-// Send a MESSAGE_BLOCK to the supplier(s).
+// Send an <ACE_Message_Block> to the supplier(s).
int
-Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Supplier_Router::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
- assert (this->is_writer ());
+ // Perform the necessary control operations before passing
+ // the message up the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
this->control (mb);
return this->put_next (mb);
}
- else
- // Queue up the message, which will be processed by
- // Supplier_Router::svc().
- return this->putq (mb);
+
+ // If we're the reader then we are responsible for pass messages up
+ // to the next Module's writer Task.
+
+ else if (this->is_reader ())
+ return this->put_next (mb);
+ else // if (this->is_writer ())
+ {
+ // Someone is trying to write to the Supplier. In this
+ // implementation this is considered an error. However, we'll
+ // just go ahead and forward the message to the Supplier (who
+ // hopefully is prepared to receive it).
+ ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n"));
+
+ // Queue up the message to processed by Supplier_Router::svc().
+ // Since we don't expect to be getting many of these messages,
+ // we queue them up and run them in a separate thread to avoid
+ // taxing the main thread.
+ return this->putq (mb);
+ }
}
// Return information about the Supplier_Router ACE_Module.
@@ -112,17 +116,16 @@ Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *)
int
Supplier_Router::info (char **strp, size_t length) const
{
- char buf[BUFSIZ];
- ACE_INET_Addr addr;
+ char buf[BUFSIZ];
+ ACE_INET_Addr addr;
const char *mod_name = this->name ();
- ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor ();
- if (sa.get_local_addr (addr) == -1)
+ if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s",
- mod_name, addr.get_port_number (), "tcp",
- "# supplier router\n");
+ ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
+ mod_name, addr.get_port_number (), "tcp",
+ "# supplier router", this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
@@ -130,5 +133,3 @@ Supplier_Router::info (char **strp, size_t length) const
ACE_OS::strncpy (*strp, mod_name, length);
return ACE_OS::strlen (mod_name);
}
-
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
index 23da0812781..42d067a6454 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
@@ -1,8 +1,6 @@
/* -*- C++ -*- */
// $Id$
-/* The interface between a supplier and an Event Service ACE_Stream */
-
#if !defined (_SUPPLIER_ROUTER_H)
#define _SUPPLIER_ROUTER_H
@@ -12,40 +10,49 @@
#include "ace/Svc_Handler.h"
#include "Peer_Router.h"
-#if defined (ACE_HAS_THREADS)
-
-/* Forward declaration */
-class Supplier_Handler;
-
-/* Type of search key for SUPPLIER_MAP */
-typedef ACE_HANDLE SUPPLIER_KEY;
-
-/* Instantiated type for routing messages to suppliers */
-
-typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER;
-
-class Supplier_Handler : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>
+class Supplier_Router : public Peer_Router
+ // = TITLE
+ // Provides the interface between one or more Suppliers and the
+ // Event Server ACE_Stream.
+ //
+ // = DESCRIPTION
+ // When used on the "reader" side of a Stream, this Router Task
+ // simply forwards all messages up the stream. When used on the
+ // "writer" side, this Router Task queues up outgoing messages
+ // to suppliers and sends them in a separate thread. The reason
+ // for this is that it's really an "error" for a
+ // <Supplier_Router> to send messages to Suppliers, so we don't
+ // expect this to happen very much. when it does we use a
+ // separate thread to avoid taxing the main thread.
{
public:
- Supplier_Handler (ACE_Thread_Manager *tm = 0);
- virtual int open (void *);
-};
-
-class Supplier_Router : public SUPPLIER_ROUTER
-{
-public:
- Supplier_Router (ACE_Thread_Manager *);
+ Supplier_Router (Peer_Router_Context *prc);
+ // Initialization method.
protected:
- /* ACE_Task hooks. */
+ // = ACE_Task hooks.
+
+ // All of these methods are called via base class pointers by the
+ // ACE Stream apparatus. Therefore, we can put them in the
+ // protected section.
+
virtual int open (void *a = 0);
+ // Called by the Stream to initialize the router.
+
virtual int close (u_long flags = 0);
+ // Called by the Stream to shutdown the router.
+
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
+ // Called by the <SUPPLIER_HANDLER> to pass a message to the Router.
+ // The Router queues up this message, which is then processed in the
+ // <svc> method in a separate thread.
+
virtual int svc (void);
+ // Runs in a separate thread to dequeue messages and pass them up
+ // the stream.
- /* Dynamic linking hooks inherited from Peer_Router */
virtual int info (char **info_string, size_t length) const;
+ // Dynamic linking hook.
};
-#endif /* ACE_HAS_THREADS */
#endif /* _SUPPLIER_ROUTER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp
index 1e111a69fb9..d40f82b987c 100644
--- a/examples/ASX/Event_Server/Event_Server/event_server.cpp
+++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp
@@ -2,7 +2,6 @@
// Test the event server.
-
#include "ace/Stream.h"
#include "ace/Service_Config.h"
#include "Options.h"
@@ -10,14 +9,12 @@
#include "Event_Analyzer.h"
#include "Supplier_Router.h"
-#if defined (ACE_HAS_THREADS)
-
typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
-// Handle SIGINT and terminate the entire application.
-
class Quit_Handler : public ACE_Sig_Adapter
+ // = TITLE
+ // Handle SIGINT and terminate the entire application.
{
public:
Quit_Handler (void);
@@ -41,9 +38,9 @@ Quit_Handler::Quit_Handler (void)
int
Quit_Handler::handle_input (ACE_HANDLE)
{
- options.stop_timer ();
+ Options::instance ()->stop_timer ();
ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
- options.print_results ();
+ Options::instance ()->print_results ();
ACE_Service_Config::end_reactor_event_loop ();
return 0;
@@ -54,8 +51,7 @@ main (int argc, char *argv[])
{
ACE_Service_Config daemon;
- options.parse_args (argc, argv);
-
+ Options::instance ()->parse_args (argc, argv);
{
// Primary ACE_Stream for EVENT_SERVER application.
MT_Stream event_server;
@@ -63,49 +59,72 @@ main (int argc, char *argv[])
// Enable graceful shutdowns...
Quit_Handler quit_handler;
- // Create the Supplier Router module.
+ Peer_Router_Context *src;
+ // Create the Supplier_Router's routing context, which contains
+ // context shared by both the write-side and read-side of the
+ // Supplier_Router Module.
+ ACE_NEW_RETURN (src,
+ Peer_Router_Context (Options::instance ()->supplier_port ()),
+ -1);
- MT_Module *sr = new MT_Module ("Supplier_Router",
- new Supplier_Router (ACE_Service_Config::thr_mgr ()));
+ MT_Module *srm = 0;
+ // Create the Supplier Router module.
+ ACE_NEW_RETURN (srm, MT_Module
+ ("Supplier_Router",
+ new Supplier_Router (src),
+ new Supplier_Router (src)),
+ -1);
+ MT_Module *eam = 0;
// Create the Event Analyzer module.
-
- MT_Module *ea = new MT_Module ("Event_Analyzer",
- new Event_Analyzer,
- new Event_Analyzer);
-
+ ACE_NEW_RETURN (eam, MT_Module
+ ("Event_Analyzer",
+ new Event_Analyzer,
+ new Event_Analyzer),
+ -1);
+
+ Peer_Router_Context *crc;
+ // Create the Consumer_Router's routing context, which contains
+ // context shared by both the write-side and read-side of the
+ // Consumer_Router Module.
+ ACE_NEW_RETURN (crc,
+ Peer_Router_Context (Options::instance ()->consumer_port ()),
+ -1);
+
+ MT_Module *crm = 0;
// Create the Consumer Router module.
-
- MT_Module *cr = new MT_Module ("Consumer_Router",
- 0, // 0 triggers the creation of a ACE_Thru_Task...
- new Consumer_Router (ACE_Service_Config::thr_mgr ()));
+ ACE_NEW_RETURN (crm, MT_Module
+ ("Consumer_Router",
+ new Consumer_Router (crc),
+ new Consumer_Router (crc)),
+ -1);
// Push the Modules onto the event_server stream.
- if (event_server.push (sr) == -1)
+ if (event_server.push (srm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
- if (event_server.push (ea) == -1)
+ if (event_server.push (eam) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);
- if (event_server.push (cr) == -1)
+ if (event_server.push (crm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);
// Set the high and low water marks appropriately.
- int wm = options.low_water_mark ();
+ int wm = Options::instance ()->low_water_mark ();
if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);
- wm = options.high_water_mark ();
+ wm = Options::instance ()->high_water_mark ();
if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);
- options.start_timer ();
+ Options::instance ()->start_timer ();
- // Perform the main event loop waiting for the user to type ^C or to
- // enter a line on the ACE_STDIN.
+ // Perform the main event loop waiting for the user to type ^C or
+ // to enter a line on the ACE_STDIN.
daemon.run_reactor_event_loop ();
// The destructor of event_server will close down the stream and
@@ -117,10 +136,3 @@ main (int argc, char *argv[])
ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
return 0;
}
-#else
-int
-main (void)
-{
- ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1);
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/ASX/Event_Server/README b/examples/ASX/Event_Server/README
index f97e767cdd8..8e1342fd7bc 100644
--- a/examples/ASX/Event_Server/README
+++ b/examples/ASX/Event_Server/README
@@ -1,13 +1,18 @@
The subdirectory illustrates a number of the ACE ASX framework
-features using an ACE_Stream application called the Event Server. The
-Event Server works as follows:
+features using an ACE_Stream application called the Event Server. For
+more information on the design and use of the ACE ASX framework please
+see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
+http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz
+
+The Event Server example works as follows:
1. When the ./Event_Server/event_server executable is run it
creates two SOCK_Acceptors, which listen for and accept
- incoming connections from consumers and suppliers.
+ incoming connections from Consumers and Suppliers.
-2. The ./Event_Server/Transceiver/transceiver application may be
- started multiple times. Each call should be either:
+2. The ./Event_Server/Transceiver/transceiver application plays
+ the role of Consumer and Supplier. It can be started multiple
+ times. Each call should be either:
% transceiver -p XYZ -h hostname
@@ -15,24 +20,25 @@ Event Server works as follows:
% transceiver -p ABC -h hostname
- where XYZ and ABC are the consumer port and supplier port,
- respectively, on the event server and "hostname" is the name of the
- machine the event_server is running. I typically open up multiple
- windows.
+ where XYZ and ABC are the Consumer listening port and the Supplier
+ listening port, respectively, on the event server and "hostname" is
+ the name of the machine the event_server is running. I typically
+ run the Consumer(s) and Supplier(s) in different windows.
-3. Once the consumer(s) and supplier(s) are connected, you can type
- data from any supplier windows. This data will be routed
+3. Once the Consumer(s) and Supplier(s) are connected, you can type
+ data from any Supplier window. This data will be routed
through the Modules/Tasks in an event_server's Stream and
- be forwarded to the consumer(s).
+ be forwarded to the Consumer(s). Note that you can also
+ send messages from the Consumer(s) to Supplier(s), but the
+ Event Server will warn you about this since it's not really
+ kosher...
4. When you want to shut down the tranceivers or event server
- just type ^C (which generates a SIGINT).
+ just type ^C (which generates a SIGINT) or type any input
+ in the window running the Event Server.
-What makes this example particularly interesting is that
-once you've got the hang of this basic architecture, you can
-"push" new filtering Modules onto the event_server Stream
- and modify the application's behavior.
+What makes this example particularly interesting is that once you've
+got the hang of this basic architecture, you can "push" new filtering
+Modules onto the event_server Stream and modify the application's
+behavior transparently.
-For more information on the design and use of the ACE ASX framework
-please see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and
-http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz
diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
index 37335b8209f..673ec0a0e93 100644
--- a/examples/ASX/Event_Server/Transceiver/transceiver.cpp
+++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
@@ -1,16 +1,62 @@
-// Test program for the event transceiver. This program can play the
// $Id$
+// Test program for the event transceiver. This program can play the
// role of either Consumer or Supplier. You can terminate this
// program by typing ^C....
-
#include "ace/Service_Config.h"
#include "ace/Connector.h"
#include "ace/SOCK_Connector.h"
#include "ace/Get_Opt.h"
-#if defined (ACE_HAS_THREADS)
+// Port number of event server.
+static u_short port_number;
+
+// Name of event server.
+static char *host_name;
+
+// Are we playing the Consumer ('C') or Supplier ('S') role?
+static char role = 'S';
+
+// Handle the command-line arguments.
+
+static void
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "Ch:p:S");
+
+ port_number = ACE_DEFAULT_SERVER_PORT;
+ host_name = ACE_DEFAULT_SERVER_HOST;
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'C':
+ role = c;
+ break;
+ case 'h':
+ host_name = get_opt.optarg;
+ break;
+ case 'p':
+ port_number = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'S':
+ role = c;
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR,
+ "usage: %n [-p portnum] [-h host_name]\n%a", 1));
+ /* NOTREACHED */
+ break;
+ }
+
+ // Increment by 1 if we're the supplier to mirror the default
+ // behavior of the Event_Server (which sets the Consumer port to
+ // ACE_DEFAULT_SERVER_PORT and the Supplier port to
+ // ACE_DEFAULT_SERVER_PORT + 1).
+ if (role == 'S' && port_number == ACE_DEFAULT_SERVER_PORT)
+ port_number++;
+}
class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
// = TITLE
@@ -34,6 +80,9 @@ public:
virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
// Close down via SIGINT.
+ virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
+ // Close down the event loop.
+
private:
int receiver (void);
// Reads data from socket and writes to ACE_STDOUT.
@@ -42,6 +91,14 @@ private:
// Writes data from ACE_STDIN to socket.
};
+int
+Event_Transceiver::handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask)
+{
+ ACE_Service_Config::end_reactor_event_loop ();
+ return 0;
+}
+
// Close down via SIGINT.
int
@@ -71,8 +128,7 @@ int
Event_Transceiver::open (void *)
{
if (ACE_Service_Config::reactor ()->register_handler
- (this->peer ().get_handle (),
- this,
+ (this->peer ().get_handle (), this,
ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
else if (ACE::register_stdin_handler (this,
@@ -95,7 +151,8 @@ Event_Transceiver::handle_input (ACE_HANDLE handle)
int
Event_Transceiver::forwarder (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver forwarder\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s forwarder\n",
+ role == 'C' ? "Consumer" : "Supplier"));
char buf[BUFSIZ];
ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
@@ -104,14 +161,16 @@ Event_Transceiver::forwarder (void)
if (n <= 0 || this->peer ().send_n (buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver forwarder\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s forwarder\n",
+ role == 'C' ? "Consumer" : "Supplier"));
return result;
}
int
Event_Transceiver::receiver (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver receiver\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s receiver\n",
+ role == 'C' ? "Consumer" : "Supplier"));
char buf[BUFSIZ];
@@ -121,43 +180,11 @@ Event_Transceiver::receiver (void)
if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver receiver\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s receiver\n",
+ role == 'C' ? "Consumer" : "Supplier"));
return result;
}
-// Port number of event server.
-static u_short port_number;
-
-// Name of event server.
-static char *host_name;
-
-// Handle the command-line arguments.
-
-static void
-parse_args (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "h:p:");
-
- port_number = ACE_DEFAULT_SERVER_PORT;
- host_name = ACE_DEFAULT_SERVER_HOST;
-
- for (int c; (c = get_opt ()) != -1; )
- switch (c)
- {
- case 'h':
- host_name = get_opt.optarg;
- break;
- case 'p':
- port_number = ACE_OS::atoi (get_opt.optarg);
- break;
- default:
- ACE_ERROR ((LM_ERROR,
- "usage: %n [-p portnum] [-h host_name]\n%a", 1));
- /* NOTREACHED */
- break;
- }
-}
-
int
main (int argc, char *argv[])
{
@@ -169,7 +196,8 @@ main (int argc, char *argv[])
ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
Event_Transceiver transceiver;
- if (connector.connect (&transceiver, ACE_INET_Addr (port_number, host_name)) == -1)
+ if (connector.connect (&transceiver,
+ ACE_INET_Addr (port_number, host_name)) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1);
// Run event loop until either the event server shuts down or we get
@@ -177,11 +205,3 @@ main (int argc, char *argv[])
ACE_Service_Config::run_reactor_event_loop ();
return 0;
}
-#else
-int
-main (void)
-{
- ACE_ERROR ((LM_ERROR, "test not defined for this platform\n"));
- return 0;
-}
-#endif /* ACE_HAS_THREADS */
diff --git a/examples/Connection/non_blocking/CPP-connector.cpp b/examples/Connection/non_blocking/CPP-connector.cpp
index b2628abfa52..130692028f2 100644
--- a/examples/Connection/non_blocking/CPP-connector.cpp
+++ b/examples/Connection/non_blocking/CPP-connector.cpp
@@ -1,9 +1,8 @@
-#if !defined (CPP_CONNECTOR_C)
// $Id$
+#if !defined (CPP_CONNECTOR_C)
#define CPP_CONNECTOR_C
-
#include "CPP-connector.h"
#define PR_ST_1 ACE_PEER_STREAM_1
@@ -27,7 +26,8 @@ Peer_Handler<PR_ST_2>::open (void *)
this->action_ = &Peer_Handler<PR_ST_2>::connected;
if (this->reactor ())
- this->reactor ()->register_handler (this, ACE_Event_Handler::WRITE_MASK);
+ this->reactor ()->register_handler
+ (this, ACE_Event_Handler::WRITE_MASK);
else
{
while (this->connected () != -1)
@@ -48,10 +48,11 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::disconnecting (void)
{
char buf[BUFSIZ];
- int n;
+ ssize_t n = this->peer ().recv (buf, sizeof buf);
- if ((n = this->peer ().recv (buf, sizeof buf)) > 0)
+ if (n > 0)
ACE_OS::write (ACE_STDOUT, buf, n);
+
this->action_ = &Peer_Handler<PR_ST_2>::idle;
return -1;
}
@@ -67,12 +68,12 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::connected (void)
{
char buf[BUFSIZ];
- int n;
ACE_DEBUG ((LM_DEBUG, "please enter input..: "));
- if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0
- && this->peer ().send_n (buf, n) != n)
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+
+ if (n > 0 && this->peer ().send_n (buf, n) != n)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "write failed"), -1);
else if (n == 0) /* Explicitly close the connection. */
{
@@ -89,13 +90,12 @@ template <PR_ST_1> int
Peer_Handler<PR_ST_2>::stdio (void)
{
char buf[BUFSIZ];
- int n;
- ACE_DEBUG ((LM_DEBUG, "stdio!\n"));
-
- ACE_DEBUG ((LM_DEBUG, "please enter input..: "));
+ ACE_DEBUG ((LM_DEBUG, "stdio!\nplease enter input..: "));
- if ((n = ACE_OS::read (ACE_STDIN, buf, sizeof buf)) > 0)
+ ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
+
+ if (n > 0)
{
ACE_OS::write (ACE_STDOUT, buf, n);
return 0;
@@ -125,13 +125,20 @@ Peer_Handler<PR_ST_2>::handle_close (ACE_HANDLE,
ACE_Reactor_Mask mask)
{
ACE_DEBUG ((LM_DEBUG, "closing down (%d)\n", mask));
+
+ // When the socket closes down, then we'll switch over to reading
+ // from stdin and writing to stdout.
if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
{
this->action_ = &Peer_Handler<PR_ST_2>::stdio;
this->peer ().close ();
ACE_OS::rewind (stdin);
- return this->reactor () && this->reactor ()->register_handler
- (ACE_STDIN, this, ACE_Event_Handler::READ_MASK);
+
+ if (this->reactor ())
+ return ACE::register_stdin_handler
+ (this, this->reactor (), ACE_Service_Config::thr_mgr ());
+ else
+ return 0;
}
else if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK))
delete this;
@@ -167,8 +174,11 @@ IPC_Client<SH, PR_CO_2>::init (int argc, char *argv[])
this->inherited::open (ACE_Service_Config::reactor ());
char *r_addr = argc > 1 ? argv[1] :
- ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST, ACE_DEFAULT_SERVER_PORT_STR);
- ACE_Time_Value timeout (argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_TIMEOUT);
+ ACE_SERVER_ADDRESS (ACE_DEFAULT_SERVER_HOST,
+ ACE_DEFAULT_SERVER_PORT_STR);
+ ACE_Time_Value timeout (argc > 2
+ ? ACE_OS::atoi (argv[2])
+ : ACE_DEFAULT_TIMEOUT);
char *l_addr = argc > 3 ? argv[3] : ACE_DEFAULT_LOCAL_PORT_STR;
// Handle signals through the ACE_Reactor.
diff --git a/examples/Logger/Acceptor-server/server_loggerd.cpp b/examples/Logger/Acceptor-server/server_loggerd.cpp
index 20522d9168c..89ed3a859a7 100644
--- a/examples/Logger/Acceptor-server/server_loggerd.cpp
+++ b/examples/Logger/Acceptor-server/server_loggerd.cpp
@@ -242,7 +242,7 @@ main (int argc, char *argv[])
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (REACTOR::instance ()->register_handler
- (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1)
+ (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
diff --git a/examples/Logger/simple-server/Logging_Acceptor.cpp b/examples/Logger/simple-server/Logging_Acceptor.cpp
index 641ed58256c..c971f3537e9 100644
--- a/examples/Logger/simple-server/Logging_Acceptor.cpp
+++ b/examples/Logger/simple-server/Logging_Acceptor.cpp
@@ -34,7 +34,7 @@ Logging_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
Logging_Acceptor::~Logging_Acceptor (void)
{
this->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::READ_MASK);
+ ACE_Event_Handler::ACCEPT_MASK);
}
// Returns underlying device descriptor.
diff --git a/examples/Logger/simple-server/server_loggerd.cpp b/examples/Logger/simple-server/server_loggerd.cpp
index 2d65a8b56db..dedb8366289 100644
--- a/examples/Logger/simple-server/server_loggerd.cpp
+++ b/examples/Logger/simple-server/server_loggerd.cpp
@@ -1,9 +1,9 @@
-// This server daemon collects, formats, and displays logging
// $Id$
+// This server daemon collects, formats, and displays logging
// information forwarded from client daemons running on other hosts in
// the network.
-
+//
// In addition, it also illustrates how the ACE_Reactor framework is
// used.
@@ -47,7 +47,7 @@ main (int argc, char *argv[])
if (peer_acceptor.open (addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (REACTOR::instance ()->register_handler
- (&peer_acceptor, ACE_Event_Handler::READ_MASK) == -1)
+ (&peer_acceptor, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
// Run forever, performing logging service.
diff --git a/examples/Reactor/Ntalker/ntalker.cpp b/examples/Reactor/Ntalker/ntalker.cpp
index 3234c201048..f81a2c75376 100644
--- a/examples/Reactor/Ntalker/ntalker.cpp
+++ b/examples/Reactor/Ntalker/ntalker.cpp
@@ -115,10 +115,9 @@ Handle_Events::Handle_Events (u_short udp_port,
if (this->mcast_.subscribe (sockmc_addr, 1, interface) == -1)
ACE_OS::perror ("can't subscribe to multicast group"), ACE_OS::exit (1);
- // disable loopbacks
-
-// if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 )
-// ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1);
+ // Disable loopbacks.
+ // if (this->mcast_.set_option (IP_MULTICAST_LOOP, 0) == -1 )
+ // ACE_OS::perror (" can't disable loopbacks " ), ACE_OS::exit (1);
this->handle_set_.set_bit (0);
this->handle_set_.set_bit (this->mcast_.get_handle ());
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
index cbd5ffdb04b..306222534d7 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Broadcast.i
@@ -62,7 +62,7 @@ Handle_Broadcast::init (int argc, char *argv[])
if (this->open (sba) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -71,7 +71,7 @@ ACE_INLINE int
Handle_Broadcast::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
index 8b631a6f252..7ebf45ba2f3 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_CODgram.i
@@ -62,7 +62,7 @@ Handle_L_CODgram::init (int argc, char *argv[])
if (this->open (sucd) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -72,7 +72,7 @@ ACE_INLINE int
Handle_L_CODgram::fini(void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE ACE_HANDLE
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
index e3ecb36ffd9..8946947f4fd 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Dgram.i
@@ -61,7 +61,7 @@ Handle_L_Dgram::init (int argc, char *argv[])
if (this->open (sudg) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -69,7 +69,7 @@ Handle_L_Dgram::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_Dgram::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
index f8044d312d9..8b667cb6681 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_FIFO.i
@@ -55,7 +55,7 @@ Handle_L_FIFO::init (int argc, char *argv[])
if (this->open (rendezvous_fifo) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -63,7 +63,7 @@ Handle_L_FIFO::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_FIFO::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
index ff59a5ffd6b..048f66cbcff 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Pipe.i
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
#include "ace/Get_Opt.h"
ACE_INLINE
@@ -78,7 +77,7 @@ Handle_L_Pipe::init (int argc, char *argv[])
if (this->open (sup) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -88,7 +87,7 @@ ACE_INLINE int
Handle_L_Pipe::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
index 9bb576deccf..e129c26521c 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_SPIPE.i
@@ -60,8 +60,8 @@ Handle_L_SPIPE::init (int argc, char *argv[])
susp.set (rendezvous);
if (this->open (susp) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -69,7 +69,8 @@ Handle_L_SPIPE::init (int argc, char *argv[])
ACE_INLINE int
Handle_L_SPIPE::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
index 60dbe34fb47..3fd2c09e2a7 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_L_Stream.i
@@ -71,7 +71,7 @@ Handle_L_Stream::init (int argc, char *argv[])
if (this->open (sus) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"registering service with ACE_Reactor\n"), -1);
return 0;
@@ -81,7 +81,7 @@ ACE_INLINE int
Handle_L_Stream::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE ACE_HANDLE
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
index c683f7bbab6..aaf11ceacb0 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Dgram.i
@@ -57,7 +57,7 @@ Handle_R_Dgram::init (int argc, char *argv[])
if (this->open (sidg) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
else if (ACE_Service_Config::reactor ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -66,7 +66,7 @@ ACE_INLINE int
Handle_R_Dgram::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
index 8c4439f8c0f..aa17b7ca160 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_R_Stream.i
@@ -58,8 +58,8 @@ Handle_R_Stream::init (int argc, char *argv[])
if (this->open (sis) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1);
- else if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "registering service with ACE_Reactor\n"), -1);
return 0;
}
@@ -67,7 +67,8 @@ Handle_R_Stream::init (int argc, char *argv[])
ACE_INLINE int
Handle_R_Stream::fini (void)
{
- return ACE_Service_Config::reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
+ return ACE_Service_Config::reactor ()->remove_handler
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
ACE_INLINE int
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
index 94b41367a5c..f7e2d98f69a 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
@@ -1,6 +1,6 @@
-#if !defined (ACE_HANDLE_THR_STREAM_C)
// $Id$
+#if !defined (ACE_HANDLE_THR_STREAM_C)
#define ACE_HANDLE_THR_STREAM_C
#include "ace/Get_Opt.h"
@@ -13,31 +13,20 @@
#include "Handle_Thr_Stream.i"
#endif /* __ACE_INLINE__ */
-// Shorthand names.
-#define SH SVC_HANDLER
-#define PR_AC_1 ACE_PEER_ACCEPTOR_1
-#define PR_AC_2 ACE_PEER_ACCEPTOR_2
-#define PR_ST_1 ACE_PEER_STREAM_1
-#define PR_ST_2 ACE_PEER_STREAM_2
-
template <class SH, PR_AC_1>
-Handle_Thr_Stream<SH, PR_AC_2>::~Handle_Thr_Stream (void)
+Handle_Thr_Acceptor<SH, PR_AC_2>::~Handle_Thr_Acceptor (void)
{
}
template <class SH, PR_AC_1>
-Handle_Thr_Stream<SH, PR_AC_2>::Handle_Thr_Stream (void)
-#if defined (ACE_HAS_THREADS)
+Handle_Thr_Acceptor<SH, PR_AC_2>::Handle_Thr_Acceptor (void)
: thr_flags_ (THR_DETACHED | THR_NEW_LWP)
-#else
- : thr_flags_ (0)
-#endif /* ACE_HAS_THREADS */
{
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp,
- size_t length) const
+Handle_Thr_Acceptor<SH, PR_AC_2>::info (char **strp,
+ size_t length) const
{
char buf[BUFSIZ];
ACE_INET_Addr sa;
@@ -56,7 +45,7 @@ Handle_Thr_Stream<SH, PR_AC_2>::info (char **strp,
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
+Handle_Thr_Acceptor<SH, PR_AC_2>::init (int argc, char *argv[])
{
ACE_INET_Addr local_addr (inherited::DEFAULT_PORT_);
int n_threads = ACE_DEFAULT_THREADS;
@@ -64,19 +53,17 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
ACE_Get_Opt get_opt (argc, argv, "p:t:", 0);
for (int c; (c = get_opt ()) != -1; )
- {
- switch (c)
- {
- case 'p':
- local_addr.set (ACE_OS::atoi (get_opt.optarg));
- break;
- case 't':
- n_threads = ACE_OS::atoi (get_opt.optarg);
- break;
- default:
- break;
- }
- }
+ switch (c)
+ {
+ case 'p':
+ local_addr.set (ACE_OS::atoi (get_opt.optarg));
+ break;
+ case 't':
+ n_threads = ACE_OS::atoi (get_opt.optarg);
+ break;
+ default:
+ break;
+ }
// Initialize the threading strategy.
if (this->thr_strategy_.open (&this->thr_mgr_,
@@ -94,10 +81,10 @@ Handle_Thr_Stream<SH, PR_AC_2>::init (int argc, char *argv[])
}
template <class SH, PR_AC_1> int
-Handle_Thr_Stream<SH, PR_AC_2>::fini (void)
+Handle_Thr_Acceptor<SH, PR_AC_2>::fini (void)
{
return ACE_Service_Config::reactor ()->remove_handler
- (this, ACE_Event_Handler::READ_MASK);
+ (this, ACE_Event_Handler::ACCEPT_MASK);
}
template <PR_ST_1>
@@ -112,7 +99,7 @@ CLI_Stream<PR_ST_2>::close (u_long)
ACE_DEBUG ((LM_DEBUG, "(%t) client stream object closing down\n"));
this->peer ().close ();
- /* Must be allocated dynamically! */
+ // Must be allocated dynamically!
delete this;
return 0;
}
@@ -159,12 +146,6 @@ CLI_Stream<PR_ST_2>::svc (void)
return 0;
}
-#undef SH
-#undef PR_AC_1
-#undef PR_AC_2
-#undef PR_ST_1
-#undef PR_ST_2
-
//----------------------------------------
#if defined (ACE_HAS_TLI)
@@ -181,19 +162,19 @@ CLI_Stream<PR_ST_2>::svc (void)
#include "ace/INET_Addr.h"
typedef CLI_Stream <THR_STREAM> CLI_STREAM;
-typedef Handle_Thr_Stream<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_STREAM;
+typedef Handle_Thr_Acceptor<CLI_STREAM, THR_ACCEPTOR> HANDLE_THR_ACCEPTOR;
-/* Static class variables */
+// Static class variables.
-u_short HANDLE_THR_STREAM::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT;
+u_short HANDLE_THR_ACCEPTOR::DEFAULT_PORT_ = ACE_DEFAULT_THR_PORT;
-/* Service object */
-HANDLE_THR_STREAM remote_thr_stream;
+// Service object.
+HANDLE_THR_ACCEPTOR remote_thr_stream;
ACE_Service_Object_Type rts (&remote_thr_stream, "Remote_Thr_Stream");
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
template class CLI_Stream<THR_STREAM>;
-template class Handle_Thr_Stream<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>;
+template class Handle_Thr_Acceptor<CLI_Stream<THR_STREAM>, THR_ACCEPTOR>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
#endif /* ACE_HAS_THREADS */
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
index 9c12d872f47..be17d2d0338 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.h
@@ -1,7 +1,7 @@
/* -*- C++ -*- */
// $Id$
-/* Handle connections from remote INET connections. */
+// Handle connections from remote INET connections.
#if !defined (_HANDLE_THR_STREAM_H)
#define _HANDLE_THR_STREAM_H
@@ -12,12 +12,12 @@
#if defined (ACE_HAS_THREADS)
template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1>
-class Handle_Thr_Stream : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>
+class Handle_Thr_Acceptor : public ACE_Strategy_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>
{
public:
// = Initialization and termination.
- Handle_Thr_Stream (void);
- ~Handle_Thr_Stream (void);
+ Handle_Thr_Acceptor (void);
+ ~Handle_Thr_Acceptor (void);
// = Dynamic linking hooks.
virtual int init (int argc, char *argv[]);
@@ -25,7 +25,7 @@ public:
virtual int fini (void);
private:
- typedef Handle_Thr_Stream<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited;
+ typedef Handle_Thr_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2> inherited;
static u_short DEFAULT_PORT_;