summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-97a82
-rw-r--r--README1
-rw-r--r--ace/Activation_Queue.cpp2
-rw-r--r--ace/IO_Cntl_Msg.cpp4
-rw-r--r--ace/Message_Block.cpp401
-rw-r--r--ace/Message_Block.h48
-rw-r--r--ace/Message_Queue.cpp20
-rw-r--r--ace/Module.cpp13
-rw-r--r--ace/OS.h12
-rw-r--r--ace/ReactorEx.cpp9
-rw-r--r--ace/Service_Object.cpp26
-rw-r--r--ace/Service_Object.h3
-rw-r--r--ace/Service_Object.i6
-rw-r--r--ace/Service_Record.cpp10
-rw-r--r--ace/Service_Record.i2
-rw-r--r--ace/Singleton.cpp5
-rw-r--r--ace/Singleton.h2
-rw-r--r--ace/Stream.cpp30
-rw-r--r--ace/Stream_Modules.cpp40
-rw-r--r--ace/Synch_T.i136
-rw-r--r--ace/UPIPE_Stream.cpp10
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp28
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h7
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp2
-rw-r--r--examples/ASX/Event_Server/Event_Server/Peer_Router.cpp38
-rw-r--r--examples/ASX/UPIPE_Event_Server/Peer_Router.cpp2
-rw-r--r--examples/IPC_SAP/UPIPE_SAP/ex1.cpp2
-rw-r--r--examples/IPC_SAP/UPIPE_SAP/ex2.cpp2
-rw-r--r--examples/Reactor/Misc/test_demuxing.cpp2
-rw-r--r--examples/Reactor/ReactorEx/test_reactorEx.cpp22
-rw-r--r--examples/Reactor/WFMO_Reactor/test_reactorEx.cpp22
-rw-r--r--examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp13
-rw-r--r--examples/Threads/barrier2.cpp6
-rw-r--r--examples/Threads/thread_pool.cpp2
-rw-r--r--netsvcs/servers/main.cpp18
-rw-r--r--tests/Buffer_Stream_Test.cpp2
-rw-r--r--tests/Future_Test.cpp16
-rw-r--r--tests/Priority_Buffer_Test.cpp2
-rw-r--r--tests/Reactors_Test.cpp2
-rw-r--r--tests/Thread_Pool_Test.cpp171
-rw-r--r--tests/UPIPE_SAP_Test.cpp2
41 files changed, 771 insertions, 452 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a
index fa53d88624c..bf1a2eda28f 100644
--- a/ChangeLog-97a
+++ b/ChangeLog-97a
@@ -1,5 +1,87 @@
+Fri Jan 3 10:47:15 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace: Replaced all uses of "delete mb" with mb->release ();
+
+ * ace/Stream_Modules.cpp: Replaced the use of explicit bit
+ twiddling with the ACE_BIT* macros.
+
+ * ace/Message_Block.cpp: Make sure that we use the
+ allocator_strategy_ to create the memory for the reference count
+ since this may need to go into shared memory if that's the
+ memory pool where the Message_Block allocations are coming from.
+
+ * ace/OS.h: Added two new macros, ACE_ALLOCATOR_RETURN and
+ ACE_ALLOCATOR, which are similar to ACE_NEW_RETURN and ACE_NEW,
+ except that these
+
+ * ace/Message_Block.cpp (release): Make sure to "delete this"
+ outside the scope of the locking_strategy_.
+
+ * ace/Service_Object.cpp: Added a destructor to ACE_Service_Type.
+ Thanks to Per.Andersson@hfera.ericsson.se (Per Andersson) for
+ suggesting this.
+
+ * ace/Service_Object.i: Be smarter about how we reassign the name_
+ pointer, i.e., delete the old one and make a copy. Thanks to
+ Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting
+ this.
+
+ * ace/Module.cpp (open): Rearranged the assignments to
+ reader_q->mod_ and writer_q->mod_ so that we don't try to
+ initialize through NULL pointers. Thanks to
+ Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting
+ this.
+
+ * ace/Service_Record.cpp (ACE_Service_Record): Initialized name_
+ to NULL so that the following change works correctly now.
+ Thanks to Per.Andersson@hfera.ericsson.se (Per Andersson) for
+ reporting this.
+
+ * ace/Service_Record.i (name): Make sure to delete [] (char *)
+ this->name_ before allocating a new one. Thanks to
+ Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting
+ this.
+
+ * ace/Message_Block: Reworked the reference counting implemention
+ so that reference counts are shared correctly amongst their
+ various owners. This requires making a deep copy the "header"
+ portion, but a shallow copy of the "data."
+
+ * ace/Message_Block.cpp (ACE_Message_Block): Updated all three
+ ACE_Message_Block constructors so that they all call the init()
+ method. This centralizes all the initialization logic in one
+ place.
+
Thu Jan 2 00:42:21 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+ * ace/Message_Block.cpp (ACE_Message_Block): Make sure to set the
+ cont_ field to 0 after "releasing" it so that we don't
+ mistakenly think it's still around later on. This problem arose
+ in the ACE_Message_Queue::close() method, which was trying to
+ count the number of bytes being freed.
+
+ * ace/Message_Queue.cpp (close): Fixed a subtle bug where we
+ weren't actually deleting messages from the
+ ACE_Message_Queue::close() routine. This should work now...
+
+ * ace/Message_Queue.cpp (close): Replaced the use of "delete mb"
+ with "mb->release()" since the Message_Blocks are now reference
+ counted.
+
+ * ace/Message_Block: Enhanced the reference counting scheme so
+ that you can increment and decrement the count by an arbitrary
+ amount. This is particular useful when you know you'll be
+ sending the same Message_Block to N consumers.
+
+ * ace/Singleton: The dump() must be used same as instance()
+ (without supplying an object) so it must be declarated *static*,
+ i.e.,
+
+ static void dump (void);
+
+ Thanks to Sandro Doro <alex@aureus.sublink.org> for reporting
+ this.
+
* examples/ASX/Event_Server: Completely rewrote and retested the
ACE Event Server example. The new code is *much* easier to
understand, has many more comments, is more robust, and compiles
diff --git a/README b/README
index 21f495eeb92..484a030392f 100644
--- a/README
+++ b/README
@@ -672,6 +672,7 @@ Wayne Vucenic <wvucenic@netgate.net>
Harry Gunnarsson <hg@carmenta.se>
James CE Johnson <jcej@lads.com>
Samuel_Bercovici <Samuel_Bercovici_at_EFT__AD2@mail.icomverse.com>
+Per.Andersson@hfera.ericsson.se (Per Andersson)
I would particularly like to thank Paul Stephenson, who worked with me
at Ericsson and is now at ObjectSpace. Paul devised the recursive
diff --git a/ace/Activation_Queue.cpp b/ace/Activation_Queue.cpp
index c7a78713173..0544075fc6f 100644
--- a/ace/Activation_Queue.cpp
+++ b/ace/Activation_Queue.cpp
@@ -53,7 +53,7 @@ ACE_Activation_Queue::dequeue (ACE_Time_Value *tv)
ACE_Method_Object *mo = (ACE_Method_Object *) mb->base ();
// Delete the message block.
- delete mb;
+ mb->release ();
return mo;
}
else
diff --git a/ace/IO_Cntl_Msg.cpp b/ace/IO_Cntl_Msg.cpp
index 2b0d38b8355..ff27954a2aa 100644
--- a/ace/IO_Cntl_Msg.cpp
+++ b/ace/IO_Cntl_Msg.cpp
@@ -2,7 +2,9 @@
// $Id$
#if 0
-/* Forward decl */
+// This is not meant to be used, it's just a place holder...
+
+// Forward decl
template <class SYNCH> class ACE_Module;
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index df4dc918cc6..647230929f9 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -68,77 +68,6 @@ ACE_Message_Block::dump (void) const
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
-ACE_Message_Block::ACE_Message_Block (void)
- : flags_ (0),
- base_ (0),
- cur_size_ (0),
- max_size_ (0),
- rd_ptr_ (0),
- wr_ptr_ (0),
- type_ (MB_NORMAL),
- priority_ (0),
- cont_ (0),
- next_ (0),
- prev_ (0),
- allocator_strategy_ (0),
- locking_strategy_ (0),
- reference_count_ (1)
-
-{
- ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
-}
-
-ACE_Message_Block::ACE_Message_Block (size_t sz,
- ACE_Message_Type msg_type,
- ACE_Message_Block *msg_cont,
- const char *msg_data,
- ACE_Allocator *allocator_strategy,
- ACE_Lock *locking_strategy)
-{
- ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
-
- if (this->init (sz, msg_type, msg_cont, msg_data,
- allocator_strategy, locking_strategy) == -1)
- ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
-}
-
-ACE_Message_Block::~ACE_Message_Block (void)
-{
- ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block");
-
- if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE))
- {
- if (this->allocator_strategy_)
- this->allocator_strategy_->free ((void *) this->base_);
- else
- delete [] this->base_;
- }
- if (this->cont_)
- this->cont_->release ();
- this->prev_ = 0;
- this->next_ = 0;
-}
-
-ACE_Message_Block::ACE_Message_Block (const char *data,
- size_t size)
- : flags_ (ACE_Message_Block::DONT_DELETE),
- base_ ((char *) data),
- cur_size_ (size),
- max_size_ (size),
- rd_ptr_ (0),
- wr_ptr_ (0),
- type_ (MB_NORMAL),
- priority_ (0),
- cont_ (0),
- next_ (0),
- prev_ (0),
- allocator_strategy_ (0),
- locking_strategy_ (0),
- reference_count_ (1)
-{
- ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
-}
-
int
ACE_Message_Block::size (size_t length)
{
@@ -154,14 +83,9 @@ ACE_Message_Block::size (size_t length)
if (this->allocator_strategy_ == 0)
ACE_NEW_RETURN (buf, char[length], -1);
else // Use the allocator!
- {
- buf = (char *) this->allocator_strategy_->malloc (length);
- if (buf == 0)
- {
- errno = ENOMEM;
- return -1;
- }
- }
+ ACE_ALLOCATOR_RETURN (buf,
+ (char *) this->allocator_strategy_->malloc (length),
+ -1);
if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE))
{
@@ -188,21 +112,80 @@ ACE_Message_Block::size (size_t length)
return 0;
}
-int
-ACE_Message_Block::init (const char *data,
- size_t size)
+ACE_Message_Block::~ACE_Message_Block (void)
{
- ACE_TRACE ("ACE_Message_Block::init");
- // Should we also initialize all the other fields, as well?
- this->base_ = (char *) data;
- this->cur_size_ = size;
- this->max_size_ = size;
- ACE_SET_BITS (this->flags_, ACE_Message_Block::DONT_DELETE);
- return 0;
+ ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block");
+
+ // Sanity check...
+ ACE_ASSERT ((*this->reference_count_ <= 1));
+
+ // Just to be safe...
+ *this->reference_count_ = 0;
+
+ if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE))
+ {
+ if (this->allocator_strategy_)
+ this->allocator_strategy_->free ((void *) this->base_);
+ else
+ delete [] this->base_;
+
+ this->base_ = 0;
+ }
+
+ // Free up all the continuation messages.
+ if (this->cont_)
+ {
+ this->cont_->release ();
+ this->cont_ = 0;
+ }
+
+ this->prev_ = 0;
+ this->next_ = 0;
+
+ if (this->allocator_strategy_ == 0)
+ delete this->reference_count_;
+ else
+ this->allocator_strategy_->free ((void *) this->reference_count_);
+
+ this->reference_count_ = 0;
+}
+
+ACE_Message_Block::ACE_Message_Block (const char *data,
+ size_t size)
+{
+ ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
+
+ if (this->init_i (size, MB_NORMAL, 0, data,
+ 0, 0, 0, ACE_Message_Block::DONT_DELETE) == -1)
+ ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
+}
+
+ACE_Message_Block::ACE_Message_Block (size_t size,
+ ACE_Message_Type msg_type,
+ ACE_Message_Block *msg_cont,
+ const char *msg_data,
+ ACE_Allocator *allocator_strategy,
+ ACE_Lock *locking_strategy)
+{
+ ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
+
+ if (this->init_i (size, msg_type, msg_cont, msg_data,
+ allocator_strategy, locking_strategy,
+ 0, ACE_Message_Block::DONT_DELETE) == -1)
+ ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
+}
+
+ACE_Message_Block::ACE_Message_Block (void)
+{
+ ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
+
+ if (this->init_i (0, MB_NORMAL, 0, 0, 0, 0, 0,
+ ACE_Message_Block::DONT_DELETE) == -1)
+ ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
}
int
-ACE_Message_Block::init (size_t sz,
+ACE_Message_Block::init (size_t size,
ACE_Message_Type msg_type,
ACE_Message_Block *msg_cont,
const char *msg_data,
@@ -210,34 +193,73 @@ ACE_Message_Block::init (size_t sz,
ACE_Lock *locking_strategy)
{
ACE_TRACE ("ACE_Message_Block::init");
- this->flags_ = 0;
+
+ return this->init_i (size, msg_type, msg_cont, msg_data,
+ allocator_strategy, locking_strategy,
+ 0, msg_data ? ACE_Message_Block::DONT_DELETE : 0);
+}
+
+int
+ACE_Message_Block::init (const char *data,
+ size_t size)
+{
+ ACE_TRACE ("ACE_Message_Block::init");
+ // Should we also initialize all the other fields, as well?
+
+ return this->init_i (size, MB_NORMAL, 0, data,
+ 0, 0, 0,
+ ACE_Message_Block::DONT_DELETE);
+}
+
+ACE_Message_Block::ACE_Message_Block (size_t size,
+ ACE_Message_Type msg_type,
+ ACE_Message_Block *msg_cont,
+ const char *msg_data,
+ ACE_Allocator *allocator_strategy,
+ ACE_Lock *locking_strategy,
+ int *reference_count,
+ Message_Flags flags)
+{
+ ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
+
+ ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
+
+ if (this->init_i (size, msg_type, msg_cont, msg_data,
+ allocator_strategy, locking_strategy,
+ reference_count, flags) == -1)
+ ACE_ERROR ((LM_ERROR, "ACE_Message_Block"));
+}
+
+int
+ACE_Message_Block::init_i (size_t size,
+ ACE_Message_Type msg_type,
+ ACE_Message_Block *msg_cont,
+ const char *msg_data,
+ ACE_Allocator *allocator_strategy,
+ ACE_Lock *locking_strategy,
+ int *reference_count,
+ Message_Flags flags)
+{
+ ACE_TRACE ("ACE_Message_Block::init_i");
+
+ this->flags_ = flags;
if (msg_data == 0)
{
- if (allocator_strategy == 0)
- {
- this->allocator_strategy_ = 0;
- ACE_NEW_RETURN (this->base_, char[sz], -1);
- }
+ this->allocator_strategy_ = allocator_strategy;
+
+ if (this->allocator_strategy_ == 0)
+ ACE_NEW_RETURN (this->base_, char[size], -1);
else // Use the allocator!
- {
- this->allocator_strategy_ = allocator_strategy;
- this->base_ = (char *) this->allocator_strategy_->malloc (sz);
- if (this->base_ == 0)
- {
- errno = ENOMEM;
- return -1;
- }
- }
+ ACE_ALLOCATOR_RETURN (this->base_,
+ (char *) this->allocator_strategy_->malloc (size),
+ -1);
}
else
- {
- this->base_ = (char *) msg_data;
- ACE_SET_BITS (this->flags_, ACE_Message_Block::DONT_DELETE);
- }
+ this->base_ = (char *) msg_data;
- this->cur_size_ = sz;
- this->max_size_ = sz;
+ this->cur_size_ = size;
+ this->max_size_ = size;
this->rd_ptr_ = this->base_;
this->wr_ptr_ = this->base_;
this->priority_ = 0;
@@ -246,73 +268,68 @@ ACE_Message_Block::init (size_t sz,
this->next_ = 0;
this->prev_ = 0;
this->locking_strategy_ = locking_strategy;
- this->reference_count_ = 1;
- return 0;
-}
-
-ACE_Message_Block *
-ACE_Message_Block::clone (Message_Flags mask) const
-{
- // You always want to clear this one to prevent memory leaks but you
- // might add some others later.
- const Message_Flags always_clear = ACE_Message_Block::DONT_DELETE;
-
- ACE_TRACE ("ACE_Message_Block::clone");
- ACE_Message_Block *nb;
-
- ACE_NEW_RETURN (nb,
- ACE_Message_Block (this->max_size_, this->type_,
- 0, 0, this->allocator_strategy_),
- 0);
-
- ACE_OS::memcpy (nb->base_, this->base_, this->max_size_);
+ this->reference_count_ = 0;
- nb->rd_ptr (this->rd_ptr_ - this->base_);
- nb->wr_ptr (this->wr_ptr_ - this->base_);
-
- // Set new flags minus the mask...
- nb->set_flags (this->flags ());
- nb->clr_flags (mask | always_clear);
+ if (reference_count == 0)
+ {
+ if (this->allocator_strategy_ == 0)
+ ACE_NEW_RETURN (this->reference_count_, int, -1);
+ else
+ ACE_ALLOCATOR_RETURN (this->reference_count_,
+ (int *) this->allocator_strategy_->malloc (sizeof (int)),
+ -1);
+ *this->reference_count_ = 1;
+ }
+ else
+ // Just assign the pointer so that all owners will share the same
+ // reference count.
+ this->reference_count_ = reference_count;
- if (this->cont_ != 0)
- nb->cont_ = this->cont_->clone (mask);
- return nb;
+ return 0;
}
ACE_Message_Block *
-ACE_Message_Block::release (void)
+ACE_Message_Block::release (void)
{
ACE_TRACE ("ACE_Message_Block::release");
- ACE_Message_Block *result = 0;
+ ACE_Message_Block *result;
+ // If there's a locking strategy then we need to acquire the lock
+ // before decrementing the count.
if (this->locking_strategy_)
{
- // We need to acquire the lock before decrementing the count.
this->locking_strategy_->acquire ();
- this->reference_count_--;
- if (this->reference_count_ == 0)
- delete this;
- else if (this->reference_count_ > 0)
+ ACE_ASSERT ((*this->reference_count_) > 0);
+
+ (*this->reference_count_)--;
+
+ if ((*this->reference_count_) == 0)
+ result = 0;
+ else // if ((*this->reference_count_) > 0)
result = this;
- // ACE_ASSERT (this->reference_count_ <= 0)
- // This shouldn't happen...
this->locking_strategy_->release ();
}
else
{
- this->reference_count_--;
+ ACE_ASSERT ((*this->reference_count_) >= 0);
+
+ (*this->reference_count_)--;
- if (this->reference_count_ == 0)
- delete this;
- else if (this->reference_count_ > 0)
+ if ((*this->reference_count_) == 0)
+ result = 0;
+ else // if ((*this->reference_count_) > 0)
result = this;
- // ACE_ASSERT (this->reference_count_ <= 0)
- // This shouldn't happen...
}
+ // We must delete this outside the scope of the locking_strategy_
+ // since otherwise we'd be trying to "release" through a deleted
+ // pointer!
+ if (result == 0)
+ delete this;
+
return result;
}
@@ -330,16 +347,84 @@ ACE_Message_Block::duplicate (void)
{
ACE_TRACE ("ACE_Message_Block::duplicate");
+ void *memory;
+
+ if (this->allocator_strategy_ == 0)
+ ACE_NEW_RETURN (memory, char[sizeof (ACE_Message_Block)], 0);
+ else
+ {
+ memory = this->allocator_strategy_->malloc (sizeof (ACE_Message_Block));
+
+ if (
+ }
+
+ ACE_NEW_RETURN (nb,
+ ACE_Message_Block (this->max_size_,
+ this->type_,
+ 0,
+ this->base_,
+ this->allocator_strategy_,
+ this->locking_strategy_,
+ this->reference_count_,
+ this->flags_),
+ 0);
+
+ ACE_Message_Block *nb;
+
+
+
+ // Create a new <ACE_Message_Block>, but share the <base_> pointer
+ // data (i.e., don't copy that).
if (this->locking_strategy_)
{
// We need to acquire the lock before incrementing the count.
this->locking_strategy_->acquire ();
- this->reference_count_++;
+ (*this->reference_count_)++;
this->locking_strategy_->release ();
}
else
- this->reference_count_++;
+ (*this->reference_count_)++;
+
+ // Increment the reference counts of all the continuation
+ // messages.
+ if (this->cont_)
+ this->cont_ = this->cont_->duplicate ();
+
+ return nb;
+}
- return this;
+ACE_Message_Block *
+ACE_Message_Block::clone (Message_Flags mask) const
+{
+ ACE_TRACE ("ACE_Message_Block::clone");
+
+ // You always want to clear this one to prevent memory leaks but you
+ // might add some others later.
+ const Message_Flags always_clear = ACE_Message_Block::DONT_DELETE;
+
+ ACE_Message_Block *nb;
+
+ ACE_NEW_RETURN (nb,
+ ACE_Message_Block (this->max_size_,
+ this->type_,
+ 0, 0,
+ this->allocator_strategy_,
+ this->locking_strategy_,
+ this->reference_count_,
+ this->flags_),
+ 0);
+
+ ACE_OS::memcpy (nb->base_, this->base_, this->max_size_);
+
+ nb->rd_ptr (this->rd_ptr_ - this->base_);
+ nb->wr_ptr (this->wr_ptr_ - this->base_);
+
+ // Set new flags minus the mask...
+ nb->clr_flags (mask | always_clear);
+
+ if (this->cont_ != 0)
+ nb->cont_ = this->cont_->clone (mask);
+
+ return nb;
}
diff --git a/ace/Message_Block.h b/ace/Message_Block.h
index ba1a760b2d1..2df9b3aceb7 100644
--- a/ace/Message_Block.h
+++ b/ace/Message_Block.h
@@ -157,13 +157,12 @@ public:
// = Reference counting methods.
ACE_Message_Block *duplicate (void);
- // Increment our reference count by one.
+ // Increase our reference count by 1.
ACE_Message_Block *release (void);
- // Decrement our reference count by one. If the reference count is
- // > 0 then return this; else if reference count == 0 then delete
- // <this> and return 0. Behavior is undefined if reference count <
- // 0.
+ // Decrease our reference count by 1. If the reference count is > 0
+ // then return this; else if reference count == 0 then delete <this>
+ // and return 0. Behavior is undefined if reference count < 0.
static ACE_Message_Block *release (ACE_Message_Block *mb);
// This behaves like the non-static method <release>, except that it
@@ -252,6 +251,27 @@ public:
// Declare the dynamic allocation hooks.
private:
+ // = Internal initialization methods.
+ ACE_Message_Block (size_t size,
+ ACE_Message_Type type,
+ ACE_Message_Block *cont,
+ const char *data,
+ ACE_Allocator *allocator,
+ ACE_Lock *locking_strategy,
+ int *reference_count,
+ Message_Flags flags);
+ // Perform the actual initialization.
+
+ int init_i (size_t size,
+ ACE_Message_Type type,
+ ACE_Message_Block *cont,
+ const char *data,
+ ACE_Allocator *allocator,
+ ACE_Lock *locking_strategy,
+ int *reference_count,
+ Message_Flags flags);
+ // Perform the actual initialization.
+
Message_Flags flags_;
// Misc flags (e.g., DONT_DELETE and USER_FLAGS).
@@ -286,16 +306,22 @@ private:
ACE_Message_Block *prev_;
// Pointer to previous message in the list.
+ // = Strategies.
ACE_Allocator *allocator_strategy_;
- // Pointer to the allocator defined for this message block.
+ // Pointer to the allocator defined for this message block. Note
+ // that this pointer is shared by all owners of this <Message_Block>.
ACE_Lock *locking_strategy_;
// Pointer to the locking defined for this message block. This is
- // used to protect regions of code containing
-
- size_t reference_count_;
- // Reference count for this <Message_Block> which is used to avoid
- // deep copies (i.e., <clone>).
+ // used to protect regions of code that access shared
+ // <ACE_Message_Block> state. Note that this lock is shared by all
+ // owners of the <Message_Block>'s data.
+
+ int *reference_count_;
+ // Pointer to a reference count for this <Message_Block>, which is
+ // used to avoid deep copies (i.e., <clone>). Note that this
+ // pointer value is shared by all owners of the <Message_Block>'s
+ // data.
// = Disallow these operations for now (use <clone> instead).
ACE_Message_Block &operator= (const ACE_Message_Block &);
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index bb3980bffdb..83d93b1f4e4 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -24,11 +24,15 @@ ACE_Message_Queue<ACE_SYNCH_2>::dump (void) const
"high_water_mark = %d\n"
"cur_bytes = %d\n"
"cur_count = %d\n",
+ "head_ = %u\n",
+ "tail_ = %u\n",
this->deactivated_,
this->low_water_mark_,
this->high_water_mark_,
this->cur_bytes_,
- this->cur_count_));
+ this->cur_count_,
+ this->head_,
+ this->tail_));
ACE_DEBUG ((LM_DEBUG,"notfull_cond: \n"));
notfull_cond_.dump();
ACE_DEBUG ((LM_DEBUG,"notempty_cond: \n"));
@@ -74,7 +78,6 @@ ACE_Message_Queue<ACE_SYNCH_2>::open (size_t hwm,
this->cur_count_ = 0;
this->tail_ = 0;
this->head_ = 0;
-
this->notification_strategy_ = ns;
return 0;
}
@@ -121,18 +124,22 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void)
for (this->tail_ = 0; this->head_ != 0; )
{
+ this->cur_count_--;
+
ACE_Message_Block *temp;
- // Make sure we decrement all the counts.
+ // Decrement all the counts.
for (temp = this->head_;
temp != 0;
temp = temp->cont ())
this->cur_bytes_ -= temp->size ();
- this->cur_count_--;
-
+ temp = this->head_;
this->head_ = this->head_->next ();
- delete temp;
+
+ // Make sure to use <release> rather than <delete> since this is
+ // reference counted.
+ temp->release ();
}
return res;
@@ -160,7 +167,6 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item)
// Link at the end.
else
{
-
new_item->next (0);
this->tail_->next (new_item);
new_item->prev (this->tail_);
diff --git a/ace/Module.cpp b/ace/Module.cpp
index c0d30dc1ccf..2c03705c28e 100644
--- a/ace/Module.cpp
+++ b/ace/Module.cpp
@@ -98,10 +98,6 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name,
this->reader (reader_q);
this->writer (writer_q);
- // Setup back pointers.
- reader_q->mod_ = this;
- writer_q->mod_ = this;
-
// Save the flags
this->flags_ = flags;
@@ -113,14 +109,15 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name,
this->close_i (0, M_DELETE_READER);
this->close_i (1, M_DELETE_WRITER);
- // Reset back pointers.
- reader_q->mod_ = 0;
- writer_q->mod_ = 0;
-
errno = ENOMEM;
return -1;
}
+ // Setup back pointers (this must come last, after we've made sure
+ // there's memory allocated here.
+ reader_q->mod_ = this;
+ writer_q->mod_ = this;
+
return 0;
}
diff --git a/ace/OS.h b/ace/OS.h
index 0490acd87f7..8fd10df7b48 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -2456,11 +2456,19 @@ private:
#define ACE_NEW_RETURN(POINTER,CONSTRUCTOR,RET_VAL) \
do { POINTER = new CONSTRUCTOR; \
if (POINTER == 0) { errno = ENOMEM; return RET_VAL; } \
- } while (0)
+ } while (0)
#define ACE_NEW(POINTER,CONSTRUCTOR) \
do { POINTER = new CONSTRUCTOR; \
if (POINTER == 0) { errno = ENOMEM; return; } \
- } while (0)
+ } while (0)
+#define ACE_ALLOCATOR_RETURN(POINTER,ALLOCATOR,RET_VAL) \
+ do { POINTER = ALLOCATOR; \
+ if (POINTER == 0) { errno = ENOMEM; return RET_VAL; } \
+ } while (0)
+#define ACE_ALLOCATOR(POINTER,ALLOCATOR) \
+ do { POINTER = ALLOCATOR; \
+ if (POINTER == 0) { errno = ENOMEM; return; } \
+ } while (0)
#define ACE_DEFAULT_MUTEX_A "ACE_MUTEX"
diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp
index bae99ddafaf..9472a4e72cb 100644
--- a/ace/ReactorEx.cpp
+++ b/ace/ReactorEx.cpp
@@ -357,9 +357,10 @@ ACE_ReactorEx_Notify::handle_signal (int signum,
ACE_Notification_Buffer *buffer =
(ACE_Notification_Buffer *) mb->base ();
- // If eh == 0 then we've got major problems! Otherwise, we need
- // to dispatch the appropriate handle_* method on the
+ // If eh == 0 then we've got major problems! Otherwise, we
+ // need to dispatch the appropriate handle_* method on the
// ACE_Event_Handler pointer we've been passed.
+
if (buffer->eh_ != 0)
{
int result = 0;
@@ -385,7 +386,7 @@ ACE_ReactorEx_Notify::handle_signal (int signum,
}
// Make sure to delete the memory regardless of success or
// failure!
- delete mb;
+ mb->release ();
}
}
}
@@ -409,7 +410,7 @@ ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh,
if (this->message_queue_.enqueue_tail (mb) == -1)
{
- delete mb;
+ mb->release ();
return -1;
}
}
diff --git a/ace/Service_Object.cpp b/ace/Service_Object.cpp
index e0defc425a7..5b8e80fa533 100644
--- a/ace/Service_Object.cpp
+++ b/ace/Service_Object.cpp
@@ -1,6 +1,7 @@
-// Service_Object.cpp
// $Id$
+// Service_Object.cpp
+
#define ACE_BUILD_DLL
#include "ace/Service_Object.h"
@@ -10,8 +11,6 @@
ACE_ALLOC_HOOK_DEFINE(ACE_Service_Object)
-/* Provide the abstract base class common to all services */
-
ACE_Service_Object::ACE_Service_Object (void)
{
ACE_TRACE ("ACE_Service_Object::ACE_Service_Object");
@@ -48,10 +47,20 @@ ACE_Service_Type::ACE_Service_Type (const void *so,
const char *s_name,
unsigned int f)
: obj_ (so),
- flags_ (f)
+ flags_ (f),
+ name_ (0)
{
ACE_TRACE ("ACE_Service_Type::ACE_Service_Type");
- this->name (ACE_OS::strcpy (new char[::strlen (s_name) + 1], s_name));
+ this->name (s_name);
+}
+
+ACE_Service_Type::~ACE_Service_Type (void)
+{
+ ACE_TRACE ("ACE_Service_Type::~ACE_Service_Type");
+
+ // It's ok to call this, even though we may have already deleted it
+ // in the fini() method since it would then be NULL.
+ delete [] (char *) this->name_;
}
int
@@ -62,10 +71,13 @@ ACE_Service_Type::fini (void) const
this->name_, this->flags_));
delete [] (char *) this->name_;
+ ((ACE_Service_Type *) this)->name_ = 0;
+
if (ACE_BIT_ENABLED (this->flags_, ACE_Service_Type::DELETE_OBJ))
delete (void *) this->object ();
+
if (ACE_BIT_ENABLED (this->flags_, ACE_Service_Type::DELETE_THIS))
- delete (void *) this; // Prevent object's destructor from being called...
+ delete (ACE_Service_Type *) this;
+
return 0;
}
-
diff --git a/ace/Service_Object.h b/ace/Service_Object.h
index f06fc88ffb4..27949aa6737 100644
--- a/ace/Service_Object.h
+++ b/ace/Service_Object.h
@@ -47,10 +47,11 @@ public:
DELETE_THIS = 2 // Delete the enclosing object.
};
- // = Initialization method.
+ // = Initialization and termination methods.
ACE_Service_Type (const void *object,
const char *s_name,
u_int flags = 0);
+ ~ACE_Service_Type (void);
// = Pure virtual interface (must be defined by the subclass).
virtual int suspend (void) const = 0;
diff --git a/ace/Service_Object.i b/ace/Service_Object.i
index 530757b6363..e85fd0323bb 100644
--- a/ace/Service_Object.i
+++ b/ace/Service_Object.i
@@ -21,6 +21,8 @@ ACE_INLINE void
ACE_Service_Type::name (const char *n)
{
ACE_TRACE ("ACE_Service_Type::name");
- this->name_ = n;
-}
+ delete [] (char *) this->name_;
+ ACE_NEW (this->name_, char[::strlen (n) + 1]);
+ ACE_OS::strcpy ((char *) this->name_, n);
+}
diff --git a/ace/Service_Record.cpp b/ace/Service_Record.cpp
index 21cd5462e71..10feb4105ae 100644
--- a/ace/Service_Record.cpp
+++ b/ace/Service_Record.cpp
@@ -4,9 +4,6 @@
#define ACE_BUILD_DLL
#include "ace/Service_Record.h"
-#if !defined (ACE_SERVICE_RECORD_C)
-#define ACE_SERVICE_RECORD_C
-
#if !defined (__ACE_INLINE__)
#include "ace/Service_Record.i"
#endif /* __ACE_INLINE__ */
@@ -311,7 +308,10 @@ ACE_Service_Record::ACE_Service_Record (const char *n,
ACE_Service_Type *t,
const void *h,
int active)
- : type_ (t), handle_ (h), active_ (active)
+ : type_ (t),
+ handle_ (h),
+ active_ (active),
+ name_ (0)
{
ACE_TRACE ("ACE_Service_Record::ACE_Service_Record");
this->name (n);
@@ -361,5 +361,3 @@ template class ACE_Thru_Task<ACE_SYNCH>;
template class ACE_Stream_Head<ACE_SYNCH>;
template class ACE_Stream_Tail<ACE_SYNCH>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
-
-#endif /* ACE_SERVICE_RECORD_C */
diff --git a/ace/Service_Record.i b/ace/Service_Record.i
index 0afc4051ee2..2f1de0654fc 100644
--- a/ace/Service_Record.i
+++ b/ace/Service_Record.i
@@ -49,6 +49,8 @@ ACE_INLINE void
ACE_Service_Record::name (const char *n)
{
ACE_TRACE ("ACE_Service_Record::name");
+
+ delete [] (char *) this->name_;
this->name_ = ACE_OS::strcpy (new char [::strlen (n) + 1], n);
}
diff --git a/ace/Singleton.cpp b/ace/Singleton.cpp
index 5d6a6e8ea31..05fa10e7a5a 100644
--- a/ace/Singleton.cpp
+++ b/ace/Singleton.cpp
@@ -13,13 +13,12 @@
#endif /* __ACE_INLINE__ */
template <class TYPE, class LOCK> void
-ACE_Singleton<TYPE, LOCK>::dump (void) const
+ACE_Singleton<TYPE, LOCK>::dump (void)
{
ACE_TRACE ("ACE_Singleton<TYPE, LOCK>::dump");
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
#if !defined (ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES)
- ACE_DEBUG ((LM_DEBUG, "instance_ = %x", this->instance_));
+ ACE_DEBUG ((LM_DEBUG, "instance_ = %x", instance_));
ace_singleton_lock_.dump ();
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
#endif /* ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES */
diff --git a/ace/Singleton.h b/ace/Singleton.h
index 7761acf53c4..a1503183300 100644
--- a/ace/Singleton.h
+++ b/ace/Singleton.h
@@ -35,7 +35,7 @@ public:
static TYPE *instance (void);
// Global access point to the Singleton.
- void dump (void) const;
+ static void dump (void);
// Dump the state of the object.
protected:
diff --git a/ace/Stream.cpp b/ace/Stream.cpp
index d345c0f7662..92b8e423bca 100644
--- a/ace/Stream.cpp
+++ b/ace/Stream.cpp
@@ -348,26 +348,28 @@ ACE_Stream<ACE_SYNCH_2>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::control");
ACE_IO_Cntl_Msg ioc (cmd);
- // Create a data block that contains the user-supplied data.
- ACE_Message_Block *db =
- new ACE_Message_Block (sizeof (int),
- ACE_Message_Block::MB_IOCTL,
- 0,
- (char *) a);
+ ACE_Message_Block *db = 0;
+
+ // Try to create a data block that contains the user-supplied data.
+ ACE_NEW_RETURN (db, ACE_Message_Block (sizeof (int),
+ ACE_Message_Block::MB_IOCTL,
+ 0,
+ (char *) a), -1);
- // Create a control block that contains the control field and a
- // pointer to the data block.
+ // Try to create a control block <cb> that contains the control
+ // field and a pointer to the data block <db> in <cb>'s continuation
+ // field.
ACE_Message_Block *cb =
new ACE_Message_Block (sizeof ioc,
ACE_Message_Block::MB_IOCTL,
db,
(char *) &ioc);
- // Make sure all of the allocation succeeded before continuing.
- if (db == 0 || cb == 0)
+ // If we can't allocate <cb> then we need to delete db and return
+ // -1.
+ if (cb == 0)
{
- delete cb;
- delete db;
+ db->release ();
errno = ENOMEM;
return -1;
}
@@ -381,7 +383,9 @@ ACE_Stream<ACE_SYNCH_2>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
else
result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval ();
- delete cb; // This also deletes db...
+ // This will also release db if it's reference count == 0.
+ cb->release ();
+
return result;
}
diff --git a/ace/Stream_Modules.cpp b/ace/Stream_Modules.cpp
index 54c6c854435..edea345250b 100644
--- a/ace/Stream_Modules.cpp
+++ b/ace/Stream_Modules.cpp
@@ -74,7 +74,7 @@ ACE_Stream_Head<ACE_SYNCH_2>::control (ACE_Message_Block *mb)
return ioc->rval ();
}
-/* Performs canonical flushing at the ACE_Stream Head */
+// Performs canonical flushing at the ACE_Stream Head.
template <ACE_SYNCH_1> int
ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb)
@@ -82,20 +82,22 @@ ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb)
ACE_TRACE ("ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush");
char *cp = mb->rd_ptr ();
- if (*cp & ACE_Task_Flags::ACE_FLUSHR)
+ if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHR))
{
this->flush (ACE_Task_Flags::ACE_FLUSHALL);
- *cp &= ~ACE_Task_Flags::ACE_FLUSHR;
+ ACE_CLR_BITS (*cp, ACE_Task_Flags::ACE_FLUSHR);
}
- if (*cp & ACE_Task_Flags::ACE_FLUSHW)
+
+ if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHW))
return this->reply (mb);
else
- delete mb;
+ mb->release ();
return 0;
}
template <ACE_SYNCH_1> int
-ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
+ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Stream_Head<ACE_SYNCH_2>::put");
int res = 0;
@@ -105,10 +107,8 @@ ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
return res;
if (this->is_writer ())
- {
- return this->put_next (mb, tv);
- }
- else /* this->is_reader () */
+ return this->put_next (mb, tv);
+ else // this->is_reader ()
{
switch (mb->msg_type ())
{
@@ -215,7 +215,7 @@ ACE_Stream_Tail<ACE_SYNCH_2>::control (ACE_Message_Block *mb)
return this->reply (mb);
}
-/* Perform flush algorithm as though we were the driver */
+// Perform flush algorithm as though we were the driver.
template <ACE_SYNCH_1> int
ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb)
@@ -223,27 +223,29 @@ ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb)
ACE_TRACE ("ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush");
char *cp = mb->rd_ptr ();
- if (*cp & ACE_Task_Flags::ACE_FLUSHW)
+ if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHW))
{
this->flush (ACE_Task_Flags::ACE_FLUSHALL);
- *cp &= ~ACE_Task_Flags::ACE_FLUSHW;
+ ACE_BIT_CLR (*cp, ACE_Task_Flags::ACE_FLUSHW);
}
- if (*cp & ACE_Task_Flags::ACE_FLUSHR)
+
+ if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHR))
{
this->sibling ()->flush (ACE_Task_Flags::ACE_FLUSHALL);
return this->reply (mb);
}
else
- delete mb;
+ mb->release ();
+
return 0;
}
template <ACE_SYNCH_1> int
-ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *
-
-)
+ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *)
{
ACE_TRACE ("ACE_Stream_Tail<ACE_SYNCH_2>::put");
+
if (this->is_writer ())
{
switch (mb->msg_type ())
@@ -252,7 +254,7 @@ ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *
return this->control (mb);
/* NOTREACHED */
default:
- delete mb;
+ mb->release ();
}
}
diff --git a/ace/Synch_T.i b/ace/Synch_T.i
index 252b90633f1..336a0805b1c 100644
--- a/ace/Synch_T.i
+++ b/ace/Synch_T.i
@@ -5,6 +5,74 @@
#include "ace/Thread.h"
+// Explicitly destroy the lock.
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void)
+{
+ return this->lock_.remove ();
+}
+
+// Block the thread until the lock is acquired.
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void)
+{
+ return this->lock_.acquire ();
+}
+
+// Conditionally acquire the lock (i.e., won't block).
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void)
+{
+ return this->lock_.tryacquire ();
+}
+
+// Release the lock.
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void)
+{
+ return this->lock_.release ();
+}
+
+// Block until the thread acquires a read lock. If the locking
+// mechanism doesn't support read locks then this just calls
+// <acquire>.
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void)
+{
+ return this->lock_.acquire_read ();
+}
+
+// Block until the thread acquires a write lock. If the locking
+// mechanism doesn't support read locks then this just calls
+// <acquire>.
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void)
+{
+ return this->lock_.acquire_write ();
+}
+
+// Conditionally acquire a read lock. If the locking mechanism
+// doesn't support read locks then this just calls <acquire>.
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void)
+{
+ return this->lock_.tryacquire_read ();
+}
+
+// Conditionally acquire a write lock. If the locking mechanism
+// doesn't support write locks then this just calls <acquire>.
+
+template <class LOCKING_MECHANISM> ACE_INLINE int
+ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_write (void)
+{
+ return this->lock_.tryacquire_write ();
+}
+
template <class LOCK, class TYPE> ACE_INLINE
ACE_Atomic_Op<LOCK, TYPE>::ACE_Atomic_Op (const ACE_Atomic_Op<LOCK, TYPE> &rhs)
{
@@ -199,72 +267,4 @@ ACE_TSS<TYPE>::ts_get (void) const
return (TYPE *) &this->type_;
}
-// Explicitly destroy the lock.
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void)
-{
- return this->lock_.remove ();
-}
-
-// Block the thread until the lock is acquired.
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void)
-{
- return this->lock_.acquire ();
-}
-
-// Conditionally acquire the lock (i.e., won't block).
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void)
-{
- return this->lock_.tryacquire ();
-}
-
-// Release the lock.
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void)
-{
- return this->lock_.release ();
-}
-
-// Block until the thread acquires a read lock. If the locking
-// mechanism doesn't support read locks then this just calls
-// <acquire>.
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void)
-{
- return this->lock_.acquire_read ();
-}
-
-// Block until the thread acquires a write lock. If the locking
-// mechanism doesn't support read locks then this just calls
-// <acquire>.
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void)
-{
- return this->lock_.acquire_write ();
-}
-
-// Conditionally acquire a read lock. If the locking mechanism
-// doesn't support read locks then this just calls <acquire>.
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void)
-{
- return this->lock_.tryacquire_read ();
-}
-
-// Conditionally acquire a write lock. If the locking mechanism
-// doesn't support read locks then this just calls <acquire>.
-
-template <class LOCKING_MECHANISM> int
-ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void)
-{
- return this->lock_.acquire_write ();
-}
-
#endif /* defined (ACE_HAS_THREADS) && defined (ACE_HAS_THREAD_SPECIFIC_STORAGE) */
diff --git a/ace/UPIPE_Stream.cpp b/ace/UPIPE_Stream.cpp
index b5fba065f27..b68183f06f2 100644
--- a/ace/UPIPE_Stream.cpp
+++ b/ace/UPIPE_Stream.cpp
@@ -120,8 +120,7 @@ ACE_UPIPE_Stream::recv (char *buffer,
this->remaining_);
bytes_read += this->remaining_;
this->remaining_ = 0;
- delete this->mb_last_;
- this->mb_last_ = 0;
+ this->mb_last_ = this->mb_last_->release ();
return bytes_read;
}
else
@@ -139,12 +138,9 @@ ACE_UPIPE_Stream::recv (char *buffer,
this->remaining_ -= n;
if (this->remaining_ == 0)
- {
- // Now the Message_Buffer is empty.
+ // Now the Message_Buffer is empty.
- delete this->mb_last_;
- this->mb_last_ = 0;
- }
+ this->mb_last_ = this->mb_last_->release ();
}
}
else
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 532523956a9..6b1337ffab2 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -18,9 +18,12 @@ ACE_Event_Channel_Options::ACE_Event_Channel_Options (void)
ACE_Event_Channel::~ACE_Event_Channel (void)
{
+ delete this->lock_adapter_;
}
ACE_Event_Channel::ACE_Event_Channel (void)
+ : lock_adapter_ (0),
+ acceptor_ (*this)
{
}
@@ -30,6 +33,12 @@ ACE_Event_Channel::options (void)
return this->options_;
}
+ACE_Lock *
+ACE_Event_Channel::message_block_locking_strategy (void)
+{
+ return this->lock_adapter_;
+}
+
int
ACE_Event_Channel::compute_performance_statistics (void)
{
@@ -100,7 +109,7 @@ ACE_Event_Channel::compute_performance_statistics (void)
ACE_Event_Channel::handle_timeout (const ACE_Time_Value &,
const void *)
{
- return this->collection_performance_statistics ();
+ return this->compute_performance_statistics ();
}
// This method forwards the <event> to Consumer that have registered
@@ -196,11 +205,6 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler,
}
int
-ACE_Event_Channel::initiate_proxy_accept (void)
-{
-}
-
-int
ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler)
{
int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF;
@@ -391,12 +395,20 @@ ACE_Event_Channel::open (void *)
// Ignore SIPPIPE so each Consumer_Proxy can handle it.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
- if (this->connector_role_)
+ if (this->options ().connector_role_)
// Actively initiate Peer connections.
this->initiate_connector ();
- if (this->acceptor_role_)
+ if (this->options ().acceptor_role_)
// Passively initiate Peer acceptor.
this->initiate_acceptor ();
+
+ // If we're not running reactively, then we need to make sure that
+ // <ACE_Message_Block> reference counting operations are
+ // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is
+ // parameterized by <ACE_Thread_Mutex> to prevent race conditions.
+ if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE)
+ ACE_NEW_RETURN (this->lock_adapter_, ACE_Lock_Adapter<ACE_Thread_Mutex>, -1);
+
return 0;
}
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 513fa36e1f9..0baaac64c81 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -130,6 +130,13 @@ private:
// Periodically callback to perform timer-based performance
// profiling.
+ ACE_Lock *message_block_locking_strategy (void);
+ // The strategy for locking <ACE_Message_Block> reference counting.
+ // This is NULL if our threading strategy is REACTIVE, else it
+ // points to the ACE_Lock_Adapter<ACE_Thread_Mutex> defined below.
+
+ ACE_Lock_Adapter<ACE_Thread_Mutex> *lock_adapter_;
+
Proxy_Handler_Connector connector_;
// Used to establish the connections actively.
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 054a06282aa..fd4524036f4 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -306,7 +306,7 @@ Gateway::parse_consumer_config_file (void)
// Read config file line at a time.
for (Consumer_Config_Info cci;
- consumer_file.read_entry (cci, line_number) != FP::EOFILE);
+ consumer_file.read_entry (cci, line_number) != FP::EOFILE;
)
{
file_empty = 0;
diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
index 1cbaa77b83b..a81b2e73e63 100644
--- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
+++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp
@@ -48,6 +48,7 @@ void
Peer_Router_Context::release (void)
{
this->reference_count_--;
+
if (this->reference_count_ == 0)
delete this;
}
@@ -141,6 +142,17 @@ Peer_Handler::Peer_Handler (Peer_Router_Context *prc)
}
#if 0
+
+// Right now, Peer_Handlers are purely Reactive, i.e., they all run in
+// a single thread of control. It would be easy to make them Active
+// Objects by calling activate() in Peer_Handler::open(), making
+// Peer_Handler::put() enqueue each message on the message queue, and
+// (3) then running the following svc() routine to route each message
+// to its final destination within a separate thread. Note that we'd
+// want to move the svc() call up to the Consumer_Router and
+// Supplier_Router level in order to get the right level of control
+// for input and output.
+
Peer_Handler::svc (void)
{
ACE_Thread_Control thread_control (tm);
@@ -184,10 +196,20 @@ Peer_Handler::svc (void)
#endif
int
-Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
+Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- return this->peer ().send_n (mb->rd_ptr (),
- mb->length ());
+#if 0
+ // If we're running as Active Objects just enqueue the message here.
+ return this->putq (mb, tv);
+#else
+ int result = 0;
+
+ result = this->peer ().send_n (mb->rd_ptr (),
+ mb->length ());
+ // Release the memory.
+ mb->release ();
+ return result;
+#endif
}
// Initialize a newly connected handler.
@@ -203,16 +225,18 @@ Peer_Handler::open (void *)
else
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1);
#if 0
+ // If we're running as an Active Object activate the Peer_Handler
+ // here.
if (this->activate (Options::instance ()->t_flags ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1);
-#endif
ACE_DEBUG ((LM_DEBUG,
"(%t) Peer_Handler::open registering with Reactor for handle_input\n"));
-
+#else
// Register with the Reactor to receive messages from our Peer.
if (ACE_Service_Config::reactor ()->register_handler
(this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1);
+#endif
// Insert outselves into the routing map.
else if (this->prc_->bind_peer (this->get_handle (), this) == -1)
@@ -239,8 +263,8 @@ Peer_Handler::handle_input (ACE_HANDLE h)
// Check for memory failures.
if (db == 0 || hb == 0)
{
- delete hb;
- delete db;
+ hb->release ();
+ db->release ();
errno = ENOMEM;
return -1;
}
diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
index f17560ad0e6..a0bffc95627 100644
--- a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
+++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp
@@ -195,7 +195,7 @@ Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb)
bytes += ss->int_id_->put (data_block);
}
- delete mb;
+ mb->release ();
return bytes == 0 ? 0 : bytes / iterations;
}
diff --git a/examples/IPC_SAP/UPIPE_SAP/ex1.cpp b/examples/IPC_SAP/UPIPE_SAP/ex1.cpp
index ba5490504c3..4f4d8d99a55 100644
--- a/examples/IPC_SAP/UPIPE_SAP/ex1.cpp
+++ b/examples/IPC_SAP/UPIPE_SAP/ex1.cpp
@@ -49,7 +49,7 @@ peer1 (void *)
ACE_DEBUG ((LM_DEBUG, "(%t) peer1 ack is \"%s\"\n", mb->rd_ptr ()));
// Free up the memory block.
- delete mb;
+ mb->release ();
// Now try the send()/recv() interface.
char mytext[] = "This string is sent by peer1 as buffer";
diff --git a/examples/IPC_SAP/UPIPE_SAP/ex2.cpp b/examples/IPC_SAP/UPIPE_SAP/ex2.cpp
index 63e37e7089a..06e831bf9eb 100644
--- a/examples/IPC_SAP/UPIPE_SAP/ex2.cpp
+++ b/examples/IPC_SAP/UPIPE_SAP/ex2.cpp
@@ -110,7 +110,7 @@ consumer (void *)
for (ACE_Message_Block *mb = 0;
c_stream.recv (mb) != -1 && mb->size () != 0;
- delete mb)
+ mb->release ())
received_messages++;
ACE_OS::time (&currsec);
diff --git a/examples/Reactor/Misc/test_demuxing.cpp b/examples/Reactor/Misc/test_demuxing.cpp
index f8f7992ce37..56c8e6cf400 100644
--- a/examples/Reactor/Misc/test_demuxing.cpp
+++ b/examples/Reactor/Misc/test_demuxing.cpp
@@ -270,7 +270,7 @@ Message_Handler::handle_input (ACE_HANDLE)
else
{
ACE_DEBUG ((LM_DEBUG, "(%t) priority = %d\n", mb->msg_priority ()));
- delete mb;
+ mb->release ();
}
return 0;
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp
index 295b36ffda0..ea69d89ade4 100644
--- a/examples/Reactor/ReactorEx/test_reactorEx.cpp
+++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp
@@ -210,7 +210,7 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
// This was allocated by the STDIN_Handler, queued, dequeued,
// passed to the proactor, and now passed back to us.
- delete msg;
+ msg->release ();
return 0; // Do not reinvoke a send.
}
@@ -222,21 +222,25 @@ int
Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
long bytes_transferred)
{
+ int result = 1; // Reinvokes the recv() operation by default!
+
if (bytes_transferred > 0 && msg->length () > 0)
{
msg->rd_ptr ()[bytes_transferred] = '\0';
// Print out the message received from the server.
ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
- delete msg;
- return 1; // Reinvokes the recv() operation!
+ }
+ else
+ {
+ // If a read failed, we will assume it's because the remote peer
+ // went away. We will end the event loop. Since we're in the
+ // main thread, we don't need to do a notify.
+ ACE_Service_Config::end_reactorEx_event_loop ();
+ result = -1;
}
- delete msg;
- // If a read failed, we will assume it's because the remote peer
- // went away. We will end the event loop. Since we're in the main
- // thread, we don't need to do a notify.
- ACE_Service_Config::end_reactorEx_event_loop ();
- return -1; // Close down.
+ msg->release ();
+ return result;
}
// This is so the Proactor can get a message to read into.
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
index 295b36ffda0..ea69d89ade4 100644
--- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
@@ -210,7 +210,7 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg,
// This was allocated by the STDIN_Handler, queued, dequeued,
// passed to the proactor, and now passed back to us.
- delete msg;
+ msg->release ();
return 0; // Do not reinvoke a send.
}
@@ -222,21 +222,25 @@ int
Peer_Handler::handle_input_complete (ACE_Message_Block *msg,
long bytes_transferred)
{
+ int result = 1; // Reinvokes the recv() operation by default!
+
if (bytes_transferred > 0 && msg->length () > 0)
{
msg->rd_ptr ()[bytes_transferred] = '\0';
// Print out the message received from the server.
ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ()));
- delete msg;
- return 1; // Reinvokes the recv() operation!
+ }
+ else
+ {
+ // If a read failed, we will assume it's because the remote peer
+ // went away. We will end the event loop. Since we're in the
+ // main thread, we don't need to do a notify.
+ ACE_Service_Config::end_reactorEx_event_loop ();
+ result = -1;
}
- delete msg;
- // If a read failed, we will assume it's because the remote peer
- // went away. We will end the event loop. Since we're in the main
- // thread, we don't need to do a notify.
- ACE_Service_Config::end_reactorEx_event_loop ();
- return -1; // Close down.
+ msg->release ();
+ return result;
}
// This is so the Proactor can get a message to read into.
diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
index f7e2d98f69a..71a3c10e556 100644
--- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
+++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp
@@ -13,6 +13,13 @@
#include "Handle_Thr_Stream.i"
#endif /* __ACE_INLINE__ */
+// Shorthand names.
+#define SH SVC_HANDLER
+#define PR_AC_1 ACE_PEER_ACCEPTOR_1
+#define PR_AC_2 ACE_PEER_ACCEPTOR_2
+#define PR_ST_1 ACE_PEER_STREAM_1
+#define PR_ST_2 ACE_PEER_STREAM_2
+
template <class SH, PR_AC_1>
Handle_Thr_Acceptor<SH, PR_AC_2>::~Handle_Thr_Acceptor (void)
{
@@ -146,6 +153,12 @@ CLI_Stream<PR_ST_2>::svc (void)
return 0;
}
+#undef SH
+#undef PR_AC_1
+#undef PR_AC_2
+#undef PR_ST_1
+#undef PR_ST_2
+
//----------------------------------------
#if defined (ACE_HAS_TLI)
diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp
index 30190ace443..08119d9087c 100644
--- a/examples/Threads/barrier2.cpp
+++ b/examples/Threads/barrier2.cpp
@@ -99,7 +99,7 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
if (this->output (mb) < 0)
ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n"));
- delete mb;
+ mb->release ();
}
return result;
}
@@ -149,14 +149,14 @@ Worker_Task<BARRIER>::svc (void)
if (length == 0)
{
ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter));
- delete mb;
+ mb->release ();
break;
}
this->barrier_.wait ();
this->output (mb);
- delete mb;
+ mb->release ();
}
// Note that the ACE_Task::svc_run () method automatically removes
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp
index ddcad02e4dd..53cd86d1105 100644
--- a/examples/Threads/thread_pool.cpp
+++ b/examples/Threads/thread_pool.cpp
@@ -96,7 +96,7 @@ Thread_Pool::svc (void)
count, length, length - 1, mb->rd_ptr ()));
// We're responsible for deallocating this.
- delete mb;
+ mb->release ();
if (length == 0)
{
diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp
index b2f614c1567..67c5c9f3b57 100644
--- a/netsvcs/servers/main.cpp
+++ b/netsvcs/servers/main.cpp
@@ -60,36 +60,36 @@ main (int argc, char *argv[])
l_argv[1] = 0;
Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Server_Acceptor);
- if (so->init (1, l_argv) == -1)
+ if (sp_2->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Server_Acceptor", 1));
l_argv[0] = argv[0];
l_argv[1] = "-p 10011";
l_argv[2] = 0;
- Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor);
+ Service_Ptr sp_3 = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor);
- if (sp_2->init (2, l_argv) == -1)
+ if (sp_3->init (2, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Clerk_Processor", 1));
l_argv[0] = "-p " ACE_DEFAULT_TOKEN_SERVER_PORT_STR;
l_argv[1] = 0;
- Service_Ptr sp_3 = ACE_SVC_INVOKE (ACE_Token_Acceptor);
+ Service_Ptr sp_4 = ACE_SVC_INVOKE (ACE_Token_Acceptor);
- if (sp_3->init (1, l_argv) == -1)
+ if (sp_4->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Token_Service", 1));
l_argv[0] = "-p " ACE_DEFAULT_LOGGING_SERVER_PORT_STR;
l_argv[1] = 0;
- Service_Ptr sp_4 = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor);
+ Service_Ptr sp_5 = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor);
- if (sp_4->init (1, l_argv) == -1)
+ if (sp_5->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Logging_Service", 1));
l_argv[0] = "-p " ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR;
l_argv[1] = 0;
- Service_Ptr sp_5 = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor);
+ Service_Ptr sp_6 = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor);
- if (sp_5->init (1, l_argv) == -1)
+ if (sp_6->init (1, l_argv) == -1)
ACE_ERROR ((LM_ERROR, "%p\n%a", "Thr_Logging_Service", 1));
}
diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp
index aa5ec27414c..2ba15d15cc3 100644
--- a/tests/Buffer_Stream_Test.cpp
+++ b/tests/Buffer_Stream_Test.cpp
@@ -174,7 +174,7 @@ Consumer::svc (void)
ACE_ASSERT (c == output[0]);
c++;
}
- delete mb;
+ mb->release ();
if (length == 0)
{
diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp
index a02542c9aff..5f01738de12 100644
--- a/tests/Future_Test.cpp
+++ b/tests/Future_Test.cpp
@@ -382,10 +382,10 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG,
"(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
- (u_long) task_count,
- (u_long) future_count,
- (u_long) capsule_count,
- (u_long) methodobject_count));
+ (int) task_count,
+ (int) future_count,
+ (int) capsule_count,
+ (int) methodobject_count));
}
// Close things down.
@@ -398,10 +398,10 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG,
"(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n",
- (u_long) task_count,
- (u_long) future_count,
- (u_long) capsule_count,
- (u_long) methodobject_count));
+ (int) task_count,
+ (int) future_count,
+ (int) capsule_count,
+ (int) methodobject_count));
ACE_DEBUG ((LM_DEBUG,"(%t) th' that's all folks!\n"));
diff --git a/tests/Priority_Buffer_Test.cpp b/tests/Priority_Buffer_Test.cpp
index 09f1b7635d9..3a1134c63a2 100644
--- a/tests/Priority_Buffer_Test.cpp
+++ b/tests/Priority_Buffer_Test.cpp
@@ -68,7 +68,7 @@ consumer (void *args)
// Free up the buffer memory and the Message_Block. Note that
// the destructor of Message Block will delete the the actual
// buffer.
- delete mb;
+ mb->release ();
if (length == 0)
break;
diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp
index 661298efd2d..42de847e165 100644
--- a/tests/Reactors_Test.cpp
+++ b/tests/Reactors_Test.cpp
@@ -127,7 +127,7 @@ Test_Task::handle_input (ACE_HANDLE)
done_count--;
ACE_DEBUG ((LM_DEBUG,
"(%t) handle_input, handled_ = %d, done_count = %d\n",
- this->handled_, (u_long) done_count));
+ this->handled_, (int) done_count));
}
ACE_OS::thr_yield ();
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp
index 2363850e7f3..ee4ce7c1ffa 100644
--- a/tests/Thread_Pool_Test.cpp
+++ b/tests/Thread_Pool_Test.cpp
@@ -10,20 +10,21 @@
//
// = DESCRIPTION
// This test program illustrates how the ACE task synchronization
-// mechanisms work in conjunction with the ACE_Task and the
-// ACE_Thread_Manager. If the manual flag is not set input comes
-// from stdin until the user enters a return only. This stops
-// all workers via a message block of length 0. This is an
-// alternative shutdown of workers compared to queue deactivate.
+// mechanisms and ACE_Message_Block reference counting works in
+// conjunction with the ACE_Task and the ACE_Thread_Manager. If
+// the manual flag is not set input comes from stdin until the
+// user enters a return. This stops all workers via a message
+// block of length 0. This shows an alternative way to shutdown
+// worker tasks compared to queue deactivate.
//
// = AUTHOR
// Karlheinz Dorn, Doug Schmidt, and Prashant Jain
//
// ============================================================================
+#define protected public
#include "ace/Task.h"
#include "ace/Service_Config.h"
-
#include "ace/Task.h"
#include "test_config.h"
@@ -35,22 +36,35 @@ static size_t n_iterations = 100;
class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
- Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads);
+ Thread_Pool (int n_threads);
+ // Create the thread pool containing <n_threads>.
+ ~Thread_Pool (void);
+
+ virtual int open (void * = 0);
+ // Produce the messages that are consumed by the threads in the
+ // thread pool.
+
virtual int svc (void);
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
- // This allows the producer to pass messages to the <Thread_Pool>.
+ // Allows the producer to pass messages to the <Thread_Pool>.
private:
virtual int close (u_long);
+ // Close hook.
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
+ ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_;
+ // Serialize access to <ACE_Message_Block> reference count, which
+ // will be decremented from multiple threads.
};
+Thread_Pool::~Thread_Pool (void)
+{
+}
+
int
Thread_Pool::close (u_long)
{
@@ -58,9 +72,7 @@ Thread_Pool::close (u_long)
return 0;
}
-Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr,
- int n_threads)
- : ACE_Task<ACE_MT_SYNCH> (thr_mgr)
+Thread_Pool::Thread_Pool (int n_threads)
{
// Create worker threads.
if (this->activate (THR_NEW_LWP, n_threads) == -1)
@@ -75,42 +87,49 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv)
return this->putq (mb, tv);
}
-// Iterate <n_iterations> time printing off a message and "waiting"
-// for all other threads to complete this iteration.
+// Iterate <n_iterations> printing off a message and "waiting" for all
+// other threads to complete this iteration.
int
Thread_Pool::svc (void)
{
ACE_NEW_THREAD;
- // Note that the ACE_Task::svc_run () method automatically adds us to
- // the Thread_Manager when the thread begins.
-
- int count = 1;
+ // The <ACE_Task::svc_run()> method automatically adds us to the
+ // <ACE_Service_Config>'s <ACE_Thread_Manager> when the thread
+ // begins.
// Keep looping, reading a message out of the queue, until we get a
// message with a length == 0, which signals us to quit.
- for (;; count++)
+ for (int count = 1; ; count++)
{
ACE_Message_Block *mb;
+ ACE_DEBUG ((LM_DEBUG, "(%t) **** before head = %d, tail = %d\n",
+ this->msg_queue ()->head_,
+ this->msg_queue ()->tail_));
ACE_ASSERT (this->getq (mb) != -1);
+ ACE_DEBUG ((LM_DEBUG, "(%t) ++++ after head = %d, tail = %d\n",
+ this->msg_queue ()->head_,
+ this->msg_queue ()->tail_));
+
int length = mb->length ();
if (length > 0)
ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d, length = %d, text = \"%*s\"\n",
- count, length, length - 1, mb->rd_ptr ()));
+ "(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n",
+ count, this->msg_queue ()->message_count (),
+ length, length - 1, mb->rd_ptr ()));
// We're responsible for deallocating this.
- delete mb;
+ mb->release ();
if (length == 0)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) in iteration %d, got NULL message, exiting\n",
- count));
+ "(%t) in iteration %d, queue len = %d, got NULL message, exiting\n",
+ count, this->msg_queue ()->message_count ()));
break;
}
}
@@ -120,17 +139,22 @@ Thread_Pool::svc (void)
return 0;
}
-static void
-produce (Thread_Pool &thread_pool)
+int
+Thread_Pool::open (void *)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n"));
- thread_pool.dump ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) producer start, dumping the Thread_Pool\n"));
+ this->dump ();
+
+ ACE_Message_Block *mb;
for (int n;;)
{
// Allocate a new message.
- ACE_Message_Block *mb;
- ACE_NEW (mb, ACE_Message_Block (BUFSIZ));
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ, ACE_Message_Block::MB_DATA,
+ 0, 0, 0, &this->lock_adapter_),
+ -1);
#if defined (manual)
ACE_DEBUG ((LM_DEBUG,
@@ -140,50 +164,53 @@ produce (Thread_Pool &thread_pool)
static size_t count = 0;
ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count);
-
n = ACE_OS::strlen (mb->rd_ptr ());
- if (count == n_iterations)
- n = 1; // Indicate that we need to shut down.
+ if (count == n_iterations || n <= 1)
+ break;
else
count++;
if (count == 0 || (count % 20 == 0))
ACE_OS::sleep (1);
#endif /* manual */
- if (n > 1)
- {
- // Send a normal message to the waiting threads and continue
- // producing.
- mb->wr_ptr (n);
-
- // Pass the message to the Thread_Pool.
- if (thread_pool.put (mb) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
- }
- else
- {
- // Send a shutdown message to the waiting threads and exit.
- ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n"));
- thread_pool.dump ();
-
- for (int i = thread_pool.thr_count (); i > 0; i--)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) EOF, enqueueing NULL block for thread = %d\n",
- i));
-
- // Enqueue a NULL message to flag each consumer to
- // shutdown.
- if (thread_pool.put (new ACE_Message_Block) == -1)
- ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
- }
-
- ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
- thread_pool.dump ();
- break;
- }
+ // Send a normal message to the waiting threads and continue
+ // producing.
+ mb->wr_ptr (n);
+
+ // Pass the message to the Thread_Pool.
+ if (this->put (mb) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
}
+
+ // Send a shutdown message to the waiting threads and exit.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n(%t) sending shutdown message to %d threads, dump of task:\n",
+ this->thr_count ()));
+ this->dump ();
+
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (0, ACE_Message_Block::MB_DATA,
+ 0, 0, 0, &this->lock_adapter_),
+ -1);
+
+ for (int i = this->thr_count (); i > 0; i--)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) EOF, enqueueing NULL block for thread = %d\n",
+ i));
+
+ // Enqueue an empty message to flag each consumer to shutdown.
+ // Note that we use reference counting to avoid having to copy
+ // the message.
+ if (this->put (mb->duplicate ()) == -1)
+ ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put"));
+ }
+
+ mb->release ();
+
+ ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n"));
+ this->dump ();
}
#endif /* ACE_HAS_THREADS */
@@ -198,17 +225,21 @@ main (int, char *[])
ACE_DEBUG ((LM_DEBUG, "(%t) threads = %d\n", n_threads));
// Create the worker tasks.
- Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (),
- n_threads);
+ Thread_Pool thread_pool (n_threads);
// Create work for the worker tasks to process in their own threads.
- produce (thread_pool);
+ thread_pool.open ();
// Wait for all the threads to reach their exit point.
- ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n"));
+ ACE_DEBUG ((LM_DEBUG, "(%t) waiting for worker tasks to finish...\n"));
+
ACE_Service_Config::thr_mgr ()->wait ();
+ ACE_ASSERT (thread_pool.msg_queue ()->is_empty ());
+ ACE_DEBUG ((LM_DEBUG, "(%t) head = %d, tail = %d\n",
+ thread_pool.msg_queue ()->head_,
+ thread_pool.msg_queue ()->tail_));
ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n"));
#else
ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp
index 64a851c28bf..1771575b3cb 100644
--- a/tests/UPIPE_SAP_Test.cpp
+++ b/tests/UPIPE_SAP_Test.cpp
@@ -59,7 +59,7 @@ connector (void *)
ACE_ASSERT (ACE_OS::strcmp (mb->rd_ptr (), "thanks") == 0);
// Free up the memory block.
- delete mb;
+ mb->release ();
// Now try the send()/recv() interface.
char mytext[] = "This string is sent by connector as a buffer";