summaryrefslogtreecommitdiff
path: root/ace/POSIX_Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r--ace/POSIX_Proactor.cpp422
1 files changed, 184 insertions, 238 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 8634454e154..77220da3abd 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -1,6 +1,5 @@
-// $Id$
-
-#define ACE_BUILD_DLL
+// $Id$
+
#include "ace/POSIX_Proactor.h"
#if defined (ACE_HAS_AIO_CALLS)
@@ -16,9 +15,14 @@
class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
{
// = TITLE
- // This is result object is used by the <end_event_loop> of the
+ //
+ // This is result object is used by the <end_event_loop> of the
// ACE_Proactor interface to wake up all the threads blocking
// for completions.
+ //
+ // = DESCRIPTION
+ //
+
public:
ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
const void *act = 0,
@@ -26,11 +30,11 @@ public:
int priority = 0,
int signal_number = ACE_SIGRTMIN);
// Constructor.
-
+
virtual ~ACE_POSIX_Wakeup_Completion (void);
// Destructor.
-
-
+
+
virtual void complete (u_long bytes_transferred = 0,
int success = 1,
const void *completion_key = 0,
@@ -290,7 +294,7 @@ ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
ACE_Time_Value timeout (0, 0);
int result = 0;
- for (;;)
+ while (1)
{
result = this->handle_events (timeout);
if (result != 0 || errno == ETIME)
@@ -339,19 +343,18 @@ int
ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
{
ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
-
for (ssize_t ci = 0; ci < how_many; ci++)
{
ACE_NEW_RETURN (wakeup_completion,
ACE_POSIX_Wakeup_Completion (this->wakeup_handler_),
-1);
-
+
if (wakeup_completion->post_completion (this) == -1)
return -1;
}
-
+
return 0;
-}
+}
class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
{
@@ -498,96 +501,29 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream:
"Read from pipe failed"));
}
-// Public constructor for common use.
-ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
- : aiocb_notify_pipe_manager_ (0),
- aiocb_list_ (0),
- result_list_ (0),
- aiocb_list_max_size_ (max_aio_operations),
- aiocb_list_cur_size_ (0)
-{
- if (aiocb_list_max_size_ > 8192)
- // @@ Alex, this shouldn't be a magic number -- it should be a
- // constant, e.g., ACE_AIO_MAX_SIZE or something.
- aiocb_list_max_size_ = 8192;
-
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
-
- // Initialize the array.
- for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
- {
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- }
-
- create_notify_manager ();
-}
+// *********************************************************************
-// Special protected constructor for ACE_SUN_Proactor
-ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,int Flg)
+ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void)
: aiocb_notify_pipe_manager_ (0),
- aiocb_list_ (0),
- result_list_ (0),
- aiocb_list_max_size_ (max_aio_operations),
+ aiocb_list_max_size_ (ACE_RTSIG_MAX),
aiocb_list_cur_size_ (0)
{
- ACE_UNUSED_ARG (Flg);
-
- if (aiocb_list_max_size_ > 8192)
- // @@ Alex, this shouldn't be a magic number -- it should be a
- // constant, e.g., ACE_AIO_MAX_SIZE or something.
- aiocb_list_max_size_ = 8192;
-
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
-
// Initialize the array.
for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
{
aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
+ result_list_ [ai] = 0;
}
- // @@ We should create Notify_Pipe_Manager in the derived class to
- // provide correct calls for virtual functions !!!
-}
-
-// Destructor.
-ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
-{
- delete_notify_manager ();
-
- delete [] aiocb_list_;
- aiocb_list_ = 0;
-
- delete [] result_list_;
- result_list_ = 0;
-}
-
-void
-ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
-{
// Accept Handler for aio_accept. Remember! this issues a Asynch_Read
// on the notify pipe for doing the Asynch_Accept.
-
- if (aiocb_notify_pipe_manager_ == 0)
- ACE_NEW (aiocb_notify_pipe_manager_,
- ACE_AIOCB_Notify_Pipe_Manager (this));
+ ACE_NEW (aiocb_notify_pipe_manager_,
+ ACE_AIOCB_Notify_Pipe_Manager (this));
}
-void
-ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
+// Destructor.
+ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
{
- // We are responsible for delete as all pointers set to 0 after
- // delete, it is save to delete twice
-
- delete aiocb_notify_pipe_manager_;
- aiocb_notify_pipe_manager_ = 0;
}
int
@@ -672,122 +608,132 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_transmit_file (void)
}
int
-ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
+ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
{
int result_suspend = 0;
-
if (milli_seconds == ACE_INFINITE)
- // Indefinite blocking.
- result_suspend = aio_suspend (aiocb_list_,
- aiocb_list_max_size_,
- 0);
+ {
+ // Indefinite blocking.
+ result_suspend = aio_suspend (this->aiocb_list_,
+ this->aiocb_list_max_size_,
+ 0);
+ }
else
{
// Block on <aio_suspend> for <milli_seconds>
timespec timeout;
timeout.tv_sec = milli_seconds / 1000;
timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000;
- result_suspend = aio_suspend (aiocb_list_,
- aiocb_list_max_size_,
+ result_suspend = aio_suspend (this->aiocb_list_,
+ this->aiocb_list_max_size_,
&timeout);
}
// Check for errors
if (result_suspend == -1)
{
- if (errno == EAGAIN) // Timeout
+ // If failure is because of timeout, then return *0*, otherwise
+ // return -1.
+ if (errno == EAGAIN)
return 0;
+ else
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::handle_events:"
+ "aio_suspend failed"));
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::handle_events:"
- "aio_suspend failed\n"),
- 0); // let continue work
+ return 0;
+ }
}
+ // Retrive the result pointer.
+ ACE_POSIX_Asynch_Result *asynch_result = 0;
+ size_t ai;
int error_status = 0;
int return_status = 0;
- ACE_POSIX_Asynch_Result *asynch_result =
- find_completed_aio (error_status, return_status);
+ // !!! Protected area.
+ {
+ ACE_Guard<ACE_Thread_Mutex> locker (this->mtx_AIOCB_);
- if (asynch_result == 0)
- return 0;
+ for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
+ {
+ // Dont process null blocks.
+ if (aiocb_list_ [ai] == 0)
+ continue;
- // Call the application code.
- this->application_specific_code (asynch_result,
- return_status, // Bytes transferred.
- 1, // Success
- 0, // No completion key.
- error_status); // Error
- return 1;
-}
+ // = Analyze error and return values.
-ACE_POSIX_Asynch_Result *
-ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
- int &return_status)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
+ // Get the error status of the aio_ operation.
+ error_status = aio_error (aiocb_list_[ai]);
+ if (error_status == -1)
+ // <aio_error> itself has failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::handle_events:"
+ "<aio_error> has failed"),
+ -1);
- size_t ai;
- ACE_POSIX_Asynch_Result *asynch_result = 0;
+ // Continue the loop if <aio_> operation is still in progress.
+ if (error_status == EINPROGRESS)
+ continue;
- error_status = 0;
- return_status= 0;
-
- for (ai = 0; ai < aiocb_list_max_size_; ai++)
- {
- if (aiocb_list_[ai] == 0) // Dont process null blocks.
- continue;
+ // Handle cancel'ed asynchronous operation. We dont have to call
+ // <aio_return> in this case, since return_status is going to be
+ // -1. We will pass 0 for the <bytes_transferred> in this case
+ if (error_status == ECANCELED)
+ {
+ return_status = 0;
+ break;
+ }
+ else if (error_status == 0)
+ {
+ // Error_status is not -1 and not EINPROGRESS. So, an <aio_>
+ // operation has finished (successfully or unsuccessfully!!!)
+ // Get the return_status of the <aio_> operation.
+ return_status = aio_return (aiocb_list_[ai]);
- // Get the error status of the aio_ operation.
- error_status = aio_error (aiocb_list_[ai]);
+ if (return_status == -1)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::handle_events:"
+ "<aio_return> failed to transfer any data\n"));
- if (error_status == -1) // <aio_error> itself has failed.
- {
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_error> has failed\n"));
-
- // skip this operation
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- aiocb_list_cur_size_--;
-
- continue;
- }
+ return_status = 0;
+ }
- // Continue the loop if <aio_> operation is still in progress.
- if (error_status != EINPROGRESS)
- break;
+ break;
+ }
+ }
- } // end for
+ // Something should have completed.
+ ACE_ASSERT (ai != this->aiocb_list_max_size_);
- if (ai >= this->aiocb_list_max_size_) // all processed
- return asynch_result;
- else if (error_status == ECANCELED)
- return_status = 0;
- else
- return_status = aio_return (aiocb_list_[ai]);
+ // Retrive the result pointer.
+ asynch_result = this->result_list_ [ai];
- if (return_status == -1)
- {
- // was ACE_ERROR_RETURN
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_return> failed\n"));
- return_status = 0; // zero bytes transferred
- }
+ // ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *,
+ // this->aiocb_list_[ai]);
+ // ACE_dynamic_cast (ACE_POSIX_Asynch_Result *,
+ // this->aiocb_list_[ai]);
- asynch_result = result_list_[ai];
+ // Invalidate entry in the aiocb list.
+ this->aiocb_list_[ai] = 0;
+ this->result_list_ [ai] = 0;
+ this->aiocb_list_cur_size_--;
+ } // !! End of protected area.
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- aiocb_list_cur_size_--;
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ return_status, // Bytes transferred.
+ 1, // Success
+ 0, // No completion key.
+ error_status); // Error
- return asynch_result;
+ // Success
+ return 1;
}
void
@@ -805,79 +751,79 @@ ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *as
}
int
-ACE_POSIX_AIOCB_Proactor::register_and_start_aio
- (ACE_POSIX_Asynch_Result *result, int op)
+ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result, int operation)
{
- ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_and_start_aio");
-
- ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
+ ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor");
- int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
+ // Protect the atomic action , which is: find free slot , start IO ,
+ // save ptr in the lists
- if (result == 0) // Just check the status of the list
- return ret_val;
+ ACE_Guard<ACE_Thread_Mutex> locker (this->mtx_AIOCB_);
- // Non-zero ptr. Find a free slot and store.
- if (ret_val == 0)
+ if (result == 0)
{
- for (size_t i= 0; i < this->aiocb_list_max_size_; i++)
- if (aiocb_list_[i] == 0)
- {
- ret_val = start_aio (result, op);
-
- if (ret_val == 0) // Store the pointers.
- {
- aiocb_list_[i] = result;
- result_list_[i] = result;
-
- aiocb_list_cur_size_++;
- }
- return ret_val;
- }
-
- errno = EAGAIN;
- ret_val = -1;
+ // Just check the status of the list.
+ if (this->aiocb_list_cur_size_ >=
+ this->aiocb_list_max_size_)
+ return -1;
+ else
+ return 0;
}
-
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::\n"
- "register_and_start_aio: No space to store the <aio>info\n"));
- return ret_val;
-}
-int
-ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op)
-{
- ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
+ // Non-zero ptr. Find a free slot and store.
+
+ // Make sure again.
+ if (this->aiocb_list_cur_size_ >=
+ this->aiocb_list_max_size_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Operation: No space to store the <aio> info.\n"),
+ -1);
- int ret_val;
- const ACE_TCHAR *ptype;
+ // Slot(s) available. Find a free one.
+ size_t ai;
+ for (ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ if (this->aiocb_list_[ai] == 0)
+ break;
- // Start IO
+ // Sanity check.
+ if (ai == this->aiocb_list_max_size_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Operation: No space to store the <aio> info.\n"),
+ -1);
- switch (op)
+ // Start the IO.
+ if (operation == 0)
{
- case 0:
- ptype = "read ";
- ret_val = aio_read (result);
- break;
- case 1:
- ptype = "write";
- ret_val = aio_write (result);
- break;
- default:
- ptype = "?????";
- ret_val = -1;
- break;
+ // Read
+ if (aio_read (result) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Read_XXXX: aio_read queueing failed\n"),
+ -1);
+ }
+ }
+ else
+ {
+ // write
+ if (aio_write (result) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Read_XXXX: aio_read queueing failed\n"),
+ -1);
+ }
}
-
- if (ret_val == -1)
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::start_aio: aio_%s %p\n",
- ptype,
- "queueing failed\n"));
- return ret_val;
+ // Store the pointers.
+ this->aiocb_list_[ai] = result;
+ this->result_list_ [ai] = result;
+
+ this->aiocb_list_cur_size_ ++;
+
+ return 0;
}
// *********************************************************************
@@ -1085,7 +1031,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler,
si);
if (is_member == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::%s\n",
+ "%N:%l:(%P | %t)::\n",
"ACE_POSIX_SIG_Proactor::create_asynch_timer:"
"sigismember failed"),
0);
@@ -1235,10 +1181,10 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
if (sig_info.si_code == SI_ASYNCIO)
{
// Analyze error and return values.
-
+
int error_status = 0;
int return_status = 0;
-
+
// Check the error status
error_status = aio_error (asynch_result);
@@ -1267,25 +1213,25 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
{
return_status = 0;
}
- else
+ else
{
// Get the return_status of the <aio_> operation.
return_status = aio_return (asynch_result);
// Failure.
if (return_status == -1)
- {
- ACE_ERROR ((LM_ERROR,
+ {
+ ACE_DEBUG ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
"ACE_POSIX_SIG_Proactor::handle_events:"
- "<aio_return> failed"));
- return_status = 0; // zero bytes transferred
- }
+ "<aio_return> failed to transfer any data\n"));
+ return_status = 0;
+ }
}
- // Error status and return status are obtained. Dispatch the
- // completion.
+ // error status and return status are obtained. Dispatch the
+ // completion .
this->application_specific_code (asynch_result,
return_status,
1, // Result : True.