diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-04 00:06:38 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-04 00:06:38 +0000 |
commit | 70d108545611dbb86049d0109ef4a7ab1ef6289e (patch) | |
tree | 790c9b07d5eac35a82ae7d9f5e7b59a6243a4b2c | |
parent | 1c44106287219a05ddbff09df4574b90777040ae (diff) | |
download | ATCD-70d108545611dbb86049d0109ef4a7ab1ef6289e.tar.gz |
foo
41 files changed, 771 insertions, 452 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a index fa53d88624c..bf1a2eda28f 100644 --- a/ChangeLog-97a +++ b/ChangeLog-97a @@ -1,5 +1,87 @@ +Fri Jan 3 10:47:15 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace: Replaced all uses of "delete mb" with mb->release (); + + * ace/Stream_Modules.cpp: Replaced the use of explicit bit + twiddling with the ACE_BIT* macros. + + * ace/Message_Block.cpp: Make sure that we use the + allocator_strategy_ to create the memory for the reference count + since this may need to go into shared memory if that's the + memory pool where the Message_Block allocations are coming from. + + * ace/OS.h: Added two new macros, ACE_ALLOCATOR_RETURN and + ACE_ALLOCATOR, which are similar to ACE_NEW_RETURN and ACE_NEW, + except that these + + * ace/Message_Block.cpp (release): Make sure to "delete this" + outside the scope of the locking_strategy_. + + * ace/Service_Object.cpp: Added a destructor to ACE_Service_Type. + Thanks to Per.Andersson@hfera.ericsson.se (Per Andersson) for + suggesting this. + + * ace/Service_Object.i: Be smarter about how we reassign the name_ + pointer, i.e., delete the old one and make a copy. Thanks to + Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting + this. + + * ace/Module.cpp (open): Rearranged the assignments to + reader_q->mod_ and writer_q->mod_ so that we don't try to + initialize through NULL pointers. Thanks to + Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting + this. + + * ace/Service_Record.cpp (ACE_Service_Record): Initialized name_ + to NULL so that the following change works correctly now. + Thanks to Per.Andersson@hfera.ericsson.se (Per Andersson) for + reporting this. + + * ace/Service_Record.i (name): Make sure to delete [] (char *) + this->name_ before allocating a new one. Thanks to + Per.Andersson@hfera.ericsson.se (Per Andersson) for reporting + this. + + * ace/Message_Block: Reworked the reference counting implemention + so that reference counts are shared correctly amongst their + various owners. This requires making a deep copy the "header" + portion, but a shallow copy of the "data." + + * ace/Message_Block.cpp (ACE_Message_Block): Updated all three + ACE_Message_Block constructors so that they all call the init() + method. This centralizes all the initialization logic in one + place. + Thu Jan 2 00:42:21 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * ace/Message_Block.cpp (ACE_Message_Block): Make sure to set the + cont_ field to 0 after "releasing" it so that we don't + mistakenly think it's still around later on. This problem arose + in the ACE_Message_Queue::close() method, which was trying to + count the number of bytes being freed. + + * ace/Message_Queue.cpp (close): Fixed a subtle bug where we + weren't actually deleting messages from the + ACE_Message_Queue::close() routine. This should work now... + + * ace/Message_Queue.cpp (close): Replaced the use of "delete mb" + with "mb->release()" since the Message_Blocks are now reference + counted. + + * ace/Message_Block: Enhanced the reference counting scheme so + that you can increment and decrement the count by an arbitrary + amount. This is particular useful when you know you'll be + sending the same Message_Block to N consumers. + + * ace/Singleton: The dump() must be used same as instance() + (without supplying an object) so it must be declarated *static*, + i.e., + + static void dump (void); + + Thanks to Sandro Doro <alex@aureus.sublink.org> for reporting + this. + * examples/ASX/Event_Server: Completely rewrote and retested the ACE Event Server example. The new code is *much* easier to understand, has many more comments, is more robust, and compiles @@ -672,6 +672,7 @@ Wayne Vucenic <wvucenic@netgate.net> Harry Gunnarsson <hg@carmenta.se> James CE Johnson <jcej@lads.com> Samuel_Bercovici <Samuel_Bercovici_at_EFT__AD2@mail.icomverse.com> +Per.Andersson@hfera.ericsson.se (Per Andersson) I would particularly like to thank Paul Stephenson, who worked with me at Ericsson and is now at ObjectSpace. Paul devised the recursive diff --git a/ace/Activation_Queue.cpp b/ace/Activation_Queue.cpp index c7a78713173..0544075fc6f 100644 --- a/ace/Activation_Queue.cpp +++ b/ace/Activation_Queue.cpp @@ -53,7 +53,7 @@ ACE_Activation_Queue::dequeue (ACE_Time_Value *tv) ACE_Method_Object *mo = (ACE_Method_Object *) mb->base (); // Delete the message block. - delete mb; + mb->release (); return mo; } else diff --git a/ace/IO_Cntl_Msg.cpp b/ace/IO_Cntl_Msg.cpp index 2b0d38b8355..ff27954a2aa 100644 --- a/ace/IO_Cntl_Msg.cpp +++ b/ace/IO_Cntl_Msg.cpp @@ -2,7 +2,9 @@ // $Id$ #if 0 -/* Forward decl */ +// This is not meant to be used, it's just a place holder... + +// Forward decl template <class SYNCH> class ACE_Module; diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp index df4dc918cc6..647230929f9 100644 --- a/ace/Message_Block.cpp +++ b/ace/Message_Block.cpp @@ -68,77 +68,6 @@ ACE_Message_Block::dump (void) const ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } -ACE_Message_Block::ACE_Message_Block (void) - : flags_ (0), - base_ (0), - cur_size_ (0), - max_size_ (0), - rd_ptr_ (0), - wr_ptr_ (0), - type_ (MB_NORMAL), - priority_ (0), - cont_ (0), - next_ (0), - prev_ (0), - allocator_strategy_ (0), - locking_strategy_ (0), - reference_count_ (1) - -{ - ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); -} - -ACE_Message_Block::ACE_Message_Block (size_t sz, - ACE_Message_Type msg_type, - ACE_Message_Block *msg_cont, - const char *msg_data, - ACE_Allocator *allocator_strategy, - ACE_Lock *locking_strategy) -{ - ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); - - if (this->init (sz, msg_type, msg_cont, msg_data, - allocator_strategy, locking_strategy) == -1) - ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); -} - -ACE_Message_Block::~ACE_Message_Block (void) -{ - ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block"); - - if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE)) - { - if (this->allocator_strategy_) - this->allocator_strategy_->free ((void *) this->base_); - else - delete [] this->base_; - } - if (this->cont_) - this->cont_->release (); - this->prev_ = 0; - this->next_ = 0; -} - -ACE_Message_Block::ACE_Message_Block (const char *data, - size_t size) - : flags_ (ACE_Message_Block::DONT_DELETE), - base_ ((char *) data), - cur_size_ (size), - max_size_ (size), - rd_ptr_ (0), - wr_ptr_ (0), - type_ (MB_NORMAL), - priority_ (0), - cont_ (0), - next_ (0), - prev_ (0), - allocator_strategy_ (0), - locking_strategy_ (0), - reference_count_ (1) -{ - ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); -} - int ACE_Message_Block::size (size_t length) { @@ -154,14 +83,9 @@ ACE_Message_Block::size (size_t length) if (this->allocator_strategy_ == 0) ACE_NEW_RETURN (buf, char[length], -1); else // Use the allocator! - { - buf = (char *) this->allocator_strategy_->malloc (length); - if (buf == 0) - { - errno = ENOMEM; - return -1; - } - } + ACE_ALLOCATOR_RETURN (buf, + (char *) this->allocator_strategy_->malloc (length), + -1); if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE)) { @@ -188,21 +112,80 @@ ACE_Message_Block::size (size_t length) return 0; } -int -ACE_Message_Block::init (const char *data, - size_t size) +ACE_Message_Block::~ACE_Message_Block (void) { - ACE_TRACE ("ACE_Message_Block::init"); - // Should we also initialize all the other fields, as well? - this->base_ = (char *) data; - this->cur_size_ = size; - this->max_size_ = size; - ACE_SET_BITS (this->flags_, ACE_Message_Block::DONT_DELETE); - return 0; + ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block"); + + // Sanity check... + ACE_ASSERT ((*this->reference_count_ <= 1)); + + // Just to be safe... + *this->reference_count_ = 0; + + if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE)) + { + if (this->allocator_strategy_) + this->allocator_strategy_->free ((void *) this->base_); + else + delete [] this->base_; + + this->base_ = 0; + } + + // Free up all the continuation messages. + if (this->cont_) + { + this->cont_->release (); + this->cont_ = 0; + } + + this->prev_ = 0; + this->next_ = 0; + + if (this->allocator_strategy_ == 0) + delete this->reference_count_; + else + this->allocator_strategy_->free ((void *) this->reference_count_); + + this->reference_count_ = 0; +} + +ACE_Message_Block::ACE_Message_Block (const char *data, + size_t size) +{ + ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); + + if (this->init_i (size, MB_NORMAL, 0, data, + 0, 0, 0, ACE_Message_Block::DONT_DELETE) == -1) + ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); +} + +ACE_Message_Block::ACE_Message_Block (size_t size, + ACE_Message_Type msg_type, + ACE_Message_Block *msg_cont, + const char *msg_data, + ACE_Allocator *allocator_strategy, + ACE_Lock *locking_strategy) +{ + ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); + + if (this->init_i (size, msg_type, msg_cont, msg_data, + allocator_strategy, locking_strategy, + 0, ACE_Message_Block::DONT_DELETE) == -1) + ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); +} + +ACE_Message_Block::ACE_Message_Block (void) +{ + ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); + + if (this->init_i (0, MB_NORMAL, 0, 0, 0, 0, 0, + ACE_Message_Block::DONT_DELETE) == -1) + ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); } int -ACE_Message_Block::init (size_t sz, +ACE_Message_Block::init (size_t size, ACE_Message_Type msg_type, ACE_Message_Block *msg_cont, const char *msg_data, @@ -210,34 +193,73 @@ ACE_Message_Block::init (size_t sz, ACE_Lock *locking_strategy) { ACE_TRACE ("ACE_Message_Block::init"); - this->flags_ = 0; + + return this->init_i (size, msg_type, msg_cont, msg_data, + allocator_strategy, locking_strategy, + 0, msg_data ? ACE_Message_Block::DONT_DELETE : 0); +} + +int +ACE_Message_Block::init (const char *data, + size_t size) +{ + ACE_TRACE ("ACE_Message_Block::init"); + // Should we also initialize all the other fields, as well? + + return this->init_i (size, MB_NORMAL, 0, data, + 0, 0, 0, + ACE_Message_Block::DONT_DELETE); +} + +ACE_Message_Block::ACE_Message_Block (size_t size, + ACE_Message_Type msg_type, + ACE_Message_Block *msg_cont, + const char *msg_data, + ACE_Allocator *allocator_strategy, + ACE_Lock *locking_strategy, + int *reference_count, + Message_Flags flags) +{ + ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); + + ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); + + if (this->init_i (size, msg_type, msg_cont, msg_data, + allocator_strategy, locking_strategy, + reference_count, flags) == -1) + ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); +} + +int +ACE_Message_Block::init_i (size_t size, + ACE_Message_Type msg_type, + ACE_Message_Block *msg_cont, + const char *msg_data, + ACE_Allocator *allocator_strategy, + ACE_Lock *locking_strategy, + int *reference_count, + Message_Flags flags) +{ + ACE_TRACE ("ACE_Message_Block::init_i"); + + this->flags_ = flags; if (msg_data == 0) { - if (allocator_strategy == 0) - { - this->allocator_strategy_ = 0; - ACE_NEW_RETURN (this->base_, char[sz], -1); - } + this->allocator_strategy_ = allocator_strategy; + + if (this->allocator_strategy_ == 0) + ACE_NEW_RETURN (this->base_, char[size], -1); else // Use the allocator! - { - this->allocator_strategy_ = allocator_strategy; - this->base_ = (char *) this->allocator_strategy_->malloc (sz); - if (this->base_ == 0) - { - errno = ENOMEM; - return -1; - } - } + ACE_ALLOCATOR_RETURN (this->base_, + (char *) this->allocator_strategy_->malloc (size), + -1); } else - { - this->base_ = (char *) msg_data; - ACE_SET_BITS (this->flags_, ACE_Message_Block::DONT_DELETE); - } + this->base_ = (char *) msg_data; - this->cur_size_ = sz; - this->max_size_ = sz; + this->cur_size_ = size; + this->max_size_ = size; this->rd_ptr_ = this->base_; this->wr_ptr_ = this->base_; this->priority_ = 0; @@ -246,73 +268,68 @@ ACE_Message_Block::init (size_t sz, this->next_ = 0; this->prev_ = 0; this->locking_strategy_ = locking_strategy; - this->reference_count_ = 1; - return 0; -} - -ACE_Message_Block * -ACE_Message_Block::clone (Message_Flags mask) const -{ - // You always want to clear this one to prevent memory leaks but you - // might add some others later. - const Message_Flags always_clear = ACE_Message_Block::DONT_DELETE; - - ACE_TRACE ("ACE_Message_Block::clone"); - ACE_Message_Block *nb; - - ACE_NEW_RETURN (nb, - ACE_Message_Block (this->max_size_, this->type_, - 0, 0, this->allocator_strategy_), - 0); - - ACE_OS::memcpy (nb->base_, this->base_, this->max_size_); + this->reference_count_ = 0; - nb->rd_ptr (this->rd_ptr_ - this->base_); - nb->wr_ptr (this->wr_ptr_ - this->base_); - - // Set new flags minus the mask... - nb->set_flags (this->flags ()); - nb->clr_flags (mask | always_clear); + if (reference_count == 0) + { + if (this->allocator_strategy_ == 0) + ACE_NEW_RETURN (this->reference_count_, int, -1); + else + ACE_ALLOCATOR_RETURN (this->reference_count_, + (int *) this->allocator_strategy_->malloc (sizeof (int)), + -1); + *this->reference_count_ = 1; + } + else + // Just assign the pointer so that all owners will share the same + // reference count. + this->reference_count_ = reference_count; - if (this->cont_ != 0) - nb->cont_ = this->cont_->clone (mask); - return nb; + return 0; } ACE_Message_Block * -ACE_Message_Block::release (void) +ACE_Message_Block::release (void) { ACE_TRACE ("ACE_Message_Block::release"); - ACE_Message_Block *result = 0; + ACE_Message_Block *result; + // If there's a locking strategy then we need to acquire the lock + // before decrementing the count. if (this->locking_strategy_) { - // We need to acquire the lock before decrementing the count. this->locking_strategy_->acquire (); - this->reference_count_--; - if (this->reference_count_ == 0) - delete this; - else if (this->reference_count_ > 0) + ACE_ASSERT ((*this->reference_count_) > 0); + + (*this->reference_count_)--; + + if ((*this->reference_count_) == 0) + result = 0; + else // if ((*this->reference_count_) > 0) result = this; - // ACE_ASSERT (this->reference_count_ <= 0) - // This shouldn't happen... this->locking_strategy_->release (); } else { - this->reference_count_--; + ACE_ASSERT ((*this->reference_count_) >= 0); + + (*this->reference_count_)--; - if (this->reference_count_ == 0) - delete this; - else if (this->reference_count_ > 0) + if ((*this->reference_count_) == 0) + result = 0; + else // if ((*this->reference_count_) > 0) result = this; - // ACE_ASSERT (this->reference_count_ <= 0) - // This shouldn't happen... } + // We must delete this outside the scope of the locking_strategy_ + // since otherwise we'd be trying to "release" through a deleted + // pointer! + if (result == 0) + delete this; + return result; } @@ -330,16 +347,84 @@ ACE_Message_Block::duplicate (void) { ACE_TRACE ("ACE_Message_Block::duplicate"); + void *memory; + + if (this->allocator_strategy_ == 0) + ACE_NEW_RETURN (memory, char[sizeof (ACE_Message_Block)], 0); + else + { + memory = this->allocator_strategy_->malloc (sizeof (ACE_Message_Block)); + + if ( + } + + ACE_NEW_RETURN (nb, + ACE_Message_Block (this->max_size_, + this->type_, + 0, + this->base_, + this->allocator_strategy_, + this->locking_strategy_, + this->reference_count_, + this->flags_), + 0); + + ACE_Message_Block *nb; + + + + // Create a new <ACE_Message_Block>, but share the <base_> pointer + // data (i.e., don't copy that). if (this->locking_strategy_) { // We need to acquire the lock before incrementing the count. this->locking_strategy_->acquire (); - this->reference_count_++; + (*this->reference_count_)++; this->locking_strategy_->release (); } else - this->reference_count_++; + (*this->reference_count_)++; + + // Increment the reference counts of all the continuation + // messages. + if (this->cont_) + this->cont_ = this->cont_->duplicate (); + + return nb; +} - return this; +ACE_Message_Block * +ACE_Message_Block::clone (Message_Flags mask) const +{ + ACE_TRACE ("ACE_Message_Block::clone"); + + // You always want to clear this one to prevent memory leaks but you + // might add some others later. + const Message_Flags always_clear = ACE_Message_Block::DONT_DELETE; + + ACE_Message_Block *nb; + + ACE_NEW_RETURN (nb, + ACE_Message_Block (this->max_size_, + this->type_, + 0, 0, + this->allocator_strategy_, + this->locking_strategy_, + this->reference_count_, + this->flags_), + 0); + + ACE_OS::memcpy (nb->base_, this->base_, this->max_size_); + + nb->rd_ptr (this->rd_ptr_ - this->base_); + nb->wr_ptr (this->wr_ptr_ - this->base_); + + // Set new flags minus the mask... + nb->clr_flags (mask | always_clear); + + if (this->cont_ != 0) + nb->cont_ = this->cont_->clone (mask); + + return nb; } diff --git a/ace/Message_Block.h b/ace/Message_Block.h index ba1a760b2d1..2df9b3aceb7 100644 --- a/ace/Message_Block.h +++ b/ace/Message_Block.h @@ -157,13 +157,12 @@ public: // = Reference counting methods. ACE_Message_Block *duplicate (void); - // Increment our reference count by one. + // Increase our reference count by 1. ACE_Message_Block *release (void); - // Decrement our reference count by one. If the reference count is - // > 0 then return this; else if reference count == 0 then delete - // <this> and return 0. Behavior is undefined if reference count < - // 0. + // Decrease our reference count by 1. If the reference count is > 0 + // then return this; else if reference count == 0 then delete <this> + // and return 0. Behavior is undefined if reference count < 0. static ACE_Message_Block *release (ACE_Message_Block *mb); // This behaves like the non-static method <release>, except that it @@ -252,6 +251,27 @@ public: // Declare the dynamic allocation hooks. private: + // = Internal initialization methods. + ACE_Message_Block (size_t size, + ACE_Message_Type type, + ACE_Message_Block *cont, + const char *data, + ACE_Allocator *allocator, + ACE_Lock *locking_strategy, + int *reference_count, + Message_Flags flags); + // Perform the actual initialization. + + int init_i (size_t size, + ACE_Message_Type type, + ACE_Message_Block *cont, + const char *data, + ACE_Allocator *allocator, + ACE_Lock *locking_strategy, + int *reference_count, + Message_Flags flags); + // Perform the actual initialization. + Message_Flags flags_; // Misc flags (e.g., DONT_DELETE and USER_FLAGS). @@ -286,16 +306,22 @@ private: ACE_Message_Block *prev_; // Pointer to previous message in the list. + // = Strategies. ACE_Allocator *allocator_strategy_; - // Pointer to the allocator defined for this message block. + // Pointer to the allocator defined for this message block. Note + // that this pointer is shared by all owners of this <Message_Block>. ACE_Lock *locking_strategy_; // Pointer to the locking defined for this message block. This is - // used to protect regions of code containing - - size_t reference_count_; - // Reference count for this <Message_Block> which is used to avoid - // deep copies (i.e., <clone>). + // used to protect regions of code that access shared + // <ACE_Message_Block> state. Note that this lock is shared by all + // owners of the <Message_Block>'s data. + + int *reference_count_; + // Pointer to a reference count for this <Message_Block>, which is + // used to avoid deep copies (i.e., <clone>). Note that this + // pointer value is shared by all owners of the <Message_Block>'s + // data. // = Disallow these operations for now (use <clone> instead). ACE_Message_Block &operator= (const ACE_Message_Block &); diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index bb3980bffdb..83d93b1f4e4 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -24,11 +24,15 @@ ACE_Message_Queue<ACE_SYNCH_2>::dump (void) const "high_water_mark = %d\n" "cur_bytes = %d\n" "cur_count = %d\n", + "head_ = %u\n", + "tail_ = %u\n", this->deactivated_, this->low_water_mark_, this->high_water_mark_, this->cur_bytes_, - this->cur_count_)); + this->cur_count_, + this->head_, + this->tail_)); ACE_DEBUG ((LM_DEBUG,"notfull_cond: \n")); notfull_cond_.dump(); ACE_DEBUG ((LM_DEBUG,"notempty_cond: \n")); @@ -74,7 +78,6 @@ ACE_Message_Queue<ACE_SYNCH_2>::open (size_t hwm, this->cur_count_ = 0; this->tail_ = 0; this->head_ = 0; - this->notification_strategy_ = ns; return 0; } @@ -121,18 +124,22 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void) for (this->tail_ = 0; this->head_ != 0; ) { + this->cur_count_--; + ACE_Message_Block *temp; - // Make sure we decrement all the counts. + // Decrement all the counts. for (temp = this->head_; temp != 0; temp = temp->cont ()) this->cur_bytes_ -= temp->size (); - this->cur_count_--; - + temp = this->head_; this->head_ = this->head_->next (); - delete temp; + + // Make sure to use <release> rather than <delete> since this is + // reference counted. + temp->release (); } return res; @@ -160,7 +167,6 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item) // Link at the end. else { - new_item->next (0); this->tail_->next (new_item); new_item->prev (this->tail_); diff --git a/ace/Module.cpp b/ace/Module.cpp index c0d30dc1ccf..2c03705c28e 100644 --- a/ace/Module.cpp +++ b/ace/Module.cpp @@ -98,10 +98,6 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name, this->reader (reader_q); this->writer (writer_q); - // Setup back pointers. - reader_q->mod_ = this; - writer_q->mod_ = this; - // Save the flags this->flags_ = flags; @@ -113,14 +109,15 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name, this->close_i (0, M_DELETE_READER); this->close_i (1, M_DELETE_WRITER); - // Reset back pointers. - reader_q->mod_ = 0; - writer_q->mod_ = 0; - errno = ENOMEM; return -1; } + // Setup back pointers (this must come last, after we've made sure + // there's memory allocated here. + reader_q->mod_ = this; + writer_q->mod_ = this; + return 0; } @@ -2456,11 +2456,19 @@ private: #define ACE_NEW_RETURN(POINTER,CONSTRUCTOR,RET_VAL) \ do { POINTER = new CONSTRUCTOR; \ if (POINTER == 0) { errno = ENOMEM; return RET_VAL; } \ - } while (0) + } while (0) #define ACE_NEW(POINTER,CONSTRUCTOR) \ do { POINTER = new CONSTRUCTOR; \ if (POINTER == 0) { errno = ENOMEM; return; } \ - } while (0) + } while (0) +#define ACE_ALLOCATOR_RETURN(POINTER,ALLOCATOR,RET_VAL) \ + do { POINTER = ALLOCATOR; \ + if (POINTER == 0) { errno = ENOMEM; return RET_VAL; } \ + } while (0) +#define ACE_ALLOCATOR(POINTER,ALLOCATOR) \ + do { POINTER = ALLOCATOR; \ + if (POINTER == 0) { errno = ENOMEM; return; } \ + } while (0) #define ACE_DEFAULT_MUTEX_A "ACE_MUTEX" diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp index bae99ddafaf..9472a4e72cb 100644 --- a/ace/ReactorEx.cpp +++ b/ace/ReactorEx.cpp @@ -357,9 +357,10 @@ ACE_ReactorEx_Notify::handle_signal (int signum, ACE_Notification_Buffer *buffer = (ACE_Notification_Buffer *) mb->base (); - // If eh == 0 then we've got major problems! Otherwise, we need - // to dispatch the appropriate handle_* method on the + // If eh == 0 then we've got major problems! Otherwise, we + // need to dispatch the appropriate handle_* method on the // ACE_Event_Handler pointer we've been passed. + if (buffer->eh_ != 0) { int result = 0; @@ -385,7 +386,7 @@ ACE_ReactorEx_Notify::handle_signal (int signum, } // Make sure to delete the memory regardless of success or // failure! - delete mb; + mb->release (); } } } @@ -409,7 +410,7 @@ ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh, if (this->message_queue_.enqueue_tail (mb) == -1) { - delete mb; + mb->release (); return -1; } } diff --git a/ace/Service_Object.cpp b/ace/Service_Object.cpp index e0defc425a7..5b8e80fa533 100644 --- a/ace/Service_Object.cpp +++ b/ace/Service_Object.cpp @@ -1,6 +1,7 @@ -// Service_Object.cpp // $Id$ +// Service_Object.cpp + #define ACE_BUILD_DLL #include "ace/Service_Object.h" @@ -10,8 +11,6 @@ ACE_ALLOC_HOOK_DEFINE(ACE_Service_Object) -/* Provide the abstract base class common to all services */ - ACE_Service_Object::ACE_Service_Object (void) { ACE_TRACE ("ACE_Service_Object::ACE_Service_Object"); @@ -48,10 +47,20 @@ ACE_Service_Type::ACE_Service_Type (const void *so, const char *s_name, unsigned int f) : obj_ (so), - flags_ (f) + flags_ (f), + name_ (0) { ACE_TRACE ("ACE_Service_Type::ACE_Service_Type"); - this->name (ACE_OS::strcpy (new char[::strlen (s_name) + 1], s_name)); + this->name (s_name); +} + +ACE_Service_Type::~ACE_Service_Type (void) +{ + ACE_TRACE ("ACE_Service_Type::~ACE_Service_Type"); + + // It's ok to call this, even though we may have already deleted it + // in the fini() method since it would then be NULL. + delete [] (char *) this->name_; } int @@ -62,10 +71,13 @@ ACE_Service_Type::fini (void) const this->name_, this->flags_)); delete [] (char *) this->name_; + ((ACE_Service_Type *) this)->name_ = 0; + if (ACE_BIT_ENABLED (this->flags_, ACE_Service_Type::DELETE_OBJ)) delete (void *) this->object (); + if (ACE_BIT_ENABLED (this->flags_, ACE_Service_Type::DELETE_THIS)) - delete (void *) this; // Prevent object's destructor from being called... + delete (ACE_Service_Type *) this; + return 0; } - diff --git a/ace/Service_Object.h b/ace/Service_Object.h index f06fc88ffb4..27949aa6737 100644 --- a/ace/Service_Object.h +++ b/ace/Service_Object.h @@ -47,10 +47,11 @@ public: DELETE_THIS = 2 // Delete the enclosing object. }; - // = Initialization method. + // = Initialization and termination methods. ACE_Service_Type (const void *object, const char *s_name, u_int flags = 0); + ~ACE_Service_Type (void); // = Pure virtual interface (must be defined by the subclass). virtual int suspend (void) const = 0; diff --git a/ace/Service_Object.i b/ace/Service_Object.i index 530757b6363..e85fd0323bb 100644 --- a/ace/Service_Object.i +++ b/ace/Service_Object.i @@ -21,6 +21,8 @@ ACE_INLINE void ACE_Service_Type::name (const char *n) { ACE_TRACE ("ACE_Service_Type::name"); - this->name_ = n; -} + delete [] (char *) this->name_; + ACE_NEW (this->name_, char[::strlen (n) + 1]); + ACE_OS::strcpy ((char *) this->name_, n); +} diff --git a/ace/Service_Record.cpp b/ace/Service_Record.cpp index 21cd5462e71..10feb4105ae 100644 --- a/ace/Service_Record.cpp +++ b/ace/Service_Record.cpp @@ -4,9 +4,6 @@ #define ACE_BUILD_DLL #include "ace/Service_Record.h" -#if !defined (ACE_SERVICE_RECORD_C) -#define ACE_SERVICE_RECORD_C - #if !defined (__ACE_INLINE__) #include "ace/Service_Record.i" #endif /* __ACE_INLINE__ */ @@ -311,7 +308,10 @@ ACE_Service_Record::ACE_Service_Record (const char *n, ACE_Service_Type *t, const void *h, int active) - : type_ (t), handle_ (h), active_ (active) + : type_ (t), + handle_ (h), + active_ (active), + name_ (0) { ACE_TRACE ("ACE_Service_Record::ACE_Service_Record"); this->name (n); @@ -361,5 +361,3 @@ template class ACE_Thru_Task<ACE_SYNCH>; template class ACE_Stream_Head<ACE_SYNCH>; template class ACE_Stream_Tail<ACE_SYNCH>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ - -#endif /* ACE_SERVICE_RECORD_C */ diff --git a/ace/Service_Record.i b/ace/Service_Record.i index 0afc4051ee2..2f1de0654fc 100644 --- a/ace/Service_Record.i +++ b/ace/Service_Record.i @@ -49,6 +49,8 @@ ACE_INLINE void ACE_Service_Record::name (const char *n) { ACE_TRACE ("ACE_Service_Record::name"); + + delete [] (char *) this->name_; this->name_ = ACE_OS::strcpy (new char [::strlen (n) + 1], n); } diff --git a/ace/Singleton.cpp b/ace/Singleton.cpp index 5d6a6e8ea31..05fa10e7a5a 100644 --- a/ace/Singleton.cpp +++ b/ace/Singleton.cpp @@ -13,13 +13,12 @@ #endif /* __ACE_INLINE__ */ template <class TYPE, class LOCK> void -ACE_Singleton<TYPE, LOCK>::dump (void) const +ACE_Singleton<TYPE, LOCK>::dump (void) { ACE_TRACE ("ACE_Singleton<TYPE, LOCK>::dump"); - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); #if !defined (ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES) - ACE_DEBUG ((LM_DEBUG, "instance_ = %x", this->instance_)); + ACE_DEBUG ((LM_DEBUG, "instance_ = %x", instance_)); ace_singleton_lock_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); #endif /* ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES */ diff --git a/ace/Singleton.h b/ace/Singleton.h index 7761acf53c4..a1503183300 100644 --- a/ace/Singleton.h +++ b/ace/Singleton.h @@ -35,7 +35,7 @@ public: static TYPE *instance (void); // Global access point to the Singleton. - void dump (void) const; + static void dump (void); // Dump the state of the object. protected: diff --git a/ace/Stream.cpp b/ace/Stream.cpp index d345c0f7662..92b8e423bca 100644 --- a/ace/Stream.cpp +++ b/ace/Stream.cpp @@ -348,26 +348,28 @@ ACE_Stream<ACE_SYNCH_2>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::control"); ACE_IO_Cntl_Msg ioc (cmd); - // Create a data block that contains the user-supplied data. - ACE_Message_Block *db = - new ACE_Message_Block (sizeof (int), - ACE_Message_Block::MB_IOCTL, - 0, - (char *) a); + ACE_Message_Block *db = 0; + + // Try to create a data block that contains the user-supplied data. + ACE_NEW_RETURN (db, ACE_Message_Block (sizeof (int), + ACE_Message_Block::MB_IOCTL, + 0, + (char *) a), -1); - // Create a control block that contains the control field and a - // pointer to the data block. + // Try to create a control block <cb> that contains the control + // field and a pointer to the data block <db> in <cb>'s continuation + // field. ACE_Message_Block *cb = new ACE_Message_Block (sizeof ioc, ACE_Message_Block::MB_IOCTL, db, (char *) &ioc); - // Make sure all of the allocation succeeded before continuing. - if (db == 0 || cb == 0) + // If we can't allocate <cb> then we need to delete db and return + // -1. + if (cb == 0) { - delete cb; - delete db; + db->release (); errno = ENOMEM; return -1; } @@ -381,7 +383,9 @@ ACE_Stream<ACE_SYNCH_2>::control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd, else result = ((ACE_IO_Cntl_Msg *) cb->rd_ptr ())->rval (); - delete cb; // This also deletes db... + // This will also release db if it's reference count == 0. + cb->release (); + return result; } diff --git a/ace/Stream_Modules.cpp b/ace/Stream_Modules.cpp index 54c6c854435..edea345250b 100644 --- a/ace/Stream_Modules.cpp +++ b/ace/Stream_Modules.cpp @@ -74,7 +74,7 @@ ACE_Stream_Head<ACE_SYNCH_2>::control (ACE_Message_Block *mb) return ioc->rval (); } -/* Performs canonical flushing at the ACE_Stream Head */ +// Performs canonical flushing at the ACE_Stream Head. template <ACE_SYNCH_1> int ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb) @@ -82,20 +82,22 @@ ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb) ACE_TRACE ("ACE_Stream_Head<ACE_SYNCH_2>::canonical_flush"); char *cp = mb->rd_ptr (); - if (*cp & ACE_Task_Flags::ACE_FLUSHR) + if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHR)) { this->flush (ACE_Task_Flags::ACE_FLUSHALL); - *cp &= ~ACE_Task_Flags::ACE_FLUSHR; + ACE_CLR_BITS (*cp, ACE_Task_Flags::ACE_FLUSHR); } - if (*cp & ACE_Task_Flags::ACE_FLUSHW) + + if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHW)) return this->reply (mb); else - delete mb; + mb->release (); return 0; } template <ACE_SYNCH_1> int -ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Stream_Head<ACE_SYNCH_2>::put"); int res = 0; @@ -105,10 +107,8 @@ ACE_Stream_Head<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) return res; if (this->is_writer ()) - { - return this->put_next (mb, tv); - } - else /* this->is_reader () */ + return this->put_next (mb, tv); + else // this->is_reader () { switch (mb->msg_type ()) { @@ -215,7 +215,7 @@ ACE_Stream_Tail<ACE_SYNCH_2>::control (ACE_Message_Block *mb) return this->reply (mb); } -/* Perform flush algorithm as though we were the driver */ +// Perform flush algorithm as though we were the driver. template <ACE_SYNCH_1> int ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb) @@ -223,27 +223,29 @@ ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush (ACE_Message_Block *mb) ACE_TRACE ("ACE_Stream_Tail<ACE_SYNCH_2>::canonical_flush"); char *cp = mb->rd_ptr (); - if (*cp & ACE_Task_Flags::ACE_FLUSHW) + if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHW)) { this->flush (ACE_Task_Flags::ACE_FLUSHALL); - *cp &= ~ACE_Task_Flags::ACE_FLUSHW; + ACE_BIT_CLR (*cp, ACE_Task_Flags::ACE_FLUSHW); } - if (*cp & ACE_Task_Flags::ACE_FLUSHR) + + if (ACE_BIT_ENABLED (*cp, ACE_Task_Flags::ACE_FLUSHR)) { this->sibling ()->flush (ACE_Task_Flags::ACE_FLUSHALL); return this->reply (mb); } else - delete mb; + mb->release (); + return 0; } template <ACE_SYNCH_1> int -ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value * - -) +ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb, + ACE_Time_Value *) { ACE_TRACE ("ACE_Stream_Tail<ACE_SYNCH_2>::put"); + if (this->is_writer ()) { switch (mb->msg_type ()) @@ -252,7 +254,7 @@ ACE_Stream_Tail<ACE_SYNCH_2>::put (ACE_Message_Block *mb, ACE_Time_Value * return this->control (mb); /* NOTREACHED */ default: - delete mb; + mb->release (); } } diff --git a/ace/Synch_T.i b/ace/Synch_T.i index 252b90633f1..336a0805b1c 100644 --- a/ace/Synch_T.i +++ b/ace/Synch_T.i @@ -5,6 +5,74 @@ #include "ace/Thread.h" +// Explicitly destroy the lock. +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void) +{ + return this->lock_.remove (); +} + +// Block the thread until the lock is acquired. +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void) +{ + return this->lock_.acquire (); +} + +// Conditionally acquire the lock (i.e., won't block). + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void) +{ + return this->lock_.tryacquire (); +} + +// Release the lock. + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void) +{ + return this->lock_.release (); +} + +// Block until the thread acquires a read lock. If the locking +// mechanism doesn't support read locks then this just calls +// <acquire>. + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void) +{ + return this->lock_.acquire_read (); +} + +// Block until the thread acquires a write lock. If the locking +// mechanism doesn't support read locks then this just calls +// <acquire>. + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void) +{ + return this->lock_.acquire_write (); +} + +// Conditionally acquire a read lock. If the locking mechanism +// doesn't support read locks then this just calls <acquire>. + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void) +{ + return this->lock_.tryacquire_read (); +} + +// Conditionally acquire a write lock. If the locking mechanism +// doesn't support write locks then this just calls <acquire>. + +template <class LOCKING_MECHANISM> ACE_INLINE int +ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_write (void) +{ + return this->lock_.tryacquire_write (); +} + template <class LOCK, class TYPE> ACE_INLINE ACE_Atomic_Op<LOCK, TYPE>::ACE_Atomic_Op (const ACE_Atomic_Op<LOCK, TYPE> &rhs) { @@ -199,72 +267,4 @@ ACE_TSS<TYPE>::ts_get (void) const return (TYPE *) &this->type_; } -// Explicitly destroy the lock. -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void) -{ - return this->lock_.remove (); -} - -// Block the thread until the lock is acquired. -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void) -{ - return this->lock_.acquire (); -} - -// Conditionally acquire the lock (i.e., won't block). - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void) -{ - return this->lock_.tryacquire (); -} - -// Release the lock. - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void) -{ - return this->lock_.release (); -} - -// Block until the thread acquires a read lock. If the locking -// mechanism doesn't support read locks then this just calls -// <acquire>. - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void) -{ - return this->lock_.acquire_read (); -} - -// Block until the thread acquires a write lock. If the locking -// mechanism doesn't support read locks then this just calls -// <acquire>. - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void) -{ - return this->lock_.acquire_write (); -} - -// Conditionally acquire a read lock. If the locking mechanism -// doesn't support read locks then this just calls <acquire>. - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void) -{ - return this->lock_.tryacquire_read (); -} - -// Conditionally acquire a write lock. If the locking mechanism -// doesn't support read locks then this just calls <acquire>. - -template <class LOCKING_MECHANISM> int -ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void) -{ - return this->lock_.acquire_write (); -} - #endif /* defined (ACE_HAS_THREADS) && defined (ACE_HAS_THREAD_SPECIFIC_STORAGE) */ diff --git a/ace/UPIPE_Stream.cpp b/ace/UPIPE_Stream.cpp index b5fba065f27..b68183f06f2 100644 --- a/ace/UPIPE_Stream.cpp +++ b/ace/UPIPE_Stream.cpp @@ -120,8 +120,7 @@ ACE_UPIPE_Stream::recv (char *buffer, this->remaining_); bytes_read += this->remaining_; this->remaining_ = 0; - delete this->mb_last_; - this->mb_last_ = 0; + this->mb_last_ = this->mb_last_->release (); return bytes_read; } else @@ -139,12 +138,9 @@ ACE_UPIPE_Stream::recv (char *buffer, this->remaining_ -= n; if (this->remaining_ == 0) - { - // Now the Message_Buffer is empty. + // Now the Message_Buffer is empty. - delete this->mb_last_; - this->mb_last_ = 0; - } + this->mb_last_ = this->mb_last_->release (); } } else diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 532523956a9..6b1337ffab2 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -18,9 +18,12 @@ ACE_Event_Channel_Options::ACE_Event_Channel_Options (void) ACE_Event_Channel::~ACE_Event_Channel (void) { + delete this->lock_adapter_; } ACE_Event_Channel::ACE_Event_Channel (void) + : lock_adapter_ (0), + acceptor_ (*this) { } @@ -30,6 +33,12 @@ ACE_Event_Channel::options (void) return this->options_; } +ACE_Lock * +ACE_Event_Channel::message_block_locking_strategy (void) +{ + return this->lock_adapter_; +} + int ACE_Event_Channel::compute_performance_statistics (void) { @@ -100,7 +109,7 @@ ACE_Event_Channel::compute_performance_statistics (void) ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, const void *) { - return this->collection_performance_statistics (); + return this->compute_performance_statistics (); } // This method forwards the <event> to Consumer that have registered @@ -196,11 +205,6 @@ ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler, } int -ACE_Event_Channel::initiate_proxy_accept (void) -{ -} - -int ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) { int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; @@ -391,12 +395,20 @@ ACE_Event_Channel::open (void *) // Ignore SIPPIPE so each Consumer_Proxy can handle it. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - if (this->connector_role_) + if (this->options ().connector_role_) // Actively initiate Peer connections. this->initiate_connector (); - if (this->acceptor_role_) + if (this->options ().acceptor_role_) // Passively initiate Peer acceptor. this->initiate_acceptor (); + + // If we're not running reactively, then we need to make sure that + // <ACE_Message_Block> reference counting operations are + // thread-safe. Therefore, we create an <ACE_Lock_Adapter> that is + // parameterized by <ACE_Thread_Mutex> to prevent race conditions. + if (this->options ().threading_strategy_ != ACE_Event_Channel_Options::REACTIVE) + ACE_NEW_RETURN (this->lock_adapter_, ACE_Lock_Adapter<ACE_Thread_Mutex>, -1); + return 0; } diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 513fa36e1f9..0baaac64c81 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -130,6 +130,13 @@ private: // Periodically callback to perform timer-based performance // profiling. + ACE_Lock *message_block_locking_strategy (void); + // The strategy for locking <ACE_Message_Block> reference counting. + // This is NULL if our threading strategy is REACTIVE, else it + // points to the ACE_Lock_Adapter<ACE_Thread_Mutex> defined below. + + ACE_Lock_Adapter<ACE_Thread_Mutex> *lock_adapter_; + Proxy_Handler_Connector connector_; // Used to establish the connections actively. diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 054a06282aa..fd4524036f4 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -306,7 +306,7 @@ Gateway::parse_consumer_config_file (void) // Read config file line at a time. for (Consumer_Config_Info cci; - consumer_file.read_entry (cci, line_number) != FP::EOFILE); + consumer_file.read_entry (cci, line_number) != FP::EOFILE; ) { file_empty = 0; diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp index 1cbaa77b83b..a81b2e73e63 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -48,6 +48,7 @@ void Peer_Router_Context::release (void) { this->reference_count_--; + if (this->reference_count_ == 0) delete this; } @@ -141,6 +142,17 @@ Peer_Handler::Peer_Handler (Peer_Router_Context *prc) } #if 0 + +// Right now, Peer_Handlers are purely Reactive, i.e., they all run in +// a single thread of control. It would be easy to make them Active +// Objects by calling activate() in Peer_Handler::open(), making +// Peer_Handler::put() enqueue each message on the message queue, and +// (3) then running the following svc() routine to route each message +// to its final destination within a separate thread. Note that we'd +// want to move the svc() call up to the Consumer_Router and +// Supplier_Router level in order to get the right level of control +// for input and output. + Peer_Handler::svc (void) { ACE_Thread_Control thread_control (tm); @@ -184,10 +196,20 @@ Peer_Handler::svc (void) #endif int -Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +Peer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *tv) { - return this->peer ().send_n (mb->rd_ptr (), - mb->length ()); +#if 0 + // If we're running as Active Objects just enqueue the message here. + return this->putq (mb, tv); +#else + int result = 0; + + result = this->peer ().send_n (mb->rd_ptr (), + mb->length ()); + // Release the memory. + mb->release (); + return result; +#endif } // Initialize a newly connected handler. @@ -203,16 +225,18 @@ Peer_Handler::open (void *) else ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); #if 0 + // If we're running as an Active Object activate the Peer_Handler + // here. if (this->activate (Options::instance ()->t_flags ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); -#endif ACE_DEBUG ((LM_DEBUG, "(%t) Peer_Handler::open registering with Reactor for handle_input\n")); - +#else // Register with the Reactor to receive messages from our Peer. if (ACE_Service_Config::reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); +#endif // Insert outselves into the routing map. else if (this->prc_->bind_peer (this->get_handle (), this) == -1) @@ -239,8 +263,8 @@ Peer_Handler::handle_input (ACE_HANDLE h) // Check for memory failures. if (db == 0 || hb == 0) { - delete hb; - delete db; + hb->release (); + db->release (); errno = ENOMEM; return -1; } diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp index f17560ad0e6..a0bffc95627 100644 --- a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp +++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp @@ -195,7 +195,7 @@ Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) bytes += ss->int_id_->put (data_block); } - delete mb; + mb->release (); return bytes == 0 ? 0 : bytes / iterations; } diff --git a/examples/IPC_SAP/UPIPE_SAP/ex1.cpp b/examples/IPC_SAP/UPIPE_SAP/ex1.cpp index ba5490504c3..4f4d8d99a55 100644 --- a/examples/IPC_SAP/UPIPE_SAP/ex1.cpp +++ b/examples/IPC_SAP/UPIPE_SAP/ex1.cpp @@ -49,7 +49,7 @@ peer1 (void *) ACE_DEBUG ((LM_DEBUG, "(%t) peer1 ack is \"%s\"\n", mb->rd_ptr ())); // Free up the memory block. - delete mb; + mb->release (); // Now try the send()/recv() interface. char mytext[] = "This string is sent by peer1 as buffer"; diff --git a/examples/IPC_SAP/UPIPE_SAP/ex2.cpp b/examples/IPC_SAP/UPIPE_SAP/ex2.cpp index 63e37e7089a..06e831bf9eb 100644 --- a/examples/IPC_SAP/UPIPE_SAP/ex2.cpp +++ b/examples/IPC_SAP/UPIPE_SAP/ex2.cpp @@ -110,7 +110,7 @@ consumer (void *) for (ACE_Message_Block *mb = 0; c_stream.recv (mb) != -1 && mb->size () != 0; - delete mb) + mb->release ()) received_messages++; ACE_OS::time (&currsec); diff --git a/examples/Reactor/Misc/test_demuxing.cpp b/examples/Reactor/Misc/test_demuxing.cpp index f8f7992ce37..56c8e6cf400 100644 --- a/examples/Reactor/Misc/test_demuxing.cpp +++ b/examples/Reactor/Misc/test_demuxing.cpp @@ -270,7 +270,7 @@ Message_Handler::handle_input (ACE_HANDLE) else { ACE_DEBUG ((LM_DEBUG, "(%t) priority = %d\n", mb->msg_priority ())); - delete mb; + mb->release (); } return 0; diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp index 295b36ffda0..ea69d89ade4 100644 --- a/examples/Reactor/ReactorEx/test_reactorEx.cpp +++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp @@ -210,7 +210,7 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg, // This was allocated by the STDIN_Handler, queued, dequeued, // passed to the proactor, and now passed back to us. - delete msg; + msg->release (); return 0; // Do not reinvoke a send. } @@ -222,21 +222,25 @@ int Peer_Handler::handle_input_complete (ACE_Message_Block *msg, long bytes_transferred) { + int result = 1; // Reinvokes the recv() operation by default! + if (bytes_transferred > 0 && msg->length () > 0) { msg->rd_ptr ()[bytes_transferred] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); - delete msg; - return 1; // Reinvokes the recv() operation! + } + else + { + // If a read failed, we will assume it's because the remote peer + // went away. We will end the event loop. Since we're in the + // main thread, we don't need to do a notify. + ACE_Service_Config::end_reactorEx_event_loop (); + result = -1; } - delete msg; - // If a read failed, we will assume it's because the remote peer - // went away. We will end the event loop. Since we're in the main - // thread, we don't need to do a notify. - ACE_Service_Config::end_reactorEx_event_loop (); - return -1; // Close down. + msg->release (); + return result; } // This is so the Proactor can get a message to read into. diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp index 295b36ffda0..ea69d89ade4 100644 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp @@ -210,7 +210,7 @@ Peer_Handler::handle_output_complete (ACE_Message_Block *msg, // This was allocated by the STDIN_Handler, queued, dequeued, // passed to the proactor, and now passed back to us. - delete msg; + msg->release (); return 0; // Do not reinvoke a send. } @@ -222,21 +222,25 @@ int Peer_Handler::handle_input_complete (ACE_Message_Block *msg, long bytes_transferred) { + int result = 1; // Reinvokes the recv() operation by default! + if (bytes_transferred > 0 && msg->length () > 0) { msg->rd_ptr ()[bytes_transferred] = '\0'; // Print out the message received from the server. ACE_DEBUG ((LM_DEBUG, "%s", msg->rd_ptr ())); - delete msg; - return 1; // Reinvokes the recv() operation! + } + else + { + // If a read failed, we will assume it's because the remote peer + // went away. We will end the event loop. Since we're in the + // main thread, we don't need to do a notify. + ACE_Service_Config::end_reactorEx_event_loop (); + result = -1; } - delete msg; - // If a read failed, we will assume it's because the remote peer - // went away. We will end the event loop. Since we're in the main - // thread, we don't need to do a notify. - ACE_Service_Config::end_reactorEx_event_loop (); - return -1; // Close down. + msg->release (); + return result; } // This is so the Proactor can get a message to read into. diff --git a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp index f7e2d98f69a..71a3c10e556 100644 --- a/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp +++ b/examples/Service_Configurator/IPC-tests/server/Handle_Thr_Stream.cpp @@ -13,6 +13,13 @@ #include "Handle_Thr_Stream.i" #endif /* __ACE_INLINE__ */ +// Shorthand names. +#define SH SVC_HANDLER +#define PR_AC_1 ACE_PEER_ACCEPTOR_1 +#define PR_AC_2 ACE_PEER_ACCEPTOR_2 +#define PR_ST_1 ACE_PEER_STREAM_1 +#define PR_ST_2 ACE_PEER_STREAM_2 + template <class SH, PR_AC_1> Handle_Thr_Acceptor<SH, PR_AC_2>::~Handle_Thr_Acceptor (void) { @@ -146,6 +153,12 @@ CLI_Stream<PR_ST_2>::svc (void) return 0; } +#undef SH +#undef PR_AC_1 +#undef PR_AC_2 +#undef PR_ST_1 +#undef PR_ST_2 + //---------------------------------------- #if defined (ACE_HAS_TLI) diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp index 30190ace443..08119d9087c 100644 --- a/examples/Threads/barrier2.cpp +++ b/examples/Threads/barrier2.cpp @@ -99,7 +99,7 @@ Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) if (this->output (mb) < 0) ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n")); - delete mb; + mb->release (); } return result; } @@ -149,14 +149,14 @@ Worker_Task<BARRIER>::svc (void) if (length == 0) { ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter)); - delete mb; + mb->release (); break; } this->barrier_.wait (); this->output (mb); - delete mb; + mb->release (); } // Note that the ACE_Task::svc_run () method automatically removes diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp index ddcad02e4dd..53cd86d1105 100644 --- a/examples/Threads/thread_pool.cpp +++ b/examples/Threads/thread_pool.cpp @@ -96,7 +96,7 @@ Thread_Pool::svc (void) count, length, length - 1, mb->rd_ptr ())); // We're responsible for deallocating this. - delete mb; + mb->release (); if (length == 0) { diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp index b2f614c1567..67c5c9f3b57 100644 --- a/netsvcs/servers/main.cpp +++ b/netsvcs/servers/main.cpp @@ -60,36 +60,36 @@ main (int argc, char *argv[]) l_argv[1] = 0; Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Server_Acceptor); - if (so->init (1, l_argv) == -1) + if (sp_2->init (1, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Server_Acceptor", 1)); l_argv[0] = argv[0]; l_argv[1] = "-p 10011"; l_argv[2] = 0; - Service_Ptr sp_2 = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor); + Service_Ptr sp_3 = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor); - if (sp_2->init (2, l_argv) == -1) + if (sp_3->init (2, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Clerk_Processor", 1)); l_argv[0] = "-p " ACE_DEFAULT_TOKEN_SERVER_PORT_STR; l_argv[1] = 0; - Service_Ptr sp_3 = ACE_SVC_INVOKE (ACE_Token_Acceptor); + Service_Ptr sp_4 = ACE_SVC_INVOKE (ACE_Token_Acceptor); - if (sp_3->init (1, l_argv) == -1) + if (sp_4->init (1, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "Token_Service", 1)); l_argv[0] = "-p " ACE_DEFAULT_LOGGING_SERVER_PORT_STR; l_argv[1] = 0; - Service_Ptr sp_4 = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor); + Service_Ptr sp_5 = ACE_SVC_INVOKE (ACE_Server_Logging_Acceptor); - if (sp_4->init (1, l_argv) == -1) + if (sp_5->init (1, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "Logging_Service", 1)); l_argv[0] = "-p " ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR; l_argv[1] = 0; - Service_Ptr sp_5 = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor); + Service_Ptr sp_6 = ACE_SVC_INVOKE (ACE_Thr_Server_Logging_Acceptor); - if (sp_5->init (1, l_argv) == -1) + if (sp_6->init (1, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "Thr_Logging_Service", 1)); } diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp index aa5ec27414c..2ba15d15cc3 100644 --- a/tests/Buffer_Stream_Test.cpp +++ b/tests/Buffer_Stream_Test.cpp @@ -174,7 +174,7 @@ Consumer::svc (void) ACE_ASSERT (c == output[0]); c++; } - delete mb; + mb->release (); if (length == 0) { diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp index a02542c9aff..5f01738de12 100644 --- a/tests/Future_Test.cpp +++ b/tests/Future_Test.cpp @@ -382,10 +382,10 @@ main (int, char *[]) ACE_DEBUG ((LM_DEBUG, "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n", - (u_long) task_count, - (u_long) future_count, - (u_long) capsule_count, - (u_long) methodobject_count)); + (int) task_count, + (int) future_count, + (int) capsule_count, + (int) methodobject_count)); } // Close things down. @@ -398,10 +398,10 @@ main (int, char *[]) ACE_DEBUG ((LM_DEBUG, "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n", - (u_long) task_count, - (u_long) future_count, - (u_long) capsule_count, - (u_long) methodobject_count)); + (int) task_count, + (int) future_count, + (int) capsule_count, + (int) methodobject_count)); ACE_DEBUG ((LM_DEBUG,"(%t) th' that's all folks!\n")); diff --git a/tests/Priority_Buffer_Test.cpp b/tests/Priority_Buffer_Test.cpp index 09f1b7635d9..3a1134c63a2 100644 --- a/tests/Priority_Buffer_Test.cpp +++ b/tests/Priority_Buffer_Test.cpp @@ -68,7 +68,7 @@ consumer (void *args) // Free up the buffer memory and the Message_Block. Note that // the destructor of Message Block will delete the the actual // buffer. - delete mb; + mb->release (); if (length == 0) break; diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp index 661298efd2d..42de847e165 100644 --- a/tests/Reactors_Test.cpp +++ b/tests/Reactors_Test.cpp @@ -127,7 +127,7 @@ Test_Task::handle_input (ACE_HANDLE) done_count--; ACE_DEBUG ((LM_DEBUG, "(%t) handle_input, handled_ = %d, done_count = %d\n", - this->handled_, (u_long) done_count)); + this->handled_, (int) done_count)); } ACE_OS::thr_yield (); diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index 2363850e7f3..ee4ce7c1ffa 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -10,20 +10,21 @@ // // = DESCRIPTION // This test program illustrates how the ACE task synchronization -// mechanisms work in conjunction with the ACE_Task and the -// ACE_Thread_Manager. If the manual flag is not set input comes -// from stdin until the user enters a return only. This stops -// all workers via a message block of length 0. This is an -// alternative shutdown of workers compared to queue deactivate. +// mechanisms and ACE_Message_Block reference counting works in +// conjunction with the ACE_Task and the ACE_Thread_Manager. If +// the manual flag is not set input comes from stdin until the +// user enters a return. This stops all workers via a message +// block of length 0. This shows an alternative way to shutdown +// worker tasks compared to queue deactivate. // // = AUTHOR // Karlheinz Dorn, Doug Schmidt, and Prashant Jain // // ============================================================================ +#define protected public #include "ace/Task.h" #include "ace/Service_Config.h" - #include "ace/Task.h" #include "test_config.h" @@ -35,22 +36,35 @@ static size_t n_iterations = 100; class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> { public: - Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads); + Thread_Pool (int n_threads); + // Create the thread pool containing <n_threads>. + ~Thread_Pool (void); + + virtual int open (void * = 0); + // Produce the messages that are consumed by the threads in the + // thread pool. + virtual int svc (void); // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); - // This allows the producer to pass messages to the <Thread_Pool>. + // Allows the producer to pass messages to the <Thread_Pool>. private: virtual int close (u_long); + // Close hook. - // = Not needed for this test. - virtual int open (void *) { return 0; } + ACE_Lock_Adapter<ACE_Thread_Mutex> lock_adapter_; + // Serialize access to <ACE_Message_Block> reference count, which + // will be decremented from multiple threads. }; +Thread_Pool::~Thread_Pool (void) +{ +} + int Thread_Pool::close (u_long) { @@ -58,9 +72,7 @@ Thread_Pool::close (u_long) return 0; } -Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, - int n_threads) - : ACE_Task<ACE_MT_SYNCH> (thr_mgr) +Thread_Pool::Thread_Pool (int n_threads) { // Create worker threads. if (this->activate (THR_NEW_LWP, n_threads) == -1) @@ -75,42 +87,49 @@ Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) return this->putq (mb, tv); } -// Iterate <n_iterations> time printing off a message and "waiting" -// for all other threads to complete this iteration. +// Iterate <n_iterations> printing off a message and "waiting" for all +// other threads to complete this iteration. int Thread_Pool::svc (void) { ACE_NEW_THREAD; - // Note that the ACE_Task::svc_run () method automatically adds us to - // the Thread_Manager when the thread begins. - - int count = 1; + // The <ACE_Task::svc_run()> method automatically adds us to the + // <ACE_Service_Config>'s <ACE_Thread_Manager> when the thread + // begins. // Keep looping, reading a message out of the queue, until we get a // message with a length == 0, which signals us to quit. - for (;; count++) + for (int count = 1; ; count++) { ACE_Message_Block *mb; + ACE_DEBUG ((LM_DEBUG, "(%t) **** before head = %d, tail = %d\n", + this->msg_queue ()->head_, + this->msg_queue ()->tail_)); ACE_ASSERT (this->getq (mb) != -1); + ACE_DEBUG ((LM_DEBUG, "(%t) ++++ after head = %d, tail = %d\n", + this->msg_queue ()->head_, + this->msg_queue ()->tail_)); + int length = mb->length (); if (length > 0) ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, length = %d, text = \"%*s\"\n", - count, length, length - 1, mb->rd_ptr ())); + "(%t) in iteration %d, queue len = %d, length = %d, text = \"%*s\"\n", + count, this->msg_queue ()->message_count (), + length, length - 1, mb->rd_ptr ())); // We're responsible for deallocating this. - delete mb; + mb->release (); if (length == 0) { ACE_DEBUG ((LM_DEBUG, - "(%t) in iteration %d, got NULL message, exiting\n", - count)); + "(%t) in iteration %d, queue len = %d, got NULL message, exiting\n", + count, this->msg_queue ()->message_count ())); break; } } @@ -120,17 +139,22 @@ Thread_Pool::svc (void) return 0; } -static void -produce (Thread_Pool &thread_pool) +int +Thread_Pool::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n")); - thread_pool.dump (); + ACE_DEBUG ((LM_DEBUG, + "(%t) producer start, dumping the Thread_Pool\n")); + this->dump (); + + ACE_Message_Block *mb; for (int n;;) { // Allocate a new message. - ACE_Message_Block *mb; - ACE_NEW (mb, ACE_Message_Block (BUFSIZ)); + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ, ACE_Message_Block::MB_DATA, + 0, 0, 0, &this->lock_adapter_), + -1); #if defined (manual) ACE_DEBUG ((LM_DEBUG, @@ -140,50 +164,53 @@ produce (Thread_Pool &thread_pool) static size_t count = 0; ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count); - n = ACE_OS::strlen (mb->rd_ptr ()); - if (count == n_iterations) - n = 1; // Indicate that we need to shut down. + if (count == n_iterations || n <= 1) + break; else count++; if (count == 0 || (count % 20 == 0)) ACE_OS::sleep (1); #endif /* manual */ - if (n > 1) - { - // Send a normal message to the waiting threads and continue - // producing. - mb->wr_ptr (n); - - // Pass the message to the Thread_Pool. - if (thread_pool.put (mb) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); - } - else - { - // Send a shutdown message to the waiting threads and exit. - ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n")); - thread_pool.dump (); - - for (int i = thread_pool.thr_count (); i > 0; i--) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) EOF, enqueueing NULL block for thread = %d\n", - i)); - - // Enqueue a NULL message to flag each consumer to - // shutdown. - if (thread_pool.put (new ACE_Message_Block) == -1) - ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); - } - - ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); - thread_pool.dump (); - break; - } + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n); + + // Pass the message to the Thread_Pool. + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); } + + // Send a shutdown message to the waiting threads and exit. + ACE_DEBUG ((LM_DEBUG, + "\n(%t) sending shutdown message to %d threads, dump of task:\n", + this->thr_count ())); + this->dump (); + + ACE_NEW_RETURN (mb, + ACE_Message_Block (0, ACE_Message_Block::MB_DATA, + 0, 0, 0, &this->lock_adapter_), + -1); + + for (int i = this->thr_count (); i > 0; i--) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) EOF, enqueueing NULL block for thread = %d\n", + i)); + + // Enqueue an empty message to flag each consumer to shutdown. + // Note that we use reference counting to avoid having to copy + // the message. + if (this->put (mb->duplicate ()) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + } + + mb->release (); + + ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); + this->dump (); } #endif /* ACE_HAS_THREADS */ @@ -198,17 +225,21 @@ main (int, char *[]) ACE_DEBUG ((LM_DEBUG, "(%t) threads = %d\n", n_threads)); // Create the worker tasks. - Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (), - n_threads); + Thread_Pool thread_pool (n_threads); // Create work for the worker tasks to process in their own threads. - produce (thread_pool); + thread_pool.open (); // Wait for all the threads to reach their exit point. - ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n")); + ACE_DEBUG ((LM_DEBUG, "(%t) waiting for worker tasks to finish...\n")); + ACE_Service_Config::thr_mgr ()->wait (); + ACE_ASSERT (thread_pool.msg_queue ()->is_empty ()); + ACE_DEBUG ((LM_DEBUG, "(%t) head = %d, tail = %d\n", + thread_pool.msg_queue ()->head_, + thread_pool.msg_queue ()->tail_)); ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n")); #else ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp index 64a851c28bf..1771575b3cb 100644 --- a/tests/UPIPE_SAP_Test.cpp +++ b/tests/UPIPE_SAP_Test.cpp @@ -59,7 +59,7 @@ connector (void *) ACE_ASSERT (ACE_OS::strcmp (mb->rd_ptr (), "thanks") == 0); // Free up the memory block. - delete mb; + mb->release (); // Now try the send()/recv() interface. char mytext[] = "This string is sent by connector as a buffer"; |