diff options
Diffstat (limited to 'tests/Proactor_Test.cpp')
-rw-r--r-- | tests/Proactor_Test.cpp | 416 |
1 files changed, 88 insertions, 328 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index 0d2230049a8..a99e8fffd49 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -219,7 +219,6 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); break; -#if defined(ACE_HAS_POSIX_REALTIME_SIGNALS) case SIG: ACE_NEW_RETURN (proactor_impl, ACE_POSIX_SIG_Proactor (max_op), @@ -227,7 +226,6 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); break; -#endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */ # if defined (sun) case SUN: @@ -315,7 +313,7 @@ MyTask::stop () if (this->proactor_ != 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT (" (%t) Calling End Proactor event loop\n"))); + ACE_TEXT ("End Proactor event loop\n"))); ACE_Proactor::end_event_loop (); } @@ -362,10 +360,6 @@ public: long get_total_w (void) { return this->total_w_; } long get_total_r (void) { return this->total_r_; } - // This is called to pass the new connection's addresses. - virtual void addresses (const ACE_INET_Addr& peer, - const ACE_INET_Addr& local); - /// This is called after the new connection has been accepted. virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); @@ -397,12 +391,12 @@ private: ACE_HANDLE handle_; ACE_SYNCH_MUTEX lock_; - long io_count_; // Number of currently outstanding I/O requests + long io_count_; int flg_cancel_; - size_t total_snd_; // Number of bytes successfully sent - size_t total_rcv_; // Number of bytes successfully received - long total_w_; // Number of write operations - long total_r_; // Number of read operations + size_t total_snd_; + size_t total_rcv_; + long total_w_; + long total_r_; }; class Acceptor : public ACE_Asynch_Acceptor<Receiver> @@ -460,6 +454,9 @@ Acceptor::~Acceptor (void) void Acceptor::cancel_all (void) { + // This method can be called only after proactor event loop is done + // in all threads. + ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->cancel (); @@ -495,8 +492,7 @@ Acceptor::on_new_receiver (Receiver & rcvr) this->sessions_++; this->list_receivers_[rcvr.index_] = &rcvr; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"), - rcvr.index_, + ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"), this->sessions_)); } @@ -517,9 +513,22 @@ Acceptor::on_delete_receiver (Receiver & rcvr) && this->list_receivers_[rcvr.index_] == &rcvr) this->list_receivers_[rcvr.index_] = 0; + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"), + rcvr.get_total_snd (), + rcvr.get_total_w ()); + + ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"), + rcvr.get_total_rcv (), + rcvr.get_total_r ()); + ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"), + ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), rcvr.index_, + bufs, + bufr, this->sessions_)); } @@ -562,41 +571,6 @@ Receiver::Receiver (Acceptor * acceptor, int index) Receiver::~Receiver (void) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ") - ACE_TEXT ("%d recvs (%d bytes)\n"), - this->index_, - this->total_w_, this->total_snd_, - this->total_r_, this->total_rcv_)); - if (this->io_count_ != 0) - ACE_ERROR ((LM_WARNING, - ACE_TEXT ("(%t) Receiver %d deleted with ") - ACE_TEXT ("%d I/O outstanding\n"), - this->index_, - this->io_count_)); - - // This test bounces data back and forth between Senders and Receivers. - // Therefore, if there was significantly more data in one direction, that's - // a problem. Remember, the byte counts are unsigned values. - int issue_data_warning = 0; - if (this->total_snd_ > this->total_rcv_) - { - if (this->total_rcv_ == 0) - issue_data_warning = 1; - else if (this->total_snd_ / this->total_rcv_ > 2) - issue_data_warning = 1; - } - else - { - if (this->total_snd_ == 0) - issue_data_warning = 1; - else if (this->total_rcv_ / this->total_snd_ > 2) - issue_data_warning = 1; - } - if (issue_data_warning) - ACE_DEBUG ((LM_WARNING, - ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); - if (this->acceptor_ != 0) this->acceptor_->on_delete_receiver (*this); @@ -620,45 +594,20 @@ Receiver::cancel () void -Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&) -{ - ACE_TCHAR str[256]; - if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Receiver %d connection from %s\n"), - this->index_, - str)); - else - ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), - this->index_, - ACE_TEXT ("addr_to_string"))); - return; -} - - -void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) { { ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - // Don't buffer serial sends. this->handle_ = handle; - int nodelay = 1; - ACE_SOCK_Stream option_setter (handle); - if (-1 == option_setter.set_option (IPPROTO_TCP, - TCP_NODELAY, - &nodelay, - sizeof (nodelay))) - ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); if (this->ws_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open"))); else if (this->rs_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); else this->initiate_read_stream (); @@ -685,7 +634,7 @@ Receiver::initiate_read_stream (void) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")), -1); } @@ -708,7 +657,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")), + ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")), -1); } @@ -716,7 +665,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write")), -1); } @@ -737,12 +686,14 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) // Reset pointers. mb.rd_ptr ()[result.bytes_transferred ()] = '\0'; - if (loglevel == 0) + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) { LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"), + ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"), this->index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -779,31 +730,6 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } - else if (result.error () != 0) - { - ACE_Log_Priority prio; -#if defined (ACE_WIN32) - if (result.error () == ERROR_OPERATION_ABORTED) - prio = LM_DEBUG; -#else - if (result.error () == ECANCELED) - prio = LM_DEBUG; -#endif /* ACE_WIN32 */ - else - prio = LM_ERROR; - ACE_Log_Msg::instance ()->errnum (result.error ()); - ACE_Log_Msg::instance ()->log (prio, - ACE_TEXT ("(%t) Receiver %d; %p\n"), - this->index_, - ACE_TEXT ("read")); - } - else - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"), - this->index_, - result.bytes_transferred ())); - } if (result.error () == 0 && result.bytes_transferred () > 0) { @@ -834,7 +760,9 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_Message_Block & mb = result.message_block (); - if (loglevel == 0) + if (loglevel == 0 || + result.bytes_transferred () == 0 || + result.error () != 0) { LogLocker log_lock; @@ -842,7 +770,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"), + ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"), this->index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -879,31 +807,6 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } - else if (result.error () != 0) - { - ACE_Log_Priority prio; -#if defined (ACE_WIN32) - if (result.error () == ERROR_OPERATION_ABORTED) - prio = LM_DEBUG; -#else - if (result.error () == ECANCELED) - prio = LM_DEBUG; -#endif /* ACE_WIN32 */ - else - prio = LM_ERROR; - ACE_Log_Msg::instance ()->errnum (result.error ()); - ACE_Log_Msg::instance ()->log (prio, - ACE_TEXT ("(%t) Receiver %d; %p\n"), - this->index_, - ACE_TEXT ("write")); - } - else - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"), - this->index_, - result.bytes_transferred ())); - } mb.release (); @@ -945,10 +848,6 @@ public: long get_total_w (void) { return this->total_w_; } long get_total_r (void) { return this->total_r_; } - // This is called to pass the new connection's addresses. - virtual void addresses (const ACE_INET_Addr& peer, - const ACE_INET_Addr& local); - virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous reads from the socket complete @@ -958,8 +857,7 @@ public: private: int initiate_read_stream (void); int initiate_write_stream (void); - void cancel (void); - void close (void); + void cancel (); int index_; Connector * connector_; @@ -994,7 +892,6 @@ public: int start (const ACE_INET_Addr &addr, int num); void stop (void); void cancel_all (void); - void close_all (void); // Virtual from ACE_Asynch_Connector Sender *make_handler (void); @@ -1036,6 +933,8 @@ Connector::~Connector (void) void Connector::cancel_all(void) { + // This method can be called only after proactor event loop is done + // in all threads. ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->cancel (); @@ -1048,21 +947,6 @@ Connector::cancel_all(void) return; } - -void -Connector::close_all (void) -{ - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); - - for (int i = 0; i < MAX_SENDERS; ++i) - { - if (this->list_senders_[i] != 0) - this->list_senders_[i]->close (); - } - return; -} - - void Connector::stop (void) { @@ -1085,8 +969,7 @@ Connector::on_new_sender (Sender &sndr) this->sessions_++; this->list_senders_[sndr.index_] = &sndr; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"), - sndr.index_, + ACE_TEXT ("Sender::CTOR sessions_ = %d\n"), this->sessions_)); } @@ -1106,9 +989,22 @@ Connector::on_delete_sender (Sender &sndr) && this->list_senders_[sndr.index_] == &sndr) this->list_senders_[sndr.index_] = 0; + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"), + sndr.get_total_snd (), + sndr.get_total_w ()); + + ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"), + sndr.get_total_rcv (), + sndr.get_total_r ()); + ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"), + ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), sndr.index_, + bufs, + bufr, this->sessions_)); } @@ -1156,7 +1052,7 @@ Connector::start (const ACE_INET_Addr& addr, int num) if (this->open (1, 0, 1) != 0) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("(%t) %p\n"), + ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("Connector::open failed"))); return rc; } @@ -1166,8 +1062,8 @@ Connector::start (const ACE_INET_Addr& addr, int num) if (this->connect (addr) != 0) { ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), - ACE_TEXT ("Connector::connect failed"))); + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("Connector::connect failed"))); break; } } @@ -1192,40 +1088,6 @@ Sender::Sender (Connector * connector, int index) Sender::~Sender (void) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ") - ACE_TEXT ("%d recvs (%d bytes)\n"), - this->index_, - this->total_w_, this->total_snd_, - this->total_r_, this->total_rcv_)); - if (this->io_count_ != 0) - ACE_ERROR ((LM_WARNING, - ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"), - this->index_, - this->io_count_)); - - // This test bounces data back and forth between Senders and Receivers. - // Therefore, if there was significantly more data in one direction, that's - // a problem. Remember, the byte counts are unsigned values. - int issue_data_warning = 0; - if (this->total_snd_ > this->total_rcv_) - { - if (this->total_rcv_ == 0) - issue_data_warning = 1; - else if (this->total_snd_ / this->total_rcv_ > 2) - issue_data_warning = 1; - } - else - { - if (this->total_snd_ == 0) - issue_data_warning = 1; - else if (this->total_rcv_ / this->total_snd_ > 2) - issue_data_warning = 1; - } - if (issue_data_warning) - ACE_DEBUG ((LM_WARNING, - ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); - if (this->connector_ != 0) this->connector_->on_delete_sender (*this); @@ -1249,62 +1111,24 @@ Sender::cancel () return; } -void -Sender::close () -{ - ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Closing Sender %d; %d I/O outstanding\n"), - this->index_, this->io_count_)); - ACE_OS::closesocket (this->handle_); - this->handle_ = ACE_INVALID_HANDLE; - return; -} - - -void -Sender::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local) -{ - ACE_TCHAR str[256]; - if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Sender %d connected on %s\n"), - this->index_, - str)); - else - ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), - this->index_, - ACE_TEXT ("addr_to_string"))); - return; -} - void Sender::open (ACE_HANDLE handle, ACE_Message_Block &) { { ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - - // Don't buffer serial sends. this->handle_ = handle; - int nodelay = 1; - ACE_SOCK_Stream option_setter (handle); - if (option_setter.set_option (IPPROTO_TCP, - TCP_NODELAY, - &nodelay, - sizeof (nodelay))) - ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); // Open ACE_Asynch_Write_Stream if (this->ws_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open"))); // Open ACE_Asynch_Read_Stream else if (this->rs_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open"))); else if (this->initiate_write_stream () == 0) @@ -1361,7 +1185,7 @@ Sender::initiate_write_stream (void) { mb1->release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")), -1); } @@ -1379,7 +1203,7 @@ Sender::initiate_write_stream (void) { mb->release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Stream::write")), -1); } @@ -1443,7 +1267,7 @@ Sender::initiate_read_stream (void) { mb1->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")), -1); } @@ -1466,7 +1290,7 @@ Sender::initiate_read_stream (void) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("%p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::read")), -1); } @@ -1485,12 +1309,14 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_Message_Block & mb = result.message_block (); - if (loglevel == 0) + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) { LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"), + ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"), index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -1566,31 +1392,6 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } - else if (result.error () != 0) - { - ACE_Log_Priority prio; -#if defined (ACE_WIN32) - if (result.error () == ERROR_OPERATION_ABORTED) - prio = LM_DEBUG; -#else - if (result.error () == ECANCELED) - prio = LM_DEBUG; -#endif /* ACE_WIN32 */ - else - prio = LM_ERROR; - ACE_Log_Msg::instance ()->errnum (result.error ()); - ACE_Log_Msg::instance ()->log (prio, - ACE_TEXT ("(%t) Sender %d; %p\n"), - this->index_, - ACE_TEXT ("write")); - } - else - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"), - this->index_, - result.bytes_transferred ())); - } mb.release (); @@ -1598,11 +1399,9 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { this->total_snd_ += result.bytes_transferred (); - if (duplex != 0) // full duplex, continue write - { - if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control - this->initiate_write_stream (); - } + if (duplex != 0 && // full duplex, continue write + (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control + this->initiate_write_stream (); else // half-duplex read reply, after read we will start write this->initiate_read_stream (); } @@ -1622,12 +1421,14 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_Message_Block & mb = result.message_block (); - if (loglevel == 0) + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) { LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"), + ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"), index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -1686,31 +1487,6 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } - else if (result.error () != 0) - { - ACE_Log_Priority prio; -#if defined (ACE_WIN32) - if (result.error () == ERROR_OPERATION_ABORTED) - prio = LM_DEBUG; -#else - if (result.error () == ECANCELED) - prio = LM_DEBUG; -#endif /* ACE_WIN32 */ - else - prio = LM_ERROR; - ACE_Log_Msg::instance ()->errnum (result.error ()); - ACE_Log_Msg::instance ()->log (prio, - ACE_TEXT ("(%t) Sender %d; %p\n"), - this->index_, - ACE_TEXT ("read")); - } - else - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"), - this->index_, - result.bytes_transferred ())); - } mb.release (); @@ -1815,22 +1591,21 @@ parse_args (int argc, ACE_TCHAR *argv[]) max_aio_operations = 512; // POSIX Proactor params #if defined (sun) proactor_type = SUN; // Proactor type for SunOS - threads = 1; // aiosuspend() not MT Safe. #else proactor_type = DEFAULT; // Proactor type = default - threads = 3; // size of Proactor thread pool #endif + threads = 3; // size of Proactor thread pool #if defined(__sgi) || defined (ACE_LINUX_COMMON_H) ACE_DEBUG (( LM_DEBUG, - "Weak AIO implementation, test will work with 3 clients")); - senders = 3; // number of senders + "Weak AIO implementation, test will work with 1 client")); + senders = 1; // number of senders #else - senders = 10; // number of senders + senders = 20; // number of senders #endif loglevel = 1; // log level : 0 full/ 1 only errors - seconds = 15; // time to run in seconds + seconds = 20; // time to run in seconds return 0; } @@ -1931,44 +1706,29 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_OS::sleep (seconds); } - // Now close all the connector/senders. This should trip all the receivers - // to close as well. + //Cancel all pending AIO on Connector and Senders ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Close Connector/Senders: sessions_=%d\n"), + ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"), connector.get_number_sessions () )); - connector.close_all (); - - // Wait til all the sessions run down. - ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n"))); - while (acceptor.get_number_sessions () > 0 || - connector.get_number_sessions () > 0 ) - ACE_OS::sleep (1); -#if 0 - // Cancel all pending AIO on Connector and Senders - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"), - connector.get_number_sessions () - )); - connector.cancel_all (); -#endif + //connector.cancel_all (); //Cancel all pending AIO on Acceptor And Receivers ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"), + ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"), acceptor.get_number_sessions () )); - acceptor.cancel_all (); + //acceptor.cancel_all (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Stop Thread Pool Task\n") + ACE_TEXT ("Stop Thread Pool Task\n") )); task1.stop (); // As Proactor event loop now is inactive it is safe to destroy all // Senders ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"), + ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"), connector.get_number_sessions () )); connector.stop (); @@ -1976,7 +1736,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) // As Proactor event loop now is inactive it is safe to destroy all // Receivers ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"), + ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"), acceptor.get_number_sessions () )); acceptor.stop (); |