summaryrefslogtreecommitdiff
path: root/ace/Select_Reactor_Notify.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Select_Reactor_Notify.cpp')
-rw-r--r--ace/Select_Reactor_Notify.cpp149
1 files changed, 88 insertions, 61 deletions
diff --git a/ace/Select_Reactor_Notify.cpp b/ace/Select_Reactor_Notify.cpp
index 18886c44a36..aaf43e8a110 100644
--- a/ace/Select_Reactor_Notify.cpp
+++ b/ace/Select_Reactor_Notify.cpp
@@ -145,7 +145,7 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
this->select_reactor_ =
ACE_dynamic_cast (ACE_Select_Reactor_Impl *, r);
- if (select_reactor_ == 0)
+ if (this->select_reactor_ == 0)
{
errno = EINVAL;
return -1;
@@ -234,67 +234,90 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Notification_Buffer buffer (eh, mask);
+ int notification_required = 1;
+
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
// Artificial scope to limit the duration of the mutex.
{
- // int notification_required = 0;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX,
+ mon,
+ this->notify_queue_lock_, -1);
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+ if (this->notify_message_to_queue (buffer,
+ notification_required) == -1)
+ return -1;
+ }
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
- // No pending notifications.
+ ssize_t n = 0;
- // We will send notify for every message..
- // if (this->notify_queue_.is_empty ())
- // notification_required = 1;
+ // Send a notification message on the pipe if required.
+ if (notification_required)
+ {
+ n = ACE::send (this->notification_pipe_.write_handle (),
+ (char *) &buffer,
+ sizeof buffer,
+ timeout);
+ }
- ACE_Notification_Buffer *temp = 0;
+ if (n == -1)
+ return -1;
- if (free_queue_.dequeue_head (temp) == -1)
- {
- // Grow the queue of available buffers.
- ACE_Notification_Buffer *temp1;
+ return 0;
+}
- ACE_NEW_RETURN (temp1,
- ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
- -1);
- if (this->alloc_queue_.enqueue_head (temp1) == -1)
- {
- delete [] temp1;
- return -1;
- }
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+int
+ACE_Select_Reactor_Notify::notify_message_to_queue (ACE_Notification_Buffer &buffer,
+ int &notification_required)
+{
+ // If the notification queue is not empty, set the
+ // notification_required flag to zero.
+ if (!this->notify_queue_.is_empty ())
+ notification_required = 0;
- // Start at 1 and enqueue only
- // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
- // the first one will be used right now.
- for (size_t i = 1;
- i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
- i++)
- this->free_queue_.enqueue_head (temp1 + i);
+ ACE_Notification_Buffer *temp = 0;
- temp = temp1;
- }
+ // Take a node from the free list.
+ if (this->free_queue_.dequeue_head (temp) == -1)
+ {
+ // Grow the queue of available buffers.
+ ACE_Notification_Buffer *temp1;
- ACE_ASSERT (temp != 0);
- *temp = buffer;
+ ACE_NEW_RETURN (temp1,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
- if (notify_queue_.enqueue_tail (temp) == -1)
- return -1;
- }
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+ if (this->alloc_queue_.enqueue_head (temp1) == -1)
+ {
+ delete [] temp1;
+ return -1;
+ }
- ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
- (char *) &buffer,
- sizeof buffer,
- timeout);
- if (n == -1)
+ // Start at 1 and enqueue only
+ // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
+ // the first one will be used right now.
+ for (size_t i = 1;
+ i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
+ i++)
+ this->free_queue_.enqueue_head (temp1 + i);
+
+ temp = temp1;
+ }
+
+
+ ACE_ASSERT (temp != 0);
+ *temp = buffer;
+
+ // Enqueue the node in the <notify_queue>
+ if (this->notify_queue_.enqueue_tail (temp) == -1)
return -1;
return 0;
}
-// Handles pending threads (if any) that are waiting to unblock the
-// Select_Reactor.
+#endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE*/
int
ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
@@ -393,27 +416,31 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
int result = 0;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
// Dispatch all messages that are in the <notify_queue_>.
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+ for (;;)
+ {
+ ////////@@@@@@@@@@ Bala Need to work from here...
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
- ACE_Notification_Buffer *temp;
+ if (this->notify_queue_.is_empty ())
+ return 0;
- if (notify_queue_.is_empty ())
- return 0;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
+ ACE_Notification_Buffer *temp;
+
+ if (notify_queue_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_queue_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
// If eh == 0 then another thread is unblocking the
// <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s