summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-25 19:51:45 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-25 19:51:45 +0000
commit036c81e60d05de7d1613dbf9b29f3be7cf2ab5a1 (patch)
tree0a8a701211e88221ae2bcce719c8e72f2eef55bd /ace/Proactor.cpp
parent447c0d71a34fc01455b1cfeff5971d8b209701ce (diff)
downloadATCD-036c81e60d05de7d1613dbf9b29f3be7cf2ab5a1.tar.gz
Implemented Asynch_Accept to work for AIO_CONTROL_BLOCKS strategy of
completion notification. Defined an auxillary Accept_Handler called ACE_AIO_Accept_Handler in addition to the ACE_Asynch_Accept_Handler. ACE_AIO_Accept_Handler holds the notification pipe and does a read on it to handle the <ACE_Asynch_Accept::Result> coming from Asynch_Accept_handler. THANKS to Doug and Irfan for suggesting this 'notification pipe' based implementation for AIO_CONTROL_BLOCKS strategy, so that Proactor will work on platforms where POSIX4 RT_Signals are broken (Solaris 2.6)
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp211
1 files changed, 190 insertions, 21 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 02af632873a..4e8ac342934 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -68,6 +68,8 @@ class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH>
// Flag used to indicate when we are shutting down.
};
+
+
ACE_Proactor_Timer_Handler::ACE_Proactor_Timer_Handler (ACE_Proactor &proactor)
: ACE_Task <ACE_NULL_SYNCH> (&proactor.thr_mgr_),
proactor_ (proactor),
@@ -136,6 +138,136 @@ ACE_Proactor_Timer_Handler::svc (void)
#endif /* ACE_HAS_AIO_CALLS */
}
+#if defined (ACE_HAS_AIO_CALLS)
+class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler
+{
+ // = TITLE
+ // Helper class for doing Asynch_Accept in POSIX4 systems, in
+ // the case of doing AIO_CONTROL_BLOCKS strategy.
+ //
+ // = DESCRIPTION
+ // Doing Asynch_Accept in POSIX4 implementation is tricky. In
+ // the case of doing the things with AIO_CONTROL_BLOCKS and not
+ // with Real-Time Signals, this becomes even more trickier. We
+ // use a notifyn pipe here to implement Asynch_Accpet. This class
+ // will issue a <Asynch_Read> on the pipe. <Asynch_Accept> will
+ // send a result pointer containg all the information through
+ // this pipe.
+public:
+ ACE_AIO_Accept_Handler (ACE_Proactor* proactor);
+ // Constructor.
+
+ ~ACE_AIO_Accept_Handler (void);
+ // Destructor.
+
+#if 0
+ int accept (void);
+ // <Asynch_Accept> calls this when an <accept> call has been issued
+ // by the application. We issue an <Asynch_Read> here on the <pipe>,
+ // so that <Asynch_Accept> can notify us by sending us Result
+ // pointer.
+#endif /* 0 */
+
+ int notify (ACE_Asynch_Accept::Result* result);
+ // Send this Result to Proactor through the notification pipe.
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // Read from the pipe is complete. We get the <Result> from
+ // Asynch_Handler. We have to do the notification here.
+
+private:
+ ACE_Proactor* proactor_;
+ // The proactor in use.
+
+ ACE_Message_Block message_block_;
+ // Message block to get ACE_Asynch_Accept::Result from
+ // ACE_Asych_Accept.
+
+ ACE_Pipe pipe_;
+ // Pipe for the communication between Proactor and the
+ // Asynch_Accept.
+
+ ACE_Asynch_Read_Stream read_stream_;
+ // To do asynch_read on the pipe.
+
+ ACE_AIO_Accept_Handler (void);
+ // Default constructor. Shouldnt be called.
+};
+
+ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_Proactor* proactor)
+ : proactor_ (proactor),
+ message_block_ (sizeof (ACE_Asynch_Accept::Result))
+{
+ // Open the pipe.
+ this->pipe_.open ();
+
+ // Open the read stream.
+ if (this->read_stream_.open (*this,
+ this->pipe_.read_handle (),
+ 0,
+ this->proactor_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:%p\n",
+ "Open on Read Stream failed"));
+
+ // Issue an asynch_read on the read_stream.
+ if (this->read_stream_.read (this->message_block_,
+ sizeof (ACE_Asynch_Accept::Result)) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:%p\n",
+ "Read from stream failed"));
+}
+
+#if 0
+int
+ACE_AIO_Accept_Handler::accept (void)
+{
+ // Issue an asynch_read on the read_stream.
+ if (this->read_stream_.read (this->message_block,
+ sizeof (ACE_Asynch_Accept::Result)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:%p\n",
+ "Read from stream failed"),
+ -1);
+}
+#endif /* 0 */
+
+ACE_AIO_Accept_Handler::~ACE_AIO_Accept_Handler (void)
+{
+}
+
+int
+ACE_AIO_Accept_Handler::notify (ACE_Asynch_Accept::Result* result)
+{
+ // Send the result through the pipe.
+ if (ACE_OS::write (this->pipe_.write_handle (),
+ (void *) result,
+ sizeof (*result)) < sizeof (*result))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P %t):%p\n",
+ "Error:Writing on to pipe failed"),
+ -1);
+}
+
+void
+ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ // @@
+ ACE_DEBUG ((LM_DEBUG, "ACE_AIO_Accept_Handler::handle_read_stream called\n"));
+
+ // The message block actually contains the ACE_Asynch_Accept::Result.
+ ACE_Asynch_Accept::Result* accept_result =
+ (ACE_Asynch_Accept::Result*) result.message_block ().rd_ptr ();
+
+ // Do the upcall.
+ this->proactor_->application_specific_code (accept_result,
+ 0, // Bytes transferred.
+ 1, // Success.
+ 0, // Completion token.
+ 0); // Error.
+}
+#endif /* ACE_HAS_AIO_CALLS */
+
ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void)
: proactor_ (0)
{
@@ -217,6 +349,8 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
POSIX_COMPLETION_STRATEGY completion_strategy)
:
#if defined (ACE_HAS_AIO_CALLS)
+ posix_completion_strategy_ (completion_strategy),
+ aio_accept_handler_ (0),
aiocb_list_max_size_ (ACE_RTSIG_MAX),
aiocb_list_cur_size_ (0),
#else /* ACE_HAS_AIO_CALLS */
@@ -228,20 +362,28 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
delete_timer_queue_ (0),
timer_handler_ (0),
used_with_reactor_event_loop_ (used_with_reactor_event_loop)
-#if defined (ACE_HAS_AIO_CALLS)
- , posix_completion_strategy_ (completion_strategy)
-#endif /* ACE_HAS_AIO_CALLS */
{
#if defined (ACE_HAS_AIO_CALLS)
- // Initialize the array.
- for (size_t ai = 0;
- ai < this->aiocb_list_max_size_;
- ai++)
- aiocb_list_[ai] = 0;
-
ACE_UNUSED_ARG (number_of_threads);
ACE_UNUSED_ARG (tq);
+ // The following things are necessary only for the
+ // AIO_CONTROL_BLOCKS strategy.
+ if (this->posix_completion_strategy_ == AIO_CONTROL_BLOCKS)
+ {
+ // Initialize the array.
+ for (size_t ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ aiocb_list_[ai] = 0;
+
+ // Accept Handler for aio_accept. Remember! this issues a Asynch_Read
+ // on the notify pipe for doing the Asynch_Accept.
+ ACE_NEW (aio_accept_handler_,
+ ACE_AIO_Accept_Handler (this));
+ }
+
+
// Mask the RT_compeltion signals if we are using the RT_SIGNALS
// STRATEGY for completion querying.
if (completion_strategy == RT_SIGNALS)
@@ -249,17 +391,20 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
// Make the sigset_t consisting of the completion signals.
if (sigemptyset (&this->RT_completion_signals_) < 0)
ACE_ERROR ((LM_ERROR,
- "Error:%p:Couldnt init the RT completion signal set\n"));
+ "Error:%p\n",
+ "Couldn't init the RT completion signal set"));
if (sigaddset (&this->RT_completion_signals_,
ACE_SIG_AIO) < 0)
ACE_ERROR ((LM_ERROR,
- "Error:%p:Couldnt init the RT completion signal set\n"));
+ "Error:%p\n",
+ "Couldnt init the RT completion signal set"));
// Mask them.
if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0)
ACE_ERROR ((LM_ERROR,
- "Error:%p:Couldnt maks the RT completion signals\n"));
+ "Error:%p\n",
+ "Couldnt mask the RT completion signals"));
// Setting up the handler(!) for these signals.
struct sigaction reaction;
@@ -275,7 +420,8 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
0);
if (sigaction_return == -1)
ACE_ERROR ((LM_ERROR,
- "Error:%p:Proactorc ouldnt do sigaction for the RT SIGNAL"));
+ "Error:%p\n",
+ "Proactor couldnt do sigaction for the RT SIGNAL"));
}
#else /* ACE_HAS_AIO_CALLS */
@@ -615,6 +761,22 @@ ACE_Proactor::posix_completion_strategy (void)
{
return posix_completion_strategy_;
}
+
+#if 0
+void
+ACE_Proactor::posix_completion_strategy (ACE_Proactor::POSIX_COMPLETION_STRATEGY strategy)
+{
+ this->posix_completion_strategy_ = strategy;
+}
+#endif /* 0 */
+
+int
+ACE_Proactor::notify_asynch_accept (ACE_Asynch_Accept::Result* result)
+{
+ this->aio_accept_handler_->notify (result);
+
+ return 0;
+}
#endif /* ACE_HAS_AIO_CALLS */
int
@@ -659,7 +821,8 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
// does.
if (sig_return == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "Error:%p:Error waiting for RT completion signals\n"),
+ "Error:%p\n",
+ "Error waiting for RT completion signals"),
0);
// RT completion signals returned.
@@ -705,21 +868,24 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
int error_code = aio_error (aiocb_ptr);
if (error_code == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "%p:Invalid control block was sent to <aio_error> for compleion querying\n"),
+ "Error:%p\n",
+ "Invalid control block was sent to <aio_error> for compleion querying"),
-1);
if (error_code != 0)
// Error occurred in the <aio_>call. Return the errno
// corresponding to that <aio_> call.
ACE_ERROR_RETURN ((LM_ERROR,
- "%p:An AIO call has failed\n"),
+ "Error:%p\n",
+ "An AIO call has failed"),
error_code);
// No error occured in the AIO operation.
int nbytes = aio_return (aiocb_ptr);
if (nbytes == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "%p:Invalid control block was send to <aio_return>\n"),
+ "Error:%p\n",
+ "Invalid control block was send to <aio_return>"),
-1);
// <nbytes> have been successfully transmitted.
@@ -742,7 +908,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
0, // No bytes transferred.
1, // Result : True.
0, // No completion key.
- 0);
+ 0); // No error.
}
else
@@ -773,11 +939,13 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
// appropriately. This is what the WinNT proactor does.
if (errno == EAGAIN)
ACE_ERROR_RETURN ((LM_ERROR,
- "(%p):aio_suspend"),
+ "Error:%p\n",
+ "aio_suspend"),
0);
else
ACE_ERROR_RETURN ((LM_ERROR,
- "(%p):aio_suspend"),
+ "Error:%p\n",
+ "aio_suspend"),
-1);
// Check which aio has finished.
size_t ai;
@@ -793,7 +961,8 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
if (nbytes == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "(%p):AIO failed"),
+ "Error:%p\n",
+ "AIO failed"),
-1);
else
{