summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog29
-rw-r--r--ChangeLogs/ChangeLog-02a29
-rw-r--r--ChangeLogs/ChangeLog-03a29
-rw-r--r--ace/Makefile1
-rw-r--r--ace/POSIX_CB_Proactor.cpp199
-rw-r--r--ace/POSIX_CB_Proactor.h105
-rw-r--r--ace/POSIX_CB_Proactor.i8
-rw-r--r--ace/POSIX_Proactor.cpp356
-rw-r--r--ace/POSIX_Proactor.h26
-rw-r--r--ace/SUN_Proactor.cpp172
-rw-r--r--ace/SUN_Proactor.h6
-rw-r--r--tests/Proactor_Test.cpp161
12 files changed, 907 insertions, 214 deletions
diff --git a/ChangeLog b/ChangeLog
index 84d95eb23ec..7f17e2ad2b8 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,32 @@
+Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com>
+
+ POSIX asynch I/O improvements/corrections submitted by Alex
+ Libman <alibman@ihug.com.au>:
+
+ * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation
+ that makes use of the AIO facility's callback feature. This has
+ only been tested on SGI Irix.
+
+ * ace/Makefile: Added POSIX_CB_Proactor.
+
+ * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better
+ cancellation/cleanup handling. Added hooks for the new
+ ACE_POSIX_CB_Proactor class and refactored methods
+ close(), get_result_status(), create_result_aiocb_list(), and
+ delete_result_aiocb_list().
+
+ * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method
+ and refactored the old results code into the new method. Also,
+ make good use of the new aiocb create/delete methods. Improvements
+ to operation status detection based on input from Sun.
+
+ * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor.
+ Added a log lock to be sure all messages from one transaction are
+ logged together. Also, sets up full duplex I/O for Windows and
+ Solaris; half duplex for all others due to general weakness in
+ AIO subsystems. If further testing reveals that more can be set
+ to full duplex, this can be expanded.
+
Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com>
* tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 84d95eb23ec..7f17e2ad2b8 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,32 @@
+Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com>
+
+ POSIX asynch I/O improvements/corrections submitted by Alex
+ Libman <alibman@ihug.com.au>:
+
+ * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation
+ that makes use of the AIO facility's callback feature. This has
+ only been tested on SGI Irix.
+
+ * ace/Makefile: Added POSIX_CB_Proactor.
+
+ * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better
+ cancellation/cleanup handling. Added hooks for the new
+ ACE_POSIX_CB_Proactor class and refactored methods
+ close(), get_result_status(), create_result_aiocb_list(), and
+ delete_result_aiocb_list().
+
+ * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method
+ and refactored the old results code into the new method. Also,
+ make good use of the new aiocb create/delete methods. Improvements
+ to operation status detection based on input from Sun.
+
+ * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor.
+ Added a log lock to be sure all messages from one transaction are
+ logged together. Also, sets up full duplex I/O for Windows and
+ Solaris; half duplex for all others due to general weakness in
+ AIO subsystems. If further testing reveals that more can be set
+ to full duplex, this can be expanded.
+
Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com>
* tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 84d95eb23ec..7f17e2ad2b8 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,32 @@
+Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com>
+
+ POSIX asynch I/O improvements/corrections submitted by Alex
+ Libman <alibman@ihug.com.au>:
+
+ * ace/POSIX_CB_Proactor.{h i cpp}: New POSIX Proactor implementation
+ that makes use of the AIO facility's callback feature. This has
+ only been tested on SGI Irix.
+
+ * ace/Makefile: Added POSIX_CB_Proactor.
+
+ * ace/POSIX_Proactor.{h cpp}: Refactored some code and added better
+ cancellation/cleanup handling. Added hooks for the new
+ ACE_POSIX_CB_Proactor class and refactored methods
+ close(), get_result_status(), create_result_aiocb_list(), and
+ delete_result_aiocb_list().
+
+ * ace/SUN_Proactor.{h cpp}: Add the new get_result_status() method
+ and refactored the old results code into the new method. Also,
+ make good use of the new aiocb create/delete methods. Improvements
+ to operation status detection based on input from Sun.
+
+ * tests/Proactor_Test.cpp: Add support for new ACE_POSIX_CB_Proactor.
+ Added a log lock to be sure all messages from one transaction are
+ logged together. Also, sets up full duplex I/O for Windows and
+ Solaris; half duplex for all others due to general weakness in
+ AIO subsystems. If further testing reveals that more can be set
+ to full duplex, this can be expanded.
+
Thu Apr 25 14:02:38 2002 Steve Huston <shuston@riverace.com>
* tests/MT_Reactor_Upcall_Test.cpp: Fixed compile errors by using
diff --git a/ace/Makefile b/ace/Makefile
index 5133d8396d2..58a627a9349 100644
--- a/ace/Makefile
+++ b/ace/Makefile
@@ -94,6 +94,7 @@ DEMUX_FILES = \
FlReactor \
Msg_WFMO_Reactor \
POSIX_Proactor \
+ POSIX_CB_Proactor \
WIN32_Proactor \
Priority_Reactor \
Proactor \
diff --git a/ace/POSIX_CB_Proactor.cpp b/ace/POSIX_CB_Proactor.cpp
new file mode 100644
index 00000000000..8aa479d168c
--- /dev/null
+++ b/ace/POSIX_CB_Proactor.cpp
@@ -0,0 +1,199 @@
+/* -*- C++ -*- */
+// $Id$
+
+#include "ace/POSIX_CB_Proactor.h"
+
+#if defined (ACE_HAS_AIO_CALLS) && defined (__sgi)
+
+#include "ace/Task_T.h"
+#include "ace/Log_Msg.h"
+#include "ace/Object_Manager.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/POSIX_CB_Proactor.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations)
+ : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
+ ACE_POSIX_Proactor::PROACTOR_CB),
+ sema_ (0)
+{
+ // we should start pseudo-asynchronous accept task
+ // one per all future acceptors
+
+ this->get_asynch_pseudo_task ().start ();
+}
+
+// Destructor.
+ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void)
+{
+ this->close ();
+}
+
+int
+ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value &wait_time)
+{
+ // Decrement <wait_time> with the amount of time spent in the method
+ ACE_Countdown_Time countdown (&wait_time);
+ return this->handle_events (wait_time.msec ());
+}
+
+int
+ACE_POSIX_CB_Proactor::handle_events (void)
+{
+ return this->handle_events (ACE_INFINITE);
+}
+
+int
+ACE_POSIX_CB_Proactor::handle_events (unsigned long milli_seconds)
+{
+
+ int result_wait=0;
+
+ // Wait for the signals.
+ if (milli_seconds == ACE_INFINITE)
+ {
+ result_wait = this->sema_.acquire();
+ }
+ else
+ {
+ // Wait for <milli_seconds> amount of time.
+ ACE_Time_Value abs_time = ACE_OS::gettimeofday ()
+ + ACE_Time_Value ( milli_seconds);
+
+ result_wait = this->sema_.acquire(abs_time);
+ }
+
+
+
+ // Check for errors
+ // but let continue work in case of errors
+ // we should check "post_completed" queue
+ if ( result_wait == -1)
+ {
+ if (errno != ETIME && // timeout
+ errno != EINTR ) // interrupted system call
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_CB_Proactor::handle_events:"
+ "semaphore acquire failed"
+ ));
+ }
+
+ size_t index = 0; // start index to scan aiocb list
+ size_t count = aiocb_list_max_size_; // max number to iterate
+
+ int error_status = 0;
+ int return_status = 0;
+
+ int ret_aio = 0;
+ int ret_que = 0;
+
+ for (;; ret_aio++)
+ {
+ ACE_POSIX_Asynch_Result * asynch_result =
+ find_completed_aio (error_status,
+ return_status,
+ index,
+ count);
+
+ if (asynch_result == 0)
+ break;
+
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ return_status, // Bytes transferred.
+ 1, // Success
+ 0, // No completion key.
+ error_status); // Error
+ }
+
+ // process post_completed results
+ ret_que = this->process_result_queue ();
+
+ // Uncomment this if you want to test
+ // and research the behavior of you system
+ // ACE_DEBUG ((LM_DEBUG,
+ // "(%t) NumAIO=%d NumQueue=%d\n",
+ // ret_aio, ret_que));
+
+ return ret_aio + ret_que > 0 ? 1 : 0;
+}
+
+void ACE_POSIX_CB_Proactor::aio_completion_func ( sigval_t cb_data )
+{
+#if defined (__FreeBSD__)
+ ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sigval_ptr);
+#else
+ ACE_POSIX_CB_Proactor * impl = ACE_static_cast (ACE_POSIX_CB_Proactor *, cb_data.sival_ptr);
+#endif
+
+ if ( impl != 0 )
+ impl->notify_completion (0);
+}
+
+int
+ACE_POSIX_CB_Proactor::notify_completion(int sig_num)
+{
+ ACE_UNUSED_ARG (sig_num);
+
+ return this->sema_.release();
+}
+
+
+int
+ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
+{
+ size_t i = 0;
+
+ //try to find free slot as usual, starting from 0
+ for (i = 0; i < this->aiocb_list_max_size_; i++)
+ if (result_list_[i] == 0)
+ break;
+
+ if (i >= this->aiocb_list_max_size_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t)::\n"
+ "ACE_POSIX_CB_Proactor::allocate_aio_slot "
+ "internal Proactor error 1\n"),
+ -1);
+
+ int retval = ACE_static_cast (int, i);
+
+ // setup OS notification methods for this aio
+ // store index!!, not pointer in signal info
+ result->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
+ result->aio_sigevent.sigev_func = aio_completion_func ;
+
+#if defined (__FreeBSD__)
+ result->aio_sigevent.sigev_value.sigval_ptr = this ;
+#else
+ result->aio_sigevent.sigev_value.sival_ptr = this ;
+#endif /* __FreeBSD__ */
+
+ return retval;
+}
+
+int
+ACE_POSIX_CB_Proactor::get_result_status ( ACE_POSIX_Asynch_Result* asynch_result,
+ int & error_status,
+ int & return_status )
+{
+ return ACE_POSIX_AIOCB_Proactor::get_result_status (asynch_result,
+ error_status,
+ return_status );
+}
+
+int
+ACE_POSIX_CB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result *result)
+{
+ return ACE_POSIX_AIOCB_Proactor::cancel_aiocb (result);
+}
+
+int
+ACE_POSIX_CB_Proactor::cancel_aio (ACE_HANDLE handle)
+{
+ return ACE_POSIX_AIOCB_Proactor::cancel_aio (handle);
+}
+
+#endif /* ACE_HAS_AIO_CALLS && __sgi */
diff --git a/ace/POSIX_CB_Proactor.h b/ace/POSIX_CB_Proactor.h
new file mode 100644
index 00000000000..e8315e3c8d2
--- /dev/null
+++ b/ace/POSIX_CB_Proactor.h
@@ -0,0 +1,105 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file POSIX_CB_Proactor.h
+ *
+ * $Id$
+ *
+ * @author Alexander Libman <alibman@ihug.com.au>
+ */
+//=============================================================================
+
+#ifndef ACE_POSIX_CB_PROACTOR_H
+#define ACE_POSIX_CB_PROACTOR_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#if defined (ACE_HAS_AIO_CALLS) && defined (__sgi)
+
+#include "ace/POSIX_Proactor.h"
+
+/**
+ * @class ACE_POSIX_CB_Proactor
+ *
+ * @brief Implementation of SGI IRIX Proactor
+ * };
+ */
+class ACE_Export ACE_POSIX_CB_Proactor : public ACE_POSIX_AIOCB_Proactor
+{
+
+public:
+ virtual Proactor_Type get_impl_type (void);
+
+ /// Destructor.
+ virtual ~ACE_POSIX_CB_Proactor (void);
+
+ /// Constructor defines max number asynchronous operations that can
+ /// be started at the same time.
+ ACE_POSIX_CB_Proactor (size_t max_aio_operations = ACE_AIO_DEFAULT_SIZE);
+
+protected:
+
+ static void aio_completion_func ( sigval_t cb_data );
+
+ /**
+ * Dispatch a single set of events. If <wait_time> elapses before
+ * any events occur, return 0. Return 1 on success i.e., when a
+ * completion is dispatched, non-zero (-1) on errors and errno is
+ * set accordingly.
+ */
+ virtual int handle_events (ACE_Time_Value &wait_time);
+
+ /**
+ * Dispatch a single set of events. If <milli_seconds> elapses
+ * before any events occur, return 0. Return 1 if a completion is
+ * dispatched. Return -1 on errors.
+ */
+ virtual int handle_events (u_long milli_seconds);
+
+ /**
+ * Block indefinitely until at least one event is dispatched.
+ * Dispatch a single set of events. If <wait_time> elapses before
+ * any events occur, return 0. Return 1 on success i.e., when a
+ * completion is dispatched, non-zero (-1) on errors and errno is
+ * set accordingly.
+ */
+ virtual int handle_events (void);
+
+
+ /// Check AIO for completion, error and result status
+ /// Return: 1 - AIO completed , 0 - not completed yet
+ virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result,
+ int & error_status,
+ int & return_status );
+
+
+
+ /// From ACE_POSIX_AIOCB_Proactor.
+ /// Attempt to cancel running request
+ virtual int cancel_aiocb (ACE_POSIX_Asynch_Result *result);
+ virtual int cancel_aio (ACE_HANDLE handle);
+
+ /// Find free slot to store result and aiocb pointer
+ virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result);
+
+ /// Notify queue of "post_completed" ACE_POSIX_Asynch_Results
+ /// called from post_completion method
+ virtual int notify_completion (int sig_num);
+
+
+ /// semaphore variable to notify
+ /// used to wait the first AIO start
+ ACE_SYNCH_SEMAPHORE sema_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "ace/POSIX_CB_Proactor.i"
+#endif /* __ACE_INLINE__ */
+
+#endif /* ACE_HAS_AIO_CALLS && __sgi */
+#endif /* ACE_POSIX_CB_PROACTOR_H*/
diff --git a/ace/POSIX_CB_Proactor.i b/ace/POSIX_CB_Proactor.i
new file mode 100644
index 00000000000..6cc8655ddba
--- /dev/null
+++ b/ace/POSIX_CB_Proactor.i
@@ -0,0 +1,8 @@
+/* -*- C++ -*- */
+// $Id$
+
+ACE_INLINE
+ACE_POSIX_Proactor::Proactor_Type ACE_POSIX_CB_Proactor::get_impl_type (void)
+{
+ return PROACTOR_CB;
+}
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 381d7521303..c3fba575a62 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -562,23 +562,51 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr
ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
{
+ // 1. try to cancel pending aio
+ this->read_stream_.cancel ();
+
+ // 2. close both handles
+ // Destuctor of ACE_Pipe does not close handles.
+ // We can not use ACE_Pipe::close() as it
+ // closes read_handle and than write_handle.
+ // In some systems close() may wait for
+ // completion for all asynch. pending requests.
+ // So we should close write_handle firstly
+ // to force read completion ( if 1. does not help )
+ // and then read_handle and not vice versa
+
+ ACE_HANDLE h = this->pipe_.write_handle ();
+ if (h != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (h);
+
+ h = this->pipe_.read_handle ();
+ if ( h != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (h);
+
}
+
int
ACE_AIOCB_Notify_Pipe_Manager::notify ()
{
// Send the result pointer through the pipe.
char char_send = 0;
- int ret_val = ACE::send (this->pipe_.write_handle (),
- & char_send ,
- sizeof (char_send));
+ int ret_val = ACE::send (this->pipe_.write_handle (),
+ &char_send,
+ sizeof (char_send));
+
+ if (ret_val < 0)
+ {
+ if (errno != EWOULDBLOCK)
+#if 0
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("(%P %t):%p\n"),
+ ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
+ ACE_LIB_TEXT ("Error:Writing on to notify pipe failed")));
+#endif /* 0 */
+ return -1;
+ }
- if (ret_val < 0 && errno != EWOULDBLOCK)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%P %t):%p\n",
- "ACE_AIOCB_Notify_Pipe_Manager::notify"
- "Error:Writing on to notify pipe failed"),
- -1);
return 0;
}
@@ -595,15 +623,14 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
// One accept has completed. Issue a read to handle any
// <post_completion>s in the future.
- if (this->read_stream_.read (this->message_block_,
- 1, // enough to read 1 byte
- 0, // ACT
- 0) // Priority
- == -1)
+ if (-1 == this->read_stream_.read (this->message_block_,
+ 1, // enough to read 1 byte
+ 0, // ACT
+ 0)) // Priority
ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t):%p\n",
- "ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:"
- "Read from pipe failed"));
+ ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"),
+ ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
+ ACE_LIB_TEXT ("Read from pipe failed")));
// 2. Do the upcalls
@@ -621,22 +648,12 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
num_deferred_aiocb_ (0),
num_started_aio_ (0)
{
- //check for correct value for max_aio_operations
+ // Check for correct value for max_aio_operations
check_max_aio_num ();
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
+ this->create_result_aiocb_list ();
- // Initialize the array.
- for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
- {
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- }
-
- create_notify_manager ();
+ this->create_notify_manager ();
// start pseudo-asynchronous accept task
// one per all future acceptors
@@ -646,7 +663,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
// Special protected constructor for ACE_SUN_Proactor
ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
- ACE_POSIX_Proactor::Proactor_Type ptype)
+ ACE_POSIX_Proactor::Proactor_Type)
: aiocb_notify_pipe_manager_ (0),
aiocb_list_ (0),
result_list_ (0),
@@ -656,15 +673,49 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
num_deferred_aiocb_ (0),
num_started_aio_ (0)
{
- ACE_UNUSED_ARG (ptype);
-
//check for correct value for max_aio_operations
- check_max_aio_num ();
+ this->check_max_aio_num ();
+
+ this->create_result_aiocb_list ();
+
+ // @@ We should create Notify_Pipe_Manager in the derived class to
+ // provide correct calls for virtual functions !!!
+}
+
+// Destructor.
+ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
+{
+ this->close();
+}
+
+int
+ACE_POSIX_AIOCB_Proactor::close (void)
+{
+ // stop asynch accept task
+ this->get_asynch_pseudo_task().stop ();
+
+ this->delete_notify_manager ();
+
+ this->clear_result_queue ();
+
+ return this->delete_result_aiocb_list ();
+}
+
+void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
+{
+ notify_pipe_read_handle_ = h;
+}
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
+int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
+{
+ if (aiocb_list_ != 0)
+ return 0;
+
+ ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
+
+ ACE_NEW_RETURN (result_list_,
+ ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
+ -1);
// Initialize the array.
for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
@@ -673,54 +724,99 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
result_list_[ai] = 0;
}
-
- // @@ We should create Notify_Pipe_Manager in the derived class to
- // provide correct calls for virtual functions !!!
+ return 0;
}
-// Destructor.
-ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
+int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
{
- // stop asynch accept task
- this->get_asynch_pseudo_task().stop ();
+ if (aiocb_list_ == 0) // already deleted
+ return 0;
+
+ // try to cancel all uncomlpeted operarion
+ // POSIX systems may have hidden system threads
+ // that still can work with our aiocb's!
+ for (size_t ai = 0; ai < aiocb_list_max_size_; ai++)
+ {
+ if (this->aiocb_list_[ai] != 0) // active operation
+ this->cancel_aiocb (result_list_[ai]);
+ }
- delete_notify_manager ();
+ int num_pending = 0;
- // delete all uncomlpeted operarion
- // as nobody will notify client since now
for (size_t ai = 0; ai < aiocb_list_max_size_; ai++)
{
- delete result_list_[ai];
- result_list_[ai] = 0;
- aiocb_list_[ai] = 0;
+ if (this->aiocb_list_[ai] == 0 ) // not active operation
+ continue;
+
+ // Get the error and return status of the aio_ operation.
+ int error_status = 0;
+ int return_status = 0;
+ int flg_completed = this->get_result_status (result_list_[ai],
+ error_status,
+ return_status);
+
+ //don't delete uncompleted AIOCB's
+ if (flg_completed == 0) // not completed !!!
+ {
+ num_pending++;
+#if 0
+ char * errtxt = ACE_OS::strerror (error_status);
+ if (errtxt == 0)
+ errtxt ="?????????";
+
+ char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
+ "WRITE":"READ" ;
+
+
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"),
+ ai,
+ op,
+ error_status,
+ return_status,
+ errtxt ));
+#endif /* 0 */
+ }
+ else // completed , OK
+ {
+ delete this->result_list_[ai];
+ this->result_list_[ai] = 0;
+ this->aiocb_list_[ai] = 0;
+ }
}
- delete [] aiocb_list_;
- aiocb_list_ = 0;
+ //if it is not possible cancel some operation (num_pending > 0 ),
+ //we can we do only one thing -report about this
+ //and complain about POSIX implementation
+ //we know that we have memory leaks,
+ //but it is better than segmentation fault!
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
+ ACE_LIB_TEXT(" number pending AIO=%d\n"),
+ num_pending
+ ));
- delete [] result_list_;
- result_list_ = 0;
+ delete [] this->aiocb_list_;
+ this->aiocb_list_ = 0;
- clear_result_queue ();
-}
+ delete [] this->result_list_;
+ this->result_list_ = 0;
-void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
-{
- notify_pipe_read_handle_ = h;
+ return (num_pending == 0 ? 0 : -1);
+ // ?? or just always return 0;
}
void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
{
- long max_os_aio_num = ACE_OS ::sysconf (_SC_AIO_MAX);
+ long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
// Define max limit AIO's for concrete OS
// -1 means that there is no limit, but it is not true
// (example, SunOS 5.6)
- if (max_os_aio_num > 0
- && aiocb_list_max_size_ > (unsigned long) max_os_aio_num
- )
+ if (max_os_aio_num > 0 &&
+ aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
aiocb_list_max_size_ = max_os_aio_num;
#if defined (HPUX)
@@ -729,7 +825,7 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
// it has a limit 256 max elements for aio_suspend ()
// It is a pity, but ...
- long max_os_listio_num = ACE_OS ::sysconf (_SC_AIO_LISTIO_MAX);
+ long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
if (max_os_listio_num > 0
&& aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
aiocb_list_max_size_ = max_os_listio_num;
@@ -762,6 +858,39 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
"(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
aiocb_list_max_size_));
+#if defined(__sgi)
+
+ ACE_DEBUG((LM_DEBUG,
+ ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
+
+//typedef struct aioinit {
+// int aio_threads; /* The number of aio threads to start (5) */
+// int aio_locks; /* Initial number of preallocated locks (3) */
+// int aio_num; /* estimated total simultanious aiobc structs (1000) */
+// int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */
+// int aio_debug; /* turn on debugging (0) */
+// int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
+// int aio_reserved[3];
+//} aioinit_t;
+
+ aioinit_t aioinit;
+
+ aioinit.aio_threads = 10; /* The number of aio threads to start (5) */
+ aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */
+ /* estimated total simultaneous aiobc structs (1000) */
+ aioinit.aio_num = aiocb_list_max_size_;
+ aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */
+ aioinit.aio_debug = 0; /* turn on debugging (0) */
+ aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */
+ aioinit.aio_reserved[0] = 0;
+ aioinit.aio_reserved[1] = 0;
+ aioinit.aio_reserved[2] = 0;
+
+ aio_sgi_init (&aioinit);
+
+#endif
+
+ return;
}
void
@@ -834,7 +963,10 @@ ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
"%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
-1);
- this->notify_completion (sig_num);
+ // let try not to overflow signal queue or notification pipe
+
+ if (this->result_queue_.size () == 1)
+ this->notify_completion (sig_num);
return 0;
}
@@ -1054,6 +1186,47 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
return retval > 0 ? 1 : 0;
}
+int
+ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ int &error_status,
+ int &return_status)
+{
+ return_status = 0;
+
+ // Get the error status of the aio_ operation.
+ error_status = aio_error (asynch_result);
+
+#if 0
+ if (error_status == -1) // <aio_error> itself has failed.
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::get_result_status:"
+ "<aio_error> has failed\n"));
+#endif /* 0 */
+
+ if (error_status == EINPROGRESS)
+ {
+ return_status = 0;
+ return 0; // not completed
+ }
+
+ return_status = aio_return (asynch_result);
+
+ if (return_status < 0)
+ {
+ return_status = 0; // zero bytes transferred
+#if 0
+ if (error_status == 0) // nonsense
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::get_result_status:"
+ "<aio_return> failed\n"));
+#endif /* 0 */
+ }
+
+ return 1; // completed
+}
+
ACE_POSIX_Asynch_Result *
ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
int &return_status,
@@ -1082,30 +1255,9 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
if (aiocb_list_[index] == 0) // Dont process null blocks.
continue;
- // Get the error status of the aio_ operation.
- error_status = aio_error (aiocb_list_[index]);
-
- if (error_status == -1) // <aio_error> itself has failed.
- {
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_error> has failed\n"));
-
- break;
-
- // we should notify user, otherwise :
- // memory leak for result and "hanging" user
- // what was before skip this operation
-
- //aiocb_list_[index] = 0;
- //result_list_[index] = 0;
- //aiocb_list_cur_size_--;
- //continue;
- }
-
- // Continue the loop if <aio_> operation is still in progress.
- if (error_status != EINPROGRESS)
+ if (0 != this->get_result_status (result_list_[index],
+ error_status,
+ return_status)) // completed
break;
} // end for
@@ -1113,25 +1265,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
if (count == 0) // all processed , nothing found
return asynch_result;
- if (error_status == ECANCELED)
- return_status = 0;
- else if (error_status == -1)
- return_status = 0;
- else
- return_status = aio_return (aiocb_list_[index]);
-
- if (return_status == -1)
- {
- return_status = 0; // zero bytes transferred
-
- if (error_status == 0) // nonsense
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_return> failed\n"));
- }
-
-
asynch_result = result_list_[index];
aiocb_list_[index] = 0;
@@ -1196,7 +1329,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul
if (ret_val != 0) // No free slot
{
- errno = EAGAIN;
+ errno = EAGAIN;
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t)::\n"
"register_and_start_aio: "
@@ -1570,8 +1703,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
{
- // stop asynch accept task
- this->get_asynch_pseudo_task().stop ();
+ this->close ();
// @@ Enable the masked signals again.
}
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index 8781218633f..e44ea545612 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -55,12 +55,14 @@ class ACE_Export ACE_POSIX_Proactor : public ACE_Proactor_Impl
public:
enum Proactor_Type
{
- PROACTOR_POSIX = 0, // base class type
- PROACTOR_AIOCB = 1,
- PROACTOR_SIG = 2,
- PROACTOR_SUN = 3
+ PROACTOR_POSIX = 0, // base class type
+ PROACTOR_AIOCB = 1, // aio_suspend() based
+ PROACTOR_SIG = 2, // signals notifications
+ PROACTOR_SUN = 3, // SUN specific aiowait()
+ PROACTOR_CB = 4 // callback notifications
};
+
enum SystemType // open for future extention
{
OS_UNDEFINED= 0x0000,
@@ -281,6 +283,9 @@ public:
/// Destructor.
virtual ~ACE_POSIX_AIOCB_Proactor (void);
+ /// Close down the Proactor.
+ virtual int close (void);
+
/**
* Dispatch a single set of events. If <wait_time> elapses before
* any events occur, return 0. Return 1 on success i.e., when a
@@ -342,10 +347,23 @@ protected:
ACE_POSIX_AIOCB_Proactor (size_t nmaxop,
ACE_POSIX_Proactor::Proactor_Type ptype);
+ /// Check AIO for completion, error and result status
+ /// Return: 1 - AIO completed , 0 - not completed yet
+ virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result,
+ int & error_status,
+ int & return_status );
/// Task to process pseudo-asynchronous operations
ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task();
+ /// Create aiocb list
+ int create_result_aiocb_list (void);
+
+ /// Call this method from derived class when virtual table is
+ /// built.
+ int delete_result_aiocb_list (void);
+
+
/// Call these methods from derived class when virtual table is
/// built.
void create_notify_manager (void);
diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp
index 5d674493565..baeac28096d 100644
--- a/ace/SUN_Proactor.cpp
+++ b/ace/SUN_Proactor.cpp
@@ -30,11 +30,7 @@ ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
// Destructor.
ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
{
- // stop asynch accept task
- this->get_asynch_pseudo_task ().stop ();
-
- // to provide correct virtual calls
- delete_notify_manager ();
+ this->close ();
}
int
@@ -151,84 +147,96 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds)
}
+int
+ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ int &error_status,
+ int &return_status)
+{
+
+ // Get the error status of the aio_ operation.
+ error_status = asynch_result->aio_resultp.aio_errno;
+ return_status = asynch_result->aio_resultp.aio_return;
+
+ // ****** from Sun man pages *********************
+ // Upon completion of the operation both aio_return and aio_errno
+ // are set to reflect the result of the operation.
+ // AIO_INPROGRESS is not a value used by the system
+ // so the client may detect a change in state
+ // by initializing aio_return to this value.
+
+ if (return_status == AIO_INPROGRESS || error_status == EINPROGRESS)
+ {
+ return_status = 0;
+ return 0; // not completed
+ }
+
+#if 0
+ if (error_status == -1) // should never be
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_SUN_Proactor::get_result_status:"
+ "<aio_errno> has failed\n"));
+#endif /* 0 */
+
+ if (return_status < 0)
+ {
+ return_status = 0; // zero bytes transferred
+#if 0
+ if (error_status == 0) // nonsense
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_SUN_Proactor::get_result_status:"
+ "<aio_return> failed\n"));
+#endif /* 0 */
+ }
+
+ return 1; // completed
+}
+
ACE_POSIX_Asynch_Result *
ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
- int &error_status,
- int &return_status)
+ int &error_status,
+ int &return_status)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
size_t ai;
error_status = -1;
- return_status= 0;
+ return_status = 0;
- // we call find_completed_aio always with result != 0
+ // we call find_completed_aio always with result != 0
for (ai = 0; ai < aiocb_list_max_size_; ai++)
if (aiocb_list_[ai] !=0 && //check for non zero
result == &aiocb_list_[ai]->aio_resultp)
break;
-
- if (ai >= aiocb_list_max_size_) // not found
- return 0;
- error_status = result->aio_errno;
- return_status= result->aio_return;
+ if (ai >= aiocb_list_max_size_) // not found
+ return 0; // means somebody else uses aio directly!!!
- if (error_status == -1) // should never be
- {
+ ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
+
+ if (this->get_result_status (asynch_result,
+ error_status,
+ return_status) == 0)
+ { // should never be
+#if 0
ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
"ACE_SUN_Proactor::find_completed_aio:"
- "<aio_errno> has failed\n"));
-
- return_status = 0;
-
- // we should notify user, otherwise :
- // memory leak for result and "hanging" user
- // what was before :
-
- // aiocb_list_[ai] = 0;
- // result_list_[ai] = 0;
- // aiocb_list_cur_size_--;
- // return 0;
+ "should never be !!!\n"));
+#endif /* 0 */
+ return 0;
}
- switch (error_status)
- {
- case EINPROGRESS : // should never be
- case AIO_INPROGRESS : // according to SUN doc
- return 0;
-
- case ECANCELED : // canceled
- return_status = 0;
- break;
-
- case 0 : // no error
- if (return_status == -1) // return_status should be >= 0
- {
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_SUN_Proactor::find_completed_aio:"
- "<aio_return> failed\n"));
-
- return_status = 0; // zero bytes transferred
- }
- break;
-
- default : // other errors
- if (return_status == -1) // normal status for I/O Error
- return_status = 0; // zero bytes transferred
- break;
- }
-
- ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai];
+ if (return_status < 0)
+ return_status = 0; // zero bytes transferred
aiocb_list_[ai] = 0;
result_list_[ai] = 0;
aiocb_list_cur_size_--;
- num_started_aio_ --;
+ num_started_aio_--;
start_deferred_aio ();
//make attempt to start deferred AIO
@@ -250,23 +258,21 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result)
int ret_val;
const ACE_TCHAR *ptype;
- // Start IO
+ // ****** from Sun man pages *********************
+ // Upon completion of the operation both aio_return and aio_errno
+ // are set to reflect the result of the operation.
+ // AIO_INPROGRESS is not a value used by the system
+ // so the client may detect a change in state
+ // by initializing aio_return to this value.
+ result->aio_resultp.aio_return = AIO_INPROGRESS;
+ result->aio_resultp.aio_errno = EINPROGRESS;
+ // Start IO
switch (result->aio_lio_opcode)
{
case LIO_READ :
- ptype = "read";
+ ptype = ACE_LIB_TEXT ("read");
ret_val = aioread (result->aio_fildes,
- (char *) result->aio_buf,
- result->aio_nbytes,
- result->aio_offset,
- SEEK_SET,
- &result->aio_resultp);
- break;
-
- case LIO_WRITE :
- ptype = "write";
- ret_val = aiowrite (result->aio_fildes,
(char *) result->aio_buf,
result->aio_nbytes,
result->aio_offset,
@@ -274,27 +280,37 @@ ACE_SUN_Proactor::start_aio (ACE_POSIX_Asynch_Result *result)
&result->aio_resultp);
break;
+ case LIO_WRITE :
+ ptype = ACE_LIB_TEXT ("write");
+ ret_val = aiowrite (result->aio_fildes,
+ (char *) result->aio_buf,
+ result->aio_nbytes,
+ result->aio_offset,
+ SEEK_SET,
+ &result->aio_resultp);
+ break;
+
default:
- ptype = "?????";
+ ptype = ACE_LIB_TEXT ("?????");
ret_val = -1;
break;
}
-
+
if (ret_val == 0)
{
- num_started_aio_ ++ ;
- if (num_started_aio_ == 1) // wake up condition
+ num_started_aio_++;
+ if (num_started_aio_ == 1) // wake up condition
condition_.broadcast ();
}
else // if (ret_val == -1)
{
if (errno == EAGAIN) //try later, it will be deferred AIO
- ret_val = 1 ;
+ ret_val = 1;
else
ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::start_aio: aio%s %p\n",
- ptype,
- "queueing failed\n"));
+ ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio%s %p\n"),
+ ptype,
+ ACE_LIB_TEXT ("queueing failed\n")));
}
return ret_val;
diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h
index 3c14a243576..c1ddbaf0cbc 100644
--- a/ace/SUN_Proactor.h
+++ b/ace/SUN_Proactor.h
@@ -100,6 +100,12 @@ protected:
/// From ACE_POSIX_AIOCB_Proactor.
virtual int start_aio (ACE_POSIX_Asynch_Result *result);
+ /// Check AIO for completion, error and result status
+ /// Return: 1 - AIO completed , 0 - not completed yet
+ virtual int get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ int &error_status,
+ int &return_status);
+
/// Extract the results of aio.
ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result,
int &error_status,
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index 3dedb3e39d2..cb8470ef99b 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -47,6 +47,7 @@ ACE_RCSID (tests,
#elif defined (ACE_HAS_AIO_CALLS)
# include "ace/POSIX_Proactor.h"
+# include "ace/POSIX_CB_Proactor.h"
# include "ace/SUN_Proactor.h"
#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */
@@ -55,8 +56,9 @@ ACE_RCSID (tests,
static int disable_signal (int sigmin, int sigmax);
// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
-// 2-SIG, 3-SUN
-static int proactor_type = 0;
+// 2-SIG, 3-SUN, 4-CALLBACK
+typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CALLBACK } ProactorType;
+static ProactorType proactor_type = DEFAULT;
// POSIX : > 0 max number aio operations proactor,
static size_t max_aio_operations = 0;
@@ -100,6 +102,15 @@ static ACE_TCHAR data[] =
"Connection: Keep-Alive\r\n"
"\r\n";
+class LogLocker
+{
+public:
+
+ LogLocker () { ACE_LOG_MSG->acquire (); }
+ virtual ~LogLocker () { ACE_LOG_MSG->release (); }
+};
+
+
// *************************************************************
// MyTask is ACE_Task resposible for :
// 1. creation and deletion of
@@ -126,12 +137,12 @@ public:
virtual int svc (void);
int start (size_t num_threads,
- int type_proactor,
+ ProactorType type_proactor,
size_t max_op );
int stop (void);
private:
- int create_proactor (int type_proactor,
+ int create_proactor (ProactorType type_proactor,
size_t max_op);
int delete_proactor (void);
@@ -142,7 +153,7 @@ private:
};
int
-MyTask::create_proactor (int type_proactor, size_t max_op)
+MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
monitor,
@@ -171,7 +182,7 @@ MyTask::create_proactor (int type_proactor, size_t max_op)
switch (type_proactor)
{
- case 1:
+ case AIOCB:
ACE_NEW_RETURN (proactor,
ACE_POSIX_AIOCB_Proactor (max_op),
-1);
@@ -179,7 +190,7 @@ MyTask::create_proactor (int type_proactor, size_t max_op)
ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
break;
- case 2:
+ case SIG:
ACE_NEW_RETURN (proactor,
ACE_POSIX_SIG_Proactor (max_op),
-1);
@@ -188,17 +199,25 @@ MyTask::create_proactor (int type_proactor, size_t max_op)
break;
# if defined (sun)
-
- case 3:
+ case SUN:
ACE_NEW_RETURN (proactor,
ACE_SUN_Proactor (max_op),
-1);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
break;
-
# endif /* sun */
+# if defined (__sgi)
+ case 4:
+ ACE_NEW_RETURN (proactor,
+ ACE_POSIX_CB_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = CALLBACK\n")));
+ break;
+# endif
+
default:
ACE_NEW_RETURN (proactor,
ACE_POSIX_SIG_Proactor (max_op),
@@ -238,7 +257,7 @@ MyTask::delete_proactor (void)
int
MyTask::start (size_t num_threads,
- int type_proactor,
+ ProactorType type_proactor,
size_t max_op)
{
if (this->create_proactor (type_proactor, max_op) == -1)
@@ -337,6 +356,7 @@ private:
int initiate_read_stream (void);
int initiate_write_stream (ACE_Message_Block &mb, int nbytes);
int check_destroy (void);
+ void cancel ();
Acceptor *acceptor_;
int index_;
@@ -347,6 +367,7 @@ private:
ACE_SYNCH_RECURSIVE_MUTEX lock_;
long io_count_;
+ int flg_cancel_;
};
class Acceptor : public ACE_Asynch_Acceptor<Receiver>
@@ -359,6 +380,7 @@ public:
virtual ~Acceptor (void);
void stop (void);
+ void cancel_all (void);
// Virtual from ACE_Asynch_Acceptor
Receiver *make_handler (void);
@@ -387,6 +409,26 @@ Acceptor::~Acceptor (void)
this->stop ();
}
+
+void
+Acceptor::cancel_all (void)
+{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
+
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+
+ this->cancel ();
+
+ for (int i = 0; i < MAX_RECEIVERS; ++i)
+ {
+ if (this->list_receivers_[i] != 0)
+ this->list_receivers_[i]->cancel ();
+ }
+ return;
+}
+
+
void
Acceptor::stop (void)
{
@@ -455,7 +497,8 @@ Receiver::Receiver (Acceptor * acceptor, int index)
: acceptor_ (acceptor),
index_ (index),
handle_ (ACE_INVALID_HANDLE),
- io_count_ (0)
+ io_count_ (0),
+ flg_cancel_ (0)
{
if (this->acceptor_ != 0)
this->acceptor_->on_new_receiver (*this);
@@ -488,6 +531,19 @@ Receiver::check_destroy (void)
return 0;
}
+
+void
+Receiver::cancel ()
+{
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+
+ this->flg_cancel_ = 1;
+ this->ws_.cancel ();
+ this->rs_.cancel ();
+ return;
+}
+
+
void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
@@ -512,6 +568,9 @@ Receiver::initiate_read_stream (void)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
+ if (this->flg_cancel_ != 0)
+ return 0;
+
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (1024), //BUFSIZ + 1),
@@ -535,6 +594,13 @@ int
Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
+
+ if (this->flg_cancel_ != 0)
+ {
+ mb.release ();
+ return -1;
+ }
+
if (nbytes <= 0)
{
mb.release ();
@@ -568,6 +634,8 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
|| result.bytes_transferred () == 0
|| result.error () != 0)
{
+ LogLocker log_lock;
+
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"),
this->index_));
@@ -639,6 +707,8 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
result.bytes_transferred () == 0 ||
result.error () != 0)
{
+ LogLocker log_lock;
+
//mb.rd_ptr () [0] = '\0';
mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
@@ -725,6 +795,7 @@ private:
int check_destroy (void);
int initiate_read_stream (void);
int initiate_write_stream (void);
+ void cancel ();
int index_;
Connector * connector_;
@@ -738,7 +809,7 @@ private:
ACE_SYNCH_RECURSIVE_MUTEX lock_;
long io_count_;
-
+ int flg_cancel_;
};
class Connector : public ACE_Asynch_Connector<Sender>
@@ -752,6 +823,7 @@ public:
int start (const ACE_INET_Addr &addr, int num);
void stop (void);
+ void cancel_all (void);
// Virtual from ACE_Asynch_Connector
Sender *make_handler (void);
@@ -781,6 +853,24 @@ Connector::~Connector (void)
this->stop ();
}
+
+void
+Connector::cancel_all(void)
+{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+
+ this->cancel ();
+
+ for (int i = 0; i < MAX_SENDERS; ++i)
+ {
+ if (this->list_senders_[i] != 0)
+ this->list_senders_[i]->cancel ();
+ }
+ return;
+}
+
void
Connector::stop (void)
{
@@ -890,7 +980,8 @@ Sender::Sender (Connector * connector, int index)
: index_ (index),
connector_ (connector),
handle_ (ACE_INVALID_HANDLE),
- io_count_ (0)
+ io_count_ (0),
+ flg_cancel_ (0)
{
if (this->connector_ != 0)
this->connector_->on_new_sender (*this);
@@ -928,6 +1019,18 @@ Sender::check_destroy (void)
}
void
+Sender::cancel ()
+{
+ ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+
+ this->flg_cancel_ = 1;
+ this->ws_.cancel ();
+ this->rs_.cancel ();
+ return;
+}
+
+
+void
Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
{
this->handle_ = handle;
@@ -959,6 +1062,9 @@ Sender::initiate_write_stream (void)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
+ if (this->flg_cancel_ != 0)
+ return -1;
+
size_t nbytes = ACE_OS::strlen (this->send_buf_);
ACE_Message_Block *mb = 0;
@@ -984,6 +1090,9 @@ Sender::initiate_read_stream (void)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
+ if (this->flg_cancel_ != 0)
+ return -1;
+
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb, ACE_Message_Block (1024 + 1), -1);
@@ -1010,6 +1119,8 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
|| result.bytes_transferred () == 0
|| result.error () != 0)
{
+ LogLocker log_lock;
+
// Reset pointers.
//mb.rd_ptr()[0] ='\0';
mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
@@ -1080,6 +1191,8 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
|| result.bytes_transferred () == 0
|| result.error () != 0)
{
+ LogLocker log_lock;
+
// Reset pointers.
mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
@@ -1153,6 +1266,7 @@ print_usage (int /* argc */, ACE_TCHAR *argv[])
ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
ACE_TEXT ("\n a AIOCB")
ACE_TEXT ("\n i SIG")
+ ACE_TEXT ("\n c CALLBACK")
ACE_TEXT ("\n s SUN")
ACE_TEXT ("\n d default")
ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
@@ -1183,19 +1297,22 @@ set_proactor_type (const ACE_TCHAR *ptype)
switch (toupper (*ptype))
{
case 'D':
- proactor_type = 0;
+ proactor_type = DEFAULT;
return 1;
case 'A':
- proactor_type = 1;
+ proactor_type = AIOCB;
return 1;
case 'I':
- proactor_type = 2;
+ proactor_type = SIG;
return 1;
#if defined (sun)
case 'S':
- proactor_type = 3;
+ proactor_type = SUN;
return 1;
#endif /* sun */
+ case 'C':
+ proactor_type = CALLBACK;
+ return 1;
default:
break;
}
@@ -1208,14 +1325,18 @@ parse_args (int argc, ACE_TCHAR *argv[])
if (argc == 1) // no arguments , so one button test
{
both = 1; // client and server simultaneosly
+#if defined(ACE_WIN32) || defined(sun)
duplex = 1; // full duplex is on
+#else // Linux,IRIX - weak AIO implementation
+ duplex = 0; // full duplex is off
+#endif
host = ACE_TEXT ("localhost"); // server to connect
port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
max_aio_operations = 512; // POSIX Proactor params
#if defined (sun)
- proactor_type = 3; // Proactor type for SunOS
+ proactor_type = SUN; // Proactor type for SunOS
#else
- proactor_type = 1; // Proactor type = default
+ proactor_type = AIOCB; // Proactor type = default
#endif
threads = 3; // size of Proactor thread pool
senders = 20; // number of senders