summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-04 05:01:11 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1998-01-04 05:01:11 +0000
commit889c400c0c7ffe71aadef2ce585663db418d3eee (patch)
tree5a9967b15bb49d65d599731a4ff56a103039f239
parent0ab24c128fda23ce5d131e94127180f7cc2ce998 (diff)
downloadATCD-889c400c0c7ffe71aadef2ce585663db418d3eee.tar.gz
*** empty log message ***
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp (renamed from apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp)126
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.h (renamed from apps/Gateway/Gateway/Concrete_Proxy_Handlers.h)59
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp130
-rw-r--r--apps/Gateway/Gateway/Config_Files.h29
-rw-r--r--apps/Gateway/Gateway/Connection_Handler.cpp (renamed from apps/Gateway/Gateway/Proxy_Handler.cpp)120
-rw-r--r--apps/Gateway/Gateway/Connection_Handler.h (renamed from apps/Gateway/Gateway/Proxy_Handler.h)65
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp33
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Acceptor.h51
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Connector.cpp73
-rw-r--r--apps/Gateway/Gateway/Connection_Handler_Connector.h (renamed from apps/Gateway/Gateway/Proxy_Handler_Connector.h)19
-rw-r--r--apps/Gateway/Gateway/Consumer_Dispatch_Set.h12
-rw-r--r--apps/Gateway/Gateway/Event.h45
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp189
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h58
-rw-r--r--apps/Gateway/Gateway/File_Parser.h10
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp69
-rw-r--r--apps/Gateway/Gateway/Makefile52
-rw-r--r--apps/Gateway/Gateway/Options.cpp21
-rw-r--r--apps/Gateway/Gateway/Options.h26
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp16
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Acceptor.h40
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.cpp73
-rw-r--r--apps/Gateway/Gateway/connection_config52
-rw-r--r--apps/Gateway/Gateway/consumer_config39
-rw-r--r--apps/Gateway/Gateway/proxy_config51
-rw-r--r--apps/Gateway/Peer/Options.cpp4
-rw-r--r--apps/Gateway/Peer/Peer.cpp40
-rw-r--r--apps/Gateway/Peer/Peer.h6
28 files changed, 778 insertions, 730 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
index 9e0b6c45fa9..f60a8068db4 100644
--- a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.cpp
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
@@ -2,41 +2,41 @@
#define ACE_BUILD_SVC_DLL
#include "Event_Channel.h"
-#include "Concrete_Proxy_Handlers.h"
+#include "Concrete_Connection_Handlers.h"
-Consumer_Proxy::Consumer_Proxy (const Proxy_Config_Info &pci)
- : Proxy_Handler (pci)
+Consumer_Handler::Consumer_Handler (const Connection_Config_Info &pci)
+ : Connection_Handler (pci)
{
- this->proxy_role_ = 'C';
- this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
+ this->connection_role_ = 'C';
+ this->msg_queue ()->high_water_mark (Options::instance ()->max_queue_size ());
}
// This method should be called only when the Consumer shuts down
-// unexpectedly. This method simply marks the Proxy_Handler as having
+// unexpectedly. This method simply marks the Connection_Handler as having
// failed so that handle_close () can reconnect.
int
-Consumer_Proxy::handle_input (ACE_HANDLE)
+Consumer_Handler::handle_input (ACE_HANDLE)
{
char buf[1];
- this->state (Proxy_Handler::FAILED);
+ this->state (Connection_Handler::FAILED);
switch (this->peer ().recv (buf, sizeof buf))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n",
+ "(%t) Peer has failed unexpectedly for Consumer_Handler %d\n",
this->id ()), -1);
/* NOTREACHED */
case 0:
ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n",
+ "(%t) Peer has shutdown unexpectedly for Consumer_Handler %d\n",
this->id ()), -1);
/* NOTREACHED */
default:
ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n",
+ "(%t) Consumer is erroneously sending input to Consumer_Handler %d\n",
this->id ()), -1);
/* NOTREACHED */
}
@@ -47,7 +47,7 @@ Consumer_Proxy::handle_input (ACE_HANDLE)
// Event_List.
int
-Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
+Consumer_Handler::nonblk_put (ACE_Message_Block *event)
{
// Try to send the event. If we don't send it all (e.g., due to
// flow control), then re-queue the remainder at the head of the
@@ -60,7 +60,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
{
// Things have gone wrong, let's try to close down and set up a
// new reconnection by calling handle_close().
- this->state (Proxy_Handler::FAILED);
+ this->state (Connection_Handler::FAILED);
this->handle_close ();
return -1;
}
@@ -86,7 +86,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
}
ssize_t
-Consumer_Proxy::send (ACE_Message_Block *event)
+Consumer_Handler::send (ACE_Message_Block *event)
{
ACE_DEBUG ((LM_DEBUG, "(%t) sending %d bytes to Consumer %d\n",
event->length (), this->id ()));
@@ -114,7 +114,7 @@ Consumer_Proxy::send (ACE_Message_Block *event)
// This method is automatically called by the ACE_Reactor.
int
-Consumer_Proxy::handle_output (ACE_HANDLE)
+Consumer_Handler::handle_output (ACE_HANDLE)
{
ACE_Message_Block *event = 0;
@@ -169,7 +169,7 @@ Consumer_Proxy::handle_output (ACE_HANDLE)
// Send an event to a Consumer (may queue if necessary).
int
-Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
+Consumer_Handler::put (ACE_Message_Block *event, ACE_Time_Value *)
{
if (this->msg_queue ()->is_empty ())
// Try to send the event *without* blocking!
@@ -181,11 +181,11 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
(event, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
-Supplier_Proxy::Supplier_Proxy (const Proxy_Config_Info &pci)
- : Proxy_Handler (pci),
+Supplier_Handler::Supplier_Handler (const Connection_Config_Info &pci)
+ : Connection_Handler (pci),
msg_frag_ (0)
{
- this->proxy_role_ = 'S';
+ this->connection_role_ = 'S';
this->msg_queue ()->high_water_mark (0);
}
@@ -201,7 +201,7 @@ Supplier_Proxy::Supplier_Proxy (const Proxy_Config_Info &pci)
// of software from knowledge of the event structure.
int
-Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
+Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
{
if (this->msg_frag_ == 0)
// No existing fragment...
@@ -329,8 +329,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
}
Event_Key event_addr (this->id (),
- event->header_.supplier_id_,
- event->header_.type_);
+ event->header_.type_);
// Copy the forwarding address from the Event_Key into
// forward_addr.
forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
@@ -340,10 +339,16 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
}
this->total_bytes (data_received + header_received);
- ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_, event->header_.len_, data_received + header_received));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connection id = %d, cur len = %d, total bytes read = %d\n",
+ event->header_.connection_id_,
+ event->header_.len_,
+ data_received + header_received));
if (Options::instance ()->enabled (Options::VERBOSE))
- ACE_DEBUG ((LM_DEBUG, "data_ = %*s\n", event->header_.len_ - 2, event->data_));
+ ACE_DEBUG ((LM_DEBUG,
+ "data_ = %*s\n",
+ event->header_.len_ - 2,
+ event->data_));
// Encode before returning so that we can set things out in
// network byte order.
@@ -356,7 +361,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
// gatewayd, as well as stdio).
int
-Supplier_Proxy::handle_input (ACE_HANDLE)
+Supplier_Handler::handle_input (ACE_HANDLE)
{
ACE_Message_Block *forward_addr = 0;
@@ -365,9 +370,9 @@ Supplier_Proxy::handle_input (ACE_HANDLE)
case 0:
// Note that a peer should never initiate a shutdown by closing
// the connection. Instead, it should reconnect.
- this->state (Proxy_Handler::FAILED);
+ this->state (Connection_Handler::FAILED);
ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n",
+ "(%t) Peer has closed down unexpectedly for Input Connection_Handler %d\n",
this->id ()), -1);
/* NOTREACHED */
case -1:
@@ -376,8 +381,8 @@ Supplier_Proxy::handle_input (ACE_HANDLE)
return 0;
else // A weird problem occurred, shut down and start again.
{
- this->state (Proxy_Handler::FAILED);
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n",
+ this->state (Connection_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Connection_Handler %d\n",
"Peer has failed unexpectedly",
this->id ()), -1);
}
@@ -388,32 +393,32 @@ Supplier_Proxy::handle_input (ACE_HANDLE)
}
// Forward an event to its appropriate Consumer(s). This delegates to
-// the <ACE_Event_Channel> to do the actual forwarding.
+// the <Event_Channel> to do the actual forwarding.
int
-Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
+Supplier_Handler::forward (ACE_Message_Block *forward_addr)
{
return this->event_channel_->put (forward_addr);
}
#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Proxy::Thr_Consumer_Proxy (const Proxy_Config_Info &pci)
- : Consumer_Proxy (pci)
+Thr_Consumer_Handler::Thr_Consumer_Handler (const Connection_Config_Info &pci)
+ : Consumer_Handler (pci)
{
}
// This method should be called only when the Consumer shuts down
-// unexpectedly. This method marks the Proxy_Handler as having failed
+// unexpectedly. This method marks the Connection_Handler as having failed
// and deactivates the ACE_Message_Queue (to wake up the thread
// blocked on <dequeue_head> in svc()).
// Thr_Output_Handler::handle_close () will eventually try to
// reconnect...
int
-Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
+Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
{
- // Call down to the <Consumer_Proxy> to handle this first.
- this->Consumer_Proxy::handle_input (h);
+ // Call down to the <Consumer_Handler> to handle this first.
+ this->Consumer_Handler::handle_input (h);
ACE_Reactor::instance ()->remove_handler
(h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
@@ -423,19 +428,19 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
return 0;
}
-// Initialize the threaded Consumer_Proxy object and spawn a new
+// Initialize the threaded Consumer_Handler object and spawn a new
// thread.
int
-Thr_Consumer_Proxy::open (void *)
+Thr_Consumer_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
// Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_->complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+ else if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
// Register ourselves to receive input events (which indicate that
// the Consumer has shut down unexpectedly).
@@ -461,10 +466,10 @@ Thr_Consumer_Proxy::open (void *)
}
// Queue up an event for transmission (must not block since
-// Supplier_Proxys may be single-threaded).
+// Supplier_Handlers may be single-threaded).
int
-Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
// Perform non-blocking enqueue.
return this->msg_queue ()->enqueue_tail
@@ -475,13 +480,13 @@ Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
// threads...)
int
-Thr_Consumer_Proxy::svc (void)
+Thr_Consumer_Handler::svc (void)
{
for (;;)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) Thr_Consumer_Proxy's handle = %d\n",
+ "(%t) Thr_Consumer_Handler's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread it is OK to block on
@@ -496,14 +501,14 @@ Thr_Consumer_Proxy::svc (void)
ACE_ASSERT (errno == ESHUTDOWN);
ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
+ "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
this->id (), this->get_handle ()));
this->peer ().close ();
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->event_channel_->initiate_proxy_connection (this) == -1; )
+ this->event_channel_->initiate_connection_connection (this) == -1; )
{
ACE_Time_Value tv (this->timeout ());
@@ -519,21 +524,21 @@ Thr_Consumer_Proxy::svc (void)
return 0;
}
-Thr_Supplier_Proxy::Thr_Supplier_Proxy (const Proxy_Config_Info &pci)
- : Supplier_Proxy (pci)
+Thr_Supplier_Handler::Thr_Supplier_Handler (const Connection_Config_Info &pci)
+ : Supplier_Handler (pci)
{
}
int
-Thr_Supplier_Proxy::open (void *)
+Thr_Supplier_Handler::open (void *)
{
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
// Call back to the <Event_Channel> to complete our initialization.
- else if (this->event_channel_->complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+ else if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
@@ -556,24 +561,25 @@ Thr_Supplier_Proxy::open (void *)
// existing code!).
int
-Thr_Supplier_Proxy::svc (void)
+Thr_Supplier_Handler::svc (void)
{
for (;;)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) Thr_Supplier_Proxy's handle = %d\n",
+ "(%t) Thr_Supplier_Handler's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread and processes events
// for one connection it is OK to call down to the
- // <Supplier_Proxy::handle_input> method, which blocks on input.
+ // <Supplier_Handler::handle_input> method, which blocks on input.
- while (this->Supplier_Proxy::handle_input () != -1)
+ while (this->Supplier_Handler::handle_input () != -1)
continue;
ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
+ "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
+ this->id (),
+ this->get_handle ()));
this->peer ().close ();
@@ -582,7 +588,7 @@ Thr_Supplier_Proxy::svc (void)
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->event_channel_->initiate_proxy_connection (this) == -1; )
+ this->event_channel_->initiate_connection_connection (this) == -1; )
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
diff --git a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
index 24d02094dde..a14d97b9d3e 100644
--- a/apps/Gateway/Gateway/Concrete_Proxy_Handlers.h
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.h
@@ -7,10 +7,10 @@
// gateway
//
// = FILENAME
-// Concrete_Proxy_Handlers.h
+// Concrete_Connection_Handlers.h
//
// = DESCRIPTION
-// These are all the subclasses of Proxy_Handler that define the
+// These are all the subclasses of Connection_Handler that define the
// appropriate threaded/reactive Consumer/Supplier behavior.
//
// = AUTHOR
@@ -18,21 +18,23 @@
//
// ============================================================================
-#if !defined (_CONCRETE_PROXY_HANDLER)
-#define _CONCRETE_PROXY_HANDLER
+#if !defined (CONCRETE_CONNECTION_HANDLER)
+#define CONCRETE_CONNECTION_HANDLER
-#include "Proxy_Handler.h"
+#include "Connection_Handler.h"
-class Supplier_Proxy : public Proxy_Handler
+class Supplier_Handler : public Connection_Handler
+{
// = TITLE
- // Handles reception of Events from Suppliers
+ // Handles reception of Events from Suppliers.
//
// = DESCRIPTION
- // Performs framing and error checking.
-{
+ // Performs framing and error checking on Events. Intended to
+ // run reactively, i.e., in one thread of control using a
+ // Reactor for demuxing and dispatching.
public:
// = Initialization method.
- Supplier_Proxy (const Proxy_Config_Info &);
+ Supplier_Handler (const Connection_Config_Info &);
protected:
// = All the following methods are upcalls, so they can be protected.
@@ -45,33 +47,32 @@ protected:
int forward (ACE_Message_Block *event);
// Forward the <event> to its appropriate Consumer. This delegates
- // to the <ACE_Event_Channel> to do the actual forwarding.
+ // to the <Event_Channel> to do the actual forwarding.
ACE_Message_Block *msg_frag_;
// Keep track of event fragment to handle non-blocking recv's from
// Suppliers.
};
-class Consumer_Proxy : public Proxy_Handler
+class Consumer_Handler : public Connection_Handler
+{
// = TITLE
// Handles transmission of events to Consumers.
//
// = DESCRIPTION
- // Performs queueing and error checking. Uses a single-threaded
- // Reactive approach to handle flow control.
-{
+ // Performs queueing and error checking. Intended to run
+ // reactively, i.e., in one thread of control using a Reactor
+ // for demuxing and dispatching. Also uses a Reactor to handle
+ // flow controlled output connections.
public:
// = Initialization method.
- Consumer_Proxy (const Proxy_Config_Info &);
+ Consumer_Handler (const Connection_Config_Info &);
virtual int put (ACE_Message_Block *event,
ACE_Time_Value * = 0);
// Send an event to a Consumer (may be queued if necessary).
protected:
- // = We'll allow up to 16 megabytes to be queued per-output proxy.
- enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
-
virtual int handle_output (ACE_HANDLE);
// Finish sending event when flow control conditions abate.
@@ -85,15 +86,15 @@ protected:
// Receive and process shutdowns from a Consumer.
};
-class Thr_Consumer_Proxy : public Consumer_Proxy
- // = TITLE
- // Runs each Output Proxy_Handler in a separate thread.
+class Thr_Consumer_Handler : public Consumer_Handler
{
+ // = TITLE
+ // Runs each <Consumer_Handler> in a separate thread.
public:
- Thr_Consumer_Proxy (const Proxy_Config_Info &);
+ Thr_Consumer_Handler (const Connection_Config_Info &);
virtual int open (void *);
- // Initialize the threaded Consumer_Proxy object and spawn a new
+ // Initialize the threaded Consumer_Handler object and spawn a new
// thread.
virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
@@ -107,12 +108,12 @@ protected:
// Transmit peer messages.
};
-class Thr_Supplier_Proxy : public Supplier_Proxy
- // = TITLE
- // Runs each Input Proxy_Handler in a separate thread.
+class Thr_Supplier_Handler : public Supplier_Handler
{
+ // = TITLE
+ // Runs each <Supplier_Handler> in a separate thread.
public:
- Thr_Supplier_Proxy (const Proxy_Config_Info &pci);
+ Thr_Supplier_Handler (const Connection_Config_Info &pci);
virtual int open (void *);
// Initialize the object and spawn a new thread.
@@ -122,4 +123,4 @@ protected:
// Transmit peer messages.
};
-#endif /* _CONCRETE_PROXY_HANDLER */
+#endif /* CONCRETE_CONNECTION_HANDLER */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index f1d784c2bcf..062b16040f2 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -19,18 +19,13 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry,
// Ignore comments, check for EOF and EOLINE if this succeeds, we
// have our connection id.
- while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
+ while ((result = this->getint (entry.connection_id_)) != FP::SUCCESS)
if (result == FP::EOFILE)
return FP::EOFILE;
else if (result == FP::EOLINE
|| result == FP::COMMENT)
line_number++;
- // Get the supplier id.
- result = this->getint (entry.supplier_id_);
- if (result != FP::SUCCESS)
- return result;
-
// Get the payload type.
result = this->getint (entry.type_);
if (result != FP::SUCCESS)
@@ -50,7 +45,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry,
}
FP_RETURN_TYPE
-Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
+Connection_Config_File_Parser::read_entry (Connection_Config_Info &entry,
int &line_number)
{
char buf[BUFSIZ];
@@ -62,7 +57,7 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
// Ignore comments, check for EOF and EOLINE if this succeeds, we
// have our connection id
- while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
+ while ((result = this->getint (entry.connection_id_)) != FP::SUCCESS)
if (result == FP::EOFILE)
return FP::EOFILE;
else if (result == FP::EOLINE
@@ -85,11 +80,11 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
if (result != FP::SUCCESS)
return result;
else
- entry.proxy_role_ = buf[0];
+ entry.connection_role_ = buf[0];
- if (entry.proxy_role_ == 'C')
+ if (entry.connection_role_ == 'C')
entry.remote_port_ = Options::instance ()->consumer_connector_port ();
- else if (entry.proxy_role_ == 'S')
+ else if (entry.connection_role_ == 'S')
entry.remote_port_ = Options::instance ()->supplier_connector_port ();
else
// Yikes, this is a *weird* error!
@@ -106,7 +101,7 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
if (result != FP::SUCCESS)
return result;
else
- entry.proxy_role_ = buf[0];
+ entry.connection_role_ = buf[0];
}
// Get the max retry delay.
@@ -118,7 +113,9 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
// Get the local port number.
result = this->getint (port);
- if (result != FP::SUCCESS)
+ if (result == FP::DEFAULT)
+ entry.local_port_ = 0; // @@ Should make this an option.
+ else if (result != FP::SUCCESS)
return result;
else
entry.local_port_ = u_short (port);
@@ -136,70 +133,87 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
}
#if defined (DEBUGGING)
-int main (int argc, char *argv[])
+int
+main (int argc, char *argv[])
{
- if (argc != 4)
- {
- // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1);
- cerr << argv[0] << " CCfilename filename Mapfilename.\n";
- exit (1);
- }
FP_RETURN_TYPE result;
- Proxy_Config_Info entry;
- Proxy_Config_File_Parser CCfile;
-
- CCfile.open (argv[1]);
-
int line_number = 0;
- printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n");
-
- // Read config file line at a time.
- while ((result = CCfile.read_entry (entry, line_number)) != EOF)
- {
- if (result != FP::SUCCESS)
- // ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number));
- cerr << "Error at line " << line_number << endl;
- else
- printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n",
- entry.proxy_id_, entry.host_, entry.remote_port_, entry.proxy_role_,
- entry.max_retry_timeout_, entry.transform_, entry.local_port_);
- }
- CCfile.close();
-
- Consumer_Config_Info entry;
- Consumer_Config_File_Parser file;
+ {
+ Connection_Config_Info entry;
+ Connection_Config_File_Parser connection_config_file;
- file.open (argv[2]);
+ connection_config_file.open (argc > 1 ? argv[1] : "connection_config");
- line_number = 0;
+ int line_number = 0;
- printf ("\nConnID\tLogic\tPayload\tDestinations\n");
+ ACE_DEBUG ((LM_DEBUG,
+ "ConnID\tHost\t\tRPort\tRole\tRetry\tLPort\tPriority\n"));
- // Read config file line at a time.
- while ((result = file.read_entry (entry, line_number)) != EOF)
- {
- if (result != FP::SUCCESS)
- cerr << "Error at line " << line_number << endl;
+ // Read config file line at a time.
+ while ((result = connection_config_file.read_entry (entry, line_number)) != FP::EOFILE)
+ if (result == FP::PARSE_ERROR)
+ ACE_DEBUG ((LM_DEBUG,
+ "Error line %d.\n",
+ line_number));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "%d\t%s\t%d\t%c\t%d\t%d\t%d\n",
+ entry.connection_id_,
+ entry.host_,
+ entry.remote_port_,
+ entry.connection_role_,
+ entry.max_retry_timeout_,
+ entry.local_port_,
+ entry.priority_));
+
+ connection_config_file.close ();
+ }
+
+ {
+ Consumer_Config_Info entry;
+ Consumer_Config_File_Parser consumer_config_file;
+
+ consumer_config_file.open (argc > 2 ? argv[2] : "consumer_config");
+
+ line_number = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "\nConnID\tLogic\tPayload\tDestinations\n"));
+
+ // Read config file line at a time.
+ while ((result = consumer_config_file.read_entry (entry, line_number)) != FP::EOFILE)
+ if (result == FP::PARSE_ERROR)
+ ACE_DEBUG ((LM_DEBUG,
+ "Error line %d.\n",
+ line_number));
else
{
- printf ("%d\t%d\t%d\t%d\t",
- entry.proxy_id_, entry.supplier_id_, entry.type_);
+ ACE_DEBUG ((LM_DEBUG,
+ "%d\t%d\t%d\t%d\t",
+ entry.connection_id_,
+ entry.supplier_id_,
+ entry.type_));
+
while (--entry.total_consumers_ >= 0)
- printf ("%d,", entry.consumers_[entry.total_consumers_]);
- printf ("\n");
+ ACE_DEBUG ((LM_DEBUG,
+ "%d,",
+ entry.consumers_[entry.total_consumers_]));
+ ACE_DEBUG ((LM_DEBUG,
+ "\n"));
}
- }
- file.close();
+
+ consumer_config_file.close ();
+ }
return 0;
}
#endif /* DEBUGGING */
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class File_Parser<Proxy_Config_Info>;
+template class File_Parser<Connection_Config_Info>;
template class File_Parser<Consumer_Config_Info>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate File_Parser<Proxy_Config_Info>
+#pragma instantiate File_Parser<Connection_Config_Info>
#pragma instantiate File_Parser<Consumer_Config_Info>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index 2f1f1280bc2..1199c615833 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -17,19 +17,19 @@
#if !defined (_CONFIG_FILES)
#define _CONFIG_FILES
-#include "ace/OS.h"
#include "File_Parser.h"
+#include "Event.h"
// Forward declaration.
-class ACE_Event_Channel;
+class Event_Channel;
-class Proxy_Config_Info
+class Connection_Config_Info
// = TITLE
// Stores connection configuration information.
{
public:
- ACE_INT32 proxy_id_;
- // Connection id for this Proxy_Handler.
+ ACE_INT32 connection_id_;
+ // Connection id for this Connection_Handler.
char host_[BUFSIZ];
// Host to connect with.
@@ -37,7 +37,7 @@ public:
u_short remote_port_;
// Port to connect with.
- char proxy_role_;
+ char connection_role_;
// 'S' (supplier) or 'C' (consumer).
ACE_INT32 max_retry_timeout_;
@@ -50,19 +50,19 @@ public:
// Priority by which different Consumers and Suppliers should be
// serviced.
- ACE_Event_Channel *event_channel_;
+ Event_Channel *event_channel_;
// We just need a place to store this until we can pass it along
- // when creating a Proxy_Handler.
+ // when creating a Connection_Handler.
};
-class Proxy_Config_File_Parser : public File_Parser<Proxy_Config_Info>
+class Connection_Config_File_Parser : public File_Parser<Connection_Config_Info>
// = TITLE
- // Parser for the Proxy_Handler Connection file.
+ // Parser for the Connection_Handler Connection file.
{
public:
- virtual FP::Return_Type read_entry (Proxy_Config_Info &entry,
+ virtual FP::Return_Type read_entry (Connection_Config_Info &entry,
int &line_number);
- // Read in a <Proxy_Config_Info> entry.
+ // Read in a <Connection_Config_Info> entry.
};
@@ -77,12 +77,9 @@ public:
// Total number of multicast consumers.
};
- ACE_INT32 proxy_id_;
+ ACE_INT32 connection_id_;
// Connection id for this proxy.
- ACE_INT32 supplier_id_;
- // Logical supplier id for this proxy.
-
ACE_INT32 type_;
// Message type.
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Connection_Handler.cpp
index af12f5b6bff..3b9e8909dbc 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Connection_Handler.cpp
@@ -2,16 +2,16 @@
#define ACE_BUILD_SVC_DLL
#include "Event_Channel.h"
-#include "Concrete_Proxy_Handlers.h"
+#include "Concrete_Connection_Handlers.h"
void
-Proxy_Handler::id (ACE_INT32 id)
+Connection_Handler::id (ACE_INT32 id)
{
this->id_ = id;
}
ACE_INT32
-Proxy_Handler::id (void)
+Connection_Handler::id (void)
{
return this->id_;
}
@@ -19,27 +19,27 @@ Proxy_Handler::id (void)
// The total number of bytes sent/received on this Proxy.
size_t
-Proxy_Handler::total_bytes (void)
+Connection_Handler::total_bytes (void)
{
return this->total_bytes_;
}
void
-Proxy_Handler::total_bytes (size_t bytes)
+Connection_Handler::total_bytes (size_t bytes)
{
this->total_bytes_ += bytes;
}
-Proxy_Handler::Proxy_Handler (void)
+Connection_Handler::Connection_Handler (void)
{
}
-Proxy_Handler::Proxy_Handler (const Proxy_Config_Info &pci)
+Connection_Handler::Connection_Handler (const Connection_Config_Info &pci)
: remote_addr_ (pci.remote_port_, pci.host_),
local_addr_ (pci.local_port_),
- id_ (pci.proxy_id_),
+ id_ (pci.connection_id_),
total_bytes_ (0),
- state_ (Proxy_Handler::IDLE),
+ state_ (Connection_Handler::IDLE),
timeout_ (1),
max_timeout_ (pci.max_retry_timeout_),
event_channel_ (pci.event_channel_)
@@ -48,26 +48,26 @@ Proxy_Handler::Proxy_Handler (const Proxy_Config_Info &pci)
this->priority (int (pci.priority_));
}
-// Set the proxy_role.
+// Set the connection_role.
void
-Proxy_Handler::proxy_role (char d)
+Connection_Handler::connection_role (char d)
{
- this->proxy_role_ = d;
+ this->connection_role_ = d;
}
-// Get the proxy_role.
+// Get the connection_role.
char
-Proxy_Handler::proxy_role (void)
+Connection_Handler::connection_role (void)
{
- return this->proxy_role_;
+ return this->connection_role_;
}
// Sets the timeout delay.
void
-Proxy_Handler::timeout (int to)
+Connection_Handler::timeout (int to)
{
if (to > this->max_timeout_)
to = this->max_timeout_;
@@ -80,7 +80,7 @@ Proxy_Handler::timeout (int to)
// re-calculation).
int
-Proxy_Handler::timeout (void)
+Connection_Handler::timeout (void)
{
int old_timeout = this->timeout_;
this->timeout_ *= 2;
@@ -94,7 +94,7 @@ Proxy_Handler::timeout (void)
// Sets the max timeout delay.
void
-Proxy_Handler::max_timeout (int mto)
+Connection_Handler::max_timeout (int mto)
{
this->max_timeout_ = mto;
}
@@ -102,7 +102,7 @@ Proxy_Handler::max_timeout (int mto)
// Gets the max timeout delay.
int
-Proxy_Handler::max_timeout (void)
+Connection_Handler::max_timeout (void)
{
return this->max_timeout_;
}
@@ -110,54 +110,54 @@ Proxy_Handler::max_timeout (void)
// Restart connection asynchronously when timeout occurs.
int
-Proxy_Handler::handle_timeout (const ACE_Time_Value &,
+Connection_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n",
+ "(%t) attempting to reconnect Connection_Handler %d with timeout = %d\n",
this->id (), this->timeout_));
// Delegate the re-connection attempt to the Event Channel.
- this->event_channel_->initiate_proxy_connection (this);
+ this->event_channel_->initiate_connection_connection (this);
return 0;
}
-// Handle shutdown of the Proxy_Handler object.
+// Handle shutdown of the Connection_Handler object.
int
-Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+Connection_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) shutting down %s Proxy_Handler %d on handle %d\n",
- this->proxy_role () == 'C' ? "Consumer" : "Supplier",
+ "(%t) shutting down %s Connection_Handler %d on handle %d\n",
+ this->connection_role () == 'C' ? "Consumer" : "Supplier",
this->id (), this->get_handle ()));
// Restart the connection, if possible.
- return this->event_channel_->reinitiate_proxy_connection (this);
+ return this->event_channel_->reinitiate_connection_connection (this);
}
// Set the state of the Proxy.
void
-Proxy_Handler::state (Proxy_Handler::State s)
+Connection_Handler::state (Connection_Handler::State s)
{
this->state_ = s;
}
// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
-// control to our Proxy_Handler.
+// control to our Connection_Handler.
int
-Proxy_Handler::open (void *)
+Connection_Handler::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) %s Proxy_Handler's handle = %d\n",
- this->proxy_role () == 'C' ? "Consumer" : "Supplier",
+ ACE_DEBUG ((LM_DEBUG, "(%t) %s Connection_Handler's handle = %d\n",
+ this->connection_role () == 'C' ? "Consumer" : "Supplier",
this->peer ().get_handle ()));
// Call back to the <Event_Channel> to complete our initialization.
- if (this->event_channel_->complete_proxy_connection (this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+ if (this->event_channel_->complete_connection_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_connection_connection"), -1);
// Turn on non-blocking I/O.
else if (this->peer ().enable (ACE_NONBLOCK) == -1)
@@ -173,72 +173,72 @@ Proxy_Handler::open (void *)
// Return the current state of the Proxy.
-Proxy_Handler::State
-Proxy_Handler::state (void)
+Connection_Handler::State
+Connection_Handler::state (void)
{
return this->state_;
}
ACE_INET_Addr &
-Proxy_Handler::remote_addr (void)
+Connection_Handler::remote_addr (void)
{
return this->remote_addr_;
}
ACE_INET_Addr &
-Proxy_Handler::local_addr (void)
+Connection_Handler::local_addr (void)
{
return this->local_addr_;
}
-// Make the appropriate type of <Proxy_Handler> (i.e.,
-// <Consumer_Proxy>, <Supplier_Proxy>, <Thr_Consumer_Proxy>, or
-// <Thr_Supplier_Proxy>).
+// Make the appropriate type of <Connection_Handler> (i.e.,
+// <Consumer_Handler>, <Supplier_Handler>, <Thr_Consumer_Handler>, or
+// <Thr_Supplier_Handler>).
-Proxy_Handler *
-Proxy_Handler_Factory::make_proxy_handler (const Proxy_Config_Info &pci)
+Connection_Handler *
+Connection_Handler_Factory::make_connection_handler (const Connection_Config_Info &pci)
{
- Proxy_Handler *proxy_handler = 0;
+ Connection_Handler *connection_handler = 0;
// The next few lines of code are dependent on whether we are making
- // a threaded/reactive Supplier_Proxy/Consumer_Proxy.
+ // a threaded/reactive Supplier_Handler/Consumer_Handler.
- if (pci.proxy_role_ == 'C') // Configure a Consumer_Proxy.
+ if (pci.connection_role_ == 'C') // Configure a Consumer_Handler.
{
#if defined (ACE_HAS_THREADS)
- // Create a threaded Consumer_Proxy.
+ // Create a threaded Consumer_Handler.
if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (),
Options::OUTPUT_MT))
- ACE_NEW_RETURN (proxy_handler,
- Thr_Consumer_Proxy (pci),
+ ACE_NEW_RETURN (connection_handler,
+ Thr_Consumer_Handler (pci),
0);
- // Create a reactive Consumer_Proxy.
+ // Create a reactive Consumer_Handler.
else
#endif /* ACE_HAS_THREADS */
- ACE_NEW_RETURN (proxy_handler,
- Consumer_Proxy (pci),
+ ACE_NEW_RETURN (connection_handler,
+ Consumer_Handler (pci),
0);
}
- else // proxy_role == 'S', so configure a Supplier_Proxy.
+ else // connection_role == 'S', so configure a Supplier_Handler.
{
#if defined (ACE_HAS_THREADS)
- // Create a threaded Supplier_Proxy.
+ // Create a threaded Supplier_Handler.
if (ACE_BIT_ENABLED (Options::instance ()->threading_strategy (),
Options::INPUT_MT))
- ACE_NEW_RETURN (proxy_handler,
- Thr_Supplier_Proxy (pci),
+ ACE_NEW_RETURN (connection_handler,
+ Thr_Supplier_Handler (pci),
0);
- // Create a reactive Supplier_Proxy.
+ // Create a reactive Supplier_Handler.
else
#endif /* ACE_HAS_THREAD */
- ACE_NEW_RETURN (proxy_handler,
- Supplier_Proxy (pci),
+ ACE_NEW_RETURN (connection_handler,
+ Supplier_Handler (pci),
0);
}
- return proxy_handler;
+ return connection_handler;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Connection_Handler.h
index 76bd4024478..665c1635a97 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.h
+++ b/apps/Gateway/Gateway/Connection_Handler.h
@@ -7,15 +7,15 @@
// gateway
//
// = FILENAME
-// Proxy_Handler.h
+// Connection_Handler.h
//
// = AUTHOR
// Doug Schmidt
//
// ============================================================================
-#if !defined (_PROXY_HANDLER)
-#define _PROXY_HANDLER
+#if !defined (_CONNECTION_HANDLER)
+#define _CONNECTION_HANDLER
#include "ace/Service_Config.h"
#include "ace/SOCK_Connector.h"
@@ -25,25 +25,26 @@
#include "Event.h"
// Forward declaration.
-class ACE_Event_Channel;
+class Event_Channel;
-class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
+class Connection_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>
+{
// = TITLE
- // Proxy_Handler contains info about connection state and addressing.
+ // Connection_Handler contains info about connection state and
+ // addressing.
//
// = DESCRIPTION
- // The Proxy_Handler classes process events sent to the Event
- // Channel from Suppliers and forward them to Consumers.
-{
+ // The Connection_Handler classes process events sent to the
+ // Event Channel from Suppliers and forward them to Consumers.
public:
- Proxy_Handler (void);
+ Connection_Handler (void);
// Default constructor (needed to make <ACE_Connector> happy).
- Proxy_Handler (const Proxy_Config_Info &);
+ Connection_Handler (const Connection_Config_Info &);
// Real constructor.
virtual int open (void * = 0);
- // Initialize and activate a single-threaded Proxy_Handler (called by
+ // Initialize and activate a single-threaded Connection_Handler (called by
// ACE_Connector::handle_output()).
ACE_INET_Addr &remote_addr (void);
@@ -56,14 +57,14 @@ public:
ACE_INT32 id (void);
void id (ACE_INT32);
- // = The current state of the Proxy_Handler.
+ // = The current state of the Connection_Handler.
enum State
{
IDLE = 1, // Prior to initialization.
CONNECTING, // During connection establishment.
- ESTABLISHED, // Proxy_Handler is established and active.
- DISCONNECTING, // Proxy_Handler is in the process of connecting.
- FAILED // Proxy_Handler has failed.
+ ESTABLISHED, // Connection_Handler is established and active.
+ DISCONNECTING, // Connection_Handler is in the process of connecting.
+ FAILED // Connection_Handler has failed.
};
// = Set/get the current state.
@@ -80,8 +81,8 @@ public:
// = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer
// (necessary for error checking).
- void proxy_role (char);
- char proxy_role (void);
+ void connection_role (char);
+ char connection_role (void);
// = The total number of bytes sent/received on this proxy.
size_t total_bytes (void);
@@ -89,11 +90,11 @@ public:
// Increment count by <bytes>.
virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based Proxy_Handler reconnection.
+ // Perform timer-based Connection_Handler reconnection.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
- // Perform Proxy_Handler termination.
+ // Perform Connection_Handler termination.
protected:
enum
@@ -122,27 +123,27 @@ protected:
int max_timeout_;
// Maximum amount of time to wait between reconnection attempts.
- char proxy_role_;
+ char connection_role_;
// Indicates which role the proxy plays ('S' == Supplier and 'C' ==
// Consumer).
- ACE_Event_Channel *event_channel_;
- // Reference to the <ACE_Event_Channel> that we use to forward all
+ Event_Channel *event_channel_;
+ // Reference to the <Event_Channel> that we use to forward all
// the events from Consumers and Suppliers.
};
-class Proxy_Handler_Factory : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+class Connection_Handler_Factory
+{
// = TITLE
- // Creates the appropriate type of <Proxy_Handler>
+ // Creates the appropriate type of <Connection_Handler>.
//
// = DESCRIPTION
- // <Proxy_Handler>s can include <Consumer_Proxy>,
- // <Supplier_Proxy>, <Thr_Consumer_Proxy>, or
- // <Thr_Supplier_Proxy>).
-{
+ // <Connection_Handler>s can include <Consumer_Handler>,
+ // <Supplier_Handler>, <Thr_Consumer_Handler>, or
+ // <Thr_Supplier_Handler>).
public:
- Proxy_Handler *make_proxy_handler (const Proxy_Config_Info &);
- // Make the appropriate type of <Proxy_Handler>.
+ Connection_Handler *make_connection_handler (const Connection_Config_Info &);
+ // Make the appropriate type of <Connection_Handler>.
};
-#endif /* _PROXY_HANDLER */
+#endif /* _CONNECTION_HANDLER */
diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
new file mode 100644
index 00000000000..f38aa21a23a
--- /dev/null
+++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.cpp
@@ -0,0 +1,33 @@
+// $Id$
+
+#include "Event_Channel.h"
+#include "Connection_Handler_Acceptor.h"
+
+int
+Connection_Handler_Acceptor::make_svc_handler (Connection_Handler *&ph)
+{
+ ACE_ALLOCATOR_RETURN (ph,
+ this->connection_handler_factory_.make_connection_handler (this->connection_config_info_),
+ -1);
+ return 0;
+}
+
+Connection_Handler_Acceptor::Connection_Handler_Acceptor (Event_Channel &ec,
+ char connection_role)
+ : event_channel_ (ec)
+{
+ this->connection_config_info_.connection_id_ = 0;
+ this->connection_config_info_.host_[0] = '\0';
+ this->connection_config_info_.remote_port_ = 0;
+ this->connection_config_info_.connection_role_ = connection_role;
+ this->connection_config_info_.max_retry_timeout_ = Options::instance ()->max_timeout ();
+ this->connection_config_info_.local_port_ = 0;
+ this->connection_config_info_.priority_ = 1;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
diff --git a/apps/Gateway/Gateway/Connection_Handler_Acceptor.h b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
new file mode 100644
index 00000000000..fc54a363595
--- /dev/null
+++ b/apps/Gateway/Gateway/Connection_Handler_Acceptor.h
@@ -0,0 +1,51 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// gateway
+//
+// = FILENAME
+// Connection_Handler_acceptor.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_CONNECTION_HANDLER_ACCEPTOR)
+#define _CONNECTION_HANDLER_ACCEPTOR
+
+#include "ace/Acceptor.h"
+#include "ace/SOCK_Acceptor.h"
+#include "Connection_Handler.h"
+
+// Forward declaration
+class Event_Channel;
+
+class Connection_Handler_Acceptor : public ACE_Acceptor<Connection_Handler, ACE_SOCK_ACCEPTOR>
+{
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Connection_Handler object to do the dirty
+ // work...
+public:
+ Connection_Handler_Acceptor (Event_Channel &,
+ char connection_role);
+
+ virtual int make_svc_handler (Connection_Handler *&ph);
+ // Hook method for creating an appropriate <Connection_Handler>.
+
+protected:
+ Event_Channel &event_channel_;
+ // Reference to the event channel.
+
+ Connection_Config_Info connection_config_info_;
+ // Keeps track of what type of proxy we need to create.
+
+ Connection_Handler_Factory connection_handler_factory_;
+ // Make the appropriate type of <Connection_Handler>.
+};
+
+#endif /* _CONNECTION_HANDLER_ACCEPTOR */
diff --git a/apps/Gateway/Gateway/Connection_Handler_Connector.cpp b/apps/Gateway/Gateway/Connection_Handler_Connector.cpp
new file mode 100644
index 00000000000..56ac92d9c2f
--- /dev/null
+++ b/apps/Gateway/Gateway/Connection_Handler_Connector.cpp
@@ -0,0 +1,73 @@
+// $Id$
+
+#include "Connection_Handler_Connector.h"
+
+Connection_Handler_Connector::Connection_Handler_Connector (void)
+{
+}
+
+// Initiate (or reinitiate) a connection to the Connection_Handler.
+
+int
+Connection_Handler_Connector::initiate_connection (Connection_Handler *connection_handler,
+ ACE_Synch_Options &synch_options)
+{
+ char addr_buf[MAXHOSTNAMELEN];
+
+ // Mark ourselves as idle so that the various iterators
+ // will ignore us until we are reconnected.
+ connection_handler->state (Connection_Handler::IDLE);
+
+ // We check the remote addr second so that it remains in the addr_buf.
+ if (connection_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1
+ || connection_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "can't obtain peer's address"), -1);
+
+ // Try to connect to the Peer.
+
+ if (this->connect (connection_handler, connection_handler->remote_addr (),
+ synch_options, connection_handler->local_addr ()) == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ connection_handler->state (Connection_Handler::FAILED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
+ "connect", addr_buf));
+
+ return -1;
+ }
+ else
+ {
+ connection_handler->state (Connection_Handler::CONNECTING);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in the process of connecting to %s\n",
+ addr_buf));
+ }
+ }
+ else
+ {
+ connection_handler->state (Connection_Handler::ESTABLISHED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
+ addr_buf, connection_handler->get_handle ()));
+ }
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR>;
+template class ACE_Svc_Tuple<Connection_Handler>;
+template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>;
+template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>;
+template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>;
+template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>;
+template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR>
+#pragma instantiate ACE_Svc_Tuple<Connection_Handler>
+#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>
+#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>
+#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>
+#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *, ACE_SYNCH_RW_MUTEX>
+#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Connection_Handler> *>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Connection_Handler_Connector.h
index 4e5cc79640e..e81d66d9694 100644
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.h
+++ b/apps/Gateway/Gateway/Connection_Handler_Connector.h
@@ -7,7 +7,7 @@
// gateway
//
// = FILENAME
-// Proxy_Handler_Connector.h
+// Connection_Handler_Connector.h
//
// = AUTHOR
// Doug Schmidt
@@ -19,18 +19,19 @@
#include "ace/Connector.h"
#include "ace/SOCK_Connector.h"
-#include "Proxy_Handler.h"
+#include "Connection_Handler.h"
-class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new Proxy_Handler object to do the dirty work...
+class Connection_Handler_Connector : public ACE_Connector<Connection_Handler, ACE_SOCK_CONNECTOR>
{
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Connection_Handler object to do the dirty
+ // work...
public:
- Proxy_Handler_Connector (void);
+ Connection_Handler_Connector (void);
- // Initiate (or reinitiate) a connection on the Proxy_Handler.
- int initiate_connection (Proxy_Handler *,
+ // Initiate (or reinitiate) a connection on the Connection_Handler.
+ int initiate_connection (Connection_Handler *,
ACE_Synch_Options & = ACE_Synch_Options::synch);
};
diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
index 5ff672679c2..2f89143460d 100644
--- a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
+++ b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
@@ -14,15 +14,15 @@
//
// ============================================================================
-#if !defined (_DISPATCH_SET)
-#define _DISPATCH_SET
+#if !defined (CONSUMER_DISPATCH_SET)
+#define CONSUMER_DISPATCH_SET
#include "ace/Containers.h"
// Forward reference.
-class Proxy_Handler;
+class Connection_Handler;
-typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set;
-typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator;
+typedef ACE_Unbounded_Set<Connection_Handler *> Consumer_Dispatch_Set;
+typedef ACE_Unbounded_Set_Iterator<Connection_Handler *> Consumer_Dispatch_Set_Iterator;
-#endif /* _DISPATCH_SET */
+#endif /* CONSUMER_DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
index f88b1770d2f..58ef1f0a97b 100644
--- a/apps/Gateway/Gateway/Event.h
+++ b/apps/Gateway/Gateway/Event.h
@@ -19,6 +19,11 @@
#include "ace/OS.h"
+// = The following #defines should really be in a separate include
+// file that is shared with the ../Peer/ directory. For now, we'll
+// keep them here to simplify the sharing between the two directories.
+// BTW, this is also the reason why all the methods are inlined...
+
// Used by Peers to create Consumers in a Gateway.
#if !defined (DEFAULT_GATEWAY_CONSUMER_PORT)
#define DEFAULT_GATEWAY_CONSUMER_PORT 10009
@@ -39,11 +44,12 @@
#define DEFAULT_PEER_SUPPLIER_PORT 10012
#endif /* DEFAULT_PEER_SUPPLIER_PORT */
-// This is the unique connection identifier that denotes a particular
-// Proxy_Handler in the Gateway.
-typedef ACE_INT32 ACE_INT32;
+// This is the unique supplier identifier that denotes a particular
+// <Connection_Handler> in the Gateway.
+typedef ACE_INT32 CONNECTION_ID;
class Event_Key
+{
// = TITLE
// Address used to identify the source/destination of an event.
//
@@ -51,45 +57,35 @@ class Event_Key
// This is really a "virtual forwarding address" thatis used to
// decouple the filtering and forwarding logic of the Event
// Channel from the format of the data.
-{
public:
- Event_Key (ACE_INT32 cid = -1,
- u_char sid = 0,
- u_char type = 0)
- : proxy_id_ (cid),
- supplier_id_ (sid),
+ Event_Key (CONNECTION_ID cid = -1,
+ u_char type = 0)
+ : connection_id_ (cid),
type_ (type) {}
int operator== (const Event_Key &event_addr) const
{
- return this->proxy_id_ == event_addr.proxy_id_
- && this->supplier_id_ == event_addr.supplier_id_
+ return this->connection_id_ == event_addr.connection_id_
&& this->type_ == event_addr.type_;
}
- ACE_INT32 proxy_id_;
+ CONNECTION_ID connection_id_;
// Unique connection identifier that denotes a particular
- // Proxy_Handler.
-
- ACE_INT32 supplier_id_;
- // Logical ID.
+ // Connection_Handler.
ACE_INT32 type_;
// Event type.
};
class Event_Header
+{
// = TITLE
// Fixed sized header.
//
// = DESCRIPTION
// This is designed to have a sizeof (16) to avoid alignment
// problems on most platforms.
-{
public:
- typedef ACE_INT32 SUPPLIER_ID;
- // Type used to forward events from gatewayd.
-
enum
{
INVALID_ID = -1 // No peer can validly use this number.
@@ -98,7 +94,6 @@ public:
void decode (void)
{
this->len_ = ntohl (this->len_);
- this->supplier_id_ = ntohl (this->supplier_id_);
this->type_ = ntohl (this->type_);
this->priority_ = ntohl (this->priority_);
}
@@ -107,7 +102,6 @@ public:
void encode (void)
{
this->len_ = htonl (this->len_);
- this->supplier_id_ = htonl (this->supplier_id_);
this->type_ = htonl (this->type_);
this->priority_ = htonl (this->priority_);
}
@@ -116,8 +110,9 @@ public:
size_t len_;
// Length of the data_ payload, in bytes.
- SUPPLIER_ID supplier_id_;
- // Source ID.
+ CONNECTION_ID connection_id_;
+ // Unique connection identifier that denotes a particular
+ // Connection_Handler.
ACE_INT32 type_;
// Event type.
@@ -127,10 +122,10 @@ public:
};
class Event
+{
// = TITLE
// Variable-sized event (data_ may be variable-sized between
// 0 and MAX_PAYLOAD_SIZE).
-{
public:
enum { MAX_PAYLOAD_SIZE = 1024 };
// The maximum size of an Event.
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index ac09c0e239c..c430324b42e 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -2,24 +2,24 @@
// $Id$
#define ACE_BUILD_SVC_DLL
-#include "Proxy_Handler_Connector.h"
+#include "Connection_Handler_Connector.h"
#include "Event_Channel.h"
-ACE_Event_Channel::~ACE_Event_Channel (void)
+Event_Channel::~Event_Channel (void)
{
}
-ACE_Event_Channel::ACE_Event_Channel (void)
- : supplier_acceptor_ (*this),
- consumer_acceptor_ (*this)
+Event_Channel::Event_Channel (void)
+ : supplier_acceptor_ (*this, 'S'),
+ consumer_acceptor_ (*this, 'C')
{
}
int
-ACE_Event_Channel::compute_performance_statistics (void)
+Event_Channel::compute_performance_statistics (void)
{
ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- PROXY_MAP_ITERATOR cmi (this->proxy_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// If we've got a ACE_Thread Manager then use it to suspend all the
// threads. This will enable us to get an accurate count.
@@ -38,44 +38,32 @@ ACE_Event_Channel::compute_performance_statistics (void)
// Iterate through the connection map summing up the number of bytes
// sent/received.
- for (PROXY_MAP_ENTRY *me = 0;
+ for (CONNECTION_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
- Proxy_Handler *proxy_handler = me->int_id_;
+ Connection_Handler *connection_handler = me->int_id_;
- if (proxy_handler->proxy_role () == 'C')
- total_bytes_out += proxy_handler->total_bytes ();
- else // proxy_handler->proxy_role () == 'S'
- total_bytes_in += proxy_handler->total_bytes ();
+ if (connection_handler->connection_role () == 'C')
+ total_bytes_out += connection_handler->total_bytes ();
+ else // connection_handler->connection_role () == 'S'
+ total_bytes_in += connection_handler->total_bytes ();
}
-#if defined (ACE_NLOGGING)
- ACE_OS::fprintf (stderr,
- "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- performance_window_,
- total_bytes_in,
- total_bytes_out);
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)));
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
-#else
ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
Options::instance ()->performance_window (),
total_bytes_in,
total_bytes_out));
+
ACE_DEBUG ((LM_DEBUG,
"(%t) %f Mbits/sec received.\n",
(float) (total_bytes_in * 8 /
(float) (1024 * 1024 * Options::instance ()->performance_window ()))));
+
ACE_DEBUG ((LM_DEBUG,
"(%t) %f Mbits/sec sent.\n",
(float) (total_bytes_out * 8 /
(float) (1024 * 1024 * Options::instance ()->performance_window ()))));
-#endif /* ACE_NLOGGING */
// Resume all the threads again.
@@ -91,7 +79,7 @@ ACE_Event_Channel::compute_performance_statistics (void)
}
int
-ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
+Event_Channel::handle_timeout (const ACE_Time_Value &,
const void *)
{
return this->compute_performance_statistics ();
@@ -101,7 +89,7 @@ ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
// to receive it.
int
-ACE_Event_Channel::put (ACE_Message_Block *event,
+Event_Channel::put (ACE_Message_Block *event,
ACE_Time_Value *)
{
// We got a valid event, so determine its virtual forwarding
@@ -120,9 +108,8 @@ ACE_Event_Channel::put (ACE_Message_Block *event,
if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
// Failure.
ACE_ERROR ((LM_DEBUG,
- "(%t) find failed on conn id = %d, supplier id = %d, type = %d\n",
- forwarding_addr->proxy_id_,
- forwarding_addr->supplier_id_,
+ "(%t) find failed on connection id = %d, type = %d\n",
+ forwarding_addr->connection_id_,
forwarding_addr->type_));
else
{
@@ -140,28 +127,28 @@ ACE_Event_Channel::put (ACE_Message_Block *event,
// multi-threaded configuration.
// data->locking_strategy (MB_Locking_Strategy::instance ());
- for (Proxy_Handler **proxy_handler = 0;
- dsi.next (proxy_handler) != 0;
+ for (Connection_Handler **connection_handler = 0;
+ dsi.next (connection_handler) != 0;
dsi.advance ())
{
- // Only process active proxy_handlers.
- if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
+ // Only process active connection_handlers.
+ if ((*connection_handler)->state () == Connection_Handler::ESTABLISHED)
{
// Duplicate the event portion via reference
// counting.
ACE_Message_Block *dup_msg = data->duplicate ();
ACE_DEBUG ((LM_DEBUG, "(%t) forwarding to Consumer %d\n",
- (*proxy_handler)->id ()));
+ (*connection_handler)->id ()));
- if ((*proxy_handler)->put (dup_msg) == -1)
+ if ((*connection_handler)->put (dup_msg) == -1)
{
if (errno == EWOULDBLOCK) // The queue has filled up!
ACE_ERROR ((LM_ERROR, "(%t) %p\n",
"gateway is flow controlled, so we're dropping events"));
else
ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
- "put", (*proxy_handler)->id ()));
+ "put", (*connection_handler)->id ()));
// We are responsible for releasing an
// ACE_Message_Block if failures occur.
@@ -178,13 +165,13 @@ ACE_Event_Channel::put (ACE_Message_Block *event,
}
int
-ACE_Event_Channel::svc (void)
+Event_Channel::svc (void)
{
return 0;
}
int
-ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler)
+Event_Channel::initiate_connection_connection (Connection_Handler *connection_handler)
{
ACE_Synch_Options synch_options;
@@ -193,36 +180,36 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler)
else
synch_options = ACE_Synch_Options::synch;
- return this->connector_.initiate_connection (proxy_handler,
+ return this->connector_.initiate_connection (connection_handler,
synch_options);
}
int
-ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
+Event_Channel::complete_connection_connection (Connection_Handler *connection_handler)
{
- int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
+ int option = connection_handler->connection_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
int socket_queue_size = Options::instance ()->socket_queue_size ();
if (socket_queue_size > 0)
- if (proxy_handler->peer ().set_option (SOL_SOCKET,
+ if (connection_handler->peer ().set_option (SOL_SOCKET,
option,
&socket_queue_size,
sizeof (int)) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- proxy_handler->thr_mgr (ACE_Thread_Manager::instance ());
+ connection_handler->thr_mgr (ACE_Thread_Manager::instance ());
// Our state is now "established."
- proxy_handler->state (Proxy_Handler::ESTABLISHED);
+ connection_handler->state (Connection_Handler::ESTABLISHED);
// Restart the timeout to 1.
- proxy_handler->timeout (1);
+ connection_handler->timeout (1);
- ACE_INT32 id = htonl (proxy_handler->id ());
+ ACE_INT32 id = htonl (connection_handler->id ());
// Send the connection id to the peerd.
- ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id);
+ ssize_t n = connection_handler->peer ().send ((const void *) &id, sizeof id);
if (n != sizeof id)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
@@ -235,24 +222,24 @@ ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
// synchronously or asynchronously).
int
-ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
+Event_Channel::reinitiate_connection_connection (Connection_Handler *connection_handler)
{
// Skip over proxies with deactivated handles.
- if (proxy_handler->get_handle () != ACE_INVALID_HANDLE)
+ if (connection_handler->get_handle () != ACE_INVALID_HANDLE)
{
// Make sure to close down peer to reclaim descriptor.
- proxy_handler->peer ().close ();
+ connection_handler->peer ().close ();
}
- if (proxy_handler->state () != Proxy_Handler::DISCONNECTING)
+ if (connection_handler->state () != Connection_Handler::DISCONNECTING)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Proxy_Handler %d\n",
- proxy_handler->id ()));
+ "(%t) scheduling reinitiation of Connection_Handler %d\n",
+ connection_handler->id ()));
// Reschedule ourselves to try and connect again.
if (ACE_Reactor::instance ()->schedule_timer
- (proxy_handler, 0, proxy_handler->timeout ()) == -1)
+ (connection_handler, 0, connection_handler->timeout ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
"schedule_timer"), -1);
}
@@ -262,22 +249,22 @@ ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
// Initiate active connections with the Consumer and Supplier Peers.
void
-ACE_Event_Channel::initiate_connector (void)
+Event_Channel::initiate_connector (void)
{
if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
{
- PROXY_MAP_ITERATOR cmi (this->proxy_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// Iterate through the Consumer Map connecting all the
- // Proxy_Handlers.
+ // Connection_Handlers.
- for (PROXY_MAP_ENTRY *me = 0;
+ for (CONNECTION_MAP_ENTRY *me = 0;
cmi.next (me) != 0;
cmi.advance ())
{
- Proxy_Handler *proxy_handler = me->int_id_;
+ Connection_Handler *connection_handler = me->int_id_;
- if (this->initiate_proxy_connection (proxy_handler) == -1)
+ if (this->initiate_connection_connection (connection_handler) == -1)
continue; // Failures are handled elsewhere...
}
}
@@ -287,7 +274,7 @@ ACE_Event_Channel::initiate_connector (void)
// to accept.
void
-ACE_Event_Channel::initiate_acceptors (void)
+Event_Channel::initiate_acceptors (void)
{
if (Options::instance ()->enabled (Options::CONSUMER_ACCEPTOR)
&& this->consumer_acceptor_.open
@@ -307,10 +294,10 @@ ACE_Event_Channel::initiate_acceptors (void)
}
// This method gracefully shuts down all the Handlers in the
-// Proxy_Handler Connection Map.
+// Connection_Handler Connection Map.
int
-ACE_Event_Channel::close (u_long)
+Event_Channel::close (u_long)
{
if (Options::instance ()->threading_strategy ()
!= Options::REACTIVE)
@@ -322,22 +309,22 @@ ACE_Event_Channel::close (u_long)
// First tell everyone that the spaceship is here...
{
- PROXY_MAP_ITERATOR cmi (this->proxy_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// Iterate over all the handlers and shut them down.
- for (PROXY_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me;
cmi.next (me) != 0;
cmi.advance ())
{
- Proxy_Handler *proxy_handler = me->int_id_;
+ Connection_Handler *connection_handler = me->int_id_;
ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n",
- proxy_handler->id ()));
+ connection_handler->id ()));
- // Mark Proxy_Handler as DISCONNECTING so we don't try to
+ // Mark Connection_Handler as DISCONNECTING so we don't try to
// reconnect...
- proxy_handler->state (Proxy_Handler::DISCONNECTING);
+ connection_handler->state (Connection_Handler::DISCONNECTING);
}
}
@@ -352,16 +339,16 @@ ACE_Event_Channel::close (u_long)
// Now tell everyone that it is now time to commit suicide.
{
- PROXY_MAP_ITERATOR cmi (this->proxy_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
- for (PROXY_MAP_ENTRY *me;
+ for (CONNECTION_MAP_ENTRY *me;
cmi.next (me) != 0;
cmi.advance ())
{
- Proxy_Handler *proxy_handler = me->int_id_;
+ Connection_Handler *connection_handler = me->int_id_;
- // Deallocate Proxy_Handler resources.
- proxy_handler->destroy (); // Will trigger a delete.
+ // Deallocate Connection_Handler resources.
+ connection_handler->destroy (); // Will trigger a delete.
}
}
@@ -369,28 +356,28 @@ ACE_Event_Channel::close (u_long)
}
int
-ACE_Event_Channel::find_proxy (ACE_INT32 proxy_id,
- Proxy_Handler *&proxy_handler)
+Event_Channel::find_proxy (ACE_INT32 connection_id,
+ Connection_Handler *&connection_handler)
{
- return this->proxy_map_.find (proxy_id, proxy_handler);
+ return this->connection_map_.find (connection_id, connection_handler);
}
int
-ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
+Event_Channel::bind_proxy (Connection_Handler *connection_handler)
{
- int result = this->proxy_map_.bind (proxy_handler->id (), proxy_handler);
+ int result = this->connection_map_.bind (connection_handler->id (), connection_handler);
switch (result)
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) bind failed for connection %d\n",
- proxy_handler->id ()), -1);
+ connection_handler->id ()), -1);
/* NOTREACHED */
case 1: // Oops, found a duplicate!
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) duplicate connection %d, already bound\n",
- proxy_handler->id ()), -1);
+ connection_handler->id ()), -1);
/* NOTREACHED */
case 0:
// Success.
@@ -406,7 +393,7 @@ ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
}
int
-ACE_Event_Channel::subscribe (const Event_Key &event_addr,
+Event_Channel::subscribe (const Event_Key &event_addr,
Consumer_Dispatch_Set *cds)
{
int result = this->efd_.bind (event_addr, cds);
@@ -417,12 +404,12 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr,
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) bind failed for connection %d\n",
- event_addr.proxy_id_), -1);
+ event_addr.connection_id_), -1);
/* NOTREACHED */
case 1: // Oops, found a duplicate!
ACE_ERROR_RETURN ((LM_DEBUG,
"(%t) duplicate consumer map entry %d, "
- "already bound\n", event_addr.proxy_id_), -1);
+ "already bound\n", event_addr.connection_id_), -1);
/* NOTREACHED */
case 0:
// Success.
@@ -437,9 +424,9 @@ ACE_Event_Channel::subscribe (const Event_Key &event_addr,
}
int
-ACE_Event_Channel::open (void *)
+Event_Channel::open (void *)
{
- // Ignore SIPPIPE so each Consumer_Proxy can handle it.
+ // Ignore SIPPIPE so each Consumer_Handler can handle it.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
ACE_UNUSED_ARG (sig);
@@ -469,18 +456,18 @@ ACE_Event_Channel::open (void *)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Lock_Adapter<ACE_SYNCH_MUTEX>;
-template class ACE_Map_Entry<ACE_INT32, Proxy_Handler *>;
-template class ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>;
-template class ACE_Map_Reverse_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>;
-template class ACE_Map_Iterator_Base<ACE_INT32, Proxy_Handler *, MAP_MUTEX>;
-template class ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX>;
-template class ACE_Unbounded_Set_Iterator<Proxy_Handler *>;
+template class ACE_Map_Entry<ACE_INT32, Connection_Handler *>;
+template class ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
+template class ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
+template class ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
+template class ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>;
+template class ACE_Unbounded_Set_Iterator<Connection_Handler *>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Lock_Adapter<ACE_SYNCH_MUTEX>
-#pragma instantiate ACE_Map_Entry<ACE_INT32, Proxy_Handler *>
-#pragma instantiate ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
-#pragma instantiate ACE_Unbounded_Set_Iterator<Proxy_Handler *>
+#pragma instantiate ACE_Map_Entry<ACE_INT32, Connection_Handler *>
+#pragma instantiate ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+#pragma instantiate ACE_Map_Reverse_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+#pragma instantiate ACE_Map_Iterator_Base<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+#pragma instantiate ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+#pragma instantiate ACE_Unbounded_Set_Iterator<Connection_Handler *>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 600c63060f9..4735c7b9d44 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -17,23 +17,25 @@
#if !defined (ACE_EVENT_CHANNEL)
#define ACE_EVENT_CHANNEL
-#include "Proxy_Handler_Connector.h"
-#include "Proxy_Handler_Acceptor.h"
+#include "Connection_Handler_Connector.h"
+#include "Connection_Handler_Acceptor.h"
#include "Consumer_Dispatch_Set.h"
#include "Event_Forwarding_Discriminator.h"
typedef ACE_Null_Mutex MAP_MUTEX;
-class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<ACE_SYNCH>
+class ACE_Svc_Export Event_Channel : public ACE_Event_Handler
+{
// = TITLE
// Define a generic Event_Channel.
//
// = DESCRIPTION
-{
+ // We inherit from <ACE_Event_Handler> so that we can be
+ // registered with an <ACE_Reactor> to handle timeouts.
public:
// = Initialization and termination methods.
- ACE_Event_Channel (void);
- ~ACE_Event_Channel (void);
+ Event_Channel (void);
+ ~Event_Channel (void);
virtual int open (void * = 0);
// Open the channel.
@@ -42,22 +44,22 @@ public:
// Close down the Channel.
// = Proxy management methods.
- int initiate_proxy_connection (Proxy_Handler *);
- // Initiate the connection of the <Proxy_Handler> to its peer.
+ int initiate_connection_connection (Connection_Handler *);
+ // Initiate the connection of the <Connection_Handler> to its peer.
- int complete_proxy_connection (Proxy_Handler *);
- // Complete the initialization of the <Proxy_Handler> once it's
+ int complete_connection_connection (Connection_Handler *);
+ // Complete the initialization of the <Connection_Handler> once it's
// connected to its Peer.
- int reinitiate_proxy_connection (Proxy_Handler *);
+ int reinitiate_connection_connection (Connection_Handler *);
// Reinitiate a connection asynchronously when the Peer fails.
- int bind_proxy (Proxy_Handler *);
- // Bind the <Proxy_Handler> to the <proxy_map_>.
+ int bind_proxy (Connection_Handler *);
+ // Bind the <Connection_Handler> to the <connection_map_>.
- int find_proxy (ACE_INT32 proxy_id,
- Proxy_Handler *&);
- // Locate the <Proxy_Handler> with <proxy_id>.
+ int find_proxy (ACE_INT32 connection_id,
+ Connection_Handler *&);
+ // Locate the <Connection_Handler> with <connection_id>.
int subscribe (const Event_Key &event_addr,
Consumer_Dispatch_Set *cds);
@@ -91,28 +93,28 @@ private:
// Periodically callback to perform timer-based performance
// profiling.
- Proxy_Handler_Connector connector_;
+ Connection_Handler_Connector connector_;
// Used to establish the connections actively.
- Proxy_Handler_Acceptor supplier_acceptor_;
+ Connection_Handler_Acceptor supplier_acceptor_;
// Used to establish connections passively and create Suppliers.
- Proxy_Handler_Acceptor consumer_acceptor_;
+ Connection_Handler_Acceptor consumer_acceptor_;
// Used to establish connections passively and create Consumers.
// = Make life easier by defining typedefs.
- typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
- PROXY_MAP;
- typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX>
- PROXY_MAP_ITERATOR;
- typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *>
- PROXY_MAP_ENTRY;
+ typedef ACE_Map_Manager<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+ CONNECTION_MAP;
+ typedef ACE_Map_Iterator<ACE_INT32, Connection_Handler *, MAP_MUTEX>
+ CONNECTION_MAP_ITERATOR;
+ typedef ACE_Map_Entry<ACE_INT32, Connection_Handler *>
+ CONNECTION_MAP_ENTRY;
- PROXY_MAP proxy_map_;
- // Table that maps Connection IDs to Proxy_Handler *'s.
+ CONNECTION_MAP connection_map_;
+ // Table that maps Connection IDs to Connection_Handler *'s.
Event_Forwarding_Discriminator efd_;
- // Map that associates an event to a set of Consumer_Proxy *'s.
+ // Map that associates an event to a set of Consumer_Handler *'s.
};
#endif /* ACE_EVENT_CHANNEL */
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
index 64b4d49db59..22490c6329b 100644
--- a/apps/Gateway/Gateway/File_Parser.h
+++ b/apps/Gateway/Gateway/File_Parser.h
@@ -20,9 +20,9 @@
#include "ace/OS.h"
class FP
- // = TITLE
- // This class serves as a namespace for the Return_Type
{
+ // = TITLE
+ // This class serves as a namespace for the <Return_Type>.
public:
enum Return_Type
{
@@ -37,10 +37,10 @@ public:
template <class ENTRY>
class File_Parser
- // = TITLE
- // Class used to parse the configuration file for the Consumer
- // Map.
{
+ // = TITLE
+ // Class used to parse the configuration file for the
+ // <Consumer_Map>.
public:
// = Open and Close the file specified
int open (const char filename[]);
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index d7df1458299..d86afab90a4 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -12,7 +12,7 @@ class ACE_Svc_Export Gateway : public ACE_Service_Object
// Integrates the whole Gateway application.
//
// = DESCRIPTION
- // This implementation uses the <ACE_Event_Channel> as the basis
+ // This implementation uses the <Event_Channel> as the basis
// for the <Gateway> routing.
{
protected:
@@ -27,7 +27,7 @@ protected:
// Return info about this service.
// = Configuration methods.
- int parse_proxy_config_file (void);
+ int parse_connection_config_file (void);
// Parse the proxy configuration file.
int parse_consumer_config_file (void);
@@ -41,12 +41,12 @@ protected:
int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
// Shut down the Gateway when a signal arrives.
- ACE_Event_Channel event_channel_;
+ Event_Channel event_channel_;
// The Event Channel routes events from Supplier(s) to Consumer(s)
- // using <Supplier_Proxy> and <Consumer_Proxy> objects.
+ // using <Supplier_Handler> and <Consumer_Handler> objects.
- Proxy_Handler_Factory proxy_handler_factory_;
- // Creates the appropriate type of <Proxy_Handlers>.
+ Connection_Handler_Factory connection_handler_factory_;
+ // Creates the appropriate type of <Connection_Handlers>.
int debug_;
// Are we debugging?
@@ -120,10 +120,11 @@ Gateway::init (int argc, char *argv[])
Options::instance ()->performance_window ()));
}
- if (Options::instance ()->enabled (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
+ if (Options::instance ()->enabled
+ (Options::CONSUMER_CONNECTOR | Options::SUPPLIER_CONNECTOR))
{
// Parse the proxy configuration file.
- this->parse_proxy_config_file ();
+ this->parse_connection_config_file ();
// Parse the consumer config file and build the event forwarding
// discriminator.
@@ -172,22 +173,22 @@ Gateway::info (char **strp, size_t length) const
// Parse and build the proxy table.
int
-Gateway::parse_proxy_config_file (void)
+Gateway::parse_connection_config_file (void)
{
// File that contains the proxy configuration information.
- Proxy_Config_File_Parser proxy_file;
+ Connection_Config_File_Parser connection_file;
int file_empty = 1;
int line_number = 0;
- if (proxy_file.open (Options::instance ()->proxy_config_file ()) == -1)
+ if (connection_file.open (Options::instance ()->connection_config_file ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"(%t) %p\n",
- Options::instance ()->proxy_config_file ()),
+ Options::instance ()->connection_config_file ()),
-1);
// Read config file one line at a time.
- for (Proxy_Config_Info pci;
- proxy_file.read_entry (pci, line_number) != FP::EOFILE;
+ for (Connection_Config_Info pci;
+ connection_file.read_entry (pci, line_number) != FP::EOFILE;
)
{
file_empty = 0;
@@ -196,10 +197,10 @@ Gateway::parse_proxy_config_file (void)
ACE_DEBUG ((LM_DEBUG,
"(%t) conn id = %d, host = %s, remote port = %d, proxy role = %c, "
"max retry timeout = %d, local port = %d, priority = %d\n",
- pci.proxy_id_,
+ pci.connection_id_,
pci.host_,
pci.remote_port_,
- pci.proxy_role_,
+ pci.connection_role_,
pci.max_retry_timeout_,
pci.local_port_,
pci.priority_));
@@ -207,22 +208,22 @@ Gateway::parse_proxy_config_file (void)
pci.event_channel_ = &this->event_channel_;
// Create the appropriate type of Proxy.
- Proxy_Handler *proxy_handler =
- this->proxy_handler_factory_.make_proxy_handler (pci);
+ Connection_Handler *connection_handler =
+ this->connection_handler_factory_.make_connection_handler (pci);
- if (proxy_handler == 0)
+ if (connection_handler == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
- "make_proxy_handler"),
+ "make_connection_handler"),
-1);
- // Bind the new Proxy_Handler to the connection ID.
- this->event_channel_.bind_proxy (proxy_handler);
+ // Bind the new Connection_Handler to the connection ID.
+ this->event_channel_.bind_proxy (connection_handler);
}
if (file_empty)
ACE_ERROR ((LM_WARNING,
- "warning: connection proxy_handler configuration file was empty\n"));
+ "warning: connection connection_handler configuration file was empty\n"));
return 0;
}
@@ -250,10 +251,9 @@ Gateway::parse_consumer_config_file (void)
if (Options::instance ()->enabled (Options::DEBUG))
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) conn id = %d, supplier id = %d, payload = %d, "
+ "(%t) connection id = %d, payload = %d, "
"number of consumers = %d\n",
- cci.proxy_id_,
- cci.supplier_id_,
+ cci.connection_id_,
cci.type_,
cci.total_consumers_));
@@ -267,20 +267,19 @@ Gateway::parse_consumer_config_file (void)
Consumer_Dispatch_Set *dispatch_set;
ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
- Event_Key event_addr (cci.proxy_id_,
- cci.supplier_id_,
+ Event_Key event_addr (cci.connection_id_,
cci.type_);
// Add the Consumers to the Dispatch_Set.
for (int i = 0; i < cci.total_consumers_; i++)
{
- Proxy_Handler *proxy_handler = 0;
+ Connection_Handler *connection_handler = 0;
// Lookup destination and add to Consumer_Dispatch_Set set
// if found.
if (this->event_channel_.find_proxy (cci.consumers_[i],
- proxy_handler) != -1)
- dispatch_set->insert (proxy_handler);
+ connection_handler) != -1)
+ dispatch_set->insert (connection_handler);
else
ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
i, cci.consumers_[i]));
@@ -301,10 +300,10 @@ Gateway::parse_consumer_config_file (void)
ACE_SVC_FACTORY_DEFINE (Gateway)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Node<Proxy_Handler *>;
-template class ACE_Unbounded_Set<Proxy_Handler *>;
+template class ACE_Node<Connection_Handler *>;
+template class ACE_Unbounded_Set<Connection_Handler *>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Node<Proxy_Handler *>
-#pragma instantiate ACE_Unbounded_Set<Proxy_Handler *>
+#pragma instantiate ACE_Node<Connection_Handler *>
+#pragma instantiate ACE_Unbounded_Set<Connection_Handler *>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
index 31d524bdbfc..9004ef6e63c 100644
--- a/apps/Gateway/Gateway/Makefile
+++ b/apps/Gateway/Gateway/Makefile
@@ -12,16 +12,16 @@ BIN = gatewayd
LIB = libGateway.a
SHLIB = libGateway.$(SOEXT)
-FILES = Concrete_Proxy_Handlers \
+FILES = Concrete_Connection_Handlers \
Config_Files \
File_Parser \
Gateway \
Event_Channel \
Event_Forwarding_Discriminator \
Options \
- Proxy_Handler \
- Proxy_Handler_Acceptor \
- Proxy_Handler_Connector
+ Connection_Handler \
+ Connection_Handler_Acceptor \
+ Connection_Handler_Connector
LSRC = $(addsuffix .cpp,$(FILES))
LDLIBS = -lGateway
@@ -52,8 +52,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
# DO NOT DELETE THIS LINE -- g++dep uses it.
# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-.obj/Concrete_Proxy_Handlers.o .shobj/Concrete_Proxy_Handlers.so: Concrete_Proxy_Handlers.cpp Event_Channel.h \
- Proxy_Handler_Connector.h \
+.obj/Concrete_Connection_Handlers.o .shobj/Concrete_Connection_Handlers.so: Concrete_Connection_Handlers.cpp Event_Channel.h \
+ Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Service_Config.h \
$(ACE_ROOT)/ace/Service_Object.h \
@@ -152,14 +152,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Config_Files.h File_Parser.h Event.h \
- Proxy_Handler_Acceptor.h \
+ Connection_Handler.h Config_Files.h File_Parser.h Event.h \
+ Connection_Handler_Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.i \
$(ACE_ROOT)/ace/SOCK_Acceptor.h \
$(ACE_ROOT)/ace/SOCK_Acceptor.i \
Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h \
- Concrete_Proxy_Handlers.h
+ Concrete_Connection_Handlers.h
.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \
$(ACE_ROOT)/ace/OS.h \
$(ACE_ROOT)/ace/config.h \
@@ -242,7 +242,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Reactor.i \
$(ACE_ROOT)/ace/Reactor_Impl.h \
$(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
- Event_Channel.h Proxy_Handler_Connector.h \
+ Event_Channel.h Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Map_Manager.h \
$(ACE_ROOT)/ace/Map_Manager.i \
@@ -291,13 +291,13 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Event.h Proxy_Handler_Acceptor.h \
+ Connection_Handler.h Event.h Connection_Handler_Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.i \
$(ACE_ROOT)/ace/SOCK_Acceptor.h \
$(ACE_ROOT)/ace/SOCK_Acceptor.i \
Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h Gateway.h
-.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp Proxy_Handler_Connector.h \
+.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Service_Config.h \
$(ACE_ROOT)/ace/Service_Object.h \
@@ -396,8 +396,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Config_Files.h File_Parser.h Event.h Event_Channel.h \
- Proxy_Handler_Acceptor.h \
+ Connection_Handler.h Config_Files.h File_Parser.h Event.h Event_Channel.h \
+ Connection_Handler_Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.i \
$(ACE_ROOT)/ace/SOCK_Acceptor.h \
@@ -435,8 +435,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
Consumer_Dispatch_Set.h \
$(ACE_ROOT)/ace/Containers.h \
$(ACE_ROOT)/ace/Containers.i
-.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Event_Channel.h \
- Proxy_Handler_Connector.h \
+.obj/Connection_Handler.o .shobj/Connection_Handler.so: Connection_Handler.cpp Event_Channel.h \
+ Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Service_Config.h \
$(ACE_ROOT)/ace/Service_Object.h \
@@ -535,16 +535,16 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Config_Files.h File_Parser.h Event.h \
- Proxy_Handler_Acceptor.h \
+ Connection_Handler.h Config_Files.h File_Parser.h Event.h \
+ Connection_Handler_Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.i \
$(ACE_ROOT)/ace/SOCK_Acceptor.h \
$(ACE_ROOT)/ace/SOCK_Acceptor.i \
Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h \
- Concrete_Proxy_Handlers.h
-.obj/Proxy_Handler_Acceptor.o .shobj/Proxy_Handler_Acceptor.so: Proxy_Handler_Acceptor.cpp Event_Channel.h \
- Proxy_Handler_Connector.h \
+ Concrete_Connection_Handlers.h
+.obj/Connection_Handler_Acceptor.o .shobj/Connection_Handler_Acceptor.so: Connection_Handler_Acceptor.cpp Event_Channel.h \
+ Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Service_Config.h \
$(ACE_ROOT)/ace/Service_Object.h \
@@ -643,15 +643,15 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Config_Files.h File_Parser.h Event.h \
- Proxy_Handler_Acceptor.h \
+ Connection_Handler.h Config_Files.h File_Parser.h Event.h \
+ Connection_Handler_Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.h \
$(ACE_ROOT)/ace/Acceptor.i \
$(ACE_ROOT)/ace/SOCK_Acceptor.h \
$(ACE_ROOT)/ace/SOCK_Acceptor.i \
Consumer_Dispatch_Set.h Event_Forwarding_Discriminator.h
-.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \
- Proxy_Handler_Connector.h \
+.obj/Connection_Handler_Connector.o .shobj/Connection_Handler_Connector.so: Connection_Handler_Connector.cpp \
+ Connection_Handler_Connector.h \
$(ACE_ROOT)/ace/Connector.h \
$(ACE_ROOT)/ace/Service_Config.h \
$(ACE_ROOT)/ace/Service_Object.h \
@@ -750,6 +750,6 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/SOCK_Stream.i \
$(ACE_ROOT)/ace/Time_Value.h \
$(ACE_ROOT)/ace/SOCK_Connector.i \
- Proxy_Handler.h Config_Files.h File_Parser.h Event.h
+ Connection_Handler.h Config_Files.h File_Parser.h Event.h
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp
index 48f4834bfe9..073eea17494 100644
--- a/apps/Gateway/Gateway/Options.cpp
+++ b/apps/Gateway/Gateway/Options.cpp
@@ -29,9 +29,10 @@ Options::Options (void)
supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT),
supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT),
consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT),
- max_timeout_ (MAX_TIMEOUT)
+ max_timeout_ (MAX_TIMEOUT),
+ max_queue_size_ (MAX_QUEUE_SIZE)
{
- ACE_OS::strcpy (this->proxy_config_file_, "proxy_config");
+ ACE_OS::strcpy (this->connection_config_file_, "connection_config");
ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
}
@@ -89,9 +90,9 @@ Options::threading_strategy (void) const
}
const char *
-Options::proxy_config_file (void) const
+Options::connection_config_file (void) const
{
- return this->proxy_config_file_;
+ return this->connection_config_file_;
}
const char *
@@ -118,6 +119,12 @@ Options::consumer_connector_port (void) const
return this->consumer_connector_port_;
}
+long
+Options::max_queue_size (void) const
+{
+ return this->max_queue_size_;
+}
+
u_short
Options::supplier_connector_port (void) const
{
@@ -132,7 +139,7 @@ Options::parse_args (int argc, char *argv[])
// Assign defaults.
ACE_Get_Opt get_opt (argc,
argv,
- "a:bC:c:dP:p:q:r:t:vw:",
+ "a:bC:c:dm:P:p:q:r:t:vw:",
0);
for (int c; (c = get_opt ()) != EOF; )
@@ -200,9 +207,9 @@ Options::parse_args (int argc, char *argv[])
Options::DEBUG);
break;
case 'P': // Use a different consumer config filename.
- ACE_OS::strncpy (this->proxy_config_file_,
+ ACE_OS::strncpy (this->connection_config_file_,
get_opt.optarg,
- sizeof this->proxy_config_file_);
+ sizeof this->connection_config_file_);
break;
case 'q': // Use a different socket queue size.
this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg);
diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h
index 704d706edd7..7279e61f839 100644
--- a/apps/Gateway/Gateway/Options.h
+++ b/apps/Gateway/Gateway/Options.h
@@ -20,9 +20,9 @@
#include "ace/Synch.h"
class ACE_Svc_Export Options
+{
// = TITLE
// Options Singleton for a gatewayd.
-{
public:
// = Options that can be enabled/disabled.
enum
@@ -38,10 +38,7 @@ public:
SUPPLIER_ACCEPTOR = 04,
CONSUMER_ACCEPTOR = 010,
SUPPLIER_CONNECTOR = 020,
- CONSUMER_CONNECTOR = 040,
-
- MAX_TIMEOUT = 32
- // The maximum timeout for trying to re-establish connections.
+ CONSUMER_CONNECTOR = 040
};
static Options *instance (void);
@@ -102,7 +99,7 @@ public:
// Our connector port host, i.e., the host running the gatewayd
// process.
- const char *proxy_config_file (void) const;
+ const char *connection_config_file (void) const;
// Name of the connection configuration file.
const char *consumer_config_file (void) const;
@@ -111,7 +108,19 @@ public:
long max_timeout (void) const;
// The maximum retry timeout delay.
+ long max_queue_size (void) const;
+ // The maximum size of the queue.
+
private:
+ enum
+ {
+ MAX_QUEUE_SIZE = 1024 * 1024 * 16,
+ // We'll allow up to 16 megabytes to be queued per-output proxy.
+
+ MAX_TIMEOUT = 32
+ // The maximum timeout for trying to re-establish connections.
+ };
+
Options (void);
// Initialization.
@@ -161,7 +170,10 @@ private:
long max_timeout_;
// The maximum retry timeout delay.
- char proxy_config_file_[MAXPATHLEN + 1];
+ long max_queue_size_;
+ // The maximum size of the queue.
+
+ char connection_config_file_[MAXPATHLEN + 1];
// Name of the connection configuration file.
char consumer_config_file_[MAXPATHLEN + 1];
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp
deleted file mode 100644
index 487c9e78f9c..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.cpp
+++ /dev/null
@@ -1,16 +0,0 @@
-// $Id$
-
-#include "Event_Channel.h"
-#include "Proxy_Handler_Acceptor.h"
-
-Proxy_Handler_Acceptor::Proxy_Handler_Acceptor (ACE_Event_Channel &ec)
- : event_channel_ (ec)
-{
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h b/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h
deleted file mode 100644
index df1aaa64e86..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler_Acceptor.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// gateway
-//
-// = FILENAME
-// Proxy_Handler_acceptor.h
-//
-// = AUTHOR
-// Doug Schmidt
-//
-// ============================================================================
-
-#if !defined (_PROXY_HANDLER_ACCEPTOR)
-#define _PROXY_HANDLER_ACCEPTOR
-
-#include "ace/Acceptor.h"
-#include "ace/SOCK_Acceptor.h"
-#include "Proxy_Handler.h"
-
-// Forward declaration
-class ACE_Event_Channel;
-
-class Proxy_Handler_Acceptor : public ACE_Acceptor<Proxy_Handler_Factory, ACE_SOCK_ACCEPTOR>
- // = TITLE
- // A concrete factory class that setups connections to peerds
- // and produces a new Proxy_Handler object to do the dirty work...
-{
-public:
- Proxy_Handler_Acceptor (ACE_Event_Channel &);
-
-protected:
- ACE_Event_Channel &event_channel_;
- // Reference to the event channel.
-};
-
-#endif /* _PROXY_HANDLER_ACCEPTOR */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
deleted file mode 100644
index 4799fbacbd4..00000000000
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-// $Id$
-
-#include "Proxy_Handler_Connector.h"
-
-Proxy_Handler_Connector::Proxy_Handler_Connector (void)
-{
-}
-
-// Initiate (or reinitiate) a connection to the Proxy_Handler.
-
-int
-Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler,
- ACE_Synch_Options &synch_options)
-{
- char addr_buf[MAXHOSTNAMELEN];
-
- // Mark ourselves as idle so that the various iterators
- // will ignore us until we are reconnected.
- proxy_handler->state (Proxy_Handler::IDLE);
-
- // We check the remote addr second so that it remains in the addr_buf.
- if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1
- || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "can't obtain peer's address"), -1);
-
- // Try to connect to the Peer.
-
- if (this->connect (proxy_handler, proxy_handler->remote_addr (),
- synch_options, proxy_handler->local_addr ()) == -1)
- {
- if (errno != EWOULDBLOCK)
- {
- proxy_handler->state (Proxy_Handler::FAILED);
- ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", addr_buf));
-
- return -1;
- }
- else
- {
- proxy_handler->state (Proxy_Handler::CONNECTING);
- ACE_DEBUG ((LM_DEBUG,
- "(%t) in the process of connecting to %s\n",
- addr_buf));
- }
- }
- else
- {
- proxy_handler->state (Proxy_Handler::ESTABLISHED);
- ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- addr_buf, proxy_handler->get_handle ()));
- }
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>;
-template class ACE_Svc_Tuple<Proxy_Handler>;
-template class ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>;
-template class ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>
-#pragma instantiate ACE_Svc_Tuple<Proxy_Handler>
-#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *, ACE_SYNCH_RW_MUTEX>
-#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_Svc_Tuple<Proxy_Handler> *>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/apps/Gateway/Gateway/connection_config b/apps/Gateway/Gateway/connection_config
new file mode 100644
index 00000000000..10813942dd0
--- /dev/null
+++ b/apps/Gateway/Gateway/connection_config
@@ -0,0 +1,52 @@
+# Configuration file that the gatewayd process uses to determine
+# connection information about proxies.
+#
+# The following provides an explanation for the fields in this file,
+# and how they relate to fields in the corresponding "consumer_config"
+# file.
+#
+# 1. Connection ID -- Each Connection Handler is given a unique ID
+# that is used in the "consumer_config" file to specify to which
+# Consumers the Event Channel will forward incoming events from
+# Suppliers using that connection. The Connection ID field is the
+# "key" that is used to match up connections in this file with the
+# Consumer subscription requests in the "consumer_config" file.
+#
+# 2. Host -- The host name where the Supplier/Consumer peerd
+# process is running.
+#
+# 3. Remote Port -- The port number where the remote
+# Supplier/Consumer peerd process is listening on.
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
+#
+# 4. Handler Role -- i.e., Consumer ('C') or Supplier ('S')
+#
+# 5. Max Retry Timeout -- The maximum amount of time that we'll
+# wait between retry attempts (these start at 1 second and
+# double until they reach the Max Retry Timeout).
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
+#
+# 6. Local Port -- The port number that we want to use for
+# our local Proxy connection. If this is the value 0 or the '*'
+# character, then we'll let the socket implementation pick this
+# value for us.
+#
+# 7. Priority -- Each Consumer/Supplier can be given a priority
+# that will determine its importance relative to other
+# Consumers/Suppliers (this feature isn't implemented yet).
+#
+# Connection Host Remote Handler Max Retry Local Priority
+# ID Port Role Timeout Port
+# ---------- -------- ------ ------ ---------- ----- --------
+ 1 flamenco * S * * 1
+ 2 mambo * C * * 1
+# 3 mambo.cs * C * * 1
+# 4 lambada.cs * C * * 1
+# 5 lambada.cs * C * * 1
+# 6 tango.cs * C * * 1
+# 7 tango.cs * S * * 1
+# 8 tango.cs * C * * 1
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
index 7d20a50a579..1aaa3fc4028 100644
--- a/apps/Gateway/Gateway/consumer_config
+++ b/apps/Gateway/Gateway/consumer_config
@@ -5,34 +5,31 @@
# Consumers to subscribe to particular types of events, as well.
#
# The following provides an explanation for the fields in this file,
-# and how they relate to fields in the corresponding "proxy_config"
+# and how they relate to fields in the corresponding "connection_config"
# file.
#
-# 1. Proxy ID -- Each Proxy is given a unique ID that is used
-# in the "consumer_config" file to specify to which Consumers
-# the Event Channel will forward incoming events from Suppliers.
-# The Proxy ID field is the "key" that is used to match up
-# Consumer subscription requests in this file with Proxy
-# connections in the "proxy_config" file.
+# 1. Connection ID -- Each Connection Handler is given a unique ID
+# that is used in the "consumer_config" file to specify to which
+# Consumers the Event Channel will forward incoming events from
+# Suppliers. The Connection ID field is the "key" that is used to
+# match up Consumer subscription requests in this file with
+# connections in the "connection_config" file.
#
-# 2. Supplier ID -- Currently, this has the same meaning as the
-# Proxy ID, though a more sophisticated implementation might change
-# this...
+# 2. Event Type -- Indicates the type of the event. Consumers
+# can use this to only subscribe to certain types of events.
+# This feature is currently not implemented.
#
-# 3. Type -- Indicates the type of the event. Consumers
-# can use this to only subscribe to certain types of events. This
-# feature is currently not implemented.
-#
-# 4. Consumers -- Indicates which Consumers will receive events sent
+# 3. Consumers -- Indicates which Consumers will receive events sent
# from this Proxy/Supplier ID, i.e., Consumers can subscribe to
# receive events from particular Suppliers. Note that more than
# one Consumer can subscribe to the same Supplier event, i.e.,
# we support logical "multicast" (which is currently implemented
# using multi-point unicast via TCP/IP).
#
-# Proxy ID Supplier ID Type Consumers
-# -------- ----------- ------- ------------
- 1 1 0 2
-# 2 2 0 3,4
-# 3 3 0 4
-# 4 4 0 5
+# Connection Event Consumers
+# ID Type
+# ---------- ---- ---------
+ 1 0 2
+# 2 0 3,4
+# 3 0 4
+# 4 0 5
diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config
deleted file mode 100644
index cdc7d07ffa3..00000000000
--- a/apps/Gateway/Gateway/proxy_config
+++ /dev/null
@@ -1,51 +0,0 @@
-# Configuration file that the gatewayd process uses to determine
-# connection information about proxies.
-#
-# The following provides an explanation for the fields in this file,
-# and how they relate to fields in the corresponding "consumer_config"
-# file.
-#
-# 1. Proxy ID -- Each Proxy is given a unique ID that is used
-# in the "consumer_config" file to specify to which Consumers
-# the Event Channel will forward incoming events from Suppliers.
-# The Proxy ID field is the "key" that is used to match up Proxy
-# connections in this file with the Consumer subscription requests
-# in the "consumer_config" file.
-#
-# 2. Host -- The host name where the Supplier/Consumer peerd
-# process is running.
-#
-# 3. Remote Port -- The port number where the remote
-# Supplier/Consumer peerd process is listening on.
-# If this is a '*' character it is an indication to the
-# Gateway to use the "default value," e.g., which can be provided
-# on the command-line, etc.
-#
-# 4. Proxy Role -- i.e., Consumer ('C') or Supplier ('S')
-#
-# 5. Max Retry Timeout -- The maximum amount of time that we'll
-# wait between retry attempts (these start at 1 second and
-# double until they reach the Max Retry Timeout).
-# If this is a '*' character it is an indication to the
-# Gateway to use the "default value," e.g., which can be provided
-# on the command-line, etc.
-#
-# 6. Local Port -- The port number that we want to use for
-# our local Proxy connection. If this is the value 0, then
-# we'll let the socket implementation pick this value for us.
-#
-# 7. Priority -- Each Consumer/Supplier can be given a priority
-# that will determine its importance relative to other
-# Consumers/Suppliers (this feature isn't implemented yet).
-#
-# Proxy Host Remote Proxy Max Retry Local Priority
-# ID Port Role Timeout Port
-# ---- -------- ------ ------ ---------- ----- --------
- 1 flamenco 10004 S 32 0 1
- 2 tango 10004 C 32 0 1
-# 3 mambo.cs 10002 C 32 0 1
-# 4 lambada.cs 10002 C 32 0 1
-# 5 lambada.cs 10002 C 32 0 1
-# 6 tango.cs 10002 C 32 0 1
-# 7 tango.cs 5001 S 32 0 1
-# 8 tango.cs 5002 C 32 0 1
diff --git a/apps/Gateway/Peer/Options.cpp b/apps/Gateway/Peer/Options.cpp
index fb6daccce91..8931e548f08 100644
--- a/apps/Gateway/Peer/Options.cpp
+++ b/apps/Gateway/Peer/Options.cpp
@@ -88,7 +88,7 @@ Options::enabled (int option) const
void
Options::parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opt (argc, argv, "a:c:h:q:t:v", 0);
+ ACE_Get_Opt get_opt (argc, argv, "a:c:h:m:t:v", 0);
for (int c; (c = get_opt ()) != -1; )
{
@@ -147,7 +147,7 @@ Options::parse_args (int argc, char *argv[])
this->connector_host_ = get_opt.optarg;
break;
/* NOTREACHED */
- case 'q':
+ case 'm':
// max queue size.
this->max_queue_size_ = ACE_OS::atoi (get_opt.optarg);
break;
diff --git a/apps/Gateway/Peer/Peer.cpp b/apps/Gateway/Peer/Peer.cpp
index b67867c9be3..bc1d7a24c02 100644
--- a/apps/Gateway/Peer/Peer.cpp
+++ b/apps/Gateway/Peer/Peer.cpp
@@ -6,7 +6,7 @@
#include "Options.h"
Peer_Handler::Peer_Handler (void)
- : proxy_id_ (0),
+ : connection_id_ (0),
msg_frag_ (0),
total_bytes_ (0)
{
@@ -53,8 +53,8 @@ Peer_Handler::open (void *a)
"schedule_wakeup"),
-1);
- // First action is to wait to be notified of our supplier id.
- this->do_action_ = &Peer_Handler::await_supplier_id;
+ // First action is to wait to be notified of our connection id.
+ this->do_action_ = &Peer_Handler::await_connection_id;
return 0;
}
@@ -63,7 +63,7 @@ Peer_Handler::open (void *a)
int
Peer_Handler::xmit_stdin (void)
{
- if (this->proxy_id_ != -1)
+ if (this->connection_id_ != -1)
{
ACE_Message_Block *mb;
@@ -94,9 +94,7 @@ Peer_Handler::xmit_stdin (void)
ACE_ERROR ((LM_ERROR, "%p\n", "read"));
break;
default:
- // For simplicity, we'll use our proxy id as the supplier id
- // (which we must store in network byte order, of course).
- event->header_.supplier_id_ = this->proxy_id_;
+ event->header_.connection_id_ = this->connection_id_;
event->header_.len_ = n;
event->header_.priority_ = 0;
event->header_.type_ = 0;
@@ -148,7 +146,8 @@ Peer_Handler::nonblk_put (ACE_Message_Block *mb)
// We didn't manage to send everything, so requeue.
ACE_DEBUG ((LM_DEBUG,
"queueing activated on handle %d to supplier id %d\n",
- this->get_handle (), this->proxy_id_));
+ this->get_handle (),
+ this->connection_id_));
// Re-queue in *front* of the list to preserve order.
if (this->msg_queue ()->enqueue_head
@@ -211,7 +210,7 @@ Peer_Handler::handle_output (ACE_HANDLE)
ACE_DEBUG ((LM_DEBUG,
"queue now empty on handle %d to supplier id %d\n",
this->get_handle (),
- this->proxy_id_));
+ this->connection_id_));
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
@@ -224,7 +223,10 @@ Peer_Handler::handle_output (ACE_HANDLE)
}
else
// If the list is empty there's a bug!
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "dequeue_head"), 0);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "dequeue_head"),
+ 0);
}
// Send an event to a peer (may block if necessary).
@@ -394,7 +396,7 @@ Peer_Handler::recv (ACE_Message_Block *&mb)
ACE_DEBUG ((LM_DEBUG,
"(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_,
+ event->header_.connection_id_,
event->header_.len_,
data_received + header_received));
if (Options::instance ()->enabled (Options::VERBOSE))
@@ -424,12 +426,12 @@ Peer_Handler::handle_input (ACE_HANDLE sd)
// Action that receives our supplier id from the Gateway.
int
-Peer_Handler::await_supplier_id (void)
+Peer_Handler::await_connection_id (void)
{
- ssize_t n = this->peer ().recv (&this->proxy_id_,
- sizeof this->proxy_id_);
+ ssize_t n = this->peer ().recv (&this->connection_id_,
+ sizeof this->connection_id_);
- if (n != sizeof this->proxy_id_)
+ if (n != sizeof this->connection_id_)
{
if (n == 0)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -445,10 +447,10 @@ Peer_Handler::await_supplier_id (void)
}
else
{
- this->proxy_id_ = ntohl (this->proxy_id_);
+ this->connection_id_ = ntohl (this->connection_id_);
ACE_DEBUG ((LM_DEBUG,
- "assigned proxy id %d\n",
- this->proxy_id_));
+ "assigned connection id %d\n",
+ this->connection_id_));
}
// Transition to the action that waits for Peer events.
@@ -502,7 +504,7 @@ Peer_Handler::await_events (void)
ACE_DEBUG ((LM_DEBUG,
"route id = %d, cur len = %d, total len = %d\n",
- event->header_.supplier_id_,
+ event->header_.connection_id_,
event->header_.len_,
this->total_bytes_));
if (Options::instance ()->enabled (Options::VERBOSE))
diff --git a/apps/Gateway/Peer/Peer.h b/apps/Gateway/Peer/Peer.h
index fa78f786fde..084c2b6d3b8 100644
--- a/apps/Gateway/Peer/Peer.h
+++ b/apps/Gateway/Peer/Peer.h
@@ -117,10 +117,8 @@ protected:
// Pointer-to-member-function for the current action to run in this
// state. This points to one of the preceding 3 methods.
- ACE_INT32 proxy_id_;
- // Proxy ID of the peer, which is obtained from the gatewayd. For
- // simplicity, in this implementation we also use the Proxy ID as
- // the Supplier ID. This might change in future releases.
+ CONNECTION_ID connection_id_;
+ // Connection ID of the peer, which is obtained from the gatewayd.
ACE_Message_Block *msg_frag_;
// Keep track of event fragments that arrive in non-blocking recv's