summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-01-01 08:00:34 +0000
commitea0d28240863caf437a18071bfd03e7b146c5ade (patch)
tree91b695852b885a5f44f9be8c3a22bbf7f5a96b8d
parenta6e2ced2f5279e011b712995095a1712a29e22f0 (diff)
downloadATCD-ea0d28240863caf437a18071bfd03e7b146c5ade.tar.gz
foo
-rw-r--r--ChangeLog-96b66
-rw-r--r--ace/Acceptor.h7
-rw-r--r--ace/Connector.cpp2
-rw-r--r--ace/Connector.h2
-rw-r--r--ace/Event_Handler.h4
-rw-r--r--ace/Log_Msg.cpp8
-rw-r--r--ace/Message_Block.cpp102
-rw-r--r--ace/Message_Block.h81
-rw-r--r--ace/Message_Block.i16
-rw-r--r--ace/Reactor.cpp8
-rw-r--r--ace/SOCK_Acceptor.h1
-rw-r--r--ace/Svc_Handler.h2
-rw-r--r--ace/Synch.h162
-rw-r--r--ace/Synch.i80
-rw-r--r--ace/Synch_T.h51
-rw-r--r--ace/Synch_T.i68
-rw-r--r--apps/Gateway/Gateway/Concurrency_Strategies.h12
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp36
-rw-r--r--apps/Gateway/Gateway/Config_Files.h34
-rw-r--r--apps/Gateway/Gateway/Consumer_Dispatch_Set.h28
-rw-r--r--apps/Gateway/Gateway/Event.h38
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp498
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h113
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp40
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.h28
-rw-r--r--apps/Gateway/Gateway/File_Parser.cpp22
-rw-r--r--apps/Gateway/Gateway/File_Parser.h2
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp231
-rw-r--r--apps/Gateway/Gateway/Makefile12
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.cpp345
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.h91
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.cpp41
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.cpp157
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.h24
-rw-r--r--apps/Gateway/Gateway/gatewayd.cpp7
-rw-r--r--apps/Gateway/Peer/Event.h38
-rw-r--r--apps/Gateway/Peer/peerd.cpp7
-rw-r--r--apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp4
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp2
-rw-r--r--examples/ASX/Message_Queue/bounded_buffer.cpp2
-rw-r--r--examples/ASX/Message_Queue/buffer_stream.cpp2
-rw-r--r--examples/ASX/Message_Queue/priority_buffer.cpp6
-rw-r--r--examples/ASX/UPIPE_Event_Server/Peer_Router.cpp2
-rw-r--r--examples/Connection/blocking/SPIPE-connector.h2
-rw-r--r--examples/Logger/simple-server/Logging_Handler.cpp2
-rw-r--r--netsvcs/lib/TS_Clerk_Handler.cpp2
46 files changed, 1569 insertions, 919 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index 349f954ba24..64de3de46b5 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,3 +1,60 @@
+Wed Jan 1 00:10:47 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * apps/Gateway/Gateway: Moved all of the configuration file
+ parsing logic *outside* of the Event_Channel into the Gateway
+ class so that we wouldn't have unnecessary dependencies.
+
+ * apps/Gateway/Gateway: Redesigned the Gateway so that the
+ Proxy_Handlers (i.e., the Consumer_Proxy and Supplier_Proxy)
+ most of their work to the Event_Channel. This "lightweight
+ proxy" design is an improvement since it is now possible to
+ emulate the COS Event Channel semantics within the Event_Channel
+ "kernel."
+
+ * Happy new year!
+
+Tue Dec 31 18:27:50 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/Log_Msg.cpp (log): Added a test so that if we're
+ (1) not printing to stderr and (2) aborting the program we still
+ print a message to stderr.
+
+ * ace/Message_Block: Added synchronization support to
+ ACE_Message_Block. This is necessary now that we've got
+ reference counting to ensure that we don't have race conditions
+ when incrementing and decrementing the reference count in
+ separate threads. The approach is very clean and uses the new
+ ACE_Lock mechanism to conditionally acquire()/release() the
+ locking strategy if concurrency control is necessary.
+
+ * ace/Synch_T: Created a new set of ACE_Lock and
+ ACE_Lock_Adapter<> classes which are similar in spirit to the
+ ACE_Allocator and ACE_Allocator_Adapter<> classes. These make
+ it possible to treat polymorphically synchronization mechanisms
+ in ACE polymorphically, *without* creating an entire new
+ parallel hierarchy of locking mechanisms.
+
+ * ace/Synch: Added the full suite of acquire_{read|write}() and
+ tryacquire_{read|write}() methods to ACE_Semaphore and
+ ACE_Process_Semaphore so they will be consist with the other
+ synchronization APIs.
+
+Tue Dec 31 00:11:56 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
+
+ * Changed all uses of ACE_Event_Handler::RWE_MASK to
+ ACE_Event_Handler::ALL_EVENTS_MASK to reflect the fact that
+ we will soon have more than READ, WRITE, and EXCEPT events.
+ However, I've kept RWE_MASK around for backwards
+ compatibility.
+
+ * examples/ASX/Message_Queue: Changed the tests so that they use
+ the new ACE_Message_Block::release() method rather than calling
+ delete explicitly.
+
+ * apps/Gateway: Revised the implementation of the Gateway and Peer
+ applications to take advantage of the new ACE_Message_Block
+ reference counting scheme.
+
Tue Dec 31 15:06:51 1996 David L. Levine <levine@cs.wustl.edu>
* ace/Task.cpp: added comments that try to explain interaction
@@ -12,6 +69,15 @@ Tue Dec 31 15:06:51 1996 David L. Levine <levine@cs.wustl.edu>
Mon Dec 30 15:24:59 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
+ * ace/Message_Block: Added reference counting to ACE_Message_Block
+ so that we no longer have to clone() messages when we want to
+ pass them around "by reference."
+
+ * apps/Gateway/Peer/Peer.cpp (init): The Peer_Acceptor had gotten
+ out of date wrt newer ACE features, so I updated it.
+
+Mon Dec 30 15:24:59 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
+
* ace/OS.h: Added a special case for ACE_UNUSED_ARG that works
with G++. Thanks to David Levine for this.
diff --git a/ace/Acceptor.h b/ace/Acceptor.h
index 7071af03800..feca8eb7d2f 100644
--- a/ace/Acceptor.h
+++ b/ace/Acceptor.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
@@ -109,7 +108,7 @@ protected:
// = Demultiplexing hooks.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Perform termination activities when <this> is removed from the
// <reactor>.
@@ -252,7 +251,7 @@ protected:
// Returns the listening acceptor's <ACE_HANDLE>.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Perform termination activities when <this> is removed from the
// <Reactor>.
@@ -397,7 +396,7 @@ protected:
// Returns the listening acceptor's <ACE_HANDLE>.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Perform termination activities when <this> is removed from the
// <reactor>.
diff --git a/ace/Connector.cpp b/ace/Connector.cpp
index 0cace4e72c3..03fa643dfc8 100644
--- a/ace/Connector.cpp
+++ b/ace/Connector.cpp
@@ -268,7 +268,7 @@ ACE_Connector<SH, PR_CO_2>::cleanup_AST (ACE_HANDLE handle,
this->reactor_->cancel_timer (ast->cancellation_id ());
// Remove ACE_HANDLE from ACE_Reactor.
- this->reactor_->remove_handler (handle, ACE_Event_Handler::RWE_MASK
+ this->reactor_->remove_handler (handle, ACE_Event_Handler::ALL_EVENTS_MASK
| ACE_Event_Handler::DONT_CALL);
// Remove ACE_HANDLE from the map.
diff --git a/ace/Connector.h b/ace/Connector.h
index 8d9e7d44a5c..02a193621b4 100644
--- a/ace/Connector.h
+++ b/ace/Connector.h
@@ -222,7 +222,7 @@ protected:
// = Demultiplexing hooks.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Terminate the Client ACE_Connector by iterating over any
// unconnected ACE_Svc_Handler's and removing them from the
// ACE_Reactor.
diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h
index 07140a82ad6..f414d930ed4 100644
--- a/ace/Event_Handler.h
+++ b/ace/Event_Handler.h
@@ -54,8 +54,8 @@ public:
EXCEPT_MASK = 0x2,
#endif /* ACE_USE_POLL */
ACCEPT_MASK = 0x8,
- ALL_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK,
- RWE_MASK = ALL_MASK,
+ ALL_EVENTS_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK,
+ RWE_MASK = ALL_EVENTS_MASK,
DONT_CALL = 0x100
};
diff --git a/ace/Log_Msg.cpp b/ace/Log_Msg.cpp
index e0bbef3abfe..8c509eee15d 100644
--- a/ace/Log_Msg.cpp
+++ b/ace/Log_Msg.cpp
@@ -690,9 +690,11 @@ ACE_Log_Msg::log (const char *format_str,
if (abort_prog)
{
- // _always_ print a message to stderr if aborting, not verbose
- // to help avoid recursive aborts if something is hosed
- log_record.print (ACE_Log_Msg::local_host_, 0);
+ // *always* print a message to stderr if we're aborting (and
+ // have not already done so). We don't use verbose, however, to
+ // avoid recursive aborts if something is hosed.
+ if (!ACE_BIT_ENABLED (ACE_Log_Msg::flags_, ACE_Log_Msg::STDERR))
+ log_record.print (ACE_Log_Msg::local_host_, 0);
ACE_OS::exit (exit_value);
}
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index 9f9503ef429..d58e13a8c03 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -80,7 +80,10 @@ ACE_Message_Block::ACE_Message_Block (void)
cont_ (0),
next_ (0),
prev_ (0),
- allocator_ (0)
+ allocator_strategy_ (0),
+ locking_strategy_ (0),
+ reference_count_ (1)
+
{
ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
}
@@ -89,20 +92,24 @@ ACE_Message_Block::ACE_Message_Block (size_t sz,
ACE_Message_Type msg_type,
ACE_Message_Block *msg_cont,
const char *msg_data,
- ACE_Allocator *alloc)
+ ACE_Allocator *allocator_strategy,
+ ACE_Lock *locking_strategy)
{
ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
- if (this->init (sz, msg_type, msg_cont, msg_data, alloc) == -1)
+
+ if (this->init (sz, msg_type, msg_cont, msg_data,
+ allocator_strategy, locking_strategy) == -1)
ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
}
ACE_Message_Block::~ACE_Message_Block (void)
{
ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block");
+
if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE))
{
- if (this->allocator_)
- this->allocator_->free ((void *) this->base_);
+ if (this->allocator_strategy_)
+ this->allocator_strategy_->free ((void *) this->base_);
else
delete [] this->base_;
}
@@ -125,7 +132,9 @@ ACE_Message_Block::ACE_Message_Block (const char *data,
cont_ (0),
next_ (0),
prev_ (0),
- allocator_ (0)
+ allocator_strategy_ (0),
+ locking_strategy_ (0),
+ reference_count_ (1)
{
ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
}
@@ -134,6 +143,7 @@ int
ACE_Message_Block::size (size_t length)
{
ACE_TRACE ("ACE_Message_Block::size");
+
if (length < this->max_size_)
this->cur_size_ = length;
else
@@ -141,11 +151,11 @@ ACE_Message_Block::size (size_t length)
int r_delta, w_delta;
char *buf;
- if (this->allocator_ == 0)
+ if (this->allocator_strategy_ == 0)
ACE_NEW_RETURN (buf, char[length], -1);
else // Use the allocator!
{
- buf = (char *) this->allocator_->malloc (length);
+ buf = (char *) this->allocator_strategy_->malloc (length);
if (buf == 0)
{
errno = ENOMEM;
@@ -155,8 +165,8 @@ ACE_Message_Block::size (size_t length)
if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE))
{
- if (this->allocator_)
- this->allocator_->free ((void *) this->base_);
+ if (this->allocator_strategy_)
+ this->allocator_strategy_->free ((void *) this->base_);
else
delete [] this->base_;
}
@@ -183,6 +193,7 @@ ACE_Message_Block::init (const char *data,
size_t size)
{
ACE_TRACE ("ACE_Message_Block::init");
+ // Should we also initialize all the other fields, as well?
this->base_ = (char *) data;
this->cur_size_ = size;
this->max_size_ = size;
@@ -195,22 +206,23 @@ ACE_Message_Block::init (size_t sz,
ACE_Message_Type msg_type,
ACE_Message_Block *msg_cont,
const char *msg_data,
- ACE_Allocator *alloc)
+ ACE_Allocator *allocator_strategy,
+ ACE_Lock *locking_strategy)
{
ACE_TRACE ("ACE_Message_Block::init");
this->flags_ = 0;
if (msg_data == 0)
{
- if (alloc == 0)
+ if (allocator_strategy == 0)
{
- this->allocator_ = 0;
+ this->allocator_strategy_ = 0;
ACE_NEW_RETURN (this->base_, char[sz], -1);
}
else // Use the allocator!
{
- this->allocator_ = alloc;
- this->base_ = (char *) alloc->malloc (sz);
+ this->allocator_strategy_ = allocator_strategy;
+ this->base_ = (char *) this->allocator_strategy_->malloc (sz);
if (this->base_ == 0)
{
errno = ENOMEM;
@@ -233,6 +245,8 @@ ACE_Message_Block::init (size_t sz,
this->cont_ = msg_cont;
this->next_ = 0;
this->prev_ = 0;
+ this->locking_strategy_ = locking_strategy;
+ this->reference_count_ = 1;
return 0;
}
@@ -248,7 +262,7 @@ ACE_Message_Block::clone (Message_Flags mask) const
ACE_NEW_RETURN (nb,
ACE_Message_Block (this->max_size_, this->type_,
- 0, 0, this->allocator_),
+ 0, 0, this->allocator_strategy_),
0);
ACE_OS::memcpy (nb->base_, this->base_, this->max_size_);
@@ -264,3 +278,59 @@ ACE_Message_Block::clone (Message_Flags mask) const
nb->cont_ = this->cont_->clone (mask);
return nb;
}
+
+ACE_Message_Block *
+ACE_Message_Block::release (void)
+{
+ ACE_TRACE ("ACE_Message_Block::release");
+
+ ACE_Message_Block *result = 0;
+
+ if (this->locking_strategy_)
+ {
+ // We need to acquire the lock before decrementing the count.
+ this->locking_strategy_->acquire ();
+ this->reference_count_--;
+
+ if (this->reference_count_ == 0)
+ delete this;
+ else if (this->reference_count_ > 0)
+ result = this;
+ // ACE_ASSERT (this->reference_count_ <= 0)
+ // This shouldn't happen...
+
+ this->locking_strategy_->release ();
+ }
+ else
+ {
+ this->reference_count_--;
+
+ if (this->reference_count_ == 0)
+ delete this;
+ else if (this->reference_count_ > 0)
+ result = this;
+ // ACE_ASSERT (this->reference_count_ <= 0)
+ // This shouldn't happen...
+ }
+
+ return result;
+}
+
+ACE_INLINE ACE_Message_Block *
+ACE_Message_Block::duplicate (void)
+{
+ ACE_TRACE ("ACE_Message_Block::duplicate");
+
+ if (this->locking_strategy_)
+ {
+ // We need to acquire the lock before incrementing the count.
+ this->locking_strategy_->acquire ();
+ this->reference_count_++;
+ this->locking_strategy_->release ();
+ }
+ else
+ this->reference_count_++;
+
+ return this;
+}
+
diff --git a/ace/Message_Block.h b/ace/Message_Block.h
index 599e1039813..30852f4dc8c 100644
--- a/ace/Message_Block.h
+++ b/ace/Message_Block.h
@@ -23,17 +23,21 @@
class ACE_Export ACE_Message_Block
// = TITLE
- // Object used to store messages in the ASX framework.
+ // Stores messages for use throughout ACE (particularly
+ // <ACE_Message_Queue>).
//
// = DESCRIPTION
- // An ACE_Message_Block is modeled after the message data
- // structures used in System V STREAMS. A Message_Block is
- // composed of one or more Message_Blocks that are linked
- // together by PREV and NEXT pointers. In addition, a
- // ACE_Message_Block may also be linked to a chain of other
- // Message_Blocks. This structure enables efficient
+ // An <ACE_Message_Block> is modeled after the message data
+ // structures used in System V STREAMS. An <ACE_Message_Block>
+ // is composed of one or more <ACE_Message_Blocks> that can be
+ // linked to form a ``fragment chain.'' In addition,
+ // <ACE_Message_Blocks> can be linked together by <prev_> and
+ // <next_> pointers to form a queue of messages (this is how
+ // <ACE_Message_Queue> works). This structure enables efficient
// manipulation of arbitrarily-large messages *without*
- // incurring memory copying overhead.
+ // incurring memory copying overhead since (1)
+ // <ACE_Message_Blocks> can be chained together via pointers and
+ // (2) <ACE_Message_Blocks> keep a reference count.
{
public:
enum ACE_Message_Type
@@ -90,13 +94,16 @@ public:
ACE_Message_Type type = MB_DATA,
ACE_Message_Block *cont = 0,
const char *data = 0,
- ACE_Allocator *allocator = 0);
+ ACE_Allocator *allocator_strategy_ = 0,
+ ACE_Lock *locking_strategy = 0);
// Create an initialized message of type <type> containing <size>
// bytes. The <cont> argument initializes the continuation field in
// the <Message_Block>. If <data> == 0 then we create and own the
// <data>, using <allocator> to get the data if it's non-0. If
// <data> != 0 we assume ownership of the <data> (and don't delete
- // it).
+ // it). If <locking_strategy> is non-0 then this is used to protect
+ // regions of code that access shared state (e.g., reference
+ // counting) from race conditions.
int init (const char *data,
size_t size = 0);
@@ -107,13 +114,16 @@ public:
ACE_Message_Type type = MB_DATA,
ACE_Message_Block *cont = 0,
const char *data = 0,
- ACE_Allocator *allocator = 0);
+ ACE_Allocator *allocator = 0,
+ ACE_Lock *locking_strategy = 0);
// Create an initialized message of type <type> containing <size>
// bytes. The <cont> argument initializes the continuation field in
// the <Message_Block>. If <data> == 0 then we create and own the
// <data>, using <allocator> to get the data if it's non-0. If
// <data> != 0 we assume ownership of the <data> (and don't delete
- // it).
+ // it). If <locking_strategy> is non-0 then this is used to protect
+ // regions of code that access shared state (e.g., reference
+ // counting) from race conditions.
~ACE_Message_Block (void);
// Delete all the resources held in the message.
@@ -145,6 +155,16 @@ public:
ACE_Message_Block *clone (Message_Flags mask = ACE_Message_Block::DONT_DELETE) const;
// Return an exact "deep copy" of the message.
+ // = Reference counting methods.
+ ACE_Message_Block *duplicate (void);
+ // Increment our reference count by one.
+
+ ACE_Message_Block *release (void);
+ // Decrement our reference count by one. If the reference count is
+ // > 0 then return this; else if reference count == 0 then delete
+ // <this> and return 0. Behavior is undefined if reference count <
+ // 0.
+
// = Operations on Message data
int copy (const char *buf, size_t n);
@@ -160,7 +180,9 @@ public:
char *base (void) const;
// Get message data.
- void base (char *data, size_t size, Message_Flags = DONT_DELETE);
+ void base (char *data,
+ size_t size,
+ Message_Flags = DONT_DELETE);
// Set message data.
char *end (void) const;
@@ -180,42 +202,43 @@ public:
void wr_ptr (size_t n);
// Set the write pointer ahead <n> bytes.
- // = The length of a message is computed as the length between the
- // wr_ptr() - rd_ptr ()).
+ // = Message length is wr_ptr() - rd_ptr ().
size_t length (void) const;
// Get the length of the message
void length (size_t n);
// Set the length of the message
- // = The size of the allocated buffer is the total amount of space
- // alloted.
+ // = Message size is the total amount of space alloted.
size_t size (void) const;
// Get the total amount of space in the message.
int size (size_t length);
// Set the total amount of space in the message. Returns 0 if
// successful, else -1.
- // = The coninuation field is used to chain together composite
- // messages.
+ // = The continuation field chains together composite messages.
ACE_Message_Block *cont (void) const;
// Get the continuation field.
void cont (ACE_Message_Block *);
// Set the continuation field.
- // = The <next_> pointer points to the <Message_Block> directly ahead
- // in the Message_Queue.
+ // = The <next_> pointer is a link to the <Message_Block> directly ahead in the Message_Queue.
ACE_Message_Block *next (void) const;
// Get link to next message.
void next (ACE_Message_Block *);
// Set link to next message.
- // = The <prev_> pointer points to the <Message_Block> directly
- // ahead in the Message_Queue.
+ // = The <prev_> pointer is a link to the <Message_Block> directly ahead in the Message_Queue.
ACE_Message_Block *prev (void) const;
// Get link to prev message.
void prev (ACE_Message_Block *);
// Set link to prev message.
+ // = The locking strategy prevents race condition.
+ ACE_Lock *locking_strategy (void);
+ // Get the locking strategy.
+ ACE_Lock *locking_strategy (ACE_Lock *);
+ // Set a new locking strategy and return the hold one.
+
void dump (void) const;
// Dump the state of an object.
@@ -224,7 +247,7 @@ public:
private:
Message_Flags flags_;
- // Misc flags.
+ // Misc flags (e.g., DONT_DELETE and USER_FLAGS).
char *base_;
// Pointer to beginning of message block.
@@ -257,9 +280,17 @@ private:
ACE_Message_Block *prev_;
// Pointer to previous message in the list.
- ACE_Allocator *allocator_;
+ ACE_Allocator *allocator_strategy_;
// Pointer to the allocator defined for this message block.
+ ACE_Lock *locking_strategy_;
+ // Pointer to the locking defined for this message block. This is
+ // used to protect regions of code containing
+
+ size_t reference_count_;
+ // Reference count for this <Message_Block> which is used to avoid
+ // deep copies (i.e., <clone>).
+
// = Disallow these operations for now (use <clone> instead).
ACE_Message_Block &operator= (const ACE_Message_Block &);
ACE_Message_Block (const ACE_Message_Block &);
diff --git a/ace/Message_Block.i b/ace/Message_Block.i
index eebe94937d6..72b4f42f67b 100644
--- a/ace/Message_Block.i
+++ b/ace/Message_Block.i
@@ -230,3 +230,19 @@ ACE_Message_Block::prev (void) const
return this->prev_;
}
+ACE_INLINE ACE_Lock *
+ACE_Message_Block::locking_strategy (void)
+{
+ ACE_TRACE ("ACE_Message_Block::locking_strategy");
+ return this->locking_strategy_;
+}
+
+ACE_INLINE ACE_Lock *
+ACE_Message_Block::locking_strategy (ACE_Lock *nls)
+{
+ ACE_TRACE ("ACE_Message_Block::locking_strategy");
+ ACE_Lock *ols = this->locking_strategy_;
+ this->locking_strategy_ = nls;
+ return ols;
+}
+
diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp
index 1a0a9d8377a..c7a629d028b 100644
--- a/ace/Reactor.cpp
+++ b/ace/Reactor.cpp
@@ -92,7 +92,7 @@ ACE_Handler_Repository::close (ACE_Reactor *reactor)
i < this->cur_size_;
i++)
reactor->detach (this->event_handlers_[i].handle_,
- ACE_Event_Handler::RWE_MASK);
+ ACE_Event_Handler::ALL_EVENTS_MASK);
delete [] this->event_handlers_;
this->event_handlers_ = 0;
@@ -100,7 +100,7 @@ ACE_Handler_Repository::close (ACE_Reactor *reactor)
for (ACE_HANDLE h = 0;
h < this->max_handlep1_;
h++)
- reactor->detach (h, ACE_Event_Handler::RWE_MASK);
+ reactor->detach (h, ACE_Event_Handler::ALL_EVENTS_MASK);
delete [] this->event_handlers_;
this->event_handlers_ = 0;
@@ -1591,7 +1591,7 @@ ACE_Reactor::check_handles (void)
if (ACE_OS::poll (&p_handle, 1, 0) == -1)
{
result = 1;
- this->detach (handle, ACE_Event_Handler::RWE_MASK);
+ this->detach (handle, ACE_Event_Handler::ALL_EVENTS_MASK);
}
#else
rmask.set_bit (handle);
@@ -1600,7 +1600,7 @@ ACE_Reactor::check_handles (void)
&time_poll) < 0)
{
result = 1;
- this->detach (handle, ACE_Event_Handler::RWE_MASK);
+ this->detach (handle, ACE_Event_Handler::ALL_EVENTS_MASK);
}
rmask.clr_bit (handle);
#endif /* ACE_USE_POLL */
diff --git a/ace/SOCK_Acceptor.h b/ace/SOCK_Acceptor.h
index fd3c2534a92..fed78c498ab 100644
--- a/ace/SOCK_Acceptor.h
+++ b/ace/SOCK_Acceptor.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
diff --git a/ace/Svc_Handler.h b/ace/Svc_Handler.h
index 1228ad0964a..89bf2d73d99 100644
--- a/ace/Svc_Handler.h
+++ b/ace/Svc_Handler.h
@@ -73,7 +73,7 @@ public:
// = Demultiplexing hooks.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Perform termination activities on the SVC_HANDLER. The default
// behavior is to close down the <peer_> (to avoid descriptor leaks)
// and to delete this (to avoid memory leaks)! If you don't want
diff --git a/ace/Synch.h b/ace/Synch.h
index 6d3e864d582..3a88bee5a32 100644
--- a/ace/Synch.h
+++ b/ace/Synch.h
@@ -29,6 +29,54 @@
class ACE_Time_Value;
// template <class ACE_COND_MUTEX> class ACE_Condition;
+class ACE_Lock
+ // = TITLE
+ // This is the abstract base class that contains the uniform
+ // locking API that is supported by all the ACE synchronization
+ // mechanisms.
+ //
+ // = DESCRIPTION
+ // This class is typically used in conjunction with the
+ // <ACE_Lock_Adapter> in order to provide a polymorphic
+ // interface to the ACE synchronization mechanisms (e.g.,
+ // <ACE_Mutex>, <ACE_Semaphore>, <ACE_RW_Lock>, etc). Note that
+ // the reason that all of ACE doesn't use polymorphic locks is
+ // that (1) they add ~20% extra overhead for virtual function
+ // calls and (2) objects with virtual functions can't be placed
+ // into shared memory.
+{
+public:
+ virtual int remove (void) = 0;
+ // Explicitly destroy the lock.
+
+ virtual int acquire (void) = 0;
+ // Block the thread until the lock is acquired.
+
+ virtual int tryacquire (void) = 0;
+ // Conditionally acquire the lock (i.e., won't block).
+
+ virtual int release (void) = 0;
+ // Release the lock.
+
+ virtual int acquire_read (void) = 0;
+ // Block until the thread acquires a read lock. If the locking
+ // mechanism doesn't support read locks then this just calls
+ // <acquire>.
+
+ virtual int acquire_write (void) = 0;
+ // Block until the thread acquires a write lock. If the locking
+ // mechanism doesn't support read locks then this just calls
+ // <acquire>.
+
+ virtual int tryacquire_read (void) = 0;
+ // Conditionally acquire a read lock. If the locking mechanism
+ // doesn't support read locks then this just calls <acquire>.
+
+ virtual int tryacquire_write (void) = 0;
+ // Conditionally acquire a write lock. If the locking mechanism
+ // doesn't support read locks then this just calls <acquire>.
+};
+
class ACE_Export ACE_File_Lock
// = TITLE
// A wrapper around the UNIX file locking mechanism.
@@ -111,12 +159,12 @@ class ACE_Export ACE_Semaphore
{
public:
// = Initialization and termination.
- ACE_Semaphore (u_int count,
+ ACE_Semaphore (u_int count = 1, // By default make this unlocked.
int type = USYNC_THREAD,
LPCTSTR name = 0,
void * = 0,
int max = 0x7fffffff);
- // Initialize the semaphore, with default value of "count".
+ // Initialize the semaphore, with initial value of "count".
~ACE_Semaphore (void);
// Implicitly destroy the semaphore.
@@ -129,13 +177,33 @@ public:
// greater than 0, then decrement it.
int tryacquire (void);
- // Conditionally decrement the semaphore if count is greater
- // than 0 (i.e., won't block).
+ // Conditionally decrement the semaphore if count is greater than 0
+ // (i.e., won't block).
int release (void);
// Increment the semaphore, potentially unblocking
// a waiting thread.
+ int acquire_read (void);
+ // Acquire semaphore ownership. This calls <acquire> and is only
+ // here to make the <ACE_Semaphore> interface consistent with the
+ // other synchronization APIs.
+
+ int acquire_write (void);
+ // Acquire semaphore ownership. This calls <acquire> and is only
+ // here to make the <ACE_Semaphore> interface consistent with the
+ // other synchronization APIs.
+
+ int tryacquire_read (void);
+ // Conditionally acquire semaphore (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Semaphore>
+ // interface consistent with the other synchronization APIs.
+
+ int tryacquire_write (void);
+ // Conditionally acquire semaphore (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Semaphore>
+ // interface consistent with the other synchronization APIs.
+
void dump (void) const;
// Dump the state of an object.
@@ -159,8 +227,10 @@ class ACE_Export ACE_Process_Semaphore
// across processes.
{
public:
- ACE_Process_Semaphore (u_int count, LPCTSTR name = 0,
- void * = 0, int max = 0x7FFFFFFF);
+ ACE_Process_Semaphore (u_int count = 1, // By default make this unlocked.
+ LPCTSTR name = 0,
+ void * = 0,
+ int max = 0x7FFFFFFF);
// Initialize the semaphore, with an initial value of <count> and a
// maximum value of <max>.
@@ -171,16 +241,35 @@ public:
// Explicitly destroy the semaphore.
int acquire (void);
- // Block the thread until the semaphore count becomes
- // greater than 0, then decrement it.
+ // Block the thread until the semaphore count becomes greater than
+ // 0, then decrement it.
int tryacquire (void);
- // Conditionally decrement the semaphore if count is greater
- // than 0 (i.e., won't block).
+ // Conditionally decrement the semaphore if count is greater than 0
+ // (i.e., won't block).
int release (void);
- // Increment the semaphore, potentially unblocking
- // a waiting thread.
+ // Increment the semaphore, potentially unblocking a waiting thread.
+
+ int acquire_read (void);
+ // Acquire semaphore ownership. This calls <acquire> and is only
+ // here to make the <ACE_Process_Semaphore> interface consistent
+ // with the other synchronization APIs.
+
+ int acquire_write (void);
+ // Acquire semaphore ownership. This calls <acquire> and is only
+ // here to make the <ACE_Process_Semaphore> interface consistent
+ // with the other synchronization APIs.
+
+ int tryacquire_read (void);
+ // Conditionally acquire semaphore (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Process_Semaphore>
+ // interface consistent with the other synchronization APIs.
+
+ int tryacquire_write (void);
+ // Conditionally acquire semaphore (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Process_Semaphore>
+ // interface consistent with the other synchronization APIs.
void dump (void) const;
// Dump the state of an object.
@@ -233,12 +322,12 @@ public:
int acquire (void);
// Note, for interface uniformity with other synchronization
// wrappers we include the <acquire> method. This is implemented as
- // a write-lock to be on the safe-side...
+ // a write-lock to safe...
int tryacquire (void);
// Note, for interface uniformity with other synchronization
// wrappers we include the <tryacquire> method. This is implemented
- // as a write-lock to be on the safe-side...
+ // as a write-lock to be safe...
int release (void);
// Unlock a readers/writer lock.
@@ -263,8 +352,8 @@ private:
class ACE_Export ACE_Mutex
// = TITLE
- // ACE_Mutex wrapper (valid in same process or across processes
- // (depending on TYPE flag))
+ // <ACE_Mutex> wrapper (valid in same process or across
+ // processes (depending on TYPE flag)).
{
public:
ACE_Mutex (int type = USYNC_THREAD,
@@ -288,16 +377,24 @@ public:
// Release lock and unblock a thread at head of priority queue.
int acquire_read (void);
- // Acquire lock ownership (wait on priority queue if necessary).
+ // Acquire mutex ownership. This calls <acquire> and is only
+ // here to make the <ACE_Mutex> interface consistent with the
+ // other synchronization APIs.
int acquire_write (void);
- // Acquire lock ownership (wait on priority queue if necessary).
+ // Acquire mutex ownership. This calls <acquire> and is only
+ // here to make the <ACE_Mutex> interface consistent with the
+ // other synchronization APIs.
int tryacquire_read (void);
- // Conditionally acquire a lock (i.e., won't block).
+ // Conditionally acquire mutex (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Mutex>
+ // interface consistent with the other synchronization APIs.
int tryacquire_write (void);
- // Conditionally acquire a lock (i.e., won't block).
+ // Conditionally acquire mutex (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Mutex>
+ // interface consistent with the other synchronization APIs.
const ACE_mutex_t &lock (void) const;
// Return the underlying mutex.
@@ -324,7 +421,8 @@ class ACE_Export ACE_Process_Mutex
// processes).
{
public:
- ACE_Process_Mutex (LPCTSTR name = ACE_DEFAULT_MUTEX, void *arg = 0);
+ ACE_Process_Mutex (LPCTSTR name = ACE_DEFAULT_MUTEX,
+ void *arg = 0);
// Create a Process_Mutex, passing in the optional <name>.
~ACE_Process_Mutex (void);
@@ -669,16 +767,24 @@ public:
// Release lock and unblock a thread at head of priority queue.
int acquire_read (void);
- // Acquire lock ownership (wait on priority queue if necessary).
+ // Acquire mutex ownership. This calls <acquire> and is only here
+ // to make the <ACE_Thread_Mutex> interface consistent with the
+ // other synchronization APIs.
int acquire_write (void);
- // Acquire lock ownership (wait on priority queue if necessary).
+ // Acquire mutex ownership. This calls <acquire> and is only here
+ // to make the <ACE_Thread_Mutex> interface consistent with the
+ // other synchronization APIs.
int tryacquire_read (void);
- // Conditionally acquire a lock (i.e., won't block).
+ // Conditionally acquire mutex (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Thread_Mutex>
+ // interface consistent with the other synchronization APIs.
int tryacquire_write (void);
- // Conditionally acquire a lock (i.e., won't block).
+ // Conditionally acquire mutex (i.e., won't block). This calls
+ // <tryacquire> and is only here to make the <ACE_Thread_Mutex>
+ // interface consistent with the other synchronization APIs.
const ACE_thread_mutex_t &lock (void) const;
// Return the underlying mutex.
@@ -912,8 +1018,10 @@ class ACE_Export ACE_Thread_Semaphore : public ACE_Semaphore
// only within on process.
{
public:
- ACE_Thread_Semaphore (u_int count, LPCTSTR name = 0,
- void * = 0, int max = 0x7FFFFFFF);
+ ACE_Thread_Semaphore (u_int count = 1, // By default make this unlocked.
+ LPCTSTR name = 0,
+ void * = 0,
+ int max = 0x7FFFFFFF);
// Initialize the semaphore, with an initial value of <count> and a
// maximum value of <max>.
diff --git a/ace/Synch.i b/ace/Synch.i
index 3bc15d21e39..12b4260c4ef 100644
--- a/ace/Synch.i
+++ b/ace/Synch.i
@@ -235,6 +235,86 @@ ACE_Semaphore::release (void)
return ACE_OS::sema_post (&this->semaphore_);
}
+// Acquire semaphore ownership. This calls <acquire> and is only
+// here to make the <ACE_Semaphore> interface consistent with the
+// other synchronization APIs.
+
+ACE_INLINE int
+ACE_Semaphore::acquire_read (void)
+{
+ return this->acquire ();
+}
+
+// Acquire semaphore ownership. This calls <acquire> and is only
+// here to make the <ACE_Semaphore> interface consistent with the
+// other synchronization APIs.
+
+ACE_INLINE int
+ACE_Semaphore::acquire_write (void)
+{
+ return this->acquire ();
+}
+
+// Conditionally acquire semaphore (i.e., won't block). This calls
+// <tryacquire> and is only here to make the <ACE_Semaphore>
+// interface consistent with the other synchronization APIs.
+
+ACE_INLINE int
+ACE_Semaphore::tryacquire_read (void)
+{
+ return this->tryacquire ();
+}
+
+// Conditionally acquire semaphore (i.e., won't block). This calls
+// <tryacquire> and is only here to make the <ACE_Semaphore>
+// interface consistent with the other synchronization APIs.
+
+ACE_INLINE int
+ACE_Semaphore::tryacquire_write (void)
+{
+ return this->tryacquire ();
+}
+
+// Acquire semaphore ownership. This calls <acquire> and is only here
+// to make the <ACE_Process_Semaphore> interface consistent with the
+// other synchronization APIs.
+
+ACE_INLINE int
+ACE_Process_Semaphore::acquire_read (void)
+{
+ return this->acquire ();
+}
+
+// Acquire semaphore ownership. This calls <acquire> and is only here
+// to make the <ACE_Process_Semaphore> interface consistent with the
+// other synchronization APIs.
+
+ACE_INLINE int
+ACE_Process_Semaphore::acquire_write (void)
+{
+ return this->acquire ();
+}
+
+// Conditionally acquire semaphore (i.e., won't block). This calls
+// <tryacquire> and is only here to make the <ACE_Process_Semaphore>
+// interface consistent with the other synchronization APIs.
+
+ACE_INLINE int
+ACE_Process_Semaphore::tryacquire_read (void)
+{
+ return this->tryacquire ();
+}
+
+// Conditionally acquire semaphore (i.e., won't block). This calls
+// <tryacquire> and is only here to make the <ACE_Process_Semaphore>
+// interface consistent with the other synchronization APIs.
+
+ACE_INLINE int
+ACE_Process_Semaphore::tryacquire_write (void)
+{
+ return this->tryacquire ();
+}
+
#if defined (ACE_HAS_THREADS)
ACE_INLINE const ACE_thread_mutex_t &
diff --git a/ace/Synch_T.h b/ace/Synch_T.h
index 167bf36950a..ee3ab76422e 100644
--- a/ace/Synch_T.h
+++ b/ace/Synch_T.h
@@ -23,6 +23,57 @@
// Forward decl
class ACE_Time_Value;
+template <class LOCKING_MECHANISM>
+class ACE_Lock_Adapter : public ACE_Lock
+ // = TITLE
+
+ // This is an adapter that allows applications to transparently
+ // combine the <ACE_Lock> abstract base class (which contains
+ // pure virtual methods) with any of the other concrete ACE
+ // synchronization classes (e.g., <ACE_Mutex>, <ACE_Semaphore>,
+ // <ACE_RW_Lock>, etc.).
+ //
+ // = DESCRIPTION
+ // This class uses a form of the Adapter pattern.
+{
+public:
+ typedef LOCKING_MECHANISM LOCK;
+
+ virtual int remove (void);
+ // Explicitly destroy the lock.
+
+ virtual int acquire (void);
+ // Block the thread until the lock is acquired.
+
+ virtual int tryacquire (void);
+ // Conditionally acquire the lock (i.e., won't block).
+
+ virtual int release (void);
+ // Release the lock.
+
+ virtual int acquire_read (void);
+ // Block until the thread acquires a read lock. If the locking
+ // mechanism doesn't support read locks then this just calls
+ // <acquire>.
+
+ virtual int acquire_write (void);
+ // Block until the thread acquires a write lock. If the locking
+ // mechanism doesn't support read locks then this just calls
+ // <acquire>.
+
+ virtual int tryacquire_read (void);
+ // Conditionally acquire a read lock. If the locking mechanism
+ // doesn't support read locks then this just calls <acquire>.
+
+ virtual int tryacquire_write (void);
+ // Conditionally acquire a write lock. If the locking mechanism
+ // doesn't support read locks then this just calls <acquire>.
+
+private:
+ LOCKING_MECHANISM lock_;
+ // The concrete locking mechanism that all the methods delegate to.
+};
+
template <class LOCK, class TYPE>
class ACE_Test_and_Set : public ACE_Event_Handler
{
diff --git a/ace/Synch_T.i b/ace/Synch_T.i
index 198975ee38a..252b90633f1 100644
--- a/ace/Synch_T.i
+++ b/ace/Synch_T.i
@@ -199,4 +199,72 @@ ACE_TSS<TYPE>::ts_get (void) const
return (TYPE *) &this->type_;
}
+// Explicitly destroy the lock.
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void)
+{
+ return this->lock_.remove ();
+}
+
+// Block the thread until the lock is acquired.
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void)
+{
+ return this->lock_.acquire ();
+}
+
+// Conditionally acquire the lock (i.e., won't block).
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void)
+{
+ return this->lock_.tryacquire ();
+}
+
+// Release the lock.
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void)
+{
+ return this->lock_.release ();
+}
+
+// Block until the thread acquires a read lock. If the locking
+// mechanism doesn't support read locks then this just calls
+// <acquire>.
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void)
+{
+ return this->lock_.acquire_read ();
+}
+
+// Block until the thread acquires a write lock. If the locking
+// mechanism doesn't support read locks then this just calls
+// <acquire>.
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void)
+{
+ return this->lock_.acquire_write ();
+}
+
+// Conditionally acquire a read lock. If the locking mechanism
+// doesn't support read locks then this just calls <acquire>.
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void)
+{
+ return this->lock_.tryacquire_read ();
+}
+
+// Conditionally acquire a write lock. If the locking mechanism
+// doesn't support read locks then this just calls <acquire>.
+
+template <class LOCKING_MECHANISM> int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void)
+{
+ return this->lock_.acquire_write ();
+}
+
#endif /* defined (ACE_HAS_THREADS) && defined (ACE_HAS_THREAD_SPECIFIC_STORAGE) */
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h
index 8d1b2979a49..28e59a4b2e6 100644
--- a/apps/Gateway/Gateway/Concurrency_Strategies.h
+++ b/apps/Gateway/Gateway/Concurrency_Strategies.h
@@ -55,20 +55,20 @@ class Supplier_Proxy;
#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
#if defined (USE_OUTPUT_MT)
-typedef Thr_Consumer_Proxy CONSUMER_HANDLER;
+typedef Thr_Consumer_Proxy CONSUMER_PROXY;
#else
-typedef Consumer_Proxy CONSUMER_HANDLER;
+typedef Consumer_Proxy CONSUMER_PROXY;
#endif /* USE_OUTPUT_MT */
#if defined (USE_INPUT_MT)
-typedef Thr_Supplier_Proxy SUPPLIER_HANDLER;
+typedef Thr_Supplier_Proxy SUPPLIER_PROXY;
#else
-typedef Supplier_Proxy SUPPLIER_HANDLER;
+typedef Supplier_Proxy SUPPLIER_PROXY;
#endif /* USE_INPUT_MT */
#else
// Instantiate a non-multi-threaded Gateway.
-typedef Supplier_Proxy SUPPLIER_HANDLER;
-typedef Consumer_Proxy CONSUMER_HANDLER;
+typedef Supplier_Proxy SUPPLIER_PROXY;
+typedef Consumer_Proxy CONSUMER_PROXY;
#endif /* ACE_HAS_THREADS */
#endif /* _CONCURRENCY_STRATEGIES */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index 7e99902b0db..5b95dc4fbf0 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -27,7 +27,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
line_number++;
}
- // Get the logic id.
+ // Get the logical id.
if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS)
return read_result;
@@ -35,12 +35,12 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
if ((read_result = this->getint (entry.type_)) != FP::SUCCESS)
return read_result;
- // get all the destinations.
- entry.total_destinations_ = 0;
+ // get all the consumers.
+ entry.total_consumers_ = 0;
- while ((read_result = this->getint (entry.destinations_[entry.total_destinations_]))
+ while ((read_result = this->getint (entry.consumers_[entry.total_consumers_]))
== FP::SUCCESS)
- ++entry.total_destinations_; // do nothing
+ ++entry.total_consumers_; // do nothing (should check against max...)
if (read_result == FP::EOLINE || read_result == FP::EOFILE)
return FP::SUCCESS;
@@ -63,8 +63,8 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
{
if (read_result == FP::EOFILE)
return FP::EOFILE;
- else if (read_result == FP::EOLINE ||
- read_result == FP::COMMENT)
+ else if (read_result == FP::EOLINE
+ || read_result == FP::COMMENT)
line_number++;
}
@@ -72,19 +72,19 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
if ((read_result = this->getword (entry.host_)) != FP::SUCCESS)
return read_result;
- int port;
+ ACE_INT32 port;
// Get the port number.
if ((read_result = this->getint (port)) != FP::SUCCESS)
return read_result;
else
- entry.remote_poconsumer_ = (u_short) port;
+ entry.remote_port_ = (u_short) port;
- // Get the direction.
+ // Get the proxy role.
if ((read_result = this->getword (buf)) != FP::SUCCESS)
return read_result;
else
- entry.direction_ = buf[0];
+ entry.proxy_role_ = buf[0];
// Get the max retry delay.
if ((read_result = this->getint (entry.max_retry_delay_)) != FP::SUCCESS)
@@ -94,7 +94,7 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
if ((read_result = this->getint (port)) != FP::SUCCESS)
return read_result;
else
- entry.local_poconsumer_ = (u_short) port;
+ entry.local_port_ = (u_short) port;
return FP::SUCCESS;
}
@@ -108,7 +108,7 @@ int main (int argc, char *argv[])
exit (1);
}
FP_RETURN_TYPE result;
- Connection_Config_File_Entry CCentry;
+ Connection_Config_File_Entry entry;
Connection_Config_File_Parser CCfile;
CCfile.open (argv[1]);
@@ -118,15 +118,15 @@ int main (int argc, char *argv[])
printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n");
// Read config file line at a time.
- while ((result = CCfile.read_entry (CCentry, line_number)) != EOF)
+ while ((result = CCfile.read_entry (entry, line_number)) != EOF)
{
if (result != FP::SUCCESS)
// ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number));
cerr << "Error at line " << line_number << endl;
else
printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n",
- CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_,
- CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_);
+ entry.conn_id_, entry.host_, entry.remote_port_, entry.proxy_role_,
+ entry.max_retry_delay_, entry.transform_, entry.local_port_);
}
CCfile.close();
@@ -148,8 +148,8 @@ int main (int argc, char *argv[])
{
printf ("%d\t%d\t%d\t%d\t",
entry.conn_id_, entry.supplier_id_, entry.type_);
- while (--entry.total_destinations_ >= 0)
- printf ("%d,", entry.destinations_[entry.total_destinations_]);
+ while (--entry.total_consumers_ >= 0)
+ printf ("%d,", entry.consumers_[entry.total_consumers_]);
printf ("\n");
}
}
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index 2620301e25b..eae0248eb8c 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -22,25 +22,25 @@
class Connection_Config_File_Entry
// = TITLE
- // Stores the Proxy_Handler entry for connection configuration.
+ // Stores connection configuration information.
{
public:
- int conn_id_;
+ ACE_INT32 conn_id_;
// Connection id for this Proxy_Handler.
char host_[BUFSIZ];
// Host to connect with.
- u_short remote_poconsumer_;
+ u_short remote_port_;
// Port to connect with.
- char direction_;
+ char proxy_role_;
// 'S' (supplier) or 'C' (consumer).
- int max_retry_delay_;
+ ACE_INT32 max_retry_delay_;
// Maximum amount of time to wait for reconnecting.
- u_short local_poconsumer_;
+ u_short local_port_;
// Our local port number.
};
@@ -59,23 +59,23 @@ class Consumer_Config_File_Entry
{
public:
enum {
- MAX_DESTINATIONS = 1000 // Total number of multicast destinations.
+ MAX_CONSUMERS = 1000 // Total number of multicast consumers.
};
- int conn_id_;
- // Connection id for this channel.
+ ACE_INT32 conn_id_;
+ // Connection id for this proxy.
- int supplier_id_;
- // Logical routing id for this channel.
+ ACE_INT32 supplier_id_;
+ // Logical supplier id for this proxy.
- int type_;
- // Type of payload in the message.
+ ACE_INT32 type_;
+ // Message type.
- int destinations_[MAX_DESTINATIONS];
- // Connection ids for destinations that we're routing to.
+ ACE_INT32 consumers_[MAX_CONSUMERS];
+ // Connection ids for consumers that we're routing to.
- int total_destinations_;
- // Total number of these destinations.
+ int total_consumers_;
+ // Total number of these consumers.
};
class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry>
diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
new file mode 100644
index 00000000000..71e2046b56e
--- /dev/null
+++ b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h
@@ -0,0 +1,28 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Consumer_Dispatch_Set.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_DISPATCH_SET)
+#define _DISPATCH_SET
+
+#include "ace/Set.h"
+
+// Forward reference.
+class Proxy_Handler;
+
+typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set;
+typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator;
+
+#endif /* _DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
index 24881c3e85b..5e288edf910 100644
--- a/apps/Gateway/Gateway/Event.h
+++ b/apps/Gateway/Gateway/Event.h
@@ -23,7 +23,7 @@
// Proxy_Handler in the Gateway.
typedef ACE_INT32 ACE_INT32;
-class Event_Addr
+class Event_Key
// = TITLE
// Address used to identify the source/destination of an event.
//
@@ -33,14 +33,14 @@ class Event_Addr
// Channel from the format of the data.
{
public:
- Event_Addr (ACE_INT32 cid = -1,
+ Event_Key (ACE_INT32 cid = -1,
u_char sid = 0,
u_char type = 0)
: conn_id_ (cid),
supplier_id_ (sid),
type_ (type) {}
- int operator== (const Event_Addr &event_addr) const
+ int operator== (const Event_Key &event_addr) const
{
return this->conn_id_ == event_addr.conn_id_
&& this->supplier_id_ == event_addr.supplier_id_
@@ -58,10 +58,13 @@ public:
// Event type.
};
-
class Event_Header
// = TITLE
- // Fixed sized header.
+ // Fixed sized header.
+ //
+ // = DESCRIPTION
+ // This is designed to have a sizeof (16) to avoid alignment
+ // problems on most platforms.
{
public:
typedef ACE_INT32 SUPPLIER_ID;
@@ -72,14 +75,35 @@ public:
INVALID_ID = -1 // No peer can validly use this number.
};
+ void decode (void)
+ {
+ this->len_ = ntohl (this->len_);
+ this->supplier_id_ = ntohl (this->supplier_id_);
+ this->type_ = ntohl (this->type_);
+ this->priority_ = ntohl (this->priority_);
+ }
+ // Decode from network byte order to host byte order.
+
+ void encode (void)
+ {
+ this->len_ = htonl (this->len_);
+ this->supplier_id_ = htonl (this->supplier_id_);
+ this->type_ = htonl (this->type_);
+ this->priority_ = htonl (this->priority_);
+ }
+ // Encode from host byte order to network byte order.
+
+ size_t len_;
+ // Length of the data_ payload, in bytes.
+
SUPPLIER_ID supplier_id_;
// Source ID.
ACE_INT32 type_;
// Event type.
- size_t len_;
- // Length of the entire event (including data payload) in bytes.
+ ACE_INT32 priority_;
+ // Event priority.
};
class Event
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index d146ddfb362..02f2cd465f8 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -2,38 +2,35 @@
// $Id$
#define ACE_BUILD_SVC_DLL
-#include "ace/Get_Opt.h"
-#include "Config_Files.h"
#include "Proxy_Handler_Connector.h"
#include "Event_Channel.h"
-#if !defined (ACE_EVENT_CHANNEL_C)
-#define ACE_EVENT_CHANNEL_C
+ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
+ : performance_window_ (0),
+ blocking_semantics_ (ACE_NONBLOCK),
+ socket_queue_size_ (0)
+{
+}
-template <class SH, class CH>
-ACE_Event_Channel<SH, CH>::~ACE_Event_Channel (void)
+ACE_Event_Channel::~ACE_Event_Channel (void)
{
}
-template <class SH, class CH>
-ACE_Event_Channel<SH, CH>::ACE_Event_Channel (void)
- : connection_config_file_ ("connection_config"),
- consumer_config_file_ ("consumer_config"),
- active_connector_role_ (1),
- performance_window_ (0),
- blocking_semantics_ (ACE_NONBLOCK),
- debug_ (0),
- connector_ (0),
- socket_queue_size_ (0)
+ACE_Event_Channel::ACE_Event_Channel (void)
+{
+}
+
+ACE_Event_Channel_Options &
+ACE_Event_Channel::options (void)
{
+ return this->options_;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
- const void *)
+ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
+ const void *)
{
ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// If we've got a ACE_Thread Manager then use it to suspend all the
// threads. This will enable us to get an accurate count.
@@ -47,17 +44,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
- // Iterate through the consumer map connecting all the Proxy_Handlers.
+ // Iterate through the connection map summing up the number of bytes
+ // sent/received.
for (CONNECTION_MAP_ENTRY *me = 0;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
Proxy_Handler *proxy_handler = me->int_id_;
- if (proxy_handler->direction () == 'C')
+ if (proxy_handler->proxy_role () == 'C')
total_bytes_out += proxy_handler->total_bytes ();
- else // proxy_handler->direction () == 'S'
+ else // proxy_handler->proxy_role () == 'S'
total_bytes_in += proxy_handler->total_bytes ();
}
@@ -74,13 +72,13 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
(float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
#else
ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- this->performance_window_,
- total_bytes_in,
+ this->options ().performance_window_,
+ total_bytes_in,
total_bytes_out));
ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))));
+ (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))));
+ (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_))));
#endif /* ACE_NLOGGING */
#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
@@ -95,31 +93,177 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
return 0;
}
+ACE_Event_Channel::put (ACE_Message_Block *forward_addr,
+ ACE_Time_Value *)
+{
+ // We got a valid event, so determine its virtual forwarding
+ // address, which is stored in the first of the two event blocks,
+ // which are chained together by this->recv().
+
+ Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr ();
+
+ // Skip over the address portion and get the data.
+ ACE_Message_Block *data = forward_addr->cont ();
+
+ // <dispatch_set> points to the set of Consumers associated with
+ // this forwarding address.
+ Consumer_Dispatch_Set *dispatch_set = 0;
+
+ if (this->efd_.find (*forwarding_addr, dispatch_set) == -1)
+ // Failure.
+ ACE_ERROR ((LM_DEBUG,
+ "(%t) find failed on conn id = %d, logical id = %d, type = %d\n",
+ forwarding_addr->conn_id_,
+ forwarding_addr->supplier_id_,
+ forwarding_addr->type_));
+ else
+ {
+ // Check to see if there are any consumers.
+ if (dispatch_set->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active consumers for this event currently\n"));
+
+ else // There are consumers, so forward the event.
+ {
+ Consumer_Dispatch_Set_Iterator dsi (*dispatch_set);
+
+ // At this point, we should assign a thread-safe locking
+ // strategy to the Message_Block is we're running in a
+ // multi-threaded configuration.
+ // data->locking_strategy (MB_Locking_Strategy::instance ());
+
+ for (Proxy_Handler **proxy_handler = 0;
+ dsi.next (proxy_handler) != 0;
+ dsi.advance ())
+ {
+ // Only process active proxy_handlers.
+ if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
+ {
+ // Duplicate the event portion via reference
+ // counting.
+ ACE_Message_Block *dup_msg = data->duplicate ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
+ (*proxy_handler)->id ()));
+
+ if ((*proxy_handler)->put (dup_msg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
+ "put", (*proxy_handler)->id ()));
+
+ // We are responsible for releasing an
+ // ACE_Message_Block if failures occur.
+ dup_msg->release ();
+ }
+ }
+ }
+ }
+ }
+
+ // Release the memory in the message block.
+ forward_addr->release ();
+ return 0;
+}
+
+ACE_Event_Channel::svc (void)
+{
+ return 0;
+}
+
+int
+ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler,
+ ACE_Synch_Options &synch_options)
+{
+ return this->connector_.initiate_connection (proxy_handler,
+ synch_options);
+}
+
+int
+ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
+{
+ int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
+ int socket_queue_size = this->options ().socket_queue_size_;
+
+ if (proxy_handler->peer ().set_option (SOL_SOCKET,
+ option,
+ &socket_queue_size,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+
+ proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ());
+
+ // Our state is now "established."
+ proxy_handler->state (Proxy_Handler::ESTABLISHED);
+
+ // Restart the timeout to 1.
+ proxy_handler->timeout (1);
+
+ ACE_INT32 id = htonl (proxy_handler->id ());
+
+ // Send the connection id to the peerd.
+
+ ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ n == 0 ? "peer has closed down unexpectedly" : "send"),
+ -1);
+}
+
+// Restart connection (blocking_semantics dicates whether we restart
+// synchronously or asynchronously).
+
+int
+ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler)
+{
+ // Skip over deactivated descriptors.
+ if (proxy_handler->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ proxy_handler->peer ().close ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Proxy_Handler %d\n",
+ proxy_handler->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (proxy_handler, 0, proxy_handler->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ return 0;
+}
+
// Initiate connections with the Consumer and Supplier Peers.
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::initiate_connections (void)
+ACE_Event_Channel::initiate_connections (void)
{
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
ACE_Synch_Options synch_options;
- if (this->blocking_semantics_ == ACE_NONBLOCK)
+ if (this->options ().blocking_semantics_ == ACE_NONBLOCK)
synch_options = ACE_Synch_Options::asynch;
else
synch_options = ACE_Synch_Options::synch;
- // Iterate through the Consumer Map connecting all the Proxy_Handlers.
+ // Iterate through the Consumer Map connecting all the
+ // Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
Proxy_Handler *proxy_handler = me->int_id_;
- if (this->connector_->initiate_connection
+ if (this->initiate_proxy_connection
(proxy_handler, synch_options) == -1)
- continue;
+ continue; // Failures are handled elsewhere...
}
return 0;
@@ -128,8 +272,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
// This method gracefully shuts down all the Handlers in the
// Proxy_Handler Connection Map.
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::close (void)
+ACE_Event_Channel::close (u_long)
{
#if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT)
ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
@@ -147,7 +290,7 @@ ACE_Event_Channel<SH, CH>::close (void)
{
Proxy_Handler *proxy_handler = me->int_id_;
- ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n",
proxy_handler->id ()));
if (proxy_handler->state () != Proxy_Handler::IDLE)
@@ -159,247 +302,76 @@ ACE_Event_Channel<SH, CH>::close (void)
proxy_handler->destroy (); // Will trigger a delete.
}
- // Free up the resources allocated dynamically by the ACE_Connector.
- delete this->connector_;
return 0;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::open (int argc, char *argv[])
+int
+ACE_Event_Channel::find_proxy (ACE_INT32 conn_id,
+ Proxy_Handler *&proxy_handler)
{
- this->parse_args (argc, argv);
-
- ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1);
-
- // Ignore SIPPIPE so each Consumer_Proxy can handle it.
- ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
-
- if (this->active_connector_role_)
- {
- // Parse the connection configuration file.
- this->parse_connection_config_file ();
-
- // Parse the consumer map config file and build the consumer map.
- this->parse_consumer_config_file ();
-
- // Initiate connections with the peers.
- this->initiate_connections ();
- }
-
- // If this->performance_window_ > 0 start a timer.
-
- if (this->performance_window_ > 0)
- {
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->performance_window_) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
- else
- ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
- this->performance_window_));
- }
-
- return 0;
+ return this->connection_map_.find (conn_id, proxy_handler);
}
-// Parse and build the connection table.
-
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
+int
+ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler)
{
- // File that contains the consumer map configuration information.
- Connection_Config_File_Parser connection_file;
- Connection_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (connection_file.open (this->connection_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
-
- // Read config file one line at a time.
- while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
+ switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler))
{
- file_empty = 0;
-
- if (this->debug_)
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
- "direction = %c, max retry timeout = %d, local port = %d\n",
- entry.conn_id_,
- entry.host_,
- entry.remote_poconsumer_,
- entry.direction_,
- entry.max_retry_delay_,
- entry.local_poconsumer_));
-
- Proxy_Handler *proxy_handler = 0;
-
- // The next few lines of code are dependent on whether we are
- // making an Supplier_Proxy or an Consumer_Proxy.
-
- if (entry.direction_ == 'C') // Configure a Consumer_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- CONSUMER_HANDLER (&this->efd_,
- this->connector_,
- ACE_Service_Config::thr_mgr (),
- this->socket_queue_size_),
- -1);
- else /* direction == 'S' */ // Configure a Supplier_Proxy.
- ACE_NEW_RETURN (proxy_handler,
- SUPPLIER_HANDLER (&this->efd_,
- this->connector_,
- ACE_Service_Config::thr_mgr (),
- this->socket_queue_size_),
- -1);
-
- // The following code is common to both Supplier_Proxys_ and
- // Consumer_Proxys.
-
- // Initialize the routing entry's peer addressing info.
- proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
- ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_);
-
- // Initialize max timeout.
- proxy_handler->max_timeout (entry.max_retry_delay_);
-
- // Try to bind the new Proxy_Handler to the connection ID.
- switch (this->connection_map_.bind (entry.conn_id_, proxy_handler))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG,
- "(%t) duplicate connection %d, already bound\n",
- entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) bind failed for connection %d\n",
+ proxy_handler->id ()), -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) duplicate connection %d, already bound\n",
+ proxy_handler->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
}
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: connection proxy_handler configuration file was empty\n"));
- return 0;
}
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void)
+int
+ACE_Event_Channel::subscribe (const Event_Key &event_addr,
+ Consumer_Dispatch_Set *cds)
{
- // File that contains the consumer map configuration information.
- Consumer_Config_File_Parser consumer_file;
- Consumer_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (consumer_file.open (this->consumer_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
-
- // Read config file line at a time.
- while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
+ // Bind with consumer map, keyed by peer address.
+ switch (this->efd_.bind (event_addr, cds))
{
- file_empty = 0;
-
- if (this->debug_)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
- "number of destinations = %d\n",
- entry.conn_id_,
- entry.supplier_id_,
- entry.type_,
- entry.total_destinations_));
- for (int i = 0; i < entry.total_destinations_; i++)
- ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- Dispatch_Set *dispatch_set;
- ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1);
-
- Event_Addr event_addr (entry.conn_id_,
- entry.supplier_id_,
- entry.type_);
-
- // Add the destinations to the Routing Entry.
- for (int i = 0; i < entry.total_destinations_; i++)
- {
- Proxy_Handler *proxy_handler = 0;
-
- // Lookup destination and add to Dispatch_Set set if found.
- if (this->connection_map_.find (entry.destinations_[i],
- proxy_handler) != -1)
- dispatch_set->insert (proxy_handler);
- else
- ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- // Bind with consumer map, keyed by peer address.
- switch (this->efd_.bind (event_addr, dispatch_set))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
- "already bound\n", entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
+ event_addr.conn_id_), -1);
+ /* NOTREACHED */
+ case 1: // Oops, found a duplicate!
+ ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, "
+ "already bound\n", event_addr.conn_id_), -1);
+ /* NOTREACHED */
+ case 0:
+ // Success.
+ return 0;
}
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: consumer map configuration file was empty\n"));
- return 0;
}
-// Parse the "command-line" arguments and set the corresponding flags.
-
-template <class SH, class CH> int
-ACE_Event_Channel<SH, CH>::parse_args (int argc, char *argv[])
+ACE_Event_Channel::open (void *)
{
- ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
+ // Ignore SIPPIPE so each Consumer_Proxy can handle it.
+ ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
- for (int c; (c = get_opt ()) != -1; )
+#if 0
+ // If this->performance_window_ > 0 start a timer.
+
+ if (this->options ().performance_window_ > 0)
{
- switch (c)
- {
- case 'b': // Use blocking connection establishment.
- this->blocking_semantics_ = 0;
- break;
- case 'c':
- this->connection_config_file_ = get_opt.optarg;
- break;
- case 'd':
- this->debug_ = 1;
- break;
- case 'p':
- // We are not playing the active Connector role.
- this->active_connector_role_ = 0;
- break;
- case 'q':
- this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'r':
- this->consumer_config_file_ = get_opt.optarg;
- break;
- case 'w': // Time performance for a designated amount of time.
- this->performance_window_ = ACE_OS::atoi (get_opt.optarg);
- // Use blocking connection semantics so that we get accurate
- // timings (since all connections start at once).
- this->blocking_semantics_ = 0;
- break;
- default:
- break;
- }
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (this, 0, this->options ().performance_window_) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
+ else
+ ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
+ this->options ().performance_window_)));
}
+#endif
+
return 0;
}
-
-#endif /* ACE_EVENT_CHANNEL_C */
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 4e7afc5d328..1ecf468addf 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -19,64 +19,95 @@
#include "Proxy_Handler_Connector.h"
-template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER>
-class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler
+class ACE_Svc_Export ACE_Event_Channel_Options
+ // = TITLE
+ // Maintains the options for an <ACE_Event_Channel>.
+{
+public:
+ ACE_Event_Channel_Options (void);
+ // Initialization.
+
+ int performance_window_;
+ // Number of seconds after connection establishment to report
+ // throughput.
+
+ int blocking_semantics_;
+ // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+};
+
+class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<SYNCH_STRATEGY>
// = TITLE
// Define a generic Event_Channel.
+ //
+ // = DESCRIPTION
{
public:
// = Initialization and termination methods.
ACE_Event_Channel (void);
~ACE_Event_Channel (void);
- int open (int argc, char *argv[]);
- // Initialize the Channel.
+ virtual int open (void * = 0);
+ // Open the channel.
- int close (void);
+ virtual int close (u_long = 0);
// Close down the Channel.
-private:
- int parse_args (int argc, char *argv[]);
- // Parse the command-line arguments.
+ // = Proxy management methods.
+ int initiate_proxy_connection (Proxy_Handler *,
+ ACE_Synch_Options & = ACE_Synch_Options::synch);
+ // Initiate the connection of the <Proxy_Handler> to its peer.
- int parse_connection_config_file (void);
- // Parse the connection configuration file.
+ int complete_proxy_connection (Proxy_Handler *);
+ // Complete the initialization of the <Proxy_Handler> once it's
+ // connected to its Peer.
- int parse_consumer_config_file (void);
- // Parse the consumer map configuration file.
+ int reinitiate_proxy_connection (Proxy_Handler *);
+ // Reinitiate a connection asynchronously when the Peer fails.
- int initiate_connections (void);
- // Initiate connections to the peers.
+ int bind_proxy (Proxy_Handler *);
+ // Bind the <Proxy_Handler> to the <connection_map_>.
- virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based performance profiling.
+ int find_proxy (ACE_INT32 conn_id, Proxy_Handler *&);
+ // Locate the <Proxy_Handler> with <conn_id>.
- const char *connection_config_file_;
- // Name of the connection configuration file.
+ int subscribe (const Event_Key &event_addr,
+ Consumer_Dispatch_Set *cds);
+ // Subscribe the <Consumer_Dispatch_Set> to receive events that
+ // match <Event_Key>.
- const char *consumer_config_file_;
- // Name of the consumer map configuration file.
+ // = Event forwarding method.
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0);
+ // Pass <mb> to the Event Channel so it can forward it to Consumers.
- int active_connector_role_;
- // Enabled if we are playing the role of the active Connector.
+ ACE_Event_Channel_Options &options (void);
+ // Points to the Event_Channel options.
- int performance_window_;
- // Number of seconds after connection establishment to report
- // throughput.
-
- int blocking_semantics_;
- // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
+ int initiate_connections (void);
+ // Initiate connections to the peers.
+
+private:
+ virtual int svc (void);
+ // Run as an active object.
- int debug_;
- // Are we debugging?
+ int parse_args (int argc, char *argv[]);
+ // Parse the command-line arguments.
- Proxy_Handler_Connector *connector_;
- // This is used to establish the connections actively.
+ virtual int handle_timeout (const ACE_Time_Value &,
+ const void *arg);
+ // Perform timer-based performance profiling.
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
+ Proxy_Handler_Connector connector_;
+ // Used to establish the connections actively.
+
+ // Proxy_Handler_Acceptor acceptor_;
+ // Used to establish the connections passively.
// = Make life easier by defining typedefs.
+ // Note that Proxy_Handler is assumed to the base class of
+ // SUPPLIER_PROXY and CONSUMER_PROXY.
typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP;
typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY;
@@ -85,16 +116,10 @@ private:
// Table that maps Connection IDs to Proxy_Handler *'s.
Event_Forwarding_Discriminator efd_;
- // Map that associates event addresses to a set of Consumer_Proxy
- // *'s.
-};
+ // Map that associates an event to a set of Consumer_Proxy *'s.
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "Event_Channel.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-
-#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
-#pragma implementation ("Event_Channel.cpp")
-#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+ ACE_Event_Channel_Options options_;
+ // The options for the channel.
+};
#endif /* ACE_EVENT_CHANNEL */
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
index 8261ea13eb2..4dfbb658c1f 100644
--- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
+++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
@@ -6,51 +6,49 @@
#include "Event_Forwarding_Discriminator.h"
-// Bind the Event_Addr to the INT_ID.
+// Bind the Event_Key to the INT_ID.
int
-Event_Forwarding_Discriminator::bind (Event_Addr event_addr,
- Dispatch_Set *Dispatch_Set)
+Event_Forwarding_Discriminator::bind (Event_Key event_addr,
+ Consumer_Dispatch_Set *cds)
{
- return this->map_.bind (event_addr, Dispatch_Set);
+ return this->map_.bind (event_addr, cds);
}
-// Find the Dispatch_Set corresponding to the Event_Addr.
+// Find the Consumer_Dispatch_Set corresponding to the Event_Key.
int
-Event_Forwarding_Discriminator::find (Event_Addr event_addr,
- Dispatch_Set *&Dispatch_Set)
+Event_Forwarding_Discriminator::find (Event_Key event_addr,
+ Consumer_Dispatch_Set *&cds)
{
- return this->map_.find (event_addr, Dispatch_Set);
+ return this->map_.find (event_addr, cds);
}
-// Unbind (remove) the Event_Addr from the map.
+// Unbind (remove) the Event_Key from the map.
int
-Event_Forwarding_Discriminator::unbind (Event_Addr event_addr)
+Event_Forwarding_Discriminator::unbind (Event_Key event_addr)
{
return this->map_.unbind (event_addr);
}
-Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt)
- : map_iter_ (rt.map_)
+Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator
+ (Event_Forwarding_Discriminator &rt)
+ : map_iter_ (rt.map_)
{
}
int
-Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss)
+Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds)
{
- // Loop in order to skip over inactive entries if necessary.
-
- for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0;
- this->map_iter_.next (temp) != 0;
- this->advance ())
+ ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp;
+ if (this->map_iter_.next (temp) == 0)
+ return 0;
+ else
{
- // Otherwise, return the next item.
- ss = temp->int_id_;
+ cds = temp->int_id_;
return 1;
}
- return 0;
}
int
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
index 35a594b61b5..9b7531c1f46 100644
--- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
+++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
@@ -20,29 +20,27 @@
#include "ace/Map_Manager.h"
#include "Concurrency_Strategies.h"
#include "Event.h"
-#include "Dispatch_Set.h"
+#include "Consumer_Dispatch_Set.h"
class Event_Forwarding_Discriminator
{
// = TITLE
- // Define a generic consumer map based on the ACE Map_Manager.
- //
- // = DESCRIPTION
- // This class makes it easier to use the Map_Manager.
+ // Map events to the set of Consumer_Proxies that have subscribed
+ // to receive the event.
public:
- int bind (Event_Addr event, Dispatch_Set *Dispatch_Set);
- // Associate Event with the Dispatch_Set.
+ int bind (Event_Key event, Consumer_Dispatch_Set *cds);
+ // Associate Event with the Consumer_Dispatch_Set.
- int find (Event_Addr event, Dispatch_Set *&Dispatch_Set);
- // Break any association of EXID.
-
- int unbind (Event_Addr event);
+ int unbind (Event_Key event);
// Locate EXID and pass out parameter via INID. If found,
// return 0, else -1.
+ int find (Event_Key event, Consumer_Dispatch_Set *&cds);
+ // Break any association of EXID.
+
public:
- ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_;
- // Map that associates Event Addrs (external ids) with Dispatch_Set *'s
+ ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_;
+ // Map that associates Event Addrs (external ids) with Consumer_Dispatch_Set *'s
// <internal IDs>.
};
@@ -52,11 +50,11 @@ class Event_Forwarding_Discriminator_Iterator
// Define an iterator for the Consumer Map.
public:
Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm);
- int next (Dispatch_Set *&);
+ int next (Consumer_Dispatch_Set *&);
int advance (void);
private:
- ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_;
+ ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_iter_;
// Map we are iterating over.
};
#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp
index be33e9a96d2..07bda87180b 100644
--- a/apps/Gateway/Gateway/File_Parser.cpp
+++ b/apps/Gateway/Gateway/File_Parser.cpp
@@ -15,7 +15,11 @@ typedef FP::Return_Type FP_RETURN_TYPE;
template <class ENTRY> int
File_Parser<ENTRY>::open (const char filename[])
{
- return (this->infile_ = ACE_OS::fopen (filename, "r")) == 0 ? -1 : 0;
+ this->infile_ = ACE_OS::fopen (filename, "r");
+ if (this->infile_ == 0)
+ return -1;
+ else
+ return 0;
}
template <class ENTRY> int
@@ -27,17 +31,13 @@ File_Parser<ENTRY>::close (void)
template <class ENTRY> FP_RETURN_TYPE
File_Parser<ENTRY>::getword (char buf[])
{
- FP_RETURN_TYPE read_result = this->readword(buf);
- if (read_result == FP::SUCCESS)
- return FP::SUCCESS;
- else
- return read_result;
+ return this->readword (buf);
}
// Get the next string from the file via this->readword()
// Check make sure the string forms a valid number.
template <class ENTRY> FP_RETURN_TYPE
-File_Parser<ENTRY>::getint (int &value)
+File_Parser<ENTRY>::getint (ACE_INT32 &value)
{
char buf[BUFSIZ];
FP_RETURN_TYPE read_result = this->readword(buf);
@@ -50,7 +50,7 @@ File_Parser<ENTRY>::getint (int &value)
value = ACE_OS::strtol (buf, &ptr, 10);
// check if the buf is a decimal or not
- if ((value == 0) && (ptr == buf))
+ if (value == 0 && ptr == buf)
return FP::ERROR;
else
return FP::SUCCESS;
@@ -81,8 +81,8 @@ File_Parser<ENTRY>::readword (char buf[])
buf[wordlength] = '\0';
if (c == EOF) {
- // If the EOF is just a dilimeter, don't return EOF so that the
- // word gets processed
+ // If EOF is just a delimiter, don't return EOF so that the word
+ // gets processed.
if (wordlength > 0)
{
ungetc (c, this->infile_);
@@ -94,7 +94,7 @@ File_Parser<ENTRY>::readword (char buf[])
}
else if (c == '\n')
{
- // if the EOLINE is just a dilimeter, don't return EOLINE
+ // if the EOLINE is just a delimiter, don't return EOLINE
// so that the word gets processed
if (wordlength > 0)
ungetc (c, this->infile_);
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
index 80b768aff84..f1de7429db0 100644
--- a/apps/Gateway/Gateway/File_Parser.h
+++ b/apps/Gateway/Gateway/File_Parser.h
@@ -52,7 +52,7 @@ protected:
FP::Return_Type getword (char buf[]);
// Read the next ASCII word.
- FP::Return_Type getint (int &value);
+ FP::Return_Type getint (ACE_INT32 &value);
// Read the next integer.
FP::Return_Type readword (char buf[]);
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 2c963ff3d7f..4ff09aed1b7 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -1,6 +1,8 @@
/* -*- C++ -*- */
// $Id$
+#include "ace/Get_Opt.h"
+#include "Config_Files.h"
#include "ace/Service_Config.h"
#include "Event_Channel.h"
#include "Gateway.h"
@@ -24,6 +26,12 @@ public:
virtual int info (char **, size_t) const;
// Return info about this service.
+ int parse_connection_config_file (void);
+ // Parse the connection configuration file.
+
+ int parse_consumer_config_file (void);
+ // Parse the consumer configuration file.
+
protected:
int handle_input (ACE_HANDLE);
// Shut down the Gateway when input comes in from the controlling
@@ -36,13 +44,21 @@ protected:
// Parse gateway configuration arguments obtained from svc.conf
// file.
- ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_;
+ char connection_config_file_[MAXPATHLEN + 1];
+ // Name of the connection configuration file.
+
+ char consumer_config_file_[MAXPATHLEN + 1];
+ // Name of the consumer map configuration file.
+
+ ACE_Event_Channel event_channel_;
// The Event Channel routes events from Supplier(s) to Consumer(s).
-};
-// Convenient shorthands.
-// #define IC SUPPLIER_HANDLER
-// #define OC CONSUMER_HANDLER
+ int active_connector_role_;
+ // Enabled if we are playing the role of the active Connector.
+
+ int debug_;
+ // Are we debugging?
+};
int
Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
@@ -70,18 +86,76 @@ Gateway::handle_input (ACE_HANDLE h)
return this->handle_signal (h);
}
+// Parse the "command-line" arguments and set the corresponding flags.
+
+int
+Gateway::parse_args (int argc, char *argv[])
+{
+ ACE_OS::strcpy (this->connection_config_file_, "connection_config");
+ ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
+ this->active_connector_role_ = 1;
+ this->debug_ = 0;
+
+ ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0);
+
+ for (int c; (c = get_opt ()) != -1; )
+ {
+ switch (c)
+ {
+ case 'b': // Use blocking connection establishment.
+ this->event_channel_.options ().blocking_semantics_ = 0;
+ break;
+ case 'c':
+ ACE_OS::strncpy (this->connection_config_file_,
+ get_opt.optarg,
+ sizeof this->connection_config_file_);
+ break;
+ case 'd':
+ this->debug_ = 1;
+ break;
+ case 'p':
+ // We are not playing the active Connector role.
+ this->active_connector_role_ = 0;
+ break;
+ case 'q':
+ this->event_channel_.options ().socket_queue_size_ =
+ ACE_OS::atoi (get_opt.optarg);
+ break;
+ case 'r':
+ ACE_OS::strncpy (this->consumer_config_file_,
+ get_opt.optarg,
+ sizeof this->consumer_config_file_);
+ break;
+ case 'w': // Time performance for a designated amount of time.
+ this->event_channel_.options ().performance_window_ =
+ ACE_OS::atoi (get_opt.optarg);
+ // Use blocking connection semantics so that we get accurate
+ // timings (since all connections start at once).
+ this->event_channel_.options ().blocking_semantics_ = 0;
+ break;
+ default:
+ break;
+ }
+ }
+ return 0;
+}
+
int
Gateway::init (int argc, char *argv[])
{
- if (this->event_channel_.open (argc, argv) == -1)
+ // Initialize the Event_Channel.
+ if (this->event_channel_.open () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
+ // Parse the "command-line" arguments.
+ this->parse_args (argc, argv);
+
ACE_Sig_Set sig_set;
sig_set.sig_add (SIGINT);
sig_set.sig_add (SIGQUIT);
- // Register ourselves to receive SIGINT and SIGQUIT
- // so we can shut down gracefully via signals.
+ // Register ourselves to receive SIGINT and SIGQUIT so we can shut
+ // down gracefully via signals.
if (ACE_Service_Config::reactor ()->register_handler
(sig_set, this) == -1)
@@ -90,6 +164,20 @@ Gateway::init (int argc, char *argv[])
if (ACE_Service_Config::reactor ()->register_handler
(0, this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+
+ if (this->active_connector_role_)
+ {
+ // Parse the connection configuration file.
+ this->parse_connection_config_file ();
+
+ // Parse the consumer map config file and build the consumer
+ // map.
+ this->parse_consumer_config_file ();
+
+ // Initiate connections with the peers.
+ this->event_channel_.initiate_connections ();
+ }
+
return 0;
}
@@ -120,6 +208,133 @@ Gateway::info (char **strp, size_t length) const
return ACE_OS::strlen (buf);
}
+// Parse and build the connection table.
+
+int
+Gateway::parse_connection_config_file (void)
+{
+ // File that contains the consumer map configuration information.
+ Connection_Config_File_Parser connection_file;
+ Connection_Config_File_Entry entry;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (connection_file.open (this->connection_config_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1);
+
+ // Read config file one line at a time.
+ while (connection_file.read_entry (entry, line_number) != FP::EOFILE)
+ {
+ file_empty = 0;
+
+ if (this->debug_)
+ ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
+ "proxy role = %c, max retry timeout = %d, local port = %d\n",
+ entry.conn_id_,
+ entry.host_,
+ entry.remote_port_,
+ entry.proxy_role_,
+ entry.max_retry_delay_,
+ entry.local_port_));
+
+ Proxy_Handler *proxy_handler = 0;
+
+ // Initialize the entry's peer addressing info.
+
+ ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_);
+ ACE_INET_Addr local_addr (entry.local_port_);
+
+ // The next few lines of code are dependent on whether we are
+ // making an Supplier_Proxy or an Consumer_Proxy.
+
+ if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ CONSUMER_PROXY (this->event_channel_, remote_addr,
+ local_addr, entry.conn_id_),
+ -1);
+ else // proxy_role == 'S', so configure a Supplier_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ SUPPLIER_PROXY (this->event_channel_, remote_addr,
+ local_addr, entry.conn_id_),
+ -1);
+
+ // The following code is common to both Supplier_Proxys_ and
+ // Consumer_Proxys.
+
+ // Initialize max timeout.
+ proxy_handler->max_timeout (entry.max_retry_delay_);
+
+ // Bind the new Proxy_Handler to the connection ID.
+ this->event_channel_.bind_proxy (proxy_handler);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ "warning: connection proxy_handler configuration file was empty\n"));
+ return 0;
+}
+
+int
+Gateway::parse_consumer_config_file (void)
+{
+ // File that contains the consumer event forwarding information.
+ Consumer_Config_File_Parser consumer_file;
+ Consumer_Config_File_Entry entry;
+ int file_empty = 1;
+ int line_number = 0;
+
+ if (consumer_file.open (this->consumer_config_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1);
+
+ // Read config file line at a time.
+ while (consumer_file.read_entry (entry, line_number) != FP::EOFILE)
+ {
+ file_empty = 0;
+
+ if (this->debug_)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
+ "number of consumers = %d\n",
+ entry.conn_id_,
+ entry.supplier_id_,
+ entry.type_,
+ entry.total_consumers_));
+ for (int i = 0; i < entry.total_consumers_; i++)
+ ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
+ i, entry.consumers_[i]));
+ }
+
+ Consumer_Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1);
+
+ Event_Key event_addr (entry.conn_id_,
+ entry.supplier_id_,
+ entry.type_);
+
+ // Add the Consumers to the Dispatch_Set.
+ for (int i = 0; i < entry.total_consumers_; i++)
+ {
+ Proxy_Handler *proxy_handler = 0;
+
+ // Lookup destination and add to Consumer_Dispatch_Set set
+ // if found.
+ if (this->event_channel_.find_proxy (entry.consumers_[i],
+ proxy_handler) != -1)
+ dispatch_set->insert (proxy_handler);
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
+ i, entry.consumers_[i]));
+ }
+
+ this->event_channel_.subscribe (event_addr, dispatch_set);
+ }
+
+ if (file_empty)
+ ACE_ERROR ((LM_WARNING,
+ "warning: consumer map configuration file was empty\n"));
+ return 0;
+}
+
// The following is a "Factory" used by the ACE_Service_Config and
// svc.conf file to dynamically initialize the state of the Gateway.
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
index 0f5ddc07eb0..c3ae8dffe4d 100644
--- a/apps/Gateway/Gateway/Makefile
+++ b/apps/Gateway/Gateway/Makefile
@@ -153,7 +153,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Dispatch_Set.h Gateway.h
+ Consumer_Dispatch_Set.h Gateway.h
.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
$(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/ACE.h \
@@ -222,7 +222,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Dispatch_Set.h Event_Channel.h
+ Consumer_Dispatch_Set.h Event_Channel.h
.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \
Event_Forwarding_Discriminator.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
@@ -245,9 +245,9 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
$(WRAPPER_ROOT)/ace/Event_Handler.h \
- Event.h Dispatch_Set.h \
+ Event.h Consumer_Dispatch_Set.h \
$(WRAPPER_ROOT)/ace/Set.h
-.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \
+.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Consumer_Dispatch_Set.h \
$(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -381,7 +381,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
- Dispatch_Set.h
+ Consumer_Dispatch_Set.h
.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \
Proxy_Handler.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
@@ -446,7 +446,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Task_T.h \
Event_Forwarding_Discriminator.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- Concurrency_Strategies.h Event.h Dispatch_Set.h \
+ Concurrency_Strategies.h Event.h Consumer_Dispatch_Set.h \
Proxy_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Connector.i
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp
index 86e0fff8e41..2f161c171f6 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Proxy_Handler.cpp
@@ -1,11 +1,18 @@
// $Id$
-#include "Dispatch_Set.h"
-#include "Proxy_Handler_Connector.h"
+#include "Event_Channel.h"
-// Convenient short-hands.
-#define CO CONDITION
-#define MU MAP_MUTEX
+void
+Proxy_Handler::id (ACE_INT32 id)
+{
+ this->id_ = id;
+}
+
+ACE_INT32
+Proxy_Handler::id (void)
+{
+ return this->id_;
+}
// The total number of bytes sent/received on this Proxy.
@@ -21,36 +28,35 @@ Proxy_Handler::total_bytes (size_t bytes)
this->total_bytes_ += bytes;
}
-Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr),
- efd_ (efd),
- id_ (-1),
+Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : remote_addr_ (remote_addr),
+ local_addr_ (local_addr),
+ id_ (conn_id),
total_bytes_ (0),
state_ (Proxy_Handler::IDLE),
- connector_ (ioc),
timeout_ (1),
- max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
- socket_queue_size_ (socket_queue_size)
+ max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
+ event_channel_ (ec)
{
}
-// Set the direction.
+// Set the proxy_role.
void
-Proxy_Handler::direction (char d)
+Proxy_Handler::proxy_role (char d)
{
- this->direction_ = d;
+ this->proxy_role_ = d;
}
-// Get the direction.
+// Get the proxy_role.
char
-Proxy_Handler::direction (void)
+Proxy_Handler::proxy_role (void)
{
- return this->direction_;
+ return this->proxy_role_;
}
// Sets the timeout delay.
@@ -64,9 +70,9 @@ Proxy_Handler::timeout (int to)
this->timeout_ = to;
}
-// Recalculate the current retry timeout delay using exponential
+// Re-calculate the current retry timeout delay using exponential
// backoff. Returns the original timeout (i.e., before the
-// recalculation).
+// re-calculation).
int
Proxy_Handler::timeout (void)
@@ -99,37 +105,16 @@ Proxy_Handler::max_timeout (void)
// Restart connection asynchronously when timeout occurs.
int
-Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *)
+Proxy_Handler::handle_timeout (const ACE_Time_Value &,
+ const void *)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n",
this->id (), this->timeout_));
- return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
-}
-
-// Restart connection (blocking_semantics dicates whether we
-// restart synchronously or asynchronously).
-int
-Proxy_Handler::reinitiate_connection (void)
-{
- // Skip over deactivated descriptors.
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Make sure to close down peer to reclaim descriptor.
- this->peer ().close ();
-
- ACE_DEBUG ((LM_DEBUG,
- "(%t) scheduling reinitiation of Proxy_Handler %d\n",
- this->id ()));
-
- // Reschedule ourselves to try and connect again.
- if (ACE_Service_Config::reactor ()->schedule_timer
- (this, 0, this->timeout ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "schedule_timer"), -1);
- }
- return 0;
+ // Delegate the re-connection attempt to the Event Channel.
+ return this->event_channel_.initiate_proxy_connection
+ (this, ACE_Synch_Options::asynch);
}
// Handle shutdown of the Proxy_Handler object.
@@ -141,7 +126,8 @@ Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
"(%t) shutting down Proxy_Handler %d on handle %d\n",
this->id (), this->get_handle ()));
- return this->reinitiate_connection ();
+ // Restart the connection, if possible.
+ return this->event_channel_.reinitiate_proxy_connection (this);
}
// Set the state of the Proxy.
@@ -152,66 +138,29 @@ Proxy_Handler::state (Proxy_Handler::State s)
this->state_ = s;
}
-// Perform the first-time initiation of a connection to the peer.
-
-int
-Proxy_Handler::initialize_connection (void)
-{
- this->state_ = Proxy_Handler::ESTABLISHED;
-
- // Restart the timeout to 1.
- this->timeout (1);
-
- // Action that sends the connection id to the peerd.
-
- ACE_INT32 id = htonl (this->id ());
-
- ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
-
- if (n != sizeof id)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- n == 0 ? "peer has closed down unexpectedly" : "send"),
- -1);
- return 0;
-}
-
-// Set the size of the socket queue.
-
-void
-Proxy_Handler::socket_queue_size (void)
-{
- if (this->socket_queue_size_ > 0)
- {
- int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF;
-
- if (this->peer ().set_option (SOL_SOCKET, option,
- &this->socket_queue_size_,
- sizeof (int)) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
- }
-}
-
-// Upcall from the ACE_Acceptor::handle_input() that
-// delegates control to our application-specific Proxy_Handler.
+// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates
+// control to our Proxy_Handler.
int
-Proxy_Handler::open (void *a)
+Proxy_Handler::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n",
this->peer ().get_handle ()));
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn on non-blocking I/O.
if (this->peer ().enable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
- // Call down to the base class to activate and register this handler.
- if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
- return this->initialize_connection ();
+ // Register ourselves to receive input events.
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+ else
+ return 0;
}
// Return the current state of the Proxy.
@@ -222,30 +171,6 @@ Proxy_Handler::state (void)
return this->state_;
}
-void
-Proxy_Handler::id (ACE_INT32 id)
-{
- this->id_ = id;
-}
-
-ACE_INT32
-Proxy_Handler::id (void)
-{
- return this->id_;
-}
-
-// Set the peer's address information.
-int
-Proxy_Handler::bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32 id)
-{
- this->remote_addr_ = remote_addr;
- this->local_addr_ = local_addr;
- this->id_ = id;
- return 0;
-}
-
ACE_INET_Addr &
Proxy_Handler::remote_addr (void)
{
@@ -258,15 +183,13 @@ Proxy_Handler::local_addr (void)
return this->local_addr_;
}
-// Constructor sets the consumer map pointer.
-
-Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Proxy_Handler (ec, remote_addr, local_addr, conn_id)
{
- this->direction_ = 'C';
+ this->proxy_role_ = 'C';
this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
}
@@ -311,7 +234,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
// Try to send the event. If we don't send it all (e.g., due to
// flow control), then re-queue the remainder at the head of the
// Event_List and ask the ACE_Reactor to inform us (via
- // handle_output()) when it is possible to try again.
+ // handle_output()) when it is possible to try again.
ssize_t n = this->send (event);
@@ -325,7 +248,8 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
}
else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
{
- ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing activated on handle %d to routing id %d\n",
this->get_handle (), this->id ()));
// ACE_Queue in *front* of the list to preserve order.
@@ -354,11 +278,11 @@ Consumer_Proxy::send (ACE_Message_Block *event)
else if (n < len)
// Re-adjust pointer to skip over the part we did send.
event->rd_ptr (n);
- else /* if (n == length) */
+ else // if (n == length)
{
- // The whole event is sent, we can now safely deallocate the
- // buffer. Note that this should decrement a reference count...
- delete event;
+ // The whole event is sent, we now decrement the reference count
+ // (which deletes itself with it reaches 0.
+ event->release ();
errno = 0;
}
this->total_bytes (n);
@@ -389,9 +313,9 @@ Consumer_Proxy::handle_output (ACE_HANDLE)
break;
case -1:
- // We are responsible for freeing an ACE_Message_Block if
+ // We are responsible for releasing an ACE_Message_Block if
// failures occur.
- delete event;
+ event->release ();
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
/* FALLTHROUGH */
@@ -436,17 +360,14 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
(event, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
-// Constructor sets the consumer map pointer and the connector
-// pointer.
-
-Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
+Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
: msg_frag_ (0),
- Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+ Proxy_Handler (ec, remote_addr, local_addr, conn_id)
{
- this->direction_ = 'S';
+ this->proxy_role_ = 'S';
this->msg_queue ()->high_water_mark (0);
}
@@ -490,8 +411,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
ACE_DEBUG ((LM_DEBUG,
"attempted to read %d\n",
header_bytes_left_to_read));
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
return header_received;
}
@@ -508,11 +428,34 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
errno = EWOULDBLOCK;
return -1;
}
+
+ // Convert the header into host byte order so that we can access
+ // it directly without having to repeatedly muck with it...
+ event->header_.decode ();
+
+ if (event->header_.len_ > sizeof event->data_)
+ {
+ // This data_ payload is too big!
+ errno = EINVAL;
+ ACE_DEBUG ((LM_DEBUG,
+ "Data payload is too big (%d bytes)\n",
+ event->header_.len_));
+ return -1;
+ }
+
}
- // At this point there is a complete, valid header in msg_frag_
+ // At this point there is a complete, valid header in Event. Now we
+ // need to get the event payload. Due to incomplete reads this may
+ // not be the first time we've read in a fragment for this message.
+ // We account for this here. Note that the first time in here
+ // msg_frag_->wr_ptr() will point to event->data_. Every time we do
+ // a successful fragment read, we advance wr_ptr(). Therefore, by
+ // subtracting how much we've already read from the
+ // event->header_.len_ we complete the data_bytes_left_to_read...
+
ssize_t data_bytes_left_to_read =
- sizeof (Event) - this->msg_frag_->length ();
+ ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_));
ssize_t data_received =
this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
@@ -529,8 +472,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
/* FALLTHROUGH */;
case 0: // Premature EOF.
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
return 0;
default:
@@ -550,23 +492,22 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
// Allocate an event forwarding header and chain the data
// portion onto its continuation field.
- forward_addr = new ACE_Message_Block (sizeof (Event_Addr),
+ forward_addr = new ACE_Message_Block (sizeof (Event_Key),
ACE_Message_Block::MB_PROTO,
this->msg_frag_);
if (forward_addr == 0)
{
- delete this->msg_frag_;
- this->msg_frag_ = 0;
+ this->msg_frag_ = this->msg_frag_->release ();
errno = ENOMEM;
return -1;
}
- Event_Addr event_addr (this->id (),
+ Event_Key event_addr (this->id (),
event->header_.supplier_id_,
event->header_.type_);
- // Copy the forwarding address from the Event_Addr into
+ // Copy the forwarding address from the Event_Key into
// forward_addr.
- forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr));
+ forward_addr->copy ((char *) &event_addr, sizeof (Event_Key));
// Reset the pointer to indicate we've got an entire event.
this->msg_frag_ = 0;
@@ -579,8 +520,12 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
event->header_.len_, event->data_));
#else
ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
- event->header_.supplier_id_, event->header_.len_, this->total_bytes ()));
-#endif
+ event->header_.supplier_id_, event->header_.len_, data_received + header_received));
+#endif /* VERBOSE */
+
+ // Encode before returning so that we can set things out in
+ // network byte order.
+ event->header_.encode ();
return data_received + header_received;
}
}
@@ -620,79 +565,17 @@ Supplier_Proxy::handle_input (ACE_HANDLE)
}
}
-// Forward an event to its appropriate Consumer(s).
+// Forward an event to its appropriate Consumer(s). This delegates to
+// the <ACE_Event_Channel> to do the actual forwarding.
int
Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
{
- // We got a valid event, so determine its virtual forwarding
- // address, which is stored in the first of the two event blocks,
- // which are chained together by this->recv().
-
- Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr ();
-
- // Skip over the address portion and get the data.
- const ACE_Message_Block *const data = forward_addr->cont ();
-
- // <dispatch_set> points to the set of Consumers associated with
- // this forwarding address.
- Dispatch_Set *dispatch_set = 0;
-
- if (this->efd_->find (*forwarding_addr, dispatch_set) != -1)
- {
- // Check to see if there are any destinations.
- if (dispatch_set->size () == 0)
- ACE_DEBUG ((LM_WARNING,
- "there are no active destinations for this event currently\n"));
-
- else // There are destinations, so forward the event.
- {
- Dispatch_Set_Iterator dsi (*dispatch_set);
-
- for (Proxy_Handler **proxy_handler = 0;
- dsi.next (proxy_handler) != 0;
- dsi.advance ())
- {
- // Only process active proxy_handlers.
- if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
- {
- // Clone the event portion (should be doing
- // reference counting here...)
- ACE_Message_Block *newmsg = data->clone ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
- (*proxy_handler)->id ()));
-
- if ((*proxy_handler)->put (newmsg) == -1)
- {
- if (errno == EWOULDBLOCK) // The queue has filled up!
- ACE_ERROR ((LM_ERROR, "(%t) %p\n",
- "gateway is flow controlled, so we're dropping events"));
- else
- ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
- "put", (*proxy_handler)->id ()));
-
- // We are responsible for freeing a
- // ACE_Message_Block if failures occur.
- delete newmsg;
- }
- }
- }
- // Will become superfluous once we have reference
- // counting...
- delete forward_addr;
- return 0;
- }
- }
- delete forward_addr;
- // Failure return.
- ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
- forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_));
- return 0;
+ return this->event_channel_.put (forward_addr);
}
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
-template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>;
+template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h
index d91fa3108ff..ffce18d1c71 100644
--- a/apps/Gateway/Gateway/Proxy_Handler.h
+++ b/apps/Gateway/Gateway/Proxy_Handler.h
@@ -21,11 +21,12 @@
#include "ace/SOCK_Connector.h"
#include "ace/Svc_Handler.h"
#include "Event_Forwarding_Discriminator.h"
-#include "Dispatch_Set.h"
+#include "Consumer_Dispatch_Set.h"
#include "Event.h"
// Forward declaration.
class Proxy_Handler_Connector;
+class ACE_Event_Channel;
class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
// = TITLE
@@ -36,20 +37,15 @@ class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
// Channel from Suppliers and forward them to Consumers.
{
public:
- Proxy_Handler (Event_Forwarding_Discriminator *,
- Proxy_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
+ Proxy_Handler (ACE_Event_Channel &,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id);
virtual int open (void * = 0);
// Initialize and activate a single-threaded Proxy_Handler (called by
// ACE_Connector::handle_output()).
- int bind (const ACE_INET_Addr &remote_addr,
- const ACE_INET_Addr &local_addr,
- ACE_INT32);
- // Set the peer's addressing and routing information.
-
ACE_INET_Addr &remote_addr (void);
// Returns the peer's routing address.
@@ -82,12 +78,12 @@ public:
void max_timeout (int);
int max_timeout (void);
- // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer
+ // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer
// (necessary for error checking).
- void direction (char);
- char direction (void);
+ void proxy_role (char);
+ char proxy_role (void);
- // = The total number of bytes sent/received on this channel.
+ // = The total number of bytes sent/received on this proxy.
size_t total_bytes (void);
void total_bytes (size_t bytes);
// Increment count by <bytes>.
@@ -101,22 +97,10 @@ protected:
MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
};
- int initialize_connection (void);
- // Perform the first-time initiation of a connection to the peer.
-
- int reinitiate_connection (void);
- // Reinitiate a connection asynchronously when peers fail.
-
- void socket_queue_size (void);
- // Set the socket queue size.
-
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Perform Proxy_Handler termination.
- Event_Forwarding_Discriminator *efd_;
- // Maps Events to a set of Consumers.
-
ACE_INET_Addr remote_addr_;
// Address of peer.
@@ -127,14 +111,10 @@ protected:
// The assigned routing ID of this entry.
size_t total_bytes_;
- // The total number of bytes sent/received on this channel.
+ // The total number of bytes sent/received on this proxy.
State state_;
- // The current state of the channel.
-
- Proxy_Handler_Connector *connector_;
- // Back pointer to Proxy_Handler_Connector to reestablish broken
- // connections.
+ // The current state of the proxy.
int timeout_;
// Amount of time to wait between reconnection attempts.
@@ -142,12 +122,13 @@ protected:
int max_timeout_;
// Maximum amount of time to wait between reconnection attempts.
- char direction_;
- // Indicates which direction data flows through the channel ('S' ==
- // Supplier and 'C' == Consumer).
+ char proxy_role_;
+ // Indicates which role the proxy plays ('S' == Supplier and 'C' ==
+ // Consumer).
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
+ ACE_Event_Channel &event_channel_;
+ // Reference to the <ACE_Event_Channel> that we use to forward all
+ // the events from Consumers and Suppliers.
};
class Supplier_Proxy : public Proxy_Handler
@@ -158,13 +139,15 @@ class Supplier_Proxy : public Proxy_Handler
// Performs framing and error checking.
{
public:
- Supplier_Proxy (Event_Forwarding_Discriminator *,
- Proxy_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
- // Constructor sets the consumer map pointer.
+ // = Initialization method.
+ Supplier_Proxy (ACE_Event_Channel &,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id);
protected:
+ // = All the following methods are upcalls, so they can be protected.
+
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
// Receive and process peer events.
@@ -172,7 +155,8 @@ protected:
// Receive an event from a Supplier.
int forward (ACE_Message_Block *event);
- // Forward the Event to a Consumer.
+ // Forward the <event> to its appropriate Consumer. This delegates
+ // to the <ACE_Event_Channel> to do the actual forwarding.
ACE_Message_Block *msg_frag_;
// Keep track of event fragment to handle non-blocking recv's from
@@ -184,19 +168,22 @@ class Consumer_Proxy : public Proxy_Handler
// Handles transmission of events to Consumers.
//
// = DESCRIPTION
- // Uses a single-threaded approach.
+ // Performs queueing and error checking. Uses a single-threaded
+ // Reactive approach to handle flow control.
{
public:
- Consumer_Proxy (Event_Forwarding_Discriminator *,
- Proxy_Handler_Connector *,
- ACE_Thread_Manager * = 0,
- int socket_queue_size = 0);
-
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // = Initialization method.
+ Consumer_Proxy (ACE_Event_Channel &,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id);
+
+ virtual int put (ACE_Message_Block *event,
+ ACE_Time_Value * = 0);
// Send an event to a Consumer (may be queued if necessary).
protected:
- // = We'll allow up to 16 megabytes to be queued per-output channel.
+ // = We'll allow up to 16 megabytes to be queued per-output proxy.
enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
virtual int handle_output (ACE_HANDLE);
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
index 7ac0a77a2d4..dc18eca8500 100644
--- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
+++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
@@ -18,15 +18,15 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
// Locate the ACE_Svc_Handler corresponding to the socket descriptor.
if (this->handler_map_.find (sd, stp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate proxy %d in connector map, %p\n",
sd, "find"), -1);
- Proxy_Handler *channel = stp->svc_handler ();
+ Proxy_Handler *proxy_handler = stp->svc_handler ();
// Schedule a reconnection request at some point in the future
- // (note that channel uses an exponential backoff scheme).
- if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
- channel->timeout ()) == -1)
+ // (note that proxy_handler uses an exponential backoff scheme).
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (proxy_handler, 0, proxy_handler->timeout ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
"schedule_timer"), -1);
return 0;
@@ -35,36 +35,37 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
// Initiate (or reinitiate) a connection to the Proxy_Handler.
int
-Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel,
- ACE_Synch_Options &synch_options)
+Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler,
+ ACE_Synch_Options &synch_options)
{
- char buf[MAXHOSTNAMELEN];
+ char addr_buf[MAXHOSTNAMELEN];
// Mark ourselves as idle so that the various iterators
// will ignore us until we are reconnected.
- channel->state (Proxy_Handler::IDLE);
+ proxy_handler->state (Proxy_Handler::IDLE);
- if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
- || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
+ // We check the remote addr second so that it remains in the addr_buf.
+ if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1
+ || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
"can't obtain peer's address"), -1);
// Try to connect to the Peer.
- if (this->connect (channel, channel->remote_addr (),
- synch_options, channel->local_addr ()) == -1)
+ if (this->connect (proxy_handler, proxy_handler->remote_addr (),
+ synch_options, proxy_handler->local_addr ()) == -1)
{
if (errno != EWOULDBLOCK)
{
- channel->state (Proxy_Handler::FAILED);
+ proxy_handler->state (Proxy_Handler::FAILED);
ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
- "connect", buf));
+ "connect", addr_buf));
// Reschedule ourselves to try and connect again.
if (synch_options[ACE_Synch_Options::USE_REACTOR])
{
if (ACE_Service_Config::reactor ()->schedule_timer
- (channel, 0, channel->timeout ()) == 0)
+ (proxy_handler, 0, proxy_handler->timeout ()) == 0)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
"schedule_timer"), -1);
}
@@ -75,18 +76,18 @@ Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel,
}
else
{
- channel->state (Proxy_Handler::CONNECTING);
+ proxy_handler->state (Proxy_Handler::CONNECTING);
ACE_DEBUG ((LM_DEBUG,
"(%t) in the process of connecting %s to %s\n",
synch_options[ACE_Synch_Options::USE_REACTOR]
- ? "asynchronously" : "synchronously", buf));
+ ? "asynchronously" : "synchronously", addr_buf));
}
}
else
{
- channel->state (Proxy_Handler::ESTABLISHED);
+ proxy_handler->state (Proxy_Handler::ESTABLISHED);
ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
- buf, channel->get_handle ()));
+ addr_buf, proxy_handler->get_handle ()));
}
return 0;
}
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
index 98722a96295..f316e4e82bf 100644
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
@@ -1,30 +1,33 @@
-#include "Thr_Proxy_Handler.h"
// $Id$
-#include "Proxy_Handler_Connector.h"
+#include "Event_Channel.h"
+#include "Thr_Proxy_Handler.h"
#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Consumer_Proxy (ec, remote_addr, local_addr, conn_id)
{
}
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the Proxy_Handler as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method marks the Proxy_Handler as having failed
+// and deactivates the ACE_Message_Queue (to wake up the thread
+// blocked on <dequeue_head> in svc()).
+// Thr_Output_Handler::handle_close () will eventually try to
+// reconnect...
int
Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
{
+ // Call down to the <Consumer_Proxy> to handle this first.
this->Consumer_Proxy::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
+
+ ACE_Service_Config::reactor ()->remove_handler
+ (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
+
// Deactivate the queue while we try to get reconnected.
this->msg_queue ()->deactivate ();
return 0;
@@ -36,31 +39,28 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
int
Thr_Consumer_Proxy::open (void *)
{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+
// Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ // the Consumer has shut down unexpectedly).
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
- // messages to peers.
+ // events to Consumers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
@@ -70,87 +70,93 @@ Thr_Consumer_Proxy::open (void *)
}
}
-// ACE_Queue up a message for transmission (must not block since all
-// Supplier_Proxys are single-threaded).
+// Queue up an event for transmission (must not block since
+// Supplier_Proxys may be single-threaded).
int
Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
// Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
-// Transmit messages to the peer (note simplification resulting from
+// Transmit events to the peer (note simplification resulting from
// threads...)
int
Thr_Consumer_Proxy::svc (void)
{
+
for (;;)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connected! Thr_Consumer_Proxy's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread it is OK to block on
// output.
for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
+ this->msg_queue ()->dequeue_head (mb) != -1;
+ )
+ {
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
+ }
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+
+ ACE_OS::sleep (tv);
+ }
}
return 0;
}
-Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Supplier_Proxy (ec, remote_addr, local_addr, conn_id)
{
}
int
Thr_Supplier_Proxy::open (void *)
{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
- // messages to peers.
+ // events to peers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
@@ -160,7 +166,7 @@ Thr_Supplier_Proxy::open (void *)
}
}
-// Receive messages from a Peer in a separate thread (note reuse of
+// Receive events from a Peer in a separate thread (note reuse of
// existing code!).
int
@@ -168,20 +174,20 @@ Thr_Supplier_Proxy::svc (void)
{
for (;;)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connected! Thr_Supplier_Proxy's handle = %d\n",
this->peer ().get_handle ()));
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
+ // Since this method runs in its own thread and processes events
+ // for one connection it is OK to call down to the
+ // <Supplier_Proxy::handle_input> method, which blocks on input.
while (this->handle_input () != -1)
continue;
ACE_DEBUG ((LM_DEBUG,
"(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
- this->id (),
- this->get_handle ()));
+ this->id (), this->get_handle ()));
this->peer ().close ();
@@ -190,11 +196,12 @@ Thr_Supplier_Proxy::svc (void)
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
ACE_OS::sleep (tv);
}
}
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h
index 8ecced3805c..275bc87b320 100644
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.h
+++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h
@@ -25,21 +25,22 @@ class Thr_Consumer_Proxy : public Consumer_Proxy
// Runs each Output Proxy_Handler in a separate thread.
{
public:
- Thr_Consumer_Proxy (Event_Forwarding_Discriminator *,
- Proxy_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
+ Thr_Consumer_Proxy (ACE_Event_Channel &,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id);
virtual int open (void *);
// Initialize the threaded Consumer_Proxy object and spawn a new
// thread.
- virtual int handle_input (ACE_HANDLE);
- // Called when Peer shutdown unexpectedly.
-
virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
// Send a message to a peer.
+protected:
+ virtual int handle_input (ACE_HANDLE);
+ // Called when Peer shutdown unexpectedly.
+
virtual int svc (void);
// Transmit peer messages.
};
@@ -49,14 +50,15 @@ class Thr_Supplier_Proxy : public Supplier_Proxy
// Runs each Input Proxy_Handler in a separate thread.
{
public:
- Thr_Supplier_Proxy (Event_Forwarding_Discriminator *,
- Proxy_Handler_Connector *,
- ACE_Thread_Manager *,
- int socket_queue_size);
+ Thr_Supplier_Proxy (ACE_Event_Channel &,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id);
virtual int open (void *);
// Initialize the object and spawn a new thread.
+protected:
virtual int svc (void);
// Transmit peer messages.
};
diff --git a/apps/Gateway/Gateway/gatewayd.cpp b/apps/Gateway/Gateway/gatewayd.cpp
index 48b6e9a173d..b0af5f7cace 100644
--- a/apps/Gateway/Gateway/gatewayd.cpp
+++ b/apps/Gateway/Gateway/gatewayd.cpp
@@ -17,18 +17,17 @@ main (int argc, char *argv[])
ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
else // Use static binding.
{
- static char *l_argv[3] = { "-d" };
ACE_Service_Object *so = ACE_SVC_INVOKE (Gateway);
- if (so->init (1, l_argv) == -1)
+ if (so->init (argc - 1, argv + 1) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
}
}
// Run forever, performing the configured services until we are shut
- // down by a signal.
+ // down by a SIGINT/SIGQUIT signal.
- ACE_Service_Config::run_reactor_event_loop ();
+ daemon.run_reactor_event_loop ();
return 0;
}
diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h
index 24881c3e85b..5e288edf910 100644
--- a/apps/Gateway/Peer/Event.h
+++ b/apps/Gateway/Peer/Event.h
@@ -23,7 +23,7 @@
// Proxy_Handler in the Gateway.
typedef ACE_INT32 ACE_INT32;
-class Event_Addr
+class Event_Key
// = TITLE
// Address used to identify the source/destination of an event.
//
@@ -33,14 +33,14 @@ class Event_Addr
// Channel from the format of the data.
{
public:
- Event_Addr (ACE_INT32 cid = -1,
+ Event_Key (ACE_INT32 cid = -1,
u_char sid = 0,
u_char type = 0)
: conn_id_ (cid),
supplier_id_ (sid),
type_ (type) {}
- int operator== (const Event_Addr &event_addr) const
+ int operator== (const Event_Key &event_addr) const
{
return this->conn_id_ == event_addr.conn_id_
&& this->supplier_id_ == event_addr.supplier_id_
@@ -58,10 +58,13 @@ public:
// Event type.
};
-
class Event_Header
// = TITLE
- // Fixed sized header.
+ // Fixed sized header.
+ //
+ // = DESCRIPTION
+ // This is designed to have a sizeof (16) to avoid alignment
+ // problems on most platforms.
{
public:
typedef ACE_INT32 SUPPLIER_ID;
@@ -72,14 +75,35 @@ public:
INVALID_ID = -1 // No peer can validly use this number.
};
+ void decode (void)
+ {
+ this->len_ = ntohl (this->len_);
+ this->supplier_id_ = ntohl (this->supplier_id_);
+ this->type_ = ntohl (this->type_);
+ this->priority_ = ntohl (this->priority_);
+ }
+ // Decode from network byte order to host byte order.
+
+ void encode (void)
+ {
+ this->len_ = htonl (this->len_);
+ this->supplier_id_ = htonl (this->supplier_id_);
+ this->type_ = htonl (this->type_);
+ this->priority_ = htonl (this->priority_);
+ }
+ // Encode from host byte order to network byte order.
+
+ size_t len_;
+ // Length of the data_ payload, in bytes.
+
SUPPLIER_ID supplier_id_;
// Source ID.
ACE_INT32 type_;
// Event type.
- size_t len_;
- // Length of the entire event (including data payload) in bytes.
+ ACE_INT32 priority_;
+ // Event priority.
};
class Event
diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp
index 3b7bdb0cb2d..ab59567fc08 100644
--- a/apps/Gateway/Peer/peerd.cpp
+++ b/apps/Gateway/Peer/peerd.cpp
@@ -17,12 +17,9 @@ main (int argc, char *argv[])
ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1));
else // Use static binding.
{
- static char *l_argv[3] = { "-d", "-p", "10002" };
-
- ACE_Service_Object *so = _make_Peer_Acceptor ();
-
+ ACE_Service_Object *so = ACE_SVC_INVOKE (Peer_Acceptor);
- if (so->init (3, l_argv) == -1)
+ if (so->init (argc - 1, argv + 1) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1));
}
}
diff --git a/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp b/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp
index 5eca7a7e853..31164061eea 100644
--- a/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp
+++ b/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp
@@ -1,6 +1,6 @@
-#include "Notification_Receiver_Handler.h"
// $Id$
+#include "Notification_Receiver_Handler.h"
#if defined (ACE_HAS_ORBIX)
@@ -108,7 +108,7 @@ Notification_Receiver_Handler::notifier (void)
Notification_Receiver_Handler::~Notification_Receiver_Handler (void)
{
- this->handle_close (-1, ACE_Event_Handler::RWE_MASK);
+ this->handle_close (-1, ACE_Event_Handler::ALL_EVENTS_MASK);
}
#endif /* ACE_HAS_ORBIX */
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
index ebe56b2ff9c..38e2977140d 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -66,7 +66,7 @@ Peer_Handler<ROUTER, KEY>::svc (void)
{
#if 0
ACE_Thread_Control thread_control (tm);
- // Just a try !! we're just reading from our Message_Queue
+
ACE_Message_Block *db, *hb;
int n;
diff --git a/examples/ASX/Message_Queue/bounded_buffer.cpp b/examples/ASX/Message_Queue/bounded_buffer.cpp
index 08df53c8b93..194fbf07280 100644
--- a/examples/ASX/Message_Queue/bounded_buffer.cpp
+++ b/examples/ASX/Message_Queue/bounded_buffer.cpp
@@ -88,7 +88,7 @@ static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
if (length > 0)
ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
- delete mb;
+ mb->release ();
if (length == 0)
break;
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp
index cc2c475bef1..7d0fcef4160 100644
--- a/examples/ASX/Message_Queue/buffer_stream.cpp
+++ b/examples/ASX/Message_Queue/buffer_stream.cpp
@@ -163,7 +163,7 @@ Consumer::svc (void)
if (length > 0)
ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length);
- delete mb;
+ mb->release ();
if (length == 0)
break;
diff --git a/examples/ASX/Message_Queue/priority_buffer.cpp b/examples/ASX/Message_Queue/priority_buffer.cpp
index b80c23234c3..2d057fd69c2 100644
--- a/examples/ASX/Message_Queue/priority_buffer.cpp
+++ b/examples/ASX/Message_Queue/priority_buffer.cpp
@@ -44,7 +44,7 @@ consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
// Free up the buffer memory and the Message_Block.
ACE_Service_Config::alloc ()->free (mb->rd_ptr ());
- delete mb;
+ mb->release ();
if (length == 0)
break;
@@ -95,8 +95,8 @@ producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue)
mb->msg_priority (rb.size ());
mb->wr_ptr (rb.size ());
- ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n",
- mb->msg_priority ()));
+ ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n",
+ mb->msg_priority ()));
// Enqueue in priority order.
if (msg_queue->enqueue_prio (mb) == -1)
ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next"));
diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
index 23d9f6c7a35..f17560ad0e6 100644
--- a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
+++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
@@ -140,7 +140,7 @@ Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h)
ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h));
// ACE_Service_Config::reactor ()->remove_handler(h,
-// ACE_Event_Handler::RWE_MASK
+// ACE_Event_Handler::ALL_EVENTS_MASK
// |ACE_Event_Handler::DONT_CALL);
// this method should be called only if the peer shuts down
// so we deactivate our ACE_Message_Queue to awake our svc thread
diff --git a/examples/Connection/blocking/SPIPE-connector.h b/examples/Connection/blocking/SPIPE-connector.h
index 6a6fc97976f..8dd26a32e20 100644
--- a/examples/Connection/blocking/SPIPE-connector.h
+++ b/examples/Connection/blocking/SPIPE-connector.h
@@ -26,7 +26,7 @@ public:
// = Demultiplexing hooks.
virtual int handle_input (ACE_HANDLE);
virtual int handle_close (ACE_HANDLE handle = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask mask = ACE_Event_Handler::ALL_EVENTS_MASK);
virtual ACE_HANDLE get_handle (void) const;
diff --git a/examples/Logger/simple-server/Logging_Handler.cpp b/examples/Logger/simple-server/Logging_Handler.cpp
index ceb88ac14da..8f6b435089b 100644
--- a/examples/Logger/simple-server/Logging_Handler.cpp
+++ b/examples/Logger/simple-server/Logging_Handler.cpp
@@ -136,5 +136,5 @@ int
Logging_Handler::close (void)
{
return this->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::RWE_MASK);
+ ACE_Event_Handler::ALL_EVENTS_MASK);
}
diff --git a/netsvcs/lib/TS_Clerk_Handler.cpp b/netsvcs/lib/TS_Clerk_Handler.cpp
index e0142256ab0..2e84117e592 100644
--- a/netsvcs/lib/TS_Clerk_Handler.cpp
+++ b/netsvcs/lib/TS_Clerk_Handler.cpp
@@ -74,7 +74,7 @@ public:
// Return the handle of the message_fifo_;
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK);
// Called when object is removed from the ACE_Reactor
virtual int handle_input (ACE_HANDLE);