diff options
Diffstat (limited to 'ace/CLASSIX/CLASSIX_Select_Reactor.cpp')
-rw-r--r-- | ace/CLASSIX/CLASSIX_Select_Reactor.cpp | 337 |
1 files changed, 0 insertions, 337 deletions
diff --git a/ace/CLASSIX/CLASSIX_Select_Reactor.cpp b/ace/CLASSIX/CLASSIX_Select_Reactor.cpp deleted file mode 100644 index 960e76358aa..00000000000 --- a/ace/CLASSIX/CLASSIX_Select_Reactor.cpp +++ /dev/null @@ -1,337 +0,0 @@ -// $Id$ -/* -// ============================================================================ -// -// = LIBRARY -// ACE -// -// = FILENAME -// CLASSIX_Reactor.cpp -// -// = AUTHOR(S) -// Nokia Telecommunications -// -// ============================================================================ -*/ -#include "ace/CLASSIX/CLASSIX_Select_Reactor.h" - -#if !defined (__ACE_INLINE__) -#include "ace/CLASSIX/CLASSIX_Select_Reactor.i" -#endif /* __ACE_INLINE__ */ - -#include "ace/CLASSIX/CLASSIX_OS.h" -/* ------------------------------------------------------------------------- */ -int -ACE_CLASSIX_Select_Reactor::wait_for_multiple_events -(ACE_Select_Reactor_Handle_Set &theDispatchSet, - ACE_Time_Value *max_wait_time) -{ - - ACE_Time_Value timer_buf (0); - ACE_Time_Value *this_timeout = &timer_buf; - - int number_of_active_handles = this->any_ready (theDispatchSet); - - // If there are any bits enabled in the <ready_set_> then we'll - // handle those first, otherwise we'll block in select(). - - if (number_of_active_handles == 0) - { - int port = K_ANYENABLED; - do - { - // Monitor all enabled ports - // CLASSIX uses -1 rathre than 0 for blocked receive - int msec = -1; - if (this->timer_queue_->calculate_timeout (max_wait_time, - this_timeout) != 0) - { - if ((msec = this_timeout->msec()) == 0) - { - msec = -1; - this_timeout = 0; - } - } - else - this_timeout = 0; - - ACE_CLASSIX_Msg rmsg(0, 0); - port = K_ANYENABLED; - ssize_t size = ::ipcReceive(rmsg.get(), &port, msec); -#if 0 - ACE_DEBUG((LM_DEBUG, - "(%t)ACE_CLASSIX_Select_Reactor::" - "return from ipcReceive():ret = %d" - ", port = %d, timeout = %d\n", - size, port, msec)); -#endif - if (size >= 0) - { - // Is 0 valid ??? - // Keep info about which handler this message is for and - // its size. - if (this->set_current_info_(port, size) == 0) - { - theDispatchSet.rd_mask_.set_bit(port); - number_of_active_handles = 1; - } - else - { - ACE_DEBUG((LM_DEBUG, - "Synchronization problem in Reactor???\n")); - number_of_active_handles = -1; - errno = K_EFAULT; - } - } - else - { - // make the current message information invalid - this->set_current_info_(ACE_INVALID_HANDLE, 0); - if ((errno = size) == K_ETIMEOUT) - number_of_active_handles = 0; - else - number_of_active_handles = -1; - } - } - while (number_of_active_handles == -1 && - this->handle_error_ (port) > 0); - } - // Return the number of events to dispatch. - return number_of_active_handles; -} - -int -ACE_CLASSIX_Select_Reactor::set_current_info_(ACE_HANDLE thePort, - size_t theSize) -{ - ACE_MT(ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, - this->token_, -1)); - - this->current_handle_ = thePort; - this->current_msg_size_ = theSize; - return 0; -} - -int -ACE_CLASSIX_Select_Reactor::current_info(ACE_HANDLE thePort, - size_t& theSize) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, - ace_mon, this->token_, -1)); - - if (this->current_handle_ == thePort) - { - theSize = this->current_msg_size_; - this->current_msg_size_ = 0; - this->current_handle_ = ACE_INVALID_HANDLE; - return 0; - } - else - { - theSize = 0; - return -1; - } -} - -int -ACE_CLASSIX_Select_Reactor::handle_error_ (int thePort) -{ - - // The thread has been aborted - if (errno == K_EABORT) - return this->restart_; - // No port or a (Chorus) handler is attached to the port - else if (errno == K_ENOPORT || errno == K_EINVAL) - return this->check_handles_ (thePort); - else - return -1; -} - -int -ACE_CLASSIX_Select_Reactor::check_handles_ (int thePort) -{ - ACE_TRACE ("ACE_CLASSIX_Select_Reactor::check_handles_"); - if (thePort == K_ANYENABLED) - return -1; - else - // Don't know how to check if a Chorus port has been disabled or deleted. - return 0; -} - -/* ------------------------------------------------------------------------- */ -void -ACE_CLASSIX_Select_Reactor_Notify::dump (void) const -{ - ACE_TRACE ("ACE_CLASSIX_Select_Reactor_Notify::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("ACE_CLASSIX_select_reactor_ = %x"), - this->select_reactor_)); - this->notification_sap_.dump (); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -int -ACE_CLASSIX_Select_Reactor_Notify::open (ACE_Reactor_Impl *theReactor, - ACE_Timer_Queue*, - int the_disable_notify_pipe) -{ - if (the_disable_notify_pipe == 0) - { - this->select_reactor_ = ACE_dynamic_cast - (ACE_CLASSIX_Select_Reactor *, theReactor); - - if (this->notification_sap_.open (&this->notification_port_) != 0 || - this->notification_sap_.selectable() != 0) - return -1; - - return this->select_reactor_->register_handler - (this->notification_sap_.get_handle (), - this, - ACE_Event_Handler::READ_MASK); - } - else - { - this->select_reactor_ = 0; - } -} - -ssize_t -ACE_CLASSIX_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_Notification_Buffer buffer (eh, mask); - ACE_CLASSIX_Msg msg(&buffer, sizeof (buffer)); - KnIpcDest dest; - dest.target = this->notification_sap_.get_addr().get_id(); - - ssize_t n = ipcSend (msg.get(), K_DEFAULTPORT, &dest); - if (n < 0) - ACE_DEBUG((LM_DEBUG, "ipcSend() error = %d\n", n)); - return n == 0 ? 0 : -1; -} - -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. - -int -ACE_CLASSIX_Select_Reactor_Notify::dispatch_notifications ( - int & number_of_active_handles, - const ACE_Handle_Set &rd_mask) -{ - ACE_TRACE ("(%t) ACE_CLASSIX_Select_Reactor_Notify::dispatch_notifications"); - - ACE_HANDLE read_handle = - this->notification_sap_.get_handle (); - - if (rd_mask.is_set (read_handle)) - { - number_of_active_handles--; - return this->handle_input (read_handle); - } - else - return 0; -} - -// Special trick to unblock select() when updates occur in somewhere -// other than the main ACE_Select_Reactor thread. All we do is write data to -// a pipe that the ACE_Select_Reactor is listening on. Thanks to Paul -// Stephenson for suggesting this approach. - -int -ACE_CLASSIX_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) -{ - // Precondition: this->select_reactor_.token_.current_owner () == - // ACE_Thread::self (); - - if (handle != this->notification_sap_.get_handle()) - { - ACE_DEBUG((LM_DEBUG, "ACE_CLASSIX_Select_Reator_Notify::" - "handle_input() Not my handle\n")); - return 0; - } - - ssize_t n = 0; - size_t n1= 0; - - int number_dispatched = 0; - - ACE_Notification_Buffer buffer; - ACE_CLASSIX_Msg rmsg(&buffer, sizeof (buffer)); - - if (this->select_reactor_->current_info(handle, n1) == -1 || - n1 != sizeof buffer) - { - // I'm not quite sure what to do at this point. It's - // probably best just to return -1. - ACE_DEBUG((LM_DEBUG, - "ACE_CLASSIX_Select_Reactor_Notify:: " - "read not expected by the reactor\n", n1)); - return -1; - } - - while ((n = ::ipcGetData(rmsg.get())) > 0) - { - if (n != sizeof buffer) - { - // I'm not sure quite what to do at this point. It's - // probably best just to return -1. - ACE_DEBUG((LM_DEBUG, - "ACE_CLASSIX_Select_Reactor_Notify::ipcGetDAta() " - "incorrect read(%d)\n", n)); - return -1; - } - - // If eh == 0 then another thread is unblocking the ACE_Select_Reactor - // to update the ACE_Select_Reactor's internal structures. Otherwise, - // we need to dispatch the appropriate handle_* method on the - // ACE_Event_Handler pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ACE_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - - number_dispatched++; - - // Bail out if we've reached the <notify_threshold_>. Note that - // by default <notify_threshold_> is -1, so we'll loop until all - // the notifications in the pipe have been dispatched. - if (number_dispatched == this->select_reactor_->max_notify_iterations()) - break; - } - - // Reassign number_dispatched to -1 if things have gone seriously - // wrong. - if (n < 0) - number_dispatched = -1; - - - // Enqueue ourselves into the list of waiting threads. When we - // reacquire the token we'll be off and running again with ownership - // of the token. The postcondition of this call is that - // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); - this->select_reactor_->renew(); - return number_dispatched; -} -/* ------------------------------------------------------------------------- */ |