summaryrefslogtreecommitdiff
path: root/tests/Proactor_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'tests/Proactor_Test.cpp')
-rw-r--r--tests/Proactor_Test.cpp846
1 files changed, 413 insertions, 433 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index 9e1012cd594..9ec8cb123d8 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -47,6 +47,9 @@ ACE_RCSID (tests,
#include "ace/OS_NS_sys_socket.h"
#include "ace/os_include/netinet/os_tcp.h"
+#include "ace/Atomic_Op.h"
+#include "ace/Synch_Traits.h"
+
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
# include "ace/WIN32_Proactor.h"
@@ -76,10 +79,10 @@ static int both = 0;
// Host that we're connecting to.
static const ACE_TCHAR *host = 0;
-// number of Senders instances
-static int senders = 1;
-const int MAX_SENDERS = 1000;
-const int MAX_RECEIVERS = 1000;
+// number of Client instances
+static int clients = 1;
+const int MAX_CLIENTS = 1000;
+const int MAX_SERVERS = 1000;
// duplex mode: == 0 half-duplex
// != 0 full duplex
@@ -94,7 +97,7 @@ static u_short port = ACE_DEFAULT_SERVER_PORT;
// Log options
static int loglevel; // 0 full , 1 only errors
-static size_t xfer_limit; // Number of bytes for Sender to send.
+static size_t xfer_limit; // Number of bytes for Client to send.
static char complete_message[] =
"GET / HTTP/1.1\r\n"
@@ -356,149 +359,304 @@ MyTask::svc (void)
}
-class Acceptor : public ACE_Asynch_Acceptor<Receiver>
+// TestData collects and reports on test-related transfer and connection
+// statistics.
+class TestData
{
- friend class Receiver;
public:
- int get_number_sessions (void) { return this->sessions_; }
- size_t get_total_snd (void) { return this->total_snd_; }
- size_t get_total_rcv (void) { return this->total_rcv_; }
- long get_total_w (void) { return this->total_w_; }
- long get_total_r (void) { return this->total_r_; }
+ TestData ();
+ bool testing_done (void);
+ Server *server_up (void);
+ Client *client_up (void);
+ void server_done (Server *s);
+ void client_done (Client *c);
+ void stop_all (void);
+ void report (void);
- Acceptor (void);
- virtual ~Acceptor (void);
+private:
+ struct Local_Stats
+ {
+ // Track number of sessions that report start, and those that report
+ // their end (and stats).
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_up_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, int> sessions_down_;
+
+ // Total read and write bytes for all sessions.
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_cnt_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_cnt_;
+ // Total read and write operations issues for all sessions.
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> w_ops_;
+ ACE_Atomic_Op<ACE_SYNCH_MUTEX, size_t> r_ops_;
+ } servers_, clients_;
+
+ ACE_SYNCH_MUTEX list_lock_;
+ Server *server_list_[MAX_SERVERS];
+ Client *client_list_[MAX_CLIENTS];
+};
- void stop (void);
- void cancel_all (void);
+TestData::TestData ()
+{
+ int i;
+ for (i = 0; i < MAX_SERVERS; ++i)
+ this->server_list_[i] = 0;
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ this->client_list_[i] = 0;
+}
- // Virtual from ACE_Asynch_Acceptor
- Receiver *make_handler (void);
+bool
+TestData::testing_done (void)
+{
+ int svr_up = this->servers_.sessions_up_.value ();
+ int svr_dn = this->servers_.sessions_down_.value ();
+ int clt_up = this->clients_.sessions_up_.value ();
+ int clt_dn = this->clients_.sessions_down_.value ();
-private:
- void on_new_receiver (Receiver &rcvr);
- void on_delete_receiver (Receiver &rcvr);
+ if (svr_up == 0 && clt_up == 0) // No connections up yet
+ return false;
- ACE_SYNCH_RECURSIVE_MUTEX lock_;
- int sessions_;
- Receiver *list_receivers_[MAX_RECEIVERS];
- size_t total_snd_;
- size_t total_rcv_;
- long total_w_;
- long total_r_;
-};
+ return (svr_dn >= svr_up && clt_dn >= clt_up);
+}
-// *************************************************************
-Acceptor::Acceptor (void)
- : sessions_ (0),
- total_snd_(0),
- total_rcv_(0),
- total_w_ (0),
- total_r_ (0)
+Server *
+TestData::server_up (void)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+ ++this->servers_.sessions_up_;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
- for (int i = 0; i < MAX_RECEIVERS; ++i)
- this->list_receivers_[i] = 0;
+ for (int i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] == 0)
+ {
+ ACE_NEW_RETURN (this->server_list_[i], Server (this, i), 0);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d up; now %d up, %d down.\n"),
+ i,
+ this->servers_.sessions_up_.value (),
+ this->servers_.sessions_down_.value ()));
+ return this->server_list_[i];
+ }
+ }
+ return 0;
}
-Acceptor::~Acceptor (void)
+Client *
+TestData::client_up (void)
{
- this->stop ();
-}
+ ++this->clients_.sessions_up_;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->list_lock_, 0);
+ for (int i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] == 0)
+ {
+ ACE_NEW_RETURN (this->client_list_[i], Client (this, i), 0);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d up; now %d up, %d down.\n"),
+ i,
+ this->clients_.sessions_up_.value (),
+ this->clients_.sessions_down_.value ()));
+ return this->client_list_[i];
+ }
+ }
+ return 0;
+}
void
-Acceptor::cancel_all (void)
+TestData::server_done (Server *s)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- this->cancel ();
-
- for (int i = 0; i < MAX_RECEIVERS; ++i)
+ this->servers_.w_cnt_ += s->get_total_snd ();
+ this->servers_.r_cnt_ += s->get_total_rcv ();
+ this->servers_.w_ops_ += s->get_total_w ();
+ this->servers_.r_ops_ += s->get_total_r ();
+ ++this->servers_.sessions_down_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Server %d gone; now %d up, %d down\n"),
+ s->id (),
+ this->servers_.sessions_up_.value (),
+ this->servers_.sessions_down_.value ()));
+
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ int i;
+ for (i = 0; i < MAX_SERVERS; ++i)
{
- if (this->list_receivers_[i] != 0)
- this->list_receivers_[i]->cancel ();
+ if (this->server_list_[i] == s)
+ {
+ if (s->id () != i)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Server %d is pos %d in list\n"),
+ s->id (),
+ i));
+ this->server_list_[i] = 0;
+ break;
+ }
}
+ if (i >= MAX_SERVERS)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Server %@ done but not listed\n"), s));
+
return;
}
-
void
-Acceptor::stop (void)
+TestData::client_done (Client *c)
{
- // This method can be called only after proactor event loop is done
- // in all threads.
-
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- for (int i = 0; i < MAX_RECEIVERS; ++i)
+ this->clients_.w_cnt_ += c->get_total_snd ();
+ this->clients_.r_cnt_ += c->get_total_rcv ();
+ this->clients_.w_ops_ += c->get_total_w ();
+ this->clients_.r_ops_ += c->get_total_r ();
+ ++this->clients_.sessions_down_;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d gone; now %d up, %d down\n"),
+ c->id (),
+ this->clients_.sessions_up_.value (),
+ this->clients_.sessions_down_.value ()));
+
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ int i;
+ for (i = 0; i < MAX_CLIENTS; ++i)
{
- delete this->list_receivers_[i];
- this->list_receivers_[i] = 0;
+ if (this->client_list_[i] == c)
+ {
+ if (c->id () != i)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Client %d is pos %d in list\n"),
+ c->id (),
+ i));
+ this->client_list_[i] = 0;
+ break;
+ }
}
+ if (i >= MAX_CLIENTS)
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Client %@ done but not listed\n"), c));
+
+ return;
}
void
-Acceptor::on_new_receiver (Receiver & rcvr)
+TestData::stop_all (void)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
- this->sessions_++;
- this->list_receivers_[rcvr.index_] = &rcvr;
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"),
- rcvr.index_,
- this->sessions_));
+ int i;
+
+ // Lock and cancel everything. Then release the lock, possibly allowing
+ // cleanups, then grab it again and delete all Servers and Clients.
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] != 0)
+ this->client_list_[i]->cancel ();
+ }
+
+ for (i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] != 0)
+ this->server_list_[i]->cancel ();
+ }
+ }
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->list_lock_);
+ for (i = 0; i < MAX_CLIENTS; ++i)
+ {
+ if (this->client_list_[i] != 0)
+ delete this->client_list_[i];
+ }
+
+ for (i = 0; i < MAX_SERVERS; ++i)
+ {
+ if (this->server_list_[i] != 0)
+ delete this->server_list_[i];
+ }
+ }
}
void
-Acceptor::on_delete_receiver (Receiver & rcvr)
+TestData::report (void)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+ // Print statistics
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
- this->sessions_--;
+ ACE_OS::sprintf (bufs,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->clients_.w_cnt_.value (),
+ this->clients_.w_ops_.value ());
- this->total_snd_ += rcvr.get_total_snd();
- this->total_rcv_ += rcvr.get_total_rcv();
- this->total_w_ += rcvr.get_total_w();
- this->total_r_ += rcvr.get_total_r();
+ ACE_OS::sprintf (bufr,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->clients_.r_cnt_.value (),
+ this->clients_.r_ops_.value ());
- if (rcvr.index_ >= 0
- && rcvr.index_ < MAX_RECEIVERS
- && this->list_receivers_[rcvr.index_] == &rcvr)
- this->list_receivers_[rcvr.index_] = 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Clients total bytes (ops): snd=%s rcv=%s\n"),
+ bufs,
+ bufr));
+
+ ACE_OS::sprintf (bufs,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->servers_.w_cnt_.value (),
+ this->servers_.w_ops_.value ());
+
+ ACE_OS::sprintf (bufr,
+ ACE_SIZE_T_FORMAT_SPECIFIER
+ ACE_TEXT ("(") ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT (")"),
+ this->servers_.r_cnt_.value (),
+ this->servers_.r_ops_.value ());
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"),
- rcvr.index_,
- this->sessions_));
+ ACE_TEXT ("Servers total bytes (ops): snd=%s rcv=%s\n"),
+ bufs,
+ bufr));
+
+ if (this->clients_.w_cnt_.value () == 0 ||
+ this->clients_.r_cnt_.value () == 0 ||
+ this->servers_.w_cnt_.value () == 0 ||
+ this->servers_.r_cnt_.value () == 0 )
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("It appears that this test didn't ")
+ ACE_TEXT ("really do anything. Something is very wrong.\n")));
}
-Receiver *
-Acceptor::make_handler (void)
+
+class Acceptor : public ACE_Asynch_Acceptor<Server>
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0);
+public:
+ Acceptor (TestData *tester);
+ virtual ~Acceptor (void);
- if (this->sessions_ >= MAX_RECEIVERS)
- return 0;
+ // Virtual from ACE_Asynch_Acceptor
+ Server *make_handler (void);
- for (int i = 0; i < MAX_RECEIVERS; ++i)
- {
- if (this->list_receivers_[i] == 0)
- {
- ACE_NEW_RETURN (this->list_receivers_[i],
- Receiver (this, i),
- 0);
- return this->list_receivers_[i];
- }
- }
+private:
+ TestData *tester_;
+};
- return 0;
+// *************************************************************
+Acceptor::Acceptor (TestData *tester)
+ : tester_ (tester)
+{
}
+
+Acceptor::~Acceptor (void)
+{
+ this->cancel ();
+}
+
+Server *
+Acceptor::make_handler (void)
+{
+ return this->tester_->server_up ();
+}
+
// ***************************************************
-Receiver::Receiver (Acceptor * acceptor, int index)
- : acceptor_ (acceptor),
- index_ (index),
+Server::Server ()
+{
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
+}
+
+Server::Server (TestData *tester, int id)
+ : tester_ (tester),
+ id_ (id),
handle_ (ACE_INVALID_HANDLE),
io_count_ (0),
flg_cancel_(0),
@@ -507,26 +665,24 @@ Receiver::Receiver (Acceptor * acceptor, int index)
total_w_ (0),
total_r_ (0)
{
- if (this->acceptor_ != 0)
- this->acceptor_->on_new_receiver (*this);
}
-Receiver::~Receiver (void)
+Server::~Server (void)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ")
+ ACE_TEXT ("(%t) Server %d dtor; %d sends (%d bytes); ")
ACE_TEXT ("%d recvs (%d bytes)\n"),
- this->index_,
+ this->id_,
this->total_w_, this->total_snd_,
this->total_r_, this->total_rcv_));
if (this->io_count_ != 0)
ACE_ERROR ((LM_WARNING,
- ACE_TEXT ("(%t) Receiver %d deleted with ")
+ ACE_TEXT ("(%t) Server %d deleted with ")
ACE_TEXT ("%d I/O outstanding\n"),
- this->index_,
+ this->id_,
this->io_count_));
- // This test bounces data back and forth between Senders and Receivers.
+ // This test bounces data back and forth between Clients and Servers.
// Therefore, if there was significantly more data in one direction, that's
// a problem. Remember, the byte counts are unsigned values.
int issue_data_warning = 0;
@@ -548,18 +704,18 @@ Receiver::~Receiver (void)
ACE_DEBUG ((LM_WARNING,
ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
- if (this->acceptor_ != 0)
- this->acceptor_->on_delete_receiver (*this);
+ if (this->tester_ != 0)
+ this->tester_->server_done (this);
if (this->handle_ != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle_);
- this->index_ = -1;
+ this->id_ = -1;
this->handle_= ACE_INVALID_HANDLE;
}
void
-Receiver::cancel ()
+Server::cancel ()
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -571,24 +727,24 @@ Receiver::cancel ()
void
-Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
+Server::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
{
ACE_TCHAR str[256];
if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d connection from %s\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d connection from %s\n"),
+ this->id_,
str));
else
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
- this->index_,
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Server %d %p\n"),
+ this->id_,
ACE_TEXT ("addr_to_string")));
return;
}
void
-Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
+Server::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -606,11 +762,11 @@ Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
+ ACE_TEXT ("Server::ACE_Asynch_Write_Stream::open")));
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
+ ACE_TEXT ("Server::ACE_Asynch_Read_Stream::open")));
else
this->initiate_read_stream ();
@@ -621,7 +777,7 @@ Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
}
int
-Receiver::initiate_read_stream (void)
+Server::initiate_read_stream (void)
{
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
@@ -640,13 +796,13 @@ Receiver::initiate_read_stream (void)
// a 0-byte read as we would if underlying calls used WSARecv.
if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
ACE_ERROR_RETURN ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d, peer closed\n"),
- this->index_),
+ ACE_TEXT ("(%t) Server %d, peer closed\n"),
+ this->id_),
-1);
#endif /* ACE_WIN32 */
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) Receiver %d, %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d, %p\n"),
+ this->id_,
ACE_TEXT ("read")),
-1);
}
@@ -657,7 +813,7 @@ Receiver::initiate_read_stream (void)
}
int
-Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
+Server::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
{
@@ -669,7 +825,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")),
+ ACE_TEXT ("(%t) Server::ACE_Asynch_Write_Stream::write nbytes <0 ")),
-1);
}
@@ -680,13 +836,13 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
// On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
ACE_ERROR_RETURN ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d, peer gone\n"),
- this->index_),
+ ACE_TEXT ("(%t) Server %d, peer gone\n"),
+ this->id_),
-1);
#endif /* ACE_WIN32 */
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) Receiver %d, %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d, %p\n"),
+ this->id_,
ACE_TEXT ("write")),
-1);
}
@@ -697,7 +853,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
}
void
-Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+Server::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
@@ -712,8 +868,8 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"),
- this->index_));
+ ACE_TEXT ("(%t) **** Server %d: handle_read_stream() ****\n"),
+ this->id_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
ACE_TEXT ("bytes_to_read"),
@@ -763,15 +919,15 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
prio = LM_ERROR;
ACE_Log_Msg::instance ()->errnum (result.error ());
ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Receiver %d; %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d; %p\n"),
+ this->id_,
ACE_TEXT ("read"));
}
else if (loglevel > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d: read %d bytes\n"),
+ this->id_,
result.bytes_transferred ()));
}
@@ -797,7 +953,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
}
void
-Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+Server::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -812,8 +968,8 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"),
- this->index_));
+ ACE_TEXT ("(%t) **** Server %d: handle_write_stream() ****\n"),
+ this->id_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
ACE_TEXT ("bytes_to_write"),
@@ -863,15 +1019,15 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
prio = LM_ERROR;
ACE_Log_Msg::instance ()->errnum (result.error ());
ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Receiver %d; %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d; %p\n"),
+ this->id_,
ACE_TEXT ("write"));
}
else if (loglevel > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"),
- this->index_,
+ ACE_TEXT ("(%t) Server %d: wrote %d bytes ok\n"),
+ this->id_,
result.bytes_transferred ()));
}
@@ -896,156 +1052,45 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
// Connector
// *******************************************
-class Connector : public ACE_Asynch_Connector<Sender>
+class Connector : public ACE_Asynch_Connector<Client>
{
- friend class Sender;
public:
- int get_number_sessions (void) { return this->sessions_; }
- size_t get_total_snd (void) { return this->total_snd_; }
- size_t get_total_rcv (void) { return this->total_rcv_; }
- long get_total_w (void) { return this->total_w_; }
- long get_total_r (void) { return this->total_r_; }
-
- Connector (void);
+ Connector (TestData *tester);
virtual ~Connector (void);
int start (const ACE_INET_Addr &addr, int num);
- void stop (void);
- void cancel_all (void);
// Virtual from ACE_Asynch_Connector
- Sender *make_handler (void);
+ Client *make_handler (void);
private:
- void on_new_sender (Sender &rcvr);
- void on_delete_sender (Sender &rcvr);
-
- ACE_SYNCH_RECURSIVE_MUTEX lock_;
- int sessions_;
- Sender *list_senders_[MAX_SENDERS];
- size_t total_snd_;
- size_t total_rcv_;
- long total_w_;
- long total_r_;
+ TestData *tester_;
};
// *************************************************************
-Connector::Connector (void)
- : sessions_ (0),
- total_snd_(0),
- total_rcv_(0),
- total_w_ (0),
- total_r_ (0)
+Connector::Connector (TestData *tester)
+ : tester_ (tester)
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- for (int i = 0; i < MAX_SENDERS; ++i)
- this->list_senders_[i] = 0;
}
Connector::~Connector (void)
{
- this->stop ();
-}
-
-
-void
-Connector::cancel_all(void)
-{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
this->cancel ();
-
- for (int i = 0; i < MAX_SENDERS; ++i)
- {
- if (this->list_senders_[i] != 0)
- this->list_senders_[i]->cancel ();
- }
- return;
}
-
-void
-Connector::stop (void)
-{
- // This method can be called only after proactor event loop is done
- // in all threads.
-
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- for (int i = 0; i < MAX_SENDERS; ++i)
- {
- delete this->list_senders_[i];
- this->list_senders_[i] = 0;
- }
-}
-
-void
-Connector::on_new_sender (Sender &sndr)
-{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
- this->sessions_++;
- this->list_senders_[sndr.index_] = &sndr;
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"),
- sndr.index_,
- this->sessions_));
-}
-
-void
-Connector::on_delete_sender (Sender &sndr)
-{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- this->sessions_--;
- this->total_snd_ += sndr.get_total_snd();
- this->total_rcv_ += sndr.get_total_rcv();
- this->total_w_ += sndr.get_total_w();
- this->total_r_ += sndr.get_total_r();
-
- if (sndr.index_ >= 0
- && sndr.index_ < MAX_SENDERS
- && this->list_senders_[sndr.index_] == &sndr)
- this->list_senders_[sndr.index_] = 0;
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"),
- sndr.index_,
- this->sessions_));
-}
-
-Sender *
+Client *
Connector::make_handler (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0);
-
- if (this->sessions_ >= MAX_SENDERS)
- return 0;
-
- for (int i = 0; i < MAX_SENDERS; ++i)
- {
- if (this->list_senders_ [i] == 0)
- {
- ACE_NEW_RETURN (this->list_senders_[i],
- Sender (this, i),
- 0);
- return this->list_senders_[i];
- }
- }
-
- return 0;
+ return this->tester_->client_up ();
}
int
Connector::start (const ACE_INET_Addr& addr, int num)
{
-
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, 0);
-
- if (num > MAX_SENDERS)
- num = MAX_SENDERS;
+ if (num > MAX_CLIENTS)
+ num = MAX_CLIENTS;
if (num < 0)
num = 1;
@@ -1078,9 +1123,14 @@ Connector::start (const ACE_INET_Addr& addr, int num)
}
-Sender::Sender (Connector * connector, int index)
- : index_ (index),
- connector_ (connector),
+Client::Client ()
+{
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Shouldn't use this constructor!\n")));
+}
+
+Client::Client (TestData *tester, int id)
+ : tester_ (tester),
+ id_ (id),
handle_ (ACE_INVALID_HANDLE),
io_count_ (0),
stop_writing_ (0),
@@ -1090,25 +1140,23 @@ Sender::Sender (Connector * connector, int index)
total_w_ (0),
total_r_ (0)
{
- if (this->connector_ != 0)
- this->connector_->on_new_sender (*this);
}
-Sender::~Sender (void)
+Client::~Client (void)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ")
+ ACE_TEXT ("(%t) Client %d dtor; %d sends (%d bytes); ")
ACE_TEXT ("%d recvs (%d bytes)\n"),
- this->index_,
+ this->id_,
this->total_w_, this->total_snd_,
this->total_r_, this->total_rcv_));
if (this->io_count_ != 0)
ACE_ERROR ((LM_WARNING,
- ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d deleted with %d I/O outstanding\n"),
+ this->id_,
this->io_count_));
- // This test bounces data back and forth between Senders and Receivers.
+ // This test bounces data back and forth between Clients and Servers.
// Therefore, if there was significantly more data in one direction, that's
// a problem. Remember, the byte counts are unsigned values.
int issue_data_warning = 0;
@@ -1130,20 +1178,20 @@ Sender::~Sender (void)
ACE_DEBUG ((LM_WARNING,
ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
- if (this->connector_ != 0)
- this->connector_->on_delete_sender (*this);
+ if (this->tester_ != 0)
+ this->tester_->client_done (this);
+ this->id_ = -1;
+ this->handle_= ACE_INVALID_HANDLE;
if (this->handle_ != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle_);
}
-
- this->index_ = -1;
this->handle_= ACE_INVALID_HANDLE;
}
void
-Sender::cancel ()
+Client::cancel ()
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -1154,12 +1202,12 @@ Sender::cancel ()
}
void
-Sender::close ()
+Client::close ()
{
// This must be called with the lock_ held.
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Closing Sender %d writes; %d I/O outstanding\n"),
- this->index_, this->io_count_));
+ ACE_TEXT ("(%t) Closing Client %d writes; %d I/O outstanding\n"),
+ this->id_, this->io_count_));
ACE_OS::shutdown (this->handle_, ACE_SHUTDOWN_WRITE);
this->stop_writing_ = 1;
return;
@@ -1167,24 +1215,24 @@ Sender::close ()
void
-Sender::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
+Client::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
{
ACE_TCHAR str[256];
if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d connected on %s\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d connected on %s\n"),
+ this->id_,
str));
else
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
- this->index_,
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Client %d %p\n"),
+ this->id_,
ACE_TEXT ("addr_to_string")));
return;
}
void
-Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
+Client::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -1203,13 +1251,13 @@ Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open")));
+ ACE_TEXT ("Client::ACE_Asynch_Write_Stream::open")));
// Open ACE_Asynch_Read_Stream
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open")));
+ ACE_TEXT ("Client::ACE_Asynch_Read_Stream::open")));
else if (this->initiate_write_stream () == 0)
{
@@ -1224,7 +1272,7 @@ Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
}
int
-Sender::initiate_write_stream (void)
+Client::initiate_write_stream (void)
{
if (this->flg_cancel_ != 0 ||
this->stop_writing_ ||
@@ -1268,7 +1316,7 @@ Sender::initiate_write_stream (void)
mb1->release ();
ACE_ERROR_RETURN((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
+ ACE_TEXT ("Client::ACE_Asynch_Stream::writev")),
-1);
}
#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
@@ -1288,13 +1336,13 @@ Sender::initiate_write_stream (void)
// On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
ACE_ERROR_RETURN ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d, peer gone\n"),
- this->index_),
+ ACE_TEXT ("(%t) Client %d, peer gone\n"),
+ this->id_),
-1);
#endif /* ACE_WIN32 */
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) Sender %d, %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d, %p\n"),
+ this->id_,
ACE_TEXT ("write")),
-1);
}
@@ -1306,12 +1354,13 @@ Sender::initiate_write_stream (void)
}
int
-Sender::initiate_read_stream (void)
+Client::initiate_read_stream (void)
{
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
- static const size_t complete_message_length = ACE_OS::strlen (complete_message);
+ static const size_t complete_message_length =
+ ACE_OS::strlen (complete_message);
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
ACE_Message_Block *mb1 = 0,
@@ -1359,7 +1408,7 @@ Sender::initiate_read_stream (void)
mb1->release ();
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")),
+ ACE_TEXT ("Client::ACE_Asynch_Read_Stream::readv")),
-1);
}
#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
@@ -1373,8 +1422,8 @@ Sender::initiate_read_stream (void)
// We allocate +1 only for proper printing - we can just set the last byte
// to '\0' before printing out
ACE_NEW_RETURN (mb,
- ACE_Message_Block (blksize + 1)
- , -1);
+ ACE_Message_Block (blksize + 1),
+ -1);
// Inititiate read
if (this->rs_.read (*mb, mb->size () - 1) == -1)
@@ -1385,13 +1434,13 @@ Sender::initiate_read_stream (void)
// a 0-byte read as we would if underlying calls used WSARecv.
if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
ACE_ERROR_RETURN ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d, peer closed\n"),
- this->index_),
+ ACE_TEXT ("(%t) Client %d, peer closed\n"),
+ this->id_),
-1);
#endif /* ACE_WIN32 */
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) Sender %d, %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d, %p\n"),
+ this->id_,
ACE_TEXT ("read")),
-1);
}
@@ -1403,7 +1452,7 @@ Sender::initiate_read_stream (void)
}
void
-Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+Client::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -1415,8 +1464,8 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"),
- index_));
+ ACE_TEXT ("(%t) **** Client %d: handle_write_stream() ****\n"),
+ this->id_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
ACE_TEXT ("bytes_to_write"),
@@ -1505,15 +1554,15 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
prio = LM_ERROR;
ACE_Log_Msg::instance ()->errnum (result.error ());
ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Sender %d; %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d; %p\n"),
+ this->id_,
ACE_TEXT ("write"));
}
else if (loglevel > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d: wrote %d bytes ok\n"),
+ this->id_,
result.bytes_transferred ()));
}
@@ -1522,13 +1571,13 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
if (result.error () == 0 && result.bytes_transferred () > 0)
{
this->total_snd_ += result.bytes_transferred ();
- if (this->total_snd_ >= xfer_limit)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d sent %d, limit %d\n"),
- this->index_, this->total_snd_, xfer_limit));
- this->close ();
- }
+ if (this->total_snd_ >= xfer_limit)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Client %d sent %d, limit %d\n"),
+ this->id_, this->total_snd_, xfer_limit));
+ this->close ();
+ }
if (duplex != 0) // full duplex, continue write
{
if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
@@ -1546,7 +1595,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
}
void
-Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
+Client::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
@@ -1558,8 +1607,8 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"),
- index_));
+ ACE_TEXT ("(%t) **** Client %d: handle_read_stream() ****\n"),
+ this->id_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
ACE_TEXT ("bytes_to_read"),
@@ -1631,15 +1680,15 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
prio = LM_ERROR;
ACE_Log_Msg::instance ()->errnum (result.error ());
ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Sender %d; %p\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d; %p\n"),
+ this->id_,
ACE_TEXT ("read"));
}
else if (loglevel > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"),
- this->index_,
+ ACE_TEXT ("(%t) Client %d: read %d bytes ok\n"),
+ this->id_,
result.bytes_transferred ()));
}
@@ -1679,10 +1728,10 @@ print_usage (int /* argc */, ACE_TCHAR *argv[])
ACE_TEXT ("\n s SUN")
ACE_TEXT ("\n d default")
ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
- ACE_TEXT ("\n-h <host> for Sender mode")
+ ACE_TEXT ("\n-h <host> for Client mode")
ACE_TEXT ("\n-n <number threads for Proactor pool>")
ACE_TEXT ("\n-p <port to listen/connect>")
- ACE_TEXT ("\n-s <number of sender's instances>")
+ ACE_TEXT ("\n-c <number of client instances>")
ACE_TEXT ("\n-b run client and server at the same time")
ACE_TEXT ("\n f file")
ACE_TEXT ("\n c console")
@@ -1690,7 +1739,7 @@ print_usage (int /* argc */, ACE_TCHAR *argv[])
ACE_TEXT ("\n 0 - log errors and highlights")
ACE_TEXT ("\n 1 - log level 0 plus progress information")
ACE_TEXT ("\n 2 - log level 1 plus operation parameters and results")
- ACE_TEXT ("\n-x max transfer byte count per Sender")
+ ACE_TEXT ("\n-x max transfer byte count per Client")
ACE_TEXT ("\n-u show this message")
ACE_TEXT ("\n"),
argv[0]
@@ -1742,7 +1791,7 @@ parse_args (int argc, ACE_TCHAR *argv[])
max_aio_operations = 512; // POSIX Proactor params
proactor_type = DEFAULT; // Proactor type = default
threads = 3; // size of Proactor thread pool
- senders = 10; // number of senders
+ clients = 10; // number of clients
loglevel = 0; // log level : only errors and highlights
// Default transfer limit 50 messages per Sender
xfer_limit = 50 * ACE_OS::strlen (complete_message);
@@ -1750,7 +1799,7 @@ parse_args (int argc, ACE_TCHAR *argv[])
if (argc == 1) // no arguments , so one button test
return 0;
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:s:v:ub"));
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("x:t:o:n:p:d:h:c:v:ub"));
int c;
while ((c = get_opt ()) != EOF)
@@ -1758,10 +1807,9 @@ parse_args (int argc, ACE_TCHAR *argv[])
switch (c)
{
case 'x': // xfer limit
- xfer_limit = ACE_static_cast (size_t,
- ACE_OS::atoi (get_opt.opt_arg ()));
+ xfer_limit = static_cast<size_t> (ACE_OS::atoi (get_opt.opt_arg ()));
if (xfer_limit == 0)
- xfer_limit = 1; // Bare minimum.
+ xfer_limit = 1; // Bare minimum.
break;
case 'b': // both client and server
both = 1;
@@ -1781,10 +1829,10 @@ parse_args (int argc, ACE_TCHAR *argv[])
case 'n': // thread pool size
threads = ACE_OS::atoi (get_opt.opt_arg ());
break;
- case 's': // number of senders
- senders = ACE_OS::atoi (get_opt.opt_arg ());
- if (senders > MAX_SENDERS)
- senders = MAX_SENDERS;
+ case 'c': // number of clients
+ clients = ACE_OS::atoi (get_opt.opt_arg ());
+ if (clients > MAX_CLIENTS)
+ clients = MAX_CLIENTS;
break;
case 'o': // max number of aio for proactor
max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
@@ -1821,118 +1869,50 @@ run_main (int argc, ACE_TCHAR *argv[])
disable_signal (SIGPIPE, SIGPIPE);
MyTask task1;
- Acceptor acceptor;
- Connector connector;
+ TestData test;
- if (task1.start (threads,
- proactor_type,
- max_aio_operations) == 0)
+ if (task1.start (threads, proactor_type, max_aio_operations) == 0)
{
+ Acceptor acceptor (&test);
+ Connector connector (&test);
+ ACE_INET_Addr addr (port);
+
int rc = 0;
if (both != 0 || host == 0) // Acceptor
{
// Simplify, initial read with zero size
- if (acceptor.open (ACE_INET_Addr (port), 0, 1) == 0)
+ if (acceptor.open (addr, 0, 1) == 0)
rc = 1;
}
if (both != 0 || host != 0)
{
- ACE_INET_Addr addr;
if (host == 0)
host = ACE_LOCALHOST;
- if (addr.set (port, host) == -1)
+ if (addr.set (port, host, 1, addr.get_type ()) == -1)
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
else
- rc += connector.start (addr, senders);
+ rc += connector.start (addr, clients);
}
- }
- // Wait a couple of seconds to let things get going, then poll til
- // all sessions are done.
- ACE_OS::sleep (2);
+ // Wait a few seconds to let things get going, then poll til
+ // all sessions are done. Note that when we exit this scope, the
+ // Acceptor and Connector will be destroyed, which should prevent
+ // further connections and also test how well destroyed handlers
+ // are handled.
+ ACE_OS::sleep (3);
+ }
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
- while (acceptor.get_number_sessions () > 0 ||
- connector.get_number_sessions () > 0 )
+ while (!test.testing_done ())
ACE_OS::sleep (1);
-#if 0
- // Cancel all pending AIO on Connector and Senders
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"),
- connector.get_number_sessions ()
- ));
- connector.cancel_all ();
-#endif
-
- //Cancel all pending AIO on Acceptor And Receivers
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"),
- acceptor.get_number_sessions ()
- ));
- acceptor.cancel_all ();
+ test.stop_all ();
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Thread Pool Task\n")
- ));
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Stop Thread Pool Task\n")));
task1.stop ();
- // As Proactor event loop now is inactive it is safe to destroy all
- // Senders
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"),
- connector.get_number_sessions ()
- ));
- connector.stop ();
-
- // As Proactor event loop now is inactive it is safe to destroy all
- // Receivers
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"),
- acceptor.get_number_sessions ()
- ));
- acceptor.stop ();
-
- //Print statistic
- ACE_TCHAR bufs [256];
- ACE_TCHAR bufr [256];
-
- ACE_OS::sprintf (bufs,
- ACE_SIZE_T_FORMAT_SPECIFIER
- ACE_TEXT ("(%ld)"),
- connector.get_total_snd (),
- connector.get_total_w ());
-
- ACE_OS::sprintf (bufr,
- ACE_SIZE_T_FORMAT_SPECIFIER
- ACE_TEXT ("(%ld)"),
- connector.get_total_rcv (),
- connector.get_total_r ());
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
- bufs,
- bufr));
-
- ACE_OS::sprintf (bufs,
- ACE_SIZE_T_FORMAT_SPECIFIER
- ACE_TEXT ("(%ld)"),
- acceptor.get_total_snd (),
- acceptor.get_total_w ());
-
- ACE_OS::sprintf (bufr,
- ACE_SIZE_T_FORMAT_SPECIFIER
- ACE_TEXT ("(%ld)"),
- acceptor.get_total_rcv (),
- acceptor.get_total_r ());
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
- bufs,
- bufr));
-
ACE_END_TEST;
return 0;
@@ -1940,13 +1920,13 @@ run_main (int argc, ACE_TCHAR *argv[])
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Asynch_Acceptor<Receiver>;
-template class ACE_Asynch_Connector<Sender>;
+template class ACE_Asynch_Acceptor<Server>;
+template class ACE_Asynch_Connector<Client>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Asynch_Acceptor<Receiver>
-#pragma instantiate ACE_Asynch_Connector<Sender>
+#pragma instantiate ACE_Asynch_Acceptor<Server>
+#pragma instantiate ACE_Asynch_Connector<Client>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */