summaryrefslogtreecommitdiff
path: root/ACE/examples/Reactor/Proactor
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:30 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:30 +0000
commitc44379cc7d9c7aa113989237ab0f56db12aa5219 (patch)
tree66a84b20d47f2269d8bdc6e0323f338763424d3a /ACE/examples/Reactor/Proactor
parent3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (diff)
downloadATCD-c44379cc7d9c7aa113989237ab0f56db12aa5219.tar.gz
Repo restructuring
Diffstat (limited to 'ACE/examples/Reactor/Proactor')
-rw-r--r--ACE/examples/Reactor/Proactor/.cvsignore7
-rw-r--r--ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp137
-rw-r--r--ACE/examples/Reactor/Proactor/Makefile.am153
-rw-r--r--ACE/examples/Reactor/Proactor/Proactor.mpc59
-rw-r--r--ACE/examples/Reactor/Proactor/README75
-rw-r--r--ACE/examples/Reactor/Proactor/post_completions.cpp306
-rw-r--r--ACE/examples/Reactor/Proactor/simple_test_proactor.cpp269
-rw-r--r--ACE/examples/Reactor/Proactor/test_aiocb.cpp239
-rw-r--r--ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp259
-rw-r--r--ACE/examples/Reactor/Proactor/test_aiosig.cpp294
-rw-r--r--ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp358
-rw-r--r--ACE/examples/Reactor/Proactor/test_cancel.cpp246
-rw-r--r--ACE/examples/Reactor/Proactor/test_cancel.h47
-rw-r--r--ACE/examples/Reactor/Proactor/test_end_event_loop.cpp168
-rw-r--r--ACE/examples/Reactor/Proactor/test_multiple_loops.cpp140
-rw-r--r--ACE/examples/Reactor/Proactor/test_proactor.cpp679
-rw-r--r--ACE/examples/Reactor/Proactor/test_proactor.h56
-rw-r--r--ACE/examples/Reactor/Proactor/test_proactor2.cpp808
-rw-r--r--ACE/examples/Reactor/Proactor/test_proactor3.cpp864
-rw-r--r--ACE/examples/Reactor/Proactor/test_timeout.cpp130
-rw-r--r--ACE/examples/Reactor/Proactor/test_timeout_st.cpp99
-rw-r--r--ACE/examples/Reactor/Proactor/test_udp_proactor.cpp432
22 files changed, 5825 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Proactor/.cvsignore b/ACE/examples/Reactor/Proactor/.cvsignore
new file mode 100644
index 00000000000..34179361b75
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/.cvsignore
@@ -0,0 +1,7 @@
+test_cancel
+test_end_event_loop
+test_multiple_loops
+test_post_completions
+test_proactor
+test_timeout
+test_udp_proactor
diff --git a/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp b/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp
new file mode 100644
index 00000000000..be720fdef40
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp
@@ -0,0 +1,137 @@
+// $Id$
+// ============================================================================
+//
+// = FILENAME
+// aio_platform_test_c.cpp
+//
+// = DESCRITPTION
+// Testing the platform for POSIX Asynchronous I/O. This is the C
+// version of the $ACE_ROOT/tests/Aio_Platform_Test.cpp. Useful
+// to send bug reports.
+//
+// = AUTHOR
+// Programming for the Real World. Bill O. GallMeister.
+// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// =====================================================================
+
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <signal.h>
+#include <string.h>
+#include <errno.h>
+#include <stdio.h>
+
+#include <limits.h>
+
+#include <aio.h>
+
+int do_sysconf (void);
+int have_asynchio (void);
+
+static int file_handle = -1;
+char mb1 [BUFSIZ + 1];
+char mb2 [BUFSIZ + 1];
+aiocb aiocb1, aiocb2;
+sigset_t completion_signal;
+
+// For testing the <aio> stuff.
+int test_aio_calls (void);
+int issue_aio_calls (void);
+int query_aio_completions (void);
+int setup_signal_delivery (void);
+int do_sysconf (void);
+int have_asynchio (void);
+
+int
+do_sysconf (void)
+{
+ // Call sysconf to find out runtime values.
+ errno = 0;
+#if defined (_SC_LISTIO_AIO_MAX)
+ printf ("Runtime value of LISTIO_AIO_MAX is %d, errno = %d\n",
+ sysconf(_SC_LISTIO_AIO_MAX),
+ errno);
+#else
+ printf ("Runtime value of AIO_LISTIO_MAX is %d, errno = %d\n",
+ sysconf(_SC_AIO_LISTIO_MAX),
+ errno);
+#endif
+
+ errno = 0;
+ printf ("Runtime value of AIO_MAX is %d, errno = %d\n",
+ sysconf (_SC_AIO_MAX),
+ errno);
+
+ errno = 0;
+ printf ("Runtime value of _POSIX_ASYNCHRONOUS_IO is %d, errno = %d\n",
+ sysconf (_SC_ASYNCHRONOUS_IO),
+ errno);
+
+ errno = 0;
+ printf ("Runtime value of _POSIX_REALTIME_SIGNALS is %d, errno = %d\n",
+ sysconf (_SC_REALTIME_SIGNALS),
+ errno);
+
+ errno = 0;
+ printf ("Runtime value of RTSIG_MAX %d, Errno = %d\n",
+ sysconf (_SC_RTSIG_MAX),
+ errno);
+
+ errno = 0;
+ printf ("Runtime value of SIGQUEUE_MAX %d, Errno = %d\n",
+ sysconf (_SC_SIGQUEUE_MAX),
+ errno);
+ return 0;
+}
+
+int
+have_asynchio (void)
+{
+#if defined (_POSIX_ASYNCHRONOUS_IO)
+ // POSIX Asynch IO is present in this system.
+#if defined (_POSIX_ASYNC_IO)
+ // If this is defined and it is not -1, POSIX_ASYNCH is supported
+ // everywhere in the system.
+#if _POSIX_ASYNC_IO == -1
+ printf ("_POSIX_ASYNC_IO = -1.. ASYNCH IO NOT supported at all\n");
+ return -1;
+#else /* Not _POSIX_ASYNC_IO == -1 */
+ printf ("_POSIX_ASYNC_IO = %d\n ASYNCH IO is supported FULLY\n",
+ _POSIX_ASYNC_IO);
+#endif /* _POSIX_ASYNC_IO == -1 */
+
+#else /* Not defined _POSIX_ASYNC_IO */
+ printf ("_POSIX_ASYNC_IO is not defined.\n");
+ printf ("AIO might *not* be supported on some paths\n");
+#endif /* _POSIX_ASYNC_IO */
+
+ // System defined POSIX Values.
+ printf ("System claims to have POSIX_ASYNCHRONOUS_IO\n");
+
+ printf ("_POSIX_AIO_LISTIO_MAX = %d\n", _POSIX_AIO_LISTIO_MAX);
+ printf ("_POSIX_AIO_MAX = %d\n", _POSIX_AIO_MAX);
+
+ // Check and print the run time values.
+ do_sysconf ();
+
+ return 0;
+
+#else /* Not _POSIX_ASYNCHRONOUS_IO */
+ printf ("No support._POSIX_ASYNCHRONOUS_IO itself is not defined\n");
+ return -1;
+#endif /* _POSIX_ASYNCHRONOUS_IO */
+}
+
+int
+main (int, char *[])
+{
+ if (have_asynchio () == 0)
+ printf ("Test successful\n");
+ else
+ printf ("Test not successful\n");
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Proactor/Makefile.am b/ACE/examples/Reactor/Proactor/Makefile.am
new file mode 100644
index 00000000000..7f1bc4b8a57
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/Makefile.am
@@ -0,0 +1,153 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+noinst_PROGRAMS =
+
+## Makefile.Proactor_Cancel.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_cancel
+
+test_cancel_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_cancel_SOURCES = \
+ test_cancel.cpp \
+ test_cancel.h
+
+test_cancel_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_End_Event_Loops.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_end_event_loop
+
+test_end_event_loop_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_end_event_loop_SOURCES = \
+ test_end_event_loop.cpp \
+ test_cancel.h \
+ test_proactor.h
+
+test_end_event_loop_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_Multiple_Loops.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_multiple_loops
+
+test_multiple_loops_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_multiple_loops_SOURCES = \
+ test_multiple_loops.cpp \
+ test_cancel.h \
+ test_proactor.h
+
+test_multiple_loops_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_Post_Completions.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_post_completions
+
+test_post_completions_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_post_completions_SOURCES = \
+ post_completions.cpp \
+ test_cancel.h \
+ test_proactor.h
+
+test_post_completions_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_Proactor.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_proactor
+
+test_proactor_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_proactor_SOURCES = \
+ test_proactor.cpp \
+ test_proactor.h
+
+test_proactor_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_Timeout.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_timeout
+
+test_timeout_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_timeout_SOURCES = \
+ test_timeout.cpp \
+ test_cancel.h \
+ test_proactor.h
+
+test_timeout_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Makefile.Proactor_Udp_Proactor.am
+
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS += test_udp_proactor
+
+test_udp_proactor_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+test_udp_proactor_SOURCES = \
+ test_udp_proactor.cpp \
+ test_cancel.h \
+ test_proactor.h
+
+test_udp_proactor_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/Reactor/Proactor/Proactor.mpc b/ACE/examples/Reactor/Proactor/Proactor.mpc
new file mode 100644
index 00000000000..c2c52207ca1
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/Proactor.mpc
@@ -0,0 +1,59 @@
+// -*- MPC -*-
+// $Id$
+
+project(*cancel) : aceexe {
+ avoids += ace_for_tao
+ exename = test_cancel
+ Source_Files {
+ test_cancel.cpp
+ }
+}
+
+project(*end_event_loops) : aceexe {
+ avoids += ace_for_tao
+ exename = test_end_event_loop
+ Source_Files {
+ test_end_event_loop.cpp
+ }
+}
+
+project(*multiple_loops) : aceexe {
+ avoids += ace_for_tao
+ exename = test_multiple_loops
+ Source_Files {
+ test_multiple_loops.cpp
+ }
+}
+
+project(*post_completions) : aceexe {
+ avoids += ace_for_tao
+ exename = test_post_completions
+ Source_Files {
+ post_completions.cpp
+ }
+}
+
+project(*proactor) : aceexe {
+ avoids += ace_for_tao
+ exename = test_proactor
+ Source_Files {
+ test_proactor.cpp
+ }
+}
+
+project(*timeout) : aceexe {
+ avoids += ace_for_tao
+ exename = test_timeout
+ Source_Files {
+ test_timeout.cpp
+ }
+}
+
+project(*udp_proactor) : aceexe {
+ avoids += ace_for_tao
+ exename = test_udp_proactor
+ Source_Files {
+ test_udp_proactor.cpp
+ }
+}
+
diff --git a/ACE/examples/Reactor/Proactor/README b/ACE/examples/Reactor/Proactor/README
new file mode 100644
index 00000000000..29f2a0b1832
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/README
@@ -0,0 +1,75 @@
+$Id$
+
+This README file lists all the example applications for the Proactor framework.
+
+Test/Example Applications for Proactor:
+=========================================
+
+The following tests are available.
+
+o $ACE_ROOT/tests/Aio_Platform_Test.cpp : Tests basic limits
+ pertaining to the POSIX features
+
+o $ACE_ROOT/examples/Reactor/Proactor/test_aiocb.cpp :
+ This is a C++ program for testing the AIOCB (AIO Control
+ Blocks) based completion approach which uses <aio_suspend> for
+ completion querying.
+
+o $ACE_ROOT/examples/Reactor/Proactor/test_aiosig.cpp : This is a
+ C++ program for testing the Signal based completion approach
+ that uses <sigtimedwait> for completion querying.
+
+o $ACE_ROOT/examples/Reactor/Proactor/test_aiocb_ace.cpp: Portable
+ version of test_aiocb.cpp. (Same as test_aiocb.cpp, but uses
+ ACE_DEBUGs instead of printf's and ACE_Message_Blocks instead
+ of char*'s.
+
+o $ACE_ROOT/examples/Reactor/Proactor/test_aiosig_ace.cpp: Portable
+ version of test_aiosig.cpp. (Same as test_aiosig.cpp, but uses
+ ACE_DEBUGs instead of printf's and ACE_Message_Blocks instead
+ of char*'s.
+
+o test_proactor.cpp (with ACE_POSIX_AIOCB_Proactor) : Test for
+ ACE_Proactor which uses AIOCB (AIO Control Blocks) based
+ completions strategy Proactor. (#define
+ ACE_POSIX_AIOCB_PROACTOR in the config file, but this is the
+ default option)
+
+o test_proactor.cpp (with ACE_POSIX_SIG_Proactor) : Test for
+ ACE_Proactor which uses real time signal based completion
+ strategy proactor. (#define ACE_POSIX_SIG_PROACTOR in the
+ config file)
+
+o test_multiple_loops.cpp : This example application shows how
+ to write programs that combine the Proactor and Reactor event
+ loops. This is possible only on WIN32 platform.
+
+o test_timeout.cpp : Multithreaded application testing the Timers
+ mechanism of the Proactor.
+
+o test_timeout_st.cpp : Single-threaded version of test_timeout.cpp.
+
+o post_completions.cpp : Tests the completion posting mechanism of
+ the Proactor.
+
+o test_end_event_loop.cpp : Tests the event loop mechanism of the
+ Proactor.
+
+o test_cancel.cpp : Tests <cancel> interface of the
+ Asynch_Operation class.
+
+Behavior of POSIX AIO of various platforms:
+==========================================
+
+Sun 5.6 : POSIX4 Real-Time signals implementation is broken in
+ this platform.
+ Only POSIX AIOCB Proactor works in this platform.
+ Therefore, it is not possible to use multiple threads
+ with in the framework.
+
+Sun 5.7 : AIOCB and SIG Proactors work fine.
+
+LynxOS 3.0.0 : <pthread_sigmask> is not available in this
+ platform. So, only AIOCB Proactor works here.
+
+
diff --git a/ACE/examples/Reactor/Proactor/post_completions.cpp b/ACE/examples/Reactor/Proactor/post_completions.cpp
new file mode 100644
index 00000000000..e6545241953
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/post_completions.cpp
@@ -0,0 +1,306 @@
+// $Id$
+// ============================================================================
+//
+// = FILENAME
+// post_completions.cpp
+//
+// = DESCRITPTION
+// This program demonstrates how to post fake completions to The
+// Proactor. It also shows the how to specify the particular
+// real-time signals to post completions. The Real-time signal
+// based completion strategy is implemented with
+// ACE_POSIX_SIG_PROACTOR.
+// (So, it can be used only if both ACE_HAS_AIO_CALLS and
+// ACE_HAS_POSIX_REALTIME_SIGNALS are defined.)
+// Since it is faking results, you have to pay by knowing and
+// using platform-specific implementation objects for Asynchronous
+// Result classes.
+// This example shows using an arbitrary result class for faking
+// completions. You can also use the predefined Result classes for
+// faking. The factory methods in the Proactor class create the
+// Result objects.
+//
+// = COMPILATION
+// make
+//
+// = RUN
+// ./post_completions
+//
+// = AUTHOR
+// Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// =====================================================================
+
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_main.h"
+#include "ace/Proactor.h"
+#include "ace/Task.h"
+#include "ace/WIN32_Proactor.h"
+#include "ace/POSIX_Proactor.h"
+#include "ace/Atomic_Op.h"
+#include "ace/Thread_Mutex.h"
+
+// Keep track of how many completions are still expected.
+static ACE_Atomic_Op <ACE_SYNCH_MUTEX, size_t> Completions_To_Go;
+
+
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
+ defined (ACE_HAS_AIO_CALLS)
+// This only works on Win32 platforms and on Unix platforms supporting
+// POSIX aio calls.
+
+#if defined (ACE_HAS_AIO_CALLS)
+#define RESULT_CLASS ACE_POSIX_Asynch_Result
+#elif defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+#define RESULT_CLASS ACE_WIN32_Asynch_Result
+#endif /* ACE_HAS_AIO_CALLS */
+
+class My_Result : public RESULT_CLASS
+{
+ // = TITLE
+ //
+ // Result Object that we will post to the Proactor.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ My_Result (ACE_Handler &handler,
+ const void *act,
+ int signal_number,
+ size_t sequence_number)
+ : RESULT_CLASS (handler.proxy (),
+ act,
+ ACE_INVALID_HANDLE,
+ 0, // Offset
+ 0, // OffsetHigh
+ 0, // Priority
+ signal_number),
+ sequence_number_ (sequence_number)
+ {}
+ // Constructor.
+
+ virtual ~My_Result (void)
+ {}
+ // Destructor.
+
+ void complete (size_t,
+ int success,
+ const void *completion_key,
+ u_long error)
+ // This is the method that will be called by the Proactor for
+ // dispatching the completion. This method generally calls one of
+ // the call back hood methods defined in the ACE_Handler
+ // class. But, we will just handle the completions here.
+ {
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+ this->error_ = error;
+
+ size_t to_go = --Completions_To_Go;
+
+ // Print the completion details.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Completion sequence number %d, success : %d, error : %d, signal_number : %d, %u more to go\n",
+ this->sequence_number_,
+ this->success_,
+ this->error_,
+ this->signal_number (),
+ to_go));
+
+ // Sleep for a while.
+ ACE_OS::sleep (4);
+ }
+
+private:
+ size_t sequence_number_;
+ // Sequence number for the result object.
+};
+
+class My_Handler : public ACE_Handler
+{
+ // = TITLE
+ //
+ // Handler class for faked completions.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ My_Handler (void) {}
+ // Constructor.
+
+ virtual ~My_Handler (void) {}
+ // Destructor.
+};
+
+class My_Task: public ACE_Task <ACE_NULL_SYNCH>
+{
+ // = TITLE
+ //
+ // Contains thread functions which execute event loops. Each
+ // thread waits for a different signal.
+ //
+public:
+ My_Task (void) {}
+ // Constructor.
+
+ virtual ~My_Task (void) {}
+ // Destructor.
+
+ int open (void *proactor)
+ {
+ // Store the proactor.
+ this->proactor_ = (ACE_Proactor *) proactor;
+
+ // Activate the Task.
+ this->activate (THR_NEW_LWP, 5);
+ return 0;
+ }
+
+ int svc (void)
+ {
+ // Handle events for 13 seconds.
+ ACE_Time_Value run_time (13);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t):Starting svc routine\n"));
+
+ if (this->proactor_->handle_events (run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t):%p.\n", "Worker::svc"), -1);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n"));
+
+ return 0;
+ }
+
+private:
+ ACE_Proactor *proactor_;
+ // Proactor for this task.
+};
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ ACE_UNUSED_ARG (argc);
+ ACE_UNUSED_ARG (argv);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P | %t):Test starts \n"));
+
+ // = Get two POSIX_SIG_Proactors, one with SIGRTMIN and one with
+ // SIGRTMAX.
+
+ ACE_Proactor proactor1;
+ // Proactor1. SIGRTMIN Proactor. (default).
+
+ // = Proactor2. SIGRTMAX Proactor.
+#if defined (ACE_HAS_AIO_CALLS) && defined (ACE_HAS_POSIX_REALTIME_SIGNALS)
+
+ ACE_DEBUG ((LM_DEBUG, "Using ACE_POSIX_SIG_Proactor\n"));
+
+ sigset_t signal_set;
+ // Signal set that we want to mask.
+
+ // Clear the signal set.
+ if (sigemptyset (&signal_set) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "sigemptyset failed"),
+ 1);
+
+ // Add the SIGRTMAX to the signal set.
+ if (sigaddset (&signal_set, ACE_SIGRTMAX) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "sigaddset failed"),
+ 1);
+
+ // Make the POSIX Proactor.
+ ACE_POSIX_SIG_Proactor posix_proactor (signal_set);
+ // Get the Proactor interface out of it.
+ ACE_Proactor proactor2 (&posix_proactor);
+#else /* ACE_HAS_AIO_CALLS && ACE_HAS_POSIX_REALTIME_SIGNALS */
+ ACE_Proactor proactor2;
+#endif /* ACE_HAS_AIO_CALLS && ACE_HAS_POSIX_REALTIME_SIGNALS */
+
+ // = Create Tasks. One pool of threads to handle completions on
+ // SIGRTMIN and the other one to handle completions on SIGRTMAX.
+ My_Task task1, task2;
+ task1.open (&proactor1);
+ task2.open (&proactor2);
+
+ // Handler for completions.
+ My_Handler handler;
+
+ // = Create a few MyResult objects and post them to Proactor.
+ const size_t NrCompletions (10);
+ My_Result *result_objects [NrCompletions];
+ int signal_number = ACE_SIGRTMAX;
+ size_t ri = 0;
+
+ Completions_To_Go = NrCompletions;
+
+ // Creation.
+ for (ri = 0; ri < NrCompletions; ri++)
+ {
+ // Use RTMIN and RTMAX proactor alternatively, to post
+ // completions.
+ if (ri % 2)
+ signal_number = ACE_SIGRTMIN;
+ else
+ signal_number = ACE_SIGRTMAX;
+ // Create the result.
+ ACE_NEW_RETURN (result_objects [ri],
+ My_Result (handler,
+ 0,
+ signal_number,
+ ri),
+ 1);
+ }
+ ACE_OS::sleep(5);
+ // Post all the result objects.
+ ACE_Proactor *proactor;
+ for (ri = 0; ri < NrCompletions; ri++)
+ {
+ // Use RTMIN and RTMAX Proactor alternatively, to post
+ // completions.
+ if (ri % 2)
+ proactor = &proactor1;
+ else
+ proactor = &proactor2;
+ if (result_objects [ri]->post_completion (proactor->implementation ())
+ == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Test failed\n"),
+ 1);
+ }
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ int status = 0;
+ size_t to_go = Completions_To_Go.value ();
+ if (size_t (0) != to_go)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Fail! Expected all completions to finish but %u to go\n",
+ to_go));
+ status = 1;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P | %t):Test ends\n"));
+ return status;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
+
+int
+main (int, char *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example cannot work with AIOCB_Proactor.\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
+
diff --git a/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp b/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp
new file mode 100644
index 00000000000..1f4557d7df5
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp
@@ -0,0 +1,269 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// simple_test_proactor.cpp
+//
+// = DESCRIPTION
+// Very simple version of test_proactor.cpp.
+//
+// = AUTHOR
+// Alexander Babu Arulanthu (alex@cs.wustl.edu)
+//
+// ============================================================================
+
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_main.h"
+
+ACE_RCSID(Proactor, test_proactor, "simple_test_proactor.cpp,v 1.1 1999/05/18 22:15:30 alex Exp")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms supporting
+ // POSIX aio calls.
+
+static ACE_TCHAR *file = ACE_TEXT("simple_test_proactor.cpp");
+static ACE_TCHAR *dump_file = ACE_TEXT("simple_output");
+
+class Simple_Tester : public ACE_Handler
+{
+ // = TITLE
+ //
+ // Simple_Tester
+ //
+ // = DESCRIPTION
+ //
+ // The class will be created by main(). This class reads a block
+ // from the file and write that to the dump file.
+
+public:
+ Simple_Tester (void);
+ // Constructor.
+
+ ~Simple_Tester (void);
+
+ int open (void);
+ // Open the operations and initiate read from the file.
+
+protected:
+ // = These methods are called by the freamwork.
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the socket complete.
+
+ virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+
+private:
+ int initiate_read_file (void);
+
+ ACE_Asynch_Read_File rf_;
+ // rf (read file): for writing from the file.
+
+ ACE_Asynch_Write_File wf_;
+ // ws (write File): for writing to the file.
+
+ ACE_HANDLE input_file_;
+ // File to read from.
+
+ ACE_HANDLE dump_file_;
+ // File for dumping data.
+
+ // u_long file_offset_;
+ // Current file offset
+
+ // u_long file_size_;
+ // File size
+};
+
+
+Simple_Tester::Simple_Tester (void)
+ : input_file_ (ACE_INVALID_HANDLE),
+ dump_file_ (ACE_INVALID_HANDLE)
+{
+}
+
+Simple_Tester::~Simple_Tester (void)
+{
+ ACE_OS::close (this->input_file_);
+ ACE_OS::close (this->dump_file_);
+}
+
+
+int
+Simple_Tester::open (void)
+{
+ // Initialize stuff
+
+ // Open input file (in OVERLAPPED mode)
+ this->input_file_ = ACE_OS::open (file,
+ GENERIC_READ | FILE_FLAG_OVERLAPPED);
+ if (this->input_file_ == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1);
+
+ // Open dump file (in OVERLAPPED mode)
+ this->dump_file_ = ACE_OS::open (dump_file,
+ O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED,
+ 0644);
+ if (this->dump_file_ == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1);
+
+ // Open ACE_Asynch_Read_File
+ if (this->rf_.open (*this, this->input_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::open"), -1);
+
+ // Open ACE_Asynch_Write_File
+ if (this->wf_.open (*this, this->dump_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::open"), -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Simple_Tester::open: Files and Asynch Operations opened sucessfully\n"));
+
+
+ // Start an asynchronous read file
+ if (this->initiate_read_file () == -1)
+ return -1;
+
+ return 0;
+}
+
+
+int
+Simple_Tester::initiate_read_file (void)
+{
+ // Create Message_Block
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1);
+
+ // Inititiate an asynchronous read from the file
+ if (this->rf_.read (*mb,
+ mb->size () - 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Simple_Tester:initiate_read_file: Asynch Read File issued sucessfully\n"));
+
+ return 0;
+}
+
+void
+Simple_Tester::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n"));
+
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ // Watch out if you need to enable this... the ACE_Log_Record::MAXLOGMSGLEN
+ // value controls to max length of a log record, and a large output
+ // buffer may smash it.
+#if 0
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n",
+ "message_block",
+ result.message_block ().rd_ptr ()));
+#endif /* 0 */
+
+ if (result.success ())
+ {
+ // Read successful: write this to the file.
+ if (this->wf_.write (result.message_block (),
+ result.bytes_transferred ()) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"));
+ return;
+ }
+ }
+}
+
+void
+Simple_Tester::handle_write_file (const ACE_Asynch_Write_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_write_File called\n"));
+
+ // Reset pointers
+ result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
+
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ // Watch out if you need to enable this... the ACE_Log_Record::MAXLOGMSGLEN
+ // value controls to max length of a log record, and a large output
+ // buffer may smash it.
+#if 0
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n",
+ "message_block",
+ result.message_block ().rd_ptr ()));
+#endif /* 0 */
+ ACE_Proactor::end_event_loop ();
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("f:d:"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'f':
+ file = get_opt.opt_arg ();
+ break;
+ case 'd':
+ dump_file = get_opt.opt_arg ();
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.\n",
+ "usage :\n"
+ "-d <dumpfile>\n"
+ "-f <file>\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ Simple_Tester Simple_Tester;
+
+ if (Simple_Tester.open () == -1)
+ return -1;
+
+ int success = 1;
+
+ // dispatch events
+ success = !(ACE_Proactor::run_event_loop () == -1);
+
+ return success ? 0 : 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_aiocb.cpp b/ACE/examples/Reactor/Proactor/test_aiocb.cpp
new file mode 100644
index 00000000000..c9c0d280f1b
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_aiocb.cpp
@@ -0,0 +1,239 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// proactor
+//
+// = FILENAME
+// test_aiocb.cpp
+//
+// = DESCRIPTION
+// Checkout $ACE_ROOT/examples/Reactor/Proactor/test_aiocb_ace.cpp,
+// which is the ACE'ified version of this program.
+//
+// = COMPILE and RUN
+// % CC -g -o test_aiocb -lrt test_aiocb.cpp
+// % ./test_aiocb
+//
+// = AUTHOR
+// Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// ============================================================================
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <signal.h>
+#include <string.h>
+#include <errno.h>
+#include <stdio.h>
+#include <iostream.h>
+
+#include <aio.h>
+
+class Test_Aio
+{
+public:
+ Test_Aio (void);
+ // Default constructor.
+
+ int init (void);
+ // Initting the output file and the buffer.
+
+ int do_aio (void);
+ // Doing the testing stuff.
+
+ ~Test_Aio (void);
+ // Destructor.
+private:
+ int out_fd_;
+ // Output file descriptor.
+
+ struct aiocb *aiocb_write_;
+ // For writing to the file.
+
+ struct aiocb *aiocb_read_;
+ // Reading stuff from the file.
+
+ char *buffer_write_;
+ // The buffer to be written to the out_fd.
+
+ char *buffer_read_;
+ // The buffer to be read back from the file.
+};
+
+Test_Aio::Test_Aio (void)
+ : aiocb_write_ (new struct aiocb),
+ aiocb_read_ (new struct aiocb),
+ buffer_write_ (0),
+ buffer_read_ (0)
+{
+}
+
+Test_Aio::~Test_Aio (void)
+{
+ delete aiocb_write_;
+ delete aiocb_read_;
+ delete buffer_write_;
+ delete buffer_read_;
+}
+
+// Init the output file and init the buffer.
+int
+Test_Aio::init (void)
+{
+ // Open the output file.
+ this->out_fd_ = open ("test_aio.log", O_RDWR | O_CREAT | O_TRUNC, 0666);
+ if (this->out_fd_ == 0)
+ {
+ cout << "Error : Opening file" << endl;
+ return -1;
+ }
+
+ // Init the buffers.
+ this->buffer_write_ = strdup ("Welcome to the world of AIO... AIO Rules !!!");
+ cout << "The buffer : " << this->buffer_write_ << endl;
+ this->buffer_read_ = new char [strlen (this->buffer_write_) + 1];
+ return 0;
+}
+
+// Set the necessary things for the AIO stuff.
+// Write the buffer asynchly.hmm Disable signals.
+// Go on aio_suspend. Wait for completion.
+// Print out the result.
+int
+Test_Aio::do_aio (void)
+{
+ // = Write to the file.
+
+ // Setup AIOCB.
+ this->aiocb_write_->aio_fildes = this->out_fd_;
+ this->aiocb_write_->aio_offset = 0;
+ this->aiocb_write_->aio_buf = this->buffer_write_;
+ this->aiocb_write_->aio_nbytes = strlen (this->buffer_write_);
+ this->aiocb_write_->aio_reqprio = 0;
+ this->aiocb_write_->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ this->aiocb_write_->aio_sigevent.sigev_value.sival_ptr =
+ (void *) this->aiocb_write_;
+
+ // Fire off the aio write.
+ if (aio_write (this->aiocb_write_) != 0)
+ {
+ perror ("aio_write");
+ return -1;
+ }
+
+ // = Read from that file.
+
+ // Setup AIOCB.
+ this->aiocb_read_->aio_fildes = this->out_fd_;
+ this->aiocb_read_->aio_offset = 0;
+ this->aiocb_read_->aio_buf = this->buffer_read_;
+ this->aiocb_read_->aio_nbytes = strlen (this->buffer_write_);
+ this->aiocb_read_->aio_reqprio = 0;
+ this->aiocb_read_->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ this->aiocb_read_->aio_sigevent.sigev_value.sival_ptr =
+ (void *) this->aiocb_read_;
+
+ // Fire off the aio write. If it doesnt get queued, carry on to get
+ // the completion for the first one.
+ if (aio_read (this->aiocb_read_) < 0)
+ perror ("aio_read");
+
+ // Wait for the completion on aio_suspend.
+ struct aiocb *list_aiocb[2];
+ list_aiocb [0] = this->aiocb_write_;
+ list_aiocb [1] = this->aiocb_read_;
+
+ // Do suspend till all the aiocbs in the list are done.
+ int done = 0;
+ int return_val = 0;
+ while (!done)
+ {
+ return_val = aio_suspend (list_aiocb,
+ 2,
+ 0);
+ cerr << "Return value :" << return_val << endl;
+
+ // Analyze return and error values.
+ if (list_aiocb[0] != 0)
+ {
+ if (aio_error (list_aiocb [0]) != EINPROGRESS)
+ {
+ if (aio_return (list_aiocb [0]) == -1)
+ {
+ perror ("aio_return");
+ return -1;
+ }
+ else
+ {
+ // Successful. Store the pointer somewhere and make the
+ // entry NULL in the list.
+ this->aiocb_write_ = list_aiocb [0];
+ list_aiocb [0] = 0;
+ }
+ }
+ else
+ cout << "AIO write in progress" << endl;
+ }
+
+ if (list_aiocb[1] != 0)
+ {
+ if (aio_error (list_aiocb [1]) != EINPROGRESS)
+ {
+ int read_return = aio_return (list_aiocb[1]);
+ if (read_return == -1)
+ {
+ perror ("aio_return");
+ return -1;
+ }
+ else
+ {
+ // Successful. Store the pointer somewhere and make the
+ // entry NULL in the list.
+ this->aiocb_read_ = list_aiocb [1];
+ list_aiocb [1] = 0;
+ this->buffer_read_[read_return] = '\0';
+ }
+ }
+ else
+ cout << "AIO read in progress" << endl;
+ }
+
+ // Is it done?
+ if ((list_aiocb [0] == 0) && (list_aiocb [1] == 0))
+ done = 1;
+ }
+
+ cout << "Both the AIO operations done." << endl;
+ cout << "The buffer is :" << this->buffer_read_ << endl;
+
+ return 0;
+}
+
+int
+main (int argc, char **argv)
+{
+ Test_Aio test_aio;
+
+ if (test_aio.init () != 0)
+ {
+ printf ("AIOCB test failed:\n"
+ "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n");
+ return -1;
+ }
+
+ if (test_aio.do_aio () != 0)
+ {
+ printf ("AIOCB test failed:\n"
+ "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n");
+ return -1;
+ }
+ printf ("AIOCB test successful:\n"
+ "ACE_POSIX_AIOCB_PROACTOR should work in this platform\n");
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp b/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp
new file mode 100644
index 00000000000..17705de1f03
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp
@@ -0,0 +1,259 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// proactor
+//
+// = FILENAME
+// test_aiocb_ace.cpp
+//
+// = DESCRIPTION
+// This program helps you to test the <aio_*> calls on a
+// platform.
+//
+// Before running this test, make sure the platform can
+// support POSIX <aio_> calls, using
+// ACE_ROOT/tests/Aio_Platform_Test.
+//
+// This program tests the AIOCB (AIO Control Blocks) based
+// completion approach which uses <aio_suspend> for completion
+// querying.
+//
+// If this test is successful, ACE_POSIX_AIOCB_PROACTOR
+// can be used on this platform.
+//
+// = COMPILE and RUN
+// % make
+// % ./test_aiocb_ace
+//
+// = AUTHOR
+// Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/ACE.h"
+#include "ace/Log_Msg.h"
+#include "ace/os_include/os_aio.h"
+#include "ace/OS_NS_string.h"
+
+class Test_Aio
+{
+public:
+ Test_Aio (void);
+ // Default constructor.
+
+ int init (void);
+ // Initting the output file and the buffer.
+
+ int do_aio (void);
+ // Doing the testing stuff.
+
+ ~Test_Aio (void);
+ // Destructor.
+private:
+ int out_fd_;
+ // Output file descriptor.
+
+ struct aiocb *aiocb_write_;
+ // For writing to the file.
+
+ struct aiocb *aiocb_read_;
+ // Reading stuff from the file.
+
+ char *buffer_write_;
+ // The buffer to be written to the out_fd.
+
+ char *buffer_read_;
+ // The buffer to be read back from the file.
+};
+
+Test_Aio::Test_Aio (void)
+ : aiocb_write_ (0),
+ aiocb_read_ (0),
+ buffer_write_ (0),
+ buffer_read_ (0)
+{
+ ACE_NEW (this->aiocb_write_,
+ struct aiocb);
+ ACE_NEW (this->aiocb_read_,
+ struct aiocb);
+}
+
+Test_Aio::~Test_Aio (void)
+{
+ delete aiocb_write_;
+ delete aiocb_read_;
+ delete buffer_write_;
+ delete buffer_read_;
+}
+
+// Init the output file and init the buffer.
+int
+Test_Aio::init (void)
+{
+ // Open the output file.
+ this->out_fd_ = ACE_OS::open ("test_aio.log",
+ O_RDWR | O_CREAT | O_TRUNC,
+ 0666);
+ if (this->out_fd_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error: Opening file\n"),
+ -1);
+
+ // Init the buffers.
+ this->buffer_write_ = ACE::strnew ("Welcome to the world of AIO... AIO Rules !!!");
+ ACE_DEBUG ((LM_DEBUG,
+ "The buffer : %s\n",
+ this->buffer_write_));
+
+ // Allocate memory for the read buffer.
+ ACE_NEW_RETURN (this->buffer_read_,
+ char [ACE_OS::strlen (this->buffer_write_)],
+ -1);
+
+ return 0;
+}
+
+// Set the necessary things for the AIO stuff.
+// Write the buffer asynchly.hmm Disable signals.
+// Go on aio_suspend. Wait for completion.
+// Print out the result.
+int
+Test_Aio::do_aio (void)
+{
+ // = Write to the file.
+
+ // Setup AIOCB.
+ this->aiocb_write_->aio_fildes = this->out_fd_;
+ this->aiocb_write_->aio_offset = 0;
+ this->aiocb_write_->aio_buf = this->buffer_write_;
+ this->aiocb_write_->aio_nbytes = ACE_OS::strlen (this->buffer_write_);
+ this->aiocb_write_->aio_reqprio = 0;
+ this->aiocb_write_->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ this->aiocb_write_->aio_sigevent.sigev_value.sival_ptr =
+ (void *) this->aiocb_write_;
+
+ // Fire off the aio write.
+ if (aio_write (this->aiocb_write_) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "aio_write"),
+ -1);
+
+ // = Read from that file.
+
+ // Setup AIOCB.
+ this->aiocb_read_->aio_fildes = this->out_fd_;
+ this->aiocb_read_->aio_offset = 0;
+ this->aiocb_read_->aio_buf = this->buffer_read_;
+ this->aiocb_read_->aio_nbytes = ACE_OS::strlen (this->buffer_write_);
+ this->aiocb_read_->aio_reqprio = 0;
+ this->aiocb_read_->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ this->aiocb_read_->aio_sigevent.sigev_value.sival_ptr =
+ (void *) this->aiocb_read_;
+
+ // Fire off the aio write. If it doesnt get queued, carry on to get
+ // the completion for the first one.
+ if (aio_read (this->aiocb_read_) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "aio_read"),
+ -1);
+
+ // Wait for the completion on aio_suspend.
+ struct aiocb *list_aiocb[2];
+ list_aiocb [0] = this->aiocb_write_;
+ list_aiocb [1] = this->aiocb_read_;
+
+ // Do suspend till all the aiocbs in the list are done.
+ int to_finish = 2;
+ int return_val = 0;
+ while (to_finish > 0)
+ {
+ return_val = aio_suspend (list_aiocb,
+ to_finish,
+ 0);
+ ACE_DEBUG ((LM_DEBUG,
+ "Result of <aio_suspend> : %d\n",
+ return_val));
+
+ // Analyze return and error values.
+ if (to_finish > 1)
+ {
+ if (aio_error (list_aiocb [1]) != EINPROGRESS)
+ {
+ if (aio_return (list_aiocb [1]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "aio_return, item 1"),
+ -1);
+ else
+ {
+ // Successful. Remember we have one less thing to finish.
+ --to_finish;
+ list_aiocb [1] = 0;
+ }
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "aio_error says aio 1 is in progress\n"));
+ }
+
+ if (aio_error (list_aiocb [0]) != EINPROGRESS)
+ {
+ if (aio_return (list_aiocb [0]) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "aio_return, item 0"),
+ -1);
+ else
+ {
+ // Successful. Store the pointer somewhere and bump the
+ // read entry up to the front, if it is still not done.
+ --to_finish;
+ list_aiocb [0] = this->aiocb_read_;
+ }
+ }
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "aio_error says aio 0 is in progress\n"));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Both the AIO operations done.\n"
+ "The buffer is : %s\n",
+ this->buffer_read_));
+
+ return 0;
+}
+
+int
+main (int argc, char **argv)
+{
+
+ ACE_UNUSED_ARG (argc);
+ ACE_UNUSED_ARG (argv);
+
+ Test_Aio test_aio;
+
+ if (test_aio.init () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "AIOCB test failed:\n"
+ "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"),
+ -1);
+
+ if (test_aio.do_aio () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "AIOCB test failed:\n"
+ "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "AIOCB test successful:\n"
+ "ACE_POSIX_AIOCB_PROACTOR should work in this platform\n"));
+
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Proactor/test_aiosig.cpp b/ACE/examples/Reactor/Proactor/test_aiosig.cpp
new file mode 100644
index 00000000000..1746a10a49c
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_aiosig.cpp
@@ -0,0 +1,294 @@
+// $Id$
+// ============================================================================
+//
+// = FILENAME
+// test_aiosig.cpp
+//
+// = DESCRITPTION
+// Check out test_aiosig_ace.cpp, the ACE'ified version of this
+// program. This program may not be uptodate.
+//
+// = COMPILATION
+// CC -g -o test_aiosig -lrt test_aiosig.cpp
+//
+// = RUN
+// ./test_aiosig
+//
+// = AUTHOR
+// Programming for the Real World. Bill O. GallMeister.
+// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// =====================================================================
+
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <signal.h>
+#include <string.h>
+#include <errno.h>
+#include <stdio.h>
+
+#include <limits.h>
+
+#include <aio.h>
+
+int file_handle = -1;
+char mb1 [BUFSIZ + 1];
+char mb2 [BUFSIZ + 1];
+aiocb aiocb1, aiocb2;
+sigset_t completion_signal;
+
+// Function prototypes.
+int setup_signal_delivery (void);
+int issue_aio_calls (void);
+int query_aio_completions (void);
+int test_aio_calls (void);
+int setup_signal_handler (void);
+int setup_signal_handler (int signal_number);
+
+int
+setup_signal_delivery (void)
+{
+ // Make the sigset_t consisting of the completion signal.
+ if (sigemptyset (&completion_signal) == -1)
+ {
+ perror ("Error:Couldnt init the RT completion signal set\n");
+ return -1;
+ }
+
+ if (sigaddset (&completion_signal, SIGRTMIN) == -1)
+ {
+ perror ("Error:Couldnt init the RT completion signal set\n");
+ return -1;
+ }
+
+ // Mask them.
+ if (pthread_sigmask (SIG_BLOCK, &completion_signal, 0) == -1)
+ {
+ perror ("Error:Couldnt maks the RT completion signals\n");
+ return -1;
+ }
+
+ return setup_signal_handler (SIGRTMIN);
+}
+
+int
+issue_aio_calls (void)
+{
+ // Setup AIOCB.
+ aiocb1.aio_fildes = file_handle;
+ aiocb1.aio_offset = 0;
+ aiocb1.aio_buf = mb1;
+ aiocb1.aio_nbytes = BUFSIZ;
+ aiocb1.aio_reqprio = 0;
+ aiocb1.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ aiocb1.aio_sigevent.sigev_signo = SIGRTMIN;
+ aiocb1.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb1;
+
+ // Fire off the aio write.
+ if (aio_read (&aiocb1) == -1)
+ {
+ // Queueing failed.
+ perror ("Error:Asynch_Read_Stream: aio_read queueing failed\n");
+ return -1;
+ }
+
+ // Setup AIOCB.
+ aiocb2.aio_fildes = file_handle;
+ aiocb2.aio_offset = BUFSIZ + 1;
+ aiocb2.aio_buf = mb2;
+ aiocb2.aio_nbytes = BUFSIZ;
+ aiocb2.aio_reqprio = 0;
+ aiocb2.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ aiocb2.aio_sigevent.sigev_signo = SIGRTMIN;
+ aiocb2.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb2;
+
+ // Fire off the aio write.
+ if (aio_read (&aiocb2) == -1)
+ {
+ // Queueing failed.
+ perror ("Error:Asynch_Read_Stream: aio_read queueing failed\n");
+ return -1;
+ }
+ return 0;
+}
+
+int
+query_aio_completions (void)
+{
+ int result = 0;
+ size_t number_of_compleions = 0;
+ for (number_of_compleions = 0;
+ number_of_compleions < 2;
+ number_of_compleions ++)
+ {
+ // Wait for <milli_seconds> amount of time.
+ // @@ Assigning <milli_seconds> to tv_sec.
+ timespec timeout;
+ timeout.tv_sec = INT_MAX;
+ timeout.tv_nsec = 0;
+
+ // To get back the signal info.
+ siginfo_t sig_info;
+
+ // Await the RT completion signal.
+ int sig_return = sigtimedwait (&completion_signal,
+ &sig_info,
+ &timeout);
+
+ // Error case.
+ // If failure is coz of timeout, then return *0* but set
+ // errno appropriately. This is what the WinNT proactor
+ // does.
+ if (sig_return == -1)
+ {
+ perror ("Error:Error waiting for RT completion signals\n");
+ return -1;
+ }
+
+ // RT completion signals returned.
+ if (sig_return != SIGRTMIN)
+ {
+ printf ("Unexpected signal (%d) has been received while waiting for RT Completion Signals\n",
+ sig_return);
+ return -1;
+ }
+
+ // @@ Debugging.
+ printf ("Sig number found in the sig_info block : %d\n",
+ sig_info.si_signo);
+
+ // Is the signo returned consistent?
+ if (sig_info.si_signo != sig_return)
+ {
+ printf ("Inconsistent signal number (%d) in the signal info block\n",
+ sig_info.si_signo);
+ return -1;
+ }
+
+ // @@ Debugging.
+ printf ("Signal code for this signal delivery : %d\n",
+ sig_info.si_code);
+
+ // Is the signal code an aio completion one?
+ if ((sig_info.si_code != SI_ASYNCIO) &&
+ (sig_info.si_code != SI_QUEUE))
+ {
+ printf ("Unexpected signal code (%d) returned on completion querying\n",
+ sig_info.si_code);
+ return -1;
+ }
+
+ // Retrive the aiocb.
+ aiocb* aiocb_ptr = (aiocb *) sig_info.si_value.sival_ptr;
+
+ // Analyze error and return values. Return values are
+ // actually <errno>'s associated with the <aio_> call
+ // corresponding to aiocb_ptr.
+ int error_code = aio_error (aiocb_ptr);
+ if (error_code == -1)
+ {
+ perror ("Error:Invalid control block was sent to <aio_error> for compleion querying\n");
+ return -1;
+ }
+
+ if (error_code != 0)
+ {
+ // Error occurred in the <aio_>call. Return the errno
+ // corresponding to that <aio_> call.
+ printf ("Error:An AIO call has failed:Error code = %d\n",
+ error_code);
+ return -1;
+ }
+
+ // No error occured in the AIO operation.
+ int nbytes = aio_return (aiocb_ptr);
+ if (nbytes == -1)
+ {
+ perror ("Error:Invalid control block was send to <aio_return>\n");
+ return -1;
+ }
+
+ if (number_of_compleions == 0)
+ // Print the buffer.
+ printf ("Number of bytes transferred : %d\n The buffer : %s \n",
+ nbytes,
+ mb1);
+ else
+ // Print the buffer.
+ printf ("Number of bytes transferred : %d\n The buffer : %s \n",
+ nbytes,
+ mb2);
+ }
+ return 0;
+}
+
+int
+test_aio_calls (void)
+{
+ // Set up the input file.
+ // Open file (in SEQUENTIAL_SCAN mode)
+ file_handle = open ("test_aiosig.cpp", O_RDONLY);
+
+ if (file_handle == -1)
+ {
+ perror ("Error:Opening the inputfile");
+ return -1;
+ }
+
+ if (setup_signal_delivery () < 0)
+ return -1;
+
+ if (issue_aio_calls () < 0)
+ return -1;
+
+ if (query_aio_completions () < 0)
+ return -1;
+
+ return 0;
+}
+
+int
+setup_signal_handler (int signal_number)
+{
+ // Setting up the handler(!) for these signals.
+ struct sigaction reaction;
+ sigemptyset (&reaction.sa_mask); // Nothing else to mask.
+ reaction.sa_flags = SA_SIGINFO; // Realtime flag.
+#if defined (SA_SIGACTION)
+ // Lynx says, it is better to set this bit to be portable.
+ reaction.sa_flags &= SA_SIGACTION;
+#endif /* SA_SIGACTION */
+ reaction.sa_sigaction = null_handler; // Null handler.
+ int sigaction_return = sigaction (SIGRTMIN,
+ &reaction,
+ 0);
+ if (sigaction_return == -1)
+ {
+ perror ("Error:Proactor couldnt do sigaction for the RT SIGNAL");
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+null_handler (int /* signal_number */,
+ siginfo_t * /* info */,
+ void * /* context */)
+{
+}
+
+int
+main (int, char *[])
+{
+ if (test_aio_calls () == 0)
+ printf ("RT SIG test successful:\n"
+ "ACE_POSIX_SIG_PROACTOR should work in this platform\n");
+ else
+ printf ("RT SIG test failed:\n"
+ "ACE_POSIX_SIG_PROACTOR may not work in this platform\n");
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp b/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp
new file mode 100644
index 00000000000..34c1b9b5ab2
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp
@@ -0,0 +1,358 @@
+// $Id$
+
+// ============================================================================
+//
+// = FILENAME
+// test_aiosig_sig.cpp
+//
+// = DESCRITPTION
+// This program helps you to test the <aio_*> calls on a
+// platform.
+// Before running this test, make sure the platform can
+// support POSIX <aio_> calls, using ACE_ROOT/tests/Aio_Plaform_Test.cpp
+//
+// This program tests the Signal based completion approach which
+// uses <sigtimedwait> for completion querying.
+// If this test is successful, ACE_POSIX_SIG_PROACTOR
+// can be used on this platform.
+//
+// This program is a ACE version of the
+// $ACE_ROOT/examples/Reactor/Proactor/test_aiosig.cpp, with
+// ACE_DEBUGs and Message_Blocks.
+//
+// This test does the following:
+// Issue two <aio_read>s.
+// Assign SIGRTMIN as the notification signal.
+// Mask these signals from delivery.
+// Receive this signal by doing <sigtimedwait>.
+// Wait for two completions (two signals)
+//
+// = COMPILATION
+// make
+//
+// = RUN
+// ./test_aiosig_ace
+//
+// = AUTHOR
+// Programming for the Real World. Bill O. GallMeister.
+// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// =====================================================================
+
+#include "ace/Message_Block.h"
+#include "ace/Log_Msg.h"
+#include "ace/os_include/os_aio.h"
+#include "ace/OS_NS_signal.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_fcntl.h"
+#include "ace/Asynch_IO.h" // for ACE_INFINITE
+
+static ACE_HANDLE file_handle = ACE_INVALID_HANDLE;
+static ACE_Message_Block mb1 (BUFSIZ + 1);
+static ACE_Message_Block mb2 (BUFSIZ + 1);
+static aiocb aiocb1;
+static aiocb aiocb2;
+static aiocb aiocb3;
+static sigset_t completion_signal;
+
+// Function prototypes.
+static int setup_signal_delivery (void);
+static int issue_aio_calls (void);
+static int query_aio_completions (void);
+static int test_aio_calls (void);
+static void null_handler (int signal_number, siginfo_t *info, void *context);
+static int setup_signal_handler (int signal_number);
+
+static int
+setup_signal_delivery (void)
+{
+ // = Mask all the signals.
+
+ sigset_t full_set;
+
+ // Get full set.
+ if (sigfillset (&full_set) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigfillset failed"),
+ -1);
+
+ // Mask them.
+ if (ACE_OS::pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"),
+ -1);
+
+ // = Make a mask with SIGRTMIN only. We use only that signal to
+ // issue <aio_>'s.
+
+ if (sigemptyset (&completion_signal) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Couldnt init the RT completion signal set"),
+ -1);
+
+ if (sigaddset (&completion_signal,
+ SIGRTMIN) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Couldnt init the RT completion signal set"),
+ -1);
+
+ // Set up signal handler for this signal.
+ return setup_signal_handler (SIGRTMIN);
+}
+
+static int
+setup_signal_handler (int signal_number)
+{
+ ACE_UNUSED_ARG (signal_number);
+
+ // Setting up the handler(!) for these signals.
+ struct sigaction reaction;
+ sigemptyset (&reaction.sa_mask); // Nothing else to mask.
+ reaction.sa_flags = SA_SIGINFO; // Realtime flag.
+#if defined (SA_SIGACTION)
+ // Lynx says, it is better to set this bit to be portable.
+ reaction.sa_flags &= SA_SIGACTION;
+#endif /* SA_SIGACTION */
+ reaction.sa_sigaction = null_handler; // Null handler.
+ int sigaction_return = sigaction (SIGRTMIN,
+ &reaction,
+ 0);
+ if (sigaction_return == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Proactor couldnt do sigaction for the RT SIGNAL"),
+ -1);
+ return 0;
+}
+
+
+static int
+issue_aio_calls (void)
+{
+ // Setup AIOCB.
+ aiocb1.aio_fildes = file_handle;
+ aiocb1.aio_offset = 0;
+ aiocb1.aio_buf = mb1.wr_ptr ();
+ aiocb1.aio_nbytes = BUFSIZ;
+ aiocb1.aio_reqprio = 0;
+ aiocb1.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ aiocb1.aio_sigevent.sigev_signo = SIGRTMIN;
+ aiocb1.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb1;
+
+ // Fire off the aio read.
+ if (aio_read (&aiocb1) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Asynch_Read_Stream: aio_read queueing failed"),
+ -1);
+
+ // Setup AIOCB.
+ aiocb2.aio_fildes = file_handle;
+ aiocb2.aio_offset = BUFSIZ + 1;
+ aiocb2.aio_buf = mb2.wr_ptr ();
+ aiocb2.aio_nbytes = BUFSIZ;
+ aiocb2.aio_reqprio = 0;
+ aiocb2.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ aiocb2.aio_sigevent.sigev_signo = SIGRTMIN;
+ aiocb2.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb2;
+
+ // Fire off the aio read.
+ if (aio_read (&aiocb2) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Asynch_Read_Stream: aio_read queueing failed"),
+ -1);
+
+ // Setup sigval.
+ aiocb3.aio_fildes = ACE_INVALID_HANDLE;
+ aiocb3.aio_offset = 0;
+ aiocb3.aio_buf = 0;
+ aiocb3.aio_nbytes = 0;
+ aiocb3.aio_reqprio = 0;
+ aiocb3.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ aiocb3.aio_sigevent.sigev_signo = SIGRTMIN;
+ aiocb3.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb3;
+ sigval value;
+ value.sival_ptr = reinterpret_cast<void *> (&aiocb3);
+ // Queue this one for completion right now.
+ if (sigqueue (ACE_OS::getpid (), SIGRTMIN, value) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error: %p\n", "sigqueue"),
+ -1);
+
+ return 0;
+}
+
+static int
+query_aio_completions (void)
+{
+ for (size_t number_of_compleions = 0;
+ number_of_compleions < 3;
+ number_of_compleions ++)
+ {
+ // Wait for <milli_seconds> amount of time. @@ Assigning
+ // <milli_seconds> to tv_sec.
+ timespec timeout;
+ timeout.tv_sec = ACE_INFINITE;
+ timeout.tv_nsec = 0;
+
+ // To get back the signal info.
+ siginfo_t sig_info;
+
+ // Await the RT completion signal.
+ int sig_return = sigtimedwait (&completion_signal,
+ &sig_info,
+ &timeout);
+
+ // Error case.
+ // If failure is coz of timeout, then return *0* but set
+ // errno appropriately. This is what the WinNT proactor
+ // does.
+ if (sig_return == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n",
+ "Error waiting for RT completion signals"),
+ -1);
+
+ // RT completion signals returned.
+ if (sig_return != SIGRTMIN)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Unexpected signal (%d) has been received while waiting for RT Completion Signals\n",
+ sig_return),
+ -1);
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "Sig number found in the sig_info block : %d\n",
+ sig_info.si_signo));
+
+ // Is the signo returned consistent?
+ if (sig_info.si_signo != sig_return)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Inconsistent signal number (%d) in the signal info block\n",
+ sig_info.si_signo),
+ -1);
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "Signal code for this signal delivery : %d\n",
+ sig_info.si_code));
+
+ // Is the signal code an aio completion one?
+ if ((sig_info.si_code != SI_ASYNCIO) &&
+ (sig_info.si_code != SI_QUEUE))
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Unexpected signal code (%d) returned on completion querying\n",
+ sig_info.si_code),
+ -1);
+
+ // Retrive the aiocb.
+ aiocb* aiocb_ptr = (aiocb *) sig_info.si_value.sival_ptr;
+ if (aiocb_ptr == &aiocb3)
+ {
+ ACE_ASSERT (sig_info.si_code == SI_QUEUE);
+ ACE_DEBUG ((LM_DEBUG, "sigqueue caught... good\n"));
+ }
+ else
+ {
+ // Analyze error and return values. Return values are
+ // actually <errno>'s associated with the <aio_> call
+ // corresponding to aiocb_ptr.
+ int error_code = aio_error (aiocb_ptr);
+ if (error_code == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
+ "Invalid control block was sent to <aio_error> for completion querying"),
+ -1);
+
+ if (error_code != 0)
+ // Error occurred in the <aio_>call. Return the errno
+ // corresponding to that <aio_> call.
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
+ "An AIO call has failed"),
+ error_code);
+
+ // No error occured in the AIO operation.
+ int nbytes = aio_return (aiocb_ptr);
+ if (nbytes == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
+ "Invalid control block was send to <aio_return>"),
+ -1);
+ if (number_of_compleions == 0)
+ {
+ // Print the buffer.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n Number of bytes transferred : %d\n",
+ nbytes));
+ // Note... the dumps of the buffers are disabled because they
+ // may easily overrun the ACE_Log_Msg output buffer. If you need
+ // to turn the on for some reason, be careful of this.
+#if 0
+ ACE_DEBUG ((LM_DEBUG, "The buffer : %s \n", mb1.rd_ptr ()));
+#endif /* 0 */
+ }
+ else
+ {
+ // Print the buffer.
+ ACE_DEBUG ((LM_DEBUG,
+ "\n Number of bytes transferred : %d\n",
+ nbytes));
+#if 0
+ ACE_DEBUG ((LM_DEBUG, "The buffer : %s \n", mb2.rd_ptr ()));
+#endif /* 0 */
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int
+test_aio_calls (void)
+{
+ // Set up the input file.
+ // Open file (in SEQUENTIAL_SCAN mode)
+ file_handle = ACE_OS::open ("test_aiosig_ace.cpp",
+ O_RDONLY);
+
+ if (file_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_OS::open"),
+ -1);
+
+ if (setup_signal_delivery () == -1)
+ return -1;
+
+ if (issue_aio_calls () == -1)
+ return -1;
+
+ if (query_aio_completions () == -1)
+ return -1;
+
+ return 0;
+}
+
+static void
+null_handler (int signal_number,
+ siginfo_t */* info */,
+ void * /* context */)
+{
+ ACE_ERROR ((LM_ERROR,
+ "Error:%s:Signal number %d\n"
+ "Mask all the RT signals for this thread",
+ "ACE_POSIX_SIG_Proactor::null_handler called",
+ signal_number));
+}
+
+int
+main (int, char *[])
+{
+ if (test_aio_calls () == 0)
+ printf ("RT SIG test successful:\n"
+ "ACE_POSIX_SIG_PROACTOR should work in this platform\n");
+ else
+ printf ("RT SIG test failed:\n"
+ "ACE_POSIX_SIG_PROACTOR may not work in this platform\n");
+ return 0;
+}
diff --git a/ACE/examples/Reactor/Proactor/test_cancel.cpp b/ACE/examples/Reactor/Proactor/test_cancel.cpp
new file mode 100644
index 00000000000..c10f8e9be2c
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_cancel.cpp
@@ -0,0 +1,246 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_cancel.cpp
+//
+// = DESCRIPTION
+// This program tests cancelling an Asynchronous Operation in the
+// Proactor framework.
+//
+// This tests accepts a connection and issues an Asynchronous Read
+// Stream. It reads <read_size> (option -s) number of bytes and
+// when this operation completes, it issues another Asynchronous
+// Read Stream to <read_size> and immediately calls <cancel> to
+// cancel the operation and so the program exits closing the
+// connection.
+//
+// Works fine on NT. On Solaris platforms, the asynch read is
+// pending, but the cancel returns with the value <AIO_ALLDONE>
+// indicating all the operations in that handle are done.
+// But, LynxOS has a good <aio_cancel> implementation. It works
+// fine.
+//
+// = RUN
+// ./test_cancel -p <port_number>
+// Then telnet to this port and send <read_size> bytes and your
+// connection should get closed down.
+//
+// = AUTHOR
+// Irfan Pyarali (irfan@cs.wustl.edu)
+//
+// ============================================================================
+
+#include "ace/OS_main.h"
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_sys_socket.h"
+
+ACE_RCSID (Proactor, test_proactor, "$Id$")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms supporting
+ // POSIX aio calls.
+
+#include "test_cancel.h"
+
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+static int done = 0;
+static int read_size = 2;
+
+
+Receiver::Receiver (void)
+ : mb_ (read_size + 1),
+ handle_ (ACE_INVALID_HANDLE)
+{
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver: Closing down Remote connection:%d\n",
+ this->handle_));
+
+ ACE_OS::closesocket (this->handle_);
+}
+
+void
+Receiver::open (ACE_HANDLE handle,
+ ACE_Message_Block &)
+{
+ // New connection, initiate stuff
+
+ ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n"));
+
+ // Cache the new connection
+ this->handle_ = handle;
+
+ // Initiate ACE_Asynch_Read_Stream
+ if (this->rs_.open (*this, this->handle_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open"));
+ return;
+ }
+
+ // Try to read <n> bytes from the stream.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver::open: Issuing Asynch Read of (%d) bytes from the stream\n",
+ read_size));
+
+ if (this->rs_.read (this->mb_,
+ read_size) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Receiver::open: Failed to issue the read"));
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
+
+ // Reset pointers
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+
+ if (result.success () && !result.error ())
+ {
+ // Successful read: No error.
+
+ // Set the pointers back in the message block.
+ result.message_block ().wr_ptr (result.message_block ().rd_ptr ());
+
+ // Issue another read, but immediately cancel it.
+
+ // Issue the read.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Issuing Asynch Read of (%d) bytes from the stream\n",
+ read_size));
+
+ if (this->rs_.read (this->mb_,
+ read_size) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Receiver::handle_read_stream: Failed to issue the read"));
+
+ // Cancel the read.
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Cacelling Asynch Read "));
+
+ int ret_val = this->rs_.cancel ();
+ if (ret_val == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Receiver::handle_read_stream: Failed to cancel the read"));
+
+ ACE_DEBUG ((LM_DEBUG, "Asynch IO : Cancel : Result = %d\n",
+ ret_val));
+ }
+ else
+ {
+ done = 1;
+
+ ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
+
+ // Print the error message if any.
+ if (result.error () != 0)
+ {
+ errno = result.error ();
+
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "Asynch Read Stream Error: "));
+ }
+ }
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("p:s:"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'p':
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 's':
+ read_size = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.\n",
+ "usage :\n"
+ "-p <port>\n"
+ "-s <read_size>\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ // Note: acceptor parameterized by the Receiver
+ ACE_Asynch_Acceptor<Receiver> acceptor;
+
+ // Listening passively.
+ if (acceptor.open (ACE_INET_Addr (port),
+ read_size,
+ 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE:acceptor::open failed\n"),
+ 1);
+
+ int success = 1;
+
+ while (success > 0 && !done)
+ // dispatch events
+ success = ACE_Proactor::instance ()->handle_events ();
+
+ return 0;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example does not work on this platform.\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_cancel.h b/ACE/examples/Reactor/Proactor/test_cancel.h
new file mode 100644
index 00000000000..45c4bfbc85b
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_cancel.h
@@ -0,0 +1,47 @@
+/*
+** $Id$
+*/
+
+#ifndef _TEST_CANCEL_H
+#define _TEST_CANCEL_H
+
+#include "ace/Asynch_IO.h"
+
+class Receiver : public ACE_Service_Handler
+{
+ // = TITLE
+ //
+ // Receiver
+ //
+ // = DESCRIPTION
+ //
+ // The class will be created by ACE_Asynch_Acceptor when new
+ // connections arrive. This class will then receive data from
+ // the network connection and dump it to a file.
+
+public:
+ Receiver (void);
+ ~Receiver (void);
+
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+ // This is called after the new connection has been accepted.
+
+protected:
+ // These methods are called by the framework
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous read from the socket complete
+
+private:
+ ACE_Asynch_Read_Stream rs_;
+ // rs (read stream): for reading from a socket
+
+ ACE_Message_Block mb_;
+ // Message block to read from the stream.
+
+ ACE_HANDLE handle_;
+ // Handle for IO to remote peer
+};
+
+#endif /* _TEST_CANCEL_H */
diff --git a/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp b/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp
new file mode 100644
index 00000000000..096f77b089d
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp
@@ -0,0 +1,168 @@
+// $Id$
+// ============================================================================
+//
+// = FILENAME
+// test_end_event_loop.cpp
+//
+// = DESCRITPTION
+// This program tests the event loop mechanism of the
+// Proactor. To end the event loop, threads that are blocked in
+// waiting for completions are woken up and the event loop comes
+// to the end. This is tested in this program.
+//
+// Threads are doing <run_event_loop> with/without time_out
+// values and the main thread calls <end_event_loop>.
+//
+// = COMPILATION
+// make
+//
+// = RUN
+// ./test_end_event_loop
+//
+// = AUTHOR
+// Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// =====================================================================
+
+#include "ace/OS_NS_unistd.h"
+#include "ace/Proactor.h"
+#include "ace/Task.h"
+#include "ace/WIN32_Proactor.h"
+#include "ace/POSIX_Proactor.h"
+#include "ace/OS_main.h"
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
+ (defined (ACE_HAS_AIO_CALLS)) && !defined (ACE_POSIX_AIOCB_PROACTOR))
+// This only works on Win32 platforms and on Unix platforms supporting
+// POSIX aio calls.
+
+class My_Task: public ACE_Task <ACE_NULL_SYNCH>
+{
+ // = TITLE
+ //
+ // Contains thread functions which execute event loops. Each
+ // thread waits for a different signal.
+ //
+public:
+ // Constructor.
+ My_Task (void)
+ : time_flag_ (0)
+ {}
+
+
+ virtual ~My_Task (void) {}
+ // Destructor.
+
+ // If time_flag is zero do the eventloop indefinitely, otherwise do
+ // it for finite amount of time (13secs!!!).
+ int open (void *timed_event_loop)
+ {
+ // Set the local variable.
+ if (timed_event_loop == 0)
+ this->time_flag_ = 0;
+ else
+ this->time_flag_ = 1;
+
+ // Spawn the threads.
+ if (this->activate (THR_NEW_LWP, 5) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:%p\n",
+ "My_Task:open: <activate> failed"),
+ -1);
+
+ return 0;
+ }
+
+ // Thread function.
+ int svc (void)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t):Starting svc routine\n"));
+
+ if (this->time_flag_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t):Going to do *timed* <run_event_loop> \n"));
+
+ ACE_Time_Value run_time (13);
+
+ if (ACE_Proactor::instance ()->run_event_loop (run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t):%p.\n",
+ "<Proactor::run_event_loop> failed"),
+ -1);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t):Going to do *indefinite* <run_event_loop> \n"));
+
+ if (ACE_Proactor::instance ()->run_event_loop () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t):%p.\n",
+ "<Proactor::run_event_loop> failed"),
+ -1);
+ }
+ return 0;
+ };
+
+private:
+ int time_flag_;
+ // If zero, indefinite event loop, otherwise timed event loop.
+};
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv [])
+{
+ ACE_UNUSED_ARG (argc);
+ ACE_UNUSED_ARG (argv);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P | %t):Test starts \n"));
+
+ // Let us get the singleton proactor created here. This is very
+ // important. This will mask the signal used in the Proactor masked
+ // for the main thread (and all the threads).
+ ACE_Proactor *proactor = ACE_Proactor::instance ();
+ ACE_UNUSED_ARG (proactor);
+
+ My_Task task1, task2;
+
+ // Test the indefinite run event loop.
+ if (task1.open (0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):Failed to <open> the task\n"),
+ 1);
+
+ // Test the indefinite run event loop. Just pass a non-zero.
+ if (task2.open ((void *)&task2) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):Failed to <open> the task\n"),
+ 1);
+
+ // Give a gap.
+ ACE_OS::sleep (3);
+
+ // End the event loop.
+ if (ACE_Proactor::instance ()->end_event_loop () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:(%P | %t):Failed to <end_event_loop>\n"),
+ 1);
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P | %t):Test ends\n"));
+ return 0;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
+
+int
+main (int, char *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example cannot work with AIOCB_Proactor.\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
+
diff --git a/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp b/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp
new file mode 100644
index 00000000000..ac4228ab641
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp
@@ -0,0 +1,140 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_multiple_loops.cpp
+//
+// = DESCRIPTION
+//
+// This example application shows how to write programs that
+// combine the Proactor and Reactor event loops. This is possible
+// only on WIN32 platform.
+//
+// = AUTHOR
+// Irfan Pyarali
+//
+// ============================================================================
+
+#include "ace/Task.h"
+#include "ace/Proactor.h"
+#include "ace/WIN32_Proactor.h"
+#include "ace/Atomic_Op.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID(Proactor, test_multiple_loops, "$Id$")
+
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
+
+class Timeout_Handler : public ACE_Handler, public ACE_Event_Handler
+{
+ // = TITLE
+ // Generic timeout handler.
+
+public:
+ Timeout_Handler (void)
+ {
+ }
+
+ // This is called by the Proactor. This is declared in ACE_Handler.
+ virtual void handle_time_out (const ACE_Time_Value &tv,
+ const void *arg)
+ {
+ // Print out when timeouts occur.
+ ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n",
+ ++count_,
+ (char *) arg,
+ tv.sec ()));
+
+ // Since there is only one thread that can do the timeouts in
+ // Reactor, lets keep the handle_timeout short for that
+ // thread.
+ if (ACE_OS::strcmp ((char *) arg, "Proactor") == 0)
+ // Sleep for a while
+ ACE_OS::sleep (1);
+ }
+
+ // This method is declared in ACE_Event_Handler.
+ virtual int handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+ {
+ this->handle_time_out (tv, arg);
+ return 0;
+ }
+
+private:
+ ACE_Atomic_Op <ACE_Thread_Mutex, int> count_;
+};
+
+class Worker : public ACE_Task <ACE_NULL_SYNCH>
+{
+public:
+
+ // Thread fuction.
+ int svc (void)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) Worker started\n"));
+
+ // Handle events for 13 seconds.
+ ACE_Time_Value run_time (13);
+
+ // Try to become the owner
+ ACE_Reactor::instance ()->owner (ACE_Thread::self ());
+
+ if (ACE_Reactor::run_event_loop (run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "Worker::svc"), -1);
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n"));
+
+ return 0;
+ }
+};
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ Timeout_Handler handler;
+ ACE_WIN32_Proactor win32_proactor (0, 1);
+ ACE_Proactor proactor (&win32_proactor, 0, 0);
+
+ ACE_Reactor::instance ()->register_handler (proactor.implementation ());
+
+ // Register a 2 second timer.
+ ACE_Time_Value foo_tv (2);
+ if (proactor.schedule_timer (handler,
+ (void *) "Proactor",
+ ACE_Time_Value::zero,
+ foo_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ // Register a 3 second timer.
+ ACE_Time_Value bar_tv (3);
+ if (ACE_Reactor::instance ()->schedule_timer (&handler,
+ (void *) "Reactor",
+ ACE_Time_Value::zero,
+ bar_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ Worker worker;
+
+ if (worker.activate (THR_NEW_LWP, 10) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ // Remove from reactor
+ ACE_Reactor::instance ()->remove_handler (&proactor,
+ ACE_Event_Handler::DONT_CALL);
+
+ return 0;
+}
+#else
+int
+main (int, char *[])
+{
+ return 0;
+}
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE */
diff --git a/ACE/examples/Reactor/Proactor/test_proactor.cpp b/ACE/examples/Reactor/Proactor/test_proactor.cpp
new file mode 100644
index 00000000000..035a2facf6a
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_proactor.cpp
@@ -0,0 +1,679 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_proactor.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the <ACE_Proactor> can be used to
+// implement an application that does various asynchronous
+// operations.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_main.h"
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_sys_stat.h"
+#include "ace/OS_NS_sys_socket.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_fcntl.h"
+
+ACE_RCSID(Proactor, test_proactor, "$Id$")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms supporting
+ // POSIX aio calls.
+
+#include "test_proactor.h"
+
+
+// Host that we're connecting to.
+static ACE_TCHAR *host = 0;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+// File that we're sending.
+static const ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp");
+
+// Name of the output file.
+static const ACE_TCHAR *dump_file = ACE_TEXT("output");
+
+// Keep track of when we're done.
+static int done = 0;
+
+// Size of each initial asynchronous <read> operation.
+static int initial_read_size = BUFSIZ;
+
+
+Receiver::Receiver (void)
+ : dump_file_ (ACE_INVALID_HANDLE),
+ handle_ (ACE_INVALID_HANDLE)
+{
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_OS::close (this->dump_file_);
+ ACE_OS::closesocket (this->handle_);
+}
+
+void
+Receiver::open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Receiver::open called\n"));
+
+ // New connection, so initiate stuff.
+
+ // Cache the new connection
+ this->handle_ = handle;
+
+ // File offset starts at zero
+ this->file_offset_ = 0;
+
+ // Open dump file (in OVERLAPPED mode)
+ this->dump_file_ = ACE_OS::open (dump_file,
+ O_CREAT | O_RDWR | O_TRUNC | \
+ FILE_FLAG_OVERLAPPED);
+ if (this->dump_file_ == ACE_INVALID_HANDLE)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_OS::open"));
+ return;
+ }
+
+ // Initiate <ACE_Asynch_Write_File>.
+ if (this->wf_.open (*this,
+ this->dump_file_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_File::open"));
+ return;
+ }
+
+ // Initiate <ACE_Asynch_Read_Stream>.
+ if (this->rs_.open (*this, this->handle_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"));
+ return;
+ }
+
+ // Fake the result and make the <handle_read_stream> get
+ // called. But, not, if there is '0' is transferred.
+ if (message_block.length () != 0)
+ {
+ // Duplicate the message block so that we can keep it around.
+ ACE_Message_Block &duplicate =
+ *message_block.duplicate ();
+
+ // Fake the result so that we will get called back.
+ ACE_Asynch_Read_Stream_Result_Impl *fake_result =
+ ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (),
+ this->handle_,
+ duplicate,
+ initial_read_size,
+ 0,
+ ACE_INVALID_HANDLE,
+ 0,
+ 0);
+
+ size_t bytes_transferred = message_block.length ();
+
+ // <complete> for Accept would have already moved the <wr_ptr>
+ // forward. Update it to the beginning position.
+ duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
+
+ // This will call the callback.
+ fake_result->complete (message_block.length (),
+ 1,
+ 0);
+
+ // Zap the fake result.
+ delete fake_result;
+ }
+ else
+ // Otherwise, make sure we proceed. Initiate reading the socket
+ // stream.
+ if (this->initiate_read_stream () == -1)
+ return;
+}
+
+int
+Receiver::initiate_read_stream (void)
+{
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the socket and to
+ // <write> data asynchronously to the file.
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb,
+ mb->size () - 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ return 0;
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+#if 0
+ // This can overrun the ACE_Log_Msg buffer and do bad things.
+ // Re-enable it at your risk.
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+#endif /* 0 */
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+ if (this->wf_.write (result.message_block (),
+ result.bytes_transferred (),
+ this->file_offset_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_File::write"));
+ return;
+ }
+
+ // Initiate new read from the stream.
+ if (this->initiate_read_stream () == -1)
+ return;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver completed\n"));
+
+ // No need for this message block anymore.
+ result.message_block ().release ();
+
+ // Note that we are done with the test.
+ done = 1;
+
+ // We are done: commit suicide.
+ delete this;
+ }
+}
+
+void
+Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+ result.message_block ().release ();
+
+ if (result.success ())
+ // Write successful: Increment file offset
+ this->file_offset_ += result.bytes_transferred ();
+
+ // This code is not robust enough to deal with short file writes
+ // (which hardly ever happen) ;-)
+ ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
+}
+
+class Sender : public ACE_Handler
+{
+ // = TITLE
+ // The class will be created by <main>. After connecting to the
+ // host, this class will then read data from a file and send it
+ // to the network connection.
+public:
+ Sender (void);
+ ~Sender (void);
+ int open (const ACE_TCHAR *host,
+ u_short port);
+ ACE_HANDLE handle (void) const;
+ void handle (ACE_HANDLE);
+
+protected:
+ // These methods are called by the freamwork
+
+ virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
+ // This is called when asynchronous transmit files complete
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the socket complete
+
+private:
+ int transmit_file (void);
+ // Transmit the entire file in one fell swoop.
+
+ int initiate_read_file (void);
+ // Initiate an asynchronous file read.
+
+ ACE_SOCK_Stream stream_;
+ // Network I/O handle
+
+ ACE_Asynch_Write_Stream ws_;
+ // ws (write stream): for writing to the socket
+
+ ACE_Asynch_Read_File rf_;
+ // rf (read file): for writing from the file
+
+ ACE_Asynch_Transmit_File tf_;
+ // Transmit file.
+
+ ACE_HANDLE input_file_;
+ // File to read from
+
+ u_long file_offset_;
+ // Current file offset
+
+ u_long file_size_;
+ // File size
+
+ ACE_Message_Block welcome_message_;
+ // Welcome message
+
+ ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_;
+ // Header and trailer which goes with transmit_file
+
+ int stream_write_done_;
+ int transmit_file_done_;
+ // These flags help to determine when to close down the event loop
+};
+
+Sender::Sender (void)
+ : input_file_ (ACE_INVALID_HANDLE),
+ file_offset_ (0),
+ file_size_ (0),
+ stream_write_done_ (0),
+ transmit_file_done_ (0)
+{
+ // Moment of inspiration... :-)
+ static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
+ this->welcome_message_.init (data,
+ ACE_OS::strlen (data));
+ this->welcome_message_.wr_ptr (ACE_OS::strlen (data));
+}
+
+Sender::~Sender (void)
+{
+ this->stream_.close ();
+}
+
+ACE_HANDLE
+Sender::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+void
+Sender::handle (ACE_HANDLE handle)
+{
+ this->stream_.set_handle (handle);
+}
+
+int
+Sender::open (const ACE_TCHAR *host,
+ u_short port)
+{
+ // Initialize stuff
+
+ // Open input file (in OVERLAPPED mode)
+ this->input_file_ =
+ ACE_OS::open (file, GENERIC_READ | FILE_FLAG_OVERLAPPED);
+ if (this->input_file_ == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_OS::open"), -1);
+
+ // Find file size
+ this->file_size_ =
+ ACE_OS::filesize (this->input_file_);
+
+ // Connect to remote host
+ ACE_INET_Addr address (port, host);
+ ACE_SOCK_Connector connector;
+ if (connector.connect (this->stream_,
+ address) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Connector::connect"),
+ -1);
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"),
+ -1);
+
+ // Open ACE_Asynch_Read_File
+ if (this->rf_.open (*this, this->input_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_File::open"),
+ -1);
+
+ // Start an asynchronous transmit file
+ if (this->transmit_file () == -1)
+ return -1;
+
+ // Start an asynchronous read file
+ if (this->initiate_read_file () == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+Sender::transmit_file (void)
+{
+ // Open file (in SEQUENTIAL_SCAN mode)
+ ACE_HANDLE file_handle =
+ ACE_OS::open (file, GENERIC_READ | FILE_FLAG_SEQUENTIAL_SCAN);
+ if (file_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_OS::open"),
+ -1);
+
+ // Open ACE_Asynch_Transmit_File
+ if (this->tf_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Transmit_File::open"),
+ -1);
+
+ // Header and trailer data for the file.
+ // @@ What happens if header and trailer are the same?
+ this->header_and_trailer_.header_and_trailer (&this->welcome_message_,
+ this->welcome_message_.length (),
+ &this->welcome_message_,
+ this->welcome_message_.length ());
+
+ // Send the entire file in one fell swoop!
+ if (this->tf_.transmit_file (file_handle,
+ &this->header_and_trailer_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Transmit_File::transmit_file"),
+ -1);
+
+ return 0;
+}
+
+void
+Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_transmit_file called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "socket", result.socket ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "file", result.file ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_per_send", result.bytes_per_send ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+ // Done with file
+ ACE_OS::close (result.file ());
+
+ this->transmit_file_done_ = 1;
+ if (this->stream_write_done_)
+ done = 1;
+}
+
+int
+Sender::initiate_read_file (void)
+{
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the file and to
+ // <write> data asynchronously to the socket.
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate an asynchronous read from the file
+ if (this->rf_.read (*mb,
+ mb->size () - 1,
+ this->file_offset_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_File::read"),
+ -1);
+ return 0;
+}
+
+void
+Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_file called\n"));
+
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ //ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+
+ if (result.success ())
+ {
+ // Read successful: increment offset and write data to network.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ this->file_offset_ += result.bytes_transferred ();
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred ()) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"));
+ return;
+ }
+
+ if (this->file_size_ > this->file_offset_)
+ {
+ // Start an asynchronous read file.
+ if (initiate_read_file () == -1)
+ return;
+ }
+ }
+}
+
+void
+Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_write_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+#if 0
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+#endif
+
+ if (result.success ())
+ {
+ // Partial write to socket
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ // Reset pointers
+ result.message_block ().rd_ptr (result.bytes_transferred ());
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"));
+ return;
+ }
+ }
+ else if (!(this->file_size_ > this->file_offset_))
+ {
+ this->stream_write_done_ = 1;
+ if (this->transmit_file_done_)
+ done = 1;
+ }
+ }
+
+ // Release message block.
+ result.message_block ().release ();
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("h:p:f:d:"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'h':
+ host = get_opt.opt_arg ();
+ break;
+ case 'p':
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'f':
+ file = get_opt.opt_arg ();
+ break;
+ case 'd':
+ dump_file = get_opt.opt_arg ();
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.\n",
+ "usage :\n"
+ "-h <host>\n"
+ "-p <port>\n"
+ "-f <file>\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ Sender sender;
+
+ // Note: acceptor parameterized by the Receiver.
+ ACE_Asynch_Acceptor<Receiver> acceptor;
+
+ // If passive side
+ if (host == 0)
+ {
+ if (acceptor.open (ACE_INET_Addr (port),
+ initial_read_size,
+ 1) == -1)
+ return -1;
+ }
+ // If active side
+ else if (sender.open (host, port) == -1)
+ return -1;
+
+ int success = 1;
+
+ while (success > 0 && !done)
+ // Dispatch events via Proactor singleton.
+ success = ACE_Proactor::instance ()->handle_events ();
+
+ return 0;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example does not work on this platform.\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_proactor.h b/ACE/examples/Reactor/Proactor/test_proactor.h
new file mode 100644
index 00000000000..482e176041e
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_proactor.h
@@ -0,0 +1,56 @@
+/*
+** $Id$
+*/
+
+#ifndef _TEST_PROACTOR_H
+#define _TEST_PROACTOR_H
+
+#include "ace/Asynch_IO.h"
+
+class Receiver : public ACE_Service_Handler
+{
+ // = TITLE
+ // The class will be created by <ACE_Asynch_Acceptor> when new
+ // connections arrive. This class will then receive data from
+ // the network connection and dump it to a file.
+public:
+ // = Initialization and termination.
+ Receiver (void);
+ ~Receiver (void);
+
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+ // This is called after the new connection has been accepted.
+
+protected:
+ // These methods are called by the framework
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous <read> operation from the socket
+ // complete.
+
+ virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
+ // This is called when an asynchronous <write> to the file
+ // completes.
+
+private:
+ int initiate_read_stream (void);
+ // Initiate an asynchronous <read> operation on the socket.
+
+ ACE_Asynch_Read_Stream rs_;
+ // rs (read stream): for reading from a socket.
+
+ ACE_HANDLE dump_file_;
+ // File for dumping data.
+
+ ACE_Asynch_Write_File wf_;
+ // wf (write file): for writing to a file.
+
+ u_long file_offset_;
+ // Offset for the file.
+
+ ACE_HANDLE handle_;
+ // Handle for IO to remote peer.
+};
+
+#endif /* _TEST_PROACTOR_H */
diff --git a/ACE/examples/Reactor/Proactor/test_proactor2.cpp b/ACE/examples/Reactor/Proactor/test_proactor2.cpp
new file mode 100644
index 00000000000..cd5cbf7092e
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_proactor2.cpp
@@ -0,0 +1,808 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_proactor2.cpp
+//
+// = DESCRIPTION
+// Alexander Libman <Alibman@baltimore.com> modified
+// <test_proactor> and made this test. Instead of writing received
+// data to the file, the receiver sends them back to the
+// sender,i.e. ACE_Asynch_Write_File wf_ has been changed to
+// ACE_Asynch_Write_Stream wf_.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman
+// <Alibman@baltimore.com>.
+// ============================================================================
+
+#include "ace/Signal.h"
+
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+
+// FUZZ: disable check_for_streams_include
+#include "ace/streams.h"
+
+#include "ace/Task.h"
+#include "ace/OS_main.h"
+
+ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms supporting
+ // POSIX aio calls.
+
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+
+#include "ace/WIN32_Proactor.h"
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+#include "ace/POSIX_Proactor.h"
+
+#endif
+
+ // Some debug helper functions
+ int DisableSignal ( int SigNum );
+int PrintSigMask ();
+
+#define COUT(X) cout << X ; cout.flush ();
+
+// Host that we're connecting to.
+static ACE_TCHAR *host = 0;
+
+// duplex mode: ==0 half-duplex
+// !=0 full duplex
+static int duplex = 0 ;
+
+// number threads in the Proactor thread pool
+static int nThreads = 1;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+// Size of each initial asynchronous <read> operation.
+static int initial_read_size = BUFSIZ;
+
+
+#define MyMutex ACE_Recursive_Thread_Mutex
+//#define MyMutex ACE_Thread_Mutex
+//#define MyMutex ACE_Null_Mutex
+
+//--------------------------------------------------------------------------
+// MyTask plays role for Proactor threads pool
+//--------------------------------------------------------------------------
+class MyTask: public ACE_Task<ACE_MT_SYNCH>
+{
+
+public:
+
+ int svc (void) ;
+};
+
+
+int MyTask::svc (void )
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
+
+ while ( ACE_Proactor::event_loop_done () == 0 )
+ {
+ ACE_Proactor::run_event_loop ();
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
+ return 0 ;
+}
+
+//-----------------------------------------------------------
+// Receiver
+//-----------------------------------------------------------
+class Receiver : public ACE_Service_Handler
+{
+public:
+
+ Receiver (void);
+ ~Receiver (void);
+
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+ // This is called after the new connection has been accepted.
+
+protected:
+ // These methods are called by the framework
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
+ &result);
+ // This is called when asynchronous <read> operation from the socket
+ // complete.
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
+ &result);
+ // This is called when an asynchronous <write> to the file
+ // completes.
+
+private:
+ int initiate_read_stream (void);
+ int initiate_write_stream (ACE_Message_Block & mb, int nBytes );
+ bool check_destroy () ;
+
+ ACE_Asynch_Read_Stream rs_;
+ ACE_Asynch_Write_Stream ws_;
+ ACE_HANDLE handle_;
+ MyMutex m_Mtx ;
+ long nIOCount ;
+ static long nSessions ;
+};
+
+
+long Receiver::nSessions = 0 ;
+
+Receiver::Receiver (void)
+ : handle_ (ACE_INVALID_HANDLE),
+ nIOCount ( 0 )
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nSessions ++ ;
+ ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nSessions -- ;
+ ACE_OS::closesocket (this->handle_);
+ ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));
+}
+
+//---------------------------------------------------------------------
+// return true if we alive, false we commited suicide
+//
+//---------------------------------------------------------------------
+bool Receiver::check_destroy ()
+{
+ {
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+
+ if ( nIOCount > 0 )
+ {
+ return true ;
+ }
+ }
+
+ delete this ;
+ return false ;
+}
+
+
+void Receiver::open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block)
+{
+ ACE_UNUSED_ARG (message_block);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Receiver::open called\n"));
+
+
+ this->handle_ = handle;
+
+ if (this->ws_.open (*this, this->handle_ ) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"));
+
+ }
+ else if (this->rs_.open (*this, this->handle_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"));
+ }
+ else
+ {
+ initiate_read_stream ();
+ }
+
+
+ check_destroy ();
+}
+
+int Receiver::initiate_read_stream (void)
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the socket and to
+ // <write> data asynchronously to the file.
+ ACE_DEBUG ((LM_DEBUG,
+ "initiate_read_stream called\n"));
+
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release () ;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ nIOCount++ ;
+ return 0;
+}
+
+int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes )
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ if (this->ws_.write (mb , nBytes ) == -1)
+ {
+ mb.release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_File::write"),
+ -1);
+ }
+
+ nIOCount++ ;
+ return 0;
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
+ '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
+ ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
+ result.message_block ().rd_ptr ()));
+
+ if ( result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if(this->initiate_write_stream (result.message_block (),
+
+ result.bytes_transferred () ) == 0 )
+ {
+ if ( duplex != 0 )
+ {
+ // Initiate new read from the stream.
+ this->initiate_read_stream () ;
+ }
+ }
+ }
+ else
+ {
+ result.message_block ().release ();
+ ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
+ }
+
+ {
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nIOCount-- ;
+ }
+ check_destroy () ;
+}
+
+void
+Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result
+ &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+ result.message_block ().release ();
+
+ if (result.success ())
+ {
+ // This code is not robust enough to deal with short file writes
+ // (which hardly ever happen) ;-)
+ //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
+
+ if ( duplex == 0 )
+ {
+ initiate_read_stream () ;
+ }
+ }
+
+ {
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nIOCount-- ;
+ }
+ check_destroy () ;
+}
+
+//-------------------------------------------------------------------------
+// Sender: sends indefinetely welcome message
+// and recieves it back
+//------------------------------------------------------------------------
+class Sender : public ACE_Handler
+{
+public:
+ Sender (void);
+ ~Sender (void);
+ int open (const ACE_TCHAR *host, u_short port);
+ void close ();
+ ACE_HANDLE handle (void) const;
+ void handle (ACE_HANDLE);
+
+protected:
+// These methods are called by the freamwork
+
+virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
+&result);
+// This is called when asynchronous reads from the socket complete
+
+virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
+&result);
+// This is called when asynchronous writes from the socket complete
+
+private:
+
+int initiate_read_stream (void);
+int initiate_write_stream (void);
+
+ACE_SOCK_Stream stream_;
+// Network I/O handle
+
+ACE_Asynch_Write_Stream ws_;
+// ws (write stream): for writing to the socket
+
+ACE_Asynch_Read_Stream rs_;
+// rs (read file): for reading from the socket
+
+ACE_Message_Block welcome_message_;
+// Welcome message
+
+MyMutex m_Mtx ;
+long nIOCount ;
+};
+
+static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
+
+Sender::Sender (void)
+ :nIOCount ( 0 )
+{
+ // Moment of inspiration... :-)
+ this->welcome_message_.init (data, ACE_OS::strlen (data));
+}
+
+Sender::~Sender (void)
+{
+ close ();
+}
+
+void Sender::close ()
+{
+ this->stream_.close ();
+}
+
+ACE_HANDLE Sender::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+void Sender::handle (ACE_HANDLE handle)
+{
+ this->stream_.set_handle (handle);
+}
+
+int Sender::open (const ACE_TCHAR *host, u_short port)
+{
+ // Initialize stuff
+ // Connect to remote host
+ ACE_INET_Addr address (port, host);
+ ACE_SOCK_Connector connector;
+
+ if (connector.connect (this->stream_,
+ address) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Connector::connect"),
+ -1);
+ }
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"),
+ -1);
+
+ // Open ACE_Asynch_Read_Stream
+ if (this->rs_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_File::open"),
+ -1);
+
+ // Start an asynchronous transmit file
+ if ( this->initiate_write_stream () == -1)
+ return -1;
+
+ if ( duplex != 0 )
+ {
+ // Start an asynchronous read file
+ if (this->initiate_read_stream () == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
+int Sender::initiate_write_stream (void)
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+
+
+ welcome_message_.rd_ptr( welcome_message_.base ());
+ welcome_message_.wr_ptr( welcome_message_.base ());
+ welcome_message_.wr_ptr (ACE_OS::strlen (data));
+
+ if (this->ws_.write (welcome_message_,
+ welcome_message_.length ()
+ ) == -1)
+ {
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_File::write"),
+ -1);
+ }
+
+ nIOCount++ ;
+ return 0;
+}
+
+int Sender::initiate_read_stream (void)
+{
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the socket and to
+ // <write> data asynchronously to the file.
+ ACE_DEBUG ((LM_DEBUG,
+ "initiate_read_stream called\n"));
+
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release () ;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ nIOCount++ ;
+ return 0;
+}
+
+
+void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
+ &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_write_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.message_block ().rd_ptr () -
+ result.bytes_transferred ());
+
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
+ result.message_block ().rd_ptr ()));
+
+ // Simplify just for Test
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ if ( duplex != 0 ) // full duplex, continue write
+ {
+ initiate_write_stream () ;
+ }
+ else // half-duplex read reply, after read we will start
+ // write
+ {
+ initiate_read_stream () ;
+ }
+ }
+
+ {
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nIOCount-- ;
+ }
+}
+
+void
+Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
+ '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
+ ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
+ result.message_block ().rd_ptr ()));
+
+ result.message_block().release ();
+
+ if ( result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if ( duplex != 0 ) // full duplex, continue read
+ {
+ initiate_read_stream () ;
+ }
+ else // half-duplex writey, after write we will start read
+ {
+ initiate_write_stream () ;
+ }
+ }
+
+ {
+ ACE_Guard<MyMutex> locker (m_Mtx) ;
+ nIOCount-- ;
+ }
+}
+
+//--------------------------------------------------------------------------
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'h':
+ host = get_opt.opt_arg ();
+ break;
+ case 'n':
+ nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ;
+ break;
+ case 'p':
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'd':
+ duplex = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.\n",
+ "usage :\n"
+ "-h <host> for Sender mode\n"
+ "-d <duplex mode 1-on/0-off>\n"
+ "-p <port to listen/connect>\n"
+ "-n <number threads for Proactor pool>\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ ACE_UNUSED_ARG (initial_read_size);
+
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+
+ ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor;
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+ // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor;
+ ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor;
+#endif
+
+ ACE_Proactor Proactor ( pImpl ,1 );
+
+ ACE_Proactor::instance( & Proactor );
+
+
+ MyTask Task1 ;
+
+ if (Task1.activate (THR_NEW_LWP, nThreads ) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
+ }
+
+ Sender sender;
+ ACE_Asynch_Acceptor<Receiver> acceptor;
+
+ int Rc = -1 ;
+
+ if ( host == NULL ) // Acceptor
+ {
+ // Simplify , initial read with zero size
+ Rc = acceptor.open (ACE_INET_Addr (port),0,1);
+
+ }
+ else
+ {
+ Rc = sender.open (host, port);
+ }
+
+ if ( Rc == 0 )
+ {
+ char c ;
+ cout << "Press any key to stop and exit=>\n" << flush ;
+ cin.clear ();
+ cin >> c ;
+ }
+
+ ACE_Proactor::end_event_loop () ;
+
+ if ( host != NULL ) // we are sender
+ {
+ sender.close () ; // disconnect to get reciever error !!!
+ }
+
+
+ ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
+
+ pTM->wait_task ( & Task1 ) ;
+
+ ACE_Proactor::instance( ( ACE_Proactor* )NULL );
+
+ return 0;
+}
+//--------------------------------------------------------------------
+//
+//--------------------------------------------------------------------
+int DisableSignal ( int SigNum )
+{
+
+#ifndef ACE_WIN32
+ sigset_t signal_set;
+ if ( sigemptyset (&signal_set) == - 1 )
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigemptyset failed"));
+ }
+
+ sigaddset (&signal_set, SigNum);
+
+ // Put the <signal_set>.
+ if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"));
+ }
+#else
+ ACE_UNUSED_ARG(SigNum);
+#endif
+
+ return 1;
+}
+//--------------------------------------------------------------------
+// Get the <signal_set> back from the OS.
+//--------------------------------------------------------------------
+
+int PrintSigMask ()
+{
+#ifndef ACE_WIN32
+
+ sigset_t mask ;
+ int member = 0;
+
+ COUT ( "\n=============Signal Mask==========" )
+
+ if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "ACE_OS::pthread_sigmask failed"));
+ }
+ else for (int i = 1 ; i < 1000; i++)
+ {
+ member = sigismember (&mask,i);
+
+ COUT ( "\nSig " )
+ COUT ( i )
+ COUT ( " is " )
+ COUT (member )
+
+ if (member == -1)
+ {
+ break ;
+ }
+ }
+#endif
+ return 0;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_proactor3.cpp b/ACE/examples/Reactor/Proactor/test_proactor3.cpp
new file mode 100644
index 00000000000..c47468276c8
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_proactor3.cpp
@@ -0,0 +1,864 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_proactor3.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the <ACE_Proactor> can be used to
+// implement an application that does various asynchronous
+// operations.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu>
+// modified by Alexander Libman <alibman@baltimore.com>
+// from original test_proactor.cpp
+// ============================================================================
+
+#include "ace/Signal.h"
+
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+
+// FUZZ: disable check_for_streams_include
+#include "ace/streams.h"
+
+#include "ace/Task.h"
+
+ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms
+ // supporting POSIX aio calls.
+
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+
+# include "ace/WIN32_Proactor.h"
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+# include "ace/POSIX_Proactor.h"
+# include "ace/SUN_Proactor.h"
+
+#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */
+
+// Some debug helper functions
+static int disable_signal (int sigmin, int sigmax);
+#if 0
+static int print_sigmask (void);
+#endif
+
+#define COUT(X) cout << X; cout.flush ();
+
+// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
+// 2-SIG, 3-SUN
+static int proactor_type = 0;
+
+// POSIX : > 0 max number aio operations proactor,
+static int max_aio_operations = 0;
+
+// Host that we're connecting to.
+static ACE_TCHAR *host = 0;
+
+// number of Senders instances
+static int senders = 1;
+static const int MaxSenders = 100;
+
+// duplex mode: ==0 half-duplex
+// !=0 full duplex
+static int duplex = 0;
+
+// number threads in the Proactor thread pool
+static int threads = 1;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+class MyTask: public ACE_Task<ACE_MT_SYNCH>
+{
+ // = TITLE
+ // MyTask plays role for Proactor threads pool
+public:
+ MyTask (void) : threads_ (0), proactor_ (0) {}
+
+ int svc (void);
+ void waitready (void) { event_.wait (); }
+
+private:
+ ACE_Recursive_Thread_Mutex mutex_;
+ int threads_;
+ ACE_Proactor *proactor_;
+ ACE_Manual_Event event_;
+
+ void create_proactor (void);
+ void delete_proactor (void);
+};
+
+void
+MyTask::create_proactor (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ if (threads_ == 0)
+ {
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+ ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+ ACE_POSIX_Proactor *proactor = 0;
+
+ switch (proactor_type)
+ {
+ case 1: proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));
+ break;
+ case 2: proactor = new ACE_POSIX_SIG_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
+ break;
+# if defined (sun)
+ case 3: proactor = new ACE_SUN_Proactor (max_aio_operations);
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n"));
+ break;
+# endif /* sun */
+ default:proactor = new ACE_POSIX_SIG_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
+ break;
+ }
+#endif
+
+ proactor_ = new ACE_Proactor (proactor, 1);
+
+ ACE_Proactor::instance(proactor_);
+ event_.signal ();
+ }
+
+ threads_++;
+}
+
+void
+MyTask::delete_proactor (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ if (--threads_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));
+ ACE_Proactor::instance ((ACE_Proactor *) 0);
+ delete proactor_;
+ proactor_ = 0;
+ }
+}
+
+int
+MyTask::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
+
+ create_proactor ();
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+
+ while (ACE_Proactor::event_loop_done () == 0)
+ ACE_Proactor::run_event_loop ();
+
+ delete_proactor ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
+ return 0;
+}
+
+class Receiver : public ACE_Service_Handler
+{
+public:
+
+ Receiver (void);
+ ~Receiver (void);
+
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+ // This is called after the new connection has been accepted.
+
+ static long get_number_sessions (void) { return sessions_; }
+
+protected:
+ // These methods are called by the framework
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous <read> operation from the socket
+ // complete.
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when an asynchronous <write> to the file
+ // completes.
+
+private:
+ int initiate_read_stream (void);
+ int initiate_write_stream (ACE_Message_Block & mb, int nBytes);
+ int check_destroy (void);
+
+ ACE_Asynch_Read_Stream rs_;
+ ACE_Asynch_Write_Stream ws_;
+ ACE_HANDLE handle_;
+ ACE_Recursive_Thread_Mutex mutex_;
+ long io_count_;
+ static long sessions_;
+};
+
+long Receiver::sessions_ = 0;
+
+Receiver::Receiver (void)
+ : handle_ (ACE_INVALID_HANDLE),
+ io_count_ (0)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ sessions_++;
+ ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ sessions_--;
+ ACE_OS::closesocket (this->handle_);
+ ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));
+}
+
+// return true if we alive, false we commited suicide
+int
+Receiver::check_destroy (void)
+{
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ if (io_count_ > 0)
+ return 1;
+ }
+
+ delete this;
+ return 0;
+}
+
+void
+Receiver::open (ACE_HANDLE handle,
+ ACE_Message_Block &)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Receiver::open called\n"));
+
+ this->handle_ = handle;
+
+ if (this->ws_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"));
+ else if (this->rs_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"));
+ else
+ initiate_read_stream ();
+
+ check_destroy ();
+}
+
+int
+Receiver::initiate_read_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+int
+Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ if (nbytes <= 0)
+ {
+ mb.release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ "ACE_Asynch_Write_Stream::write nbytes <0 "),
+ -1);
+ }
+
+ if (this->ws_.write (mb, nbytes) == -1)
+ {
+ mb.release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if(this->initiate_write_stream (result.message_block (),
+ result.bytes_transferred ()) == 0)
+ {
+ if (duplex != 0)
+ {
+ // Initiate new read from the stream.
+ this->initiate_read_stream ();
+ }
+ }
+ }
+ else
+ {
+ result.message_block ().release ();
+ ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+ check_destroy ();
+}
+
+void
+Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ }
+
+ result.message_block ().release ();
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // This code is not robust enough to deal with short file writes
+ // (which hardly ever happen);-)
+ // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
+
+ if (duplex == 0)
+ initiate_read_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+ check_destroy ();
+}
+
+class Sender : public ACE_Handler
+{
+ // = TITLE
+ // Sends welcome messages receives them back.
+public:
+ Sender (void);
+ ~Sender (void);
+ int open (const ACE_TCHAR *host, u_short port);
+ void close (void);
+ ACE_HANDLE handle (void) const;
+
+protected:
+ // These methods are called by the freamwork
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous reads from the socket complete
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete
+
+private:
+
+ int initiate_read_stream (void);
+ int initiate_write_stream (void);
+
+ ACE_SOCK_Stream stream_;
+ // Network I/O handle
+
+ ACE_Asynch_Write_Stream ws_;
+ // ws (write stream): for writing to the socket
+
+ ACE_Asynch_Read_Stream rs_;
+ // rs (read file): for reading from the socket
+
+ ACE_Message_Block welcome_message_;
+ // Welcome message
+
+ ACE_Recursive_Thread_Mutex mutex_;
+ long io_count_;
+};
+
+static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
+
+Sender::Sender (void)
+ : io_count_ (0)
+{
+ // Moment of inspiration... :-)
+ this->welcome_message_.init (data, ACE_OS::strlen (data));
+}
+
+Sender::~Sender (void)
+{
+ close ();
+}
+
+void Sender::close (void)
+{
+ this->stream_.close ();
+}
+
+ACE_HANDLE Sender::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+int Sender::open (const ACE_TCHAR *host, u_short port)
+{
+ // Initialize stuff
+ // Connect to remote host
+ ACE_INET_Addr address (port, host);
+ ACE_SOCK_Connector connector;
+
+ if (connector.connect (this->stream_,
+ address) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Connector::connect"),
+ -1);
+ }
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"),
+ -1);
+
+ // Open ACE_Asynch_Read_Stream
+ if (this->rs_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"),
+ -1);
+
+ // Start an asynchronous transmit file
+ if (this->initiate_write_stream () == -1)
+ return -1;
+
+ if (duplex != 0)
+ // Start an asynchronous read file
+ if (this->initiate_read_stream () == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+Sender::initiate_write_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ welcome_message_.rd_ptr(welcome_message_.base ());
+ welcome_message_.wr_ptr(welcome_message_.base ());
+ welcome_message_.wr_ptr (ACE_OS::strlen (data));
+
+ if (this->ws_.write (welcome_message_,
+ welcome_message_.length ()) == -1)
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"),
+ -1);
+ io_count_++;
+ return 0;
+}
+
+int
+Sender::initiate_read_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the socket and to
+ // <write> data asynchronously to the file.
+ ACE_DEBUG ((LM_DEBUG,
+ "initiate_read_stream called\n"));
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+void
+Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ // Simplify just for Test
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ if (duplex != 0) // full duplex, continue write
+ initiate_write_stream ();
+ else // half-duplex read reply, after read we will start write
+ initiate_read_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+}
+
+void
+Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ result.message_block().release ();
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if (duplex != 0) // full duplex, continue read
+ initiate_read_stream ();
+ else // half-duplex writey, after write we will start read
+ initiate_write_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+}
+
+static int
+set_proactor_type (const char *ptype)
+{
+ if (!ptype)
+ return false;
+
+ switch (toupper (*ptype))
+ {
+ case 'D' : proactor_type = 0; return true;
+ case 'A' : proactor_type = 1; return true;
+ case 'I' : proactor_type = 2; return true;
+#if defined (sun)
+ case 'S' : proactor_type = 3; return true;
+#endif /* sun */
+ }
+ return false;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'd': // duplex
+ duplex = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'h': // host for sender
+ host = get_opt.opt_arg ();
+ break;
+ case 'p': // port number
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'n': // thread pool size
+ threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 's': // number of senders
+ senders = ACE_OS::atoi (get_opt.opt_arg ());
+ if (senders > MaxSenders)
+ senders = MaxSenders;
+ break;
+ case 'o': // max number of aio for proactor
+ max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 't': // Proactor Type
+ if (set_proactor_type (get_opt.opt_arg ()))
+ break;
+ case 'u':
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.",
+ "\nusage:"
+ "\n-o <max number of started aio operations for Proactor>"
+ "\n-t <Proactor type> UNIX-only, Win32-default always:"
+ "\n a AIOCB"
+ "\n i SIG"
+ "\n s SUN"
+ "\n d default"
+ "\n-d <duplex mode 1-on/0-off>"
+ "\n-h <host> for Sender mode"
+ "\n-n <number threads for Proactor pool>"
+ "\n-p <port to listen/connect>"
+ "\n-s <number of sender's instances>"
+ "\n-u show this message"
+ "\n"));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+#if defined (sun)
+ ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));
+#endif
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+
+ MyTask task1;
+
+ if (task1.activate (THR_NEW_LWP, threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p.\n",
+ "main"),
+ -1);
+
+ // wait for creation of Proactor
+ task1.waitready ();
+
+ Sender * send_list[MaxSenders];
+
+ ACE_Asynch_Acceptor<Receiver> acceptor;
+
+ int rc = -1;
+ int i;
+ char c;
+
+ if (host == 0) // Acceptor
+ {
+ // Simplify, initial read with zero size
+ if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)
+ rc = 1;
+ }
+ else
+ {
+ for (i = 0; i < senders; ++i)
+ send_list[i] = new Sender;
+
+ for (i = 0; i < senders; ++i)
+ if (send_list[i]->open (host, port) == 0)
+ rc++;
+ }
+
+ if (rc > 0)
+ {
+ cout << "Press any key to stop=>" << flush;
+ cin.clear ();
+ cin >> c;
+ }
+
+ ACE_Proactor::end_event_loop ();
+
+ if (host != 0) // we are sender
+ {
+ for (i = 0; i < senders; ++i)
+ send_list[i]->close ();
+ }
+
+
+ ACE_Thread_Manager *tm =
+ ACE_Thread_Manager::instance();
+
+ tm->wait_task (&task1);
+
+ cout << "\nNumber of Receivers objects="
+ << Receiver::get_number_sessions ()
+ << flush;
+
+ for (i = 0; i < senders; ++i)
+ {
+ delete (send_list[i]);
+ send_list[i] = 0;
+ }
+
+ return 0;
+}
+
+static int
+disable_signal (int sigmin, int sigmax)
+{
+#ifndef ACE_WIN32
+
+ sigset_t signal_set;
+ if (sigemptyset (&signal_set) == - 1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigemptyset failed"));
+
+ for (int i = sigmin; i <= sigmax; i++)
+ sigaddset (&signal_set, i);
+
+ // Put the <signal_set>.
+ if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"));
+#endif /* ACE_WIN32 */
+
+ return 1;
+}
+
+// Get the <signal_set> back from the OS.
+
+#if 0
+static int
+print_sigmask (void)
+{
+#ifndef ACE_WIN32
+ sigset_t mask;
+ int member = 0;
+
+ COUT ("\n=============Signal Mask==========")
+
+ if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "ACE_OS::pthread_sigmask failed"));
+ else
+ for (int i = 1; i < 1000; i++)
+ {
+ member = sigismember (&mask,i);
+
+ COUT ("\nSig ")
+ COUT (i)
+ COUT (" is ")
+ COUT (member)
+
+ if (member == -1)
+ break;
+ }
+
+#endif /* ACE_WIN32 */
+ return 0;
+}
+#endif /* 0 */
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_timeout.cpp b/ACE/examples/Reactor/Proactor/test_timeout.cpp
new file mode 100644
index 00000000000..39351717db9
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_timeout.cpp
@@ -0,0 +1,130 @@
+// $Id: test_timeout.cpp
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_timeout.cpp
+//
+// = DESCRIPTION
+//
+// This example application shows how to write event loops that
+// handle events for some fixed amount of time. Note that any
+// thread in the Proactor thread pool can call back the handler. On
+// POSIX4 systems, this test works only with POSIX_SIG_Proactor,
+// which can work with multiple threads.
+//
+// = AUTHOR
+// Irfan Pyarali and Alexander Babu Arulanthu
+//
+// ============================================================================
+
+#include "ace/Proactor.h"
+#include "ace/Task.h"
+#include "ace/Atomic_Op.h"
+#include "ace/OS_NS_sys_time.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_main.h"
+
+ACE_RCSID(Proactor, test_timeout, "$Id$")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
+ (defined (ACE_HAS_AIO_CALLS)) && !defined (ACE_POSIX_AIOCB_PROACTOR))
+ // This only works on Win32 platforms and on Unix platforms supporting
+ // POSIX aio calls.
+
+class Timeout_Handler : public ACE_Handler
+{
+ // = TITLE
+ // Generic timeout handler.
+public:
+ Timeout_Handler (void)
+ : start_time_ (ACE_OS::gettimeofday ())
+ {
+ }
+
+ virtual void handle_time_out (const ACE_Time_Value &tv,
+ const void *arg)
+ {
+ // Print out when timeouts occur.
+ ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n",
+ ++count_,
+ (char *) arg,
+ (tv - this->start_time_).sec ()));
+
+ // Sleep for a while
+ ACE_OS::sleep (4);
+ }
+
+private:
+ ACE_Atomic_Op <ACE_SYNCH_MUTEX, int> count_;
+ // Number of the timer event.
+
+ ACE_Time_Value start_time_;
+ // Starting time of the test.
+};
+
+class Worker : public ACE_Task <ACE_NULL_SYNCH>
+{
+public:
+ int svc (void)
+ {
+ // Handle events for 13 seconds.
+ ACE_Time_Value run_time (13);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t):Starting svc routine\n"));
+
+ if (ACE_Proactor::run_event_loop(run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t):%p.\n", "Worker::svc"), -1);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n"));
+
+ return 0;
+ }
+};
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ Timeout_Handler handler;
+
+ // Register a 2 second timer.
+ ACE_Time_Value foo_tv (2);
+ if (ACE_Proactor::instance ()->schedule_timer (handler,
+ (void *) "Foo",
+ ACE_Time_Value::zero,
+ foo_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ // Register a 3 second timer.
+ ACE_Time_Value bar_tv (3);
+ if (ACE_Proactor::instance ()->schedule_timer (handler,
+ (void *) "Bar",
+ ACE_Time_Value::zero,
+ bar_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ Worker worker;
+
+ if (worker.activate (THR_NEW_LWP, 10) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ return 0;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
+
+int
+main (int, char *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example is multithreaded version of test_timeout_st.cpp\n"
+ "This doesnt work on this platform !!!\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/
diff --git a/ACE/examples/Reactor/Proactor/test_timeout_st.cpp b/ACE/examples/Reactor/Proactor/test_timeout_st.cpp
new file mode 100644
index 00000000000..ae44c2ba1f4
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_timeout_st.cpp
@@ -0,0 +1,99 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_timeout_st.cpp
+//
+// = DESCRIPTION
+//
+// This example application shows how to write event loops that
+// handle events for some fixed amount of time. This is the single
+// threaded version of the test_timeout.cpp application.
+//
+// = AUTHOR
+// Irfan Pyarali and Alexander Babu Arulanthu
+//
+// ============================================================================
+
+#include "ace/Proactor.h"
+#include "ace/OS_main.h"
+
+ACE_RCSID(Proactor, test_timeout, "$Id$")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+// This only works on Win32 platforms and on Unix platforms supporting
+// POSIX aio calls.
+
+class Timeout_Handler : public ACE_Handler
+{
+ // = TITLE
+ // Generic timeout handler.
+
+public:
+ Timeout_Handler (void)
+ : count_ (0),
+ start_time_ (ACE_OS::gettimeofday ())
+ {
+ }
+
+ virtual void handle_time_out (const ACE_Time_Value &tv,
+ const void *arg)
+ {
+ // Print out when timeouts occur.
+ ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n",
+ ++count_,
+ (char *) arg,
+ (tv - this->start_time_).sec ()));
+ }
+
+private:
+ int count_;
+ // Sequence number for the timeouts.
+
+ ACE_Time_Value start_time_;
+ // Starting time of the test.
+};
+
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ Timeout_Handler handler;
+
+ // Register a 2 second timer.
+ ACE_Time_Value foo_tv (2);
+ if (ACE_Proactor::instance ()->schedule_timer (handler,
+ (void *) "Foo",
+ ACE_Time_Value::zero,
+ foo_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ // Register a 3 second timer.
+ ACE_Time_Value bar_tv (3);
+ if (ACE_Proactor::instance ()->schedule_timer (handler,
+ (void *) "Bar",
+ ACE_Time_Value::zero,
+ bar_tv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1);
+
+ // Handle events for 13 seconds.
+ ACE_Time_Value run_time (13);
+
+ ACE_DEBUG ((LM_DEBUG, "Starting event loop\n"));
+
+ // Run the event loop.
+ if (ACE_Proactor::run_event_loop(run_time) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t):%p.\n", "Worker::svc"),
+ 1);
+
+ ACE_DEBUG ((LM_DEBUG, "Ending event loop\n"));
+
+ return 0;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
diff --git a/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp b/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp
new file mode 100644
index 00000000000..49d834a2884
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp
@@ -0,0 +1,432 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_udp_proactor.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the <ACE_Proactor> can be used to
+// implement an application that does asynchronous operations using
+// datagrams.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu> and
+// Roger Tragin <r.tragin@computer.org>
+//
+// ============================================================================
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_main.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Dgram.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+#include "ace/Log_Msg.h"
+
+ACE_RCSID(Proactor, test_udp_proactor, "test_proactor.cpp,v 1.29 2001/02/02 23:41:16 shuston Exp")
+
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) || defined (ACE_HAS_AIO_CALLS)
+ // This only works on Win32 platforms.
+
+// Host that we're connecting to.
+static ACE_TCHAR *host = 0;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+// Keep track of when we're done.
+static int done = 0;
+
+class Receiver : public ACE_Service_Handler
+{
+ // = TITLE
+ // This class will receive data from
+ // the network connection and dump it to a file.
+public:
+ // = Initialization and termination.
+ Receiver (void);
+ ~Receiver (void);
+
+ int open_addr (const ACE_INET_Addr &localAddr);
+
+protected:
+ // These methods are called by the framework
+
+ /// This method will be called when an asynchronous read completes on
+ /// a UDP socket.
+ virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
+
+private:
+ ACE_SOCK_Dgram sock_dgram_;
+
+ ACE_Asynch_Read_Dgram rd_;
+ // rd (read dgram): for reading from a UDP socket.
+ const char* completion_key_;
+ const char* act_;
+};
+
+Receiver::Receiver (void)
+ : completion_key_ ("Receiver Completion Key"),
+ act_ ("Receiver ACT")
+{
+}
+
+Receiver::~Receiver (void)
+{
+ sock_dgram_.close ();
+}
+
+int
+Receiver::open_addr (const ACE_INET_Addr &localAddr)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Receiver::open_addr called\n"));
+
+ // Create a local UDP socket to receive datagrams.
+ if (this->sock_dgram_.open (localAddr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Dgram::open"), -1);
+
+ // Initialize the asynchronous read.
+ if (this->rd_.open (*this,
+ this->sock_dgram_.get_handle (),
+ this->completion_key_,
+ ACE_Proactor::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Dgram::open"), -1);
+
+ // Create a buffer to read into. We are using scatter/gather to
+ // read the message header and message body into 2 buffers
+
+ // create a message block to read the message header
+ ACE_Message_Block* msg = 0;
+ ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1);
+
+ // the next line sets the size of the header, even though we
+ // allocated a the message block of 1k, by setting the size to 20
+ // bytes then the first 20 bytes of the reveived datagram will be
+ // put into this message block.
+ msg->size (20); // size of header to read is 20 bytes
+
+ // create a message block to read the message body
+ ACE_Message_Block* body = 0;
+ ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1);
+ // The message body will not exceed 1024 bytes, at least not in this test.
+
+ // set body as the cont of msg. This associates the 2 message
+ // blocks so that a read will fill the first block (which is the
+ // header) up to size (), and use the cont () block for the rest of
+ // the data. You can chain up to IOV_MAX message block using this
+ // method.
+ msg->cont (body);
+
+ // ok lets do the asynch read
+ size_t number_of_bytes_recvd = 0;
+
+ int res = rd_.recv (msg,
+ number_of_bytes_recvd,
+ 0,
+ PF_INET,
+ this->act_);
+ switch (res)
+ {
+ case 0:
+ // this is a good error. The proactor will call our handler when the
+ // read has completed.
+ break;
+ case 1:
+ // actually read something, we will handle it in the handler callback
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "%s = %d\n",
+ "bytes recieved immediately",
+ number_of_bytes_recvd));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ res = 0;
+ break;
+ case -1:
+ // Something else went wrong.
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Dgram::recv"));
+ // the handler will not get called in this case so lets clean up our msg
+ msg->release ();
+ break;
+ default:
+ // Something undocumented really went wrong.
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Dgram::recv"));
+ msg->release ();
+ break;
+ }
+
+ return res;
+}
+
+void
+Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_dgram called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_INET_Addr peerAddr;
+ result.remote_address (peerAddr);
+ ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // loop through our message block and print out the contents
+ for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ())
+ { // use msg->length () to get the number of bytes written to the message
+ // block.
+ ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ()));
+ for (u_long i = 0; i < msg->length (); ++i)
+ ACE_DEBUG ((LM_DEBUG,
+ "%c", (msg->rd_ptr ())[i]));
+ ACE_DEBUG ((LM_DEBUG, "]\n"));
+ }
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver completed\n"));
+
+ // No need for this message block anymore.
+ result.message_block ()->release ();
+
+ // Note that we are done with the test.
+ done++;
+}
+
+class Sender : public ACE_Handler
+{
+ // = TITLE
+ // The class will be created by <main>.
+public:
+ Sender (void);
+ ~Sender (void);
+ int open (const ACE_TCHAR *host, u_short port);
+
+protected:
+ // These methods are called by the freamwork
+
+ virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
+ // This is called when asynchronous writes from the dgram socket
+ // complete
+
+private:
+
+ ACE_SOCK_Dgram sock_dgram_;
+ // Network I/O handle
+
+ ACE_Asynch_Write_Dgram wd_;
+ // wd (write dgram): for writing to the socket
+
+ const char* completion_key_;
+ const char* act_;
+};
+
+Sender::Sender (void)
+ : completion_key_ ("Sender completion key"),
+ act_ ("Sender ACT")
+{
+}
+
+Sender::~Sender (void)
+{
+ this->sock_dgram_.close ();
+}
+
+int
+Sender::open (const ACE_TCHAR *host,
+ u_short port)
+{
+ // Initialize stuff
+
+ if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Dgram::open"), -1);
+
+ // Initialize the asynchronous read.
+ if (this->wd_.open (*this,
+ this->sock_dgram_.get_handle (),
+ this->completion_key_,
+ ACE_Proactor::instance ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Dgram::open"), -1);
+
+ // We are using scatter/gather to send the message header and
+ // message body using 2 buffers
+
+ // create a message block for the message header
+ ACE_Message_Block* msg = 0;
+ ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1);
+ const char raw_msg [] = "To be or not to be.";
+ // Copy buf into the Message_Block and update the wr_ptr ().
+ msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1);
+
+ // create a message block for the message body
+ ACE_Message_Block* body = 0;
+ ACE_NEW_RETURN (body, ACE_Message_Block (100), -1);
+ ACE_OS::memset (body->wr_ptr (), 'X', 100);
+ body->wr_ptr (100); // always remember to update the wr_ptr ()
+
+ // set body as the cont of msg. This associates the 2 message blocks so
+ // that a send will send the first block (which is the header) up to
+ // length (), and use the cont () to get the next block to send. You can
+ // chain up to IOV_MAX message block using this method.
+ msg->cont (body);
+
+ // do the asynch send
+ size_t number_of_bytes_sent = 0;
+ ACE_INET_Addr serverAddr (port, host);
+ int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_);
+
+ switch (res)
+ {
+ case 0:
+ // this is a good error. The proactor will call our handler when the
+ // send has completed.
+ break;
+ case 1:
+ // actually sent something, we will handle it in the handler callback
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "%s = %d\n",
+ "bytes sent immediately",
+ number_of_bytes_sent));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ res = 0;
+ break;
+ case -1:
+ // Something else went wrong.
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Dgram::recv"));
+ // the handler will not get called in this case so lets clean up our msg
+ msg->release ();
+ break;
+ default:
+ // Something undocumented really went wrong.
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Dgram::recv"));
+ msg->release ();
+ break;
+ }
+ return res;
+}
+
+void
+Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_write_dgram called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Sender completed\n"));
+
+ // No need for this message block anymore.
+ result.message_block ()->release ();
+
+ // Note that we are done with the test.
+ done++;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:p:"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'h':
+ host = get_opt.opt_arg ();
+ break;
+ case 'p':
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
+ "usage :\n"
+ "-h <host>\n"), -1);
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ Sender sender;
+
+ Receiver receiver;
+
+ // If passive side
+ if (host == 0)
+ {
+ if (receiver.open_addr (ACE_INET_Addr (port)) == -1)
+ return -1;
+ }
+ // If active side
+ else if (sender.open (host, port) == -1)
+ return -1;
+
+ for (int success = 1;
+ success > 0 && !done;
+ )
+ // Dispatch events via Proactor singleton.
+ success = ACE_Proactor::instance ()->handle_events ();
+
+ return 0;
+}
+
+#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
+
+int
+ACE_TMAIN (int, ACE_TCHAR *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "This example does not work on this platform.\n"));
+ return 1;
+}
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
+