diff options
Diffstat (limited to 'ace/Dev_Poll_Reactor.cpp')
-rw-r--r-- | ace/Dev_Poll_Reactor.cpp | 140 |
1 files changed, 57 insertions, 83 deletions
diff --git a/ace/Dev_Poll_Reactor.cpp b/ace/Dev_Poll_Reactor.cpp index bbc9473f615..b8d8eb3530d 100644 --- a/ace/Dev_Poll_Reactor.cpp +++ b/ace/Dev_Poll_Reactor.cpp @@ -11,6 +11,7 @@ ACE_RCSID (ace, # include "ace/OS_NS_unistd.h" # include "ace/OS_NS_fcntl.h" +# include "ace/OS_NS_stropts.h" # if defined (ACE_HAS_EVENT_POLL) && defined (linux) # include /**/ <sys/epoll.h> @@ -59,7 +60,7 @@ ACE_Dev_Poll_Reactor_Notify::open (ACE_Reactor_Impl *r, if (disable_notify_pipe == 0) { - this->dp_reactor_ = ACE_dynamic_cast (ACE_Dev_Poll_Reactor *, r); + this->dp_reactor_ = dynamic_cast<ACE_Dev_Poll_Reactor *> (r); if (this->dp_reactor_ == 0) { @@ -180,11 +181,13 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, ACE_ASSERT (temp != 0); *temp = buffer; + ACE_Dev_Poll_Handler_Guard eh_guard (eh); + if (notify_queue_.enqueue_tail (temp) == -1) return -1; - // Let us send a notify for every message - // if (notification_required) + // Now pop the pipe to force the callback for dispatching when ready. + // @todo - this only needs to write one byte for ACE_HAS_NOTIFICATION_QUEUE. ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, sizeof buffer, @@ -192,10 +195,17 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, if (n == -1) return -1; + // Since the notify is queued (and maybe already delivered by now) + // we can simply release the guard. The dispatch of this notification + // will decrement the reference count. + eh_guard.release (); + return 0; #else ACE_Notification_Buffer buffer (eh, mask); + ACE_Dev_Poll_Handler_Guard eh_guard (eh); + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), (char *) &buffer, sizeof buffer, @@ -203,6 +213,8 @@ ACE_Dev_Poll_Reactor_Notify::notify (ACE_Event_Handler *eh, if (n == -1) return -1; + eh_guard.release (); + return 0; #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ } @@ -382,6 +394,7 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) ACE_LIB_TEXT ("enqueue_head")), -1); } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ // If eh == 0 then another thread is unblocking the // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's @@ -390,6 +403,10 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) // pointer we've been passed. if (buffer.eh_ != 0) { + // Guard the handler's refcount. Recall that when the notify + // was queued, the refcount was incremented, so it need not be + // now. The guard insures that it is decremented properly. + ACE_Dev_Poll_Handler_Guard eh_guard (buffer.eh_, false); switch (buffer.mask_) { @@ -413,44 +430,6 @@ ACE_Dev_Poll_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer) buffer.eh_->handle_close (ACE_INVALID_HANDLE, ACE_Event_Handler::EXCEPT_MASK); } -#else - // If eh == 0 then another thread is unblocking the - // ACE_Dev_Poll_Reactor to update the ACE_Dev_Poll_Reactor's - // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the ACE_Event_Handler - // pointer we've been passed. - if (buffer.eh_ != 0) - { - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::QOS_MASK: - result = buffer.eh_->handle_qos (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::GROUP_QOS_MASK: - result = buffer.eh_->handle_group_qos (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("invalid mask = %d\n"), - buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - -#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ return 1; } @@ -1188,7 +1167,21 @@ ACE_Dev_Poll_Reactor::handle_events_i (ACE_Time_Value *max_wait_time) if (result == 0 || (result == -1 && errno == ETIME)) return 0; else if (result == -1) - return -1; + { + if (errno != EINTR) + return -1; + + // Bail out -- we got here since the poll was interrupted. + // If it was due to a signal registered through our ACE_Sig_Handler, + // then it was dispatched, so we count it in the number of events + // handled rather than cause an error return. + if (ACE_Sig_Handler::sig_pending () != 0) + { + ACE_Sig_Handler::sig_pending (0); + return 1; + } + return -1; + } // Dispatch the events, if any. return this->dispatch (); @@ -1225,45 +1218,15 @@ ACE_Dev_Poll_Reactor::dispatch (void) // Perform the Template Method for dispatching all the handlers. - // First check for interrupts. - if (0 /* active_handle_count == -1 */) - { - // Bail out -- we got here since the poll (i.e. ioctl()) was - // interrupted. - if (ACE_Sig_Handler::sig_pending () != 0) - { - ACE_Sig_Handler::sig_pending (0); - -#if 0 - // If any HANDLES in the <ready_set_> are activated as a - // result of signals they should be dispatched since - // they may be time critical... - pfds = this->ready_set_.pfds; - active_handle_count = this->ready_set_.nfds; -#endif /* 0 */ - - // Record the fact that the Reactor has dispatched a - // handle_signal() method. We need this to return the - // appropriate count below. - signal_occurred = 1; - } - else - return -1; - } - // Handle timers early since they may have higher latency // constraints than I/O handlers. Ideally, the order of // dispatching should be a strategy... - else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1) + if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1) // State has changed or timer queue has failed, exit loop. break; // Check to see if there are no more I/O handles left to // dispatch AFTER we've handled the timers. - else if (0 /* active_handle_count == 0 */) - return io_handlers_dispatched - + other_handlers_dispatched - + signal_occurred; #if 0 // Next dispatch the notification handlers (if there are any to @@ -1385,7 +1348,11 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) { // Modify the reference count in an exception-safe way. - ACE_Dev_Poll_Handler_Guard (this->handler_rep_, handle); + // Note that eh could be the notify handler. It's not strictly + // necessary to manage its refcount, but since we don't enable + // the counting policy, it won't do much. Management of the + // notified handlers themselves is done in the notify handler. + ACE_Dev_Poll_Handler_Guard eh_guard (eh); // Release the lock during the upcall. ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); @@ -1401,6 +1368,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLOUT)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_output, handle); @@ -1411,8 +1380,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::WRITE_MASK); } - - ++io_handlers_dispatched; } // Dispatch all "high priority" (e.g. out-of-band data) events. @@ -1422,6 +1389,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLPRI)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_exception, handle); @@ -1432,8 +1401,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::EXCEPT_MASK); } - - ++io_handlers_dispatched; } // Dispatch all input events. @@ -1443,6 +1410,8 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) if (ACE_BIT_ENABLED (revents, POLLIN)) #endif /* ACE_HAS_EVENT_POLL */ { + ++io_handlers_dispatched; + const int status = this->upcall (eh, &ACE_Event_Handler::handle_input, handle); @@ -1453,8 +1422,6 @@ ACE_Dev_Poll_Reactor::dispatch_io_events (int &io_handlers_dispatched) return this->remove_handler (handle, ACE_Event_Handler::READ_MASK); } - - ++io_handlers_dispatched; } } // The reactor lock is reacquired upon leaving this scope. } @@ -1722,7 +1689,16 @@ ACE_Dev_Poll_Reactor::remove_handler_i (ACE_HANDLE handle, return -1; if (ACE_BIT_DISABLED (mask, ACE_Event_Handler::DONT_CALL)) - (void) eh->handle_close (handle, mask); + { + // Release the lock during the "close" upcall. + ACE_Reverse_Lock<ACE_SYNCH_MUTEX> reverse_lock (this->lock_); + ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, + reverse_guard, + reverse_lock, + -1); + + (void) eh->handle_close (handle, mask); + } // Note the fact that we've changed the state of the wait_set, // i.e. the "interest set," which is used by the dispatching loop to @@ -2249,13 +2225,11 @@ ACE_Dev_Poll_Reactor::wakeup_all_threads (void) { ACE_TRACE ("ACE_Dev_Poll_Reactor::wakeup_all_threads"); -#if 0 // Send a notification, but don't block if there's no one to receive // it. this->notify (0, ACE_Event_Handler::NULL_MASK, (ACE_Time_Value *) &ACE_Time_Value::zero); -#endif /* 0 */ } int |