summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-25 03:59:50 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-25 03:59:50 +0000
commita85e43d6ce0ed824713d611dddef982d837cee76 (patch)
treea1ca7ec32d41c7914d56c968a2be0233cd285f86
parente4e33898d9bc410f268e4c0da0dc7ca47bb2ed9f (diff)
downloadATCD-a85e43d6ce0ed824713d611dddef982d837cee76.tar.gz
- Completed Multithreading of POSIX_SIG_Proactor.
- Introduced another constructor for ACE_POSIX_SIG_Proactor for taking signal mask to be used with the Proactor. - Thanks to Dave Butenhof <butenhof@zko.dec.com> for helping a lot to understand the various things in the POSIX4 standard. - Thanks to Dave suggestion of keeping null_handler for the sigaction to real-time signals. With this and a couple of other correct POSIX things SIG proactor is now working with in Solaris 2.7. - Lynx OS doesnt support <pthread_sigmask>, so it couldnt be multithreaded. Enabled AIOCB_Proactor for this platform. - Added an example to make use of the real-time signal numbers for the asynchronous I/O calls.
-rw-r--r--ace/POSIX_Asynch_IO.cpp62
-rw-r--r--ace/POSIX_Asynch_IO.h12
-rw-r--r--ace/POSIX_Proactor.cpp267
-rw-r--r--ace/POSIX_Proactor.h110
-rw-r--r--ace/Proactor.cpp94
-rw-r--r--ace/Proactor.h42
-rw-r--r--ace/Proactor_Impl.h20
-rw-r--r--ace/WIN32_Proactor.cpp2
-rw-r--r--ace/config-linux-lxpthreads.h2
-rw-r--r--ace/config-lynxos.h10
10 files changed, 371 insertions, 250 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index f789eb9cdec..a427e00c7ba 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -236,12 +236,6 @@ ACE_POSIX_SIG_Asynch_Operation::~ACE_POSIX_SIG_Asynch_Operation (void)
{
}
-int
-ACE_POSIX_SIG_Asynch_Operation::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result)
-{
- return this->posix_proactor ()->register_aio_with_proactor (result);
-}
-
// *********************************************************************
u_long
@@ -540,10 +534,6 @@ ACE_POSIX_SIG_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Resu
result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
result->aio_sigevent.sigev_signo = result->signal_number ();
result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
-
- // Register the real-time signal with the Proactor.
- if (this->register_aio_with_proactor (result) == -1)
- return -1;
// Fire off the aio read.
if (aio_read (result) == -1)
@@ -873,10 +863,6 @@ ACE_POSIX_SIG_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_R
result->aio_sigevent.sigev_signo = result->signal_number ();
result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
- // Register the real-time signal with the Proactor.
- if (this->register_aio_with_proactor (result) == -1)
- return -1;
-
// Fire off the aio write.
if (aio_write (result) == -1)
// Queueing failed.
@@ -1957,9 +1943,8 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Acce
// Do the work.
if (this->register_accept_call_i (result) == -1)
return -1;
-
- // Also register the real-time signal.
- return this->posix_proactor_->register_aio_with_proactor (result);
+
+ return 0;
}
int
@@ -2476,7 +2461,9 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
// = DESCRIPTION
//
// This is a helper class for implementing
- // <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
+ // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. This class
+ // abstracts out all the commonalities in the two different
+ // POSIX Transmit Handler implementations.
public:
virtual ~ACE_POSIX_Asynch_Transmit_Handler (void);
@@ -2695,32 +2682,25 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W
// Check the success parameter.
if (result.success () == 0)
{
+ // Failure.
ACE_ERROR ((LM_ERROR,
"Asynch_Transmit_File failed.\n"));
-
- // Check the success parameter.
- if (result.success () == 0)
+
+ ACE_SEH_TRY
{
- // Failure.
- ACE_ERROR ((LM_ERROR,
- "Asynch_Transmit_File failed.\n"));
-
- ACE_SEH_TRY
- {
- this->result_->complete (this->bytes_transferred_,
- 0, // Failure.
- 0, // @@ Completion key.
- 0); // @@ Error no.
- }
- ACE_SEH_FINALLY
- {
- // This is crucial to prevent memory leaks. This deletes
- // the result pointer also.
- delete this;
- }
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ 0); // @@ Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ // This is crucial to prevent memory leaks. This deletes
+ // the result pointer also.
+ delete this;
}
}
-
+
// Write stream successful.
// Partial write to socket.
@@ -2816,8 +2796,10 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read
if (result.bytes_transferred () == 0)
return;
- // Increment offset and write data to network.
+ // Increment offset.
this->file_offset_ += result.bytes_transferred ();
+
+ // Write data to network.
if (this->ws_.write (result.message_block (),
result.bytes_transferred (),
(void *)&this->data_act_,
diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h
index 521ecd31b45..56224912845 100644
--- a/ace/POSIX_Asynch_IO.h
+++ b/ace/POSIX_Asynch_IO.h
@@ -172,12 +172,6 @@ protected:
virtual ~ACE_POSIX_Asynch_Operation (void);
// Destructor.
- virtual int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) = 0;
- // <Asynch_Operation> class call this method to register the <aio_>
- // call with the Proactor, so that Proactor can do completion
- // querying effectively. Different POSIX Proactor implementations do
- // different things to register the <aio_>.
-
ACE_Proactor *proactor_;
// Proactor that this Asynch IO will be registered with.
@@ -245,12 +239,6 @@ protected:
virtual ~ACE_POSIX_SIG_Asynch_Operation (void);
// Destructor.
- int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result);
- // <Asynch_Operation> classes call this, to register the real-time
- // signal used to issue the <aio_> call with the Proator. This
- // should be done so that the Proactor can wait for completions of
- // the asynchronous calls, issued using those signals.
-
ACE_POSIX_SIG_Proactor *posix_sig_proactor_;
// It is easy to get this specific implementation proactor here,
// since it is the one that creates the correct POSIX_Asynch_*
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 3861e8aa110..5bfcc4e1c5a 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -720,13 +720,80 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r
ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void)
{
- // Make the sigset_t consisting of the completion signals.
+ // = Mask all the signals, keep a mask set with ACE_SIGRTMIN and set
+ // up signal handler for SIGRTMIN.
+
+ // Mask all the signals.
+ if (this->mask_all () != 0)
+ return;
+
+ // = Keep a mask set with ACE_SIGRTMIN.
+
+ // Clear the signal set.
if (sigemptyset (&this->RT_completion_signals_) == -1)
ACE_ERROR ((LM_ERROR,
"Error:%p\n",
"Couldn't init the RT completion signal set"));
+
+ // Add the signal number to the signal set.
+ if (sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:%p\n",
+ "Couldnt init the RT completion signal set"));
+
+ // Set up the signal handler for SIGRTMIN.
+ setup_signal_handler (ACE_SIGRTMIN);
}
+ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set)
+{
+ // = Keep <Signal_set> with the Proactor, mask all the signals and
+ // setup signal handlers for the signals in the <signal_set>.
+
+ // = Keep <signal_set> with the Proactor.
+
+ // Empty the signal set first.
+ if (sigemptyset (&this->RT_completion_signals_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigemptyset failed"));
+
+ // Put the <signal_set>.
+ if (pthread_sigmask (SIG_SETMASK, &signal_set, 0) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"));
+
+ // Get the <signal_set> back from the OS.
+ if (pthread_sigmask (SIG_SETMASK, 0, &this->RT_completion_signals_) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"));
+
+
+ // Mask all the signals.
+ if (this->mask_all () != 0)
+ return;
+
+ // For each signal number present in the <signal_set>, set up the
+ // signal handler.
+ int member = 0;
+ for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
+ {
+ member = sigismember (&signal_set,
+ si);
+ if (member == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
+ "sigismember failed"));
+ else if (member == 1)
+ {
+ if (this->setup_signal_handler (si) == -1)
+ return;
+ }
+ }
+}
ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
{
@@ -754,23 +821,19 @@ ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
pid_t pid = ACE_OS::getpid ();
if (pid == (pid_t) -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p",
- "<getpid> failed\n"),
+ "Error:%N:%l(%P | %t):%p",
+ "<getpid> failed"),
-1);
// Set the signal information.
sigval value;
value.sival_ptr = (void *) result;
- // Register the signal number with the Proactor.
- if (this->register_aio_with_proactor (result) == -1)
- return -1;
-
// Queue the signal.
if (sigqueue (pid, result->signal_number (), value) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p",
- "<sigqueue> failed\n"),
+ "Error:%N:%l:(%P | %t):%p\n",
+ "<sigqueue> failed"),
-1);
return 0;
}
@@ -825,15 +888,131 @@ ACE_POSIX_SIG_Proactor::create_asynch_transmit_file (void)
return implementation;
}
+ACE_Asynch_Result_Impl *
+ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &tv,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+{
+ int is_member = 0;
+
+ // Fix the signal number.
+ if (signal_number == -1)
+ {
+ int si;
+ for (si = ACE_SIGRTMAX;
+ (is_member == 0) && (si >= ACE_SIGRTMIN);
+ si--)
+ {
+ is_member = sigismember (&this->RT_completion_signals_,
+ si);
+ if (is_member == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n",
+ "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
+ "sigismember failed"),
+ 0);
+ }
+
+ if (is_member == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%N:%l:(%P | %t)::%s\n",
+ "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
+ "Signal mask set empty"),
+ 0);
+ else
+ // + 1 to nullify loop increment.
+ signal_number = si + 1;
+ }
+
+ ACE_Asynch_Result_Impl *implementation;
+ ACE_NEW_RETURN (implementation,
+ ACE_POSIX_Asynch_Timer (handler,
+ act,
+ tv,
+ event,
+ priority,
+ signal_number),
+ 0);
+ return implementation;
+}
+
+int
+ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
+{
+ // Set up the handler(actually Null handler) for this real-time
+ // signal.
+ struct sigaction reaction;
+ sigemptyset (&reaction.sa_mask); // Nothing else to mask.
+ reaction.sa_flags = SA_SIGINFO; // Realtime flag.
+#if defined (SA_SIGACTION)
+ // Lynx says, it is better to set this bit, to be portable.
+ reaction.sa_flags &= SA_SIGACTION;
+#endif /* SA_SIGACTION */
+ reaction.sa_sigaction = null_handler; // Null handler function.
+ int sigaction_return = sigaction (signal_number,
+ &reaction,
+ 0);
+ if (sigaction_return == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Proactor couldnt do sigaction for the RT SIGNAL"),
+ -1);
+ return 0;
+}
+
+void
+ACE_POSIX_SIG_Proactor::null_handler (int signal_number,
+ siginfo_t * /* info */,
+ void * /* context */)
+{
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%s:Signal number %d\n"
+ "Mask all the RT signals for this thread",
+ "ACE_POSIX_SIG_Proactor::null_handler called",
+ signal_number));
+}
+
+int
+ACE_POSIX_SIG_Proactor::mask_all (void) const
+{
+ sigset_t full_set;
+
+ // Get full set.
+ if (sigfillset (&full_set) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigfillset failed"),
+ -1);
+
+ // Mask them.
+ if (pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"),
+ -1);
+
+ return 0;
+}
+
int
ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
{
int result_sigwait = 0;
siginfo_t sig_info;
+
+ // Mask all the signals.
+ if (this->mask_all () != 0)
+ return -1;
+ // Wait for the signals.
if (milli_seconds == ACE_INFINITE)
- result_sigwait = sigwaitinfo (&this->RT_completion_signals_,
- &sig_info);
+ {
+ result_sigwait = sigwaitinfo (&this->RT_completion_signals_,
+ &sig_info);
+ }
else
{
// Wait for <milli_seconds> amount of time.
@@ -862,22 +1041,10 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
// No errors, RT compleion signal is received.
- // We deal only with the signal numbers that have been registered
- // already.
- int member = sigismember (&this->RT_completion_signals_,
- result_sigwait);
- if (member != 1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_SIG_Proactor::handle_events:"
- "Unexpected signal (%d) received while waiting for RT Completion Signal",
- result_sigwait),
- -1);
-
// Is the signo returned consistent with the sig info?
if (sig_info.si_signo != result_sigwait)
ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t):"
+ "Error:%N:%l:(%P | %t):"
"ACE_POSIX_SIG_Proactor::handle_events:"
"Inconsistent signal number (%d) in the signal info block",
sig_info.si_signo),
@@ -948,58 +1115,6 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds)
return 1;
}
-int
-ACE_POSIX_SIG_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result)
-{
- // Nothing to do if this signal is already there in the signal set,
- // that has been already masked out.
- int member = sigismember (&this->RT_completion_signals_,
- result->signal_number ());
- if (member == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_SIG_Proactor::register_aio_with_proactor:"
- "sigismember failed"),
- -1);
- // Return if it is already there.
- if (member == 1)
- return 0;
-
- // Add the signal number to the signal set.
- if (sigaddset (&this->RT_completion_signals_, result->signal_number ()) < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:%p\n",
- "Couldnt init the RT completion signal set"),
- -1);
-
- // Mask them.
- if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:%p\n",
- "Couldnt mask the RT completion signals"),
- -1);
-
- // Set up the handler(actually Null handler) for this real-time
- // signal.
- struct sigaction reaction;
- sigemptyset (&reaction.sa_mask); // Nothing else to mask.
- reaction.sa_flags = SA_SIGINFO; // Realtime flag.
-#if defined (SA_SIGACTION)
- // Lynx says, it is better to set this bit, to be portable.
- reaction.sa_flags &= SA_SIGACTION;
-#endif /* SA_SIGACTION */
- reaction.sa_sigaction = 0; // No handler function.
- int sigaction_return = sigaction (result->signal_number (),
- &reaction,
- 0);
- if (sigaction_return == -1)
- ACE_ERROR ((LM_ERROR,
- "Error:%p\n",
- "Proactor couldnt do sigaction for the RT SIGNAL"));
- return 0;
-}
-
-
// *********************************************************************
ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer (ACE_Handler &handler,
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index 889ff0c4234..f868d354034 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -92,18 +92,18 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Write_Stream_Result_Impl *create_asynch_write_stream_result (ACE_Handler &handler,
ACE_HANDLE handle,
ACE_Message_Block &message_block,
u_long bytes_to_write,
const void* act,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Read_File_Result_Impl *create_asynch_read_file_result (ACE_Handler &handler,
ACE_HANDLE handle,
@@ -112,9 +112,9 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Write_File_Result_Impl *create_asynch_write_file_result (ACE_Handler &handler,
ACE_HANDLE handle,
@@ -123,9 +123,9 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Accept_Result_Impl *create_asynch_accept_result (ACE_Handler &handler,
ACE_HANDLE listen_handle,
@@ -133,9 +133,9 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
@@ -147,16 +147,16 @@ public:
u_long bytes_per_send,
u_long flags,
const void *act,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create a timer result object which can be used with the Timer
// mechanism of the Proactor.
@@ -171,25 +171,6 @@ protected:
u_long error);
// Protect against structured exceptions caused by user code when
// dispatching handles.
-
- virtual int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) = 0;
- // <Asynch_Operation> class call this method to register the <aio_>
- // call with the Proactor, so that Proactor can do completion
- // querying. Different <POSIX_Proactor> implementations do different
- // things to register the <aio_>.
-
-#if 0
- ACE_Thread_Manager thr_mgr_;
- // This will manage the thread in the Timer_Handler.
-
- ACE_Auto_Event event_;
- // This event is used in conjunction with Reactor when we try to
- // integrate the event loops of Reactor and the Proactor.
-
- int used_with_reactor_event_loop_;
- // Flag that indicates whether we are used in conjunction with
- // Reactor.
-#endif /* 0 */
};
// Forward declarations.
@@ -296,8 +277,11 @@ class ACE_Export ACE_POSIX_SIG_Proactor : public ACE_POSIX_Proactor
// = TITLE
//
// This Proactor implementation does compeltion querying using
- // POSIX Real Time signals. <sigtimedwait> call is used to get
- // the notify/get the completions.
+ // POSIX Real Time signals. <sigtimedwait>/<sigwaitinfo> call is
+ // used to get the notify/get the completions.
+ // The real-time signals that are going to be used with this
+ // Proactor should be given apriori in the constructor, so that
+ // those signals can be masked from asynchornous delivery.
//
// = DESCRIPTION
//
@@ -309,7 +293,15 @@ class ACE_Export ACE_POSIX_SIG_Proactor : public ACE_POSIX_Proactor
public:
ACE_POSIX_SIG_Proactor (void);
- // Constructor.
+ // This constructor masks only the <ACE_SIGRTMIN>
+ // real-time signal. Only this signal should be used to issue
+ // asynchronous operations using this Proctor.
+
+ ACE_POSIX_SIG_Proactor (const sigset_t mask_set);
+ // This constructor should be used to tell the Proactor to mask and
+ // wait for the real-time signals specified in this set. Only these
+ // signals should be used by the asynchronous operations when they
+ // use this Proactor.
virtual ~ACE_POSIX_SIG_Proactor (void);
// Destructor.
@@ -345,18 +337,33 @@ public:
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void);
+ virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler,
+ const void *act,
+ const ACE_Time_Value &tv,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN) ;
+ // If <signal_number> is -1, check with the Proactor and use one of
+ // the signals that is present in the mask set (i.e. the signals for
+ // which the Proactor will be waiting) of the Proactor. If there are
+ // more than one signal, the higher numbered signal will be chosen.
+
protected:
+ int setup_signal_handler (int signal_number) const;
+ // To setup the handler for a real-time signbal.
+
+ static void null_handler (int signal_number, siginfo_t *info, void *context);
+ // Dummy signal handler. This wont get called at all, since we are
+ // going to be masking the signal in all the threads.
+
+ int mask_all (void) const;
+ // To mask all the signals in a thread.
+
virtual int handle_events (unsigned long milli_seconds);
// Dispatch a single set of events. If <milli_seconds> elapses
// before any events occur, return 0. Return 1 if a completion is
// dispatched. Return -1 on errors.
- int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result);
- // <Asynch_Operation> classes call this, to register the real-time
- // signal used to issue the <aio_> call with the Proator. This
- // should be done so that the Proactor can wait for completions of
- // the asynchronous calls, issued using those signals.
-
sigset_t RT_completion_signals_;
// These signals are used for completion notification by the
// Proactor. The signals specified while issueing <Asynch
@@ -373,7 +380,8 @@ class ACE_Export ACE_POSIX_Asynch_Timer : public ACE_POSIX_Asynch_Result
// called.
friend class ACE_POSIX_Proactor;
- // The factory method for this class is with the POSIX_Proactor
+ friend class ACE_POSIX_SIG_Proactor;
+ // The factory method for this class is with the POSIX_Proactor
// class.
protected:
@@ -382,7 +390,7 @@ protected:
const ACE_Time_Value &tv,
ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
- int signal_number = SIGRTMIN);
+ int signal_number = ACE_SIGRTMIN);
// Constructor.
virtual ~ACE_POSIX_Asynch_Timer (void) {}
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 6e16958df67..bd2ce7a52e6 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -96,35 +96,28 @@ ACE_Proactor_Timer_Handler::svc (void)
{
ACE_Time_Value absolute_time;
int empty_flag = 0;
-
+ int result = 0;
+
while (this->shutting_down_ == 0)
{
- // If the timer queue is not empty
+ // Is the timer queue empty?
empty_flag = this->proactor_.timer_queue ()->is_empty ();
+
if (!empty_flag)
{
// Get the earliest absolute time.
- absolute_time =
- this->proactor_.timer_queue ()->earliest_time () -
- this->proactor_.timer_queue ()->gettimeofday ();
-#if 0
- ACE_DEBUG ((LM_DEBUG,
- "%N%l:(%t):Earliest Time %d sec, %d msec time\n",
- absolute_time.sec (),
- absolute_time.msec ()));
-#endif
- // Make it zero if it is negative.
- if (absolute_time < ACE_Time_Value::zero)
- absolute_time = ACE_Time_Value::zero;
+ absolute_time = this->proactor_.timer_queue ()->earliest_time ();
+
+ // Block for absolute time.
+ result = this->timer_event_.wait (&absolute_time);
}
-
- // Wait for event upto <absolute_time>.
- int result = 0;
- if (empty_flag)
- result = this->timer_event_.wait (0);
else
- result = this->timer_event_.wait (&absolute_time);
+ {
+ // Wait for ever.
+ result = this->timer_event_.wait ();
+ }
+ // Check for timer expiries.
if (result == -1)
{
switch (errno)
@@ -142,7 +135,6 @@ ACE_Proactor_Timer_Handler::svc (void)
}
}
}
-
return 0;
}
@@ -166,19 +158,21 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (TIMER_QUEUE &timer_queue,
ASYS_TEXT ("(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall,")
ASYS_TEXT (" no completion port to post timeout to?!@\n")),
-1);
-
+
// Create the Asynch_Timer.
ACE_Asynch_Result_Impl *asynch_timer = this->proactor_->create_asynch_timer (*handler,
- act,
- time,
- 0);
+ act,
+ time,
+ ACE_INVALID_HANDLE,
+ 0,
+ -1);
if (asynch_timer == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t):%p\n",
"ACE_Proactor_Handle_Timeout_Upcall::timeout:"
"create_asynch_timer failed"),
-1);
-
+
// Post a completion.
if (asynch_timer->post_completion (this->proactor_->implementation ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -239,7 +233,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
delete_timer_queue_ (0)
{
this->implementation (implementation);
-
+
if (this->implementation () == 0)
{
#if defined (ACE_HAS_AIO_CALLS)
@@ -248,8 +242,8 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);
#elif defined (ACE_POSIX_SIG_PROACTOR)
ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);
- #else /* Default is to use the AIOCB one */
- ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);
+#else /* Default is to use the SIG one */
+ ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);
#endif
#elif (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
// WIN_Proactor.
@@ -637,7 +631,8 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
u_long bytes_to_read,
const void* act,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_read_stream_result (handler,
handle,
@@ -645,7 +640,8 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
bytes_to_read,
act,
event,
- priority);
+ priority,
+ signal_number);
}
@@ -656,7 +652,8 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
u_long bytes_to_write,
const void* act,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_write_stream_result (handler,
@@ -665,7 +662,8 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
bytes_to_write,
act,
event,
- priority);
+ priority,
+ signal_number);
}
@@ -679,7 +677,8 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
u_long offset,
u_long offset_high,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_read_file_result (handler,
@@ -690,7 +689,8 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
offset,
offset_high,
event,
- priority);
+ priority,
+ signal_number);
}
@@ -704,7 +704,8 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
u_long offset,
u_long offset_high,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_write_file_result (handler,
@@ -715,7 +716,8 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
offset,
offset_high,
event,
- priority);
+ priority,
+ signal_number);
}
@@ -727,7 +729,8 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler,
u_long bytes_to_read,
const void* act,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_accept_result (handler,
@@ -737,7 +740,8 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler,
bytes_to_read,
act,
event,
- priority);
+ priority,
+ signal_number);
}
ACE_Asynch_Transmit_File_Result_Impl *
@@ -752,7 +756,8 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
u_long flags,
const void *act,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_transmit_file_result (handler,
@@ -766,7 +771,8 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
flags,
act,
event,
- priority);
+ priority,
+ signal_number);
}
ACE_Asynch_Result_Impl *
@@ -774,13 +780,15 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
ACE_HANDLE event,
- int priority)
+ int priority,
+ int signal_number)
{
return this->implementation ()->create_asynch_timer (handler,
act,
tv,
event,
- priority);
+ priority,
+ signal_number);
}
void
diff --git a/ace/Proactor.h b/ace/Proactor.h
index abc0d605a82..4f3904f43e4 100644
--- a/ace/Proactor.h
+++ b/ace/Proactor.h
@@ -304,8 +304,9 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Read_Stream::Result class.
virtual ACE_Asynch_Write_Stream_Result_Impl *create_asynch_write_stream_result (ACE_Handler &handler,
@@ -313,8 +314,9 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_write,
const void* act,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Write_Stream::Result.
virtual ACE_Asynch_Read_File_Result_Impl *create_asynch_read_file_result (ACE_Handler &handler,
@@ -324,8 +326,9 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Read_File::Result.
virtual ACE_Asynch_Write_File_Result_Impl *create_asynch_write_file_result (ACE_Handler &handler,
@@ -335,8 +338,9 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Write_File::Result.
virtual ACE_Asynch_Accept_Result_Impl *create_asynch_accept_result (ACE_Handler &handler,
@@ -345,8 +349,9 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Accept::Result.
virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler,
@@ -359,17 +364,22 @@ public:
u_long bytes_per_send,
u_long flags,
const void *act,
- ACE_HANDLE event,
- int priority);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create the correct implementation class for ACE_Asynch_Transmit_File::Result.
-
+
virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
- ACE_HANDLE event,
- int priority = 0);
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
// Create a timer result object which can be used with the Timer
- // mechanism of the Proactor.
+ // mechanism of the Proactor.
+ // If <signal_number> is -1, <POSIX_SIG_Proactor> will create a
+ // Timer object with a meaningful signal number, choosing the
+ // largest signal number from the signal mask of the Proactor.
protected:
virtual void implementation (ACE_Proactor_Impl *implementation);
diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h
index 347de43fcd7..edae3452e1a 100644
--- a/ace/Proactor_Impl.h
+++ b/ace/Proactor_Impl.h
@@ -113,7 +113,7 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Read_Stream::Result class.
@@ -123,7 +123,7 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_write,
const void* act,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Write_Stream::Result.
@@ -135,7 +135,7 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Read_File::Result.
@@ -147,7 +147,7 @@ public:
const void* act,
u_long offset,
u_long offset_high,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Write_File::Result.
@@ -158,7 +158,7 @@ public:
ACE_Message_Block &message_block,
u_long bytes_to_read,
const void* act,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Accept::Result.
@@ -173,7 +173,7 @@ public:
u_long bytes_per_send,
u_long flags,
const void *act,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
// Create the correct implementation class for ACE_Asynch_Transmit_File::Result.
@@ -181,10 +181,12 @@ public:
virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
- ACE_HANDLE event,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
int priority = 0,
- int signal_number = ACE_SIGRTMIN) = 0;
- // Create the correct implementation object for the Timer result.
+ int signal_number = 0) = 0;
+ // Create the correct implementation object for the Timer
+ // result. POSIX_SIG_Proactor will create a Timer object with a
+ // meaningful signal number, if you leave the signal number as 0.
};
#endif /* (ACE_WIN32 && ACE_HAS_WINCE) || ACE_HAS_AIO_CALLS */
diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp
index e67e3bef3e6..9e94e4699ec 100644
--- a/ace/WIN32_Proactor.cpp
+++ b/ace/WIN32_Proactor.cpp
@@ -462,7 +462,7 @@ ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result)
) == FALSE)
{
delete result;
- ACE_ERROR_RETURN ((LM_ERROR, "Failure in dealing with timers: PostQueuedCompletionStatus failed\n"), -1);
+ ACE_ERROR_RETURN ((LM_ERROR, "PostQueuedCompletionStatus failed\n"), -1);
}
return 0;
diff --git a/ace/config-linux-lxpthreads.h b/ace/config-linux-lxpthreads.h
index 5995b9c8dd8..240ba39f8c3 100644
--- a/ace/config-linux-lxpthreads.h
+++ b/ace/config-linux-lxpthreads.h
@@ -73,4 +73,6 @@
#include /**/ <pthread.h>
+#define ACE_HAS_AIO_CALLS
+
#endif /* ACE_CONFIG_H */
diff --git a/ace/config-lynxos.h b/ace/config-lynxos.h
index e60b2125bec..b6862299735 100644
--- a/ace/config-lynxos.h
+++ b/ace/config-lynxos.h
@@ -200,6 +200,12 @@ extern "C"
// Aio works on lynx
#define ACE_HAS_AIO_CALLS
-// AIOCB Proactor doesnt work on lynx yet.
-#define ACE_POSIX_SIG_PROACTOR
+// AIOCB Proactor works on Lynx. But it is not
+// multi-threaded.
+// Lynx OS 3.0.0 lacks POSIX call <pthread_sigmask>. So,we cannot use
+// SIG Proactor also, with multiple threads. So, let us use the AIOCB
+// Proactor. Once <pthreadd_sigmask> is available on Lynx, we can turn
+// on SIG Proactor for this platform.
+// #define ACE_POSIX_SIG_PROACTOR
+#define ACE_POSIX_AIOCB_PROACTOR
#endif /* ACE_CONFIG_H */