summaryrefslogtreecommitdiff
path: root/ace/Dev_Poll_Reactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Dev_Poll_Reactor.cpp')
-rw-r--r--ace/Dev_Poll_Reactor.cpp163
1 files changed, 66 insertions, 97 deletions
diff --git a/ace/Dev_Poll_Reactor.cpp b/ace/Dev_Poll_Reactor.cpp
index f0bf6b15a1b..b8d8eb3530d 100644
--- a/ace/Dev_Poll_Reactor.cpp
+++ b/ace/Dev_Poll_Reactor.cpp
@@ -9,23 +9,19 @@ ACE_RCSID (ace,
#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
-# if defined (ACE_HAS_EVENT_POLL) && defined (linux)
-
-# include "ace/OS_NS_unistd.h"
-# include "ace/OS_NS_fcntl.h"
-
-# include /**/ <sys/epoll.h>
-
-# elif defined (ACE_HAS_DEV_POLL)
+# include "ace/OS_NS_unistd.h"
+# include "ace/OS_NS_fcntl.h"
+# include "ace/OS_NS_stropts.h"
+# if defined (ACE_HAS_EVENT_POLL) && defined (linux)
+# include /**/ <sys/epoll.h>
+# elif defined (ACE_HAS_DEV_POLL)
# if defined (sun)
# include /**/ <sys/devpoll.h>
# elif defined (linux)
# include /**/ <linux/devpoll.h>
# endif /* sun */
-
-# endif /* ACE_HAS_DEV_POLL */
-
+# endif /* ACE_HAS_DEV_POLL */
#if !defined (__ACE_INLINE__)
# include "ace/Dev_Poll_Reactor.inl"
@@ -64,7 +60,7 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r,
if (disable_notify_pipe == 0)
{
- this->dp_reactor_ = ACE_dynamic_cast (ACE_Dev_Poll_Reactor *, r);
+ this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r);
if (this->dp_reactor_ == 0)
{
@@ -133,7 +129,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void)
return this->notification_pipe_.close ();
}
-ssize_t
+int
ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_Reactor_Mask mask,
ACE_Time_Value *timeout)
@@ -185,11 +181,13 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
ACE_ASSERT (temp != 0);
*temp = buffer;
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
+
if (notify_queue_.enqueue_tail (temp) == -1)
return -1;
- // Let us send a notify for every message
- // if (notification_required)
+ // Now pop the pipe to force the callback for dispatching when ready.
+ // @todo - this only needs to write one byte for ACE_HAS_NOTIFICATION_QUEUE.
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
sizeof buffer,
@@ -197,10 +195,17 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (n == -1)
return -1;
+ // Since the notify is queued (and maybe already delivered by now)
+ // we can simply release the guard. The dispatch of this notification
+ // will decrement the reference count.
+ eh_guard.release ();
+
return 0;
#else
ACE_Notification_Buffer buffer (eh, mask);
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
+
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
sizeof buffer,
@@ -208,6 +213,8 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh,
if (n == -1)
return -1;
+ eh_guard.release ();
+
return 0;
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
}
@@ -387,6 +394,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
ACE_LIB_TEXT ("enqueue_head")),
-1);
}
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
// ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
@@ -395,6 +403,10 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
// pointer we've been passed.
if (buffer.eh_ != 0)
{
+ // Guard the handler's refcount. Recall that when the notify
+ // was queued, the refcount was incremented, so it need not be
+ // now. The guard insures that it is decremented properly.
+ ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false);
switch (buffer.mask_)
{
@@ -418,44 +430,6 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
buffer.eh_->handle_close (ACE_INVALID_HANDLE,
ACE_Event_Handler::EXCEPT_MASK);
}
-#else
- // If eh == 0 then another thread is unblocking the
- // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's
- // internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the ACE_Event_Handler
- // pointer we've been passed.
- if (buffer.eh_ != 0)
- {
- switch (buffer.mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- case ACE_Event_Handler::ACCEPT_MASK:
- result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::QOS_MASK:
- result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::GROUP_QOS_MASK:
- result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE);
- break;
- default:
- // Should we bail out if we get an invalid mask?
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("invalid mask = %d\n"),
- buffer.mask_));
- }
- if (result == -1)
- buffer.eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-
-#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
return 1;
}
@@ -1193,7 +1167,21 @@ ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time)
if (result == 0 || (result == -1 && errno == ETIME))
return 0;
else if (result == -1)
- return -1;
+ {
+ if (errno != EINTR)
+ return -1;
+
+ // Bail out -- we got here since the poll was interrupted.
+ // If it was due to a signal registered through our ACE_Sig_Handler,
+ // then it was dispatched, so we count it in the number of events
+ // handled rather than cause an error return.
+ if (ACE_Sig_Handler::sig_pending () != 0)
+ {
+ ACE_Sig_Handler::sig_pending (0);
+ return 1;
+ }
+ return -1;
+ }
// Dispatch the events, if any.
return this->dispatch ();
@@ -1230,45 +1218,15 @@ ACE_Dev_Poll_Reactor::dispatch (void)
// Perform the Template Method for dispatching all the handlers.
- // First check for interrupts.
- if (0 /* active_handle_count == -1 */)
- {
- // Bail out -- we got here since the poll (i.e. ioctl()) was
- // interrupted.
- if (ACE_Sig_Handler::sig_pending () != 0)
- {
- ACE_Sig_Handler::sig_pending (0);
-
-#if 0
- // If any HANDLES in the <ready_set_> are activated as a
- // result of signals they should be dispatched since
- // they may be time critical...
- pfds = this->ready_set_.pfds;
- active_handle_count = this->ready_set_.nfds;
-#endif /* 0 */
-
- // Record the fact that the Reactor has dispatched a
- // handle_signal() method. We need this to return the
- // appropriate count below.
- signal_occurred = 1;
- }
- else
- return -1;
- }
-
// Handle timers early since they may have higher latency
// constraints than I/O handlers. Ideally, the order of
// dispatching should be a strategy...
- else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
+ if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1)
// State has changed or timer queue has failed, exit loop.
break;
// Check to see if there are no more I/O handles left to
// dispatch AFTER we've handled the timers.
- else if (0 /* active_handle_count == 0 */)
- return io_handlers_dispatched
- + other_handlers_dispatched
- + signal_occurred;
#if 0
// Next dispatch the notification handlers (if there are any to
@@ -1390,7 +1348,11 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
{
// Modify the reference count in an exception-safe way.
- ACE_Dev_Poll_Handler_Guard (this->handler_rep_, handle);
+ // Note that eh could be the notify handler. It's not strictly
+ // necessary to manage its refcount, but since we don't enable
+ // the counting policy, it won't do much. Management of the
+ // notified handlers themselves is done in the notify handler.
+ ACE_Dev_Poll_Handler_Guard eh_guard (eh);
// Release the lock during the upcall.
ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_);
@@ -1406,6 +1368,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
if (ACE_BIT_ENABLED (revents, POLLOUT))
#endif /* ACE_HAS_EVENT_POLL */
{
+ ++io_handlers_dispatched;
+
const int status =
this->upcall (eh, &ACE_Event_Handler::handle_output, handle);
@@ -1416,8 +1380,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
return this->remove_handler (handle,
ACE_Event_Handler::WRITE_MASK);
}
-
- ++io_handlers_dispatched;
}
// Dispatch all "high priority" (e.g. out-of-band data) events.
@@ -1427,6 +1389,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
if (ACE_BIT_ENABLED (revents, POLLPRI))
#endif /* ACE_HAS_EVENT_POLL */
{
+ ++io_handlers_dispatched;
+
const int status =
this->upcall (eh, &ACE_Event_Handler::handle_exception, handle);
@@ -1437,8 +1401,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
return this->remove_handler (handle,
ACE_Event_Handler::EXCEPT_MASK);
}
-
- ++io_handlers_dispatched;
}
// Dispatch all input events.
@@ -1448,6 +1410,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
if (ACE_BIT_ENABLED (revents, POLLIN))
#endif /* ACE_HAS_EVENT_POLL */
{
+ ++io_handlers_dispatched;
+
const int status =
this->upcall (eh, &ACE_Event_Handler::handle_input, handle);
@@ -1458,8 +1422,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched)
return this->remove_handler (handle,
ACE_Event_Handler::READ_MASK);
}
-
- ++io_handlers_dispatched;
}
} // The reactor lock is reacquired upon leaving this scope.
}
@@ -1668,7 +1630,7 @@ ACE_Dev_Poll_Reactor::register_handler (const ACE_Sig_Set &sigset,
#if (ACE_NSIG > 0) && !defined (CHORUS)
for (int s = 1; s < ACE_NSIG; ++s)
- if (sigset.is_member (s)
+ if ((sigset.is_member (s) == 1)
&& this->signal_handler_->register_handler (s,
new_sh,
new_disp) == -1)
@@ -1727,7 +1689,16 @@ ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle,
return -1;
if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL))
- (void) eh->handle_close (handle, mask);
+ {
+ // Release the lock during the "close" upcall.
+ ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_);
+ ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>,
+ reverse_guard,
+ reverse_lock,
+ -1);
+
+ (void) eh->handle_close (handle, mask);
+ }
// Note the fact that we've changed the state of the wait_set,
// i.e. the "interest set," which is used by the dispatching loop to
@@ -1786,7 +1757,7 @@ ACE_Dev_Poll_Reactor::remove_handler (const ACE_Sig_Set &sigset)
#if (ACE_NSIG > 0) && !defined (CHORUS)
for (int s = 1; s < ACE_NSIG; ++s)
- if (sigset.is_member (s)
+ if ((sigset.is_member (s) == 1)
&& this->signal_handler_->remove_handler (s) == -1)
result = -1;
@@ -2254,13 +2225,11 @@ ACE_Dev_Poll_Reactor::wakeup_all_threads (void)
{
ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads");
-#if 0
// Send a notification, but don't block if there's no one to receive
// it.
this->notify (0,
ACE_Event_Handler::NULL_MASK,
(ACE_Time_Value *) &ACE_Time_Value::zero);
-#endif /* 0 */
}
int