diff options
author | bala <balanatarajan@users.noreply.github.com> | 2002-05-13 13:38:54 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2002-05-13 13:38:54 +0000 |
commit | e42bec044591a5e44f33d37da5dc2fe70b68942f (patch) | |
tree | c15e78a47ed924e77809291a3ba48253037fc19b /tests | |
parent | 49d9287688e072454b63fead8feaa6109ae0e510 (diff) | |
download | ATCD-e42bec044591a5e44f33d37da5dc2fe70b68942f.tar.gz |
ChangeLogTag: Mon May 13 08:36:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/Proactor_Test.cpp | 200 |
1 files changed, 134 insertions, 66 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index e9966371ae3..6ca4ce243c5 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -88,21 +88,6 @@ static const size_t MIN_TIME = 1; // min 1 sec static const size_t MAX_TIME = 3600; // max 1 hour static size_t seconds = 2; // default time to run - 2 seconds -#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) -static ACE_TCHAR request_line[] = - ACE_TEXT ("GET / HTTP/1.1\r\n"); - -static ACE_TCHAR headers[] = - ACE_TEXT ("Accept: */*\r\n") - ACE_TEXT ("Accept-Language: C++\r\n") - ACE_TEXT ("Accept-Encoding: gzip, deflate\r\n") - ACE_TEXT ("User-Agent: Proactor_Test/1.0 (non-compatible)\r\n") - ACE_TEXT ("Connection: Keep-Alive\r\n"); - -static ACE_TCHAR end_of_request_header[] = - ACE_TEXT ("\r\n"); -#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - static ACE_TCHAR complete_message[] = ACE_TEXT ("GET / HTTP/1.1\r\n") ACE_TEXT ("Accept: */*\r\n") @@ -172,7 +157,11 @@ class MyTask : public ACE_Task<ACE_MT_SYNCH> public: MyTask (void): lock_ (), sem_ (0), proactor_(0) {} - virtual ~MyTask() { (void) this->stop (); } + virtual ~MyTask() + { + (void) this->stop (); + (void) this->delete_proactor(); + } virtual int svc (void); @@ -207,9 +196,9 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) ACE_UNUSED_ARG (type_proactor); ACE_UNUSED_ARG (max_op); - ACE_WIN32_Proactor *proactor = 0; + ACE_WIN32_Proactor *proactor_impl = 0; - ACE_NEW_RETURN (proactor, + ACE_NEW_RETURN (proactor_impl, ACE_WIN32_Proactor, -1); @@ -218,12 +207,12 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) #elif defined (ACE_HAS_AIO_CALLS) - ACE_POSIX_Proactor * proactor = 0; + ACE_POSIX_Proactor * proactor_impl = 0; switch (type_proactor) { case AIOCB: - ACE_NEW_RETURN (proactor, + ACE_NEW_RETURN (proactor_impl, ACE_POSIX_AIOCB_Proactor (max_op), -1); ACE_DEBUG ((LM_DEBUG, @@ -231,7 +220,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) break; case SIG: - ACE_NEW_RETURN (proactor, + ACE_NEW_RETURN (proactor_impl, ACE_POSIX_SIG_Proactor (max_op), -1); ACE_DEBUG ((LM_DEBUG, @@ -240,7 +229,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) # if defined (sun) case SUN: - ACE_NEW_RETURN (proactor, + ACE_NEW_RETURN (proactor_impl, ACE_SUN_Proactor (max_op), -1); ACE_DEBUG ((LM_DEBUG, @@ -250,7 +239,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) # if defined (__sgi) case CB: - ACE_NEW_RETURN (proactor, + ACE_NEW_RETURN (proactor_impl, ACE_POSIX_CB_Proactor (max_op), -1); ACE_DEBUG ((LM_DEBUG, @@ -266,8 +255,9 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) #endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE) + // always delete implementation 1 , not !(proactor_impl == 0) ACE_NEW_RETURN (this->proactor_, - ACE_Proactor (proactor, !(proactor == 0)), + ACE_Proactor (proactor_impl, 1 ), -1); ACE_Proactor::instance (this->proactor_); @@ -334,10 +324,11 @@ MyTask::stop () ACE_TEXT ("%p.\n"), ACE_TEXT ("unable to stop thread pool"))); - if (this->delete_proactor () == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p.\n"), - ACE_TEXT ("unable to delete proactor"))); + // Right choice - to delete proactor in destuctor + //if (this->delete_proactor () == -1) + // ACE_ERROR ((LM_ERROR, + // ACE_TEXT ("%p.\n"), + // ACE_TEXT ("unable to delete proactor"))); return 0; } @@ -373,6 +364,8 @@ public: long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } /// This is called after the new connection has been accepted. virtual void open (ACE_HANDLE handle, @@ -409,6 +402,8 @@ private: int flg_cancel_; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; class Acceptor : public ACE_Asynch_Acceptor<Receiver> @@ -418,6 +413,8 @@ public: int get_number_sessions (void) { return this->sessions_; } long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } Acceptor (void); virtual ~Acceptor (void); @@ -437,13 +434,17 @@ private: Receiver *list_receivers_[MAX_RECEIVERS]; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; // ************************************************************* Acceptor::Acceptor (void) : sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); @@ -511,6 +512,8 @@ Acceptor::on_delete_receiver (Receiver & rcvr) this->total_snd_ += rcvr.get_total_snd(); this->total_rcv_ += rcvr.get_total_rcv(); + this->total_w_ += rcvr.get_total_w(); + this->total_r_ += rcvr.get_total_r(); if (rcvr.index_ >= 0 && rcvr.index_ < MAX_RECEIVERS @@ -520,8 +523,13 @@ Acceptor::on_delete_receiver (Receiver & rcvr) ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_snd(), + rcvr.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_rcv(), + rcvr.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -560,7 +568,9 @@ Receiver::Receiver (Acceptor * acceptor, int index) io_count_ (0), flg_cancel_(0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { if (this->acceptor_ != 0) this->acceptor_->on_new_receiver (*this); @@ -637,6 +647,7 @@ Receiver::initiate_read_stream (void) } this->io_count_++; + this->total_r_++; return 0; } @@ -667,6 +678,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) } this->io_count_++; + this->total_w_++; return 0; } @@ -840,6 +852,8 @@ public: long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous reads from the socket complete @@ -865,6 +879,8 @@ private: int flg_cancel_; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; class Connector : public ACE_Asynch_Connector<Sender> @@ -874,6 +890,8 @@ public: int get_number_sessions (void) { return this->sessions_; } long get_total_snd (void) { return this->total_snd_; } long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } Connector (void); virtual ~Connector (void); @@ -894,6 +912,8 @@ private: Sender *list_senders_[MAX_SENDERS]; long total_snd_; long total_rcv_; + long total_w_; + long total_r_; }; // ************************************************************* @@ -901,7 +921,9 @@ private: Connector::Connector (void) : sessions_ (0), total_snd_(0), - total_rcv_(0) + total_rcv_(0), + total_w_ (0), + total_r_ (0) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); @@ -966,6 +988,8 @@ Connector::on_delete_sender (Sender &sndr) this->sessions_--; this->total_snd_ += sndr.get_total_snd(); this->total_rcv_ += sndr.get_total_rcv(); + this->total_w_ += sndr.get_total_w(); + this->total_r_ += sndr.get_total_r(); if (sndr.index_ >= 0 && sndr.index_ < MAX_SENDERS @@ -975,8 +999,13 @@ Connector::on_delete_sender (Sender &sndr) ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ()); - ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ()); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_snd(), + sndr.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_rcv(), + sndr.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), @@ -1056,7 +1085,9 @@ Sender::Sender (Connector * connector, int index) io_count_ (0), flg_cancel_(0), total_snd_ (0), - total_rcv_ (0) + total_rcv_ (0), + total_w_ (0), + total_r_ (0) { if (this->connector_ != 0) this->connector_->on_new_sender (*this); @@ -1125,25 +1156,30 @@ Sender::initiate_write_stream (void) if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) return -1; + static const size_t complete_message_length = ACE_OS::strlen (complete_message); + #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) - static const size_t request_line_length = ACE_OS::strlen (request_line); - static const size_t headers_length = ACE_OS::strlen (headers); - static const size_t end_of_request_header_length = ACE_OS::strlen (end_of_request_header); ACE_Message_Block *mb1 = 0, *mb2 = 0, *mb3 = 0; // No need to allocate +1 for proper printing - the memory includes it already - ACE_NEW_RETURN (mb1, ACE_Message_Block ((char *) request_line, - request_line_length), -1); - mb1->wr_ptr (request_line_length); - ACE_NEW_RETURN (mb2, ACE_Message_Block ((char *) headers, - headers_length), -1); - mb2->wr_ptr (headers_length); - ACE_NEW_RETURN (mb3, ACE_Message_Block ((char *) end_of_request_header, - end_of_request_header_length), -1); - mb3->wr_ptr (end_of_request_header_length); + ACE_NEW_RETURN (mb1, + ACE_Message_Block (complete_message, complete_message_length), + -1); + + ACE_NEW_RETURN (mb2, + ACE_Message_Block (complete_message, complete_message_length), + -1); + + ACE_NEW_RETURN (mb3, + ACE_Message_Block (complete_message, complete_message_length), + -1); + + mb1->wr_ptr (complete_message_length); + mb2->wr_ptr (complete_message_length); + mb3->wr_ptr (complete_message_length); // chain them together mb1->cont (mb2); @@ -1158,7 +1194,6 @@ Sender::initiate_write_stream (void) -1); } #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - static const size_t complete_message_length = ACE_OS::strlen (complete_message); ACE_Message_Block *mb = 0; @@ -1179,6 +1214,7 @@ Sender::initiate_write_stream (void) #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ this->io_count_++; + this->total_w_++; return 0; } @@ -1188,30 +1224,48 @@ Sender::initiate_read_stream (void) if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) return -1; -#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) - static const size_t request_line_length = ACE_OS::strlen (request_line); - static const size_t headers_length = ACE_OS::strlen (headers); - static const size_t end_of_request_header_length = ACE_OS::strlen (end_of_request_header); + static const size_t complete_message_length = ACE_OS::strlen (complete_message); +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) ACE_Message_Block *mb1 = 0, *mb2 = 0, - *mb3 = 0; + *mb3 = 0, + *mb4 = 0, + *mb5 = 0, + *mb6 = 0; // We allocate +1 only for proper printing - we can just set the last byte // to '\0' before printing out - ACE_NEW_RETURN (mb1, ACE_Message_Block (request_line_length + 1), -1); - ACE_NEW_RETURN (mb2, ACE_Message_Block (headers_length + 1), -1); - ACE_NEW_RETURN (mb3, ACE_Message_Block (end_of_request_header_length + 1), -1); + ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1); + + // Let allocate memory for one more triplet, + // This improves performance + // as we can receive more the than one block at once + // Generally, we can receive more triplets .... + ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1); + ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1); mb1->cont (mb2); mb2->cont (mb3); + mb3->cont (mb4); + mb4->cont (mb5); + mb5->cont (mb6); + + // hide last byte in each message block, reserving it for later to set '\0' // for proper printouts mb1->size (mb1->size () - 1); mb2->size (mb2->size () - 1); mb3->size (mb3->size () - 1); + mb4->size (mb4->size () - 1); + mb5->size (mb5->size () - 1); + mb6->size (mb6->size () - 1); + // Inititiate read if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1) { @@ -1222,14 +1276,17 @@ Sender::initiate_read_stream (void) -1); } #else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - static const size_t complete_message_length = ACE_OS::strlen (complete_message); + + // Try to read more chunks + size_t blksize = ( complete_message_length > BUFSIZ ) ? + complete_message_length : BUFSIZ; ACE_Message_Block *mb = 0; // We allocate +1 only for proper printing - we can just set the last byte // to '\0' before printing out ACE_NEW_RETURN (mb, - ACE_Message_Block (complete_message_length + 1) + ACE_Message_Block (blksize + 1) , -1); // Inititiate read @@ -1244,6 +1301,7 @@ Sender::initiate_read_stream (void) #endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ this->io_count_++; + this->total_r_++; return 0; } @@ -1346,7 +1404,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) this->total_snd_ += result.bytes_transferred (); if (duplex != 0 && // full duplex, continue write - (this->total_snd_- this->total_rcv_) < 1024 ) //flow control + (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control this->initiate_write_stream (); else // half-duplex read reply, after read we will start write this->initiate_read_stream (); @@ -1649,14 +1707,14 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"), connector.get_number_sessions () )); - connector.cancel_all (); + //connector.cancel_all (); //Cancel all pending AIO on Acceptor And Receivers ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"), acceptor.get_number_sessions () )); - acceptor.cancel_all (); + //acceptor.cancel_all (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Stop Thread Pool Task\n") @@ -1683,8 +1741,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_TCHAR bufs [256]; ACE_TCHAR bufr [256]; - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + connector.get_total_snd(), + connector.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + connector.get_total_rcv(), + connector.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), @@ -1692,8 +1755,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) bufr )); - ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd()); - ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() ); + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_snd(), + acceptor.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_rcv(), + acceptor.get_total_r() ); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), |