summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-11 19:45:51 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-11 19:45:51 +0000
commit3d3dad5f989ff9d2d51d1262cb5da44fad1b2e47 (patch)
treed3bdb1f2466587af551a0caecbbeea9ce033f338 /ace/Proactor.cpp
parentce9941a44d00e73a5aeb814c2678c0d3aecaf6f9 (diff)
downloadATCD-3d3dad5f989ff9d2d51d1262cb5da44fad1b2e47.tar.gz
*** empty log message ***
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp226
1 files changed, 210 insertions, 16 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index ae41e67d811..52553a5a6f8 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -4,8 +4,10 @@
#define ACE_BUILD_DLL
#include "ace/Proactor.h"
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
-// This only works on Win32 platforms
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \
+ || (defined (ACE_HAS_AIO_CALLS))
+// This only works on Win32 platforms and on Unix platforms with aio
+// calls.
#include "ace/Task_T.h"
#include "ace/Log_Msg.h"
@@ -89,13 +91,16 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void)
int
ACE_Proactor_Timer_Handler::svc (void)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
u_long time;
ACE_Time_Value absolute_time;
for (; !this->shutting_down_;)
{
// default value
- time = INFINITE;
+ time = ACE_INFINITE;
// If the timer queue is not empty
if (!this->proactor_.timer_queue ()->is_empty ())
@@ -118,17 +123,18 @@ ACE_Proactor_Timer_Handler::svc (void)
time);
switch (result)
{
- case WAIT_TIMEOUT:
+ case ACE_WAIT_TIMEOUT:
// timeout: expire timers
this->proactor_.timer_queue ()->expire ();
break;
- case WAIT_FAILED:
+ case ACE_WAIT_FAILED:
// error
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WaitForSingleObject")), -1);
}
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void)
@@ -205,13 +211,36 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor)
ACE_Proactor::ACE_Proactor (size_t number_of_threads,
Timer_Queue *tq,
int used_with_reactor_event_loop)
- : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
- number_of_threads_ (number_of_threads),
- timer_queue_ (0),
- delete_timer_queue_ (0),
- timer_handler_ (0),
- used_with_reactor_event_loop_ (used_with_reactor_event_loop)
-{
+ :
+#if defined (ACE_HAS_AIO_CALLS)
+ #if defined (AIO_LISTIO_MAX)
+ aiocb_list_max_size_ (AIO_LISTIO_MAX),
+ #else /* AIO_LISTIO_MAX */
+ aiocb_list_max_size_ (2),
+ #endif /* AIO_LISTIO_MAX */
+
+ aiocb_list_cur_size_ (0),
+#else /* ACE_HAS_AIO_CALLS */
+ completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
+#endif /* ACE_HAS_AIO_CALLS */
+
+ number_of_threads_ (number_of_threads),
+ timer_queue_ (0),
+ delete_timer_queue_ (0),
+ timer_handler_ (0),
+ used_with_reactor_event_loop_ (used_with_reactor_event_loop)
+{
+#if defined (ACE_HAS_AIO_CALLS)
+ // Init the array.
+ for (size_t ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ {
+ aiocb_list_ [ai] = 0;
+ result_list_ [ai] = 0;
+ }
+ ACE_UNUSED_ARG (tq);
+#else /* ACE_HAS_AIO_CALLS */
// create the completion port
this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
this->completion_port_,
@@ -229,7 +258,7 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
// activate <timer_handler>
if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p Could not create thread\n"), ASYS_TEXT ("Task::activate")));
-
+#endif /* ACE_HAS_AIO_CALLS */
}
ACE_Proactor *
@@ -350,6 +379,9 @@ ACE_Proactor::~ACE_Proactor (void)
int
ACE_Proactor::close (void)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// Take care of the timer handler
if (this->timer_handler_)
{
@@ -374,12 +406,18 @@ ACE_Proactor::close (void)
}
else
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
int
ACE_Proactor::register_handle (ACE_HANDLE handle,
const void *completion_key)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (handle);
+ ACE_UNUSED_ARG (completion_key);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// No locking is needed here as no state changes
ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
this->completion_port_,
@@ -394,6 +432,7 @@ ACE_Proactor::register_handle (ACE_HANDLE handle,
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("CreateIoCompletionPort")), -1);
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
long
@@ -491,15 +530,22 @@ ACE_Proactor::handle_close (ACE_HANDLE handle,
return this->close ();
}
+
+// @@ get_handle () implementation.
ACE_HANDLE
ACE_Proactor::get_handle (void) const
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return ACE_INVALID_HANDLE;
+#else /* ACE_HAS_AIO_CALLS */
if (this->used_with_reactor_event_loop_)
return this->event_.handle ();
else
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
+
int
ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
{
@@ -511,13 +557,80 @@ ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
int
ACE_Proactor::handle_events (void)
{
- return this->handle_events (INFINITE);
+ return this->handle_events (ACE_INFINITE);
}
int
ACE_Proactor::handle_events (unsigned long milli_seconds)
{
- OVERLAPPED *overlapped = 0;
+#if defined (ACE_HAS_AIO_CALLS)
+ // Is there any entries in the list.
+ if (this->aiocb_list_cur_size_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "No AIO pending"));
+ return 0;
+ }
+
+ // Wait for asynch operation to complete.
+ timespec timeout;
+ timeout.tv_sec = milli_seconds;
+ timeout.tv_nsec = 0;
+ if (aio_suspend (this->aiocb_list_,
+ this->aiocb_list_max_size_,
+ &timeout) < 0)
+ // If failure is coz of timeout, then return *0* but set errno
+ // appropriately. This is what the Win proactor does.
+ if (errno == EINTR)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):aio_suspend"),
+ 0);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):aio_suspend"),
+ 0);
+
+ // Check which aio has finished.
+ size_t ai;
+ for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
+ // Analyze error and return values.
+ if (aio_error (aiocb_list_ [ai]) != EINPROGRESS)
+ {
+ if (aio_return (aiocb_list_ [ai]) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):AIO failed"),
+ -1);
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "An aio has finished\n"));
+ // This AIO is done.
+ break;
+ }
+ }
+ if (ai == this->aiocb_list_max_size_)
+ // Nothing completed.
+ return 0;
+
+ // Get the values for the completed aio.
+ size_t bytes_transferred = aiocb_list_[ai]->aio_nbytes;
+ void *completion_key = (void *)aiocb_list_[ai]->aio_sigevent.sigev_value.sival_ptr;
+ ACE_Asynch_Result *asynch_result = this->result_list_[ai];
+
+ // Invalidate entry in the aiocb list.
+ delete this->aiocb_list_[ai];
+ this->aiocb_list_[ai] = 0;
+ this->aiocb_list_cur_size_--;
+ this->result_list_[ai] = 0;
+
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ bytes_transferred,
+ ACE_TRUE,
+ completion_key,
+ 0);
+
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
+ ACE_OVERLAPPED *overlapped = 0;
u_long bytes_transferred = 0;
u_long completion_key = 0;
@@ -556,6 +669,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
errno);
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
void
@@ -583,6 +697,10 @@ ACE_Proactor::application_specific_code (ACE_Asynch_Result *asynch_result,
int
ACE_Proactor::post_completion (ACE_Asynch_Result *result)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (result);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// Grab the event associated with the Proactor
HANDLE handle = this->get_handle ();
@@ -603,6 +721,7 @@ ACE_Proactor::post_completion (ACE_Asynch_Result *result)
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
int
@@ -684,6 +803,81 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred,
this->handler_.handle_time_out (this->time_, this->act ());
}
+int
+ACE_Proactor::insert_to_aiocb_list (aiocb *aiocb_ptr,
+ ACE_Asynch_Result *result)
+{
+ // Is there any place?
+ if (this->aiocb_list_cur_size_ >= this->aiocb_list_max_size_)
+ return -1;
+
+ // Find the first free slot.
+ size_t ai;
+ for (ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ if (this->aiocb_list_ [ai] == 0)
+ break;
+
+ if (ai == this->aiocb_list_max_size_)
+ return -1;
+
+ // Store the pointers.
+ this->aiocb_list_ [ai] = aiocb_ptr;
+ this->result_list_ [ai] = result;
+ this->aiocb_list_cur_size_ ++;
+ return 0;
+}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Timer_Queue_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Queue_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Node_T<ACE_Handler *>;
+template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >;
+template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>;
+template class ACE_Timer_Heap_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Heap_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
#else /* ACE_WIN32 */
ACE_Proactor *
@@ -731,4 +925,4 @@ ACE_Proactor::event_loop_done (void)
{
return sig_atomic_t (1);
}
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/