diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-16 06:38:49 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-02-16 06:38:49 +0000 |
commit | 7211c1e2fcda52e3bce16a29f05cc8f94fbab56c (patch) | |
tree | 6103a5605b624fe5306a57d8c8db1e4888734bc1 /ace/POSIX_Proactor.cpp | |
parent | c3ea40ae39854fb98b82ab6c9e415010ff09b26d (diff) | |
download | ATCD-7211c1e2fcda52e3bce16a29f05cc8f94fbab56c.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 1138 |
1 files changed, 1138 insertions, 0 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp new file mode 100644 index 00000000000..05bac013d1c --- /dev/null +++ b/ace/POSIX_Proactor.cpp @@ -0,0 +1,1138 @@ +// $Id$ + +#define ACE_BUILD_DLL +#include "ace/POSIX_Proactor.h" + +#if defined (ACE_HAS_AIO_CALLS) + +#include "ace/Task_T.h" +#include "ace/Log_Msg.h" +#include "ace/Object_Manager.h" + +#if !defined (__ACE_INLINE__) +#include "ace/POSIX_Proactor.i" +#endif /* __ACE_INLINE__ */ + +ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) +{ +} + +int +ACE_POSIX_Proactor::close (void) +{ +#if 0 + // Take care of the timer handler + if (this->timer_handler_) + { + delete this->timer_handler_; + this->timer_handler_ = 0; + } + + // Take care of the timer queue + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->timer_queue_ = 0; + this->delete_timer_queue_ = 0; + } +#endif /* 0 */ + return 0; +} + +int +ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle, + const void *completion_key) +{ + ACE_UNUSED_ARG (handle); + ACE_UNUSED_ARG (completion_key); + return 0; +} + +long +ACE_POSIX_Proactor::schedule_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &time) +{ + return this->schedule_timer (handler, + act, + time, + ACE_Time_Value::zero); +} + +long +ACE_POSIX_Proactor::schedule_repeating_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &interval) +{ + return this->schedule_timer (handler, + act, + interval, + interval); +} + +long +ACE_POSIX_Proactor::schedule_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &time, + const ACE_Time_Value &interval) +{ + ACE_UNUSED_ARG (handler); + ACE_UNUSED_ARG (act); + ACE_UNUSED_ARG (time); + ACE_UNUSED_ARG (interval); + ACE_NOTSUP_RETURN ((long) -1); + +#if 0 + // absolute time. + ACE_Time_Value absolute_time = + this->timer_queue_->gettimeofday () + time; + + // Only one guy goes in here at a time + ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->mutex (), -1); + + // Schedule the timer + long result = this->timer_queue_->schedule (&handler, + act, + absolute_time, + interval); + if (result != -1) + { + // no failures: check to see if we are the earliest time + if (this->timer_queue_->earliest_time () == absolute_time) + + // wake up the timer thread + if (this->timer_handler_->timer_event_.signal () == -1) + { + // Cancel timer + this->timer_queue_->cancel (result); + result = -1; + } + } + return result; +#endif /* 0 */ +} + +int +ACE_POSIX_Proactor::cancel_timer (long timer_id, + const void **arg, + int dont_call_handle_close) +{ + ACE_NOTSUP_RETURN (-1); +#if 0 + // No need to singal timer event here. Even if the cancel timer was + // the earliest, we will have an extra wakeup. + return this->timer_queue_->cancel (timer_id, + arg, + dont_call_handle_close); +#endif /* 0 */ +} + +int +ACE_POSIX_Proactor::cancel_timer (ACE_Handler &handler, + int dont_call_handle_close) +{ + ACE_NOTSUP_RETURN (-1); +#if 0 + // No need to signal timer event here. Even if the cancel timer was + // the earliest, we will have an extra wakeup. + return this->timer_queue_->cancel (&handler, + dont_call_handle_close); +#endif /* 0 */ +} + +int +ACE_POSIX_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) +{ + ACE_UNUSED_ARG (result); + return 0; +} + +#if 0 +int +ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) +{ + // Perform a non-blocking "poll" for all the I/O events that have + // completed in the I/O completion queue. + + ACE_Time_Value timeout (0, 0); + int result = 0; + + while (1) + { + result = this->handle_events (timeout); + if (result != 0 || errno == ETIME) + break; + } + + // If our handle_events failed, we'll report a failure to the + // Reactor. + return result == -1 ? -1 : 0; +} + +int +ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_UNUSED_ARG (close_mask); + ACE_UNUSED_ARG (handle); + + return this->close (); +} +#endif /* 0 */ + +ACE_HANDLE +ACE_POSIX_Proactor::get_handle (void) const +{ + // @@ Implement this. + return ACE_INVALID_HANDLE; +} + +ACE_Asynch_Read_Stream_Result_Impl * +ACE_POSIX_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, + ACE_HANDLE handle, + ACE_Message_Block &message_block, + u_long bytes_to_read, + const void* act, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Read_Stream_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Read_Stream_Result (handler, + handle, + message_block, + bytes_to_read, + act, + event, + priority), + 0); + return implementation; +} + +ACE_Asynch_Write_Stream_Result_Impl * +ACE_POSIX_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, + ACE_HANDLE handle, + ACE_Message_Block &message_block, + u_long bytes_to_write, + const void* act, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Write_Stream_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Write_Stream_Result (handler, + handle, + message_block, + bytes_to_write, + act, + event, + priority), + 0); + return implementation; +} + +ACE_Asynch_Read_File_Result_Impl * +ACE_POSIX_Proactor::create_asynch_read_file_result (ACE_Handler &handler, + ACE_HANDLE handle, + ACE_Message_Block &message_block, + u_long bytes_to_read, + const void* act, + u_long offset, + u_long offset_high, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Read_File_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Read_File_Result (handler, + handle, + message_block, + bytes_to_read, + act, + offset, + offset_high, + event, + priority), + 0); + return implementation; +} + +ACE_Asynch_Write_File_Result_Impl * +ACE_POSIX_Proactor::create_asynch_write_file_result (ACE_Handler &handler, + ACE_HANDLE handle, + ACE_Message_Block &message_block, + u_long bytes_to_write, + const void* act, + u_long offset, + u_long offset_high, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Write_File_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Write_File_Result (handler, + handle, + message_block, + bytes_to_write, + act, + offset, + offset_high, + event, + priority), + 0); + return implementation; +} + +ACE_Asynch_Accept_Result_Impl * +ACE_POSIX_Proactor::create_asynch_accept_result (ACE_Handler &handler, + ACE_HANDLE listen_handle, + ACE_HANDLE accept_handle, + ACE_Message_Block &message_block, + u_long bytes_to_read, + const void* act, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Accept_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Accept_Result (handler, + listen_handle, + accept_handle, + message_block, + bytes_to_read, + act, + event, + priority), + 0); + return implementation; +} + +ACE_Asynch_Transmit_File_Result_Impl * +ACE_POSIX_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, + ACE_HANDLE socket, + ACE_HANDLE file, + ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, + u_long bytes_to_write, + u_long offset, + u_long offset_high, + u_long bytes_per_send, + u_long flags, + const void *act, + ACE_HANDLE event, + int priority) +{ + ACE_Asynch_Transmit_File_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Transmit_File_Result (handler, + socket, + file, + header_and_trailer, + bytes_to_write, + offset, + offset_high, + bytes_per_send, + flags, + act, + event, + priority), + 0); + return implementation; +} + +ACE_POSIX_Proactor::ACE_POSIX_Proactor (void) +{ +} + +void +ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, + u_long bytes_transferred, + int success, + const void *completion_key, + u_long error) +{ + ACE_SEH_TRY + { + // Call completion hook + asynch_result->complete (bytes_transferred, + success, + (void *) completion_key, + error); + } + ACE_SEH_FINALLY + { + // This is crucial to prevent memory leaks + delete asynch_result; + } +} + + + +int +ACE_POSIX_Proactor::wake_up_dispatch_threads (void) +{ + return 0; +} + +int +ACE_POSIX_Proactor::close_dispatch_threads (int) +{ + return 0; +} + +size_t +ACE_POSIX_Proactor::number_of_threads (void) const +{ + // @@ Implement it. + return 0; +} + +void +ACE_POSIX_Proactor::number_of_threads (size_t threads) +{ + // @@ Implement it. + ACE_UNUSED_ARG (threads); +} + +#if 0 +ACE_Proactor::Timer_Queue * +ACE_POSIX_Proactor::timer_queue (void) const +{ + return this->timer_queue_; +} + +void +ACE_POSIX_Proactor::timer_queue (Timer_Queue *tq) +{ + // cleanup old timer queue + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->delete_timer_queue_ = 0; + } + + // new timer queue + if (tq == 0) + { + this->timer_queue_ = new Timer_Heap; + this->delete_timer_queue_ = 1; + } + else + { + this->timer_queue_ = tq; + this->delete_timer_queue_ = 0; + } + + // Set the proactor in the timer queue's functor + this->timer_queue_->upcall_functor ().proactor (*this); +} +#endif /* 0 */ + +// ********************************************************************* + +class ACE_Export ACE_AIO_Accept_Handler : public ACE_Handler +{ + // = TITLE + // Helper class for doing Asynch_Accept in POSIX4 systems, in + // the case of doing AIO_CONTROL_BLOCKS strategy. + // + // = DESCRIPTION + // Doing Asynch_Accept in POSIX4 implementation is tricky. In + // the case of doing the things with AIO_CONTROL_BLOCKS and not + // with Real-Time Signals, this becomes even more trickier. We + // use a notifyn pipe here to implement Asynch_Accpet. This class + // will issue a <Asynch_Read> on the pipe. <Asynch_Accept> will + // send a result pointer containg all the information through + // this pipe. + // Handling the MessageBlock: + // We give this message block to read the result pointer through + // the notify pipe. We expect that to read 4 bytes from the + // notify pipe, for each <accept> call. Before giving this + // message block to another <accept>, we update <wr_ptr> and put + // it in its initial position. +public: + ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); + // Constructor. You need the posix proactor because you need to call + // things like application_specific_code etc. Having the main + // proactor is good so that life is easier when we use Asynch_Read, + // while issuing asynchronous read on the notify pipe. + + virtual ~ACE_AIO_Accept_Handler (void); + // Destructor. + +#if 0 + void proactor (ACE_Proactor *proactor); + // Set the Proactor pointer. POSIX_AIOCB_Proactor may not have the + // Proactor pointer when it creates this object and so we have this + // method so that it can set it when it gets it. +#endif /* 0 */ + + int notify (ACE_POSIX_Asynch_Accept_Result *result); + // Send this Result to Proactor through the notification pipe. + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // Read from the pipe is complete. We get the <Result> from + // Asynch_Handler. We have to do the notification here. + +private: +#if 0 + ACE_Proactor *proactor_; + // The main proactor interface. +#endif /* 0 */ + + ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_; + // The implementation proactor class. + + ACE_Message_Block message_block_; + // Message block to get ACE_Asynch_Accept::Result pointer from + // ACE_Asych_Accept. + + ACE_Pipe pipe_; + // Pipe for the communication between Proactor and the + // Asynch_Accept. + + ACE_POSIX_AIOCB_Asynch_Read_Stream read_stream_; + // To do asynch_read on the pipe. + + ACE_AIO_Accept_Handler (void); + // Default constructor. Shouldnt be called. +}; + +ACE_AIO_Accept_Handler::ACE_AIO_Accept_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) + : posix_aiocb_proactor_ (posix_aiocb_proactor), + message_block_ (sizeof (ACE_POSIX_Asynch_Accept_Result *)), + read_stream_ (posix_aiocb_proactor) +{ + // Open the pipe. + this->pipe_.open (); + + // Open the read stream. + if (this->read_stream_.open (*this, + this->pipe_.read_handle (), + 0, // Completion Key + 0) // Proactor + == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:%p\n", + "Open on Read Stream failed")); + + // Issue an asynch_read on the read_stream of the notify pipe. + if (this->read_stream_.read (this->message_block_, + sizeof (ACE_POSIX_Asynch_Accept_Result *), + 0, // ACT + 0) // Priority + == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:%p\n", + "Read from stream failed")); +} + +ACE_AIO_Accept_Handler::~ACE_AIO_Accept_Handler (void) +{ +} + +int +ACE_AIO_Accept_Handler::notify (ACE_POSIX_Asynch_Accept_Result *result) +{ + // Send the result pointer through the pipe. + int return_val = ACE::send (this->pipe_.write_handle (), + (char *) &result, + sizeof (result)); + if (return_val != sizeof (result)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P %t):%p\n", + "Error:Writing on to pipe failed"), + -1); + return 0; +} + +void +ACE_AIO_Accept_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + // @@ + ACE_DEBUG ((LM_DEBUG, "ACE_AIO_Accept_Handler::handle_read_stream called\n")); + + // The message block actually contains the ACE_POSIX_Asynch_Accept_Result + // pointer. + ACE_POSIX_Asynch_Accept_Result *accept_result = 0; + accept_result = *(ACE_POSIX_Asynch_Accept_Result **) result.message_block ().rd_ptr (); + + // Do the upcall. + this->posix_aiocb_proactor_->application_specific_code (accept_result, + 0, // No Bytes transferred. + 1, // Success. + 0, // Completion token. + 0); // Error. + + // Set the message block properly. Put the <wr_ptr> back in the + // initial position. + if (this->message_block_.length () > 0) + this->message_block_.wr_ptr (this->message_block_.rd_ptr ()); + + // One accept has completed. Issue a read to handle any <accept>s in + // the future. + if (this->read_stream_.read (this->message_block_, + sizeof (ACE_POSIX_Asynch_Accept_Result), + 0, // ACT + 0) // Priority + == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:%p\n", + "Read from stream failed")); +} + +// ********************************************************************* + +ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) + : aio_accept_handler_ (0), + aiocb_list_max_size_ (ACE_RTSIG_MAX), + aiocb_list_cur_size_ (0) +{ + // Initialize the array. + for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) + aiocb_list_[ai] = 0; + + // Accept Handler for aio_accept. Remember! this issues a Asynch_Read + // on the notify pipe for doing the Asynch_Accept. + ACE_NEW (aio_accept_handler_, + ACE_AIO_Accept_Handler (this)); +} + +// Destructor. +ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) +{ +} + +int +ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time) +{ + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); + return this->handle_events (wait_time.msec ()); +} + +int +ACE_POSIX_AIOCB_Proactor::handle_events (void) +{ + return this->handle_events (ACE_INFINITE); +} + +ACE_Asynch_Read_Stream_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_read_stream (void) +{ + ACE_Asynch_Read_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Read_Stream (this), + 0); + return implementation; +} + +ACE_Asynch_Write_Stream_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_write_stream (void) +{ + ACE_Asynch_Write_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Write_Stream (this), + 0); + return implementation; +} + +ACE_Asynch_Read_File_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_read_file (void) +{ + ACE_Asynch_Read_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Read_File (this), + 0); + return implementation; +} + +ACE_Asynch_Write_File_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_write_file (void) +{ + ACE_Asynch_Write_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Write_File (this), + 0); + return implementation; +} + +ACE_Asynch_Accept_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_accept (void) +{ + ACE_Asynch_Accept_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Accept (this), + 0); + return implementation; +} + +ACE_Asynch_Transmit_File_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_transmit_file (void) +{ + ACE_Asynch_Transmit_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_AIOCB_Asynch_Transmit_File (this), + 0); + return implementation; +} + +int +ACE_POSIX_AIOCB_Proactor::notify_asynch_accept (ACE_POSIX_Asynch_Accept_Result* result) +{ + this->aio_accept_handler_->notify (result); + + return 0; +} + +int +ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) +{ + if (this->aiocb_list_cur_size_ == 0) + // No aio is pending. + return 0; + + // Wait for asynch operation to complete. + // @@ Assing milli seconds correctly. + timespec timeout; + timeout.tv_sec = milli_seconds; + timeout.tv_nsec = 0; + + if (aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + &timeout) == -1) + { + // If failure is coz of timeout, then return *0* but set errno + // appropriately. This is what the WinNT proactor does. + if (errno == EAGAIN) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "aio_suspend"), + 0); + else + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "aio_suspend"), + -1); + } + + // Check which aio has finished. + size_t ai; + int error_status = 0; + int return_status = 0; + for (ai = 0; ai < this->aiocb_list_max_size_; ai++) + { + if (aiocb_list_ [ai] == 0) + continue; + + // Analyze error and return values. + + // Get the error status of the aio_ operation. + error_status = aio_error (aiocb_list_[ai]); + if (error_status == -1) + // <aio_error> itself has failed. + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "<aio_error> has failed"), + -1); + + if (error_status == EINPROGRESS) + // <aio_> operation is still in progress. + continue; + + // Error_status is not -1 and not EINPROGRESS. So, an <aio_> + // operation has finished (successfully or unsuccessfully!!!) + + ACE_ERROR ((LM_ERROR, + "%N:%l:error_status = %d\n", + error_status)); + + // Get the return_status of the <aio_> operation. + return_status = aio_return (aiocb_list_[ai]); + if (return_status == -1) + { + // <aio_return> itself has failed. + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "<aio_return> failed"), + -1); + } + else + { + ACE_DEBUG ((LM_DEBUG, "An aio has finished\n")); + break; + } + } + + if (ai == this->aiocb_list_max_size_) + // Nothing completed. + return 0; + + // Retrive the result pointer. + ACE_POSIX_Asynch_Result *asynch_result = + // dynamic_cast <ACE_POSIX_Asynch_Result *> (this->aiocb_list_[ai]); + (ACE_POSIX_Asynch_Result *) (this->aiocb_list_[ai]); + if (asynch_result == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%N:%l:%p\n", + "Dynamic cast failed"), + -1); + + // Invalidate entry in the aiocb list. + this->aiocb_list_[ai] = 0; + this->aiocb_list_cur_size_--; + + // Call the application code. + // @@ Pass <errno> instead of 0. Check out on LynxOS. It is set + // to 77 somewhere. + this->application_specific_code (asynch_result, + return_status, // Bytes transferred. + 1, // Success + 0, // No completion key. + error_status); // Error + + return 0; +} + +void +ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, + u_long bytes_transferred, + int success, + const void *completion_key, + u_long error) +{ + ACE_POSIX_Proactor::application_specific_code (asynch_result, + bytes_transferred, + success, + completion_key, + error); +} + +int +ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (aiocb *aiocb_ptr) +{ + if (aiocb_ptr == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Status check max %d cur %d\n", + this->aiocb_list_max_size_, + this->aiocb_list_cur_size_)); + + // Just check the status of the list. + if (this->aiocb_list_cur_size_ >= + this->aiocb_list_max_size_) + return -1; + else + return 0; + } + + // Non-zero ptr. Find a free slot and store. + + // Make sure again. + if (this->aiocb_list_cur_size_ >= + this->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Operation: No space to store the <aio> info.\n"), + -1); + + // Slot(s) available. Find a free one. + size_t ai; + for (ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + if (this->aiocb_list_[ai] == 0) + break; + + // Sanity check. + if (ai == this->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Operation: No space to store the <aio> info.\n"), + -1); + + // Store the pointers. + this->aiocb_list_[ai] = aiocb_ptr; + this->aiocb_list_cur_size_ ++; + + return 0; +} + +// ********************************************************************* + +ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) +{ + // Make the sigset_t consisting of the completion signals. + if (sigemptyset (&this->RT_completion_signals_) < 0) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Couldn't init the RT completion signal set")); + + if (sigaddset (&this->RT_completion_signals_, ACE_SIG_AIO) < 0) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Couldnt init the RT completion signal set")); + + // Mask them. + if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Couldnt mask the RT completion signals")); + + // Setting up the handler(actually Null handler) for these signals. + struct sigaction reaction; + sigemptyset (&reaction.sa_mask); // Nothing else to mask. + reaction.sa_flags = SA_SIGINFO; // Realtime flag. +#if defined (SA_SIGACTION) + // Lynx says, it is better to set this bit to be portable. + reaction.sa_flags &= SA_SIGACTION; +#endif /* SA_SIGACTION */ + reaction.sa_sigaction = 0; // No handler. + int sigaction_return = sigaction (ACE_SIG_AIO, + &reaction, + 0); + if (sigaction_return == -1) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Proactor couldnt do sigaction for the RT SIGNAL")); +} + + +ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) +{ + // @@ Enable the masked signals again. +} + +int +ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time) +{ + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); + return this->handle_events (wait_time.msec ()); +} + +int +ACE_POSIX_SIG_Proactor::handle_events (void) +{ + return this->handle_events (ACE_INFINITE); +} + +ACE_Asynch_Read_Stream_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_read_stream (void) +{ + ACE_Asynch_Read_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, ACE_POSIX_SIG_Asynch_Read_Stream (this), 0); + return implementation; +} + +ACE_Asynch_Write_Stream_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_write_stream (void) +{ + ACE_Asynch_Write_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, ACE_POSIX_SIG_Asynch_Write_Stream (this), 0); + return implementation; +} + +ACE_Asynch_Read_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_read_file (void) +{ + ACE_Asynch_Read_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, ACE_POSIX_SIG_Asynch_Read_File (this), 0); + return implementation; +} + +ACE_Asynch_Write_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_write_file (void) +{ + ACE_Asynch_Write_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, ACE_POSIX_SIG_Asynch_Write_File (this), 0); + return implementation; +} + +ACE_Asynch_Accept_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_accept (void) +{ + ACE_Asynch_Accept_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, ACE_POSIX_SIG_Asynch_Accept (this), 0); + return implementation; +} + +ACE_Asynch_Transmit_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_transmit_file (void) +{ + ACE_Asynch_Transmit_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Transmit_File (this), + 0); + return implementation; +} + +int +ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) +{ + // Wait for <milli_seconds> amount of time. + // @@ Assigning <milli_seconds> to tv_sec. + timespec timeout; + timeout.tv_sec = milli_seconds; + timeout.tv_nsec = 0; + + // To get back the signal info. + siginfo_t sig_info; + + // Await the RT completion signal. + int sig_return = sigtimedwait (&this->RT_completion_signals_, + &sig_info, + &timeout); + + // Error case. + // If failure is coz of timeout, then return *0* but set + // errno appropriately. This is what the WinNT proactor + // does. + if (sig_return == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Waiting for RT completion signals"), + 0); + + // RT completion signals returned. + if (sig_return != ACE_SIG_AIO) + ACE_ERROR_RETURN ((LM_ERROR, + "Unexpected signal (%d) has been received while waiting for RT Completion Signals\n", + sig_return), + -1); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "Sig number found in the sig_info block : %d\n", + sig_info.si_signo)); + + // Is the signo returned consistent? + if (sig_info.si_signo != sig_return) + ACE_ERROR_RETURN ((LM_ERROR, + "Inconsistent signal number (%d) in the signal info block\n", + sig_info.si_signo), + -1); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "Signal code for this signal delivery : %d\n", + sig_info.si_code)); + + // Retrive the result pointer. + ACE_POSIX_Asynch_Result *asynch_result = + (ACE_POSIX_Asynch_Result *) sig_info.si_value.sival_ptr; + + // Check the <signal code> and act according to that. + if (sig_info.si_code == SI_ASYNCIO) + { + // Retrieve the aiocb from Result ptr. + // @@ Some checking should be done to make sure this pointer + // is valid. Otherwise <aio_error> will bomb. + aiocb* aiocb_ptr = + (aiocb *)asynch_result; + + // Analyze error and return values. Return values are + // actually <errno>'s associated with the <aio_> call + // corresponding to aiocb_ptr. + int error_status = aio_error (aiocb_ptr); + if (error_status == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Invalid control block was sent to <aio_error> for compleion querying"), + -1); + + // Completion signal has been received, so it can't be in + // progress. + // ACE_ASSERT (error_status != EINPROGRESS) + + // No error occured in the AIO operation. + int return_status = aio_return (aiocb_ptr); + if (return_status == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Invalid control block was send to <aio_return>"), + -1); + } + else + { + ACE_DEBUG ((LM_DEBUG, "An aio has finished\n")); + + this->application_specific_code (asynch_result, + return_status, + 1, // Result : True. + 0, // No completion_signal. + error_status); // Error. + } + } + else if (sig_info.si_code == SI_QUEUE) + { + // @@ Just debugging. + ACE_DEBUG ((LM_DEBUG, "<sigqueue>'d signal received\n")); + + this->application_specific_code (asynch_result, + 0, // No bytes transferred. + 1, // Result : True. + 0, // No completion key. + 0); // No error. + } + else + // Unknown signal code. + ACE_ERROR_RETURN ((LM_DEBUG, + "Unexpected signal code (%d) returned on completion querying\n", + sig_info.si_code), + -1); + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#if 0 +template class ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Queue_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Node_T<ACE_Handler *>; +template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >; +template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>; +template class ACE_Timer_Heap_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Heap_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +#endif /* 0 */ +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#endif /* ACE_HAS_AIO_CALLS */ |