summaryrefslogtreecommitdiff
path: root/ace/POSIX_Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r--ace/POSIX_Proactor.cpp209
1 files changed, 61 insertions, 148 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 68c8aaf7be0..38aac828254 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -13,38 +13,6 @@
#include "ace/POSIX_Proactor.i"
#endif /* __ACE_INLINE__ */
-class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
-{
- // = TITLE
- //
- // This is result object is used by the <end_event_loop> of the
- // ACE_Proactor interface to wake up all the threads blocking
- // for completions.
- //
- // = DESCRIPTION
- //
-
-public:
- ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
- const void *act = 0,
- ACE_HANDLE event = ACE_INVALID_HANDLE,
- int priority = 0,
- int signal_number = ACE_SIGRTMIN);
- // Constructor.
-
- virtual ~ACE_POSIX_Wakeup_Completion (void);
- // Destructor.
-
-
- virtual void complete (u_long bytes_transferred = 0,
- int success = 1,
- const void *completion_key = 0,
- u_long error = 0);
- // This method calls the <handler>'s <handle_wakeup> method.
-};
-
-// *********************************************************************
-
ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
{
this->close ();
@@ -80,7 +48,7 @@ ACE_POSIX_Proactor::close_dispatch_threads (int)
size_t
ACE_POSIX_Proactor::number_of_threads (void) const
{
- // @@ Implement it.
+ // @@ Implement it.
ACE_NOTSUP_RETURN (0);
}
@@ -139,7 +107,7 @@ ACE_POSIX_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
bytes_to_write,
act,
event,
- priority,
+ priority,
signal_number),
0);
return implementation;
@@ -322,7 +290,7 @@ void
ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
u_long bytes_transferred,
int success,
- const void */* completion_key*/,
+ const void *completion_key,
u_long error)
{
ACE_SEH_TRY
@@ -330,7 +298,7 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r
// Call completion hook
asynch_result->complete (bytes_transferred,
success,
- 0, // No completion key.
+ (void *) completion_key,
error);
}
ACE_SEH_FINALLY
@@ -340,23 +308,6 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r
}
}
-int
-ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
-{
- ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
- for (ssize_t ci = 0; ci < how_many; ci++)
- {
- ACE_NEW_RETURN (wakeup_completion,
- ACE_POSIX_Wakeup_Completion (this->wakeup_handler_),
- -1);
-
- if (wakeup_completion->post_completion (this) == -1)
- return -1;
- }
-
- return 0;
-}
-
// *********************************************************************
class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
@@ -378,7 +329,7 @@ class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
// completion of the <Asynch_Read_Stream> and calls the
// <handle_read_stream> of this class. This class calls
// <complete> on the <POSIX_Asynch_Result *> and thus calls the
- // application handler.
+ // application handler.
// Handling the MessageBlock:
// We give this message block to read the result pointer through
// the notify pipe. We expect that to read 4 bytes from the
@@ -389,10 +340,10 @@ public:
ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
// Constructor. You need the posix proactor because you need to call
// <application_specific_code>
-
+
virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
// Destructor.
-
+
int notify (ACE_POSIX_Asynch_Result *result);
// Send the result pointer through the notification pipe.
@@ -403,10 +354,10 @@ public:
private:
ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
// The implementation proactor class.
-
+
ACE_Message_Block message_block_;
// Message block to get ACE_POSIX_Asynch_Result pointer from the
- // pipe.
+ // pipe.
ACE_Pipe pipe_;
// Pipe for the communication between Proactor and the
@@ -459,8 +410,7 @@ ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result)
{
// Send the result pointer through the pipe.
int return_val = ACE::send (this->pipe_.write_handle (),
- ACE_reinterpret_cast (char *,
- &result),
+ (char *) &result,
sizeof (result));
if (return_val != sizeof (result))
ACE_ERROR_RETURN ((LM_ERROR,
@@ -475,7 +425,7 @@ void
ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
// The message block actually contains the ACE_POSIX_Asynch_Result
- // pointer.
+ // pointer.
ACE_POSIX_Asynch_Result *asynch_result = 0;
asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr ();
@@ -513,10 +463,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void)
{
// Initialize the array.
for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
- {
- aiocb_list_[ai] = 0;
- result_list_ [ai] = 0;
- }
+ aiocb_list_[ai] = 0;
// Accept Handler for aio_accept. Remember! this issues a Asynch_Read
// on the notify pipe for doing the Asynch_Accept.
@@ -615,12 +562,10 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
{
int result_suspend = 0;
if (milli_seconds == ACE_INFINITE)
- {
- // Indefinite blocking.
- result_suspend = aio_suspend (this->aiocb_list_,
- this->aiocb_list_max_size_,
- 0);
- }
+ // Indefinite blocking.
+ result_suspend = aio_suspend (this->aiocb_list_,
+ this->aiocb_list_max_size_,
+ 0);
else
{
// Block on <aio_suspend> for <milli_seconds>
@@ -631,12 +576,12 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
this->aiocb_list_max_size_,
&timeout);
}
-
+
// Check for errors
if (result_suspend == -1)
{
// If failure is because of timeout, then return *0*, otherwise
- // return -1.
+ // return -1.
if (errno == EAGAIN)
return 0;
else
@@ -656,7 +601,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
// Dont process null blocks.
if (aiocb_list_ [ai] == 0)
continue;
-
+
// Analyze error and return values.
// Get the error status of the aio_ operation.
@@ -688,21 +633,17 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
// This AIO has finished.
break;
}
-
+
// Something should have completed.
ACE_ASSERT (ai != this->aiocb_list_max_size_);
-
+
// Retrive the result pointer.
- ACE_POSIX_Asynch_Result *asynch_result = this->result_list_ [ai];
-
- // ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *,
- // this->aiocb_list_[ai]);
- // ACE_dynamic_cast (ACE_POSIX_Asynch_Result *,
- // this->aiocb_list_[ai]);
+ ACE_POSIX_Asynch_Result *asynch_result =
+ (ACE_POSIX_Asynch_Result *) this->aiocb_list_[ai];
+
// Invalidate entry in the aiocb list.
this->aiocb_list_[ai] = 0;
- this->result_list_ [ai] = 0;
this->aiocb_list_cur_size_--;
// Call the application code.
@@ -711,7 +652,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
1, // Success
0, // No completion key.
error_status); // Error
-
+
// Success
return 1;
}
@@ -769,9 +710,7 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r
-1);
// Store the pointers.
- this->aiocb_list_[ai] = result;
- this->result_list_ [ai] = result;
-
+ this->aiocb_list_[ai] = result;
this->aiocb_list_cur_size_ ++;
return 0;
@@ -783,19 +722,19 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void)
{
// = Mask all the signals, keep a mask set with ACE_SIGRTMIN and set
// up signal handler for SIGRTMIN.
-
+
// Mask all the signals.
if (this->mask_all () != 0)
return;
-
+
// = Keep a mask set with ACE_SIGRTMIN.
-
+
// Clear the signal set.
if (sigemptyset (&this->RT_completion_signals_) == -1)
ACE_ERROR ((LM_ERROR,
"Error:%p\n",
"Couldn't init the RT completion signal set"));
-
+
// Add the signal number to the signal set.
if (sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
ACE_ERROR ((LM_ERROR,
@@ -809,16 +748,16 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void)
ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set)
{
// = Keep <Signal_set> with the Proactor, mask all the signals and
- // setup signal handlers for the signals in the <signal_set>.
-
+ // setup signal handlers for the signals in the <signal_set>.
+
// = Keep <signal_set> with the Proactor.
-
+
// Empty the signal set first.
if (sigemptyset (&this->RT_completion_signals_) == -1)
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"sigemptyset failed"));
-
+
// Put the <signal_set>.
if (ACE_OS::pthread_sigmask (SIG_SETMASK, &signal_set, 0) != 0)
ACE_ERROR ((LM_ERROR,
@@ -832,11 +771,11 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set)
"ACE_OS::pthread_sigmask failed"));
- // Mask all the signals.
+ // Mask all the signals.
if (this->mask_all () != 0)
return;
-
- // For each signal number present in the <signal_set>, set up the
+
+ // For each signal number present in the <signal_set>, set up the
// signal handler.
int member = 0;
for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
@@ -885,12 +824,11 @@ ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
"Error:%N:%l(%P | %t):%p",
"<getpid> failed"),
-1);
-
+
// Set the signal information.
sigval value;
- value.sival_ptr = ACE_reinterpret_cast (void *,
- result);
-
+ value.sival_ptr = (void *) result;
+
// Queue the signal.
if (sigqueue (pid, result->signal_number (), value) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -977,7 +915,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler,
"sigismember failed"),
0);
}
-
+
if (is_member == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Error:%N:%l:(%P | %t)::%s\n",
@@ -988,7 +926,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler,
// + 1 to nullify loop increment.
signal_number = si + 1;
}
-
+
ACE_Asynch_Result_Impl *implementation;
ACE_NEW_RETURN (implementation,
ACE_POSIX_Asynch_Timer (handler,
@@ -1005,12 +943,12 @@ int
ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
{
// Set up the handler(actually Null handler) for this real-time
- // signal.
+ // signal.
struct sigaction reaction;
sigemptyset (&reaction.sa_mask); // Nothing else to mask.
reaction.sa_flags = SA_SIGINFO; // Realtime flag.
#if defined (SA_SIGACTION)
- // Lynx says, it is better to set this bit, to be portable.
+ // Lynx says, it is better to set this bit, to be portable.
reaction.sa_flags &= SA_SIGACTION;
#endif /* SA_SIGACTION */
reaction.sa_sigaction = null_handler; // Null handler function.
@@ -1031,9 +969,9 @@ ACE_POSIX_SIG_Proactor::null_handler (int signal_number,
void * /* context */)
{
ACE_ERROR ((LM_ERROR,
- "Error:(%P | %t):ACE_POSIX_SIG_Proactor::null_handler called,"
- "Signal number %d,"
- "Mask all the RT signals for this thread\n",
+ "Error:(%P | %t):%s:Signal number %d\n"
+ "Mask all the RT signals for this thread",
+ "ACE_POSIX_SIG_Proactor::null_handler called",
signal_number));
}
@@ -1041,21 +979,21 @@ int
ACE_POSIX_SIG_Proactor::mask_all (void) const
{
sigset_t full_set;
-
+
// Get full set.
if (sigfillset (&full_set) != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Error:(%P | %t):%p\n",
"sigfillset failed"),
-1);
-
+
// Mask them.
if (ACE_OS::pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Error:(%P | %t):%p\n",
"pthread_sigmask failed"),
-1);
-
+
return 0;
}
@@ -1068,7 +1006,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
// Mask all the signals.
if (this->mask_all () != 0)
return -1;
-
+
// Wait for the signals.
if (milli_seconds == ACE_INFINITE)
{
@@ -1085,7 +1023,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
&sig_info,
&timeout);
}
-
+
// Check for errors
if (result_sigwait == -1)
{
@@ -1100,9 +1038,9 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
"sigtimedwait/sigwaitinfo failed"),
-1);
}
-
+
// No errors, RT compleion signal is received.
-
+
// Is the signo returned consistent with the sig info?
if (sig_info.si_signo != result_sigwait)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -1111,15 +1049,15 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
"Inconsistent signal number (%d) in the signal info block",
sig_info.si_signo),
-1);
-
+
// Retrive the result pointer.
- ACE_POSIX_Asynch_Result *asynch_result = ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *,
- sig_info.si_value.sival_ptr);
+ ACE_POSIX_Asynch_Result *asynch_result =
+ (ACE_POSIX_Asynch_Result *) sig_info.si_value.sival_ptr;
// Check the <signal code> and act according to that.
if (sig_info.si_code == SI_ASYNCIO)
{
- // Analyze error and return values.
+ // Analyze error and return values.
// Check the error status
int error_status = aio_error (asynch_result);
@@ -1130,11 +1068,11 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
"ACE_POSIX_SIG_Proactor::handle_events:"
"<aio_error> has failed"),
-1);
-
+
// Completion signal has been received, so it can't be in
// progress.
ACE_ASSERT (error_status != EINPROGRESS);
-
+
// Error_status is not -1 and not EINPROGRESS. So, an <aio_>
// operation has finished (successfully or unsuccessfully!!!)
// Get the return_status of the <aio_> operation.
@@ -1152,7 +1090,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
this->application_specific_code (asynch_result,
return_status,
1, // Result : True.
- 0, // No completion key.
+ 0, // No completion_signal.
error_status); // Error.
}
}
@@ -1205,29 +1143,4 @@ ACE_POSIX_Asynch_Timer::complete (u_long bytes_transferred,
this->handler_.handle_time_out (this->time_, this->act ());
}
-// *********************************************************************
-
-ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
- const void *act,
- ACE_HANDLE event,
- int priority,
- int signal_number)
- : ACE_Asynch_Result_Impl (),
- ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
-{
-}
-
-ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
-{
-}
-
-void
-ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */,
- int /* success */,
- const void * /* completion_key */,
- u_long /* error */)
-{
- this->handler_.handle_wakeup ();
-}
-
#endif /* ACE_HAS_AIO_CALLS */