diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-23 21:54:24 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-23 21:54:24 +0000 |
commit | bb7275c88c82a716d8cd8c453822f48d7e18e0c3 (patch) | |
tree | d61a1384c907d09c8fec2f76e93a30067268142b /ace/POSIX_Proactor.cpp | |
parent | cd96078053429a85edbb437607c7347f8df09804 (diff) | |
download | ATCD-bb7275c88c82a716d8cd8c453822f48d7e18e0c3.tar.gz |
Implemented <post_completion> for POSIX platforms. Thanks to Irfan for
the cool design. This API has been changed a little bit for
portability. <post_completion> API now exists at
<ACE_Asynch_Result_Impl> class. To post completions, users will have
to get hold of an <ACE_Asynch_Result_Impl> class (either get it from
the predefined factory methods at the Proactor or derive from
<ACE_WIN32_Asynch_Result> or <ACE_POSIX_Asynch_Result>, then call
<post_completion> on it passing in the <Proactor_Impl *> which can be
got through <implementation> method in the <ACE_Proactor>.
The need for RTTI has been avioded in this design.
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 302 |
1 files changed, 148 insertions, 154 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 16417118e20..dae58e21e24 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -140,43 +140,69 @@ ACE_POSIX_Proactor::cancel_timer (ACE_Handler &handler, #endif /* 0 */ } -#if 0 int -ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) +ACE_POSIX_Proactor::wake_up_dispatch_threads (void) { - // Perform a non-blocking "poll" for all the I/O events that have - // completed in the I/O completion queue. + return 0; +} - ACE_Time_Value timeout (0, 0); - int result = 0; +int +ACE_POSIX_Proactor::close_dispatch_threads (int) +{ + return 0; +} - while (1) - { - result = this->handle_events (timeout); - if (result != 0 || errno == ETIME) - break; - } +size_t +ACE_POSIX_Proactor::number_of_threads (void) const +{ + // @@ Implement it. + ACE_NOTSUP_RETURN (0); +} - // If our handle_events failed, we'll report a failure to the - // Reactor. - return result == -1 ? -1 : 0; +void +ACE_POSIX_Proactor::number_of_threads (size_t threads) +{ + // @@ Implement it. + ACE_UNUSED_ARG (threads); } -int -ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle, - ACE_Reactor_Mask close_mask) +#if 0 +ACE_Proactor::Timer_Queue * +ACE_POSIX_Proactor::timer_queue (void) const { - ACE_UNUSED_ARG (close_mask); - ACE_UNUSED_ARG (handle); + return this->timer_queue_; +} - return this->close (); +void +ACE_POSIX_Proactor::timer_queue (Timer_Queue *tq) +{ + // cleanup old timer queue + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->delete_timer_queue_ = 0; + } + + // new timer queue + if (tq == 0) + { + this->timer_queue_ = new Timer_Heap; + this->delete_timer_queue_ = 1; + } + else + { + this->timer_queue_ = tq; + this->delete_timer_queue_ = 0; + } + + // Set the proactor in the timer queue's functor + this->timer_queue_->upcall_functor ().proactor (*this); } #endif /* 0 */ ACE_HANDLE ACE_POSIX_Proactor::get_handle (void) const { - // @@ Implement this. return ACE_INVALID_HANDLE; } @@ -336,6 +362,39 @@ ACE_POSIX_Proactor::ACE_POSIX_Proactor (void) { } +#if 0 +int +ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) +{ + // Perform a non-blocking "poll" for all the I/O events that have + // completed in the I/O completion queue. + + ACE_Time_Value timeout (0, 0); + int result = 0; + + while (1) + { + result = this->handle_events (timeout); + if (result != 0 || errno == ETIME) + break; + } + + // If our handle_events failed, we'll report a failure to the + // Reactor. + return result == -1 ? -1 : 0; +} + +int +ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_UNUSED_ARG (close_mask); + ACE_UNUSED_ARG (handle); + + return this->close (); +} +#endif /* 0 */ + void ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, u_long bytes_transferred, @@ -358,84 +417,28 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r } } - - -int -ACE_POSIX_Proactor::wake_up_dispatch_threads (void) -{ - return 0; -} - -int -ACE_POSIX_Proactor::close_dispatch_threads (int) -{ - return 0; -} - -size_t -ACE_POSIX_Proactor::number_of_threads (void) const -{ - // @@ Implement it. - return 0; -} - -void -ACE_POSIX_Proactor::number_of_threads (size_t threads) -{ - // @@ Implement it. - ACE_UNUSED_ARG (threads); -} - -#if 0 -ACE_Proactor::Timer_Queue * -ACE_POSIX_Proactor::timer_queue (void) const -{ - return this->timer_queue_; -} - -void -ACE_POSIX_Proactor::timer_queue (Timer_Queue *tq) -{ - // cleanup old timer queue - if (this->delete_timer_queue_) - { - delete this->timer_queue_; - this->delete_timer_queue_ = 0; - } - - // new timer queue - if (tq == 0) - { - this->timer_queue_ = new Timer_Heap; - this->delete_timer_queue_ = 1; - } - else - { - this->timer_queue_ = tq; - this->delete_timer_queue_ = 0; - } - - // Set the proactor in the timer queue's functor - this->timer_queue_->upcall_functor ().proactor (*this); -} -#endif /* 0 */ - // ********************************************************************* -class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler +class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler { // = TITLE - // Helper class for doing Asynch_Accept in POSIX4 systems, in - // the case of doing AIO_CONTROL_BLOCKS strategy. + // This class manages the notify pipe of the AIOCB + // Proactor. This class acts as the Handler for the + // <Asynch_Read> operations issued on the notify pipe. This + // class is very useful in implementing <Asynch_Accept> operation + // class for the <AIOCB_Proactor>. This is also useful for + // implementing <post_completion> for <AIOCB_Proactor>. // // = DESCRIPTION - // Doing Asynch_Accept in POSIX4 implementation is tricky. In - // the case of doing the things with AIO_CONTROL_BLOCKS and not - // with Real-Time Signals, this becomes even more trickier. We - // use a notifyn pipe here to implement Asynch_Accpet. This class - // will issue a <Asynch_Read> on the pipe. <Asynch_Accept> will - // send a result pointer containg all the information through - // this pipe. + // <AIOCB_Proactor> class issues a <Asynch_Read> on + // the pipe, using this class as the + // Handler. <POSIX_Asynch_Result *>'s are sent through the + // notify pipe. When <POSIX_Asynch_Result *>'s show up on the + // notify pipe, the <POSIX_AIOCB_Proactor> dispatches the + // completion of the <Asynch_Read_Stream> and calls the + // <handle_read_stream> of this class. This class calls complete + // on the <POSIX_Asynch_Result *> and thus calls the application + // handler. // Handling the MessageBlock: // We give this message block to read the result pointer through // the notify pipe. We expect that to read 4 bytes from the @@ -443,41 +446,27 @@ class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler // message block to another <accept>, we update <wr_ptr> and put // it in its initial position. public: - ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); + ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); // Constructor. You need the posix proactor because you need to call - // things like application_specific_code etc. Having the main - // proactor is good so that life is easier when we use Asynch_Read, - // while issuing asynchronous read on the notify pipe. - - virtual ~ACE_AIO_Accept_Handler (void); + // <application_specific_code> + + virtual ~ACE_AIOCB_Notify_Pipe_Manager (void); // Destructor. - -#if 0 - void proactor (ACE_Proactor *proactor); - // Set the Proactor pointer. POSIX_AIOCB_Proactor may not have the - // Proactor pointer when it creates this object and so we have this - // method so that it can set it when it gets it. -#endif /* 0 */ - - int notify (ACE_POSIX_Asynch_Accept_Result *result); - // Send this Result to Proactor through the notification pipe. + + int notify (ACE_POSIX_Asynch_Result *result); + // Send the result pointer through the notification pipe. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); - // Read from the pipe is complete. We get the <Result> from - // Asynch_Handler. We have to do the notification here. + // This is the call back method when <Asynch_Read> from the pipe is + // complete. private: -#if 0 - ACE_Proactor *proactor_; - // The main proactor interface. -#endif /* 0 */ - ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_; // The implementation proactor class. - + ACE_Message_Block message_block_; - // Message block to get ACE_Asynch_Accept::Result pointer from - // ACE_Asych_Accept. + // Message block to get ACE_POSIX_Asynch_Result pointer from the + // pipe. ACE_Pipe pipe_; // Pipe for the communication between Proactor and the @@ -486,11 +475,11 @@ private: ACE_POSIX_AIOCB_Asynch_Read_Stream read_stream_; // To do asynch_read on the pipe. - ACE_AIO_Accept_Handler (void); + ACE_AIOCB_Notify_Pipe_Manager (void); // Default constructor. Shouldnt be called. }; -ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : posix_aiocb_proactor_ (posix_aiocb_proactor), message_block_ (sizeof (ACE_POSIX_Asynch_Accept_Result *)), read_stream_ (posix_aiocb_proactor) @@ -510,21 +499,21 @@ ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_ // Issue an asynch_read on the read_stream of the notify pipe. if (this->read_stream_.read (this->message_block_, - sizeof (ACE_POSIX_Asynch_Accept_Result *), + sizeof (ACE_POSIX_Asynch_Result *), 0, // ACT 0) // Priority == -1) ACE_ERROR ((LM_ERROR, "%N:%l:%p\n", - "Read from stream failed")); + "Read from pipe failed")); } -ACE_AIO_Accept_Handler::~ACE_AIO_Accept_Handler (void) +ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void) { } int -ACE_AIO_Accept_Handler::notify (ACE_POSIX_Asynch_Accept_Result *result) +ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result) { // Send the result pointer through the pipe. int return_val = ACE::send (this->pipe_.write_handle (), @@ -533,24 +522,21 @@ ACE_AIO_Accept_Handler::notify (ACE_POSIX_Asynch_Accept_Result *result) if (return_val != sizeof (result)) ACE_ERROR_RETURN ((LM_ERROR, "(%P %t):%p\n", - "Error:Writing on to pipe failed"), + "Error:Writing on to notify pipe failed"), -1); return 0; } void -ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - // @@ - ACE_DEBUG ((LM_DEBUG, "ACE_AIO_Accept_Handler::handle_read_stream called\n")); - - // The message block actually contains the ACE_POSIX_Asynch_Accept_Result - // pointer. - ACE_POSIX_Asynch_Accept_Result *accept_result = 0; - accept_result = *(ACE_POSIX_Asynch_Accept_Result **) result.message_block ().rd_ptr (); + // The message block actually contains the ACE_POSIX_Asynch_Result + // pointer. + ACE_POSIX_Asynch_Result *asynch_result = 0; + asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr (); // Do the upcall. - this->posix_aiocb_proactor_->application_specific_code (accept_result, + this->posix_aiocb_proactor_->application_specific_code (asynch_result, 0, // No Bytes transferred. 1, // Success. 0, // Completion token. @@ -561,22 +547,22 @@ ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result if (this->message_block_.length () > 0) this->message_block_.wr_ptr (this->message_block_.rd_ptr ()); - // One accept has completed. Issue a read to handle any <accept>s in - // the future. + // One accept has completed. Issue a read to handle any + // <post_completion>s in the future. if (this->read_stream_.read (this->message_block_, - sizeof (ACE_POSIX_Asynch_Accept_Result), + sizeof (ACE_POSIX_Asynch_Result *), 0, // ACT 0) // Priority == -1) ACE_ERROR ((LM_ERROR, "%N:%l:%p\n", - "Read from stream failed")); + "Read from pipe failed")); } // ********************************************************************* ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) - : aio_accept_handler_ (0), + : aiocb_notify_pipe_manager_ (0), aiocb_list_max_size_ (ACE_RTSIG_MAX), aiocb_list_cur_size_ (0) { @@ -586,8 +572,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) // Accept Handler for aio_accept. Remember! this issues a Asynch_Read // on the notify pipe for doing the Asynch_Accept. - ACE_NEW (aio_accept_handler_, - ACE_AIO_Accept_Handler (this)); + ACE_NEW (aiocb_notify_pipe_manager_, + ACE_AIOCB_Notify_Pipe_Manager (this)); } // Destructor. @@ -612,8 +598,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (void) int ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) { - ACE_UNUSED_ARG (result); - return 0; + return this->aiocb_notify_pipe_manager_->notify (result); } ACE_Asynch_Read_Stream_Impl * @@ -677,14 +662,6 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_transmit_file (void) } int -ACE_POSIX_AIOCB_Proactor::notify_asynch_accept (ACE_POSIX_Asynch_Accept_Result* result) -{ - this->aio_accept_handler_->notify (result); - - return 0; -} - -int ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) { if (this->aiocb_list_cur_size_ == 0) @@ -917,7 +894,24 @@ ACE_POSIX_SIG_Proactor::handle_events (void) int ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) { - ACE_UNUSED_ARG (result); + // Get this process id. + pid_t pid = ACE_OS::getpid (); + if (pid == (pid_t) -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p", + "<getpid> failed\n"), + -1); + + // Set the signal information. + sigval value; + value.sival_ptr = (void *) result; + + // Queue the signal. + if (sigqueue (pid, ACE_SIG_AIO, value) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p", + "<sigqueue> failed\n"), + -1); return 0; } |