summaryrefslogtreecommitdiff
path: root/ACE/examples/Reactor/Proactor/test_proactor3.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/Reactor/Proactor/test_proactor3.cpp')
-rw-r--r--ACE/examples/Reactor/Proactor/test_proactor3.cpp864
1 files changed, 864 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Proactor/test_proactor3.cpp b/ACE/examples/Reactor/Proactor/test_proactor3.cpp
new file mode 100644
index 00000000000..c47468276c8
--- /dev/null
+++ b/ACE/examples/Reactor/Proactor/test_proactor3.cpp
@@ -0,0 +1,864 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_proactor3.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the <ACE_Proactor> can be used to
+// implement an application that does various asynchronous
+// operations.
+//
+// = AUTHOR
+// Irfan Pyarali <irfan@cs.wustl.edu>
+// modified by Alexander Libman <alibman@baltimore.com>
+// from original test_proactor.cpp
+// ============================================================================
+
+#include "ace/Signal.h"
+
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_IO_Impl.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+
+// FUZZ: disable check_for_streams_include
+#include "ace/streams.h"
+
+#include "ace/Task.h"
+
+ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
+
+#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
+ // This only works on Win32 platforms and on Unix platforms
+ // supporting POSIX aio calls.
+
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+
+# include "ace/WIN32_Proactor.h"
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+# include "ace/POSIX_Proactor.h"
+# include "ace/SUN_Proactor.h"
+
+#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */
+
+// Some debug helper functions
+static int disable_signal (int sigmin, int sigmax);
+#if 0
+static int print_sigmask (void);
+#endif
+
+#define COUT(X) cout << X; cout.flush ();
+
+// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
+// 2-SIG, 3-SUN
+static int proactor_type = 0;
+
+// POSIX : > 0 max number aio operations proactor,
+static int max_aio_operations = 0;
+
+// Host that we're connecting to.
+static ACE_TCHAR *host = 0;
+
+// number of Senders instances
+static int senders = 1;
+static const int MaxSenders = 100;
+
+// duplex mode: ==0 half-duplex
+// !=0 full duplex
+static int duplex = 0;
+
+// number threads in the Proactor thread pool
+static int threads = 1;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+class MyTask: public ACE_Task<ACE_MT_SYNCH>
+{
+ // = TITLE
+ // MyTask plays role for Proactor threads pool
+public:
+ MyTask (void) : threads_ (0), proactor_ (0) {}
+
+ int svc (void);
+ void waitready (void) { event_.wait (); }
+
+private:
+ ACE_Recursive_Thread_Mutex mutex_;
+ int threads_;
+ ACE_Proactor *proactor_;
+ ACE_Manual_Event event_;
+
+ void create_proactor (void);
+ void delete_proactor (void);
+};
+
+void
+MyTask::create_proactor (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ if (threads_ == 0)
+ {
+#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+ ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+ ACE_POSIX_Proactor *proactor = 0;
+
+ switch (proactor_type)
+ {
+ case 1: proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));
+ break;
+ case 2: proactor = new ACE_POSIX_SIG_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
+ break;
+# if defined (sun)
+ case 3: proactor = new ACE_SUN_Proactor (max_aio_operations);
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n"));
+ break;
+# endif /* sun */
+ default:proactor = new ACE_POSIX_SIG_Proactor;
+ ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
+ break;
+ }
+#endif
+
+ proactor_ = new ACE_Proactor (proactor, 1);
+
+ ACE_Proactor::instance(proactor_);
+ event_.signal ();
+ }
+
+ threads_++;
+}
+
+void
+MyTask::delete_proactor (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ if (--threads_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));
+ ACE_Proactor::instance ((ACE_Proactor *) 0);
+ delete proactor_;
+ proactor_ = 0;
+ }
+}
+
+int
+MyTask::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
+
+ create_proactor ();
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+
+ while (ACE_Proactor::event_loop_done () == 0)
+ ACE_Proactor::run_event_loop ();
+
+ delete_proactor ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
+ return 0;
+}
+
+class Receiver : public ACE_Service_Handler
+{
+public:
+
+ Receiver (void);
+ ~Receiver (void);
+
+ virtual void open (ACE_HANDLE handle,
+ ACE_Message_Block &message_block);
+ // This is called after the new connection has been accepted.
+
+ static long get_number_sessions (void) { return sessions_; }
+
+protected:
+ // These methods are called by the framework
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous <read> operation from the socket
+ // complete.
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when an asynchronous <write> to the file
+ // completes.
+
+private:
+ int initiate_read_stream (void);
+ int initiate_write_stream (ACE_Message_Block & mb, int nBytes);
+ int check_destroy (void);
+
+ ACE_Asynch_Read_Stream rs_;
+ ACE_Asynch_Write_Stream ws_;
+ ACE_HANDLE handle_;
+ ACE_Recursive_Thread_Mutex mutex_;
+ long io_count_;
+ static long sessions_;
+};
+
+long Receiver::sessions_ = 0;
+
+Receiver::Receiver (void)
+ : handle_ (ACE_INVALID_HANDLE),
+ io_count_ (0)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ sessions_++;
+ ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));
+}
+
+Receiver::~Receiver (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ sessions_--;
+ ACE_OS::closesocket (this->handle_);
+ ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));
+}
+
+// return true if we alive, false we commited suicide
+int
+Receiver::check_destroy (void)
+{
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ if (io_count_ > 0)
+ return 1;
+ }
+
+ delete this;
+ return 0;
+}
+
+void
+Receiver::open (ACE_HANDLE handle,
+ ACE_Message_Block &)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Receiver::open called\n"));
+
+ this->handle_ = handle;
+
+ if (this->ws_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"));
+ else if (this->rs_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"));
+ else
+ initiate_read_stream ();
+
+ check_destroy ();
+}
+
+int
+Receiver::initiate_read_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+int
+Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ if (nbytes <= 0)
+ {
+ mb.release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ "ACE_Asynch_Write_Stream::write nbytes <0 "),
+ -1);
+ }
+
+ if (this->ws_.write (mb, nbytes) == -1)
+ {
+ mb.release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+void
+Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if(this->initiate_write_stream (result.message_block (),
+ result.bytes_transferred ()) == 0)
+ {
+ if (duplex != 0)
+ {
+ // Initiate new read from the stream.
+ this->initiate_read_stream ();
+ }
+ }
+ }
+ else
+ {
+ result.message_block ().release ();
+ ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+ check_destroy ();
+}
+
+void
+Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ }
+
+ result.message_block ().release ();
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // This code is not robust enough to deal with short file writes
+ // (which hardly ever happen);-)
+ // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
+
+ if (duplex == 0)
+ initiate_read_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+ check_destroy ();
+}
+
+class Sender : public ACE_Handler
+{
+ // = TITLE
+ // Sends welcome messages receives them back.
+public:
+ Sender (void);
+ ~Sender (void);
+ int open (const ACE_TCHAR *host, u_short port);
+ void close (void);
+ ACE_HANDLE handle (void) const;
+
+protected:
+ // These methods are called by the freamwork
+
+ virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
+ // This is called when asynchronous reads from the socket complete
+
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete
+
+private:
+
+ int initiate_read_stream (void);
+ int initiate_write_stream (void);
+
+ ACE_SOCK_Stream stream_;
+ // Network I/O handle
+
+ ACE_Asynch_Write_Stream ws_;
+ // ws (write stream): for writing to the socket
+
+ ACE_Asynch_Read_Stream rs_;
+ // rs (read file): for reading from the socket
+
+ ACE_Message_Block welcome_message_;
+ // Welcome message
+
+ ACE_Recursive_Thread_Mutex mutex_;
+ long io_count_;
+};
+
+static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
+
+Sender::Sender (void)
+ : io_count_ (0)
+{
+ // Moment of inspiration... :-)
+ this->welcome_message_.init (data, ACE_OS::strlen (data));
+}
+
+Sender::~Sender (void)
+{
+ close ();
+}
+
+void Sender::close (void)
+{
+ this->stream_.close ();
+}
+
+ACE_HANDLE Sender::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+
+int Sender::open (const ACE_TCHAR *host, u_short port)
+{
+ // Initialize stuff
+ // Connect to remote host
+ ACE_INET_Addr address (port, host);
+ ACE_SOCK_Connector connector;
+
+ if (connector.connect (this->stream_,
+ address) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_SOCK_Connector::connect"),
+ -1);
+ }
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::open"),
+ -1);
+
+ // Open ACE_Asynch_Read_Stream
+ if (this->rs_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::open"),
+ -1);
+
+ // Start an asynchronous transmit file
+ if (this->initiate_write_stream () == -1)
+ return -1;
+
+ if (duplex != 0)
+ // Start an asynchronous read file
+ if (this->initiate_read_stream () == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+Sender::initiate_write_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ welcome_message_.rd_ptr(welcome_message_.base ());
+ welcome_message_.wr_ptr(welcome_message_.base ());
+ welcome_message_.wr_ptr (ACE_OS::strlen (data));
+
+ if (this->ws_.write (welcome_message_,
+ welcome_message_.length ()) == -1)
+ ACE_ERROR_RETURN((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Write_Stream::write"),
+ -1);
+ io_count_++;
+ return 0;
+}
+
+int
+Sender::initiate_read_stream (void)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ // Create a new <Message_Block>. Note that this message block will
+ // be used both to <read> data asynchronously from the socket and to
+ // <write> data asynchronously to the file.
+ ACE_DEBUG ((LM_DEBUG,
+ "initiate_read_stream called\n"));
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ + 1),
+ -1);
+
+ // Inititiate read
+ if (this->rs_.read (*mb, mb->size ()- 1) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "ACE_Asynch_Read_Stream::read"),
+ -1);
+ }
+
+ io_count_++;
+ return 0;
+}
+
+void
+Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ // Simplify just for Test
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ if (duplex != 0) // full duplex, continue write
+ initiate_write_stream ();
+ else // half-duplex read reply, after read we will start write
+ initiate_read_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+}
+
+void
+Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+{
+ if (result.bytes_transferred () == 0 || result.error () != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "handle_read_stream called\n"));
+
+ // Reset pointers.
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+ }
+
+ result.message_block().release ();
+
+ if (result.success () && result.bytes_transferred () != 0)
+ {
+ // Successful read: write the data to the file asynchronously.
+ // Note how we reuse the <ACE_Message_Block> for the writing.
+ // Therefore, we do not delete this buffer because it is handled
+ // in <handle_write_stream>.
+
+ if (duplex != 0) // full duplex, continue read
+ initiate_read_stream ();
+ else // half-duplex writey, after write we will start read
+ initiate_write_stream ();
+ }
+
+ {
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+ io_count_--;
+ }
+}
+
+static int
+set_proactor_type (const char *ptype)
+{
+ if (!ptype)
+ return false;
+
+ switch (toupper (*ptype))
+ {
+ case 'D' : proactor_type = 0; return true;
+ case 'A' : proactor_type = 1; return true;
+ case 'I' : proactor_type = 2; return true;
+#if defined (sun)
+ case 'S' : proactor_type = 3; return true;
+#endif /* sun */
+ }
+ return false;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ switch (c)
+ {
+ case 'd': // duplex
+ duplex = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'h': // host for sender
+ host = get_opt.opt_arg ();
+ break;
+ case 'p': // port number
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'n': // thread pool size
+ threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 's': // number of senders
+ senders = ACE_OS::atoi (get_opt.opt_arg ());
+ if (senders > MaxSenders)
+ senders = MaxSenders;
+ break;
+ case 'o': // max number of aio for proactor
+ max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 't': // Proactor Type
+ if (set_proactor_type (get_opt.opt_arg ()))
+ break;
+ case 'u':
+ default:
+ ACE_ERROR ((LM_ERROR, "%p.",
+ "\nusage:"
+ "\n-o <max number of started aio operations for Proactor>"
+ "\n-t <Proactor type> UNIX-only, Win32-default always:"
+ "\n a AIOCB"
+ "\n i SIG"
+ "\n s SUN"
+ "\n d default"
+ "\n-d <duplex mode 1-on/0-off>"
+ "\n-h <host> for Sender mode"
+ "\n-n <number threads for Proactor pool>"
+ "\n-p <port to listen/connect>"
+ "\n-s <number of sender's instances>"
+ "\n-u show this message"
+ "\n"));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+#if defined (sun)
+ ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));
+#endif
+ if (parse_args (argc, argv) == -1)
+ return -1;
+
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+
+ MyTask task1;
+
+ if (task1.activate (THR_NEW_LWP, threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p.\n",
+ "main"),
+ -1);
+
+ // wait for creation of Proactor
+ task1.waitready ();
+
+ Sender * send_list[MaxSenders];
+
+ ACE_Asynch_Acceptor<Receiver> acceptor;
+
+ int rc = -1;
+ int i;
+ char c;
+
+ if (host == 0) // Acceptor
+ {
+ // Simplify, initial read with zero size
+ if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)
+ rc = 1;
+ }
+ else
+ {
+ for (i = 0; i < senders; ++i)
+ send_list[i] = new Sender;
+
+ for (i = 0; i < senders; ++i)
+ if (send_list[i]->open (host, port) == 0)
+ rc++;
+ }
+
+ if (rc > 0)
+ {
+ cout << "Press any key to stop=>" << flush;
+ cin.clear ();
+ cin >> c;
+ }
+
+ ACE_Proactor::end_event_loop ();
+
+ if (host != 0) // we are sender
+ {
+ for (i = 0; i < senders; ++i)
+ send_list[i]->close ();
+ }
+
+
+ ACE_Thread_Manager *tm =
+ ACE_Thread_Manager::instance();
+
+ tm->wait_task (&task1);
+
+ cout << "\nNumber of Receivers objects="
+ << Receiver::get_number_sessions ()
+ << flush;
+
+ for (i = 0; i < senders; ++i)
+ {
+ delete (send_list[i]);
+ send_list[i] = 0;
+ }
+
+ return 0;
+}
+
+static int
+disable_signal (int sigmin, int sigmax)
+{
+#ifndef ACE_WIN32
+
+ sigset_t signal_set;
+ if (sigemptyset (&signal_set) == - 1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "sigemptyset failed"));
+
+ for (int i = sigmin; i <= sigmax; i++)
+ sigaddset (&signal_set, i);
+
+ // Put the <signal_set>.
+ if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "pthread_sigmask failed"));
+#endif /* ACE_WIN32 */
+
+ return 1;
+}
+
+// Get the <signal_set> back from the OS.
+
+#if 0
+static int
+print_sigmask (void)
+{
+#ifndef ACE_WIN32
+ sigset_t mask;
+ int member = 0;
+
+ COUT ("\n=============Signal Mask==========")
+
+ if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "ACE_OS::pthread_sigmask failed"));
+ else
+ for (int i = 1; i < 1000; i++)
+ {
+ member = sigismember (&mask,i);
+
+ COUT ("\nSig ")
+ COUT (i)
+ COUT (" is ")
+ COUT (member)
+
+ if (member == -1)
+ break;
+ }
+
+#endif /* ACE_WIN32 */
+ return 0;
+}
+#endif /* 0 */
+
+#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/