summaryrefslogtreecommitdiff
path: root/ace/Select_Reactor_Base.cpp
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-09-01 00:27:25 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-09-01 00:27:25 +0000
commit273b4f156a65d3786200019e4b41d99bf5ec9357 (patch)
tree99ae654dd4d7d3f5b5da3f28afd1a1293d2e9c9b /ace/Select_Reactor_Base.cpp
parenta04ff9003642f93f8be5d704b850bb9d437a0746 (diff)
downloadATCD-273b4f156a65d3786200019e4b41d99bf5ec9357.tar.gz
ChangeLogTag: Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'ace/Select_Reactor_Base.cpp')
-rw-r--r--ace/Select_Reactor_Base.cpp222
1 files changed, 120 insertions, 102 deletions
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 12402471eba..6eaecc133f3 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -684,8 +684,10 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
// No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = 1;
+
+ // We will send notify for every message..
+ // if (this->notify_queue_.is_empty ())
+ // notification_required = 1;
ACE_Notification_Buffer *temp = 0;
@@ -718,7 +720,8 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (notify_queue_.enqueue_tail (temp) == -1)
return -1;
- if (notification_required)
+ // Let us send a notify for every message
+ // if (notification_required)
{
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
@@ -782,11 +785,110 @@ ACE_Select_Reactor_Notify::notify_handle (void)
// Thanks to Paul Stephenson for suggesting this approach.
int
-ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle)
+ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
+{
+ int result = 0;
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Dispatch all messages that are in the <notify_queue_>.
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ if (notify_queue_.is_empty ())
+ break;
+ else if (notify_queue_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_queue_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ int result = 0;
+
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+#else
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::QOS_MASK:
+ result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::GROUP_QOS_MASK:
+ result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("invalid mask = %d\n"),
+ buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ return result;
+}
+
+int
+ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notify");
- ACE_Notification_Buffer buffer;
ssize_t n = 0;
if ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0)
@@ -806,102 +908,7 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle)
return -1;
}
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Dispatch all messages that are in the <notify_queue_>.
- for (;;)
- {
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- ACE_Notification_Buffer *temp;
-
- if (notify_queue_.is_empty ())
- break;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- int result = 0;
-
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
- }
-#else
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- int result = 0;
-
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::QOS_MASK:
- result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("invalid mask = %d\n"),
- buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return 1;
}
@@ -922,9 +929,20 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
int number_dispatched = 0;
int result = 0;
- while ((result = this->dispatch_notify (handle) > 0))
+ ACE_Notification_Buffer buffer;
+
+ while ((result = this->read_notify_pipe (handle,
+ buffer) > 0))
{
- number_dispatched++;
+ // Dispatch the buffer
+ if (this->dispatch_notify (buffer) > 0)
+ number_dispatched++;
+ else
+ {
+ // If there are any errors in dispatching...
+ result = -1;
+ break;
+ }
// Bail out if we've reached the <notify_threshold_>. Note that
// by default <notify_threshold_> is -1, so we'll loop until all