summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-05-13 04:23:33 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-05-13 04:23:33 +0000
commit8c479b9d712c43613eeb4769bc8306230e0865ed (patch)
tree05bad4caebade61c561319e203bb87b9fa8c47a0
parente5d117da106c4656b2f7420727e4869475fe411a (diff)
downloadATCD-8c479b9d712c43613eeb4769bc8306230e0865ed.tar.gz
ChangeLogTag: Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com>
-rw-r--r--ChangeLog12
-rw-r--r--ChangeLogs/ChangeLog-02a12
-rw-r--r--ChangeLogs/ChangeLog-03a12
-rw-r--r--tests/TP_Reactor_Test.cpp97
-rw-r--r--tests/TP_Reactor_Test.h16
5 files changed, 120 insertions, 29 deletions
diff --git a/ChangeLog b/ChangeLog
index b6b7e724521..1fed9cf64d6 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.{h,cpp}:
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for TP_Reactor's future improvements.
+
Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu>
* ace/Message_Queue_T.cpp: Updated all the enqueue*() and
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index b6b7e724521..1fed9cf64d6 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,15 @@
+Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.{h,cpp}:
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for TP_Reactor's future improvements.
+
Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu>
* ace/Message_Queue_T.cpp: Updated all the enqueue*() and
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index b6b7e724521..1fed9cf64d6 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,15 @@
+Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com>
+
+ * tests/TP_Reactor_Test.{h,cpp}:
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for TP_Reactor's future improvements.
+
Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu>
* ace/Message_Queue_T.cpp: Updated all the enqueue*() and
diff --git a/tests/TP_Reactor_Test.cpp b/tests/TP_Reactor_Test.cpp
index 52530475341..cbea833a088 100644
--- a/tests/TP_Reactor_Test.cpp
+++ b/tests/TP_Reactor_Test.cpp
@@ -259,7 +259,9 @@ Acceptor::Acceptor (void)
: ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0),
sessions_ (0),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
@@ -306,14 +308,26 @@ Acceptor::on_delete_receiver (Receiver &rcvr)
this->sessions_--;
- this->total_snd_ += rcvr.get_total_snd();
- this->total_rcv_ += rcvr.get_total_rcv();
+ this->total_snd_ += rcvr.get_total_snd ();
+ this->total_rcv_ += rcvr.get_total_rcv ();
+ this->total_w_ += rcvr.get_total_w ();
+ this->total_r_ += rcvr.get_total_r ();
+
+ if (rcvr.index_ >= 0
+ && rcvr.index_ < MAX_RECEIVERS
+ && this->list_receivers_[rcvr.index_] == &rcvr)
+ this->list_receivers_[rcvr.index_] = 0;
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ());
- ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ());
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_snd (),
+ rcvr.get_total_w () );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_rcv (),
+ rcvr.get_total_r ());
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
@@ -321,11 +335,6 @@ Acceptor::on_delete_receiver (Receiver &rcvr)
bufs,
bufr,
this->sessions_));
-
- if (rcvr.index_ < MAX_RECEIVERS
- && this->list_receivers_[rcvr.index_] == &rcvr)
- this->list_receivers_[rcvr.index_] = 0;
-
}
int
@@ -368,7 +377,9 @@ Receiver::Receiver (Acceptor * acceptor, int index)
index_ (index),
flg_mask_ (ACE_Event_Handler::NULL_MASK),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
if (acceptor_ != 0)
acceptor_->on_new_receiver (*this);
@@ -475,6 +486,8 @@ Receiver::handle_input (ACE_HANDLE h)
int err = 0;
ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1);
+ this->total_r_++;
+
if (res >= 0)
{
mb->wr_ptr (res);
@@ -527,7 +540,7 @@ Receiver::handle_input (ACE_HANDLE h)
rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
else // full duplex
{
- if (qcount >= 2) // flow control, stop read
+ if (qcount >= 20 ) // flow control, stop read
rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
else
rc = this->initiate_io (ACE_Event_Handler::READ_MASK);
@@ -562,6 +575,8 @@ Receiver::handle_output (ACE_HANDLE h)
bytes = mb->length ();
res = this->peer ().send (mb->rd_ptr (), bytes);
+ this->total_w_++;
+
if (res < 0)
err = errno ;
else
@@ -587,7 +602,7 @@ Receiver::handle_output (ACE_HANDLE h)
if (err != 0 || res < 0)
return -1;
- if (qcount < 0) // no more message blocks in queue
+ if (qcount <= 0) // no more message blocks in queue
{
if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0)
return -1;
@@ -605,7 +620,9 @@ Connector::Connector (void)
: ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0),
sessions_ (0),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
@@ -652,15 +669,26 @@ Connector::on_delete_sender (Sender & sndr)
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
this->sessions_--;
-
this->total_snd_ += sndr.get_total_snd();
this->total_rcv_ += sndr.get_total_rcv();
+ this->total_w_ += sndr.get_total_w();
+ this->total_r_ += sndr.get_total_r();
+
+ if (sndr.index_ >= 0
+ && sndr.index_ < MAX_SENDERS
+ && this->list_senders_[sndr.index_] == &sndr)
+ this->list_senders_[sndr.index_] = 0;
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ());
- ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ());
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_snd(),
+ sndr.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_rcv(),
+ sndr.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
@@ -669,10 +697,6 @@ Connector::on_delete_sender (Sender & sndr)
bufr,
this->sessions_));
- if (sndr.index_ < MAX_SENDERS
- && this->list_senders_[sndr.index_] == &sndr)
- this->list_senders_[sndr.index_] = 0;
-
}
int
@@ -733,7 +757,9 @@ Sender::Sender (Connector* connector, int index)
index_ (index),
flg_mask_ (ACE_Event_Handler::NULL_MASK),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
if (connector_ != 0)
connector_->on_new_sender (*this);
@@ -796,7 +822,7 @@ int Sender::open (void *)
int
Sender::initiate_write (void)
{
- if ( this->msg_queue ()->message_count () < 2) // flow control
+ if ( this->msg_queue ()->message_count () < 20) // flow control
{
int nbytes = ACE_OS::strlen (send_buf_);
@@ -876,6 +902,7 @@ Sender::handle_input (ACE_HANDLE h)
int err = 0;
ssize_t res = this->peer ().recv (mb->rd_ptr (),
BUFSIZ-1);
+ this->total_r_++;
if (res >= 0)
{
@@ -948,6 +975,8 @@ Sender::handle_output (ACE_HANDLE h)
bytes = mb->length ();
res = this->peer ().send (mb->rd_ptr (), bytes);
+ this->total_w_++;
+
if (res < 0)
err = errno ;
else
@@ -974,10 +1003,10 @@ Sender::handle_output (ACE_HANDLE h)
int rc = 0;
- if (qcount < 0) // no more message blocks in queue
+ if (qcount <= 0) // no more message blocks in queue
{
if (duplex != 0 && // full duplex, continue write
- (this->total_snd_ - this->total_rcv_ ) < 1024 ) // flow control
+ (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control
rc = initiate_write ();
else
rc = terminate_io (ACE_Event_Handler::WRITE_MASK);
@@ -1190,8 +1219,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd());
- ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() );
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_snd(),
+ connector.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_rcv(),
+ connector.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
@@ -1199,8 +1233,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
bufr
));
- ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd());
- ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() );
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_snd(),
+ acceptor.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_rcv(),
+ acceptor.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
diff --git a/tests/TP_Reactor_Test.h b/tests/TP_Reactor_Test.h
index e896f5b853e..6f192b5efb3 100644
--- a/tests/TP_Reactor_Test.h
+++ b/tests/TP_Reactor_Test.h
@@ -51,6 +51,8 @@ public:
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
// virtual from ACE_Svc_Handler<>
virtual int open (void * pVoid);
@@ -72,6 +74,8 @@ private:
ACE_Recursive_Thread_Mutex mutex_;
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
// *************************************************************
@@ -83,6 +87,8 @@ public:
size_t get_number_sessions (void) { return sessions_; }
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
Acceptor (void);
virtual ~Acceptor (void);
@@ -100,6 +106,8 @@ private:
Receiver *list_receivers_[MAX_RECEIVERS];
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
void on_new_receiver (Receiver & rcvr);
void on_delete_receiver (Receiver & rcvr);
@@ -123,6 +131,8 @@ public:
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
// virtual from ACE_Svc_Handler<>
virtual int open (void * pVoid);
@@ -147,6 +157,8 @@ private:
char send_buf_ [1024];
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
// *************************************************************
@@ -158,6 +170,8 @@ public:
long get_number_sessions (void) { return sessions_; }
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
Connector ();
@@ -176,6 +190,8 @@ private:
Sender * list_senders_ [MAX_SENDERS];
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
void on_new_sender (Sender & sndr);
void on_delete_sender (Sender & sndr);