diff options
Diffstat (limited to 'ace/Select_Reactor_Notify.cpp')
-rw-r--r-- | ace/Select_Reactor_Notify.cpp | 149 |
1 files changed, 88 insertions, 61 deletions
diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp index 18886c44a36..aaf43e8a110 100644 --- a/ace/Select_Reactor_Notify.cpp +++ b/ace/Select_Reactor_Notify.cpp @@ -145,7 +145,7 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, this->select_reactor_ = ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); - if (select_reactor_ == 0) + if (this->select_reactor_ == 0) { errno = EINVAL; return -1; @@ -234,67 +234,90 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Notification_Buffer buffer (eh, mask); + int notification_required = 1; + #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) // Artificial scope to limit the duration of the mutex. { - // int notification_required = 0; + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + this->notify_queue_lock_, -1); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + if (this->notify_message_to_queue (buffer, + notification_required) == -1) + return -1; + } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - // No pending notifications. + ssize_t n = 0; - // We will send notify for every message.. - // if (this->notify_queue_.is_empty ()) - // notification_required = 1; + // Send a notification message on the pipe if required. + if (notification_required) + { + n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + } - ACE_Notification_Buffer *temp = 0; + if (n == -1) + return -1; - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1; + return 0; +} - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +int +ACE_Select_Reactor_Notify::notify_message_to_queue (ACE_Notification_Buffer &buffer, + int ¬ification_required) +{ + // If the notification queue is not empty, set the + // notification_required flag to zero. + if (!this->notify_queue_.is_empty ()) + notification_required = 0; - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - i++) - this->free_queue_.enqueue_head (temp1 + i); + ACE_Notification_Buffer *temp = 0; - temp = temp1; - } + // Take a node from the free list. + if (this->free_queue_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; - ACE_ASSERT (temp != 0); - *temp = buffer; + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; - } -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + if (this->alloc_queue_.enqueue_head (temp1) == -1) + { + delete [] temp1; + return -1; + } - ssize_t n = ACE::send (this->notification_pipe_.write_handle (), - (char *) &buffer, - sizeof buffer, - timeout); - if (n == -1) + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_queue_.enqueue_head (temp1 + i); + + temp = temp1; + } + + + ACE_ASSERT (temp != 0); + *temp = buffer; + + // Enqueue the node in the <notify_queue> + if (this->notify_queue_.enqueue_tail (temp) == -1) return -1; return 0; } -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. +#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE*/ int ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, @@ -393,27 +416,31 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) int result = 0; #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) // Dispatch all messages that are in the <notify_queue_>. - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + for (;;) + { + ////////@@@@@@@@@@ Bala Need to work from here... + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - ACE_Notification_Buffer *temp; + if (this->notify_queue_.is_empty ()) + return 0; - if (notify_queue_.is_empty ()) - return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } + ACE_Notification_Buffer *temp; + + if (notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } // If eh == 0 then another thread is unblocking the // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s |