summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-05 18:31:37 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-05 18:31:37 +0000
commite4ddd4c9c22e030b4688f081bce8a9cab4233a28 (patch)
treeece8c412381b37dc79c9dfce7108c4d4771a737f /ace/Proactor.cpp
parent80c9c09ce43eecd3f4817c17a288204cc4230458 (diff)
downloadATCD-e4ddd4c9c22e030b4688f081bce8a9cab4233a28.tar.gz
Completed Timers implementation for POSIX platforms. Timers
implementation is now common for POSIX and WIN32. Portable ACE_Auto_Event is used in the aux thread to wait for the Timer events. The Timer's code in WIN32 has been removed and the common code exists in Proactor.{h,cpp} only. A new factory method called create_asynch_timer has been created to create the Timer Result class. This is used internally by the Proactor to post timer completions to the Proactor completion queue. Application may want to use this directly to fake completions.
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp444
1 files changed, 412 insertions, 32 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 0b104032304..44f8b989af4 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -4,6 +4,7 @@
#include "ace/Proactor.h"
#include "ace/Proactor_Impl.h"
#include "ace/Object_Manager.h"
+#include "ace/Task_T.h"
ACE_RCSID(ace, Proactor, "$Id$")
@@ -33,10 +34,224 @@ int ACE_Proactor::delete_proactor_ = 0;
// Terminate the eventloop.
sig_atomic_t ACE_Proactor::end_event_loop_ = 0;
+class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH>
+{
+ // = TITLE
+ // A Handler for timer. It helps in the management of timers
+ // registered with the Proactor.
+ //
+ // = DESCRIPTION
+ // This object has a thread that will wait on the earliest time
+ // in a list of timers and an event. When a timer expires, the
+ // thread will post a completion event on the port and go back
+ // to waiting on the timer queue and event. If the event is
+ // signaled, the thread will refresh the time it is currently
+ // waiting on (in case the earliest time has changed).
+
+ friend class ACE_Proactor;
+ // Proactor has special privileges
+ // Access needed to: timer_event_
+
+public:
+ ACE_Proactor_Timer_Handler (ACE_Proactor &proactor);
+ // Constructor.
+
+ ~ACE_Proactor_Timer_Handler (void);
+ // Destructor.
+
+protected:
+ virtual int svc (void);
+ // Run by a daemon thread to handle deferred processing. In other
+ // words, this method will do the waiting on the earliest timer and
+ // event.
+
+ ACE_Auto_Event timer_event_;
+ // Event to wait on.
+
+ ACE_Proactor &proactor_;
+ // Proactor.
+
+ int shutting_down_;
+ // Flag used to indicate when we are shutting down.
+};
+
+ACE_Proactor_Timer_Handler::ACE_Proactor_Timer_Handler (ACE_Proactor &proactor)
+ : ACE_Task <ACE_NULL_SYNCH> (&proactor.thr_mgr_),
+ proactor_ (proactor),
+ shutting_down_ (0)
+{
+}
+
+ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void)
+{
+ // Mark for closing down.
+ this->shutting_down_ = 1;
+
+ // Signal timer event.
+ this->timer_event_.signal ();
+}
+
+int
+ACE_Proactor_Timer_Handler::svc (void)
+{
+ ACE_Time_Value absolute_time;
+ int empty_flag = 0;
+
+ while (this->shutting_down_ == 0)
+ {
+ // If the timer queue is not empty
+ empty_flag = this->proactor_.timer_queue ()->is_empty ();
+ if (!empty_flag)
+ {
+ // Get the earliest absolute time.
+ absolute_time =
+ this->proactor_.timer_queue ()->earliest_time () -
+ this->proactor_.timer_queue ()->gettimeofday ();
+ // #if 0
+ ACE_DEBUG ((LM_DEBUG,
+ "%N%l:(%t):Earliest Time %d sec, %d msec time\n",
+ absolute_time.sec (),
+ absolute_time.msec ()));
+ // #endif
+ // Make it zero if it is negative.
+ if (absolute_time < ACE_Time_Value::zero)
+ absolute_time = ACE_Time_Value::zero;
+ }
+
+ // Wait for event upto <absolute_time>.
+ int result = 0;
+ // #if 0
+ ACE_DEBUG ((LM_DEBUG,
+ "%N%l:(%t):Waiting %d sec, %d msec time\n",
+ absolute_time.sec (),
+ absolute_time.msec ()));
+ // #endif
+ if (empty_flag)
+ {
+ // #if 0
+ ACE_DEBUG ((LM_DEBUG, "%N%l:(%t):Wait for ever\n"));
+ // #endif
+ result = this->timer_event_.wait (0);
+ }
+ else
+ {
+ // #if 0
+ ACE_DEBUG ((LM_DEBUG, "%N%l:(%t):wait for time\n"));
+ // #endif
+ result = this->timer_event_.wait (&absolute_time);
+ }
+ if (result == -1)
+ {
+ switch (errno)
+ {
+ case ETIME:
+ // timeout: expire timers
+ this->proactor_.timer_queue ()->expire ();
+ break;
+ default:
+ // Error.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("%N:%l:(%P | %t):%p\n"),
+ ASYS_TEXT ("ACE_Proactor_Timer_Handler::svc:wait failed")),
+ -1);
+ }
+ }
+ }
+
+ return 0;
+}
+
+// *********************************************************************
+
+ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void)
+ : proactor_ (0)
+{
+}
+
+int
+ACE_Proactor_Handle_Timeout_Upcall::timeout (TIMER_QUEUE &timer_queue,
+ ACE_Handler *handler,
+ const void *act,
+ const ACE_Time_Value &time)
+{
+ ACE_UNUSED_ARG (timer_queue);
+
+ if (this->proactor_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall,")
+ ASYS_TEXT (" no completion port to post timeout to?!@\n")),
+ -1);
+
+ // Create the Asynch_Timer.
+ ACE_Asynch_Result_Impl *asynch_timer = this->proactor_->create_asynch_timer (*handler,
+ act,
+ time,
+ 0);
+ if (asynch_timer == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):%p\n",
+ "ACE_Proactor_Handle_Timeout_Upcall::timeout:"
+ "create_asynch_timer failed"),
+ -1);
+
+ // Post a completion.
+ if (asynch_timer->post_completion (this->proactor_->implementation ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("Failure in dealing with timers: ")
+ ASYS_TEXT ("PostQueuedCompletionStatus failed\n")),
+ -1);
+ return 0;
+}
+
+int
+ACE_Proactor_Handle_Timeout_Upcall::cancellation (TIMER_QUEUE &timer_queue,
+ ACE_Handler *handler)
+{
+ ACE_UNUSED_ARG (timer_queue);
+ ACE_UNUSED_ARG (handler);
+
+ // Do nothing
+ return 0;
+}
+
+int
+ACE_Proactor_Handle_Timeout_Upcall::deletion (TIMER_QUEUE &timer_queue,
+ ACE_Handler *handler,
+ const void *arg)
+{
+ ACE_UNUSED_ARG (timer_queue);
+ ACE_UNUSED_ARG (handler);
+ ACE_UNUSED_ARG (arg);
+
+ // Do nothing
+ return 0;
+}
+
+int
+ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor)
+{
+ if (this->proactor_ == 0)
+ {
+ this->proactor_ = &proactor;
+ return 0;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("ACE_Proactor_Handle_Timeout_Upcall is only suppose")
+ ASYS_TEXT (" to be used with ONE (and only one) Proactor\n")),
+ -1);
+}
+
+// *********************************************************************
+
ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
+ TIMER_QUEUE *tq,
int delete_implementation)
: implementation_ (0),
- delete_implementation_ (delete_implementation)
+ delete_implementation_ (delete_implementation),
+ timer_handler_ (0),
+ timer_queue_ (0),
+ delete_timer_queue_ (0)
{
this->implementation (implementation);
@@ -58,12 +273,24 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
this->implementation (implementation);
this->delete_implementation_ = 1;
}
+
+ // Set the timer queue.
+ this->timer_queue (tq);
+
+ // Create the timer handler
+ ACE_NEW (this->timer_handler_,
+ ACE_Proactor_Timer_Handler (*this));
+
+ // Activate <timer_handler>
+ if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ASYS_TEXT ("%N:%l:(%P | %t):%p\n"),
+ ASYS_TEXT ("Task::activate:could not create thread\n")));
}
ACE_Proactor::~ACE_Proactor (void)
{
- if (this->delete_implementation_)
- delete this->implementation ();
+ this->close ();
}
ACE_Proactor *
@@ -183,7 +410,36 @@ ACE_Proactor::event_loop_done (void)
int
ACE_Proactor::close (void)
{
- return this->implementation ()->close ();
+ // Close the implementation.
+ if (this->implementation ()->close () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):%p\n",
+ "ACE_Proactor::close:implementation couldnt be closed"),
+ -1);
+
+ // Delete the implementation.
+ if (this->delete_implementation_)
+ {
+ delete this->implementation ();
+ this->implementation_ = 0;
+ }
+
+ // Take care of the timer handler
+ if (this->timer_handler_)
+ {
+ delete this->timer_handler_;
+ this->timer_handler_ = 0;
+ }
+
+ // Take care of the timer queue
+ if (this->delete_timer_queue_)
+ {
+ delete this->timer_queue_;
+ this->timer_queue_ = 0;
+ this->delete_timer_queue_ = 0;
+ }
+
+ return 0;
}
int
@@ -196,54 +452,80 @@ ACE_Proactor::register_handle (ACE_HANDLE handle,
long
ACE_Proactor::schedule_timer (ACE_Handler &handler,
- const void *act,
- const ACE_Time_Value &time)
+ const void *act,
+ const ACE_Time_Value &time)
{
- return this->implementation ()->schedule_timer (handler,
- act,
- time,
- ACE_Time_Value::zero);
+ return this->schedule_timer (handler,
+ act,
+ time,
+ ACE_Time_Value::zero);
}
long
ACE_Proactor::schedule_repeating_timer (ACE_Handler &handler,
- const void *act,
- const ACE_Time_Value &interval)
+ const void *act,
+ const ACE_Time_Value &interval)
{
- return this->implementation ()->schedule_timer (handler,
- act,
- interval,
- interval);
+ return this->schedule_timer (handler,
+ act,
+ interval,
+ interval);
}
long
ACE_Proactor::schedule_timer (ACE_Handler &handler,
- const void *act,
- const ACE_Time_Value &time,
- const ACE_Time_Value &interval)
-{
- return this->implementation ()->schedule_timer (handler,
- act,
- time,
- interval);
+ const void *act,
+ const ACE_Time_Value &time,
+ const ACE_Time_Value &interval)
+{
+ // absolute time.
+ ACE_Time_Value absolute_time =
+ this->timer_queue_->gettimeofday () + time;
+
+ // Only one guy goes in here at a time
+ ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->mutex (), -1);
+
+ // Schedule the timer
+ long result = this->timer_queue_->schedule (&handler,
+ act,
+ absolute_time,
+ interval);
+ if (result != -1)
+ {
+ // no failures: check to see if we are the earliest time
+ if (this->timer_queue_->earliest_time () == absolute_time)
+
+ // wake up the timer thread
+ if (this->timer_handler_->timer_event_.signal () == -1)
+ {
+ // Cancel timer
+ this->timer_queue_->cancel (result);
+ result = -1;
+ }
+ }
+ return result;
}
int
ACE_Proactor::cancel_timer (long timer_id,
- const void **arg,
- int dont_call_handle_close)
+ const void **arg,
+ int dont_call_handle_close)
{
- return this->implementation ()->cancel_timer (timer_id,
- arg,
- dont_call_handle_close);
+ // No need to singal timer event here. Even if the cancel timer was
+ // the earliest, we will have an extra wakeup.
+ return this->timer_queue_->cancel (timer_id,
+ arg,
+ dont_call_handle_close);
}
int
ACE_Proactor::cancel_timer (ACE_Handler &handler,
- int dont_call_handle_close)
+ int dont_call_handle_close)
{
- return this->implementation ()->cancel_timer (handler,
- dont_call_handle_close);
+ // No need to signal timer event here. Even if the cancel timer was
+ // the earliest, we will have an extra wakeup.
+ return this->timer_queue_->cancel (&handler,
+ dont_call_handle_close);
}
int
@@ -282,6 +564,38 @@ ACE_Proactor::number_of_threads (size_t threads)
this->implementation ()->number_of_threads (threads);
}
+ACE_Proactor::TIMER_QUEUE *
+ACE_Proactor::timer_queue (void) const
+{
+ return this->timer_queue_;
+}
+
+void
+ACE_Proactor::timer_queue (TIMER_QUEUE *tq)
+{
+ // Cleanup old timer queue.
+ if (this->delete_timer_queue_)
+ {
+ delete this->timer_queue_;
+ this->delete_timer_queue_ = 0;
+ }
+
+ // New timer queue.
+ if (tq == 0)
+ {
+ this->timer_queue_ = new TIMER_HEAP;
+ this->delete_timer_queue_ = 1;
+ }
+ else
+ {
+ this->timer_queue_ = tq;
+ this->delete_timer_queue_ = 0;
+ }
+
+ // Set the proactor in the timer queue's functor
+ this->timer_queue_->upcall_functor ().proactor (*this);
+}
+
ACE_HANDLE
ACE_Proactor::get_handle (void) const
{
@@ -470,13 +784,78 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
priority);
}
+ACE_Asynch_Result_Impl *
+ACE_Proactor::create_asynch_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &tv,
+ ACE_HANDLE event,
+ int priority)
+{
+ return this->implementation ()->create_asynch_timer (handler,
+ act,
+ tv,
+ event,
+ priority);
+}
+
void
ACE_Proactor::implementation (ACE_Proactor_Impl *implementation)
{
this->implementation_ = implementation;
}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Timer_Queue_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Queue_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Node_T<ACE_Handler *>;
+template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >;
+template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>;
+template class ACE_Timer_Heap_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Heap_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
#else /* !ACE_WIN32 || !ACE_HAS_AIO_CALLS */
+
ACE_Proactor *
ACE_Proactor::instance (size_t threads)
{
@@ -522,4 +901,5 @@ ACE_Proactor::event_loop_done (void)
{
return sig_atomic_t (1);
}
+
#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/