From 273b4f156a65d3786200019e4b41d99bf5ec9357 Mon Sep 17 00:00:00 2001 From: bala Date: Sat, 1 Sep 2001 00:27:25 +0000 Subject: ChangeLogTag: Fri Aug 31 19:14:52 2001 Balachandran Natarajan --- ace/Select_Reactor_Base.cpp | 222 ++++++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 102 deletions(-) (limited to 'ace/Select_Reactor_Base.cpp') diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 12402471eba..6eaecc133f3 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -684,8 +684,10 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); // No pending notifications. - if (this->notify_queue_.is_empty ()) - notification_required = 1; + + // We will send notify for every message.. + // if (this->notify_queue_.is_empty ()) + // notification_required = 1; ACE_Notification_Buffer *temp = 0; @@ -718,7 +720,8 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, if (notify_queue_.enqueue_tail (temp) == -1) return -1; - if (notification_required) + // Let us send a notify for every message + // if (notification_required) { ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, @@ -782,11 +785,110 @@ ACE_Select_Reactor_Notify::notify_handle (void) // Thanks to Paul Stephenson for suggesting this approach. int -ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle) +ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) +{ + int result = 0; +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the . + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + if (notify_queue_.is_empty ()) + break; + else if (notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } + + // If eh == 0 then another thread is unblocking the + // to update the 's + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the + // pointer we've been passed. + if (buffer.eh_ != 0) + { + int result = 0; + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } +#else + // If eh == 0 then another thread is unblocking the + // to update the 's + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the + // pointer we've been passed. + if (buffer.eh_ != 0) + { + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::QOS_MASK: + result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::GROUP_QOS_MASK: + result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("invalid mask = %d\n"), + buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + + return result; +} + +int +ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle, + ACE_Notification_Buffer &buffer) { ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notify"); - ACE_Notification_Buffer buffer; ssize_t n = 0; if ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0) @@ -806,102 +908,7 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle) return -1; } -#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) - // Dispatch all messages that are in the . - for (;;) - { - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - - ACE_Notification_Buffer *temp; - - if (notify_queue_.is_empty ()) - break; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } - - // If eh == 0 then another thread is unblocking the - // to update the 's - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the - // pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - } -#else - // If eh == 0 then another thread is unblocking the - // to update the 's - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the - // pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("invalid mask = %d\n"), - buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return 1; } @@ -922,9 +929,20 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) int number_dispatched = 0; int result = 0; - while ((result = this->dispatch_notify (handle) > 0)) + ACE_Notification_Buffer buffer; + + while ((result = this->read_notify_pipe (handle, + buffer) > 0)) { - number_dispatched++; + // Dispatch the buffer + if (this->dispatch_notify (buffer) > 0) + number_dispatched++; + else + { + // If there are any errors in dispatching... + result = -1; + break; + } // Bail out if we've reached the . Note that // by default is -1, so we'll loop until all -- cgit v1.2.1