summaryrefslogtreecommitdiff
path: root/ACE/tests/Proactor_UDP_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/tests/Proactor_UDP_Test.cpp')
-rw-r--r--ACE/tests/Proactor_UDP_Test.cpp2214
1 files changed, 2214 insertions, 0 deletions
diff --git a/ACE/tests/Proactor_UDP_Test.cpp b/ACE/tests/Proactor_UDP_Test.cpp
new file mode 100644
index 00000000000..c880b687ca6
--- /dev/null
+++ b/ACE/tests/Proactor_UDP_Test.cpp
@@ -0,0 +1,2214 @@
+// $Id$
+
+// ============================================================================
+/**
+ * @file Proactor_UDP_Test.cpp
+ *
+ * $Id$
+ *
+ * This program illustrates how the ACE_Proactor can be used to
+ * implement an application that uses UDP/IP communications.
+ *
+ * @author Steve Huston <shuston@riverace.com>, based on Proactor_Test.cpp
+ */
+// ============================================================================
+
+#include "test_config.h"
+
+#if defined (ACE_HAS_THREADS) && (defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS))
+ // This only works on Win32 platforms and on Unix platforms
+ // supporting POSIX aio calls.
+
+#include "ace/Signal.h"
+
+#include "ace/Service_Config.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_CODgram.h"
+#include "ace/SOCK_Dgram.h"
+#include "ace/Object_Manager.h"
+#include "ace/Get_Opt.h"
+
+#include "ace/Proactor.h"
+#include "ace/Task.h"
+#include "ace/Thread_Semaphore.h"
+#include "ace/OS_NS_ctype.h"
+#include "ace/OS_NS_errno.h"
+#include "ace/OS_NS_signal.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_socket.h"
+#include "ace/Sock_Connect.h"
+#include "ace/os_include/netinet/os_tcp.h"
+
+#include "ace/Atomic_Op.h"
+#include "ace/Synch_Traits.h"
+
+#if defined (ACE_WIN32)
+
+# include "ace/WIN32_Proactor.h"
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+# include "ace/POSIX_Proactor.h"
+# include "ace/POSIX_CB_Proactor.h"
+# include "ace/SUN_Proactor.h"
+
+#endif /* ACE_WIN32 */
+
+// Proactor Type (UNIX only, Win32 ignored)
+typedef enum { DEFAULT = 0, AIOCB, SIG, SUN, CB } ProactorType;
+static ProactorType proactor_type = DEFAULT;
+
+// POSIX : > 0 max number aio operations proactor,
+static size_t max_aio_operations = 0;
+
+// both: 0 run client or server / depends on host
+// != 0 run client and server
+static int both = 0;
+
+// Host that we're connecting to.
+static const ACE_TCHAR *host = 0;
+
+// number of Client instances
+static int clients = 1;
+const int MAX_CLIENTS = 1000;
+const int MAX_SERVERS = 1000;
+
+// duplex mode: == 0 half-duplex
+// != 0 full duplex
+static int duplex = 0;
+
+// number threads in the Proactor thread pool
+static int threads = 1;
+
+// Port that we're receiving session initiations on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+// Log options
+static int loglevel; // 0 full , 1 only errors
+
+static size_t xfer_limit; // Number of bytes for Client to send.
+
+static char complete_message[] =
+ "GET / HTTP/1.1\r\n"
+ "Accept: */*\r\n"
+ "Accept-Language: C++\r\n"
+ "Accept-Encoding: gzip, deflate\r\n"
+ "User-Agent: Proactor_Test/1.0 (non-compatible)\r\n"
+ "Connection: Keep-Alive\r\n"
+ "\r\n";
+
+static char close_req_msg[] = "CLOSE";
+static char close_ack_msg[] = "CLOSE-ACK";
+
+class LogLocker
+{
+public:
+
+ LogLocker () { ACE_LOG_MSG->acquire (); }
+ virtual ~LogLocker () { ACE_LOG_MSG->release (); }
+};
+
+// Function to remove signals from the signal mask.
+static int
+disable_signal (int sigmin, int sigmax)
+{
+#if !defined (ACE_LACKS_UNIX_SIGNALS)
+ sigset_t signal_set;
+ if (ACE_OS::sigemptyset (&signal_set) == - 1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Error: (%P|%t):%p\n"),
+ ACE_TEXT ("sigemptyset failed")));
+
+ for (int i = sigmin; i <= sigmax; i++)
+ ACE_OS::sigaddset (&signal_set, i);
+
+ // Put the <signal_set>.
+# if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
+ // In multi-threaded application this is not POSIX compliant
+ // but let's leave it just in case.
+ if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
+# else
+ if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
+# endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Error: (%P|%t): %p\n"),
+ ACE_TEXT ("SIG_BLOCK failed")),
+ -1);
+#else
+ ACE_UNUSED_ARG (sigmin);
+ ACE_UNUSED_ARG (sigmax);
+#endif /* ACE_LACKS_UNIX_SIGNALS */
+
+ return 0;
+}
+
+// *************************************************************
+// MyTask is ACE_Task resposible for :
+// 1. creation and deletion of
+// Proactor and Proactor thread pool
+// 2. running Proactor event loop
+// *************************************************************
+
+/**
+ * @class MyTask
+ *
+ * MyTask plays role for Proactor threads pool
+ *
+ * MyTask is ACE_Task resposible for:
+ * 1. Creation and deletion of Proactor and Proactor thread pool
+ * 2. Running Proactor event loop
+ */
+class MyTask : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ MyTask (void):
+ lock_ (),
+ sem_ ((unsigned int) 0),
+ proactor_(0) {}
+
+ virtual ~MyTask()
+ {
+ (void) this->stop ();
+ (void) this->delete_proactor();
+ }
+
+ virtual int svc (void);
+
+ int start (int num_threads,
+ ProactorType type_proactor,
+ size_t max_op );
+ int stop (void);
+
+private:
+ int create_proactor (ProactorType type_proactor,
+ size_t max_op);
+ int delete_proactor (void);
+
+ ACE_SYNCH_RECURSIVE_MUTEX lock_;
+ ACE_Thread_Semaphore sem_;
+ ACE_Proactor * proactor_;
+
+};
+
+int
+MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_TEST_ASSERT (this->proactor_ == 0);
+
+#if defined (ACE_WIN32)
+
+ ACE_UNUSED_ARG (type_proactor);
+ ACE_UNUSED_ARG (max_op);
+
+ ACE_WIN32_Proactor *proactor_impl = 0;
+
+ ACE_NEW_RETURN (proactor_impl,
+ ACE_WIN32_Proactor,
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%t) Create Proactor Type = WIN32\n")));
+
+#elif defined (ACE_HAS_AIO_CALLS)
+
+ ACE_POSIX_Proactor * proactor_impl = 0;
+
+ switch (type_proactor)
+ {
+ case AIOCB:
+ ACE_NEW_RETURN (proactor_impl,
+ ACE_POSIX_AIOCB_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
+ break;
+
+#if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
+ case SIG:
+ ACE_NEW_RETURN (proactor_impl,
+ ACE_POSIX_SIG_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
+ break;
+#endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
+
+# if defined (sun)
+ case SUN:
+ ACE_NEW_RETURN (proactor_impl,
+ ACE_SUN_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%t) Create Proactor Type = SUN\n")));
+ break;
+# endif /* sun */
+
+# if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
+ case CB:
+ ACE_NEW_RETURN (proactor_impl,
+ ACE_POSIX_CB_Proactor (max_op),
+ -1);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = CB\n")));
+ break;
+# endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
+
+ default:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n")));
+ break;
+ }
+
+#endif /* ACE_WIN32 */
+
+ // always delete implementation 1 , not !(proactor_impl == 0)
+ ACE_NEW_RETURN (this->proactor_,
+ ACE_Proactor (proactor_impl, 1 ),
+ -1);
+ // Set new singleton and delete it in close_singleton()
+ ACE_Proactor::instance (this->proactor_, 1);
+ return 0;
+}
+
+int
+MyTask::delete_proactor (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Delete Proactor\n")));
+
+ ACE_Proactor::close_singleton ();
+ this->proactor_ = 0;
+
+ return 0;
+}
+
+int
+MyTask::start (int num_threads,
+ ProactorType type_proactor,
+ size_t max_op)
+{
+ if (this->create_proactor (type_proactor, max_op) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to create proactor")),
+ -1);
+
+ if (this->activate (THR_NEW_LWP, num_threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to activate thread pool")),
+ -1);
+
+ for (; num_threads > 0; num_threads--)
+ {
+ sem_.acquire ();
+ }
+
+ return 0;
+}
+
+
+int
+MyTask::stop ()
+{
+ if (this->proactor_ != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Calling End Proactor event loop\n")));
+
+ this->proactor_->proactor_end_event_loop ();
+ }
+
+ if (this->wait () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to stop thread pool")));
+
+ return 0;
+}
+
+int
+MyTask::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask started\n")));
+
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+ disable_signal (SIGPIPE, SIGPIPE);
+
+ // signal that we are ready
+ sem_.release (1);
+
+ this->proactor_->proactor_run_event_loop ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) MyTask finished\n")));
+ return 0;
+}
+
+// forward declaration
+class TestData;
+
+// "Server" is one side of a session. It's the same idea as in TCP, but
+// there's no acceptor in UDP; sessions are started by the client sending
+// a "start" datagram to a well-known UDP port. The start datagram tells
+// which port number the client is receiving on. The server then sends an
+// "ack" datagram to indicate the session is set up successfully and to say
+// which port the server is listening on. Thus, a unique pairing of server
+// and client port numbers is established. Each session will require a
+// separate server-side socket as well as the client. Note that experienced
+// UDP programmers will be quivering at this point knowing that there's no
+// reason to have multiple server-side sockets, and no real reason to
+// pre-register the client ports either since all the addressing info is
+// available on normal UDP programming. However, this is all necessary in
+// the POSIX case since the POSIX aio functions were not designed with UDP
+// in mind, and the addressing information is not available in UDP receive
+// completion callbacks; thus, each socket needs to be fully connected before
+// running session data. The addressing information needed to run this
+// use-case in the "normal" way is available on Windows, but this test runs
+// across many platforms, so can't rely on that information.
+class Server : public ACE_Handler
+{
+public:
+ Server ();
+ Server (TestData *tester, int id);
+ ~Server (void);
+
+ int id (void) { return this->id_; }
+ size_t get_total_snd (void) { return this->total_snd_; }
+ size_t get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
+
+ /// This is called after the new session has been established.
+ void go (ACE_HANDLE handle, const ACE_INET_Addr &client);
+
+ void cancel ();
+
+protected:
+ /**
+ * @name AIO callback handling
+ *
+ * These methods are called by the framework
+ */
+ /// This is called when asynchronous <read> operation from the
+ /// socket completes.
+ virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
+
+ /// This is called when an asynchronous <write> to the socket
+ /// completes.
+ virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
+
+private:
+ int initiate_read (void);
+ int initiate_write (ACE_Message_Block *mb, size_t nbytes);
+
+ TestData *tester_;
+ int id_;
+
+ ACE_INET_Addr client_addr_;
+ ACE_Asynch_Read_Dgram rs_;
+ ACE_Asynch_Write_Dgram ws_;
+ ACE_SYNCH_MUTEX lock_;
+
+ int io_count_; // Number of currently outstanding I/O requests
+ bool flg_cancel_;
+ bool flg_closing_;
+ size_t total_snd_; // Number of bytes successfully sent
+ size_t total_rcv_; // Number of bytes successfully received
+ int total_w_; // Number of write operations
+ int total_r_; // Number of read operations
+};
+
+// *******************************************
+// Client
+// *******************************************
+
+class Client : public ACE_Handler
+{
+public:
+ Client ();
+ Client (TestData *tester, int id);
+ ~Client (void);
+
+ void go (ACE_HANDLE h, const ACE_INET_Addr &server);
+ int id (void) { return this->id_; }
+ size_t get_total_snd (void) { return this->total_snd_; }
+ size_t get_total_rcv (void) { return this->total_rcv_; }
+ int get_total_w (void) { return this->total_w_; }
+ int get_total_r (void) { return this->total_r_; }
+
+ // This is called when asynchronous reads from the socket complete
+ virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
+
+ // This is called when asynchronous writes from the socket complete
+ virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
+
+ void cancel (void);
+
+private:
+ int initiate_read (void);
+ int initiate_write (void);
+ // FUZZ: disable check_for_lack_ACE_OS
+ void close (void);
+ // FUZZ: enable check_for_lack_ACE_OS
+
+ TestData *tester_;
+ int id_;
+
+ ACE_INET_Addr server_addr_;
+ ACE_Asynch_Read_Dgram rs_;
+ ACE_Asynch_Write_Dgram ws_;
+
+ ACE_SYNCH_MUTEX lock_;
+
+ int io_count_;
+ int stop_writing_; // Writes are shut down; just read.
+ bool flg_cancel_;
+ size_t total_snd_;
+ size_t total_rcv_;
+ int total_w_;
+ int total_r_;
+};
+
+// TestData collects and reports on test-related transfer and connection
+// statistics.
+class TestData
+{
+public:
+ TestData ();
+ bool testing_done (void);
+ Server *server_up (void);
+ Client *client_up (void);
+ void server_done (Server *s);
+ void client_done (Client *c);
+ void stop_all (void);
+ void report (void);
+
+private:
+ struct Local_Stats
+ {
+ // Track number of sessions that report start, and those that report
+ // their end (and stats).
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
+
+ // Total read and write bytes for all sessions.
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
+ // Total read and write operations issues for all sessions.
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
+ } servers_, clients_;
+
+ ACE_SYNCH_MUTEX list_lock_;
+ Server *server_list_[MAX_SERVERS];
+ Client *client_list_[MAX_CLIENTS];
+};
+
+TestData::TestData ()
+{
+ int i;
+ for (i = 0; i < MAX_SERVERS; ++i)
+ this->server_list_[i] = 0;
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ this->client_list_[i] = 0;
+}
+
+bool
+TestData::testing_done (void)
+{
+ int svr_up = this->servers_.sessions_up_.value ();
+ int svr_dn = this->servers_.sessions_down_.value ();
+ int clt_up = this->clients_.sessions_up_.value ();
+ int clt_dn = this->clients_.sessions_down_.value ();
+
+ if (svr_up == 0 && clt_up == 0) // No connections up yet
+ return false;
+
+ return (svr_dn >= svr_up && clt_dn >= clt_up);
+}
+
+Server *
+TestData::server_up (void)
+{
+ ++this->servers_.sessions_up_;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
+
+ for (int i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] == 0)
+ {
+ ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
+ i,
+ this->servers_.sessions_up_.value (),
+ this->servers_.sessions_down_.value ()));
+ return this->server_list_[i];
+ }
+ }
+ return 0;
+}
+
+Client *
+TestData::client_up (void)
+{
+ ++this->clients_.sessions_up_;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
+
+ for (int i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] == 0)
+ {
+ ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
+ i,
+ this->clients_.sessions_up_.value (),
+ this->clients_.sessions_down_.value ()));
+ return this->client_list_[i];
+ }
+ }
+ return 0;
+}
+
+void
+TestData::server_done (Server *s)
+{
+ this->servers_.w_cnt_ += s->get_total_snd ();
+ this->servers_.r_cnt_ += s->get_total_rcv ();
+ this->servers_.w_ops_ += s->get_total_w ();
+ this->servers_.r_ops_ += s->get_total_r ();
+ ++this->servers_.sessions_down_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
+ s->id (),
+ this->servers_.sessions_up_.value (),
+ this->servers_.sessions_down_.value ()));
+
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ int i;
+ for (i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] == s)
+ {
+ if (s->id () != i)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Server %d is pos %d in list\n"),
+ s->id (),
+ i));
+ this->server_list_[i] = 0;
+ break;
+ }
+ }
+ if (i >= MAX_SERVERS)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
+
+ return;
+}
+
+void
+TestData::client_done (Client *c)
+{
+ this->clients_.w_cnt_ += c->get_total_snd ();
+ this->clients_.r_cnt_ += c->get_total_rcv ();
+ this->clients_.w_ops_ += c->get_total_w ();
+ this->clients_.r_ops_ += c->get_total_r ();
+ ++this->clients_.sessions_down_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
+ c->id (),
+ this->clients_.sessions_up_.value (),
+ this->clients_.sessions_down_.value ()));
+
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ int i;
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] == c)
+ {
+ if (c->id () != i)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Client %d is pos %d in list\n"),
+ c->id (),
+ i));
+ this->client_list_[i] = 0;
+ break;
+ }
+ }
+ if (i >= MAX_CLIENTS)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
+
+ return;
+}
+
+void
+TestData::stop_all (void)
+{
+ int i;
+
+ // Lock and cancel everything. Then release the lock, possibly allowing
+ // cleanups, then grab it again and delete all Servers and Clients.
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] != 0)
+ this->client_list_[i]->cancel ();
+ }
+
+ for (i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] != 0)
+ this->server_list_[i]->cancel ();
+ }
+ }
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] != 0)
+ delete this->client_list_[i];
+ }
+
+ for (i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] != 0)
+ delete this->server_list_[i];
+ }
+ }
+}
+
+void
+TestData::report (void)
+{
+ // Print statistics
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf (bufs,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->clients_.w_cnt_.value (),
+ this->clients_.w_ops_.value ());
+
+ ACE_OS::sprintf (bufr,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->clients_.r_cnt_.value (),
+ this->clients_.r_ops_.value ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
+ bufs,
+ bufr));
+
+ ACE_OS::sprintf (bufs,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->servers_.w_cnt_.value (),
+ this->servers_.w_ops_.value ());
+
+ ACE_OS::sprintf (bufr,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->servers_.r_cnt_.value (),
+ this->servers_.r_ops_.value ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
+ bufs,
+ bufr));
+
+ if (this->clients_.w_cnt_.value () == 0 ||
+ this->clients_.r_cnt_.value () == 0 ||
+ this->servers_.w_cnt_.value () == 0 ||
+ this->servers_.r_cnt_.value () == 0 )
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
+ ACE_TEXT ("really do anything. Something is very wrong.\n")));
+}
+
+// Session set-up struct.
+struct Session_Data
+{
+ ACE_INT32 direction_; // 0 == Start, 1 == Ack
+ ACE_INT32 addr_; // Network byte order, must be IPv4
+ ACE_UINT16 port_; // UDP port, network byte order
+ Session_Data() { ACE_OS::memset (this, 0, sizeof(*this)); }
+};
+
+// Master is the server-side receiver of session establishment requests.
+// For each "start" dgram received, instantiates a new Server object,
+// indicating the addressing info for the client.
+// Master is initialized with a count of the number of expected sessions. After
+// this number are set up, Master will stop listening for session requests.
+// This is a bit fragile but is necessary because on HP-UX, AIX, et al., it
+// is impossible to close/cancel a socket with an outstanding UDP recieve
+// (on AIX the process is so wedged the machine needs to be rebooted to
+// clear it!). So, this bit of messiness is necessary for portability.
+// When the Master is destroyed, it will try to stop establishing sessions
+// but this will only work on Windows.
+class Master : public ACE_Handler
+{
+public:
+ Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected);
+ ~Master (void);
+
+ // Called when dgram receive operation completes.
+ virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
+
+private:
+ void start_recv (void);
+
+ TestData *tester_;
+ ACE_INET_Addr recv_addr_;
+ ACE_SOCK_Dgram sock_;
+ ACE_Asynch_Read_Dgram rd_;
+ ACE_Message_Block *mb_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_expected_;
+ volatile bool recv_in_progress_;
+};
+
+// *************************************************************
+Master::Master (TestData *tester, const ACE_INET_Addr &recv_addr, int expected)
+ : tester_ (tester),
+ recv_addr_ (recv_addr),
+ mb_ (0),
+ sessions_expected_ (expected),
+ recv_in_progress_ (false)
+{
+ if (this->sock_.open (recv_addr) == -1)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Master socket %p\n"), ACE_TEXT ("open")));
+ else
+ {
+ if (this->rd_.open (*this, this->sock_.get_handle ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Master reader %p\n"),
+ ACE_TEXT ("open")));
+ this->mb_ = new ACE_Message_Block (sizeof (Session_Data));
+ start_recv ();
+ }
+}
+
+Master::~Master (void)
+{
+ if (this->recv_in_progress_)
+ this->rd_.cancel ();
+ this->sock_.close ();
+
+ if (this->mb_ != 0)
+ {
+ this->mb_->release ();
+ this->mb_ = 0;
+ }
+}
+
+void
+Master::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
+{
+ // We should only receive Start datagrams with valid addresses to reply to.
+ if (result.success ())
+ {
+ if (result.bytes_transferred () != sizeof (Session_Data))
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Master session data expected %B bytes; ")
+ ACE_TEXT ("received %B\n"),
+ sizeof (Session_Data),
+ result.bytes_transferred ()));
+ else
+ {
+ ACE_Message_Block *mb = result.message_block ();
+ Session_Data *session =
+ reinterpret_cast<Session_Data*>(mb->rd_ptr ());
+ if (session->direction_ == 0)
+ {
+ ACE_INET_Addr client_addr, me_addr;
+ ACE_TCHAR client_str[80], me_str[80];
+ client_addr.set ((u_short)session->port_, session->addr_, 0);
+ client_addr.addr_to_string (client_str, 80);
+
+ // Set up the local and remote addresses - need fully-specified
+ // addresses to use UDP aio on Linux. This is the socket that
+ // the session will run over. The addressing info to be sent
+ // back to the Client will be sent over the receive socket
+ // to ensure it goes back to the client initiating the session.
+ ACE_SOCK_CODgram sock;
+ if (sock.open (client_addr) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Master new socket for ")
+ ACE_TEXT ("client %s: %p\n"),
+ client_str,
+ ACE_TEXT ("open")));
+ }
+ else
+ {
+ sock.get_local_addr (me_addr);
+ me_addr.addr_to_string (me_str, 80);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Master setting up server for ")
+ ACE_TEXT ("local %s, peer %s\n"),
+ me_str,
+ client_str));
+
+ Session_Data session;
+ session.direction_ = 1; // Ack
+ session.addr_ = ACE_HTONL (me_addr.get_ip_address ());
+ session.port_ = ACE_HTONS (me_addr.get_port_number ());
+ if (this->sock_.send (&session,
+ sizeof (session),
+ client_addr) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Master reply %p\n"),
+ ACE_TEXT ("send")));
+ sock.close ();
+ }
+ else
+ {
+ Server *server = this->tester_->server_up ();
+ server->go (sock.get_handle (), client_addr);
+ }
+ }
+ if (--this->sessions_expected_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("All expected sessions are up\n")));
+ }
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Badly formed Session request\n")));
+ }
+ }
+ }
+ else
+ {
+ ACE_Log_Priority prio = LM_ERROR;
+#if defined (ACE_WIN32)
+ if (result.error () == ERROR_OPERATION_ABORTED)
+ prio = LM_DEBUG;
+#else
+ if (result.error () == ECANCELED)
+ prio = LM_DEBUG;
+#endif /* ACE_WIN32 */
+ // Multiple steps to log the error without squashing errno.
+ ACE_LOG_MSG->conditional_set (__FILE__,
+ __LINE__,
+ -1,
+ (int)(result.error ()));
+ ACE_LOG_MSG->log (prio,
+ ACE_TEXT ("(%t) Master %p\n"),
+ ACE_TEXT ("recv"));
+ // If canceled, don't try to restart.
+ if (prio == LM_DEBUG)
+ return;
+ }
+ this->start_recv ();
+}
+
+void
+Master::start_recv (void)
+{
+ if (this->mb_ == 0)
+ return;
+
+ size_t unused = 0;
+ this->mb_->reset ();
+ if (this->rd_.recv (this->mb_, unused, 0) == -1)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Master %p\n"), ACE_TEXT ("recv")));
+ else
+ this->recv_in_progress_ = true;
+}
+
+// ***************************************************
+Server::Server ()
+{
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
+}
+
+Server::Server (TestData *tester, int id)
+ : tester_ (tester),
+ id_ (id),
+ io_count_ (0),
+ flg_cancel_(false),
+ flg_closing_ (false),
+ total_snd_(0),
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
+{
+}
+
+Server::~Server (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d dtor; %d sends (%B bytes); ")
+ ACE_TEXT ("%d recvs (%B bytes)\n"),
+ this->id_,
+ this->total_w_, this->total_snd_,
+ this->total_r_, this->total_rcv_));
+ if (this->io_count_ != 0)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("(%t) Server %d deleted with ")
+ ACE_TEXT ("%d I/O outstanding\n"),
+ this->id_,
+ this->io_count_));
+
+ // This test bounces data back and forth between Clients and Servers.
+ // Therefore, if there was significantly more data in one direction, that's
+ // a problem. Remember, the byte counts are unsigned values.
+ int issue_data_warning = 0;
+ if (this->total_snd_ > this->total_rcv_)
+ {
+ if (this->total_rcv_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_snd_ / this->total_rcv_ > 2)
+ issue_data_warning = 1;
+ }
+ else
+ {
+ if (this->total_snd_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_rcv_ / this->total_snd_ > 2)
+ issue_data_warning = 1;
+ }
+ if (issue_data_warning)
+ ACE_DEBUG ((LM_WARNING,
+ ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
+
+ if (this->tester_ != 0)
+ this->tester_->server_done (this);
+
+ if (this->handle () != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (this->handle ());
+
+ this->id_ = -1;
+ this->handle (ACE_INVALID_HANDLE);
+}
+
+void
+Server::cancel ()
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ this->flg_cancel_ = true;
+ this->ws_.cancel ();
+ this->rs_.cancel ();
+ return;
+}
+
+void
+Server::go (ACE_HANDLE handle, const ACE_INET_Addr &client)
+{
+ this->handle (handle);
+ this->client_addr_.set (client);
+
+ // Lock this before initiating I/O, else it may complete while we're
+ // still setting up.
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ if (this->ws_.open (*this, this->handle ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Server::ACE_Asynch_Write_Dgram::open")));
+ else if (this->rs_.open (*this, this->handle ()) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Server::ACE_Asynch_Read_Dgram::open")));
+ else
+ this->initiate_read ();
+ }
+
+ if (this->io_count_ > 0)
+ return;
+
+ delete this; // Error setting up I/O factories
+}
+
+int
+Server::initiate_read (void)
+{
+ if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (1024), //BUFSIZ + 1),
+ -1);
+
+ // Inititiate receive
+ size_t unused = 0;
+ if (this->rs_.recv (mb, unused, 0) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) Server %d, %p\n"),
+ this->id_,
+ ACE_TEXT ("read")),
+ -1);
+ }
+
+ this->io_count_++;
+ this->total_r_++;
+ return 0;
+}
+
+int
+Server::initiate_write (ACE_Message_Block *mb, size_t nbytes)
+{
+ if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
+ {
+ mb->release ();
+ return -1;
+ }
+
+ if (nbytes == 0)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("(%t) Server %d write nbytes == 0\n"),
+ this->id_),
+ -1);
+ }
+
+ if (this->ws_.send (mb, nbytes, 0, this->client_addr_) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("(%t) Server %d, %p\n"),
+ this->id_,
+ ACE_TEXT ("write")),
+ -1);
+ }
+
+ this->io_count_++;
+ this->total_w_++;
+ return 0;
+}
+
+void
+Server::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
+
+ ACE_Message_Block *mb = result.message_block ();
+
+ // Reset pointers.
+ mb->rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ if (loglevel > 1)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) **** Server %d: handle_read_dgram() ****\n"),
+ this->id_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_to_read"),
+ result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("act"),
+ result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("completion_key"),
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
+ else if (result.error () != 0)
+ {
+ ACE_Log_Priority prio;
+#if defined (ACE_WIN32)
+ if (result.error () == ERROR_OPERATION_ABORTED)
+ prio = LM_DEBUG;
+#else
+ if (result.error () == ECANCELED)
+ prio = LM_DEBUG;
+#endif /* ACE_WIN32 */
+ else
+ prio = LM_ERROR;
+ ACE_LOG_MSG->errnum (result.error ());
+ ACE_LOG_MSG->log (prio,
+ ACE_TEXT ("(%t) Server %d; %p\n"),
+ this->id_,
+ ACE_TEXT ("read"));
+ }
+ else if (loglevel > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d: read %B bytes\n"),
+ this->id_,
+ result.bytes_transferred ()));
+ }
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_rcv_ += result.bytes_transferred ();
+
+ // If client says we're done, ack it; we're done reading.
+ size_t to_send = result.bytes_transferred ();
+ if (ACE_OS::strcmp (mb->rd_ptr (), close_req_msg) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d saw close request; ack\n"),
+ this->id_));
+ this->flg_closing_ = true;
+ mb->reset ();
+ mb->copy (close_ack_msg);
+ to_send = mb->length ();
+ }
+ if (this->initiate_write (mb, to_send) == 0)
+ {
+ if (duplex != 0 && !this->flg_closing_)
+ this->initiate_read ();
+ }
+ }
+ else
+ mb->release ();
+
+ --this->io_count_;
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
+}
+
+void
+Server::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ ACE_Message_Block *mb = result.message_block ();
+
+ if (loglevel > 1)
+ {
+ LogLocker log_lock;
+
+ //mb.rd_ptr () [0] = '\0';
+ mb->rd_ptr (mb->rd_ptr () - result.bytes_transferred ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) **** Server %d: handle_write_dgram() ****\n"),
+ this->id_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_to_write"),
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("act"),
+ result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("completion_key"),
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
+ else if (result.error () != 0)
+ {
+ ACE_Log_Priority prio;
+#if defined (ACE_WIN32)
+ if (result.error () == ERROR_OPERATION_ABORTED)
+ prio = LM_DEBUG;
+#else
+ if (result.error () == ECANCELED)
+ prio = LM_DEBUG;
+#endif /* ACE_WIN32 */
+ else
+ prio = LM_ERROR;
+ ACE_LOG_MSG->errnum (result.error ());
+ ACE_LOG_MSG->log (prio,
+ ACE_TEXT ("(%t) Server %d; %p\n"),
+ this->id_,
+ ACE_TEXT ("write"));
+ }
+ else if (loglevel > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d: wrote %B bytes ok\n"),
+ this->id_,
+ result.bytes_transferred ()));
+ }
+
+ mb->release ();
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_snd_ += result.bytes_transferred ();
+
+ if (duplex == 0 && !this->flg_closing_)
+ this->initiate_read ();
+ }
+
+ --this->io_count_;
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
+}
+
+// *******************************************
+// Connector
+//
+// Connector creates the proper number of Clients and initiates
+// sessions on them.
+// *******************************************
+
+class Connector
+{
+public:
+ Connector (TestData *tester);
+
+ int start (const ACE_INET_Addr &addr, int num);
+
+private:
+ TestData *tester_;
+};
+
+// *************************************************************
+
+Connector::Connector (TestData *tester)
+ : tester_ (tester)
+{
+}
+
+int
+Connector::start (const ACE_INET_Addr& addr, int num)
+{
+ ACE_OS::sleep(3); // Let Master get going
+ if (num > MAX_CLIENTS)
+ num = MAX_CLIENTS;
+
+ if (num < 0)
+ num = 1;
+
+ int rc = 0;
+
+ for (; rc < num; rc++)
+ {
+ ACE_SOCK_CODgram sock;
+ if (sock.open (addr) == -1)
+ ACE_ERROR_BREAK ((LM_ERROR,
+ ACE_TEXT ("(%t) Starting client %d: %p\n"),
+ rc,
+ ACE_TEXT ("open")));
+ ACE_INET_Addr me;
+ sock.get_local_addr (me);
+ u_short my_port = ACE_HTONS (me.get_port_number ());
+ ACE_INT32 my_addr = ACE_HTONL (me.get_ip_address ());
+ Session_Data session;
+ session.direction_ = 0; // Start
+ session.addr_ = my_addr;
+ session.port_ = my_port;
+ if (sock.send (&session, sizeof (session)) == -1)
+ ACE_ERROR_BREAK ((LM_ERROR,
+ ACE_TEXT ("(%t) Starting client %d: %p\n"),
+ rc,
+ ACE_TEXT ("send")));
+ if (sock.recv (&session, sizeof (session)) == -1)
+ ACE_ERROR_BREAK ((LM_ERROR,
+ ACE_TEXT ("(%t) Starting client %d: %p\n"),
+ rc,
+ ACE_TEXT ("recv")));
+ ACE_INET_Addr server;
+ server.set (session.port_, session.addr_, 0);
+ Client *client = this->tester_->client_up ();
+ ACE_TCHAR me_str[80], server_str[80];
+ me.addr_to_string (me_str, 80);
+ server.addr_to_string (server_str, 80);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d setting up local %s, peer %s\n"),
+ client->id(),
+ me_str,
+ server_str));
+ sock.close ();
+ if (sock.open (server, me) == -1)
+ ACE_ERROR_BREAK ((LM_ERROR,
+ ACE_TEXT ("(%t) Re-opening %p\n"),
+ ACE_TEXT ("client")));
+ client->go (sock.get_handle (), server);
+ sock.set_handle (ACE_INVALID_HANDLE);
+ }
+ return rc;
+}
+
+
+Client::Client ()
+{
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
+}
+
+Client::Client (TestData *tester, int id)
+ : tester_ (tester),
+ id_ (id),
+ io_count_ (0),
+ stop_writing_ (0),
+ flg_cancel_ (false),
+ total_snd_ (0),
+ total_rcv_ (0),
+ total_w_ (0),
+ total_r_ (0)
+{
+}
+
+Client::~Client (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d dtor; %d sends (%B bytes); ")
+ ACE_TEXT ("%d recvs (%B bytes)\n"),
+ this->id_,
+ this->total_w_, this->total_snd_,
+ this->total_r_, this->total_rcv_));
+ if (this->io_count_ != 0)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
+ this->id_,
+ this->io_count_));
+
+ // This test bounces data back and forth between Clients and Servers.
+ // Therefore, if there was significantly more data in one direction, that's
+ // a problem. Remember, the byte counts are unsigned values.
+ int issue_data_warning = 0;
+ if (this->total_snd_ > this->total_rcv_)
+ {
+ if (this->total_rcv_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_snd_ / this->total_rcv_ > 2)
+ issue_data_warning = 1;
+ }
+ else
+ {
+ if (this->total_snd_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_rcv_ / this->total_snd_ > 2)
+ issue_data_warning = 1;
+ }
+ if (issue_data_warning)
+ ACE_DEBUG ((LM_WARNING,
+ ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
+
+ if (this->tester_ != 0)
+ this->tester_->client_done (this);
+
+ this->id_ = -1;
+ if (this->handle () != ACE_INVALID_HANDLE)
+ {
+ ACE_OS::closesocket (this->handle ());
+ }
+ this->handle (ACE_INVALID_HANDLE);
+}
+
+void
+Client::cancel ()
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ this->flg_cancel_ = true;
+ this->ws_.cancel ();
+ this->rs_.cancel ();
+ return;
+}
+
+void
+Client::close ()
+{
+ // This must be called with the lock_ held.
+ ++this->stop_writing_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
+ this->id_, this->io_count_));
+ return;
+}
+
+void
+Client::go (ACE_HANDLE handle, const ACE_INET_Addr &server)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ this->handle (handle);
+ this->server_addr_.set (server);
+
+ // Open send and receive factories.
+ if (this->ws_.open (*this, handle) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Client %d: %p\n"),
+ this->id_,
+ ACE_TEXT ("ACE_Asynch_Write_Dgram::open")));
+ else if (this->rs_.open (*this, handle) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) Client %d: %p\n"),
+ this->id_,
+ ACE_TEXT ("ACE_Asynch_Read_Dgram::open")));
+ else if (this->initiate_write () == 0)
+ {
+ if (duplex != 0) // Start an asynchronous read
+ this->initiate_read ();
+ }
+
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
+}
+
+int
+Client::initiate_write (void)
+{
+ if (this->flg_cancel_ || this->handle () == ACE_INVALID_HANDLE)
+ return -1;
+
+ // stop_writing_ is set to 1 to say "stop". To avoid repeating the
+ // close datagram for every echo, only send it once. Sure, there's a risk
+ // it will get lost, but since this is most often intra-host, don't
+ // worry about that very small risk.
+ if (this->stop_writing_ > 0) // Need to tell server to "close"
+ {
+ if (this->stop_writing_ > 1) // Already told server to close
+ return 0;
+
+ ++this->stop_writing_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d requesting close\n"),
+ this->id_));
+ ACE_Message_Block *mb =
+ new ACE_Message_Block (ACE_OS::strlen (close_req_msg) + 1);
+ mb->copy (close_req_msg);
+ size_t unused; // Number of bytes sent
+ if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) Client %d, %p\n"),
+ this->id_,
+ ACE_TEXT ("initiating closing send")),
+ -1);
+ }
+
+ this->io_count_++;
+ this->total_w_++;
+ return 0;
+ }
+
+ static const size_t complete_message_length =
+ ACE_OS::strlen (complete_message);
+
+#if defined (ACE_WIN32)
+
+ ACE_Message_Block *mb1 = 0,
+ *mb2 = 0,
+ *mb3 = 0;
+
+ // No need to allocate +1 for proper printing - the memory includes it already
+ ACE_NEW_RETURN (mb1,
+ ACE_Message_Block ((char *)complete_message,
+ complete_message_length),
+ -1);
+
+ ACE_NEW_RETURN (mb2,
+ ACE_Message_Block ((char *)complete_message,
+ complete_message_length),
+ -1);
+
+ ACE_NEW_RETURN (mb3,
+ ACE_Message_Block ((char *)complete_message,
+ complete_message_length),
+ -1);
+
+ mb1->wr_ptr (complete_message_length);
+ mb2->wr_ptr (complete_message_length);
+ mb3->wr_ptr (complete_message_length);
+
+ // chain them together
+ mb1->cont (mb2);
+ mb2->cont (mb3);
+
+ size_t unused; // Number of bytes sent
+ if (this->ws_.send (mb1, unused, 0, this->server_addr_) == -1)
+ {
+ mb1->release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Client::ACE_Asynch_Write_Dgram::send")),
+ -1);
+ }
+#else /* ACE_WIN32 */
+
+ ACE_Message_Block *mb = 0;
+
+ // No need to allocate +1 for proper printing - the memory includes
+ // it already
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (complete_message,
+ complete_message_length),
+ -1);
+ mb->wr_ptr (complete_message_length);
+ size_t unused; // Number of bytes sent
+ if (this->ws_.send (mb, unused, 0, this->server_addr_) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("(%t) Client %d, %p\n"),
+ this->id_,
+ ACE_TEXT ("send")),
+ -1);
+ }
+#endif /* ACE_WIN32 */
+
+ this->io_count_++;
+ this->total_w_++;
+ return 0;
+}
+
+int
+Client::initiate_read (void)
+{
+ if (this->flg_cancel_ || this->handle_ == ACE_INVALID_HANDLE)
+ return -1;
+
+ static const size_t complete_message_length =
+ ACE_OS::strlen (complete_message);
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+ ACE_Message_Block *mb1 = 0,
+ *mb2 = 0,
+ *mb3 = 0,
+ *mb4 = 0,
+ *mb5 = 0,
+ *mb6 = 0;
+
+ // We allocate +1 only for proper printing - we can just set the last byte
+ // to '\0' before printing out
+ ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
+
+ // Let allocate memory for one more triplet,
+ // This improves performance
+ // as we can receive more the than one block at once
+ // Generally, we can receive more triplets ....
+ ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
+
+ mb1->cont (mb2);
+ mb2->cont (mb3);
+
+ mb3->cont (mb4);
+ mb4->cont (mb5);
+ mb5->cont (mb6);
+
+
+ // hide last byte in each message block, reserving it for later to set '\0'
+ // for proper printouts
+ mb1->size (mb1->size () - 1);
+ mb2->size (mb2->size () - 1);
+ mb3->size (mb3->size () - 1);
+
+ mb4->size (mb4->size () - 1);
+ mb5->size (mb5->size () - 1);
+ mb6->size (mb6->size () - 1);
+
+ // Inititiate read
+ size_t unused = 0;
+ if (this->rs_.recv (mb1, unused, 0) == -1)
+ {
+ mb1->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
+ -1);
+ }
+#else /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ // Try to read more chunks
+ size_t blksize = ( complete_message_length > BUFSIZ ) ?
+ complete_message_length : BUFSIZ;
+
+ ACE_Message_Block *mb = 0;
+
+ // We allocate +1 only for proper printing - we can just set the last byte
+ // to '\0' before printing out
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (blksize + 1),
+ -1);
+
+ // Inititiate read
+ size_t unused = 0;
+ if (this->rs_.recv (mb, unused, 0) == -1)
+ {
+ mb->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) Client %d, %p\n"),
+ this->id_,
+ ACE_TEXT ("read")),
+ -1);
+ }
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ this->io_count_++;
+ this->total_r_++;
+ return 0;
+}
+
+void
+Client::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ ACE_Message_Block *mb = result.message_block ();
+
+ if (loglevel > 1)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) **** Client %d: handle_write_dgram() ****\n"),
+ this->id_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_to_write"),
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("act"),
+ result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("completion_key"),
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+
+#if defined (ACE_WIN32)
+ size_t bytes_transferred = result.bytes_transferred ();
+ char index = 0;
+ for (ACE_Message_Block* mb_i = mb;
+ (mb_i != 0) && (bytes_transferred > 0);
+ mb_i = mb_i->cont ())
+ {
+ // write 0 at string end for proper printout (if end of mb,
+ // it's 0 already)
+ mb_i->rd_ptr()[0] = '\0';
+
+ size_t len = mb_i->rd_ptr () - mb_i->base ();
+
+ // move rd_ptr backwards as required for printout
+ if (len >= bytes_transferred)
+ {
+ mb_i->rd_ptr (0 - bytes_transferred);
+ bytes_transferred = 0;
+ }
+ else
+ {
+ mb_i->rd_ptr (0 - len);
+ bytes_transferred -= len;
+ }
+
+ ++index;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s%d = %s\n"),
+ ACE_TEXT ("message_block, part "),
+ index,
+ mb_i->rd_ptr ()));
+ }
+#else /* ACE_WIN32 */
+ // write 0 at string end for proper printout (if end of mb, it's 0 already)
+ mb->rd_ptr()[0] = '\0';
+ // move rd_ptr backwards as required for printout
+ mb->rd_ptr (- result.bytes_transferred ());
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb->rd_ptr ()));
+#endif /* ACE_WIN32 */
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
+ else if (result.error () != 0)
+ {
+ ACE_Log_Priority prio;
+#if defined (ACE_WIN32)
+ if (result.error () == ERROR_OPERATION_ABORTED)
+ prio = LM_DEBUG;
+#else
+ if (result.error () == ECANCELED)
+ prio = LM_DEBUG;
+#endif /* ACE_WIN32 */
+ else
+ prio = LM_ERROR;
+ ACE_LOG_MSG->errnum (result.error ());
+ ACE_LOG_MSG->log (prio,
+ ACE_TEXT ("(%t) Client %d; %p\n"),
+ this->id_,
+ ACE_TEXT ("write"));
+ }
+ else if (loglevel > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d: wrote %B bytes ok\n"),
+ this->id_,
+ result.bytes_transferred ()));
+ }
+
+ mb->release ();
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_snd_ += result.bytes_transferred ();
+ if (this->total_snd_ >= xfer_limit)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d sent %B, limit %B\n"),
+ this->id_, this->total_snd_, xfer_limit));
+ this->close ();
+ }
+ if (duplex != 0) // full duplex, continue write
+ {
+ if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
+ this->initiate_write ();
+ }
+ else // half-duplex read reply, after read we will start write
+ this->initiate_read ();
+ }
+
+ --this->io_count_;
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
+}
+
+void
+Client::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+
+ ACE_Message_Block *mb = result.message_block ();
+
+ if (loglevel > 1)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) **** Client %d: handle_read_dgram() ****\n"),
+ this->id_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_to_read"),
+ result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %B\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("act"),
+ result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %@\n"),
+ ACE_TEXT ("completion_key"),
+ result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+
+#if defined (ACE_WIN32)
+ char index = 0;
+ for (ACE_Message_Block* mb_i = mb;
+ mb_i != 0;
+ mb_i = mb_i->cont ())
+ {
+ ++index;
+ // write 0 at string end for proper printout
+ mb_i->wr_ptr()[0] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s%d = %s\n"),
+ ACE_TEXT ("message_block, part "),
+ index,
+ mb_i->rd_ptr ()));
+ }
+#else /* ACE_WIN32 */
+ // write 0 at string end for proper printout
+ mb->rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb->rd_ptr ()));
+#endif /* ACE_WIN32 */
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
+ else if (result.error () != 0)
+ {
+ ACE_Log_Priority prio;
+#if defined (ACE_WIN32)
+ if (result.error () == ERROR_OPERATION_ABORTED)
+ prio = LM_DEBUG;
+#else
+ if (result.error () == ECANCELED)
+ prio = LM_DEBUG;
+#endif /* ACE_WIN32 */
+ else
+ prio = LM_ERROR;
+ ACE_Log_Msg::instance ()->errnum (result.error ());
+ ACE_Log_Msg::instance ()->log (prio,
+ ACE_TEXT ("(%t) Client %d; %p\n"),
+ this->id_,
+ ACE_TEXT ("read"));
+ }
+ else if (loglevel > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d: read %B bytes ok\n"),
+ this->id_,
+ result.bytes_transferred ()));
+ }
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_rcv_ += result.bytes_transferred ();
+
+ // If we've closed and the server acked, we're done.
+ if (this->stop_writing_ &&
+ ACE_OS::strcmp (mb->rd_ptr (), close_ack_msg) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d recvd close-ack\n"),
+ this->id_));
+ }
+ else
+ {
+ if (duplex != 0)
+ this->initiate_read ();
+ else // half-duplex write, after write we will start read
+ this->initiate_write ();
+ }
+ }
+
+ mb->release ();
+ --this->io_count_;
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
+}
+
+// *************************************************************
+// Configuration helpers
+// *************************************************************
+int
+print_usage (int /* argc */, ACE_TCHAR *argv[])
+{
+ ACE_ERROR
+ ((LM_ERROR,
+ ACE_TEXT ("\nusage: %s")
+ ACE_TEXT ("\n-o <max number of started aio operations for Proactor>")
+ ACE_TEXT ("\n-t <Proactor type> UNIX-only, Win32-default always:")
+ ACE_TEXT ("\n a AIOCB")
+ ACE_TEXT ("\n i SIG")
+ ACE_TEXT ("\n c CB")
+ ACE_TEXT ("\n s SUN")
+ ACE_TEXT ("\n d default")
+ ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
+ ACE_TEXT ("\n-h <host> for Client mode")
+ ACE_TEXT ("\n-n <number threads for Proactor pool>")
+ ACE_TEXT ("\n-p <port to listen/connect>")
+ ACE_TEXT ("\n-c <number of client instances>")
+ ACE_TEXT ("\n-b run client and server at the same time")
+ ACE_TEXT ("\n f file")
+ ACE_TEXT ("\n c console")
+ ACE_TEXT ("\n-v log level")
+ ACE_TEXT ("\n 0 - log errors and highlights")
+ ACE_TEXT ("\n 1 - log level 0 plus progress information")
+ ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
+ ACE_TEXT ("\n-x max transfer byte count per Client")
+ ACE_TEXT ("\n-u show this message")
+ ACE_TEXT ("\n"),
+ argv[0]
+ ));
+ return -1;
+}
+
+static int
+set_proactor_type (const ACE_TCHAR *ptype)
+{
+ if (!ptype)
+ return 0;
+
+ switch (ACE_OS::ace_toupper (*ptype))
+ {
+ case 'D':
+ proactor_type = DEFAULT;
+ return 1;
+ case 'A':
+ proactor_type = AIOCB;
+ return 1;
+ case 'I':
+ proactor_type = SIG;
+ return 1;
+#if defined (sun)
+ case 'S':
+ proactor_type = SUN;
+ return 1;
+#endif /* sun */
+#if !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
+ case 'C':
+ proactor_type = CB;
+ return 1;
+#endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
+ default:
+ break;
+ }
+ return 0;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ // First, set up all the defaults then let any args change them.
+ both = 1; // client and server simultaneosly
+ duplex = 1; // full duplex is on
+ host = ACE_LOCALHOST; // server to connect
+ port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
+ max_aio_operations = 512; // POSIX Proactor params
+ proactor_type = DEFAULT; // Proactor type = default
+ threads = 3; // size of Proactor thread pool
+ clients = 10; // number of clients
+ loglevel = 0; // log level : only errors and highlights
+ // Default transfer limit 50 messages per Sender
+ xfer_limit = 50 * ACE_OS::strlen (complete_message);
+
+ // Linux kernels up to at least 2.6.9 (RHEL 4) can't do full duplex aio.
+# if defined (ACE_LINUX)
+ duplex = 0;
+#endif
+
+ if (argc == 1) // no arguments , so one button test
+ return 0;
+
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ {
+ switch (c)
+ {
+ case 'x': // xfer limit
+ xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
+ if (xfer_limit == 0)
+ xfer_limit = 1; // Bare minimum.
+ break;
+ case 'b': // both client and server
+ both = 1;
+ break;
+ case 'v': // log level
+ loglevel = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'd': // duplex
+ duplex = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'h': // host for sender
+ host = get_opt.opt_arg ();
+ break;
+ case 'p': // port number
+ port = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'n': // thread pool size
+ threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'c': // number of clients
+ clients = ACE_OS::atoi (get_opt.opt_arg ());
+ if (clients > MAX_CLIENTS)
+ clients = MAX_CLIENTS;
+ break;
+ case 'o': // max number of aio for proactor
+ max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 't': // Proactor Type
+ if (set_proactor_type (get_opt.opt_arg ()))
+ break;
+ return print_usage (argc, argv);
+ case 'u':
+ default:
+ return print_usage (argc, argv);
+ } // switch
+ } // while
+
+ if (proactor_type == SUN && threads > 1)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sun aiowait is not thread-safe; ")
+ ACE_TEXT ("changing to 1 thread\n")));
+ threads = 1;
+ }
+
+ return 0;
+}
+
+int
+run_main (int argc, ACE_TCHAR *argv[])
+{
+ ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
+
+ if (::parse_args (argc, argv) == -1)
+ return -1;
+
+ disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
+ disable_signal (SIGPIPE, SIGPIPE);
+
+ MyTask task1;
+ TestData test;
+
+ if (task1.start (threads, proactor_type, max_aio_operations) == 0)
+ {
+ // NOTE - there's no real reason this test is limited to IPv4 other
+ // than the way Session_Data is set up - to expand this test to work
+ // on IPv6 as well as IPv4, you need to do some work on passing the
+ // Session_Data address differently.
+ ACE_INET_Addr addr (port, ACE_LOCALHOST, AF_INET);
+ Master master (&test, addr, clients);
+ Connector connector (&test);
+ int rc = 0;
+
+ if (both != 0 || host == 0) // Acceptor
+ {
+ // Already running; if not needed will be deleted soon.
+ rc = 1;
+ }
+
+ if (both != 0 || host != 0)
+ {
+ if (host == 0)
+ host = ACE_LOCALHOST;
+
+ if (addr.set (port, host, 1, addr.get_type ()) == -1)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
+ else
+ rc += connector.start (addr, clients);
+ }
+
+ // Wait a few seconds to let things get going, then poll til
+ // all sessions are done. Note that when we exit this scope, the
+ // Acceptor and Connector will be destroyed, which should prevent
+ // further connections and also test how well destroyed handlers
+ // are handled.
+ ACE_OS::sleep (3);
+ }
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
+ while (!test.testing_done ())
+ ACE_OS::sleep (1);
+
+ test.stop_all ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
+ task1.stop ();
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+#else
+
+int
+run_main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("Proactor_UDP_Test"));
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Threads or Asynchronous IO is unsupported.\n")
+ ACE_TEXT ("Proactor_UDP_Test will not be run.")));
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */