summaryrefslogtreecommitdiff
path: root/examples/Reactor/Proactor/test_proactor3.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/Reactor/Proactor/test_proactor3.cpp')
-rw-r--r--examples/Reactor/Proactor/test_proactor3.cpp868
1 files changed, 0 insertions, 868 deletions
diff --git a/examples/Reactor/Proactor/test_proactor3.cpp b/examples/Reactor/Proactor/test_proactor3.cpp
deleted file mode 100644
index 38b9d3c0daa..00000000000
--- a/examples/Reactor/Proactor/test_proactor3.cpp
+++ /dev/null
@@ -1,868 +0,0 @@
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// examples
-//
-// = FILENAME
-// test_proactor3.cpp
-//
-// = DESCRIPTION
-// This program illustrates how the <ACE_Proactor> can be used to
-// implement an application that does various asynchronous
-// operations.
-//
-// = AUTHOR
-// Irfan Pyarali <irfan@cs.wustl.edu>
-// modified by Alexander Libman <alibman@baltimore.com>
-// from original test_proactor.cpp
-// ============================================================================
-
-#include "ace/Signal.h"
-
-#include "ace/Service_Config.h"
-#include "ace/Proactor.h"
-#include "ace/Asynch_IO.h"
-#include "ace/Asynch_IO_Impl.h"
-#include "ace/Asynch_Acceptor.h"
-#include "ace/INET_Addr.h"
-#include "ace/SOCK_Connector.h"
-#include "ace/SOCK_Acceptor.h"
-#include "ace/SOCK_Stream.h"
-#include "ace/Message_Block.h"
-#include "ace/Get_Opt.h"
-#include "ace/streams.h"
-
-#include "ace/Task.h"
-
-ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
-
-#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
- // This only works on Win32 platforms and on Unix platforms
- // supporting POSIX aio calls.
-
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
-
-# include "ace/WIN32_Proactor.h"
-
-#elif defined (ACE_HAS_AIO_CALLS)
-
-# include "ace/POSIX_Proactor.h"
-# include "ace/SUN_Proactor.h"
-
-#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */
-
-// Some debug helper functions
-static int disable_signal (int sigmin, int sigmax);
-#if 0
-static int print_sigmask (void);
-#endif
-
-#define COUT(X) cout << X; cout.flush ();
-
-// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
-// 2-SIG, 3-SUN
-static int proactor_type = 0;
-
-// POSIX : > 0 max number aio operations proactor,
-static int max_aio_operations = 0;
-
-// Host that we're connecting to.
-static char *host = 0;
-
-// number of Senders instances
-static int senders = 1;
-static const int MaxSenders = 100;
-
-// duplex mode: ==0 half-duplex
-// !=0 full duplex
-static int duplex = 0;
-
-// number threads in the Proactor thread pool
-static int threads = 1;
-
-// Port that we're receiving connections on.
-static u_short port = ACE_DEFAULT_SERVER_PORT;
-
-class MyTask: public ACE_Task<ACE_MT_SYNCH>
-{
- // = TITLE
- // MyTask plays role for Proactor threads pool
-public:
- MyTask (void) : threads_ (0), proactor_ (0) {}
-
- int svc (void);
- void waitready (void) { event_.wait (); }
-
-private:
- ACE_Recursive_Thread_Mutex mutex_;
- int threads_;
- ACE_Proactor *proactor_;
- ACE_Manual_Event event_;
-
- void create_proactor (void);
- void delete_proactor (void);
-};
-
-void
-MyTask::create_proactor (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
-
- if (threads_ == 0)
- {
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
- ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;
- ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));
-
-#elif defined (ACE_HAS_AIO_CALLS)
-
- ACE_POSIX_Proactor *proactor = 0;
-
- switch (proactor_type)
- {
- case 1: proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);
- ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));
- break;
- case 2: proactor = new ACE_POSIX_SIG_Proactor;
- ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
- break;
-# if defined (sun)
- case 3: proactor = new ACE_SUN_Proactor (max_aio_operations);
- ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n"));
- break;
-# endif /* sun */
- default:proactor = new ACE_POSIX_SIG_Proactor;
- ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
- break;
- }
-#endif
-
- proactor_ = new ACE_Proactor (proactor, 1);
-
- ACE_Proactor::instance(proactor_);
- event_.signal ();
- }
-
- threads_++;
-}
-
-void
-MyTask::delete_proactor (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- if (--threads_ == 0)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));
- ACE_Proactor::instance ((ACE_Proactor *) 0);
- delete proactor_;
- proactor_ = 0;
- }
-}
-
-int
-MyTask::svc (void)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
-
- create_proactor ();
- disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
-
- while (ACE_Proactor::event_loop_done () == 0)
- ACE_Proactor::run_event_loop ();
-
- delete_proactor ();
-
- ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
- return 0;
-}
-
-class Receiver : public ACE_Service_Handler
-{
-public:
-
- Receiver (void);
- ~Receiver (void);
-
- virtual void open (ACE_HANDLE handle,
- ACE_Message_Block &message_block);
- // This is called after the new connection has been accepted.
-
- static long get_number_sessions (void) { return sessions_; }
-
-protected:
- // These methods are called by the framework
-
- virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
- // This is called when asynchronous <read> operation from the socket
- // complete.
-
- virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- // This is called when an asynchronous <write> to the file
- // completes.
-
-private:
- int initiate_read_stream (void);
- int initiate_write_stream (ACE_Message_Block & mb, int nBytes);
- int check_destroy (void);
-
- ACE_Asynch_Read_Stream rs_;
- ACE_Asynch_Write_Stream ws_;
- ACE_HANDLE handle_;
- ACE_Recursive_Thread_Mutex mutex_;
- long io_count_;
- static long sessions_;
-};
-
-long Receiver::sessions_ = 0;
-
-Receiver::Receiver (void)
- : handle_ (ACE_INVALID_HANDLE),
- io_count_ (0)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- sessions_++;
- ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));
-}
-
-Receiver::~Receiver (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- sessions_--;
- ACE_OS::closesocket (this->handle_);
- ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));
-}
-
-// return true if we alive, false we commited suicide
-int
-Receiver::check_destroy (void)
-{
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
-
- if (io_count_ > 0)
- return 1;
- }
-
- delete this;
- return 0;
-}
-
-void
-Receiver::open (ACE_HANDLE handle,
- ACE_Message_Block &)
-{
- ACE_DEBUG ((LM_DEBUG,
- "%N:%l:Receiver::open called\n"));
-
- this->handle_ = handle;
-
- if (this->ws_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Write_Stream::open"));
- else if (this->rs_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Read_Stream::open"));
- else
- initiate_read_stream ();
-
- check_destroy ();
-}
-
-int
-Receiver::initiate_read_stream (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
-
- ACE_Message_Block *mb = 0;
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (BUFSIZ + 1),
- -1);
-
- // Inititiate read
- if (this->rs_.read (*mb, mb->size ()- 1) == -1)
- {
- mb->release ();
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Read_Stream::read"),
- -1);
- }
-
- io_count_++;
- return 0;
-}
-
-int
-Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- if (nbytes <= 0)
- {
- mb.release ();
- ACE_ERROR_RETURN((LM_ERROR,
- "ACE_Asynch_Write_Stream::write nbytes <0 "),
- -1);
- }
-
- if (this->ws_.write (mb, nbytes) == -1)
- {
- mb.release ();
- ACE_ERROR_RETURN((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Write_Stream::write"),
- -1);
- }
-
- io_count_++;
- return 0;
-}
-
-void
-Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
-{
- // Reset pointers.
- result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
-
- if (result.bytes_transferred () == 0 || result.error () != 0)
- {
- ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
- }
-
- if (result.success () && result.bytes_transferred () != 0)
- {
- // Successful read: write the data to the file asynchronously.
- // Note how we reuse the <ACE_Message_Block> for the writing.
- // Therefore, we do not delete this buffer because it is handled
- // in <handle_write_stream>.
-
- if(this->initiate_write_stream (result.message_block (),
- result.bytes_transferred ()) == 0)
- {
- if (duplex != 0)
- {
- // Initiate new read from the stream.
- this->initiate_read_stream ();
- }
- }
- }
- else
- {
- result.message_block ().release ();
- ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
- }
-
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- io_count_--;
- }
- check_destroy ();
-}
-
-void
-Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
-{
- if (result.bytes_transferred () == 0 || result.error () != 0)
- {
- ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
-
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- }
-
- result.message_block ().release ();
-
- if (result.success () && result.bytes_transferred () != 0)
- {
- // This code is not robust enough to deal with short file writes
- // (which hardly ever happen);-)
- // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
-
- if (duplex == 0)
- initiate_read_stream ();
- }
-
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- io_count_--;
- }
- check_destroy ();
-}
-
-class Sender : public ACE_Handler
-{
- // = TITLE
- // Sends welcome messages receives them back.
-public:
- Sender (void);
- ~Sender (void);
- int open (const char *host, u_short port);
- void close (void);
- ACE_HANDLE handle (void) const;
-
-protected:
- // These methods are called by the freamwork
-
- virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
- // This is called when asynchronous reads from the socket complete
-
- virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- // This is called when asynchronous writes from the socket complete
-
-private:
-
- int initiate_read_stream (void);
- int initiate_write_stream (void);
-
- ACE_SOCK_Stream stream_;
- // Network I/O handle
-
- ACE_Asynch_Write_Stream ws_;
- // ws (write stream): for writing to the socket
-
- ACE_Asynch_Read_Stream rs_;
- // rs (read file): for reading from the socket
-
- ACE_Message_Block welcome_message_;
- // Welcome message
-
- ACE_Recursive_Thread_Mutex mutex_;
- long io_count_;
-};
-
-static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
-
-Sender::Sender (void)
- : io_count_ (0)
-{
- // Moment of inspiration... :-)
- this->welcome_message_.init (data, ACE_OS::strlen (data));
-}
-
-Sender::~Sender (void)
-{
- close ();
-}
-
-void Sender::close (void)
-{
- this->stream_.close ();
-}
-
-ACE_HANDLE Sender::handle (void) const
-{
- return this->stream_.get_handle ();
-}
-
-int Sender::open (const char *host, u_short port)
-{
- // Initialize stuff
- // Connect to remote host
- ACE_INET_Addr address (port, host);
- ACE_SOCK_Connector connector;
-
- if (connector.connect (this->stream_,
- address) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "ACE_SOCK_Connector::connect"),
- -1);
- }
-
- // Open ACE_Asynch_Write_Stream
- if (this->ws_.open (*this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Write_Stream::open"),
- -1);
-
- // Open ACE_Asynch_Read_Stream
- if (this->rs_.open (*this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Read_Stream::open"),
- -1);
-
- // Start an asynchronous transmit file
- if (this->initiate_write_stream () == -1)
- return -1;
-
- if (duplex != 0)
- // Start an asynchronous read file
- if (this->initiate_read_stream () == -1)
- return -1;
-
- return 0;
-}
-
-int
-Sender::initiate_write_stream (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
-
- welcome_message_.rd_ptr(welcome_message_.base ());
- welcome_message_.wr_ptr(welcome_message_.base ());
- welcome_message_.wr_ptr (ACE_OS::strlen (data));
-
- if (this->ws_.write (welcome_message_,
- welcome_message_.length ()) == -1)
- ACE_ERROR_RETURN((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Write_Stream::write"),
- -1);
- io_count_++;
- return 0;
-}
-
-int
-Sender::initiate_read_stream (void)
-{
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
-
- // Create a new <Message_Block>. Note that this message block will
- // be used both to <read> data asynchronously from the socket and to
- // <write> data asynchronously to the file.
- ACE_DEBUG ((LM_DEBUG,
- "initiate_read_stream called\n"));
-
- ACE_Message_Block *mb = 0;
- ACE_NEW_RETURN (mb,
- ACE_Message_Block (BUFSIZ + 1),
- -1);
-
- // Inititiate read
- if (this->rs_.read (*mb, mb->size ()- 1) == -1)
- {
- mb->release ();
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p\n",
- "ACE_Asynch_Read_Stream::read"),
- -1);
- }
-
- io_count_++;
- return 0;
-}
-
-void
-Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
-{
- if (result.bytes_transferred () == 0 || result.error () != 0)
- {
- ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
-
- // Reset pointers.
- result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
-
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
- }
-
- // Simplify just for Test
- if (result.success () && result.bytes_transferred () != 0)
- {
- if (duplex != 0) // full duplex, continue write
- initiate_write_stream ();
- else // half-duplex read reply, after read we will start write
- initiate_read_stream ();
- }
-
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- io_count_--;
- }
-}
-
-void
-Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
-{
- if (result.bytes_transferred () == 0 || result.error () != 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "handle_read_stream called\n"));
-
- // Reset pointers.
- result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
-
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
- ACE_DEBUG ((LM_DEBUG, "********************\n"));
- ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
- }
-
- result.message_block().release ();
-
- if (result.success () && result.bytes_transferred () != 0)
- {
- // Successful read: write the data to the file asynchronously.
- // Note how we reuse the <ACE_Message_Block> for the writing.
- // Therefore, we do not delete this buffer because it is handled
- // in <handle_write_stream>.
-
- if (duplex != 0) // full duplex, continue read
- initiate_read_stream ();
- else // half-duplex writey, after write we will start read
- initiate_write_stream ();
- }
-
- {
- ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
- io_count_--;
- }
-}
-
-static int
-set_proactor_type (const char *ptype)
-{
- if (!ptype)
- return false;
-
- switch (toupper (*ptype))
- {
- case 'D' : proactor_type = 0; return true;
- case 'A' : proactor_type = 1; return true;
- case 'I' : proactor_type = 2; return true;
-#if defined (sun)
- case 'S' : proactor_type = 3; return true;
-#endif /* sun */
- }
- return false;
-}
-
-static int
-parse_args (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "t:o:n:p:d:h:s:u");
- int c;
-
- while ((c = get_opt ()) != EOF)
- switch (c)
- {
- case 'd': // duplex
- duplex = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'h': // host for sender
- host = get_opt.optarg;
- break;
- case 'p': // port number
- port = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'n': // thread pool size
- threads = ACE_OS::atoi (get_opt.optarg);
- break;
- case 's': // number of senders
- senders = ACE_OS::atoi (get_opt.optarg);
- if (senders > MaxSenders)
- senders = MaxSenders;
- break;
- case 'o': // max number of aio for proactor
- max_aio_operations = ACE_OS::atoi (get_opt.optarg);
- break;
- case 't': // Proactor Type
- if (set_proactor_type (get_opt.optarg))
- break;
- case 'u':
- default:
- ACE_ERROR ((LM_ERROR, "%p.",
- "\nusage:"
- "\n-o <max number of started aio operations for Proactor>"
- "\n-t <Proactor type> UNIX-only, Win32-default always:"
- "\n a AIOCB"
- "\n i SIG"
- "\n s SUN"
- "\n d default"
- "\n-d <duplex mode 1-on/0-off>"
- "\n-h <host> for Sender mode"
- "\n-n <number threads for Proactor pool>"
- "\n-p <port to listen/connect>"
- "\n-s <number of sender's instances>"
- "\n-u show this message"
- "\n"));
-
- return -1;
- }
-
- return 0;
-}
-
-int
-main (int argc, char *argv[])
-{
-#if defined (sun)
- ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));
-#endif
- if (parse_args (argc, argv) == -1)
- return -1;
-
- disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
-
- MyTask task1;
-
- if (task1.activate (THR_NEW_LWP, threads) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p.\n",
- "main"),
- -1);
-
- // wait for creation of Proactor
- task1.waitready ();
-
- Sender * send_list[MaxSenders];
-
- ACE_Asynch_Acceptor<Receiver> acceptor;
-
- int rc = -1;
- int i;
- char c;
-
- if (host == 0) // Acceptor
- {
- // Simplify, initial read with zero size
- if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)
- rc = 1;
- }
- else
- {
- for (i = 0; i < senders; ++i)
- send_list[i] = new Sender;
-
- for (i = 0; i < senders; ++i)
- if (send_list[i]->open (host, port) == 0)
- rc++;
- }
-
- if (rc > 0)
- {
- cout << "Press any key to stop=>" << flush;
- cin.clear ();
- cin >> c;
- }
-
- ACE_Proactor::end_event_loop ();
-
- if (host != 0) // we are sender
- {
- for (i = 0; i < senders; ++i)
- send_list[i]->close ();
- }
-
-
- ACE_Thread_Manager *tm =
- ACE_Thread_Manager::instance();
-
- tm->wait_task (&task1);
-
- cout << "\nNumber of Receivers objects="
- << Receiver::get_number_sessions ()
- << flush;
-
- for (i = 0; i < senders; ++i)
- {
- delete (send_list[i]);
- send_list[i] = 0;
- }
-
- return 0;
-}
-
-static int
-disable_signal (int sigmin, int sigmax)
-{
-#ifndef ACE_WIN32
-
- sigset_t signal_set;
- if (sigemptyset (&signal_set) == - 1)
- ACE_ERROR ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "sigemptyset failed"));
-
- for (int i = sigmin; i <= sigmax; i++)
- sigaddset (&signal_set, i);
-
- // Put the <signal_set>.
- if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
- ACE_ERROR ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "pthread_sigmask failed"));
-#endif /* ACE_WIN32 */
-
- return 1;
-}
-
-// Get the <signal_set> back from the OS.
-
-#if 0
-static int
-print_sigmask (void)
-{
-#ifndef ACE_WIN32
- sigset_t mask;
- int member = 0;
-
- COUT ("\n=============Signal Mask==========")
-
- if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0)
- ACE_ERROR ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "ACE_OS::pthread_sigmask failed"));
- else
- for (int i = 1; i < 1000; i++)
- {
- member = sigismember (&mask,i);
-
- COUT ("\nSig ")
- COUT (i)
- COUT (" is ")
- COUT (member)
-
- if (member == -1)
- break;
- }
-
-#endif /* ACE_WIN32 */
- return 0;
-}
-#endif /* 0 */
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Asynch_Acceptor<Receiver>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Asynch_Acceptor<Receiver>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
-#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/