diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-05-13 04:23:33 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-05-13 04:23:33 +0000 |
commit | 8c479b9d712c43613eeb4769bc8306230e0865ed (patch) | |
tree | 05bad4caebade61c561319e203bb87b9fa8c47a0 | |
parent | e5d117da106c4656b2f7420727e4869475fe411a (diff) | |
download | ATCD-8c479b9d712c43613eeb4769bc8306230e0865ed.tar.gz |
ChangeLogTag: Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com>
-rw-r--r-- | ChangeLog | 12 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 12 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 12 | ||||
-rw-r--r-- | tests/TP_Reactor_Test.cpp | 97 | ||||
-rw-r--r-- | tests/TP_Reactor_Test.h | 16 |
5 files changed, 120 insertions, 29 deletions
diff --git a/ChangeLog b/ChangeLog index b6b7e724521..1fed9cf64d6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com> + + * tests/TP_Reactor_Test.{h,cpp}: + Improved flow-control in full-duplex mode by increasing the + window size, total_send - total_recv to achieve max performance + from TCP/IP + + Improved static info which includes the number of bytes + sent/recv and the number of read/write operations. This allows + to see distribution of I/O operations between handlers. Would be + useful for TP_Reactor's future improvements. + Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu> * ace/Message_Queue_T.cpp: Updated all the enqueue*() and diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index b6b7e724521..1fed9cf64d6 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,15 @@ +Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com> + + * tests/TP_Reactor_Test.{h,cpp}: + Improved flow-control in full-duplex mode by increasing the + window size, total_send - total_recv to achieve max performance + from TCP/IP + + Improved static info which includes the number of bytes + sent/recv and the number of read/write operations. This allows + to see distribution of I/O operations between handlers. Would be + useful for TP_Reactor's future improvements. + Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu> * ace/Message_Queue_T.cpp: Updated all the enqueue*() and diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index b6b7e724521..1fed9cf64d6 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,15 @@ +Sun May 12 23:20:06 2002 Alex Libman <AlexL@rumblegroup.com> + + * tests/TP_Reactor_Test.{h,cpp}: + Improved flow-control in full-duplex mode by increasing the + window size, total_send - total_recv to achieve max performance + from TCP/IP + + Improved static info which includes the number of bytes + sent/recv and the number of read/write operations. This allows + to see distribution of I/O operations between handlers. Would be + useful for TP_Reactor's future improvements. + Sun May 12 10:11:07 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu> * ace/Message_Queue_T.cpp: Updated all the enqueue*() and diff --git a/tests/TP_Reactor_Test.cpp b/tests/TP_Reactor_Test.cpp index 52530475341..cbea833a088 100644 --- a/tests/TP_Reactor_Test.cpp +++ b/tests/TP_Reactor_Test.cpp @@ -259,7 +259,9 @@ Acceptor::Acceptor (void) : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0), sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); @@ -306,14 +308,26 @@ Acceptor::on_delete_receiver (Receiver &rcvr) this->sessions_--; - this->total_snd_ += rcvr.get_total_snd(); - this->total_rcv_ += rcvr.get_total_rcv(); + this->total_snd_ += rcvr.get_total_snd (); + this->total_rcv_ += rcvr.get_total_rcv (); + this->total_w_ += rcvr.get_total_w (); + this->total_r_ += rcvr.get_total_r (); + + if (rcvr.index_ >= 0 + && rcvr.index_ < MAX_RECEIVERS + && this->list_receivers_[rcvr.index_] == &rcvr) + this->list_receivers_[rcvr.index_] = 0; ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_snd (), + rcvr.get_total_w () ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_rcv (), + rcvr.get_total_r ()); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -321,11 +335,6 @@ Acceptor::on_delete_receiver (Receiver &rcvr) bufs, bufr, this->sessions_)); - - if (rcvr.index_ < MAX_RECEIVERS - && this->list_receivers_[rcvr.index_] == &rcvr) - this->list_receivers_[rcvr.index_] = 0; - } int @@ -368,7 +377,9 @@ Receiver::Receiver (Acceptor * acceptor, int index) index_ (index), flg_mask_ (ACE_Event_Handler::NULL_MASK), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { if (acceptor_ != 0) acceptor_->on_new_receiver (*this); @@ -475,6 +486,8 @@ Receiver::handle_input (ACE_HANDLE h) int err = 0; ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); + this->total_r_++; + if (res >= 0) { mb->wr_ptr (res); @@ -527,7 +540,7 @@ Receiver::handle_input (ACE_HANDLE h) rc = this->terminate_io (ACE_Event_Handler::READ_MASK); else // full duplex { - if (qcount >= 2) // flow control, stop read + if (qcount >= 20 ) // flow control, stop read rc = this->terminate_io (ACE_Event_Handler::READ_MASK); else rc = this->initiate_io (ACE_Event_Handler::READ_MASK); @@ -562,6 +575,8 @@ Receiver::handle_output (ACE_HANDLE h) bytes = mb->length (); res = this->peer ().send (mb->rd_ptr (), bytes); + this->total_w_++; + if (res < 0) err = errno ; else @@ -587,7 +602,7 @@ Receiver::handle_output (ACE_HANDLE h) if (err != 0 || res < 0) return -1; - if (qcount < 0) // no more message blocks in queue + if (qcount <= 0) // no more message blocks in queue { if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0) return -1; @@ -605,7 +620,9 @@ Connector::Connector (void) : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0), sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); @@ -652,15 +669,26 @@ Connector::on_delete_sender (Sender & sndr) ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); this->sessions_--; - this->total_snd_ += sndr.get_total_snd(); this->total_rcv_ += sndr.get_total_rcv(); + this->total_w_ += sndr.get_total_w(); + this->total_r_ += sndr.get_total_r(); + + if (sndr.index_ >= 0 + && sndr.index_ < MAX_SENDERS + && this->list_senders_[sndr.index_] == &sndr) + this->list_senders_[sndr.index_] = 0; ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_snd(), + sndr.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_rcv(), + sndr.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -669,10 +697,6 @@ Connector::on_delete_sender (Sender & sndr) bufr, this->sessions_)); - if (sndr.index_ < MAX_SENDERS - && this->list_senders_[sndr.index_] == &sndr) - this->list_senders_[sndr.index_] = 0; - } int @@ -733,7 +757,9 @@ Sender::Sender (Connector* connector, int index) index_ (index), flg_mask_ (ACE_Event_Handler::NULL_MASK), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { if (connector_ != 0) connector_->on_new_sender (*this); @@ -796,7 +822,7 @@ int Sender::open (void *) int Sender::initiate_write (void) { - if ( this->msg_queue ()->message_count () < 2) // flow control + if ( this->msg_queue ()->message_count () < 20) // flow control { int nbytes = ACE_OS::strlen (send_buf_); @@ -876,6 +902,7 @@ Sender::handle_input (ACE_HANDLE h) int err = 0; ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); + this->total_r_++; if (res >= 0) { @@ -948,6 +975,8 @@ Sender::handle_output (ACE_HANDLE h) bytes = mb->length (); res = this->peer ().send (mb->rd_ptr (), bytes); + this->total_w_++; + if (res < 0) err = errno ; else @@ -974,10 +1003,10 @@ Sender::handle_output (ACE_HANDLE h) int rc = 0; - if (qcount < 0) // no more message blocks in queue + if (qcount <= 0) // no more message blocks in queue { if (duplex != 0 && // full duplex, continue write - (this->total_snd_ - this->total_rcv_ ) < 1024 ) // flow control + (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control rc = initiate_write (); else rc = terminate_io (ACE_Event_Handler::WRITE_MASK); @@ -1190,8 +1219,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + connector.get_total_snd(), + connector.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + connector.get_total_rcv(), + connector.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), @@ -1199,8 +1233,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) bufr )); - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_snd(), + acceptor.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_rcv(), + acceptor.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), diff --git a/tests/TP_Reactor_Test.h b/tests/TP_Reactor_Test.h index e896f5b853e..6f192b5efb3 100644 --- a/tests/TP_Reactor_Test.h +++ b/tests/TP_Reactor_Test.h @@ -51,6 +51,8 @@ public: long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } // virtual from ACE_Svc_Handler<> virtual int open (void * pVoid); @@ -72,6 +74,8 @@ private: ACE_Recursive_Thread_Mutex mutex_; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; // ************************************************************* @@ -83,6 +87,8 @@ public: size_t get_number_sessions (void) { return sessions_; } long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } Acceptor (void); virtual ~Acceptor (void); @@ -100,6 +106,8 @@ private: Receiver *list_receivers_[MAX_RECEIVERS]; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; void on_new_receiver (Receiver & rcvr); void on_delete_receiver (Receiver & rcvr); @@ -123,6 +131,8 @@ public: long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } // virtual from ACE_Svc_Handler<> virtual int open (void * pVoid); @@ -147,6 +157,8 @@ private: char send_buf_ [1024]; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; // ************************************************************* @@ -158,6 +170,8 @@ public: long get_number_sessions (void) { return sessions_; } long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } Connector (); @@ -176,6 +190,8 @@ private: Sender * list_senders_ [MAX_SENDERS]; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; void on_new_sender (Sender & sndr); void on_delete_sender (Sender & sndr); |