summaryrefslogtreecommitdiff
path: root/ACE/ace/POSIX_CB_Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/ace/POSIX_CB_Proactor.cpp')
-rw-r--r--ACE/ace/POSIX_CB_Proactor.cpp184
1 files changed, 184 insertions, 0 deletions
diff --git a/ACE/ace/POSIX_CB_Proactor.cpp b/ACE/ace/POSIX_CB_Proactor.cpp
new file mode 100644
index 00000000000..708fdcf8b5d
--- /dev/null
+++ b/ACE/ace/POSIX_CB_Proactor.cpp
@@ -0,0 +1,184 @@
+// $Id$
+
+#include "ace/POSIX_CB_Proactor.h"
+
+#if defined (ACE_HAS_AIO_CALLS) && !defined(__Lynx__) && !defined (__FreeBSD__)
+
+#include "ace/Task_T.h"
+#include "ace/Log_Msg.h"
+#include "ace/Object_Manager.h"
+#include "ace/OS_NS_sys_time.h"
+
+ACE_RCSID (ace,
+ POSIX_CB_Proactor,
+ "$Id$")
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations)
+ : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
+ ACE_POSIX_Proactor::PROACTOR_CB),
+ sema_ ((unsigned int) 0)
+{
+ // we should start pseudo-asynchronous accept task
+ // one per all future acceptors
+
+ this->get_asynch_pseudo_task ().start ();
+}
+
+// Destructor.
+ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void)
+{
+ this->close ();
+}
+
+ACE_POSIX_Proactor::Proactor_Type
+ACE_POSIX_CB_Proactor::get_impl_type (void)
+{
+ return PROACTOR_CB;
+}
+
+void ACE_POSIX_CB_Proactor::aio_completion_func (sigval_t cb_data)
+{
+ ACE_POSIX_CB_Proactor * impl = static_cast<ACE_POSIX_CB_Proactor *> (cb_data.sival_ptr);
+ if ( impl != 0 )
+ impl->notify_completion (0);
+}
+
+#if defined (ACE_HAS_SIG_C_FUNC)
+extern "C" void
+ACE_POSIX_CB_Proactor_aio_completion (sigval_t cb_data)
+{
+ ACE_POSIX_CB_Proactor::aio_completion_func (cb_data);
+}
+#endif /* ACE_HAS_SIG_C_FUNC */
+
+int
+ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value &wait_time)
+{
+ // Decrement <wait_time> with the amount of time spent in the method
+ ACE_Countdown_Time countdown (&wait_time);
+ return this->handle_events_i (wait_time.msec ());
+}
+
+int
+ACE_POSIX_CB_Proactor::handle_events (void)
+{
+ return this->handle_events_i (ACE_INFINITE);
+}
+
+int
+ACE_POSIX_CB_Proactor::notify_completion (int sig_num)
+{
+ ACE_UNUSED_ARG (sig_num);
+
+ return this->sema_.release();
+}
+
+
+ssize_t
+ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
+{
+ ssize_t slot = ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (result);
+ if (slot == -1)
+ return -1;
+
+ // setup OS notification methods for this aio
+ // @@ TODO: This gets the completion method back to this proactor to
+ // find the completed aiocb. It would be so much better to not only get
+ // the proactor, but the aiocb as well.
+#if defined(__sgi)
+ result->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
+ result->aio_sigevent.sigev_func = aio_completion_func ;
+#else
+ result->aio_sigevent.sigev_notify = SIGEV_THREAD;
+# if defined (ACE_HAS_SIG_C_FUNC)
+ result->aio_sigevent.sigev_notify_function =
+ ACE_POSIX_CB_Proactor_aio_completion;
+# else
+ result->aio_sigevent.sigev_notify_function = aio_completion_func;
+# endif /* ACE_HAS_SIG_C_FUNC */
+ result->aio_sigevent.sigev_notify_attributes = 0;
+#endif /* __sgi */
+
+ result->aio_sigevent.sigev_value.sival_ptr = this ;
+
+ return slot;
+}
+
+int
+ACE_POSIX_CB_Proactor::handle_events_i (u_long milli_seconds)
+{
+
+ int result_wait=0;
+
+ // Wait for the signals.
+ if (milli_seconds == ACE_INFINITE)
+ {
+ result_wait = this->sema_.acquire();
+ }
+ else
+ {
+ // Wait for <milli_seconds> amount of time.
+ ACE_Time_Value abs_time = ACE_OS::gettimeofday ()
+ + ACE_Time_Value (0, milli_seconds * 1000);
+
+ result_wait = this->sema_.acquire(abs_time);
+ }
+
+ // Check for errors
+ // but let continue work in case of errors
+ // we should check "post_completed" queue
+ if (result_wait == -1)
+ {
+ if (errno != ETIME && // timeout
+ errno != EINTR ) // interrupted system call
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_CB_Proactor::handle_events:"
+ "semaphore acquire failed"
+ ));
+ }
+
+ size_t index = 0; // start index to scan aiocb list
+ size_t count = this->aiocb_list_max_size_; // max number to iterate
+
+ int error_status = 0;
+ size_t return_status = 0;
+
+ int ret_aio = 0;
+ int ret_que = 0;
+
+ for (;; ret_aio++)
+ {
+ ACE_POSIX_Asynch_Result * asynch_result =
+ this->find_completed_aio (error_status,
+ return_status,
+ index,
+ count);
+
+ if (asynch_result == 0)
+ break;
+
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ return_status, // Bytes transferred.
+ 0, // No completion key.
+ error_status); // Error
+ }
+
+ // process post_completed results
+ ret_que = this->process_result_queue ();
+
+ // Uncomment this if you want to test
+ // and research the behavior of you system
+ // ACE_DEBUG ((LM_DEBUG,
+ // "(%t) NumAIO=%d NumQueue=%d\n",
+ // ret_aio, ret_que));
+
+ return ret_aio + ret_que > 0 ? 1 : 0;
+}
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#endif /* ACE_HAS_AIO_CALLS && !__Lynx__ && !__FreeBSD__ */