diff options
Diffstat (limited to 'ace/CLASSIX/Select_Reactor.cpp')
-rw-r--r-- | ace/CLASSIX/Select_Reactor.cpp | 440 |
1 files changed, 0 insertions, 440 deletions
diff --git a/ace/CLASSIX/Select_Reactor.cpp b/ace/CLASSIX/Select_Reactor.cpp deleted file mode 100644 index 50ddfbaf5da..00000000000 --- a/ace/CLASSIX/Select_Reactor.cpp +++ /dev/null @@ -1,440 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// ACE -// -// = FILENAME -// CLASSIX/Reactor.cpp -// -// = AUTHOR(S) -// Wei Chiang -// -// = COPYRIGHT -// Copyright 1998 Nokia Telecommunications -// -// ============================================================================ - -#include "CLASSIX/Reactor.h" - -#if !defined (__ACE_INLINE__) -#include "CLASSIX/Reactor.i" -#endif /* __ACE_INLINE__ */ - -#include "CLASSIX/OS.h" -/* ------------------------------------------------------------------------- */ -ACE_CLASSIX_Select_Reactor::ACE_CLASSIX_Select_Reactor( - ACE_Timer_Queue *theTimeQ) - : ACE_Select_Reactor(0, theTimeQ, 0, - new ACE_CLASSIX_Select_Reactor_Notify()), - current_msg_size_ (0), - current_handle_ (ACE_INVALID_HANDLE) -{ -} - -ACE_CLASSIX_Select_Reactor::ACE_CLASSIX_Select_Reactor ( - size_t theSize, - int theRs, - ACE_Timer_Queue *theTimeQ) - : ACE_Select_Reactor (theSize, theRs, 0, theTimeQ, 0, - new ACE_CLASSIX_Select_Reactor_Notify() ), - current_msg_size_ (0), - current_handle_ (ACE_INVALID_HANDLE) -{ -} - -int -ACE_CLASSIX_Select_Reactor::wait_for_multiple_events - (ACE_Select_Reactor_Handle_Set &theDispatchSet, - ACE_Time_Value *max_wait_time) -{ - - ACE_Time_Value timer_buf (0); - ACE_Time_Value *this_timeout = &timer_buf; - - int number_of_active_handles = this->any_ready (theDispatchSet); - - // If there are any bits enabled in the <ready_set_> then we'll - // handle those first, otherwise we'll block in select(). - - if (number_of_active_handles == 0) - { - int port = K_ANYENABLED; - do - { - // Monitor all enabled ports - // CLASSIX uses -1 rathre than 0 for blocked receive - int msec = -1; - if (this->timer_queue_->calculate_timeout (max_wait_time, - this_timeout) != 0) - { - if ((msec = this_timeout->msec()) == 0) - { - msec = -1; - this_timeout = 0; - } - } - else - this_timeout = 0; - - ACE_CLASSIX_Msg rmsg(0, 0); - port = K_ANYENABLED; - ssize_t size = ::ipcReceive(rmsg.get(), &port, msec); -#if 0 - ACE_DEBUG((LM_DEBUG, - "(%t)ACE_CLASSIX_Select_Reactor::" - "return from ipcReceive():ret = %d" - ", port = %d, timeout = %d\n", - size, port, msec)); -#endif - if (size >= 0) - { - // Is 0 valid ??? - // Keep info about which handler this message is for and - // its size. - if (this->set_current_info_(port, size) == 0) - { - theDispatchSet.rd_mask_.set_bit(port); - number_of_active_handles = 1; - } - else - { - ACE_DEBUG((LM_DEBUG, - "Synchronization problem in Reactor???\n")); - number_of_active_handles = -1; - errno = K_EFAULT; - } - } - else - { - // make the current message information invalid - this->set_current_info_(ACE_INVALID_HANDLE, 0); - if ((errno = size) == K_ETIMEOUT) - number_of_active_handles = 0; - else - number_of_active_handles = -1; - } - } - while (number_of_active_handles == -1 && - this->handle_error_ (port) > 0); - } - // Return the number of events to dispatch. - return number_of_active_handles; -} - -int -ACE_CLASSIX_Select_Reactor::set_current_info_(ACE_HANDLE thePort, - size_t theSize) -{ -// ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, -// this->current_lock_, -1)); - ACE_MT(ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, - this->token_, -1)); - - this->current_handle_ = thePort; - this->current_msg_size_ = theSize; - return 0; -} - -int -ACE_CLASSIX_Select_Reactor::get_current_info(ACE_HANDLE thePort, - size_t& theSize) -{ -// ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, -// this->current_lock_, 0)); - ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, - ace_mon, this->token_, -1)); - - if (this->current_handle_ == thePort) - { - theSize = this->current_msg_size_; - this->current_msg_size_ = 0; - this->current_handle_ = ACE_INVALID_HANDLE; - return 0; - } - else - { - theSize = 0; - return -1; - } -} - -#if 0 -ACE_Message_Block* -ACE_CLASSIX_Select_Reactor::collect_msg(ACE_HANDLE theHandle) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, - this->lock_current_, 0)); - - if (this->current_handle_ != ACE_INVALID_HANDLE && - theHandle == this->current_handle_) - { - ACE_Message_Block *msg = this->current_msg_; - this->current_msg_ = 0; - this->current_handle_ = ACE_INVALID_HANDLE; - return msg; - } - else - { - return 0; - } -} - - -int -ACE_CLASSIX_Select_Reactor::get_data_(ACE_HANDLE theHandle, size_t theSize) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, - this->lock_current_, 0)); - - // reset current information - this->current_error_ = 0; - this->current_handle_= theHandle; - - // Remove the previous message that has not been taken - if (this->current_msg_) - this->current_msg_->release(); - - // Read the current message into a block bigger than the actual size - // to facilitate adding potential paddings by the handler - this->current_msg_ = new ACE_Message_Block(theSize + this->overhead_); - char *data = this->current_msg_ == 0 ? 0 : this->current_msg_->rd_ptr(); - // Invoke the read operation even though we may not have a buffer for it. - // This is to clean up the CLASSIX' current message space. - ACE_CLASSIX_Msg rmsg(data, data == 0 ? 0 : theSize); - int result = ::ipcGetData(rmsg.get()); - - if (!this->current_msg_) - this->current_error_ = ENOBUFS; - else if (result < 0) - this->current_error_ = ACE_CLASSIX_OS::convert_io_error(result); - else if (result != theSize) - this->current_error_ = EFAULT; - - if (this->current_error_ < 0 && this->current_msg_) - { - this->current_msg_->release(); - this->current_msg_ = 0; - } - return this->current_error_; -} -#endif - -int -ACE_CLASSIX_Select_Reactor::handle_error_ (int thePort) -{ - - // The thread has been aborted - if (errno == K_EABORT) - return this->restart_; - // No port or a (Chorus) handler is attached to the port - else if (errno == K_ENOPORT || errno == K_EINVAL) - return this->check_handles_ (thePort); - else - return -1; -} - -int -ACE_CLASSIX_Select_Reactor::check_handles_ (int thePort) -{ - ACE_TRACE ("ACE_Select_Reactor::check_handles"); - if (thePort == K_ANYENABLED) - return -1; - else - // Don't know how to check if a Chorus port has been disabled or deleted. - return 0; -} - -/* ------------------------------------------------------------------------- */ - -ACE_CLASSIX_Select_Reactor_Notify::ACE_CLASSIX_Select_Reactor_Notify(void) - : ACE_Select_Reactor_Notify() -{ - // Get the address for the notification port - // this->notification_sap_.set_addr - // (ACE_CLASSIX_Port(this->notification_port_)); -} - -void -ACE_CLASSIX_Select_Reactor_Notify::dump (void) const -{ - ACE_TRACE ("ACE_CLASSIX_Select_Reactor_Notify::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("CLASSIX_select_reactor_ = %x"), - this->select_reactor_)); - this->notification_sap_.dump (); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -} - -int -ACE_CLASSIX_Select_Reactor_Notify::open (ACE_Select_Reactor *r, - int disable_notify_pipe) -{ - if (disable_notify_pipe == 0) - { - this->select_reactor_ = r; - - if (this->notification_sap_.open (&this->notification_port_) != 0 || - this->notification_sap_.selectable() != 0) - return -1; - - return this->select_reactor_->register_handler - (this->notification_sap_.get_handle (), - this, - ACE_Event_Handler::READ_MASK); - } - else - { - this->select_reactor_ = 0; - } -} - -int -ACE_CLASSIX_Select_Reactor_Notify::close (void) -{ - ACE_TRACE ("ACE_Select_Reactor_Notify::close"); - // deregister handle ??? - return this->notification_sap_.close (); -} - -ssize_t -ACE_CLASSIX_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - ACE_Time_Value *timeout) -{ - ACE_Notification_Buffer buffer (eh, mask); - ACE_CLASSIX_Msg msg(&buffer, sizeof (buffer)); - KnIpcDest dest; - dest.target = this->notification_sap_.get_addr().get_id(); - - ssize_t n = ipcSend (msg.get(), K_DEFAULTPORT, &dest); - if (n < 0) - ACE_DEBUG((LM_DEBUG, "ipcSend() error = %d\n", n)); - return n == 0 ? 0 : -1; -} - -// Handles pending threads (if any) that are waiting to unblock the -// Select_Reactor. - -int -ACE_CLASSIX_Select_Reactor_Notify::dispatch_notifications ( - int & number_of_active_handles, - const ACE_Handle_Set &rd_mask) -{ - ACE_TRACE ("(%t) ACE_Select_Reactor_Notify::handle_notification"); - - ACE_HANDLE read_handle = - this->notification_sap_.get_handle (); - - if (rd_mask.is_set (read_handle)) - { - number_of_active_handles--; - return this->handle_input (read_handle); - } - else - return 0; -} - -// Special trick to unblock select() when updates occur in somewhere -// other than the main ACE_Select_Reactor thread. All we do is write data to -// a pipe that the ACE_Select_Reactor is listening on. Thanks to Paul -// Stephenson for suggesting this approach. - -int -ACE_CLASSIX_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) -{ - // Precondition: this->select_reactor_.token_.current_owner () == - // ACE_Thread::self (); - - if (handle != this->notification_sap_.get_handle()) - { - ACE_DEBUG((LM_DEBUG, "ACE_CLASSIX_Select_Reator_Notify::" - "handle_input() Not my handle\n")); - return 0; - } - - ssize_t n = 0; - size_t n1= 0; - - int number_dispatched = 0; - - ACE_Notification_Buffer buffer; - ACE_CLASSIX_Msg rmsg(&buffer, sizeof (buffer)); - - if (this->select_reactor_->get_current_info(handle, n1) == -1 || - n1 != sizeof buffer) - { - // I'm not sure quite what to do at this point. It's - // probably best just to return -1. - ACE_DEBUG((LM_DEBUG, - "ACE_CLASSIX_Select_Reactor_Notify:: " - "read not expected by the reactor\n", n1)); - return -1; - } - - while ((n = ::ipcGetData(rmsg.get())) > 0) - { - if (n != sizeof buffer) - { - // I'm not sure quite what to do at this point. It's - // probably best just to return -1. - ACE_DEBUG((LM_DEBUG, - "ACE_CLASSIX_Select_Reactor_Notify::ipcGetDAta() " - "incorrect read(%d)\n", n)); - return -1; - } - - // If eh == 0 then another thread is unblocking the ACE_Select_Reactor - // to update the ACE_Select_Reactor's internal structures. Otherwise, - // we need to dispatch the appropriate handle_* method on the - // ACE_Event_Handler pointer we've been passed. - if (buffer.eh_ != 0) - { - int result = 0; - - switch (buffer.mask_) - { - case ACE_Event_Handler::READ_MASK: - case ACE_Event_Handler::ACCEPT_MASK: - result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::WRITE_MASK: - result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); - break; - case ACE_Event_Handler::EXCEPT_MASK: - result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); - break; - default: - // Should we bail out if we get an invalid mask? - ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_)); - } - if (result == -1) - buffer.eh_->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::EXCEPT_MASK); - } - - number_dispatched++; - - // Bail out if we've reached the <notify_threshold_>. Note that - // by default <notify_threshold_> is -1, so we'll loop until all - // the notifications in the pipe have been dispatched. - if (number_dispatched == this->select_reactor_->max_notify_iterations()) - break; - } - - // Reassign number_dispatched to -1 if things have gone seriously - // wrong. - if (n < 0) - number_dispatched = -1; - - - // Enqueue ourselves into the list of waiting threads. When we - // reacquire the token we'll be off and running again with ownership - // of the token. The postcondition of this call is that - // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); - this->select_reactor_->renew(); - return number_dispatched; -} |