summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-06-24 07:05:11 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-06-24 07:05:11 +0000
commit13c7fde4a38f03871c9d433456f14c16e4921b42 (patch)
tree2aca6c6087645136202067b5299b436bef97a88c
parentf4a9897a651df2556b2e4c387cb6a05021cf8182 (diff)
downloadATCD-13c7fde4a38f03871c9d433456f14c16e4921b42.tar.gz
*** empty log message ***
-rw-r--r--ChangeLog-98b8
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp39
-rw-r--r--examples/ASX/Event_Server/Event_Server/Consumer_Router.h38
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp8
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.h25
-rw-r--r--examples/ASX/Event_Server/Event_Server/Options.h25
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp301
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.h69
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp54
-rw-r--r--examples/ASX/Event_Server/Event_Server/Supplier_Router.h35
-rw-r--r--examples/ASX/Event_Server/Event_Server/event_server.cpp301
-rw-r--r--examples/ASX/Event_Server/Transceiver/transceiver.cpp246
12 files changed, 744 insertions, 405 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b
index b0c67ac0185..68695e1c2af 100644
--- a/ChangeLog-98b
+++ b/ChangeLog-98b
@@ -1,3 +1,11 @@
+Wed Jun 24 00:00:44 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * examples/ASX/Event_Server/Transceiver/transceiver.cpp: Cleaned
+ up the Event_Transceiver code a bit.
+
+ * examples/ASX/Event_Server/Event_Server/event_server.cpp (main):
+ Cleaned up the code a bit.
+
Wed Jun 24 06:54:00 EET DST 1998 Wei Chiang <chiang@horizon.ntc.nokia.com>
* Attached "CLASSIX_" to all the file names in ace/CLASSIX.
* Modified include statments in ace/CLASSIX/* to reflect the
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
index f76a18012e0..27788af3719 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp
@@ -16,14 +16,17 @@ Consumer_Router::open (void *)
{
if (this->is_writer ())
{
- // Set the Peer_Router_Context to point back to us so that if
+ // Set the <Peer_Router_Context> to point back to us so that if
// any Consumer's "accidentally" send us data we'll be able to
- // handle it.
+ // handle it by passing it down the stream.
this->context ()->peer_router (this);
- // Make this an active object to handle the error cases in a
- // separate thread.
+ // Increment the reference count.
this->context ()->duplicate ();
+
+ // Make this an active object to handle the error cases in a
+ // separate thread. This is mostly just for illustration, i.e.,
+ // it's probably overkill to use a thread for this!
return this->activate (Options::instance ()->t_flags ());
}
else // if (this->is_reader ())
@@ -43,7 +46,7 @@ Consumer_Router::close (u_long)
// Inform the thread to shut down.
this->msg_queue ()->deactivate ();
- // Both writer and reader call release(), so the context knows when
+ // Both writer and reader call <release>, so the context knows when
// to clean itself up.
this->context ()->release ();
return 0;
@@ -64,7 +67,8 @@ Consumer_Router::svc (void)
)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) warning: Consumer_Router is forwarding a message to Supplier_Router\n"));
+ "(%t) warning: Consumer_Router is "
+ "forwarding a message to Supplier_Router\n"));
// Pass this message down to the next Module's writer Task.
if (this->put_next (mb) == -1)
@@ -85,8 +89,8 @@ int
Consumer_Router::put (ACE_Message_Block *mb,
ACE_Time_Value *)
{
- // Perform the necessary control operations before passing
- // the message down the stream.
+ // Perform the necessary control operations before passing the
+ // message down the stream.
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
{
@@ -94,7 +98,7 @@ Consumer_Router::put (ACE_Message_Block *mb,
return this->put_next (mb);
}
- // If we're the reader side then we're responsible for broadcasting
+ // If we're the reader then we're responsible for broadcasting
// messages to Consumers.
else if (this->is_reader ())
@@ -108,13 +112,14 @@ Consumer_Router::put (ACE_Message_Block *mb,
}
else // if (this->is_writer ())
- // Queue up the message to processed by Consumer_Router::svc().
+ // Queue up the message to processed by <Consumer_Router::svc>
// Since we don't expect to be getting many of these messages, we
// queue them up and run them in a separate thread to avoid taxing
// the main thread.
return this->putq (mb);
}
-// Return information about the Client_Router ACE_Module.
+
+// Return information about the <Consumer_Router>.
int
Consumer_Router::info (char **strp, size_t length) const
@@ -126,13 +131,17 @@ Consumer_Router::info (char **strp, size_t length) const
if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
- mod_name, addr.get_port_number (), "tcp",
- "# consumer router", this->is_reader () ? "reader" : "writer");
-
+ ACE_OS::sprintf (buf,
+ "%s\t %d/%s %s (%s)\n",
+ mod_name,
+ addr.get_port_number (),
+ "tcp",
+ "# consumer router",
+ this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
else
ACE_OS::strncpy (*strp, mod_name, length);
+
return ACE_OS::strlen (mod_name);
}
diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
index cdaf7090b97..56e58c58c37 100644
--- a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h
@@ -10,21 +10,38 @@
#include "Peer_Router.h"
class Consumer_Router : public Peer_Router
+{
// = TITLE
// Provides the interface between one or more Consumers and the
- // Event Server ACE_Stream.
-{
+ // Event Server <ACE_Stream>.
+ //
+ // = DESCRIPTION
+ // This class normally sits on "top" of the Stream and routes
+ // messages coming from "downstream" to all the Consumers
+ // connected to it via its "read" <Task>. Normally, the messages
+ // flow up the stream from <Supplier_Router>s. However, if
+ // Consumers transmit data to the <Consumer_Router>, we dutifully
+ // push it out to the Suppliers via the <Supplier_Router>.
+ //
+ // When used on the "reader" side of a Stream, the
+ // <Consumer_Router> simply forwards all messages up the stream.
+ // When used on the "writer" side, the <Consumer_Router> queues
+ // up outgoing messages to suppliers and sends them down to the
+ // <Supplier_Router> in a separate thread. The reason for this
+ // is that it's really an "error" for a <Consumer_Router> to
+ // send messages to Suppliers, so we don't expect this to happen
+ // very much. When it does we use a separate thread to avoid
+ // taxing the main thread, which processes "normal" messages.
+ //
+ // All of the methods in this class except the constructor are
+ // called via base class pointers by the <ACE_Stream>.
+ // Therefore, we can put them in the protected section.
public:
Consumer_Router (Peer_Router_Context *prc);
// Initialization method.
protected:
// = ACE_Task hooks.
-
- // All of these methods are called via base class pointers by the
- // ACE Stream apparatus. Therefore, we can put them in the
- // protected section.
-
virtual int open (void *a = 0);
// Called by the Stream to initialize the router.
@@ -32,16 +49,15 @@ protected:
// Called by the Stream to shutdown the router.
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- // Called by the Consumer_Handler to pass a message to the Router.
- // The Router queues up this message, which is then processed in the
- // <svc> method in a separate thread.
+ // Called by the <Peer_Handler> to pass a message to the
+ // <Consumer_Router>. The <Consumer_Router> queues up this message,
+ // which is then processed in the <svc> method in a separate thread.
virtual int svc (void);
// Runs in a separate thread to dequeue messages and pass them up
// the stream.
// = Dynamic linking hooks.
-
virtual int info (char **info_string, size_t length) const;
// Returns information about this service.
};
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
index 5f90e1941ad..5a796f31bd8 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp
@@ -6,12 +6,14 @@
int
Event_Analyzer::open (void *)
{
+ // No-op for now...
return 0;
}
int
Event_Analyzer::close (u_long)
{
+ // No-op for now...
return 0;
}
@@ -37,24 +39,28 @@ int
Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
if (Options::instance ()->debug ())
- ACE_DEBUG ((LM_DEBUG, "(%t) passing through Event_Analyser::put() (%s)\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) passing through Event_Analyser::put() (%s)\n",
this->is_reader () ? "reader" : "writer"));
if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
this->control (mb);
+ // Just pass the message along to the next Module in the stream...
return this->put_next (mb);
}
int
Event_Analyzer::init (int, char *[])
{
+ // No-op for now.
return 0;
}
int
Event_Analyzer::fini (void)
{
+ // No-op for now.
return 0;
}
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
index a9497730421..a5c18496789 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -10,24 +10,31 @@
#include "ace/Synch.h"
class Event_Analyzer : public ACE_Task<ACE_SYNCH>
- // = TITLE
- // This is a "no-op" class that just forwards all its message
- // blocks onto its neighboring Module in the Stream. In a real
- // application these tasks would be where the Stream processing
- // would go.
{
+ // = TITLE
+ // This class forwards all the <ACE_Message_Block>s it receives
+ // onto its neighboring Module in the Stream.
+ //
+ // = DESCRIPTION
+ // In a "real" event service, application-specific processing
+ // would be done in the <put> (or <svc>) method in this class.
public:
+ // = Initialization hooks called by <ACE_Stream> (not used).
virtual int open (void *a = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- // Dynamic linking hooks.
+ virtual int put (ACE_Message_Block *msg,
+ ACE_Time_Value * = 0);
+ // Entry point into this task.
+
+ // Dynamic linking hooks (not used).
virtual int init (int argc, char *argv[]);
virtual int fini (void);
- virtual int info (char **info_string, size_t length) const;
-
+ virtual int info (char **info_string,
+ size_t length) const;
private:
virtual int control (ACE_Message_Block *);
+ // Implements the watermark control processing.
};
#endif /* _EVENT_ANALYZER_H */
diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h
index 639f013ac73..9e9bb0e7a14 100644
--- a/examples/ASX/Event_Server/Event_Server/Options.h
+++ b/examples/ASX/Event_Server/Event_Server/Options.h
@@ -8,51 +8,67 @@
#include "ace/Profile_Timer.h"
class Options
+{
// = TITLE
// Option Singleton for Event Server.
-{
- friend class ACE_Shutup_GPlusPlus; // Turn off g++ warning...
public:
static Options *instance (void);
+ // Singleton access point.
void parse_args (int argc, char *argv[]);
+ // Parse the command-line arguments and set the options.
+ // = Timer management.
void stop_timer (void);
void start_timer (void);
+ // = Set/get the number of threads.
void thr_count (size_t count);
size_t thr_count (void);
+ // = Set/get the size of the queue.
void initial_queue_length (size_t length);
size_t initial_queue_length (void);
+ // = Set/get the high water mark.
void high_water_mark (size_t size);
size_t high_water_mark (void);
+ // = Set/get the high water mark.
void low_water_mark (size_t size);
size_t low_water_mark (void);
+ // = Set/get the size of a message.
void message_size (size_t size);
size_t message_size (void);
+ // = Set/get the number of iterations.
void iterations (size_t n);
size_t iterations (void);
+ // Set/get threading flags.
void t_flags (long flag);
long t_flags (void);
+ // Set/get supplier port number.
void supplier_port (u_short port);
u_short supplier_port (void);
+ // Set/get consumer port number.
void consumer_port (u_short port);
u_short consumer_port (void);
+ // Enabled if we're in debugging mode.
int debug (void);
+
+ // Enabled if we're in verbose mode.
int verbose (void);
+ // Print the results to the STDERR.
void print_results (void);
private:
+ // = Ensure we're a Singleton.
Options (void);
~Options (void);
@@ -63,7 +79,7 @@ private:
// Number of threads to spawn.
long t_flags_;
- // Flags to thr_create().
+ // Flags to <thr_create>.
size_t high_water_mark_;
// ACE_Task high water mark.
@@ -95,7 +111,8 @@ private:
static Options *instance_;
// Static Singleton.
-
+ friend class ACE_Shutup_GPlusPlus;
+ // Turn off g++ warning...
};
#include "Options.i"
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
index f1cd0afac5f..1dd8c99a159 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -8,6 +8,11 @@
#include "Options.h"
#include "Peer_Router.h"
+// Send the <ACE_Message_Block> to all the peers. Note that in a
+// "real" application this logic would most likely be more selective,
+// i.e., it would actually do "routing" based on addressing
+// information passed in the <ACE_Message_Block>.
+
int
Peer_Router_Context::send_peers (ACE_Message_Block *mb)
{
@@ -18,7 +23,12 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb)
// Skip past the header and get the message to send.
ACE_Message_Block *data_block = mb->cont ();
- // "Multicast" the data to *all* the registered peers.
+ // Use an iterator to "multicast" the data to *all* the registered
+ // peers. Note that this doesn't really multicast, it just makes a
+ // "logical" copy of the <ACE_Message_Block> and enqueues it in the
+ // appropriate <Peer_Handler> corresponding to each peer. Note that
+ // a "real" application would probably "route" the data to a subset
+ // of connected peers here, rather than send it to all the peers.
for (PEER_ENTRY *ss = 0;
map_iter.next (ss) != 0;
@@ -28,9 +38,11 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb)
ACE_DEBUG ((LM_DEBUG,
"(%t) sending to peer via handle %d\n",
ss->ext_id_));
+
iterations++;
+
// Increment reference count before sending since the
- // Peer_Handler might be running in its own thread of control.
+ // <Peer_Handler> might be running in its own thread of control.
bytes += ss->int_id_->put (data_block->duplicate ());
}
@@ -38,12 +50,23 @@ Peer_Router_Context::send_peers (ACE_Message_Block *mb)
return bytes == 0 ? 0 : bytes / iterations;
}
+// Remove the <Peer_Handler> from the peer connection map.
+
int
Peer_Router_Context::unbind_peer (ROUTING_KEY key)
{
return this->peer_map_.unbind (key);
}
+// Add the <Peer_Handler> to the peer connection map.
+
+int
+Peer_Router_Context::bind_peer (ROUTING_KEY key,
+ Peer_Handler *peer_handler)
+{
+ return this->peer_map_.bind (key, peer_handler);
+}
+
void
Peer_Router_Context::duplicate (void)
{
@@ -60,39 +83,36 @@ Peer_Router_Context::release (void)
delete this;
}
-int
-Peer_Router_Context::bind_peer (ROUTING_KEY key,
- Peer_Handler *peer_handler)
-{
- return this->peer_map_.bind (key, peer_handler);
-}
-
Peer_Router_Context::Peer_Router_Context (u_short port)
: reference_count_ (0)
{
- // Perform initializations.
-
+ // Initialize the Acceptor's "listen-mode" socket.
if (this->open (ACE_INET_Addr (port)) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "Acceptor::open"));
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Acceptor::open"));
+ // Initialize the connection map.
else if (this->peer_map_.open () == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "Map_Manager::open"));
-
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Map_Manager::open"));
else
{
ACE_INET_Addr addr;
- if (this->acceptor().get_local_addr (addr) != -1)
+ if (this->acceptor ().get_local_addr (addr) != -1)
ACE_DEBUG ((LM_DEBUG,
"(%t) initializing %s on port = %d, handle = %d, this = %u\n",
- addr.get_port_number () == Options::instance ()->supplier_port () ?
- "Supplier_Handler" : "Consumer_Handler",
+ addr.get_port_number () == Options::instance ()->supplier_port ()
+ ? "Supplier_Handler" : "Consumer_Handler",
addr.get_port_number (),
this->acceptor().get_handle (),
this));
else
ACE_ERROR ((LM_ERROR,
- "%p\n", "get_local_addr"));
+ "%p\n",
+ "get_local_addr"));
}
}
@@ -107,7 +127,8 @@ Peer_Router_Context::~Peer_Router_Context (void)
PEER_ITERATOR map_iter = this->peer_map_;
- // Make sure to take all the handles out of the map.
+ // Make sure to take all the handles out of the map to avoid
+ // "resource leaks."
for (PEER_ENTRY *ss = 0;
map_iter.next (ss) != 0;
@@ -119,8 +140,11 @@ Peer_Router_Context::~Peer_Router_Context (void)
ss->ext_id_));
if (ACE_Reactor::instance ()->remove_handler
- (ss->ext_id_, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) p\n", "remove_handle"));
+ (ss->ext_id_,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) p\n",
+ "remove_handle"));
}
// Close down the map.
@@ -139,72 +163,32 @@ Peer_Router_Context::peer_router (Peer_Router *pr)
this->peer_router_ = pr;
}
+// Factory Method that creates a new <Peer_Handler> for each
+// connection.
+
int
Peer_Router_Context::make_svc_handler (Peer_Handler *&sh)
{
- ACE_NEW_RETURN (sh, Peer_Handler (this), -1);
+ ACE_NEW_RETURN (sh,
+ Peer_Handler (this),
+ -1);
return 0;
}
Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
- : prc_ (prc)
+ : peer_router_context_ (prc)
{
}
-#if 0
-
-// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
-// a single thread of control. It would be easy to make them Active
-// Objects by calling activate() in Peer_Handler::open(), making
-// Peer_Handler::put() enqueue each message on the message queue, and
-// (3) then running the following svc() routine to route each message
-// to its final destination within a separate thread. Note that we'd
-// want to move the svc() call up to the Consumer_Router and
-// Supplier_Router level in order to get the right level of control
-// for input and output.
-
-Peer_Handler::svc (void)
-{
- ACE_Message_Block *db, *hb;
- int n;
-
- // Do an endless loop
- for (;;)
- {
- db = new Message_Block (BUFSIZ);
- hb = new Message_Block (sizeof (ROUTING_KEY), Message_Block::MB_PROTO, db);
-
- if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1)
- LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1);
- else if (n == 0) // Client has closed down the connection.
- {
- if (this->prc_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
- LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1);
- LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n"));
- return -1; // We do not need to be deregistered by reactor
- // as we were not registered at all
- }
- else
- // Transform incoming buffer into a Message and pass
- // downstream.
- {
- db->wr_ptr (n);
- *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
- hb->wr_ptr (sizeof (long));
-
- if (this->prc_->peer_router ()->reply (hb) == -1)
- {
- cout << "Peer_Handler.svc : peer_router->reply failed" << endl ;
- return -1;
- }
- }
- }
- return 0;
-}
-#endif
+// Send output to a peer. Note that this implementation "blocks" if
+// flow control occurs. This is undesirable for "real" applications.
+// The best way around this is to make the <Peer_Handler> an Active
+// Object, e.g., as done in the $ACE_ROOT/apps/Gateway/Gateway
+// application.
int
-Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+Peer_Handler::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
#if 0
// If we're running as Active Objects just enqueue the message here.
@@ -212,14 +196,13 @@ Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
#else
ACE_UNUSED_ARG (tv);
- int result = 0;
-
- result = this->peer ().send_n (mb->rd_ptr (),
- mb->length ());
+ int result = this->peer ().send_n (mb->rd_ptr (),
+ mb->length ());
// Release the memory.
mb->release ();
+
return result;
-#endif
+#endif /* 0 */
}
// Initialize a newly connected handler.
@@ -229,29 +212,45 @@ Peer_Handler::open (void *)
{
char buf[BUFSIZ], *p = buf;
- if (this->prc_->peer_router ()->info (&p, sizeof buf) != -1)
- ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, handle = %d\n",
- buf, this->get_handle ()));
+ if (this->peer_router_context_->peer_router ()->info (&p,
+ sizeof buf) != -1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) creating handler for %s, handle = %d\n",
+ buf,
+ this->get_handle ()));
else
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "info"),
+ -1);
#if 0
// If we're running as an Active Object activate the Peer_Handler
// here.
if (this->activate (Options::instance ()->t_flags ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "activation of thread failed"),
+ -1);
ACE_DEBUG ((LM_DEBUG,
"(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
#else
+
// Register with the Reactor to receive messages from our Peer.
if (ACE_Reactor::instance ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
-#endif
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "register_handler"),
+ -1);
+#endif /* 0 */
// Insert outselves into the routing map.
- else if (this->prc_->bind_peer (this->get_handle (), this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1);
-
+ else if (this->peer_router_context_->bind_peer (this->get_handle (),
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "bind_peer"),
+ -1);
else
return 0;
}
@@ -261,7 +260,9 @@ Peer_Handler::open (void *)
int
Peer_Handler::handle_input (ACE_HANDLE h)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on handle %d\n", h));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) input arrived on handle %d\n",
+ h));
ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ);
ACE_Message_Block *hb = new ACE_Message_Block (sizeof (ROUTING_KEY),
@@ -276,27 +277,39 @@ Peer_Handler::handle_input (ACE_HANDLE h)
return -1;
}
- ssize_t n = this->peer ().recv (db->rd_ptr (), db->size ());
+ ssize_t n = this->peer ().recv (db->rd_ptr (),
+ db->size ());
if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p",
+ "recv failed"),
+ -1);
else if (n == 0) // Client has closed down the connection.
{
- if (this->prc_->unbind_peer (this->get_handle ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down handle %d\n", h));
- return -1; // Instruct the ACE_Reactor to deregister us by returning -1.
+ if (this->peer_router_context_->unbind_peer (this->get_handle ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p",
+ "unbind failed"),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down handle %d\n", h));
+ // Instruct the <ACE_Reactor> to deregister us by returning -1.
+ return -1;
}
else
{
- // Transform incoming buffer into a Message.
+ // Transform incoming buffer into an <ACE_Message_Block>.
// First, increment the write pointer to the end of the newly
// read data block.
db->wr_ptr (n);
- // Second, copy the "address" into the header block.
+ // Second, copy the "address" into the header block. Note that
+ // for this implementation the HANDLE we receive the message on
+ // is considered the "address." A "real" application would want
+ // to do something more sophisticated.
*(ACE_HANDLE *) hb->rd_ptr () = this->get_handle ();
// Third, update the write pointer in the header block.
@@ -306,26 +319,31 @@ Peer_Handler::handle_input (ACE_HANDLE h)
// use <Task::put> here because this gives the method at *our*
// level in the stream a chance to do something with the message
// before it is sent up the other side. For instance, if we
- // receive messages in the Supplier_Router, it will just call
- // <put_next> and send them up the stream to the Consumer_Router
- // (which broadcasts them to consumers). However, if we receive
- // messages in the Consumer_Router, it will reply to the
- // Consumer with an error since it's not correct for Consumers
- // to send messages.
- return this->prc_->peer_router ()->put (hb) == -1 ? -1 : 0;
+ // receive messages in the <Supplier_Router>, it will just call
+ // <put_next> and send them up the stream to the
+ // <Consumer_Router> (which broadcasts them to consumers).
+ // However, if we receive messages in the <Consumer_Router>, it
+ // could reply to the Consumer with an error since it's not
+ // correct for Consumers to send messages (we don't do this in
+ // the current implementation, but it could be done in a "real"
+ // application).
+
+ if (this->peer_router_context_->peer_router ()->put (hb) == -1)
+ return -1;
+ else
+ return 0;
}
}
Peer_Router::Peer_Router (Peer_Router_Context *prc)
- : prc_ (prc)
+ : peer_router_context_ (prc)
{
-
}
Peer_Router_Context *
Peer_Router::context (void) const
{
- return this->prc_;
+ return this->peer_router_context_;
}
int
@@ -346,6 +364,69 @@ Peer_Router::control (ACE_Message_Block *mb)
return 0;
}
+#if 0
+
+// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
+// a single thread of control. It would be easy to make them Active
+// Objects by calling activate() in Peer_Handler::open(), making
+// Peer_Handler::put() enqueue each message on the message queue, and
+// (3) then running the following svc() routine to route each message
+// to its final destination within a separate thread. Note that we'd
+// want to move the svc() call up to the Consumer_Router and
+// Supplier_Router level in order to get the right level of control
+// for input and output.
+
+Peer_Handler::svc (void)
+{
+ ACE_Message_Block *db, *hb;
+
+ // Do an endless loop
+ for (;;)
+ {
+ db = new Message_Block (BUFSIZ);
+ hb = new Message_Block (sizeof (ROUTING_KEY),
+ Message_Block::MB_PROTO,
+ db);
+
+ ssize_t n = this->peer_.recv (db->rd_ptr (), db->size ());
+
+ if (n == -1)
+ LM_ERROR_RETURN ((LOG_ERROR,
+ "%p",
+ "recv failed"),
+ -1);
+ else if (n == 0) // Client has closed down the connection.
+ {
+ if (this->peer_router_context_->peer_router ()->unbind_peer (this->get_handle ()) == -1)
+ LM_ERROR_RETURN ((LOG_ERROR,
+ "%p",
+ "unbind failed"),
+ -1);
+ LM_DEBUG ((LOG_DEBUG,
+ "(%t) shutting down \n"));
+
+ // We do not need to be deregistered by reactor
+ // as we were not registered at all.
+ return -1;
+ }
+ else
+ {
+ // Transform incoming buffer into a Message.
+ db->wr_ptr (n);
+ *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment.
+ hb->wr_ptr (sizeof (long));
+
+ // Pass the message to the stream.
+ if (this->peer_router_context_->peer_router ()->reply (hb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "Peer_Handler.svc : peer_router->reply failed"),
+ -1);
+ }
+ }
+ return 0;
+}
+#endif /* 0 */
#endif /* _PEER_ROUTER_C */
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
index d9b7021f726..3369fe6415f 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h
@@ -16,10 +16,10 @@ class Peer_Router;
class Peer_Router_Context;
class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
+{
// = TITLE
// Receive input from a Peer and forward to the appropriate
- // <Peer_Router>.
-{
+ // <Peer_Router> (i.e., <Consumer_Router> or <Supplier_Router>).
public:
Peer_Handler (Peer_Router_Context * = 0);
// Initialization method.
@@ -29,28 +29,34 @@ public:
// object.
virtual int handle_input (ACE_HANDLE);
- // Receive input from the peer.
+ // Receive input from a peer.
virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Send output to a peer.
+ // Send output to a peer. Note that this implementation "blocks" if
+ // flow control occurs. This is undesirable for "real"
+ // applications. The best way around this is to make the
+ // <Peer_Handler> an Active Object, e.g., as done in the
+ // $ACE_ROOT/apps/Gateway/Gateway application.
protected:
- Peer_Router_Context *prc_;
- // Pointer to router context.
+ Peer_Router_Context *peer_router_context_;
+ // Pointer to router context. This maintains the state that is
+ // shared by both Tasks in a <Peer_Router> Module.
};
class Peer_Router_Context : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
+{
// = TITLE
// Defines state and behavior shared between both Tasks in a
- // Peer_Router Module.
+ // <Peer_Router> Module.
//
// = DESCRIPTION
- // This class also serves as an Acceptor, which creates
- // Peer_Handlers when Peers connect.
-{
+ // This class also serves as an <ACE_Acceptor>, which creates
+ // <Peer_Handlers> when Peers connect.
public:
// = Initialization and termination methods.
Peer_Router_Context (u_short port);
+ // Constructor.
virtual int unbind_peer (ROUTING_KEY);
// Remove the <Peer_Handler *> from the <PEER_MAP> that corresponds
@@ -61,29 +67,37 @@ public:
// <ROUTING_KEY>.
int send_peers (ACE_Message_Block *mb);
- // Send the <ACE_Message_Block> to the peer(s).
+ // Send the <ACE_Message_Block> to all the peers. Note that in a
+ // "real" application this logic would most likely be more
+ // selective, i.e., it would actually do "routing" based on
+ // addressing information passed in the <ACE_Message_Block>.
int make_svc_handler (Peer_Handler *&sh);
- // Create a new <Peer_Handler> for each connection.
+ // Factory Method that creates a new <Peer_Handler> for each
+ // connection. This method overrides the default behavior in
+ // <ACE_Acceptor>.
// = Set/Get Router Task.
- Peer_Router *peer_router ();
+ Peer_Router *peer_router (void);
void peer_router (Peer_Router *);
void release (void);
// Decrement the reference count and delete <this> when count == 0;
void duplicate (void);
- // Increment the reference count
+ // Increment the reference count.
private:
Peer_Router *peer_router_;
// Pointer to the <Peer_Router> that we are accepting for.
- // = Useful typedefs
- typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> PEER_MAP;
- typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX> PEER_ITERATOR;
- typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *> PEER_ENTRY;
+ // = Useful typedefs.
+ typedef ACE_Map_Manager <ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
+ PEER_MAP;
+ typedef ACE_Map_Iterator<ROUTING_KEY, Peer_Handler *, ACE_SYNCH_RW_MUTEX>
+ PEER_ITERATOR;
+ typedef ACE_Map_Entry<ROUTING_KEY, Peer_Handler *>
+ PEER_ENTRY;
PEER_MAP peer_map_;
// Map used to keep track of active peers.
@@ -95,21 +109,22 @@ private:
// Private to ensure dynamic allocation.
friend class Friend_Of_Peer_Router_Context;
- // declare a friend class to avoid compiler warnings because the
+ // Declare a friend class to avoid compiler warnings because the
// destructor is private.
};
class Peer_Router : public ACE_Task<ACE_SYNCH>
+{
// = TITLE
// This abstract base class provides mechanisms for routing
- // messages to/from a ACE_Stream from/to one or more peers (which
+ // messages to/from a <ACE_Stream> from/to one or more peers (which
// are typically running on remote hosts).
//
// = DESCRIPTION
- // A subclass of Peer_Router overrides the open(), close(), and
- // put() methods in order to specialize the behavior of the router
- // to meet application-specific requirements.
-{
+ // Subclasses of <Peer_Router> (such as <Consumer_Router> or
+ // <Supplier_Router>) override the <open>, <close>, and
+ // <put> methods to specialize the behavior of the router to
+ // meet application-specific requirements.
protected:
Peer_Router (Peer_Router_Context *prc);
// Initialization method.
@@ -124,9 +139,9 @@ protected:
// Helpful typedef.
private:
- Peer_Router_Context *prc_;
- // Reference to the context shared by the writer and reader Tasks in
- // the Consumer and Supplier Modules.
+ Peer_Router_Context *peer_router_context_;
+ // Reference to the context shared by the writer and reader Tasks,
+ // e.g., in the <Consumer_Router> and <Supplier_Router> Modules.
// = Prevent copies and pass-by-value.
Peer_Router (const Peer_Router &);
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
index 0ab012eaa4a..e42c9371eed 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp
@@ -17,9 +17,12 @@ Supplier_Router::svc (void)
)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) warning: Supplier_Router is forwarding a message via send_peers\n"));
+ "(%t) warning: Supplier_Router is "
+ "forwarding a message via send_peers\n"));
- // Broadcast the message to the Suppliers.
+ // Broadcast the message to the Suppliers, even though this is
+ // "incorrect" (assuming a oneway flow of events from Suppliers
+ // to Consumers)!
if (this->context ()->send_peers (mb) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -27,35 +30,39 @@ Supplier_Router::svc (void)
-1);
}
- ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) stopping svc in Supplier_Router\n"));
return 0;
}
Supplier_Router::Supplier_Router (Peer_Router_Context *prc)
: Peer_Router (prc)
{
+ // Increment the reference count.
this->context ()->duplicate ();
}
-// Initialize the Supplier Router.
+// Initialize the Supplier Router.
int
Supplier_Router::open (void *)
{
if (this->is_reader ())
{
- // Set the Peer_Router_Context to point back to us so that all the
- // Peer_Handler's <put> their incoming <Message_Blocks> to our
- // reader Task.
+ // Set the <Peer_Router_Context> to point back to us so that all
+ // the Peer_Handler's <put> their incoming <Message_Blocks> to
+ // our reader Task.
this->context ()->peer_router (this);
return 0;
}
else // if (this->is_writer ()
{
+ // Increment the reference count.
+ this->context ()->duplicate ();
+
// Make this an active object to handle the error cases in a
// separate thread.
- this->context ()->duplicate ();
return this->activate (Options::instance ()->t_flags ());
}
}
@@ -65,7 +72,8 @@ Supplier_Router::open (void *)
int
Supplier_Router::close (u_long)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router %s\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing Supplier_Router %s\n",
this->is_reader () ? "reader" : "writer"));
if (this->is_writer ())
@@ -94,19 +102,23 @@ Supplier_Router::put (ACE_Message_Block *mb,
}
// If we're the reader then we are responsible for pass messages up
- // to the next Module's writer Task.
-
+ // to the next Module's reader Task. Note that in a "real"
+ // application this is likely where we'd take a look a the actual
+ // information that was in the message, e.g., in order to figure out
+ // what operation it was and what it's "parameters" where, etc.
else if (this->is_reader ())
return this->put_next (mb);
+
else // if (this->is_writer ())
{
// Someone is trying to write to the Supplier. In this
- // implementation this is considered an error. However, we'll
+ // implementation this is considered an "error." However, we'll
// just go ahead and forward the message to the Supplier (who
// hopefully is prepared to receive it).
- ACE_DEBUG ((LM_WARNING, "(%t) warning: sending to a Supplier\n"));
+ ACE_DEBUG ((LM_WARNING,
+ "(%t) warning: sending to a Supplier\n"));
- // Queue up the message to processed by Supplier_Router::svc().
+ // Queue up the message to processed by <Supplier_Router::svc>.
// Since we don't expect to be getting many of these messages,
// we queue them up and run them in a separate thread to avoid
// taxing the main thread.
@@ -114,7 +126,7 @@ Supplier_Router::put (ACE_Message_Block *mb,
}
}
-// Return information about the Supplier_Router ACE_Module.
+// Return information about the <Supplier_Router>.
int
Supplier_Router::info (char **strp, size_t length) const
@@ -126,13 +138,17 @@ Supplier_Router::info (char **strp, size_t length) const
if (this->context ()->acceptor ().get_local_addr (addr) == -1)
return -1;
- ACE_OS::sprintf (buf, "%s\t %d/%s %s (%s)\n",
- mod_name, addr.get_port_number (), "tcp",
- "# supplier router", this->is_reader () ? "reader" : "writer");
-
+ ACE_OS::sprintf (buf,
+ "%s\t %d/%s %s (%s)\n",
+ mod_name,
+ addr.get_port_number (),
+ "tcp",
+ "# supplier router",
+ this->is_reader () ? "reader" : "writer");
if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0)
return -1;
else
ACE_OS::strncpy (*strp, mod_name, length);
+
return ACE_OS::strlen (mod_name);
}
diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
index 42d067a6454..b83035cc957 100644
--- a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
+++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h
@@ -11,20 +11,33 @@
#include "Peer_Router.h"
class Supplier_Router : public Peer_Router
+{
// = TITLE
// Provides the interface between one or more Suppliers and the
// Event Server ACE_Stream.
//
// = DESCRIPTION
- // When used on the "reader" side of a Stream, this Router Task
- // simply forwards all messages up the stream. When used on the
- // "writer" side, this Router Task queues up outgoing messages
- // to suppliers and sends them in a separate thread. The reason
- // for this is that it's really an "error" for a
- // <Supplier_Router> to send messages to Suppliers, so we don't
- // expect this to happen very much. when it does we use a
- // separate thread to avoid taxing the main thread.
-{
+ // This class normally sits on "bottom" of the Stream and sends
+ // all messages coming from Suppliers via its "write" <Task>
+ // "upstream" to all the Consumers connected to the
+ // <Consumer_Router>. Normally, the messages flow up the
+ // stream to <Consumer_Router>s. However, if Consumers
+ // transmit data to the <Consumer_Router>, we dutifully push it
+ // out to the Suppliers via the <Supplier_Router>.
+ //
+ // When used on the "reader" side of a Stream, the
+ // <Supplier_Router> simply forwards all messages up the stream.
+ // When used on the "writer" side, the <Supplier_Router> queues
+ // up outgoing messages to suppliers and sends them in a
+ // separate thread. The reason for this is that it's really an
+ // "error" for a <Supplier_Router> to send messages to
+ // Suppliers, so we don't expect this to happen very much. When
+ // it does we use a separate thread to avoid taxing the main
+ // thread, which processes "normal" messages.
+ //
+ // All of these methods are called via base class pointers by
+ // the <ACE_Stream> apparatus. Therefore, we can put them in
+ // the protected section.
public:
Supplier_Router (Peer_Router_Context *prc);
// Initialization method.
@@ -32,10 +45,6 @@ public:
protected:
// = ACE_Task hooks.
- // All of these methods are called via base class pointers by the
- // ACE Stream apparatus. Therefore, we can put them in the
- // protected section.
-
virtual int open (void *a = 0);
// Called by the Stream to initialize the router.
diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp
index 6dba2c38283..4fa59126d64 100644
--- a/examples/ASX/Event_Server/Event_Server/event_server.cpp
+++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp
@@ -1,6 +1,6 @@
// $Id$
-// Test the event server.
+// Main driver program for the event server example.
#include "ace/Stream.h"
#include "ace/Service_Config.h"
@@ -9,142 +9,241 @@
#include "Event_Analyzer.h"
#include "Supplier_Router.h"
+// Typedef these components to handle multi-threading correctly.
typedef ACE_Stream<ACE_SYNCH> MT_Stream;
typedef ACE_Module<ACE_SYNCH> MT_Module;
-class Quit_Handler : public ACE_Sig_Adapter
- // = TITLE
- // Handle SIGINT and terminate the entire application.
+class Event_Server : public ACE_Sig_Adapter
{
+ // = TITLE
+ // Run the logic for the <Event_Server>.
+ //
+ // = DESCRIPTION
+ // In addition to packaging the <Event_Server> components, this
+ // class also handles SIGINT and terminate the entire
+ // application process. There are several ways to terminate
+ // this application process:
+ //
+ // 1. Send a SIGINT signal (e.g., via ^C)
+ // 2. Type any character on the STDIN.
+ //
+ // Note that by inheriting from the <ACE_Sig_Adapter> we can
+ // shutdown the <ACE_Reactor> cleanly when a SIGINT is
+ // generated.
public:
- Quit_Handler (void);
- virtual int handle_input (ACE_HANDLE fd);
+ Event_Server (void);
+ // Constructor.
+
+ int svc (void);
+ // Run the event-loop for the event server.
+
+private:
+ virtual int handle_input (ACE_HANDLE handle);
+ // Hook method called back when a user types something into the
+ // STDIN in order to shut down the program.
+
+ int configure_stream (void);
+ // Setup the plumbing in the stream.
+
+ int set_watermarks (void);
+ // Set the high and low queue watermarks.
+
+ int run_event_loop (void);
+ // Run the event-loop for the <Event_Server>.
+
+ MT_Stream event_server_;
+ // The <ACE_Stream> that contains the <Event_Server> application
+ // <Modules>.
};
-Quit_Handler::Quit_Handler (void)
+Event_Server::Event_Server (void)
: ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
+ // Shutdown the <ACE_Reactor>'s event loop when a SIGINT is
+ // received.
{
- // Register to trap input from the user.
+ // Register to trap STDIN from the user.
if (ACE::register_stdin_handler (this,
ACE_Reactor::instance (),
ACE_Thread_Manager::instance ()) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "register_stdin_handler"));
// Register to trap the SIGINT signal.
else if (ACE_Reactor::instance ()->register_handler
(SIGINT, this) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "register_handler"));
}
int
-Quit_Handler::handle_input (ACE_HANDLE)
+Event_Server::handle_input (ACE_HANDLE)
{
// This code here will make sure we actually wait for the user to
- // type something. On platforms like Win32, handle_input() is called
+ // type something. On platforms like Win32, <handle_input> is called
// prematurely (even when there is no data).
char temp_buffer [BUFSIZ];
- ACE_OS::read (ACE_STDIN, temp_buffer, sizeof (temp_buffer));
+
+ ssize_t n = ACE_OS::read (ACE_STDIN,
+ temp_buffer,
+ sizeof (temp_buffer));
+ // This ought to be > 0, otherwise something very strange has
+ // happened!!
+ ACE_ASSERT (n > 0);
Options::instance ()->stop_timer ();
- ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n"));
+
+ ACE_DEBUG ((LM_INFO,
+ "(%t) closing down the test\n"));
Options::instance ()->print_results ();
- ACE_Reactor::end_event_loop();
+ ACE_Reactor::end_event_loop ();
return -1;
}
int
+Event_Server::configure_stream (void)
+{
+ Peer_Router_Context *src;
+ // Create the <Supplier_Router>'s routing context. This contains a
+ // context shared by both the write-side and read-side of the
+ // <Supplier_Router> Module.
+ ACE_NEW_RETURN (src,
+ Peer_Router_Context (Options::instance ()->supplier_port ()),
+ -1);
+
+ MT_Module *srm = 0;
+ // Create the <Supplier_Router> module.
+ ACE_NEW_RETURN (srm,
+ MT_Module
+ ("Supplier_Router",
+ new Supplier_Router (src),
+ new Supplier_Router (src)),
+ -1);
+
+ MT_Module *eam = 0;
+ // Create the <Event_Analyzer> module.
+ ACE_NEW_RETURN (eam,
+ MT_Module
+ ("Event_Analyzer",
+ new Event_Analyzer,
+ new Event_Analyzer),
+ -1);
+
+ Peer_Router_Context *crc;
+ // Create the <Consumer_Router>'s routing context. This contains a
+ // context shared by both the write-side and read-side of the
+ // <Consumer_Router> Module.
+ ACE_NEW_RETURN (crc,
+ Peer_Router_Context (Options::instance ()->consumer_port ()),
+ -1);
+
+ MT_Module *crm = 0;
+ // Create the <Consumer_Router> module.
+ ACE_NEW_RETURN (crm,
+ MT_Module
+ ("Consumer_Router",
+ new Consumer_Router (crc),
+ new Consumer_Router (crc)),
+ -1);
+
+ // Push the Modules onto the event_server stream.
+
+ if (this->event_server_.push (srm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push (Supplier_Router)"),
+ -1);
+ else if (this->event_server_.push (eam) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push (Event_Analyzer)"),
+ -1);
+ else if (this->event_server_.push (crm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "push (Consumer_Router)"),
+ -1);
+ return 0;
+}
+
+int
+Event_Server::set_watermarks (void)
+{
+ // Set the high and low water marks appropriately. The water marks
+ // control how much data can be buffered before the queues are
+ // considered "full."
+ int wm = Options::instance ()->low_water_mark ();
+
+ if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_LWM,
+ &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "push (setting low watermark)"),
+ -1);
+
+ wm = Options::instance ()->high_water_mark ();
+ if (this->event_server_.control (ACE_IO_Cntl_Msg::SET_HWM,
+ &wm) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "push (setting high watermark)"),
+ -1);
+ return 0;
+}
+
+int
+Event_Server::run_event_loop (void)
+{
+ // Begin the timer.
+ Options::instance ()->start_timer ();
+
+ // Perform the main event loop waiting for the user to type ^C or to
+ // enter a line on the ACE_STDIN.
+
+ ACE_Reactor::run_event_loop ();
+
+ // Close down the stream and call the <close> hooks on all the
+ // <ACE_Task>s in the various Modules in the Stream.
+ this->event_server_.close ();
+
+ // Wait for the threads in the <Consumer_Router> and
+ // <Supplier_Router> to exit.
+ return ACE_Thread_Manager::instance ()->wait ();
+}
+
+int
+Event_Server::svc (void)
+{
+ if (this->configure_stream () == -1)
+ return -1;
+ else if (this->set_watermarks () == -1)
+ return -1;
+ else if (this->run_event_loop () == -1)
+ return -1;
+ else
+ return 0;
+}
+
+int
main (int argc, char *argv[])
{
#if defined (ACE_HAS_THREADS)
- ACE_Service_Config daemon;
-
Options::instance ()->parse_args (argc, argv);
- {
- // Primary ACE_Stream for EVENT_SERVER application.
- MT_Stream event_server;
-
- // Enable graceful shutdowns...
- Quit_Handler quit_handler;
-
- Peer_Router_Context *src;
- // Create the Supplier_Router's routing context, which contains
- // context shared by both the write-side and read-side of the
- // Supplier_Router Module.
- ACE_NEW_RETURN (src,
- Peer_Router_Context (Options::instance ()->supplier_port ()),
- -1);
-
- MT_Module *srm = 0;
- // Create the Supplier Router module.
- ACE_NEW_RETURN (srm, MT_Module
- ("Supplier_Router",
- new Supplier_Router (src),
- new Supplier_Router (src)),
- -1);
-
- MT_Module *eam = 0;
- // Create the Event Analyzer module.
- ACE_NEW_RETURN (eam, MT_Module
- ("Event_Analyzer",
- new Event_Analyzer,
- new Event_Analyzer),
- -1);
-
- Peer_Router_Context *crc;
- // Create the Consumer_Router's routing context, which contains
- // context shared by both the write-side and read-side of the
- // Consumer_Router Module.
- ACE_NEW_RETURN (crc,
- Peer_Router_Context (Options::instance ()->consumer_port ()),
- -1);
-
- MT_Module *crm = 0;
- // Create the Consumer Router module.
- ACE_NEW_RETURN (crm, MT_Module
- ("Consumer_Router",
- new Consumer_Router (crc),
- new Consumer_Router (crc)),
- -1);
-
- // Push the Modules onto the event_server stream.
-
- if (event_server.push (srm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1);
-
- if (event_server.push (eam) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1);
-
- if (event_server.push (crm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1);
-
- // Set the high and low water marks appropriately.
-
- int wm = Options::instance ()->low_water_mark ();
-
- if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1);
-
- wm = Options::instance ()->high_water_mark ();
- if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1);
-
- Options::instance ()->start_timer ();
-
- // Perform the main event loop waiting for the user to type ^C or
- // to enter a line on the ACE_STDIN.
-
- ACE_Reactor::run_event_loop ();
- // The destructor of event_server will close down the stream and
- // call the close() hooks on all the ACE_Tasks.
- }
-
- // Wait for the threads to exit.
- ACE_Thread_Manager::instance ()->wait ();
- ACE_DEBUG ((LM_DEBUG, "exiting main\n"));
+
+ // Initialize the <Event_Server>.
+ Event_Server event_server;
+
+ // Run the event server's event-loop.
+ int result = event_server.svc ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "exiting main\n"));
+
+ return result;
#else
ACE_UNUSED_ARG (argc);
ACE_UNUSED_ARG (argv);
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "threads not supported on this platform\n"),
+ 1);
#endif /* ACE_HAS_THREADS */
- return 0;
}
diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
index 787fd064803..148c4ede90f 100644
--- a/examples/ASX/Event_Server/Transceiver/transceiver.cpp
+++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp
@@ -9,66 +9,21 @@
#include "ace/SOCK_Connector.h"
#include "ace/Get_Opt.h"
-// Port number of event server.
-static u_short port_number;
-
-// Name of event server.
-static char *host_name;
-
-// Are we playing the Consumer ('C') or Supplier ('S') role?
-static char role = 'S';
-
-// Handle the command-line arguments.
-
-static void
-parse_args (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "Ch:p:S");
-
- port_number = ACE_DEFAULT_SERVER_PORT;
- host_name = ACE_DEFAULT_SERVER_HOST;
-
- for (int c; (c = get_opt ()) != -1; )
- switch (c)
- {
- case 'C':
- role = c;
- break;
- case 'h':
- host_name = get_opt.optarg;
- break;
- case 'p':
- port_number = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'S':
- role = c;
- break;
- default:
- ACE_ERROR ((LM_ERROR,
- "usage: %n [-p portnum] [-h host_name]\n%a", 1));
- /* NOTREACHED */
- break;
- }
-
- // Increment by 1 if we're the supplier to mirror the default
- // behavior of the Event_Server (which sets the Consumer port to
- // ACE_DEFAULT_SERVER_PORT and the Supplier port to
- // ACE_DEFAULT_SERVER_PORT + 1).
- if (role == 'S' && port_number == ACE_DEFAULT_SERVER_PORT)
- port_number++;
-}
-
class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+{
// = TITLE
// Generate and receives messages from the event server.
//
// = DESCRIPTION
// This class is both a consumer and supplier of events, i.e.,
- // it is a ``transceiver.''
-{
+ // it's a ``transceiver.''
public:
// = Initialization method.
+ Event_Transceiver (int argc, char *argv[]);
+ // Performs the actual initialization.
+
Event_Transceiver (void);
+ // No-op constructor (required by the <ACE_Connector>).
// = Svc_Handler hook called by the <ACE_Connector>.
virtual int open (void *);
@@ -88,15 +43,71 @@ private:
int receiver (void);
// Reads data from socket and writes to ACE_STDOUT.
- int forwarder (void);
+ int transmitter (void);
// Writes data from ACE_STDIN to socket.
+
+ int parse_args (int argc, char *argv[]);
+ // Parse the command-line arguments.
+
+ u_short port_number_;
+ // Port number of event server.
+
+ char *host_name_;
+ // Name of event server.
+
+ char *role_;
+ // Are we playing the Consumer or Supplier role?
};
+// Handle the command-line arguments.
+
+int
+Event_Transceiver::parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, "Ch:p:S");
+
+ this->port_number_ = ACE_DEFAULT_SERVER_PORT;
+ this->host_name_ = ACE_DEFAULT_SERVER_HOST;
+ this->role_ = "Supplier";
+
+ for (int c; (c = get_opt ()) != -1; )
+ switch (c)
+ {
+ case 'C':
+ this->role_ = "Consumer";
+ break;
+ case 'h':
+ this->host_name_ = get_opt.optarg;
+ break;
+ case 'p':
+ this->port_number_ = ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'S':
+ this->role_ = "Supplier";
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %n [-CS] [-h host_name] [-p portnum] \n"),
+ -1);
+ /* NOTREACHED */
+ break;
+ }
+
+ // Increment by 1 if we're the supplier to mirror the default
+ // behavior of the Event_Server (which sets the Consumer port to
+ // ACE_DEFAULT_SERVER_PORT and the Supplier port to
+ // ACE_DEFAULT_SERVER_PORT + 1). Note that this is kind of a
+ // hack...
+ if (ACE_OS::strcmp (this->role_, "Supplier") == 0
+ && this->port_number_ == ACE_DEFAULT_SERVER_PORT)
+ this->port_number_++;
+}
+
int
Event_Transceiver::handle_close (ACE_HANDLE,
ACE_Reactor_Mask)
{
- ACE_Reactor::end_event_loop();
+ ACE_Reactor::end_event_loop ();
return 0;
}
@@ -107,41 +118,76 @@ Event_Transceiver::handle_signal (int signum,
siginfo_t *,
ucontext_t *)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) received signal %S\n", signum));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) received signal %S\n",
+ signum));
- ACE_Reactor::end_event_loop();
+ ACE_Reactor::end_event_loop ();
return 0;
}
Event_Transceiver::Event_Transceiver (void)
{
- ACE_Sig_Set sig_set;
-
- sig_set.sig_add (SIGINT);
- sig_set.sig_add (SIGQUIT);
+}
- if (ACE_Reactor::instance ()->register_handler
- (sig_set, this) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
-
- // We need to register <this> here before we're connected since
- // otherwise <get_handle> will return the connection socket handle
- // for the peer.
- else if (ACE::register_stdin_handler (this,
- ACE_Reactor::instance (),
- ACE_Thread_Manager::instance ()) == -1)
+Event_Transceiver::Event_Transceiver (int argc, char *argv[])
+{
+ if (this->parse_args (argc, argv) == -1)
ACE_ERROR ((LM_ERROR,
- "%p\n",
- "register_stdin_handler"));
+ "%p\n",
+ "parse_args"));
+ else
+ {
+ ACE_Sig_Set sig_set;
+
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register to handle the SIGINT and SIGQUIT signals.
+ if (ACE_Reactor::instance ()->register_handler
+ (sig_set,
+ this) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "register_handler"));
+
+ // We need to register <this> here before we're connected since
+ // otherwise <get_handle> will return the connection socket
+ // handle for the peer.
+ else if (ACE::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "register_stdin_handler"));
+
+ // Address of the server.
+ ACE_INET_Addr server_addr (this->port_number_,
+ this->host_name_);
+
+ ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
+
+ // We need a pointer here because connect takes a reference to a
+ // pointer!
+ Event_Transceiver *etp = this;
+
+ // Establish the connection to the Event Server.
+ if (connector.connect (etp,
+ server_addr) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ this->host_name_));
+ }
}
int
Event_Transceiver::open (void *)
{
- // Register ourselves to be notified when there's data on the
- // socket.
+ // Register ourselves to be notified when there's data to read on
+ // the socket.
if (ACE_Reactor::instance ()->register_handler
- (this, ACE_Event_Handler::READ_MASK) == -1)
+ (this,
+ ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"register_handler"),
@@ -152,18 +198,19 @@ Event_Transceiver::open (void *)
int
Event_Transceiver::handle_input (ACE_HANDLE handle)
{
+ // Determine whether we play the role of a consumer or a supplier.
if (handle == ACE_STDIN)
- return this->forwarder ();
+ return this->transmitter ();
else
return this->receiver ();
}
-
int
-Event_Transceiver::forwarder (void)
+Event_Transceiver::transmitter (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s forwarder\n",
- role == 'C' ? "Consumer" : "Supplier"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) entering %s transmitter\n",
+ this->role_));
char buf[BUFSIZ];
ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf);
@@ -172,45 +219,54 @@ Event_Transceiver::forwarder (void)
if (n <= 0 || this->peer ().send_n (buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s forwarder\n",
- role == 'C' ? "Consumer" : "Supplier"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) leaving %s transmitter\n",
+ this->role_));
return result;
}
int
Event_Transceiver::receiver (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering %s receiver\n",
- role == 'C' ? "Consumer" : "Supplier"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) entering %s receiver\n",
+ this->role_));
char buf[BUFSIZ];
ssize_t n = this->peer ().recv (buf, sizeof buf);
int result = 0;
- if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n)
+ if (n <= 0
+ || ACE_OS::write (ACE_STDOUT, buf, n) != n)
result = -1;
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving %s receiver\n",
- role == 'C' ? "Consumer" : "Supplier"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) leaving %s receiver\n",
+ this->role_));
return result;
}
int
main (int argc, char *argv[])
{
- ACE_Service_Config daemon (argv[0]);
-
- parse_args (argc, argv);
+ if (ACE_Service_Config::open (argv[0]) == -1
+ && errno != ENOENT)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "open"),
+ -1);
- ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector;
- Event_Transceiver transceiver, *tp = &transceiver;
+ // Create and initialize the transceiver.
+ Event_Transceiver transceiver (argc, argv);
- ACE_INET_Addr server_addr (port_number, host_name);
+ // Demonstrate how we can check if a constructor failed...
+ if (ACE_LOG_MSG->op_status () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "Event_Transceiver constructor failed"),
+ -1);
- // Establish the connection to the Event Server.
- if (connector.connect (tp, server_addr) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1);
// Run event loop until either the event server shuts down or we get
// a SIGINT.