diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
commit | 99aa8c60282c7b8072eb35eb9ac815702f5bf586 (patch) | |
tree | bda96bf8c3a4c2875a083d7b16720533c8ffeaf4 /ACE/ace/POSIX_CB_Proactor.cpp | |
parent | c4078c377d74290ebe4e66da0b4975da91732376 (diff) | |
download | ATCD-99aa8c60282c7b8072eb35eb9ac815702f5bf586.tar.gz |
undoing accidental deletion
Diffstat (limited to 'ACE/ace/POSIX_CB_Proactor.cpp')
-rw-r--r-- | ACE/ace/POSIX_CB_Proactor.cpp | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/ACE/ace/POSIX_CB_Proactor.cpp b/ACE/ace/POSIX_CB_Proactor.cpp new file mode 100644 index 00000000000..3ce4218090a --- /dev/null +++ b/ACE/ace/POSIX_CB_Proactor.cpp @@ -0,0 +1,185 @@ +// $Id$ + +#include "ace/POSIX_CB_Proactor.h" + +#if defined (ACE_HAS_AIO_CALLS) && !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT) + +#include "ace/Task_T.h" +#include "ace/Log_Msg.h" +#include "ace/Object_Manager.h" +#include "ace/OS_NS_sys_time.h" + +ACE_RCSID (ace, + POSIX_CB_Proactor, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations) + : ACE_POSIX_AIOCB_Proactor (max_aio_operations, + ACE_POSIX_Proactor::PROACTOR_CB), + sema_ ((unsigned int) 0) +{ + // we should start pseudo-asynchronous accept task + // one per all future acceptors + + this->get_asynch_pseudo_task ().start (); +} + +// Destructor. +ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void) +{ + this->close (); +} + +ACE_POSIX_Proactor::Proactor_Type +ACE_POSIX_CB_Proactor::get_impl_type (void) +{ + return PROACTOR_CB; +} + +void ACE_POSIX_CB_Proactor::aio_completion_func (sigval cb_data) +{ + ACE_POSIX_CB_Proactor * impl = static_cast<ACE_POSIX_CB_Proactor *> (cb_data.sival_ptr); + if ( impl != 0 ) + impl->notify_completion (0); +} + +#if defined (ACE_HAS_SIG_C_FUNC) +extern "C" void +ACE_POSIX_CB_Proactor_aio_completion (sigval cb_data) +{ + ACE_POSIX_CB_Proactor::aio_completion_func (cb_data); +} +#endif /* ACE_HAS_SIG_C_FUNC */ + +int +ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value &wait_time) +{ + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); + return this->handle_events_i (wait_time.msec ()); +} + +int +ACE_POSIX_CB_Proactor::handle_events (void) +{ + return this->handle_events_i (ACE_INFINITE); +} + +int +ACE_POSIX_CB_Proactor::notify_completion (int sig_num) +{ + ACE_UNUSED_ARG (sig_num); + + return this->sema_.release(); +} + + +ssize_t +ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) +{ + ssize_t slot = ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (result); + if (slot == -1) + return -1; + + // setup OS notification methods for this aio + // @@ TODO: This gets the completion method back to this proactor to + // find the completed aiocb. It would be so much better to not only get + // the proactor, but the aiocb as well. +#if defined(__sgi) + result->aio_sigevent.sigev_notify = SIGEV_CALLBACK; + result->aio_sigevent.sigev_func = aio_completion_func ; +#else + result->aio_sigevent.sigev_notify = SIGEV_THREAD; +# if defined (ACE_HAS_SIG_C_FUNC) + result->aio_sigevent.sigev_notify_function = + ACE_POSIX_CB_Proactor_aio_completion; +# else + result->aio_sigevent.sigev_notify_function = aio_completion_func; +# endif /* ACE_HAS_SIG_C_FUNC */ + result->aio_sigevent.sigev_notify_attributes = 0; +#endif /* __sgi */ + + result->aio_sigevent.sigev_value.sival_ptr = this ; + + return slot; +} + +int +ACE_POSIX_CB_Proactor::handle_events_i (u_long milli_seconds) +{ + + int result_wait=0; + + // Wait for the signals. + if (milli_seconds == ACE_INFINITE) + { + result_wait = this->sema_.acquire(); + } + else + { + // Wait for <milli_seconds> amount of time. + ACE_Time_Value abs_time = ACE_OS::gettimeofday () + + ACE_Time_Value (0, milli_seconds * 1000); + + result_wait = this->sema_.acquire(abs_time); + } + + // Check for errors + // but let continue work in case of errors + // we should check "post_completed" queue + if (result_wait == -1) + { + int const lerror = errno; + if (lerror != ETIME && // timeout + lerror != EINTR ) // interrupted system call + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_CB_Proactor::handle_events:" + "semaphore acquire failed" + )); + } + + size_t index = 0; // start index to scan aiocb list + size_t count = this->aiocb_list_max_size_; // max number to iterate + + int error_status = 0; + size_t return_status = 0; + + int ret_aio = 0; + int ret_que = 0; + + for (;; ret_aio++) + { + ACE_POSIX_Asynch_Result * asynch_result = + this->find_completed_aio (error_status, + return_status, + index, + count); + + if (asynch_result == 0) + break; + + // Call the application code. + this->application_specific_code (asynch_result, + return_status, // Bytes transferred. + 0, // No completion key. + error_status); // Error + } + + // process post_completed results + ret_que = this->process_result_queue (); + + // Uncomment this if you want to test + // and research the behavior of you system + // ACE_DEBUG ((LM_DEBUG, + // "(%t) NumAIO=%d NumQueue=%d\n", + // ret_aio, ret_que)); + + return ret_aio + ret_que > 0 ? 1 : 0; +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#endif /* ACE_HAS_AIO_CALLS && !ACE_HAS_BROKEN_SIGEVENT_STRUCT */ |