summaryrefslogtreecommitdiff
path: root/apps/Gateway
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-11 06:35:40 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-11 06:35:40 +0000
commit19271220140a7e0a9e8b0a6e024f50245b4dbdb8 (patch)
tree8badff2cf8786f3a791570623ff0b87967caa147 /apps/Gateway
parenteaa8071f2d32c071a8c8259928c8dc48402d8156 (diff)
downloadATCD-19271220140a7e0a9e8b0a6e024f50245b4dbdb8.tar.gz
*** empty log message ***
Diffstat (limited to 'apps/Gateway')
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp41
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.h7
-rw-r--r--apps/Gateway/Gateway/Config_Files.h13
-rw-r--r--apps/Gateway/Gateway/Connection_Handler.cpp52
-rw-r--r--apps/Gateway/Gateway/Connection_Handler.h35
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp31
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Acceptor.h13
-rw-r--r--apps/Gateway/Gateway/Event.h119
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp124
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h14
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp42
-rw-r--r--apps/Gateway/Gateway/Options.cpp9
-rw-r--r--apps/Gateway/Gateway/Options.h6
-rw-r--r--apps/Gateway/Gateway/connection_config7
-rw-r--r--apps/Gateway/Gateway/svc.conf2
-rw-r--r--apps/Gateway/Peer/Options.cpp15
-rw-r--r--apps/Gateway/Peer/Options.h6
-rw-r--r--apps/Gateway/Peer/Peer.cpp97
-rw-r--r--apps/Gateway/Peer/Peer.h14
19 files changed, 483 insertions, 164 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
index 02ff3e3dec8..86d73d3e9b2 100644
--- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
@@ -71,18 +71,24 @@ Consumer_Handler::nonblk_put (ACE_Message_Block *event)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) queueing activated on handle %d to routing id %d\n",
- this->get_handle (),
+ this->get_handle (),
this->connection_id ()));
// ACE_Queue in *front* of the list to preserve order.
if (this->msg_queue ()->enqueue_head
(event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "enqueue_head"),
+ -1);
// Tell ACE_Reactor to call us back when we can send again.
else if (ACE_Reactor::instance ()->schedule_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "schedule_wakeup"),
+ -1);
return 0;
}
else
@@ -144,7 +150,9 @@ Consumer_Handler::handle_output (ACE_HANDLE)
// We are responsible for releasing an ACE_Message_Block if
// failures occur.
event->release ();
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "transmission failure"));
/* FALLTHROUGH */
default: // Sent the whole thing.
@@ -165,19 +173,24 @@ Consumer_Handler::handle_output (ACE_HANDLE)
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "cancel_wakeup"));
}
}
}
else
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
+ ACE_ERROR ((LM_ERROR,
+ "(%t) %p\n",
+ "dequeue_head"));
return 0;
}
// Send an event to a Consumer (may queue if necessary).
int
-Consumer_Handler::put (ACE_Message_Block *event, ACE_Time_Value *)
+Consumer_Handler::put (ACE_Message_Block *event,
+ ACE_Time_Value *)
{
if (this->msg_queue ()->is_empty ())
// Try to send the event *without* blocking!
@@ -371,9 +384,9 @@ Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
int
Supplier_Handler::handle_input (ACE_HANDLE)
{
- ACE_Message_Block *forward_addr = 0;
+ ACE_Message_Block *event_key = 0;
- switch (this->recv (forward_addr))
+ switch (this->recv (event_key))
{
case 0:
// Note that a peer shouldn't initiate a shutdown by closing the
@@ -400,17 +413,17 @@ Supplier_Handler::handle_input (ACE_HANDLE)
/* NOTREACHED */
default:
// Route messages to Consumers.
- return this->forward (forward_addr);
+ return this->process (event_key);
}
}
-// Forward an event to its appropriate Consumer(s). This delegates to
-// the <Event_Channel> to do the actual forwarding.
+// This delegates to the <Event_Channel> to do the actual processing.
+// Typically, this forwards the event to its appropriate Consumer(s).
int
-Supplier_Handler::forward (ACE_Message_Block *forward_addr)
+Supplier_Handler::process (ACE_Message_Block *event_key)
{
- return this->event_channel_->put (forward_addr);
+ return this->event_channel_->put (event_key);
}
Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
index a14d97b9d3e..ca36e3f517e 100644
--- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
@@ -45,9 +45,10 @@ protected:
virtual int recv (ACE_Message_Block *&);
// Receive an event from a Supplier.
- int forward (ACE_Message_Block *event);
- // Forward the <event> to its appropriate Consumer. This delegates
- // to the <Event_Channel> to do the actual forwarding.
+ int process (ACE_Message_Block *event);
+ // This delegates to the <Event_Channel> to do the actual
+ // processing. Typically, it forwards the <event> to its
+ // appropriate Consumer.
ACE_Message_Block *msg_frag_;
// Keep track of event fragment to handle non-blocking recv's from
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index 1199c615833..1e23006ddbd 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -71,22 +71,17 @@ class Consumer_Config_Info
// Stores the information in a Consumer Map entry.
{
public:
- enum
- {
- MAX_CONSUMERS = 1000
- // Total number of multicast consumers.
- };
-
ACE_INT32 connection_id_;
- // Connection id for this proxy.
+ // Connection id.
ACE_INT32 type_;
// Message type.
ACE_INT32 consumers_[MAX_CONSUMERS];
- // Connection ids for consumers that we're routing to.
+ // Connection ids for consumers that will be routed information
+ // containing this <connection_id_>
- int total_consumers_;
+ ACE_INT32 total_consumers_;
// Total number of these consumers.
};
diff --git a/apps/Gateway/Gateway/Connection_Handler.cpp b/apps/Gateway/Gateway/Connection_Handler.cpp
index 328f9d502c0..7072ce940a3 100644
--- a/apps/Gateway/Gateway/Connection_Handler.cpp
+++ b/apps/Gateway/Gateway/Connection_Handler.cpp
@@ -4,6 +4,18 @@
#include "Event_Channel.h"
#include "Concrete_Connection_Handlers.h"
+Event_Channel *
+Connection_Handler::event_channel (void) const
+{
+ return this->event_channel_;
+}
+
+void
+Connection_Handler::event_channel (Event_Channel *ec)
+{
+ this->event_channel_ = ec;
+}
+
void
Connection_Handler::connection_id (CONNECTION_ID id)
{
@@ -11,7 +23,7 @@ Connection_Handler::connection_id (CONNECTION_ID id)
}
CONNECTION_ID
-Connection_Handler::connection_id (void)
+Connection_Handler::connection_id (void) const
{
return this->connection_id_;
}
@@ -19,7 +31,7 @@ Connection_Handler::connection_id (void)
// The total number of bytes sent/received on this Proxy.
size_t
-Connection_Handler::total_bytes (void)
+Connection_Handler::total_bytes (void) const
{
return this->total_bytes_;
}
@@ -35,7 +47,7 @@ Connection_Handler::Connection_Handler (void)
}
Connection_Handler::Connection_Handler (const Connection_Config_Info &pci)
- : remote_addr_ (pci.remote_port_, pci.host_),
+ : remote_addr_ (pci.remote_port_, pci.host_[0] == '\0' ? ACE_DEFAULT_SERVER_HOST : pci.host_),
local_addr_ (pci.local_port_),
connection_id_ (pci.connection_id_),
total_bytes_ (0),
@@ -59,7 +71,7 @@ Connection_Handler::connection_role (char d)
// Get the connection_role.
char
-Connection_Handler::connection_role (void)
+Connection_Handler::connection_role (void) const
{
return this->connection_role_;
}
@@ -102,7 +114,7 @@ Connection_Handler::max_timeout (int mto)
// Gets the max timeout delay.
int
-Connection_Handler::max_timeout (void)
+Connection_Handler::max_timeout (void) const
{
return this->max_timeout_;
}
@@ -147,6 +159,14 @@ Connection_Handler::state (Connection_Handler::State s)
this->state_ = s;
}
+// Return the current state of the Proxy.
+
+Connection_Handler::State
+Connection_Handler::state (void) const
+{
+ return this->state_;
+}
+
// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
// control to our Connection_Handler.
@@ -173,26 +193,30 @@ Connection_Handler::open (void *)
return 0;
}
-// Return the current state of the Proxy.
-
-Connection_Handler::State
-Connection_Handler::state (void)
+const ACE_INET_Addr &
+Connection_Handler::remote_addr (void) const
{
- return this->state_;
+ return this->remote_addr_;
}
-const ACE_INET_Addr &
-Connection_Handler::remote_addr (void)
+void
+Connection_Handler::remote_addr (ACE_INET_Addr &ra)
{
- return this->remote_addr_;
+ this->remote_addr_ = ra;
}
const ACE_INET_Addr &
-Connection_Handler::local_addr (void)
+Connection_Handler::local_addr (void) const
{
return this->local_addr_;
}
+void
+Connection_Handler::local_addr (ACE_INET_Addr &la)
+{
+ this->local_addr_ = la;
+}
+
// Make the appropriate type of <Connection_Handler> (i.e.,
// <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or
// <Thr_Supplier_Handler>).
diff --git a/apps/Gateway/Gateway/Connection_Handler.h b/apps/Gateway/Gateway/Connection_Handler.h
index ea21af476b8..876f69d97f4 100644
--- a/apps/Gateway/Gateway/Connection_Handler.h
+++ b/apps/Gateway/Gateway/Connection_Handler.h
@@ -47,16 +47,6 @@ public:
// Initialize and activate a single-threaded <Connection_Handler>
// (called by <ACE_Connector::handle_output>).
- const ACE_INET_Addr &remote_addr (void);
- // Returns the peer's routing address.
-
- const ACE_INET_Addr &local_addr (void);
- // Returns our local address.
-
- // = Set/get connection id.
- CONNECTION_ID connection_id (void);
- void connection_id (CONNECTION_ID);
-
// = The current state of the Connection_Handler.
enum State
{
@@ -69,7 +59,19 @@ public:
// = Set/get the current state.
void state (State);
- State state (void);
+ State state (void) const;
+
+ // = Set/get remote INET addr.
+ void remote_addr (ACE_INET_Addr &);
+ const ACE_INET_Addr &remote_addr (void) const;
+
+ // = Set/get local INET addr.
+ void local_addr (ACE_INET_Addr &);
+ const ACE_INET_Addr &local_addr (void) const;
+
+ // = Set/get connection id.
+ void connection_id (CONNECTION_ID);
+ CONNECTION_ID connection_id (void) const;
// = Set/get the current retry timeout delay.
void timeout (int);
@@ -77,17 +79,22 @@ public:
// = Set/get the maximum retry timeout delay.
void max_timeout (int);
- int max_timeout (void);
+ int max_timeout (void) const;
// = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer
// (necessary for error checking).
void connection_role (char);
- char connection_role (void);
+ char connection_role (void) const;
+
+ // = Set/get the <Event_Channel> *.
+ void event_channel (Event_Channel *);
+ Event_Channel *event_channel (void) const;
// = The total number of bytes sent/received on this proxy.
- size_t total_bytes (void);
void total_bytes (size_t bytes);
// Increment count by <bytes>.
+ size_t total_bytes (void) const;
+ // Return the current byte count.
virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
// Perform timer-based Connection_Handler reconnection.
diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
index 20cfec0ea26..4f399c2b721 100644
--- a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
+++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
@@ -6,16 +6,41 @@
#include "Connection_Handler_Acceptor.h"
int
-Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ph)
+Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ch)
{
- ACE_ALLOCATOR_RETURN (ph,
+ ACE_ALLOCATOR_RETURN (ch,
this->connection_handler_factory_.make_connection_handler (this->connection_config_info_),
-1);
return 0;
}
+int
+Connection_Handler_Acceptor::accept_svc_handler (Connection_Handler *ch)
+{
+ if (this->inherited::accept_svc_handler (ch) == -1)
+ return -1;
+ else
+ {
+ ch->connection_id (Options::instance ()->connection_id ());
+ ACE_INET_Addr remote_addr;
+
+ if (ch->peer ().get_remote_addr (remote_addr) == -1)
+ return -1;
+
+ // Set the remote address of our connected Peer.
+ ch->remote_addr (remote_addr);
+
+ // Set the Event_Channel pointer.
+ ch->event_channel (&this->event_channel_);
+
+ // Increment the connection ID by one.
+ Options::instance ()->connection_id ()++;
+ return 0;
+ }
+}
+
Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec,
- char connection_role)
+ char connection_role)
: event_channel_ (ec)
{
this->connection_config_info_.connection_id_ = 0;
diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
index 31ca2f99c0c..fb8d84ad667 100644
--- a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
+++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
@@ -32,12 +32,21 @@ class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_
// work...
public:
Connection_Handler_Acceptor (Event_Channel &,
- char connection_role);
+ char connection_role);
+ // Constructor.
- virtual int make_svc_handler (Connection_Handler *&ph);
+ virtual int make_svc_handler (Connection_Handler *&ch);
// Hook method for creating an appropriate <Connection_Handler>.
+ virtual int accept_svc_handler (Connection_Handler *ch);
+ // Hook method for accepting a connection into the
+ // <Connection_Handler>.
+
protected:
+ typedef ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>
+ inherited;
+ // Make life easier later on.
+
Event_Channel &event_channel_;
// Reference to the event channel.
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
index 58ef1f0a97b..f99b9a30ad4 100644
--- a/apps/Gateway/Gateway/Event.h
+++ b/apps/Gateway/Gateway/Event.h
@@ -14,8 +14,8 @@
//
// ============================================================================
-#if !defined (EVENT)
-#define EVENT
+#if !defined (EVENT_H)
+#define EVENT_H
#include "ace/OS.h"
@@ -44,24 +44,46 @@
#define DEFAULT_PEER_SUPPLIER_PORT 10012
#endif /* DEFAULT_PEER_SUPPLIER_PORT */
+#if !defined (MAX_CONSUMERS)
+#define MAX_CONSUMERS 1000
+#endif /* MAX_CONSUMERS */
+
// This is the unique supplier identifier that denotes a particular
// <Connection_Handler> in the Gateway.
typedef ACE_INT32 CONNECTION_ID;
+enum
+{
+ // = These are the types of events generated by the <Suppliers> and
+ // handled by the <Event_Channel>.
+
+ ROUTING_EVENT = 0,
+ // A normal event, which is forwarded to the <Consumers>.
+
+ SUBSCRIPTION_EVENT = 1
+ // A subscription to <Suppliers> managed by the <Event_Channel>.
+};
+
class Event_Key
{
// = TITLE
// Address used to identify the source/destination of an event.
//
// = DESCRIPTION
- // This is really a "virtual forwarding address" thatis used to
- // decouple the filtering and forwarding logic of the Event
- // Channel from the format of the data.
+ // This is really a "processing descriptor" that is used to
+ // decouple the processing, filtering, and forwarding logic of
+ // the Event Channel from the format of the data. The
+ // <connection_id_> and <type_> fields are copied from the
+ // <Event_Header> class below.
public:
Event_Key (CONNECTION_ID cid = -1,
- u_char type = 0)
+ ACE_INT32 type = 0,
+ ACE_INT32 priority = 0)
: connection_id_ (cid),
- type_ (type) {}
+ type_ (type),
+ priority_ (priority)
+ {
+ }
int operator== (const Event_Key &event_addr) const
{
@@ -74,7 +96,10 @@ public:
// Connection_Handler.
ACE_INT32 type_;
- // Event type.
+ // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>.
+
+ ACE_INT32 priority_;
+ // Event priority.
};
class Event_Header
@@ -91,23 +116,36 @@ public:
INVALID_ID = -1 // No peer can validly use this number.
};
+ Event_Header (ACE_INT32 len,
+ CONNECTION_ID connection_id,
+ ACE_INT32 type,
+ ACE_INT32 priority)
+ : len_ (len),
+ connection_id_ (connection_id),
+ type_ (type),
+ priority_ (priority)
+ {
+ }
+
void decode (void)
- {
- this->len_ = ntohl (this->len_);
- this->type_ = ntohl (this->type_);
- this->priority_ = ntohl (this->priority_);
- }
+ {
+ this->len_ = ntohl (this->len_);
+ this->connection_id_ = ntohl (this->connection_id_);
+ this->type_ = ntohl (this->type_);
+ this->priority_ = ntohl (this->priority_);
+ }
// Decode from network byte order to host byte order.
void encode (void)
- {
- this->len_ = htonl (this->len_);
- this->type_ = htonl (this->type_);
- this->priority_ = htonl (this->priority_);
- }
+ {
+ this->len_ = htonl (this->len_);
+ this->connection_id_ = htonl (this->connection_id_);
+ this->type_ = htonl (this->type_);
+ this->priority_ = htonl (this->priority_);
+ }
// Encode from host byte order to network byte order.
- size_t len_;
+ ACE_INT32 len_;
// Length of the data_ payload, in bytes.
CONNECTION_ID connection_id_;
@@ -115,7 +153,7 @@ public:
// Connection_Handler.
ACE_INT32 type_;
- // Event type.
+ // Event type, e.g., <ROUTING_EVENT> or <SUBSCRIPTION_EVENT>.
ACE_INT32 priority_;
// Event priority.
@@ -137,4 +175,43 @@ public:
// Event data.
};
-#endif /* EVENT */
+class Subscription
+{
+ // = TITLE
+ // Allows Consumers to subscribe to be routed information
+ // arriving from a particular Supplier connection id.
+public:
+ void decode (void)
+ {
+ this->connection_id_ = ntohl (this->connection_id_);
+
+ for (ACE_INT32 i = 0; i < this->total_consumers_; i++)
+ this->consumers_[i] = ntohl (this->consumers_[i]);
+
+ this->total_consumers_ = ntohl (this->total_consumers_);
+ }
+ // Decode from network byte order to host byte order.
+
+ void encode (void)
+ {
+ this->connection_id_ = htonl (this->connection_id_);
+ this->total_consumers_ = htonl (this->total_consumers_);
+
+ for (ACE_INT32 i = 0; i < this->total_consumers_; i++)
+ this->consumers_[i] = htonl (this->consumers_[i]);
+
+ }
+ // Encode from host byte order to network byte order.
+
+ ACE_INT32 connection_id_;
+ // Connection id.
+
+ ACE_INT32 consumers_[MAX_CONSUMERS];
+ // Connection ids for consumers that will be routed information
+ // containing this <connection_id_>
+
+ ACE_INT32 total_consumers_;
+ // Total number of these consumers.
+};
+
+#endif /* EVENT_H */
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 7d3477166ed..60dbaecd4e7 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -81,51 +81,100 @@ Event_Channel::compute_performance_statistics (void)
int
Event_Channel::handle_timeout (const ACE_Time_Value &,
- const void *)
+ const void *)
{
// This is called periodically to compute performance statistics.
return this->compute_performance_statistics ();
}
-// This method forwards the <event> to Consumer that have registered
-// to receive it.
-
int
Event_Channel::put (ACE_Message_Block *event,
ACE_Time_Value *)
{
- // We got a valid event, so determine its virtual forwarding
- // address, which is stored in the first of the two event blocks,
- // which are chained together by <ACE::recv>.
-
- Event_Key *forwarding_addr = (Event_Key *) event->rd_ptr ();
+ // We got a valid event, so determine its type, which is stored in
+ // the first of the two <ACE_Message_Block>s, which are chained
+ // together by <ACE::recv>.
+ Event_Key *event_key = (Event_Key *) event->rd_ptr ();
- // Skip over the address portion and get the data.
+ // Skip over the address portion and get the data, which is in the
+ // second <ACE_Message_Block>.
ACE_Message_Block *data = event->cont ();
- // <dispatch_set> points to the set of Consumers associated with
- // this forwarding address.
+ switch (event_key->type_)
+ {
+ case ROUTING_EVENT:
+ this->routing_event (event_key,
+ data);
+ break;
+ case SUBSCRIPTION_EVENT:
+ this->subscription_event (data);
+ break;
+ }
+
+ // Release the memory in the message block.
+ event->release ();
+ return 0;
+}
+
+void
+Event_Channel::subscription_event (ACE_Message_Block *data)
+{
+ Event *event = (Event *) data->rd_ptr ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) received a subscription with %d bytes from connection id %d\n",
+ event->header_.len_,
+ event->header_.connection_id_));
+ Subscription *subscription = (Subscription *) event->data_;
+ // Convert the subscription into host byte order so that we can
+ // access it directly without having to repeatedly muck with it...
+ subscription->decode ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connection_id_ = %d, total_consumers_ = %d\n",
+ subscription->connection_id_,
+ subscription->total_consumers_));
+
+ for (ACE_INT32 i = 0;
+ i < subscription->total_consumers_;
+ i++)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) consumers_[%d] = %d\n",
+ i,
+ subscription->consumers_[i]));
+
+}
+
+void
+Event_Channel::routing_event (Event_Key *forwarding_address,
+ ACE_Message_Block *data)
+{
Consumer_Dispatch_Set *dispatch_set = 0;
- if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
+ // Initialize the <dispatch_set> to points to the set of Consumers
+ // associated with this forwarding address.
+
+ if (this->efd_.find (*forwarding_address,
+ dispatch_set) == -1)
// Failure.
ACE_ERROR ((LM_DEBUG,
"(%t) find failed on connection id = %d, type = %d\n",
- forwarding_addr->connection_id_,
- forwarding_addr->type_));
+ forwarding_address->connection_id_,
+ forwarding_address->type_));
else
{
// Check to see if there are any consumers.
if (dispatch_set->size () == 0)
ACE_DEBUG ((LM_WARNING,
- "there are no active consumers for this event currently\n"));
+ "there are no active consumers for this event currently\n"));
else // There are consumers, so forward the event.
{
+ // Initialize the interator.
Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
// At this point, we should assign a thread-safe locking
- // strategy to the Message_Block is we're running in a
+ // strategy to the <ACE_Message_Block> is we're running in a
// multi-threaded configuration.
data->locking_strategy (Options::instance ()->locking_strategy ());
@@ -147,11 +196,11 @@ Event_Channel::put (ACE_Message_Block *event,
{
if (errno == EWOULDBLOCK) // The queue has filled up!
ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
+ "gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR,
"(%t) %p transmission error to peer %d\n",
- "put",
+ "put",
(*connection_handler)->connection_id ()));
// We are responsible for releasing an
@@ -162,16 +211,6 @@ Event_Channel::put (ACE_Message_Block *event,
}
}
}
-
- // Release the memory in the message block.
- event->release ();
- return 0;
-}
-
-int
-Event_Channel::svc (void)
-{
- return 0;
}
int
@@ -284,18 +323,28 @@ Event_Channel::initiate_acceptors (void)
(Options::instance ()->consumer_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
"cannot register acceptor"),
-1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "accepting Consumers at %d\n",
+ Options::instance ()->consumer_acceptor_port ()));
if (Options::instance ()->enabled (Options::SUPPLIER_CONNECTOR)
&& this->supplier_acceptor_.open
(Options::instance ()->supplier_acceptor_port (),
ACE_Reactor::instance (),
Options::instance ()->blocking_semantics ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
"cannot register acceptor"),
-1);
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "accepting Suppliers at %d\n",
+ Options::instance ()->supplier_acceptor_port ()));
return 0;
}
@@ -306,12 +355,15 @@ Event_Channel::initiate_acceptors (void)
int
Event_Channel::close (u_long)
{
- if (Options::instance ()->threading_strategy ()
- != Options::REACTIVE)
+ if (Options::instance ()->threading_strategy () != Options::REACTIVE)
{
if (ACE_Thread_Manager::instance ()->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) %p\n",
+ "suspend_all"),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) suspending all threads\n"));
}
// First tell everyone that the spaceship is here...
@@ -408,7 +460,7 @@ Event_Channel::bind_proxy (Connection_Handler *connection_handler)
int
Event_Channel::subscribe (const Event_Key &event_addr,
- Consumer_Dispatch_Set *cds)
+ Consumer_Dispatch_Set *cds)
{
int result = this->efd_.bind (event_addr, cds);
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index f90815be3af..dfe5faa6fd4 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -69,7 +69,7 @@ public:
// Subscribe the <Consumer_Dispatch_Set> to receive events that
// match <Event_Key>.
- // = Event forwarding method.
+ // = Event processing entry point.
virtual int put (ACE_Message_Block *mb,
ACE_Time_Value * = 0);
// Pass <mb> to the Event Channel so it can forward it to Consumers.
@@ -82,12 +82,18 @@ public:
// Suppliers.
private:
- virtual int svc (void);
- // Run as an active object.
-
int parse_args (int argc, char *argv[]);
// Parse the command-line arguments.
+ // = Methods for handling events.
+ void routing_event (Event_Key *event_key,
+ ACE_Message_Block *data);
+ // Forwards the <data> to Consumer that have registered to receive
+ // it, based on addressing information in the <event_key>.
+
+ void subscription_event (ACE_Message_Block *data);
+ // Add a Consumer subscription.
+
int compute_performance_statistics (void);
// Perform timer-based performance profiling.
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 383e9705acb..fee231cfe15 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -184,17 +184,47 @@ Gateway::parse_connection_config_file (void)
Options::instance ()->connection_config_file ()),
-1);
+ // Keep track of the previous connection id to make sure the
+ // connection config file isn't corrupted.
+ int previous_connection_id = 0;
+
// Read config file one line at a time.
+
for (Connection_Config_Info pci;
connection_file.read_entry (pci, line_number) != FP::EOFILE;
)
{
file_empty = 0;
+ // First time in check.
+ if (previous_connection_id == 0)
+ {
+ previous_connection_id == 1;
+
+ if (pci.connection_id_ != 1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning, the first connection id should be 1 not %d\n",
+ pci.connection_id_));
+ }
+ else if (previous_connection_id + 1 != pci.connection_id_)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) warning, connection ids should keep increasing by 1 and %d + 1 != %d\n",
+ previous_connection_id,
+ pci.connection_id_));
+
+ // Update the last connection id to ensure that we monotonically
+ // increase by 1.
+ previous_connection_id = pci.connection_id_;
+
if (Options::instance ()->enabled (Options::DEBUG))
ACE_DEBUG ((LM_DEBUG,
- "(%t) conn id = %d, host = %s, remote port = %d, proxy role = %c, "
- "max retry timeout = %d, local port = %d, priority = %d\n",
+ "(%t) conn id = %d, "
+ "host = %s, "
+ "remote port = %d, "
+ "proxy role = %c, "
+ "max retry timeout = %d, "
+ "local port = %d, "
+ "priority = %d\n",
pci.connection_id_,
pci.host_,
pci.remote_port_,
@@ -216,6 +246,10 @@ Gateway::parse_connection_config_file (void)
this->event_channel_.bind_proxy (connection_handler);
}
+ // Keep track of the next available connection id, which is
+ // necessary for Peers that connect with us, rather than vice versa.
+ Options::instance ()->connection_id () = previous_connection_id + 1;
+
if (file_empty)
ACE_ERROR ((LM_WARNING,
"warning: connection connection_handler configuration file was empty\n"));
@@ -260,7 +294,9 @@ Gateway::parse_consumer_config_file (void)
}
Consumer_Dispatch_Set *dispatch_set;
- ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
+ ACE_NEW_RETURN (dispatch_set,
+ Consumer_Dispatch_Set,
+ -1);
Event_Key event_addr (cci_entry.connection_id_,
cci_entry.type_);
diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp
index 59281148879..0d93e4cd7fa 100644
--- a/apps/Gateway/Gateway/Options.cpp
+++ b/apps/Gateway/Gateway/Options.cpp
@@ -31,7 +31,8 @@ Options::Options (void)
supplier_connector_port_ (DEFAULT_PEER_SUPPLIER_PORT),
consumer_connector_port_ (DEFAULT_PEER_CONSUMER_PORT),
max_timeout_ (MAX_TIMEOUT),
- max_queue_size_ (MAX_QUEUE_SIZE)
+ max_queue_size_ (MAX_QUEUE_SIZE),
+ connection_id_ (1)
{
ACE_OS::strcpy (this->connection_config_file_, "connection_config");
ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
@@ -66,6 +67,12 @@ Options::performance_window (void) const
return this->performance_window_;
}
+CONNECTION_ID &
+Options::connection_id (void)
+{
+ return this->connection_id_;
+}
+
long
Options::max_timeout (void) const
{
diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h
index 090ab222dc2..da61b8202c3 100644
--- a/apps/Gateway/Gateway/Options.h
+++ b/apps/Gateway/Gateway/Options.h
@@ -111,6 +111,9 @@ public:
long max_queue_size (void) const;
// The maximum size of the queue.
+ CONNECTION_ID &connection_id (void);
+ // Returns a reference to the next available connection id;
+
private:
enum
{
@@ -173,6 +176,9 @@ private:
long max_queue_size_;
// The maximum size of the queue.
+ CONNECTION_ID connection_id_;
+ // The next available connection id.
+
char connection_config_file_[MAXPATHLEN + 1];
// Name of the connection configuration file.
diff --git a/apps/Gateway/Gateway/connection_config b/apps/Gateway/Gateway/connection_config
index 205b43d5bc3..ce6fa6b4adf 100644
--- a/apps/Gateway/Gateway/connection_config
+++ b/apps/Gateway/Gateway/connection_config
@@ -11,6 +11,9 @@
# Suppliers using that connection. The Connection ID field is the
# "key" that is used to match up connections in this file with the
# Consumer subscription requests in the "consumer_config" file.
+# The connection ids should start at 1 and monotonically increase
+# by increments of 1. This makes it possible for the Gateway to
+# properly allocate connection ids for Peers that connect to it.
#
# 2. Host -- The host name where the Supplier/Consumer peerd
# process is running.
@@ -42,8 +45,8 @@
# Connection Host Remote Handler Max Retry Local Priority
# ID Port Role Timeout Port
# ---------- -------- ------ ------ ---------- ----- --------
- 1 lindy * S * * 1
- 2 polka * C * * 1
+ 1 flamenco * S * * 1
+ 2 lindy * C * * 1
# 3 mambo.cs * C * * 1
# 4 lambada.cs * C * * 1
# 5 lambada.cs * C * * 1
diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf
index 8eff73af0dc..9b35a7dcbd6 100644
--- a/apps/Gateway/Gateway/svc.conf
+++ b/apps/Gateway/Gateway/svc.conf
@@ -1,3 +1,3 @@
#static Svc_Manager "-d -p 2913"
-dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-b -d -c C|S -P connection_config -C consumer_config"
+dynamic Gateway Service_Object * ./Gateway:_make_Gateway() active "-b -d -c C|S -a C|S -P connection_config -C consumer_config"
diff --git a/apps/Gateway/Peer/Options.cpp b/apps/Gateway/Peer/Options.cpp
index 5199be47304..f3fe8f119dd 100644
--- a/apps/Gateway/Peer/Options.cpp
+++ b/apps/Gateway/Peer/Options.cpp
@@ -9,7 +9,7 @@ Options *Options::instance_ = 0;
void
Options::print_usage_and_die (void)
{
- ACE_DEBUG ((LM_DEBUG, "%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n"));
+ ACE_DEBUG ((LM_DEBUG, "%n [-a {C|S}:acceptor-port] [-c {C|S}:connector-port] [-C connection-id] [-h gateway-host] [-q max-queue-size] [-t timeout] [-v]\n"));
}
Options::Options (void)
@@ -20,7 +20,8 @@ Options::Options (void)
consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT),
connector_host_ (ACE_DEFAULT_SERVER_HOST),
timeout_ (0),
- max_queue_size_ (MAX_QUEUE_SIZE)
+ max_queue_size_ (MAX_QUEUE_SIZE),
+ connection_id_ (0)
{
char *timeout = ACE_OS::getenv ("TIMEOUT");
@@ -45,6 +46,12 @@ Options::timeout (void) const
return this->timeout_;
}
+CONNECTION_ID &
+Options::connection_id (void)
+{
+ return this->connection_id_;
+}
+
long
Options::max_queue_size (void) const
{
@@ -148,6 +155,10 @@ Options::parse_args (int argc, char *argv[])
}
break;
/* NOTREACHED */
+ case 'C':
+ this->connection_id_ = ACE_OS::atoi (get_opt.optarg);
+ break;
+ /* NOTREACHED */
case 'h':
// connector host
this->connector_host_ = get_opt.optarg;
diff --git a/apps/Gateway/Peer/Options.h b/apps/Gateway/Peer/Options.h
index d9bb02facca..c957e1a295e 100644
--- a/apps/Gateway/Peer/Options.h
+++ b/apps/Gateway/Peer/Options.h
@@ -72,6 +72,9 @@ public:
long max_queue_size (void) const;
// The maximum size of the queue.
+ CONNECTION_ID &connection_id (void);
+ // Returns a reference to the connection id.
+
private:
enum
{
@@ -122,6 +125,9 @@ private:
long max_queue_size_;
// The maximum size that the queue can grow to.
+
+ CONNECTION_ID connection_id_;
+ // The connection id.
};
#endif /* OPTIONS_H */
diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp
index d1125d5d1c1..f65db04f6ca 100644
--- a/apps/Gateway/Peer/Peer.cpp
+++ b/apps/Gateway/Peer/Peer.cpp
@@ -57,10 +57,45 @@ Peer_Handler::open (void *a)
return 0;
}
+void
+Peer_Handler::transmit (ACE_Message_Block *mb,
+ size_t n,
+ int event_type)
+{
+ Event *event = (Event *) mb->rd_ptr ();
+
+ // Initialize the header.
+ new (&event->header_) Event_Header (n,
+ this->connection_id_,
+ 0,
+ event_type);
+
+ // Convert all the fields into network byte order.
+ event->header_.encode ();
+
+ // Move the write pointer to the end of the event.
+ mb->wr_ptr (sizeof (Event_Header) + n);
+
+ if (this->put (mb) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "transmission failure in transmit_stdin"));
+ // Caller is responsible for freeing a ACE_Message_Block
+ // if failures occur.
+ mb->release ();
+ }
+}
+
// Read events from stdin and send them to the gatewayd.
int
-Peer_Handler::xmit_stdin (void)
+Peer_Handler::transmit_stdin (void)
{
if (this->connection_id_ != -1)
{
@@ -88,36 +123,16 @@ Peer_Handler::xmit_stdin (void)
ACE_Event_Handler::DONT_CALL | ACE_Event_Handler::READ_MASK);
mb->release ();
break;
+ /* NOTREACHED */
case -1:
mb->release ();
ACE_ERROR ((LM_ERROR, "%p\n", "read"));
break;
+ /* NOTREACHED */
default:
- event->header_.connection_id_ = this->connection_id_;
- event->header_.len_ = n;
- event->header_.priority_ = 0;
- event->header_.type_ = 0;
-
- // Convert all the fields into network byte order.
- event->header_.encode ();
-
- // Move the write pointer to the end of the event.
- mb->wr_ptr (sizeof (Event_Header) + n);
-
- if (this->put (mb) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "transmission failure in xmit_stdin"));
- // Caller is responsible for freeing a ACE_Message_Block
- // if failures occur.
- mb->release ();
- }
+ this->transmit (mb, n, ROUTING_EVENT);
+ break;
+ /* NOTREACHED */
}
}
@@ -144,7 +159,7 @@ Peer_Handler::nonblk_put (ACE_Message_Block *mb)
{
// We didn't manage to send everything, so requeue.
ACE_DEBUG ((LM_DEBUG,
- "queueing activated on handle %d to supplier id %d\n",
+ "queueing activated on handle %d to connection id %d\n",
this->get_handle (),
this->connection_id_));
@@ -207,7 +222,7 @@ Peer_Handler::handle_output (ACE_HANDLE)
if (this->msg_queue ()->is_empty ())
{
ACE_DEBUG ((LM_DEBUG,
- "queue now empty on handle %d to supplier id %d\n",
+ "queue now empty on handle %d to connection id %d\n",
this->get_handle (),
this->connection_id_));
@@ -394,7 +409,7 @@ Peer_Handler::recv (ACE_Message_Block *&mb)
}
ACE_DEBUG ((LM_DEBUG,
- "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
+ "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
event->header_.connection_id_,
event->header_.len_,
data_received + header_received));
@@ -415,14 +430,14 @@ Peer_Handler::handle_input (ACE_HANDLE sd)
{
ACE_DEBUG ((LM_DEBUG, "in handle_input, sd = %d\n", sd));
if (sd == ACE_STDIN) // Handle event from stdin.
- return this->xmit_stdin ();
+ return this->transmit_stdin ();
else
// Perform the appropriate action depending on the state we are
// in.
return (this->*do_action_) ();
}
-// Action that receives our supplier id from the Gateway.
+// Action that receives our connection id from the Gateway.
int
Peer_Handler::await_connection_id (void)
@@ -452,6 +467,10 @@ Peer_Handler::await_connection_id (void)
this->connection_id_));
}
+ // Subscribe for events if we're a Consumer.
+ if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR))
+ this->subscribe ();
+
// Transition to the action that waits for Peer events.
this->do_action_ = &Peer_Handler::await_events;
@@ -467,7 +486,21 @@ Peer_Handler::await_connection_id (void)
return 0;
}
-// Action that receives events.
+int
+Peer_Handler::subscribe (void)
+{
+ ACE_Message_Block *mb;
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ Subscription *subscription = (Subscription *) ((Event *) mb->rd_ptr ())->data_;
+ subscription->connection_id_ = Options::instance ()->connection_id ();
+ this->transmit (mb, sizeof *subscription, SUBSCRIPTION_EVENT);
+}
+
+// Action that receives events from the Gateway.
int
Peer_Handler::await_events (void)
diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h
index d2fa6b17e40..80707d9e3e1 100644
--- a/apps/Gateway/Peer/Peer.h
+++ b/apps/Gateway/Peer/Peer.h
@@ -94,18 +94,26 @@ public:
protected:
typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> inherited;
- virtual int recv (ACE_Message_Block *&);
+ void transmit (ACE_Message_Block *mb,
+ size_t n,
+ int event_type);
+ // Transmit <mb> to the gatewayd.
+
+ virtual int recv (ACE_Message_Block *&mb);
// Receive an Peer event from gatewayd.
- virtual int send (ACE_Message_Block *);
+ virtual int send (ACE_Message_Block *mb);
// Send an Peer event to gatewayd, using <nonblk_put>.
virtual int nonblk_put (ACE_Message_Block *mb);
// Perform a non-blocking <put>, which tries to send an event to the
// gatewayd, but only if it isn't flow controlled.
+ int subscribe (void);
+ // Register Consumer subscriptions with the gateway.
+
// = Event/state/action handlers.
- int xmit_stdin (void);
+ int transmit_stdin (void);
// Receive a event from stdin and send it to the gateway.
int await_connection_id (void);