summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-09-01 00:27:25 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-09-01 00:27:25 +0000
commit273b4f156a65d3786200019e4b41d99bf5ec9357 (patch)
tree99ae654dd4d7d3f5b5da3f28afd1a1293d2e9c9b
parenta04ff9003642f93f8be5d704b850bb9d437a0746 (diff)
downloadATCD-273b4f156a65d3786200019e4b41d99bf5ec9357.tar.gz
ChangeLogTag: Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--ChangeLog41
-rw-r--r--ChangeLogs/ChangeLog-02a41
-rw-r--r--ChangeLogs/ChangeLog-03a41
-rw-r--r--ace/Reactor_Impl.h7
-rw-r--r--ace/Select_Reactor_Base.cpp222
-rw-r--r--ace/Select_Reactor_Base.h10
-rw-r--r--ace/TP_Reactor.cpp495
-rw-r--r--ace/TP_Reactor.h31
-rw-r--r--ace/TP_Reactor.i10
9 files changed, 537 insertions, 361 deletions
diff --git a/ChangeLog b/ChangeLog
index 2b6981c5cd5..08a9ae8da47 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,44 @@
+Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe
+ (). Also changed the dispatch_notify () to take in a
+ Notification buffer instead of a ACE_HANDLE.
+
+ * ace/Select_Reactor_Base.cpp:
+ * ace/Select_Reactor_Base.h: Made the following changes
+
+ - Implemented read_notify_pipe (). Will read just one message from
+ the notify pipe or one message from the notification queue.
+
+ - Reimplemented dispatch_notify (). This method would just
+ dispatch the upcall using the information in the notification
+ buffer.
+
+ - The handle_input () now uses the read_notify_pipe () and
+ dispatch_notify () to achieve what it was doing before.
+
+ - The notify () call now sends one notify message on the pipe
+ for every message in the notification queue.
+
+ * ace/TP_Reactor.cpp (handle_socket_events):
+ * ace/TP_Reactor.h:
+ * ace/TP_Reactor.i: We had a race condition. The race condition
+ was because two threads were trying to read from the notify_pipe
+ at the same instance. This race condition was fixed by adding a
+ call to read_notify_pipe () with the lock held and then calling
+ dispatch_notify () with the buffer read after releasing the
+ lock. Did the following minor modifications
+
+ - Changed dispatch_socket_events () as dispatch_socket_event ()
+ as we were dispatching only one event.
+
+ - We dont grab the token in the constructor of the token. We
+ have to make a call specfically to grab_token () to get the
+ token.
+
+ The above checkins should fix the correctness of the reactor
+ problems that we have been seeing.
+
Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu>
* bin/auto_run_tests.lst:
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 2b6981c5cd5..08a9ae8da47 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,44 @@
+Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe
+ (). Also changed the dispatch_notify () to take in a
+ Notification buffer instead of a ACE_HANDLE.
+
+ * ace/Select_Reactor_Base.cpp:
+ * ace/Select_Reactor_Base.h: Made the following changes
+
+ - Implemented read_notify_pipe (). Will read just one message from
+ the notify pipe or one message from the notification queue.
+
+ - Reimplemented dispatch_notify (). This method would just
+ dispatch the upcall using the information in the notification
+ buffer.
+
+ - The handle_input () now uses the read_notify_pipe () and
+ dispatch_notify () to achieve what it was doing before.
+
+ - The notify () call now sends one notify message on the pipe
+ for every message in the notification queue.
+
+ * ace/TP_Reactor.cpp (handle_socket_events):
+ * ace/TP_Reactor.h:
+ * ace/TP_Reactor.i: We had a race condition. The race condition
+ was because two threads were trying to read from the notify_pipe
+ at the same instance. This race condition was fixed by adding a
+ call to read_notify_pipe () with the lock held and then calling
+ dispatch_notify () with the buffer read after releasing the
+ lock. Did the following minor modifications
+
+ - Changed dispatch_socket_events () as dispatch_socket_event ()
+ as we were dispatching only one event.
+
+ - We dont grab the token in the constructor of the token. We
+ have to make a call specfically to grab_token () to get the
+ token.
+
+ The above checkins should fix the correctness of the reactor
+ problems that we have been seeing.
+
Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu>
* bin/auto_run_tests.lst:
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 2b6981c5cd5..08a9ae8da47 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,44 @@
+Fri Aug 31 19:14:52 2001 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * ace/Reactor_Impl.h: Added a new method by name read_notify_pipe
+ (). Also changed the dispatch_notify () to take in a
+ Notification buffer instead of a ACE_HANDLE.
+
+ * ace/Select_Reactor_Base.cpp:
+ * ace/Select_Reactor_Base.h: Made the following changes
+
+ - Implemented read_notify_pipe (). Will read just one message from
+ the notify pipe or one message from the notification queue.
+
+ - Reimplemented dispatch_notify (). This method would just
+ dispatch the upcall using the information in the notification
+ buffer.
+
+ - The handle_input () now uses the read_notify_pipe () and
+ dispatch_notify () to achieve what it was doing before.
+
+ - The notify () call now sends one notify message on the pipe
+ for every message in the notification queue.
+
+ * ace/TP_Reactor.cpp (handle_socket_events):
+ * ace/TP_Reactor.h:
+ * ace/TP_Reactor.i: We had a race condition. The race condition
+ was because two threads were trying to read from the notify_pipe
+ at the same instance. This race condition was fixed by adding a
+ call to read_notify_pipe () with the lock held and then calling
+ dispatch_notify () with the buffer read after releasing the
+ lock. Did the following minor modifications
+
+ - Changed dispatch_socket_events () as dispatch_socket_event ()
+ as we were dispatching only one event.
+
+ - We dont grab the token in the constructor of the token. We
+ have to make a call specfically to grab_token () to get the
+ token.
+
+ The above checkins should fix the correctness of the reactor
+ problems that we have been seeing.
+
Fri Aug 31 18:30:28 2001 Krishnakumar B <kitty@cs.wustl.edu>
* bin/auto_run_tests.lst:
diff --git a/ace/Reactor_Impl.h b/ace/Reactor_Impl.h
index 9a279e1dea8..b64da0fa527 100644
--- a/ace/Reactor_Impl.h
+++ b/ace/Reactor_Impl.h
@@ -76,8 +76,13 @@ public:
/// Handle one of the notify call on the <handle>. This could be
/// because of a thread trying to unblock the <Reactor_Impl>
- virtual int dispatch_notify (ACE_HANDLE handle) = 0;
+ virtual int dispatch_notify (ACE_Notification_Buffer &buffer) = 0;
+ /// Read one of the notify call on the <handle> into the
+ /// <buffer>. This could be because of a thread trying to unblock
+ /// the <Reactor_Impl>
+ virtual int read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer) = 0;
/**
* Set the maximum number of times that the <handle_input> method
* will iterate and dispatch the <ACE_Event_Handlers> that are
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 12402471eba..6eaecc133f3 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -684,8 +684,10 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
// No pending notifications.
- if (this->notify_queue_.is_empty ())
- notification_required = 1;
+
+ // We will send notify for every message..
+ // if (this->notify_queue_.is_empty ())
+ // notification_required = 1;
ACE_Notification_Buffer *temp = 0;
@@ -718,7 +720,8 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (notify_queue_.enqueue_tail (temp) == -1)
return -1;
- if (notification_required)
+ // Let us send a notify for every message
+ // if (notification_required)
{
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
@@ -782,11 +785,110 @@ ACE_Select_Reactor_Notify::notify_handle (void)
// Thanks to Paul Stephenson for suggesting this approach.
int
-ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle)
+ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
+{
+ int result = 0;
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Dispatch all messages that are in the <notify_queue_>.
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ if (notify_queue_.is_empty ())
+ break;
+ else if (notify_queue_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_queue_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ int result = 0;
+
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+#else
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::QOS_MASK:
+ result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::GROUP_QOS_MASK:
+ result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("invalid mask = %d\n"),
+ buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
+ return result;
+}
+
+int
+ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notify");
- ACE_Notification_Buffer buffer;
ssize_t n = 0;
if ((n = ACE::recv (handle, (char *) &buffer, sizeof buffer)) > 0)
@@ -806,102 +908,7 @@ ACE_Select_Reactor_Notify::dispatch_notify (ACE_HANDLE handle)
return -1;
}
-#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
- // Dispatch all messages that are in the <notify_queue_>.
- for (;;)
- {
- {
- // We acquire the lock in a block to make sure we're not
- // holding the lock while delivering callbacks...
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
-
- ACE_Notification_Buffer *temp;
-
- if (notify_queue_.is_empty ())
- break;
- else if (notify_queue_.dequeue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("dequeue_head")),
- -1);
- buffer = *temp;
- if (free_queue_.enqueue_head (temp) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("enqueue_head")),
- -1);
- }
-
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- int result = 0;
-
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("invalid mask = %d\n"), buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
- }
-#else
- // If eh == 0 then another thread is unblocking the
- // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the <ACE_Event_Handler>
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- int result = 0;
-
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::QOS_MASK:
- result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("invalid mask = %d\n"),
- buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return 1;
}
@@ -922,9 +929,20 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
int number_dispatched = 0;
int result = 0;
- while ((result = this->dispatch_notify (handle) > 0))
+ ACE_Notification_Buffer buffer;
+
+ while ((result = this->read_notify_pipe (handle,
+ buffer) > 0))
{
- number_dispatched++;
+ // Dispatch the buffer
+ if (this->dispatch_notify (buffer) > 0)
+ number_dispatched++;
+ else
+ {
+ // If there are any errors in dispatching...
+ result = -1;
+ break;
+ }
// Bail out if we've reached the <notify_threshold_>. Note that
// by default <notify_threshold_> is -1, so we'll loop until all
diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h
index 9d60cb1f8aa..db34c2bd0aa 100644
--- a/ace/Select_Reactor_Base.h
+++ b/ace/Select_Reactor_Base.h
@@ -151,7 +151,13 @@ public:
/// Handle one of the notify call on the <handle>. This could be
/// because of a thread trying to unblock the <Reactor_Impl>
- virtual int dispatch_notify (ACE_HANDLE handle);
+ virtual int dispatch_notify (ACE_Notification_Buffer &buffer);
+
+ /// Read one of the notify call on the <handle> into the
+ /// <buffer>. This could be because of a thread trying to unblock
+ /// the <Reactor_Impl>
+ virtual int read_notify_pipe (ACE_HANDLE handle,
+ ACE_Notification_Buffer &buffer);
/// Called back by the <ACE_Select_Reactor> when a thread wants to
/// unblock us.
@@ -193,7 +199,7 @@ public:
/// Declare the dynamic allocation hooks.
ACE_ALLOC_HOOK_DECLARE;
-private:
+protected:
/**
* Keep a back pointer to the <ACE_Select_Reactor>. If this value
* if NULL then the <ACE_Select_Reactor> has been initialized with
diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp
index 8b8fbc97c40..2fffb8b6b9a 100644
--- a/ace/TP_Reactor.cpp
+++ b/ace/TP_Reactor.cpp
@@ -4,7 +4,6 @@
#include "ace/TP_Reactor.h"
#include "ace/Reactor.h"
#include "ace/Thread.h"
-#include "ace/Log_Msg.h"
#if !defined (__ACE_INLINE__)
#include "ace/TP_Reactor.i"
@@ -56,9 +55,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time)
return result;
}
-/************************************************************************/
-// Methods for ACE_TP_Reactor
-/************************************************************************/
ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
ACE_Timer_Queue *tq,
@@ -80,23 +76,25 @@ ACE_TP_Reactor::ACE_TP_Reactor (size_t size,
this->supress_notify_renew (1);
}
-
-
-void
-ACE_TP_Reactor::max_notify_iterations (int /*iterations*/)
+int
+ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
{
- ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations");
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ if (o_id)
+ *o_id = ACE_Thread::self ();
+
+ return 0;
- ACE_ERROR ((LM_ERROR,
- "(%P|%t) This has no effect on the notify processing \n"));
}
int
-ACE_TP_Reactor::max_notify_iterations (void)
+ACE_TP_Reactor::owner (ACE_thread_t *t_id)
{
- ACE_TRACE ("ACE_TP_Reactor::max_notify_iterations");
+ ACE_TRACE ("ACE_TP_Reactor::owner");
+ *t_id = ACE_Thread::self ();
return 0;
+
}
@@ -105,42 +103,50 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
{
ACE_TRACE ("ACE_TP_Reactor::handle_events");
- int result = 0;
-
// Stash the current time -- the destructor of this object will
// automatically compute how much time elpased since this method was
// called.
ACE_Countdown_Time countdown (max_wait_time);
+ // The order of these events is very subtle, modify with care.
+
+
// Instantiate the token guard which will try grabbing the token for
// this thread.
- ACE_TP_Token_Guard guard (this->token_,
- max_wait_time,
- result);
+ ACE_TP_Token_Guard guard (this->token_);
+
+
+ int result = guard.grab_token (max_wait_time);
// If the guard is NOT the owner just return the retval
if (!guard.is_owner ())
return result;
+ // After getting the lock just just for deactivation..
+ if (this->deactivated_)
+ return -1;
+
// Update the countdown to reflect time waiting for the token.
countdown.update ();
- // After acquiring the lock, check if we have been deactivated. If
- // we are deactivated, simply return without handling further
- // events.
- if (this->deactivated_)
- {
- return -1;
- }
+ return this->dispatch_i (max_wait_time,
+ guard);
+}
- // We got the lock, lets handle some events. We collect the events
- // that we need to handle. We release the token and then handle
- // those events that needs handling.
+
+int
+ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard)
+{
int event_count =
this->get_event_for_dispatching (max_wait_time);
+ int result = 0;
+ // Note: We are passing the <event_count> around, to have record of
+ // how many events still need processing. May be this could be
+ // useful in future.
// Dispatch signals
if (event_count == -1)
@@ -161,14 +167,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// handle timers early since they may have higher latency
// constraints than I/O handlers. Ideally, the order of
// dispatching should be a strategy...
- int retval = this->handle_timer_events (event_count,
- guard);
+ result = this->handle_timer_events (event_count,
+ guard);
- if (retval > 0)
- return retval;
+ if (result > 0)
+ return result;
- // Else just fall through for further handling
- }
+ // Else just fall through for further handling
+ }
if (event_count > 0)
@@ -176,128 +182,27 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
// Next dispatch the notification handlers (if there are any to
// dispatch). These are required to handle multiple-threads that
// are trying to update the <Reactor>.
- int retval = this->handle_notify_events (event_count,
- guard);
+ result = this->handle_notify_events (event_count,
+ guard);
- if (retval > 0)
- return retval;
+ if (result > 0)
+ return result;
- // Else just fall through for further handling
+ // Else just fall through for further handling
}
-
- if (event_count > 0)
- {
- // Handle socket events
- return this->handle_socket_events (event_count,
- guard);
- }
-
- return 0;
-}
-
-int
-ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
-{
- ACE_TRACE ("ACE_TP_Reactor::handle_events");
- return this->handle_events (&max_wait_time);
-}
-
-int
-ACE_TP_Reactor::resumable_handler (void)
-{
- return 1;
-}
-
-int
-ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
- ACE_Reactor_Mask mask,
- int ops)
-{
- ACE_TRACE ("ACE_TP_Reactor::mask_ops");
- ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
- ace_mon, this->token_, -1));
-
- int result = 0;
-
- // If it looks like the handle isn't suspended, then
- // set the ops on the wait_set_, otherwise set the suspend_set_.
-
- if (this->suspend_set_.rd_mask_.is_set (handle) == 0
- && this->suspend_set_.wr_mask_.is_set (handle) == 0
- && this->suspend_set_.ex_mask_.is_set (handle) == 0)
-
- result = this->bit_ops (handle, mask,
- this->wait_set_,
- ops);
- else
-
- result = this->bit_ops (handle, mask,
- this->suspend_set_,
- ops);
-
- return result;
-}
-
-
-int
-ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- int ops)
-{
- ACE_TRACE ("ACE_TP_Reactor::mask_ops");
- return this->mask_ops (eh->get_handle (), mask, ops);
-}
-
-
-int
-ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
-{
- ACE_TRACE ("ACE_TP_Reactor::owner");
- if (o_id)
- *o_id = ACE_Thread::self ();
+ if (event_count > 0)
+ {
+ // Handle socket events
+ return this->handle_socket_events (event_count,
+ guard);
+ }
return 0;
}
-int
-ACE_TP_Reactor::owner (ACE_thread_t *t_id)
-{
- ACE_TRACE ("ACE_TP_Reactor::owner");
- *t_id = ACE_Thread::self ();
- return 0;
-}
-
-
-int
-ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
-{
- ACE_TRACE ("ACE_TP_Reactor::get_event_for_dispatching");
- // If the reactor handler state has changed, clear any remembered
- // ready bits and re-scan from the master wait_set.
- if (this->state_changed_)
- {
- this->ready_set_.rd_mask_.reset ();
- this->ready_set_.wr_mask_.reset ();
- this->ready_set_.ex_mask_.reset ();
- this->state_changed_ = 0;
- }
- else
- {
- // This is a hack... somewhere, under certain conditions (which
- // I don't understand...) the mask will have all of its bits clear,
- // yet have a size_ > 0. This is an attempt to remedy the affect,
- // without knowing why it happens.
- this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
- this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
- this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
- }
-
- return this->wait_for_multiple_events (this->ready_set_,
- max_wait_time);
-}
int
@@ -378,6 +283,7 @@ ACE_TP_Reactor::handle_timer_events (int &event_count,
}
+
int
ACE_TP_Reactor::handle_notify_events (int &event_count,
ACE_TP_Token_Guard &guard)
@@ -388,61 +294,83 @@ ACE_TP_Reactor::handle_notify_events (int &event_count,
if (notify_handle != ACE_INVALID_HANDLE)
{
+
// Clear the handle of the read_mask of our <ready_set_>
this->ready_set_.rd_mask_.clr_bit (notify_handle);
- // Decrement the number of events that needs handling yet.
- event_count--;
+ // Now just do a read on the pipe..
+ ACE_Notification_Buffer buffer;
- // Release the token before dispatching notifies...
- guard.release_token ();
+ if (this->notify_handler_->read_notify_pipe (notify_handle,
+ buffer) >= 0)
+ {
+ event_count--;
+
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ this->notify_handler_->dispatch_notify (buffer);
+
+ this->renew ();
+ return 1;
+ }
- // Dipatch and return
- return
- this->notify_handler_->dispatch_notify (notify_handle);
+ return 0;
}
return 0;
}
+
int
ACE_TP_Reactor::handle_socket_events (int &event_count,
ACE_TP_Token_Guard &guard)
{
- ACE_TRACE ("ACE_TP_Reactor::handle_socket_events");
+ // We got the lock, lets handle some events. Note: this method will
+ // *not* dispatch any I/O handlers. It will dispatch signals,
+ // timeouts, and notifications.
ACE_EH_Dispatch_Info dispatch_info;
- // Get the socket event dispatch information
+ this->get_socket_event_info (dispatch_info);
+
// If there is any event handler that is ready to be dispatched, the
// dispatch information is recorded in dispatch_info.
- int result =
- this->get_socket_event_info (dispatch_info);
-
-
- if (result)
+ if (dispatch_info.dispatch ())
{
// Suspend the handler so that other threads don't start
// dispatching it.
- this->suspend_i (dispatch_info.handle_);
+ // Make sure we never suspend the notify_handler_ without holding
+ // the lock.
+ // @@ Actually, we don't even need to suspend the notify_handler_
+ // here. But let me get it to work first.
+ if (dispatch_info.event_handler_ != this->notify_handler_)
+ this->suspend_i (dispatch_info.handle_);
+ }
- // Decrement the number of events that needs handling yet.
- event_count--;
+ /// Decrement the event left
+ --event_count;
- // Release the token before dispatching notifies...
- guard.release_token ();
+ // Release the lock. Others threads can start waiting.
+ guard.release_token ();
- result = this->dispatch_socket_events (dispatch_info);
+ int result = 0;
+ // If there was an event handler ready, dispatch it.
+ if (dispatch_info.dispatch ())
+ {
+ if (this->dispatch_socket_event (dispatch_info) == 0)
+ ++result; // Dispatched one more event
int flag = 0;
if (dispatch_info.event_handler_ != 0)
{
- flag =
- dispatch_info.event_handler_->resume_handler ();
+ flag =
+ dispatch_info.event_handler_->resume_handler ();
}
if (dispatch_info.handle_ != ACE_INVALID_HANDLE &&
+ dispatch_info.event_handler_ != this->notify_handler_ &&
flag == 0)
this->resume_handler (dispatch_info.handle_);
}
@@ -450,67 +378,93 @@ ACE_TP_Reactor::handle_socket_events (int &event_count,
return result;
}
+int
+ACE_TP_Reactor::mask_ops (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ ACE_TRACE ("ACE_TP_Reactor::mask_ops");
+ ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token,
+ ace_mon, this->token_, -1));
-ACE_HANDLE
-ACE_TP_Reactor::get_notify_handle (void)
+ int result = 0;
+
+ // If it looks like the handle isn't suspended, then
+ // set the ops on the wait_set_, otherwise set the suspend_set_.
+
+ if (this->suspend_set_.rd_mask_.is_set (handle) == 0
+ && this->suspend_set_.wr_mask_.is_set (handle) == 0
+ && this->suspend_set_.ex_mask_.is_set (handle) == 0)
+
+ result = this->bit_ops (handle, mask,
+ this->wait_set_,
+ ops);
+ else
+
+ result = this->bit_ops (handle, mask,
+ this->suspend_set_,
+ ops);
+
+ return result;
+}
+
+
+
+int
+ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
{
- // Call the notify handler to get a handle on which we would have a
- // notify waiting
- ACE_HANDLE read_handle =
- this->notify_handler_->notify_handle ();
- // Check whether the rd_mask has been set on that handle. If so
- // return the handle.
- if (read_handle != ACE_INVALID_HANDLE &&
- this->ready_set_.rd_mask_.is_set (read_handle))
+ // If the reactor handler state has changed, clear any remembered
+ // ready bits and re-scan from the master wait_set.
+ if (this->state_changed_)
{
- return read_handle;
+ this->ready_set_.rd_mask_.reset ();
+ this->ready_set_.wr_mask_.reset ();
+ this->ready_set_.ex_mask_.reset ();
+ this->state_changed_ = 0;
}
+ else
+ {
+ // This is a hack... somewhere, under certain conditions (which
+ // I don't understand...) the mask will have all of its bits clear,
+ // yet have a size_ > 0. This is an attempt to remedy the affect,
+ // without knowing why it happens.
- return ACE_INVALID_HANDLE;
-}
-
+ //# if !(defined (__SUNPRO_CC) && (__SUNPRO_CC > 0x500))
+ // SunCC seems to be having problems with this piece of code
+ // here. I am not sure why though. This works fine with other
+ // compilers. As we dont seem to understand when this piece of
+ // code is needed and as it creates problems for SunCC we will
+ // not compile this. Most of the tests in TAO seem to be happy
+ // without this in SunCC.
+ this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
+ this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
+ this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
+ //# endif /* ! __SUNPRO_CC */
+ }
+ return this->wait_for_multiple_events (this->ready_set_,
+ max_wait_time);
+}
int
ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
{
+ event.reset (); // Nothing to dispatch yet
// Check for dispatch in write, except, read. Only catch one, but if
// one is caught, be sure to clear the handle from each mask in case
// there is more than one mask set for it. This would cause problems
// if the handler is suspended for dispatching, but its set bit in
// another part of ready_set_ kept it from being dispatched.
+ int found_io = 0;
ACE_HANDLE handle;
- // Look at the read masks
- {
- ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
-
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
- {
- if (this->is_suspended_i (handle))
- continue;
-
- // Remember this info
- event.set (handle,
- this->handler_rep_.find (handle),
- ACE_Event_Handler::READ_MASK,
- &ACE_Event_Handler::handle_input);
- this->ready_set_.rd_mask_.clr_bit (handle);
- this->ready_set_.wr_mask_.clr_bit (handle);
- this->ready_set_.ex_mask_.clr_bit (handle);
- return 1;
- }
- }
-
- // We havent found any rd_masks for processing yet, so look for
- // write masks
{
ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
{
if (this->is_suspended_i (handle))
continue;
@@ -523,45 +477,67 @@ ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
this->ready_set_.wr_mask_.clr_bit (handle);
this->ready_set_.ex_mask_.clr_bit (handle);
this->ready_set_.rd_mask_.clr_bit (handle);
- return 1;
+ found_io = 1;
}
}
- // We havent found any rd_mask and wr_masks for processing yet, so
- // look for ex_masks
- {
- ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
- while ((handle = handle_iter ()) != ACE_INVALID_HANDLE)
- {
- if (this->is_suspended_i (handle))
- continue;
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::EXCEPT_MASK,
+ &ACE_Event_Handler::handle_exception);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ found_io = 1;
+ }
+ }
- // Remember this info
- event.set (handle,
- this->handler_rep_.find (handle),
- ACE_Event_Handler::EXCEPT_MASK,
- &ACE_Event_Handler::handle_exception);
- this->ready_set_.ex_mask_.clr_bit (handle);
- this->ready_set_.wr_mask_.clr_bit (handle);
- this->ready_set_.rd_mask_.clr_bit (handle);
- return 1;
- }
+ if (!found_io)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+
+ while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::READ_MASK,
+ &ACE_Event_Handler::handle_input);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ found_io = 1;
+ }
}
- // We didnt find any..
- return 0;
+ return found_io;
}
+
+
+// Dispatches a single event handler
int
-ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info)
+ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
{
- ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_events");
+ ACE_TRACE ("ACE_TP_Reactor::notify_handle");
- ACE_HANDLE handle = info.handle_;
- ACE_Event_Handler *event_handler = info.event_handler_;
- ACE_Reactor_Mask mask = info.mask_;
- ACE_EH_PTMF callback = info.callback_;
+ ACE_HANDLE handle = dispatch_info.handle_;
+ ACE_Event_Handler *event_handler = dispatch_info.event_handler_;
+ ACE_Reactor_Mask mask = dispatch_info.mask_;
+ ACE_EH_PTMF callback = dispatch_info.callback_;
// Check for removed handlers.
if (event_handler == 0)
@@ -583,17 +559,37 @@ ACE_TP_Reactor::dispatch_socket_events (ACE_EH_Dispatch_Info &info)
this->remove_handler (handle, mask);
// As the handler is no longer valid, invalidate the handle
- info.event_handler_ = 0;
- info.handle_ = ACE_INVALID_HANDLE;
+ dispatch_info.event_handler_ = 0;
+ dispatch_info.handle_ = ACE_INVALID_HANDLE;
return retval;
}
+ // assert (status >= 0);
+ return 0;
+}
+
+int
+ACE_TP_Reactor::resumable_handler (void)
+{
return 1;
}
+ACE_INLINE int
+ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
+{
+ return ACE_Select_Reactor::handle_events (max_wait_time);
+}
+
+ACE_INLINE int
+ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh,
+ ACE_Reactor_Mask mask,
+ int ops)
+{
+ return this->mask_ops (eh->get_handle (), mask, ops);
+}
-void
+ACE_INLINE void
ACE_TP_Reactor::notify_handle (ACE_HANDLE,
ACE_Reactor_Mask,
ACE_Handle_Set &,
@@ -603,3 +599,30 @@ ACE_TP_Reactor::notify_handle (ACE_HANDLE,
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("ACE_TP_Reactor::notify_handle: Wrong version of notify_handle() gets called")));
}
+
+ACE_HANDLE
+ACE_TP_Reactor::get_notify_handle (void)
+{
+ // Call the notify handler to get a handle on which we would have a
+ // notify waiting
+ ACE_HANDLE read_handle =
+ this->notify_handler_->notify_handle ();
+
+ // Check whether the rd_mask has been set on that handle. If so
+ // return the handle.
+ // if (read_handle != ACE_INVALID_HANDLE &&
+ //this->ready_set_.rd_mask_.is_set (read_handle))
+ if (read_handle != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
+ ACE_HANDLE handle = ACE_INVALID_HANDLE;
+
+ while ((handle = handle_iter ()) == read_handle)
+ {
+ return read_handle;
+ }
+ }
+
+ // None found..
+ return ACE_INVALID_HANDLE;
+}
diff --git a/ace/TP_Reactor.h b/ace/TP_Reactor.h
index b916aa81a22..83f2fbcfd4b 100644
--- a/ace/TP_Reactor.h
+++ b/ace/TP_Reactor.h
@@ -32,13 +32,12 @@
#include "ace/pre.h"
#include "ace/Select_Reactor.h"
-
+#include "ace/Log_Msg.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-
/**
* @class ACE_EH_Dispatch_Info
*
@@ -89,9 +88,7 @@ class ACE_Export ACE_TP_Token_Guard
public:
/// Constructor that will grab the token for us
- ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token,
- ACE_Time_Value *max_wait_time,
- int &result);
+ ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token);
/// Destructor. This will release the token if it hasnt been
/// released till this point
@@ -104,8 +101,6 @@ public:
/// token or not.
int is_owner (void);
-private:
-
/// A helper method that grabs the token for us, after which the
/// thread that owns that can do some actual work.
int grab_token (ACE_Time_Value *max_wait_time);
@@ -126,7 +121,6 @@ private:
ACE_UNIMPLEMENTED_FUNC (ACE_TP_Token_Guard (void))
};
-
/**
* @class ACE_TP_Reactor
*
@@ -184,10 +178,6 @@ public:
ACE_Timer_Queue * = 0,
int mask_signals = 1);
- // = Reactor calls
- virtual void max_notify_iterations (int iter);
- virtual int max_notify_iterations (void);
-
// = Event loop drivers.
/**
@@ -247,21 +237,31 @@ public:
protected:
// = Internal methods that do the actual work.
+
+ /// Dispatch just 1 signal, timer, notification handlers
+ int dispatch_i (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard);
+
/// Get the event that needs dispatching.It could be either a
/// signal, timer, notification handlers or return possibly 1 I/O
/// handler for dispatching. In the most common use case, this would
/// return 1 I/O handler for dispatching
int get_event_for_dispatching (ACE_Time_Value *max_wait_time);
+ /// Method to handle signals
+ /// NOTE: It is just busted at this point in time.
int handle_signals (int &event_count,
ACE_TP_Token_Guard &g);
+ /// Handle timer events
int handle_timer_events (int &event_count,
ACE_TP_Token_Guard &g);
+ /// Handle notify events
int handle_notify_events (int &event_count,
ACE_TP_Token_Guard &g);
+ /// handle socket events
int handle_socket_events (int &event_count,
ACE_TP_Token_Guard &g);
@@ -273,11 +273,16 @@ protected:
ACE_EH_PTMF callback);
private:
+ /// Get the handle of the notify pipe from the ready set if there is
+ /// an event in the notify pipe.
ACE_HANDLE get_notify_handle (void);
+ /// Get socket event dispatch information.
int get_socket_event_info (ACE_EH_Dispatch_Info &info);
- int dispatch_socket_events (ACE_EH_Dispatch_Info &info);
+ /// Notify the appropriate <callback> in the context of the <eh>
+ /// associated with <handle> that a particular event has occurred.
+ int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info);
private:
/// Deny access since member-wise won't work...
diff --git a/ace/TP_Reactor.i b/ace/TP_Reactor.i
index f169ddca174..9cbec199f80 100644
--- a/ace/TP_Reactor.i
+++ b/ace/TP_Reactor.i
@@ -48,14 +48,11 @@ ACE_EH_Dispatch_Info::dispatch (void) const
/************************************************************************/
ACE_INLINE
-ACE_TP_Token_Guard::ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token,
- ACE_Time_Value *max_wait_time,
- int &result)
+ACE_TP_Token_Guard::ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token)
+
:token_ (token),
owner_ (0)
{
- result = this->grab_token (max_wait_time);
-
}
ACE_INLINE
@@ -71,7 +68,7 @@ ACE_TP_Token_Guard::~ACE_TP_Token_Guard (void)
ACE_INLINE void
ACE_TP_Token_Guard::release_token (void)
{
- if (this->owner_ == 1)
+ if (this->owner_)
{
ACE_MT (this->token_.release ());
@@ -90,7 +87,6 @@ ACE_TP_Token_Guard::is_owner (void)
/************************************************************************/
// Methods for ACE_TP_Reactor
/************************************************************************/
-
ACE_INLINE void
ACE_TP_Reactor::no_op_sleep_hook (void *)
{