From 658c32caa59e98b1342d1e60d5e33ceca9f1ad82 Mon Sep 17 00:00:00 2001 From: bala Date: Tue, 7 May 2002 15:32:49 +0000 Subject: ChangeLogTag: Tue May 7 10:31:24 2002 Alex Libman --- ChangeLog | 9 ++++ ChangeLogs/ChangeLog-02a | 9 ++++ ChangeLogs/ChangeLog-03a | 9 ++++ tests/TP_Reactor_Test.cpp | 128 ++++++++++++++++++++++++++++++++++++++-------- tests/TP_Reactor_Test.h | 46 ++++++++++++----- tests/run_test.lst | 2 +- 6 files changed, 169 insertions(+), 34 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8bc6ca7e1bd..35c1e23c5e2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +Tue May 7 10:31:24 2002 Alex Libman + + * tests/TP_Reactor_Test.cpp: + * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all + platforms. + + * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the + daily runs. + Mon May 06 16:31:14 2002 Irfan Pyarali * examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout): diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 8bc6ca7e1bd..35c1e23c5e2 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,12 @@ +Tue May 7 10:31:24 2002 Alex Libman + + * tests/TP_Reactor_Test.cpp: + * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all + platforms. + + * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the + daily runs. + Mon May 06 16:31:14 2002 Irfan Pyarali * examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout): diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 8bc6ca7e1bd..35c1e23c5e2 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,12 @@ +Tue May 7 10:31:24 2002 Alex Libman + + * tests/TP_Reactor_Test.cpp: + * tests/TP_Reactor_Test.h: Fixed the tests to work fine on all + platforms. + + * tests/run_test.lst (TP_Reactor_Test): Enabled the test for the + daily runs. + Mon May 06 16:31:14 2002 Irfan Pyarali * examples/Reactor/WFMO_Reactor/Abandoned.cpp (handle_timeout): diff --git a/tests/TP_Reactor_Test.cpp b/tests/TP_Reactor_Test.cpp index 3ef9b447156..b09344faecf 100644 --- a/tests/TP_Reactor_Test.cpp +++ b/tests/TP_Reactor_Test.cpp @@ -103,6 +103,17 @@ static char data[] = "Connection: Keep-Alive\r\n" "\r\n" ; +// ************************************************************* + +class LogLocker +{ +public: + + LogLocker () { ACE_LOG_MSG->acquire (); } + virtual ~LogLocker () { ACE_LOG_MSG->release (); } +}; +// ************************************************************* + /** * @class MyTask * @@ -246,7 +257,9 @@ MyTask::svc (void) Acceptor::Acceptor (void) : ACE_Acceptor ((ACE_Reactor *) 0), - sessions_ (0) + sessions_ (0), + total_snd_(0), + total_rcv_(0) { ACE_Guard locker (this->mutex_); @@ -292,13 +305,27 @@ Acceptor::on_delete_receiver (Receiver &rcvr) ACE_Guard locker (this->mutex_); this->sessions_--; + + this->total_snd_ += rcvr.get_total_snd(); + this->total_rcv_ += rcvr.get_total_rcv(); + + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ()); + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + rcvr.index_, + bufs, + bufr, + this->sessions_)); + if (rcvr.index_ < MAX_RECEIVERS && this->list_receivers_[rcvr.index_] == &rcvr) this->list_receivers_[rcvr.index_] = 0; - ACE_DEBUG ((LM_DEBUG, - "Receiver::~DTOR sessions_=%d\n", - this->sessions_)); } int @@ -311,7 +338,7 @@ Acceptor::start (const ACE_INET_Addr &addr) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Acceptor::start () - open failed"), - 0); + 0); return 1; } @@ -339,7 +366,9 @@ Acceptor::make_svc_handler (Receiver *&sh) Receiver::Receiver (Acceptor * acceptor, int index) : acceptor_ (acceptor), index_ (index), - flg_mask_ (ACE_Event_Handler::NULL_MASK) + flg_mask_ (ACE_Event_Handler::NULL_MASK), + total_snd_(0), + total_rcv_(0) { if (acceptor_ != 0) acceptor_->on_new_receiver (*this); @@ -447,7 +476,10 @@ Receiver::handle_input (ACE_HANDLE h) ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); if (res >= 0) - mb->wr_ptr (res); + { + mb->wr_ptr (res); + this->total_rcv_ += res; + } else err = errno ; @@ -455,6 +487,8 @@ Receiver::handle_input (ACE_HANDLE h) if (loglevel == 0 || res <= 0 || err!= 0) { + LogLocker log_lock; + ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); @@ -530,9 +564,14 @@ Receiver::handle_output (ACE_HANDLE h) if (res < 0) err = errno ; + else + this->total_snd_ += res; + if (loglevel == 0 || res <= 0 || err!= 0) { + LogLocker log_lock; + ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); @@ -564,7 +603,9 @@ Receiver::handle_output (ACE_HANDLE h) Connector::Connector (void) : ACE_Connector ((ACE_Reactor *) 0), - sessions_ (0) + sessions_ (0), + total_snd_(0), + total_rcv_(0) { ACE_Guard locker (this->mutex_); @@ -611,13 +652,27 @@ Connector::on_delete_sender (Sender & sndr) ACE_Guard locker (this->mutex_); this->sessions_--; + + this->total_snd_ += sndr.get_total_snd(); + this->total_rcv_ += sndr.get_total_rcv(); + + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ()); + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + sndr.index_, + bufs, + bufr, + this->sessions_)); + if (sndr.index_ < MAX_SENDERS && this->list_senders_[sndr.index_] == &sndr) this->list_senders_[sndr.index_] = 0; - ACE_DEBUG ((LM_DEBUG, - "Sender::~DTOR sessions_=%d\n", - this->sessions_)); } int @@ -676,7 +731,9 @@ Connector::make_svc_handler (Sender * & sh) Sender::Sender (Connector* connector, int index) : connector_ (connector), index_ (index), - flg_mask_ (ACE_Event_Handler::NULL_MASK) + flg_mask_ (ACE_Event_Handler::NULL_MASK), + total_snd_(0), + total_rcv_(0) { if (connector_ != 0) connector_->on_new_sender (*this); @@ -739,7 +796,7 @@ int Sender::open (void *) int Sender::initiate_write (void) { - if (this->msg_queue ()->message_count () < 2) // flow control + if ( this->msg_queue ()->message_count () < 2) // flow control { int nbytes = ACE_OS::strlen (send_buf_); @@ -821,7 +878,10 @@ Sender::handle_input (ACE_HANDLE h) BUFSIZ-1); if (res >= 0) - mb->wr_ptr (res); + { + mb->wr_ptr (res); + this->total_rcv_ += res; + } else err = errno ; @@ -829,6 +889,8 @@ Sender::handle_input (ACE_HANDLE h) if (loglevel == 0 || res <= 0 || err!= 0) { + LogLocker log_lock; + ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); @@ -863,7 +925,7 @@ Sender::handle_input (ACE_HANDLE h) rc = initiate_write (); if (rc != 0) return -1; - + return check_destroy (); } @@ -888,9 +950,13 @@ Sender::handle_output (ACE_HANDLE h) if (res < 0) err = errno ; + else + this->total_snd_ += res; if (loglevel == 0 || res <= 0 || err!= 0) { + LogLocker log_lock; + ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes)); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); @@ -910,7 +976,8 @@ Sender::handle_output (ACE_HANDLE h) if (qcount < 0) // no more message blocks in queue { - if (duplex != 0) // full duplex, continue write + if (duplex != 0 && // full duplex, continue write + (this->total_snd_ - this->total_rcv_ ) < 1024 ) // flow control rc = initiate_write (); else rc = terminate_io (ACE_Event_Handler::WRITE_MASK); @@ -965,7 +1032,7 @@ parse_args (int argc, ACE_TCHAR *argv[]) threads = 3; // size of Proactor thread pool senders = 20; // number of senders loglevel = 0; // log level : 0 full/ 1 only errors - seconds = 2; // time to run in seconds + seconds = 20; // time to run in seconds return 0; } @@ -1089,9 +1156,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) int rc = 0; if (both != 0 || host == 0) // Acceptor - // Simplify, initial read with zero size - if (acceptor.start (ACE_INET_Addr (port)) == 0) - rc = 1; + rc += acceptor.start (ACE_INET_Addr (port)); if (both != 0 || host != 0) { @@ -1120,6 +1185,29 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) connector.stop (); acceptor.stop (); + + //Print statistic + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd()); + ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + + ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd()); + ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + #else /* ACE_HAS_THREADS */ ACE_UNUSED_ARG( argc ); ACE_UNUSED_ARG( argv ); diff --git a/tests/TP_Reactor_Test.h b/tests/TP_Reactor_Test.h index 3a9271f3841..e896f5b853e 100644 --- a/tests/TP_Reactor_Test.h +++ b/tests/TP_Reactor_Test.h @@ -30,7 +30,7 @@ #include "ace/Svc_Handler.h" #include "ace/Synch.h" -const size_t MAX_SENDERS = 100; +const size_t MAX_SENDERS = 1000; const size_t MAX_RECEIVERS = 1000; @@ -49,6 +49,9 @@ public: ~Receiver (void); + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + // virtual from ACE_Svc_Handler<> virtual int open (void * pVoid); @@ -67,6 +70,8 @@ private: int flg_mask_; ACE_Recursive_Thread_Mutex mutex_; + long total_snd_; + long total_rcv_; }; // ************************************************************* @@ -75,25 +80,29 @@ class Acceptor : public ACE_Acceptor { friend class Receiver; public: - size_t get_number_sessions (void) { return sessions_; } + size_t get_number_sessions (void) { return sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } - Acceptor (void); - virtual ~Acceptor (void); + Acceptor (void); + virtual ~Acceptor (void); - void stop (void); - int start (const ACE_INET_Addr & addr); + void stop (void); + int start (const ACE_INET_Addr & addr); - // virtual from ACE_Acceptor - virtual int make_svc_handler (Receiver * & sh); + // virtual from ACE_Acceptor + virtual int make_svc_handler (Receiver * & sh); private: - ACE_Recursive_Thread_Mutex mutex_; - size_t sessions_; - Receiver *list_receivers_[MAX_RECEIVERS]; + ACE_Recursive_Thread_Mutex mutex_; + size_t sessions_; + Receiver *list_receivers_[MAX_RECEIVERS]; + long total_snd_; + long total_rcv_; - void on_new_receiver (Receiver & rcvr); - void on_delete_receiver (Receiver & rcvr); + void on_new_receiver (Receiver & rcvr); + void on_delete_receiver (Receiver & rcvr); }; @@ -112,6 +121,9 @@ public: ~Sender (void); + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + // virtual from ACE_Svc_Handler<> virtual int open (void * pVoid); @@ -133,6 +145,8 @@ private: ACE_Recursive_Thread_Mutex mutex_; char send_buf_ [1024]; + long total_snd_; + long total_rcv_; }; // ************************************************************* @@ -142,6 +156,9 @@ class Connector: public ACE_Connector friend class Sender; public: long get_number_sessions (void) { return sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + Connector (); virtual ~Connector (); @@ -157,9 +174,12 @@ private: ACE_Recursive_Thread_Mutex mutex_; size_t sessions_; Sender * list_senders_ [MAX_SENDERS]; + long total_snd_; + long total_rcv_; void on_new_sender (Sender & sndr); void on_delete_sender (Sender & sndr); }; + #endif /* ACE_TESTS_TP_REACTOR_TEST_H */ diff --git a/tests/run_test.lst b/tests/run_test.lst index f5a0f1e3384..99f54949131 100644 --- a/tests/run_test.lst +++ b/tests/run_test.lst @@ -117,7 +117,7 @@ Time_Service_Test: ALL !STATIC !DISABLED !missing_netsvcs TOKEN !chorus !Unicos Time_Value_Test Token_Strategy_Test Tokens_Test: ALL MSVC !DISABLED TOKEN !chorus !Unicos -TP_Reactor_Test: ALL !DISABLED +TP_Reactor_Test: ALL TSS_Test Vector_Test UPIPE_SAP_Test: !VxWorks -- cgit v1.2.1