diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-11 19:45:51 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-11 19:45:51 +0000 |
commit | 3d3dad5f989ff9d2d51d1262cb5da44fad1b2e47 (patch) | |
tree | d3bdb1f2466587af551a0caecbbeea9ce033f338 /ace/Proactor.cpp | |
parent | ce9941a44d00e73a5aeb814c2678c0d3aecaf6f9 (diff) | |
download | ATCD-3d3dad5f989ff9d2d51d1262cb5da44fad1b2e47.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r-- | ace/Proactor.cpp | 226 |
1 files changed, 210 insertions, 16 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index ae41e67d811..52553a5a6f8 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -4,8 +4,10 @@ #define ACE_BUILD_DLL #include "ace/Proactor.h" -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) -// This only works on Win32 platforms +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \ + || (defined (ACE_HAS_AIO_CALLS)) +// This only works on Win32 platforms and on Unix platforms with aio +// calls. #include "ace/Task_T.h" #include "ace/Log_Msg.h" @@ -89,13 +91,16 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void) int ACE_Proactor_Timer_Handler::svc (void) { +#if defined (ACE_HAS_AIO_CALLS) + return 0; +#else /* ACE_HAS_AIO_CALLS */ u_long time; ACE_Time_Value absolute_time; for (; !this->shutting_down_;) { // default value - time = INFINITE; + time = ACE_INFINITE; // If the timer queue is not empty if (!this->proactor_.timer_queue ()->is_empty ()) @@ -118,17 +123,18 @@ ACE_Proactor_Timer_Handler::svc (void) time); switch (result) { - case WAIT_TIMEOUT: + case ACE_WAIT_TIMEOUT: // timeout: expire timers this->proactor_.timer_queue ()->expire (); break; - case WAIT_FAILED: + case ACE_WAIT_FAILED: // error ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WaitForSingleObject")), -1); } } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void) @@ -205,13 +211,36 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor) ACE_Proactor::ACE_Proactor (size_t number_of_threads, Timer_Queue *tq, int used_with_reactor_event_loop) - : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! - number_of_threads_ (number_of_threads), - timer_queue_ (0), - delete_timer_queue_ (0), - timer_handler_ (0), - used_with_reactor_event_loop_ (used_with_reactor_event_loop) -{ + : +#if defined (ACE_HAS_AIO_CALLS) + #if defined (AIO_LISTIO_MAX) + aiocb_list_max_size_ (AIO_LISTIO_MAX), + #else /* AIO_LISTIO_MAX */ + aiocb_list_max_size_ (2), + #endif /* AIO_LISTIO_MAX */ + + aiocb_list_cur_size_ (0), +#else /* ACE_HAS_AIO_CALLS */ + completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! +#endif /* ACE_HAS_AIO_CALLS */ + + number_of_threads_ (number_of_threads), + timer_queue_ (0), + delete_timer_queue_ (0), + timer_handler_ (0), + used_with_reactor_event_loop_ (used_with_reactor_event_loop) +{ +#if defined (ACE_HAS_AIO_CALLS) + // Init the array. + for (size_t ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + { + aiocb_list_ [ai] = 0; + result_list_ [ai] = 0; + } + ACE_UNUSED_ARG (tq); +#else /* ACE_HAS_AIO_CALLS */ // create the completion port this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, this->completion_port_, @@ -229,7 +258,7 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, // activate <timer_handler> if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p Could not create thread\n"), ASYS_TEXT ("Task::activate"))); - +#endif /* ACE_HAS_AIO_CALLS */ } ACE_Proactor * @@ -350,6 +379,9 @@ ACE_Proactor::~ACE_Proactor (void) int ACE_Proactor::close (void) { +#if defined (ACE_HAS_AIO_CALLS) + return 0; +#else /* ACE_HAS_AIO_CALLS */ // Take care of the timer handler if (this->timer_handler_) { @@ -374,12 +406,18 @@ ACE_Proactor::close (void) } else return 0; +#endif /* ACE_HAS_AIO_CALLS */ } int ACE_Proactor::register_handle (ACE_HANDLE handle, const void *completion_key) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (handle); + ACE_UNUSED_ARG (completion_key); + return 0; +#else /* ACE_HAS_AIO_CALLS */ // No locking is needed here as no state changes ACE_HANDLE cp = ::CreateIoCompletionPort (handle, this->completion_port_, @@ -394,6 +432,7 @@ ACE_Proactor::register_handle (ACE_HANDLE handle, ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("CreateIoCompletionPort")), -1); } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } long @@ -491,15 +530,22 @@ ACE_Proactor::handle_close (ACE_HANDLE handle, return this->close (); } + +// @@ get_handle () implementation. ACE_HANDLE ACE_Proactor::get_handle (void) const { +#if defined (ACE_HAS_AIO_CALLS) + return ACE_INVALID_HANDLE; +#else /* ACE_HAS_AIO_CALLS */ if (this->used_with_reactor_event_loop_) return this->event_.handle (); else return 0; +#endif /* ACE_HAS_AIO_CALLS */ } + int ACE_Proactor::handle_events (ACE_Time_Value &wait_time) { @@ -511,13 +557,80 @@ ACE_Proactor::handle_events (ACE_Time_Value &wait_time) int ACE_Proactor::handle_events (void) { - return this->handle_events (INFINITE); + return this->handle_events (ACE_INFINITE); } int ACE_Proactor::handle_events (unsigned long milli_seconds) { - OVERLAPPED *overlapped = 0; +#if defined (ACE_HAS_AIO_CALLS) + // Is there any entries in the list. + if (this->aiocb_list_cur_size_ == 0) + { + ACE_DEBUG ((LM_DEBUG, "No AIO pending")); + return 0; + } + + // Wait for asynch operation to complete. + timespec timeout; + timeout.tv_sec = milli_seconds; + timeout.tv_nsec = 0; + if (aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + &timeout) < 0) + // If failure is coz of timeout, then return *0* but set errno + // appropriately. This is what the Win proactor does. + if (errno == EINTR) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):aio_suspend"), + 0); + else + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):aio_suspend"), + 0); + + // Check which aio has finished. + size_t ai; + for (ai = 0; ai < this->aiocb_list_max_size_; ai++) + // Analyze error and return values. + if (aio_error (aiocb_list_ [ai]) != EINPROGRESS) + { + if (aio_return (aiocb_list_ [ai]) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):AIO failed"), + -1); + else + { + ACE_DEBUG ((LM_DEBUG, "An aio has finished\n")); + // This AIO is done. + break; + } + } + if (ai == this->aiocb_list_max_size_) + // Nothing completed. + return 0; + + // Get the values for the completed aio. + size_t bytes_transferred = aiocb_list_[ai]->aio_nbytes; + void *completion_key = (void *)aiocb_list_[ai]->aio_sigevent.sigev_value.sival_ptr; + ACE_Asynch_Result *asynch_result = this->result_list_[ai]; + + // Invalidate entry in the aiocb list. + delete this->aiocb_list_[ai]; + this->aiocb_list_[ai] = 0; + this->aiocb_list_cur_size_--; + this->result_list_[ai] = 0; + + // Call the application code. + this->application_specific_code (asynch_result, + bytes_transferred, + ACE_TRUE, + completion_key, + 0); + + return 0; +#else /* ACE_HAS_AIO_CALLS */ + ACE_OVERLAPPED *overlapped = 0; u_long bytes_transferred = 0; u_long completion_key = 0; @@ -556,6 +669,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) errno); } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } void @@ -583,6 +697,10 @@ ACE_Proactor::application_specific_code (ACE_Asynch_Result *asynch_result, int ACE_Proactor::post_completion (ACE_Asynch_Result *result) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (result); + return 0; +#else /* ACE_HAS_AIO_CALLS */ // Grab the event associated with the Proactor HANDLE handle = this->get_handle (); @@ -603,6 +721,7 @@ ACE_Proactor::post_completion (ACE_Asynch_Result *result) } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } int @@ -684,6 +803,81 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } +int +ACE_Proactor::insert_to_aiocb_list (aiocb *aiocb_ptr, + ACE_Asynch_Result *result) +{ + // Is there any place? + if (this->aiocb_list_cur_size_ >= this->aiocb_list_max_size_) + return -1; + + // Find the first free slot. + size_t ai; + for (ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + if (this->aiocb_list_ [ai] == 0) + break; + + if (ai == this->aiocb_list_max_size_) + return -1; + + // Store the pointers. + this->aiocb_list_ [ai] = aiocb_ptr; + this->result_list_ [ai] = result; + this->aiocb_list_cur_size_ ++; + return 0; +} +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Queue_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Node_T<ACE_Handler *>; +template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >; +template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>; +template class ACE_Timer_Heap_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Heap_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + #else /* ACE_WIN32 */ ACE_Proactor * @@ -731,4 +925,4 @@ ACE_Proactor::event_loop_done (void) { return sig_atomic_t (1); } -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/ |