summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-97a31
-rw-r--r--ace/Asynch_IO.cpp76
-rw-r--r--ace/Asynch_IO.h30
-rw-r--r--ace/Proactor.cpp57
-rw-r--r--ace/Proactor.h31
5 files changed, 175 insertions, 50 deletions
diff --git a/ChangeLog-97a b/ChangeLog-97a
index 17a7c6eef0b..de493322aea 100644
--- a/ChangeLog-97a
+++ b/ChangeLog-97a
@@ -1,3 +1,34 @@
+Sun Apr 27 22:22:14 1997 <irfan@TWOSTEP>
+
+ * ace/Proactor: The Proactor can now be registered with ReactorEx
+ and both of them can be run from ReactorEx's event loop. Added a
+ flag to Proactor's constructor that indicates whether the
+ Proactor will be used in conjunction with ReactorEx event
+ loop. Only if this flag is set will the event in the Proactor be
+ used by the Asynch IO components. This will help with
+ performance.
+
+ * examples/Reactor/Proactor/test_multiple_loops.cpp: Added a new
+ test that shows the integration of the event loops of Proactor
+ and ReactorEx.
+
+ * ace/Asynch_IO: Added an ACE_EVENT parameter to the constructors
+ of classes that inherit from the OVERLAPPED structure. This way
+ the Proactor's event_ can be set in the OVERLAPPED structure.
+
+ Also changed the open methods on the Asynch IO classes to take a
+ Proactor as an extra parameter.
+
+ Changed ACE_Handler's handle_timeout() to handle_time_out() in
+ ACE_Handler. This is temporary till we decide on the argument
+ about mixing the interface of ACE_Event_Handler and
+ ACE_Handler. This change allows user to inherit from ACE_Handler
+ and ACE_Event_Handler and use the different handle_timeout()
+ method, one of which returns void and the other return an int.
+
+ * examples/Reactor/Proactor/test_timeout.cpp: This file got
+ affected by the above change.
+
Sun Apr 27 17:44:28 1997 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
* ace/Synch_T.h: Added a new macro ACE_SYNCH_RW_MUTEX to
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 1e0cb04ba4e..3ecbc992bd4 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -17,7 +17,7 @@
ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler,
const void* act,
- HANDLE event,
+ ACE_HANDLE event,
u_long offset,
u_long offset_high)
: handler_ (handler),
@@ -98,8 +98,10 @@ ACE_Asynch_Operation::ACE_Asynch_Operation (void)
int
ACE_Asynch_Operation::open (ACE_Handler &handler,
ACE_HANDLE handle,
- const void *completion_key)
+ const void *completion_key,
+ ACE_Proactor *proactor)
{
+ this->proactor_ = proactor;
this->handler_ = &handler;
this->handle_ = handle;
@@ -109,14 +111,18 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
if (this->handle_ == ACE_INVALID_HANDLE)
return -1;
- // Grab the proactor from the <Service_Config> if
- // <handler->proactor> is zero
- ACE_Proactor *proactor = this->handler_->proactor ();
- if (proactor == 0)
- proactor = ACE_Service_Config::proactor ();
+ // If no proactor was passed
+ if (this->proactor_ == 0)
+ {
+ // Grab the proactor from the <Service_Config> if
+ // <handler->proactor> is zero
+ this->proactor_ = this->handler_->proactor ();
+ if (this->proactor_ == 0)
+ this->proactor_ = ACE_Service_Config::proactor ();
+ }
// Register with the <proactor>
- return proactor->register_handle (this->handle_, completion_key);
+ return this->proactor_->register_handle (this->handle_, completion_key);
}
int
@@ -146,7 +152,8 @@ ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
this->handle_,
message_block,
bytes_to_read,
- act),
+ act,
+ this->proactor_->get_handle ()),
-1);
return this->shared_read (result,
@@ -199,8 +206,9 @@ ACE_Asynch_Read_Stream::Result::Result (ACE_Handler &handler,
ACE_HANDLE handle,
ACE_Message_Block &message_block,
u_long bytes_to_read,
- const void* act)
- : ACE_Asynch_Result (handler, act),
+ const void* act,
+ ACE_HANDLE event)
+ : ACE_Asynch_Result (handler, act, event),
handle_ (handle),
message_block_ (message_block),
bytes_to_read_ (bytes_to_read)
@@ -261,9 +269,10 @@ ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
this->handle_,
message_block,
bytes_to_write,
- act),
+ act,
+ this->proactor_->get_handle ()),
-1);
-
+
return this->shared_write (result,
message_block,
bytes_to_write,
@@ -314,8 +323,9 @@ ACE_Asynch_Write_Stream::Result::Result (ACE_Handler &handler,
ACE_HANDLE handle,
ACE_Message_Block &message_block,
u_long bytes_to_write,
- const void* act)
- : ACE_Asynch_Result (handler, act),
+ const void* act,
+ ACE_HANDLE event)
+ : ACE_Asynch_Result (handler, act, event),
handle_ (handle),
message_block_ (message_block),
bytes_to_write_ (bytes_to_write)
@@ -376,7 +386,8 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block,
bytes_to_read,
act,
offset,
- offset_high),
+ offset_high,
+ this->proactor_->get_handle ()),
-1);
return this->shared_read (result,
@@ -393,8 +404,9 @@ ACE_Asynch_Read_File::Result::Result (ACE_Handler &handler,
u_long bytes_to_read,
const void* act,
u_long offset,
- u_long offset_high)
- : ACE_Asynch_Read_Stream::Result (handler, handle, message_block, bytes_to_read, act)
+ u_long offset_high,
+ ACE_HANDLE event)
+ : ACE_Asynch_Read_Stream::Result (handler, handle, message_block, bytes_to_read, act, event)
{
this->Offset = offset;
this->OffsetHigh = offset_high;
@@ -436,7 +448,8 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block,
bytes_to_write,
act,
offset,
- offset_high),
+ offset_high,
+ this->proactor_->get_handle ()),
-1);
return this->shared_write (result,
@@ -453,8 +466,9 @@ ACE_Asynch_Write_File::Result::Result (ACE_Handler &handler,
u_long bytes_to_write,
const void* act,
u_long offset,
- u_long offset_high)
- : ACE_Asynch_Write_Stream::Result (handler, handle, message_block, bytes_to_write, act)
+ u_long offset_high,
+ ACE_HANDLE event)
+ : ACE_Asynch_Write_Stream::Result (handler, handle, message_block, bytes_to_write, act, event)
{
this->Offset = offset;
this->OffsetHigh = offset_high;
@@ -522,7 +536,8 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
accept_handle,
message_block,
bytes_to_read,
- act),
+ act,
+ this->proactor_->get_handle ()),
-1);
u_long bytes_read;
@@ -598,8 +613,9 @@ ACE_Asynch_Accept::Result::Result (ACE_Handler &handler,
ACE_HANDLE accept_handle,
ACE_Message_Block &message_block,
u_long bytes_to_read,
- const void* act)
- : ACE_Asynch_Result (handler, act),
+ const void* act,
+ ACE_HANDLE event)
+ : ACE_Asynch_Result (handler, act, event),
listen_handle_ (listen_handle),
accept_handle_ (accept_handle),
message_block_ (message_block),
@@ -654,7 +670,8 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
offset_high,
bytes_per_send,
flags,
- act),
+ act,
+ this->proactor_->get_handle ()),
-1);
LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0;
@@ -743,8 +760,9 @@ ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler,
u_long offset_high,
u_long bytes_per_send,
u_long flags,
- const void *act)
- : ACE_Asynch_Result (handler, act, 0, offset, offset_high),
+ const void *act,
+ ACE_HANDLE event)
+ : ACE_Asynch_Result (handler, act, event, offset, offset_high),
socket_ (socket),
file_ (file),
header_and_trailer_ (header_and_trailer),
@@ -952,8 +970,8 @@ ACE_Handler::handle_notify (const ACE_Asynch_Notify::Result &result)
*/
void
-ACE_Handler::handle_timeout (const ACE_Time_Value &tv,
- const void *act)
+ACE_Handler::handle_time_out (const ACE_Time_Value &tv,
+ const void *act)
{
}
diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h
index 3145d3104c1..b70febe7a1f 100644
--- a/ace/Asynch_IO.h
+++ b/ace/Asynch_IO.h
@@ -82,7 +82,7 @@ public:
ACE_Asynch_Result (ACE_Handler &handler,
const void* act,
- ACE_HANDLE event = 0,
+ ACE_HANDLE event,
u_long offset = 0,
u_long offset_high = 0);
// Constructor
@@ -138,7 +138,8 @@ protected:
public:
int open (ACE_Handler &handler,
ACE_HANDLE handle = ACE_INVALID_HANDLE,
- const void *completion_key = 0);
+ const void *completion_key = 0,
+ ACE_Proactor *proactor = 0);
// Initializes the factory with information which will be used with
// each asynchronous call. If (<handle> == ACE_INVALID_HANDLE),
// <ACE_Handler::handle> will be called on the <handler> to get the
@@ -150,6 +151,9 @@ public:
// operations issued by other threads.
protected:
+ // Proactor that this Asynch IO will be registered with
+ ACE_Proactor *proactor_;
+
ACE_Handler *handler_;
// Handler that will receive the callback.
@@ -228,7 +232,8 @@ public:
ACE_HANDLE handle,
ACE_Message_Block &message_block,
u_long bytes_to_read,
- const void* act);
+ const void* act,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Read_Stream factory.
@@ -320,7 +325,8 @@ public:
ACE_HANDLE handle,
ACE_Message_Block &message_block,
u_long bytes_to_write,
- const void* act);
+ const void* act,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Write_Stream factory.
@@ -403,7 +409,8 @@ public:
u_long bytes_to_read,
const void* act,
u_long offset,
- u_long offset_high);
+ u_long offset_high,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Read_File factory.
@@ -476,7 +483,8 @@ public:
u_long bytes_to_write,
const void* act,
u_long offset,
- u_long offset_high);
+ u_long offset_high,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Write_File factory.
@@ -560,7 +568,8 @@ public:
ACE_HANDLE accept_handle,
ACE_Message_Block &message_block,
u_long bytes_to_read,
- const void* act);
+ const void* act,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Accept factory.
@@ -681,7 +690,8 @@ public:
u_long offset_high,
u_long bytes_per_send,
u_long flags,
- const void *act);
+ const void *act,
+ ACE_HANDLE event);
// Constructor is protected since creation is limited to
// ACE_Asynch_Transmit_File factory.
@@ -819,8 +829,8 @@ public:
virtual void handle_notify (const ACE_Asynch_Notify::Result &result);
*/
- virtual void handle_timeout (const ACE_Time_Value &tv,
- const void *act = 0);
+ virtual void handle_time_out (const ACE_Time_Value &tv,
+ const void *act = 0);
// Called when timer expires.
// <tv> was the requested time value and
// <act> is the ACT passed when scheduling the timer
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 7d4c67e88fd..050b142d144 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -121,11 +121,19 @@ ACE_Proactor_Handle_Timeout_Upcall::operator () (TIMER_QUEUE &timer_queue,
"(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall, no completion port to post timeout to?!@\n"),
-1);
+ // Grab the event associated with the Proactor
+ HANDLE handle = this->proactor_->get_handle ();
+
// Create the Asynch_Timer
ACE_Proactor::Asynch_Timer *asynch_timer
= new ACE_Proactor::Asynch_Timer (*handler,
act,
- time);
+ time,
+ handle);
+ // If Proactor event is valid, signal it
+ if (handle != ACE_INVALID_HANDLE ||
+ handle != 0)
+ ACE_OS::event_signal (&handle);
// Post a completion
if (::PostQueuedCompletionStatus (this->proactor_->completion_port_, // completion port
@@ -170,12 +178,14 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor)
ACE_Proactor::ACE_Proactor (size_t number_of_threads,
- Timer_Queue *tq)
+ Timer_Queue *tq,
+ int used_with_reactorEx_event_loop)
: completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
number_of_threads_ (number_of_threads),
timer_queue_ (0),
delete_timer_queue_ (0),
- timer_handler_ (0)
+ timer_handler_ (0),
+ used_with_reactorEx_event_loop_ (used_with_reactorEx_event_loop)
{
// create the completion port
this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
@@ -317,6 +327,39 @@ ACE_Proactor::cancel_timer (ACE_Handler &handler)
return this->timer_queue_->cancel (&handler);
}
+int
+ACE_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
+{
+ // Perform a non-blocking "poll" for all the I/O events that have
+ // completed in the I/O completion queue.
+
+ ACE_Time_Value timeout (0, 0);
+ int result;
+
+ while ((result = this->handle_events (timeout)) == 1)
+ continue;
+
+ // If our handle_events failed, we'll report a failure to the
+ // ReactorEx.
+ return result == -1 ? -1 : 0;
+}
+
+int
+ACE_Proactor::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ return this->close ();
+}
+
+ACE_HANDLE
+ACE_Proactor::get_handle (void) const
+{
+ if (this->used_with_reactorEx_event_loop_)
+ return this->event_.handle ();
+ else
+ return 0;
+}
+
int
ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
{
@@ -478,9 +521,9 @@ ACE_Proactor::timer_queue (Timer_Queue *tq)
ACE_Proactor::Asynch_Timer::Asynch_Timer (ACE_Handler &handler,
const void *act,
- const ACE_Time_Value &tv)
- : ACE_Asynch_Result (handler,
- act),
+ const ACE_Time_Value &tv,
+ ACE_HANDLE event)
+ : ACE_Asynch_Result (handler, act, event),
time_ (tv)
{
}
@@ -491,7 +534,7 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred,
const void *completion_key,
u_long error)
{
- this->handler_.handle_timeout (this->time_, this->act ());
+ this->handler_.handle_time_out (this->time_, this->act ());
}
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
diff --git a/ace/Proactor.h b/ace/Proactor.h
index 65500b07fa1..4d899b79b38 100644
--- a/ace/Proactor.h
+++ b/ace/Proactor.h
@@ -20,6 +20,8 @@
#include "ace/Asynch_IO.h"
#include "ace/Thread_Manager.h"
+#include "ace/Event_Handler.h"
+
#include "ace/Timer_Queue.h"
#include "ace/Timer_List.h"
#include "ace/Timer_Heap.h"
@@ -71,7 +73,7 @@ protected:
// Handle to the proactor. This is needed for the completion port.
};
-class ACE_Export ACE_Proactor
+class ACE_Export ACE_Proactor : public ACE_Event_Handler
//
// = TITLE
//
@@ -86,7 +88,7 @@ class ACE_Export ACE_Proactor
// Access needed to: thr_mgr_
friend class ACE_Proactor_Handle_Timeout_Upcall;
- // Access needed to: Asynch_Timer
+ // Access needed to: Asynch_Timer, and completion_port_
public:
@@ -107,7 +109,8 @@ public:
typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall> Timer_Wheel_Iterator;
ACE_Proactor (size_t number_of_threads = 0,
- Timer_Queue *tq = 0);
+ Timer_Queue *tq = 0,
+ int used_with_reactorEx_event_loop = 0);
// A do nothing constructor.
virtual ~ACE_Proactor (void);
@@ -197,9 +200,20 @@ public:
Timer_Queue *timer_queue (void) const;
void timer_queue (Timer_Queue *);
// Get/Set timer queue
+
+ virtual ACE_HANDLE get_handle (void) const;
+ // Get the event handle.
protected:
+ virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
+ // Called when object is signaled by OS (either via UNIX signals or
+ // when a Win32 object becomes signaled).
+
+ virtual int handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask);
+ // Called when object is removed from the ACE_Reactor
+
void application_specific_code (ACE_Asynch_Result *asynch_result,
u_long bytes_transferred,
int success,
@@ -228,7 +242,8 @@ protected:
public:
Asynch_Timer (ACE_Handler &handler,
const void *act,
- const ACE_Time_Value &tv);
+ const ACE_Time_Value &tv,
+ ACE_HANDLE event);
protected:
virtual void complete (u_long bytes_transferred,
@@ -258,6 +273,14 @@ protected:
ACE_Thread_Manager thr_mgr_;
// This will manage the thread in the Timer_Handler
+
+ ACE_Auto_Event event_;
+ // This event is used in conjunction with ReactorEx when we try to
+ // integrate the event loops of ReactorEx and the Proactor.
+
+ int used_with_reactorEx_event_loop_;
+ // Flag that indicates whether we are used in conjunction with
+ // ReactorEx
};
#if defined (__ACE_INLINE__)