diff options
Diffstat (limited to 'ACE/apps/Gateway')
38 files changed, 6188 insertions, 0 deletions
diff --git a/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp new file mode 100644 index 00000000000..e6ac36ba1fe --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -0,0 +1,802 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_unistd.h" +#include "Event_Channel.h" +#include "Concrete_Connection_Handlers.h" + +ACE_RCSID(Gateway, Concrete_Connection_Handlers, "$Id$") + +Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci) +{ + this->connection_role_ = 'C'; + this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method simply marks the Connection_Handler as +// having failed so that handle_close () can reconnect. + +// Do not close handler when received data successfully. +// Consumer_Handler should could process received data. +// For example, Consumer could send reply-event to Supplier. +int +Consumer_Handler::handle_input (ACE_HANDLE) +{ + // Do not set FAILED state at here, just at real failed place. + + char buf[BUFSIZ]; + ssize_t received = this->peer ().recv (buf, sizeof buf); + + switch (received) + { + case -1: + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + case 0: + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) IGNORED: Consumer is erroneously sending input to Consumer_Handler %d\n" + "data size = %d\n", + this->connection_id (), + received), + 0); // Return 0 to identify received data successfully. + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event. If we are unable to send +// the entire event the remainder is re-queued at the *front* of the +// Event_List. + +int +Consumer_Handler::nonblk_put (ACE_Message_Block *event) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n = this->send (event); + + if (n == -1) + { + // -1 is returned only when things have really gone wrong (i.e., + // not when flow control occurs). Thus, let's try to close down + // and set up a new reconnection by calling handle_close(). + this->state (Connection_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) + { + // We didn't manage to send everything, so we need to queue + // things up. + + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), + this->connection_id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "enqueue_head"), + -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_wakeup"), + -1); + return 0; + } + else + return n; +} + +ssize_t +Consumer_Handler::send (ACE_Message_Block *event) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) sending %d bytes to Consumer %d\n", + event->length (), + this->connection_id ())); + + ssize_t len = event->length (); + ssize_t n = this->peer ().send (event->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + { + // Re-adjust pointer to skip over the part we did send. + event->rd_ptr (n); + errno = EWOULDBLOCK; + } + else // if (n == length) + { + // The whole event is sent, we now decrement the reference count + // (which deletes itself with it reaches 0). + event->release (); + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *event = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"), + this->get_handle ())); + + // WIN32 Notes: When the receiver blocked, we started adding to the + // consumer handler's message Q. At this time, we registered a + // callback with the reactor to tell us when the TCP layer signalled + // that we could continue to send messages to the consumer. However, + // Winsock only sends this notification ONCE, so we have to assume + // at the application level, that we can continue to send until we + // get any subsequent blocking signals from the receiver's buffer. + +#if defined (ACE_WIN32) + // Win32 Winsock doesn't trigger multiple "You can write now" + // signals, so we have to assume that we can continue to write until + // we get another EWOULDBLOCK. + + // We cancel the wakeup callback we set earlier. + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")), + -1); + + // The list had better not be empty, otherwise there's a bug! + while (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case -1: // Error sending message to consumer. + { + // We are responsible for releasing an ACE_Message_Block if + // failures occur. + event->release (); + + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); + break; + } + case 0: // Partial Send - we got flow controlled by the receiver + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send due to flow control") + ACE_TEXT ("- scheduling new wakeup with reactor\n"))); + + // Re-schedule a wakeup call from the reactor when the + // flow control conditions abate. + if (ACE_Reactor::instance ()->schedule_wakeup + (this, + ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")), + -1); + + // Didn't write everything this time, come back later... + return 0; + } + default: // Sent the whole thing + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sent message from message Q, Q size = %d\n"), + this->msg_queue()->message_count ())); + break; + } + } + } + + // If we drop out of the while loop, then the message Q should be + // empty...or there's a problem in the dequeue_head() call...but + // thats another story. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Sent all messages from consumers message Q\n"))); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), + this->get_handle (), + this->connection_id ())); +#else /* !defined (ACE_WIN32) */ + // The list had better not be empty, otherwise there's a bug! + if (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send\n"))); + + // Didn't write everything this time, come back later... + break; + + case -1: + // We are responsible for releasing an ACE_Message_Block if + // failures occur. + event->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Message_Queue. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("QQQ::Sent Message from consumer's Q\n"))); + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), + this->get_handle (), + this->connection_id ())); + + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("cancel_wakeup"))); + } + } + } + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q"))); +#endif /* ACE_WIN32 */ + return 0; +} + +// Send an event to a Consumer (may queue if necessary). + +int +Consumer_Handler::put (ACE_Message_Block *event, + ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (event); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (event, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci) + : Connection_Handler (pci), + msg_frag_ (0) +{ + this->connection_role_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive an Event from a Supplier. Handles fragmentation. +// +// The event returned from recv consists of two parts: +// +// 1. The Address part, contains the "virtual" routing id. +// +// 2. The Data part, which contains the actual data to be forwarded. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Handler::recv (ACE_Message_Block *&forward_addr) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event), + ACE_Message_Block::MB_DATA, + 0, + 0, + 0, + Options::instance ()->locking_strategy ()), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const size_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + this->msg_frag_ = this->msg_frag_->release (); + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + "Data payload is too big (%d bytes)\n", + event->header_.len_)); + return -1; + } + + } + + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // msg_frag_->wr_ptr() will point to event->data_. Every time we do + // a successful fragment read, we advance wr_ptr(). Therefore, by + // subtracting how much we've already read from the + // event->header_.len_ we complete the data_bytes_left_to_read... + + ssize_t data_bytes_left_to_read = + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); + + ssize_t data_received = + !data_bytes_left_to_read + ? 0 // peer().recv() should not be called when data_bytes_left_to_read is 0. + : this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + if (data_bytes_left_to_read) + { + this->msg_frag_ = this->msg_frag_->release (); + return 0; + } + /* FALLTHROUGH */; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + forward_addr = new ACE_Message_Block (sizeof (Event_Key), + ACE_Message_Block::MB_PROTO, + this->msg_frag_, + 0, + 0, + Options::instance ()->locking_strategy ()); + if (forward_addr == 0) + { + this->msg_frag_ = this->msg_frag_->release (); + errno = ENOMEM; + return -1; + } + + Event_Key event_addr (this->connection_id (), + event->header_.type_); + // Copy the forwarding address from the Event_Key into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + this->total_bytes (data_received + header_received); + ACE_DEBUG ((LM_DEBUG, + "(%t) connection id = %d, cur len = %d, total bytes read = %d\n", + event->header_.connection_id_, + event->header_.len_, + data_received + header_received)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + "data_ = %*s\n", + event->header_.len_ - 2, + event->data_)); + + // Encode before returning so that we can set things out in + // network byte order. + event->header_.encode (); + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the gatewayd, +// as well as stdio). + +int +Supplier_Handler::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *event_key = 0; + + switch (this->recv (event_key)) + { + case 0: + // Note that a peer shouldn't initiate a shutdown by closing the + // connection. Therefore, the peer must have crashed, so we'll + // need to bail out here and let the higher layers reconnect. + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n", + this->connection_id ()), + -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (Connection_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n", + "Peer has failed unexpectedly", + this->connection_id ()), + -1); + } + /* NOTREACHED */ + default: + // Route messages to Consumers. + return this->process (event_key); + } +} + +// This delegates to the <Event_Channel> to do the actual processing. +// Typically, this forwards the event to its appropriate Consumer(s). + +int +Supplier_Handler::process (ACE_Message_Block *event_key) +{ + return this->event_channel_->put (event_key); +} + +Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) + : Consumer_Handler (pci) +{ + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Consumer_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Consumer_Handler::handle_close (h, m); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method marks the Connection_Handler as having +// failed and deactivates the ACE_Message_Queue (to wake up the thread +// blocked on <dequeue_head> in svc()). +// Thr_Consumer_Handler::handle_close () will eventually try to +// reconnect... + +// Let Consumer_Handler receive normal data. +int +Thr_Consumer_Handler::handle_input (ACE_HANDLE h) +{ + // Call down to the <Consumer_Handler> to handle this first. + if (this->Consumer_Handler::handle_input (h) != 0) + { + // Only do such work when failed. + + ACE_Reactor::instance ()->remove_handler + (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + // Will call handle_close. + return -1; + } + return 0; +} + +// Initialize the threaded Consumer_Handler object and spawn a new +// thread. + +int +Thr_Consumer_Handler::open (void *) +{ + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. + + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); + + // Register ourselves to receive input events (which indicate that + // the Consumer has shut down unexpectedly). + else if (ACE_Reactor::instance ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "register_handler"), + -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // events to Consumers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, + "(%t) reusing existing thread\n")); + return 0; + } +} + +// Queue up an event for transmission (must not block since +// Supplier_Handlers may be single-threaded). + +int +Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue, i.e., if <msg_queue> is full + // *don't* block! + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit events to the peer. Note the simplification resulting +// from the use of threads, compared with the Reactive solution. + +int +Thr_Consumer_Handler::svc (void) +{ + for (in_thread_ = 1;;) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Thr_Consumer_Handler's handle = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; + ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", + this->connection_id (), + this->get_handle ())); + + this->peer ().close (); + + // Re-establish the connection, using exponential backoff. + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) + { + ACE_Time_Value tv (this->timeout ()); + + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + + ACE_OS::sleep (tv); + } + } + + ACE_NOTREACHED (return 0;) +} + +Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci) + : Supplier_Handler (pci) +{ + // It is not in thread svc() now. + in_thread_ = 0; +} + +// Overriding handle_close() method. If in thread svc(), no need to +// process handle_close() when call peer().close(), because the +// connection is blocked now. + +int +Thr_Supplier_Handler::handle_close (ACE_HANDLE h, ACE_Reactor_Mask m) +{ + if (in_thread_) + return 0; + else + return Supplier_Handler::handle_close (h, m); +} + +int +Thr_Supplier_Handler::open (void *) +{ + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "disable"), + -1); // Incorrect info fixed. + + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "complete_connection_connection"), + -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + else if (this->msg_queue ()->activate () == ACE_Message_Queue<ACE_SYNCH>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // events to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive events from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Handler::svc (void) +{ + for (in_thread_ = 1;;) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Thr_Supplier_Handler's handle = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes events + // for one connection it is OK to call down to the + // <Supplier_Handler::handle_input> method, which blocks on + // input. + + while (this->Supplier_Handler::handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", + this->connection_id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + // Re-establish the connection, using expoential backoff. + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_->initiate_connection_connection (this, 1) == -1; + // Second parameter '1' means using sync mode directly, + // don't care Options::blocking_semantics(). If don't do + // so, async mode will be used to connect which won't + // satisfy original design. + ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + ACE_NOTREACHED(return 0); +} diff --git a/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h new file mode 100644 index 00000000000..287a4c8ec34 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Concrete_Connection_Handlers.h @@ -0,0 +1,151 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Concrete_Connection_Handlers.h +// +// = DESCRIPTION +// These are all the subclasses of Connection_Handler that define the +// appropriate threaded/reactive Consumer/Supplier behavior. +// +// = AUTHOR +// Doug Schmidt <schmidt@cs.wustl.edu> +// +// ============================================================================ + +#ifndef CONCRETE_CONNECTION_HANDLER +#define CONCRETE_CONNECTION_HANDLER + +#include "Connection_Handler.h" + +class Supplier_Handler : public Connection_Handler +{ + // = TITLE + // Handles reception of Events from Suppliers. + // + // = DESCRIPTION + // Performs framing and error checking on Events. Intended to + // run reactively, i.e., in one thread of control using a + // Reactor for demuxing and dispatching. +public: + // = Initialization method. + Supplier_Handler (const Connection_Config_Info &); + +protected: + // = All the following methods are upcalls, so they can be protected. + + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + // Receive and process peer events. + + virtual int recv (ACE_Message_Block *&); + // Receive an event from a Supplier. + + int process (ACE_Message_Block *event); + // This delegates to the <Event_Channel> to do the actual + // processing. Typically, it forwards the <event> to its + // appropriate Consumer. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragment to handle non-blocking recv's from + // Suppliers. +}; + +class Consumer_Handler : public Connection_Handler +{ + // = TITLE + // Handles transmission of events to Consumers. + // + // = DESCRIPTION + // Performs queueing and error checking. Intended to run + // reactively, i.e., in one thread of control using a Reactor + // for demuxing and dispatching. Also uses a Reactor to handle + // flow controlled output connections. +public: + // = Initialization method. + Consumer_Handler (const Connection_Config_Info &); + + virtual int put (ACE_Message_Block *event, + ACE_Time_Value * = 0); + // Send an event to a Consumer (may be queued if necessary). + +protected: + virtual int handle_output (ACE_HANDLE); + // Finish sending event when flow control conditions abate. + + int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking put(). + + virtual ssize_t send (ACE_Message_Block *); + // Send an event to a Consumer. + + virtual int handle_input (ACE_HANDLE); + // Receive and process shutdowns from a Consumer. +}; + +class Thr_Consumer_Handler : public Consumer_Handler +{ + // = TITLE + // Runs each <Consumer_Handler> in a separate thread. +public: + Thr_Consumer_Handler (const Connection_Config_Info &); + + virtual int open (void *); + // Initialize the threaded Consumer_Handler object and spawn a new + // thread. + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send a message to a peer. + +protected: + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + + virtual int svc (void); + // Transmit peer messages. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // When thread started, connection become blocked, so no need to use + // handle_close to reinitiate the connection_handler, so should + // override this function to justify if controlling is in thread or + // not. If yes, handle_close do nothing, otherwise, it call parent + // handle_close(). + +private: + int in_thread_; + // If the controlling is in thread's svc() or not. +}; + +class Thr_Supplier_Handler : public Supplier_Handler +{ + // = TITLE + // Runs each <Supplier_Handler> in a separate thread. +public: + Thr_Supplier_Handler (const Connection_Config_Info &pci); + + virtual int open (void *); + // Initialize the object and spawn a new thread. + +protected: + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // When thread started, connection become blocked, so no need to use + // handle_close to reinitiate the connection_handler, so should + // override this function to justify if controlling is in thread or + // not. If yes, handle_close do nothing, otherwise, it call parent + // handle_close(). + + virtual int svc (void); + // Transmit peer messages. + +private: + int in_thread_; + // If the controlling is in thread's svc() or not. +}; + +#endif /* CONCRETE_CONNECTION_HANDLER */ diff --git a/ACE/apps/Gateway/Gateway/Config_Files.cpp b/ACE/apps/Gateway/Gateway/Config_Files.cpp new file mode 100644 index 00000000000..f1b9e96dd23 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Config_Files.cpp @@ -0,0 +1,215 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "Config_Files.h" +#include "Options.h" + +ACE_RCSID(Gateway, Config_Files, "$Id$") + +// This fixes a nasty bug with cfront-based compilers (like +// Centerline). +typedef FP::Return_Type FP_RETURN_TYPE; + +FP_RETURN_TYPE +Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry, + int &line_number) +{ + FP_RETURN_TYPE result; + + // Increment the line count. + line_number++; + + // Ignore comments, check for EOF and EOLINE if this succeeds, we + // have our connection id. + + while ((result = this->getint (entry.connection_id_)) != FP::RT_SUCCESS) + if (result == FP::RT_EOFILE) + return FP::RT_EOFILE; + else if (result == FP::RT_EOLINE + || result == FP::RT_COMMENT) + line_number++; + + // Get the payload type. + result = this->getint (entry.type_); + if (result != FP::RT_SUCCESS) + return result; + + // get all the consumers. + entry.total_consumers_ = 0; + + while ((result = this->getint + (entry.consumers_[entry.total_consumers_])) == FP::RT_SUCCESS) + ++entry.total_consumers_; // do nothing (should check against max...) + + if (result == FP::RT_EOLINE || result == FP::RT_EOFILE) + return FP::RT_SUCCESS; + else + return result; +} + +FP_RETURN_TYPE +Connection_Config_File_Parser::read_entry (Connection_Config_Info &entry, + int &line_number) +{ + char buf[BUFSIZ]; + FP_RETURN_TYPE result; + + // Increment the line count. + line_number++; + + // Ignore comments, check for EOF and EOLINE if this succeeds, we + // have our connection id + + while ((result = this->getint (entry.connection_id_)) != FP::RT_SUCCESS) + if (result == FP::RT_EOFILE) + return FP::RT_EOFILE; + else if (result == FP::RT_EOLINE + || result == FP::RT_COMMENT) + line_number++; + + // Get the hostname. + result = this->getword (entry.host_); + if (result != FP::RT_SUCCESS) + return result; + + ACE_INT32 port; + + // Get the port number. + result = this->getint (port); + if (result == FP::RT_DEFAULT) + { + // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier). + result = this->getword (buf); + if (result != FP::RT_SUCCESS) + return result; + else + entry.connection_role_ = buf[0]; + + if (entry.connection_role_ == 'C') + entry.remote_port_ = Options::instance ()->consumer_connector_port (); + else if (entry.connection_role_ == 'S') + entry.remote_port_ = Options::instance ()->supplier_connector_port (); + else + // Yikes, this is a *weird* error! + entry.remote_port_ = 0; + } + else if (result != FP::RT_SUCCESS) + return result; + else + { + entry.remote_port_ = (unsigned short) port; + + // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier). + result = this->getword (buf); + if (result != FP::RT_SUCCESS) + return result; + else + entry.connection_role_ = buf[0]; + } + + // Get the max retry delay. + result = this->getint (entry.max_retry_timeout_); + if (result == FP::RT_DEFAULT) + entry.max_retry_timeout_ = Options::instance ()->max_timeout (); + else if (result != FP::RT_SUCCESS) + return result; + + // Get the local port number. + result = this->getint (port); + if (result == FP::RT_DEFAULT) + entry.local_port_ = 0; // @@ Should make this an option. + else if (result != FP::RT_SUCCESS) + return result; + else + entry.local_port_ = (unsigned short) port; + + ACE_INT32 priority; + + // Get the priority. + result = this->getint (priority); + if (result != FP::RT_SUCCESS) + return result; + else + entry.priority_ = priority; + + return FP::RT_SUCCESS; +} + +#if defined (DEBUGGING) +int +main (int argc, char *argv[]) +{ + FP_RETURN_TYPE result; + int line_number = 0; + + { + Connection_Config_Info entry; + Connection_Config_File_Parser connection_config_file; + + connection_config_file.open (argc > 1 ? argv[1] : "connection_config"); + + int line_number = 0; + + ACE_DEBUG ((LM_DEBUG, + "ConnID\tHost\t\tRPort\tRole\tRetry\tLPort\tPriority\n")); + + // Read config file line at a time. + while ((result = connection_config_file.read_entry (entry, line_number)) != FP::RT_EOFILE) + if (result == FP::RT_PARSE_ERROR) + ACE_DEBUG ((LM_DEBUG, + "Error line %d.\n", + line_number)); + else + ACE_DEBUG ((LM_DEBUG, + "%d\t%s\t%d\t%c\t%d\t%d\t%d\n", + entry.connection_id_, + entry.host_, + entry.remote_port_, + entry.connection_role_, + entry.max_retry_timeout_, + entry.local_port_, + entry.priority_)); + + connection_config_file.close (); + } + + { + Consumer_Config_Info entry; + Consumer_Config_File_Parser consumer_config_file; + + consumer_config_file.open (argc > 2 ? argv[2] : "consumer_config"); + + line_number = 0; + + ACE_DEBUG ((LM_DEBUG, + "\nConnID\tLogic\tPayload\tDestinations\n")); + + // Read config file line at a time. + while ((result = consumer_config_file.read_entry (entry, line_number)) != FP::RT_EOFILE) + if (result == FP::RT_PARSE_ERROR) + ACE_DEBUG ((LM_DEBUG, + "Error line %d.\n", + line_number)); + else + { + ACE_DEBUG ((LM_DEBUG, + "%d\t%d\t", + entry.connection_id_, + entry.type_)); + + while (--entry.total_consumers_ >= 0) + ACE_DEBUG ((LM_DEBUG, + "%d,", + entry.consumers_[entry.total_consumers_])); + ACE_DEBUG ((LM_DEBUG, + "\n")); + } + + consumer_config_file.close (); + } + + return 0; +} +#endif /* DEBUGGING */ + diff --git a/ACE/apps/Gateway/Gateway/Config_Files.h b/ACE/apps/Gateway/Gateway/Config_Files.h new file mode 100644 index 00000000000..9fd96b687f6 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Config_Files.h @@ -0,0 +1,98 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Config_Files.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _CONFIG_FILES +#define _CONFIG_FILES + +#include "File_Parser.h" +#include "Event.h" + +// Forward declaration. +class Event_Channel; + +class Connection_Config_Info + // = TITLE + // Stores connection configuration information. +{ +public: + ACE_INT32 connection_id_; + // Connection id for this Connection_Handler. + + char host_[BUFSIZ]; + // Host to connect with. + + u_short remote_port_; + // Port to connect with. + + char connection_role_; + // 'S' (supplier) or 'C' (consumer). + + ACE_INT32 max_retry_timeout_; + // Maximum amount of time to wait for reconnecting. + + u_short local_port_; + // Our local port number. + + ACE_INT32 priority_; + // Priority by which different Consumers and Suppliers should be + // serviced. + + Event_Channel *event_channel_; + // We just need a place to store this until we can pass it along + // when creating a Connection_Handler. +}; + +class Connection_Config_File_Parser : public File_Parser<Connection_Config_Info> + // = TITLE + // Parser for the Connection_Handler Connection file. +{ +public: + virtual FP::Return_Type read_entry (Connection_Config_Info &entry, + int &line_number); + // Read in a <Connection_Config_Info> entry. + +}; + +class Consumer_Config_Info + // = TITLE + // Stores the information in a Consumer Map entry. +{ +public: + ACE_INT32 connection_id_; + // Connection id. + + ACE_INT32 type_; + // Message type. + + ACE_INT32 consumers_[MAX_CONSUMERS]; + // Connection ids for consumers that will be routed information + // containing this <connection_id_> + + ACE_INT32 total_consumers_; + // Total number of these consumers. +}; + +class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_Info> + // = TITLE + // Parser for the Consumer Map file. +{ +public: + virtual FP::Return_Type read_entry (Consumer_Config_Info &entry, + int &line_number); + // Read in a <Consumer_Config_Info> entry. +}; + +#endif /* _CONFIG_FILES */ diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler.cpp new file mode 100644 index 00000000000..ff93886c187 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler.cpp @@ -0,0 +1,272 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_string.h" +#include "Event_Channel.h" +#include "Concrete_Connection_Handlers.h" + +ACE_RCSID(Gateway, Connection_Handler, "$Id$") + +Event_Channel * +Connection_Handler::event_channel (void) const +{ + return this->event_channel_; +} + +void +Connection_Handler::event_channel (Event_Channel *ec) +{ + this->event_channel_ = ec; +} + +void +Connection_Handler::connection_id (CONNECTION_ID id) +{ + this->connection_id_ = id; +} + +CONNECTION_ID +Connection_Handler::connection_id (void) const +{ + return this->connection_id_; +} + +// The total number of bytes sent/received on this Proxy. + +size_t +Connection_Handler::total_bytes (void) const +{ + return this->total_bytes_; +} + +void +Connection_Handler::total_bytes (size_t bytes) +{ + this->total_bytes_ += bytes; +} + +Connection_Handler::Connection_Handler (void) +{ +} + +Connection_Handler::Connection_Handler (const Connection_Config_Info &pci) + : local_addr_ (pci.local_port_), + connection_id_ (pci.connection_id_), + total_bytes_ (0), + state_ (Connection_Handler::IDLE), + timeout_ (1), + max_timeout_ (pci.max_retry_timeout_), + event_channel_ (pci.event_channel_) +{ + if (ACE_OS::strlen (pci.host_) > 0) + this->remote_addr_.set (pci.remote_port_, pci.host_); + else + this->remote_addr_.set (pci.remote_port_, ACE_DEFAULT_SERVER_HOST); + // Set the priority of the Proxy. + this->priority (int (pci.priority_)); +} + +// Set the connection_role. + +void +Connection_Handler::connection_role (char d) +{ + this->connection_role_ = d; +} + +// Get the connection_role. + +char +Connection_Handler::connection_role (void) const +{ + return this->connection_role_; +} + +// Sets the timeout delay. + +void +Connection_Handler::timeout (long to) +{ + if (to > this->max_timeout_) + to = this->max_timeout_; + + this->timeout_ = to; +} + +// Re-calculate the current retry timeout delay using exponential +// backoff. Returns the original timeout (i.e., before the +// re-calculation). + +long +Connection_Handler::timeout (void) +{ + long old_timeout = this->timeout_; + this->timeout_ *= 2; + + if (this->timeout_ > this->max_timeout_) + this->timeout_ = this->max_timeout_; + + return old_timeout; +} + +// Sets the max timeout delay. + +void +Connection_Handler::max_timeout (long mto) +{ + this->max_timeout_ = mto; +} + +// Gets the max timeout delay. + +long +Connection_Handler::max_timeout (void) const +{ + return this->max_timeout_; +} + +// Restart connection asynchronously when timeout occurs. + +int +Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) attempting to reconnect Connection_Handler %d with timeout = %d\n", + this->connection_id (), + this->timeout_)); + + // Delegate the re-connection attempt to the Event Channel. + this->event_channel_->initiate_connection_connection (this); + + return 0; +} + +// Handle shutdown of the Connection_Handler object. + +int +Connection_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down %s Connection_Handler %d on handle %d\n", + this->connection_role () == 'C' ? "Consumer" : "Supplier", + this->connection_id (), + this->get_handle ())); + + // Restart the connection, if possible. + return this->event_channel_->reinitiate_connection_connection (this); +} + +// Set the state of the Proxy. + +void +Connection_Handler::state (Connection_Handler::State s) +{ + this->state_ = s; +} + +// Return the current state of the Proxy. + +Connection_Handler::State +Connection_Handler::state (void) const +{ + return this->state_; +} + +// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates +// control to our Connection_Handler. + +int +Connection_Handler::open (void *) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) %s Connection_Handler's handle = %d\n", + this->connection_role () == 'C' ? "Consumer" : "Supplier", + this->peer ().get_handle ())); + + // Call back to the <Event_Channel> to complete our initialization. + if (this->event_channel_->complete_connection_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1); + + // Turn on non-blocking I/O. + else if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Register ourselves to receive input events. + else if (ACE_Reactor::instance ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + else + return 0; +} + +const ACE_INET_Addr & +Connection_Handler::remote_addr (void) const +{ + return this->remote_addr_; +} + +void +Connection_Handler::remote_addr (ACE_INET_Addr &ra) +{ + this->remote_addr_ = ra; +} + +const ACE_INET_Addr & +Connection_Handler::local_addr (void) const +{ + return this->local_addr_; +} + +void +Connection_Handler::local_addr (ACE_INET_Addr &la) +{ + this->local_addr_ = la; +} + +// Make the appropriate type of <Connection_Handler> (i.e., +// <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or +// <Thr_Supplier_Handler>). + +Connection_Handler * +Connection_Handler_Factory::make_connection_handler (const Connection_Config_Info &pci) +{ + Connection_Handler *connection_handler = 0; + + // The next few lines of code are dependent on whether we are making + // a threaded/reactive Supplier_Handler/Consumer_Handler. + + if (pci.connection_role_ == 'C') // Configure a Consumer_Handler. + { + // Create a threaded Consumer_Handler. + if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (), + Options::OUTPUT_MT)) + ACE_NEW_RETURN (connection_handler, + Thr_Consumer_Handler (pci), + 0); + + // Create a reactive Consumer_Handler. + else + ACE_NEW_RETURN (connection_handler, + Consumer_Handler (pci), + 0); + } + else // connection_role == 'S', so configure a Supplier_Handler. + { + // Create a threaded Supplier_Handler. + if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (), + Options::INPUT_MT)) + ACE_NEW_RETURN (connection_handler, + Thr_Supplier_Handler (pci), + 0); + + // Create a reactive Supplier_Handler. + else + ACE_NEW_RETURN (connection_handler, + Supplier_Handler (pci), + 0); + } + + return connection_handler; +} + diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler.h b/ACE/apps/Gateway/Gateway/Connection_Handler.h new file mode 100644 index 00000000000..a8a72830135 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler.h @@ -0,0 +1,157 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Connection_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _CONNECTION_HANDLER +#define _CONNECTION_HANDLER + +#include "ace/Service_Config.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "Config_Files.h" +#include "Options.h" +#include "Event.h" + +// Forward declaration. +class Event_Channel; + +class Connection_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +{ + // = TITLE + // <Connection_Handler> contains info about connection state and + // addressing. + // + // = DESCRIPTION + // The <Connection_Handler> classes process events sent to the + // Event Channel from Suppliers and forward them to Consumers. +public: + Connection_Handler (void); + // Default constructor (needed to make <ACE_Connector> happy). + + Connection_Handler (const Connection_Config_Info &); + // Real constructor. + + virtual int open (void * = 0); + // Initialize and activate a single-threaded <Connection_Handler> + // (called by <ACE_Connector::handle_output>). + + // = The current state of the Connection_Handler. + enum State + { + IDLE = 1, // Prior to initialization. + CONNECTING, // During connection establishment. + ESTABLISHED, // Connection_Handler is established and active. + DISCONNECTING, // Connection_Handler is in the process of connecting. + FAILED // Connection_Handler has failed. + }; + + // = Set/get the current state. + void state (State); + State state (void) const; + + // = Set/get remote INET addr. + void remote_addr (ACE_INET_Addr &); + const ACE_INET_Addr &remote_addr (void) const; + + // = Set/get local INET addr. + void local_addr (ACE_INET_Addr &); + const ACE_INET_Addr &local_addr (void) const; + + // = Set/get connection id. + void connection_id (CONNECTION_ID); + CONNECTION_ID connection_id (void) const; + + // = Set/get the current retry timeout delay. + void timeout (long); + long timeout (void); + + // = Set/get the maximum retry timeout delay. + void max_timeout (long); + long max_timeout (void) const; + + // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer + // (necessary for error checking). + void connection_role (char); + char connection_role (void) const; + + // = Set/get the <Event_Channel> *. + void event_channel (Event_Channel *); + Event_Channel *event_channel (void) const; + + // = The total number of bytes sent/received on this proxy. + void total_bytes (size_t bytes); + // Increment count by <bytes>. + size_t total_bytes (void) const; + // Return the current byte count. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based Connection_Handler reconnection. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // Perform Connection_Handler termination. + +protected: + ACE_INET_Addr remote_addr_; + // Address of peer. + + ACE_INET_Addr local_addr_; + // Address of us. + + CONNECTION_ID connection_id_; + // The assigned connection ID of this entry. + + size_t total_bytes_; + // The total number of bytes sent/received on this proxy. + + State state_; + // The current state of the proxy. + + long timeout_; + // Amount of time to wait between reconnection attempts. + + long max_timeout_; + // Maximum amount of time to wait between reconnection attempts. + + char connection_role_; + // Indicates which role the proxy plays ('S' == Supplier and 'C' == + // Consumer). + + Event_Channel *event_channel_; + // Reference to the <Event_Channel> that we use to forward all + // the events from Consumers and Suppliers. +}; + +class Connection_Handler_Factory +{ + // = TITLE + // Creates the appropriate type of <Connection_Handler>. + // + // = DESCRIPTION + // <Connection_Handler>s can include <Consumer_Handler>, + // <Supplier_Handler>, <Thr_Consumer_Handler>, or + // <Thr_Supplier_Handler>). +public: + Connection_Handler *make_connection_handler (const Connection_Config_Info &); + // Make the appropriate type of <Connection_Handler>, based on the + // <Connection_Config_Info> parameter. +}; + +#endif /* _CONNECTION_HANDLER */ diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp new file mode 100644 index 00000000000..7790fb83d08 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp @@ -0,0 +1,56 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "Event_Channel.h" +#include "Connection_Handler_Acceptor.h" + +ACE_RCSID(Gateway, Connection_Handler_Acceptor, "$Id$") + +int +Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ch) +{ + ACE_ALLOCATOR_RETURN (ch, + this->connection_handler_factory_.make_connection_handler (this->connection_config_info_), + -1); + return 0; +} + +int +Connection_Handler_Acceptor::accept_svc_handler (Connection_Handler *ch) +{ + if (this->inherited::accept_svc_handler (ch) == -1) + return -1; + else + { + ch->connection_id (Options::instance ()->connection_id ()); + ACE_INET_Addr remote_addr; + + if (ch->peer ().get_remote_addr (remote_addr) == -1) + return -1; + + // Set the remote address of our connected Peer. + ch->remote_addr (remote_addr); + + // Set the Event_Channel pointer. + ch->event_channel (&this->event_channel_); + + // Increment the connection ID by one. + Options::instance ()->connection_id ()++; + return 0; + } +} + +Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec, + char connection_role) + : event_channel_ (ec) +{ + this->connection_config_info_.connection_id_ = 0; + this->connection_config_info_.host_[0] = '\0'; + this->connection_config_info_.remote_port_ = 0; + this->connection_config_info_.connection_role_ = connection_role; + this->connection_config_info_.max_retry_timeout_ = Options::instance ()->max_timeout (); + this->connection_config_info_.local_port_ = 0; + this->connection_config_info_.priority_ = 1; +} + diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h new file mode 100644 index 00000000000..777f58c4a5b --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Acceptor.h @@ -0,0 +1,65 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Connection_Handler_acceptor.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _CONNECTION_HANDLER_ACCEPTOR +#define _CONNECTION_HANDLER_ACCEPTOR + +#include "ace/Acceptor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SOCK_Acceptor.h" +#include "Connection_Handler.h" + +// Forward declaration +class Event_Channel; + +class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR> +{ + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Connection_Handler object to do the dirty + // work... +public: + Connection_Handler_Acceptor (Event_Channel &, + char connection_role); + // Constructor. + + virtual int make_svc_handler (Connection_Handler *&ch); + // Hook method for creating an appropriate <Connection_Handler>. + + virtual int accept_svc_handler (Connection_Handler *ch); + // Hook method for accepting a connection into the + // <Connection_Handler>. + +protected: + typedef ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR> + inherited; + // Make life easier later on. + + Event_Channel &event_channel_; + // Reference to the event channel. + + Connection_Config_Info connection_config_info_; + // Keeps track of what type of proxy we need to create. + + Connection_Handler_Factory connection_handler_factory_; + // Make the appropriate type of <Connection_Handler>. +}; + +#endif /* _CONNECTION_HANDLER_ACCEPTOR */ diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp new file mode 100644 index 00000000000..368ad14f373 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.cpp @@ -0,0 +1,60 @@ +// $Id$ + +#include "Connection_Handler_Connector.h" +#include "ace/os_include/os_netdb.h" + +ACE_RCSID(Gateway, Connection_Handler_Connector, "$Id$") + +Connection_Handler_Connector::Connection_Handler_Connector (void) +{ +} + +// Initiate (or reinitiate) a connection to the Connection_Handler. + +int +Connection_Handler_Connector::initiate_connection (Connection_Handler *connection_handler, + ACE_Synch_Options &synch_options) +{ + ACE_TCHAR addr_buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators will ignore + // us until we are reconnected. + connection_handler->state (Connection_Handler::IDLE); + + // We check the remote addr second so that it remains in the + // addr_buf. + if (connection_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1 + || connection_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("can't obtain peer's address")), -1); + + // Try to connect to the Peer. + if (this->connect (connection_handler, + connection_handler->remote_addr (), + synch_options, + connection_handler->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + connection_handler->state (Connection_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p on address %s\n"), + ACE_TEXT ("connect"), addr_buf)); + return -1; + } + else + { + connection_handler->state (Connection_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) in the process of connecting to %s\n"), + addr_buf)); + } + } + else + { + connection_handler->state (Connection_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) connected to %s on %d\n"), + addr_buf, connection_handler->get_handle ())); + } + return 0; +} + diff --git a/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h new file mode 100644 index 00000000000..f4e7d7d06a1 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Connection_Handler_Connector.h @@ -0,0 +1,44 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Connection_Handler_Connector.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _IO_HANDLER_CONNECTOR +#define _IO_HANDLER_CONNECTOR + +#include "ace/Connector.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/SOCK_Connector.h" +#include "Connection_Handler.h" + +class Connection_Handler_Connector : public ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR> +{ + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Connection_Handler object to do the dirty + // work... +public: + Connection_Handler_Connector (void); + + // Initiate (or reinitiate) a connection on the Connection_Handler. + int initiate_connection (Connection_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + +}; + +#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h new file mode 100644 index 00000000000..3a5d0cf3e25 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Consumer_Dispatch_Set.h @@ -0,0 +1,32 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Dispatch_Set.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef CONSUMER_DISPATCH_SET +#define CONSUMER_DISPATCH_SET + +#include "ace/Containers.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +// Forward reference. +class Connection_Handler; + +typedef ACE_Unbounded_Set<Connection_Handler *> Consumer_Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Connection_Handler *> Consumer_Dispatch_Set_Iterator; + +#endif /* CONSUMER_DISPATCH_SET */ diff --git a/ACE/apps/Gateway/Gateway/Event.h b/ACE/apps/Gateway/Gateway/Event.h new file mode 100644 index 00000000000..4c157bbce68 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Event.h @@ -0,0 +1,225 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Event.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef EVENT_H +#define EVENT_H + +#include "ace/os_include/arpa/os_inet.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Basic_Types.h" + +// = The following #defines should really be in a separate include +// file that is shared with the ../Peer/ directory. For now, we'll +// keep them here to simplify the sharing between the two directories. +// BTW, this is also the reason why all the methods are inlined... + +// Used by Peers to create Consumers in a Gateway. +#if !defined (DEFAULT_GATEWAY_CONSUMER_PORT) +#define DEFAULT_GATEWAY_CONSUMER_PORT 10009 +#endif /* DEFAULT_GATEWAY_CONSUMER_PORT */ + +// Used by Peers create Suppliers in a Gateway. +#if !defined (DEFAULT_GATEWAY_SUPPLIER_PORT) +#define DEFAULT_GATEWAY_SUPPLIER_PORT 10010 +#endif /* DEFAULT_GATEWAY_SUPPLIER_PORT */ + +// Used by a Gateway to create Consumers in a Peer. +#if !defined (DEFAULT_PEER_CONSUMER_PORT) +#define DEFAULT_PEER_CONSUMER_PORT 10011 +#endif /* DEFAULT_PEER_CONSUMER_PORT */ + +// Used by a Gateway to create Suppliers in a Peer. +#if !defined (DEFAULT_PEER_SUPPLIER_PORT) +#define DEFAULT_PEER_SUPPLIER_PORT 10012 +#endif /* DEFAULT_PEER_SUPPLIER_PORT */ + +#if !defined (MAX_CONSUMERS) +#define MAX_CONSUMERS 1000 +#endif /* MAX_CONSUMERS */ + +// This is the unique supplier identifier that denotes a particular +// <Connection_Handler> in the Gateway. +typedef ACE_INT32 CONNECTION_ID; + +enum +{ + // = These are the types of events generated by the <Suppliers> and + // handled by the <Event_Channel>. + + ROUTING_EVENT = 0, + // A normal event, which is forwarded to the <Consumers>. + + SUBSCRIPTION_EVENT = 1 + // A subscription to <Suppliers> managed by the <Event_Channel>. +}; + +class Event_Key +{ + // = TITLE + // Address used to identify the source/destination of an event. + // + // = DESCRIPTION + // This is really a "processing descriptor" that is used to + // decouple the processing, filtering, and forwarding logic of + // the Event Channel from the format of the data. The + // <connection_id_> and <type_> fields are copied from the + // <Event_Header> class below. +public: + Event_Key (CONNECTION_ID cid = -1, + ACE_INT32 type = 0, + ACE_INT32 priority = 0) + : connection_id_ (cid), + type_ (type), + priority_ (priority) + { + } + + bool operator== (const Event_Key &event_addr) const + { + return this->connection_id_ == event_addr.connection_id_ + && this->type_ == event_addr.type_; + } + + CONNECTION_ID connection_id_; + // Unique connection identifier that denotes a particular + // Connection_Handler. + + ACE_INT32 type_; + // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>. + + ACE_INT32 priority_; + // Event priority. +}; + +class Event_Header +{ + // = TITLE + // Fixed sized header. + // + // = DESCRIPTION + // This is designed to have a sizeof (16) to avoid alignment + // problems on most platforms. +public: + enum + { + INVALID_ID = -1 // No peer can validly use this number. + }; + + Event_Header (ACE_INT32 len, + CONNECTION_ID connection_id, + ACE_INT32 type, + ACE_INT32 priority) + : len_ (len), + connection_id_ (connection_id), + type_ (type), + priority_ (priority) + { + } + + void decode (void) + { + this->len_ = ntohl (this->len_); + this->connection_id_ = ntohl (this->connection_id_); + this->type_ = ntohl (this->type_); + this->priority_ = ntohl (this->priority_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->len_ = htonl (this->len_); + this->connection_id_ = htonl (this->connection_id_); + this->type_ = htonl (this->type_); + this->priority_ = htonl (this->priority_); + } + // Encode from host byte order to network byte order. + + ACE_INT32 len_; + // Length of the data_ payload, in bytes. + + CONNECTION_ID connection_id_; + // Unique connection identifier that denotes a particular + // Connection_Handler. + + ACE_INT32 type_; + // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>. + + ACE_INT32 priority_; + // Event priority. +}; + +class Event +{ + // = TITLE + // Variable-sized event (data_ may be variable-sized between + // 0 and MAX_PAYLOAD_SIZE). +public: + enum { MAX_PAYLOAD_SIZE = 1024 }; + // The maximum size of an Event. + + Event () : header_ (0, -1, 0, 0) {}; + + Event_Header header_; + // Event header. + + char data_[MAX_PAYLOAD_SIZE]; + // Event data. +}; + +class Subscription +{ + // = TITLE + // Allows Consumers to subscribe to be routed information + // arriving from a particular Supplier connection id. +public: + void decode (void) + { + this->connection_id_ = ntohl (this->connection_id_); + + for (ACE_INT32 i = 0; i < this->total_consumers_; i++) + this->consumers_[i] = ntohl (this->consumers_[i]); + + this->total_consumers_ = ntohl (this->total_consumers_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->connection_id_ = htonl (this->connection_id_); + + for (ACE_INT32 i = 0; i < this->total_consumers_; i++) + this->consumers_[i] = htonl (this->consumers_[i]); + + this->total_consumers_ = htonl (this->total_consumers_); + } + // Encode from host byte order to network byte order. + + ACE_INT32 connection_id_; + // Connection id. + + ACE_INT32 consumers_[MAX_CONSUMERS]; + // Connection ids for consumers that will be routed information + // containing this <connection_id_> + + ACE_INT32 total_consumers_; + // Total number of these consumers. +}; + +#endif /* EVENT_H */ diff --git a/ACE/apps/Gateway/Gateway/Event_Channel.cpp b/ACE/apps/Gateway/Gateway/Event_Channel.cpp new file mode 100644 index 00000000000..beb35c1856e --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Event_Channel.cpp @@ -0,0 +1,588 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "Connection_Handler_Connector.h" +#include "Event_Channel.h" +#include "ace/OS_NS_sys_select.h" +#include "ace/Signal.h" + +ACE_RCSID(Gateway, Event_Channel, "$Id$") + +Event_Channel::~Event_Channel (void) +{ +} + +Event_Channel::Event_Channel (void) + : supplier_acceptor_ (*this, 'S'), + consumer_acceptor_ (*this, 'C') +{ +} + +int +Event_Channel::compute_performance_statistics (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); + + // If we've got a <ACE_Thread_Manager> then use it to suspend all + // the threads. This will enable us to get an accurate count. + + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) + { + if (ACE_Thread_Manager::instance ()->suspend_all () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "suspend_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) suspending all threads...")); + } + + size_t total_bytes_in = 0; + size_t total_bytes_out = 0; + + // Iterate through the connection map summing up the number of bytes + // sent/received. + + for (CONNECTION_MAP_ENTRY *me = 0; + cmi.next (me) != 0; + cmi.advance ()) + { + Connection_Handler *connection_handler = me->int_id_; + + if (connection_handler->connection_role () == 'C') + total_bytes_out += connection_handler->total_bytes (); + else // connection_handler->connection_role () == 'S' + total_bytes_in += connection_handler->total_bytes (); + } + + ACE_DEBUG ((LM_DEBUG, + "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", + Options::instance ()->performance_window (), + total_bytes_in, + total_bytes_out)); + + ACE_DEBUG ((LM_DEBUG, + "(%t) %f Mbits/sec received.\n", + (float) (total_bytes_in * 8 / + (float) (1024 * 1024 * Options::instance ()->performance_window ())))); + + ACE_DEBUG ((LM_DEBUG, + "(%t) %f Mbits/sec sent.\n", + (float) (total_bytes_out * 8 / + (float) (1024 * 1024 * Options::instance ()->performance_window ())))); + + // Resume all the threads again. + + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) + { + if (ACE_Thread_Manager::instance ()->resume_all () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "resume_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) resuming all threads...")); + } + + + return 0; +} + +int +Event_Channel::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // This is called periodically to compute performance statistics. + return this->compute_performance_statistics (); +} + +int +Event_Channel::put (ACE_Message_Block *event, + ACE_Time_Value *) +{ + // We got a valid event, so determine its type, which is stored in + // the first of the two <ACE_Message_Block>s, which are chained + // together by <ACE::recv>. + Event_Key *event_key = (Event_Key *) event->rd_ptr (); + + // Skip over the address portion and get the data, which is in the + // second <ACE_Message_Block>. + ACE_Message_Block *data = event->cont (); + + switch (event_key->type_) + { + case ROUTING_EVENT: + this->routing_event (event_key, + data); + break; + case SUBSCRIPTION_EVENT: + this->subscription_event (data); + break; + } + + // Release the memory in the message block. + event->release (); + return 0; +} + +void +Event_Channel::subscription_event (ACE_Message_Block *data) +{ + Event *event = (Event *) data->rd_ptr (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) received a subscription with %d bytes from connection id %d\n", + event->header_.len_, + event->header_.connection_id_)); + Subscription *subscription = (Subscription *) event->data_; + // Convert the subscription into host byte order so that we can + // access it directly without having to repeatedly muck with it... + subscription->decode (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) connection_id_ = %d, total_consumers_ = %d\n", + subscription->connection_id_, + subscription->total_consumers_)); + + for (ACE_INT32 i = 0; + i < subscription->total_consumers_; + i++) + ACE_DEBUG ((LM_DEBUG, + "(%t) consumers_[%d] = %d\n", + i, + subscription->consumers_[i])); + +} + +void +Event_Channel::routing_event (Event_Key *forwarding_address, + ACE_Message_Block *data) +{ + Consumer_Dispatch_Set *dispatch_set = 0; + + // Initialize the <dispatch_set> to points to the set of Consumers + // associated with this forwarding address. + + if (this->efd_.find (*forwarding_address, + dispatch_set) == -1) + // Failure. + ACE_ERROR ((LM_DEBUG, + "(%t) find failed on connection id = %d, type = %d\n", + forwarding_address->connection_id_, + forwarding_address->type_)); + else + { + // Check to see if there are any consumers. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active consumers for this event currently\n")); + + else // There are consumers, so forward the event. + { + // Initialize the interator. + Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); + + // At this point, we should assign a thread-safe locking + // strategy to the <ACE_Message_Block> is we're running in a + // multi-threaded configuration. + data->locking_strategy (Options::instance ()->locking_strategy ()); + + for (Connection_Handler **connection_handler = 0; + dsi.next (connection_handler) != 0; + dsi.advance ()) + { + // Only process active connection_handlers. + if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED) + { + // Duplicate the event portion via reference + // counting. + ACE_Message_Block *dup_msg = data->duplicate (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) forwarding to Consumer %d\n", + (*connection_handler)->connection_id ())); + + if ((*connection_handler)->put (dup_msg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, + "(%t) %p transmission error to peer %d\n", + "put", + (*connection_handler)->connection_id ())); + + // We are responsible for releasing an + // ACE_Message_Block if failures occur. + dup_msg->release (); + } + } + } + } + } +} + +int +Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler, + int sync_directly) +{ + ACE_Synch_Options synch_options; + + if (sync_directly) + // In separated connection handler thread, connection can be + // initiated by block mode (synch mode) directly. + synch_options = ACE_Synch_Options::synch; + else if (Options::instance ()->blocking_semantics () == ACE_NONBLOCK) + synch_options = ACE_Synch_Options::asynch; + else + synch_options = ACE_Synch_Options::synch; + + return this->connector_.initiate_connection (connection_handler, + synch_options); +} + +int +Event_Channel::complete_connection_connection (Connection_Handler *connection_handler) +{ + int option = connection_handler->connection_role () == 'S' + ? SO_RCVBUF + : SO_SNDBUF; + int socket_queue_size = + Options::instance ()->socket_queue_size (); + + if (socket_queue_size > 0) + if (connection_handler->peer ().set_option (SOL_SOCKET, + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "set_option")); + + connection_handler->thr_mgr (ACE_Thread_Manager::instance ()); + + // Our state is now "established." + connection_handler->state (Connection_Handler::ESTABLISHED); + + // Restart the timeout to 1. + connection_handler->timeout (1); + + ACE_INT32 id = htonl (connection_handler->connection_id ()); + + // Send the connection id to the peerd. + + ssize_t n = connection_handler->peer ().send ((const void *) &id, + sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); + return 0; +} + +// Restart connection (blocking_semantics dicates whether we restart +// synchronously or asynchronously). + +int +Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler) +{ + // Cancel asynchronous connecting before re-initializing. It will + // close the peer and cancel the asynchronous connecting. + this->cancel_connection_connection(connection_handler); + + if (connection_handler->state () != Connection_Handler::DISCONNECTING) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Connection_Handler %d\n", + connection_handler->connection_id ())); + + // Reschedule ourselves to try and connect again. + ACE_Time_Value const timeout (connection_handler->timeout ()); + if (ACE_Reactor::instance ()->schedule_timer + (connection_handler, + 0, + timeout) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_timer"), + -1); + } + return 0; +} + +// It is useful to provide a separate method to cancel the +// asynchronous connecting. + +int +Event_Channel::cancel_connection_connection (Connection_Handler *connection_handler) +{ + // Skip over proxies with deactivated handles. + if (connection_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + connection_handler->peer ().close (); + // Cancel asynchronous connecting before re-initializing. + return this->connector_.cancel(connection_handler); + } + return 0; +} + +// Initiate active connections with the Consumer and Supplier Peers. + +void +Event_Channel::initiate_connector (void) +{ + if (Options::instance ()->enabled + (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) + { + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); + + // Iterate through the Consumer Map connecting all the + // Connection_Handlers. + + for (CONNECTION_MAP_ENTRY *me = 0; + cmi.next (me) != 0; + cmi.advance ()) + { + Connection_Handler *connection_handler = me->int_id_; + + if (this->initiate_connection_connection (connection_handler) == -1) + continue; // Failures are handled elsewhere... + } + } +} + +// Initiate passive acceptor to wait for Consumer and Supplier Peers +// to accept. + +int +Event_Channel::initiate_acceptors (void) +{ + if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)) + { + ACE_INET_Addr + consumer_addr (Options::instance ()->consumer_acceptor_port ()); + if (this->consumer_acceptor_.open + (consumer_addr, + ACE_Reactor::instance (), + Options::instance ()->blocking_semantics ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "cannot register acceptor"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "accepting Consumers at %d\n", + Options::instance ()->consumer_acceptor_port ())); + } + if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR)) + { + ACE_INET_Addr + supplier_addr (Options::instance ()->supplier_acceptor_port ()); + if (this->supplier_acceptor_.open + (supplier_addr, + ACE_Reactor::instance (), + Options::instance ()->blocking_semantics ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "cannot register acceptor"), + -1); + else + ACE_DEBUG ((LM_DEBUG, + "accepting Suppliers at %d\n", + Options::instance ()->supplier_acceptor_port ())); + } + + return 0; +} + +// This method gracefully shuts down all the Handlers in the +// Connection_Handler Connection Map. + +int +Event_Channel::close (u_long) +{ + if (Options::instance ()->threading_strategy () != Options::REACTIVE) + { + if (ACE_Thread_Manager::instance ()->suspend_all () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "suspend_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) suspending all threads\n")); + } + + // First tell everyone that the spaceship is here... + { + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); + + // Iterate over all the handlers and shut them down. + + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. + cmi.next (me) != 0; + cmi.advance ()) + { + Connection_Handler *connection_handler = me->int_id_; + + ACE_DEBUG ((LM_DEBUG, + "(%t) closing down connection %d\n", + connection_handler->connection_id ())); + + // If have no this statement, the gatewayd will abort when exiting + // with some Consumer/Supplier not connected. + if (connection_handler->state()==Connection_Handler::CONNECTING) + this->cancel_connection_connection(connection_handler); + // Mark Connection_Handler as DISCONNECTING so we don't try to + // reconnect... + connection_handler->state (Connection_Handler::DISCONNECTING); + } + } + + // Close down the connector + this->connector_.close (); + + // Close down the supplier acceptor. + this->supplier_acceptor_.close (); + + // Close down the consumer acceptor. + this->consumer_acceptor_.close (); + + // Now tell everyone that it is now time to commit suicide. + { + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); + + for (CONNECTION_MAP_ENTRY *me = 0; // It's safe to reset me to 0. + cmi.next (me) != 0; + cmi.advance ()) + { + Connection_Handler *connection_handler = me->int_id_; + + // Deallocate Connection_Handler resources. + connection_handler->destroy (); // Will trigger a delete. + } + } + + return 0; +} + +int +Event_Channel::find_proxy (ACE_INT32 connection_id, + Connection_Handler *&connection_handler) +{ + return this->connection_map_.find (connection_id, + connection_handler); +} + +int +Event_Channel::bind_proxy (Connection_Handler *connection_handler) +{ + int result = this->connection_map_.bind (connection_handler->connection_id (), + connection_handler); + + switch (result) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) bind failed for connection %d\n", + connection_handler->connection_id ()), + -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) duplicate connection %d, already bound\n", + connection_handler->connection_id ()), + -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_DEBUG, + "(%t) invalid result %d\n", + result), + -1); + /* NOTREACHED */ + } + + ACE_NOTREACHED (return 0); +} + +int +Event_Channel::subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds) +{ + int result = this->efd_.bind (event_addr, cds); + + // Bind with consumer map, keyed by peer address. + switch (result) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) bind failed for connection %d\n", + event_addr.connection_id_), + -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_DEBUG, + "(%t) duplicate consumer map entry %d, " + "already bound\n", + event_addr.connection_id_), + -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; + default: + ACE_ERROR_RETURN ((LM_DEBUG, + "(%t) invalid result %d\n", + result), + -1); + /* NOTREACHED */ + } + + ACE_NOTREACHED (return 0); +} + +int +Event_Channel::open (void *) +{ + // Ignore <SIGPIPE> so each <Consumer_Handler> can handle it. + ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE); + ACE_UNUSED_ARG (sig); + + // Actively initiate Peer connections. + this->initiate_connector (); + + // Passively initiate Peer acceptor. + if (this->initiate_acceptors () == -1) + return -1; + + // If we're not running reactively, then we need to make sure that + // <ACE_Message_Block> reference counting operations are + // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is + // parameterized by <ACE_SYNCH_MUTEX> to prevent race conditions. + if (Options::instance ()->threading_strategy () + != Options::REACTIVE) + { + ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *la; + + ACE_NEW_RETURN (la, + ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, + -1); + + Options::instance ()->locking_strategy (la); + } + + return 0; +} + diff --git a/ACE/apps/Gateway/Gateway/Event_Channel.h b/ACE/apps/Gateway/Gateway/Event_Channel.h new file mode 100644 index 00000000000..64788cc0cdb --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Event_Channel.h @@ -0,0 +1,135 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Event_Channel.h +// +// = AUTHOR +// Doug Schmidt <schmidt@cs.wustl.edu> +// +// ============================================================================ + +#ifndef ACE_EVENT_CHANNEL +#define ACE_EVENT_CHANNEL + +#include "Connection_Handler_Connector.h" +#include "Connection_Handler_Acceptor.h" +#include "Consumer_Dispatch_Set.h" +#include "Event_Forwarding_Discriminator.h" +#include "ace/svc_export.h" + +typedef ACE_Null_Mutex MAP_MUTEX; + +class ACE_Svc_Export Event_Channel : public ACE_Event_Handler +{ + // = TITLE + // Define a generic Event_Channel. + // + // = DESCRIPTION + // The inspiration for this class is derived from the CORBA COS + // Event Channel, though the design is simplified. + // + // We inherit from <ACE_Event_Handler> so that we can be + // registered with an <ACE_Reactor> to handle timeouts. +public: + // = Initialization and termination methods. + Event_Channel (void); + ~Event_Channel (void); + + virtual int open (void * = 0); + // Open the channel. + + virtual int close (u_long = 0); + // Close down the Channel. + + // = Proxy management methods. + int initiate_connection_connection (Connection_Handler *, int sync_directly = 0); + // Initiate the connection of the <Connection_Handler> to its peer. + // Second paratemer is used for thread connection-handler which will + // block the connecting procedure directly, need not care + // Options::blocking_semantics(). + + int complete_connection_connection (Connection_Handler *); + // Complete the initialization of the <Connection_Handler> once it's + // connected to its Peer. + + int reinitiate_connection_connection (Connection_Handler *); + // Reinitiate a connection asynchronously when the Peer fails. + int cancel_connection_connection (Connection_Handler *); + // Cancel a asynchronous connection. + + int bind_proxy (Connection_Handler *); + // Bind the <Connection_Handler> to the <connection_map_>. + + int find_proxy (ACE_INT32 connection_id, + Connection_Handler *&); + // Locate the <Connection_Handler> with <connection_id>. + + int subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds); + // Subscribe the <Consumer_Dispatch_Set> to receive events that + // match <Event_Key>. + + // = Event processing entry point. + virtual int put (ACE_Message_Block *mb, + ACE_Time_Value * = 0); + // Pass <mb> to the Event Channel so it can forward it to Consumers. + + void initiate_connector (void); + // Actively initiate connections to the Peers. + + int initiate_acceptors (void); + // Passively initiate the <Peer_Acceptor>s for Consumer and + // Suppliers. + +private: + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. + + // = Methods for handling events. + void routing_event (Event_Key *event_key, + ACE_Message_Block *data); + // Forwards the <data> to Consumer that have registered to receive + // it, based on addressing information in the <event_key>. + + void subscription_event (ACE_Message_Block *data); + // Add a Consumer subscription. + + int compute_performance_statistics (void); + // Perform timer-based performance profiling. + + virtual int handle_timeout (const ACE_Time_Value &, + const void *arg); + // Periodically callback to perform timer-based performance + // profiling. + + Connection_Handler_Connector connector_; + // Used to establish the connections actively. + + Connection_Handler_Acceptor supplier_acceptor_; + // Used to establish connections passively and create Suppliers. + + Connection_Handler_Acceptor consumer_acceptor_; + // Used to establish connections passively and create Consumers. + + // = Make life easier by defining typedefs. + typedef ACE_Map_Manager<CONNECTION_ID, Connection_Handler *, MAP_MUTEX> + CONNECTION_MAP; + typedef ACE_Map_Iterator<CONNECTION_ID, Connection_Handler *, MAP_MUTEX> + CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<CONNECTION_ID, Connection_Handler *> + CONNECTION_MAP_ENTRY; + + CONNECTION_MAP connection_map_; + // Table that maps <CONNECTION_ID>s to <Connection_Handler> *'s. + + Event_Forwarding_Discriminator efd_; + // Map that associates an event to a set of <Consumer_Handler> *'s. +}; + +#endif /* ACE_EVENT_CHANNEL */ diff --git a/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp new file mode 100644 index 00000000000..d170dee9005 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp @@ -0,0 +1,64 @@ +// $Id$ + +#if !defined (_CONSUMER_MAP_C) +#define _CONSUMER_MAP_C + +#include "Event_Forwarding_Discriminator.h" + +ACE_RCSID(Gateway, Event_Forwarding_Discriminator, "$Id$") + +// Bind the Event_Key to the INT_ID. + +int +Event_Forwarding_Discriminator::bind (Event_Key event_addr, + Consumer_Dispatch_Set *cds) +{ + return this->map_.bind (event_addr, cds); +} + +// Find the Consumer_Dispatch_Set corresponding to the Event_Key. + +int +Event_Forwarding_Discriminator::find (Event_Key event_addr, + Consumer_Dispatch_Set *&cds) +{ + return this->map_.find (event_addr, cds); +} + +// Unbind (remove) the Event_Key from the map. + +int +Event_Forwarding_Discriminator::unbind (Event_Key event_addr) +{ + Consumer_Dispatch_Set *cds = 0; + int result = this->map_.unbind (event_addr, cds); + delete cds; + return result; +} + +Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator + (Event_Forwarding_Discriminator &rt) + : map_iter_ (rt.map_) +{ +} + +int +Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds) +{ + ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp; + + if (this->map_iter_.next (temp) == 0) + return 0; + else + { + cds = temp->int_id_; + return 1; + } +} + +int +Event_Forwarding_Discriminator_Iterator::advance (void) +{ + return this->map_iter_.advance (); +} +#endif /* _CONSUMER_MAP_C */ diff --git a/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h new file mode 100644 index 00000000000..2a83a53a584 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h @@ -0,0 +1,65 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Event_Forwarding_Discriminator.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _CONSUMER_MAP_H +#define _CONSUMER_MAP_H + +#include "ace/Map_Manager.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Null_Mutex.h" +#include "Event.h" +#include "Consumer_Dispatch_Set.h" + +class Event_Forwarding_Discriminator +{ + // = TITLE + // Map events to the set of Consumer_Proxies that have subscribed + // to receive the event. +public: + int bind (Event_Key event, Consumer_Dispatch_Set *cds); + // Associate Event with the Consumer_Dispatch_Set. + + int unbind (Event_Key event); + // Locate EXID and pass out parameter via INID. If found, + // return 0, else -1. + + int find (Event_Key event, Consumer_Dispatch_Set *&cds); + // Break any association of EXID. + +public: + ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_; + // Map that associates <Event_Key>s (external ids) with + // <Consumer_Dispatch_Set> *'s <internal IDs>. +}; + +class Event_Forwarding_Discriminator_Iterator +{ + // = TITLE + // Define an iterator for the Consumer Map. +public: + Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm); + int next (Consumer_Dispatch_Set *&); + int advance (void); + +private: + ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, ACE_Null_Mutex> map_iter_; + // Map we are iterating over. +}; +#endif /* _CONSUMER_MAP_H */ diff --git a/ACE/apps/Gateway/Gateway/File_Parser.cpp b/ACE/apps/Gateway/Gateway/File_Parser.cpp new file mode 100644 index 00000000000..de82ad0ba10 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/File_Parser.cpp @@ -0,0 +1,163 @@ +// $Id$ + +#ifndef FILE_PARSER_C +#define FILE_PARSER_C + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_stdlib.h" + +#include "File_Parser.h" + +// This fixes a nasty bug with cfront-based compilers (like +// Centerline). +typedef FP::Return_Type FP_RETURN_TYPE; + +// File_Parser stuff. + +template <class ENTRY> +File_Parser<ENTRY>::~File_Parser (void) +{ +} + +template <class ENTRY> int +File_Parser<ENTRY>::open (const ACE_TCHAR filename[]) +{ + this->infile_ = ACE_OS::fopen (filename, ACE_TEXT ("r")); + if (this->infile_ == 0) + return -1; + else + return 0; +} + +template <class ENTRY> int +File_Parser<ENTRY>::close (void) +{ + return ACE_OS::fclose (this->infile_); +} + +template <class ENTRY> FP_RETURN_TYPE +File_Parser<ENTRY>::getword (char buf[]) +{ + return this->readword (buf); +} + +// Get the next string from the file via this->readword() +// Check make sure the string forms a valid number. + +template <class ENTRY> FP_RETURN_TYPE +File_Parser<ENTRY>::getint (ACE_INT32 &value) +{ + char buf[BUFSIZ]; +#if defined (__GNUG__) + // egcs 1.1b can't handle the typedef. + FP::Return_Type +#else /* ! __GNUG__ */ + FP_RETURN_TYPE +#endif /* ! __GNUG__ */ + read_result = this->readword (buf); + + if (read_result == FP::RT_SUCCESS) + { + // Check to see if this is the "use the default value" symbol? + if (buf[0] == '*') + return FP::RT_DEFAULT; + else + { + // ptr is used for error checking with ACE_OS::strtol. + char *ptr; + + // try to convert the buf to a decimal number + value = ACE_OS::strtol (buf, &ptr, 10); + + // check if the buf is a decimal or not + if (value == 0 && ptr == buf) + return FP::RT_PARSE_ERROR; + else + return FP::RT_SUCCESS; + } + } + else + return read_result; +} + + +template <class ENTRY> FP_RETURN_TYPE +File_Parser<ENTRY>::readword (char buf[]) +{ + int wordlength = 0; + int c; + + // Skip over leading delimiters and get word. + + while ((c = getc (this->infile_)) != EOF && c != '\n') + if (this->delimiter (c)) + { + // We've reached the end of a "word". + if (wordlength > 0) + break; + } + else + buf[wordlength++] = c; + + buf[wordlength] = '\0'; + + if (c == EOF) { + // If EOF is just a delimiter, don't return EOF so that the word + // gets processed. + if (wordlength > 0) + { + ungetc (c, this->infile_); + return FP::RT_SUCCESS; + } + else + // else return EOF so that read loops stop + return FP::RT_EOFILE; + } + else if (c == '\n') + { + // if the EOLINE is just a delimiter, don't return EOLINE + // so that the word gets processed + if (wordlength > 0) + ungetc (c, this->infile_); + else + return FP::RT_EOLINE; + } + + // Skip comments. + if (this->comments (buf[0])) + { + if (this->skipline () == EOF) + return FP::RT_EOFILE; + else + return FP::RT_COMMENT; + } + else + return FP::RT_SUCCESS; +} + +template <class ENTRY> int +File_Parser<ENTRY>::delimiter (char ch) +{ + return ch == ' ' || ch == ',' || ch == '\t'; +} + +template <class ENTRY> int +File_Parser<ENTRY>::comments (char ch) +{ + return ch == '#'; +} + +template <class ENTRY> int +File_Parser<ENTRY>::skipline (void) +{ + // Skip the remainder of the line. + + int c; + + while ((c = getc (this->infile_)) != '\n' && c != EOF) + continue; + + return c; +} + +#endif /* _FILE_PARSER_C */ diff --git a/ACE/apps/Gateway/Gateway/File_Parser.h b/ACE/apps/Gateway/Gateway/File_Parser.h new file mode 100644 index 00000000000..6e971090ad8 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/File_Parser.h @@ -0,0 +1,97 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// File_Parser.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef _FILE_PARSER +#define _FILE_PARSER + +#include "ace/Basic_Types.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class FP +{ + // = TITLE + // This class serves as a namespace for the <Return_Type>. +public: + enum Return_Type + { + RT_EOLINE, + RT_EOFILE, + RT_SUCCESS, + RT_COMMENT, + RT_DEFAULT, + RT_PARSE_ERROR + }; +}; + +template <class ENTRY> +class File_Parser +{ + // = TITLE + // Class used to parse the configuration file for the + // <Consumer_Map>. +public: + + /// Destructor. + virtual ~File_Parser (void); + + // = Open and Close the file specified + int open (const ACE_TCHAR filename[]); + int close (void); + + virtual FP::Return_Type read_entry (ENTRY &entry, + int &line_number) = 0; + // Pure virtual hook that subclasses override and use the protected + // methods to fill in the <entry>. + +protected: + FP::Return_Type getword (char buf[]); + // Read the next ASCII word. + + FP::Return_Type getint (ACE_INT32 &value); + // Read the next integer. + + FP::Return_Type readword (char buf[]); + // Read the next "word," which is demarcated by <delimiter>s. + // + // @@ This function is inherently flawed since it doesn't take a + // count of the size of <buf>... + + int delimiter (char ch); + // Returns true if <ch> is a delimiter, i.e., ' ', ',', or '\t'. + + int comments (char ch); + // Returns true if <ch> is the comment character, i.e., '#'. + + int skipline (void); + // Skips to the remainder of a line, e.g., when we find a comment + // character. + + FILE *infile_; + // Pointer to the file we're reading. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "File_Parser.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("File_Parser.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* _FILE_PARSER */ diff --git a/ACE/apps/Gateway/Gateway/Gateway.cpp b/ACE/apps/Gateway/Gateway/Gateway.cpp new file mode 100644 index 00000000000..0fac7b5085c --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Gateway.cpp @@ -0,0 +1,340 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Service_Config.h" +#include "ace/Signal.h" +#include "Config_Files.h" +#include "Event_Channel.h" +#include "Gateway.h" + +ACE_RCSID(Gateway, Gateway, "$Id$") + +class ACE_Svc_Export Gateway : public ACE_Service_Object +{ + // = TITLE + // Integrates the whole Gateway application. + // + // = DESCRIPTION + // This implementation uses the <Event_Channel> as the basis + // for the <Gateway> routing. +protected: + // = Service configurator hooks. + virtual int init (int argc, ACE_TCHAR *argv[]); + // Perform initialization. + + virtual int fini (void); + // Perform termination when unlinked dynamically. + + virtual int info (ACE_TCHAR **, size_t) const; + // Return info about this service. + + // = Configuration methods. + int parse_connection_config_file (void); + // Parse the proxy configuration file. + + int parse_consumer_config_file (void); + // Parse the consumer configuration file. + + // = Lifecycle management methods. + int handle_input (ACE_HANDLE); + // Shut down the Gateway when input comes in from the controlling + // console. + + int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + // Shut down the Gateway when a signal arrives. + + Event_Channel event_channel_; + // The Event Channel routes events from Supplier(s) to Consumer(s) + // using <Supplier_Handler> and <Consumer_Handler> objects. + + Connection_Handler_Factory connection_handler_factory_; + // Creates the appropriate type of <Connection_Handlers>. +}; + +int +Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + ACE_UNUSED_ARG (signum); + + // Shut down the main event loop. + ACE_Reactor::end_event_loop (); + return 0; +} + +int +Gateway::handle_input (ACE_HANDLE h) +{ + char buf[BUFSIZ]; + // Consume the input... + ACE_OS::read (h, buf, sizeof (buf)); + + // Shut us down. + return this->handle_signal ((int) h); +} + +int +Gateway::init (int argc, ACE_TCHAR *argv[]) +{ + // Parse the "command-line" arguments. + Options::instance ()->parse_args (argc, argv); + + ACE_Sig_Set sig_set; + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + // Register ourselves to receive signals so we can shut down + // gracefully. + + if (ACE_Reactor::instance ()->register_handler (sig_set, + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("register_handler")), + -1); + + // Register this handler to receive events on stdin. We use this to + // shutdown the Gateway gracefully. + if (ACE_Event_Handler::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("register_stdin_handler")), + -1); + + // If this->performance_window_ > 0 start a timer. + + if (Options::instance ()->performance_window () > 0) + { + ACE_Time_Value const performance_time (Options::instance ()->performance_window ()); + if (ACE_Reactor::instance ()->schedule_timer + (&this->event_channel_, 0, + performance_time) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("schedule_timer"))); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("starting timer for %d seconds...\n"), + Options::instance ()->performance_window ())); + } + + // Are we running as a connector? + if (Options::instance ()->enabled + (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR)) + { + // Parse the proxy configuration file. + this->parse_connection_config_file (); + + // Parse the consumer config file and build the event forwarding + // discriminator. + this->parse_consumer_config_file (); + } + + // Initialize the Event_Channel. + return this->event_channel_.open (); +} + +// This method is automatically called when the Gateway is shutdown. + +int +Gateway::fini (void) +{ + // Remove the handler that receive events on stdin. Otherwise, we + // will crash on shutdown. + ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()); + + // Close down the event channel. + this->event_channel_.close (); + + // Need to make sure we cleanup this Singleton. + delete Options::instance (); + return 0; +} + +// Returns information on the currently active service. + +int +Gateway::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + + ACE_OS::strcpy + (buf, ACE_TEXT ("Gateway daemon\t # Application-level gateway\n")); + + if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, buf, length); + return ACE_OS::strlen (buf); +} + +// Parse and build the proxy table. + +int +Gateway::parse_connection_config_file (void) +{ + // File that contains the proxy configuration information. + Connection_Config_File_Parser connection_file; + int file_empty = 1; + int line_number = 0; + + if (connection_file.open (Options::instance ()->connection_config_file ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + Options::instance ()->connection_config_file ()), + -1); + + // Keep track of the previous connection id to make sure the + // connection config file isn't corrupted. + int previous_connection_id = 0; + + // Read config file one line at a time. + + for (Connection_Config_Info pci; + connection_file.read_entry (pci, line_number) != FP::RT_EOFILE; + ) + { + file_empty = 0; + + // First time in check. + if (previous_connection_id == 0) + { + previous_connection_id = 1; + + if (pci.connection_id_ != 1) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) warning, the first connection id should be 1 not %d\n"), + pci.connection_id_)); + } + else if (previous_connection_id + 1 != pci.connection_id_) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) warning, connection ids should keep increasing by 1 and %d + 1 != %d\n"), + previous_connection_id, + pci.connection_id_)); + + // Update the last connection id to ensure that we monotonically + // increase by 1. + previous_connection_id = pci.connection_id_; + + if (Options::instance ()->enabled (Options::DEBUG)) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) conn id = %d, ") + ACE_TEXT ("host = %s, ") + ACE_TEXT ("remote port = %d, ") + ACE_TEXT ("proxy role = %c, ") + ACE_TEXT ("max retry timeout = %d, ") + ACE_TEXT ("local port = %d, ") + ACE_TEXT ("priority = %d\n"), + pci.connection_id_, + pci.host_, + pci.remote_port_, + pci.connection_role_, + pci.max_retry_timeout_, + pci.local_port_, + pci.priority_)); + + pci.event_channel_ = &this->event_channel_; + + // Create the appropriate type of Proxy. + Connection_Handler *connection_handler; + + ACE_ALLOCATOR_RETURN (connection_handler, + this->connection_handler_factory_.make_connection_handler (pci), + -1); + + // Bind the new Connection_Handler to the connection ID. + this->event_channel_.bind_proxy (connection_handler); + } + + // Keep track of the next available connection id, which is + // necessary for Peers that connect with us, rather than vice versa. + Options::instance ()->connection_id () = previous_connection_id + 1; + + if (file_empty) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("warning: connection connection_handler configuration file was empty\n"))); + return 0; +} + +int +Gateway::parse_consumer_config_file (void) +{ + // File that contains the consumer event forwarding information. + Consumer_Config_File_Parser consumer_file; + int file_empty = 1; + int line_number = 0; + + if (consumer_file.open (Options::instance ()->consumer_config_file ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + Options::instance ()->consumer_config_file ()), + -1); + + // Read config file line at a time. + for (Consumer_Config_Info cci_entry; + consumer_file.read_entry (cci_entry, line_number) != FP::RT_EOFILE; + ) + { + file_empty = 0; + + if (Options::instance ()->enabled (Options::DEBUG)) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) connection id = %d, payload = %d, ") + ACE_TEXT ("number of consumers = %d\n"), + cci_entry.connection_id_, + cci_entry.type_, + cci_entry.total_consumers_)); + + for (int i = 0; i < cci_entry.total_consumers_; i++) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) destination[%d] = %d\n"), + i, + cci_entry.consumers_[i])); + } + + Consumer_Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, + Consumer_Dispatch_Set, + -1); + + Event_Key event_addr (cci_entry.connection_id_, + cci_entry.type_); + + // Add the Consumers to the Dispatch_Set. + for (int i = 0; i < cci_entry.total_consumers_; i++) + { + Connection_Handler *connection_handler = 0; + + // Lookup destination and add to Consumer_Dispatch_Set set + // if found. + if (this->event_channel_.find_proxy (cci_entry.consumers_[i], + connection_handler) != -1) + dispatch_set->insert (connection_handler); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) not found: destination[%d] = %d\n"), + i, + cci_entry.consumers_[i])); + } + + this->event_channel_.subscribe (event_addr, dispatch_set); + } + + if (file_empty) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("warning: consumer map configuration file was empty\n"))); + return 0; +} + +// The following is a "Factory" used by the ACE_Service_Config and +// svc.conf file to dynamically initialize the state of the Gateway. + +ACE_SVC_FACTORY_DEFINE (Gateway) + diff --git a/ACE/apps/Gateway/Gateway/Gateway.h b/ACE/apps/Gateway/Gateway/Gateway.h new file mode 100644 index 00000000000..fe7d138b3a1 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Gateway.h @@ -0,0 +1,33 @@ +// -*- C++ -*- +// +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Gateway.h +// +// = DESCRIPTION +// Since the Gateway is an <ACE_Service_Object>, this file defines +// the entry point into the Service Configurator framework. +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#ifndef ACE_GATEWAY +#define ACE_GATEWAY + +#include "ace/svc_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_SVC_FACTORY_DECLARE (Gateway) + +#endif /* ACE_GATEWAY */ diff --git a/ACE/apps/Gateway/Gateway/Makefile.am b/ACE/apps/Gateway/Gateway/Makefile.am new file mode 100644 index 00000000000..83392e20564 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Makefile.am @@ -0,0 +1,78 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.Gateway.am + +noinst_LTLIBRARIES = libGateway.la + +libGateway_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +libGateway_la_SOURCES = \ + Concrete_Connection_Handlers.cpp \ + Config_Files.cpp \ + Connection_Handler.cpp \ + Connection_Handler_Acceptor.cpp \ + Connection_Handler_Connector.cpp \ + Event_Channel.cpp \ + Event_Forwarding_Discriminator.cpp \ + File_Parser.cpp \ + Gateway.cpp \ + Options.cpp + +noinst_HEADERS = \ + Concrete_Connection_Handlers.h \ + Config_Files.h \ + Connection_Handler.h \ + Connection_Handler_Acceptor.h \ + Connection_Handler_Connector.h \ + Event_Channel.h \ + Event_Forwarding_Discriminator.h \ + File_Parser.h \ + Gateway.h \ + Options.h + +## Makefile.gatewayd.am +noinst_PROGRAMS = gatewayd + +gatewayd_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +gatewayd_SOURCES = \ + gatewayd.cpp \ + Concrete_Connection_Handlers.h \ + Config_Files.h \ + Connection_Handler.h \ + Connection_Handler_Acceptor.h \ + Connection_Handler_Connector.h \ + Consumer_Dispatch_Set.h \ + Event.h \ + Event_Channel.h \ + Event_Forwarding_Discriminator.h \ + File_Parser.h \ + Gateway.h \ + Options.h + +gatewayd_LDADD = \ + libGateway.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/apps/Gateway/Gateway/Options.cpp b/ACE/apps/Gateway/Gateway/Options.cpp new file mode 100644 index 00000000000..cc80ce06c7d --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Options.cpp @@ -0,0 +1,288 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "Event.h" +#include "Options.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_strings.h" +#include "ace/os_include/os_fcntl.h" + +ACE_RCSID(Gateway, Options, "$Id$") + +// Static initialization. +Options *Options::instance_ = 0; + +// Let's have a usage prompt. +void +Options::print_usage (void) +{ + ACE_DEBUG ((LM_INFO, + "gatewayd [-a {C|S}:acceptor-port] [-c {C|S}:connector-port]" + " [-C consumer_config_file] [-P connection_config_filename]" + " [-q socket_queue_size] [-t OUTPUT_MT|INPUT_MT] [-w time_out]" + " [-b] [-d] [-v] [-T]\n" + "" + "\t-a Become an Acceptor\n" + "\t-b Use blocking connection establishment\n" + "\t-c Become a Connector\n" + "\t-d debugging\n" + "\t-q Use a different socket queue size\n" + "\t-t Use a different threading strategy\n" + "\t-v Verbose mode\n" + "\t-w Time performance for a designated amount of time\n" + "\t-C Use a different proxy config filename\n" + "\t-P Use a different consumer config filename\n" + "\t-T Tracing\n" + )); +} +Options * +Options::instance (void) +{ + if (Options::instance_ == 0) + ACE_NEW_RETURN (Options::instance_, Options, 0); + + return Options::instance_; +} + +Options::Options (void) + : locking_strategy_ (0), + performance_window_ (0), + blocking_semantics_ (ACE_NONBLOCK), + socket_queue_size_ (0), + threading_strategy_ (REACTIVE), + options_ (0), + supplier_acceptor_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT), + consumer_acceptor_port_ (DEFAULT_GATEWAY_CONSUMER_PORT), + supplier_connector_port_ (DEFAULT_PEER_SUPPLIER_PORT), + consumer_connector_port_ (DEFAULT_PEER_CONSUMER_PORT), + max_timeout_ (MAX_TIMEOUT), + max_queue_size_ (MAX_QUEUE_SIZE), + connection_id_ (1) +{ + ACE_OS::strcpy (this->connection_config_file_, ACE_TEXT("connection_config")); + ACE_OS::strcpy (this->consumer_config_file_, ACE_TEXT("consumer_config")); +} + +int +Options::enabled (int option) const +{ + return ACE_BIT_ENABLED (this->options_, option); +} + +Options::~Options (void) +{ + delete this->locking_strategy_; +} + +ACE_Lock_Adapter<ACE_SYNCH_MUTEX> * +Options::locking_strategy (void) const +{ + return this->locking_strategy_; +} + +void +Options::locking_strategy (ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *ls) +{ + this->locking_strategy_ = ls; +} + +long +Options::performance_window (void) const +{ + return this->performance_window_; +} + +CONNECTION_ID & +Options::connection_id (void) +{ + return this->connection_id_; +} + +long +Options::max_timeout (void) const +{ + return this->max_timeout_; +} + +int +Options::blocking_semantics (void) const +{ + return this->blocking_semantics_; +} + +int +Options::socket_queue_size (void) const +{ + return this->socket_queue_size_; +} + +u_long +Options::threading_strategy (void) const +{ + return this->threading_strategy_; +} + +const ACE_TCHAR * +Options::connection_config_file (void) const +{ + return this->connection_config_file_; +} + +const ACE_TCHAR * +Options::consumer_config_file (void) const +{ + return this->consumer_config_file_; +} + +u_short +Options::consumer_acceptor_port (void) const +{ + return this->consumer_acceptor_port_; +} + +u_short +Options::supplier_acceptor_port (void) const +{ + return this->supplier_acceptor_port_; +} + +u_short +Options::consumer_connector_port (void) const +{ + return this->consumer_connector_port_; +} + +long +Options::max_queue_size (void) const +{ + return this->max_queue_size_; +} + +u_short +Options::supplier_connector_port (void) const +{ + return this->supplier_connector_port_; +} + +// Parse the "command-line" arguments and set the corresponding flags. + +int +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + // Assign defaults. + ACE_Get_Opt get_opt (argc, + argv, + ACE_TEXT("a:bC:c:dm:P:p:q:r:t:vw:"), + 0); + + for (int c; (c = get_opt ()) != EOF; ) + { + switch (c) + { + case 'a': + { + // Become an Acceptor. + + for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|")); + flag != 0; + flag = ACE_OS::strtok (0, ACE_TEXT("|"))) + if (ACE_OS::strncasecmp (flag, ACE_TEXT("C"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::CONSUMER_ACCEPTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Consumer Acceptor port number. + this->consumer_acceptor_port_ = ACE_OS::atoi (flag + 2); + } + else if (ACE_OS::strncasecmp (flag, ACE_TEXT("S"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::SUPPLIER_ACCEPTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Supplier Acceptor port number. + this->supplier_acceptor_port_ = ACE_OS::atoi (flag + 2); + } + } + break; + /* NOTREACHED */ + case 'b': // Use blocking connection establishment. + this->blocking_semantics_ = 1; + break; + case 'C': // Use a different proxy config filename. + ACE_OS::strncpy (this->consumer_config_file_, + get_opt.opt_arg (), + sizeof this->consumer_config_file_ + / sizeof (ACE_TCHAR)); + break; + case 'c': + { + // Become a Connector. + + for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|")); + flag != 0; + flag = ACE_OS::strtok (0, ACE_TEXT("|"))) + if (ACE_OS::strncasecmp (flag, ACE_TEXT("C"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::CONSUMER_CONNECTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Consumer Connector port number. + this->consumer_connector_port_ = ACE_OS::atoi (flag + 2); + } + else if (ACE_OS::strncasecmp (flag, ACE_TEXT("S"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::SUPPLIER_CONNECTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Supplier Connector port number. + this->supplier_connector_port_ = ACE_OS::atoi (flag + 2); + } + } + break; + /* NOTREACHED */ + case 'd': // We are debugging. + ACE_SET_BITS (this->options_, + Options::DEBUG); + break; + case 'P': // Use a different connection config filename. + ACE_OS::strncpy (this->connection_config_file_, + get_opt.opt_arg (), + sizeof this->connection_config_file_); + break; + case 'q': // Use a different socket queue size. + this->socket_queue_size_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 't': // Use a different threading strategy. + { + for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), ACE_TEXT("|")); + flag != 0; + flag = ACE_OS::strtok (0, ACE_TEXT("|"))) + if (ACE_OS::strcmp (flag, ACE_TEXT("OUTPUT_MT")) == 0) + ACE_SET_BITS (this->threading_strategy_, + Options::OUTPUT_MT); + else if (ACE_OS::strcmp (flag, ACE_TEXT("INPUT_MT")) == 0) + ACE_SET_BITS (this->threading_strategy_, + Options::INPUT_MT); + break; + } + case 'v': // Verbose mode. + ACE_SET_BITS (this->options_, + Options::VERBOSE); + break; + case 'w': // Time performance for a designated amount of time. + this->performance_window_ = ACE_OS::atoi (get_opt.opt_arg ()); + // Use blocking connection semantics so that we get accurate + // timings (since all connections start at once). + this->blocking_semantics_ = 0; + break; + default: + this->print_usage(); // It's nice to have a usage prompt. + break; + } + } + + return 0; +} diff --git a/ACE/apps/Gateway/Gateway/Options.h b/ACE/apps/Gateway/Gateway/Options.h new file mode 100644 index 00000000000..56f7c95e7e7 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/Options.h @@ -0,0 +1,196 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Options.h +// +// = AUTHOR +// Douglas C. Schmidt <schmidt@cs.wustl.edu> +// +// ============================================================================ + +#ifndef OPTIONS_H +#define OPTIONS_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/svc_export.h" +#include "ace/Lock_Adapter_T.h" +#include "ace/Synch_Traits.h" +#include "ace/Thread_Mutex.h" + +class ACE_Svc_Export Options +{ + // = TITLE + // Singleton that consolidates all Options for a gatewayd. +public: + // = Options that can be enabled/disabled. + enum + { + // = The types of threading strategies. + REACTIVE = 0, + OUTPUT_MT = 1, + INPUT_MT = 2, + + VERBOSE = 01, + DEBUG = 02, + + SUPPLIER_ACCEPTOR = 04, + CONSUMER_ACCEPTOR = 010, + SUPPLIER_CONNECTOR = 020, + CONSUMER_CONNECTOR = 040 + }; + + static Options *instance (void); + // Return Singleton. + + ~Options (void); + // Termination. + + int parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the arguments and set the options. + + void print_usage(void); + // Print the gateway supported parameters. + // = Accessor methods. + int enabled (int option) const; + // Determine if an option is enabled. + + ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *locking_strategy (void) const; + // Gets the locking strategy used for serializing access to the + // reference count in <ACE_Message_Block>. If it's 0, then there's + // no locking strategy and we're using a REACTIVE concurrency + // strategy. + + void locking_strategy (ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *); + // Set the locking strategy used for serializing access to the + // reference count in <ACE_Message_Block>. + + long performance_window (void) const; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics (void) const; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int socket_queue_size (void) const; + // Size of the socket queue (0 means "use default"). + + u_long threading_strategy (void) const; + // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT. + + u_short supplier_acceptor_port (void) const; + // Our acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Supplier. + + u_short consumer_acceptor_port (void) const; + // Our acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Consumer. + + u_short supplier_connector_port (void) const; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Supplier. + + u_short consumer_connector_port (void) const; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Consumer. + + const ACE_TCHAR *connection_config_file (void) const; + // Name of the connection configuration file. + + const ACE_TCHAR *consumer_config_file (void) const; + // Name of the consumer map configuration file. + + long max_timeout (void) const; + // The maximum retry timeout delay. + + long max_queue_size (void) const; + // The maximum size of the queue. + + CONNECTION_ID &connection_id (void); + // Returns a reference to the next available connection id; + +private: + enum + { + MAX_QUEUE_SIZE = 1024 * 1024 * 16, + // We'll allow up to 16 megabytes to be queued per-output proxy. + + MAX_TIMEOUT = 32 + // The maximum timeout for trying to re-establish connections. + }; + + Options (void); + // Initialization. + + static Options *instance_; + // Options Singleton instance. + + ACE_Lock_Adapter<ACE_SYNCH_MUTEX> *locking_strategy_; + // Points to the locking strategy used for serializing access to the + // reference count in <ACE_Message_Block>. If it's 0, then there's + // no locking strategy and we're using a REACTIVE concurrency + // strategy. + + long performance_window_; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics_; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). + + u_long threading_strategy_; + // i.e., REACTIVE, OUTPUT_MT, and/or INPUT_MT. + + u_long options_; + // Flag to indicate if we want verbose diagnostics. + + u_short supplier_acceptor_port_; + // The acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Supplier. + + u_short consumer_acceptor_port_; + // The acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Consumer. + + u_short supplier_connector_port_; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Supplier. + + u_short consumer_connector_port_; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Consumer. + + long max_timeout_; + // The maximum retry timeout delay. + + long max_queue_size_; + // The maximum size of the queue. + + CONNECTION_ID connection_id_; + // The next available connection id. + + ACE_TCHAR connection_config_file_[MAXPATHLEN + 1]; + // Name of the connection configuration file. + + ACE_TCHAR consumer_config_file_[MAXPATHLEN + 1]; + // Name of the consumer map configuration file. +}; + +#endif /* OPTIONS_H */ diff --git a/ACE/apps/Gateway/Gateway/connection_config b/ACE/apps/Gateway/Gateway/connection_config new file mode 100644 index 00000000000..93730edc0da --- /dev/null +++ b/ACE/apps/Gateway/Gateway/connection_config @@ -0,0 +1,55 @@ +# Configuration file that the gatewayd process uses to determine +# connection information about proxies. +# +# The following provides an explanation for the fields in this file, +# and how they relate to fields in the corresponding "consumer_config" +# file. +# +# 1. Connection ID -- Each Connection Handler is given a unique ID +# that is used in the "consumer_config" file to specify to which +# Consumers the Event Channel will forward incoming events from +# Suppliers using that connection. The Connection ID field is the +# "key" that is used to match up connections in this file with the +# Consumer subscription requests in the "consumer_config" file. +# The connection ids should start at 1 and monotonically increase +# by increments of 1. This makes it possible for the Gateway to +# properly allocate connection ids for Peers that connect to it. +# +# 2. Host -- The host name where the Supplier/Consumer peerd +# process is running. +# +# 3. Remote Port -- The port number where the remote +# Supplier/Consumer peerd process is listening on. +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. +# +# 4. Handler Role -- i.e., Consumer ('C') or Supplier ('S') +# +# 5. Max Retry Timeout -- The maximum amount of time that we'll +# wait between retry attempts (these start at 1 second and +# double until they reach the Max Retry Timeout). +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. +# +# 6. Local Port -- The port number that we want to use for +# our local Proxy connection. If this is the value 0 or the '*' +# character, then we'll let the socket implementation pick this +# value for us. +# +# 7. Priority -- Each Consumer/Supplier can be given a priority +# that will determine its importance relative to other +# Consumers/Suppliers (this feature isn't implemented yet). +# +# Connection Host Remote Handler Max Retry Local Priority +# ID Port Role Timeout Port +# ---------- -------- ------ ------ ---------- ----- -------- + 1 localhost * S * * 1 + 2 localhost * C * * 1 +# 3 mambo.cs * C * * 1 +# 4 lambada.cs * C * * 1 +# 5 lambada.cs * C * * 1 +# 6 tango.cs * C * * 1 +# 7 tango.cs * S * * 1 +# 8 tango.cs * C * * 1 diff --git a/ACE/apps/Gateway/Gateway/consumer_config b/ACE/apps/Gateway/Gateway/consumer_config new file mode 100644 index 00000000000..1aaa3fc4028 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/consumer_config @@ -0,0 +1,35 @@ +# Configuration file that the gatewayd process uses to determine which +# Consumers will receive events from which Suppliers. For now, the +# Gateway only allows Consumers to "subscribe" to receive events from +# particular Suppliers. A more flexible implementation will allow +# Consumers to subscribe to particular types of events, as well. +# +# The following provides an explanation for the fields in this file, +# and how they relate to fields in the corresponding "connection_config" +# file. +# +# 1. Connection ID -- Each Connection Handler is given a unique ID +# that is used in the "consumer_config" file to specify to which +# Consumers the Event Channel will forward incoming events from +# Suppliers. The Connection ID field is the "key" that is used to +# match up Consumer subscription requests in this file with +# connections in the "connection_config" file. +# +# 2. Event Type -- Indicates the type of the event. Consumers +# can use this to only subscribe to certain types of events. +# This feature is currently not implemented. +# +# 3. Consumers -- Indicates which Consumers will receive events sent +# from this Proxy/Supplier ID, i.e., Consumers can subscribe to +# receive events from particular Suppliers. Note that more than +# one Consumer can subscribe to the same Supplier event, i.e., +# we support logical "multicast" (which is currently implemented +# using multi-point unicast via TCP/IP). +# +# Connection Event Consumers +# ID Type +# ---------- ---- --------- + 1 0 2 +# 2 0 3,4 +# 3 0 4 +# 4 0 5 diff --git a/ACE/apps/Gateway/Gateway/gateway.mpc b/ACE/apps/Gateway/Gateway/gateway.mpc new file mode 100644 index 00000000000..9403ba858b1 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/gateway.mpc @@ -0,0 +1,29 @@ +// -*- MPC -*- +// $Id$ + +project(Gateway) : acelib { + sharedname = Gateway + Source_Files { + Concrete_Connection_Handlers.cpp + Config_Files.cpp + File_Parser.cpp + Gateway.cpp + Event_Channel.cpp + Event_Forwarding_Discriminator.cpp + Options.cpp + Connection_Handler.cpp + Connection_Handler_Acceptor.cpp + Connection_Handler_Connector.cpp + } +} + +project(gatewayd) : aceexe { + exename = gatewayd + after += Gateway + libs += Gateway + + Source_Files { + gatewayd.cpp + } +} + diff --git a/ACE/apps/Gateway/Gateway/gatewayd.cpp b/ACE/apps/Gateway/Gateway/gatewayd.cpp new file mode 100644 index 00000000000..2c598c6805f --- /dev/null +++ b/ACE/apps/Gateway/Gateway/gatewayd.cpp @@ -0,0 +1,69 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// peerd.h +// +// = DESCRIPTION +// Driver for the gateway daemon (gatewayd). Note that this is +// completely generic code due to the Service Configurator +// framework! +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#include "ace/OS_NS_unistd.h" +#include "ace/Service_Config.h" +#include "ace/Service_Object.h" +#include "ace/Log_Msg.h" +#include "ace/Reactor.h" +#include "Gateway.h" + +ACE_RCSID (Gateway, + gatewayd, + "$Id$") + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (ACE_OS::access (ACE_DEFAULT_SVC_CONF, F_OK) != 0) + { + // Use static linking. + ACE_Service_Object_Ptr sp = ACE_SVC_INVOKE (Gateway); + + if (sp->init (argc - 1, argv + 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("init")), + 1); + + // Run forever, performing the configured services until we + // are shut down by a SIGINT/SIGQUIT signal. + + ACE_Reactor::instance ()->run_reactor_event_loop (); + + // Destructor of <ACE_Service_Object_Ptr> automagically call + // <fini>. + } + else + { + if (ACE_Service_Config::open (argc, argv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + 1); + else // Use dynamic linking. + + // Run forever, performing the configured services until we are + // shut down by a signal (e.g., SIGINT or SIGQUIT). + + ACE_Reactor::instance ()->run_reactor_event_loop (); + } + return 0; +} diff --git a/ACE/apps/Gateway/Gateway/svc.conf b/ACE/apps/Gateway/Gateway/svc.conf new file mode 100644 index 00000000000..3698b0e3e13 --- /dev/null +++ b/ACE/apps/Gateway/Gateway/svc.conf @@ -0,0 +1,3 @@ +#static Svc_Manager "-d -p 2913" +dynamic Gateway Service_Object * Gateway:_make_Gateway() active "-b -d -c C|S -a C|S -P connection_config -C consumer_config" + diff --git a/ACE/apps/Gateway/Makefile.am b/ACE/apps/Gateway/Makefile.am new file mode 100644 index 00000000000..ff47f4b6ecd --- /dev/null +++ b/ACE/apps/Gateway/Makefile.am @@ -0,0 +1,14 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +SUBDIRS = \ + Gateway \ + Peer + diff --git a/ACE/apps/Gateway/Peer/Makefile.am b/ACE/apps/Gateway/Peer/Makefile.am new file mode 100644 index 00000000000..843f407270a --- /dev/null +++ b/ACE/apps/Gateway/Peer/Makefile.am @@ -0,0 +1,52 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.Gateway_Peer.am + +noinst_LTLIBRARIES = libGateway_Peer.la + +libGateway_Peer_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +libGateway_Peer_la_SOURCES = \ + Options.cpp \ + Peer.cpp + +noinst_HEADERS = \ + Options.h \ + Peer.h + +## Makefile.gateway_peerd.am +noinst_PROGRAMS = peerd + +peerd_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +peerd_SOURCES = \ + peerd.cpp \ + Options.h \ + Peer.h + +peerd_LDADD = \ + libGateway_Peer.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/apps/Gateway/Peer/Options.cpp b/ACE/apps/Gateway/Peer/Options.cpp new file mode 100644 index 00000000000..0b33552e629 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Options.cpp @@ -0,0 +1,201 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_strings.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_Memory.h" +#include "Options.h" + +ACE_RCSID(Peer, Options, "$Id$") + +// Static initialization. +Options *Options::instance_ = 0; + +void +Options::print_usage_and_die (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-C connection-id] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n"))); + ACE_OS::exit (1); +} + +Options::Options (void) + : options_ (0), + supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT), + consumer_acceptor_port_ (DEFAULT_PEER_CONSUMER_PORT), + supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT), + consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT), + connector_host_ (ACE_DEFAULT_SERVER_HOST), + timeout_ (0), + max_queue_size_ (MAX_QUEUE_SIZE), + connection_id_ (0) +{ + char *timeout = ACE_OS::getenv ("TIMEOUT"); + + if (timeout == 0) + this->timeout_ = Options::DEFAULT_TIMEOUT; + else + this->timeout_ = ACE_OS::atoi (timeout); +} + +Options * +Options::instance (void) +{ + if (Options::instance_ == 0) + ACE_NEW_RETURN (Options::instance_, Options, 0); + + return Options::instance_; +} + +long +Options::timeout (void) const +{ + return this->timeout_; +} + +CONNECTION_ID & +Options::connection_id (void) +{ + return this->connection_id_; +} + +long +Options::max_queue_size (void) const +{ + return this->max_queue_size_; +} + +u_short +Options::consumer_acceptor_port (void) const +{ + return this->consumer_acceptor_port_; +} + +u_short +Options::supplier_acceptor_port (void) const +{ + return this->supplier_acceptor_port_; +} + +u_short +Options::consumer_connector_port (void) const +{ + return this->consumer_connector_port_; +} + +u_short +Options::supplier_connector_port (void) const +{ + return this->supplier_connector_port_; +} + +const ACE_TCHAR * +Options::connector_host (void) const +{ + return this->connector_host_; +} + +int +Options::enabled (int option) const +{ + return ACE_BIT_ENABLED (this->options_, option); +} + +void +Options::parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:c:C:h:m:t:v"), 0); + + for (int c; (c = get_opt ()) != -1; ) + { + switch (c) + { + case 'a': + { + // Become an Acceptor. + + for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), + ACE_TEXT ("|")); + flag != 0; + flag = ACE_OS::strtok (0, ACE_TEXT ("|"))) + if (ACE_OS::strncasecmp (flag, ACE_TEXT ("C"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::CONSUMER_ACCEPTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Consumer Acceptor port number. + this->consumer_acceptor_port_ = ACE_OS::atoi (flag + 2); + } + else if (ACE_OS::strncasecmp (flag, ACE_TEXT ("S"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::SUPPLIER_ACCEPTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Supplier Acceptor port number. + this->supplier_acceptor_port_ = ACE_OS::atoi (flag + 2); + } + } + break; + /* NOTREACHED */ + case 'c': + { + // Become a Connector. + + for (ACE_TCHAR *flag = ACE_OS::strtok (get_opt.opt_arg (), + ACE_TEXT ("|")); + flag != 0; + flag = ACE_OS::strtok (0, ACE_TEXT ("|"))) + if (ACE_OS::strncasecmp (flag, ACE_TEXT ("C"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::CONSUMER_CONNECTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Consumer Connector port number. + this->consumer_connector_port_ = ACE_OS::atoi (flag + 2); + } + else if (ACE_OS::strncasecmp (flag, ACE_TEXT ("S"), 1) == 0) + { + ACE_SET_BITS (this->options_, + Options::SUPPLIER_CONNECTOR); + if (ACE_OS::strlen (flag) > 1) + // Set the Supplier Connector port number. + this->supplier_connector_port_ = ACE_OS::atoi (flag + 2); + } + } + break; + /* NOTREACHED */ + case 'C': + this->connection_id_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + /* NOTREACHED */ + case 'h': + // connector host + this->connector_host_ = get_opt.opt_arg (); + break; + /* NOTREACHED */ + case 'm': + // max queue size. + this->max_queue_size_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + /* NOTREACHED */ + case 't': + // Timeout + this->timeout_ = ACE_OS::atoi (get_opt.opt_arg ()); + break; + /* NOTREACHED */ + case 'v': + // Verbose mode. + ACE_SET_BITS (this->options_, Options::VERBOSE); + break; + /* NOTREACHED */ + default: + this->print_usage_and_die (); + /* NOTREACHED */ + } + } +} + diff --git a/ACE/apps/Gateway/Peer/Options.h b/ACE/apps/Gateway/Peer/Options.h new file mode 100644 index 00000000000..bc872fe7df4 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Options.h @@ -0,0 +1,135 @@ +// -*- C++ -*- +// +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Options.h +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#ifndef OPTIONS_H +#define OPTIONS_H + +#include "../Gateway/Event.h" +#include "ace/svc_export.h" + +class ACE_Svc_Export Options + // = TITLE + // Singleton that consolidates all Options for a peerd. +{ +public: + // = Options that can be enabled/disabled. + enum + { + VERBOSE = 01, + SUPPLIER_ACCEPTOR = 02, + CONSUMER_ACCEPTOR = 04, + SUPPLIER_CONNECTOR = 010, + CONSUMER_CONNECTOR = 020 + }; + + static Options *instance (void); + // Return Singleton. + + void parse_args (int argc, ACE_TCHAR *argv[]); + // Parse the arguments and set the options. + + // = Accessor methods. + int enabled (int option) const; + // Determine if an option is enabled. + + u_short supplier_acceptor_port (void) const; + // Our acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Supplier. + + u_short consumer_acceptor_port (void) const; + // Our acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Consumer. + + u_short supplier_connector_port (void) const; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Supplier. + + u_short consumer_connector_port (void) const; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Consumer. + + const ACE_TCHAR *connector_host (void) const; + // Our connector port host, i.e., the host running the gatewayd + // process. + + long timeout (void) const; + // Duration between disconnects. + + long max_queue_size (void) const; + // The maximum size of the queue. + + CONNECTION_ID &connection_id (void); + // Returns a reference to the connection id. + +private: + enum + { + MAX_QUEUE_SIZE = 1024 * 1024 * 16, + // We'll allow up to 16 megabytes to be queued per-output + // channel!!!! This is clearly a policy in search of + // refinement... + + DEFAULT_TIMEOUT = 60 + // By default, disconnect the peer every minute. + }; + + Options (void); + // Ensure Singleton. + + void print_usage_and_die (void); + // Explain usage and exit. + + static Options *instance_; + // Singleton. + + u_long options_; + // Flag to indicate if we want verbose diagnostics. + + u_short supplier_acceptor_port_; + // The acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Supplier. + + u_short consumer_acceptor_port_; + // The acceptor port number, i.e., the one that we passively listen + // on for connections to arrive from a gatewayd and create a + // Consumer. + + u_short supplier_connector_port_; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Supplier. + + u_short consumer_connector_port_; + // The connector port number, i.e., the one that we use to actively + // establish connections with a gatewayd and create a Consumer. + + const ACE_TCHAR *connector_host_; + // Our connector host, i.e., where the gatewayd process is running. + + long timeout_; + // The amount of time to wait before disconnecting from the Peerd. + + long max_queue_size_; + // The maximum size that the queue can grow to. + + CONNECTION_ID connection_id_; + // The connection id. +}; + +#endif /* OPTIONS_H */ diff --git a/ACE/apps/Gateway/Peer/Peer.cpp b/ACE/apps/Gateway/Peer/Peer.cpp new file mode 100644 index 00000000000..437d39f1611 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Peer.cpp @@ -0,0 +1,890 @@ +// $Id$ + +#define ACE_BUILD_SVC_DLL + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Signal.h" +#include "Peer.h" + +ACE_RCSID(Peer, Peer, "$Id$") + +Peer_Handler::Peer_Handler (void) + : connection_id_ (-1), // Maybe it's better than 0. + msg_frag_ (0), + total_bytes_ (0) +{ + // Set the high water mark of the <ACE_Message_Queue>. This is used + // to exert flow control. + this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ()); + first_time_ = 1; // It will be first time to open Peer_Handler. +} + +// Upcall from the <ACE_Acceptor::handle_input> that turns control +// over to our application-specific Gateway handler. + +int +Peer_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("handle = %d\n"), + this->peer ().get_handle ())); + + // Call down to the base class to activate and register this handler + // with an <ACE_Reactor>. + if (this->inherited::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + -1); + + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enable")), + -1); + + ACE_Time_Value timeout (Options::instance ()->timeout ()); + + // Schedule the time between disconnects. This should really be a + // "tunable" parameter. + if (ACE_Reactor::instance ()->schedule_timer + (this, 0, timeout) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_timer"))); + + // If there are events left in the queue, make sure we enable the + // <ACE_Reactor> appropriately to get them sent out. + if (this->msg_queue ()->is_empty () == 0 + && ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_wakeup")), + -1); + + // First action is to wait to be notified of our connection id. + this->do_action_ = &Peer_Handler::await_connection_id; + return 0; +} + +int +Peer_Handler::transmit (ACE_Message_Block *mb, + size_t n, + int event_type) +{ + Event *event = (Event *) mb->rd_ptr (); + + // Initialize the header. + new (&event->header_) Event_Header (n, + this->connection_id_, + event_type, + 0); + + // Convert all the fields into network byte order. + event->header_.encode (); + + // Move the write pointer to the end of the event. + mb->wr_ptr (sizeof (Event_Header) + n); + + if (this->put (mb) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("gateway is flow controlled, so we're dropping events"))); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("transmission failure in transmit()"))); // Function name fixed. + // Caller is responsible for freeing a ACE_Message_Block + // if failures occur. + mb->release (); + return -1; + } + return 0; +} + +// Read events from stdin and send them to the gatewayd. + +int +Peer_Handler::transmit_stdin (void) +{ + // If return value is -1, then first_time_ must be reset to 1. + int result = 0; + if (this->connection_id_ != -1) + { + ACE_Message_Block *mb; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (sizeof (Event)), + -1); + + // Cast the message block payload into an <Event> pointer. + Event *event = (Event *) mb->rd_ptr (); + + ssize_t n = ACE_OS::read (ACE_STDIN, + event->data_, + sizeof event->data_); + switch (n) + { + case 0: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("stdin closing down\n"))); + + // Take stdin out of the ACE_Reactor so we stop trying to + // send events. + ACE_Reactor::instance ()->remove_handler + (ACE_STDIN, + ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK); + mb->release (); + result = 0; // + break; + /* NOTREACHED */ + case -1: + mb->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("read"))); + result = 0; // + break; + /* NOTREACHED */ + default: + // Do not return directly, save the return value. + result = this->transmit (mb, n, ROUTING_EVENT); + break; + /* NOTREACHED */ + } + + // Do not return at here, but at exit of function. + /*return 0;*/ + } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Must transmit over an opened channel.\n"))); + result = -1; // Save return value at here, return at exit of function. + } + // If transmit error, the stdin-thread will be cancelled, so should + // reset first_time_ to 1, which will register_stdin_handler again. + if (result == -1) + first_time_ = 1; + + return result; +} + +// Perform a non-blocking <put> of event MB. If we are unable to send +// the entire event the remainder is re-queue'd at the *front* of the +// Message_Queue. + +int +Peer_Handler::nonblk_put (ACE_Message_Block *mb) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // <ACE_Message_Queue> and ask the <ACE_Reactor> to inform us (via + // <handle_output>) when it is possible to try again. + + ssize_t n = this->send (mb); + + if (n == -1) + // -1 is returned only when things have really gone wrong (i.e., + // not when flow control occurs). + return -1; + else if (errno == EWOULDBLOCK) + { + // We didn't manage to send everything, so requeue. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("queueing activated on handle %d to connection id %d\n"), + this->get_handle (), + this->connection_id_)); + + // Re-queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (mb, + (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_head")), + -1); + // Tell ACE_Reactor to call us back when we can send again. + if (ACE_Reactor::instance ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("schedule_wakeup")), + -1); + return 0; + } + else + return n; +} + +// Finish sending a event when flow control conditions abate. This +// method is automatically called by the ACE_Reactor. + +int +Peer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *mb = 0; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("in handle_output\n"))); + + if (this->msg_queue ()->dequeue_head + (mb, + (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (mb)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + /* NOTREACHED */ + case -1: + // Caller is responsible for freeing a ACE_Message_Block if + // failures occur. + mb->release (); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("transmission failure in handle_output"))); + /* FALLTHROUGH */ + default: // Sent the whole thing. + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the <ACE_Message_Queue>. If there aren't, tell + // the <ACE_Reactor> not to notify us anymore (at least + // until there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("queue now empty on handle %d to connection id %d\n"), + this->get_handle (), + this->connection_id_)); + + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("cancel_wakeup"))); + } + } + return 0; + } + else + // If the list is empty there's a bug! + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + 0); +} + +// Send an event to a peer (may block if necessary). + +int +Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (mb); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Send an Peer event to gatewayd. + +int +Peer_Handler::send (ACE_Message_Block *mb) +{ + size_t len = mb->length (); + + ssize_t n = this->peer ().send (mb->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < (ssize_t) len) + { + // Re-adjust pointer to skip over the part we did send. + mb->rd_ptr (n); + this->total_bytes_ += n; + } + else // if (n == length). + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + this->total_bytes_ += n; + mb->release (); + errno = 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("sent %d bytes, total bytes sent = %d\n"), + n, + this->total_bytes_)); + return n; +} + +// Receive an Event from gatewayd. Handles fragmentation. + +int +Peer_Handler::recv (ACE_Message_Block *&mb) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const size_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), + header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Recv error during header read"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("attempted to read %d bytes\n"), + header_bytes_left_to_read)); + this->msg_frag_ = this->msg_frag_->release (); + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Partial header received: only %d bytes\n"), + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > ACE_INT32 (sizeof event->data_)) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Data payload is too big (%d bytes)\n"), + event->header_.len_)); + return -1; + } + } + + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // <msg_frag_->wr_ptr> will point to <event->data_>. Every time we + // do a successful fragment read, we advance <wr_ptr>. Therefore, + // by subtracting how much we've already read from the + // <event->header_.len_> we complete the + // <data_bytes_left_to_read>... + + ssize_t data_bytes_left_to_read = + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); + + // peer().recv() should not be called when data_bytes_left_to_read is 0. + ssize_t data_received = !data_bytes_left_to_read ? 0 : + this->peer ().recv (this->msg_frag_->wr_ptr (), + data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + if (data_bytes_left_to_read) + { + this->msg_frag_ = this->msg_frag_->release (); + return 0; + } + /* FALLTHROUGH */; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + mb = this->msg_frag_; + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) connection id = %d, cur len = %d, total bytes read = %d\n"), + event->header_.connection_id_, + event->header_.len_, + data_received + header_received)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("data_ = %*s\n"), + event->header_.len_ - 2, + event->data_)); + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the gatewayd, +// as well as stdio). + +int +Peer_Handler::handle_input (ACE_HANDLE sd) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("in handle_input, sd = %d\n"), + sd)); + if (sd == ACE_STDIN) // Handle event from stdin. + return this->transmit_stdin (); + else + // Perform the appropriate action depending on the state we are + // in. + return (this->*do_action_) (); +} + +// Action that receives our connection id from the Gateway. + +int +Peer_Handler::await_connection_id (void) +{ + ssize_t n = this->peer ().recv (&this->connection_id_, + sizeof this->connection_id_); + + if (n != sizeof this->connection_id_) + { + if (n == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("gatewayd has closed down unexpectedly\n")), + -1); + else + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p, bytes received on handle %d = %d\n"), + ACE_TEXT ("recv"), + this->get_handle (), + n), + -1); + } + else + { + this->connection_id_ = ntohl (this->connection_id_); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("assigned connection id %d\n"), + this->connection_id_)); + } + + // Subscribe for events if we're a Consumer. + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)) + this->subscribe (); + + // No need to disconnect by timeout. + ACE_Reactor::instance ()->cancel_timer(this); + // Transition to the action that waits for Peer events. + this->do_action_ = &Peer_Handler::await_events; + + // Reset standard input. + ACE_OS::rewind (stdin); + + // Call register_stdin_handler only once, until the stdin-thread + // closed which caused by transmit_stdin error. + if (first_time_) + { + // Register this handler to receive test events on stdin. + if (ACE_Event_Handler::register_stdin_handler + (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("register_stdin_handler")), + -1); + + // Next time in await_connection_id(), I'll don't call + // register_stdin_handler(). + first_time_ = 0; + } + return 0; +} + +int +Peer_Handler::subscribe (void) +{ + ACE_Message_Block *mb; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (sizeof (Event)), + -1); + + Subscription *subscription = + (Subscription *) ((Event *) mb->rd_ptr ())->data_; + subscription->connection_id_ = + Options::instance ()->connection_id (); + + return this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT); +} + +// Action that receives events from the Gateway. + +int +Peer_Handler::await_events (void) +{ + ACE_Message_Block *mb = 0; + + ssize_t n = this->recv (mb); + + switch (n) + { + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("gatewayd has closed down\n")), + -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("recv")), + -1); + /* NOTREACHED */ + default: + { + // We got a valid event, so let's process it now! At the + // moment, we just print out the event contents... + + Event *event = (Event *) mb->rd_ptr (); + this->total_bytes_ += mb->length (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("route id = %d, cur len = %d, total len = %d\n"), + event->header_.connection_id_, + event->header_.len_, + this->total_bytes_)); + if (Options::instance ()->enabled (Options::VERBOSE)) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("data_ = %*s\n"), + event->header_.len_ - 2, + event->data_)); + mb->release (); + return 0; + } + } +} + +// Periodically send events via ACE_Reactor timer mechanism. + +int +Peer_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // Shut down the handler. + return this->handle_close (); +} + +Peer_Handler::~Peer_Handler (void) +{ + // Shut down the handler. + this->handle_close (); +} + +// Handle shutdown of the Peer object. + +int +Peer_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + if (this->get_handle () != ACE_INVALID_HANDLE) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("shutting down Peer on handle %d\n"), + this->get_handle ())); + + ACE_Reactor_Mask mask = + ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK; + + // Explicitly remove ourselves for ACE_STDIN (the <ACE_Reactor> + // removes the HANDLE. Note that <ACE_Event_Handler::DONT_CALL> + // instructs the ACE_Reactor *not* to call <handle_close>, which + // would otherwise lead to infinite recursion!). + ACE_Reactor::instance ()->remove_handler + (ACE_STDIN, mask); + + // Deregister this handler with the ACE_Reactor. + if (ACE_Reactor::instance ()->remove_handler + (this, mask) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("handle = %d: %p\n"), + this->get_handle (), + ACE_TEXT ("remove_handler")), + -1); + // Close down the peer. + this->peer ().close (); + } + return 0; +} + +int +Peer_Acceptor::start (u_short port) +{ + // This object only gets allocated once and is just recycled + // forever. + ACE_NEW_RETURN (peer_handler_, Peer_Handler, -1); + + this->addr_.set (port); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("opening acceptor at port %d\n"), + port)); + + // Call down to the <Acceptor::open> method. + if (this->inherited::open (this->addr_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + -1); + else if (this->acceptor ().get_local_addr (this->addr_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("get_local_addr")), + -1); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("accepting at port %d\n"), + this->addr_.get_port_number ())); + return 0; +} + +Peer_Acceptor::Peer_Acceptor (void) + : peer_handler_ (0) +{ +} + +int +Peer_Acceptor::close (void) +{ + // Will trigger a delete. + if (this->peer_handler_ != 0) + this->peer_handler_->destroy (); + + // Close down the base class. + return this->inherited::close (); +} + +// Note how this method just passes back the pre-allocated +// <Peer_Handler> instead of having the <ACE_Acceptor> allocate a new +// one each time! + +int +Peer_Acceptor::make_svc_handler (Peer_Handler *&sh) +{ + sh = this->peer_handler_; + return 0; +} + +int +Peer_Connector::open_connector (Peer_Handler *&peer_handler, + u_short port) +{ + // This object only gets allocated once and is just recycled + // forever. + ACE_NEW_RETURN (peer_handler, + Peer_Handler, + -1); + + ACE_INET_Addr addr (port, + Options::instance ()->connector_host ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("connecting to %s:%d\n"), + addr.get_host_name (), + addr.get_port_number ())); + + if (this->connect (peer_handler, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("connect")), + -1); + else + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("connected to %C:%d\n"), + addr.get_host_name (), + addr.get_port_number ())); + return 0; +} + +int +Peer_Connector::open (ACE_Reactor *, int) +{ + this->supplier_peer_handler_ = 0; + this->consumer_peer_handler_ = 0; + + if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR) + && this->open_connector (this->supplier_peer_handler_, + Options::instance ()->supplier_connector_port ()) == -1) + return -1; + + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR) + && this->open_connector (this->consumer_peer_handler_, + Options::instance ()->consumer_connector_port ()) == -1) + return -1; + + return 0; +} + +int +Peer_Factory::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + if (signum != SIGPIPE) + { + // Shut down the main event loop. + ACE_DEBUG((LM_NOTICE, ACE_TEXT ("Exit case signal\n"))); // Why do I exit? + ACE_Reactor::instance ()->end_reactor_event_loop(); + } + + return 0; +} + +// Returns information on the currently active service. + +int +Peer_Factory::info (ACE_TCHAR **strp, size_t length) const +{ + ACE_TCHAR buf[BUFSIZ]; + ACE_TCHAR consumer_addr_str[BUFSIZ]; + ACE_TCHAR supplier_addr_str[BUFSIZ]; + + ACE_INET_Addr addr; + + if (this->consumer_acceptor_.acceptor ().get_local_addr (addr) == -1) + return -1; + else if (addr.addr_to_string (consumer_addr_str, + sizeof addr) == -1) + return -1; + else if (this->supplier_acceptor_.acceptor ().get_local_addr (addr) == -1) + return -1; + else if (addr.addr_to_string (supplier_addr_str, + sizeof addr) == -1) + return -1; + + ACE_OS::strcpy (buf, ACE_TEXT ("peerd\t C:")); + ACE_OS::strcat (buf, consumer_addr_str); + ACE_OS::strcat (buf, ACE_TEXT ("|S:")); + ACE_OS::strcat (buf, supplier_addr_str); + ACE_OS::strcat + (buf, ACE_TEXT ("/tcp # Gateway traffic generator and data sink\n")); + + if (*strp == 0 && (*strp = ACE_OS::strdup (buf)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, buf, length); + return ACE_OS::strlen (buf); +} + +// Hook called by the explicit dynamic linking facility to terminate +// the peer. + +int +Peer_Factory::fini (void) +{ + this->consumer_acceptor_.close (); + this->supplier_acceptor_.close (); + return 0; +} + +// Hook called by the explicit dynamic linking facility to initialize +// the peer. + +int +Peer_Factory::init (int argc, ACE_TCHAR *argv[]) +{ + Options::instance ()->parse_args (argc, argv); + + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + sig_set.sig_add (SIGPIPE); + + // Register ourselves to receive signals so we can shut down + // gracefully. + + if (ACE_Reactor::instance ()->register_handler (sig_set, + this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("register_handler")), + -1); + + if (Options::instance ()->enabled (Options::SUPPLIER_ACCEPTOR) + && this->supplier_acceptor_.start + (Options::instance ()->supplier_acceptor_port ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Acceptor::open")), + -1); + else if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR) + && this->consumer_acceptor_.start + (Options::instance ()->consumer_acceptor_port ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Acceptor::open")), + -1); + else if (this->connector_.open () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Connector::open")), + -1); + return 0; +} + +// The following is a "Factory" used by the <ACE_Service_Config> and +// svc.conf file to dynamically initialize the <Peer_Acceptor> and +// <Peer_Connector>. + +ACE_SVC_FACTORY_DEFINE (Peer_Factory) + diff --git a/ACE/apps/Gateway/Peer/Peer.h b/ACE/apps/Gateway/Peer/Peer.h new file mode 100644 index 00000000000..0723882a243 --- /dev/null +++ b/ACE/apps/Gateway/Peer/Peer.h @@ -0,0 +1,257 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// Peer.h +// +// = DESCRIPTION +// These classes process Supplier/Consumer events sent from the +// gateway (gatewayd) to its various peers (peerd). The general +// collaboration works as follows: +// +// 1. <Peer_Acceptor> creates a listener endpoint and waits +// passively for gatewayd to connect with it. +// +// 2. When a gatewayd connects, <Peer_Acceptor> creates an +// <Peer_Handler> object that sends/receives events from +// gatewayd on that connection. +// +// 3. The <Peer_Handler> waits for gatewayd to inform it of its +// connection ID, which is prepended to all subsequent outgoing +// events sent from peerd. +// +// 4. Once the connection ID is set, peerd periodically sends events +// to gatewayd. Peerd also receives and "processes" events +// forwarded to it from gatewayd. In this program, peerd +// "processes" the events sent to it by writing them to stdout. +// +// Note that in the current peerd implementation, one Peer process +// cannot serve as both a Consumer and Supplier of Events. This is +// because the gatewayd establishes a separate connection for +// Suppliers and Consumers and the peerd only maintains a single +// <Peer_Handler> object to handle this one connection. Enhancing +// this implementation to be both a Consumer and Supplier +// simultaneously is straightforward, however. In addition, +// multiple peerd processes can already work together to play these +// different roles. +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#ifndef PEER_H +#define PEER_H + +#include "ace/Service_Config.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Acceptor.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "ace/Connector.h" +#include "ace/Null_Condition.h" +#include "ace/Null_Mutex.h" +#include "Options.h" + +ACE_SVC_FACTORY_DECLARE (Peer_Factory) + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ + // = TITLE + // Handle Peer events arriving from a Gateway. +public: + // = Initialization and termination methods. + Peer_Handler (void); + // Initialize the peer. + + ~Peer_Handler (void); + // Shutdown the Peer. + + virtual int open (void * = 0); + // Initialize the handler when called by + // <ACE_Acceptor::handle_input>. + + virtual int handle_input (ACE_HANDLE); + // Receive and process peer events. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send a event to a gateway (may be queued if necessary due to flow + // control). + + virtual int handle_output (ACE_HANDLE); + // Finish sending a event when flow control conditions abate. + + virtual int handle_timeout (const ACE_Time_Value &, + const void *arg); + // Periodically send events via <ACE_Reactor> timer mechanism. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); + // Perform object termination. + +protected: + typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited; + + int transmit (ACE_Message_Block *mb, + size_t n, + int event_type); + // Transmit <mb> to the gatewayd. + + virtual int recv (ACE_Message_Block *&mb); + // Receive an Peer event from gatewayd. + + virtual int send (ACE_Message_Block *mb); + // Send an Peer event to gatewayd, using <nonblk_put>. + + virtual int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking <put>, which tries to send an event to the + // gatewayd, but only if it isn't flow controlled. + + int subscribe (void); + // Register Consumer subscriptions with the gateway. + + // = Event/state/action handlers. + int transmit_stdin (void); + // Receive a event from stdin and send it to the gateway. + + int await_connection_id (void); + // Action that receives the route id. + + int await_events (void); + // Action that receives events. + + int (Peer_Handler::*do_action_)(void); + // Pointer-to-member-function for the current action to run in this + // state. This points to one of the preceding 3 methods. + + CONNECTION_ID connection_id_; + // Connection ID of the peer, which is obtained from the gatewayd. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragments that arrive in non-blocking recv's + // from the gatewayd. + + size_t total_bytes_; + // The total number of bytes sent/received to the gatewayd thus far. + + int first_time_; + // Used to call register_stdin_handle only once. Otherwise, thread + // leak will occur on Win32. +}; + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Acceptor : public ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> +{ + // = TITLE + // Passively accept connections from gatewayd and dynamically + // create a new <Peer_Handler> object to communicate with the + // gatewayd. +public: + // = Initialization and termination methods. + Peer_Acceptor (void); + // Default initialization. + + int start (u_short); + // the <Peer_Acceptor>. + + int close (void); + // Terminate the <Peer_Acceptor>. + + virtual int make_svc_handler (Peer_Handler *&); + // Factory method that creates a <Peer_Handler> just once. + +private: + int open_acceptor (u_short port); + // Factor out common code for initializing the <Peer_Acceptor>. + + Peer_Handler *peer_handler_; + // Pointer to <Peer_Handler> allocated just once. + + ACE_INET_Addr addr_; + // Our acceptor addr. + + typedef ACE_Acceptor<Peer_Handler, ACE_SOCK_ACCEPTOR> inherited; +}; + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class ACE_Svc_Export ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ + +class ACE_Svc_Export Peer_Connector : public ACE_Connector<Peer_Handler, ACE_SOCK_CONNECTOR> +{ + // = TITLE + // Actively establish connections with gatewayd and dynamically + // create a new <Peer_Handler> object to communicate with the + // gatewayd. +public: + // = Initialization method. + int open (ACE_Reactor * = 0, int = 0); + // Initialize the <Peer_Connector>. NOTE: the arguments are + // ignored. They are only provided to avoid a compiler warning + // about hiding the virtual function ACE_Connector<Peer_Handler, + // ACE_SOCK_CONNECTOR>::open(ACE_Reactor*, int). + +private: + int open_connector (Peer_Handler *&ph, u_short port); + // Factor out common code for initializing the <Peer_Connector>. + + Peer_Handler *consumer_peer_handler_; + // Consumer <Peer_Handler> that is connected to a gatewayd. + + Peer_Handler *supplier_peer_handler_; + // Supplier <Peer_Handler> that is connected to a gatewayd. +}; + +class ACE_Svc_Export Peer_Factory : public ACE_Service_Object +{ + // = TITLE + // A factory class that actively and/or passively establishes + // connections with the gatewayd. +public: + // = Dynamic initialization and termination hooks from <ACE_Service_Object>. + + virtual int init (int argc, ACE_TCHAR *argv[]); + // Initialize the acceptor and connector. + + virtual int fini (void); + // Perform termination activities. + + virtual int info (ACE_TCHAR **, size_t) const; + // Return info about this service. + + virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); + // Handle various signals (e.g., SIGPIPE, SIGINT, and SIGQUIT). + +private: + Peer_Acceptor consumer_acceptor_; + // Pointer to an instance of our <Peer_Acceptor> that's used to + // accept connections and create Consumers. + + Peer_Acceptor supplier_acceptor_; + // Pointer to an instance of our <Peer_Acceptor> that's used to + // accept connections and create Suppliers. + + Peer_Connector connector_; + // An instance of our <Peer_Connector>. Note that one + // <Peer_Connector> is used to establish <Peer_Handler>s for both + // Consumers and Suppliers. +}; + +#endif /* PEER_H */ diff --git a/ACE/apps/Gateway/Peer/peer.mpc b/ACE/apps/Gateway/Peer/peer.mpc new file mode 100644 index 00000000000..7e2e24ba4aa --- /dev/null +++ b/ACE/apps/Gateway/Peer/peer.mpc @@ -0,0 +1,23 @@ +// -*- MPC -*- +// $Id$ + +project(Gateway_Peer) : acelib { + sharedname = Gateway_Peer + Source_Files { + Options.cpp + Peer.cpp + } + Documentation_Files { + svc.conf + } +} + +project(gateway_peerd) : aceexe { + exename = peerd + after += Gateway_Peer + libs += Gateway_Peer + + Source_Files { + peerd.cpp + } +}
\ No newline at end of file diff --git a/ACE/apps/Gateway/Peer/peerd.cpp b/ACE/apps/Gateway/Peer/peerd.cpp new file mode 100644 index 00000000000..f4abd599322 --- /dev/null +++ b/ACE/apps/Gateway/Peer/peerd.cpp @@ -0,0 +1,63 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// gateway +// +// = FILENAME +// peerd.h +// +// = DESCRIPTION +// Driver for the peer daemon (peerd). Note that this is +// completely generic code due to the Service Configurator +// framework! +// +// = AUTHOR +// Douglas C. Schmidt +// +// ============================================================================ + +#include "ace/OS_NS_unistd.h" +#include "Peer.h" + +ACE_RCSID(Peer, peerd, "$Id$") + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (ACE_OS::access (ACE_DEFAULT_SVC_CONF, F_OK) != 0) + { + // Use static linking. + ACE_Service_Object_Ptr sp = ACE_SVC_INVOKE (Peer_Factory); + + if (sp->init (argc - 1, argv + 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("init")), + 1); + + // Run forever, performing the configured services until we are + // shut down by a SIGINT/SIGQUIT signal. + + ACE_Reactor::instance ()->run_reactor_event_loop (); + + // Destructor of <ACE_Service_Object_Ptr> automagically call + // <fini>. + } + else + { + if (ACE_Service_Config::open (argc, argv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("open")), + 1); + else // Use dynamic linking. + + // Run forever, performing the configured services until we + // are shut down by a signal (e.g., SIGINT or SIGQUIT). + + ACE_Reactor::instance ()->run_reactor_event_loop (); + } + return 0; +} diff --git a/ACE/apps/Gateway/Peer/svc.conf b/ACE/apps/Gateway/Peer/svc.conf new file mode 100644 index 00000000000..707f457da2f --- /dev/null +++ b/ACE/apps/Gateway/Peer/svc.conf @@ -0,0 +1,2 @@ +#static Svc_Manager "-d -p 291" +dynamic Peer1 Service_Object * Gateway_Peer:_make_Peer_Factory() active "-a C|S" diff --git a/ACE/apps/Gateway/README b/ACE/apps/Gateway/README new file mode 100644 index 00000000000..7861e1051e9 --- /dev/null +++ b/ACE/apps/Gateway/README @@ -0,0 +1,136 @@ +OVERVIEW + +This directory contains source code for an application-level +Communication Gateway implemented with ACE. This prototype was +developed in my cs422 OS class at Washington University in 1994. The +Gateway has recently been updated to illustrate the use of Event +Channels, which forward events from Suppliers to Consumers in a +distributed system. + +You can get a paper that explains the patterns used in this +implementation at the following WWW URL: + +http://www.cs.wustl.edu/~schmidt/PDF/TAPOS-00.pdf + +---------------------------------------- + +DIRECTORY STRUCTURE + +There are 2 directories: + +1. Gateway + + This directory contains the source code for the + application-level Gateway process, gatewayd. The gatewayd + routes event messages between Peers. By default, the gatewayd + plays the Connector role and initializes itself by reading the + connection_config and consumer_config files: + + 1. The connection_config file establishes the "physical + configuration" of the Consumer and Supplier proxies. This + file tells the Gateway what connections to establish with + particular hosts using particular ports. + + 2. The consumer_config file establishes the "logical + configuration." This file tells the Gateway how to forward + data coming from Suppliers to the appropriate Consumers. + + The application Gateway generally should be started after all + the Peers described below, though the process should work + correctly even if it starts first. + +2. Peer + + This directory contains the source code for the Peer process, + peerd. There are typically many Peers, which act as suppliers + and consumers of event messages that are routed through the + gatewayd. + + To do anything interesting you'll need at least two Peers: one + to supply events and one to consume events. In the + configuration files, these two types of Peers are designated + as follows: + + 1. Supplier Peers (designated by an 'S' in the Gateway's + connection_config configuration file). These Peers are + "suppliers" of events to the Gateway. + + 2. Consumer Peers (designated by an 'C' in the Gateway's + connection_config file). These Peers are "consumers" of + events forwarded by the Gateway. Forwarding is based on + the settings in the consumer_config configuration file. + +---------------------------------------- + +HOW TO RUN THE TESTS + +To run the tests do the following: + +1. Compile everything (i.e., first compile the ACE libraries, then + compile the Gateway and Peer directories). + +2. Edit the consumer_config and connection_config files as discussed + above to indicate the desired physical and logical mappings + for Consumers and Suppliers. + +3. Start up the Peers (peerd). You can start up as many as you + like, as per the connection_config file, but you'll need at least two + (i.e., one Supplier and Consumer). I typically start up each Peer + in a different window on a different machine, but you can run them + on the same machine as long as you pick different port numbers. + The Peers will print out some diagnostic info and then block + awaiting connections from the Gateway. + + If you want to set the port numbers of the Peers from + the command-line do the following: + + a. Change the svc.conf file in the ./Peer/ directory to + another name (e.g., foo.conf). This will keep the + program from starting up with the svc.conf file + (which dynamically links in the Peers and uses the -a option to + set the port). + + b. Then run the peers in different windows as + + # Window 1 (Supplier) + % peerd -a S:10011 + + # Window 2 (Consumer) + % peerd -a C:10012 + + etc. Naturally, you can also edit the svc.conf file, but that + may be more of a pain if you've got a network filesystem and + all your hosts share the same svc.conf file. + +4. Start up the Gateway (gatewayd). This will print out a bunch of + messages as it reads the config files and connects to all the Peers. + By default, the Gateway is purely reactive, i.e., it handles + Consumers and Suppliers in the same thread of control. However, + if you give the '-t OUTPUT_MT' option the Gateway will handle all + Consumers in separate threads. If you give the '-t INPUT_MT' option + the Gateway will handle all Suppliers in separate threads. If you + give the '-t INPUT_MT|OUTPUT_MT' option both Consumers and Suppliers + will be handled in the separate threads. + + Assuming everything works, then all the Peers will be connected. + If some of the Peers aren't set up correctly, or if they aren't + started first, then the Gateway will use an exponential backoff + algorithm to attempt to reestablish those connections. + +5. Once the Gateway has connected with all the Peers you can send + events from Supplier Peers by typing commands in the Peer window. + This Supplier will be sent to the Gateway, which will forward the + event to all Consumer Peers that have "subscribed" to receive these + events. + + Note that if you type ^C in a Peer window the Peer will shutdown + its handlers and exit. The Gateway will detect this and will start + trying to reestablish the connection using the same exponential + backoff algorithm it used for the initial connection establishment. + +7. When you want to terminate a Gateway, just type ^C or type any + characters in the ./gatewayd window and the process will shut down + gracefully. + + + |