diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
commit | c44379cc7d9c7aa113989237ab0f56db12aa5219 (patch) | |
tree | 66a84b20d47f2269d8bdc6e0323f338763424d3a /ACE/examples/Reactor/Proactor | |
parent | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (diff) | |
download | ATCD-c44379cc7d9c7aa113989237ab0f56db12aa5219.tar.gz |
Repo restructuring
Diffstat (limited to 'ACE/examples/Reactor/Proactor')
22 files changed, 5825 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Proactor/.cvsignore b/ACE/examples/Reactor/Proactor/.cvsignore new file mode 100644 index 00000000000..34179361b75 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/.cvsignore @@ -0,0 +1,7 @@ +test_cancel +test_end_event_loop +test_multiple_loops +test_post_completions +test_proactor +test_timeout +test_udp_proactor diff --git a/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp b/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp new file mode 100644 index 00000000000..be720fdef40 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/Aio_Platform_Test_C.cpp @@ -0,0 +1,137 @@ +// $Id$ +// ============================================================================ +// +// = FILENAME +// aio_platform_test_c.cpp +// +// = DESCRITPTION +// Testing the platform for POSIX Asynchronous I/O. This is the C +// version of the $ACE_ROOT/tests/Aio_Platform_Test.cpp. Useful +// to send bug reports. +// +// = AUTHOR +// Programming for the Real World. Bill O. GallMeister. +// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ===================================================================== + + +#include <unistd.h> +#include <fcntl.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <signal.h> +#include <string.h> +#include <errno.h> +#include <stdio.h> + +#include <limits.h> + +#include <aio.h> + +int do_sysconf (void); +int have_asynchio (void); + +static int file_handle = -1; +char mb1 [BUFSIZ + 1]; +char mb2 [BUFSIZ + 1]; +aiocb aiocb1, aiocb2; +sigset_t completion_signal; + +// For testing the <aio> stuff. +int test_aio_calls (void); +int issue_aio_calls (void); +int query_aio_completions (void); +int setup_signal_delivery (void); +int do_sysconf (void); +int have_asynchio (void); + +int +do_sysconf (void) +{ + // Call sysconf to find out runtime values. + errno = 0; +#if defined (_SC_LISTIO_AIO_MAX) + printf ("Runtime value of LISTIO_AIO_MAX is %d, errno = %d\n", + sysconf(_SC_LISTIO_AIO_MAX), + errno); +#else + printf ("Runtime value of AIO_LISTIO_MAX is %d, errno = %d\n", + sysconf(_SC_AIO_LISTIO_MAX), + errno); +#endif + + errno = 0; + printf ("Runtime value of AIO_MAX is %d, errno = %d\n", + sysconf (_SC_AIO_MAX), + errno); + + errno = 0; + printf ("Runtime value of _POSIX_ASYNCHRONOUS_IO is %d, errno = %d\n", + sysconf (_SC_ASYNCHRONOUS_IO), + errno); + + errno = 0; + printf ("Runtime value of _POSIX_REALTIME_SIGNALS is %d, errno = %d\n", + sysconf (_SC_REALTIME_SIGNALS), + errno); + + errno = 0; + printf ("Runtime value of RTSIG_MAX %d, Errno = %d\n", + sysconf (_SC_RTSIG_MAX), + errno); + + errno = 0; + printf ("Runtime value of SIGQUEUE_MAX %d, Errno = %d\n", + sysconf (_SC_SIGQUEUE_MAX), + errno); + return 0; +} + +int +have_asynchio (void) +{ +#if defined (_POSIX_ASYNCHRONOUS_IO) + // POSIX Asynch IO is present in this system. +#if defined (_POSIX_ASYNC_IO) + // If this is defined and it is not -1, POSIX_ASYNCH is supported + // everywhere in the system. +#if _POSIX_ASYNC_IO == -1 + printf ("_POSIX_ASYNC_IO = -1.. ASYNCH IO NOT supported at all\n"); + return -1; +#else /* Not _POSIX_ASYNC_IO == -1 */ + printf ("_POSIX_ASYNC_IO = %d\n ASYNCH IO is supported FULLY\n", + _POSIX_ASYNC_IO); +#endif /* _POSIX_ASYNC_IO == -1 */ + +#else /* Not defined _POSIX_ASYNC_IO */ + printf ("_POSIX_ASYNC_IO is not defined.\n"); + printf ("AIO might *not* be supported on some paths\n"); +#endif /* _POSIX_ASYNC_IO */ + + // System defined POSIX Values. + printf ("System claims to have POSIX_ASYNCHRONOUS_IO\n"); + + printf ("_POSIX_AIO_LISTIO_MAX = %d\n", _POSIX_AIO_LISTIO_MAX); + printf ("_POSIX_AIO_MAX = %d\n", _POSIX_AIO_MAX); + + // Check and print the run time values. + do_sysconf (); + + return 0; + +#else /* Not _POSIX_ASYNCHRONOUS_IO */ + printf ("No support._POSIX_ASYNCHRONOUS_IO itself is not defined\n"); + return -1; +#endif /* _POSIX_ASYNCHRONOUS_IO */ +} + +int +main (int, char *[]) +{ + if (have_asynchio () == 0) + printf ("Test successful\n"); + else + printf ("Test not successful\n"); + return 0; +} diff --git a/ACE/examples/Reactor/Proactor/Makefile.am b/ACE/examples/Reactor/Proactor/Makefile.am new file mode 100644 index 00000000000..7f1bc4b8a57 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/Makefile.am @@ -0,0 +1,153 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.Proactor_Cancel.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_cancel + +test_cancel_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_cancel_SOURCES = \ + test_cancel.cpp \ + test_cancel.h + +test_cancel_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_End_Event_Loops.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_end_event_loop + +test_end_event_loop_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_end_event_loop_SOURCES = \ + test_end_event_loop.cpp \ + test_cancel.h \ + test_proactor.h + +test_end_event_loop_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_Multiple_Loops.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_multiple_loops + +test_multiple_loops_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_multiple_loops_SOURCES = \ + test_multiple_loops.cpp \ + test_cancel.h \ + test_proactor.h + +test_multiple_loops_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_Post_Completions.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_post_completions + +test_post_completions_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_post_completions_SOURCES = \ + post_completions.cpp \ + test_cancel.h \ + test_proactor.h + +test_post_completions_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_Proactor.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_proactor + +test_proactor_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_proactor_SOURCES = \ + test_proactor.cpp \ + test_proactor.h + +test_proactor_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_Timeout.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_timeout + +test_timeout_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_timeout_SOURCES = \ + test_timeout.cpp \ + test_cancel.h \ + test_proactor.h + +test_timeout_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Makefile.Proactor_Udp_Proactor.am + +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS += test_udp_proactor + +test_udp_proactor_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +test_udp_proactor_SOURCES = \ + test_udp_proactor.cpp \ + test_cancel.h \ + test_proactor.h + +test_udp_proactor_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/Reactor/Proactor/Proactor.mpc b/ACE/examples/Reactor/Proactor/Proactor.mpc new file mode 100644 index 00000000000..c2c52207ca1 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/Proactor.mpc @@ -0,0 +1,59 @@ +// -*- MPC -*- +// $Id$ + +project(*cancel) : aceexe { + avoids += ace_for_tao + exename = test_cancel + Source_Files { + test_cancel.cpp + } +} + +project(*end_event_loops) : aceexe { + avoids += ace_for_tao + exename = test_end_event_loop + Source_Files { + test_end_event_loop.cpp + } +} + +project(*multiple_loops) : aceexe { + avoids += ace_for_tao + exename = test_multiple_loops + Source_Files { + test_multiple_loops.cpp + } +} + +project(*post_completions) : aceexe { + avoids += ace_for_tao + exename = test_post_completions + Source_Files { + post_completions.cpp + } +} + +project(*proactor) : aceexe { + avoids += ace_for_tao + exename = test_proactor + Source_Files { + test_proactor.cpp + } +} + +project(*timeout) : aceexe { + avoids += ace_for_tao + exename = test_timeout + Source_Files { + test_timeout.cpp + } +} + +project(*udp_proactor) : aceexe { + avoids += ace_for_tao + exename = test_udp_proactor + Source_Files { + test_udp_proactor.cpp + } +} + diff --git a/ACE/examples/Reactor/Proactor/README b/ACE/examples/Reactor/Proactor/README new file mode 100644 index 00000000000..29f2a0b1832 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/README @@ -0,0 +1,75 @@ +$Id$ + +This README file lists all the example applications for the Proactor framework. + +Test/Example Applications for Proactor: +========================================= + +The following tests are available. + +o $ACE_ROOT/tests/Aio_Platform_Test.cpp : Tests basic limits + pertaining to the POSIX features + +o $ACE_ROOT/examples/Reactor/Proactor/test_aiocb.cpp : + This is a C++ program for testing the AIOCB (AIO Control + Blocks) based completion approach which uses <aio_suspend> for + completion querying. + +o $ACE_ROOT/examples/Reactor/Proactor/test_aiosig.cpp : This is a + C++ program for testing the Signal based completion approach + that uses <sigtimedwait> for completion querying. + +o $ACE_ROOT/examples/Reactor/Proactor/test_aiocb_ace.cpp: Portable + version of test_aiocb.cpp. (Same as test_aiocb.cpp, but uses + ACE_DEBUGs instead of printf's and ACE_Message_Blocks instead + of char*'s. + +o $ACE_ROOT/examples/Reactor/Proactor/test_aiosig_ace.cpp: Portable + version of test_aiosig.cpp. (Same as test_aiosig.cpp, but uses + ACE_DEBUGs instead of printf's and ACE_Message_Blocks instead + of char*'s. + +o test_proactor.cpp (with ACE_POSIX_AIOCB_Proactor) : Test for + ACE_Proactor which uses AIOCB (AIO Control Blocks) based + completions strategy Proactor. (#define + ACE_POSIX_AIOCB_PROACTOR in the config file, but this is the + default option) + +o test_proactor.cpp (with ACE_POSIX_SIG_Proactor) : Test for + ACE_Proactor which uses real time signal based completion + strategy proactor. (#define ACE_POSIX_SIG_PROACTOR in the + config file) + +o test_multiple_loops.cpp : This example application shows how + to write programs that combine the Proactor and Reactor event + loops. This is possible only on WIN32 platform. + +o test_timeout.cpp : Multithreaded application testing the Timers + mechanism of the Proactor. + +o test_timeout_st.cpp : Single-threaded version of test_timeout.cpp. + +o post_completions.cpp : Tests the completion posting mechanism of + the Proactor. + +o test_end_event_loop.cpp : Tests the event loop mechanism of the + Proactor. + +o test_cancel.cpp : Tests <cancel> interface of the + Asynch_Operation class. + +Behavior of POSIX AIO of various platforms: +========================================== + +Sun 5.6 : POSIX4 Real-Time signals implementation is broken in + this platform. + Only POSIX AIOCB Proactor works in this platform. + Therefore, it is not possible to use multiple threads + with in the framework. + +Sun 5.7 : AIOCB and SIG Proactors work fine. + +LynxOS 3.0.0 : <pthread_sigmask> is not available in this + platform. So, only AIOCB Proactor works here. + + diff --git a/ACE/examples/Reactor/Proactor/post_completions.cpp b/ACE/examples/Reactor/Proactor/post_completions.cpp new file mode 100644 index 00000000000..e6545241953 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/post_completions.cpp @@ -0,0 +1,306 @@ +// $Id$ +// ============================================================================ +// +// = FILENAME +// post_completions.cpp +// +// = DESCRITPTION +// This program demonstrates how to post fake completions to The +// Proactor. It also shows the how to specify the particular +// real-time signals to post completions. The Real-time signal +// based completion strategy is implemented with +// ACE_POSIX_SIG_PROACTOR. +// (So, it can be used only if both ACE_HAS_AIO_CALLS and +// ACE_HAS_POSIX_REALTIME_SIGNALS are defined.) +// Since it is faking results, you have to pay by knowing and +// using platform-specific implementation objects for Asynchronous +// Result classes. +// This example shows using an arbitrary result class for faking +// completions. You can also use the predefined Result classes for +// faking. The factory methods in the Proactor class create the +// Result objects. +// +// = COMPILATION +// make +// +// = RUN +// ./post_completions +// +// = AUTHOR +// Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ===================================================================== + +#include "ace/OS_NS_unistd.h" +#include "ace/OS_main.h" +#include "ace/Proactor.h" +#include "ace/Task.h" +#include "ace/WIN32_Proactor.h" +#include "ace/POSIX_Proactor.h" +#include "ace/Atomic_Op.h" +#include "ace/Thread_Mutex.h" + +// Keep track of how many completions are still expected. +static ACE_Atomic_Op <ACE_SYNCH_MUTEX, size_t> Completions_To_Go; + + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + defined (ACE_HAS_AIO_CALLS) +// This only works on Win32 platforms and on Unix platforms supporting +// POSIX aio calls. + +#if defined (ACE_HAS_AIO_CALLS) +#define RESULT_CLASS ACE_POSIX_Asynch_Result +#elif defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) +#define RESULT_CLASS ACE_WIN32_Asynch_Result +#endif /* ACE_HAS_AIO_CALLS */ + +class My_Result : public RESULT_CLASS +{ + // = TITLE + // + // Result Object that we will post to the Proactor. + // + // = DESCRIPTION + // + +public: + My_Result (ACE_Handler &handler, + const void *act, + int signal_number, + size_t sequence_number) + : RESULT_CLASS (handler.proxy (), + act, + ACE_INVALID_HANDLE, + 0, // Offset + 0, // OffsetHigh + 0, // Priority + signal_number), + sequence_number_ (sequence_number) + {} + // Constructor. + + virtual ~My_Result (void) + {} + // Destructor. + + void complete (size_t, + int success, + const void *completion_key, + u_long error) + // This is the method that will be called by the Proactor for + // dispatching the completion. This method generally calls one of + // the call back hood methods defined in the ACE_Handler + // class. But, we will just handle the completions here. + { + this->success_ = success; + this->completion_key_ = completion_key; + this->error_ = error; + + size_t to_go = --Completions_To_Go; + + // Print the completion details. + ACE_DEBUG ((LM_DEBUG, + "(%t) Completion sequence number %d, success : %d, error : %d, signal_number : %d, %u more to go\n", + this->sequence_number_, + this->success_, + this->error_, + this->signal_number (), + to_go)); + + // Sleep for a while. + ACE_OS::sleep (4); + } + +private: + size_t sequence_number_; + // Sequence number for the result object. +}; + +class My_Handler : public ACE_Handler +{ + // = TITLE + // + // Handler class for faked completions. + // + // = DESCRIPTION + // + +public: + My_Handler (void) {} + // Constructor. + + virtual ~My_Handler (void) {} + // Destructor. +}; + +class My_Task: public ACE_Task <ACE_NULL_SYNCH> +{ + // = TITLE + // + // Contains thread functions which execute event loops. Each + // thread waits for a different signal. + // +public: + My_Task (void) {} + // Constructor. + + virtual ~My_Task (void) {} + // Destructor. + + int open (void *proactor) + { + // Store the proactor. + this->proactor_ = (ACE_Proactor *) proactor; + + // Activate the Task. + this->activate (THR_NEW_LWP, 5); + return 0; + } + + int svc (void) + { + // Handle events for 13 seconds. + ACE_Time_Value run_time (13); + + ACE_DEBUG ((LM_DEBUG, "(%t):Starting svc routine\n")); + + if (this->proactor_->handle_events (run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t):%p.\n", "Worker::svc"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n")); + + return 0; + } + +private: + ACE_Proactor *proactor_; + // Proactor for this task. +}; + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); + + ACE_DEBUG ((LM_DEBUG, + "(%P | %t):Test starts \n")); + + // = Get two POSIX_SIG_Proactors, one with SIGRTMIN and one with + // SIGRTMAX. + + ACE_Proactor proactor1; + // Proactor1. SIGRTMIN Proactor. (default). + + // = Proactor2. SIGRTMAX Proactor. +#if defined (ACE_HAS_AIO_CALLS) && defined (ACE_HAS_POSIX_REALTIME_SIGNALS) + + ACE_DEBUG ((LM_DEBUG, "Using ACE_POSIX_SIG_Proactor\n")); + + sigset_t signal_set; + // Signal set that we want to mask. + + // Clear the signal set. + if (sigemptyset (&signal_set) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "sigemptyset failed"), + 1); + + // Add the SIGRTMAX to the signal set. + if (sigaddset (&signal_set, ACE_SIGRTMAX) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "sigaddset failed"), + 1); + + // Make the POSIX Proactor. + ACE_POSIX_SIG_Proactor posix_proactor (signal_set); + // Get the Proactor interface out of it. + ACE_Proactor proactor2 (&posix_proactor); +#else /* ACE_HAS_AIO_CALLS && ACE_HAS_POSIX_REALTIME_SIGNALS */ + ACE_Proactor proactor2; +#endif /* ACE_HAS_AIO_CALLS && ACE_HAS_POSIX_REALTIME_SIGNALS */ + + // = Create Tasks. One pool of threads to handle completions on + // SIGRTMIN and the other one to handle completions on SIGRTMAX. + My_Task task1, task2; + task1.open (&proactor1); + task2.open (&proactor2); + + // Handler for completions. + My_Handler handler; + + // = Create a few MyResult objects and post them to Proactor. + const size_t NrCompletions (10); + My_Result *result_objects [NrCompletions]; + int signal_number = ACE_SIGRTMAX; + size_t ri = 0; + + Completions_To_Go = NrCompletions; + + // Creation. + for (ri = 0; ri < NrCompletions; ri++) + { + // Use RTMIN and RTMAX proactor alternatively, to post + // completions. + if (ri % 2) + signal_number = ACE_SIGRTMIN; + else + signal_number = ACE_SIGRTMAX; + // Create the result. + ACE_NEW_RETURN (result_objects [ri], + My_Result (handler, + 0, + signal_number, + ri), + 1); + } + ACE_OS::sleep(5); + // Post all the result objects. + ACE_Proactor *proactor; + for (ri = 0; ri < NrCompletions; ri++) + { + // Use RTMIN and RTMAX Proactor alternatively, to post + // completions. + if (ri % 2) + proactor = &proactor1; + else + proactor = &proactor2; + if (result_objects [ri]->post_completion (proactor->implementation ()) + == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Test failed\n"), + 1); + } + + ACE_Thread_Manager::instance ()->wait (); + + int status = 0; + size_t to_go = Completions_To_Go.value (); + if (size_t (0) != to_go) + { + ACE_ERROR ((LM_ERROR, + "Fail! Expected all completions to finish but %u to go\n", + to_go)); + status = 1; + } + + ACE_DEBUG ((LM_DEBUG, + "(%P | %t):Test ends\n")); + return status; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ + +int +main (int, char *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example cannot work with AIOCB_Proactor.\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ + diff --git a/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp b/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp new file mode 100644 index 00000000000..1f4557d7df5 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/simple_test_proactor.cpp @@ -0,0 +1,269 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// simple_test_proactor.cpp +// +// = DESCRIPTION +// Very simple version of test_proactor.cpp. +// +// = AUTHOR +// Alexander Babu Arulanthu (alex@cs.wustl.edu) +// +// ============================================================================ + +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/OS_main.h" + +ACE_RCSID(Proactor, test_proactor, "simple_test_proactor.cpp,v 1.1 1999/05/18 22:15:30 alex Exp") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +static ACE_TCHAR *file = ACE_TEXT("simple_test_proactor.cpp"); +static ACE_TCHAR *dump_file = ACE_TEXT("simple_output"); + +class Simple_Tester : public ACE_Handler +{ + // = TITLE + // + // Simple_Tester + // + // = DESCRIPTION + // + // The class will be created by main(). This class reads a block + // from the file and write that to the dump file. + +public: + Simple_Tester (void); + // Constructor. + + ~Simple_Tester (void); + + int open (void); + // Open the operations and initiate read from the file. + +protected: + // = These methods are called by the freamwork. + + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + // This is called when asynchronous reads from the socket complete. + + virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); + // This is called when asynchronous writes from the socket complete. + +private: + int initiate_read_file (void); + + ACE_Asynch_Read_File rf_; + // rf (read file): for writing from the file. + + ACE_Asynch_Write_File wf_; + // ws (write File): for writing to the file. + + ACE_HANDLE input_file_; + // File to read from. + + ACE_HANDLE dump_file_; + // File for dumping data. + + // u_long file_offset_; + // Current file offset + + // u_long file_size_; + // File size +}; + + +Simple_Tester::Simple_Tester (void) + : input_file_ (ACE_INVALID_HANDLE), + dump_file_ (ACE_INVALID_HANDLE) +{ +} + +Simple_Tester::~Simple_Tester (void) +{ + ACE_OS::close (this->input_file_); + ACE_OS::close (this->dump_file_); +} + + +int +Simple_Tester::open (void) +{ + // Initialize stuff + + // Open input file (in OVERLAPPED mode) + this->input_file_ = ACE_OS::open (file, + GENERIC_READ | FILE_FLAG_OVERLAPPED); + if (this->input_file_ == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1); + + // Open dump file (in OVERLAPPED mode) + this->dump_file_ = ACE_OS::open (dump_file, + O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED, + 0644); + if (this->dump_file_ == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1); + + // Open ACE_Asynch_Read_File + if (this->rf_.open (*this, this->input_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::open"), -1); + + // Open ACE_Asynch_Write_File + if (this->wf_.open (*this, this->dump_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::open"), -1); + + ACE_DEBUG ((LM_DEBUG, + "Simple_Tester::open: Files and Asynch Operations opened sucessfully\n")); + + + // Start an asynchronous read file + if (this->initiate_read_file () == -1) + return -1; + + return 0; +} + + +int +Simple_Tester::initiate_read_file (void) +{ + // Create Message_Block + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); + + // Inititiate an asynchronous read from the file + if (this->rf_.read (*mb, + mb->size () - 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1); + + ACE_DEBUG ((LM_DEBUG, + "Simple_Tester:initiate_read_file: Asynch Read File issued sucessfully\n")); + + return 0; +} + +void +Simple_Tester::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n")); + + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + // Watch out if you need to enable this... the ACE_Log_Record::MAXLOGMSGLEN + // value controls to max length of a log record, and a large output + // buffer may smash it. +#if 0 + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", + "message_block", + result.message_block ().rd_ptr ())); +#endif /* 0 */ + + if (result.success ()) + { + // Read successful: write this to the file. + if (this->wf_.write (result.message_block (), + result.bytes_transferred ()) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write")); + return; + } + } +} + +void +Simple_Tester::handle_write_file (const ACE_Asynch_Write_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_write_File called\n")); + + // Reset pointers + result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ()); + + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + // Watch out if you need to enable this... the ACE_Log_Record::MAXLOGMSGLEN + // value controls to max length of a log record, and a large output + // buffer may smash it. +#if 0 + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", + "message_block", + result.message_block ().rd_ptr ())); +#endif /* 0 */ + ACE_Proactor::end_event_loop (); +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("f:d:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'f': + file = get_opt.opt_arg (); + break; + case 'd': + dump_file = get_opt.opt_arg (); + break; + default: + ACE_ERROR ((LM_ERROR, "%p.\n", + "usage :\n" + "-d <dumpfile>\n" + "-f <file>\n")); + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (parse_args (argc, argv) == -1) + return -1; + + Simple_Tester Simple_Tester; + + if (Simple_Tester.open () == -1) + return -1; + + int success = 1; + + // dispatch events + success = !(ACE_Proactor::run_event_loop () == -1); + + return success ? 0 : 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_aiocb.cpp b/ACE/examples/Reactor/Proactor/test_aiocb.cpp new file mode 100644 index 00000000000..c9c0d280f1b --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_aiocb.cpp @@ -0,0 +1,239 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// proactor +// +// = FILENAME +// test_aiocb.cpp +// +// = DESCRIPTION +// Checkout $ACE_ROOT/examples/Reactor/Proactor/test_aiocb_ace.cpp, +// which is the ACE'ified version of this program. +// +// = COMPILE and RUN +// % CC -g -o test_aiocb -lrt test_aiocb.cpp +// % ./test_aiocb +// +// = AUTHOR +// Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ============================================================================ + +#include <unistd.h> +#include <fcntl.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <signal.h> +#include <string.h> +#include <errno.h> +#include <stdio.h> +#include <iostream.h> + +#include <aio.h> + +class Test_Aio +{ +public: + Test_Aio (void); + // Default constructor. + + int init (void); + // Initting the output file and the buffer. + + int do_aio (void); + // Doing the testing stuff. + + ~Test_Aio (void); + // Destructor. +private: + int out_fd_; + // Output file descriptor. + + struct aiocb *aiocb_write_; + // For writing to the file. + + struct aiocb *aiocb_read_; + // Reading stuff from the file. + + char *buffer_write_; + // The buffer to be written to the out_fd. + + char *buffer_read_; + // The buffer to be read back from the file. +}; + +Test_Aio::Test_Aio (void) + : aiocb_write_ (new struct aiocb), + aiocb_read_ (new struct aiocb), + buffer_write_ (0), + buffer_read_ (0) +{ +} + +Test_Aio::~Test_Aio (void) +{ + delete aiocb_write_; + delete aiocb_read_; + delete buffer_write_; + delete buffer_read_; +} + +// Init the output file and init the buffer. +int +Test_Aio::init (void) +{ + // Open the output file. + this->out_fd_ = open ("test_aio.log", O_RDWR | O_CREAT | O_TRUNC, 0666); + if (this->out_fd_ == 0) + { + cout << "Error : Opening file" << endl; + return -1; + } + + // Init the buffers. + this->buffer_write_ = strdup ("Welcome to the world of AIO... AIO Rules !!!"); + cout << "The buffer : " << this->buffer_write_ << endl; + this->buffer_read_ = new char [strlen (this->buffer_write_) + 1]; + return 0; +} + +// Set the necessary things for the AIO stuff. +// Write the buffer asynchly.hmm Disable signals. +// Go on aio_suspend. Wait for completion. +// Print out the result. +int +Test_Aio::do_aio (void) +{ + // = Write to the file. + + // Setup AIOCB. + this->aiocb_write_->aio_fildes = this->out_fd_; + this->aiocb_write_->aio_offset = 0; + this->aiocb_write_->aio_buf = this->buffer_write_; + this->aiocb_write_->aio_nbytes = strlen (this->buffer_write_); + this->aiocb_write_->aio_reqprio = 0; + this->aiocb_write_->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + this->aiocb_write_->aio_sigevent.sigev_value.sival_ptr = + (void *) this->aiocb_write_; + + // Fire off the aio write. + if (aio_write (this->aiocb_write_) != 0) + { + perror ("aio_write"); + return -1; + } + + // = Read from that file. + + // Setup AIOCB. + this->aiocb_read_->aio_fildes = this->out_fd_; + this->aiocb_read_->aio_offset = 0; + this->aiocb_read_->aio_buf = this->buffer_read_; + this->aiocb_read_->aio_nbytes = strlen (this->buffer_write_); + this->aiocb_read_->aio_reqprio = 0; + this->aiocb_read_->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + this->aiocb_read_->aio_sigevent.sigev_value.sival_ptr = + (void *) this->aiocb_read_; + + // Fire off the aio write. If it doesnt get queued, carry on to get + // the completion for the first one. + if (aio_read (this->aiocb_read_) < 0) + perror ("aio_read"); + + // Wait for the completion on aio_suspend. + struct aiocb *list_aiocb[2]; + list_aiocb [0] = this->aiocb_write_; + list_aiocb [1] = this->aiocb_read_; + + // Do suspend till all the aiocbs in the list are done. + int done = 0; + int return_val = 0; + while (!done) + { + return_val = aio_suspend (list_aiocb, + 2, + 0); + cerr << "Return value :" << return_val << endl; + + // Analyze return and error values. + if (list_aiocb[0] != 0) + { + if (aio_error (list_aiocb [0]) != EINPROGRESS) + { + if (aio_return (list_aiocb [0]) == -1) + { + perror ("aio_return"); + return -1; + } + else + { + // Successful. Store the pointer somewhere and make the + // entry NULL in the list. + this->aiocb_write_ = list_aiocb [0]; + list_aiocb [0] = 0; + } + } + else + cout << "AIO write in progress" << endl; + } + + if (list_aiocb[1] != 0) + { + if (aio_error (list_aiocb [1]) != EINPROGRESS) + { + int read_return = aio_return (list_aiocb[1]); + if (read_return == -1) + { + perror ("aio_return"); + return -1; + } + else + { + // Successful. Store the pointer somewhere and make the + // entry NULL in the list. + this->aiocb_read_ = list_aiocb [1]; + list_aiocb [1] = 0; + this->buffer_read_[read_return] = '\0'; + } + } + else + cout << "AIO read in progress" << endl; + } + + // Is it done? + if ((list_aiocb [0] == 0) && (list_aiocb [1] == 0)) + done = 1; + } + + cout << "Both the AIO operations done." << endl; + cout << "The buffer is :" << this->buffer_read_ << endl; + + return 0; +} + +int +main (int argc, char **argv) +{ + Test_Aio test_aio; + + if (test_aio.init () != 0) + { + printf ("AIOCB test failed:\n" + "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"); + return -1; + } + + if (test_aio.do_aio () != 0) + { + printf ("AIOCB test failed:\n" + "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"); + return -1; + } + printf ("AIOCB test successful:\n" + "ACE_POSIX_AIOCB_PROACTOR should work in this platform\n"); + return 0; +} diff --git a/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp b/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp new file mode 100644 index 00000000000..17705de1f03 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_aiocb_ace.cpp @@ -0,0 +1,259 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// proactor +// +// = FILENAME +// test_aiocb_ace.cpp +// +// = DESCRIPTION +// This program helps you to test the <aio_*> calls on a +// platform. +// +// Before running this test, make sure the platform can +// support POSIX <aio_> calls, using +// ACE_ROOT/tests/Aio_Platform_Test. +// +// This program tests the AIOCB (AIO Control Blocks) based +// completion approach which uses <aio_suspend> for completion +// querying. +// +// If this test is successful, ACE_POSIX_AIOCB_PROACTOR +// can be used on this platform. +// +// = COMPILE and RUN +// % make +// % ./test_aiocb_ace +// +// = AUTHOR +// Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/ACE.h" +#include "ace/Log_Msg.h" +#include "ace/os_include/os_aio.h" +#include "ace/OS_NS_string.h" + +class Test_Aio +{ +public: + Test_Aio (void); + // Default constructor. + + int init (void); + // Initting the output file and the buffer. + + int do_aio (void); + // Doing the testing stuff. + + ~Test_Aio (void); + // Destructor. +private: + int out_fd_; + // Output file descriptor. + + struct aiocb *aiocb_write_; + // For writing to the file. + + struct aiocb *aiocb_read_; + // Reading stuff from the file. + + char *buffer_write_; + // The buffer to be written to the out_fd. + + char *buffer_read_; + // The buffer to be read back from the file. +}; + +Test_Aio::Test_Aio (void) + : aiocb_write_ (0), + aiocb_read_ (0), + buffer_write_ (0), + buffer_read_ (0) +{ + ACE_NEW (this->aiocb_write_, + struct aiocb); + ACE_NEW (this->aiocb_read_, + struct aiocb); +} + +Test_Aio::~Test_Aio (void) +{ + delete aiocb_write_; + delete aiocb_read_; + delete buffer_write_; + delete buffer_read_; +} + +// Init the output file and init the buffer. +int +Test_Aio::init (void) +{ + // Open the output file. + this->out_fd_ = ACE_OS::open ("test_aio.log", + O_RDWR | O_CREAT | O_TRUNC, + 0666); + if (this->out_fd_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error: Opening file\n"), + -1); + + // Init the buffers. + this->buffer_write_ = ACE::strnew ("Welcome to the world of AIO... AIO Rules !!!"); + ACE_DEBUG ((LM_DEBUG, + "The buffer : %s\n", + this->buffer_write_)); + + // Allocate memory for the read buffer. + ACE_NEW_RETURN (this->buffer_read_, + char [ACE_OS::strlen (this->buffer_write_)], + -1); + + return 0; +} + +// Set the necessary things for the AIO stuff. +// Write the buffer asynchly.hmm Disable signals. +// Go on aio_suspend. Wait for completion. +// Print out the result. +int +Test_Aio::do_aio (void) +{ + // = Write to the file. + + // Setup AIOCB. + this->aiocb_write_->aio_fildes = this->out_fd_; + this->aiocb_write_->aio_offset = 0; + this->aiocb_write_->aio_buf = this->buffer_write_; + this->aiocb_write_->aio_nbytes = ACE_OS::strlen (this->buffer_write_); + this->aiocb_write_->aio_reqprio = 0; + this->aiocb_write_->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + this->aiocb_write_->aio_sigevent.sigev_value.sival_ptr = + (void *) this->aiocb_write_; + + // Fire off the aio write. + if (aio_write (this->aiocb_write_) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "aio_write"), + -1); + + // = Read from that file. + + // Setup AIOCB. + this->aiocb_read_->aio_fildes = this->out_fd_; + this->aiocb_read_->aio_offset = 0; + this->aiocb_read_->aio_buf = this->buffer_read_; + this->aiocb_read_->aio_nbytes = ACE_OS::strlen (this->buffer_write_); + this->aiocb_read_->aio_reqprio = 0; + this->aiocb_read_->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + this->aiocb_read_->aio_sigevent.sigev_value.sival_ptr = + (void *) this->aiocb_read_; + + // Fire off the aio write. If it doesnt get queued, carry on to get + // the completion for the first one. + if (aio_read (this->aiocb_read_) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "aio_read"), + -1); + + // Wait for the completion on aio_suspend. + struct aiocb *list_aiocb[2]; + list_aiocb [0] = this->aiocb_write_; + list_aiocb [1] = this->aiocb_read_; + + // Do suspend till all the aiocbs in the list are done. + int to_finish = 2; + int return_val = 0; + while (to_finish > 0) + { + return_val = aio_suspend (list_aiocb, + to_finish, + 0); + ACE_DEBUG ((LM_DEBUG, + "Result of <aio_suspend> : %d\n", + return_val)); + + // Analyze return and error values. + if (to_finish > 1) + { + if (aio_error (list_aiocb [1]) != EINPROGRESS) + { + if (aio_return (list_aiocb [1]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "aio_return, item 1"), + -1); + else + { + // Successful. Remember we have one less thing to finish. + --to_finish; + list_aiocb [1] = 0; + } + } + else + ACE_DEBUG ((LM_DEBUG, + "aio_error says aio 1 is in progress\n")); + } + + if (aio_error (list_aiocb [0]) != EINPROGRESS) + { + if (aio_return (list_aiocb [0]) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "aio_return, item 0"), + -1); + else + { + // Successful. Store the pointer somewhere and bump the + // read entry up to the front, if it is still not done. + --to_finish; + list_aiocb [0] = this->aiocb_read_; + } + } + else + ACE_DEBUG ((LM_DEBUG, + "aio_error says aio 0 is in progress\n")); + } + + ACE_DEBUG ((LM_DEBUG, + "Both the AIO operations done.\n" + "The buffer is : %s\n", + this->buffer_read_)); + + return 0; +} + +int +main (int argc, char **argv) +{ + + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); + + Test_Aio test_aio; + + if (test_aio.init () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "AIOCB test failed:\n" + "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"), + -1); + + if (test_aio.do_aio () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "AIOCB test failed:\n" + "ACE_POSIX_AIOCB_PROACTOR may not work in this platform\n"), + -1); + + ACE_DEBUG ((LM_DEBUG, + "AIOCB test successful:\n" + "ACE_POSIX_AIOCB_PROACTOR should work in this platform\n")); + + return 0; +} diff --git a/ACE/examples/Reactor/Proactor/test_aiosig.cpp b/ACE/examples/Reactor/Proactor/test_aiosig.cpp new file mode 100644 index 00000000000..1746a10a49c --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_aiosig.cpp @@ -0,0 +1,294 @@ +// $Id$ +// ============================================================================ +// +// = FILENAME +// test_aiosig.cpp +// +// = DESCRITPTION +// Check out test_aiosig_ace.cpp, the ACE'ified version of this +// program. This program may not be uptodate. +// +// = COMPILATION +// CC -g -o test_aiosig -lrt test_aiosig.cpp +// +// = RUN +// ./test_aiosig +// +// = AUTHOR +// Programming for the Real World. Bill O. GallMeister. +// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ===================================================================== + + +#include <unistd.h> +#include <fcntl.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <signal.h> +#include <string.h> +#include <errno.h> +#include <stdio.h> + +#include <limits.h> + +#include <aio.h> + +int file_handle = -1; +char mb1 [BUFSIZ + 1]; +char mb2 [BUFSIZ + 1]; +aiocb aiocb1, aiocb2; +sigset_t completion_signal; + +// Function prototypes. +int setup_signal_delivery (void); +int issue_aio_calls (void); +int query_aio_completions (void); +int test_aio_calls (void); +int setup_signal_handler (void); +int setup_signal_handler (int signal_number); + +int +setup_signal_delivery (void) +{ + // Make the sigset_t consisting of the completion signal. + if (sigemptyset (&completion_signal) == -1) + { + perror ("Error:Couldnt init the RT completion signal set\n"); + return -1; + } + + if (sigaddset (&completion_signal, SIGRTMIN) == -1) + { + perror ("Error:Couldnt init the RT completion signal set\n"); + return -1; + } + + // Mask them. + if (pthread_sigmask (SIG_BLOCK, &completion_signal, 0) == -1) + { + perror ("Error:Couldnt maks the RT completion signals\n"); + return -1; + } + + return setup_signal_handler (SIGRTMIN); +} + +int +issue_aio_calls (void) +{ + // Setup AIOCB. + aiocb1.aio_fildes = file_handle; + aiocb1.aio_offset = 0; + aiocb1.aio_buf = mb1; + aiocb1.aio_nbytes = BUFSIZ; + aiocb1.aio_reqprio = 0; + aiocb1.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + aiocb1.aio_sigevent.sigev_signo = SIGRTMIN; + aiocb1.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb1; + + // Fire off the aio write. + if (aio_read (&aiocb1) == -1) + { + // Queueing failed. + perror ("Error:Asynch_Read_Stream: aio_read queueing failed\n"); + return -1; + } + + // Setup AIOCB. + aiocb2.aio_fildes = file_handle; + aiocb2.aio_offset = BUFSIZ + 1; + aiocb2.aio_buf = mb2; + aiocb2.aio_nbytes = BUFSIZ; + aiocb2.aio_reqprio = 0; + aiocb2.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + aiocb2.aio_sigevent.sigev_signo = SIGRTMIN; + aiocb2.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb2; + + // Fire off the aio write. + if (aio_read (&aiocb2) == -1) + { + // Queueing failed. + perror ("Error:Asynch_Read_Stream: aio_read queueing failed\n"); + return -1; + } + return 0; +} + +int +query_aio_completions (void) +{ + int result = 0; + size_t number_of_compleions = 0; + for (number_of_compleions = 0; + number_of_compleions < 2; + number_of_compleions ++) + { + // Wait for <milli_seconds> amount of time. + // @@ Assigning <milli_seconds> to tv_sec. + timespec timeout; + timeout.tv_sec = INT_MAX; + timeout.tv_nsec = 0; + + // To get back the signal info. + siginfo_t sig_info; + + // Await the RT completion signal. + int sig_return = sigtimedwait (&completion_signal, + &sig_info, + &timeout); + + // Error case. + // If failure is coz of timeout, then return *0* but set + // errno appropriately. This is what the WinNT proactor + // does. + if (sig_return == -1) + { + perror ("Error:Error waiting for RT completion signals\n"); + return -1; + } + + // RT completion signals returned. + if (sig_return != SIGRTMIN) + { + printf ("Unexpected signal (%d) has been received while waiting for RT Completion Signals\n", + sig_return); + return -1; + } + + // @@ Debugging. + printf ("Sig number found in the sig_info block : %d\n", + sig_info.si_signo); + + // Is the signo returned consistent? + if (sig_info.si_signo != sig_return) + { + printf ("Inconsistent signal number (%d) in the signal info block\n", + sig_info.si_signo); + return -1; + } + + // @@ Debugging. + printf ("Signal code for this signal delivery : %d\n", + sig_info.si_code); + + // Is the signal code an aio completion one? + if ((sig_info.si_code != SI_ASYNCIO) && + (sig_info.si_code != SI_QUEUE)) + { + printf ("Unexpected signal code (%d) returned on completion querying\n", + sig_info.si_code); + return -1; + } + + // Retrive the aiocb. + aiocb* aiocb_ptr = (aiocb *) sig_info.si_value.sival_ptr; + + // Analyze error and return values. Return values are + // actually <errno>'s associated with the <aio_> call + // corresponding to aiocb_ptr. + int error_code = aio_error (aiocb_ptr); + if (error_code == -1) + { + perror ("Error:Invalid control block was sent to <aio_error> for compleion querying\n"); + return -1; + } + + if (error_code != 0) + { + // Error occurred in the <aio_>call. Return the errno + // corresponding to that <aio_> call. + printf ("Error:An AIO call has failed:Error code = %d\n", + error_code); + return -1; + } + + // No error occured in the AIO operation. + int nbytes = aio_return (aiocb_ptr); + if (nbytes == -1) + { + perror ("Error:Invalid control block was send to <aio_return>\n"); + return -1; + } + + if (number_of_compleions == 0) + // Print the buffer. + printf ("Number of bytes transferred : %d\n The buffer : %s \n", + nbytes, + mb1); + else + // Print the buffer. + printf ("Number of bytes transferred : %d\n The buffer : %s \n", + nbytes, + mb2); + } + return 0; +} + +int +test_aio_calls (void) +{ + // Set up the input file. + // Open file (in SEQUENTIAL_SCAN mode) + file_handle = open ("test_aiosig.cpp", O_RDONLY); + + if (file_handle == -1) + { + perror ("Error:Opening the inputfile"); + return -1; + } + + if (setup_signal_delivery () < 0) + return -1; + + if (issue_aio_calls () < 0) + return -1; + + if (query_aio_completions () < 0) + return -1; + + return 0; +} + +int +setup_signal_handler (int signal_number) +{ + // Setting up the handler(!) for these signals. + struct sigaction reaction; + sigemptyset (&reaction.sa_mask); // Nothing else to mask. + reaction.sa_flags = SA_SIGINFO; // Realtime flag. +#if defined (SA_SIGACTION) + // Lynx says, it is better to set this bit to be portable. + reaction.sa_flags &= SA_SIGACTION; +#endif /* SA_SIGACTION */ + reaction.sa_sigaction = null_handler; // Null handler. + int sigaction_return = sigaction (SIGRTMIN, + &reaction, + 0); + if (sigaction_return == -1) + { + perror ("Error:Proactor couldnt do sigaction for the RT SIGNAL"); + return -1; + } + + return 0; +} + +void +null_handler (int /* signal_number */, + siginfo_t * /* info */, + void * /* context */) +{ +} + +int +main (int, char *[]) +{ + if (test_aio_calls () == 0) + printf ("RT SIG test successful:\n" + "ACE_POSIX_SIG_PROACTOR should work in this platform\n"); + else + printf ("RT SIG test failed:\n" + "ACE_POSIX_SIG_PROACTOR may not work in this platform\n"); + return 0; +} diff --git a/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp b/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp new file mode 100644 index 00000000000..34c1b9b5ab2 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_aiosig_ace.cpp @@ -0,0 +1,358 @@ +// $Id$ + +// ============================================================================ +// +// = FILENAME +// test_aiosig_sig.cpp +// +// = DESCRITPTION +// This program helps you to test the <aio_*> calls on a +// platform. +// Before running this test, make sure the platform can +// support POSIX <aio_> calls, using ACE_ROOT/tests/Aio_Plaform_Test.cpp +// +// This program tests the Signal based completion approach which +// uses <sigtimedwait> for completion querying. +// If this test is successful, ACE_POSIX_SIG_PROACTOR +// can be used on this platform. +// +// This program is a ACE version of the +// $ACE_ROOT/examples/Reactor/Proactor/test_aiosig.cpp, with +// ACE_DEBUGs and Message_Blocks. +// +// This test does the following: +// Issue two <aio_read>s. +// Assign SIGRTMIN as the notification signal. +// Mask these signals from delivery. +// Receive this signal by doing <sigtimedwait>. +// Wait for two completions (two signals) +// +// = COMPILATION +// make +// +// = RUN +// ./test_aiosig_ace +// +// = AUTHOR +// Programming for the Real World. Bill O. GallMeister. +// Modified by Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ===================================================================== + +#include "ace/Message_Block.h" +#include "ace/Log_Msg.h" +#include "ace/os_include/os_aio.h" +#include "ace/OS_NS_signal.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_fcntl.h" +#include "ace/Asynch_IO.h" // for ACE_INFINITE + +static ACE_HANDLE file_handle = ACE_INVALID_HANDLE; +static ACE_Message_Block mb1 (BUFSIZ + 1); +static ACE_Message_Block mb2 (BUFSIZ + 1); +static aiocb aiocb1; +static aiocb aiocb2; +static aiocb aiocb3; +static sigset_t completion_signal; + +// Function prototypes. +static int setup_signal_delivery (void); +static int issue_aio_calls (void); +static int query_aio_completions (void); +static int test_aio_calls (void); +static void null_handler (int signal_number, siginfo_t *info, void *context); +static int setup_signal_handler (int signal_number); + +static int +setup_signal_delivery (void) +{ + // = Mask all the signals. + + sigset_t full_set; + + // Get full set. + if (sigfillset (&full_set) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p\n", + "sigfillset failed"), + -1); + + // Mask them. + if (ACE_OS::pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed"), + -1); + + // = Make a mask with SIGRTMIN only. We use only that signal to + // issue <aio_>'s. + + if (sigemptyset (&completion_signal) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Couldnt init the RT completion signal set"), + -1); + + if (sigaddset (&completion_signal, + SIGRTMIN) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Couldnt init the RT completion signal set"), + -1); + + // Set up signal handler for this signal. + return setup_signal_handler (SIGRTMIN); +} + +static int +setup_signal_handler (int signal_number) +{ + ACE_UNUSED_ARG (signal_number); + + // Setting up the handler(!) for these signals. + struct sigaction reaction; + sigemptyset (&reaction.sa_mask); // Nothing else to mask. + reaction.sa_flags = SA_SIGINFO; // Realtime flag. +#if defined (SA_SIGACTION) + // Lynx says, it is better to set this bit to be portable. + reaction.sa_flags &= SA_SIGACTION; +#endif /* SA_SIGACTION */ + reaction.sa_sigaction = null_handler; // Null handler. + int sigaction_return = sigaction (SIGRTMIN, + &reaction, + 0); + if (sigaction_return == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Proactor couldnt do sigaction for the RT SIGNAL"), + -1); + return 0; +} + + +static int +issue_aio_calls (void) +{ + // Setup AIOCB. + aiocb1.aio_fildes = file_handle; + aiocb1.aio_offset = 0; + aiocb1.aio_buf = mb1.wr_ptr (); + aiocb1.aio_nbytes = BUFSIZ; + aiocb1.aio_reqprio = 0; + aiocb1.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + aiocb1.aio_sigevent.sigev_signo = SIGRTMIN; + aiocb1.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb1; + + // Fire off the aio read. + if (aio_read (&aiocb1) == -1) + // Queueing failed. + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Asynch_Read_Stream: aio_read queueing failed"), + -1); + + // Setup AIOCB. + aiocb2.aio_fildes = file_handle; + aiocb2.aio_offset = BUFSIZ + 1; + aiocb2.aio_buf = mb2.wr_ptr (); + aiocb2.aio_nbytes = BUFSIZ; + aiocb2.aio_reqprio = 0; + aiocb2.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + aiocb2.aio_sigevent.sigev_signo = SIGRTMIN; + aiocb2.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb2; + + // Fire off the aio read. + if (aio_read (&aiocb2) == -1) + // Queueing failed. + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Asynch_Read_Stream: aio_read queueing failed"), + -1); + + // Setup sigval. + aiocb3.aio_fildes = ACE_INVALID_HANDLE; + aiocb3.aio_offset = 0; + aiocb3.aio_buf = 0; + aiocb3.aio_nbytes = 0; + aiocb3.aio_reqprio = 0; + aiocb3.aio_sigevent.sigev_notify = SIGEV_SIGNAL; + aiocb3.aio_sigevent.sigev_signo = SIGRTMIN; + aiocb3.aio_sigevent.sigev_value.sival_ptr = (void *) &aiocb3; + sigval value; + value.sival_ptr = reinterpret_cast<void *> (&aiocb3); + // Queue this one for completion right now. + if (sigqueue (ACE_OS::getpid (), SIGRTMIN, value) == -1) + // Queueing failed. + ACE_ERROR_RETURN ((LM_ERROR, + "Error: %p\n", "sigqueue"), + -1); + + return 0; +} + +static int +query_aio_completions (void) +{ + for (size_t number_of_compleions = 0; + number_of_compleions < 3; + number_of_compleions ++) + { + // Wait for <milli_seconds> amount of time. @@ Assigning + // <milli_seconds> to tv_sec. + timespec timeout; + timeout.tv_sec = ACE_INFINITE; + timeout.tv_nsec = 0; + + // To get back the signal info. + siginfo_t sig_info; + + // Await the RT completion signal. + int sig_return = sigtimedwait (&completion_signal, + &sig_info, + &timeout); + + // Error case. + // If failure is coz of timeout, then return *0* but set + // errno appropriately. This is what the WinNT proactor + // does. + if (sig_return == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Error: %p\n", + "Error waiting for RT completion signals"), + -1); + + // RT completion signals returned. + if (sig_return != SIGRTMIN) + ACE_ERROR_RETURN ((LM_ERROR, + "Unexpected signal (%d) has been received while waiting for RT Completion Signals\n", + sig_return), + -1); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "Sig number found in the sig_info block : %d\n", + sig_info.si_signo)); + + // Is the signo returned consistent? + if (sig_info.si_signo != sig_return) + ACE_ERROR_RETURN ((LM_ERROR, + "Inconsistent signal number (%d) in the signal info block\n", + sig_info.si_signo), + -1); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "Signal code for this signal delivery : %d\n", + sig_info.si_code)); + + // Is the signal code an aio completion one? + if ((sig_info.si_code != SI_ASYNCIO) && + (sig_info.si_code != SI_QUEUE)) + ACE_ERROR_RETURN ((LM_DEBUG, + "Unexpected signal code (%d) returned on completion querying\n", + sig_info.si_code), + -1); + + // Retrive the aiocb. + aiocb* aiocb_ptr = (aiocb *) sig_info.si_value.sival_ptr; + if (aiocb_ptr == &aiocb3) + { + ACE_ASSERT (sig_info.si_code == SI_QUEUE); + ACE_DEBUG ((LM_DEBUG, "sigqueue caught... good\n")); + } + else + { + // Analyze error and return values. Return values are + // actually <errno>'s associated with the <aio_> call + // corresponding to aiocb_ptr. + int error_code = aio_error (aiocb_ptr); + if (error_code == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", + "Invalid control block was sent to <aio_error> for completion querying"), + -1); + + if (error_code != 0) + // Error occurred in the <aio_>call. Return the errno + // corresponding to that <aio_> call. + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", + "An AIO call has failed"), + error_code); + + // No error occured in the AIO operation. + int nbytes = aio_return (aiocb_ptr); + if (nbytes == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", + "Invalid control block was send to <aio_return>"), + -1); + if (number_of_compleions == 0) + { + // Print the buffer. + ACE_DEBUG ((LM_DEBUG, + "\n Number of bytes transferred : %d\n", + nbytes)); + // Note... the dumps of the buffers are disabled because they + // may easily overrun the ACE_Log_Msg output buffer. If you need + // to turn the on for some reason, be careful of this. +#if 0 + ACE_DEBUG ((LM_DEBUG, "The buffer : %s \n", mb1.rd_ptr ())); +#endif /* 0 */ + } + else + { + // Print the buffer. + ACE_DEBUG ((LM_DEBUG, + "\n Number of bytes transferred : %d\n", + nbytes)); +#if 0 + ACE_DEBUG ((LM_DEBUG, "The buffer : %s \n", mb2.rd_ptr ())); +#endif /* 0 */ + } + } + } + + return 0; +} + +static int +test_aio_calls (void) +{ + // Set up the input file. + // Open file (in SEQUENTIAL_SCAN mode) + file_handle = ACE_OS::open ("test_aiosig_ace.cpp", + O_RDONLY); + + if (file_handle == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_OS::open"), + -1); + + if (setup_signal_delivery () == -1) + return -1; + + if (issue_aio_calls () == -1) + return -1; + + if (query_aio_completions () == -1) + return -1; + + return 0; +} + +static void +null_handler (int signal_number, + siginfo_t */* info */, + void * /* context */) +{ + ACE_ERROR ((LM_ERROR, + "Error:%s:Signal number %d\n" + "Mask all the RT signals for this thread", + "ACE_POSIX_SIG_Proactor::null_handler called", + signal_number)); +} + +int +main (int, char *[]) +{ + if (test_aio_calls () == 0) + printf ("RT SIG test successful:\n" + "ACE_POSIX_SIG_PROACTOR should work in this platform\n"); + else + printf ("RT SIG test failed:\n" + "ACE_POSIX_SIG_PROACTOR may not work in this platform\n"); + return 0; +} diff --git a/ACE/examples/Reactor/Proactor/test_cancel.cpp b/ACE/examples/Reactor/Proactor/test_cancel.cpp new file mode 100644 index 00000000000..c10f8e9be2c --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_cancel.cpp @@ -0,0 +1,246 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_cancel.cpp +// +// = DESCRIPTION +// This program tests cancelling an Asynchronous Operation in the +// Proactor framework. +// +// This tests accepts a connection and issues an Asynchronous Read +// Stream. It reads <read_size> (option -s) number of bytes and +// when this operation completes, it issues another Asynchronous +// Read Stream to <read_size> and immediately calls <cancel> to +// cancel the operation and so the program exits closing the +// connection. +// +// Works fine on NT. On Solaris platforms, the asynch read is +// pending, but the cancel returns with the value <AIO_ALLDONE> +// indicating all the operations in that handle are done. +// But, LynxOS has a good <aio_cancel> implementation. It works +// fine. +// +// = RUN +// ./test_cancel -p <port_number> +// Then telnet to this port and send <read_size> bytes and your +// connection should get closed down. +// +// = AUTHOR +// Irfan Pyarali (irfan@cs.wustl.edu) +// +// ============================================================================ + +#include "ace/OS_main.h" +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_socket.h" + +ACE_RCSID (Proactor, test_proactor, "$Id$") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +#include "test_cancel.h" + +static u_short port = ACE_DEFAULT_SERVER_PORT; +static int done = 0; +static int read_size = 2; + + +Receiver::Receiver (void) + : mb_ (read_size + 1), + handle_ (ACE_INVALID_HANDLE) +{ +} + +Receiver::~Receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, + "Receiver: Closing down Remote connection:%d\n", + this->handle_)); + + ACE_OS::closesocket (this->handle_); +} + +void +Receiver::open (ACE_HANDLE handle, + ACE_Message_Block &) +{ + // New connection, initiate stuff + + ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); + + // Cache the new connection + this->handle_ = handle; + + // Initiate ACE_Asynch_Read_Stream + if (this->rs_.open (*this, this->handle_) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); + return; + } + + // Try to read <n> bytes from the stream. + + ACE_DEBUG ((LM_DEBUG, + "Receiver::open: Issuing Asynch Read of (%d) bytes from the stream\n", + read_size)); + + if (this->rs_.read (this->mb_, + read_size) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::open: Failed to issue the read")); +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); + + // Reset pointers + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + + if (result.success () && !result.error ()) + { + // Successful read: No error. + + // Set the pointers back in the message block. + result.message_block ().wr_ptr (result.message_block ().rd_ptr ()); + + // Issue another read, but immediately cancel it. + + // Issue the read. + + ACE_DEBUG ((LM_DEBUG, + "Issuing Asynch Read of (%d) bytes from the stream\n", + read_size)); + + if (this->rs_.read (this->mb_, + read_size) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::handle_read_stream: Failed to issue the read")); + + // Cancel the read. + + ACE_DEBUG ((LM_DEBUG, + "Cacelling Asynch Read ")); + + int ret_val = this->rs_.cancel (); + if (ret_val == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "Receiver::handle_read_stream: Failed to cancel the read")); + + ACE_DEBUG ((LM_DEBUG, "Asynch IO : Cancel : Result = %d\n", + ret_val)); + } + else + { + done = 1; + + ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); + + // Print the error message if any. + if (result.error () != 0) + { + errno = result.error (); + + ACE_ERROR ((LM_ERROR, + "%p\n", + "Asynch Read Stream Error: ")); + } + } +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("p:s:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': + read_size = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR ((LM_ERROR, "%p.\n", + "usage :\n" + "-p <port>\n" + "-s <read_size>\n")); + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (parse_args (argc, argv) == -1) + return -1; + + // Note: acceptor parameterized by the Receiver + ACE_Asynch_Acceptor<Receiver> acceptor; + + // Listening passively. + if (acceptor.open (ACE_INET_Addr (port), + read_size, + 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ACE:acceptor::open failed\n"), + 1); + + int success = 1; + + while (success > 0 && !done) + // dispatch events + success = ACE_Proactor::instance ()->handle_events (); + + return 0; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example does not work on this platform.\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_cancel.h b/ACE/examples/Reactor/Proactor/test_cancel.h new file mode 100644 index 00000000000..45c4bfbc85b --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_cancel.h @@ -0,0 +1,47 @@ +/* +** $Id$ +*/ + +#ifndef _TEST_CANCEL_H +#define _TEST_CANCEL_H + +#include "ace/Asynch_IO.h" + +class Receiver : public ACE_Service_Handler +{ + // = TITLE + // + // Receiver + // + // = DESCRIPTION + // + // The class will be created by ACE_Asynch_Acceptor when new + // connections arrive. This class will then receive data from + // the network connection and dump it to a file. + +public: + Receiver (void); + ~Receiver (void); + + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + // This is called after the new connection has been accepted. + +protected: + // These methods are called by the framework + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This is called when asynchronous read from the socket complete + +private: + ACE_Asynch_Read_Stream rs_; + // rs (read stream): for reading from a socket + + ACE_Message_Block mb_; + // Message block to read from the stream. + + ACE_HANDLE handle_; + // Handle for IO to remote peer +}; + +#endif /* _TEST_CANCEL_H */ diff --git a/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp b/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp new file mode 100644 index 00000000000..096f77b089d --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_end_event_loop.cpp @@ -0,0 +1,168 @@ +// $Id$ +// ============================================================================ +// +// = FILENAME +// test_end_event_loop.cpp +// +// = DESCRITPTION +// This program tests the event loop mechanism of the +// Proactor. To end the event loop, threads that are blocked in +// waiting for completions are woken up and the event loop comes +// to the end. This is tested in this program. +// +// Threads are doing <run_event_loop> with/without time_out +// values and the main thread calls <end_event_loop>. +// +// = COMPILATION +// make +// +// = RUN +// ./test_end_event_loop +// +// = AUTHOR +// Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ===================================================================== + +#include "ace/OS_NS_unistd.h" +#include "ace/Proactor.h" +#include "ace/Task.h" +#include "ace/WIN32_Proactor.h" +#include "ace/POSIX_Proactor.h" +#include "ace/OS_main.h" + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + (defined (ACE_HAS_AIO_CALLS)) && !defined (ACE_POSIX_AIOCB_PROACTOR)) +// This only works on Win32 platforms and on Unix platforms supporting +// POSIX aio calls. + +class My_Task: public ACE_Task <ACE_NULL_SYNCH> +{ + // = TITLE + // + // Contains thread functions which execute event loops. Each + // thread waits for a different signal. + // +public: + // Constructor. + My_Task (void) + : time_flag_ (0) + {} + + + virtual ~My_Task (void) {} + // Destructor. + + // If time_flag is zero do the eventloop indefinitely, otherwise do + // it for finite amount of time (13secs!!!). + int open (void *timed_event_loop) + { + // Set the local variable. + if (timed_event_loop == 0) + this->time_flag_ = 0; + else + this->time_flag_ = 1; + + // Spawn the threads. + if (this->activate (THR_NEW_LWP, 5) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:%p\n", + "My_Task:open: <activate> failed"), + -1); + + return 0; + } + + // Thread function. + int svc (void) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t):Starting svc routine\n")); + + if (this->time_flag_) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t):Going to do *timed* <run_event_loop> \n")); + + ACE_Time_Value run_time (13); + + if (ACE_Proactor::instance ()->run_event_loop (run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t):%p.\n", + "<Proactor::run_event_loop> failed"), + -1); + } + else + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t):Going to do *indefinite* <run_event_loop> \n")); + + if (ACE_Proactor::instance ()->run_event_loop () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t):%p.\n", + "<Proactor::run_event_loop> failed"), + -1); + } + return 0; + }; + +private: + int time_flag_; + // If zero, indefinite event loop, otherwise timed event loop. +}; + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv []) +{ + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); + + ACE_DEBUG ((LM_DEBUG, + "(%P | %t):Test starts \n")); + + // Let us get the singleton proactor created here. This is very + // important. This will mask the signal used in the Proactor masked + // for the main thread (and all the threads). + ACE_Proactor *proactor = ACE_Proactor::instance (); + ACE_UNUSED_ARG (proactor); + + My_Task task1, task2; + + // Test the indefinite run event loop. + if (task1.open (0) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):Failed to <open> the task\n"), + 1); + + // Test the indefinite run event loop. Just pass a non-zero. + if (task2.open ((void *)&task2) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):Failed to <open> the task\n"), + 1); + + // Give a gap. + ACE_OS::sleep (3); + + // End the event loop. + if (ACE_Proactor::instance ()->end_event_loop () == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t):Failed to <end_event_loop>\n"), + 1); + + ACE_Thread_Manager::instance ()->wait (); + + ACE_DEBUG ((LM_DEBUG, + "(%P | %t):Test ends\n")); + return 0; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ + +int +main (int, char *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example cannot work with AIOCB_Proactor.\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ + diff --git a/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp b/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp new file mode 100644 index 00000000000..ac4228ab641 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_multiple_loops.cpp @@ -0,0 +1,140 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_multiple_loops.cpp +// +// = DESCRIPTION +// +// This example application shows how to write programs that +// combine the Proactor and Reactor event loops. This is possible +// only on WIN32 platform. +// +// = AUTHOR +// Irfan Pyarali +// +// ============================================================================ + +#include "ace/Task.h" +#include "ace/Proactor.h" +#include "ace/WIN32_Proactor.h" +#include "ace/Atomic_Op.h" +#include "ace/OS_NS_unistd.h" + +ACE_RCSID(Proactor, test_multiple_loops, "$Id$") + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) + +class Timeout_Handler : public ACE_Handler, public ACE_Event_Handler +{ + // = TITLE + // Generic timeout handler. + +public: + Timeout_Handler (void) + { + } + + // This is called by the Proactor. This is declared in ACE_Handler. + virtual void handle_time_out (const ACE_Time_Value &tv, + const void *arg) + { + // Print out when timeouts occur. + ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n", + ++count_, + (char *) arg, + tv.sec ())); + + // Since there is only one thread that can do the timeouts in + // Reactor, lets keep the handle_timeout short for that + // thread. + if (ACE_OS::strcmp ((char *) arg, "Proactor") == 0) + // Sleep for a while + ACE_OS::sleep (1); + } + + // This method is declared in ACE_Event_Handler. + virtual int handle_timeout (const ACE_Time_Value &tv, + const void *arg) + { + this->handle_time_out (tv, arg); + return 0; + } + +private: + ACE_Atomic_Op <ACE_Thread_Mutex, int> count_; +}; + +class Worker : public ACE_Task <ACE_NULL_SYNCH> +{ +public: + + // Thread fuction. + int svc (void) + { + ACE_DEBUG ((LM_DEBUG, "(%t) Worker started\n")); + + // Handle events for 13 seconds. + ACE_Time_Value run_time (13); + + // Try to become the owner + ACE_Reactor::instance ()->owner (ACE_Thread::self ()); + + if (ACE_Reactor::run_event_loop (run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "Worker::svc"), -1); + else + ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n")); + + return 0; + } +}; + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + Timeout_Handler handler; + ACE_WIN32_Proactor win32_proactor (0, 1); + ACE_Proactor proactor (&win32_proactor, 0, 0); + + ACE_Reactor::instance ()->register_handler (proactor.implementation ()); + + // Register a 2 second timer. + ACE_Time_Value foo_tv (2); + if (proactor.schedule_timer (handler, + (void *) "Proactor", + ACE_Time_Value::zero, + foo_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + // Register a 3 second timer. + ACE_Time_Value bar_tv (3); + if (ACE_Reactor::instance ()->schedule_timer (&handler, + (void *) "Reactor", + ACE_Time_Value::zero, + bar_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + Worker worker; + + if (worker.activate (THR_NEW_LWP, 10) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); + + ACE_Thread_Manager::instance ()->wait (); + + // Remove from reactor + ACE_Reactor::instance ()->remove_handler (&proactor, + ACE_Event_Handler::DONT_CALL); + + return 0; +} +#else +int +main (int, char *[]) +{ + return 0; +} +#endif /* ACE_WIN32 && !ACE_HAS_WINCE */ diff --git a/ACE/examples/Reactor/Proactor/test_proactor.cpp b/ACE/examples/Reactor/Proactor/test_proactor.cpp new file mode 100644 index 00000000000..035a2facf6a --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_proactor.cpp @@ -0,0 +1,679 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_proactor.cpp +// +// = DESCRIPTION +// This program illustrates how the <ACE_Proactor> can be used to +// implement an application that does various asynchronous +// operations. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/OS_NS_string.h" +#include "ace/OS_main.h" +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_stat.h" +#include "ace/OS_NS_sys_socket.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_fcntl.h" + +ACE_RCSID(Proactor, test_proactor, "$Id$") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +#include "test_proactor.h" + + +// Host that we're connecting to. +static ACE_TCHAR *host = 0; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +// File that we're sending. +static const ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp"); + +// Name of the output file. +static const ACE_TCHAR *dump_file = ACE_TEXT("output"); + +// Keep track of when we're done. +static int done = 0; + +// Size of each initial asynchronous <read> operation. +static int initial_read_size = BUFSIZ; + + +Receiver::Receiver (void) + : dump_file_ (ACE_INVALID_HANDLE), + handle_ (ACE_INVALID_HANDLE) +{ +} + +Receiver::~Receiver (void) +{ + ACE_OS::close (this->dump_file_); + ACE_OS::closesocket (this->handle_); +} + +void +Receiver::open (ACE_HANDLE handle, + ACE_Message_Block &message_block) +{ + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Receiver::open called\n")); + + // New connection, so initiate stuff. + + // Cache the new connection + this->handle_ = handle; + + // File offset starts at zero + this->file_offset_ = 0; + + // Open dump file (in OVERLAPPED mode) + this->dump_file_ = ACE_OS::open (dump_file, + O_CREAT | O_RDWR | O_TRUNC | \ + FILE_FLAG_OVERLAPPED); + if (this->dump_file_ == ACE_INVALID_HANDLE) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_OS::open")); + return; + } + + // Initiate <ACE_Asynch_Write_File>. + if (this->wf_.open (*this, + this->dump_file_) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_File::open")); + return; + } + + // Initiate <ACE_Asynch_Read_Stream>. + if (this->rs_.open (*this, this->handle_) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::open")); + return; + } + + // Fake the result and make the <handle_read_stream> get + // called. But, not, if there is '0' is transferred. + if (message_block.length () != 0) + { + // Duplicate the message block so that we can keep it around. + ACE_Message_Block &duplicate = + *message_block.duplicate (); + + // Fake the result so that we will get called back. + ACE_Asynch_Read_Stream_Result_Impl *fake_result = + ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (), + this->handle_, + duplicate, + initial_read_size, + 0, + ACE_INVALID_HANDLE, + 0, + 0); + + size_t bytes_transferred = message_block.length (); + + // <complete> for Accept would have already moved the <wr_ptr> + // forward. Update it to the beginning position. + duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); + + // This will call the callback. + fake_result->complete (message_block.length (), + 1, + 0); + + // Zap the fake result. + delete fake_result; + } + else + // Otherwise, make sure we proceed. Initiate reading the socket + // stream. + if (this->initiate_read_stream () == -1) + return; +} + +int +Receiver::initiate_read_stream (void) +{ + // Create a new <Message_Block>. Note that this message block will + // be used both to <read> data asynchronously from the socket and to + // <write> data asynchronously to the file. + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, + mb->size () - 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::read"), + -1); + return 0; +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_read_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); +#if 0 + // This can overrun the ACE_Log_Msg buffer and do bad things. + // Re-enable it at your risk. + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); +#endif /* 0 */ + + if (result.success () && result.bytes_transferred () != 0) + { + // Successful read: write the data to the file asynchronously. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + if (this->wf_.write (result.message_block (), + result.bytes_transferred (), + this->file_offset_) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_File::write")); + return; + } + + // Initiate new read from the stream. + if (this->initiate_read_stream () == -1) + return; + } + else + { + ACE_DEBUG ((LM_DEBUG, + "Receiver completed\n")); + + // No need for this message block anymore. + result.message_block ().release (); + + // Note that we are done with the test. + done = 1; + + // We are done: commit suicide. + delete this; + } +} + +void +Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + result.message_block ().release (); + + if (result.success ()) + // Write successful: Increment file offset + this->file_offset_ += result.bytes_transferred (); + + // This code is not robust enough to deal with short file writes + // (which hardly ever happen) ;-) + ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); +} + +class Sender : public ACE_Handler +{ + // = TITLE + // The class will be created by <main>. After connecting to the + // host, this class will then read data from a file and send it + // to the network connection. +public: + Sender (void); + ~Sender (void); + int open (const ACE_TCHAR *host, + u_short port); + ACE_HANDLE handle (void) const; + void handle (ACE_HANDLE); + +protected: + // These methods are called by the freamwork + + virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result); + // This is called when asynchronous transmit files complete + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + // This is called when asynchronous reads from the socket complete + +private: + int transmit_file (void); + // Transmit the entire file in one fell swoop. + + int initiate_read_file (void); + // Initiate an asynchronous file read. + + ACE_SOCK_Stream stream_; + // Network I/O handle + + ACE_Asynch_Write_Stream ws_; + // ws (write stream): for writing to the socket + + ACE_Asynch_Read_File rf_; + // rf (read file): for writing from the file + + ACE_Asynch_Transmit_File tf_; + // Transmit file. + + ACE_HANDLE input_file_; + // File to read from + + u_long file_offset_; + // Current file offset + + u_long file_size_; + // File size + + ACE_Message_Block welcome_message_; + // Welcome message + + ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_; + // Header and trailer which goes with transmit_file + + int stream_write_done_; + int transmit_file_done_; + // These flags help to determine when to close down the event loop +}; + +Sender::Sender (void) + : input_file_ (ACE_INVALID_HANDLE), + file_offset_ (0), + file_size_ (0), + stream_write_done_ (0), + transmit_file_done_ (0) +{ + // Moment of inspiration... :-) + static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n"; + this->welcome_message_.init (data, + ACE_OS::strlen (data)); + this->welcome_message_.wr_ptr (ACE_OS::strlen (data)); +} + +Sender::~Sender (void) +{ + this->stream_.close (); +} + +ACE_HANDLE +Sender::handle (void) const +{ + return this->stream_.get_handle (); +} + +void +Sender::handle (ACE_HANDLE handle) +{ + this->stream_.set_handle (handle); +} + +int +Sender::open (const ACE_TCHAR *host, + u_short port) +{ + // Initialize stuff + + // Open input file (in OVERLAPPED mode) + this->input_file_ = + ACE_OS::open (file, GENERIC_READ | FILE_FLAG_OVERLAPPED); + if (this->input_file_ == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_OS::open"), -1); + + // Find file size + this->file_size_ = + ACE_OS::filesize (this->input_file_); + + // Connect to remote host + ACE_INET_Addr address (port, host); + ACE_SOCK_Connector connector; + if (connector.connect (this->stream_, + address) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_SOCK_Connector::connect"), + -1); + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::open"), + -1); + + // Open ACE_Asynch_Read_File + if (this->rf_.open (*this, this->input_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_File::open"), + -1); + + // Start an asynchronous transmit file + if (this->transmit_file () == -1) + return -1; + + // Start an asynchronous read file + if (this->initiate_read_file () == -1) + return -1; + + return 0; +} + +int +Sender::transmit_file (void) +{ + // Open file (in SEQUENTIAL_SCAN mode) + ACE_HANDLE file_handle = + ACE_OS::open (file, GENERIC_READ | FILE_FLAG_SEQUENTIAL_SCAN); + if (file_handle == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_OS::open"), + -1); + + // Open ACE_Asynch_Transmit_File + if (this->tf_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Transmit_File::open"), + -1); + + // Header and trailer data for the file. + // @@ What happens if header and trailer are the same? + this->header_and_trailer_.header_and_trailer (&this->welcome_message_, + this->welcome_message_.length (), + &this->welcome_message_, + this->welcome_message_.length ()); + + // Send the entire file in one fell swoop! + if (this->tf_.transmit_file (file_handle, + &this->header_and_trailer_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Transmit_File::transmit_file"), + -1); + + return 0; +} + +void +Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_transmit_file called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "socket", result.socket ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "file", result.file ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_per_send", result.bytes_per_send ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + // Done with file + ACE_OS::close (result.file ()); + + this->transmit_file_done_ = 1; + if (this->stream_write_done_) + done = 1; +} + +int +Sender::initiate_read_file (void) +{ + // Create a new <Message_Block>. Note that this message block will + // be used both to <read> data asynchronously from the file and to + // <write> data asynchronously to the socket. + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate an asynchronous read from the file + if (this->rf_.read (*mb, + mb->size () - 1, + this->file_offset_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_File::read"), + -1); + return 0; +} + +void +Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_read_file called\n")); + + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + //ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + + if (result.success ()) + { + // Read successful: increment offset and write data to network. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + + this->file_offset_ += result.bytes_transferred (); + if (this->ws_.write (result.message_block (), + result.bytes_transferred ()) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::write")); + return; + } + + if (this->file_size_ > this->file_offset_) + { + // Start an asynchronous read file. + if (initiate_read_file () == -1) + return; + } + } +} + +void +Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_write_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ()); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); +#if 0 + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); +#endif + + if (result.success ()) + { + // Partial write to socket + int unsent_data = result.bytes_to_write () - result.bytes_transferred (); + if (unsent_data != 0) + { + // Reset pointers + result.message_block ().rd_ptr (result.bytes_transferred ()); + + // Duplicate the message block and retry remaining data + if (this->ws_.write (*result.message_block ().duplicate (), + unsent_data) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::write")); + return; + } + } + else if (!(this->file_size_ > this->file_offset_)) + { + this->stream_write_done_ = 1; + if (this->transmit_file_done_) + done = 1; + } + } + + // Release message block. + result.message_block ().release (); +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("h:p:f:d:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'h': + host = get_opt.opt_arg (); + break; + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'f': + file = get_opt.opt_arg (); + break; + case 'd': + dump_file = get_opt.opt_arg (); + break; + default: + ACE_ERROR ((LM_ERROR, "%p.\n", + "usage :\n" + "-h <host>\n" + "-p <port>\n" + "-f <file>\n")); + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (parse_args (argc, argv) == -1) + return -1; + + Sender sender; + + // Note: acceptor parameterized by the Receiver. + ACE_Asynch_Acceptor<Receiver> acceptor; + + // If passive side + if (host == 0) + { + if (acceptor.open (ACE_INET_Addr (port), + initial_read_size, + 1) == -1) + return -1; + } + // If active side + else if (sender.open (host, port) == -1) + return -1; + + int success = 1; + + while (success > 0 && !done) + // Dispatch events via Proactor singleton. + success = ACE_Proactor::instance ()->handle_events (); + + return 0; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example does not work on this platform.\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_proactor.h b/ACE/examples/Reactor/Proactor/test_proactor.h new file mode 100644 index 00000000000..482e176041e --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_proactor.h @@ -0,0 +1,56 @@ +/* +** $Id$ +*/ + +#ifndef _TEST_PROACTOR_H +#define _TEST_PROACTOR_H + +#include "ace/Asynch_IO.h" + +class Receiver : public ACE_Service_Handler +{ + // = TITLE + // The class will be created by <ACE_Asynch_Acceptor> when new + // connections arrive. This class will then receive data from + // the network connection and dump it to a file. +public: + // = Initialization and termination. + Receiver (void); + ~Receiver (void); + + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + // This is called after the new connection has been accepted. + +protected: + // These methods are called by the framework + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This is called when asynchronous <read> operation from the socket + // complete. + + virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); + // This is called when an asynchronous <write> to the file + // completes. + +private: + int initiate_read_stream (void); + // Initiate an asynchronous <read> operation on the socket. + + ACE_Asynch_Read_Stream rs_; + // rs (read stream): for reading from a socket. + + ACE_HANDLE dump_file_; + // File for dumping data. + + ACE_Asynch_Write_File wf_; + // wf (write file): for writing to a file. + + u_long file_offset_; + // Offset for the file. + + ACE_HANDLE handle_; + // Handle for IO to remote peer. +}; + +#endif /* _TEST_PROACTOR_H */ diff --git a/ACE/examples/Reactor/Proactor/test_proactor2.cpp b/ACE/examples/Reactor/Proactor/test_proactor2.cpp new file mode 100644 index 00000000000..cd5cbf7092e --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_proactor2.cpp @@ -0,0 +1,808 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_proactor2.cpp +// +// = DESCRIPTION +// Alexander Libman <Alibman@baltimore.com> modified +// <test_proactor> and made this test. Instead of writing received +// data to the file, the receiver sends them back to the +// sender,i.e. ACE_Asynch_Write_File wf_ has been changed to +// ACE_Asynch_Write_Stream wf_. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman +// <Alibman@baltimore.com>. +// ============================================================================ + +#include "ace/Signal.h" + +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" + +// FUZZ: disable check_for_streams_include +#include "ace/streams.h" + +#include "ace/Task.h" +#include "ace/OS_main.h" + +ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + +#include "ace/WIN32_Proactor.h" + +#elif defined (ACE_HAS_AIO_CALLS) + +#include "ace/POSIX_Proactor.h" + +#endif + + // Some debug helper functions + int DisableSignal ( int SigNum ); +int PrintSigMask (); + +#define COUT(X) cout << X ; cout.flush (); + +// Host that we're connecting to. +static ACE_TCHAR *host = 0; + +// duplex mode: ==0 half-duplex +// !=0 full duplex +static int duplex = 0 ; + +// number threads in the Proactor thread pool +static int nThreads = 1; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +// Size of each initial asynchronous <read> operation. +static int initial_read_size = BUFSIZ; + + +#define MyMutex ACE_Recursive_Thread_Mutex +//#define MyMutex ACE_Thread_Mutex +//#define MyMutex ACE_Null_Mutex + +//-------------------------------------------------------------------------- +// MyTask plays role for Proactor threads pool +//-------------------------------------------------------------------------- +class MyTask: public ACE_Task<ACE_MT_SYNCH> +{ + +public: + + int svc (void) ; +}; + + +int MyTask::svc (void ) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n")); + + while ( ACE_Proactor::event_loop_done () == 0 ) + { + ACE_Proactor::run_event_loop (); + } + + ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n")); + return 0 ; +} + +//----------------------------------------------------------- +// Receiver +//----------------------------------------------------------- +class Receiver : public ACE_Service_Handler +{ +public: + + Receiver (void); + ~Receiver (void); + + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + // This is called after the new connection has been accepted. + +protected: + // These methods are called by the framework + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result + &result); + // This is called when asynchronous <read> operation from the socket + // complete. + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result + &result); + // This is called when an asynchronous <write> to the file + // completes. + +private: + int initiate_read_stream (void); + int initiate_write_stream (ACE_Message_Block & mb, int nBytes ); + bool check_destroy () ; + + ACE_Asynch_Read_Stream rs_; + ACE_Asynch_Write_Stream ws_; + ACE_HANDLE handle_; + MyMutex m_Mtx ; + long nIOCount ; + static long nSessions ; +}; + + +long Receiver::nSessions = 0 ; + +Receiver::Receiver (void) + : handle_ (ACE_INVALID_HANDLE), + nIOCount ( 0 ) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + nSessions ++ ; + ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions )); +} + +Receiver::~Receiver (void) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + nSessions -- ; + ACE_OS::closesocket (this->handle_); + ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions )); +} + +//--------------------------------------------------------------------- +// return true if we alive, false we commited suicide +// +//--------------------------------------------------------------------- +bool Receiver::check_destroy () +{ + { + ACE_Guard<MyMutex> locker (m_Mtx) ; + + if ( nIOCount > 0 ) + { + return true ; + } + } + + delete this ; + return false ; +} + + +void Receiver::open (ACE_HANDLE handle, + ACE_Message_Block &message_block) +{ + ACE_UNUSED_ARG (message_block); + + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Receiver::open called\n")); + + + this->handle_ = handle; + + if (this->ws_.open (*this, this->handle_ ) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::open")); + + } + else if (this->rs_.open (*this, this->handle_) == -1) + { + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::open")); + } + else + { + initiate_read_stream (); + } + + + check_destroy (); +} + +int Receiver::initiate_read_stream (void) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + + // Create a new <Message_Block>. Note that this message block will + // be used both to <read> data asynchronously from the socket and to + // <write> data asynchronously to the file. + ACE_DEBUG ((LM_DEBUG, + "initiate_read_stream called\n")); + + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size ()- 1) == -1) + { + mb->release () ; + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::read"), + -1); + } + + nIOCount++ ; + return 0; +} + +int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes ) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + if (this->ws_.write (mb , nBytes ) == -1) + { + mb.release (); + ACE_ERROR_RETURN((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_File::write"), + -1); + } + + nIOCount++ ; + return 0; +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_read_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = + '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read + ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", + result.message_block ().rd_ptr ())); + + if ( result.success () && result.bytes_transferred () != 0) + { + // Successful read: write the data to the file asynchronously. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + + if(this->initiate_write_stream (result.message_block (), + + result.bytes_transferred () ) == 0 ) + { + if ( duplex != 0 ) + { + // Initiate new read from the stream. + this->initiate_read_stream () ; + } + } + } + else + { + result.message_block ().release (); + ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); + } + + { + ACE_Guard<MyMutex> locker (m_Mtx) ; + nIOCount-- ; + } + check_destroy () ; +} + +void +Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result + &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + result.message_block ().release (); + + if (result.success ()) + { + // This code is not robust enough to deal with short file writes + // (which hardly ever happen) ;-) + //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); + + if ( duplex == 0 ) + { + initiate_read_stream () ; + } + } + + { + ACE_Guard<MyMutex> locker (m_Mtx) ; + nIOCount-- ; + } + check_destroy () ; +} + +//------------------------------------------------------------------------- +// Sender: sends indefinetely welcome message +// and recieves it back +//------------------------------------------------------------------------ +class Sender : public ACE_Handler +{ +public: + Sender (void); + ~Sender (void); + int open (const ACE_TCHAR *host, u_short port); + void close (); + ACE_HANDLE handle (void) const; + void handle (ACE_HANDLE); + +protected: +// These methods are called by the freamwork + +virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result +&result); +// This is called when asynchronous reads from the socket complete + +virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result +&result); +// This is called when asynchronous writes from the socket complete + +private: + +int initiate_read_stream (void); +int initiate_write_stream (void); + +ACE_SOCK_Stream stream_; +// Network I/O handle + +ACE_Asynch_Write_Stream ws_; +// ws (write stream): for writing to the socket + +ACE_Asynch_Read_Stream rs_; +// rs (read file): for reading from the socket + +ACE_Message_Block welcome_message_; +// Welcome message + +MyMutex m_Mtx ; +long nIOCount ; +}; + +static char *data = "Welcome to Irfan World! Irfan RULES here !!\n"; + +Sender::Sender (void) + :nIOCount ( 0 ) +{ + // Moment of inspiration... :-) + this->welcome_message_.init (data, ACE_OS::strlen (data)); +} + +Sender::~Sender (void) +{ + close (); +} + +void Sender::close () +{ + this->stream_.close (); +} + +ACE_HANDLE Sender::handle (void) const +{ + return this->stream_.get_handle (); +} + +void Sender::handle (ACE_HANDLE handle) +{ + this->stream_.set_handle (handle); +} + +int Sender::open (const ACE_TCHAR *host, u_short port) +{ + // Initialize stuff + // Connect to remote host + ACE_INET_Addr address (port, host); + ACE_SOCK_Connector connector; + + if (connector.connect (this->stream_, + address) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_SOCK_Connector::connect"), + -1); + } + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::open"), + -1); + + // Open ACE_Asynch_Read_Stream + if (this->rs_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_File::open"), + -1); + + // Start an asynchronous transmit file + if ( this->initiate_write_stream () == -1) + return -1; + + if ( duplex != 0 ) + { + // Start an asynchronous read file + if (this->initiate_read_stream () == -1) + return -1; + } + + return 0; +} + +int Sender::initiate_write_stream (void) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + + + welcome_message_.rd_ptr( welcome_message_.base ()); + welcome_message_.wr_ptr( welcome_message_.base ()); + welcome_message_.wr_ptr (ACE_OS::strlen (data)); + + if (this->ws_.write (welcome_message_, + welcome_message_.length () + ) == -1) + { + ACE_ERROR_RETURN((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_File::write"), + -1); + } + + nIOCount++ ; + return 0; +} + +int Sender::initiate_read_stream (void) +{ + ACE_Guard<MyMutex> locker (m_Mtx) ; + + // Create a new <Message_Block>. Note that this message block will + // be used both to <read> data asynchronously from the socket and to + // <write> data asynchronously to the file. + ACE_DEBUG ((LM_DEBUG, + "initiate_read_stream called\n")); + + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size ()- 1) == -1) + { + mb->release () ; + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::read"), + -1); + } + + nIOCount++ ; + return 0; +} + + +void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result + &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_write_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr (result.message_block ().rd_ptr () - + result.bytes_transferred ()); + + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", + result.message_block ().rd_ptr ())); + + // Simplify just for Test + if (result.success () && result.bytes_transferred () != 0) + { + if ( duplex != 0 ) // full duplex, continue write + { + initiate_write_stream () ; + } + else // half-duplex read reply, after read we will start + // write + { + initiate_read_stream () ; + } + } + + { + ACE_Guard<MyMutex> locker (m_Mtx) ; + nIOCount-- ; + } +} + +void +Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_read_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = + '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read + ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) + result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", + result.message_block ().rd_ptr ())); + + result.message_block().release (); + + if ( result.success () && result.bytes_transferred () != 0) + { + // Successful read: write the data to the file asynchronously. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + + if ( duplex != 0 ) // full duplex, continue read + { + initiate_read_stream () ; + } + else // half-duplex writey, after write we will start read + { + initiate_write_stream () ; + } + } + + { + ACE_Guard<MyMutex> locker (m_Mtx) ; + nIOCount-- ; + } +} + +//-------------------------------------------------------------------------- + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'h': + host = get_opt.opt_arg (); + break; + case 'n': + nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ; + break; + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'd': + duplex = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR ((LM_ERROR, "%p.\n", + "usage :\n" + "-h <host> for Sender mode\n" + "-d <duplex mode 1-on/0-off>\n" + "-p <port to listen/connect>\n" + "-n <number threads for Proactor pool>\n")); + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_UNUSED_ARG (initial_read_size); + + if (parse_args (argc, argv) == -1) + return -1; + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + + ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor; + +#elif defined (ACE_HAS_AIO_CALLS) + + // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor; + ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor; +#endif + + ACE_Proactor Proactor ( pImpl ,1 ); + + ACE_Proactor::instance( & Proactor ); + + + MyTask Task1 ; + + if (Task1.activate (THR_NEW_LWP, nThreads ) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); + } + + Sender sender; + ACE_Asynch_Acceptor<Receiver> acceptor; + + int Rc = -1 ; + + if ( host == NULL ) // Acceptor + { + // Simplify , initial read with zero size + Rc = acceptor.open (ACE_INET_Addr (port),0,1); + + } + else + { + Rc = sender.open (host, port); + } + + if ( Rc == 0 ) + { + char c ; + cout << "Press any key to stop and exit=>\n" << flush ; + cin.clear (); + cin >> c ; + } + + ACE_Proactor::end_event_loop () ; + + if ( host != NULL ) // we are sender + { + sender.close () ; // disconnect to get reciever error !!! + } + + + ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance(); + + pTM->wait_task ( & Task1 ) ; + + ACE_Proactor::instance( ( ACE_Proactor* )NULL ); + + return 0; +} +//-------------------------------------------------------------------- +// +//-------------------------------------------------------------------- +int DisableSignal ( int SigNum ) +{ + +#ifndef ACE_WIN32 + sigset_t signal_set; + if ( sigemptyset (&signal_set) == - 1 ) + { + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "sigemptyset failed")); + } + + sigaddset (&signal_set, SigNum); + + // Put the <signal_set>. + if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) + { + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed")); + } +#else + ACE_UNUSED_ARG(SigNum); +#endif + + return 1; +} +//-------------------------------------------------------------------- +// Get the <signal_set> back from the OS. +//-------------------------------------------------------------------- + +int PrintSigMask () +{ +#ifndef ACE_WIN32 + + sigset_t mask ; + int member = 0; + + COUT ( "\n=============Signal Mask==========" ) + + if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0) + { + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "ACE_OS::pthread_sigmask failed")); + } + else for (int i = 1 ; i < 1000; i++) + { + member = sigismember (&mask,i); + + COUT ( "\nSig " ) + COUT ( i ) + COUT ( " is " ) + COUT (member ) + + if (member == -1) + { + break ; + } + } +#endif + return 0; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_proactor3.cpp b/ACE/examples/Reactor/Proactor/test_proactor3.cpp new file mode 100644 index 00000000000..c47468276c8 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_proactor3.cpp @@ -0,0 +1,864 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_proactor3.cpp +// +// = DESCRIPTION +// This program illustrates how the <ACE_Proactor> can be used to +// implement an application that does various asynchronous +// operations. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// modified by Alexander Libman <alibman@baltimore.com> +// from original test_proactor.cpp +// ============================================================================ + +#include "ace/Signal.h" + +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_IO_Impl.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" + +// FUZZ: disable check_for_streams_include +#include "ace/streams.h" + +#include "ace/Task.h" + +ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) + // This only works on Win32 platforms and on Unix platforms + // supporting POSIX aio calls. + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + +# include "ace/WIN32_Proactor.h" + +#elif defined (ACE_HAS_AIO_CALLS) + +# include "ace/POSIX_Proactor.h" +# include "ace/SUN_Proactor.h" + +#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */ + +// Some debug helper functions +static int disable_signal (int sigmin, int sigmax); +#if 0 +static int print_sigmask (void); +#endif + +#define COUT(X) cout << X; cout.flush (); + +// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB, +// 2-SIG, 3-SUN +static int proactor_type = 0; + +// POSIX : > 0 max number aio operations proactor, +static int max_aio_operations = 0; + +// Host that we're connecting to. +static ACE_TCHAR *host = 0; + +// number of Senders instances +static int senders = 1; +static const int MaxSenders = 100; + +// duplex mode: ==0 half-duplex +// !=0 full duplex +static int duplex = 0; + +// number threads in the Proactor thread pool +static int threads = 1; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +class MyTask: public ACE_Task<ACE_MT_SYNCH> +{ + // = TITLE + // MyTask plays role for Proactor threads pool +public: + MyTask (void) : threads_ (0), proactor_ (0) {} + + int svc (void); + void waitready (void) { event_.wait (); } + +private: + ACE_Recursive_Thread_Mutex mutex_; + int threads_; + ACE_Proactor *proactor_; + ACE_Manual_Event event_; + + void create_proactor (void); + void delete_proactor (void); +}; + +void +MyTask::create_proactor (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + if (threads_ == 0) + { +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) + ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor; + ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32")); + +#elif defined (ACE_HAS_AIO_CALLS) + + ACE_POSIX_Proactor *proactor = 0; + + switch (proactor_type) + { + case 1: proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations); + ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n")); + break; + case 2: proactor = new ACE_POSIX_SIG_Proactor; + ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n")); + break; +# if defined (sun) + case 3: proactor = new ACE_SUN_Proactor (max_aio_operations); + ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n")); + break; +# endif /* sun */ + default:proactor = new ACE_POSIX_SIG_Proactor; + ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n")); + break; + } +#endif + + proactor_ = new ACE_Proactor (proactor, 1); + + ACE_Proactor::instance(proactor_); + event_.signal (); + } + + threads_++; +} + +void +MyTask::delete_proactor (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + if (--threads_ == 0) + { + ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n")); + ACE_Proactor::instance ((ACE_Proactor *) 0); + delete proactor_; + proactor_ = 0; + } +} + +int +MyTask::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n")); + + create_proactor (); + disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); + + while (ACE_Proactor::event_loop_done () == 0) + ACE_Proactor::run_event_loop (); + + delete_proactor (); + + ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n")); + return 0; +} + +class Receiver : public ACE_Service_Handler +{ +public: + + Receiver (void); + ~Receiver (void); + + virtual void open (ACE_HANDLE handle, + ACE_Message_Block &message_block); + // This is called after the new connection has been accepted. + + static long get_number_sessions (void) { return sessions_; } + +protected: + // These methods are called by the framework + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This is called when asynchronous <read> operation from the socket + // complete. + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when an asynchronous <write> to the file + // completes. + +private: + int initiate_read_stream (void); + int initiate_write_stream (ACE_Message_Block & mb, int nBytes); + int check_destroy (void); + + ACE_Asynch_Read_Stream rs_; + ACE_Asynch_Write_Stream ws_; + ACE_HANDLE handle_; + ACE_Recursive_Thread_Mutex mutex_; + long io_count_; + static long sessions_; +}; + +long Receiver::sessions_ = 0; + +Receiver::Receiver (void) + : handle_ (ACE_INVALID_HANDLE), + io_count_ (0) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + sessions_++; + ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_)); +} + +Receiver::~Receiver (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + sessions_--; + ACE_OS::closesocket (this->handle_); + ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_)); +} + +// return true if we alive, false we commited suicide +int +Receiver::check_destroy (void) +{ + { + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + if (io_count_ > 0) + return 1; + } + + delete this; + return 0; +} + +void +Receiver::open (ACE_HANDLE handle, + ACE_Message_Block &) +{ + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Receiver::open called\n")); + + this->handle_ = handle; + + if (this->ws_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::open")); + else if (this->rs_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::open")); + else + initiate_read_stream (); + + check_destroy (); +} + +int +Receiver::initiate_read_stream (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size ()- 1) == -1) + { + mb->release (); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::read"), + -1); + } + + io_count_++; + return 0; +} + +int +Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + if (nbytes <= 0) + { + mb.release (); + ACE_ERROR_RETURN((LM_ERROR, + "ACE_Asynch_Write_Stream::write nbytes <0 "), + -1); + } + + if (this->ws_.write (mb, nbytes) == -1) + { + mb.release (); + ACE_ERROR_RETURN((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::write"), + -1); + } + + io_count_++; + return 0; +} + +void +Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + // Reset pointers. + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + if (result.bytes_transferred () == 0 || result.error () != 0) + { + ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + } + + if (result.success () && result.bytes_transferred () != 0) + { + // Successful read: write the data to the file asynchronously. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + + if(this->initiate_write_stream (result.message_block (), + result.bytes_transferred ()) == 0) + { + if (duplex != 0) + { + // Initiate new read from the stream. + this->initiate_read_stream (); + } + } + } + else + { + result.message_block ().release (); + ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); + } + + { + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + io_count_--; + } + check_destroy (); +} + +void +Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + if (result.bytes_transferred () == 0 || result.error () != 0) + { + ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + } + + result.message_block ().release (); + + if (result.success () && result.bytes_transferred () != 0) + { + // This code is not robust enough to deal with short file writes + // (which hardly ever happen);-) + // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); + + if (duplex == 0) + initiate_read_stream (); + } + + { + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + io_count_--; + } + check_destroy (); +} + +class Sender : public ACE_Handler +{ + // = TITLE + // Sends welcome messages receives them back. +public: + Sender (void); + ~Sender (void); + int open (const ACE_TCHAR *host, u_short port); + void close (void); + ACE_HANDLE handle (void) const; + +protected: + // These methods are called by the freamwork + + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); + // This is called when asynchronous reads from the socket complete + + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete + +private: + + int initiate_read_stream (void); + int initiate_write_stream (void); + + ACE_SOCK_Stream stream_; + // Network I/O handle + + ACE_Asynch_Write_Stream ws_; + // ws (write stream): for writing to the socket + + ACE_Asynch_Read_Stream rs_; + // rs (read file): for reading from the socket + + ACE_Message_Block welcome_message_; + // Welcome message + + ACE_Recursive_Thread_Mutex mutex_; + long io_count_; +}; + +static char *data = "Welcome to Irfan World! Irfan RULES here !!\n"; + +Sender::Sender (void) + : io_count_ (0) +{ + // Moment of inspiration... :-) + this->welcome_message_.init (data, ACE_OS::strlen (data)); +} + +Sender::~Sender (void) +{ + close (); +} + +void Sender::close (void) +{ + this->stream_.close (); +} + +ACE_HANDLE Sender::handle (void) const +{ + return this->stream_.get_handle (); +} + +int Sender::open (const ACE_TCHAR *host, u_short port) +{ + // Initialize stuff + // Connect to remote host + ACE_INET_Addr address (port, host); + ACE_SOCK_Connector connector; + + if (connector.connect (this->stream_, + address) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_SOCK_Connector::connect"), + -1); + } + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::open"), + -1); + + // Open ACE_Asynch_Read_Stream + if (this->rs_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::open"), + -1); + + // Start an asynchronous transmit file + if (this->initiate_write_stream () == -1) + return -1; + + if (duplex != 0) + // Start an asynchronous read file + if (this->initiate_read_stream () == -1) + return -1; + + return 0; +} + +int +Sender::initiate_write_stream (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + welcome_message_.rd_ptr(welcome_message_.base ()); + welcome_message_.wr_ptr(welcome_message_.base ()); + welcome_message_.wr_ptr (ACE_OS::strlen (data)); + + if (this->ws_.write (welcome_message_, + welcome_message_.length ()) == -1) + ACE_ERROR_RETURN((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Stream::write"), + -1); + io_count_++; + return 0; +} + +int +Sender::initiate_read_stream (void) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + // Create a new <Message_Block>. Note that this message block will + // be used both to <read> data asynchronously from the socket and to + // <write> data asynchronously to the file. + ACE_DEBUG ((LM_DEBUG, + "initiate_read_stream called\n")); + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ + 1), + -1); + + // Inititiate read + if (this->rs_.read (*mb, mb->size ()- 1) == -1) + { + mb->release (); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Stream::read"), + -1); + } + + io_count_++; + return 0; +} + +void +Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + if (result.bytes_transferred () == 0 || result.error () != 0) + { + ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ()); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + } + + // Simplify just for Test + if (result.success () && result.bytes_transferred () != 0) + { + if (duplex != 0) // full duplex, continue write + initiate_write_stream (); + else // half-duplex read reply, after read we will start write + initiate_read_stream (); + } + + { + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + io_count_--; + } +} + +void +Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) +{ + if (result.bytes_transferred () == 0 || result.error () != 0) + { + ACE_DEBUG ((LM_DEBUG, + "handle_read_stream called\n")); + + // Reset pointers. + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + } + + result.message_block().release (); + + if (result.success () && result.bytes_transferred () != 0) + { + // Successful read: write the data to the file asynchronously. + // Note how we reuse the <ACE_Message_Block> for the writing. + // Therefore, we do not delete this buffer because it is handled + // in <handle_write_stream>. + + if (duplex != 0) // full duplex, continue read + initiate_read_stream (); + else // half-duplex writey, after write we will start read + initiate_write_stream (); + } + + { + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + io_count_--; + } +} + +static int +set_proactor_type (const char *ptype) +{ + if (!ptype) + return false; + + switch (toupper (*ptype)) + { + case 'D' : proactor_type = 0; return true; + case 'A' : proactor_type = 1; return true; + case 'I' : proactor_type = 2; return true; +#if defined (sun) + case 'S' : proactor_type = 3; return true; +#endif /* sun */ + } + return false; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'd': // duplex + duplex = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'h': // host for sender + host = get_opt.opt_arg (); + break; + case 'p': // port number + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'n': // thread pool size + threads = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': // number of senders + senders = ACE_OS::atoi (get_opt.opt_arg ()); + if (senders > MaxSenders) + senders = MaxSenders; + break; + case 'o': // max number of aio for proactor + max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 't': // Proactor Type + if (set_proactor_type (get_opt.opt_arg ())) + break; + case 'u': + default: + ACE_ERROR ((LM_ERROR, "%p.", + "\nusage:" + "\n-o <max number of started aio operations for Proactor>" + "\n-t <Proactor type> UNIX-only, Win32-default always:" + "\n a AIOCB" + "\n i SIG" + "\n s SUN" + "\n d default" + "\n-d <duplex mode 1-on/0-off>" + "\n-h <host> for Sender mode" + "\n-n <number threads for Proactor pool>" + "\n-p <port to listen/connect>" + "\n-s <number of sender's instances>" + "\n-u show this message" + "\n")); + + return -1; + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ +#if defined (sun) + ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n")); +#endif + if (parse_args (argc, argv) == -1) + return -1; + + disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); + + MyTask task1; + + if (task1.activate (THR_NEW_LWP, threads) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p.\n", + "main"), + -1); + + // wait for creation of Proactor + task1.waitready (); + + Sender * send_list[MaxSenders]; + + ACE_Asynch_Acceptor<Receiver> acceptor; + + int rc = -1; + int i; + char c; + + if (host == 0) // Acceptor + { + // Simplify, initial read with zero size + if (acceptor.open (ACE_INET_Addr (port),0,1) == 0) + rc = 1; + } + else + { + for (i = 0; i < senders; ++i) + send_list[i] = new Sender; + + for (i = 0; i < senders; ++i) + if (send_list[i]->open (host, port) == 0) + rc++; + } + + if (rc > 0) + { + cout << "Press any key to stop=>" << flush; + cin.clear (); + cin >> c; + } + + ACE_Proactor::end_event_loop (); + + if (host != 0) // we are sender + { + for (i = 0; i < senders; ++i) + send_list[i]->close (); + } + + + ACE_Thread_Manager *tm = + ACE_Thread_Manager::instance(); + + tm->wait_task (&task1); + + cout << "\nNumber of Receivers objects=" + << Receiver::get_number_sessions () + << flush; + + for (i = 0; i < senders; ++i) + { + delete (send_list[i]); + send_list[i] = 0; + } + + return 0; +} + +static int +disable_signal (int sigmin, int sigmax) +{ +#ifndef ACE_WIN32 + + sigset_t signal_set; + if (sigemptyset (&signal_set) == - 1) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "sigemptyset failed")); + + for (int i = sigmin; i <= sigmax; i++) + sigaddset (&signal_set, i); + + // Put the <signal_set>. + if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed")); +#endif /* ACE_WIN32 */ + + return 1; +} + +// Get the <signal_set> back from the OS. + +#if 0 +static int +print_sigmask (void) +{ +#ifndef ACE_WIN32 + sigset_t mask; + int member = 0; + + COUT ("\n=============Signal Mask==========") + + if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "ACE_OS::pthread_sigmask failed")); + else + for (int i = 1; i < 1000; i++) + { + member = sigismember (&mask,i); + + COUT ("\nSig ") + COUT (i) + COUT (" is ") + COUT (member) + + if (member == -1) + break; + } + +#endif /* ACE_WIN32 */ + return 0; +} +#endif /* 0 */ + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_timeout.cpp b/ACE/examples/Reactor/Proactor/test_timeout.cpp new file mode 100644 index 00000000000..39351717db9 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_timeout.cpp @@ -0,0 +1,130 @@ +// $Id: test_timeout.cpp + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_timeout.cpp +// +// = DESCRIPTION +// +// This example application shows how to write event loops that +// handle events for some fixed amount of time. Note that any +// thread in the Proactor thread pool can call back the handler. On +// POSIX4 systems, this test works only with POSIX_SIG_Proactor, +// which can work with multiple threads. +// +// = AUTHOR +// Irfan Pyarali and Alexander Babu Arulanthu +// +// ============================================================================ + +#include "ace/Proactor.h" +#include "ace/Task.h" +#include "ace/Atomic_Op.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_main.h" + +ACE_RCSID(Proactor, test_timeout, "$Id$") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + (defined (ACE_HAS_AIO_CALLS)) && !defined (ACE_POSIX_AIOCB_PROACTOR)) + // This only works on Win32 platforms and on Unix platforms supporting + // POSIX aio calls. + +class Timeout_Handler : public ACE_Handler +{ + // = TITLE + // Generic timeout handler. +public: + Timeout_Handler (void) + : start_time_ (ACE_OS::gettimeofday ()) + { + } + + virtual void handle_time_out (const ACE_Time_Value &tv, + const void *arg) + { + // Print out when timeouts occur. + ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n", + ++count_, + (char *) arg, + (tv - this->start_time_).sec ())); + + // Sleep for a while + ACE_OS::sleep (4); + } + +private: + ACE_Atomic_Op <ACE_SYNCH_MUTEX, int> count_; + // Number of the timer event. + + ACE_Time_Value start_time_; + // Starting time of the test. +}; + +class Worker : public ACE_Task <ACE_NULL_SYNCH> +{ +public: + int svc (void) + { + // Handle events for 13 seconds. + ACE_Time_Value run_time (13); + + ACE_DEBUG ((LM_DEBUG, "(%t):Starting svc routine\n")); + + if (ACE_Proactor::run_event_loop(run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t):%p.\n", "Worker::svc"), -1); + + ACE_DEBUG ((LM_DEBUG, "(%t) work complete\n")); + + return 0; + } +}; + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + Timeout_Handler handler; + + // Register a 2 second timer. + ACE_Time_Value foo_tv (2); + if (ACE_Proactor::instance ()->schedule_timer (handler, + (void *) "Foo", + ACE_Time_Value::zero, + foo_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + // Register a 3 second timer. + ACE_Time_Value bar_tv (3); + if (ACE_Proactor::instance ()->schedule_timer (handler, + (void *) "Bar", + ACE_Time_Value::zero, + bar_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + Worker worker; + + if (worker.activate (THR_NEW_LWP, 10) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1); + + ACE_Thread_Manager::instance ()->wait (); + + return 0; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ + +int +main (int, char *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example is multithreaded version of test_timeout_st.cpp\n" + "This doesnt work on this platform !!!\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS && !ACE_POSIX_AIOCB_PROACTOR*/ diff --git a/ACE/examples/Reactor/Proactor/test_timeout_st.cpp b/ACE/examples/Reactor/Proactor/test_timeout_st.cpp new file mode 100644 index 00000000000..ae44c2ba1f4 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_timeout_st.cpp @@ -0,0 +1,99 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_timeout_st.cpp +// +// = DESCRIPTION +// +// This example application shows how to write event loops that +// handle events for some fixed amount of time. This is the single +// threaded version of the test_timeout.cpp application. +// +// = AUTHOR +// Irfan Pyarali and Alexander Babu Arulanthu +// +// ============================================================================ + +#include "ace/Proactor.h" +#include "ace/OS_main.h" + +ACE_RCSID(Proactor, test_timeout, "$Id$") + +#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) +// This only works on Win32 platforms and on Unix platforms supporting +// POSIX aio calls. + +class Timeout_Handler : public ACE_Handler +{ + // = TITLE + // Generic timeout handler. + +public: + Timeout_Handler (void) + : count_ (0), + start_time_ (ACE_OS::gettimeofday ()) + { + } + + virtual void handle_time_out (const ACE_Time_Value &tv, + const void *arg) + { + // Print out when timeouts occur. + ACE_DEBUG ((LM_DEBUG, "(%t) %d timeout occurred for %s @ %d.\n", + ++count_, + (char *) arg, + (tv - this->start_time_).sec ())); + } + +private: + int count_; + // Sequence number for the timeouts. + + ACE_Time_Value start_time_; + // Starting time of the test. +}; + + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + Timeout_Handler handler; + + // Register a 2 second timer. + ACE_Time_Value foo_tv (2); + if (ACE_Proactor::instance ()->schedule_timer (handler, + (void *) "Foo", + ACE_Time_Value::zero, + foo_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + // Register a 3 second timer. + ACE_Time_Value bar_tv (3); + if (ACE_Proactor::instance ()->schedule_timer (handler, + (void *) "Bar", + ACE_Time_Value::zero, + bar_tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "schedule_timer"), -1); + + // Handle events for 13 seconds. + ACE_Time_Value run_time (13); + + ACE_DEBUG ((LM_DEBUG, "Starting event loop\n")); + + // Run the event loop. + if (ACE_Proactor::run_event_loop(run_time) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t):%p.\n", "Worker::svc"), + 1); + + ACE_DEBUG ((LM_DEBUG, "Ending event loop\n")); + + return 0; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ diff --git a/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp b/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp new file mode 100644 index 00000000000..49d834a2884 --- /dev/null +++ b/ACE/examples/Reactor/Proactor/test_udp_proactor.cpp @@ -0,0 +1,432 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_udp_proactor.cpp +// +// = DESCRIPTION +// This program illustrates how the <ACE_Proactor> can be used to +// implement an application that does asynchronous operations using +// datagrams. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> and +// Roger Tragin <r.tragin@computer.org> +// +// ============================================================================ + +#include "ace/OS_NS_string.h" +#include "ace/OS_main.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Dgram.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" + +ACE_RCSID(Proactor, test_udp_proactor, "test_proactor.cpp,v 1.29 2001/02/02 23:41:16 shuston Exp") + +#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) || defined (ACE_HAS_AIO_CALLS) + // This only works on Win32 platforms. + +// Host that we're connecting to. +static ACE_TCHAR *host = 0; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +// Keep track of when we're done. +static int done = 0; + +class Receiver : public ACE_Service_Handler +{ + // = TITLE + // This class will receive data from + // the network connection and dump it to a file. +public: + // = Initialization and termination. + Receiver (void); + ~Receiver (void); + + int open_addr (const ACE_INET_Addr &localAddr); + +protected: + // These methods are called by the framework + + /// This method will be called when an asynchronous read completes on + /// a UDP socket. + virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result); + +private: + ACE_SOCK_Dgram sock_dgram_; + + ACE_Asynch_Read_Dgram rd_; + // rd (read dgram): for reading from a UDP socket. + const char* completion_key_; + const char* act_; +}; + +Receiver::Receiver (void) + : completion_key_ ("Receiver Completion Key"), + act_ ("Receiver ACT") +{ +} + +Receiver::~Receiver (void) +{ + sock_dgram_.close (); +} + +int +Receiver::open_addr (const ACE_INET_Addr &localAddr) +{ + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Receiver::open_addr called\n")); + + // Create a local UDP socket to receive datagrams. + if (this->sock_dgram_.open (localAddr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_SOCK_Dgram::open"), -1); + + // Initialize the asynchronous read. + if (this->rd_.open (*this, + this->sock_dgram_.get_handle (), + this->completion_key_, + ACE_Proactor::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Dgram::open"), -1); + + // Create a buffer to read into. We are using scatter/gather to + // read the message header and message body into 2 buffers + + // create a message block to read the message header + ACE_Message_Block* msg = 0; + ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1); + + // the next line sets the size of the header, even though we + // allocated a the message block of 1k, by setting the size to 20 + // bytes then the first 20 bytes of the reveived datagram will be + // put into this message block. + msg->size (20); // size of header to read is 20 bytes + + // create a message block to read the message body + ACE_Message_Block* body = 0; + ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1); + // The message body will not exceed 1024 bytes, at least not in this test. + + // set body as the cont of msg. This associates the 2 message + // blocks so that a read will fill the first block (which is the + // header) up to size (), and use the cont () block for the rest of + // the data. You can chain up to IOV_MAX message block using this + // method. + msg->cont (body); + + // ok lets do the asynch read + size_t number_of_bytes_recvd = 0; + + int res = rd_.recv (msg, + number_of_bytes_recvd, + 0, + PF_INET, + this->act_); + switch (res) + { + case 0: + // this is a good error. The proactor will call our handler when the + // read has completed. + break; + case 1: + // actually read something, we will handle it in the handler callback + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, + "%s = %d\n", + "bytes recieved immediately", + number_of_bytes_recvd)); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + res = 0; + break; + case -1: + // Something else went wrong. + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Dgram::recv")); + // the handler will not get called in this case so lets clean up our msg + msg->release (); + break; + default: + // Something undocumented really went wrong. + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Read_Dgram::recv")); + msg->release (); + break; + } + + return res; +} + +void +Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_read_dgram called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_INET_Addr peerAddr; + result.remote_address (peerAddr); + ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + if (result.success () && result.bytes_transferred () != 0) + { + // loop through our message block and print out the contents + for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ()) + { // use msg->length () to get the number of bytes written to the message + // block. + ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ())); + for (u_long i = 0; i < msg->length (); ++i) + ACE_DEBUG ((LM_DEBUG, + "%c", (msg->rd_ptr ())[i])); + ACE_DEBUG ((LM_DEBUG, "]\n")); + } + } + + ACE_DEBUG ((LM_DEBUG, + "Receiver completed\n")); + + // No need for this message block anymore. + result.message_block ()->release (); + + // Note that we are done with the test. + done++; +} + +class Sender : public ACE_Handler +{ + // = TITLE + // The class will be created by <main>. +public: + Sender (void); + ~Sender (void); + int open (const ACE_TCHAR *host, u_short port); + +protected: + // These methods are called by the freamwork + + virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); + // This is called when asynchronous writes from the dgram socket + // complete + +private: + + ACE_SOCK_Dgram sock_dgram_; + // Network I/O handle + + ACE_Asynch_Write_Dgram wd_; + // wd (write dgram): for writing to the socket + + const char* completion_key_; + const char* act_; +}; + +Sender::Sender (void) + : completion_key_ ("Sender completion key"), + act_ ("Sender ACT") +{ +} + +Sender::~Sender (void) +{ + this->sock_dgram_.close (); +} + +int +Sender::open (const ACE_TCHAR *host, + u_short port) +{ + // Initialize stuff + + if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_SOCK_Dgram::open"), -1); + + // Initialize the asynchronous read. + if (this->wd_.open (*this, + this->sock_dgram_.get_handle (), + this->completion_key_, + ACE_Proactor::instance ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Dgram::open"), -1); + + // We are using scatter/gather to send the message header and + // message body using 2 buffers + + // create a message block for the message header + ACE_Message_Block* msg = 0; + ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); + const char raw_msg [] = "To be or not to be."; + // Copy buf into the Message_Block and update the wr_ptr (). + msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1); + + // create a message block for the message body + ACE_Message_Block* body = 0; + ACE_NEW_RETURN (body, ACE_Message_Block (100), -1); + ACE_OS::memset (body->wr_ptr (), 'X', 100); + body->wr_ptr (100); // always remember to update the wr_ptr () + + // set body as the cont of msg. This associates the 2 message blocks so + // that a send will send the first block (which is the header) up to + // length (), and use the cont () to get the next block to send. You can + // chain up to IOV_MAX message block using this method. + msg->cont (body); + + // do the asynch send + size_t number_of_bytes_sent = 0; + ACE_INET_Addr serverAddr (port, host); + int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); + + switch (res) + { + case 0: + // this is a good error. The proactor will call our handler when the + // send has completed. + break; + case 1: + // actually sent something, we will handle it in the handler callback + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, + "%s = %d\n", + "bytes sent immediately", + number_of_bytes_sent)); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + res = 0; + break; + case -1: + // Something else went wrong. + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Dgram::recv")); + // the handler will not get called in this case so lets clean up our msg + msg->release (); + break; + default: + // Something undocumented really went wrong. + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Asynch_Write_Dgram::recv")); + msg->release (); + break; + } + return res; +} + +void +Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, + "handle_write_dgram called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + + ACE_DEBUG ((LM_DEBUG, + "Sender completed\n")); + + // No need for this message block anymore. + result.message_block ()->release (); + + // Note that we are done with the test. + done++; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:p:")); + int c; + + while ((c = get_opt ()) != EOF) + switch (c) + { + case 'h': + host = get_opt.opt_arg (); + break; + case 'p': + port = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "usage :\n" + "-h <host>\n"), -1); + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + if (parse_args (argc, argv) == -1) + return -1; + + Sender sender; + + Receiver receiver; + + // If passive side + if (host == 0) + { + if (receiver.open_addr (ACE_INET_Addr (port)) == -1) + return -1; + } + // If active side + else if (sender.open (host, port) == -1) + return -1; + + for (int success = 1; + success > 0 && !done; + ) + // Dispatch events via Proactor singleton. + success = ACE_Proactor::instance ()->handle_events (); + + return 0; +} + +#else /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "This example does not work on this platform.\n")); + return 1; +} + +#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ + |