diff options
author | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1997-04-22 08:15:21 +0000 |
---|---|---|
committer | irfan <irfan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1997-04-22 08:15:21 +0000 |
commit | 69ece2d7d2b032c8ef6f827723874fb11b3b6ba7 (patch) | |
tree | aff461430c99ded6240bc5cd2572d3c1c31e94c3 /ace/Proactor.cpp | |
parent | 9f66663966051d24f2b41daca601ea0f19a52dce (diff) | |
download | ATCD-69ece2d7d2b032c8ef6f827723874fb11b3b6ba7.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r-- | ace/Proactor.cpp | 312 |
1 files changed, 306 insertions, 6 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 7a76cfa55f8..c7bf5f60089 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -7,10 +7,68 @@ #if defined (ACE_WIN32) // This only works on Win32 platforms +#include "ace/Task.h" #include "ace/Timer_Queue.h" +#include "ace/Timer_List.h" #include "ace/Log_Msg.h" -#include "ace/Asynch_IO.h" +class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH> + // + // = TITLE + // + // A Handler for timer. It helps in the management of timers + // registered with the Proactor. + // + // = DESCRIPTION + // + // This object has a thread that will wait on the earliest + // time in a list of timers and an event. When a timer + // expires, the thread will post a completion event on the + // port and go back to waiting on the timer queue and + // event. If the event is signaled, the thread will refresh + // the time it is currently waiting on (in case the earliest + // time has changed) + // +{ + friend class ACE_Proactor; + // Proactor has special privileges + +public: + ACE_Proactor_Timer_Handler (ACE_Proactor &proactor); + +protected: + virtual int svc (void); + // Run by a daemon thread to handle deferred processing. In other + // words, this method will do the waiting on the earliest timer + // and event + + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg = 0); + // Framework calls this when a timer expires + + ACE_Auto_Event timer_event_; + // Event to wait on + + ACE_Proactor &proactor_; + // Proactor + +public: + class ACE_Export ACT + // + // = TITLE + // + // A new ACT that combines the old ACT and Handler + // + { + public: + ACT (ACE_Handler &handler, + const void *act); + + ACE_Handler &handler_; + const void *act_; + }; +}; + #if !defined (__ACE_INLINE__) #include "ace/Proactor.i" #endif /* __ACE_INLINE__ */ @@ -18,14 +76,27 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, ACE_Timer_Queue *tq) : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! - number_of_threads_ (number_of_threads) + number_of_threads_ (number_of_threads), + timer_queue_ (0), + delete_timer_queue_ (0), + timer_handler_ (0) { + ACE_NEW (this->timer_handler_, ACE_Proactor_Timer_Handler (*this)); + + // set the timer queue + this->timer_queue (tq); + + // create the completion port this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, this->completion_port_, 0, this->number_of_threads_); if (this->completion_port_ == 0) ACE_ERROR ((LM_ERROR, "%p\n", "CreateIoCompletionPort")); + + // activate <timer_handler> + if (this->timer_handler_->activate () == -1) + ACE_ERROR ((LM_ERROR, "%p Could not create thread\n", "Task::activate")); } ACE_Proactor::~ACE_Proactor (void) @@ -36,6 +107,21 @@ ACE_Proactor::~ACE_Proactor (void) int ACE_Proactor::close (void) { + // Take care of the timer handler + if (this->timer_handler_) + { + delete this->timer_handler_; + this->timer_handler_ = 0; + } + + // Take care of the timer queue + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->timer_queue_ = 0; + this->delete_timer_queue_ = 0; + } + // Close the completion port if (this->completion_port_ != 0) { @@ -68,14 +154,87 @@ ACE_Proactor::register_handle (ACE_HANDLE handle, } int +ACE_Proactor::schedule_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &time) +{ + return this->schedule_timer (handler, act, time, ACE_Time_Value::zero); +} + +int +ACE_Proactor::schedule_repeating_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &interval) +{ + return this->schedule_timer (handler, act, interval, interval); +} + +int +ACE_Proactor::schedule_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &time, + const ACE_Time_Value &interval) +{ + // Create a new ACT + ACE_Proactor_Timer_Handler::ACT *new_act = 0; + ACE_NEW_RETURN (new_act, ACE_Proactor_Timer_Handler::ACT (handler, act), -1); + + // Only one guy goes in here at a time + ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->lock (), -1); + + // absolute time + ACE_Time_Value absolute_time = this->timer_queue_->gettimeofday () + time; + + // Schedule the timer + int result = this->timer_queue_->schedule (this->timer_handler_, + new_act, + absolute_time, + interval); + // Failure of schedule + if (result == -1) + // cleanup + delete new_act; + + // no failures: check to see if we are the earliest time + else if (this->timer_queue_->earliest_time () == absolute_time) + // wake up the timer thread + if (this->timer_handler_->timer_event_.signal () == -1) + { + // failure: cleanup + delete new_act; + // Cancel timer + this->timer_queue_->cancel (result); + result = -1; + } + return result; +} + +int +ACE_Proactor::cancel_timer (int timer_id, + const void **arg) +{ + // No need to singal timer event here. Even if the cancel timer was + // the earliest, we will have an extra wakeup. + return this->timer_queue_->cancel (timer_id, arg); +} + +int ACE_Proactor::handle_events (ACE_Time_Value &wait_time) { - return 0; + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); + return this->handle_events (wait_time.msec ()); } int ACE_Proactor::handle_events (void) { + return this->handle_events (INFINITE); +} + +int +ACE_Proactor::handle_events (unsigned long milli_seconds) +{ OVERLAPPED *overlapped = 0; u_long bytes_transferred = 0; u_long completion_key = 0; @@ -85,11 +244,20 @@ ACE_Proactor::handle_events (void) &bytes_transferred, &completion_key, &overlapped, - INFINITE); - + milli_seconds); if (result == FALSE && overlapped == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "GetQueuedCompletionStatus"), -1); + { + errno = ::GetLastError (); + // @@ What's the WIN32 constant for timeout (258)?!?!?! + if (errno == 258) + { + errno = ETIMEDOUT; + return 0; + } + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "GetQueuedCompletionStatus"), -1); + } else { // Narrow result @@ -177,4 +345,136 @@ ACE_Proactor::number_of_threads (size_t threads) this->number_of_threads_ = threads; } +ACE_Timer_Queue * +ACE_Proactor::timer_queue (void) const +{ + return this->timer_queue_; +} + +void +ACE_Proactor::timer_queue (ACE_Timer_Queue *tq) +{ + // cleanup old timer queue + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->delete_timer_queue_ = 0; + } + + // new timer queue + if (tq == 0) + { + this->timer_queue_ = new ACE_Timer_List; + this->delete_timer_queue_ = 1; + } + else + { + this->timer_queue_ = tq; + this->delete_timer_queue_ = 0; + } +} + +ACE_Proactor_Timer_Handler::ACE_Proactor_Timer_Handler (ACE_Proactor &proactor) + : proactor_ (proactor), + ACE_Task <ACE_NULL_SYNCH> (&proactor.thr_mgr_) +{ +} + +int +ACE_Proactor_Timer_Handler::svc (void) +{ + u_long time; + ACE_Time_Value absolute_time; + + for (;;) + { + // default value + time = INFINITE; + + // If the timer queue is not empty + if (!this->proactor_.timer_queue_->is_empty ()) + { + // Get the earliest absolute time + absolute_time + = this->proactor_.timer_queue_->earliest_time () + - this->proactor_.timer_queue_->gettimeofday (); + + // time to wait + time = absolute_time.msec (); + + // Make sure the time is positive + if (time < 0) + time = 0; + } + + // Wait for event upto <time> milli seconds + int result = ::WaitForSingleObject (this->timer_event_.handle (), + time); + switch (result) + { + case WAIT_TIMEOUT: + // timeout: expire timers + this->proactor_.timer_queue_->expire (); + break; + case WAIT_FAILED: + // error + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "WaitForSingleObject"), -1); + } + } + + return 0; +} + +int +ACE_Proactor_Timer_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + ACE_Proactor_Timer_Handler::ACT *new_act + = (ACE_Proactor_Timer_Handler::ACT *) arg; + + // Create the Asynch_Timer + ACE_Proactor::Asynch_Timer *asynch_timer + = new ACE_Proactor::Asynch_Timer (new_act->handler_, + new_act->act_, + tv); + + // Post a completion + if (::PostQueuedCompletionStatus (this->proactor_.completion_port_, // completion port + 0, // number of bytes tranferred + 0, // completion key + asynch_timer // overlapped + ) == FALSE) + { + ACE_ERROR ((LM_ERROR, "Failure in dealing with timers: PostQueuedCompletionStatus failed")); + delete asynch_timer; + } + + return 0; +} + +ACE_Proactor::Asynch_Timer::Asynch_Timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &tv) + : ACE_Asynch_Result (handler, + act), + time_ (tv) +{ +} + +void +ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error) +{ + this->handler_.handle_timeout (this->time_, this->act ()); +} + +ACE_Proactor_Timer_Handler::ACT::ACT (ACE_Handler &handler, + const void *act) + : act_ (act), + handler_ (handler) +{ +} + #endif /* ACE_WIN32 */ |