diff options
Diffstat (limited to 'ace/Select_Reactor_Base.cpp')
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 226 |
1 files changed, 35 insertions, 191 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 94fa673b84e..0306fc7007a 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh, #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - if (this->notify_queue_.is_empty ()) - return 0; - - ACE_Notification_Buffer *temp = 0; - ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; - - size_t queue_size = this->notify_queue_.size (); - int number_purged = 0; - size_t i; - for (i = 0; i < queue_size; ++i) - { - if (-1 == this->notify_queue_.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - // If this is not a Reactor notify (it is for a particular handler), - // and it matches the specified handler (or purging all), - // and applying the mask would totally eliminate the notification, then - // release it and count the number purged. - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_) && - ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask - // is left with nothing when - // applying the mask - { - if (-1 == this->free_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - ACE_Event_Handler *event_handler = temp->eh_; - event_handler->remove_reference (); - - ++number_purged; - } - else - { - // To preserve it, move it to the local_queue. - // But first, if this is not a Reactor notify (it is for a particularhandler), - // and it matches the specified handler (or purging all), then - // apply the mask - if ((0 != temp->eh_) && - (0 == eh || eh == temp->eh_)) - ACE_CLR_BITS(temp->mask_, mask); - if (-1 == local_queue.enqueue_head (temp)) - return -1; - } - } - - if (this->notify_queue_.size ()) - { // should be empty! - ACE_ASSERT (0); - return -1; - } - - // now put it back in the notify queue - queue_size = local_queue.size (); - for (i = 0; i < queue_size; ++i) - { - if (-1 == local_queue.dequeue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - - if (-1 == this->notify_queue_.enqueue_head (temp)) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - return number_purged; + return notification_queue_.purge_pending_notifications(eh, mask); #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ ACE_UNUSED_ARG (eh); @@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, #endif /* F_SETFD */ #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - ACE_Notification_Buffer *temp = 0; - - ACE_NEW_RETURN (temp, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp) == -1) - { - delete [] temp; - return -1; - } - - for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i) - if (free_queue_.enqueue_head (temp + i) == -1) - return -1; - + if (notification_queue_.open() == -1) + { + return -1; + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // There seems to be a Win32 bug with this... Set this into @@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void) ACE_TRACE ("ACE_Select_Reactor_Notify::close"); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Free up the dynamically allocated resources. - ACE_Notification_Buffer **b = 0; - - for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_); - alloc_iter.next (b) != 0; - alloc_iter.advance ()) - { - delete [] *b; - *b = 0; - } - - this->alloc_queue_.reset (); - this->notify_queue_.reset (); - this->free_queue_.reset (); + notification_queue_.reset(); #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return this->notification_pipe_.close (); @@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler, ACE_Notification_Buffer buffer (event_handler, mask); #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Artificial scope to limit the duration of the mutex. - { - bool notification_required = false; + int notification_required = + notification_queue_.push_new_notification(buffer); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - // No pending notifications. - if (this->notify_queue_.is_empty ()) - notification_required = true; - - ACE_Notification_Buffer *temp = 0; - - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1 = 0; - - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } - - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - ++i) - this->free_queue_.enqueue_head (temp1 + i); - - temp = temp1; - } - - ACE_ASSERT (temp != 0); - *temp = buffer; - - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; + if (notification_required == -1) + { + return -1; + } - if (!notification_required) - { - // No failures. - safe_handler.release (); + if (notification_required == 0) + { + // No failures, the handler is now owned by the notification queue + safe_handler.release (); - return 0; - } + return 0; } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ @@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // Dispatch one message from the notify queue, and put another in // the pipe if one is available. Remember, the idea is to keep // exactly one message in the pipe at a time. - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - ACE_Notification_Buffer *temp = 0; + bool more_messages_queued = false; + ACE_Notification_Buffer next; + + result = notification_queue_.pop_next_notification( + buffer, more_messages_queued, next); - if (notify_queue_.is_empty ()) + if (result == 0) + { return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - - bool write_next_buffer = false; - ACE_Notification_Buffer ** next = 0; - - if(!this->notify_queue_.is_empty()) - { - // The queue is not empty, need to queue another message. - this->notify_queue_.get (next, 0); - write_next_buffer = true; - } - - if(write_next_buffer) - { - (void) ACE::send( - this->notification_pipe_.write_handle(), - (char *)*next, sizeof(ACE_Notification_Buffer)); - } - } + } + if (result == -1) + { + return -1; + } + + if(more_messages_queued) + { + (void) ACE::send(this->notification_pipe_.write_handle(), + (char *)&next, sizeof(ACE_Notification_Buffer)); + } #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // If eh == 0 then another thread is unblocking the |