summaryrefslogtreecommitdiff
path: root/ACE/apps/Gateway
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/apps/Gateway')
-rw-r--r--ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp802
-rw-r--r--ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h151
-rw-r--r--ACE/apps/Gateway/Gateway/Config_Files.cpp215
-rw-r--r--ACE/apps/Gateway/Gateway/Config_Files.h98
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler.cpp272
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler.h157
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp56
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h65
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp60
-rw-r--r--ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h44
-rw-r--r--ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h32
-rw-r--r--ACE/apps/Gateway/Gateway/Event.h225
-rw-r--r--ACE/apps/Gateway/Gateway/Event_Channel.cpp588
-rw-r--r--ACE/apps/Gateway/Gateway/Event_Channel.h135
-rw-r--r--ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp64
-rw-r--r--ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h65
-rw-r--r--ACE/apps/Gateway/Gateway/File_Parser.cpp163
-rw-r--r--ACE/apps/Gateway/Gateway/File_Parser.h97
-rw-r--r--ACE/apps/Gateway/Gateway/Gateway.cpp340
-rw-r--r--ACE/apps/Gateway/Gateway/Gateway.h33
-rw-r--r--ACE/apps/Gateway/Gateway/Makefile.am78
-rw-r--r--ACE/apps/Gateway/Gateway/Options.cpp288
-rw-r--r--ACE/apps/Gateway/Gateway/Options.h196
-rw-r--r--ACE/apps/Gateway/Gateway/connection_config55
-rw-r--r--ACE/apps/Gateway/Gateway/consumer_config35
-rw-r--r--ACE/apps/Gateway/Gateway/gateway.mpc29
-rw-r--r--ACE/apps/Gateway/Gateway/gatewayd.cpp69
-rw-r--r--ACE/apps/Gateway/Gateway/svc.conf3
-rw-r--r--ACE/apps/Gateway/Makefile.am14
-rw-r--r--ACE/apps/Gateway/Peer/Makefile.am52
-rw-r--r--ACE/apps/Gateway/Peer/Options.cpp201
-rw-r--r--ACE/apps/Gateway/Peer/Options.h135
-rw-r--r--ACE/apps/Gateway/Peer/Peer.cpp890
-rw-r--r--ACE/apps/Gateway/Peer/Peer.h257
-rw-r--r--ACE/apps/Gateway/Peer/peer.mpc23
-rw-r--r--ACE/apps/Gateway/Peer/peerd.cpp63
-rw-r--r--ACE/apps/Gateway/Peer/svc.conf2
-rw-r--r--ACE/apps/Gateway/README136
38 files changed, 6188 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
new file mode 100644
index 00000000000..e6ac36ba1fe
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
@@ -0,0 +1,802 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/OS_NS_unistd.h"
+#include "Event_Channel.h"
+#include "Concrete_Connection_Handlers.h"
+
+ACE_RCSID(Gateway, Concrete_Connection_Handlers, "$Id$")
+
+Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)
+ : Connection_Handler (pci)
+{
+ this->connection_role_ = 'C';
+ this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
+}
+
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method simply marks the Connection_Handler as
+// having failed so that handle_close () can reconnect.
+
+// Do not close handler when received data successfully.
+// Consumer_Handler should could process received data.
+// For example, Consumer could send reply-event to Supplier.
+int
+Consumer_Handler::handle_input (ACE_HANDLE)
+{
+ // Do not set FAILED state at here, just at real failed place.
+
+ char buf[BUFSIZ];
+ ssize_t received = this->peer ().recv (buf, sizeof buf);
+
+ switch (received)
+ {
+ case -1:
+ this->state (Connection_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",
+ this->connection_id ()),
+ -1);
+ /* NOTREACHED */
+ case 0:
+ this->state (Connection_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",
+ this->connection_id ()),
+ -1);
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n"
+ "data size = %d\n",
+ this->connection_id (),
+ received),
+ 0); // Return 0 to identify received data successfully.
+ /* NOTREACHED */
+ }
+}
+
+// Perform a non-blocking put() of event. If we are unable to send
+// the entire event the remainder is re-queued at the *front* of the
+// Event_List.
+
+int
+Consumer_Handler::nonblk_put (ACE_Message_Block *event)
+{
+ // Try to send the event. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // Event_List and ask the ACE_Reactor to inform us (via
+ // handle_output()) when it is possible to try again.
+
+ ssize_t n = this->send (event);
+
+ if (n == -1)
+ {
+ // -1 is returned only when things have really gone wrong (i.e.,
+ // not when flow control occurs). Thus, let's try to close down
+ // and set up a new reconnection by calling handle_close().
+ this->state (Connection_Handler::FAILED);
+ this->handle_close ();
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK)
+ {
+ // We didn't manage to send everything, so we need to queue
+ // things up.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing activated on handle %d to routing id %d\n",
+ this->get_handle (),
+ this->connection_id ()));
+
+ // ACE_Queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "enqueue_head"),
+ -1);
+
+ // Tell ACE_Reactor to call us back when we can send again.
+ else if (ACE_Reactor::instance ()->schedule_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "schedule_wakeup"),
+ -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+ssize_t
+Consumer_Handler::send (ACE_Message_Block *event)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) sending %d bytes to Consumer %d\n",
+ event->length (),
+ this->connection_id ()));
+
+ ssize_t len = event->length ();
+ ssize_t n = this->peer ().send (event->rd_ptr (), len);
+
+ if (n <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < len)
+ {
+ // Re-adjust pointer to skip over the part we did send.
+ event->rd_ptr (n);
+ errno = EWOULDBLOCK;
+ }
+ else // if (n == length)
+ {
+ // The whole event is sent, we now decrement the reference count
+ // (which deletes itself with it reaches 0).
+ event->release ();
+ errno = 0;
+ }
+ this->total_bytes (n);
+ return n;
+}
+
+// Finish sending an event when flow control conditions abate.
+// This method is automatically called by the ACE_Reactor.
+
+int
+Consumer_Handler::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *event = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"),
+ this->get_handle ()));
+
+ // WIN32 Notes: When the receiver blocked, we started adding to the
+ // consumer handler's message Q. At this time, we registered a
+ // callback with the reactor to tell us when the TCP layer signalled
+ // that we could continue to send messages to the consumer. However,
+ // Winsock only sends this notification ONCE, so we have to assume
+ // at the application level, that we can continue to send until we
+ // get any subsequent blocking signals from the receiver's buffer.
+
+#if defined (ACE_WIN32)
+ // Win32 Winsock doesn't trigger multiple "You can write now"
+ // signals, so we have to assume that we can continue to write until
+ // we get another EWOULDBLOCK.
+
+ // We cancel the wakeup callback we set earlier.
+ if (ACE_Reactor::instance ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")),
+ -1);
+
+ // The list had better not be empty, otherwise there's a bug!
+ while (this->msg_queue ()->dequeue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (event))
+ {
+ case -1: // Error sending message to consumer.
+ {
+ // We are responsible for releasing an ACE_Message_Block if
+ // failures occur.
+ event->release ();
+
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("transmission failure")));
+ break;
+ }
+ case 0: // Partial Send - we got flow controlled by the receiver
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Partial Send due to flow control")
+ ACE_TEXT ("- scheduling new wakeup with reactor\n")));
+
+ // Re-schedule a wakeup call from the reactor when the
+ // flow control conditions abate.
+ if (ACE_Reactor::instance ()->schedule_wakeup
+ (this,
+ ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")),
+ -1);
+
+ // Didn't write everything this time, come back later...
+ return 0;
+ }
+ default: // Sent the whole thing
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sent message from message Q, Q size = %d\n"),
+ this->msg_queue()->message_count ()));
+ break;
+ }
+ }
+ }
+
+ // If we drop out of the while loop, then the message Q should be
+ // empty...or there's a problem in the dequeue_head() call...but
+ // thats another story.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Sent all messages from consumers message Q\n")));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
+ this->get_handle (),
+ this->connection_id ()));
+#else /* !defined (ACE_WIN32) */
+ // The list had better not be empty, otherwise there's a bug!
+ if (this->msg_queue ()->dequeue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (event))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Partial Send\n")));
+
+ // Didn't write everything this time, come back later...
+ break;
+
+ case -1:
+ // We are responsible for releasing an ACE_Message_Block if
+ // failures occur.
+ event->release ();
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("transmission failure")));
+
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+
+ // If we succeed in writing the entire event (or we did not
+ // fail due to EWOULDBLOCK) then check if there are more
+ // events on the Message_Queue. If there aren't, tell the
+ // ACE_Reactor not to notify us anymore (at least until
+ // there are new events queued up).
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("QQQ::Sent Message from consumer's Q\n")));
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
+ this->get_handle (),
+ this->connection_id ()));
+
+ if (ACE_Reactor::instance ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("cancel_wakeup")));
+ }
+ }
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));
+#endif /* ACE_WIN32 */
+ return 0;
+}
+
+// Send an event to a Consumer (may queue if necessary).
+
+int
+Consumer_Handler::put (ACE_Message_Block *event,
+ ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the event *without* blocking!
+ return this->nonblk_put (event);
+ else
+ // If we have queued up events due to flow control then just
+ // enqueue and return.
+ return this->msg_queue ()->enqueue_tail
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci)
+ : Connection_Handler (pci),
+ msg_frag_ (0)
+{
+ this->connection_role_ = 'S';
+ this->msg_queue ()->high_water_mark (0);
+}
+
+// Receive an Event from a Supplier. Handles fragmentation.
+//
+// The event returned from recv consists of two parts:
+//
+// 1. The Address part, contains the "virtual" routing id.
+//
+// 2. The Data part, which contains the actual data to be forwarded.
+//
+// The reason for having two parts is to shield the higher layers
+// of software from knowledge of the event structure.
+
+int
+Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
+{
+ if (this->msg_frag_ == 0)
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Event),
+ ACE_Message_Block::MB_DATA,
+ 0,
+ 0,
+ 0,
+ Options::instance ()->locking_strategy ()),
+ -1);
+
+ Event *event = (Event *) this->msg_frag_->rd_ptr ();
+ ssize_t header_received = 0;
+
+ const size_t HEADER_SIZE = sizeof (Event_Header);
+ ssize_t header_bytes_left_to_read =
+ HEADER_SIZE - this->msg_frag_->length ();
+
+ if (header_bytes_left_to_read > 0)
+ {
+ header_received = this->peer ().recv
+ (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
+
+ if (header_received == -1 /* error */
+ || header_received == 0 /* EOF */)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "Recv error during header read "));
+ ACE_DEBUG ((LM_DEBUG,
+ "attempted to read %d\n",
+ header_bytes_left_to_read));
+ this->msg_frag_ = this->msg_frag_->release ();
+ return header_received;
+ }
+
+ // Bump the write pointer by the amount read.
+ this->msg_frag_->wr_ptr (header_received);
+
+ // At this point we may or may not have the ENTIRE header.
+ if (this->msg_frag_->length () < HEADER_SIZE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Partial header received: only %d bytes\n",
+ this->msg_frag_->length ()));
+ // Notify the caller that we didn't get an entire event.
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
+ // Convert the header into host byte order so that we can access
+ // it directly without having to repeatedly muck with it...
+ event->header_.decode ();
+
+ if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
+ {
+ // This data_ payload is too big!
+ errno = EINVAL;
+ ACE_DEBUG ((LM_DEBUG,
+ "Data payload is too big (%d bytes)\n",
+ event->header_.len_));
+ return -1;
+ }
+
+ }
+
+ // At this point there is a complete, valid header in Event. Now we
+ // need to get the event payload. Due to incomplete reads this may
+ // not be the first time we've read in a fragment for this message.
+ // We account for this here. Note that the first time in here
+ // msg_frag_->wr_ptr() will point to event->data_. Every time we do
+ // a successful fragment read, we advance wr_ptr(). Therefore, by
+ // subtracting how much we've already read from the
+ // event->header_.len_ we complete the data_bytes_left_to_read...
+
+ ssize_t data_bytes_left_to_read =
+ ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
+
+ ssize_t data_received =
+ !data_bytes_left_to_read
+ ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0.
+ : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
+
+ // Try to receive the remainder of the event.
+
+ switch (data_received)
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // This might happen if only the header came through.
+ return -1;
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ if (data_bytes_left_to_read)
+ {
+ this->msg_frag_ = this->msg_frag_->release ();
+ return 0;
+ }
+ /* FALLTHROUGH */;
+
+ default:
+ // Set the write pointer at 1 past the end of the event.
+ this->msg_frag_->wr_ptr (data_received);
+
+ if (data_received != data_bytes_left_to_read)
+ {
+ errno = EWOULDBLOCK;
+ // Inform caller that we didn't get the whole event.
+ return -1;
+ }
+ else
+ {
+ // Set the read pointer to the beginning of the event.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ // Allocate an event forwarding header and chain the data
+ // portion onto its continuation field.
+ forward_addr = new ACE_Message_Block (sizeof (Event_Key),
+ ACE_Message_Block::MB_PROTO,
+ this->msg_frag_,
+ 0,
+ 0,
+ Options::instance ()->locking_strategy ());
+ if (forward_addr == 0)
+ {
+ this->msg_frag_ = this->msg_frag_->release ();
+ errno = ENOMEM;
+ return -1;
+ }
+
+ Event_Key event_addr (this->connection_id (),
+ event->header_.type_);
+ // Copy the forwarding address from the Event_Key into
+ // forward_addr.
+ forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
+
+ // Reset the pointer to indicate we've got an entire event.
+ this->msg_frag_ = 0;
+ }
+
+ this->total_bytes (data_received + header_received);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
+ event->header_.connection_id_,
+ event->header_.len_,
+ data_received + header_received));
+ if (Options::instance ()->enabled (Options::VERBOSE))
+ ACE_DEBUG ((LM_DEBUG,
+ "data_ = %*s\n",
+ event->header_.len_ - 2,
+ event->data_));
+
+ // Encode before returning so that we can set things out in
+ // network byte order.
+ event->header_.encode ();
+ return data_received + header_received;
+ }
+}
+
+// Receive various types of input (e.g., Peer event from the gatewayd,
+// as well as stdio).
+
+int
+Supplier_Handler::handle_input (ACE_HANDLE)
+{
+ ACE_Message_Block *event_key = 0;
+
+ switch (this->recv (event_key))
+ {
+ case 0:
+ // Note that a peer shouldn't initiate a shutdown by closing the
+ // connection. Therefore, the peer must have crashed, so we'll
+ // need to bail out here and let the higher layers reconnect.
+ this->state (Connection_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n",
+ this->connection_id ()),
+ -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else // A weird problem occurred, shut down and start again.
+ {
+ this->state (Connection_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n",
+ "Peer has failed unexpectedly",
+ this->connection_id ()),
+ -1);
+ }
+ /* NOTREACHED */
+ default:
+ // Route messages to Consumers.
+ return this->process (event_key);
+ }
+}
+
+// This delegates to the <Event_Channel> to do the actual processing.
+// Typically, this forwards the event to its appropriate Consumer(s).
+
+int
+Supplier_Handler::process (ACE_Message_Block *event_key)
+{
+ return this->event_channel_->put (event_key);
+}
+
+Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
+ : Consumer_Handler (pci)
+{
+ // It is not in thread svc() now.
+ in_thread_ = 0;
+}
+
+// Overriding handle_close() method. If in thread svc(), no need to
+// process handle_close() when call peer().close(), because the
+// connection is blocked now.
+
+int
+Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
+{
+ if (in_thread_)
+ return 0;
+ else
+ return Consumer_Handler::handle_close (h, m);
+}
+
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method marks the Connection_Handler as having
+// failed and deactivates the ACE_Message_Queue (to wake up the thread
+// blocked on <dequeue_head> in svc()).
+// Thr_Consumer_Handler::handle_close () will eventually try to
+// reconnect...
+
+// Let Consumer_Handler receive normal data.
+int
+Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
+{
+ // Call down to the <Consumer_Handler> to handle this first.
+ if (this->Consumer_Handler::handle_input (h) != 0)
+ {
+ // Only do such work when failed.
+
+ ACE_Reactor::instance ()->remove_handler
+ (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+ // Will call handle_close.
+ return -1;
+ }
+ return 0;
+}
+
+// Initialize the threaded Consumer_Handler object and spawn a new
+// thread.
+
+int
+Thr_Consumer_Handler::open (void *)
+{
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "disable"),
+ -1); // Incorrect info fixed.
+
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "complete_connection_connection"),
+ -1);
+
+ // Register ourselves to receive input events (which indicate that
+ // the Consumer has shut down unexpectedly).
+ else if (ACE_Reactor::instance ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "register_handler"),
+ -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // events to Consumers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Queue up an event for transmission (must not block since
+// Supplier_Handlers may be single-threaded).
+
+int
+Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ // Perform non-blocking enqueue, i.e., if <msg_queue> is full
+ // *don't* block!
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Transmit events to the peer. Note the simplification resulting
+// from the use of threads, compared with the Reactive solution.
+
+int
+Thr_Consumer_Handler::svc (void)
+{
+ for (in_thread_ = 1;;)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Thr_Consumer_Handler's handle = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread it is OK to block on
+ // output.
+
+ for (ACE_Message_Block *mb = 0;
+ this->msg_queue ()->dequeue_head (mb) != -1;
+ )
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "send failed"));
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
+ this->connection_id (),
+ this->get_handle ()));
+
+ this->peer ().close ();
+
+ // Re-establish the connection, using exponential backoff.
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_->initiate_connection_connection (this, 1) == -1;
+ // Second parameter '1' means using sync mode directly,
+ // don't care Options::blocking_semantics(). If don't do
+ // so, async mode will be used to connect which won't
+ // satisfy original design.
+ )
+ {
+ ACE_Time_Value tv (this->timeout ());
+
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+
+ ACE_OS::sleep (tv);
+ }
+ }
+
+ ACE_NOTREACHED (return 0;)
+}
+
+Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)
+ : Supplier_Handler (pci)
+{
+ // It is not in thread svc() now.
+ in_thread_ = 0;
+}
+
+// Overriding handle_close() method. If in thread svc(), no need to
+// process handle_close() when call peer().close(), because the
+// connection is blocked now.
+
+int
+Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
+{
+ if (in_thread_)
+ return 0;
+ else
+ return Supplier_Handler::handle_close (h, m);
+}
+
+int
+Thr_Supplier_Handler::open (void *)
+{
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "disable"),
+ -1); // Incorrect info fixed.
+
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "complete_connection_connection"),
+ -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // events to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Receive events from a Peer in a separate thread (note reuse of
+// existing code!).
+
+int
+Thr_Supplier_Handler::svc (void)
+{
+ for (in_thread_ = 1;;)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Thr_Supplier_Handler's handle = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread and processes events
+ // for one connection it is OK to call down to the
+ // <Supplier_Handler::handle_input> method, which blocks on
+ // input.
+
+ while (this->Supplier_Handler::handle_input () != -1)
+ continue;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
+ this->connection_id (),
+ this->get_handle ()));
+
+ this->peer ().close ();
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+
+ // Re-establish the connection, using expoential backoff.
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_->initiate_connection_connection (this, 1) == -1;
+ // Second parameter '1' means using sync mode directly,
+ // don't care Options::blocking_semantics(). If don't do
+ // so, async mode will be used to connect which won't
+ // satisfy original design.
+ )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+ ACE_NOTREACHED(return 0);
+}
diff --git a/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
new file mode 100644
index 00000000000..287a4c8ec34
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
@@ -0,0 +1,151 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Concrete_Connection_Handlers.h
+//
+// = DESCRIPTION
+// These are all the subclasses of Connection_Handler that define the
+// appropriate threaded/reactive Consumer/Supplier behavior.
+//
+// = AUTHOR
+// Doug Schmidt <schmidt@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef CONCRETE_CONNECTION_HANDLER
+#define CONCRETE_CONNECTION_HANDLER
+
+#include "Connection_Handler.h"
+
+class Supplier_Handler : public Connection_Handler
+{
+ // = TITLE
+ // Handles reception of Events from Suppliers.
+ //
+ // = DESCRIPTION
+ // Performs framing and error checking on Events. Intended to
+ // run reactively, i.e., in one thread of control using a
+ // Reactor for demuxing and dispatching.
+public:
+ // = Initialization method.
+ Supplier_Handler (const Connection_Config_Info &);
+
+protected:
+ // = All the following methods are upcalls, so they can be protected.
+
+ virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
+ // Receive and process peer events.
+
+ virtual int recv (ACE_Message_Block *&);
+ // Receive an event from a Supplier.
+
+ int process (ACE_Message_Block *event);
+ // This delegates to the <Event_Channel> to do the actual
+ // processing. Typically, it forwards the <event> to its
+ // appropriate Consumer.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragment to handle non-blocking recv's from
+ // Suppliers.
+};
+
+class Consumer_Handler : public Connection_Handler
+{
+ // = TITLE
+ // Handles transmission of events to Consumers.
+ //
+ // = DESCRIPTION
+ // Performs queueing and error checking. Intended to run
+ // reactively, i.e., in one thread of control using a Reactor
+ // for demuxing and dispatching. Also uses a Reactor to handle
+ // flow controlled output connections.
+public:
+ // = Initialization method.
+ Consumer_Handler (const Connection_Config_Info &);
+
+ virtual int put (ACE_Message_Block *event,
+ ACE_Time_Value * = 0);
+ // Send an event to a Consumer (may be queued if necessary).
+
+protected:
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending event when flow control conditions abate.
+
+ int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking put().
+
+ virtual ssize_t send (ACE_Message_Block *);
+ // Send an event to a Consumer.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process shutdowns from a Consumer.
+};
+
+class Thr_Consumer_Handler : public Consumer_Handler
+{
+ // = TITLE
+ // Runs each <Consumer_Handler> in a separate thread.
+public:
+ Thr_Consumer_Handler (const Connection_Config_Info &);
+
+ virtual int open (void *);
+ // Initialize the threaded Consumer_Handler object and spawn a new
+ // thread.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send a message to a peer.
+
+protected:
+ virtual int handle_input (ACE_HANDLE);
+ // Called when Peer shutdown unexpectedly.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // When thread started, connection become blocked, so no need to use
+ // handle_close to reinitiate the connection_handler, so should
+ // override this function to justify if controlling is in thread or
+ // not. If yes, handle_close do nothing, otherwise, it call parent
+ // handle_close().
+
+private:
+ int in_thread_;
+ // If the controlling is in thread's svc() or not.
+};
+
+class Thr_Supplier_Handler : public Supplier_Handler
+{
+ // = TITLE
+ // Runs each <Supplier_Handler> in a separate thread.
+public:
+ Thr_Supplier_Handler (const Connection_Config_Info &pci);
+
+ virtual int open (void *);
+ // Initialize the object and spawn a new thread.
+
+protected:
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // When thread started, connection become blocked, so no need to use
+ // handle_close to reinitiate the connection_handler, so should
+ // override this function to justify if controlling is in thread or
+ // not. If yes, handle_close do nothing, otherwise, it call parent
+ // handle_close().
+
+ virtual int svc (void);
+ // Transmit peer messages.
+
+private:
+ int in_thread_;
+ // If the controlling is in thread's svc() or not.
+};
+
+#endif /* CONCRETE_CONNECTION_HANDLER */
diff --git a/ACE/apps/Gateway/Gateway/Config_Files.cpp b/ACE/apps/Gateway/Gateway/Config_Files.cpp
new file mode 100644
index 00000000000..f1b9e96dd23
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Config_Files.cpp
@@ -0,0 +1,215 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "Config_Files.h"
+#include "Options.h"
+
+ACE_RCSID(Gateway, Config_Files, "$Id$")
+
+// This fixes a nasty bug with cfront-based compilers (like
+// Centerline).
+typedef FP::Return_Type FP_RETURN_TYPE;
+
+FP_RETURN_TYPE
+Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry,
+ int &line_number)
+{
+ FP_RETURN_TYPE result;
+
+ // Increment the line count.
+ line_number++;
+
+ // Ignore comments, check for EOF and EOLINE if this succeeds, we
+ // have our connection id.
+
+ while ((result = this->getint (entry.connection_id_)) != FP::RT_SUCCESS)
+ if (result == FP::RT_EOFILE)
+ return FP::RT_EOFILE;
+ else if (result == FP::RT_EOLINE
+ || result == FP::RT_COMMENT)
+ line_number++;
+
+ // Get the payload type.
+ result = this->getint (entry.type_);
+ if (result != FP::RT_SUCCESS)
+ return result;
+
+ // get all the consumers.
+ entry.total_consumers_ = 0;
+
+ while ((result = this->getint
+ (entry.consumers_[entry.total_consumers_])) == FP::RT_SUCCESS)
+ ++entry.total_consumers_; // do nothing (should check against max...)
+
+ if (result == FP::RT_EOLINE || result == FP::RT_EOFILE)
+ return FP::RT_SUCCESS;
+ else
+ return result;
+}
+
+FP_RETURN_TYPE
+Connection_Config_File_Parser::read_entry (Connection_Config_Info &entry,
+ int &line_number)
+{
+ char buf[BUFSIZ];
+ FP_RETURN_TYPE result;
+
+ // Increment the line count.
+ line_number++;
+
+ // Ignore comments, check for EOF and EOLINE if this succeeds, we
+ // have our connection id
+
+ while ((result = this->getint (entry.connection_id_)) != FP::RT_SUCCESS)
+ if (result == FP::RT_EOFILE)
+ return FP::RT_EOFILE;
+ else if (result == FP::RT_EOLINE
+ || result == FP::RT_COMMENT)
+ line_number++;
+
+ // Get the hostname.
+ result = this->getword (entry.host_);
+ if (result != FP::RT_SUCCESS)
+ return result;
+
+ ACE_INT32 port;
+
+ // Get the port number.
+ result = this->getint (port);
+ if (result == FP::RT_DEFAULT)
+ {
+ // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier).
+ result = this->getword (buf);
+ if (result != FP::RT_SUCCESS)
+ return result;
+ else
+ entry.connection_role_ = buf[0];
+
+ if (entry.connection_role_ == 'C')
+ entry.remote_port_ = Options::instance ()->consumer_connector_port ();
+ else if (entry.connection_role_ == 'S')
+ entry.remote_port_ = Options::instance ()->supplier_connector_port ();
+ else
+ // Yikes, this is a *weird* error!
+ entry.remote_port_ = 0;
+ }
+ else if (result != FP::RT_SUCCESS)
+ return result;
+ else
+ {
+ entry.remote_port_ = (unsigned short) port;
+
+ // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier).
+ result = this->getword (buf);
+ if (result != FP::RT_SUCCESS)
+ return result;
+ else
+ entry.connection_role_ = buf[0];
+ }
+
+ // Get the max retry delay.
+ result = this->getint (entry.max_retry_timeout_);
+ if (result == FP::RT_DEFAULT)
+ entry.max_retry_timeout_ = Options::instance ()->max_timeout ();
+ else if (result != FP::RT_SUCCESS)
+ return result;
+
+ // Get the local port number.
+ result = this->getint (port);
+ if (result == FP::RT_DEFAULT)
+ entry.local_port_ = 0; // @@ Should make this an option.
+ else if (result != FP::RT_SUCCESS)
+ return result;
+ else
+ entry.local_port_ = (unsigned short) port;
+
+ ACE_INT32 priority;
+
+ // Get the priority.
+ result = this->getint (priority);
+ if (result != FP::RT_SUCCESS)
+ return result;
+ else
+ entry.priority_ = priority;
+
+ return FP::RT_SUCCESS;
+}
+
+#if defined (DEBUGGING)
+int
+main (int argc, char *argv[])
+{
+ FP_RETURN_TYPE result;
+ int line_number = 0;
+
+ {
+ Connection_Config_Info entry;
+ Connection_Config_File_Parser connection_config_file;
+
+ connection_config_file.open (argc > 1 ? argv[1] : "connection_config");
+
+ int line_number = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "ConnID\tHost\t\tRPort\tRole\tRetry\tLPort\tPriority\n"));
+
+ // Read config file line at a time.
+ while ((result = connection_config_file.read_entry (entry, line_number)) != FP::RT_EOFILE)
+ if (result == FP::RT_PARSE_ERROR)
+ ACE_DEBUG ((LM_DEBUG,
+ "Error line %d.\n",
+ line_number));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "%d\t%s\t%d\t%c\t%d\t%d\t%d\n",
+ entry.connection_id_,
+ entry.host_,
+ entry.remote_port_,
+ entry.connection_role_,
+ entry.max_retry_timeout_,
+ entry.local_port_,
+ entry.priority_));
+
+ connection_config_file.close ();
+ }
+
+ {
+ Consumer_Config_Info entry;
+ Consumer_Config_File_Parser consumer_config_file;
+
+ consumer_config_file.open (argc > 2 ? argv[2] : "consumer_config");
+
+ line_number = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\nConnID\tLogic\tPayload\tDestinations\n"));
+
+ // Read config file line at a time.
+ while ((result = consumer_config_file.read_entry (entry, line_number)) != FP::RT_EOFILE)
+ if (result == FP::RT_PARSE_ERROR)
+ ACE_DEBUG ((LM_DEBUG,
+ "Error line %d.\n",
+ line_number));
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%d\t%d\t",
+ entry.connection_id_,
+ entry.type_));
+
+ while (--entry.total_consumers_ >= 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "%d,",
+ entry.consumers_[entry.total_consumers_]));
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
+ }
+
+ consumer_config_file.close ();
+ }
+
+ return 0;
+}
+#endif /* DEBUGGING */
+
diff --git a/ACE/apps/Gateway/Gateway/Config_Files.h b/ACE/apps/Gateway/Gateway/Config_Files.h
new file mode 100644
index 00000000000..9fd96b687f6
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Config_Files.h
@@ -0,0 +1,98 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Config_Files.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _CONFIG_FILES
+#define _CONFIG_FILES
+
+#include "File_Parser.h"
+#include "Event.h"
+
+// Forward declaration.
+class Event_Channel;
+
+class Connection_Config_Info
+ // = TITLE
+ // Stores connection configuration information.
+{
+public:
+ ACE_INT32 connection_id_;
+ // Connection id for this Connection_Handler.
+
+ char host_[BUFSIZ];
+ // Host to connect with.
+
+ u_short remote_port_;
+ // Port to connect with.
+
+ char connection_role_;
+ // 'S' (supplier) or 'C' (consumer).
+
+ ACE_INT32 max_retry_timeout_;
+ // Maximum amount of time to wait for reconnecting.
+
+ u_short local_port_;
+ // Our local port number.
+
+ ACE_INT32 priority_;
+ // Priority by which different Consumers and Suppliers should be
+ // serviced.
+
+ Event_Channel *event_channel_;
+ // We just need a place to store this until we can pass it along
+ // when creating a Connection_Handler.
+};
+
+class Connection_Config_File_Parser : public File_Parser<Connection_Config_Info>
+ // = TITLE
+ // Parser for the Connection_Handler Connection file.
+{
+public:
+ virtual FP::Return_Type read_entry (Connection_Config_Info &entry,
+ int &line_number);
+ // Read in a <Connection_Config_Info> entry.
+
+};
+
+class Consumer_Config_Info
+ // = TITLE
+ // Stores the information in a Consumer Map entry.
+{
+public:
+ ACE_INT32 connection_id_;
+ // Connection id.
+
+ ACE_INT32 type_;
+ // Message type.
+
+ ACE_INT32 consumers_[MAX_CONSUMERS];
+ // Connection ids for consumers that will be routed information
+ // containing this <connection_id_>
+
+ ACE_INT32 total_consumers_;
+ // Total number of these consumers.
+};
+
+class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_Info>
+ // = TITLE
+ // Parser for the Consumer Map file.
+{
+public:
+ virtual FP::Return_Type read_entry (Consumer_Config_Info &entry,
+ int &line_number);
+ // Read in a <Consumer_Config_Info> entry.
+};
+
+#endif /* _CONFIG_FILES */
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler.cpp
new file mode 100644
index 00000000000..ff93886c187
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler.cpp
@@ -0,0 +1,272 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/OS_NS_string.h"
+#include "Event_Channel.h"
+#include "Concrete_Connection_Handlers.h"
+
+ACE_RCSID(Gateway, Connection_Handler, "$Id$")
+
+Event_Channel *
+Connection_Handler::event_channel (void) const
+{
+ return this->event_channel_;
+}
+
+void
+Connection_Handler::event_channel (Event_Channel *ec)
+{
+ this->event_channel_ = ec;
+}
+
+void
+Connection_Handler::connection_id (CONNECTION_ID id)
+{
+ this->connection_id_ = id;
+}
+
+CONNECTION_ID
+Connection_Handler::connection_id (void) const
+{
+ return this->connection_id_;
+}
+
+// The total number of bytes sent/received on this Proxy.
+
+size_t
+Connection_Handler::total_bytes (void) const
+{
+ return this->total_bytes_;
+}
+
+void
+Connection_Handler::total_bytes (size_t bytes)
+{
+ this->total_bytes_ += bytes;
+}
+
+Connection_Handler::Connection_Handler (void)
+{
+}
+
+Connection_Handler::Connection_Handler (const Connection_Config_Info &pci)
+ : local_addr_ (pci.local_port_),
+ connection_id_ (pci.connection_id_),
+ total_bytes_ (0),
+ state_ (Connection_Handler::IDLE),
+ timeout_ (1),
+ max_timeout_ (pci.max_retry_timeout_),
+ event_channel_ (pci.event_channel_)
+{
+ if (ACE_OS::strlen (pci.host_) > 0)
+ this->remote_addr_.set (pci.remote_port_, pci.host_);
+ else
+ this->remote_addr_.set (pci.remote_port_, ACE_DEFAULT_SERVER_HOST);
+ // Set the priority of the Proxy.
+ this->priority (int (pci.priority_));
+}
+
+// Set the connection_role.
+
+void
+Connection_Handler::connection_role (char d)
+{
+ this->connection_role_ = d;
+}
+
+// Get the connection_role.
+
+char
+Connection_Handler::connection_role (void) const
+{
+ return this->connection_role_;
+}
+
+// Sets the timeout delay.
+
+void
+Connection_Handler::timeout (long to)
+{
+ if (to > this->max_timeout_)
+ to = this->max_timeout_;
+
+ this->timeout_ = to;
+}
+
+// Re-calculate the current retry timeout delay using exponential
+// backoff. Returns the original timeout (i.e., before the
+// re-calculation).
+
+long
+Connection_Handler::timeout (void)
+{
+ long old_timeout = this->timeout_;
+ this->timeout_ *= 2;
+
+ if (this->timeout_ > this->max_timeout_)
+ this->timeout_ = this->max_timeout_;
+
+ return old_timeout;
+}
+
+// Sets the max timeout delay.
+
+void
+Connection_Handler::max_timeout (long mto)
+{
+ this->max_timeout_ = mto;
+}
+
+// Gets the max timeout delay.
+
+long
+Connection_Handler::max_timeout (void) const
+{
+ return this->max_timeout_;
+}
+
+// Restart connection asynchronously when timeout occurs.
+
+int
+Connection_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) attempting to reconnect Connection_Handler %d with timeout = %d\n",
+ this->connection_id (),
+ this->timeout_));
+
+ // Delegate the re-connection attempt to the Event Channel.
+ this->event_channel_->initiate_connection_connection (this);
+
+ return 0;
+}
+
+// Handle shutdown of the Connection_Handler object.
+
+int
+Connection_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down %s Connection_Handler %d on handle %d\n",
+ this->connection_role () == 'C' ? "Consumer" : "Supplier",
+ this->connection_id (),
+ this->get_handle ()));
+
+ // Restart the connection, if possible.
+ return this->event_channel_->reinitiate_connection_connection (this);
+}
+
+// Set the state of the Proxy.
+
+void
+Connection_Handler::state (Connection_Handler::State s)
+{
+ this->state_ = s;
+}
+
+// Return the current state of the Proxy.
+
+Connection_Handler::State
+Connection_Handler::state (void) const
+{
+ return this->state_;
+}
+
+// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
+// control to our Connection_Handler.
+
+int
+Connection_Handler::open (void *)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) %s Connection_Handler's handle = %d\n",
+ this->connection_role () == 'C' ? "Consumer" : "Supplier",
+ this->peer ().get_handle ()));
+
+ // Call back to the <Event_Channel> to complete our initialization.
+ if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
+
+ // Turn on non-blocking I/O.
+ else if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Register ourselves to receive input events.
+ else if (ACE_Reactor::instance ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+ else
+ return 0;
+}
+
+const ACE_INET_Addr &
+Connection_Handler::remote_addr (void) const
+{
+ return this->remote_addr_;
+}
+
+void
+Connection_Handler::remote_addr (ACE_INET_Addr &ra)
+{
+ this->remote_addr_ = ra;
+}
+
+const ACE_INET_Addr &
+Connection_Handler::local_addr (void) const
+{
+ return this->local_addr_;
+}
+
+void
+Connection_Handler::local_addr (ACE_INET_Addr &la)
+{
+ this->local_addr_ = la;
+}
+
+// Make the appropriate type of <Connection_Handler> (i.e.,
+// <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or
+// <Thr_Supplier_Handler>).
+
+Connection_Handler *
+Connection_Handler_Factory::make_connection_handler (const Connection_Config_Info &pci)
+{
+ Connection_Handler *connection_handler = 0;
+
+ // The next few lines of code are dependent on whether we are making
+ // a threaded/reactive Supplier_Handler/Consumer_Handler.
+
+ if (pci.connection_role_ == 'C') // Configure a Consumer_Handler.
+ {
+ // Create a threaded Consumer_Handler.
+ if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (),
+ Options::OUTPUT_MT))
+ ACE_NEW_RETURN (connection_handler,
+ Thr_Consumer_Handler (pci),
+ 0);
+
+ // Create a reactive Consumer_Handler.
+ else
+ ACE_NEW_RETURN (connection_handler,
+ Consumer_Handler (pci),
+ 0);
+ }
+ else // connection_role == 'S', so configure a Supplier_Handler.
+ {
+ // Create a threaded Supplier_Handler.
+ if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (),
+ Options::INPUT_MT))
+ ACE_NEW_RETURN (connection_handler,
+ Thr_Supplier_Handler (pci),
+ 0);
+
+ // Create a reactive Supplier_Handler.
+ else
+ ACE_NEW_RETURN (connection_handler,
+ Supplier_Handler (pci),
+ 0);
+ }
+
+ return connection_handler;
+}
+
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler.h b/ACE/apps/Gateway/Gateway/Connection_Handler.h
new file mode 100644
index 00000000000..a8a72830135
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler.h
@@ -0,0 +1,157 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Connection_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _CONNECTION_HANDLER
+#define _CONNECTION_HANDLER
+
+#include "ace/Service_Config.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SOCK_Connector.h"
+#include "ace/Svc_Handler.h"
+#include "Config_Files.h"
+#include "Options.h"
+#include "Event.h"
+
+// Forward declaration.
+class Event_Channel;
+
+class Connection_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
+{
+ // = TITLE
+ // <Connection_Handler> contains info about connection state and
+ // addressing.
+ //
+ // = DESCRIPTION
+ // The <Connection_Handler> classes process events sent to the
+ // Event Channel from Suppliers and forward them to Consumers.
+public:
+ Connection_Handler (void);
+ // Default constructor (needed to make <ACE_Connector> happy).
+
+ Connection_Handler (const Connection_Config_Info &);
+ // Real constructor.
+
+ virtual int open (void * = 0);
+ // Initialize and activate a single-threaded <Connection_Handler>
+ // (called by <ACE_Connector::handle_output>).
+
+ // = The current state of the Connection_Handler.
+ enum State
+ {
+ IDLE = 1, // Prior to initialization.
+ CONNECTING, // During connection establishment.
+ ESTABLISHED, // Connection_Handler is established and active.
+ DISCONNECTING, // Connection_Handler is in the process of connecting.
+ FAILED // Connection_Handler has failed.
+ };
+
+ // = Set/get the current state.
+ void state (State);
+ State state (void) const;
+
+ // = Set/get remote INET addr.
+ void remote_addr (ACE_INET_Addr &);
+ const ACE_INET_Addr &remote_addr (void) const;
+
+ // = Set/get local INET addr.
+ void local_addr (ACE_INET_Addr &);
+ const ACE_INET_Addr &local_addr (void) const;
+
+ // = Set/get connection id.
+ void connection_id (CONNECTION_ID);
+ CONNECTION_ID connection_id (void) const;
+
+ // = Set/get the current retry timeout delay.
+ void timeout (long);
+ long timeout (void);
+
+ // = Set/get the maximum retry timeout delay.
+ void max_timeout (long);
+ long max_timeout (void) const;
+
+ // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer
+ // (necessary for error checking).
+ void connection_role (char);
+ char connection_role (void) const;
+
+ // = Set/get the <Event_Channel> *.
+ void event_channel (Event_Channel *);
+ Event_Channel *event_channel (void) const;
+
+ // = The total number of bytes sent/received on this proxy.
+ void total_bytes (size_t bytes);
+ // Increment count by <bytes>.
+ size_t total_bytes (void) const;
+ // Return the current byte count.
+
+ virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
+ // Perform timer-based Connection_Handler reconnection.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // Perform Connection_Handler termination.
+
+protected:
+ ACE_INET_Addr remote_addr_;
+ // Address of peer.
+
+ ACE_INET_Addr local_addr_;
+ // Address of us.
+
+ CONNECTION_ID connection_id_;
+ // The assigned connection ID of this entry.
+
+ size_t total_bytes_;
+ // The total number of bytes sent/received on this proxy.
+
+ State state_;
+ // The current state of the proxy.
+
+ long timeout_;
+ // Amount of time to wait between reconnection attempts.
+
+ long max_timeout_;
+ // Maximum amount of time to wait between reconnection attempts.
+
+ char connection_role_;
+ // Indicates which role the proxy plays ('S' == Supplier and 'C' ==
+ // Consumer).
+
+ Event_Channel *event_channel_;
+ // Reference to the <Event_Channel> that we use to forward all
+ // the events from Consumers and Suppliers.
+};
+
+class Connection_Handler_Factory
+{
+ // = TITLE
+ // Creates the appropriate type of <Connection_Handler>.
+ //
+ // = DESCRIPTION
+ // <Connection_Handler>s can include <Consumer_Handler>,
+ // <Supplier_Handler>, <Thr_Consumer_Handler>, or
+ // <Thr_Supplier_Handler>).
+public:
+ Connection_Handler *make_connection_handler (const Connection_Config_Info &);
+ // Make the appropriate type of <Connection_Handler>, based on the
+ // <Connection_Config_Info> parameter.
+};
+
+#endif /* _CONNECTION_HANDLER */
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
new file mode 100644
index 00000000000..7790fb83d08
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
@@ -0,0 +1,56 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "Event_Channel.h"
+#include "Connection_Handler_Acceptor.h"
+
+ACE_RCSID(Gateway, Connection_Handler_Acceptor, "$Id$")
+
+int
+Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ch)
+{
+ ACE_ALLOCATOR_RETURN (ch,
+ this->connection_handler_factory_.make_connection_handler (this->connection_config_info_),
+ -1);
+ return 0;
+}
+
+int
+Connection_Handler_Acceptor::accept_svc_handler (Connection_Handler *ch)
+{
+ if (this->inherited::accept_svc_handler (ch) == -1)
+ return -1;
+ else
+ {
+ ch->connection_id (Options::instance ()->connection_id ());
+ ACE_INET_Addr remote_addr;
+
+ if (ch->peer ().get_remote_addr (remote_addr) == -1)
+ return -1;
+
+ // Set the remote address of our connected Peer.
+ ch->remote_addr (remote_addr);
+
+ // Set the Event_Channel pointer.
+ ch->event_channel (&this->event_channel_);
+
+ // Increment the connection ID by one.
+ Options::instance ()->connection_id ()++;
+ return 0;
+ }
+}
+
+Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec,
+ char connection_role)
+ : event_channel_ (ec)
+{
+ this->connection_config_info_.connection_id_ = 0;
+ this->connection_config_info_.host_[0] = '\0';
+ this->connection_config_info_.remote_port_ = 0;
+ this->connection_config_info_.connection_role_ = connection_role;
+ this->connection_config_info_.max_retry_timeout_ = Options::instance ()->max_timeout ();
+ this->connection_config_info_.local_port_ = 0;
+ this->connection_config_info_.priority_ = 1;
+}
+
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
new file mode 100644
index 00000000000..777f58c4a5b
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
@@ -0,0 +1,65 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Connection_Handler_acceptor.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _CONNECTION_HANDLER_ACCEPTOR
+#define _CONNECTION_HANDLER_ACCEPTOR
+
+#include "ace/Acceptor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SOCK_Acceptor.h"
+#include "Connection_Handler.h"
+
+// Forward declaration
+class Event_Channel;
+
+class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>
+{
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Connection_Handler object to do the dirty
+ // work...
+public:
+ Connection_Handler_Acceptor (Event_Channel &,
+ char connection_role);
+ // Constructor.
+
+ virtual int make_svc_handler (Connection_Handler *&ch);
+ // Hook method for creating an appropriate <Connection_Handler>.
+
+ virtual int accept_svc_handler (Connection_Handler *ch);
+ // Hook method for accepting a connection into the
+ // <Connection_Handler>.
+
+protected:
+ typedef ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>
+ inherited;
+ // Make life easier later on.
+
+ Event_Channel &event_channel_;
+ // Reference to the event channel.
+
+ Connection_Config_Info connection_config_info_;
+ // Keeps track of what type of proxy we need to create.
+
+ Connection_Handler_Factory connection_handler_factory_;
+ // Make the appropriate type of <Connection_Handler>.
+};
+
+#endif /* _CONNECTION_HANDLER_ACCEPTOR */
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp
new file mode 100644
index 00000000000..368ad14f373
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp
@@ -0,0 +1,60 @@
+// $Id$
+
+#include "Connection_Handler_Connector.h"
+#include "ace/os_include/os_netdb.h"
+
+ACE_RCSID(Gateway, Connection_Handler_Connector, "$Id$")
+
+Connection_Handler_Connector::Connection_Handler_Connector (void)
+{
+}
+
+// Initiate (or reinitiate) a connection to the Connection_Handler.
+
+int
+Connection_Handler_Connector::initiate_connection (Connection_Handler *connection_handler,
+ ACE_Synch_Options &synch_options)
+{
+ ACE_TCHAR addr_buf[MAXHOSTNAMELEN];
+
+ // Mark ourselves as idle so that the various iterators will ignore
+ // us until we are reconnected.
+ connection_handler->state (Connection_Handler::IDLE);
+
+ // We check the remote addr second so that it remains in the
+ // addr_buf.
+ if (connection_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1
+ || connection_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("can't obtain peer's address")), -1);
+
+ // Try to connect to the Peer.
+ if (this->connect (connection_handler,
+ connection_handler->remote_addr (),
+ synch_options,
+ connection_handler->local_addr ()) == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ connection_handler->state (Connection_Handler::FAILED);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p on address %s\n"),
+ ACE_TEXT ("connect"), addr_buf));
+ return -1;
+ }
+ else
+ {
+ connection_handler->state (Connection_Handler::CONNECTING);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) in the process of connecting to %s\n"),
+ addr_buf));
+ }
+ }
+ else
+ {
+ connection_handler->state (Connection_Handler::ESTABLISHED);
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connected to %s on %d\n"),
+ addr_buf, connection_handler->get_handle ()));
+ }
+ return 0;
+}
+
diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h
new file mode 100644
index 00000000000..f4e7d7d06a1
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h
@@ -0,0 +1,44 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Connection_Handler_Connector.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _IO_HANDLER_CONNECTOR
+#define _IO_HANDLER_CONNECTOR
+
+#include "ace/Connector.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/SOCK_Connector.h"
+#include "Connection_Handler.h"
+
+class Connection_Handler_Connector : public ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR>
+{
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Connection_Handler object to do the dirty
+ // work...
+public:
+ Connection_Handler_Connector (void);
+
+ // Initiate (or reinitiate) a connection on the Connection_Handler.
+ int initiate_connection (Connection_Handler *,
+ ACE_Synch_Options & = ACE_Synch_Options::synch);
+
+};
+
+#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
new file mode 100644
index 00000000000..3a5d0cf3e25
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
@@ -0,0 +1,32 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Consumer_Dispatch_Set.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef CONSUMER_DISPATCH_SET
+#define CONSUMER_DISPATCH_SET
+
+#include "ace/Containers.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+// Forward reference.
+class Connection_Handler;
+
+typedef ACE_Unbounded_Set<Connection_Handler *> Consumer_Dispatch_Set;
+typedef ACE_Unbounded_Set_Iterator<Connection_Handler *> Consumer_Dispatch_Set_Iterator;
+
+#endif /* CONSUMER_DISPATCH_SET */
diff --git a/ACE/apps/Gateway/Gateway/Event.h b/ACE/apps/Gateway/Gateway/Event.h
new file mode 100644
index 00000000000..4c157bbce68
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Event.h
@@ -0,0 +1,225 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Event.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef EVENT_H
+#define EVENT_H
+
+#include "ace/os_include/arpa/os_inet.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Basic_Types.h"
+
+// = The following #defines should really be in a separate include
+// file that is shared with the ../Peer/ directory. For now, we'll
+// keep them here to simplify the sharing between the two directories.
+// BTW, this is also the reason why all the methods are inlined...
+
+// Used by Peers to create Consumers in a Gateway.
+#if !defined (DEFAULT_GATEWAY_CONSUMER_PORT)
+#define DEFAULT_GATEWAY_CONSUMER_PORT 10009
+#endif /* DEFAULT_GATEWAY_CONSUMER_PORT */
+
+// Used by Peers create Suppliers in a Gateway.
+#if !defined (DEFAULT_GATEWAY_SUPPLIER_PORT)
+#define DEFAULT_GATEWAY_SUPPLIER_PORT 10010
+#endif /* DEFAULT_GATEWAY_SUPPLIER_PORT */
+
+// Used by a Gateway to create Consumers in a Peer.
+#if !defined (DEFAULT_PEER_CONSUMER_PORT)
+#define DEFAULT_PEER_CONSUMER_PORT 10011
+#endif /* DEFAULT_PEER_CONSUMER_PORT */
+
+// Used by a Gateway to create Suppliers in a Peer.
+#if !defined (DEFAULT_PEER_SUPPLIER_PORT)
+#define DEFAULT_PEER_SUPPLIER_PORT 10012
+#endif /* DEFAULT_PEER_SUPPLIER_PORT */
+
+#if !defined (MAX_CONSUMERS)
+#define MAX_CONSUMERS 1000
+#endif /* MAX_CONSUMERS */
+
+// This is the unique supplier identifier that denotes a particular
+// <Connection_Handler> in the Gateway.
+typedef ACE_INT32 CONNECTION_ID;
+
+enum
+{
+ // = These are the types of events generated by the <Suppliers> and
+ // handled by the <Event_Channel>.
+
+ ROUTING_EVENT = 0,
+ // A normal event, which is forwarded to the <Consumers>.
+
+ SUBSCRIPTION_EVENT = 1
+ // A subscription to <Suppliers> managed by the <Event_Channel>.
+};
+
+class Event_Key
+{
+ // = TITLE
+ // Address used to identify the source/destination of an event.
+ //
+ // = DESCRIPTION
+ // This is really a "processing descriptor" that is used to
+ // decouple the processing, filtering, and forwarding logic of
+ // the Event Channel from the format of the data. The
+ // <connection_id_> and <type_> fields are copied from the
+ // <Event_Header> class below.
+public:
+ Event_Key (CONNECTION_ID cid = -1,
+ ACE_INT32 type = 0,
+ ACE_INT32 priority = 0)
+ : connection_id_ (cid),
+ type_ (type),
+ priority_ (priority)
+ {
+ }
+
+ bool operator== (const Event_Key &event_addr) const
+ {
+ return this->connection_id_ == event_addr.connection_id_
+ && this->type_ == event_addr.type_;
+ }
+
+ CONNECTION_ID connection_id_;
+ // Unique connection identifier that denotes a particular
+ // Connection_Handler.
+
+ ACE_INT32 type_;
+ // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>.
+
+ ACE_INT32 priority_;
+ // Event priority.
+};
+
+class Event_Header
+{
+ // = TITLE
+ // Fixed sized header.
+ //
+ // = DESCRIPTION
+ // This is designed to have a sizeof (16) to avoid alignment
+ // problems on most platforms.
+public:
+ enum
+ {
+ INVALID_ID = -1 // No peer can validly use this number.
+ };
+
+ Event_Header (ACE_INT32 len,
+ CONNECTION_ID connection_id,
+ ACE_INT32 type,
+ ACE_INT32 priority)
+ : len_ (len),
+ connection_id_ (connection_id),
+ type_ (type),
+ priority_ (priority)
+ {
+ }
+
+ void decode (void)
+ {
+ this->len_ = ntohl (this->len_);
+ this->connection_id_ = ntohl (this->connection_id_);
+ this->type_ = ntohl (this->type_);
+ this->priority_ = ntohl (this->priority_);
+ }
+ // Decode from network byte order to host byte order.
+
+ void encode (void)
+ {
+ this->len_ = htonl (this->len_);
+ this->connection_id_ = htonl (this->connection_id_);
+ this->type_ = htonl (this->type_);
+ this->priority_ = htonl (this->priority_);
+ }
+ // Encode from host byte order to network byte order.
+
+ ACE_INT32 len_;
+ // Length of the data_ payload, in bytes.
+
+ CONNECTION_ID connection_id_;
+ // Unique connection identifier that denotes a particular
+ // Connection_Handler.
+
+ ACE_INT32 type_;
+ // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>.
+
+ ACE_INT32 priority_;
+ // Event priority.
+};
+
+class Event
+{
+ // = TITLE
+ // Variable-sized event (data_ may be variable-sized between
+ // 0 and MAX_PAYLOAD_SIZE).
+public:
+ enum { MAX_PAYLOAD_SIZE = 1024 };
+ // The maximum size of an Event.
+
+ Event () : header_ (0, -1, 0, 0) {};
+
+ Event_Header header_;
+ // Event header.
+
+ char data_[MAX_PAYLOAD_SIZE];
+ // Event data.
+};
+
+class Subscription
+{
+ // = TITLE
+ // Allows Consumers to subscribe to be routed information
+ // arriving from a particular Supplier connection id.
+public:
+ void decode (void)
+ {
+ this->connection_id_ = ntohl (this->connection_id_);
+
+ for (ACE_INT32 i = 0; i < this->total_consumers_; i++)
+ this->consumers_[i] = ntohl (this->consumers_[i]);
+
+ this->total_consumers_ = ntohl (this->total_consumers_);
+ }
+ // Decode from network byte order to host byte order.
+
+ void encode (void)
+ {
+ this->connection_id_ = htonl (this->connection_id_);
+
+ for (ACE_INT32 i = 0; i < this->total_consumers_; i++)
+ this->consumers_[i] = htonl (this->consumers_[i]);
+
+ this->total_consumers_ = htonl (this->total_consumers_);
+ }
+ // Encode from host byte order to network byte order.
+
+ ACE_INT32 connection_id_;
+ // Connection id.
+
+ ACE_INT32 consumers_[MAX_CONSUMERS];
+ // Connection ids for consumers that will be routed information
+ // containing this <connection_id_>
+
+ ACE_INT32 total_consumers_;
+ // Total number of these consumers.
+};
+
+#endif /* EVENT_H */
diff --git a/ACE/apps/Gateway/Gateway/Event_Channel.cpp b/ACE/apps/Gateway/Gateway/Event_Channel.cpp
new file mode 100644
index 00000000000..beb35c1856e
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Event_Channel.cpp
@@ -0,0 +1,588 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "Connection_Handler_Connector.h"
+#include "Event_Channel.h"
+#include "ace/OS_NS_sys_select.h"
+#include "ace/Signal.h"
+
+ACE_RCSID(Gateway, Event_Channel, "$Id$")
+
+Event_Channel::~Event_Channel (void)
+{
+}
+
+Event_Channel::Event_Channel (void)
+ : supplier_acceptor_ (*this, 'S'),
+ consumer_acceptor_ (*this, 'C')
+{
+}
+
+int
+Event_Channel::compute_performance_statistics (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+
+ // If we've got a <ACE_Thread_Manager> then use it to suspend all
+ // the threads. This will enable us to get an accurate count.
+
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
+ {
+ if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "suspend_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) suspending all threads..."));
+ }
+
+ size_t total_bytes_in = 0;
+ size_t total_bytes_out = 0;
+
+ // Iterate through the connection map summing up the number of bytes
+ // sent/received.
+
+ for (CONNECTION_MAP_ENTRY *me = 0;
+ cmi.next (me) != 0;
+ cmi.advance ())
+ {
+ Connection_Handler *connection_handler = me->int_id_;
+
+ if (connection_handler->connection_role () == 'C')
+ total_bytes_out += connection_handler->total_bytes ();
+ else // connection_handler->connection_role () == 'S'
+ total_bytes_in += connection_handler->total_bytes ();
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
+ Options::instance ()->performance_window (),
+ total_bytes_in,
+ total_bytes_out));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) %f Mbits/sec received.\n",
+ (float) (total_bytes_in * 8 /
+ (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) %f Mbits/sec sent.\n",
+ (float) (total_bytes_out * 8 /
+ (float) (1024 * 1024 * Options::instance ()->performance_window ()))));
+
+ // Resume all the threads again.
+
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
+ {
+ if (ACE_Thread_Manager::instance ()->resume_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "resume_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) resuming all threads..."));
+ }
+
+
+ return 0;
+}
+
+int
+Event_Channel::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ // This is called periodically to compute performance statistics.
+ return this->compute_performance_statistics ();
+}
+
+int
+Event_Channel::put (ACE_Message_Block *event,
+ ACE_Time_Value *)
+{
+ // We got a valid event, so determine its type, which is stored in
+ // the first of the two <ACE_Message_Block>s, which are chained
+ // together by <ACE::recv>.
+ Event_Key *event_key = (Event_Key *) event->rd_ptr ();
+
+ // Skip over the address portion and get the data, which is in the
+ // second <ACE_Message_Block>.
+ ACE_Message_Block *data = event->cont ();
+
+ switch (event_key->type_)
+ {
+ case ROUTING_EVENT:
+ this->routing_event (event_key,
+ data);
+ break;
+ case SUBSCRIPTION_EVENT:
+ this->subscription_event (data);
+ break;
+ }
+
+ // Release the memory in the message block.
+ event->release ();
+ return 0;
+}
+
+void
+Event_Channel::subscription_event (ACE_Message_Block *data)
+{
+ Event *event = (Event *) data->rd_ptr ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) received a subscription with %d bytes from connection id %d\n",
+ event->header_.len_,
+ event->header_.connection_id_));
+ Subscription *subscription = (Subscription *) event->data_;
+ // Convert the subscription into host byte order so that we can
+ // access it directly without having to repeatedly muck with it...
+ subscription->decode ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connection_id_ = %d, total_consumers_ = %d\n",
+ subscription->connection_id_,
+ subscription->total_consumers_));
+
+ for (ACE_INT32 i = 0;
+ i < subscription->total_consumers_;
+ i++)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) consumers_[%d] = %d\n",
+ i,
+ subscription->consumers_[i]));
+
+}
+
+void
+Event_Channel::routing_event (Event_Key *forwarding_address,
+ ACE_Message_Block *data)
+{
+ Consumer_Dispatch_Set *dispatch_set = 0;
+
+ // Initialize the <dispatch_set> to points to the set of Consumers
+ // associated with this forwarding address.
+
+ if (this->efd_.find (*forwarding_address,
+ dispatch_set) == -1)
+ // Failure.
+ ACE_ERROR ((LM_DEBUG,
+ "(%t) find failed on connection id = %d, type = %d\n",
+ forwarding_address->connection_id_,
+ forwarding_address->type_));
+ else
+ {
+ // Check to see if there are any consumers.
+ if (dispatch_set->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active consumers for this event currently\n"));
+
+ else // There are consumers, so forward the event.
+ {
+ // Initialize the interator.
+ Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
+
+ // At this point, we should assign a thread-safe locking
+ // strategy to the <ACE_Message_Block> is we're running in a
+ // multi-threaded configuration.
+ data->locking_strategy (Options::instance ()->locking_strategy ());
+
+ for (Connection_Handler **connection_handler = 0;
+ dsi.next (connection_handler) != 0;
+ dsi.advance ())
+ {
+ // Only process active connection_handlers.
+ if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)
+ {
+ // Duplicate the event portion via reference
+ // counting.
+ ACE_Message_Block *dup_msg = data->duplicate ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) forwarding to Consumer %d\n",
+ (*connection_handler)->connection_id ()));
+
+ if ((*connection_handler)->put (dup_msg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p transmission error to peer %d\n",
+ "put",
+ (*connection_handler)->connection_id ()));
+
+ // We are responsible for releasing an
+ // ACE_Message_Block if failures occur.
+ dup_msg->release ();
+ }
+ }
+ }
+ }
+ }
+}
+
+int
+Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler,
+ int sync_directly)
+{
+ ACE_Synch_Options synch_options;
+
+ if (sync_directly)
+ // In separated connection handler thread, connection can be
+ // initiated by block mode (synch mode) directly.
+ synch_options = ACE_Synch_Options::synch;
+ else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK)
+ synch_options = ACE_Synch_Options::asynch;
+ else
+ synch_options = ACE_Synch_Options::synch;
+
+ return this->connector_.initiate_connection (connection_handler,
+ synch_options);
+}
+
+int
+Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
+{
+ int option = connection_handler->connection_role () == 'S'
+ ? SO_RCVBUF
+ : SO_SNDBUF;
+ int socket_queue_size =
+ Options::instance ()->socket_queue_size ();
+
+ if (socket_queue_size > 0)
+ if (connection_handler->peer ().set_option (SOL_SOCKET,
+ option,
+ &socket_queue_size,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "set_option"));
+
+ connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
+
+ // Our state is now "established."
+ connection_handler->state (Connection_Handler::ESTABLISHED);
+
+ // Restart the timeout to 1.
+ connection_handler->timeout (1);
+
+ ACE_INT32 id = htonl (connection_handler->connection_id ());
+
+ // Send the connection id to the peerd.
+
+ ssize_t n = connection_handler->peer ().send ((const void *) &id,
+ sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ n == 0 ? "peer has closed down unexpectedly" : "send"),
+ -1);
+ return 0;
+}
+
+// Restart connection (blocking_semantics dicates whether we restart
+// synchronously or asynchronously).
+
+int
+Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
+{
+ // Cancel asynchronous connecting before re-initializing. It will
+ // close the peer and cancel the asynchronous connecting.
+ this->cancel_connection_connection(connection_handler);
+
+ if (connection_handler->state () != Connection_Handler::DISCONNECTING)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Connection_Handler %d\n",
+ connection_handler->connection_id ()));
+
+ // Reschedule ourselves to try and connect again.
+ ACE_Time_Value const timeout (connection_handler->timeout ());
+ if (ACE_Reactor::instance ()->schedule_timer
+ (connection_handler,
+ 0,
+ timeout) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "schedule_timer"),
+ -1);
+ }
+ return 0;
+}
+
+// It is useful to provide a separate method to cancel the
+// asynchronous connecting.
+
+int
+Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler)
+{
+ // Skip over proxies with deactivated handles.
+ if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ connection_handler->peer ().close ();
+ // Cancel asynchronous connecting before re-initializing.
+ return this->connector_.cancel(connection_handler);
+ }
+ return 0;
+}
+
+// Initiate active connections with the Consumer and Supplier Peers.
+
+void
+Event_Channel::initiate_connector (void)
+{
+ if (Options::instance ()->enabled
+ (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
+ {
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+
+ // Iterate through the Consumer Map connecting all the
+ // Connection_Handlers.
+
+ for (CONNECTION_MAP_ENTRY *me = 0;
+ cmi.next (me) != 0;
+ cmi.advance ())
+ {
+ Connection_Handler *connection_handler = me->int_id_;
+
+ if (this->initiate_connection_connection (connection_handler) == -1)
+ continue; // Failures are handled elsewhere...
+ }
+ }
+}
+
+// Initiate passive acceptor to wait for Consumer and Supplier Peers
+// to accept.
+
+int
+Event_Channel::initiate_acceptors (void)
+{
+ if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR))
+ {
+ ACE_INET_Addr
+ consumer_addr (Options::instance ()->consumer_acceptor_port ());
+ if (this->consumer_acceptor_.open
+ (consumer_addr,
+ ACE_Reactor::instance (),
+ Options::instance ()->blocking_semantics ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "cannot register acceptor"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "accepting Consumers at %d\n",
+ Options::instance ()->consumer_acceptor_port ()));
+ }
+ if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR))
+ {
+ ACE_INET_Addr
+ supplier_addr (Options::instance ()->supplier_acceptor_port ());
+ if (this->supplier_acceptor_.open
+ (supplier_addr,
+ ACE_Reactor::instance (),
+ Options::instance ()->blocking_semantics ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "cannot register acceptor"),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "accepting Suppliers at %d\n",
+ Options::instance ()->supplier_acceptor_port ()));
+ }
+
+ return 0;
+}
+
+// This method gracefully shuts down all the Handlers in the
+// Connection_Handler Connection Map.
+
+int
+Event_Channel::close (u_long)
+{
+ if (Options::instance ()->threading_strategy () != Options::REACTIVE)
+ {
+ if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "suspend_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) suspending all threads\n"));
+ }
+
+ // First tell everyone that the spaceship is here...
+ {
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+
+ // Iterate over all the handlers and shut them down.
+
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
+ cmi.next (me) != 0;
+ cmi.advance ())
+ {
+ Connection_Handler *connection_handler = me->int_id_;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) closing down connection %d\n",
+ connection_handler->connection_id ()));
+
+ // If have no this statement, the gatewayd will abort when exiting
+ // with some Consumer/Supplier not connected.
+ if (connection_handler->state()==Connection_Handler::CONNECTING)
+ this->cancel_connection_connection(connection_handler);
+ // Mark Connection_Handler as DISCONNECTING so we don't try to
+ // reconnect...
+ connection_handler->state (Connection_Handler::DISCONNECTING);
+ }
+ }
+
+ // Close down the connector
+ this->connector_.close ();
+
+ // Close down the supplier acceptor.
+ this->supplier_acceptor_.close ();
+
+ // Close down the consumer acceptor.
+ this->consumer_acceptor_.close ();
+
+ // Now tell everyone that it is now time to commit suicide.
+ {
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
+
+ for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0.
+ cmi.next (me) != 0;
+ cmi.advance ())
+ {
+ Connection_Handler *connection_handler = me->int_id_;
+
+ // Deallocate Connection_Handler resources.
+ connection_handler->destroy (); // Will trigger a delete.
+ }
+ }
+
+ return 0;
+}
+
+int
+Event_Channel::find_proxy (ACE_INT32 connection_id,
+ Connection_Handler *&connection_handler)
+{
+ return this->connection_map_.find (connection_id,
+ connection_handler);
+}
+
+int
+Event_Channel::bind_proxy (Connection_Handler *connection_handler)
+{
+ int result = this->connection_map_.bind (connection_handler->connection_id (),
+ connection_handler);
+
+ switch (result)
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) bind failed for connection %d\n",
+ connection_handler->connection_id ()),
+ -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) duplicate connection %d, already bound\n",
+ connection_handler->connection_id ()),
+ -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "(%t) invalid result %d\n",
+ result),
+ -1);
+ /* NOTREACHED */
+ }
+
+ ACE_NOTREACHED (return 0);
+}
+
+int
+Event_Channel::subscribe (const Event_Key &event_addr,
+ Consumer_Dispatch_Set *cds)
+{
+ int result = this->efd_.bind (event_addr, cds);
+
+ // Bind with consumer map, keyed by peer address.
+ switch (result)
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) bind failed for connection %d\n",
+ event_addr.connection_id_),
+ -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "(%t) duplicate consumer map entry %d, "
+ "already bound\n",
+ event_addr.connection_id_),
+ -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
+ default:
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "(%t) invalid result %d\n",
+ result),
+ -1);
+ /* NOTREACHED */
+ }
+
+ ACE_NOTREACHED (return 0);
+}
+
+int
+Event_Channel::open (void *)
+{
+ // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it.
+ ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
+ ACE_UNUSED_ARG (sig);
+
+ // Actively initiate Peer connections.
+ this->initiate_connector ();
+
+ // Passively initiate Peer acceptor.
+ if (this->initiate_acceptors () == -1)
+ return -1;
+
+ // If we're not running reactively, then we need to make sure that
+ // <ACE_Message_Block> reference counting operations are
+ // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is
+ // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions.
+ if (Options::instance ()->threading_strategy ()
+ != Options::REACTIVE)
+ {
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la;
+
+ ACE_NEW_RETURN (la,
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX>,
+ -1);
+
+ Options::instance ()->locking_strategy (la);
+ }
+
+ return 0;
+}
+
diff --git a/ACE/apps/Gateway/Gateway/Event_Channel.h b/ACE/apps/Gateway/Gateway/Event_Channel.h
new file mode 100644
index 00000000000..64788cc0cdb
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Event_Channel.h
@@ -0,0 +1,135 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Event_Channel.h
+//
+// = AUTHOR
+// Doug Schmidt <schmidt@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef ACE_EVENT_CHANNEL
+#define ACE_EVENT_CHANNEL
+
+#include "Connection_Handler_Connector.h"
+#include "Connection_Handler_Acceptor.h"
+#include "Consumer_Dispatch_Set.h"
+#include "Event_Forwarding_Discriminator.h"
+#include "ace/svc_export.h"
+
+typedef ACE_Null_Mutex MAP_MUTEX;
+
+class ACE_Svc_Export Event_Channel : public ACE_Event_Handler
+{
+ // = TITLE
+ // Define a generic Event_Channel.
+ //
+ // = DESCRIPTION
+ // The inspiration for this class is derived from the CORBA COS
+ // Event Channel, though the design is simplified.
+ //
+ // We inherit from <ACE_Event_Handler> so that we can be
+ // registered with an <ACE_Reactor> to handle timeouts.
+public:
+ // = Initialization and termination methods.
+ Event_Channel (void);
+ ~Event_Channel (void);
+
+ virtual int open (void * = 0);
+ // Open the channel.
+
+ virtual int close (u_long = 0);
+ // Close down the Channel.
+
+ // = Proxy management methods.
+ int initiate_connection_connection (Connection_Handler *, int sync_directly = 0);
+ // Initiate the connection of the <Connection_Handler> to its peer.
+ // Second paratemer is used for thread connection-handler which will
+ // block the connecting procedure directly, need not care
+ // Options::blocking_semantics().
+
+ int complete_connection_connection (Connection_Handler *);
+ // Complete the initialization of the <Connection_Handler> once it's
+ // connected to its Peer.
+
+ int reinitiate_connection_connection (Connection_Handler *);
+ // Reinitiate a connection asynchronously when the Peer fails.
+ int cancel_connection_connection (Connection_Handler *);
+ // Cancel a asynchronous connection.
+
+ int bind_proxy (Connection_Handler *);
+ // Bind the <Connection_Handler> to the <connection_map_>.
+
+ int find_proxy (ACE_INT32 connection_id,
+ Connection_Handler *&);
+ // Locate the <Connection_Handler> with <connection_id>.
+
+ int subscribe (const Event_Key &event_addr,
+ Consumer_Dispatch_Set *cds);
+ // Subscribe the <Consumer_Dispatch_Set> to receive events that
+ // match <Event_Key>.
+
+ // = Event processing entry point.
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value * = 0);
+ // Pass <mb> to the Event Channel so it can forward it to Consumers.
+
+ void initiate_connector (void);
+ // Actively initiate connections to the Peers.
+
+ int initiate_acceptors (void);
+ // Passively initiate the <Peer_Acceptor>s for Consumer and
+ // Suppliers.
+
+private:
+ int parse_args (int argc, char *argv[]);
+ // Parse the command-line arguments.
+
+ // = Methods for handling events.
+ void routing_event (Event_Key *event_key,
+ ACE_Message_Block *data);
+ // Forwards the <data> to Consumer that have registered to receive
+ // it, based on addressing information in the <event_key>.
+
+ void subscription_event (ACE_Message_Block *data);
+ // Add a Consumer subscription.
+
+ int compute_performance_statistics (void);
+ // Perform timer-based performance profiling.
+
+ virtual int handle_timeout (const ACE_Time_Value &,
+ const void *arg);
+ // Periodically callback to perform timer-based performance
+ // profiling.
+
+ Connection_Handler_Connector connector_;
+ // Used to establish the connections actively.
+
+ Connection_Handler_Acceptor supplier_acceptor_;
+ // Used to establish connections passively and create Suppliers.
+
+ Connection_Handler_Acceptor consumer_acceptor_;
+ // Used to establish connections passively and create Consumers.
+
+ // = Make life easier by defining typedefs.
+ typedef ACE_Map_Manager<CONNECTION_ID, Connection_Handler *, MAP_MUTEX>
+ CONNECTION_MAP;
+ typedef ACE_Map_Iterator<CONNECTION_ID, Connection_Handler *, MAP_MUTEX>
+ CONNECTION_MAP_ITERATOR;
+ typedef ACE_Map_Entry<CONNECTION_ID, Connection_Handler *>
+ CONNECTION_MAP_ENTRY;
+
+ CONNECTION_MAP connection_map_;
+ // Table that maps <CONNECTION_ID>s to <Connection_Handler> *'s.
+
+ Event_Forwarding_Discriminator efd_;
+ // Map that associates an event to a set of <Consumer_Handler> *'s.
+};
+
+#endif /* ACE_EVENT_CHANNEL */
diff --git a/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
new file mode 100644
index 00000000000..d170dee9005
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
@@ -0,0 +1,64 @@
+// $Id$
+
+#if !defined (_CONSUMER_MAP_C)
+#define _CONSUMER_MAP_C
+
+#include "Event_Forwarding_Discriminator.h"
+
+ACE_RCSID(Gateway, Event_Forwarding_Discriminator, "$Id$")
+
+// Bind the Event_Key to the INT_ID.
+
+int
+Event_Forwarding_Discriminator::bind (Event_Key event_addr,
+ Consumer_Dispatch_Set *cds)
+{
+ return this->map_.bind (event_addr, cds);
+}
+
+// Find the Consumer_Dispatch_Set corresponding to the Event_Key.
+
+int
+Event_Forwarding_Discriminator::find (Event_Key event_addr,
+ Consumer_Dispatch_Set *&cds)
+{
+ return this->map_.find (event_addr, cds);
+}
+
+// Unbind (remove) the Event_Key from the map.
+
+int
+Event_Forwarding_Discriminator::unbind (Event_Key event_addr)
+{
+ Consumer_Dispatch_Set *cds = 0;
+ int result = this->map_.unbind (event_addr, cds);
+ delete cds;
+ return result;
+}
+
+Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator
+ (Event_Forwarding_Discriminator &rt)
+ : map_iter_ (rt.map_)
+{
+}
+
+int
+Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds)
+{
+ ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp;
+
+ if (this->map_iter_.next (temp) == 0)
+ return 0;
+ else
+ {
+ cds = temp->int_id_;
+ return 1;
+ }
+}
+
+int
+Event_Forwarding_Discriminator_Iterator::advance (void)
+{
+ return this->map_iter_.advance ();
+}
+#endif /* _CONSUMER_MAP_C */
diff --git a/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
new file mode 100644
index 00000000000..2a83a53a584
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
@@ -0,0 +1,65 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Event_Forwarding_Discriminator.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _CONSUMER_MAP_H
+#define _CONSUMER_MAP_H
+
+#include "ace/Map_Manager.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Null_Mutex.h"
+#include "Event.h"
+#include "Consumer_Dispatch_Set.h"
+
+class Event_Forwarding_Discriminator
+{
+ // = TITLE
+ // Map events to the set of Consumer_Proxies that have subscribed
+ // to receive the event.
+public:
+ int bind (Event_Key event, Consumer_Dispatch_Set *cds);
+ // Associate Event with the Consumer_Dispatch_Set.
+
+ int unbind (Event_Key event);
+ // Locate EXID and pass out parameter via INID. If found,
+ // return 0, else -1.
+
+ int find (Event_Key event, Consumer_Dispatch_Set *&cds);
+ // Break any association of EXID.
+
+public:
+ ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_;
+ // Map that associates <Event_Key>s (external ids) with
+ // <Consumer_Dispatch_Set> *'s <internal IDs>.
+};
+
+class Event_Forwarding_Discriminator_Iterator
+{
+ // = TITLE
+ // Define an iterator for the Consumer Map.
+public:
+ Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm);
+ int next (Consumer_Dispatch_Set *&);
+ int advance (void);
+
+private:
+ ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_iter_;
+ // Map we are iterating over.
+};
+#endif /* _CONSUMER_MAP_H */
diff --git a/ACE/apps/Gateway/Gateway/File_Parser.cpp b/ACE/apps/Gateway/Gateway/File_Parser.cpp
new file mode 100644
index 00000000000..de82ad0ba10
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/File_Parser.cpp
@@ -0,0 +1,163 @@
+// $Id$
+
+#ifndef FILE_PARSER_C
+#define FILE_PARSER_C
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_stdlib.h"
+
+#include "File_Parser.h"
+
+// This fixes a nasty bug with cfront-based compilers (like
+// Centerline).
+typedef FP::Return_Type FP_RETURN_TYPE;
+
+// File_Parser stuff.
+
+template <class ENTRY>
+File_Parser<ENTRY>::~File_Parser (void)
+{
+}
+
+template <class ENTRY> int
+File_Parser<ENTRY>::open (const ACE_TCHAR filename[])
+{
+ this->infile_ = ACE_OS::fopen (filename, ACE_TEXT ("r"));
+ if (this->infile_ == 0)
+ return -1;
+ else
+ return 0;
+}
+
+template <class ENTRY> int
+File_Parser<ENTRY>::close (void)
+{
+ return ACE_OS::fclose (this->infile_);
+}
+
+template <class ENTRY> FP_RETURN_TYPE
+File_Parser<ENTRY>::getword (char buf[])
+{
+ return this->readword (buf);
+}
+
+// Get the next string from the file via this->readword()
+// Check make sure the string forms a valid number.
+
+template <class ENTRY> FP_RETURN_TYPE
+File_Parser<ENTRY>::getint (ACE_INT32 &value)
+{
+ char buf[BUFSIZ];
+#if defined (__GNUG__)
+ // egcs 1.1b can't handle the typedef.
+ FP::Return_Type
+#else /* ! __GNUG__ */
+ FP_RETURN_TYPE
+#endif /* ! __GNUG__ */
+ read_result = this->readword (buf);
+
+ if (read_result == FP::RT_SUCCESS)
+ {
+ // Check to see if this is the "use the default value" symbol?
+ if (buf[0] == '*')
+ return FP::RT_DEFAULT;
+ else
+ {
+ // ptr is used for error checking with ACE_OS::strtol.
+ char *ptr;
+
+ // try to convert the buf to a decimal number
+ value = ACE_OS::strtol (buf, &ptr, 10);
+
+ // check if the buf is a decimal or not
+ if (value == 0 && ptr == buf)
+ return FP::RT_PARSE_ERROR;
+ else
+ return FP::RT_SUCCESS;
+ }
+ }
+ else
+ return read_result;
+}
+
+
+template <class ENTRY> FP_RETURN_TYPE
+File_Parser<ENTRY>::readword (char buf[])
+{
+ int wordlength = 0;
+ int c;
+
+ // Skip over leading delimiters and get word.
+
+ while ((c = getc (this->infile_)) != EOF && c != '\n')
+ if (this->delimiter (c))
+ {
+ // We've reached the end of a "word".
+ if (wordlength > 0)
+ break;
+ }
+ else
+ buf[wordlength++] = c;
+
+ buf[wordlength] = '\0';
+
+ if (c == EOF) {
+ // If EOF is just a delimiter, don't return EOF so that the word
+ // gets processed.
+ if (wordlength > 0)
+ {
+ ungetc (c, this->infile_);
+ return FP::RT_SUCCESS;
+ }
+ else
+ // else return EOF so that read loops stop
+ return FP::RT_EOFILE;
+ }
+ else if (c == '\n')
+ {
+ // if the EOLINE is just a delimiter, don't return EOLINE
+ // so that the word gets processed
+ if (wordlength > 0)
+ ungetc (c, this->infile_);
+ else
+ return FP::RT_EOLINE;
+ }
+
+ // Skip comments.
+ if (this->comments (buf[0]))
+ {
+ if (this->skipline () == EOF)
+ return FP::RT_EOFILE;
+ else
+ return FP::RT_COMMENT;
+ }
+ else
+ return FP::RT_SUCCESS;
+}
+
+template <class ENTRY> int
+File_Parser<ENTRY>::delimiter (char ch)
+{
+ return ch == ' ' || ch == ',' || ch == '\t';
+}
+
+template <class ENTRY> int
+File_Parser<ENTRY>::comments (char ch)
+{
+ return ch == '#';
+}
+
+template <class ENTRY> int
+File_Parser<ENTRY>::skipline (void)
+{
+ // Skip the remainder of the line.
+
+ int c;
+
+ while ((c = getc (this->infile_)) != '\n' && c != EOF)
+ continue;
+
+ return c;
+}
+
+#endif /* _FILE_PARSER_C */
diff --git a/ACE/apps/Gateway/Gateway/File_Parser.h b/ACE/apps/Gateway/Gateway/File_Parser.h
new file mode 100644
index 00000000000..6e971090ad8
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/File_Parser.h
@@ -0,0 +1,97 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// File_Parser.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef _FILE_PARSER
+#define _FILE_PARSER
+
+#include "ace/Basic_Types.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class FP
+{
+ // = TITLE
+ // This class serves as a namespace for the <Return_Type>.
+public:
+ enum Return_Type
+ {
+ RT_EOLINE,
+ RT_EOFILE,
+ RT_SUCCESS,
+ RT_COMMENT,
+ RT_DEFAULT,
+ RT_PARSE_ERROR
+ };
+};
+
+template <class ENTRY>
+class File_Parser
+{
+ // = TITLE
+ // Class used to parse the configuration file for the
+ // <Consumer_Map>.
+public:
+
+ /// Destructor.
+ virtual ~File_Parser (void);
+
+ // = Open and Close the file specified
+ int open (const ACE_TCHAR filename[]);
+ int close (void);
+
+ virtual FP::Return_Type read_entry (ENTRY &entry,
+ int &line_number) = 0;
+ // Pure virtual hook that subclasses override and use the protected
+ // methods to fill in the <entry>.
+
+protected:
+ FP::Return_Type getword (char buf[]);
+ // Read the next ASCII word.
+
+ FP::Return_Type getint (ACE_INT32 &value);
+ // Read the next integer.
+
+ FP::Return_Type readword (char buf[]);
+ // Read the next "word," which is demarcated by <delimiter>s.
+ //
+ // @@ This function is inherently flawed since it doesn't take a
+ // count of the size of <buf>...
+
+ int delimiter (char ch);
+ // Returns true if <ch> is a delimiter, i.e., ' ', ',', or '\t'.
+
+ int comments (char ch);
+ // Returns true if <ch> is the comment character, i.e., '#'.
+
+ int skipline (void);
+ // Skips to the remainder of a line, e.g., when we find a comment
+ // character.
+
+ FILE *infile_;
+ // Pointer to the file we're reading.
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "File_Parser.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("File_Parser.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* _FILE_PARSER */
diff --git a/ACE/apps/Gateway/Gateway/Gateway.cpp b/ACE/apps/Gateway/Gateway/Gateway.cpp
new file mode 100644
index 00000000000..0fac7b5085c
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Gateway.cpp
@@ -0,0 +1,340 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Service_Config.h"
+#include "ace/Signal.h"
+#include "Config_Files.h"
+#include "Event_Channel.h"
+#include "Gateway.h"
+
+ACE_RCSID(Gateway, Gateway, "$Id$")
+
+class ACE_Svc_Export Gateway : public ACE_Service_Object
+{
+ // = TITLE
+ // Integrates the whole Gateway application.
+ //
+ // = DESCRIPTION
+ // This implementation uses the <Event_Channel> as the basis
+ // for the <Gateway> routing.
+protected:
+ // = Service configurator hooks.
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ // Perform initialization.
+
+ virtual int fini (void);
+ // Perform termination when unlinked dynamically.
+
+ virtual int info (ACE_TCHAR **, size_t) const;
+ // Return info about this service.
+
+ // = Configuration methods.
+ int parse_connection_config_file (void);
+ // Parse the proxy configuration file.
+
+ int parse_consumer_config_file (void);
+ // Parse the consumer configuration file.
+
+ // = Lifecycle management methods.
+ int handle_input (ACE_HANDLE);
+ // Shut down the Gateway when input comes in from the controlling
+ // console.
+
+ int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ // Shut down the Gateway when a signal arrives.
+
+ Event_Channel event_channel_;
+ // The Event Channel routes events from Supplier(s) to Consumer(s)
+ // using <Supplier_Handler> and <Consumer_Handler> objects.
+
+ Connection_Handler_Factory connection_handler_factory_;
+ // Creates the appropriate type of <Connection_Handlers>.
+};
+
+int
+Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ ACE_UNUSED_ARG (signum);
+
+ // Shut down the main event loop.
+ ACE_Reactor::end_event_loop ();
+ return 0;
+}
+
+int
+Gateway::handle_input (ACE_HANDLE h)
+{
+ char buf[BUFSIZ];
+ // Consume the input...
+ ACE_OS::read (h, buf, sizeof (buf));
+
+ // Shut us down.
+ return this->handle_signal ((int) h);
+}
+
+int
+Gateway::init (int argc, ACE_TCHAR *argv[])
+{
+ // Parse the "command-line" arguments.
+ Options::instance ()->parse_args (argc, argv);
+
+ ACE_Sig_Set sig_set;
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register ourselves to receive signals so we can shut down
+ // gracefully.
+
+ if (ACE_Reactor::instance ()->register_handler (sig_set,
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("register_handler")),
+ -1);
+
+ // Register this handler to receive events on stdin. We use this to
+ // shutdown the Gateway gracefully.
+ if (ACE_Event_Handler::register_stdin_handler (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("register_stdin_handler")),
+ -1);
+
+ // If this->performance_window_ > 0 start a timer.
+
+ if (Options::instance ()->performance_window () > 0)
+ {
+ ACE_Time_Value const performance_time (Options::instance ()->performance_window ());
+ if (ACE_Reactor::instance ()->schedule_timer
+ (&this->event_channel_, 0,
+ performance_time) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("schedule_timer")));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("starting timer for %d seconds...\n"),
+ Options::instance ()->performance_window ()));
+ }
+
+ // Are we running as a connector?
+ if (Options::instance ()->enabled
+ (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
+ {
+ // Parse the proxy configuration file.
+ this->parse_connection_config_file ();
+
+ // Parse the consumer config file and build the event forwarding
+ // discriminator.
+ this->parse_consumer_config_file ();
+ }
+
+ // Initialize the Event_Channel.
+ return this->event_channel_.open ();
+}
+
+// This method is automatically called when the Gateway is shutdown.
+
+int
+Gateway::fini (void)
+{
+ // Remove the handler that receive events on stdin. Otherwise, we
+ // will crash on shutdown.
+ ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ());
+
+ // Close down the event channel.
+ this->event_channel_.close ();
+
+ // Need to make sure we cleanup this Singleton.
+ delete Options::instance ();
+ return 0;
+}
+
+// Returns information on the currently active service.
+
+int
+Gateway::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+
+ ACE_OS::strcpy
+ (buf, ACE_TEXT ("Gateway daemon\t # Application-level gateway\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, buf, length);
+ return ACE_OS::strlen (buf);
+}
+
+// Parse and build the proxy table.
+
+int
+Gateway::parse_connection_config_file (void)
+{
+ // File that contains the proxy configuration information.
+ Connection_Config_File_Parser connection_file;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (connection_file.open (Options::instance ()->connection_config_file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ Options::instance ()->connection_config_file ()),
+ -1);
+
+ // Keep track of the previous connection id to make sure the
+ // connection config file isn't corrupted.
+ int previous_connection_id = 0;
+
+ // Read config file one line at a time.
+
+ for (Connection_Config_Info pci;
+ connection_file.read_entry (pci, line_number) != FP::RT_EOFILE;
+ )
+ {
+ file_empty = 0;
+
+ // First time in check.
+ if (previous_connection_id == 0)
+ {
+ previous_connection_id = 1;
+
+ if (pci.connection_id_ != 1)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) warning, the first connection id should be 1 not %d\n"),
+ pci.connection_id_));
+ }
+ else if (previous_connection_id + 1 != pci.connection_id_)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) warning, connection ids should keep increasing by 1 and %d + 1 != %d\n"),
+ previous_connection_id,
+ pci.connection_id_));
+
+ // Update the last connection id to ensure that we monotonically
+ // increase by 1.
+ previous_connection_id = pci.connection_id_;
+
+ if (Options::instance ()->enabled (Options::DEBUG))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) conn id = %d, ")
+ ACE_TEXT ("host = %s, ")
+ ACE_TEXT ("remote port = %d, ")
+ ACE_TEXT ("proxy role = %c, ")
+ ACE_TEXT ("max retry timeout = %d, ")
+ ACE_TEXT ("local port = %d, ")
+ ACE_TEXT ("priority = %d\n"),
+ pci.connection_id_,
+ pci.host_,
+ pci.remote_port_,
+ pci.connection_role_,
+ pci.max_retry_timeout_,
+ pci.local_port_,
+ pci.priority_));
+
+ pci.event_channel_ = &this->event_channel_;
+
+ // Create the appropriate type of Proxy.
+ Connection_Handler *connection_handler;
+
+ ACE_ALLOCATOR_RETURN (connection_handler,
+ this->connection_handler_factory_.make_connection_handler (pci),
+ -1);
+
+ // Bind the new Connection_Handler to the connection ID.
+ this->event_channel_.bind_proxy (connection_handler);
+ }
+
+ // Keep track of the next available connection id, which is
+ // necessary for Peers that connect with us, rather than vice versa.
+ Options::instance ()->connection_id () = previous_connection_id + 1;
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("warning: connection connection_handler configuration file was empty\n")));
+ return 0;
+}
+
+int
+Gateway::parse_consumer_config_file (void)
+{
+ // File that contains the consumer event forwarding information.
+ Consumer_Config_File_Parser consumer_file;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (consumer_file.open (Options::instance ()->consumer_config_file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ Options::instance ()->consumer_config_file ()),
+ -1);
+
+ // Read config file line at a time.
+ for (Consumer_Config_Info cci_entry;
+ consumer_file.read_entry (cci_entry, line_number) != FP::RT_EOFILE;
+ )
+ {
+ file_empty = 0;
+
+ if (Options::instance ()->enabled (Options::DEBUG))
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) connection id = %d, payload = %d, ")
+ ACE_TEXT ("number of consumers = %d\n"),
+ cci_entry.connection_id_,
+ cci_entry.type_,
+ cci_entry.total_consumers_));
+
+ for (int i = 0; i < cci_entry.total_consumers_; i++)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) destination[%d] = %d\n"),
+ i,
+ cci_entry.consumers_[i]));
+ }
+
+ Consumer_Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set,
+ Consumer_Dispatch_Set,
+ -1);
+
+ Event_Key event_addr (cci_entry.connection_id_,
+ cci_entry.type_);
+
+ // Add the Consumers to the Dispatch_Set.
+ for (int i = 0; i < cci_entry.total_consumers_; i++)
+ {
+ Connection_Handler *connection_handler = 0;
+
+ // Lookup destination and add to Consumer_Dispatch_Set set
+ // if found.
+ if (this->event_channel_.find_proxy (cci_entry.consumers_[i],
+ connection_handler) != -1)
+ dispatch_set->insert (connection_handler);
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) not found: destination[%d] = %d\n"),
+ i,
+ cci_entry.consumers_[i]));
+ }
+
+ this->event_channel_.subscribe (event_addr, dispatch_set);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("warning: consumer map configuration file was empty\n")));
+ return 0;
+}
+
+// The following is a "Factory" used by the ACE_Service_Config and
+// svc.conf file to dynamically initialize the state of the Gateway.
+
+ACE_SVC_FACTORY_DEFINE (Gateway)
+
diff --git a/ACE/apps/Gateway/Gateway/Gateway.h b/ACE/apps/Gateway/Gateway/Gateway.h
new file mode 100644
index 00000000000..fe7d138b3a1
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Gateway.h
@@ -0,0 +1,33 @@
+// -*- C++ -*-
+//
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Gateway.h
+//
+// = DESCRIPTION
+// Since the Gateway is an <ACE_Service_Object>, this file defines
+// the entry point into the Service Configurator framework.
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#ifndef ACE_GATEWAY
+#define ACE_GATEWAY
+
+#include "ace/svc_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+ACE_SVC_FACTORY_DECLARE (Gateway)
+
+#endif /* ACE_GATEWAY */
diff --git a/ACE/apps/Gateway/Gateway/Makefile.am b/ACE/apps/Gateway/Gateway/Makefile.am
new file mode 100644
index 00000000000..83392e20564
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Makefile.am
@@ -0,0 +1,78 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.Gateway.am
+
+noinst_LTLIBRARIES = libGateway.la
+
+libGateway_la_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+libGateway_la_SOURCES = \
+ Concrete_Connection_Handlers.cpp \
+ Config_Files.cpp \
+ Connection_Handler.cpp \
+ Connection_Handler_Acceptor.cpp \
+ Connection_Handler_Connector.cpp \
+ Event_Channel.cpp \
+ Event_Forwarding_Discriminator.cpp \
+ File_Parser.cpp \
+ Gateway.cpp \
+ Options.cpp
+
+noinst_HEADERS = \
+ Concrete_Connection_Handlers.h \
+ Config_Files.h \
+ Connection_Handler.h \
+ Connection_Handler_Acceptor.h \
+ Connection_Handler_Connector.h \
+ Event_Channel.h \
+ Event_Forwarding_Discriminator.h \
+ File_Parser.h \
+ Gateway.h \
+ Options.h
+
+## Makefile.gatewayd.am
+noinst_PROGRAMS = gatewayd
+
+gatewayd_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+gatewayd_SOURCES = \
+ gatewayd.cpp \
+ Concrete_Connection_Handlers.h \
+ Config_Files.h \
+ Connection_Handler.h \
+ Connection_Handler_Acceptor.h \
+ Connection_Handler_Connector.h \
+ Consumer_Dispatch_Set.h \
+ Event.h \
+ Event_Channel.h \
+ Event_Forwarding_Discriminator.h \
+ File_Parser.h \
+ Gateway.h \
+ Options.h
+
+gatewayd_LDADD = \
+ libGateway.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/apps/Gateway/Gateway/Options.cpp b/ACE/apps/Gateway/Gateway/Options.cpp
new file mode 100644
index 00000000000..cc80ce06c7d
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Options.cpp
@@ -0,0 +1,288 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "Event.h"
+#include "Options.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/os_include/os_fcntl.h"
+
+ACE_RCSID(Gateway, Options, "$Id$")
+
+// Static initialization.
+Options *Options::instance_ = 0;
+
+// Let's have a usage prompt.
+void
+Options::print_usage (void)
+{
+ ACE_DEBUG ((LM_INFO,
+ "gatewayd [-a {C|S}:acceptor-port] [-c {C|S}:connector-port]"
+ " [-C consumer_config_file] [-P connection_config_filename]"
+ " [-q socket_queue_size] [-t OUTPUT_MT|INPUT_MT] [-w time_out]"
+ " [-b] [-d] [-v] [-T]\n"
+ ""
+ "\t-a Become an Acceptor\n"
+ "\t-b Use blocking connection establishment\n"
+ "\t-c Become a Connector\n"
+ "\t-d debugging\n"
+ "\t-q Use a different socket queue size\n"
+ "\t-t Use a different threading strategy\n"
+ "\t-v Verbose mode\n"
+ "\t-w Time performance for a designated amount of time\n"
+ "\t-C Use a different proxy config filename\n"
+ "\t-P Use a different consumer config filename\n"
+ "\t-T Tracing\n"
+ ));
+}
+Options *
+Options::instance (void)
+{
+ if (Options::instance_ == 0)
+ ACE_NEW_RETURN (Options::instance_, Options, 0);
+
+ return Options::instance_;
+}
+
+Options::Options (void)
+ : locking_strategy_ (0),
+ performance_window_ (0),
+ blocking_semantics_ (ACE_NONBLOCK),
+ socket_queue_size_ (0),
+ threading_strategy_ (REACTIVE),
+ options_ (0),
+ supplier_acceptor_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT),
+ consumer_acceptor_port_ (DEFAULT_GATEWAY_CONSUMER_PORT),
+ supplier_connector_port_ (DEFAULT_PEER_SUPPLIER_PORT),
+ consumer_connector_port_ (DEFAULT_PEER_CONSUMER_PORT),
+ max_timeout_ (MAX_TIMEOUT),
+ max_queue_size_ (MAX_QUEUE_SIZE),
+ connection_id_ (1)
+{
+ ACE_OS::strcpy (this->connection_config_file_, ACE_TEXT("connection_config"));
+ ACE_OS::strcpy (this->consumer_config_file_, ACE_TEXT("consumer_config"));
+}
+
+int
+Options::enabled (int option) const
+{
+ return ACE_BIT_ENABLED (this->options_, option);
+}
+
+Options::~Options (void)
+{
+ delete this->locking_strategy_;
+}
+
+ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *
+Options::locking_strategy (void) const
+{
+ return this->locking_strategy_;
+}
+
+void
+Options::locking_strategy (ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *ls)
+{
+ this->locking_strategy_ = ls;
+}
+
+long
+Options::performance_window (void) const
+{
+ return this->performance_window_;
+}
+
+CONNECTION_ID &
+Options::connection_id (void)
+{
+ return this->connection_id_;
+}
+
+long
+Options::max_timeout (void) const
+{
+ return this->max_timeout_;
+}
+
+int
+Options::blocking_semantics (void) const
+{
+ return this->blocking_semantics_;
+}
+
+int
+Options::socket_queue_size (void) const
+{
+ return this->socket_queue_size_;
+}
+
+u_long
+Options::threading_strategy (void) const
+{
+ return this->threading_strategy_;
+}
+
+const ACE_TCHAR *
+Options::connection_config_file (void) const
+{
+ return this->connection_config_file_;
+}
+
+const ACE_TCHAR *
+Options::consumer_config_file (void) const
+{
+ return this->consumer_config_file_;
+}
+
+u_short
+Options::consumer_acceptor_port (void) const
+{
+ return this->consumer_acceptor_port_;
+}
+
+u_short
+Options::supplier_acceptor_port (void) const
+{
+ return this->supplier_acceptor_port_;
+}
+
+u_short
+Options::consumer_connector_port (void) const
+{
+ return this->consumer_connector_port_;
+}
+
+long
+Options::max_queue_size (void) const
+{
+ return this->max_queue_size_;
+}
+
+u_short
+Options::supplier_connector_port (void) const
+{
+ return this->supplier_connector_port_;
+}
+
+// Parse the "command-line" arguments and set the corresponding flags.
+
+int
+Options::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ // Assign defaults.
+ ACE_Get_Opt get_opt (argc,
+ argv,
+ ACE_TEXT("a:bC:c:dm:P:p:q:r:t:vw:"),
+ 0);
+
+ for (int c; (c = get_opt ()) != EOF; )
+ {
+ switch (c)
+ {
+ case 'a':
+ {
+ // Become an Acceptor.
+
+ for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|"));
+ flag != 0;
+ flag = ACE_OS::strtok (0, ACE_TEXT("|")))
+ if (ACE_OS::strncasecmp (flag, ACE_TEXT("C"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::CONSUMER_ACCEPTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Consumer Acceptor port number.
+ this->consumer_acceptor_port_ = ACE_OS::atoi (flag + 2);
+ }
+ else if (ACE_OS::strncasecmp (flag, ACE_TEXT("S"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::SUPPLIER_ACCEPTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Supplier Acceptor port number.
+ this->supplier_acceptor_port_ = ACE_OS::atoi (flag + 2);
+ }
+ }
+ break;
+ /* NOTREACHED */
+ case 'b': // Use blocking connection establishment.
+ this->blocking_semantics_ = 1;
+ break;
+ case 'C': // Use a different proxy config filename.
+ ACE_OS::strncpy (this->consumer_config_file_,
+ get_opt.opt_arg (),
+ sizeof this->consumer_config_file_
+ / sizeof (ACE_TCHAR));
+ break;
+ case 'c':
+ {
+ // Become a Connector.
+
+ for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|"));
+ flag != 0;
+ flag = ACE_OS::strtok (0, ACE_TEXT("|")))
+ if (ACE_OS::strncasecmp (flag, ACE_TEXT("C"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::CONSUMER_CONNECTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Consumer Connector port number.
+ this->consumer_connector_port_ = ACE_OS::atoi (flag + 2);
+ }
+ else if (ACE_OS::strncasecmp (flag, ACE_TEXT("S"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::SUPPLIER_CONNECTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Supplier Connector port number.
+ this->supplier_connector_port_ = ACE_OS::atoi (flag + 2);
+ }
+ }
+ break;
+ /* NOTREACHED */
+ case 'd': // We are debugging.
+ ACE_SET_BITS (this->options_,
+ Options::DEBUG);
+ break;
+ case 'P': // Use a different connection config filename.
+ ACE_OS::strncpy (this->connection_config_file_,
+ get_opt.opt_arg (),
+ sizeof this->connection_config_file_);
+ break;
+ case 'q': // Use a different socket queue size.
+ this->socket_queue_size_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 't': // Use a different threading strategy.
+ {
+ for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|"));
+ flag != 0;
+ flag = ACE_OS::strtok (0, ACE_TEXT("|")))
+ if (ACE_OS::strcmp (flag, ACE_TEXT("OUTPUT_MT")) == 0)
+ ACE_SET_BITS (this->threading_strategy_,
+ Options::OUTPUT_MT);
+ else if (ACE_OS::strcmp (flag, ACE_TEXT("INPUT_MT")) == 0)
+ ACE_SET_BITS (this->threading_strategy_,
+ Options::INPUT_MT);
+ break;
+ }
+ case 'v': // Verbose mode.
+ ACE_SET_BITS (this->options_,
+ Options::VERBOSE);
+ break;
+ case 'w': // Time performance for a designated amount of time.
+ this->performance_window_ = ACE_OS::atoi (get_opt.opt_arg ());
+ // Use blocking connection semantics so that we get accurate
+ // timings (since all connections start at once).
+ this->blocking_semantics_ = 0;
+ break;
+ default:
+ this->print_usage(); // It's nice to have a usage prompt.
+ break;
+ }
+ }
+
+ return 0;
+}
diff --git a/ACE/apps/Gateway/Gateway/Options.h b/ACE/apps/Gateway/Gateway/Options.h
new file mode 100644
index 00000000000..56f7c95e7e7
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/Options.h
@@ -0,0 +1,196 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Options.h
+//
+// = AUTHOR
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef OPTIONS_H
+#define OPTIONS_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/svc_export.h"
+#include "ace/Lock_Adapter_T.h"
+#include "ace/Synch_Traits.h"
+#include "ace/Thread_Mutex.h"
+
+class ACE_Svc_Export Options
+{
+ // = TITLE
+ // Singleton that consolidates all Options for a gatewayd.
+public:
+ // = Options that can be enabled/disabled.
+ enum
+ {
+ // = The types of threading strategies.
+ REACTIVE = 0,
+ OUTPUT_MT = 1,
+ INPUT_MT = 2,
+
+ VERBOSE = 01,
+ DEBUG = 02,
+
+ SUPPLIER_ACCEPTOR = 04,
+ CONSUMER_ACCEPTOR = 010,
+ SUPPLIER_CONNECTOR = 020,
+ CONSUMER_CONNECTOR = 040
+ };
+
+ static Options *instance (void);
+ // Return Singleton.
+
+ ~Options (void);
+ // Termination.
+
+ int parse_args (int argc, ACE_TCHAR *argv[]);
+ // Parse the arguments and set the options.
+
+ void print_usage(void);
+ // Print the gateway supported parameters.
+ // = Accessor methods.
+ int enabled (int option) const;
+ // Determine if an option is enabled.
+
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *locking_strategy (void) const;
+ // Gets the locking strategy used for serializing access to the
+ // reference count in <ACE_Message_Block>. If it's 0, then there's
+ // no locking strategy and we're using a REACTIVE concurrency
+ // strategy.
+
+ void locking_strategy (ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *);
+ // Set the locking strategy used for serializing access to the
+ // reference count in <ACE_Message_Block>.
+
+ long performance_window (void) const;
+ // Number of seconds after connection establishment to report
+ // throughput.
+
+ int blocking_semantics (void) const;
+ // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
+
+ int socket_queue_size (void) const;
+ // Size of the socket queue (0 means "use default").
+
+ u_long threading_strategy (void) const;
+ // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT.
+
+ u_short supplier_acceptor_port (void) const;
+ // Our acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Supplier.
+
+ u_short consumer_acceptor_port (void) const;
+ // Our acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Consumer.
+
+ u_short supplier_connector_port (void) const;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Supplier.
+
+ u_short consumer_connector_port (void) const;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Consumer.
+
+ const ACE_TCHAR *connection_config_file (void) const;
+ // Name of the connection configuration file.
+
+ const ACE_TCHAR *consumer_config_file (void) const;
+ // Name of the consumer map configuration file.
+
+ long max_timeout (void) const;
+ // The maximum retry timeout delay.
+
+ long max_queue_size (void) const;
+ // The maximum size of the queue.
+
+ CONNECTION_ID &connection_id (void);
+ // Returns a reference to the next available connection id;
+
+private:
+ enum
+ {
+ MAX_QUEUE_SIZE = 1024 * 1024 * 16,
+ // We'll allow up to 16 megabytes to be queued per-output proxy.
+
+ MAX_TIMEOUT = 32
+ // The maximum timeout for trying to re-establish connections.
+ };
+
+ Options (void);
+ // Initialization.
+
+ static Options *instance_;
+ // Options Singleton instance.
+
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *locking_strategy_;
+ // Points to the locking strategy used for serializing access to the
+ // reference count in <ACE_Message_Block>. If it's 0, then there's
+ // no locking strategy and we're using a REACTIVE concurrency
+ // strategy.
+
+ long performance_window_;
+ // Number of seconds after connection establishment to report
+ // throughput.
+
+ int blocking_semantics_;
+ // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+
+ u_long threading_strategy_;
+ // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT.
+
+ u_long options_;
+ // Flag to indicate if we want verbose diagnostics.
+
+ u_short supplier_acceptor_port_;
+ // The acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Supplier.
+
+ u_short consumer_acceptor_port_;
+ // The acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Consumer.
+
+ u_short supplier_connector_port_;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Supplier.
+
+ u_short consumer_connector_port_;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Consumer.
+
+ long max_timeout_;
+ // The maximum retry timeout delay.
+
+ long max_queue_size_;
+ // The maximum size of the queue.
+
+ CONNECTION_ID connection_id_;
+ // The next available connection id.
+
+ ACE_TCHAR connection_config_file_[MAXPATHLEN + 1];
+ // Name of the connection configuration file.
+
+ ACE_TCHAR consumer_config_file_[MAXPATHLEN + 1];
+ // Name of the consumer map configuration file.
+};
+
+#endif /* OPTIONS_H */
diff --git a/ACE/apps/Gateway/Gateway/connection_config b/ACE/apps/Gateway/Gateway/connection_config
new file mode 100644
index 00000000000..93730edc0da
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/connection_config
@@ -0,0 +1,55 @@
+# Configuration file that the gatewayd process uses to determine
+# connection information about proxies.
+#
+# The following provides an explanation for the fields in this file,
+# and how they relate to fields in the corresponding "consumer_config"
+# file.
+#
+# 1. Connection ID -- Each Connection Handler is given a unique ID
+# that is used in the "consumer_config" file to specify to which
+# Consumers the Event Channel will forward incoming events from
+# Suppliers using that connection. The Connection ID field is the
+# "key" that is used to match up connections in this file with the
+# Consumer subscription requests in the "consumer_config" file.
+# The connection ids should start at 1 and monotonically increase
+# by increments of 1. This makes it possible for the Gateway to
+# properly allocate connection ids for Peers that connect to it.
+#
+# 2. Host -- The host name where the Supplier/Consumer peerd
+# process is running.
+#
+# 3. Remote Port -- The port number where the remote
+# Supplier/Consumer peerd process is listening on.
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
+#
+# 4. Handler Role -- i.e., Consumer ('C') or Supplier ('S')
+#
+# 5. Max Retry Timeout -- The maximum amount of time that we'll
+# wait between retry attempts (these start at 1 second and
+# double until they reach the Max Retry Timeout).
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
+#
+# 6. Local Port -- The port number that we want to use for
+# our local Proxy connection. If this is the value 0 or the '*'
+# character, then we'll let the socket implementation pick this
+# value for us.
+#
+# 7. Priority -- Each Consumer/Supplier can be given a priority
+# that will determine its importance relative to other
+# Consumers/Suppliers (this feature isn't implemented yet).
+#
+# Connection Host Remote Handler Max Retry Local Priority
+# ID Port Role Timeout Port
+# ---------- -------- ------ ------ ---------- ----- --------
+ 1 localhost * S * * 1
+ 2 localhost * C * * 1
+# 3 mambo.cs * C * * 1
+# 4 lambada.cs * C * * 1
+# 5 lambada.cs * C * * 1
+# 6 tango.cs * C * * 1
+# 7 tango.cs * S * * 1
+# 8 tango.cs * C * * 1
diff --git a/ACE/apps/Gateway/Gateway/consumer_config b/ACE/apps/Gateway/Gateway/consumer_config
new file mode 100644
index 00000000000..1aaa3fc4028
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/consumer_config
@@ -0,0 +1,35 @@
+# Configuration file that the gatewayd process uses to determine which
+# Consumers will receive events from which Suppliers. For now, the
+# Gateway only allows Consumers to "subscribe" to receive events from
+# particular Suppliers. A more flexible implementation will allow
+# Consumers to subscribe to particular types of events, as well.
+#
+# The following provides an explanation for the fields in this file,
+# and how they relate to fields in the corresponding "connection_config"
+# file.
+#
+# 1. Connection ID -- Each Connection Handler is given a unique ID
+# that is used in the "consumer_config" file to specify to which
+# Consumers the Event Channel will forward incoming events from
+# Suppliers. The Connection ID field is the "key" that is used to
+# match up Consumer subscription requests in this file with
+# connections in the "connection_config" file.
+#
+# 2. Event Type -- Indicates the type of the event. Consumers
+# can use this to only subscribe to certain types of events.
+# This feature is currently not implemented.
+#
+# 3. Consumers -- Indicates which Consumers will receive events sent
+# from this Proxy/Supplier ID, i.e., Consumers can subscribe to
+# receive events from particular Suppliers. Note that more than
+# one Consumer can subscribe to the same Supplier event, i.e.,
+# we support logical "multicast" (which is currently implemented
+# using multi-point unicast via TCP/IP).
+#
+# Connection Event Consumers
+# ID Type
+# ---------- ---- ---------
+ 1 0 2
+# 2 0 3,4
+# 3 0 4
+# 4 0 5
diff --git a/ACE/apps/Gateway/Gateway/gateway.mpc b/ACE/apps/Gateway/Gateway/gateway.mpc
new file mode 100644
index 00000000000..9403ba858b1
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/gateway.mpc
@@ -0,0 +1,29 @@
+// -*- MPC -*-
+// $Id$
+
+project(Gateway) : acelib {
+ sharedname = Gateway
+ Source_Files {
+ Concrete_Connection_Handlers.cpp
+ Config_Files.cpp
+ File_Parser.cpp
+ Gateway.cpp
+ Event_Channel.cpp
+ Event_Forwarding_Discriminator.cpp
+ Options.cpp
+ Connection_Handler.cpp
+ Connection_Handler_Acceptor.cpp
+ Connection_Handler_Connector.cpp
+ }
+}
+
+project(gatewayd) : aceexe {
+ exename = gatewayd
+ after += Gateway
+ libs += Gateway
+
+ Source_Files {
+ gatewayd.cpp
+ }
+}
+
diff --git a/ACE/apps/Gateway/Gateway/gatewayd.cpp b/ACE/apps/Gateway/Gateway/gatewayd.cpp
new file mode 100644
index 00000000000..2c598c6805f
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/gatewayd.cpp
@@ -0,0 +1,69 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// peerd.h
+//
+// = DESCRIPTION
+// Driver for the gateway daemon (gatewayd). Note that this is
+// completely generic code due to the Service Configurator
+// framework!
+//
+// = AUTHOR
+// Douglas C. Schmidt
+//
+// ============================================================================
+
+#include "ace/OS_NS_unistd.h"
+#include "ace/Service_Config.h"
+#include "ace/Service_Object.h"
+#include "ace/Log_Msg.h"
+#include "ace/Reactor.h"
+#include "Gateway.h"
+
+ACE_RCSID (Gateway,
+ gatewayd,
+ "$Id$")
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (ACE_OS::access (ACE_DEFAULT_SVC_CONF, F_OK) != 0)
+ {
+ // Use static linking.
+ ACE_Service_Object_Ptr sp = ACE_SVC_INVOKE (Gateway);
+
+ if (sp->init (argc - 1, argv + 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("init")),
+ 1);
+
+ // Run forever, performing the configured services until we
+ // are shut down by a SIGINT/SIGQUIT signal.
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+
+ // Destructor of <ACE_Service_Object_Ptr> automagically call
+ // <fini>.
+ }
+ else
+ {
+ if (ACE_Service_Config::open (argc, argv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("open")),
+ 1);
+ else // Use dynamic linking.
+
+ // Run forever, performing the configured services until we are
+ // shut down by a signal (e.g., SIGINT or SIGQUIT).
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+ }
+ return 0;
+}
diff --git a/ACE/apps/Gateway/Gateway/svc.conf b/ACE/apps/Gateway/Gateway/svc.conf
new file mode 100644
index 00000000000..3698b0e3e13
--- /dev/null
+++ b/ACE/apps/Gateway/Gateway/svc.conf
@@ -0,0 +1,3 @@
+#static Svc_Manager "-d -p 2913"
+dynamic Gateway Service_Object * Gateway:_make_Gateway() active "-b -d -c C|S -a C|S -P connection_config -C consumer_config"
+
diff --git a/ACE/apps/Gateway/Makefile.am b/ACE/apps/Gateway/Makefile.am
new file mode 100644
index 00000000000..ff47f4b6ecd
--- /dev/null
+++ b/ACE/apps/Gateway/Makefile.am
@@ -0,0 +1,14 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+SUBDIRS = \
+ Gateway \
+ Peer
+
diff --git a/ACE/apps/Gateway/Peer/Makefile.am b/ACE/apps/Gateway/Peer/Makefile.am
new file mode 100644
index 00000000000..843f407270a
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/Makefile.am
@@ -0,0 +1,52 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+## Makefile.Gateway_Peer.am
+
+noinst_LTLIBRARIES = libGateway_Peer.la
+
+libGateway_Peer_la_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+libGateway_Peer_la_SOURCES = \
+ Options.cpp \
+ Peer.cpp
+
+noinst_HEADERS = \
+ Options.h \
+ Peer.h
+
+## Makefile.gateway_peerd.am
+noinst_PROGRAMS = peerd
+
+peerd_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+peerd_SOURCES = \
+ peerd.cpp \
+ Options.h \
+ Peer.h
+
+peerd_LDADD = \
+ libGateway_Peer.la \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/apps/Gateway/Peer/Options.cpp b/ACE/apps/Gateway/Peer/Options.cpp
new file mode 100644
index 00000000000..0b33552e629
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/Options.cpp
@@ -0,0 +1,201 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_stdlib.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_Memory.h"
+#include "Options.h"
+
+ACE_RCSID(Peer, Options, "$Id$")
+
+// Static initialization.
+Options *Options::instance_ = 0;
+
+void
+Options::print_usage_and_die (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-C connection-id] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n")));
+ ACE_OS::exit (1);
+}
+
+Options::Options (void)
+ : options_ (0),
+ supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT),
+ consumer_acceptor_port_ (DEFAULT_PEER_CONSUMER_PORT),
+ supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT),
+ consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT),
+ connector_host_ (ACE_DEFAULT_SERVER_HOST),
+ timeout_ (0),
+ max_queue_size_ (MAX_QUEUE_SIZE),
+ connection_id_ (0)
+{
+ char *timeout = ACE_OS::getenv ("TIMEOUT");
+
+ if (timeout == 0)
+ this->timeout_ = Options::DEFAULT_TIMEOUT;
+ else
+ this->timeout_ = ACE_OS::atoi (timeout);
+}
+
+Options *
+Options::instance (void)
+{
+ if (Options::instance_ == 0)
+ ACE_NEW_RETURN (Options::instance_, Options, 0);
+
+ return Options::instance_;
+}
+
+long
+Options::timeout (void) const
+{
+ return this->timeout_;
+}
+
+CONNECTION_ID &
+Options::connection_id (void)
+{
+ return this->connection_id_;
+}
+
+long
+Options::max_queue_size (void) const
+{
+ return this->max_queue_size_;
+}
+
+u_short
+Options::consumer_acceptor_port (void) const
+{
+ return this->consumer_acceptor_port_;
+}
+
+u_short
+Options::supplier_acceptor_port (void) const
+{
+ return this->supplier_acceptor_port_;
+}
+
+u_short
+Options::consumer_connector_port (void) const
+{
+ return this->consumer_connector_port_;
+}
+
+u_short
+Options::supplier_connector_port (void) const
+{
+ return this->supplier_connector_port_;
+}
+
+const ACE_TCHAR *
+Options::connector_host (void) const
+{
+ return this->connector_host_;
+}
+
+int
+Options::enabled (int option) const
+{
+ return ACE_BIT_ENABLED (this->options_, option);
+}
+
+void
+Options::parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:c:C:h:m:t:v"), 0);
+
+ for (int c; (c = get_opt ()) != -1; )
+ {
+ switch (c)
+ {
+ case 'a':
+ {
+ // Become an Acceptor.
+
+ for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (),
+ ACE_TEXT ("|"));
+ flag != 0;
+ flag = ACE_OS::strtok (0, ACE_TEXT ("|")))
+ if (ACE_OS::strncasecmp (flag, ACE_TEXT ("C"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::CONSUMER_ACCEPTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Consumer Acceptor port number.
+ this->consumer_acceptor_port_ = ACE_OS::atoi (flag + 2);
+ }
+ else if (ACE_OS::strncasecmp (flag, ACE_TEXT ("S"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::SUPPLIER_ACCEPTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Supplier Acceptor port number.
+ this->supplier_acceptor_port_ = ACE_OS::atoi (flag + 2);
+ }
+ }
+ break;
+ /* NOTREACHED */
+ case 'c':
+ {
+ // Become a Connector.
+
+ for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (),
+ ACE_TEXT ("|"));
+ flag != 0;
+ flag = ACE_OS::strtok (0, ACE_TEXT ("|")))
+ if (ACE_OS::strncasecmp (flag, ACE_TEXT ("C"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::CONSUMER_CONNECTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Consumer Connector port number.
+ this->consumer_connector_port_ = ACE_OS::atoi (flag + 2);
+ }
+ else if (ACE_OS::strncasecmp (flag, ACE_TEXT ("S"), 1) == 0)
+ {
+ ACE_SET_BITS (this->options_,
+ Options::SUPPLIER_CONNECTOR);
+ if (ACE_OS::strlen (flag) > 1)
+ // Set the Supplier Connector port number.
+ this->supplier_connector_port_ = ACE_OS::atoi (flag + 2);
+ }
+ }
+ break;
+ /* NOTREACHED */
+ case 'C':
+ this->connection_id_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ /* NOTREACHED */
+ case 'h':
+ // connector host
+ this->connector_host_ = get_opt.opt_arg ();
+ break;
+ /* NOTREACHED */
+ case 'm':
+ // max queue size.
+ this->max_queue_size_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ /* NOTREACHED */
+ case 't':
+ // Timeout
+ this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ /* NOTREACHED */
+ case 'v':
+ // Verbose mode.
+ ACE_SET_BITS (this->options_, Options::VERBOSE);
+ break;
+ /* NOTREACHED */
+ default:
+ this->print_usage_and_die ();
+ /* NOTREACHED */
+ }
+ }
+}
+
diff --git a/ACE/apps/Gateway/Peer/Options.h b/ACE/apps/Gateway/Peer/Options.h
new file mode 100644
index 00000000000..bc872fe7df4
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/Options.h
@@ -0,0 +1,135 @@
+// -*- C++ -*-
+//
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Options.h
+//
+// = AUTHOR
+// Douglas C. Schmidt
+//
+// ============================================================================
+
+#ifndef OPTIONS_H
+#define OPTIONS_H
+
+#include "../Gateway/Event.h"
+#include "ace/svc_export.h"
+
+class ACE_Svc_Export Options
+ // = TITLE
+ // Singleton that consolidates all Options for a peerd.
+{
+public:
+ // = Options that can be enabled/disabled.
+ enum
+ {
+ VERBOSE = 01,
+ SUPPLIER_ACCEPTOR = 02,
+ CONSUMER_ACCEPTOR = 04,
+ SUPPLIER_CONNECTOR = 010,
+ CONSUMER_CONNECTOR = 020
+ };
+
+ static Options *instance (void);
+ // Return Singleton.
+
+ void parse_args (int argc, ACE_TCHAR *argv[]);
+ // Parse the arguments and set the options.
+
+ // = Accessor methods.
+ int enabled (int option) const;
+ // Determine if an option is enabled.
+
+ u_short supplier_acceptor_port (void) const;
+ // Our acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Supplier.
+
+ u_short consumer_acceptor_port (void) const;
+ // Our acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Consumer.
+
+ u_short supplier_connector_port (void) const;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Supplier.
+
+ u_short consumer_connector_port (void) const;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Consumer.
+
+ const ACE_TCHAR *connector_host (void) const;
+ // Our connector port host, i.e., the host running the gatewayd
+ // process.
+
+ long timeout (void) const;
+ // Duration between disconnects.
+
+ long max_queue_size (void) const;
+ // The maximum size of the queue.
+
+ CONNECTION_ID &connection_id (void);
+ // Returns a reference to the connection id.
+
+private:
+ enum
+ {
+ MAX_QUEUE_SIZE = 1024 * 1024 * 16,
+ // We'll allow up to 16 megabytes to be queued per-output
+ // channel!!!! This is clearly a policy in search of
+ // refinement...
+
+ DEFAULT_TIMEOUT = 60
+ // By default, disconnect the peer every minute.
+ };
+
+ Options (void);
+ // Ensure Singleton.
+
+ void print_usage_and_die (void);
+ // Explain usage and exit.
+
+ static Options *instance_;
+ // Singleton.
+
+ u_long options_;
+ // Flag to indicate if we want verbose diagnostics.
+
+ u_short supplier_acceptor_port_;
+ // The acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Supplier.
+
+ u_short consumer_acceptor_port_;
+ // The acceptor port number, i.e., the one that we passively listen
+ // on for connections to arrive from a gatewayd and create a
+ // Consumer.
+
+ u_short supplier_connector_port_;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Supplier.
+
+ u_short consumer_connector_port_;
+ // The connector port number, i.e., the one that we use to actively
+ // establish connections with a gatewayd and create a Consumer.
+
+ const ACE_TCHAR *connector_host_;
+ // Our connector host, i.e., where the gatewayd process is running.
+
+ long timeout_;
+ // The amount of time to wait before disconnecting from the Peerd.
+
+ long max_queue_size_;
+ // The maximum size that the queue can grow to.
+
+ CONNECTION_ID connection_id_;
+ // The connection id.
+};
+
+#endif /* OPTIONS_H */
diff --git a/ACE/apps/Gateway/Peer/Peer.cpp b/ACE/apps/Gateway/Peer/Peer.cpp
new file mode 100644
index 00000000000..437d39f1611
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/Peer.cpp
@@ -0,0 +1,890 @@
+// $Id$
+
+#define ACE_BUILD_SVC_DLL
+
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Signal.h"
+#include "Peer.h"
+
+ACE_RCSID(Peer, Peer, "$Id$")
+
+Peer_Handler::Peer_Handler (void)
+ : connection_id_ (-1), // Maybe it's better than 0.
+ msg_frag_ (0),
+ total_bytes_ (0)
+{
+ // Set the high water mark of the <ACE_Message_Queue>. This is used
+ // to exert flow control.
+ this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
+ first_time_ = 1; // It will be first time to open Peer_Handler.
+}
+
+// Upcall from the <ACE_Acceptor::handle_input> that turns control
+// over to our application-specific Gateway handler.
+
+int
+Peer_Handler::open (void *a)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("handle = %d\n"),
+ this->peer ().get_handle ()));
+
+ // Call down to the base class to activate and register this handler
+ // with an <ACE_Reactor>.
+ if (this->inherited::open (a) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("open")),
+ -1);
+
+ if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("enable")),
+ -1);
+
+ ACE_Time_Value timeout (Options::instance ()->timeout ());
+
+ // Schedule the time between disconnects. This should really be a
+ // "tunable" parameter.
+ if (ACE_Reactor::instance ()->schedule_timer
+ (this, 0, timeout) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("schedule_timer")));
+
+ // If there are events left in the queue, make sure we enable the
+ // <ACE_Reactor> appropriately to get them sent out.
+ if (this->msg_queue ()->is_empty () == 0
+ && ACE_Reactor::instance ()->schedule_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("schedule_wakeup")),
+ -1);
+
+ // First action is to wait to be notified of our connection id.
+ this->do_action_ = &Peer_Handler::await_connection_id;
+ return 0;
+}
+
+int
+Peer_Handler::transmit (ACE_Message_Block *mb,
+ size_t n,
+ int event_type)
+{
+ Event *event = (Event *) mb->rd_ptr ();
+
+ // Initialize the header.
+ new (&event->header_) Event_Header (n,
+ this->connection_id_,
+ event_type,
+ 0);
+
+ // Convert all the fields into network byte order.
+ event->header_.encode ();
+
+ // Move the write pointer to the end of the event.
+ mb->wr_ptr (sizeof (Event_Header) + n);
+
+ if (this->put (mb) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("gateway is flow controlled, so we're dropping events")));
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed.
+ // Caller is responsible for freeing a ACE_Message_Block
+ // if failures occur.
+ mb->release ();
+ return -1;
+ }
+ return 0;
+}
+
+// Read events from stdin and send them to the gatewayd.
+
+int
+Peer_Handler::transmit_stdin (void)
+{
+ // If return value is -1, then first_time_ must be reset to 1.
+ int result = 0;
+ if (this->connection_id_ != -1)
+ {
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ // Cast the message block payload into an <Event> pointer.
+ Event *event = (Event *) mb->rd_ptr ();
+
+ ssize_t n = ACE_OS::read (ACE_STDIN,
+ event->data_,
+ sizeof event->data_);
+ switch (n)
+ {
+ case 0:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("stdin closing down\n")));
+
+ // Take stdin out of the ACE_Reactor so we stop trying to
+ // send events.
+ ACE_Reactor::instance ()->remove_handler
+ (ACE_STDIN,
+ ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
+ mb->release ();
+ result = 0; //
+ break;
+ /* NOTREACHED */
+ case -1:
+ mb->release ();
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("read")));
+ result = 0; //
+ break;
+ /* NOTREACHED */
+ default:
+ // Do not return directly, save the return value.
+ result = this->transmit (mb, n, ROUTING_EVENT);
+ break;
+ /* NOTREACHED */
+ }
+
+ // Do not return at here, but at exit of function.
+ /*return 0;*/
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Must transmit over an opened channel.\n")));
+ result = -1; // Save return value at here, return at exit of function.
+ }
+ // If transmit error, the stdin-thread will be cancelled, so should
+ // reset first_time_ to 1, which will register_stdin_handler again.
+ if (result == -1)
+ first_time_ = 1;
+
+ return result;
+}
+
+// Perform a non-blocking <put> of event MB. If we are unable to send
+// the entire event the remainder is re-queue'd at the *front* of the
+// Message_Queue.
+
+int
+Peer_Handler::nonblk_put (ACE_Message_Block *mb)
+{
+ // Try to send the event. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via
+ // <handle_output>) when it is possible to try again.
+
+ ssize_t n = this->send (mb);
+
+ if (n == -1)
+ // -1 is returned only when things have really gone wrong (i.e.,
+ // not when flow control occurs).
+ return -1;
+ else if (errno == EWOULDBLOCK)
+ {
+ // We didn't manage to send everything, so requeue.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("queueing activated on handle %d to connection id %d\n"),
+ this->get_handle (),
+ this->connection_id_));
+
+ // Re-queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head
+ (mb,
+ (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("enqueue_head")),
+ -1);
+ // Tell ACE_Reactor to call us back when we can send again.
+ if (ACE_Reactor::instance ()->schedule_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("schedule_wakeup")),
+ -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+// Finish sending a event when flow control conditions abate. This
+// method is automatically called by the ACE_Reactor.
+
+int
+Peer_Handler::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *mb = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("in handle_output\n")));
+
+ if (this->msg_queue ()->dequeue_head
+ (mb,
+ (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (mb))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ // Didn't write everything this time, come back later...
+ break;
+ /* NOTREACHED */
+ case -1:
+ // Caller is responsible for freeing a ACE_Message_Block if
+ // failures occur.
+ mb->release ();
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("transmission failure in handle_output")));
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+ // If we succeed in writing the entire event (or we did not
+ // fail due to EWOULDBLOCK) then check if there are more
+ // events on the <ACE_Message_Queue>. If there aren't, tell
+ // the <ACE_Reactor> not to notify us anymore (at least
+ // until there are new events queued up).
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("queue now empty on handle %d to connection id %d\n"),
+ this->get_handle (),
+ this->connection_id_));
+
+ if (ACE_Reactor::instance ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("cancel_wakeup")));
+ }
+ }
+ return 0;
+ }
+ else
+ // If the list is empty there's a bug!
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("dequeue_head")),
+ 0);
+}
+
+// Send an event to a peer (may block if necessary).
+
+int
+Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the event *without* blocking!
+ return this->nonblk_put (mb);
+ else
+ // If we have queued up events due to flow control then just
+ // enqueue and return.
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Send an Peer event to gatewayd.
+
+int
+Peer_Handler::send (ACE_Message_Block *mb)
+{
+ size_t len = mb->length ();
+
+ ssize_t n = this->peer ().send (mb->rd_ptr (), len);
+
+ if (n <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < (ssize_t) len)
+ {
+ // Re-adjust pointer to skip over the part we did send.
+ mb->rd_ptr (n);
+ this->total_bytes_ += n;
+ }
+ else // if (n == length).
+ {
+ // The whole event is sent, we can now safely deallocate the
+ // buffer. Note that this should decrement a reference count...
+ this->total_bytes_ += n;
+ mb->release ();
+ errno = 0;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"),
+ n,
+ this->total_bytes_));
+ return n;
+}
+
+// Receive an Event from gatewayd. Handles fragmentation.
+
+int
+Peer_Handler::recv (ACE_Message_Block *&mb)
+{
+ if (this->msg_frag_ == 0)
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ Event *event = (Event *) this->msg_frag_->rd_ptr ();
+ ssize_t header_received = 0;
+
+ const size_t HEADER_SIZE = sizeof (Event_Header);
+ ssize_t header_bytes_left_to_read =
+ HEADER_SIZE - this->msg_frag_->length ();
+
+ if (header_bytes_left_to_read > 0)
+ {
+ header_received = this->peer ().recv
+ (this->msg_frag_->wr_ptr (),
+ header_bytes_left_to_read);
+
+ if (header_received == -1 /* error */
+ || header_received == 0 /* EOF */)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Recv error during header read")));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("attempted to read %d bytes\n"),
+ header_bytes_left_to_read));
+ this->msg_frag_ = this->msg_frag_->release ();
+ return header_received;
+ }
+
+ // Bump the write pointer by the amount read.
+ this->msg_frag_->wr_ptr (header_received);
+
+ // At this point we may or may not have the ENTIRE header.
+ if (this->msg_frag_->length () < HEADER_SIZE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Partial header received: only %d bytes\n"),
+ this->msg_frag_->length ()));
+ // Notify the caller that we didn't get an entire event.
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
+ // Convert the header into host byte order so that we can access
+ // it directly without having to repeatedly muck with it...
+ event->header_.decode ();
+
+ if (event->header_.len_ > ACE_INT32 (sizeof event->data_))
+ {
+ // This data_ payload is too big!
+ errno = EINVAL;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Data payload is too big (%d bytes)\n"),
+ event->header_.len_));
+ return -1;
+ }
+ }
+
+ // At this point there is a complete, valid header in Event. Now we
+ // need to get the event payload. Due to incomplete reads this may
+ // not be the first time we've read in a fragment for this message.
+ // We account for this here. Note that the first time in here
+ // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we
+ // do a successful fragment read, we advance <wr_ptr>. Therefore,
+ // by subtracting how much we've already read from the
+ // <event->header_.len_> we complete the
+ // <data_bytes_left_to_read>...
+
+ ssize_t data_bytes_left_to_read =
+ ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
+
+ // peer().recv() should not be called when data_bytes_left_to_read is 0.
+ ssize_t data_received = !data_bytes_left_to_read ? 0 :
+ this->peer ().recv (this->msg_frag_->wr_ptr (),
+ data_bytes_left_to_read);
+
+ // Try to receive the remainder of the event.
+
+ switch (data_received)
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // This might happen if only the header came through.
+ return -1;
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ if (data_bytes_left_to_read)
+ {
+ this->msg_frag_ = this->msg_frag_->release ();
+ return 0;
+ }
+ /* FALLTHROUGH */;
+
+ default:
+ // Set the write pointer at 1 past the end of the event.
+ this->msg_frag_->wr_ptr (data_received);
+
+ if (data_received != data_bytes_left_to_read)
+ {
+ errno = EWOULDBLOCK;
+ // Inform caller that we didn't get the whole event.
+ return -1;
+ }
+ else
+ {
+ // Set the read pointer to the beginning of the event.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ mb = this->msg_frag_;
+
+ // Reset the pointer to indicate we've got an entire event.
+ this->msg_frag_ = 0;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"),
+ event->header_.connection_id_,
+ event->header_.len_,
+ data_received + header_received));
+ if (Options::instance ()->enabled (Options::VERBOSE))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("data_ = %*s\n"),
+ event->header_.len_ - 2,
+ event->data_));
+ return data_received + header_received;
+ }
+}
+
+// Receive various types of input (e.g., Peer event from the gatewayd,
+// as well as stdio).
+
+int
+Peer_Handler::handle_input (ACE_HANDLE sd)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("in handle_input, sd = %d\n"),
+ sd));
+ if (sd == ACE_STDIN) // Handle event from stdin.
+ return this->transmit_stdin ();
+ else
+ // Perform the appropriate action depending on the state we are
+ // in.
+ return (this->*do_action_) ();
+}
+
+// Action that receives our connection id from the Gateway.
+
+int
+Peer_Handler::await_connection_id (void)
+{
+ ssize_t n = this->peer ().recv (&this->connection_id_,
+ sizeof this->connection_id_);
+
+ if (n != sizeof this->connection_id_)
+ {
+ if (n == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("gatewayd has closed down unexpectedly\n")),
+ -1);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p, bytes received on handle %d = %d\n"),
+ ACE_TEXT ("recv"),
+ this->get_handle (),
+ n),
+ -1);
+ }
+ else
+ {
+ this->connection_id_ = ntohl (this->connection_id_);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("assigned connection id %d\n"),
+ this->connection_id_));
+ }
+
+ // Subscribe for events if we're a Consumer.
+ if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
+ this->subscribe ();
+
+ // No need to disconnect by timeout.
+ ACE_Reactor::instance ()->cancel_timer(this);
+ // Transition to the action that waits for Peer events.
+ this->do_action_ = &Peer_Handler::await_events;
+
+ // Reset standard input.
+ ACE_OS::rewind (stdin);
+
+ // Call register_stdin_handler only once, until the stdin-thread
+ // closed which caused by transmit_stdin error.
+ if (first_time_)
+ {
+ // Register this handler to receive test events on stdin.
+ if (ACE_Event_Handler::register_stdin_handler
+ (this,
+ ACE_Reactor::instance (),
+ ACE_Thread_Manager::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("register_stdin_handler")),
+ -1);
+
+ // Next time in await_connection_id(), I'll don't call
+ // register_stdin_handler().
+ first_time_ = 0;
+ }
+ return 0;
+}
+
+int
+Peer_Handler::subscribe (void)
+{
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ Subscription *subscription =
+ (Subscription *) ((Event *) mb->rd_ptr ())->data_;
+ subscription->connection_id_ =
+ Options::instance ()->connection_id ();
+
+ return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
+}
+
+// Action that receives events from the Gateway.
+
+int
+Peer_Handler::await_events (void)
+{
+ ACE_Message_Block *mb = 0;
+
+ ssize_t n = this->recv (mb);
+
+ switch (n)
+ {
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("gatewayd has closed down\n")),
+ -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("recv")),
+ -1);
+ /* NOTREACHED */
+ default:
+ {
+ // We got a valid event, so let's process it now! At the
+ // moment, we just print out the event contents...
+
+ Event *event = (Event *) mb->rd_ptr ();
+ this->total_bytes_ += mb->length ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"),
+ event->header_.connection_id_,
+ event->header_.len_,
+ this->total_bytes_));
+ if (Options::instance ()->enabled (Options::VERBOSE))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("data_ = %*s\n"),
+ event->header_.len_ - 2,
+ event->data_));
+ mb->release ();
+ return 0;
+ }
+ }
+}
+
+// Periodically send events via ACE_Reactor timer mechanism.
+
+int
+Peer_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ // Shut down the handler.
+ return this->handle_close ();
+}
+
+Peer_Handler::~Peer_Handler (void)
+{
+ // Shut down the handler.
+ this->handle_close ();
+}
+
+// Handle shutdown of the Peer object.
+
+int
+Peer_Handler::handle_close (ACE_HANDLE,
+ ACE_Reactor_Mask)
+{
+ if (this->get_handle () != ACE_INVALID_HANDLE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("shutting down Peer on handle %d\n"),
+ this->get_handle ()));
+
+ ACE_Reactor_Mask mask =
+ ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK;
+
+ // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor>
+ // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL>
+ // instructs the ACE_Reactor *not* to call <handle_close>, which
+ // would otherwise lead to infinite recursion!).
+ ACE_Reactor::instance ()->remove_handler
+ (ACE_STDIN, mask);
+
+ // Deregister this handler with the ACE_Reactor.
+ if (ACE_Reactor::instance ()->remove_handler
+ (this, mask) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("handle = %d: %p\n"),
+ this->get_handle (),
+ ACE_TEXT ("remove_handler")),
+ -1);
+ // Close down the peer.
+ this->peer ().close ();
+ }
+ return 0;
+}
+
+int
+Peer_Acceptor::start (u_short port)
+{
+ // This object only gets allocated once and is just recycled
+ // forever.
+ ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1);
+
+ this->addr_.set (port);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("opening acceptor at port %d\n"),
+ port));
+
+ // Call down to the <Acceptor::open> method.
+ if (this->inherited::open (this->addr_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("open")),
+ -1);
+ else if (this->acceptor ().get_local_addr (this->addr_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("get_local_addr")),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("accepting at port %d\n"),
+ this->addr_.get_port_number ()));
+ return 0;
+}
+
+Peer_Acceptor::Peer_Acceptor (void)
+ : peer_handler_ (0)
+{
+}
+
+int
+Peer_Acceptor::close (void)
+{
+ // Will trigger a delete.
+ if (this->peer_handler_ != 0)
+ this->peer_handler_->destroy ();
+
+ // Close down the base class.
+ return this->inherited::close ();
+}
+
+// Note how this method just passes back the pre-allocated
+// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new
+// one each time!
+
+int
+Peer_Acceptor::make_svc_handler (Peer_Handler *&sh)
+{
+ sh = this->peer_handler_;
+ return 0;
+}
+
+int
+Peer_Connector::open_connector (Peer_Handler *&peer_handler,
+ u_short port)
+{
+ // This object only gets allocated once and is just recycled
+ // forever.
+ ACE_NEW_RETURN (peer_handler,
+ Peer_Handler,
+ -1);
+
+ ACE_INET_Addr addr (port,
+ Options::instance ()->connector_host ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("connecting to %s:%d\n"),
+ addr.get_host_name (),
+ addr.get_port_number ()));
+
+ if (this->connect (peer_handler, addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("connect")),
+ -1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("connected to %C:%d\n"),
+ addr.get_host_name (),
+ addr.get_port_number ()));
+ return 0;
+}
+
+int
+Peer_Connector::open (ACE_Reactor *, int)
+{
+ this->supplier_peer_handler_ = 0;
+ this->consumer_peer_handler_ = 0;
+
+ if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
+ && this->open_connector (this->supplier_peer_handler_,
+ Options::instance ()->supplier_connector_port ()) == -1)
+ return -1;
+
+ if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)
+ && this->open_connector (this->consumer_peer_handler_,
+ Options::instance ()->consumer_connector_port ()) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *)
+{
+ if (signum != SIGPIPE)
+ {
+ // Shut down the main event loop.
+ ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit?
+ ACE_Reactor::instance ()->end_reactor_event_loop();
+ }
+
+ return 0;
+}
+
+// Returns information on the currently active service.
+
+int
+Peer_Factory::info (ACE_TCHAR **strp, size_t length) const
+{
+ ACE_TCHAR buf[BUFSIZ];
+ ACE_TCHAR consumer_addr_str[BUFSIZ];
+ ACE_TCHAR supplier_addr_str[BUFSIZ];
+
+ ACE_INET_Addr addr;
+
+ if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1)
+ return -1;
+ else if (addr.addr_to_string (consumer_addr_str,
+ sizeof addr) == -1)
+ return -1;
+ else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1)
+ return -1;
+ else if (addr.addr_to_string (supplier_addr_str,
+ sizeof addr) == -1)
+ return -1;
+
+ ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:"));
+ ACE_OS::strcat (buf, consumer_addr_str);
+ ACE_OS::strcat (buf, ACE_TEXT ("|S:"));
+ ACE_OS::strcat (buf, supplier_addr_str);
+ ACE_OS::strcat
+ (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n"));
+
+ if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0)
+ return -1;
+ else
+ ACE_OS::strncpy (*strp, buf, length);
+ return ACE_OS::strlen (buf);
+}
+
+// Hook called by the explicit dynamic linking facility to terminate
+// the peer.
+
+int
+Peer_Factory::fini (void)
+{
+ this->consumer_acceptor_.close ();
+ this->supplier_acceptor_.close ();
+ return 0;
+}
+
+// Hook called by the explicit dynamic linking facility to initialize
+// the peer.
+
+int
+Peer_Factory::init (int argc, ACE_TCHAR *argv[])
+{
+ Options::instance ()->parse_args (argc, argv);
+
+ ACE_Sig_Set sig_set;
+
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+ sig_set.sig_add (SIGPIPE);
+
+ // Register ourselves to receive signals so we can shut down
+ // gracefully.
+
+ if (ACE_Reactor::instance ()->register_handler (sig_set,
+ this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("register_handler")),
+ -1);
+
+ if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)
+ && this->supplier_acceptor_.start
+ (Options::instance ()->supplier_acceptor_port ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Acceptor::open")),
+ -1);
+ else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
+ && this->consumer_acceptor_.start
+ (Options::instance ()->consumer_acceptor_port ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Acceptor::open")),
+ -1);
+ else if (this->connector_.open () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Connector::open")),
+ -1);
+ return 0;
+}
+
+// The following is a "Factory" used by the <ACE_Service_Config> and
+// svc.conf file to dynamically initialize the <Peer_Acceptor> and
+// <Peer_Connector>.
+
+ACE_SVC_FACTORY_DEFINE (Peer_Factory)
+
diff --git a/ACE/apps/Gateway/Peer/Peer.h b/ACE/apps/Gateway/Peer/Peer.h
new file mode 100644
index 00000000000..0723882a243
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/Peer.h
@@ -0,0 +1,257 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Peer.h
+//
+// = DESCRIPTION
+// These classes process Supplier/Consumer events sent from the
+// gateway (gatewayd) to its various peers (peerd). The general
+// collaboration works as follows:
+//
+// 1. <Peer_Acceptor> creates a listener endpoint and waits
+// passively for gatewayd to connect with it.
+//
+// 2. When a gatewayd connects, <Peer_Acceptor> creates an
+// <Peer_Handler> object that sends/receives events from
+// gatewayd on that connection.
+//
+// 3. The <Peer_Handler> waits for gatewayd to inform it of its
+// connection ID, which is prepended to all subsequent outgoing
+// events sent from peerd.
+//
+// 4. Once the connection ID is set, peerd periodically sends events
+// to gatewayd. Peerd also receives and "processes" events
+// forwarded to it from gatewayd. In this program, peerd
+// "processes" the events sent to it by writing them to stdout.
+//
+// Note that in the current peerd implementation, one Peer process
+// cannot serve as both a Consumer and Supplier of Events. This is
+// because the gatewayd establishes a separate connection for
+// Suppliers and Consumers and the peerd only maintains a single
+// <Peer_Handler> object to handle this one connection. Enhancing
+// this implementation to be both a Consumer and Supplier
+// simultaneously is straightforward, however. In addition,
+// multiple peerd processes can already work together to play these
+// different roles.
+//
+// = AUTHOR
+// Douglas C. Schmidt
+//
+// ============================================================================
+
+#ifndef PEER_H
+#define PEER_H
+
+#include "ace/Service_Config.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Acceptor.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Svc_Handler.h"
+#include "ace/Connector.h"
+#include "ace/Null_Condition.h"
+#include "ace/Null_Mutex.h"
+#include "Options.h"
+
+ACE_SVC_FACTORY_DECLARE (Peer_Factory)
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class ACE_Svc_Export ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+class ACE_Svc_Export Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+{
+ // = TITLE
+ // Handle Peer events arriving from a Gateway.
+public:
+ // = Initialization and termination methods.
+ Peer_Handler (void);
+ // Initialize the peer.
+
+ ~Peer_Handler (void);
+ // Shutdown the Peer.
+
+ virtual int open (void * = 0);
+ // Initialize the handler when called by
+ // <ACE_Acceptor::handle_input>.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process peer events.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
+ // Send a event to a gateway (may be queued if necessary due to flow
+ // control).
+
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending a event when flow control conditions abate.
+
+ virtual int handle_timeout (const ACE_Time_Value &,
+ const void *arg);
+ // Periodically send events via <ACE_Reactor> timer mechanism.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
+ // Perform object termination.
+
+protected:
+ typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited;
+
+ int transmit (ACE_Message_Block *mb,
+ size_t n,
+ int event_type);
+ // Transmit <mb> to the gatewayd.
+
+ virtual int recv (ACE_Message_Block *&mb);
+ // Receive an Peer event from gatewayd.
+
+ virtual int send (ACE_Message_Block *mb);
+ // Send an Peer event to gatewayd, using <nonblk_put>.
+
+ virtual int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking <put>, which tries to send an event to the
+ // gatewayd, but only if it isn't flow controlled.
+
+ int subscribe (void);
+ // Register Consumer subscriptions with the gateway.
+
+ // = Event/state/action handlers.
+ int transmit_stdin (void);
+ // Receive a event from stdin and send it to the gateway.
+
+ int await_connection_id (void);
+ // Action that receives the route id.
+
+ int await_events (void);
+ // Action that receives events.
+
+ int (Peer_Handler::*do_action_)(void);
+ // Pointer-to-member-function for the current action to run in this
+ // state. This points to one of the preceding 3 methods.
+
+ CONNECTION_ID connection_id_;
+ // Connection ID of the peer, which is obtained from the gatewayd.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragments that arrive in non-blocking recv's
+ // from the gatewayd.
+
+ size_t total_bytes_;
+ // The total number of bytes sent/received to the gatewayd thus far.
+
+ int first_time_;
+ // Used to call register_stdin_handle only once. Otherwise, thread
+ // leak will occur on Win32.
+};
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class ACE_Svc_Export ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+class ACE_Svc_Export Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>
+{
+ // = TITLE
+ // Passively accept connections from gatewayd and dynamically
+ // create a new <Peer_Handler> object to communicate with the
+ // gatewayd.
+public:
+ // = Initialization and termination methods.
+ Peer_Acceptor (void);
+ // Default initialization.
+
+ int start (u_short);
+ // the <Peer_Acceptor>.
+
+ int close (void);
+ // Terminate the <Peer_Acceptor>.
+
+ virtual int make_svc_handler (Peer_Handler *&);
+ // Factory method that creates a <Peer_Handler> just once.
+
+private:
+ int open_acceptor (u_short port);
+ // Factor out common code for initializing the <Peer_Acceptor>.
+
+ Peer_Handler *peer_handler_;
+ // Pointer to <Peer_Handler> allocated just once.
+
+ ACE_INET_Addr addr_;
+ // Our acceptor addr.
+
+ typedef ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> inherited;
+};
+
+#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT
+template class ACE_Svc_Export ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>;
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */
+
+class ACE_Svc_Export Peer_Connector : public ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>
+{
+ // = TITLE
+ // Actively establish connections with gatewayd and dynamically
+ // create a new <Peer_Handler> object to communicate with the
+ // gatewayd.
+public:
+ // = Initialization method.
+ int open (ACE_Reactor * = 0, int = 0);
+ // Initialize the <Peer_Connector>. NOTE: the arguments are
+ // ignored. They are only provided to avoid a compiler warning
+ // about hiding the virtual function ACE_Connector<Peer_Handler,
+ // ACE_SOCK_CONNECTOR>::open(ACE_Reactor*, int).
+
+private:
+ int open_connector (Peer_Handler *&ph, u_short port);
+ // Factor out common code for initializing the <Peer_Connector>.
+
+ Peer_Handler *consumer_peer_handler_;
+ // Consumer <Peer_Handler> that is connected to a gatewayd.
+
+ Peer_Handler *supplier_peer_handler_;
+ // Supplier <Peer_Handler> that is connected to a gatewayd.
+};
+
+class ACE_Svc_Export Peer_Factory : public ACE_Service_Object
+{
+ // = TITLE
+ // A factory class that actively and/or passively establishes
+ // connections with the gatewayd.
+public:
+ // = Dynamic initialization and termination hooks from <ACE_Service_Object>.
+
+ virtual int init (int argc, ACE_TCHAR *argv[]);
+ // Initialize the acceptor and connector.
+
+ virtual int fini (void);
+ // Perform termination activities.
+
+ virtual int info (ACE_TCHAR **, size_t) const;
+ // Return info about this service.
+
+ virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
+ // Handle various signals (e.g., SIGPIPE, SIGINT, and SIGQUIT).
+
+private:
+ Peer_Acceptor consumer_acceptor_;
+ // Pointer to an instance of our <Peer_Acceptor> that's used to
+ // accept connections and create Consumers.
+
+ Peer_Acceptor supplier_acceptor_;
+ // Pointer to an instance of our <Peer_Acceptor> that's used to
+ // accept connections and create Suppliers.
+
+ Peer_Connector connector_;
+ // An instance of our <Peer_Connector>. Note that one
+ // <Peer_Connector> is used to establish <Peer_Handler>s for both
+ // Consumers and Suppliers.
+};
+
+#endif /* PEER_H */
diff --git a/ACE/apps/Gateway/Peer/peer.mpc b/ACE/apps/Gateway/Peer/peer.mpc
new file mode 100644
index 00000000000..7e2e24ba4aa
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/peer.mpc
@@ -0,0 +1,23 @@
+// -*- MPC -*-
+// $Id$
+
+project(Gateway_Peer) : acelib {
+ sharedname = Gateway_Peer
+ Source_Files {
+ Options.cpp
+ Peer.cpp
+ }
+ Documentation_Files {
+ svc.conf
+ }
+}
+
+project(gateway_peerd) : aceexe {
+ exename = peerd
+ after += Gateway_Peer
+ libs += Gateway_Peer
+
+ Source_Files {
+ peerd.cpp
+ }
+} \ No newline at end of file
diff --git a/ACE/apps/Gateway/Peer/peerd.cpp b/ACE/apps/Gateway/Peer/peerd.cpp
new file mode 100644
index 00000000000..f4abd599322
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/peerd.cpp
@@ -0,0 +1,63 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// peerd.h
+//
+// = DESCRIPTION
+// Driver for the peer daemon (peerd). Note that this is
+// completely generic code due to the Service Configurator
+// framework!
+//
+// = AUTHOR
+// Douglas C. Schmidt
+//
+// ============================================================================
+
+#include "ace/OS_NS_unistd.h"
+#include "Peer.h"
+
+ACE_RCSID(Peer, peerd, "$Id$")
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (ACE_OS::access (ACE_DEFAULT_SVC_CONF, F_OK) != 0)
+ {
+ // Use static linking.
+ ACE_Service_Object_Ptr sp = ACE_SVC_INVOKE (Peer_Factory);
+
+ if (sp->init (argc - 1, argv + 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("init")),
+ 1);
+
+ // Run forever, performing the configured services until we are
+ // shut down by a SIGINT/SIGQUIT signal.
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+
+ // Destructor of <ACE_Service_Object_Ptr> automagically call
+ // <fini>.
+ }
+ else
+ {
+ if (ACE_Service_Config::open (argc, argv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("open")),
+ 1);
+ else // Use dynamic linking.
+
+ // Run forever, performing the configured services until we
+ // are shut down by a signal (e.g., SIGINT or SIGQUIT).
+
+ ACE_Reactor::instance ()->run_reactor_event_loop ();
+ }
+ return 0;
+}
diff --git a/ACE/apps/Gateway/Peer/svc.conf b/ACE/apps/Gateway/Peer/svc.conf
new file mode 100644
index 00000000000..707f457da2f
--- /dev/null
+++ b/ACE/apps/Gateway/Peer/svc.conf
@@ -0,0 +1,2 @@
+#static Svc_Manager "-d -p 291"
+dynamic Peer1 Service_Object * Gateway_Peer:_make_Peer_Factory() active "-a C|S"
diff --git a/ACE/apps/Gateway/README b/ACE/apps/Gateway/README
new file mode 100644
index 00000000000..7861e1051e9
--- /dev/null
+++ b/ACE/apps/Gateway/README
@@ -0,0 +1,136 @@
+OVERVIEW
+
+This directory contains source code for an application-level
+Communication Gateway implemented with ACE. This prototype was
+developed in my cs422 OS class at Washington University in 1994. The
+Gateway has recently been updated to illustrate the use of Event
+Channels, which forward events from Suppliers to Consumers in a
+distributed system.
+
+You can get a paper that explains the patterns used in this
+implementation at the following WWW URL:
+
+http://www.cs.wustl.edu/~schmidt/PDF/TAPOS-00.pdf
+
+----------------------------------------
+
+DIRECTORY STRUCTURE
+
+There are 2 directories:
+
+1. Gateway
+
+ This directory contains the source code for the
+ application-level Gateway process, gatewayd. The gatewayd
+ routes event messages between Peers. By default, the gatewayd
+ plays the Connector role and initializes itself by reading the
+ connection_config and consumer_config files:
+
+ 1. The connection_config file establishes the "physical
+ configuration" of the Consumer and Supplier proxies. This
+ file tells the Gateway what connections to establish with
+ particular hosts using particular ports.
+
+ 2. The consumer_config file establishes the "logical
+ configuration." This file tells the Gateway how to forward
+ data coming from Suppliers to the appropriate Consumers.
+
+ The application Gateway generally should be started after all
+ the Peers described below, though the process should work
+ correctly even if it starts first.
+
+2. Peer
+
+ This directory contains the source code for the Peer process,
+ peerd. There are typically many Peers, which act as suppliers
+ and consumers of event messages that are routed through the
+ gatewayd.
+
+ To do anything interesting you'll need at least two Peers: one
+ to supply events and one to consume events. In the
+ configuration files, these two types of Peers are designated
+ as follows:
+
+ 1. Supplier Peers (designated by an 'S' in the Gateway's
+ connection_config configuration file). These Peers are
+ "suppliers" of events to the Gateway.
+
+ 2. Consumer Peers (designated by an 'C' in the Gateway's
+ connection_config file). These Peers are "consumers" of
+ events forwarded by the Gateway. Forwarding is based on
+ the settings in the consumer_config configuration file.
+
+----------------------------------------
+
+HOW TO RUN THE TESTS
+
+To run the tests do the following:
+
+1. Compile everything (i.e., first compile the ACE libraries, then
+ compile the Gateway and Peer directories).
+
+2. Edit the consumer_config and connection_config files as discussed
+ above to indicate the desired physical and logical mappings
+ for Consumers and Suppliers.
+
+3. Start up the Peers (peerd). You can start up as many as you
+ like, as per the connection_config file, but you'll need at least two
+ (i.e., one Supplier and Consumer). I typically start up each Peer
+ in a different window on a different machine, but you can run them
+ on the same machine as long as you pick different port numbers.
+ The Peers will print out some diagnostic info and then block
+ awaiting connections from the Gateway.
+
+ If you want to set the port numbers of the Peers from
+ the command-line do the following:
+
+ a. Change the svc.conf file in the ./Peer/ directory to
+ another name (e.g., foo.conf). This will keep the
+ program from starting up with the svc.conf file
+ (which dynamically links in the Peers and uses the -a option to
+ set the port).
+
+ b. Then run the peers in different windows as
+
+ # Window 1 (Supplier)
+ % peerd -a S:10011
+
+ # Window 2 (Consumer)
+ % peerd -a C:10012
+
+ etc. Naturally, you can also edit the svc.conf file, but that
+ may be more of a pain if you've got a network filesystem and
+ all your hosts share the same svc.conf file.
+
+4. Start up the Gateway (gatewayd). This will print out a bunch of
+ messages as it reads the config files and connects to all the Peers.
+ By default, the Gateway is purely reactive, i.e., it handles
+ Consumers and Suppliers in the same thread of control. However,
+ if you give the '-t OUTPUT_MT' option the Gateway will handle all
+ Consumers in separate threads. If you give the '-t INPUT_MT' option
+ the Gateway will handle all Suppliers in separate threads. If you
+ give the '-t INPUT_MT|OUTPUT_MT' option both Consumers and Suppliers
+ will be handled in the separate threads.
+
+ Assuming everything works, then all the Peers will be connected.
+ If some of the Peers aren't set up correctly, or if they aren't
+ started first, then the Gateway will use an exponential backoff
+ algorithm to attempt to reestablish those connections.
+
+5. Once the Gateway has connected with all the Peers you can send
+ events from Supplier Peers by typing commands in the Peer window.
+ This Supplier will be sent to the Gateway, which will forward the
+ event to all Consumer Peers that have "subscribed" to receive these
+ events.
+
+ Note that if you type ^C in a Peer window the Peer will shutdown
+ its handlers and exit. The Gateway will detect this and will start
+ trying to reestablish the connection using the same exponential
+ backoff algorithm it used for the initial connection establishment.
+
+7. When you want to terminate a Gateway, just type ^C or type any
+ characters in the ./gatewayd window and the process will shut down
+ gracefully.
+
+
+