diff options
Diffstat (limited to 'tests/TP_Reactor_Test.cpp')
-rw-r--r-- | tests/TP_Reactor_Test.cpp | 97 |
1 files changed, 68 insertions, 29 deletions
diff --git a/tests/TP_Reactor_Test.cpp b/tests/TP_Reactor_Test.cpp index 52530475341..cbea833a088 100644 --- a/tests/TP_Reactor_Test.cpp +++ b/tests/TP_Reactor_Test.cpp @@ -259,7 +259,9 @@ Acceptor::Acceptor (void) : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0), sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); @@ -306,14 +308,26 @@ Acceptor::on_delete_receiver (Receiver &rcvr) this->sessions_--; - this->total_snd_ += rcvr.get_total_snd(); - this->total_rcv_ += rcvr.get_total_rcv(); + this->total_snd_ += rcvr.get_total_snd (); + this->total_rcv_ += rcvr.get_total_rcv (); + this->total_w_ += rcvr.get_total_w (); + this->total_r_ += rcvr.get_total_r (); + + if (rcvr.index_ >= 0 + && rcvr.index_ < MAX_RECEIVERS + && this->list_receivers_[rcvr.index_] == &rcvr) + this->list_receivers_[rcvr.index_] = 0; ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_snd (), + rcvr.get_total_w () ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_rcv (), + rcvr.get_total_r ()); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -321,11 +335,6 @@ Acceptor::on_delete_receiver (Receiver &rcvr) bufs, bufr, this->sessions_)); - - if (rcvr.index_ < MAX_RECEIVERS - && this->list_receivers_[rcvr.index_] == &rcvr) - this->list_receivers_[rcvr.index_] = 0; - } int @@ -368,7 +377,9 @@ Receiver::Receiver (Acceptor * acceptor, int index) index_ (index), flg_mask_ (ACE_Event_Handler::NULL_MASK), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { if (acceptor_ != 0) acceptor_->on_new_receiver (*this); @@ -475,6 +486,8 @@ Receiver::handle_input (ACE_HANDLE h) int err = 0; ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); + this->total_r_++; + if (res >= 0) { mb->wr_ptr (res); @@ -527,7 +540,7 @@ Receiver::handle_input (ACE_HANDLE h) rc = this->terminate_io (ACE_Event_Handler::READ_MASK); else // full duplex { - if (qcount >= 2) // flow control, stop read + if (qcount >= 20 ) // flow control, stop read rc = this->terminate_io (ACE_Event_Handler::READ_MASK); else rc = this->initiate_io (ACE_Event_Handler::READ_MASK); @@ -562,6 +575,8 @@ Receiver::handle_output (ACE_HANDLE h) bytes = mb->length (); res = this->peer ().send (mb->rd_ptr (), bytes); + this->total_w_++; + if (res < 0) err = errno ; else @@ -587,7 +602,7 @@ Receiver::handle_output (ACE_HANDLE h) if (err != 0 || res < 0) return -1; - if (qcount < 0) // no more message blocks in queue + if (qcount <= 0) // no more message blocks in queue { if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0) return -1; @@ -605,7 +620,9 @@ Connector::Connector (void) : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0), sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); @@ -652,15 +669,26 @@ Connector::on_delete_sender (Sender & sndr) ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); this->sessions_--; - this->total_snd_ += sndr.get_total_snd(); this->total_rcv_ += sndr.get_total_rcv(); + this->total_w_ += sndr.get_total_w(); + this->total_r_ += sndr.get_total_r(); + + if (sndr.index_ >= 0 + && sndr.index_ < MAX_SENDERS + && this->list_senders_[sndr.index_] == &sndr) + this->list_senders_[sndr.index_] = 0; ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_snd(), + sndr.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_rcv(), + sndr.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -669,10 +697,6 @@ Connector::on_delete_sender (Sender & sndr) bufr, this->sessions_)); - if (sndr.index_ < MAX_SENDERS - && this->list_senders_[sndr.index_] == &sndr) - this->list_senders_[sndr.index_] = 0; - } int @@ -733,7 +757,9 @@ Sender::Sender (Connector* connector, int index) index_ (index), flg_mask_ (ACE_Event_Handler::NULL_MASK), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { if (connector_ != 0) connector_->on_new_sender (*this); @@ -796,7 +822,7 @@ int Sender::open (void *) int Sender::initiate_write (void) { - if ( this->msg_queue ()->message_count () < 2) // flow control + if ( this->msg_queue ()->message_count () < 20) // flow control { int nbytes = ACE_OS::strlen (send_buf_); @@ -876,6 +902,7 @@ Sender::handle_input (ACE_HANDLE h) int err = 0; ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); + this->total_r_++; if (res >= 0) { @@ -948,6 +975,8 @@ Sender::handle_output (ACE_HANDLE h) bytes = mb->length (); res = this->peer ().send (mb->rd_ptr (), bytes); + this->total_w_++; + if (res < 0) err = errno ; else @@ -974,10 +1003,10 @@ Sender::handle_output (ACE_HANDLE h) int rc = 0; - if (qcount < 0) // no more message blocks in queue + if (qcount <= 0) // no more message blocks in queue { if (duplex != 0 && // full duplex, continue write - (this->total_snd_ - this->total_rcv_ ) < 1024 ) // flow control + (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control rc = initiate_write (); else rc = terminate_io (ACE_Event_Handler::WRITE_MASK); @@ -1190,8 +1219,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + connector.get_total_snd(), + connector.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + connector.get_total_rcv(), + connector.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), @@ -1199,8 +1233,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) bufr )); - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_snd(), + acceptor.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_rcv(), + acceptor.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), |