summaryrefslogtreecommitdiff
path: root/ace/Select_Reactor_Base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Select_Reactor_Base.cpp')
-rw-r--r--ace/Select_Reactor_Base.cpp226
1 files changed, 35 insertions, 191 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 94fa673b84e..0306fc7007a 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -561,84 +561,7 @@ ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- if (this->notify_queue_.is_empty ())
- return 0;
-
- ACE_Notification_Buffer *temp = 0;
- ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue;
-
- size_t queue_size = this->notify_queue_.size ();
- int number_purged = 0;
- size_t i;
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == this->notify_queue_.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- // If this is not a Reactor notify (it is for a particular handler),
- // and it matches the specified handler (or purging all),
- // and applying the mask would totally eliminate the notification, then
- // release it and count the number purged.
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_) &&
- ACE_BIT_DISABLED (temp->mask_, ~mask)) // the existing notificationmask
- // is left with nothing when
- // applying the mask
- {
- if (-1 == this->free_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- ACE_Event_Handler *event_handler = temp->eh_;
- event_handler->remove_reference ();
-
- ++number_purged;
- }
- else
- {
- // To preserve it, move it to the local_queue.
- // But first, if this is not a Reactor notify (it is for a particularhandler),
- // and it matches the specified handler (or purging all), then
- // apply the mask
- if ((0 != temp->eh_) &&
- (0 == eh || eh == temp->eh_))
- ACE_CLR_BITS(temp->mask_, mask);
- if (-1 == local_queue.enqueue_head (temp))
- return -1;
- }
- }
-
- if (this->notify_queue_.size ())
- { // should be empty!
- ACE_ASSERT (0);
- return -1;
- }
-
- // now put it back in the notify queue
- queue_size = local_queue.size ();
- for (i = 0; i < queue_size; ++i)
- {
- if (-1 == local_queue.dequeue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
-
- if (-1 == this->notify_queue_.enqueue_head (temp))
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- return number_purged;
+ return notification_queue_.purge_pending_notifications(eh, mask);
#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
ACE_UNUSED_ARG (eh);
@@ -686,22 +609,10 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- ACE_Notification_Buffer *temp = 0;
-
- ACE_NEW_RETURN (temp,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp) == -1)
- {
- delete [] temp;
- return -1;
- }
-
- for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; ++i)
- if (free_queue_.enqueue_head (temp + i) == -1)
- return -1;
-
+ if (notification_queue_.open() == -1)
+ {
+ return -1;
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
@@ -728,20 +639,7 @@ ACE_Select_Reactor_Notify::close (void)
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Free up the dynamically allocated resources.
- ACE_Notification_Buffer **b = 0;
-
- for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_queue_);
- alloc_iter.next (b) != 0;
- alloc_iter.advance ())
- {
- delete [] *b;
- *b = 0;
- }
-
- this->alloc_queue_.reset ();
- this->notify_queue_.reset ();
- this->free_queue_.reset ();
+ notification_queue_.reset();
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return this->notification_pipe_.close ();
@@ -767,57 +665,20 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Artificial scope to limit the duration of the mutex.
- {
- bool notification_required = false;
+ int notification_required =
+ notification_queue_.push_new_notification(buffer);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- // No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = true;
-
- ACE_Notification_Buffer *temp = 0;
-
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1 = 0;
-
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
-
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
-
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- ++i)
- this->free_queue_.enqueue_head (temp1 + i);
-
- temp = temp1;
- }
-
- ACE_ASSERT (temp != 0);
- *temp = buffer;
-
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
+ if (notification_required == -1)
+ {
+ return -1;
+ }
- if (!notification_required)
- {
- // No failures.
- safe_handler.release ();
+ if (notification_required == 0)
+ {
+ // No failures, the handler is now owned by the notification queue
+ safe_handler.release ();
- return 0;
- }
+ return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
@@ -897,45 +758,28 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
- ACE_Notification_Buffer *temp = 0;
+ bool more_messages_queued = false;
+ ACE_Notification_Buffer next;
+
+ result = notification_queue_.pop_next_notification(
+ buffer, more_messages_queued, next);
- if (notify_queue_.is_empty ())
+ if (result == 0)
+ {
return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
-
- bool write_next_buffer = false;
- ACE_Notification_Buffer ** next = 0;
-
- if(!this->notify_queue_.is_empty())
- {
- // The queue is not empty, need to queue another message.
- this->notify_queue_.get (next, 0);
- write_next_buffer = true;
- }
-
- if(write_next_buffer)
- {
- (void) ACE::send(
- this->notification_pipe_.write_handle(),
- (char *)*next, sizeof(ACE_Notification_Buffer));
- }
- }
+ }
+ if (result == -1)
+ {
+ return -1;
+ }
+
+ if(more_messages_queued)
+ {
+ (void) ACE::send(this->notification_pipe_.write_handle(),
+ (char *)&next, sizeof(ACE_Notification_Buffer));
+ }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the