diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-25 19:51:45 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-25 19:51:45 +0000 |
commit | 3a376a7975fce2943b3b4651df2ab80d202c848b (patch) | |
tree | 0a8a701211e88221ae2bcce719c8e72f2eef55bd /ace/Proactor.cpp | |
parent | e7aa3b33c2e6c46bd45996026b55b5294d8f7fee (diff) | |
download | ATCD-3a376a7975fce2943b3b4651df2ab80d202c848b.tar.gz |
Implemented Asynch_Accept to work for AIO_CONTROL_BLOCKS strategy of
completion notification.
Defined an auxillary Accept_Handler called ACE_AIO_Accept_Handler in
addition to the ACE_Asynch_Accept_Handler. ACE_AIO_Accept_Handler
holds the notification pipe and does a read on it to handle the
<ACE_Asynch_Accept::Result> coming from Asynch_Accept_handler.
THANKS to Doug and Irfan for suggesting this 'notification pipe' based
implementation for AIO_CONTROL_BLOCKS strategy, so that Proactor will
work on platforms where POSIX4 RT_Signals are broken (Solaris 2.6)
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r-- | ace/Proactor.cpp | 211 |
1 files changed, 190 insertions, 21 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 02af632873a..4e8ac342934 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -68,6 +68,8 @@ class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH> // Flag used to indicate when we are shutting down. }; + + ACE_Proactor_Timer_Handler::ACE_Proactor_Timer_Handler (ACE_Proactor &proactor) : ACE_Task <ACE_NULL_SYNCH> (&proactor.thr_mgr_), proactor_ (proactor), @@ -136,6 +138,136 @@ ACE_Proactor_Timer_Handler::svc (void) #endif /* ACE_HAS_AIO_CALLS */ } +#if defined (ACE_HAS_AIO_CALLS) +class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler +{ + // = TITLE + // Helper class for doing Asynch_Accept in POSIX4 systems, in + // the case of doing AIO_CONTROL_BLOCKS strategy. + // + // = DESCRIPTION + // Doing Asynch_Accept in POSIX4 implementation is tricky. In + // the case of doing the things with AIO_CONTROL_BLOCKS and not + // with Real-Time Signals, this becomes even more trickier. We + // use a notifyn pipe here to implement Asynch_Accpet. This class + // will issue a <Asynch_Read> on the pipe. <Asynch_Accept> will + // send a result pointer containg all the information through + // this pipe. +public: + ACE_AIO_Accept_Handler (ACE_Proactor* proactor); + // Constructor. + + ~ACE_AIO_Accept_Handler (void); + // Destructor. + +#if 0 + int accept (void); + // <Asynch_Accept> calls this when an <accept> call has been issued + // by the application. We issue an <Asynch_Read> here on the <pipe>, + // so that <Asynch_Accept> can notify us by sending us Result + // pointer. +#endif /* 0 */ + + int notify (ACE_Asynch_Accept::Result* result); + // Send this Result to Proactor through the notification pipe. + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // Read from the pipe is complete. We get the <Result> from + // Asynch_Handler. We have to do the notification here. + +private: + ACE_Proactor* proactor_; + // The proactor in use. + + ACE_Message_Block message_block_; + // Message block to get ACE_Asynch_Accept::Result from + // ACE_Asych_Accept. + + ACE_Pipe pipe_; + // Pipe for the communication between Proactor and the + // Asynch_Accept. + + ACE_Asynch_Read_Stream read_stream_; + // To do asynch_read on the pipe. + + ACE_AIO_Accept_Handler (void); + // Default constructor. Shouldnt be called. +}; + +ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_Proactor* proactor) + : proactor_ (proactor), + message_block_ (sizeof (ACE_Asynch_Accept::Result)) +{ + // Open the pipe. + this->pipe_.open (); + + // Open the read stream. + if (this->read_stream_.open (*this, + this->pipe_.read_handle (), + 0, + this->proactor_) == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:%p\n", + "Open on Read Stream failed")); + + // Issue an asynch_read on the read_stream. + if (this->read_stream_.read (this->message_block_, + sizeof (ACE_Asynch_Accept::Result)) == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:%p\n", + "Read from stream failed")); +} + +#if 0 +int +ACE_AIO_Accept_Handler::accept (void) +{ + // Issue an asynch_read on the read_stream. + if (this->read_stream_.read (this->message_block, + sizeof (ACE_Asynch_Accept::Result)) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:%p\n", + "Read from stream failed"), + -1); +} +#endif /* 0 */ + +ACE_AIO_Accept_Handler::~ACE_AIO_Accept_Handler (void) +{ +} + +int +ACE_AIO_Accept_Handler::notify (ACE_Asynch_Accept::Result* result) +{ + // Send the result through the pipe. + if (ACE_OS::write (this->pipe_.write_handle (), + (void *) result, + sizeof (*result)) < sizeof (*result)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P %t):%p\n", + "Error:Writing on to pipe failed"), + -1); +} + +void +ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + // @@ + ACE_DEBUG ((LM_DEBUG, "ACE_AIO_Accept_Handler::handle_read_stream called\n")); + + // The message block actually contains the ACE_Asynch_Accept::Result. + ACE_Asynch_Accept::Result* accept_result = + (ACE_Asynch_Accept::Result*) result.message_block ().rd_ptr (); + + // Do the upcall. + this->proactor_->application_specific_code (accept_result, + 0, // Bytes transferred. + 1, // Success. + 0, // Completion token. + 0); // Error. +} +#endif /* ACE_HAS_AIO_CALLS */ + ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void) : proactor_ (0) { @@ -217,6 +349,8 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, POSIX_COMPLETION_STRATEGY completion_strategy) : #if defined (ACE_HAS_AIO_CALLS) + posix_completion_strategy_ (completion_strategy), + aio_accept_handler_ (0), aiocb_list_max_size_ (ACE_RTSIG_MAX), aiocb_list_cur_size_ (0), #else /* ACE_HAS_AIO_CALLS */ @@ -228,20 +362,28 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, delete_timer_queue_ (0), timer_handler_ (0), used_with_reactor_event_loop_ (used_with_reactor_event_loop) -#if defined (ACE_HAS_AIO_CALLS) - , posix_completion_strategy_ (completion_strategy) -#endif /* ACE_HAS_AIO_CALLS */ { #if defined (ACE_HAS_AIO_CALLS) - // Initialize the array. - for (size_t ai = 0; - ai < this->aiocb_list_max_size_; - ai++) - aiocb_list_[ai] = 0; - ACE_UNUSED_ARG (number_of_threads); ACE_UNUSED_ARG (tq); + // The following things are necessary only for the + // AIO_CONTROL_BLOCKS strategy. + if (this->posix_completion_strategy_ == AIO_CONTROL_BLOCKS) + { + // Initialize the array. + for (size_t ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + aiocb_list_[ai] = 0; + + // Accept Handler for aio_accept. Remember! this issues a Asynch_Read + // on the notify pipe for doing the Asynch_Accept. + ACE_NEW (aio_accept_handler_, + ACE_AIO_Accept_Handler (this)); + } + + // Mask the RT_compeltion signals if we are using the RT_SIGNALS // STRATEGY for completion querying. if (completion_strategy == RT_SIGNALS) @@ -249,17 +391,20 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, // Make the sigset_t consisting of the completion signals. if (sigemptyset (&this->RT_completion_signals_) < 0) ACE_ERROR ((LM_ERROR, - "Error:%p:Couldnt init the RT completion signal set\n")); + "Error:%p\n", + "Couldn't init the RT completion signal set")); if (sigaddset (&this->RT_completion_signals_, ACE_SIG_AIO) < 0) ACE_ERROR ((LM_ERROR, - "Error:%p:Couldnt init the RT completion signal set\n")); + "Error:%p\n", + "Couldnt init the RT completion signal set")); // Mask them. if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0) ACE_ERROR ((LM_ERROR, - "Error:%p:Couldnt maks the RT completion signals\n")); + "Error:%p\n", + "Couldnt mask the RT completion signals")); // Setting up the handler(!) for these signals. struct sigaction reaction; @@ -275,7 +420,8 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, 0); if (sigaction_return == -1) ACE_ERROR ((LM_ERROR, - "Error:%p:Proactorc ouldnt do sigaction for the RT SIGNAL")); + "Error:%p\n", + "Proactor couldnt do sigaction for the RT SIGNAL")); } #else /* ACE_HAS_AIO_CALLS */ @@ -615,6 +761,22 @@ ACE_Proactor::posix_completion_strategy (void) { return posix_completion_strategy_; } + +#if 0 +void +ACE_Proactor::posix_completion_strategy (ACE_Proactor::POSIX_COMPLETION_STRATEGY strategy) +{ + this->posix_completion_strategy_ = strategy; +} +#endif /* 0 */ + +int +ACE_Proactor::notify_asynch_accept (ACE_Asynch_Accept::Result* result) +{ + this->aio_accept_handler_->notify (result); + + return 0; +} #endif /* ACE_HAS_AIO_CALLS */ int @@ -659,7 +821,8 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) // does. if (sig_return == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:%p:Error waiting for RT completion signals\n"), + "Error:%p\n", + "Error waiting for RT completion signals"), 0); // RT completion signals returned. @@ -705,21 +868,24 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) int error_code = aio_error (aiocb_ptr); if (error_code == -1) ACE_ERROR_RETURN ((LM_ERROR, - "%p:Invalid control block was sent to <aio_error> for compleion querying\n"), + "Error:%p\n", + "Invalid control block was sent to <aio_error> for compleion querying"), -1); if (error_code != 0) // Error occurred in the <aio_>call. Return the errno // corresponding to that <aio_> call. ACE_ERROR_RETURN ((LM_ERROR, - "%p:An AIO call has failed\n"), + "Error:%p\n", + "An AIO call has failed"), error_code); // No error occured in the AIO operation. int nbytes = aio_return (aiocb_ptr); if (nbytes == -1) ACE_ERROR_RETURN ((LM_ERROR, - "%p:Invalid control block was send to <aio_return>\n"), + "Error:%p\n", + "Invalid control block was send to <aio_return>"), -1); // <nbytes> have been successfully transmitted. @@ -742,7 +908,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) 0, // No bytes transferred. 1, // Result : True. 0, // No completion key. - 0); + 0); // No error. } else @@ -773,11 +939,13 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) // appropriately. This is what the WinNT proactor does. if (errno == EAGAIN) ACE_ERROR_RETURN ((LM_ERROR, - "(%p):aio_suspend"), + "Error:%p\n", + "aio_suspend"), 0); else ACE_ERROR_RETURN ((LM_ERROR, - "(%p):aio_suspend"), + "Error:%p\n", + "aio_suspend"), -1); // Check which aio has finished. size_t ai; @@ -793,7 +961,8 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) if (nbytes == -1) ACE_ERROR_RETURN ((LM_ERROR, - "(%p):AIO failed"), + "Error:%p\n", + "AIO failed"), -1); else { |