diff options
Diffstat (limited to 'ace/Dev_Poll_Reactor.cpp')
-rw-r--r-- | ace/Dev_Poll_Reactor.cpp | 163 |
1 files changed, 66 insertions, 97 deletions
diff --git a/ace/Dev_Poll_Reactor.cpp b/ace/Dev_Poll_Reactor.cpp index f0bf6b15a1b..b8d8eb3530d 100644 --- a/ace/Dev_Poll_Reactor.cpp +++ b/ace/Dev_Poll_Reactor.cpp @@ -9,23 +9,19 @@ ACE_RCSID (ace, #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL) -# if defined (ACE_HAS_EVENT_POLL) && defined (linux) - -# include "ace/OS_NS_unistd.h" -# include "ace/OS_NS_fcntl.h" - -# include /**/ <sys/epoll.h> - -# elif defined (ACE_HAS_DEV_POLL) +# include "ace/OS_NS_unistd.h" +# include "ace/OS_NS_fcntl.h" +# include "ace/OS_NS_stropts.h" +# if defined (ACE_HAS_EVENT_POLL) && defined (linux) +# include /**/ <sys/epoll.h> +# elif defined (ACE_HAS_DEV_POLL) # if defined (sun) # include /**/ <sys/devpoll.h> # elif defined (linux) # include /**/ <linux/devpoll.h> # endif /* sun */ - -# endif /* ACE_HAS_DEV_POLL */ - +# endif /* ACE_HAS_DEV_POLL */ #if !defined (__ACE_INLINE__) # include "ace/Dev_Poll_Reactor.inl" @@ -64,7 +60,7 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r, if (disable_notify_pipe == 0) { - this->dp_reactor_ = ACE_dynamic_cast (ACE_Dev_Poll_Reactor *, r); + this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r); if (this->dp_reactor_ == 0) { @@ -133,7 +129,7 @@ ACE_Dev_Poll_Reactor_Notify::close (void) return this->notification_pipe_.close (); } -ssize_t +int ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_Reactor_Mask mask, ACE_Time_Value *timeout) @@ -185,11 +181,13 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_ASSERT (temp != 0); *temp = buffer; + ACE_Dev_Poll_Handler_Guard eh_guard (eh); + if (notify_queue_.enqueue_tail (temp) == -1) return -1; - // Let us send a notify for every message - // if (notification_required) + // Now pop the pipe to force the callback for dispatching when ready. + // @todo - this only needs to write one byte for ACE_HAS_NOTIFICATION_QUEUE. ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, sizeof buffer, @@ -197,10 +195,17 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, if (n == -1) return -1; + // Since the notify is queued (and maybe already delivered by now) + // we can simply release the guard. The dispatch of this notification + // will decrement the reference count. + eh_guard.release (); + return 0; #else ACE_Notification_Buffer buffer (eh, mask); + ACE_Dev_Poll_Handler_Guard eh_guard (eh); + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, sizeof buffer, @@ -208,6 +213,8 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, if (n == -1) return -1; + eh_guard.release (); + return 0; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ } @@ -387,6 +394,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) ACE_LIB_TEXT ("enqueue_head")), -1); } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // If eh == 0 then another thread is unblocking the // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's @@ -395,6 +403,10 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // pointer we've been passed. if (buffer.eh_ != 0) { + // Guard the handler's refcount. Recall that when the notify + // was queued, the refcount was incremented, so it need not be + // now. The guard insures that it is decremented properly. + ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false); switch (buffer.mask_) { @@ -418,44 +430,6 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) buffer.eh_->handle_close (ACE_INVALID_HANDLE, ACE_Event_Handler::EXCEPT_MASK); } -#else - // If eh == 0 then another thread is unblocking the - // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the ACE_Event_Handler - // pointer we've been passed. - if (buffer.eh_ != 0) - { - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("invalid mask = %d\n"), - buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return 1; } @@ -1193,7 +1167,21 @@ ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time) if (result == 0 || (result == -1 && errno == ETIME)) return 0; else if (result == -1) - return -1; + { + if (errno != EINTR) + return -1; + + // Bail out -- we got here since the poll was interrupted. + // If it was due to a signal registered through our ACE_Sig_Handler, + // then it was dispatched, so we count it in the number of events + // handled rather than cause an error return. + if (ACE_Sig_Handler::sig_pending () != 0) + { + ACE_Sig_Handler::sig_pending (0); + return 1; + } + return -1; + } // Dispatch the events, if any. return this->dispatch (); @@ -1230,45 +1218,15 @@ ACE_Dev_Poll_Reactor::dispatch (void) // Perform the Template Method for dispatching all the handlers. - // First check for interrupts. - if (0 /* active_handle_count == -1 */) - { - // Bail out -- we got here since the poll (i.e. ioctl()) was - // interrupted. - if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); - -#if 0 - // If any HANDLES in the <ready_set_> are activated as a - // result of signals they should be dispatched since - // they may be time critical... - pfds = this->ready_set_.pfds; - active_handle_count = this->ready_set_.nfds; -#endif /* 0 */ - - // Record the fact that the Reactor has dispatched a - // handle_signal() method. We need this to return the - // appropriate count below. - signal_occurred = 1; - } - else - return -1; - } - // Handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... - else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1) + if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1) // State has changed or timer queue has failed, exit loop. break; // Check to see if there are no more I/O handles left to // dispatch AFTER we've handled the timers. - else if (0 /* active_handle_count == 0 */) - return io_handlers_dispatched - + other_handlers_dispatched - + signal_occurred; #if 0 // Next dispatch the notification handlers (if there are any to @@ -1390,7 +1348,11 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) { // Modify the reference count in an exception-safe way. - ACE_Dev_Poll_Handler_Guard (this->handler_rep_, handle); + // Note that eh could be the notify handler. It's not strictly + // necessary to manage its refcount, but since we don't enable + // the counting policy, it won't do much. Management of the + // notified handlers themselves is done in the notify handler. + ACE_Dev_Poll_Handler_Guard eh_guard (eh); // Release the lock during the upcall. ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); @@ -1406,6 +1368,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLOUT)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_output, handle); @@ -1416,8 +1380,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::WRITE_MASK); } - - ++io_handlers_dispatched; } // Dispatch all "high priority" (e.g. out-of-band data) events. @@ -1427,6 +1389,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLPRI)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_exception, handle); @@ -1437,8 +1401,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::EXCEPT_MASK); } - - ++io_handlers_dispatched; } // Dispatch all input events. @@ -1448,6 +1410,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLIN)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_input, handle); @@ -1458,8 +1422,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::READ_MASK); } - - ++io_handlers_dispatched; } } // The reactor lock is reacquired upon leaving this scope. } @@ -1668,7 +1630,7 @@ ACE_Dev_Poll_Reactor::register_handler (const ACE_Sig_Set &sigset, #if (ACE_NSIG > 0) && !defined (CHORUS) for (int s = 1; s < ACE_NSIG; ++s) - if (sigset.is_member (s) + if ((sigset.is_member (s) == 1) && this->signal_handler_->register_handler (s, new_sh, new_disp) == -1) @@ -1727,7 +1689,16 @@ ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle, return -1; if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL)) - (void) eh->handle_close (handle, mask); + { + // Release the lock during the "close" upcall. + ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); + ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, + reverse_guard, + reverse_lock, + -1); + + (void) eh->handle_close (handle, mask); + } // Note the fact that we've changed the state of the wait_set, // i.e. the "interest set," which is used by the dispatching loop to @@ -1786,7 +1757,7 @@ ACE_Dev_Poll_Reactor::remove_handler (const ACE_Sig_Set &sigset) #if (ACE_NSIG > 0) && !defined (CHORUS) for (int s = 1; s < ACE_NSIG; ++s) - if (sigset.is_member (s) + if ((sigset.is_member (s) == 1) && this->signal_handler_->remove_handler (s) == -1) result = -1; @@ -2254,13 +2225,11 @@ ACE_Dev_Poll_Reactor::wakeup_all_threads (void) { ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads"); -#if 0 // Send a notification, but don't block if there's no one to receive // it. this->notify (0, ACE_Event_Handler::NULL_MASK, (ACE_Time_Value *) &ACE_Time_Value::zero); -#endif /* 0 */ } int |