summaryrefslogtreecommitdiff
path: root/ACE/ace/SUN_Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/ace/SUN_Proactor.cpp')
-rw-r--r--ACE/ace/SUN_Proactor.cpp324
1 files changed, 324 insertions, 0 deletions
diff --git a/ACE/ace/SUN_Proactor.cpp b/ACE/ace/SUN_Proactor.cpp
new file mode 100644
index 00000000000..c522f18d64b
--- /dev/null
+++ b/ACE/ace/SUN_Proactor.cpp
@@ -0,0 +1,324 @@
+// $Id$
+
+#include "ace/SUN_Proactor.h"
+
+#if defined (ACE_HAS_AIO_CALLS) && defined (sun)
+
+#include "ace/Task_T.h"
+#include "ace/Log_Msg.h"
+#include "ace/Object_Manager.h"
+
+
+ACE_RCSID (ace,
+ POSIX_CB_Proactor,
+ "$Id$")
+
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
+ : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
+ ACE_POSIX_Proactor::PROACTOR_SUN),
+ condition_ (mutex_)
+{
+ // To provide correct virtual calls.
+ create_notify_manager ();
+
+ // we should start pseudo-asynchronous accept task
+ // one per all future acceptors
+
+ this->get_asynch_pseudo_task ().start ();
+}
+
+// Destructor.
+ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
+{
+ this->close ();
+}
+
+int
+ACE_SUN_Proactor::handle_events (ACE_Time_Value &wait_time)
+{
+ // Decrement <wait_time> with the amount of time spent in the method
+ ACE_Countdown_Time countdown (&wait_time);
+ return this->handle_events_i (&wait_time);
+}
+
+int
+ACE_SUN_Proactor::handle_events (void)
+{
+ return this->handle_events_i (0);
+}
+
+int ACE_SUN_Proactor::wait_for_start (ACE_Time_Value * abstime)
+{
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, -1));
+
+ if (this->num_started_aio_ != 0) // double check
+ return 0;
+
+ return this->condition_.wait (abstime);
+
+#else
+
+ return 0; // or -1 ???
+
+#endif /* ACE_MT_SAFE */
+}
+
+int
+ACE_SUN_Proactor::handle_events_i (ACE_Time_Value *delta)
+{
+ int retval = 0;
+ aio_result_t *result = 0;
+
+ if (0 == delta)
+ {
+ if (this->num_started_aio_ == 0)
+ this->wait_for_start (0);
+
+ result = aiowait (0);
+ }
+ else
+ {
+ if (this->num_started_aio_ == 0)
+ {
+ // Decrement delta with the amount of time spent waiting
+ ACE_Countdown_Time countdown (delta);
+ ACE_Time_Value tv (*delta);
+ tv += ACE_OS::gettimeofday ();
+ if (this->wait_for_start (&tv) == -1)
+ return -1;
+ }
+ struct timeval delta_tv = *delta;
+ result = aiowait (&delta_tv);
+ }
+
+ if (result == 0)
+ {
+ // timeout, do nothing,
+ // we should process "post_completed" queue
+ }
+ else if (reinterpret_cast<long> (result) == -1)
+ {
+ // Check errno for EINVAL,EAGAIN,EINTR ??
+ switch (errno)
+ {
+ case EINTR : // aiowait() was interrupted by a signal.
+ case EINVAL: // there are no outstanding asynchronous I/O requests.
+ break; // we should process "post_completed" queue
+
+ default: // EFAULT
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p \nNumAIO=%d\n",
+ "ACE_SUN_Proactor::handle_events: aiowait failed",
+ num_started_aio_),
+ -1);
+ }
+ }
+ else
+ {
+ int error_status = 0;
+ size_t transfer_count = 0;
+
+ ACE_POSIX_Asynch_Result *asynch_result =
+ find_completed_aio (result,
+ error_status,
+ transfer_count);
+
+ if (asynch_result != 0)
+ {
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ transfer_count,
+ 0, // No completion key.
+ error_status); // Error
+ retval++;
+ }
+ }
+
+ // process post_completed results
+ retval += this->process_result_queue ();
+
+ return retval > 0 ? 1 : 0 ;
+
+}
+
+int
+ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ int &error_status,
+ size_t &transfer_count)
+{
+
+ // Get the error status of the aio_ operation.
+ error_status = asynch_result->aio_resultp.aio_errno;
+ ssize_t op_return = asynch_result->aio_resultp.aio_return;
+
+ // ****** from Sun man pages *********************
+ // Upon completion of the operation both aio_return and aio_errno
+ // are set to reflect the result of the operation.
+ // AIO_INPROGRESS is not a value used by the system
+ // so the client may detect a change in state
+ // by initializing aio_return to this value.
+
+ if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS)
+ return 0; // not completed
+
+ if (op_return < 0)
+ transfer_count = 0; // zero bytes transferred
+ else
+ transfer_count = static_cast<size_t> (op_return);
+
+ return 1; // completed
+}
+
+ACE_POSIX_Asynch_Result *
+ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
+ int &error_status,
+ size_t &transfer_count)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
+
+ size_t ai;
+ error_status = -1;
+ transfer_count = 0;
+
+ // we call find_completed_aio always with result != 0
+
+ for (ai = 0; ai < aiocb_list_max_size_; ai++)
+ if (aiocb_list_[ai] != 0 && //check for non zero
+ result == &aiocb_list_[ai]->aio_resultp)
+ break;
+
+ if (ai >= aiocb_list_max_size_) // not found
+ return 0; // means somebody else uses aio directly!!!
+
+ ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
+
+ if (this->get_result_status (asynch_result,
+ error_status,
+ transfer_count) == 0)
+ { // should never be
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_SUN_Proactor::find_completed_aio:"
+ "should never be !!!\n"));
+ return 0;
+ }
+
+ aiocb_list_[ai] = 0;
+ result_list_[ai] = 0;
+ aiocb_list_cur_size_--;
+
+ num_started_aio_--;
+
+ start_deferred_aio ();
+ //make attempt to start deferred AIO
+ //It is safe as we are protected by mutex_
+
+ return asynch_result;
+}
+
+// start_aio_i has new return codes
+// 0 successful start
+// 1 try later, OS queue overflow
+// -1 invalid request and other errors
+
+int
+ACE_SUN_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
+{
+ ACE_TRACE ("ACE_SUN_Proactor::start_aio_i");
+
+ int ret_val;
+ const ACE_TCHAR *ptype;
+
+ // ****** from Sun man pages *********************
+ // Upon completion of the operation both aio_return and aio_errno
+ // are set to reflect the result of the operation.
+ // AIO_INPROGRESS is not a value used by the system
+ // so the client may detect a change in state
+ // by initializing aio_return to this value.
+ result->aio_resultp.aio_return = AIO_INPROGRESS;
+ result->aio_resultp.aio_errno = EINPROGRESS;
+
+ // Start IO
+ switch (result->aio_lio_opcode)
+ {
+ case LIO_READ :
+ ptype = ACE_LIB_TEXT ("read");
+ ret_val = aioread (result->aio_fildes,
+ (char *) result->aio_buf,
+ result->aio_nbytes,
+ result->aio_offset,
+ SEEK_SET,
+ &result->aio_resultp);
+ break;
+
+ case LIO_WRITE :
+ ptype = ACE_LIB_TEXT ("write");
+ ret_val = aiowrite (result->aio_fildes,
+ (char *) result->aio_buf,
+ result->aio_nbytes,
+ result->aio_offset,
+ SEEK_SET,
+ &result->aio_resultp);
+ break;
+
+ default:
+ ptype = ACE_LIB_TEXT ("?????");
+ ret_val = -1;
+ break;
+ }
+
+ if (ret_val == 0)
+ {
+ this->num_started_aio_++;
+ if (this->num_started_aio_ == 1) // wake up condition
+ this->condition_.broadcast ();
+ }
+ else // if (ret_val == -1)
+ {
+ if (errno == EAGAIN || errno == ENOMEM) // Defer - retry this later.
+ ret_val = 1;
+ else
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"),
+ ptype,
+ ACE_LIB_TEXT ("queueing failed\n")));
+ }
+
+ return ret_val;
+}
+
+int
+ACE_SUN_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result)
+{
+ ACE_TRACE ("ACE_SUN_Proactor::cancel_aiocb");
+ int rc = ::aiocancel (&result->aio_resultp);
+ if (rc == 0) // AIO_CANCELED
+ {
+ // after aiocancel Sun does not notify us
+ // so we should send notification
+ // to save POSIX behavoir.
+ // Also we should do this for deffered aio's
+
+ result->set_error (ECANCELED);
+ result->set_bytes_transferred (0);
+ this->putq_result (result);
+ return 0;
+ }
+
+ return 2;
+}
+
+ACE_POSIX_Proactor::Proactor_Type
+ACE_SUN_Proactor::get_impl_type (void)
+{
+ return PROACTOR_SUN;
+}
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#endif /* ACE_HAS_AIO_CALLS && sun */