diff options
-rw-r--r-- | ChangeLog-97a | 31 | ||||
-rw-r--r-- | ace/Asynch_IO.cpp | 76 | ||||
-rw-r--r-- | ace/Asynch_IO.h | 30 | ||||
-rw-r--r-- | ace/Proactor.cpp | 57 | ||||
-rw-r--r-- | ace/Proactor.h | 31 |
5 files changed, 175 insertions, 50 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a index 17a7c6eef0b..de493322aea 100644 --- a/ChangeLog-97a +++ b/ChangeLog-97a @@ -1,3 +1,34 @@ +Sun Apr 27 22:22:14 1997 <irfan@TWOSTEP> + + * ace/Proactor: The Proactor can now be registered with ReactorEx + and both of them can be run from ReactorEx's event loop. Added a + flag to Proactor's constructor that indicates whether the + Proactor will be used in conjunction with ReactorEx event + loop. Only if this flag is set will the event in the Proactor be + used by the Asynch IO components. This will help with + performance. + + * examples/Reactor/Proactor/test_multiple_loops.cpp: Added a new + test that shows the integration of the event loops of Proactor + and ReactorEx. + + * ace/Asynch_IO: Added an ACE_EVENT parameter to the constructors + of classes that inherit from the OVERLAPPED structure. This way + the Proactor's event_ can be set in the OVERLAPPED structure. + + Also changed the open methods on the Asynch IO classes to take a + Proactor as an extra parameter. + + Changed ACE_Handler's handle_timeout() to handle_time_out() in + ACE_Handler. This is temporary till we decide on the argument + about mixing the interface of ACE_Event_Handler and + ACE_Handler. This change allows user to inherit from ACE_Handler + and ACE_Event_Handler and use the different handle_timeout() + method, one of which returns void and the other return an int. + + * examples/Reactor/Proactor/test_timeout.cpp: This file got + affected by the above change. + Sun Apr 27 17:44:28 1997 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> * ace/Synch_T.h: Added a new macro ACE_SYNCH_RW_MUTEX to diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 1e0cb04ba4e..3ecbc992bd4 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -17,7 +17,7 @@ ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler, const void* act, - HANDLE event, + ACE_HANDLE event, u_long offset, u_long offset_high) : handler_ (handler), @@ -98,8 +98,10 @@ ACE_Asynch_Operation::ACE_Asynch_Operation (void) int ACE_Asynch_Operation::open (ACE_Handler &handler, ACE_HANDLE handle, - const void *completion_key) + const void *completion_key, + ACE_Proactor *proactor) { + this->proactor_ = proactor; this->handler_ = &handler; this->handle_ = handle; @@ -109,14 +111,18 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, if (this->handle_ == ACE_INVALID_HANDLE) return -1; - // Grab the proactor from the <Service_Config> if - // <handler->proactor> is zero - ACE_Proactor *proactor = this->handler_->proactor (); - if (proactor == 0) - proactor = ACE_Service_Config::proactor (); + // If no proactor was passed + if (this->proactor_ == 0) + { + // Grab the proactor from the <Service_Config> if + // <handler->proactor> is zero + this->proactor_ = this->handler_->proactor (); + if (this->proactor_ == 0) + this->proactor_ = ACE_Service_Config::proactor (); + } // Register with the <proactor> - return proactor->register_handle (this->handle_, completion_key); + return this->proactor_->register_handle (this->handle_, completion_key); } int @@ -146,7 +152,8 @@ ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block, this->handle_, message_block, bytes_to_read, - act), + act, + this->proactor_->get_handle ()), -1); return this->shared_read (result, @@ -199,8 +206,9 @@ ACE_Asynch_Read_Stream::Result::Result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, - const void* act) - : ACE_Asynch_Result (handler, act), + const void* act, + ACE_HANDLE event) + : ACE_Asynch_Result (handler, act, event), handle_ (handle), message_block_ (message_block), bytes_to_read_ (bytes_to_read) @@ -261,9 +269,10 @@ ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block, this->handle_, message_block, bytes_to_write, - act), + act, + this->proactor_->get_handle ()), -1); - + return this->shared_write (result, message_block, bytes_to_write, @@ -314,8 +323,9 @@ ACE_Asynch_Write_Stream::Result::Result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, - const void* act) - : ACE_Asynch_Result (handler, act), + const void* act, + ACE_HANDLE event) + : ACE_Asynch_Result (handler, act, event), handle_ (handle), message_block_ (message_block), bytes_to_write_ (bytes_to_write) @@ -376,7 +386,8 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block, bytes_to_read, act, offset, - offset_high), + offset_high, + this->proactor_->get_handle ()), -1); return this->shared_read (result, @@ -393,8 +404,9 @@ ACE_Asynch_Read_File::Result::Result (ACE_Handler &handler, u_long bytes_to_read, const void* act, u_long offset, - u_long offset_high) - : ACE_Asynch_Read_Stream::Result (handler, handle, message_block, bytes_to_read, act) + u_long offset_high, + ACE_HANDLE event) + : ACE_Asynch_Read_Stream::Result (handler, handle, message_block, bytes_to_read, act, event) { this->Offset = offset; this->OffsetHigh = offset_high; @@ -436,7 +448,8 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block, bytes_to_write, act, offset, - offset_high), + offset_high, + this->proactor_->get_handle ()), -1); return this->shared_write (result, @@ -453,8 +466,9 @@ ACE_Asynch_Write_File::Result::Result (ACE_Handler &handler, u_long bytes_to_write, const void* act, u_long offset, - u_long offset_high) - : ACE_Asynch_Write_Stream::Result (handler, handle, message_block, bytes_to_write, act) + u_long offset_high, + ACE_HANDLE event) + : ACE_Asynch_Write_Stream::Result (handler, handle, message_block, bytes_to_write, act, event) { this->Offset = offset; this->OffsetHigh = offset_high; @@ -522,7 +536,8 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, accept_handle, message_block, bytes_to_read, - act), + act, + this->proactor_->get_handle ()), -1); u_long bytes_read; @@ -598,8 +613,9 @@ ACE_Asynch_Accept::Result::Result (ACE_Handler &handler, ACE_HANDLE accept_handle, ACE_Message_Block &message_block, u_long bytes_to_read, - const void* act) - : ACE_Asynch_Result (handler, act), + const void* act, + ACE_HANDLE event) + : ACE_Asynch_Result (handler, act, event), listen_handle_ (listen_handle), accept_handle_ (accept_handle), message_block_ (message_block), @@ -654,7 +670,8 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, offset_high, bytes_per_send, flags, - act), + act, + this->proactor_->get_handle ()), -1); LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0; @@ -743,8 +760,9 @@ ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler, u_long offset_high, u_long bytes_per_send, u_long flags, - const void *act) - : ACE_Asynch_Result (handler, act, 0, offset, offset_high), + const void *act, + ACE_HANDLE event) + : ACE_Asynch_Result (handler, act, event, offset, offset_high), socket_ (socket), file_ (file), header_and_trailer_ (header_and_trailer), @@ -952,8 +970,8 @@ ACE_Handler::handle_notify (const ACE_Asynch_Notify::Result &result) */ void -ACE_Handler::handle_timeout (const ACE_Time_Value &tv, - const void *act) +ACE_Handler::handle_time_out (const ACE_Time_Value &tv, + const void *act) { } diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h index 3145d3104c1..b70febe7a1f 100644 --- a/ace/Asynch_IO.h +++ b/ace/Asynch_IO.h @@ -82,7 +82,7 @@ public: ACE_Asynch_Result (ACE_Handler &handler, const void* act, - ACE_HANDLE event = 0, + ACE_HANDLE event, u_long offset = 0, u_long offset_high = 0); // Constructor @@ -138,7 +138,8 @@ protected: public: int open (ACE_Handler &handler, ACE_HANDLE handle = ACE_INVALID_HANDLE, - const void *completion_key = 0); + const void *completion_key = 0, + ACE_Proactor *proactor = 0); // Initializes the factory with information which will be used with // each asynchronous call. If (<handle> == ACE_INVALID_HANDLE), // <ACE_Handler::handle> will be called on the <handler> to get the @@ -150,6 +151,9 @@ public: // operations issued by other threads. protected: + // Proactor that this Asynch IO will be registered with + ACE_Proactor *proactor_; + ACE_Handler *handler_; // Handler that will receive the callback. @@ -228,7 +232,8 @@ public: ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, - const void* act); + const void* act, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Read_Stream factory. @@ -320,7 +325,8 @@ public: ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, - const void* act); + const void* act, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Write_Stream factory. @@ -403,7 +409,8 @@ public: u_long bytes_to_read, const void* act, u_long offset, - u_long offset_high); + u_long offset_high, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Read_File factory. @@ -476,7 +483,8 @@ public: u_long bytes_to_write, const void* act, u_long offset, - u_long offset_high); + u_long offset_high, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Write_File factory. @@ -560,7 +568,8 @@ public: ACE_HANDLE accept_handle, ACE_Message_Block &message_block, u_long bytes_to_read, - const void* act); + const void* act, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Accept factory. @@ -681,7 +690,8 @@ public: u_long offset_high, u_long bytes_per_send, u_long flags, - const void *act); + const void *act, + ACE_HANDLE event); // Constructor is protected since creation is limited to // ACE_Asynch_Transmit_File factory. @@ -819,8 +829,8 @@ public: virtual void handle_notify (const ACE_Asynch_Notify::Result &result); */ - virtual void handle_timeout (const ACE_Time_Value &tv, - const void *act = 0); + virtual void handle_time_out (const ACE_Time_Value &tv, + const void *act = 0); // Called when timer expires. // <tv> was the requested time value and // <act> is the ACT passed when scheduling the timer diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 7d4c67e88fd..050b142d144 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -121,11 +121,19 @@ ACE_Proactor_Handle_Timeout_Upcall::operator () (TIMER_QUEUE &timer_queue, "(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall, no completion port to post timeout to?!@\n"), -1); + // Grab the event associated with the Proactor + HANDLE handle = this->proactor_->get_handle (); + // Create the Asynch_Timer ACE_Proactor::Asynch_Timer *asynch_timer = new ACE_Proactor::Asynch_Timer (*handler, act, - time); + time, + handle); + // If Proactor event is valid, signal it + if (handle != ACE_INVALID_HANDLE || + handle != 0) + ACE_OS::event_signal (&handle); // Post a completion if (::PostQueuedCompletionStatus (this->proactor_->completion_port_, // completion port @@ -170,12 +178,14 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor) ACE_Proactor::ACE_Proactor (size_t number_of_threads, - Timer_Queue *tq) + Timer_Queue *tq, + int used_with_reactorEx_event_loop) : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! number_of_threads_ (number_of_threads), timer_queue_ (0), delete_timer_queue_ (0), - timer_handler_ (0) + timer_handler_ (0), + used_with_reactorEx_event_loop_ (used_with_reactorEx_event_loop) { // create the completion port this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, @@ -317,6 +327,39 @@ ACE_Proactor::cancel_timer (ACE_Handler &handler) return this->timer_queue_->cancel (&handler); } +int +ACE_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) +{ + // Perform a non-blocking "poll" for all the I/O events that have + // completed in the I/O completion queue. + + ACE_Time_Value timeout (0, 0); + int result; + + while ((result = this->handle_events (timeout)) == 1) + continue; + + // If our handle_events failed, we'll report a failure to the + // ReactorEx. + return result == -1 ? -1 : 0; +} + +int +ACE_Proactor::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + return this->close (); +} + +ACE_HANDLE +ACE_Proactor::get_handle (void) const +{ + if (this->used_with_reactorEx_event_loop_) + return this->event_.handle (); + else + return 0; +} + int ACE_Proactor::handle_events (ACE_Time_Value &wait_time) { @@ -478,9 +521,9 @@ ACE_Proactor::timer_queue (Timer_Queue *tq) ACE_Proactor::Asynch_Timer::Asynch_Timer (ACE_Handler &handler, const void *act, - const ACE_Time_Value &tv) - : ACE_Asynch_Result (handler, - act), + const ACE_Time_Value &tv, + ACE_HANDLE event) + : ACE_Asynch_Result (handler, act, event), time_ (tv) { } @@ -491,7 +534,7 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred, const void *completion_key, u_long error) { - this->handler_.handle_timeout (this->time_, this->act ()); + this->handler_.handle_time_out (this->time_, this->act ()); } #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) diff --git a/ace/Proactor.h b/ace/Proactor.h index 65500b07fa1..4d899b79b38 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -20,6 +20,8 @@ #include "ace/Asynch_IO.h" #include "ace/Thread_Manager.h" +#include "ace/Event_Handler.h" + #include "ace/Timer_Queue.h" #include "ace/Timer_List.h" #include "ace/Timer_Heap.h" @@ -71,7 +73,7 @@ protected: // Handle to the proactor. This is needed for the completion port. }; -class ACE_Export ACE_Proactor +class ACE_Export ACE_Proactor : public ACE_Event_Handler // // = TITLE // @@ -86,7 +88,7 @@ class ACE_Export ACE_Proactor // Access needed to: thr_mgr_ friend class ACE_Proactor_Handle_Timeout_Upcall; - // Access needed to: Asynch_Timer + // Access needed to: Asynch_Timer, and completion_port_ public: @@ -107,7 +109,8 @@ public: typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall> Timer_Wheel_Iterator; ACE_Proactor (size_t number_of_threads = 0, - Timer_Queue *tq = 0); + Timer_Queue *tq = 0, + int used_with_reactorEx_event_loop = 0); // A do nothing constructor. virtual ~ACE_Proactor (void); @@ -197,9 +200,20 @@ public: Timer_Queue *timer_queue (void) const; void timer_queue (Timer_Queue *); // Get/Set timer queue + + virtual ACE_HANDLE get_handle (void) const; + // Get the event handle. protected: + virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + // Called when object is signaled by OS (either via UNIX signals or + // when a Win32 object becomes signaled). + + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + // Called when object is removed from the ACE_Reactor + void application_specific_code (ACE_Asynch_Result *asynch_result, u_long bytes_transferred, int success, @@ -228,7 +242,8 @@ protected: public: Asynch_Timer (ACE_Handler &handler, const void *act, - const ACE_Time_Value &tv); + const ACE_Time_Value &tv, + ACE_HANDLE event); protected: virtual void complete (u_long bytes_transferred, @@ -258,6 +273,14 @@ protected: ACE_Thread_Manager thr_mgr_; // This will manage the thread in the Timer_Handler + + ACE_Auto_Event event_; + // This event is used in conjunction with ReactorEx when we try to + // integrate the event loops of ReactorEx and the Proactor. + + int used_with_reactorEx_event_loop_; + // Flag that indicates whether we are used in conjunction with + // ReactorEx }; #if defined (__ACE_INLINE__) |