From 1b1f8d303e43e7985dca0195b130b8f6baade493 Mon Sep 17 00:00:00 2001 From: bala Date: Mon, 12 Aug 2002 12:59:23 +0000 Subject: ChangeLogTag:Mon Aug 12 08:14:51 2002 Balachandran Natarajan --- ace/Select_Reactor_Notify.cpp | 149 +++++++++++++++++++++++++----------------- ace/Select_Reactor_Notify.h | 11 ++++ tests/ChangeLog | 8 +++ 3 files changed, 107 insertions(+), 61 deletions(-) diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp index 18886c44a36..aaf43e8a110 100644 --- a/ace/Select_Reactor_Notify.cpp +++ b/ace/Select_Reactor_Notify.cpp @@ -145,7 +145,7 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, this->select_reactor_ = ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r); - if (select_reactor_ == 0) + if (this->select_reactor_ == 0) { errno = EINVAL; return -1; @@ -234,67 +234,90 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Notification_Buffer buffer (eh, mask); + int notification_required = 1; + #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) // Artificial scope to limit the duration of the mutex. { - // int notification_required = 0; + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + this->notify_queue_lock_, -1); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + if (this->notify_message_to_queue (buffer, + notification_required) == -1) + return -1; + } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ - // No pending notifications. + ssize_t n = 0; - // We will send notify for every message.. - // if (this->notify_queue_.is_empty ()) - // notification_required = 1; + // Send a notification message on the pipe if required. + if (notification_required) + { + n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + } - ACE_Notification_Buffer *temp = 0; + if (n == -1) + return -1; - if (free_queue_.dequeue_head (temp) == -1) - { - // Grow the queue of available buffers. - ACE_Notification_Buffer *temp1; + return 0; +} - ACE_NEW_RETURN (temp1, - ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], - -1); - if (this->alloc_queue_.enqueue_head (temp1) == -1) - { - delete [] temp1; - return -1; - } +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +int +ACE_Select_Reactor_Notify::notify_message_to_queue (ACE_Notification_Buffer &buffer, + int ¬ification_required) +{ + // If the notification queue is not empty, set the + // notification_required flag to zero. + if (!this->notify_queue_.is_empty ()) + notification_required = 0; - // Start at 1 and enqueue only - // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since - // the first one will be used right now. - for (size_t i = 1; - i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; - i++) - this->free_queue_.enqueue_head (temp1 + i); + ACE_Notification_Buffer *temp = 0; - temp = temp1; - } + // Take a node from the free list. + if (this->free_queue_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; - ACE_ASSERT (temp != 0); - *temp = buffer; + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); - if (notify_queue_.enqueue_tail (temp) == -1) - return -1; - } -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + if (this->alloc_queue_.enqueue_head (temp1) == -1) + { + delete [] temp1; + return -1; + } - ssize_t n = ACE::send (this->notification_pipe_.write_handle (), - (char *) &buffer, - sizeof buffer, - timeout); - if (n == -1) + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_queue_.enqueue_head (temp1 + i); + + temp = temp1; + } + + + ACE_ASSERT (temp != 0); + *temp = buffer; + + // Enqueue the node in the + if (this->notify_queue_.enqueue_tail (temp) == -1) return -1; return 0; } -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. +#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE*/ int ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, @@ -393,27 +416,31 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) int result = 0; #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) // Dispatch all messages that are in the . - { - // We acquire the lock in a block to make sure we're not - // holding the lock while delivering callbacks... - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + for (;;) + { + ////////@@@@@@@@@@ Bala Need to work from here... + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); - ACE_Notification_Buffer *temp; + if (this->notify_queue_.is_empty ()) + return 0; - if (notify_queue_.is_empty ()) - return 0; - else if (notify_queue_.dequeue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("dequeue_head")), - -1); - buffer = *temp; - if (free_queue_.enqueue_head (temp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("enqueue_head")), - -1); - } + ACE_Notification_Buffer *temp; + + if (notify_queue_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_queue_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("enqueue_head")), + -1); + } // If eh == 0 then another thread is unblocking the // to update the 's diff --git a/ace/Select_Reactor_Notify.h b/ace/Select_Reactor_Notify.h index 5e1b972021e..c920395f9a3 100644 --- a/ace/Select_Reactor_Notify.h +++ b/ace/Select_Reactor_Notify.h @@ -138,6 +138,17 @@ public: /// Declare the dynamic allocation hooks. ACE_ALLOC_HOOK_DECLARE; +protected: + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + + /// Add the to the user level notification queue. The + /// returns whether the queue needs a + /// notification to be sent in the pipe or not. + int notify_message_to_queue (ACE_Notification_Buffer &buffer, + int ¬ification_required); +#endif/*ACE_HAS_REACTOR_NOTIFICATION_QUEUE*/ + protected: /** * Keep a back pointer to the . If this value diff --git a/tests/ChangeLog b/tests/ChangeLog index 55f05fcf975..0796afaa345 100644 --- a/tests/ChangeLog +++ b/tests/ChangeLog @@ -1,3 +1,11 @@ +Mon Aug 12 08:14:51 2002 Balachandran Natarajan + + * ace/Select_Reactor_Notify.cpp: + * ace/Select_Reactor_Notify.h: Added a new method + notify_message_to_queue () which puts the notify message into a + user-level queue. Now the reactor sends only message into the + pipe for all the messages in queue. + Tue Aug 06 00:39:15 2002 Balachandran Natarajan * ace/Select_Reactor_Notify.h: -- cgit v1.2.1