summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-05-07 15:32:49 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-05-07 15:32:49 +0000
commit658c32caa59e98b1342d1e60d5e33ceca9f1ad82 (patch)
tree1d1394e5a8ef6f9bedb4e5c975da293f2c952834
parentbbecaecb61404d98803a629a74d91f885c04c349 (diff)
downloadATCD-658c32caa59e98b1342d1e60d5e33ceca9f1ad82.tar.gz
ChangeLogTag: Tue May 7 10:31:24 2002 Alex Libman <AlexL@rumblegroup.com>
-rw-r--r--ChangeLog9
-rw-r--r--ChangeLogs/ChangeLog-02a9
-rw-r--r--ChangeLogs/ChangeLog-03a9
-rw-r--r--tests/TP_Reactor_Test.cpp128
-rw-r--r--tests/TP_Reactor_Test.h46
-rw-r--r--tests/run_test.lst2
6 files changed, 169 insertions, 34 deletions
diff --git a/ChangeLog b/ChangeLog
index 8bc6ca7e1bd..35c1e23c5e2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+Tue May 7 10:31:24 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.cpp:
+ * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all
+ platforms.
+
+ * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the
+ daily runs.
+
Mon May 06 16:31:14 2002 Irfan Pyarali <irfan@cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout):
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 8bc6ca7e1bd..35c1e23c5e2 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,12 @@
+Tue May 7 10:31:24 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.cpp:
+ * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all
+ platforms.
+
+ * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the
+ daily runs.
+
Mon May 06 16:31:14 2002 Irfan Pyarali <irfan@cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout):
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 8bc6ca7e1bd..35c1e23c5e2 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,12 @@
+Tue May 7 10:31:24 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.cpp:
+ * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all
+ platforms.
+
+ * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the
+ daily runs.
+
Mon May 06 16:31:14 2002 Irfan Pyarali <irfan@cs.wustl.edu>
* examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout):
diff --git a/tests/TP_Reactor_Test.cpp b/tests/TP_Reactor_Test.cpp
index 3ef9b447156..b09344faecf 100644
--- a/tests/TP_Reactor_Test.cpp
+++ b/tests/TP_Reactor_Test.cpp
@@ -103,6 +103,17 @@ static char data[] =
"Connection: Keep-Alive\r\n"
"\r\n" ;
+// *************************************************************
+
+class LogLocker
+{
+public:
+
+ LogLocker () { ACE_LOG_MSG->acquire (); }
+ virtual ~LogLocker () { ACE_LOG_MSG->release (); }
+};
+// *************************************************************
+
/**
* @class MyTask
*
@@ -246,7 +257,9 @@ MyTask::svc (void)
Acceptor::Acceptor (void)
: ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0),
- sessions_ (0)
+ sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
@@ -292,13 +305,27 @@ Acceptor::on_delete_receiver (Receiver &rcvr)
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
this->sessions_--;
+
+ this->total_snd_ += rcvr.get_total_snd();
+ this->total_rcv_ += rcvr.get_total_rcv();
+
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ());
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ rcvr.index_,
+ bufs,
+ bufr,
+ this->sessions_));
+
if (rcvr.index_ < MAX_RECEIVERS
&& this->list_receivers_[rcvr.index_] == &rcvr)
this->list_receivers_[rcvr.index_] = 0;
- ACE_DEBUG ((LM_DEBUG,
- "Receiver::~DTOR sessions_=%d\n",
- this->sessions_));
}
int
@@ -311,7 +338,7 @@ Acceptor::start (const ACE_INET_Addr &addr)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"Acceptor::start () - open failed"),
- 0);
+ 0);
return 1;
}
@@ -339,7 +366,9 @@ Acceptor::make_svc_handler (Receiver *&sh)
Receiver::Receiver (Acceptor * acceptor, int index)
: acceptor_ (acceptor),
index_ (index),
- flg_mask_ (ACE_Event_Handler::NULL_MASK)
+ flg_mask_ (ACE_Event_Handler::NULL_MASK),
+ total_snd_(0),
+ total_rcv_(0)
{
if (acceptor_ != 0)
acceptor_->on_new_receiver (*this);
@@ -447,7 +476,10 @@ Receiver::handle_input (ACE_HANDLE h)
ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1);
if (res >= 0)
- mb->wr_ptr (res);
+ {
+ mb->wr_ptr (res);
+ this->total_rcv_ += res;
+ }
else
err = errno ;
@@ -455,6 +487,8 @@ Receiver::handle_input (ACE_HANDLE h)
if (loglevel == 0 || res <= 0 || err!= 0)
{
+ LogLocker log_lock;
+
ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
@@ -530,9 +564,14 @@ Receiver::handle_output (ACE_HANDLE h)
if (res < 0)
err = errno ;
+ else
+ this->total_snd_ += res;
+
if (loglevel == 0 || res <= 0 || err!= 0)
{
+ LogLocker log_lock;
+
ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
@@ -564,7 +603,9 @@ Receiver::handle_output (ACE_HANDLE h)
Connector::Connector (void)
: ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0),
- sessions_ (0)
+ sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
@@ -611,13 +652,27 @@ Connector::on_delete_sender (Sender & sndr)
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
this->sessions_--;
+
+ this->total_snd_ += sndr.get_total_snd();
+ this->total_rcv_ += sndr.get_total_rcv();
+
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ());
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ sndr.index_,
+ bufs,
+ bufr,
+ this->sessions_));
+
if (sndr.index_ < MAX_SENDERS
&& this->list_senders_[sndr.index_] == &sndr)
this->list_senders_[sndr.index_] = 0;
- ACE_DEBUG ((LM_DEBUG,
- "Sender::~DTOR sessions_=%d\n",
- this->sessions_));
}
int
@@ -676,7 +731,9 @@ Connector::make_svc_handler (Sender * & sh)
Sender::Sender (Connector* connector, int index)
: connector_ (connector),
index_ (index),
- flg_mask_ (ACE_Event_Handler::NULL_MASK)
+ flg_mask_ (ACE_Event_Handler::NULL_MASK),
+ total_snd_(0),
+ total_rcv_(0)
{
if (connector_ != 0)
connector_->on_new_sender (*this);
@@ -739,7 +796,7 @@ int Sender::open (void *)
int
Sender::initiate_write (void)
{
- if (this->msg_queue ()->message_count () < 2) // flow control
+ if ( this->msg_queue ()->message_count () < 2) // flow control
{
int nbytes = ACE_OS::strlen (send_buf_);
@@ -821,7 +878,10 @@ Sender::handle_input (ACE_HANDLE h)
BUFSIZ-1);
if (res >= 0)
- mb->wr_ptr (res);
+ {
+ mb->wr_ptr (res);
+ this->total_rcv_ += res;
+ }
else
err = errno ;
@@ -829,6 +889,8 @@ Sender::handle_input (ACE_HANDLE h)
if (loglevel == 0 || res <= 0 || err!= 0)
{
+ LogLocker log_lock;
+
ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
@@ -863,7 +925,7 @@ Sender::handle_input (ACE_HANDLE h)
rc = initiate_write ();
if (rc != 0)
return -1;
-
+
return check_destroy ();
}
@@ -888,9 +950,13 @@ Sender::handle_output (ACE_HANDLE h)
if (res < 0)
err = errno ;
+ else
+ this->total_snd_ += res;
if (loglevel == 0 || res <= 0 || err!= 0)
{
+ LogLocker log_lock;
+
ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
@@ -910,7 +976,8 @@ Sender::handle_output (ACE_HANDLE h)
if (qcount < 0) // no more message blocks in queue
{
- if (duplex != 0) // full duplex, continue write
+ if (duplex != 0 && // full duplex, continue write
+ (this->total_snd_ - this->total_rcv_ ) < 1024 ) // flow control
rc = initiate_write ();
else
rc = terminate_io (ACE_Event_Handler::WRITE_MASK);
@@ -965,7 +1032,7 @@ parse_args (int argc, ACE_TCHAR *argv[])
threads = 3; // size of Proactor thread pool
senders = 20; // number of senders
loglevel = 0; // log level : 0 full/ 1 only errors
- seconds = 2; // time to run in seconds
+ seconds = 20; // time to run in seconds
return 0;
}
@@ -1089,9 +1156,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
int rc = 0;
if (both != 0 || host == 0) // Acceptor
- // Simplify, initial read with zero size
- if (acceptor.start (ACE_INET_Addr (port)) == 0)
- rc = 1;
+ rc += acceptor.start (ACE_INET_Addr (port));
if (both != 0 || host != 0)
{
@@ -1120,6 +1185,29 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
connector.stop ();
acceptor.stop ();
+
+ //Print statistic
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd());
+ ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
+ ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd());
+ ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
#else /* ACE_HAS_THREADS */
ACE_UNUSED_ARG( argc );
ACE_UNUSED_ARG( argv );
diff --git a/tests/TP_Reactor_Test.h b/tests/TP_Reactor_Test.h
index 3a9271f3841..e896f5b853e 100644
--- a/tests/TP_Reactor_Test.h
+++ b/tests/TP_Reactor_Test.h
@@ -30,7 +30,7 @@
#include "ace/Svc_Handler.h"
#include "ace/Synch.h"
-const size_t MAX_SENDERS = 100;
+const size_t MAX_SENDERS = 1000;
const size_t MAX_RECEIVERS = 1000;
@@ -49,6 +49,9 @@ public:
~Receiver (void);
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+
// virtual from ACE_Svc_Handler<>
virtual int open (void * pVoid);
@@ -67,6 +70,8 @@ private:
int flg_mask_;
ACE_Recursive_Thread_Mutex mutex_;
+ long total_snd_;
+ long total_rcv_;
};
// *************************************************************
@@ -75,25 +80,29 @@ class Acceptor : public ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
{
friend class Receiver;
public:
- size_t get_number_sessions (void) { return sessions_; }
+ size_t get_number_sessions (void) { return sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
- Acceptor (void);
- virtual ~Acceptor (void);
+ Acceptor (void);
+ virtual ~Acceptor (void);
- void stop (void);
- int start (const ACE_INET_Addr & addr);
+ void stop (void);
+ int start (const ACE_INET_Addr & addr);
- // virtual from ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
- virtual int make_svc_handler (Receiver * & sh);
+ // virtual from ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
+ virtual int make_svc_handler (Receiver * & sh);
private:
- ACE_Recursive_Thread_Mutex mutex_;
- size_t sessions_;
- Receiver *list_receivers_[MAX_RECEIVERS];
+ ACE_Recursive_Thread_Mutex mutex_;
+ size_t sessions_;
+ Receiver *list_receivers_[MAX_RECEIVERS];
+ long total_snd_;
+ long total_rcv_;
- void on_new_receiver (Receiver & rcvr);
- void on_delete_receiver (Receiver & rcvr);
+ void on_new_receiver (Receiver & rcvr);
+ void on_delete_receiver (Receiver & rcvr);
};
@@ -112,6 +121,9 @@ public:
~Sender (void);
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+
// virtual from ACE_Svc_Handler<>
virtual int open (void * pVoid);
@@ -133,6 +145,8 @@ private:
ACE_Recursive_Thread_Mutex mutex_;
char send_buf_ [1024];
+ long total_snd_;
+ long total_rcv_;
};
// *************************************************************
@@ -142,6 +156,9 @@ class Connector: public ACE_Connector<Sender,ACE_SOCK_CONNECTOR>
friend class Sender;
public:
long get_number_sessions (void) { return sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+
Connector ();
virtual ~Connector ();
@@ -157,9 +174,12 @@ private:
ACE_Recursive_Thread_Mutex mutex_;
size_t sessions_;
Sender * list_senders_ [MAX_SENDERS];
+ long total_snd_;
+ long total_rcv_;
void on_new_sender (Sender & sndr);
void on_delete_sender (Sender & sndr);
};
+
#endif /* ACE_TESTS_TP_REACTOR_TEST_H */
diff --git a/tests/run_test.lst b/tests/run_test.lst
index f5a0f1e3384..99f54949131 100644
--- a/tests/run_test.lst
+++ b/tests/run_test.lst
@@ -117,7 +117,7 @@ Time_Service_Test: ALL !STATIC !DISABLED !missing_netsvcs TOKEN !chorus !Unicos
Time_Value_Test
Token_Strategy_Test
Tokens_Test: ALL MSVC !DISABLED TOKEN !chorus !Unicos
-TP_Reactor_Test: ALL !DISABLED
+TP_Reactor_Test: ALL
TSS_Test
Vector_Test
UPIPE_SAP_Test: !VxWorks