diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-01-11 06:35:40 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-01-11 06:35:40 +0000 |
commit | 19271220140a7e0a9e8b0a6e024f50245b4dbdb8 (patch) | |
tree | 8badff2cf8786f3a791570623ff0b87967caa147 /apps/Gateway | |
parent | eaa8071f2d32c071a8c8259928c8dc48402d8156 (diff) | |
download | ATCD-19271220140a7e0a9e8b0a6e024f50245b4dbdb8.tar.gz |
*** empty log message ***
Diffstat (limited to 'apps/Gateway')
-rw-r--r-- | apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp | 41 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Concrete_Connection_Handlers.h | 7 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Config_Files.h | 13 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Connection_Handler.cpp | 52 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Connection_Handler.h | 35 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp | 31 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Connection_Handler_Acceptor.h | 13 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event.h | 119 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.cpp | 124 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Event_Channel.h | 14 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Gateway.cpp | 42 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.cpp | 9 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.h | 6 | ||||
-rw-r--r-- | apps/Gateway/Gateway/connection_config | 7 | ||||
-rw-r--r-- | apps/Gateway/Gateway/svc.conf | 2 | ||||
-rw-r--r-- | apps/Gateway/Peer/Options.cpp | 15 | ||||
-rw-r--r-- | apps/Gateway/Peer/Options.h | 6 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer.cpp | 97 | ||||
-rw-r--r-- | apps/Gateway/Peer/Peer.h | 14 |
19 files changed, 483 insertions, 164 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp index 02ff3e3dec8..86d73d3e9b2 100644 --- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -71,18 +71,24 @@ Consumer_Handler::nonblk_put (ACE_Message_Block *event) { ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", - this->get_handle (), + this->get_handle (), this->connection_id ())); // ACE_Queue in *front* of the list to preserve order. if (this->msg_queue ()->enqueue_head (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "enqueue_head"), + -1); // Tell ACE_Reactor to call us back when we can send again. else if (ACE_Reactor::instance ()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "schedule_wakeup"), + -1); return 0; } else @@ -144,7 +150,9 @@ Consumer_Handler::handle_output (ACE_HANDLE) // We are responsible for releasing an ACE_Message_Block if // failures occur. event->release (); - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "transmission failure")); /* FALLTHROUGH */ default: // Sent the whole thing. @@ -165,19 +173,24 @@ Consumer_Handler::handle_output (ACE_HANDLE) if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "cancel_wakeup")); } } } else - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "dequeue_head")); return 0; } // Send an event to a Consumer (may queue if necessary). int -Consumer_Handler::put (ACE_Message_Block *event, ACE_Time_Value *) +Consumer_Handler::put (ACE_Message_Block *event, + ACE_Time_Value *) { if (this->msg_queue ()->is_empty ()) // Try to send the event *without* blocking! @@ -371,9 +384,9 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr) int Supplier_Handler::handle_input (ACE_HANDLE) { - ACE_Message_Block *forward_addr = 0; + ACE_Message_Block *event_key = 0; - switch (this->recv (forward_addr)) + switch (this->recv (event_key)) { case 0: // Note that a peer shouldn't initiate a shutdown by closing the @@ -400,17 +413,17 @@ Supplier_Handler::handle_input (ACE_HANDLE) /* NOTREACHED */ default: // Route messages to Consumers. - return this->forward (forward_addr); + return this->process (event_key); } } -// Forward an event to its appropriate Consumer(s). This delegates to -// the <Event_Channel> to do the actual forwarding. +// This delegates to the <Event_Channel> to do the actual processing. +// Typically, this forwards the event to its appropriate Consumer(s). int -Supplier_Handler::forward (ACE_Message_Block *forward_addr) +Supplier_Handler::process (ACE_Message_Block *event_key) { - return this->event_channel_->put (forward_addr); + return this->event_channel_->put (event_key); } Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci) diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h index a14d97b9d3e..ca36e3f517e 100644 --- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h @@ -45,9 +45,10 @@ protected: virtual int recv (ACE_Message_Block *&); // Receive an event from a Supplier. - int forward (ACE_Message_Block *event); - // Forward the <event> to its appropriate Consumer. This delegates - // to the <Event_Channel> to do the actual forwarding. + int process (ACE_Message_Block *event); + // This delegates to the <Event_Channel> to do the actual + // processing. Typically, it forwards the <event> to its + // appropriate Consumer. ACE_Message_Block *msg_frag_; // Keep track of event fragment to handle non-blocking recv's from diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 1199c615833..1e23006ddbd 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -71,22 +71,17 @@ class Consumer_Config_Info // Stores the information in a Consumer Map entry. { public: - enum - { - MAX_CONSUMERS = 1000 - // Total number of multicast consumers. - }; - ACE_INT32 connection_id_; - // Connection id for this proxy. + // Connection id. ACE_INT32 type_; // Message type. ACE_INT32 consumers_[MAX_CONSUMERS]; - // Connection ids for consumers that we're routing to. + // Connection ids for consumers that will be routed information + // containing this <connection_id_> - int total_consumers_; + ACE_INT32 total_consumers_; // Total number of these consumers. }; diff --git a/apps/Gateway/Gateway/Connection_Handler.cpp b/apps/Gateway/Gateway/Connection_Handler.cpp index 328f9d502c0..7072ce940a3 100644 --- a/apps/Gateway/Gateway/Connection_Handler.cpp +++ b/apps/Gateway/Gateway/Connection_Handler.cpp @@ -4,6 +4,18 @@ #include "Event_Channel.h" #include "Concrete_Connection_Handlers.h" +Event_Channel * +Connection_Handler::event_channel (void) const +{ + return this->event_channel_; +} + +void +Connection_Handler::event_channel (Event_Channel *ec) +{ + this->event_channel_ = ec; +} + void Connection_Handler::connection_id (CONNECTION_ID id) { @@ -11,7 +23,7 @@ Connection_Handler::connection_id (CONNECTION_ID id) } CONNECTION_ID -Connection_Handler::connection_id (void) +Connection_Handler::connection_id (void) const { return this->connection_id_; } @@ -19,7 +31,7 @@ Connection_Handler::connection_id (void) // The total number of bytes sent/received on this Proxy. size_t -Connection_Handler::total_bytes (void) +Connection_Handler::total_bytes (void) const { return this->total_bytes_; } @@ -35,7 +47,7 @@ Connection_Handler::Connection_Handler (void) } Connection_Handler::Connection_Handler (const Connection_Config_Info &pci) - : remote_addr_ (pci.remote_port_, pci.host_), + : remote_addr_ (pci.remote_port_, pci.host_[0] == '\0' ? ACE_DEFAULT_SERVER_HOST : pci.host_), local_addr_ (pci.local_port_), connection_id_ (pci.connection_id_), total_bytes_ (0), @@ -59,7 +71,7 @@ Connection_Handler::connection_role (char d) // Get the connection_role. char -Connection_Handler::connection_role (void) +Connection_Handler::connection_role (void) const { return this->connection_role_; } @@ -102,7 +114,7 @@ Connection_Handler::max_timeout (int mto) // Gets the max timeout delay. int -Connection_Handler::max_timeout (void) +Connection_Handler::max_timeout (void) const { return this->max_timeout_; } @@ -147,6 +159,14 @@ Connection_Handler::state (Connection_Handler::State s) this->state_ = s; } +// Return the current state of the Proxy. + +Connection_Handler::State +Connection_Handler::state (void) const +{ + return this->state_; +} + // Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates // control to our Connection_Handler. @@ -173,26 +193,30 @@ Connection_Handler::open (void *) return 0; } -// Return the current state of the Proxy. - -Connection_Handler::State -Connection_Handler::state (void) +const ACE_INET_Addr & +Connection_Handler::remote_addr (void) const { - return this->state_; + return this->remote_addr_; } -const ACE_INET_Addr & -Connection_Handler::remote_addr (void) +void +Connection_Handler::remote_addr (ACE_INET_Addr &ra) { - return this->remote_addr_; + this->remote_addr_ = ra; } const ACE_INET_Addr & -Connection_Handler::local_addr (void) +Connection_Handler::local_addr (void) const { return this->local_addr_; } +void +Connection_Handler::local_addr (ACE_INET_Addr &la) +{ + this->local_addr_ = la; +} + // Make the appropriate type of <Connection_Handler> (i.e., // <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or // <Thr_Supplier_Handler>). diff --git a/apps/Gateway/Gateway/Connection_Handler.h b/apps/Gateway/Gateway/Connection_Handler.h index ea21af476b8..876f69d97f4 100644 --- a/apps/Gateway/Gateway/Connection_Handler.h +++ b/apps/Gateway/Gateway/Connection_Handler.h @@ -47,16 +47,6 @@ public: // Initialize and activate a single-threaded <Connection_Handler> // (called by <ACE_Connector::handle_output>). - const ACE_INET_Addr &remote_addr (void); - // Returns the peer's routing address. - - const ACE_INET_Addr &local_addr (void); - // Returns our local address. - - // = Set/get connection id. - CONNECTION_ID connection_id (void); - void connection_id (CONNECTION_ID); - // = The current state of the Connection_Handler. enum State { @@ -69,7 +59,19 @@ public: // = Set/get the current state. void state (State); - State state (void); + State state (void) const; + + // = Set/get remote INET addr. + void remote_addr (ACE_INET_Addr &); + const ACE_INET_Addr &remote_addr (void) const; + + // = Set/get local INET addr. + void local_addr (ACE_INET_Addr &); + const ACE_INET_Addr &local_addr (void) const; + + // = Set/get connection id. + void connection_id (CONNECTION_ID); + CONNECTION_ID connection_id (void) const; // = Set/get the current retry timeout delay. void timeout (int); @@ -77,17 +79,22 @@ public: // = Set/get the maximum retry timeout delay. void max_timeout (int); - int max_timeout (void); + int max_timeout (void) const; // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer // (necessary for error checking). void connection_role (char); - char connection_role (void); + char connection_role (void) const; + + // = Set/get the <Event_Channel> *. + void event_channel (Event_Channel *); + Event_Channel *event_channel (void) const; // = The total number of bytes sent/received on this proxy. - size_t total_bytes (void); void total_bytes (size_t bytes); // Increment count by <bytes>. + size_t total_bytes (void) const; + // Return the current byte count. virtual int handle_timeout (const ACE_Time_Value &, const void *arg); // Perform timer-based Connection_Handler reconnection. diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp index 20cfec0ea26..4f399c2b721 100644 --- a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp +++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp @@ -6,16 +6,41 @@ #include "Connection_Handler_Acceptor.h" int -Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ph) +Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ch) { - ACE_ALLOCATOR_RETURN (ph, + ACE_ALLOCATOR_RETURN (ch, this->connection_handler_factory_.make_connection_handler (this->connection_config_info_), -1); return 0; } +int +Connection_Handler_Acceptor::accept_svc_handler (Connection_Handler *ch) +{ + if (this->inherited::accept_svc_handler (ch) == -1) + return -1; + else + { + ch->connection_id (Options::instance ()->connection_id ()); + ACE_INET_Addr remote_addr; + + if (ch->peer ().get_remote_addr (remote_addr) == -1) + return -1; + + // Set the remote address of our connected Peer. + ch->remote_addr (remote_addr); + + // Set the Event_Channel pointer. + ch->event_channel (&this->event_channel_); + + // Increment the connection ID by one. + Options::instance ()->connection_id ()++; + return 0; + } +} + Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec, - char connection_role) + char connection_role) : event_channel_ (ec) { this->connection_config_info_.connection_id_ = 0; diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h index 31ca2f99c0c..fb8d84ad667 100644 --- a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h +++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h @@ -32,12 +32,21 @@ class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_ // work... public: Connection_Handler_Acceptor (Event_Channel &, - char connection_role); + char connection_role); + // Constructor. - virtual int make_svc_handler (Connection_Handler *&ph); + virtual int make_svc_handler (Connection_Handler *&ch); // Hook method for creating an appropriate <Connection_Handler>. + virtual int accept_svc_handler (Connection_Handler *ch); + // Hook method for accepting a connection into the + // <Connection_Handler>. + protected: + typedef ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR> + inherited; + // Make life easier later on. + Event_Channel &event_channel_; // Reference to the event channel. diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index 58ef1f0a97b..f99b9a30ad4 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -14,8 +14,8 @@ // // ============================================================================ -#if !defined (EVENT) -#define EVENT +#if !defined (EVENT_H) +#define EVENT_H #include "ace/OS.h" @@ -44,24 +44,46 @@ #define DEFAULT_PEER_SUPPLIER_PORT 10012 #endif /* DEFAULT_PEER_SUPPLIER_PORT */ +#if !defined (MAX_CONSUMERS) +#define MAX_CONSUMERS 1000 +#endif /* MAX_CONSUMERS */ + // This is the unique supplier identifier that denotes a particular // <Connection_Handler> in the Gateway. typedef ACE_INT32 CONNECTION_ID; +enum +{ + // = These are the types of events generated by the <Suppliers> and + // handled by the <Event_Channel>. + + ROUTING_EVENT = 0, + // A normal event, which is forwarded to the <Consumers>. + + SUBSCRIPTION_EVENT = 1 + // A subscription to <Suppliers> managed by the <Event_Channel>. +}; + class Event_Key { // = TITLE // Address used to identify the source/destination of an event. // // = DESCRIPTION - // This is really a "virtual forwarding address" thatis used to - // decouple the filtering and forwarding logic of the Event - // Channel from the format of the data. + // This is really a "processing descriptor" that is used to + // decouple the processing, filtering, and forwarding logic of + // the Event Channel from the format of the data. The + // <connection_id_> and <type_> fields are copied from the + // <Event_Header> class below. public: Event_Key (CONNECTION_ID cid = -1, - u_char type = 0) + ACE_INT32 type = 0, + ACE_INT32 priority = 0) : connection_id_ (cid), - type_ (type) {} + type_ (type), + priority_ (priority) + { + } int operator== (const Event_Key &event_addr) const { @@ -74,7 +96,10 @@ public: // Connection_Handler. ACE_INT32 type_; - // Event type. + // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>. + + ACE_INT32 priority_; + // Event priority. }; class Event_Header @@ -91,23 +116,36 @@ public: INVALID_ID = -1 // No peer can validly use this number. }; + Event_Header (ACE_INT32 len, + CONNECTION_ID connection_id, + ACE_INT32 type, + ACE_INT32 priority) + : len_ (len), + connection_id_ (connection_id), + type_ (type), + priority_ (priority) + { + } + void decode (void) - { - this->len_ = ntohl (this->len_); - this->type_ = ntohl (this->type_); - this->priority_ = ntohl (this->priority_); - } + { + this->len_ = ntohl (this->len_); + this->connection_id_ = ntohl (this->connection_id_); + this->type_ = ntohl (this->type_); + this->priority_ = ntohl (this->priority_); + } // Decode from network byte order to host byte order. void encode (void) - { - this->len_ = htonl (this->len_); - this->type_ = htonl (this->type_); - this->priority_ = htonl (this->priority_); - } + { + this->len_ = htonl (this->len_); + this->connection_id_ = htonl (this->connection_id_); + this->type_ = htonl (this->type_); + this->priority_ = htonl (this->priority_); + } // Encode from host byte order to network byte order. - size_t len_; + ACE_INT32 len_; // Length of the data_ payload, in bytes. CONNECTION_ID connection_id_; @@ -115,7 +153,7 @@ public: // Connection_Handler. ACE_INT32 type_; - // Event type. + // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>. ACE_INT32 priority_; // Event priority. @@ -137,4 +175,43 @@ public: // Event data. }; -#endif /* EVENT */ +class Subscription +{ + // = TITLE + // Allows Consumers to subscribe to be routed information + // arriving from a particular Supplier connection id. +public: + void decode (void) + { + this->connection_id_ = ntohl (this->connection_id_); + + for (ACE_INT32 i = 0; i < this->total_consumers_; i++) + this->consumers_[i] = ntohl (this->consumers_[i]); + + this->total_consumers_ = ntohl (this->total_consumers_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->connection_id_ = htonl (this->connection_id_); + this->total_consumers_ = htonl (this->total_consumers_); + + for (ACE_INT32 i = 0; i < this->total_consumers_; i++) + this->consumers_[i] = htonl (this->consumers_[i]); + + } + // Encode from host byte order to network byte order. + + ACE_INT32 connection_id_; + // Connection id. + + ACE_INT32 consumers_[MAX_CONSUMERS]; + // Connection ids for consumers that will be routed information + // containing this <connection_id_> + + ACE_INT32 total_consumers_; + // Total number of these consumers. +}; + +#endif /* EVENT_H */ diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 7d3477166ed..60dbaecd4e7 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -81,51 +81,100 @@ Event_Channel::compute_performance_statistics (void) int Event_Channel::handle_timeout (const ACE_Time_Value &, - const void *) + const void *) { // This is called periodically to compute performance statistics. return this->compute_performance_statistics (); } -// This method forwards the <event> to Consumer that have registered -// to receive it. - int Event_Channel::put (ACE_Message_Block *event, ACE_Time_Value *) { - // We got a valid event, so determine its virtual forwarding - // address, which is stored in the first of the two event blocks, - // which are chained together by <ACE::recv>. - - Event_Key *forwarding_addr = (Event_Key *) event->rd_ptr (); + // We got a valid event, so determine its type, which is stored in + // the first of the two <ACE_Message_Block>s, which are chained + // together by <ACE::recv>. + Event_Key *event_key = (Event_Key *) event->rd_ptr (); - // Skip over the address portion and get the data. + // Skip over the address portion and get the data, which is in the + // second <ACE_Message_Block>. ACE_Message_Block *data = event->cont (); - // <dispatch_set> points to the set of Consumers associated with - // this forwarding address. + switch (event_key->type_) + { + case ROUTING_EVENT: + this->routing_event (event_key, + data); + break; + case SUBSCRIPTION_EVENT: + this->subscription_event (data); + break; + } + + // Release the memory in the message block. + event->release (); + return 0; +} + +void +Event_Channel::subscription_event (ACE_Message_Block *data) +{ + Event *event = (Event *) data->rd_ptr (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) received a subscription with %d bytes from connection id %d\n", + event->header_.len_, + event->header_.connection_id_)); + Subscription *subscription = (Subscription *) event->data_; + // Convert the subscription into host byte order so that we can + // access it directly without having to repeatedly muck with it... + subscription->decode (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) connection_id_ = %d, total_consumers_ = %d\n", + subscription->connection_id_, + subscription->total_consumers_)); + + for (ACE_INT32 i = 0; + i < subscription->total_consumers_; + i++) + ACE_DEBUG ((LM_DEBUG, + "(%t) consumers_[%d] = %d\n", + i, + subscription->consumers_[i])); + +} + +void +Event_Channel::routing_event (Event_Key *forwarding_address, + ACE_Message_Block *data) +{ Consumer_Dispatch_Set *dispatch_set = 0; - if (this->efd_.find (*forwarding_addr, dispatch_set) == -1) + // Initialize the <dispatch_set> to points to the set of Consumers + // associated with this forwarding address. + + if (this->efd_.find (*forwarding_address, + dispatch_set) == -1) // Failure. ACE_ERROR ((LM_DEBUG, "(%t) find failed on connection id = %d, type = %d\n", - forwarding_addr->connection_id_, - forwarding_addr->type_)); + forwarding_address->connection_id_, + forwarding_address->type_)); else { // Check to see if there are any consumers. if (dispatch_set->size () == 0) ACE_DEBUG ((LM_WARNING, - "there are no active consumers for this event currently\n")); + "there are no active consumers for this event currently\n")); else // There are consumers, so forward the event. { + // Initialize the interator. Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); // At this point, we should assign a thread-safe locking - // strategy to the Message_Block is we're running in a + // strategy to the <ACE_Message_Block> is we're running in a // multi-threaded configuration. data->locking_strategy (Options::instance ()->locking_strategy ()); @@ -147,11 +196,11 @@ Event_Channel::put (ACE_Message_Block *event, { if (errno == EWOULDBLOCK) // The queue has filled up! ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); + "gateway is flow controlled, so we're dropping events")); else ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", - "put", + "put", (*connection_handler)->connection_id ())); // We are responsible for releasing an @@ -162,16 +211,6 @@ Event_Channel::put (ACE_Message_Block *event, } } } - - // Release the memory in the message block. - event->release (); - return 0; -} - -int -Event_Channel::svc (void) -{ - return 0; } int @@ -284,18 +323,28 @@ Event_Channel::initiate_acceptors (void) (Options::instance ()->consumer_acceptor_port (), ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", "cannot register acceptor"), -1); + else + ACE_DEBUG ((LM_DEBUG, + "accepting Consumers at %d\n", + Options::instance ()->consumer_acceptor_port ())); if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR) && this->supplier_acceptor_.open (Options::instance ()->supplier_acceptor_port (), ACE_Reactor::instance (), Options::instance ()->blocking_semantics ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", "cannot register acceptor"), -1); + else + ACE_DEBUG ((LM_DEBUG, + "accepting Suppliers at %d\n", + Options::instance ()->supplier_acceptor_port ())); return 0; } @@ -306,12 +355,15 @@ Event_Channel::initiate_acceptors (void) int Event_Channel::close (u_long) { - if (Options::instance ()->threading_strategy () - != Options::REACTIVE) + if (Options::instance ()->threading_strategy () != Options::REACTIVE) { if (ACE_Thread_Manager::instance ()->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "suspend_all"), + -1); + ACE_DEBUG ((LM_DEBUG, + "(%t) suspending all threads\n")); } // First tell everyone that the spaceship is here... @@ -408,7 +460,7 @@ Event_Channel::bind_proxy (Connection_Handler *connection_handler) int Event_Channel::subscribe (const Event_Key &event_addr, - Consumer_Dispatch_Set *cds) + Consumer_Dispatch_Set *cds) { int result = this->efd_.bind (event_addr, cds); diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index f90815be3af..dfe5faa6fd4 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -69,7 +69,7 @@ public: // Subscribe the <Consumer_Dispatch_Set> to receive events that // match <Event_Key>. - // = Event forwarding method. + // = Event processing entry point. virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0); // Pass <mb> to the Event Channel so it can forward it to Consumers. @@ -82,12 +82,18 @@ public: // Suppliers. private: - virtual int svc (void); - // Run as an active object. - int parse_args (int argc, char *argv[]); // Parse the command-line arguments. + // = Methods for handling events. + void routing_event (Event_Key *event_key, + ACE_Message_Block *data); + // Forwards the <data> to Consumer that have registered to receive + // it, based on addressing information in the <event_key>. + + void subscription_event (ACE_Message_Block *data); + // Add a Consumer subscription. + int compute_performance_statistics (void); // Perform timer-based performance profiling. diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 383e9705acb..fee231cfe15 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -184,17 +184,47 @@ Gateway::parse_connection_config_file (void) Options::instance ()->connection_config_file ()), -1); + // Keep track of the previous connection id to make sure the + // connection config file isn't corrupted. + int previous_connection_id = 0; + // Read config file one line at a time. + for (Connection_Config_Info pci; connection_file.read_entry (pci, line_number) != FP::EOFILE; ) { file_empty = 0; + // First time in check. + if (previous_connection_id == 0) + { + previous_connection_id == 1; + + if (pci.connection_id_ != 1) + ACE_DEBUG ((LM_DEBUG, + "(%t) warning, the first connection id should be 1 not %d\n", + pci.connection_id_)); + } + else if (previous_connection_id + 1 != pci.connection_id_) + ACE_DEBUG ((LM_DEBUG, + "(%t) warning, connection ids should keep increasing by 1 and %d + 1 != %d\n", + previous_connection_id, + pci.connection_id_)); + + // Update the last connection id to ensure that we monotonically + // increase by 1. + previous_connection_id = pci.connection_id_; + if (Options::instance ()->enabled (Options::DEBUG)) ACE_DEBUG ((LM_DEBUG, - "(%t) conn id = %d, host = %s, remote port = %d, proxy role = %c, " - "max retry timeout = %d, local port = %d, priority = %d\n", + "(%t) conn id = %d, " + "host = %s, " + "remote port = %d, " + "proxy role = %c, " + "max retry timeout = %d, " + "local port = %d, " + "priority = %d\n", pci.connection_id_, pci.host_, pci.remote_port_, @@ -216,6 +246,10 @@ Gateway::parse_connection_config_file (void) this->event_channel_.bind_proxy (connection_handler); } + // Keep track of the next available connection id, which is + // necessary for Peers that connect with us, rather than vice versa. + Options::instance ()->connection_id () = previous_connection_id + 1; + if (file_empty) ACE_ERROR ((LM_WARNING, "warning: connection connection_handler configuration file was empty\n")); @@ -260,7 +294,9 @@ Gateway::parse_consumer_config_file (void) } Consumer_Dispatch_Set *dispatch_set; - ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1); + ACE_NEW_RETURN (dispatch_set, + Consumer_Dispatch_Set, + -1); Event_Key event_addr (cci_entry.connection_id_, cci_entry.type_); diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp index 59281148879..0d93e4cd7fa 100644 --- a/apps/Gateway/Gateway/Options.cpp +++ b/apps/Gateway/Gateway/Options.cpp @@ -31,7 +31,8 @@ Options::Options (void) supplier_connector_port_ (DEFAULT_PEER_SUPPLIER_PORT), consumer_connector_port_ (DEFAULT_PEER_CONSUMER_PORT), max_timeout_ (MAX_TIMEOUT), - max_queue_size_ (MAX_QUEUE_SIZE) + max_queue_size_ (MAX_QUEUE_SIZE), + connection_id_ (1) { ACE_OS::strcpy (this->connection_config_file_, "connection_config"); ACE_OS::strcpy (this->consumer_config_file_, "consumer_config"); @@ -66,6 +67,12 @@ Options::performance_window (void) const return this->performance_window_; } +CONNECTION_ID & +Options::connection_id (void) +{ + return this->connection_id_; +} + long Options::max_timeout (void) const { diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h index 090ab222dc2..da61b8202c3 100644 --- a/apps/Gateway/Gateway/Options.h +++ b/apps/Gateway/Gateway/Options.h @@ -111,6 +111,9 @@ public: long max_queue_size (void) const; // The maximum size of the queue. + CONNECTION_ID &connection_id (void); + // Returns a reference to the next available connection id; + private: enum { @@ -173,6 +176,9 @@ private: long max_queue_size_; // The maximum size of the queue. + CONNECTION_ID connection_id_; + // The next available connection id. + char connection_config_file_[MAXPATHLEN + 1]; // Name of the connection configuration file. diff --git a/apps/Gateway/Gateway/connection_config b/apps/Gateway/Gateway/connection_config index 205b43d5bc3..ce6fa6b4adf 100644 --- a/apps/Gateway/Gateway/connection_config +++ b/apps/Gateway/Gateway/connection_config @@ -11,6 +11,9 @@ # Suppliers using that connection. The Connection ID field is the # "key" that is used to match up connections in this file with the # Consumer subscription requests in the "consumer_config" file. +# The connection ids should start at 1 and monotonically increase +# by increments of 1. This makes it possible for the Gateway to +# properly allocate connection ids for Peers that connect to it. # # 2. Host -- The host name where the Supplier/Consumer peerd # process is running. @@ -42,8 +45,8 @@ # Connection Host Remote Handler Max Retry Local Priority # ID Port Role Timeout Port # ---------- -------- ------ ------ ---------- ----- -------- - 1 lindy * S * * 1 - 2 polka * C * * 1 + 1 flamenco * S * * 1 + 2 lindy * C * * 1 # 3 mambo.cs * C * * 1 # 4 lambada.cs * C * * 1 # 5 lambada.cs * C * * 1 diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf index 8eff73af0dc..9b35a7dcbd6 100644 --- a/apps/Gateway/Gateway/svc.conf +++ b/apps/Gateway/Gateway/svc.conf @@ -1,3 +1,3 @@ #static Svc_Manager "-d -p 2913" -dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-b -d -c C|S -P connection_config -C consumer_config" +dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-b -d -c C|S -a C|S -P connection_config -C consumer_config" diff --git a/apps/Gateway/Peer/Options.cpp b/apps/Gateway/Peer/Options.cpp index 5199be47304..f3fe8f119dd 100644 --- a/apps/Gateway/Peer/Options.cpp +++ b/apps/Gateway/Peer/Options.cpp @@ -9,7 +9,7 @@ Options *Options::instance_ = 0; void Options::print_usage_and_die (void) { - ACE_DEBUG ((LM_DEBUG, "%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n")); + ACE_DEBUG ((LM_DEBUG, "%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-C connection-id] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n")); } Options::Options (void) @@ -20,7 +20,8 @@ Options::Options (void) consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT), connector_host_ (ACE_DEFAULT_SERVER_HOST), timeout_ (0), - max_queue_size_ (MAX_QUEUE_SIZE) + max_queue_size_ (MAX_QUEUE_SIZE), + connection_id_ (0) { char *timeout = ACE_OS::getenv ("TIMEOUT"); @@ -45,6 +46,12 @@ Options::timeout (void) const return this->timeout_; } +CONNECTION_ID & +Options::connection_id (void) +{ + return this->connection_id_; +} + long Options::max_queue_size (void) const { @@ -148,6 +155,10 @@ Options::parse_args (int argc, char *argv[]) } break; /* NOTREACHED */ + case 'C': + this->connection_id_ = ACE_OS::atoi (get_opt.optarg); + break; + /* NOTREACHED */ case 'h': // connector host this->connector_host_ = get_opt.optarg; diff --git a/apps/Gateway/Peer/Options.h b/apps/Gateway/Peer/Options.h index d9bb02facca..c957e1a295e 100644 --- a/apps/Gateway/Peer/Options.h +++ b/apps/Gateway/Peer/Options.h @@ -72,6 +72,9 @@ public: long max_queue_size (void) const; // The maximum size of the queue. + CONNECTION_ID &connection_id (void); + // Returns a reference to the connection id. + private: enum { @@ -122,6 +125,9 @@ private: long max_queue_size_; // The maximum size that the queue can grow to. + + CONNECTION_ID connection_id_; + // The connection id. }; #endif /* OPTIONS_H */ diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp index d1125d5d1c1..f65db04f6ca 100644 --- a/apps/Gateway/Peer/Peer.cpp +++ b/apps/Gateway/Peer/Peer.cpp @@ -57,10 +57,45 @@ Peer_Handler::open (void *a) return 0; } +void +Peer_Handler::transmit (ACE_Message_Block *mb, + size_t n, + int event_type) +{ + Event *event = (Event *) mb->rd_ptr (); + + // Initialize the header. + new (&event->header_) Event_Header (n, + this->connection_id_, + 0, + event_type); + + // Convert all the fields into network byte order. + event->header_.encode (); + + // Move the write pointer to the end of the event. + mb->wr_ptr (sizeof (Event_Header) + n); + + if (this->put (mb) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, + "%p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, + "%p\n", + "transmission failure in transmit_stdin")); + // Caller is responsible for freeing a ACE_Message_Block + // if failures occur. + mb->release (); + } +} + // Read events from stdin and send them to the gatewayd. int -Peer_Handler::xmit_stdin (void) +Peer_Handler::transmit_stdin (void) { if (this->connection_id_ != -1) { @@ -88,36 +123,16 @@ Peer_Handler::xmit_stdin (void) ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK); mb->release (); break; + /* NOTREACHED */ case -1: mb->release (); ACE_ERROR ((LM_ERROR, "%p\n", "read")); break; + /* NOTREACHED */ default: - event->header_.connection_id_ = this->connection_id_; - event->header_.len_ = n; - event->header_.priority_ = 0; - event->header_.type_ = 0; - - // Convert all the fields into network byte order. - event->header_.encode (); - - // Move the write pointer to the end of the event. - mb->wr_ptr (sizeof (Event_Header) + n); - - if (this->put (mb) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, - "%p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, - "%p\n", - "transmission failure in xmit_stdin")); - // Caller is responsible for freeing a ACE_Message_Block - // if failures occur. - mb->release (); - } + this->transmit (mb, n, ROUTING_EVENT); + break; + /* NOTREACHED */ } } @@ -144,7 +159,7 @@ Peer_Handler::nonblk_put (ACE_Message_Block *mb) { // We didn't manage to send everything, so requeue. ACE_DEBUG ((LM_DEBUG, - "queueing activated on handle %d to supplier id %d\n", + "queueing activated on handle %d to connection id %d\n", this->get_handle (), this->connection_id_)); @@ -207,7 +222,7 @@ Peer_Handler::handle_output (ACE_HANDLE) if (this->msg_queue ()->is_empty ()) { ACE_DEBUG ((LM_DEBUG, - "queue now empty on handle %d to supplier id %d\n", + "queue now empty on handle %d to connection id %d\n", this->get_handle (), this->connection_id_)); @@ -394,7 +409,7 @@ Peer_Handler::recv (ACE_Message_Block *&mb) } ACE_DEBUG ((LM_DEBUG, - "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", + "(%t) connection id = %d, cur len = %d, total bytes read = %d\n", event->header_.connection_id_, event->header_.len_, data_received + header_received)); @@ -415,14 +430,14 @@ Peer_Handler::handle_input (ACE_HANDLE sd) { ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd)); if (sd == ACE_STDIN) // Handle event from stdin. - return this->xmit_stdin (); + return this->transmit_stdin (); else // Perform the appropriate action depending on the state we are // in. return (this->*do_action_) (); } -// Action that receives our supplier id from the Gateway. +// Action that receives our connection id from the Gateway. int Peer_Handler::await_connection_id (void) @@ -452,6 +467,10 @@ Peer_Handler::await_connection_id (void) this->connection_id_)); } + // Subscribe for events if we're a Consumer. + if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR)) + this->subscribe (); + // Transition to the action that waits for Peer events. this->do_action_ = &Peer_Handler::await_events; @@ -467,7 +486,21 @@ Peer_Handler::await_connection_id (void) return 0; } -// Action that receives events. +int +Peer_Handler::subscribe (void) +{ + ACE_Message_Block *mb; + + ACE_NEW_RETURN (mb, + ACE_Message_Block (sizeof (Event)), + -1); + + Subscription *subscription = (Subscription *) ((Event *) mb->rd_ptr ())->data_; + subscription->connection_id_ = Options::instance ()->connection_id (); + this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT); +} + +// Action that receives events from the Gateway. int Peer_Handler::await_events (void) diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h index d2fa6b17e40..80707d9e3e1 100644 --- a/apps/Gateway/Peer/Peer.h +++ b/apps/Gateway/Peer/Peer.h @@ -94,18 +94,26 @@ public: protected: typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited; - virtual int recv (ACE_Message_Block *&); + void transmit (ACE_Message_Block *mb, + size_t n, + int event_type); + // Transmit <mb> to the gatewayd. + + virtual int recv (ACE_Message_Block *&mb); // Receive an Peer event from gatewayd. - virtual int send (ACE_Message_Block *); + virtual int send (ACE_Message_Block *mb); // Send an Peer event to gatewayd, using <nonblk_put>. virtual int nonblk_put (ACE_Message_Block *mb); // Perform a non-blocking <put>, which tries to send an event to the // gatewayd, but only if it isn't flow controlled. + int subscribe (void); + // Register Consumer subscriptions with the gateway. + // = Event/state/action handlers. - int xmit_stdin (void); + int transmit_stdin (void); // Receive a event from stdin and send it to the gateway. int await_connection_id (void); |