summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
authorirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1997-04-22 08:15:21 +0000
committerirfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1997-04-22 08:15:21 +0000
commit69ece2d7d2b032c8ef6f827723874fb11b3b6ba7 (patch)
treeaff461430c99ded6240bc5cd2572d3c1c31e94c3 /ace/Proactor.cpp
parent9f66663966051d24f2b41daca601ea0f19a52dce (diff)
downloadATCD-69ece2d7d2b032c8ef6f827723874fb11b3b6ba7.tar.gz
*** empty log message ***
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp312
1 files changed, 306 insertions, 6 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 7a76cfa55f8..c7bf5f60089 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -7,10 +7,68 @@
#if defined (ACE_WIN32)
// This only works on Win32 platforms
+#include "ace/Task.h"
#include "ace/Timer_Queue.h"
+#include "ace/Timer_List.h"
#include "ace/Log_Msg.h"
-#include "ace/Asynch_IO.h"
+class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH>
+ //
+ // = TITLE
+ //
+ // A Handler for timer. It helps in the management of timers
+ // registered with the Proactor.
+ //
+ // = DESCRIPTION
+ //
+ // This object has a thread that will wait on the earliest
+ // time in a list of timers and an event. When a timer
+ // expires, the thread will post a completion event on the
+ // port and go back to waiting on the timer queue and
+ // event. If the event is signaled, the thread will refresh
+ // the time it is currently waiting on (in case the earliest
+ // time has changed)
+ //
+{
+ friend class ACE_Proactor;
+ // Proactor has special privileges
+
+public:
+ ACE_Proactor_Timer_Handler (ACE_Proactor &proactor);
+
+protected:
+ virtual int svc (void);
+ // Run by a daemon thread to handle deferred processing. In other
+ // words, this method will do the waiting on the earliest timer
+ // and event
+
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg = 0);
+ // Framework calls this when a timer expires
+
+ ACE_Auto_Event timer_event_;
+ // Event to wait on
+
+ ACE_Proactor &proactor_;
+ // Proactor
+
+public:
+ class ACE_Export ACT
+ //
+ // = TITLE
+ //
+ // A new ACT that combines the old ACT and Handler
+ //
+ {
+ public:
+ ACT (ACE_Handler &handler,
+ const void *act);
+
+ ACE_Handler &handler_;
+ const void *act_;
+ };
+};
+
#if !defined (__ACE_INLINE__)
#include "ace/Proactor.i"
#endif /* __ACE_INLINE__ */
@@ -18,14 +76,27 @@
ACE_Proactor::ACE_Proactor (size_t number_of_threads,
ACE_Timer_Queue *tq)
: completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
- number_of_threads_ (number_of_threads)
+ number_of_threads_ (number_of_threads),
+ timer_queue_ (0),
+ delete_timer_queue_ (0),
+ timer_handler_ (0)
{
+ ACE_NEW (this->timer_handler_, ACE_Proactor_Timer_Handler (*this));
+
+ // set the timer queue
+ this->timer_queue (tq);
+
+ // create the completion port
this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
this->completion_port_,
0,
this->number_of_threads_);
if (this->completion_port_ == 0)
ACE_ERROR ((LM_ERROR, "%p\n", "CreateIoCompletionPort"));
+
+ // activate <timer_handler>
+ if (this->timer_handler_->activate () == -1)
+ ACE_ERROR ((LM_ERROR, "%p Could not create thread\n", "Task::activate"));
}
ACE_Proactor::~ACE_Proactor (void)
@@ -36,6 +107,21 @@ ACE_Proactor::~ACE_Proactor (void)
int
ACE_Proactor::close (void)
{
+ // Take care of the timer handler
+ if (this->timer_handler_)
+ {
+ delete this->timer_handler_;
+ this->timer_handler_ = 0;
+ }
+
+ // Take care of the timer queue
+ if (this->delete_timer_queue_)
+ {
+ delete this->timer_queue_;
+ this->timer_queue_ = 0;
+ this->delete_timer_queue_ = 0;
+ }
+
// Close the completion port
if (this->completion_port_ != 0)
{
@@ -68,14 +154,87 @@ ACE_Proactor::register_handle (ACE_HANDLE handle,
}
int
+ACE_Proactor::schedule_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &time)
+{
+ return this->schedule_timer (handler, act, time, ACE_Time_Value::zero);
+}
+
+int
+ACE_Proactor::schedule_repeating_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &interval)
+{
+ return this->schedule_timer (handler, act, interval, interval);
+}
+
+int
+ACE_Proactor::schedule_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &time,
+ const ACE_Time_Value &interval)
+{
+ // Create a new ACT
+ ACE_Proactor_Timer_Handler::ACT *new_act = 0;
+ ACE_NEW_RETURN (new_act, ACE_Proactor_Timer_Handler::ACT (handler, act), -1);
+
+ // Only one guy goes in here at a time
+ ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->lock (), -1);
+
+ // absolute time
+ ACE_Time_Value absolute_time = this->timer_queue_->gettimeofday () + time;
+
+ // Schedule the timer
+ int result = this->timer_queue_->schedule (this->timer_handler_,
+ new_act,
+ absolute_time,
+ interval);
+ // Failure of schedule
+ if (result == -1)
+ // cleanup
+ delete new_act;
+
+ // no failures: check to see if we are the earliest time
+ else if (this->timer_queue_->earliest_time () == absolute_time)
+ // wake up the timer thread
+ if (this->timer_handler_->timer_event_.signal () == -1)
+ {
+ // failure: cleanup
+ delete new_act;
+ // Cancel timer
+ this->timer_queue_->cancel (result);
+ result = -1;
+ }
+ return result;
+}
+
+int
+ACE_Proactor::cancel_timer (int timer_id,
+ const void **arg)
+{
+ // No need to singal timer event here. Even if the cancel timer was
+ // the earliest, we will have an extra wakeup.
+ return this->timer_queue_->cancel (timer_id, arg);
+}
+
+int
ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
{
- return 0;
+ // Decrement <wait_time> with the amount of time spent in the method
+ ACE_Countdown_Time countdown (&wait_time);
+ return this->handle_events (wait_time.msec ());
}
int
ACE_Proactor::handle_events (void)
{
+ return this->handle_events (INFINITE);
+}
+
+int
+ACE_Proactor::handle_events (unsigned long milli_seconds)
+{
OVERLAPPED *overlapped = 0;
u_long bytes_transferred = 0;
u_long completion_key = 0;
@@ -85,11 +244,20 @@ ACE_Proactor::handle_events (void)
&bytes_transferred,
&completion_key,
&overlapped,
- INFINITE);
-
+ milli_seconds);
if (result == FALSE && overlapped == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "GetQueuedCompletionStatus"), -1);
+ {
+ errno = ::GetLastError ();
+ // @@ What's the WIN32 constant for timeout (258)?!?!?!
+ if (errno == 258)
+ {
+ errno = ETIMEDOUT;
+ return 0;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "GetQueuedCompletionStatus"), -1);
+ }
else
{
// Narrow result
@@ -177,4 +345,136 @@ ACE_Proactor::number_of_threads (size_t threads)
this->number_of_threads_ = threads;
}
+ACE_Timer_Queue *
+ACE_Proactor::timer_queue (void) const
+{
+ return this->timer_queue_;
+}
+
+void
+ACE_Proactor::timer_queue (ACE_Timer_Queue *tq)
+{
+ // cleanup old timer queue
+ if (this->delete_timer_queue_)
+ {
+ delete this->timer_queue_;
+ this->delete_timer_queue_ = 0;
+ }
+
+ // new timer queue
+ if (tq == 0)
+ {
+ this->timer_queue_ = new ACE_Timer_List;
+ this->delete_timer_queue_ = 1;
+ }
+ else
+ {
+ this->timer_queue_ = tq;
+ this->delete_timer_queue_ = 0;
+ }
+}
+
+ACE_Proactor_Timer_Handler::ACE_Proactor_Timer_Handler (ACE_Proactor &proactor)
+ : proactor_ (proactor),
+ ACE_Task <ACE_NULL_SYNCH> (&proactor.thr_mgr_)
+{
+}
+
+int
+ACE_Proactor_Timer_Handler::svc (void)
+{
+ u_long time;
+ ACE_Time_Value absolute_time;
+
+ for (;;)
+ {
+ // default value
+ time = INFINITE;
+
+ // If the timer queue is not empty
+ if (!this->proactor_.timer_queue_->is_empty ())
+ {
+ // Get the earliest absolute time
+ absolute_time
+ = this->proactor_.timer_queue_->earliest_time ()
+ - this->proactor_.timer_queue_->gettimeofday ();
+
+ // time to wait
+ time = absolute_time.msec ();
+
+ // Make sure the time is positive
+ if (time < 0)
+ time = 0;
+ }
+
+ // Wait for event upto <time> milli seconds
+ int result = ::WaitForSingleObject (this->timer_event_.handle (),
+ time);
+ switch (result)
+ {
+ case WAIT_TIMEOUT:
+ // timeout: expire timers
+ this->proactor_.timer_queue_->expire ();
+ break;
+ case WAIT_FAILED:
+ // error
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "WaitForSingleObject"), -1);
+ }
+ }
+
+ return 0;
+}
+
+int
+ACE_Proactor_Timer_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ ACE_Proactor_Timer_Handler::ACT *new_act
+ = (ACE_Proactor_Timer_Handler::ACT *) arg;
+
+ // Create the Asynch_Timer
+ ACE_Proactor::Asynch_Timer *asynch_timer
+ = new ACE_Proactor::Asynch_Timer (new_act->handler_,
+ new_act->act_,
+ tv);
+
+ // Post a completion
+ if (::PostQueuedCompletionStatus (this->proactor_.completion_port_, // completion port
+ 0, // number of bytes tranferred
+ 0, // completion key
+ asynch_timer // overlapped
+ ) == FALSE)
+ {
+ ACE_ERROR ((LM_ERROR, "Failure in dealing with timers: PostQueuedCompletionStatus failed"));
+ delete asynch_timer;
+ }
+
+ return 0;
+}
+
+ACE_Proactor::Asynch_Timer::Asynch_Timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &tv)
+ : ACE_Asynch_Result (handler,
+ act),
+ time_ (tv)
+{
+}
+
+void
+ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ this->handler_.handle_timeout (this->time_, this->act ());
+}
+
+ACE_Proactor_Timer_Handler::ACT::ACT (ACE_Handler &handler,
+ const void *act)
+ : act_ (act),
+ handler_ (handler)
+{
+}
+
#endif /* ACE_WIN32 */