diff options
-rw-r--r-- | ChangeLog | 29 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 29 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 29 | ||||
-rw-r--r-- | ace/Makefile | 1 | ||||
-rw-r--r-- | ace/POSIX_CB_Proactor.cpp | 199 | ||||
-rw-r--r-- | ace/POSIX_CB_Proactor.h | 105 | ||||
-rw-r--r-- | ace/POSIX_CB_Proactor.i | 8 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 356 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 26 | ||||
-rw-r--r-- | ace/SUN_Proactor.cpp | 172 | ||||
-rw-r--r-- | ace/SUN_Proactor.h | 6 | ||||
-rw-r--r-- | tests/Proactor_Test.cpp | 161 |
12 files changed, 907 insertions, 214 deletions
diff --git a/ChangeLog b/ChangeLog index 84d95eb23ec..7f17e2ad2b8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,32 @@ +Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com> + + POSIX asynch I/O improvements/corrections submitted by Alex + Libman <alibman@ihug.com.au>: + + * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation + that makes use of the AIO facility's callback feature. This has + only been tested on SGI Irix. + + * ace/Makefile: Added POSIX_CB_Proactor. + + * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better + cancellation/cleanup handling. Added hooks for the new + ACE_POSIX_CB_Proactor class and refactored methods + close(), get_result_status(), create_result_aiocb_list(), and + delete_result_aiocb_list(). + + * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method + and refactored the old results code into the new method. Also, + make good use of the new aiocb create/delete methods. Improvements + to operation status detection based on input from Sun. + + * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor. + Added a log lock to be sure all messages from one transaction are + logged together. Also, sets up full duplex I/O for Windows and + Solaris; half duplex for all others due to general weakness in + AIO subsystems. If further testing reveals that more can be set + to full duplex, this can be expanded. + Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com> * tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 84d95eb23ec..7f17e2ad2b8 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,32 @@ +Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com> + + POSIX asynch I/O improvements/corrections submitted by Alex + Libman <alibman@ihug.com.au>: + + * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation + that makes use of the AIO facility's callback feature. This has + only been tested on SGI Irix. + + * ace/Makefile: Added POSIX_CB_Proactor. + + * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better + cancellation/cleanup handling. Added hooks for the new + ACE_POSIX_CB_Proactor class and refactored methods + close(), get_result_status(), create_result_aiocb_list(), and + delete_result_aiocb_list(). + + * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method + and refactored the old results code into the new method. Also, + make good use of the new aiocb create/delete methods. Improvements + to operation status detection based on input from Sun. + + * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor. + Added a log lock to be sure all messages from one transaction are + logged together. Also, sets up full duplex I/O for Windows and + Solaris; half duplex for all others due to general weakness in + AIO subsystems. If further testing reveals that more can be set + to full duplex, this can be expanded. + Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com> * tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 84d95eb23ec..7f17e2ad2b8 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,32 @@ +Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com> + + POSIX asynch I/O improvements/corrections submitted by Alex + Libman <alibman@ihug.com.au>: + + * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation + that makes use of the AIO facility's callback feature. This has + only been tested on SGI Irix. + + * ace/Makefile: Added POSIX_CB_Proactor. + + * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better + cancellation/cleanup handling. Added hooks for the new + ACE_POSIX_CB_Proactor class and refactored methods + close(), get_result_status(), create_result_aiocb_list(), and + delete_result_aiocb_list(). + + * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method + and refactored the old results code into the new method. Also, + make good use of the new aiocb create/delete methods. Improvements + to operation status detection based on input from Sun. + + * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor. + Added a log lock to be sure all messages from one transaction are + logged together. Also, sets up full duplex I/O for Windows and + Solaris; half duplex for all others due to general weakness in + AIO subsystems. If further testing reveals that more can be set + to full duplex, this can be expanded. + Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com> * tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using diff --git a/ace/Makefile b/ace/Makefile index 5133d8396d2..58a627a9349 100644 --- a/ace/Makefile +++ b/ace/Makefile @@ -94,6 +94,7 @@ DEMUX_FILES = \ FlReactor \ Msg_WFMO_Reactor \ POSIX_Proactor \ + POSIX_CB_Proactor \ WIN32_Proactor \ Priority_Reactor \ Proactor \ diff --git a/ace/POSIX_CB_Proactor.cpp b/ace/POSIX_CB_Proactor.cpp new file mode 100644 index 00000000000..8aa479d168c --- /dev/null +++ b/ace/POSIX_CB_Proactor.cpp @@ -0,0 +1,199 @@ +/* -*- C++ -*- */ +// $Id$ + +#include "ace/POSIX_CB_Proactor.h" + +#if defined (ACE_HAS_AIO_CALLS) && defined (__sgi) + +#include "ace/Task_T.h" +#include "ace/Log_Msg.h" +#include "ace/Object_Manager.h" + +#if !defined (__ACE_INLINE__) +#include "ace/POSIX_CB_Proactor.i" +#endif /* __ACE_INLINE__ */ + +ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations) + : ACE_POSIX_AIOCB_Proactor (max_aio_operations, + ACE_POSIX_Proactor::PROACTOR_CB), + sema_ (0) +{ + // we should start pseudo-asynchronous accept task + // one per all future acceptors + + this->get_asynch_pseudo_task ().start (); +} + +// Destructor. +ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void) +{ + this->close (); +} + +int +ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value &wait_time) +{ + // Decrement <wait_time> with the amount of time spent in the method + ACE_Countdown_Time countdown (&wait_time); + return this->handle_events (wait_time.msec ()); +} + +int +ACE_POSIX_CB_Proactor::handle_events (void) +{ + return this->handle_events (ACE_INFINITE); +} + +int +ACE_POSIX_CB_Proactor::handle_events (unsigned long milli_seconds) +{ + + int result_wait=0; + + // Wait for the signals. + if (milli_seconds == ACE_INFINITE) + { + result_wait = this->sema_.acquire(); + } + else + { + // Wait for <milli_seconds> amount of time. + ACE_Time_Value abs_time = ACE_OS::gettimeofday () + + ACE_Time_Value ( milli_seconds); + + result_wait = this->sema_.acquire(abs_time); + } + + + + // Check for errors + // but let continue work in case of errors + // we should check "post_completed" queue + if ( result_wait == -1) + { + if (errno != ETIME && // timeout + errno != EINTR ) // interrupted system call + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_CB_Proactor::handle_events:" + "semaphore acquire failed" + )); + } + + size_t index = 0; // start index to scan aiocb list + size_t count = aiocb_list_max_size_; // max number to iterate + + int error_status = 0; + int return_status = 0; + + int ret_aio = 0; + int ret_que = 0; + + for (;; ret_aio++) + { + ACE_POSIX_Asynch_Result * asynch_result = + find_completed_aio (error_status, + return_status, + index, + count); + + if (asynch_result == 0) + break; + + // Call the application code. + this->application_specific_code (asynch_result, + return_status, // Bytes transferred. + 1, // Success + 0, // No completion key. + error_status); // Error + } + + // process post_completed results + ret_que = this->process_result_queue (); + + // Uncomment this if you want to test + // and research the behavior of you system + // ACE_DEBUG ((LM_DEBUG, + // "(%t) NumAIO=%d NumQueue=%d\n", + // ret_aio, ret_que)); + + return ret_aio + ret_que > 0 ? 1 : 0; +} + +void ACE_POSIX_CB_Proactor::aio_completion_func ( sigval_t cb_data ) +{ +#if defined (__FreeBSD__) + ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sigval_ptr); +#else + ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sival_ptr); +#endif + + if ( impl != 0 ) + impl->notify_completion (0); +} + +int +ACE_POSIX_CB_Proactor::notify_completion(int sig_num) +{ + ACE_UNUSED_ARG (sig_num); + + return this->sema_.release(); +} + + +int +ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) +{ + size_t i = 0; + + //try to find free slot as usual, starting from 0 + for (i = 0; i < this->aiocb_list_max_size_; i++) + if (result_list_[i] == 0) + break; + + if (i >= this->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n" + "ACE_POSIX_CB_Proactor::allocate_aio_slot " + "internal Proactor error 1\n"), + -1); + + int retval = ACE_static_cast (int, i); + + // setup OS notification methods for this aio + // store index!!, not pointer in signal info + result->aio_sigevent.sigev_notify = SIGEV_CALLBACK; + result->aio_sigevent.sigev_func = aio_completion_func ; + +#if defined (__FreeBSD__) + result->aio_sigevent.sigev_value.sigval_ptr = this ; +#else + result->aio_sigevent.sigev_value.sival_ptr = this ; +#endif /* __FreeBSD__ */ + + return retval; +} + +int +ACE_POSIX_CB_Proactor::get_result_status ( ACE_POSIX_Asynch_Result* asynch_result, + int & error_status, + int & return_status ) +{ + return ACE_POSIX_AIOCB_Proactor::get_result_status (asynch_result, + error_status, + return_status ); +} + +int +ACE_POSIX_CB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result) +{ + return ACE_POSIX_AIOCB_Proactor::cancel_aiocb (result); +} + +int +ACE_POSIX_CB_Proactor::cancel_aio (ACE_HANDLE handle) +{ + return ACE_POSIX_AIOCB_Proactor::cancel_aio (handle); +} + +#endif /* ACE_HAS_AIO_CALLS && __sgi */ diff --git a/ace/POSIX_CB_Proactor.h b/ace/POSIX_CB_Proactor.h new file mode 100644 index 00000000000..e8315e3c8d2 --- /dev/null +++ b/ace/POSIX_CB_Proactor.h @@ -0,0 +1,105 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file POSIX_CB_Proactor.h + * + * $Id$ + * + * @author Alexander Libman <alibman@ihug.com.au> + */ +//============================================================================= + +#ifndef ACE_POSIX_CB_PROACTOR_H +#define ACE_POSIX_CB_PROACTOR_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (ACE_HAS_AIO_CALLS) && defined (__sgi) + +#include "ace/POSIX_Proactor.h" + +/** + * @class ACE_POSIX_CB_Proactor + * + * @brief Implementation of SGI IRIX Proactor + * }; + */ +class ACE_Export ACE_POSIX_CB_Proactor : public ACE_POSIX_AIOCB_Proactor +{ + +public: + virtual Proactor_Type get_impl_type (void); + + /// Destructor. + virtual ~ACE_POSIX_CB_Proactor (void); + + /// Constructor defines max number asynchronous operations that can + /// be started at the same time. + ACE_POSIX_CB_Proactor (size_t max_aio_operations = ACE_AIO_DEFAULT_SIZE); + +protected: + + static void aio_completion_func ( sigval_t cb_data ); + + /** + * Dispatch a single set of events. If <wait_time> elapses before + * any events occur, return 0. Return 1 on success i.e., when a + * completion is dispatched, non-zero (-1) on errors and errno is + * set accordingly. + */ + virtual int handle_events (ACE_Time_Value &wait_time); + + /** + * Dispatch a single set of events. If <milli_seconds> elapses + * before any events occur, return 0. Return 1 if a completion is + * dispatched. Return -1 on errors. + */ + virtual int handle_events (u_long milli_seconds); + + /** + * Block indefinitely until at least one event is dispatched. + * Dispatch a single set of events. If <wait_time> elapses before + * any events occur, return 0. Return 1 on success i.e., when a + * completion is dispatched, non-zero (-1) on errors and errno is + * set accordingly. + */ + virtual int handle_events (void); + + + /// Check AIO for completion, error and result status + /// Return: 1 - AIO completed , 0 - not completed yet + virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result, + int & error_status, + int & return_status ); + + + + /// From ACE_POSIX_AIOCB_Proactor. + /// Attempt to cancel running request + virtual int cancel_aiocb (ACE_POSIX_Asynch_Result *result); + virtual int cancel_aio (ACE_HANDLE handle); + + /// Find free slot to store result and aiocb pointer + virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result); + + /// Notify queue of "post_completed" ACE_POSIX_Asynch_Results + /// called from post_completion method + virtual int notify_completion (int sig_num); + + + /// semaphore variable to notify + /// used to wait the first AIO start + ACE_SYNCH_SEMAPHORE sema_; +}; + +#if defined (__ACE_INLINE__) +#include "ace/POSIX_CB_Proactor.i" +#endif /* __ACE_INLINE__ */ + +#endif /* ACE_HAS_AIO_CALLS && __sgi */ +#endif /* ACE_POSIX_CB_PROACTOR_H*/ diff --git a/ace/POSIX_CB_Proactor.i b/ace/POSIX_CB_Proactor.i new file mode 100644 index 00000000000..6cc8655ddba --- /dev/null +++ b/ace/POSIX_CB_Proactor.i @@ -0,0 +1,8 @@ +/* -*- C++ -*- */ +// $Id$ + +ACE_INLINE +ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_CB_Proactor::get_impl_type (void) +{ + return PROACTOR_CB; +} diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 381d7521303..c3fba575a62 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -562,23 +562,51 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void) { + // 1. try to cancel pending aio + this->read_stream_.cancel (); + + // 2. close both handles + // Destuctor of ACE_Pipe does not close handles. + // We can not use ACE_Pipe::close() as it + // closes read_handle and than write_handle. + // In some systems close() may wait for + // completion for all asynch. pending requests. + // So we should close write_handle firstly + // to force read completion ( if 1. does not help ) + // and then read_handle and not vice versa + + ACE_HANDLE h = this->pipe_.write_handle (); + if (h != ACE_INVALID_HANDLE) + ACE_OS::closesocket (h); + + h = this->pipe_.read_handle (); + if ( h != ACE_INVALID_HANDLE) + ACE_OS::closesocket (h); + } + int ACE_AIOCB_Notify_Pipe_Manager::notify () { // Send the result pointer through the pipe. char char_send = 0; - int ret_val = ACE::send (this->pipe_.write_handle (), - & char_send , - sizeof (char_send)); + int ret_val = ACE::send (this->pipe_.write_handle (), + &char_send, + sizeof (char_send)); + + if (ret_val < 0) + { + if (errno != EWOULDBLOCK) +#if 0 + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("(%P %t):%p\n"), + ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify") + ACE_LIB_TEXT ("Error:Writing on to notify pipe failed"))); +#endif /* 0 */ + return -1; + } - if (ret_val < 0 && errno != EWOULDBLOCK) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P %t):%p\n", - "ACE_AIOCB_Notify_Pipe_Manager::notify" - "Error:Writing on to notify pipe failed"), - -1); return 0; } @@ -595,15 +623,14 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream // One accept has completed. Issue a read to handle any // <post_completion>s in the future. - if (this->read_stream_.read (this->message_block_, - 1, // enough to read 1 byte - 0, // ACT - 0) // Priority - == -1) + if (-1 == this->read_stream_.read (this->message_block_, + 1, // enough to read 1 byte + 0, // ACT + 0)) // Priority ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:" - "Read from pipe failed")); + ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"), + ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:") + ACE_LIB_TEXT ("Read from pipe failed"))); // 2. Do the upcalls @@ -621,22 +648,12 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) num_deferred_aiocb_ (0), num_started_aio_ (0) { - //check for correct value for max_aio_operations + // Check for correct value for max_aio_operations check_max_aio_num (); - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); + this->create_result_aiocb_list (); - // Initialize the array. - for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) - { - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - } - - create_notify_manager (); + this->create_notify_manager (); // start pseudo-asynchronous accept task // one per all future acceptors @@ -646,7 +663,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) // Special protected constructor for ACE_SUN_Proactor ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, - ACE_POSIX_Proactor::Proactor_Type ptype) + ACE_POSIX_Proactor::Proactor_Type) : aiocb_notify_pipe_manager_ (0), aiocb_list_ (0), result_list_ (0), @@ -656,15 +673,49 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, num_deferred_aiocb_ (0), num_started_aio_ (0) { - ACE_UNUSED_ARG (ptype); - //check for correct value for max_aio_operations - check_max_aio_num (); + this->check_max_aio_num (); + + this->create_result_aiocb_list (); + + // @@ We should create Notify_Pipe_Manager in the derived class to + // provide correct calls for virtual functions !!! +} + +// Destructor. +ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) +{ + this->close(); +} + +int +ACE_POSIX_AIOCB_Proactor::close (void) +{ + // stop asynch accept task + this->get_asynch_pseudo_task().stop (); + + this->delete_notify_manager (); + + this->clear_result_queue (); + + return this->delete_result_aiocb_list (); +} + +void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) +{ + notify_pipe_read_handle_ = h; +} - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); +int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void) +{ + if (aiocb_list_ != 0) + return 0; + + ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1); + + ACE_NEW_RETURN (result_list_, + ACE_POSIX_Asynch_Result *[aiocb_list_max_size_], + -1); // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) @@ -673,54 +724,99 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, result_list_[ai] = 0; } - - // @@ We should create Notify_Pipe_Manager in the derived class to - // provide correct calls for virtual functions !!! + return 0; } -// Destructor. -ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) +int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) { - // stop asynch accept task - this->get_asynch_pseudo_task().stop (); + if (aiocb_list_ == 0) // already deleted + return 0; + + // try to cancel all uncomlpeted operarion + // POSIX systems may have hidden system threads + // that still can work with our aiocb's! + for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) + { + if (this->aiocb_list_[ai] != 0) // active operation + this->cancel_aiocb (result_list_[ai]); + } - delete_notify_manager (); + int num_pending = 0; - // delete all uncomlpeted operarion - // as nobody will notify client since now for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) { - delete result_list_[ai]; - result_list_[ai] = 0; - aiocb_list_[ai] = 0; + if (this->aiocb_list_[ai] == 0 ) // not active operation + continue; + + // Get the error and return status of the aio_ operation. + int error_status = 0; + int return_status = 0; + int flg_completed = this->get_result_status (result_list_[ai], + error_status, + return_status); + + //don't delete uncompleted AIOCB's + if (flg_completed == 0) // not completed !!! + { + num_pending++; +#if 0 + char * errtxt = ACE_OS::strerror (error_status); + if (errtxt == 0) + errtxt ="?????????"; + + char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )? + "WRITE":"READ" ; + + + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"), + ai, + op, + error_status, + return_status, + errtxt )); +#endif /* 0 */ + } + else // completed , OK + { + delete this->result_list_[ai]; + this->result_list_[ai] = 0; + this->aiocb_list_[ai] = 0; + } } - delete [] aiocb_list_; - aiocb_list_ = 0; + //if it is not possible cancel some operation (num_pending > 0 ), + //we can we do only one thing -report about this + //and complain about POSIX implementation + //we know that we have memory leaks, + //but it is better than segmentation fault! + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") + ACE_LIB_TEXT(" number pending AIO=%d\n"), + num_pending + )); - delete [] result_list_; - result_list_ = 0; + delete [] this->aiocb_list_; + this->aiocb_list_ = 0; - clear_result_queue (); -} + delete [] this->result_list_; + this->result_list_ = 0; -void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) -{ - notify_pipe_read_handle_ = h; + return (num_pending == 0 ? 0 : -1); + // ?? or just always return 0; } void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () { - long max_os_aio_num = ACE_OS ::sysconf (_SC_AIO_MAX); + long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX); // Define max limit AIO's for concrete OS // -1 means that there is no limit, but it is not true // (example, SunOS 5.6) - if (max_os_aio_num > 0 - && aiocb_list_max_size_ > (unsigned long) max_os_aio_num - ) + if (max_os_aio_num > 0 && + aiocb_list_max_size_ > (unsigned long) max_os_aio_num) aiocb_list_max_size_ = max_os_aio_num; #if defined (HPUX) @@ -729,7 +825,7 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () // it has a limit 256 max elements for aio_suspend () // It is a pity, but ... - long max_os_listio_num = ACE_OS ::sysconf (_SC_AIO_LISTIO_MAX); + long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX); if (max_os_listio_num > 0 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num) aiocb_list_max_size_ = max_os_listio_num; @@ -762,6 +858,39 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", aiocb_list_max_size_)); +#if defined(__sgi) + + ACE_DEBUG((LM_DEBUG, + ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n"))); + +//typedef struct aioinit { +// int aio_threads; /* The number of aio threads to start (5) */ +// int aio_locks; /* Initial number of preallocated locks (3) */ +// int aio_num; /* estimated total simultanious aiobc structs (1000) */ +// int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */ +// int aio_debug; /* turn on debugging (0) */ +// int aio_numusers; /* max number of user sprocs making aio_* calls (5) */ +// int aio_reserved[3]; +//} aioinit_t; + + aioinit_t aioinit; + + aioinit.aio_threads = 10; /* The number of aio threads to start (5) */ + aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */ + /* estimated total simultaneous aiobc structs (1000) */ + aioinit.aio_num = aiocb_list_max_size_; + aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */ + aioinit.aio_debug = 0; /* turn on debugging (0) */ + aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */ + aioinit.aio_reserved[0] = 0; + aioinit.aio_reserved[1] = 0; + aioinit.aio_reserved[2] = 0; + + aio_sgi_init (&aioinit); + +#endif + + return; } void @@ -834,7 +963,10 @@ ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result) "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"), -1); - this->notify_completion (sig_num); + // let try not to overflow signal queue or notification pipe + + if (this->result_queue_.size () == 1) + this->notify_completion (sig_num); return 0; } @@ -1054,6 +1186,47 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) return retval > 0 ? 1 : 0; } +int +ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, + int &error_status, + int &return_status) +{ + return_status = 0; + + // Get the error status of the aio_ operation. + error_status = aio_error (asynch_result); + +#if 0 + if (error_status == -1) // <aio_error> itself has failed. + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::get_result_status:" + "<aio_error> has failed\n")); +#endif /* 0 */ + + if (error_status == EINPROGRESS) + { + return_status = 0; + return 0; // not completed + } + + return_status = aio_return (asynch_result); + + if (return_status < 0) + { + return_status = 0; // zero bytes transferred +#if 0 + if (error_status == 0) // nonsense + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::get_result_status:" + "<aio_return> failed\n")); +#endif /* 0 */ + } + + return 1; // completed +} + ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, int &return_status, @@ -1082,30 +1255,9 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, if (aiocb_list_[index] == 0) // Dont process null blocks. continue; - // Get the error status of the aio_ operation. - error_status = aio_error (aiocb_list_[index]); - - if (error_status == -1) // <aio_error> itself has failed. - { - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_error> has failed\n")); - - break; - - // we should notify user, otherwise : - // memory leak for result and "hanging" user - // what was before skip this operation - - //aiocb_list_[index] = 0; - //result_list_[index] = 0; - //aiocb_list_cur_size_--; - //continue; - } - - // Continue the loop if <aio_> operation is still in progress. - if (error_status != EINPROGRESS) + if (0 != this->get_result_status (result_list_[index], + error_status, + return_status)) // completed break; } // end for @@ -1113,25 +1265,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, if (count == 0) // all processed , nothing found return asynch_result; - if (error_status == ECANCELED) - return_status = 0; - else if (error_status == -1) - return_status = 0; - else - return_status = aio_return (aiocb_list_[index]); - - if (return_status == -1) - { - return_status = 0; // zero bytes transferred - - if (error_status == 0) // nonsense - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_return> failed\n")); - } - - asynch_result = result_list_[index]; aiocb_list_[index] = 0; @@ -1196,7 +1329,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul if (ret_val != 0) // No free slot { - errno = EAGAIN; + errno = EAGAIN; ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "register_and_start_aio: " @@ -1570,8 +1703,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { - // stop asynch accept task - this->get_asynch_pseudo_task().stop (); + this->close (); // @@ Enable the masked signals again. } diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index 8781218633f..e44ea545612 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -55,12 +55,14 @@ class ACE_Export ACE_POSIX_Proactor : public ACE_Proactor_Impl public: enum Proactor_Type { - PROACTOR_POSIX = 0, // base class type - PROACTOR_AIOCB = 1, - PROACTOR_SIG = 2, - PROACTOR_SUN = 3 + PROACTOR_POSIX = 0, // base class type + PROACTOR_AIOCB = 1, // aio_suspend() based + PROACTOR_SIG = 2, // signals notifications + PROACTOR_SUN = 3, // SUN specific aiowait() + PROACTOR_CB = 4 // callback notifications }; + enum SystemType // open for future extention { OS_UNDEFINED= 0x0000, @@ -281,6 +283,9 @@ public: /// Destructor. virtual ~ACE_POSIX_AIOCB_Proactor (void); + /// Close down the Proactor. + virtual int close (void); + /** * Dispatch a single set of events. If <wait_time> elapses before * any events occur, return 0. Return 1 on success i.e., when a @@ -342,10 +347,23 @@ protected: ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype); + /// Check AIO for completion, error and result status + /// Return: 1 - AIO completed , 0 - not completed yet + virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result, + int & error_status, + int & return_status ); /// Task to process pseudo-asynchronous operations ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task(); + /// Create aiocb list + int create_result_aiocb_list (void); + + /// Call this method from derived class when virtual table is + /// built. + int delete_result_aiocb_list (void); + + /// Call these methods from derived class when virtual table is /// built. void create_notify_manager (void); diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp index 5d674493565..baeac28096d 100644 --- a/ace/SUN_Proactor.cpp +++ b/ace/SUN_Proactor.cpp @@ -30,11 +30,7 @@ ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations) // Destructor. ACE_SUN_Proactor::~ACE_SUN_Proactor (void) { - // stop asynch accept task - this->get_asynch_pseudo_task ().stop (); - - // to provide correct virtual calls - delete_notify_manager (); + this->close (); } int @@ -151,84 +147,96 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds) } +int +ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, + int &error_status, + int &return_status) +{ + + // Get the error status of the aio_ operation. + error_status = asynch_result->aio_resultp.aio_errno; + return_status = asynch_result->aio_resultp.aio_return; + + // ****** from Sun man pages ********************* + // Upon completion of the operation both aio_return and aio_errno + // are set to reflect the result of the operation. + // AIO_INPROGRESS is not a value used by the system + // so the client may detect a change in state + // by initializing aio_return to this value. + + if (return_status == AIO_INPROGRESS || error_status == EINPROGRESS) + { + return_status = 0; + return 0; // not completed + } + +#if 0 + if (error_status == -1) // should never be + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_SUN_Proactor::get_result_status:" + "<aio_errno> has failed\n")); +#endif /* 0 */ + + if (return_status < 0) + { + return_status = 0; // zero bytes transferred +#if 0 + if (error_status == 0) // nonsense + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_SUN_Proactor::get_result_status:" + "<aio_return> failed\n")); +#endif /* 0 */ + } + + return 1; // completed +} + ACE_POSIX_Asynch_Result * ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, - int &error_status, - int &return_status) + int &error_status, + int &return_status) { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0)); size_t ai; error_status = -1; - return_status= 0; + return_status = 0; - // we call find_completed_aio always with result != 0 + // we call find_completed_aio always with result != 0 for (ai = 0; ai < aiocb_list_max_size_; ai++) if (aiocb_list_[ai] !=0 && //check for non zero result == &aiocb_list_[ai]->aio_resultp) break; - - if (ai >= aiocb_list_max_size_) // not found - return 0; - error_status = result->aio_errno; - return_status= result->aio_return; + if (ai >= aiocb_list_max_size_) // not found + return 0; // means somebody else uses aio directly!!! - if (error_status == -1) // should never be - { + ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; + + if (this->get_result_status (asynch_result, + error_status, + return_status) == 0) + { // should never be +#if 0 ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", "ACE_SUN_Proactor::find_completed_aio:" - "<aio_errno> has failed\n")); - - return_status = 0; - - // we should notify user, otherwise : - // memory leak for result and "hanging" user - // what was before : - - // aiocb_list_[ai] = 0; - // result_list_[ai] = 0; - // aiocb_list_cur_size_--; - // return 0; + "should never be !!!\n")); +#endif /* 0 */ + return 0; } - switch (error_status) - { - case EINPROGRESS : // should never be - case AIO_INPROGRESS : // according to SUN doc - return 0; - - case ECANCELED : // canceled - return_status = 0; - break; - - case 0 : // no error - if (return_status == -1) // return_status should be >= 0 - { - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_SUN_Proactor::find_completed_aio:" - "<aio_return> failed\n")); - - return_status = 0; // zero bytes transferred - } - break; - - default : // other errors - if (return_status == -1) // normal status for I/O Error - return_status = 0; // zero bytes transferred - break; - } - - ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; + if (return_status < 0) + return_status = 0; // zero bytes transferred aiocb_list_[ai] = 0; result_list_[ai] = 0; aiocb_list_cur_size_--; - num_started_aio_ --; + num_started_aio_--; start_deferred_aio (); //make attempt to start deferred AIO @@ -250,23 +258,21 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) int ret_val; const ACE_TCHAR *ptype; - // Start IO + // ****** from Sun man pages ********************* + // Upon completion of the operation both aio_return and aio_errno + // are set to reflect the result of the operation. + // AIO_INPROGRESS is not a value used by the system + // so the client may detect a change in state + // by initializing aio_return to this value. + result->aio_resultp.aio_return = AIO_INPROGRESS; + result->aio_resultp.aio_errno = EINPROGRESS; + // Start IO switch (result->aio_lio_opcode) { case LIO_READ : - ptype = "read"; + ptype = ACE_LIB_TEXT ("read"); ret_val = aioread (result->aio_fildes, - (char *) result->aio_buf, - result->aio_nbytes, - result->aio_offset, - SEEK_SET, - &result->aio_resultp); - break; - - case LIO_WRITE : - ptype = "write"; - ret_val = aiowrite (result->aio_fildes, (char *) result->aio_buf, result->aio_nbytes, result->aio_offset, @@ -274,27 +280,37 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) &result->aio_resultp); break; + case LIO_WRITE : + ptype = ACE_LIB_TEXT ("write"); + ret_val = aiowrite (result->aio_fildes, + (char *) result->aio_buf, + result->aio_nbytes, + result->aio_offset, + SEEK_SET, + &result->aio_resultp); + break; + default: - ptype = "?????"; + ptype = ACE_LIB_TEXT ("?????"); ret_val = -1; break; } - + if (ret_val == 0) { - num_started_aio_ ++ ; - if (num_started_aio_ == 1) // wake up condition + num_started_aio_++; + if (num_started_aio_ == 1) // wake up condition condition_.broadcast (); } else // if (ret_val == -1) { if (errno == EAGAIN) //try later, it will be deferred AIO - ret_val = 1 ; + ret_val = 1; else ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::start_aio: aio%s %p\n", - ptype, - "queueing failed\n")); + ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"), + ptype, + ACE_LIB_TEXT ("queueing failed\n"))); } return ret_val; diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h index 3c14a243576..c1ddbaf0cbc 100644 --- a/ace/SUN_Proactor.h +++ b/ace/SUN_Proactor.h @@ -100,6 +100,12 @@ protected: /// From ACE_POSIX_AIOCB_Proactor. virtual int start_aio (ACE_POSIX_Asynch_Result *result); + /// Check AIO for completion, error and result status + /// Return: 1 - AIO completed , 0 - not completed yet + virtual int get_result_status (ACE_POSIX_Asynch_Result* asynch_result, + int &error_status, + int &return_status); + /// Extract the results of aio. ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result, int &error_status, diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index 3dedb3e39d2..cb8470ef99b 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -47,6 +47,7 @@ ACE_RCSID (tests, #elif defined (ACE_HAS_AIO_CALLS) # include "ace/POSIX_Proactor.h" +# include "ace/POSIX_CB_Proactor.h" # include "ace/SUN_Proactor.h" #endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */ @@ -55,8 +56,9 @@ ACE_RCSID (tests, static int disable_signal (int sigmin, int sigmax); // Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB, -// 2-SIG, 3-SUN -static int proactor_type = 0; +// 2-SIG, 3-SUN, 4-CALLBACK +typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CALLBACK } ProactorType; +static ProactorType proactor_type = DEFAULT; // POSIX : > 0 max number aio operations proactor, static size_t max_aio_operations = 0; @@ -100,6 +102,15 @@ static ACE_TCHAR data[] = "Connection: Keep-Alive\r\n" "\r\n"; +class LogLocker +{ +public: + + LogLocker () { ACE_LOG_MSG->acquire (); } + virtual ~LogLocker () { ACE_LOG_MSG->release (); } +}; + + // ************************************************************* // MyTask is ACE_Task resposible for : // 1. creation and deletion of @@ -126,12 +137,12 @@ public: virtual int svc (void); int start (size_t num_threads, - int type_proactor, + ProactorType type_proactor, size_t max_op ); int stop (void); private: - int create_proactor (int type_proactor, + int create_proactor (ProactorType type_proactor, size_t max_op); int delete_proactor (void); @@ -142,7 +153,7 @@ private: }; int -MyTask::create_proactor (int type_proactor, size_t max_op) +MyTask::create_proactor (ProactorType type_proactor, size_t max_op) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, @@ -171,7 +182,7 @@ MyTask::create_proactor (int type_proactor, size_t max_op) switch (type_proactor) { - case 1: + case AIOCB: ACE_NEW_RETURN (proactor, ACE_POSIX_AIOCB_Proactor (max_op), -1); @@ -179,7 +190,7 @@ MyTask::create_proactor (int type_proactor, size_t max_op) ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); break; - case 2: + case SIG: ACE_NEW_RETURN (proactor, ACE_POSIX_SIG_Proactor (max_op), -1); @@ -188,17 +199,25 @@ MyTask::create_proactor (int type_proactor, size_t max_op) break; # if defined (sun) - - case 3: + case SUN: ACE_NEW_RETURN (proactor, ACE_SUN_Proactor (max_op), -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%t) Create Proactor Type = SUN\n"))); break; - # endif /* sun */ +# if defined (__sgi) + case 4: + ACE_NEW_RETURN (proactor, + ACE_POSIX_CB_Proactor (max_op), + -1); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Create Proactor Type = CALLBACK\n"))); + break; +# endif + default: ACE_NEW_RETURN (proactor, ACE_POSIX_SIG_Proactor (max_op), @@ -238,7 +257,7 @@ MyTask::delete_proactor (void) int MyTask::start (size_t num_threads, - int type_proactor, + ProactorType type_proactor, size_t max_op) { if (this->create_proactor (type_proactor, max_op) == -1) @@ -337,6 +356,7 @@ private: int initiate_read_stream (void); int initiate_write_stream (ACE_Message_Block &mb, int nbytes); int check_destroy (void); + void cancel (); Acceptor *acceptor_; int index_; @@ -347,6 +367,7 @@ private: ACE_SYNCH_RECURSIVE_MUTEX lock_; long io_count_; + int flg_cancel_; }; class Acceptor : public ACE_Asynch_Acceptor<Receiver> @@ -359,6 +380,7 @@ public: virtual ~Acceptor (void); void stop (void); + void cancel_all (void); // Virtual from ACE_Asynch_Acceptor Receiver *make_handler (void); @@ -387,6 +409,26 @@ Acceptor::~Acceptor (void) this->stop (); } + +void +Acceptor::cancel_all (void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->cancel (); + + for (int i = 0; i < MAX_RECEIVERS; ++i) + { + if (this->list_receivers_[i] != 0) + this->list_receivers_[i]->cancel (); + } + return; +} + + void Acceptor::stop (void) { @@ -455,7 +497,8 @@ Receiver::Receiver (Acceptor * acceptor, int index) : acceptor_ (acceptor), index_ (index), handle_ (ACE_INVALID_HANDLE), - io_count_ (0) + io_count_ (0), + flg_cancel_ (0) { if (this->acceptor_ != 0) this->acceptor_->on_new_receiver (*this); @@ -488,6 +531,19 @@ Receiver::check_destroy (void) return 0; } + +void +Receiver::cancel () +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->flg_cancel_ = 1; + this->ws_.cancel (); + this->rs_.cancel (); + return; +} + + void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) { @@ -512,6 +568,9 @@ Receiver::initiate_read_stream (void) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); + if (this->flg_cancel_ != 0) + return 0; + ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (1024), //BUFSIZ + 1), @@ -535,6 +594,13 @@ int Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); + + if (this->flg_cancel_ != 0) + { + mb.release (); + return -1; + } + if (nbytes <= 0) { mb.release (); @@ -568,6 +634,8 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) || result.bytes_transferred () == 0 || result.error () != 0) { + LogLocker log_lock; + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"), this->index_)); @@ -639,6 +707,8 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) result.bytes_transferred () == 0 || result.error () != 0) { + LogLocker log_lock; + //mb.rd_ptr () [0] = '\0'; mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); @@ -725,6 +795,7 @@ private: int check_destroy (void); int initiate_read_stream (void); int initiate_write_stream (void); + void cancel (); int index_; Connector * connector_; @@ -738,7 +809,7 @@ private: ACE_SYNCH_RECURSIVE_MUTEX lock_; long io_count_; - + int flg_cancel_; }; class Connector : public ACE_Asynch_Connector<Sender> @@ -752,6 +823,7 @@ public: int start (const ACE_INET_Addr &addr, int num); void stop (void); + void cancel_all (void); // Virtual from ACE_Asynch_Connector Sender *make_handler (void); @@ -781,6 +853,24 @@ Connector::~Connector (void) this->stop (); } + +void +Connector::cancel_all(void) +{ + // This method can be called only after proactor event loop is done + // in all threads. + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->cancel (); + + for (int i = 0; i < MAX_SENDERS; ++i) + { + if (this->list_senders_[i] != 0) + this->list_senders_[i]->cancel (); + } + return; +} + void Connector::stop (void) { @@ -890,7 +980,8 @@ Sender::Sender (Connector * connector, int index) : index_ (index), connector_ (connector), handle_ (ACE_INVALID_HANDLE), - io_count_ (0) + io_count_ (0), + flg_cancel_ (0) { if (this->connector_ != 0) this->connector_->on_new_sender (*this); @@ -928,6 +1019,18 @@ Sender::check_destroy (void) } void +Sender::cancel () +{ + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + + this->flg_cancel_ = 1; + this->ws_.cancel (); + this->rs_.cancel (); + return; +} + + +void Sender::open (ACE_HANDLE handle, ACE_Message_Block &) { this->handle_ = handle; @@ -959,6 +1062,9 @@ Sender::initiate_write_stream (void) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); + if (this->flg_cancel_ != 0) + return -1; + size_t nbytes = ACE_OS::strlen (this->send_buf_); ACE_Message_Block *mb = 0; @@ -984,6 +1090,9 @@ Sender::initiate_read_stream (void) { ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); + if (this->flg_cancel_ != 0) + return -1; + ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (1024 + 1), -1); @@ -1010,6 +1119,8 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) || result.bytes_transferred () == 0 || result.error () != 0) { + LogLocker log_lock; + // Reset pointers. //mb.rd_ptr()[0] ='\0'; mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); @@ -1080,6 +1191,8 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) || result.bytes_transferred () == 0 || result.error () != 0) { + LogLocker log_lock; + // Reset pointers. mb.rd_ptr ()[result.bytes_transferred ()] = '\0'; @@ -1153,6 +1266,7 @@ print_usage (int /* argc */, ACE_TCHAR *argv[]) ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:") ACE_TEXT ("\n a AIOCB") ACE_TEXT ("\n i SIG") + ACE_TEXT ("\n c CALLBACK") ACE_TEXT ("\n s SUN") ACE_TEXT ("\n d default") ACE_TEXT ("\n-d <duplex mode 1-on/0-off>") @@ -1183,19 +1297,22 @@ set_proactor_type (const ACE_TCHAR *ptype) switch (toupper (*ptype)) { case 'D': - proactor_type = 0; + proactor_type = DEFAULT; return 1; case 'A': - proactor_type = 1; + proactor_type = AIOCB; return 1; case 'I': - proactor_type = 2; + proactor_type = SIG; return 1; #if defined (sun) case 'S': - proactor_type = 3; + proactor_type = SUN; return 1; #endif /* sun */ + case 'C': + proactor_type = CALLBACK; + return 1; default: break; } @@ -1208,14 +1325,18 @@ parse_args (int argc, ACE_TCHAR *argv[]) if (argc == 1) // no arguments , so one button test { both = 1; // client and server simultaneosly +#if defined(ACE_WIN32) || defined(sun) duplex = 1; // full duplex is on +#else // Linux,IRIX - weak AIO implementation + duplex = 0; // full duplex is off +#endif host = ACE_TEXT ("localhost"); // server to connect port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen max_aio_operations = 512; // POSIX Proactor params #if defined (sun) - proactor_type = 3; // Proactor type for SunOS + proactor_type = SUN; // Proactor type for SunOS #else - proactor_type = 1; // Proactor type = default + proactor_type = AIOCB; // Proactor type = default #endif threads = 3; // size of Proactor thread pool senders = 20; // number of senders |