diff options
Diffstat (limited to 'ACE/ace/WFMO_Reactor.inl')
-rw-r--r-- | ACE/ace/WFMO_Reactor.inl | 1173 |
1 files changed, 1173 insertions, 0 deletions
diff --git a/ACE/ace/WFMO_Reactor.inl b/ACE/ace/WFMO_Reactor.inl new file mode 100644 index 00000000000..b2d24fb05d7 --- /dev/null +++ b/ACE/ace/WFMO_Reactor.inl @@ -0,0 +1,1173 @@ +// -*- C++ -*- +// +// $Id$ + +#include "ace/Handle_Set.h" +#include "ace/Reactor.h" +#include "ace/Thread.h" +#include "ace/Sig_Handler.h" +#include "ace/OS_NS_errno.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +/************************************************************/ + +ACE_INLINE int +ACE_Wakeup_All_Threads_Handler::handle_signal (int /* signum */, + siginfo_t * /* siginfo */, + ucontext_t *) +{ + // This will get called when <WFMO_Reactor->wakeup_all_threads_> event + // is signaled. There is nothing to be done here. + // ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("(%t) waking up to get updated handle set info\n"))); + return 0; +} + +#if defined (ACE_WIN32) + +/************************************************************/ + +ACE_INLINE +ACE_WFMO_Reactor_Handler_Repository::Common_Info::Common_Info (void) + : io_entry_ (0), + event_handler_ (0), + io_handle_ (ACE_INVALID_HANDLE), + network_events_ (0), + delete_event_ (0), + delete_entry_ (0), + close_masks_ (ACE_Event_Handler::NULL_MASK) +{ +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Common_Info::reset (void) +{ + this->event_handler_ = 0; + this->io_entry_ = 0; + this->io_handle_ = ACE_INVALID_HANDLE; + this->network_events_ = 0; + this->delete_event_ = 0; + this->delete_entry_ = 0; + this->close_masks_ = ACE_Event_Handler::NULL_MASK; +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Common_Info::set (int io_entry, + ACE_Event_Handler *event_handler, + ACE_HANDLE io_handle, + long network_events, + int delete_event, + int delete_entry, + ACE_Reactor_Mask close_masks) +{ + this->event_handler_ = event_handler; + this->io_entry_ = io_entry; + this->io_handle_ = io_handle; + this->network_events_ = network_events; + this->delete_event_ = delete_event; + this->delete_entry_ = delete_entry; + this->close_masks_ = close_masks; +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Common_Info::set (Common_Info &common_info) +{ + *this = common_info; +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Common_Info::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::Common_Info::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("I/O Entry = %d\n"), + this->io_entry_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Event Handler = %d\n"), + this->event_handler_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("I/O Handle = %d\n"), + this->io_handle_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Network Events = %d\n"), + this->network_events_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Delete Event = %d\n"), + this->delete_event_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Delete Entry = %d\n"), + this->delete_entry_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Close Masks = %d\n"), + this->close_masks_)); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +/************************************************************/ + +ACE_INLINE +ACE_WFMO_Reactor_Handler_Repository::Current_Info::Current_Info (void) + : suspend_entry_ (0) +{ +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Current_Info::set (int io_entry, + ACE_Event_Handler *event_handler, + ACE_HANDLE io_handle, + long network_events, + int delete_event, + int delete_entry, + ACE_Reactor_Mask close_masks, + int suspend_entry) +{ + this->suspend_entry_ = suspend_entry; + Common_Info::set (io_entry, + event_handler, + io_handle, + network_events, + delete_event, + delete_entry, + close_masks); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Current_Info::set (Common_Info &common_info, + int suspend_entry) +{ + this->suspend_entry_ = suspend_entry; + Common_Info::set (common_info); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Current_Info::reset (void) +{ + this->suspend_entry_ = 0; + Common_Info::reset (); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Current_Info::dump (ACE_HANDLE event_handle) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::Current_Info::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + Common_Info::dump (); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Event Handle = %d\n"), + event_handle)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Suspend Entry = %d\n"), + this->suspend_entry_)); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#else /* !ACE_HAS_DUMP */ + ACE_UNUSED_ARG (event_handle); +#endif /* ACE_HAS_DUMP */ +} + +/************************************************************/ + +ACE_INLINE +ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::To_Be_Added_Info (void) + : event_handle_ (ACE_INVALID_HANDLE), + suspend_entry_ (0) +{ +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::set (ACE_HANDLE event_handle, + int io_entry, + ACE_Event_Handler *event_handler, + ACE_HANDLE io_handle, + long network_events, + int delete_event, + int delete_entry, + ACE_Reactor_Mask close_masks, + int suspend_entry) +{ + this->event_handle_ = event_handle; + this->suspend_entry_ = suspend_entry; + Common_Info::set (io_entry, + event_handler, + io_handle, + network_events, + delete_event, + delete_entry, + close_masks); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::set (ACE_HANDLE event_handle, + Common_Info &common_info, + int suspend_entry) +{ + this->event_handle_ = event_handle; + this->suspend_entry_ = suspend_entry; + Common_Info::set (common_info); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::reset (void) +{ + this->event_handle_ = ACE_INVALID_HANDLE; + this->suspend_entry_ = 0; + Common_Info::reset (); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::To_Be_Added_Info::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + Common_Info::dump (); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Event Handle = %d\n"), + this->event_handle_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Suspend Entry = %d\n"), + this->suspend_entry_)); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +/************************************************************/ + +ACE_INLINE +ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::Suspended_Info (void) + : event_handle_ (ACE_INVALID_HANDLE), + resume_entry_ (0) +{ +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::reset (void) +{ + this->event_handle_ = ACE_INVALID_HANDLE; + this->resume_entry_ = 0; + Common_Info::reset (); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::set (ACE_HANDLE event_handle, + int io_entry, + ACE_Event_Handler *event_handler, + ACE_HANDLE io_handle, + long network_events, + int delete_event, + int delete_entry, + ACE_Reactor_Mask close_masks, + int resume_entry) +{ + this->event_handle_ = event_handle; + this->resume_entry_ = resume_entry; + Common_Info::set (io_entry, + event_handler, + io_handle, + network_events, + delete_event, + delete_entry, + close_masks); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::set (ACE_HANDLE event_handle, + Common_Info &common_info, + int resume_entry) +{ + this->event_handle_ = event_handle; + this->resume_entry_ = resume_entry; + Common_Info::set (common_info); +} + +ACE_INLINE void +ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::Suspended_Info::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + Common_Info::dump (); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Event Handle = %d\n"), + this->event_handle_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("Resume Entry = %d\n"), + this->resume_entry_)); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +/************************************************************/ + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::close (void) +{ + // Let all the handlers know that the <WFMO_Reactor> is closing down + this->unbind_all (); + + return 0; +} + +ACE_INLINE ACE_HANDLE * +ACE_WFMO_Reactor_Handler_Repository::handles (void) const +{ + // This code is probably too subtle to be useful in the long run... + // The basic idea is that all threads wait on all user handles plus + // the <wakeup_all_threads_> handle. The owner thread additional + // waits on the <notify_> handle. This is to ensure that only the + // <owner_> thread get to expire timers and handle event on the + // notify pipe. + if (ACE_Thread::self () == this->wfmo_reactor_.owner_i ()) + return this->current_handles_; + else + return this->current_handles_ + 1; +} + +ACE_INLINE ACE_WFMO_Reactor_Handler_Repository::Current_Info * +ACE_WFMO_Reactor_Handler_Repository::current_info (void) const +{ + if (ACE_Thread::self () == this->wfmo_reactor_.owner_i ()) + return this->current_info_; + else + return this->current_info_ + 1; +} + +ACE_INLINE DWORD +ACE_WFMO_Reactor_Handler_Repository::max_handlep1 (void) const +{ + if (ACE_Thread::self () == this->wfmo_reactor_.owner_i ()) + return this->max_handlep1_; + else + return this->max_handlep1_ - 1; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::scheduled_for_deletion (size_t slot) const +{ + if (ACE_Thread::self () == this->wfmo_reactor_.owner_i ()) + return this->current_info_[slot].delete_entry_ == 1; + else + return this->current_info_[slot + 1].delete_entry_ == 1; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle) const +{ + ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::invalid_handle"); + // It's too expensive to perform more exhaustive validity checks on + // Win32 due to the way that they implement SOCKET HANDLEs. + if (handle == ACE_INVALID_HANDLE) + { + errno = EINVAL; + return 1; + } + else + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::changes_required (void) +{ + // Check if handles have be scheduled for additions or removal + return this->handles_to_be_added_ > 0 + || this->handles_to_be_deleted_ > 0 + || this->handles_to_be_suspended_ > 0 + || this->handles_to_be_resumed_ > 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::make_changes (void) +{ + // This method must ONLY be called by the + // <WFMO_Reactor->change_state_thread_>. We therefore assume that + // there will be no contention for this method and hence no guards + // are neccessary. + + // Deletions and suspensions in current_info_ + this->make_changes_in_current_infos (); + + // Deletions and resumptions in current_suspended_info_ + this->make_changes_in_suspension_infos (); + + // Deletions in to_be_added_info_, or transfers to current_info_ or + // current_suspended_info_ from to_be_added_info_ + this->make_changes_in_to_be_added_infos (); + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + if (this->invalid_handle (handle)) + return -1; + + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_, -1); + + int changes_required = 0; + int const result = this->unbind_i (handle, + mask, + changes_required); + + if (changes_required) + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wfmo_reactor_.wakeup_all_threads (); + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::reset_timer_interval + (long timer_id, + const ACE_Time_Value &interval) +{ + ACE_TRACE ("ACE_WFMO_Reactor::reset_timer_interval"); + + if (0 != this->timer_queue_) + { + long result = this->timer_queue_->reset_interval + (timer_id, + interval); + + // Wakeup the owner thread so that it gets the latest timer values + this->notify (); + + return result; + } + + errno = ESHUTDOWN; + return -1; +} + +ACE_INLINE long +ACE_WFMO_Reactor::schedule_timer (ACE_Event_Handler *handler, + const void *arg, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval) +{ + ACE_TRACE ("ACE_WFMO_Reactor::schedule_timer"); + + if (0 != this->timer_queue_) + { + long result = this->timer_queue_->schedule + (handler, + arg, + timer_queue_->gettimeofday () + delay_time, + interval); + + // Wakeup the owner thread so that it gets the latest timer values + this->notify (); + + return result; + } + + errno = ESHUTDOWN; + return -1; +} + +ACE_INLINE int +ACE_WFMO_Reactor::cancel_timer (ACE_Event_Handler *handler, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_WFMO_Reactor::cancel_timer"); + if (0 != this->timer_queue_) + return this->timer_queue_->cancel (handler, dont_call_handle_close); + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::cancel_timer (long timer_id, + const void **arg, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_WFMO_Reactor::cancel_timer"); + if (0 != this->timer_queue_) + return this->timer_queue_->cancel (timer_id, arg, dont_call_handle_close); + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->handler_rep_.bind_i (0, + event_handler, + 0, + ACE_INVALID_HANDLE, + event_handle, + 0); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->register_handler_i (ACE_INVALID_HANDLE, + ACE_INVALID_HANDLE, + event_handler, + mask); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->register_handler_i (ACE_INVALID_HANDLE, + io_handle, + event_handler, + mask); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->register_handler_i (event_handle, + io_handle, + event_handler, + mask); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->register_handler_i (h, + ACE_INVALID_HANDLE, + handler, + mask) == -1) + return -1; + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::schedule_wakeup (ACE_HANDLE io_handle, + ACE_Reactor_Mask masks_to_be_added) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->mask_ops_i (io_handle, + masks_to_be_added, + ACE_Reactor::ADD_MASK); +} + +ACE_INLINE int +ACE_WFMO_Reactor::schedule_wakeup (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask masks_to_be_added) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->mask_ops_i (event_handler->get_handle (), + masks_to_be_added, + ACE_Reactor::ADD_MASK); +} + +ACE_INLINE int +ACE_WFMO_Reactor::cancel_wakeup (ACE_HANDLE io_handle, + ACE_Reactor_Mask masks_to_be_removed) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->mask_ops_i (io_handle, + masks_to_be_removed, + ACE_Reactor::CLR_MASK); +} + +ACE_INLINE int +ACE_WFMO_Reactor::cancel_wakeup (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask masks_to_be_removed) +{ + // This GUARD is necessary since we are updating shared state. + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->mask_ops_i (event_handler->get_handle (), + masks_to_be_removed, + ACE_Reactor::CLR_MASK); +} + +ACE_INLINE int +ACE_WFMO_Reactor::remove_handler (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + return this->handler_rep_.unbind (event_handler->get_handle (), + mask); +} + +ACE_INLINE int +ACE_WFMO_Reactor::remove_handler (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + return this->handler_rep_.unbind (handle, + mask); +} + +ACE_INLINE int +ACE_WFMO_Reactor::remove_handler (const ACE_Handle_Set &handles, + ACE_Reactor_Mask mask) +{ + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + int changes_required = 0; + + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->handler_rep_.unbind_i (h, + mask, + changes_required) == -1) + return -1; + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::suspend_handler (ACE_HANDLE handle) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + int changes_required = 0; + int const result = + this->handler_rep_.suspend_handler_i (handle, + changes_required); + + if (changes_required) + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::suspend_handler (ACE_Event_Handler *event_handler) +{ + return this->suspend_handler (event_handler->get_handle ()); +} + +ACE_INLINE int +ACE_WFMO_Reactor::suspend_handler (const ACE_Handle_Set &handles) +{ + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + int changes_required = 0; + + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->handler_rep_.suspend_handler_i (h, + changes_required) == -1) + return -1; + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::suspend_handlers (void) +{ + int error = 0; + int result = 0; + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + // First suspend all current handles + int changes_required = 0; + + for (size_t i = 0; + i < this->handler_rep_.max_handlep1_ && error == 0; + i++) + { + result = + this->handler_rep_.suspend_handler_i (this->handler_rep_.current_handles_[i], + changes_required); + if (result == -1) + error = 1; + } + + if (!error) + // Then suspend all to_be_added_handles + for (size_t i = 0; + i < this->handler_rep_.handles_to_be_added_; + i++) + this->handler_rep_.to_be_added_info_[i].suspend_entry_ = 1; + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return error ? -1 : 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::resume_handler (ACE_HANDLE handle) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + int changes_required = 0; + int result = + this->handler_rep_.resume_handler_i (handle, + changes_required); + + if (changes_required) + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::resume_handler (ACE_Event_Handler *event_handler) +{ + return this->resume_handler (event_handler->get_handle ()); +} + +ACE_INLINE int +ACE_WFMO_Reactor::resume_handler (const ACE_Handle_Set &handles) +{ + ACE_Handle_Set_Iterator handle_iter (handles); + ACE_HANDLE h; + int changes_required = 0; + + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->handler_rep_.resume_handler_i (h, + changes_required) == -1) + return -1; + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::resume_handlers (void) +{ + int error = 0; + int result = 0; + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + int changes_required = 0; + for (size_t i = 0; + i < this->handler_rep_.suspended_handles_ && error == 0; + i++) + { + result = + this->handler_rep_.resume_handler_i (this->handler_rep_.current_suspended_info_[i].event_handle_, + changes_required); + if (result == -1) + error = 1; + } + + if (!error) + // Then resume all to_be_added_handles + for (size_t i = 0; + i < this->handler_rep_.handles_to_be_added_; + i++) + this->handler_rep_.to_be_added_info_[i].suspend_entry_ = 0; + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the handle set + this->wakeup_all_threads (); + + return error ? -1 : 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::uses_event_associations (void) +{ + // Since the WFMO_Reactor does use event associations, this function + // always return 1. + return 1; +} + +ACE_INLINE int +ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long) +{ + return this->event_handling (&how_long, FALSE); +} + +ACE_INLINE int +ACE_WFMO_Reactor::alertable_handle_events (ACE_Time_Value &how_long) +{ + return this->event_handling (&how_long, TRUE); +} + +ACE_INLINE int +ACE_WFMO_Reactor::handle_events (ACE_Time_Value *how_long) +{ + return this->event_handling (how_long, FALSE); +} + +ACE_INLINE int +ACE_WFMO_Reactor::alertable_handle_events (ACE_Time_Value *how_long) +{ + return this->event_handling (how_long, TRUE); +} + +ACE_INLINE int +ACE_WFMO_Reactor::deactivated (void) +{ + return this->deactivated_; +} + +ACE_INLINE void +ACE_WFMO_Reactor::deactivate (int do_stop) +{ + this->deactivated_ = do_stop; + this->wakeup_all_threads (); +} + +ACE_INLINE int +ACE_WFMO_Reactor::owner (ACE_thread_t *t) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + *t = this->owner_i (); + return 0; +} + +ACE_INLINE ACE_thread_t +ACE_WFMO_Reactor::owner_i (void) +{ + return this->owner_; +} + +ACE_INLINE int +ACE_WFMO_Reactor::owner (ACE_thread_t new_owner, ACE_thread_t *old_owner) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); + this->new_owner_ = new_owner; + + if (old_owner != 0) + *old_owner = this->owner_i (); + + // Wake up all threads in WaitForMultipleObjects so that they can + // reconsult the new owner responsibilities + this->wakeup_all_threads (); + + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::new_owner (void) +{ + return this->new_owner_ != ACE_thread_t (0); +} + +ACE_INLINE int +ACE_WFMO_Reactor::change_owner (void) +{ + this->owner_ = this->new_owner_; + this->new_owner_ = ACE_thread_t (0); + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::safe_dispatch (DWORD wait_status) +{ + int result = -1; + ACE_SEH_TRY + { + result = this->dispatch (wait_status); + } + ACE_SEH_FINALLY + { + this->update_state (); + } + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::dispatch_window_messages (void) +{ + return 0; +} + +ACE_INLINE void +ACE_WFMO_Reactor::wakeup_all_threads (void) +{ + this->wakeup_all_threads_.signal (); +} + +ACE_INLINE int +ACE_WFMO_Reactor::notify (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + return this->notify_handler_->notify (event_handler, mask, timeout); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp, + ACE_Event_Handler **old_sh, + ACE_Sig_Action *old_disp) +{ + return this->signal_handler_->register_handler (signum, + new_sh, new_disp, + old_sh, old_disp); +} + +ACE_INLINE int +ACE_WFMO_Reactor::register_handler (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp) +{ + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; s++) + if (sigset.is_member (s) + && this->signal_handler_->register_handler (s, + new_sh, + new_disp) == -1) + result = -1; +#else + ACE_UNUSED_ARG (sigset); + ACE_UNUSED_ARG (new_sh); + ACE_UNUSED_ARG (new_disp); +#endif /* ACE_NSIG */ + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::remove_handler (int signum, + ACE_Sig_Action *new_disp, + ACE_Sig_Action *old_disp, + int sigkey) +{ + return this->signal_handler_->remove_handler (signum, + new_disp, + old_disp, + sigkey); +} + +ACE_INLINE int +ACE_WFMO_Reactor::remove_handler (const ACE_Sig_Set &sigset) +{ + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; s++) + if (sigset.is_member (s) + && this->signal_handler_->remove_handler (s) == -1) + result = -1; +#else + ACE_UNUSED_ARG (sigset); +#endif /* ACE_NSIG */ + + return result; +} + +ACE_INLINE int +ACE_WFMO_Reactor::handler (int signum, ACE_Event_Handler **eh) +{ + ACE_Event_Handler *handler = + this->signal_handler_->handler (signum); + + if (handler == 0) + return -1; + else if (eh != 0) + *eh = handler; + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::mask_ops (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + int operation) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); + + return this->mask_ops_i (event_handler->get_handle (), + mask, + operation); +} + +ACE_INLINE int +ACE_WFMO_Reactor::mask_ops (ACE_HANDLE io_handle, + ACE_Reactor_Mask mask, + int operation) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); + + return this->mask_ops_i (io_handle, + mask, + operation); +} + +ACE_INLINE void +ACE_WFMO_Reactor::requeue_position (int) +{ + // Not implemented +} + +ACE_INLINE int +ACE_WFMO_Reactor::requeue_position (void) +{ + // Don't have an implementation for this yet... + ACE_NOTSUP_RETURN (-1); +} + +ACE_INLINE int +ACE_WFMO_Reactor::restart (void) +{ + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::restart (int) +{ + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor::ready_ops (ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + int ops) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (event_handler); + ACE_UNUSED_ARG (mask); + ACE_UNUSED_ARG (ops); + ACE_NOTSUP_RETURN (-1); +} + +ACE_INLINE int +ACE_WFMO_Reactor::ready_ops (ACE_HANDLE handle, + ACE_Reactor_Mask, + int ops) +{ + // Don't have an implementation for this yet... + ACE_UNUSED_ARG (handle); + ACE_UNUSED_ARG (ops); + ACE_NOTSUP_RETURN (-1); +} + +ACE_INLINE ACE_Event_Handler * +ACE_WFMO_Reactor::find_handler (ACE_HANDLE handle) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, 0); + + return this->handler_rep_.find_handler (handle); +} + +ACE_INLINE int +ACE_WFMO_Reactor::handler (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **event_handler) +{ + ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1); + + return this->handler_rep_.handler (handle, + mask, + event_handler); +} + +ACE_INLINE int +ACE_WFMO_Reactor::initialized (void) +{ + return this->open_for_business_; +} + +ACE_INLINE ACE_Lock & +ACE_WFMO_Reactor::lock (void) +{ + return this->lock_adapter_; +} + +ACE_INLINE size_t +ACE_WFMO_Reactor::size (void) const +{ + // Size of repository minus the 2 used for internal purposes + return this->handler_rep_.max_size_ - 2; +} +#else +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::changes_required (void) +{ + return 0; +} + +ACE_INLINE int +ACE_WFMO_Reactor_Handler_Repository::make_changes (void) +{ + return 0; +} + +ACE_INLINE +ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void) +{ +} + +#endif /* ACE_WIN32 */ + +ACE_END_VERSIONED_NAMESPACE_DECL |