diff options
-rw-r--r-- | ace/ACE.cpp | 4 | ||||
-rw-r--r-- | ace/Acceptor.cpp | 21 | ||||
-rw-r--r-- | ace/Acceptor.h | 10 | ||||
-rw-r--r-- | ace/FILE_Addr.cpp | 4 | ||||
-rw-r--r-- | ace/LSOCK_Acceptor.h | 3 | ||||
-rw-r--r-- | ace/Reactor.cpp | 38 | ||||
-rw-r--r-- | ace/Reactor.h | 20 | ||||
-rw-r--r-- | ace/Reactor.i | 7 | ||||
-rw-r--r-- | ace/Reactor_Impl.h | 40 | ||||
-rw-r--r-- | ace/Select_Reactor.cpp | 107 | ||||
-rw-r--r-- | ace/Select_Reactor.h | 70 | ||||
-rw-r--r-- | ace/WFMO_Reactor.cpp | 208 | ||||
-rw-r--r-- | ace/WFMO_Reactor.h | 53 |
13 files changed, 416 insertions, 169 deletions
diff --git a/ace/ACE.cpp b/ace/ACE.cpp index c2060972caf..145c6b213c4 100644 --- a/ace/ACE.cpp +++ b/ace/ACE.cpp @@ -642,8 +642,8 @@ ACE::ldfind (const ASYS_TCHAR filename[], // Make sure we've got enough space in searchfilename. if (ACE_OS::strlen (searchfilename) + ACE_OS::strlen (ACE_DLL_PREFIX) - + got_suffix ? 0 ACE_OS::strlen (dll_suffix) >= (sizeof searchfilename / - sizeof (ASYS_TCHAR))) + + got_suffix ? 0 : ACE_OS::strlen (dll_suffix) >= (sizeof searchfilename / + sizeof (ASYS_TCHAR))) { errno = ENOMEM; return -1; diff --git a/ace/Acceptor.cpp b/ace/Acceptor.cpp index 2fcc9185185..05276762fc8 100644 --- a/ace/Acceptor.cpp +++ b/ace/Acceptor.cpp @@ -80,7 +80,9 @@ ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::open // Simple constructor. template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1> -ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::ACE_Acceptor (ACE_Reactor *reactor) +ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::ACE_Acceptor (ACE_Reactor *reactor, + int use_select) + : use_select_ (use_select) { ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::ACE_Acceptor"); @@ -91,11 +93,16 @@ template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1> ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::ACE_Acceptor (const ACE_PEER_ACCEPTOR_ADDR &addr, ACE_Reactor *reactor, - int flags) + int flags, + use_select) + : use_select_ (use_select) { ACE_TRACE ("ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::ACE_Acceptor"); + if (this->open (addr, reactor, flags) == -1) - ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ACE_Acceptor::ACE_Acceptor"))); + ACE_ERROR ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("ACE_Acceptor::ACE_Acceptor"))); } template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1> @@ -332,9 +339,15 @@ ACE_Acceptor<SVC_HANDLER, ACE_PEER_ACCEPTOR_2>::handle_input (ACE_HANDLE listene conn_handle.set_bit (listener); } + // Now, check to see if there is another connection pending and // break out of the loop if there is none. - while (ACE_OS::select (int (listener) + 1, conn_handle, 0, 0, &timeout) == 1); + while (this->use_select_ + && ACE_OS::select (int (listener) + 1, + conn_handle, + 0, + 0, + &timeout) == 1); return 0; } diff --git a/ace/Acceptor.h b/ace/Acceptor.h index d1c4149210c..f067206a25b 100644 --- a/ace/Acceptor.h +++ b/ace/Acceptor.h @@ -48,12 +48,14 @@ class ACE_Acceptor : public ACE_Service_Object // a group. public: // = Initialization and termination methods. - ACE_Acceptor (ACE_Reactor * = 0); + ACE_Acceptor (ACE_Reactor * = 0, + int use_select = 1); // "Do-nothing" constructor. ACE_Acceptor (const ACE_PEER_ACCEPTOR_ADDR &local_addr, ACE_Reactor * = ACE_Reactor::instance (), - int flags = 0); + int flags = 0, + int use_select = 1); // Initialize and register <this> with the Reactor and listen for // connection requests at the designated <local_addr>. <flags> // indicates how <SVC_HANDLER>'s should be initialized prior to @@ -157,6 +159,10 @@ private: // prior to being activated. Right now, the only flag that is // processed is <ACE_NONBLOCK>, which enabled non-blocking I/O on // the <SVC_HANDLER> when it is opened. + + int use_select_; + // Flag that indicates whether it shall use <select> in the + // <accept>-loop. }; template <class SVC_HANDLER, ACE_PEER_ACCEPTOR_1> diff --git a/ace/FILE_Addr.cpp b/ace/FILE_Addr.cpp index 3c367d8b180..76d14fa77dc 100644 --- a/ace/FILE_Addr.cpp +++ b/ace/FILE_Addr.cpp @@ -23,8 +23,8 @@ ACE_FILE_Addr::set (const ACE_FILE_Addr &sa) if (sa.get_type () == AF_ANY) this->filename_[0] = '\0'; else - (void) ACE_OS::strncpy ((void *) &this->filename_, - (void *) &sa.filename_, + (void) ACE_OS::strncpy (this->filename_, + sa.filename_, sa.get_size ()); return 0; } diff --git a/ace/LSOCK_Acceptor.h b/ace/LSOCK_Acceptor.h index 1284e96dc4d..1d2bb1935e0 100644 --- a/ace/LSOCK_Acceptor.h +++ b/ace/LSOCK_Acceptor.h @@ -23,6 +23,9 @@ #if !defined (ACE_LACKS_UNIX_DOMAIN_SOCKETS) +// Forward decl. +class ACE_Reactor; + class ACE_Export ACE_LSOCK_Acceptor : public ACE_SOCK_Acceptor { // = TITLE diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp index 4c97be43438..2cb8b58391b 100644 --- a/ace/Reactor.cpp +++ b/ace/Reactor.cpp @@ -18,23 +18,35 @@ ACE_ALLOC_HOOK_DEFINE(ACE_Reactor) -ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl) +ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl, + int delete_implementation) : implementation_ (0), - delete_implementation_ (0) + delete_implementation_ (delete_implementation) { this->implementation (impl); if (this->implementation () == 0) { -#if !defined (ACE_WIN32) || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) - ACE_NEW (impl, ACE_Select_Reactor); -#else /* We are on Win32 and we have winsock and ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL is not defined */ +#if !defined (ACE_WIN32) + || !defined (ACE_HAS_WINSOCK2) + || (ACE_HAS_WINSOCK2 == 0) + || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) + ACE_NEW (impl, + ACE_Select_Reactor); +#else + // We are on Win32 and we have winsock and + // ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL is not defined. #if defined (ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL) - ACE_NEW (impl, ACE_Msg_WFMO_Reactor); + ACE_NEW (impl, + ACE_Msg_WFMO_Reactor); #else - ACE_NEW (impl, ACE_WFMO_Reactor); + ACE_NEW (impl, + ACE_WFMO_Reactor); #endif /* ACE_USE_MSG_WFMO_REACTOR_FOR_REACTOR_IMPL */ -#endif /* !defined (ACE_WIN32) || !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) */ +#endif /* !defined (ACE_WIN32) + || !defined (ACE_HAS_WINSOCK2) + || (ACE_HAS_WINSOCK2 == 0) + || defined (ACE_USE_SELECT_REACTOR_FOR_REACTOR_IMPL) */ this->implementation (impl); this->delete_implementation_ = 1; } @@ -77,15 +89,19 @@ ACE_Reactor::instance (void) } ACE_Reactor * -ACE_Reactor::instance (ACE_Reactor *r) +ACE_Reactor::instance (ACE_Reactor *r, + int delete_reactor) { ACE_TRACE ("ACE_Reactor::instance"); ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, *ACE_Static_Object_Lock::instance (), 0)); ACE_Reactor *t = ACE_Reactor::reactor_; - // We can't safely delete it since we don't know who created it! - ACE_Reactor::delete_reactor_ = 0; + if (delete_reactor != 0) + ACE_Reactor::delete_reactor_ = 1; + else + // We can't safely delete it since we don't know who created it! + ACE_Reactor::delete_reactor_ = 0; ACE_Reactor::reactor_ = r; return t; diff --git a/ace/Reactor.h b/ace/Reactor.h index 6f2aaf982e5..eafb5a1f436 100644 --- a/ace/Reactor.h +++ b/ace/Reactor.h @@ -61,9 +61,11 @@ public: static ACE_Reactor *instance (void); // Get pointer to a process-wide <ACE_Reactor>. - static ACE_Reactor *instance (ACE_Reactor *); + static ACE_Reactor *instance (ACE_Reactor *, + int delete_reactor = 0); // Set pointer to a process-wide <ACE_Reactor> and return existing - // pointer. + // pointer. If <delete_reactor> != 0 then we'll delete the Reactor + // at destruction time. static void close_singleton (void); // Delete the dynamically allocated Singleton @@ -94,8 +96,11 @@ public: // Resets the <ACE_Reactor::end_event_loop_> static so that the // <run_event_loop> method can be restarted. - ACE_Reactor (ACE_Reactor_Impl *implementation = 0); - // Create the Reactor using <implementation> + ACE_Reactor (ACE_Reactor_Impl *implementation = 0, + int delete_implementation = 0); + // Create the Reactor using <implementation>. The flag + // <delete_implementation> tells the Reactor whether or not to + // delete the <implementation> on destruction. virtual ~ACE_Reactor (void); // Close down and release all resources. @@ -420,6 +425,13 @@ public: virtual ACE_Reactor_Impl *implementation (void); // Get the implementation class + virtual int current_info (ACE_HANDLE handle, + size_t &msg_size); + // Returns 0, if the size of the current message has been put in + // <size> Returns -1, if not. ACE_HANDLE allows the reactor to + // check if the caller is valid. Used for CLASSIX Reactor + // implementation. + virtual int uses_event_associations (void); // Return 1 if we any event associations were made by the reactor // for the handles that it waits on, 0 otherwise. diff --git a/ace/Reactor.i b/ace/Reactor.i index d53bba16797..9bc9c833eac 100644 --- a/ace/Reactor.i +++ b/ace/Reactor.i @@ -16,6 +16,13 @@ ACE_Reactor::implementation (ACE_Reactor_Impl *impl) this->implementation_ = impl; } +ACE_INLINE int +ACE_Reactor::current_info (ACE_HANDLE handle, + size_t &size) +{ + return this->implementation ()->current_info (handle, size); +} + ACE_INLINE int ACE_Reactor::open (size_t size, int restart, diff --git a/ace/Reactor_Impl.h b/ace/Reactor_Impl.h index 1b34a10f468..3c094e8a815 100644 --- a/ace/Reactor_Impl.h +++ b/ace/Reactor_Impl.h @@ -17,8 +17,6 @@ #if !defined (ACE_REACTOR_IMPL_H) #define ACE_REACTOR_IMPL_H -class ACE_Handle_Set; - // Timer Queue is a complicated template class. A simple forward // declaration will not work #include "ace/Timer_Queue.h" @@ -31,6 +29,39 @@ class ACE_Handle_Set; // forward declaration will be useful here #include "ace/Signal.h" +// Forward decls +class ACE_Handle_Set; +class ACE_Reactor_Impl; + +class ACE_Export ACE_Reactor_Notify : public ACE_Event_Handler +{ + // = TITLE + // Unblock an <ACE_Reactor_Impl> from its event loop. +public: + // = Initialization and termination methods. + virtual int open (ACE_Reactor_Impl *, + ACE_Timer_Queue *timer_queue = 0, + int disable_notify = 0) = 0; + virtual int close (void) = 0; + + virtual ssize_t notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0) = 0; + // Called by a thread when it wants to unblock the <Reactor_Impl>. + // This wakeups the <Reactor_Impl> if currently blocked. Pass over + // both the <Event_Handler> *and* the <mask> to allow the caller to + // dictate which <Event_Handler> method the <Reactor_Impl> will + // invoke. The <ACE_Time_Value> indicates how long to blocking + // trying to notify the <Reactor_Impl>. If <timeout> == 0, the + // caller will block until action is possible, else will wait until + // the relative time specified in *<timeout> elapses). + + virtual int dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask) = 0; + // Handles pending threads (if any) that are waiting to unblock the + // <Reactor_Impl>. +}; + class ACE_Export ACE_Reactor_Impl { // = TITLE @@ -46,6 +77,11 @@ public: int disable_notify_pipe = 0) = 0; // Initialization. + virtual int current_info (ACE_HANDLE, size_t & /* size */) = 0; + // Returns 0, if the size of the current message has been put in + // <size> Returns -1, if not. ACE_HANDLE allows the reactor to + // check if the caller is valid. + virtual int set_sig_handler (ACE_Sig_Handler *signal_handler) = 0; // Use a user specified signal handler instead. diff --git a/ace/Select_Reactor.cpp b/ace/Select_Reactor.cpp index 9ca17320f5a..6dfdec76b06 100644 --- a/ace/Select_Reactor.cpp +++ b/ace/Select_Reactor.cpp @@ -357,9 +357,9 @@ ACE_Select_Reactor_Handler_Repository::unbind (ACE_HANDLE handle, } ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator -(const ACE_Select_Reactor_Handler_Repository *s) - : rep_ (s), - current_ (-1) + (const ACE_Select_Reactor_Handler_Repository *s) + : rep_ (s), + current_ (-1) { this->advance (); } @@ -639,14 +639,21 @@ ACE_Select_Reactor_Notify::dump (void) const } int -ACE_Select_Reactor_Notify::open (ACE_Select_Reactor *r, +ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, + ACE_Timer_Queue *, int disable_notify_pipe) { ACE_TRACE ("ACE_Select_Reactor_Notify::open"); if (disable_notify_pipe == 0) { - this->select_reactor_ = r; + this->select_reactor_ = ACE_dynamic_cast (ACE_Select_Reactor, r); + + if (select_reactor_ == 0) + { + errno = EINVAL; + return -1; + } if (this->notification_pipe_.open () == -1) return -1; @@ -822,7 +829,7 @@ ACE_Select_Reactor::notify (ACE_Event_Handler *eh, // caller to dictate which Event_Handler method the receiver // invokes. Note that this call can timeout. - n = this->notify_handler_.notify (eh, mask, timeout); + n = this->notify_handler_->notify (eh, mask, timeout); return n == -1 ? -1 : 0; } @@ -960,7 +967,8 @@ ACE_Select_Reactor::open (size_t size, int restart, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, - int disable_notify_pipe) + int disable_notify_pipe, + ACE_Select_Reactor_Notify *notify) { ACE_TRACE ("ACE_Select_Reactor::open"); ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); @@ -973,13 +981,16 @@ ACE_Select_Reactor::open (size_t size, this->restart_ = restart; this->signal_handler_ = sh; this->timer_queue_ = tq; + this->notify_handler_ = notify; int result = 0; // Allows the signal handler to be overridden. if (this->signal_handler_ == 0) { - this->signal_handler_ = new ACE_Sig_Handler; + ACE_NEW_RETURN (this->signal_handler_, + ACE_Sig_Handler, + -1); if (this->signal_handler_ == 0) result = -1; @@ -987,9 +998,12 @@ ACE_Select_Reactor::open (size_t size, this->delete_signal_handler_ = 1; } + // Allows the timer queue to be overridden. if (result != -1 && this->timer_queue_ == 0) { - this->timer_queue_ = new ACE_Timer_Heap; + ACE_NEW_RETURN (this->timer_queue_, + ACE_Timer_Heap, + -1); if (this->timer_queue_ == 0) result = -1; @@ -997,9 +1011,24 @@ ACE_Select_Reactor::open (size_t size, this->delete_timer_queue_ = 1; } + // Allows the Notify_Handler to be overridden. + if (result != -1 && this->notify_handler_ == 0) + { + ACE_NEW_RETURN (this->notify_handler_, + ACE_Select_Reactor_Notify, + -1); + + if (this->notify_handler_ == 0) + result = -1; + else + this->delete_notify_handler_ = 1; + } + if (result != -1 && this->handler_rep_.open (size) == -1) result = -1; - else if (this->notify_handler_.open (this, disable_notify_pipe) == -1) + else if (this->notify_handler_->open (this, + 0, + disable_notify_pipe) == -1) result = -1; if (result != -1) @@ -1034,11 +1063,13 @@ ACE_Select_Reactor::set_timer_queue (ACE_Timer_Queue *timer_queue) ACE_Select_Reactor::ACE_Select_Reactor (ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, - int disable_notify_pipe) + int disable_notify_pipe, + ACE_Select_Reactor_Notify *notify) : handler_rep_ (*this), timer_queue_ (0), delete_timer_queue_ (0), delete_signal_handler_ (0), + delete_notify_handler_ (0), requeue_position_ (-1), // Requeue at end of waiters by default. max_notify_iterations_ (-1), initialized_ (0), @@ -1054,10 +1085,12 @@ ACE_Select_Reactor::ACE_Select_Reactor (ACE_Sig_Handler *sh, 0, sh, tq, - disable_notify_pipe) == -1) + disable_notify_pipe, + notify) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), - ASYS_TEXT ("ACE_Select_Reactor::open failed inside ACE_Select_Reactor::CTOR"))); + ASYS_TEXT ("ACE_Select_Reactor::open") + ASYS_TEXT ("failed inside ACE_Select_Reactor::CTOR"))); } // Initialize ACE_Select_Reactor. @@ -1066,11 +1099,13 @@ ACE_Select_Reactor::ACE_Select_Reactor (size_t size, int rs, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, - int disable_notify_pipe) + int disable_notify_pipe, + ACE_Select_Reactor_Notify *notify) : handler_rep_ (*this), timer_queue_ (0), delete_timer_queue_ (0), delete_signal_handler_ (0), + delete_notify_handler_ (0), requeue_position_ (-1), // Requeue at end of waiters by default. max_notify_iterations_ (-1), initialized_ (0), @@ -1078,7 +1113,7 @@ ACE_Select_Reactor::ACE_Select_Reactor (size_t size, #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) token_ (*this), #endif /* ACE_MT_SAFE */ - lock_adapter_ (token_) + lock_adapter_ (token_), { ACE_TRACE ("ACE_Select_Reactor::ACE_Select_Reactor"); @@ -1086,10 +1121,12 @@ ACE_Select_Reactor::ACE_Select_Reactor (size_t size, rs, sh, tq, - disable_notify_pipe) == -1) + disable_notify_pipe, + notify) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), - ASYS_TEXT ("ACE_Select_Reactor::open failed inside ACE_Select_Reactor::CTOR"))); + ASYS_TEXT ("ACE_Select_Reactor::open") + ASYS_TEXT ("failed inside ACE_Select_Reactor::CTOR"))); } // Close down the ACE_Select_Reactor instance, detaching any remaining @@ -1103,21 +1140,41 @@ ACE_Select_Reactor::close (void) ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_MUTEX, ace_mon, this->token_, -1)); if (this->delete_signal_handler_) - delete this->signal_handler_; - this->signal_handler_ = 0; + { + delete this->signal_handler_; + this->signal_handler_ = 0; + this->delete_signal_handler_ = 0; + } this->handler_rep_.close (); if (this->delete_timer_queue_) - delete this->timer_queue_; - this->timer_queue_ = 0; + { + delete this->timer_queue_; + this->timer_queue_ = 0; + this->delete_timer_queue_ = 0; + } + + this->notify_handler_->close (); + + if (this->delete_notify_handler_) + { + delete this->notify_handler_; + this->notify_handler_ = 0; + this->delete_notify_handler_ = 0; + } - this->notify_handler_.close (); this->initialized_ = 0; return 0; } +int +ACE_Select_Reactor::current_info (ACE_HANDLE, size_t &) +{ + return -1; +} + ACE_Select_Reactor::~ACE_Select_Reactor (void) { ACE_TRACE ("ACE_Select_Reactor::~ACE_Select_Reactor"); @@ -1557,8 +1614,8 @@ ACE_Select_Reactor::dispatch_notification_handlers (int &number_of_active_handle // threads and then break out to continue the event loop. int number_dispatched = - this->notify_handler_.dispatch_notifications (number_of_active_handles, - dispatch_set.rd_mask_); + this->notify_handler_->dispatch_notifications (number_of_active_handles, + dispatch_set.rd_mask_); return this->state_changed_ ? -1 : number_dispatched; #else ACE_UNUSED_ARG (number_of_active_handles); @@ -1883,7 +1940,7 @@ ACE_Select_Reactor::dump (void) const ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nowner_ = %d\n"), this->owner_)); #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - this->notify_handler_.dump (); + this->notify_handler_->dump (); this->token_.dump (); #endif /* ACE_MT_SAFE */ diff --git a/ace/Select_Reactor.h b/ace/Select_Reactor.h index 2674189978d..71b556f0c55 100644 --- a/ace/Select_Reactor.h +++ b/ace/Select_Reactor.h @@ -157,10 +157,7 @@ public: ACE_Event_Handler* event_handler_; }; -// The following two classes have to be moved out here to keep the SGI -// C++ compiler happy (it doesn't like nested classes). - -class ACE_Export ACE_Select_Reactor_Notify : public ACE_Event_Handler +class ACE_Export ACE_Select_Reactor_Notify : public ACE_Reactor_Notify { // = TITLE // Unblock the <ACE_Select_Reactor> from its event loop. @@ -181,29 +178,35 @@ public: // Default dtor. // = Initialization and termination methods. - int open (ACE_Select_Reactor *, int disable_notify_pipe); - int close (void); + virtual int open (ACE_Reactor_Impl *, + ACE_Timer_Queue * = 0, + int disable_notify_pipe = 0); + // Initialize. - int dispatch_notifications (int &number_of_active_handles, - const ACE_Handle_Set &rd_mask); + virtual int close (void); + // Destroy. + + virtual ssize_t notify (ACE_Event_Handler * = 0, + ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value * = 0); + // Called by a thread when it wants to unblock the + // <ACE_Select_Reactor>. This wakeups the <ACE_Select_Reactor> if + // currently blocked in select()/poll(). Pass over both the + // <Event_Handler> *and* the <mask> to allow the caller to dictate + // which <Event_Handler> method the <ACE_Select_Reactor> will + // invoke. The <ACE_Time_Value> indicates how long to blocking + // trying to notify the <ACE_Select_Reactor>. If <timeout> == 0, + // the caller will block until action is possible, else will wait + // until the relative time specified in *<timeout> elapses). + + virtual int dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask); // Handles pending threads (if any) that are waiting to unblock the - // Select_Reactor. - - ssize_t notify (ACE_Event_Handler * = 0, - ACE_Reactor_Mask = ACE_Event_Handler::EXCEPT_MASK, - ACE_Time_Value * = 0); - // Called by a thread when it wants to unblock the Select_Reactor. - // This wakeups the <ACE_Select_Reactor> if currently blocked in - // select()/poll(). Pass over both the <Event_Handler> *and* the - // <mask> to allow the caller to dictate which <Event_Handler> - // method the <Select_Reactor> will invoke. The <ACE_Time_Value> - // indicates how long to blocking trying to notify the - // <Select_Reactor>. If <timeout> == 0, the caller will block until - // action is possible, else will wait until the relative time - // specified in *<timeout> elapses). + // <ACE_Select_Reactor>. virtual int handle_input (ACE_HANDLE handle); - // Called back by the Select_Reactor when a thread wants to unblock us. + // Called back by the <ACE_Select_Reactor> when a thread wants to + // unblock us. void dump (void) const; // Dump the state of an object. @@ -386,14 +389,16 @@ public: ACE_Select_Reactor (ACE_Sig_Handler * = 0, ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0); + int disable_notify_pipe = 0, + ACE_Select_Reactor *notify = 0); // Initialize <ACE_Select_Reactor> with the default size. ACE_Select_Reactor (size_t size, int restart = 0, ACE_Sig_Handler * = 0, ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0); + int disable_notify_pipe = 0, + ACE_Select_Reactor *notify = 0); // Initialize <ACE_Select_Reactor> with size <size>. virtual int open (size_t size = DEFAULT_SIZE, @@ -403,6 +408,9 @@ public: int disable_notify_pipe = 0); // Initialize <ACE_Select_Reactor> with size <size>. + virtual int current_info (ACE_HANDLE, size_t & /* size */); + // Returns -1 (not used in this implementation); + virtual int set_sig_handler (ACE_Sig_Handler *signal_handler); // Use a user specified signal handler instead. @@ -856,6 +864,14 @@ protected: // Keeps track of whether we should delete the signal handler (if we // didn't create it, then we don't delete it). + ACE_Select_Reactor_Notify *notify_handler_; + // Callback object that unblocks the ACE_Select_Reactor if it's + // sleeping. + + int delete_notify_handler_; + // Keeps track of whether we need to delete the notify handler (if + // we didn't create it, then we don't delete it). + ACE_Select_Reactor_Handle_Set wait_set_; // Tracks handles that are waited for by select(). @@ -901,10 +917,6 @@ protected: ACE_Lock_Adapter<ACE_Select_Reactor_Token> lock_adapter_; // Adapter used to return internal lock to outside world. - ACE_Select_Reactor_Notify notify_handler_; - // Callback object that unblocks the ACE_Select_Reactor if it's - // sleeping. - void renew (void); // Enqueue ourselves into the list of waiting threads at the // appropriate point specified by <requeue_position_>. diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp index 78bc3813990..74637f63144 100644 --- a/ace/WFMO_Reactor.cpp +++ b/ace/WFMO_Reactor.cpp @@ -25,10 +25,18 @@ int ACE_WFMO_Reactor_Handler_Repository::open (size_t size) { // Dynamic allocation - ACE_NEW_RETURN (this->current_handles_, ACE_HANDLE[size], -1); - ACE_NEW_RETURN (this->current_info_, Current_Info[size], -1); - ACE_NEW_RETURN (this->current_suspended_info_, Suspended_Info[size], -1); - ACE_NEW_RETURN (this->to_be_added_info_, To_Be_Added_Info[size], -1); + ACE_NEW_RETURN (this->current_handles_, + ACE_HANDLE[size], + -1); + ACE_NEW_RETURN (this->current_info_, + Current_Info[size], + -1); + ACE_NEW_RETURN (this->current_suspended_info_, + Suspended_Info[size], + -1); + ACE_NEW_RETURN (this->to_be_added_info_, + To_Be_Added_Info[size], + -1); // Initialization this->max_size_ = size; @@ -58,28 +66,35 @@ void ACE_WFMO_Reactor_Handler_Repository::remove_network_events_i (long &existing_masks, ACE_Reactor_Mask to_be_removed_masks) { - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::READ_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::READ_MASK)) { ACE_CLR_BITS (existing_masks, FD_READ); ACE_CLR_BITS (existing_masks, FD_CLOSE); } - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::WRITE_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::WRITE_MASK)) ACE_CLR_BITS (existing_masks, FD_WRITE); - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::EXCEPT_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::EXCEPT_MASK)) ACE_CLR_BITS (existing_masks, FD_OOB); - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::ACCEPT_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::ACCEPT_MASK)) ACE_CLR_BITS (existing_masks, FD_ACCEPT); - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::CONNECT_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::CONNECT_MASK)) ACE_CLR_BITS (existing_masks, FD_CONNECT); - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::QOS_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::QOS_MASK)) ACE_CLR_BITS (existing_masks, FD_QOS); - if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::GROUP_QOS_MASK)) + if (ACE_BIT_ENABLED (to_be_removed_masks, + ACE_Event_Handler::GROUP_QOS_MASK)) ACE_CLR_BITS (existing_masks, FD_GROUP_QOS); } @@ -765,35 +780,54 @@ ACE_WFMO_Reactor_Handler_Repository::dump (void) const ASYS_TEXT ("Max size = %d\n"), this->max_size_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Current info table\n\n"))); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\tSize = %d\n"), this->max_handlep1_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\tHandles to be suspended = %d\n"), this->handles_to_be_suspended_)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("Current info table\n\n"))); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\tSize = %d\n"), + this->max_handlep1_)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\tHandles to be suspended = %d\n"), + this->handles_to_be_suspended_)); + for (i = 0; i < this->max_handlep1_; i++) - { - this->current_info_[i].dump (this->current_handles_[i]); - } - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); + this->current_info_[i].dump (this->current_handles_[i]); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n"))); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("To-be-added info table\n\n"))); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\tSize = %d\n"), + this->handles_to_be_added_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("To-be-added info table\n\n"))); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\tSize = %d\n"), this->handles_to_be_added_)); for (i = 0; i < this->handles_to_be_added_; i++) - { - this->to_be_added_info_[i].dump (); - } - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); + this->to_be_added_info_[i].dump (); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n"))); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("Suspended info table\n\n"))); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\tSize = %d\n"), + this->suspended_handles_)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\tHandles to be resumed = %d\n"), + this->handles_to_be_resumed_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Suspended info table\n\n"))); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\tSize = %d\n"), this->suspended_handles_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\tHandles to be resumed = %d\n"), this->handles_to_be_resumed_)); for (i = 0; i < this->suspended_handles_; i++) - { - this->current_suspended_info_[i].dump (); - } - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\n"))); + this->current_suspended_info_[i].dump (); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Total handles to be deleted = %d\n"), this->handles_to_be_deleted_)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\n"))); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("Total handles to be deleted = %d\n"), + this->handles_to_be_deleted_)); + + ACE_DEBUG ((LM_DEBUG, + ACE_END_DUMP)); } /************************************************************/ @@ -806,6 +840,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh, delete_timer_queue_ (0), handler_rep_ (*this), delete_handler_rep_ (0), + delete_notify_handler_ (0), lock_adapter_ (lock_), // this event is initially signaled ok_to_wait_ (1), @@ -833,6 +868,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size, delete_timer_queue_ (0), handler_rep_ (*this), delete_handler_rep_ (0), + delete_notify_handler_ (0), lock_adapter_ (lock_), // this event is initially signaled ok_to_wait_ (1), @@ -853,11 +889,18 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size, } int +ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &) +{ + return -1; +} + +int ACE_WFMO_Reactor::open (size_t size, int unused, ACE_Sig_Handler *sh, ACE_Timer_Queue *tq, - int disable_notify_pipe) + int disable_notify_pipe, + ACE_Reactor_Notify *notify) { ACE_UNUSED_ARG (unused); ACE_UNUSED_ARG (sh); @@ -907,8 +950,8 @@ ACE_WFMO_Reactor::open (size_t size, if (this->delete_handler_rep_) this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository (); - // Open the handle repository - // Two additional handles for internal purposes + // Open the handle repository. Two additional handles for internal + // purposes if (this->handler_rep_.open (size + 2) == -1) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("opening handler repository")), @@ -916,19 +959,35 @@ ACE_WFMO_Reactor::open (size_t size, else this->delete_handler_rep_ = 1; + this->notify_handler_ = notify; + + if (this->notify_handler_ == 0) + { + ACE_NEW_RETURN (this->notify_handler_, + ACE_WMFO_Reactor_Notify, + -1); + + if (this->notify_handler_ == 0) + result = -1; + else + this->delete_notify_handler_ = 1; + } + /* NOTE */ // The order of the following two registrations is very important // Open the notification handler - if (this->notify_handler_.open (*this, this->timer_queue_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), + if (this->notify_handler_->open (this, this->timer_queue_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), ASYS_TEXT ("opening notify handler ")), -1); // Register for <wakeup_all_threads> event if (this->register_handler (&this->wakeup_all_threads_handler_, this->wakeup_all_threads_.handle ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), ASYS_TEXT ("registering thread wakeup handler")), -1); @@ -996,7 +1055,7 @@ ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void) this->close (); // Make necessary changes to the handler repository that we caused - // by close () + // by <close>. this->handler_rep_.make_changes (); if (this->delete_timer_queue_) @@ -1012,6 +1071,13 @@ ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void) this->signal_handler_ = 0; this->delete_signal_handler_ = 0; } + + if (this->delete_notify_handler_) + { + delete this->notify_handler_; + this->notify_handler_ = 0; + this->delete_notify_handler_ = 0; + } } int @@ -1023,6 +1089,7 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, // Make sure that the <handle> is valid if (io_handle == ACE_INVALID_HANDLE) io_handle = event_handler->get_handle (); + if (this->handler_rep_.invalid_handle (io_handle)) return -1; @@ -1051,30 +1118,25 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, int result = ::WSAEventSelect ((SOCKET) io_handle, event_handle, new_network_events); - // If we had found the <Event_Handler> there is nothing more to do if (found) return result; - - else + else if (result != SOCKET_ERROR && + this->handler_rep_.bind_i (1, + event_handler, + new_network_events, + io_handle, + event_handle, + delete_event) != -1) { - // The <Event_Handler was not found in the repository - // Add to the repository - if (result != SOCKET_ERROR && - this->handler_rep_.bind_i (1, - event_handler, - new_network_events, - io_handle, - event_handle, - delete_event) != -1) - { - if (delete_event) - event->handle (ACE_INVALID_HANDLE); - return 0; - } - else - return -1; + // The <Event_Handler was not found in the repository Add to the + // repository. + if (delete_event) + event->handle (ACE_INVALID_HANDLE); + return 0; } + else + return -1; } int @@ -1096,7 +1158,6 @@ ACE_WFMO_Reactor::schedule_wakeup_i (ACE_HANDLE io_handle, new_network_events, event_handle, delete_event); - if (found) return ::WSAEventSelect ((SOCKET) io_handle, event_handle, @@ -1640,6 +1701,19 @@ ACE_WFMO_Reactor::dump (void) const // ************************************************************ +int +ACE_WFMO_Reactor_Notify::dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask) +{ + return -1; +} + +int +ACE_WFMO_Reactor_Notify::close (void) +{ + return -1; +} + ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (void) : max_notify_iterations_ (-1), timer_queue_ (0) @@ -1647,11 +1721,12 @@ ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (void) } int -ACE_WFMO_Reactor_Notify::open (ACE_WFMO_Reactor &wfmo_reactor, - ACE_Timer_Queue *timer_queue) +ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor, + ACE_Timer_Queue *timer_queue, + int ignore_notify) { timer_queue_ = timer_queue; - return wfmo_reactor.register_handler (this); + return wfmo_reactor->register_handler (this); } ACE_HANDLE @@ -1675,7 +1750,8 @@ ACE_WFMO_Reactor_Notify::handle_signal (int signum, // This will get called when <WFMO_Reactor->wakeup_one_thread_> event // is signaled. - // ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("(%t) waking up to handle internal notifications\n"))); + // ACE_DEBUG ((LM_DEBUG, + // ASYS_TEXT ("(%t) waking up to handle internal notifications\n"))); for (int i = 1; ; i++) { @@ -1806,7 +1882,7 @@ ACE_WFMO_Reactor::max_notify_iterations (int iterations) ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_); // Must always be > 0 or < 0 to optimize the loop exit condition. - this->notify_handler_.max_notify_iterations (iterations); + this->notify_handler_->max_notify_iterations (iterations); } int @@ -1815,7 +1891,7 @@ ACE_WFMO_Reactor::max_notify_iterations (void) ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations"); ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1); - return this->notify_handler_.max_notify_iterations (); + return this->notify_handler_->max_notify_iterations (); } // No-op WinSOCK2 methods to help WFMO_Reactor compile diff --git a/ace/WFMO_Reactor.h b/ace/WFMO_Reactor.h index 2b793f7e6fc..8115f5170e8 100644 --- a/ace/WFMO_Reactor.h +++ b/ace/WFMO_Reactor.h @@ -382,12 +382,10 @@ protected: class ACE_Export ACE_WFMO_Reactor_Notify : public ACE_Event_Handler { // = TITLE - // // Unblock the <ACE_WFMO_Reactor> from its event loop, passing // it an optional <ACE_Event_Handler> to dispatch. // // = DESCRIPTION - // // This implementation is necessary for cases where the // <ACE_WFMO_Reactor> is run in a multi-threaded program. In // this case, we need to be able to unblock @@ -396,26 +394,33 @@ class ACE_Export ACE_WFMO_Reactor_Notify : public ACE_Event_Handler // auto-reset event the <ACE_WFMO_Reactor> is listening on. If // an <ACE_Event_Handler> and <ACE_Reactor_Mask> is passed to // <notify>, the appropriate <handle_*> method is dispatched. - // public: ACE_WFMO_Reactor_Notify (void); // Constructor - int open (ACE_WFMO_Reactor &wfmo_reactor, - ACE_Timer_Queue *timer_queue); - // Initialization. <timer_queue> is stored to call gettimeofday. + virtual int open (ACE_Reactor_Impl *wfmo_reactor, + ACE_Timer_Queue *timer_queue, + int disable_notify = 0); + // Initialization. <timer_queue> is stored to call <gettimeofday>. + + virtual int close (void); + // No-op. - int notify (ACE_Event_Handler *event_handler = 0, - ACE_Reactor_Mask mask = ACE_Event_Handler::EXCEPT_MASK, - ACE_Time_Value *timeout = 0); - // Special trick to unblock WaitForMultipleObjects() when updates + ssize_t notify (ACE_Event_Handler *event_handler = 0, + ACE_Reactor_Mask mask = ACE_Event_Handler::EXCEPT_MASK, + ACE_Time_Value *timeout = 0); + // Special trick to unblock <WaitForMultipleObjects> when updates // occur. All we do is enqueue <event_handler> and <mask> onto the - // <ACE_Message_Queue> and wakeup the WFMO_Reactor by signaling its - // <ACE_Event> handle. The <ACE_Time_Value> indicates how long to - // blocking trying to notify the <WFMO_Reactor>. If <timeout> == 0, - // the caller will block until action is possible, else will wait + // <ACE_Message_Queue> and wakeup the <WFMO_Reactor> by signaling + // its <ACE_Event> handle. The <ACE_Time_Value> indicates how long + // to blocking trying to notify the <WFMO_Reactor>. If <timeout> == + // 0, the caller will block until action is possible, else will wait // until the relative time specified in <timeout> elapses). + virtual int dispatch_notifications (int &number_of_active_handles, + const ACE_Handle_Set &rd_mask); + // No-op. + virtual ACE_HANDLE get_handle (void) const; // Returns a handle to the <ACE_Auto_Event>. @@ -471,12 +476,10 @@ private: class ACE_Export ACE_WFMO_Reactor : public ACE_Reactor_Impl { // = TITLE - // // An object oriented event demultiplexor and event handler // WFMO_Reactor for Win32 WaitForMultipleObjects // // = DESCRIPTION - // // The ACE_WFMO_Reactor is an object-oriented event // demultiplexor and event handler Reactor. The sources of // events that the ACE_WFMO_Reactor waits for and dispatches @@ -496,7 +499,6 @@ class ACE_Export ACE_WFMO_Reactor : public ACE_Reactor_Impl // handler->handle_close(), use the DONT_CALL flag with // remove_handler(). Or else, dynamically allocate the handler, // and then call "delete this" inside handler->handle_close(). - // public: friend class ACE_WFMO_Reactor_Handler_Repository; friend class ACE_WFMO_Reactor_Test; @@ -528,11 +530,15 @@ public: int restart = 0, ACE_Sig_Handler * = 0, ACE_Timer_Queue * = 0, - int disable_notify_pipe = 0); + int disable_notify_pipe = 0, + ACE_Reactor_Notify * = 0); // Initialize <ACE_WFMO_Reactor> with size <size>. Two slots will // be added to the <size> parameter which will store handles used // for internal management purposes. + virtual int current_info (ACE_HANDLE, size_t & /* size */); + // Returns -1 (not used in this implementation); + virtual int set_sig_handler (ACE_Sig_Handler *signal_handler); // Use a user specified signal handler instead. @@ -975,6 +981,12 @@ protected: int delete_handler_rep_; // Keeps track of whether we should delete the handler repository + ACE_WFMO_Reactor_Notify *notify_handler_; + // Used when <notify> is called. + + int delete_notify_handler_; + // Keeps track of whether we should delete the notify handler. + ACE_Process_Mutex lock_; // Synchronization for the ACE_WFMO_Reactor. // @@ -990,9 +1002,6 @@ protected: ACE_WFMO_Reactor_Handler_Repository handler_rep_; // Table that maps <ACE_HANDLEs> to <ACE_Event_Handler *>'s. - ACE_WFMO_Reactor_Notify notify_handler_; - // Used when <notify> is called. - ACE_Manual_Event ok_to_wait_; // A manual event used to block threads from proceeding into // WaitForMultipleObjects @@ -1040,7 +1049,7 @@ private: // Deny access since member-wise won't work... }; -// if we don't have WinSOCK2, we need these defined +// If we don't have WinSOCK2, we need these defined #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) /* * WinSock 2 extension -- bit values and indices for FD_XXX network events |