summaryrefslogtreecommitdiff
path: root/ace/POSIX_Proactor.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-23 21:54:24 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-23 21:54:24 +0000
commitbb7275c88c82a716d8cd8c453822f48d7e18e0c3 (patch)
treed61a1384c907d09c8fec2f76e93a30067268142b /ace/POSIX_Proactor.cpp
parentcd96078053429a85edbb437607c7347f8df09804 (diff)
downloadATCD-bb7275c88c82a716d8cd8c453822f48d7e18e0c3.tar.gz
Implemented <post_completion> for POSIX platforms. Thanks to Irfan for
the cool design. This API has been changed a little bit for portability. <post_completion> API now exists at <ACE_Asynch_Result_Impl> class. To post completions, users will have to get hold of an <ACE_Asynch_Result_Impl> class (either get it from the predefined factory methods at the Proactor or derive from <ACE_WIN32_Asynch_Result> or <ACE_POSIX_Asynch_Result>, then call <post_completion> on it passing in the <Proactor_Impl *> which can be got through <implementation> method in the <ACE_Proactor>. The need for RTTI has been avioded in this design.
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r--ace/POSIX_Proactor.cpp302
1 files changed, 148 insertions, 154 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 16417118e20..dae58e21e24 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -140,43 +140,69 @@ ACE_POSIX_Proactor::cancel_timer (ACE_Handler &handler,
#endif /* 0 */
}
-#if 0
int
-ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
+ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
{
- // Perform a non-blocking "poll" for all the I/O events that have
- // completed in the I/O completion queue.
+ return 0;
+}
- ACE_Time_Value timeout (0, 0);
- int result = 0;
+int
+ACE_POSIX_Proactor::close_dispatch_threads (int)
+{
+ return 0;
+}
- while (1)
- {
- result = this->handle_events (timeout);
- if (result != 0 || errno == ETIME)
- break;
- }
+size_t
+ACE_POSIX_Proactor::number_of_threads (void) const
+{
+ // @@ Implement it.
+ ACE_NOTSUP_RETURN (0);
+}
- // If our handle_events failed, we'll report a failure to the
- // Reactor.
- return result == -1 ? -1 : 0;
+void
+ACE_POSIX_Proactor::number_of_threads (size_t threads)
+{
+ // @@ Implement it.
+ ACE_UNUSED_ARG (threads);
}
-int
-ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
- ACE_Reactor_Mask close_mask)
+#if 0
+ACE_Proactor::Timer_Queue *
+ACE_POSIX_Proactor::timer_queue (void) const
{
- ACE_UNUSED_ARG (close_mask);
- ACE_UNUSED_ARG (handle);
+ return this->timer_queue_;
+}
- return this->close ();
+void
+ACE_POSIX_Proactor::timer_queue (Timer_Queue *tq)
+{
+ // cleanup old timer queue
+ if (this->delete_timer_queue_)
+ {
+ delete this->timer_queue_;
+ this->delete_timer_queue_ = 0;
+ }
+
+ // new timer queue
+ if (tq == 0)
+ {
+ this->timer_queue_ = new Timer_Heap;
+ this->delete_timer_queue_ = 1;
+ }
+ else
+ {
+ this->timer_queue_ = tq;
+ this->delete_timer_queue_ = 0;
+ }
+
+ // Set the proactor in the timer queue's functor
+ this->timer_queue_->upcall_functor ().proactor (*this);
}
#endif /* 0 */
ACE_HANDLE
ACE_POSIX_Proactor::get_handle (void) const
{
- // @@ Implement this.
return ACE_INVALID_HANDLE;
}
@@ -336,6 +362,39 @@ ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
{
}
+#if 0
+int
+ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
+{
+ // Perform a non-blocking "poll" for all the I/O events that have
+ // completed in the I/O completion queue.
+
+ ACE_Time_Value timeout (0, 0);
+ int result = 0;
+
+ while (1)
+ {
+ result = this->handle_events (timeout);
+ if (result != 0 || errno == ETIME)
+ break;
+ }
+
+ // If our handle_events failed, we'll report a failure to the
+ // Reactor.
+ return result == -1 ? -1 : 0;
+}
+
+int
+ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_UNUSED_ARG (close_mask);
+ ACE_UNUSED_ARG (handle);
+
+ return this->close ();
+}
+#endif /* 0 */
+
void
ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
u_long bytes_transferred,
@@ -358,84 +417,28 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r
}
}
-
-
-int
-ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
-{
- return 0;
-}
-
-int
-ACE_POSIX_Proactor::close_dispatch_threads (int)
-{
- return 0;
-}
-
-size_t
-ACE_POSIX_Proactor::number_of_threads (void) const
-{
- // @@ Implement it.
- return 0;
-}
-
-void
-ACE_POSIX_Proactor::number_of_threads (size_t threads)
-{
- // @@ Implement it.
- ACE_UNUSED_ARG (threads);
-}
-
-#if 0
-ACE_Proactor::Timer_Queue *
-ACE_POSIX_Proactor::timer_queue (void) const
-{
- return this->timer_queue_;
-}
-
-void
-ACE_POSIX_Proactor::timer_queue (Timer_Queue *tq)
-{
- // cleanup old timer queue
- if (this->delete_timer_queue_)
- {
- delete this->timer_queue_;
- this->delete_timer_queue_ = 0;
- }
-
- // new timer queue
- if (tq == 0)
- {
- this->timer_queue_ = new Timer_Heap;
- this->delete_timer_queue_ = 1;
- }
- else
- {
- this->timer_queue_ = tq;
- this->delete_timer_queue_ = 0;
- }
-
- // Set the proactor in the timer queue's functor
- this->timer_queue_->upcall_functor ().proactor (*this);
-}
-#endif /* 0 */
-
// *********************************************************************
-class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler
+class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
{
// = TITLE
- // Helper class for doing Asynch_Accept in POSIX4 systems, in
- // the case of doing AIO_CONTROL_BLOCKS strategy.
+ // This class manages the notify pipe of the AIOCB
+ // Proactor. This class acts as the Handler for the
+ // <Asynch_Read> operations issued on the notify pipe. This
+ // class is very useful in implementing <Asynch_Accept> operation
+ // class for the <AIOCB_Proactor>. This is also useful for
+ // implementing <post_completion> for <AIOCB_Proactor>.
//
// = DESCRIPTION
- // Doing Asynch_Accept in POSIX4 implementation is tricky. In
- // the case of doing the things with AIO_CONTROL_BLOCKS and not
- // with Real-Time Signals, this becomes even more trickier. We
- // use a notifyn pipe here to implement Asynch_Accpet. This class
- // will issue a <Asynch_Read> on the pipe. <Asynch_Accept> will
- // send a result pointer containg all the information through
- // this pipe.
+ // <AIOCB_Proactor> class issues a <Asynch_Read> on
+ // the pipe, using this class as the
+ // Handler. <POSIX_Asynch_Result *>'s are sent through the
+ // notify pipe. When <POSIX_Asynch_Result *>'s show up on the
+ // notify pipe, the <POSIX_AIOCB_Proactor> dispatches the
+ // completion of the <Asynch_Read_Stream> and calls the
+ // <handle_read_stream> of this class. This class calls complete
+ // on the <POSIX_Asynch_Result *> and thus calls the application
+ // handler.
// Handling the MessageBlock:
// We give this message block to read the result pointer through
// the notify pipe. We expect that to read 4 bytes from the
@@ -443,41 +446,27 @@ class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler
// message block to another <accept>, we update <wr_ptr> and put
// it in its initial position.
public:
- ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
+ ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
// Constructor. You need the posix proactor because you need to call
- // things like application_specific_code etc. Having the main
- // proactor is good so that life is easier when we use Asynch_Read,
- // while issuing asynchronous read on the notify pipe.
-
- virtual ~ACE_AIO_Accept_Handler (void);
+ // <application_specific_code>
+
+ virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
// Destructor.
-
-#if 0
- void proactor (ACE_Proactor *proactor);
- // Set the Proactor pointer. POSIX_AIOCB_Proactor may not have the
- // Proactor pointer when it creates this object and so we have this
- // method so that it can set it when it gets it.
-#endif /* 0 */
-
- int notify (ACE_POSIX_Asynch_Accept_Result *result);
- // Send this Result to Proactor through the notification pipe.
+
+ int notify (ACE_POSIX_Asynch_Result *result);
+ // Send the result pointer through the notification pipe.
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
- // Read from the pipe is complete. We get the <Result> from
- // Asynch_Handler. We have to do the notification here.
+ // This is the call back method when <Asynch_Read> from the pipe is
+ // complete.
private:
-#if 0
- ACE_Proactor *proactor_;
- // The main proactor interface.
-#endif /* 0 */
-
ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
// The implementation proactor class.
-
+
ACE_Message_Block message_block_;
- // Message block to get ACE_Asynch_Accept::Result pointer from
- // ACE_Asych_Accept.
+ // Message block to get ACE_POSIX_Asynch_Result pointer from the
+ // pipe.
ACE_Pipe pipe_;
// Pipe for the communication between Proactor and the
@@ -486,11 +475,11 @@ private:
ACE_POSIX_AIOCB_Asynch_Read_Stream read_stream_;
// To do asynch_read on the pipe.
- ACE_AIO_Accept_Handler (void);
+ ACE_AIOCB_Notify_Pipe_Manager (void);
// Default constructor. Shouldnt be called.
};
-ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
: posix_aiocb_proactor_ (posix_aiocb_proactor),
message_block_ (sizeof (ACE_POSIX_Asynch_Accept_Result *)),
read_stream_ (posix_aiocb_proactor)
@@ -510,21 +499,21 @@ ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_
// Issue an asynch_read on the read_stream of the notify pipe.
if (this->read_stream_.read (this->message_block_,
- sizeof (ACE_POSIX_Asynch_Accept_Result *),
+ sizeof (ACE_POSIX_Asynch_Result *),
0, // ACT
0) // Priority
== -1)
ACE_ERROR ((LM_ERROR,
"%N:%l:%p\n",
- "Read from stream failed"));
+ "Read from pipe failed"));
}
-ACE_AIO_Accept_Handler::~ACE_AIO_Accept_Handler (void)
+ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
{
}
int
-ACE_AIO_Accept_Handler::notify (ACE_POSIX_Asynch_Accept_Result *result)
+ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result)
{
// Send the result pointer through the pipe.
int return_val = ACE::send (this->pipe_.write_handle (),
@@ -533,24 +522,21 @@ ACE_AIO_Accept_Handler::notify (ACE_POSIX_Asynch_Accept_Result *result)
if (return_val != sizeof (result))
ACE_ERROR_RETURN ((LM_ERROR,
"(%P %t):%p\n",
- "Error:Writing on to pipe failed"),
+ "Error:Writing on to notify pipe failed"),
-1);
return 0;
}
void
-ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- // @@
- ACE_DEBUG ((LM_DEBUG, "ACE_AIO_Accept_Handler::handle_read_stream called\n"));
-
- // The message block actually contains the ACE_POSIX_Asynch_Accept_Result
- // pointer.
- ACE_POSIX_Asynch_Accept_Result *accept_result = 0;
- accept_result = *(ACE_POSIX_Asynch_Accept_Result **) result.message_block ().rd_ptr ();
+ // The message block actually contains the ACE_POSIX_Asynch_Result
+ // pointer.
+ ACE_POSIX_Asynch_Result *asynch_result = 0;
+ asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr ();
// Do the upcall.
- this->posix_aiocb_proactor_->application_specific_code (accept_result,
+ this->posix_aiocb_proactor_->application_specific_code (asynch_result,
0, // No Bytes transferred.
1, // Success.
0, // Completion token.
@@ -561,22 +547,22 @@ ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result
if (this->message_block_.length () > 0)
this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
- // One accept has completed. Issue a read to handle any <accept>s in
- // the future.
+ // One accept has completed. Issue a read to handle any
+ // <post_completion>s in the future.
if (this->read_stream_.read (this->message_block_,
- sizeof (ACE_POSIX_Asynch_Accept_Result),
+ sizeof (ACE_POSIX_Asynch_Result *),
0, // ACT
0) // Priority
== -1)
ACE_ERROR ((LM_ERROR,
"%N:%l:%p\n",
- "Read from stream failed"));
+ "Read from pipe failed"));
}
// *********************************************************************
ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void)
- : aio_accept_handler_ (0),
+ : aiocb_notify_pipe_manager_ (0),
aiocb_list_max_size_ (ACE_RTSIG_MAX),
aiocb_list_cur_size_ (0)
{
@@ -586,8 +572,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void)
// Accept Handler for aio_accept. Remember! this issues a Asynch_Read
// on the notify pipe for doing the Asynch_Accept.
- ACE_NEW (aio_accept_handler_,
- ACE_AIO_Accept_Handler (this));
+ ACE_NEW (aiocb_notify_pipe_manager_,
+ ACE_AIOCB_Notify_Pipe_Manager (this));
}
// Destructor.
@@ -612,8 +598,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (void)
int
ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
{
- ACE_UNUSED_ARG (result);
- return 0;
+ return this->aiocb_notify_pipe_manager_->notify (result);
}
ACE_Asynch_Read_Stream_Impl *
@@ -677,14 +662,6 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_transmit_file (void)
}
int
-ACE_POSIX_AIOCB_Proactor::notify_asynch_accept (ACE_POSIX_Asynch_Accept_Result* result)
-{
- this->aio_accept_handler_->notify (result);
-
- return 0;
-}
-
-int
ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds)
{
if (this->aiocb_list_cur_size_ == 0)
@@ -917,7 +894,24 @@ ACE_POSIX_SIG_Proactor::handle_events (void)
int
ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
{
- ACE_UNUSED_ARG (result);
+ // Get this process id.
+ pid_t pid = ACE_OS::getpid ();
+ if (pid == (pid_t) -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p",
+ "<getpid> failed\n"),
+ -1);
+
+ // Set the signal information.
+ sigval value;
+ value.sival_ptr = (void *) result;
+
+ // Queue the signal.
+ if (sigqueue (pid, ACE_SIG_AIO, value) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p",
+ "<sigqueue> failed\n"),
+ -1);
return 0;
}