summaryrefslogtreecommitdiff
path: root/tests/Proactor_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'tests/Proactor_Test.cpp')
-rw-r--r--tests/Proactor_Test.cpp416
1 files changed, 88 insertions, 328 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index 0d2230049a8..a99e8fffd49 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -219,7 +219,6 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n")));
break;
-#if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
case SIG:
ACE_NEW_RETURN (proactor_impl,
ACE_POSIX_SIG_Proactor (max_op),
@@ -227,7 +226,6 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Create Proactor Type = SIG\n")));
break;
-#endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
# if defined (sun)
case SUN:
@@ -315,7 +313,7 @@ MyTask::stop ()
if (this->proactor_ != 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT (" (%t) Calling End Proactor event loop\n")));
+ ACE_TEXT ("End Proactor event loop\n")));
ACE_Proactor::end_event_loop ();
}
@@ -362,10 +360,6 @@ public:
long get_total_w (void) { return this->total_w_; }
long get_total_r (void) { return this->total_r_; }
- // This is called to pass the new connection's addresses.
- virtual void addresses (const ACE_INET_Addr& peer,
- const ACE_INET_Addr& local);
-
/// This is called after the new connection has been accepted.
virtual void open (ACE_HANDLE handle,
ACE_Message_Block &message_block);
@@ -397,12 +391,12 @@ private:
ACE_HANDLE handle_;
ACE_SYNCH_MUTEX lock_;
- long io_count_; // Number of currently outstanding I/O requests
+ long io_count_;
int flg_cancel_;
- size_t total_snd_; // Number of bytes successfully sent
- size_t total_rcv_; // Number of bytes successfully received
- long total_w_; // Number of write operations
- long total_r_; // Number of read operations
+ size_t total_snd_;
+ size_t total_rcv_;
+ long total_w_;
+ long total_r_;
};
class Acceptor : public ACE_Asynch_Acceptor<Receiver>
@@ -460,6 +454,9 @@ Acceptor::~Acceptor (void)
void
Acceptor::cancel_all (void)
{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
+
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->cancel ();
@@ -495,8 +492,7 @@ Acceptor::on_new_receiver (Receiver & rcvr)
this->sessions_++;
this->list_receivers_[rcvr.index_] = &rcvr;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"),
- rcvr.index_,
+ ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
this->sessions_));
}
@@ -517,9 +513,22 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
&& this->list_receivers_[rcvr.index_] == &rcvr)
this->list_receivers_[rcvr.index_] = 0;
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"),
+ rcvr.get_total_snd (),
+ rcvr.get_total_w ());
+
+ ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"),
+ rcvr.get_total_rcv (),
+ rcvr.get_total_r ());
+
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"),
+ ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
rcvr.index_,
+ bufs,
+ bufr,
this->sessions_));
}
@@ -562,41 +571,6 @@ Receiver::Receiver (Acceptor * acceptor, int index)
Receiver::~Receiver (void)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ")
- ACE_TEXT ("%d recvs (%d bytes)\n"),
- this->index_,
- this->total_w_, this->total_snd_,
- this->total_r_, this->total_rcv_));
- if (this->io_count_ != 0)
- ACE_ERROR ((LM_WARNING,
- ACE_TEXT ("(%t) Receiver %d deleted with ")
- ACE_TEXT ("%d I/O outstanding\n"),
- this->index_,
- this->io_count_));
-
- // This test bounces data back and forth between Senders and Receivers.
- // Therefore, if there was significantly more data in one direction, that's
- // a problem. Remember, the byte counts are unsigned values.
- int issue_data_warning = 0;
- if (this->total_snd_ > this->total_rcv_)
- {
- if (this->total_rcv_ == 0)
- issue_data_warning = 1;
- else if (this->total_snd_ / this->total_rcv_ > 2)
- issue_data_warning = 1;
- }
- else
- {
- if (this->total_snd_ == 0)
- issue_data_warning = 1;
- else if (this->total_rcv_ / this->total_snd_ > 2)
- issue_data_warning = 1;
- }
- if (issue_data_warning)
- ACE_DEBUG ((LM_WARNING,
- ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
-
if (this->acceptor_ != 0)
this->acceptor_->on_delete_receiver (*this);
@@ -620,45 +594,20 @@ Receiver::cancel ()
void
-Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
-{
- ACE_TCHAR str[256];
- if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d connection from %s\n"),
- this->index_,
- str));
- else
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
- this->index_,
- ACE_TEXT ("addr_to_string")));
- return;
-}
-
-
-void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- // Don't buffer serial sends.
this->handle_ = handle;
- int nodelay = 1;
- ACE_SOCK_Stream option_setter (handle);
- if (-1 == option_setter.set_option (IPPROTO_TCP,
- TCP_NODELAY,
- &nodelay,
- sizeof (nodelay)))
- ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
else
this->initiate_read_stream ();
@@ -685,7 +634,7 @@ Receiver::initiate_read_stream (void)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
-1);
}
@@ -708,7 +657,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")),
+ ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")),
-1);
}
@@ -716,7 +665,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write")),
-1);
}
@@ -737,12 +686,14 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
// Reset pointers.
mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
- if (loglevel == 0)
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
{
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"),
+ ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"),
this->index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -779,31 +730,6 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
- else if (result.error () != 0)
- {
- ACE_Log_Priority prio;
-#if defined (ACE_WIN32)
- if (result.error () == ERROR_OPERATION_ABORTED)
- prio = LM_DEBUG;
-#else
- if (result.error () == ECANCELED)
- prio = LM_DEBUG;
-#endif /* ACE_WIN32 */
- else
- prio = LM_ERROR;
- ACE_Log_Msg::instance ()->errnum (result.error ());
- ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Receiver %d; %p\n"),
- this->index_,
- ACE_TEXT ("read"));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"),
- this->index_,
- result.bytes_transferred ()));
- }
if (result.error () == 0 && result.bytes_transferred () > 0)
{
@@ -834,7 +760,9 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_Message_Block & mb = result.message_block ();
- if (loglevel == 0)
+ if (loglevel == 0 ||
+ result.bytes_transferred () == 0 ||
+ result.error () != 0)
{
LogLocker log_lock;
@@ -842,7 +770,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"),
+ ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"),
this->index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -879,31 +807,6 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
- else if (result.error () != 0)
- {
- ACE_Log_Priority prio;
-#if defined (ACE_WIN32)
- if (result.error () == ERROR_OPERATION_ABORTED)
- prio = LM_DEBUG;
-#else
- if (result.error () == ECANCELED)
- prio = LM_DEBUG;
-#endif /* ACE_WIN32 */
- else
- prio = LM_ERROR;
- ACE_Log_Msg::instance ()->errnum (result.error ());
- ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Receiver %d; %p\n"),
- this->index_,
- ACE_TEXT ("write"));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"),
- this->index_,
- result.bytes_transferred ()));
- }
mb.release ();
@@ -945,10 +848,6 @@ public:
long get_total_w (void) { return this->total_w_; }
long get_total_r (void) { return this->total_r_; }
- // This is called to pass the new connection's addresses.
- virtual void addresses (const ACE_INET_Addr& peer,
- const ACE_INET_Addr& local);
-
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous reads from the socket complete
@@ -958,8 +857,7 @@ public:
private:
int initiate_read_stream (void);
int initiate_write_stream (void);
- void cancel (void);
- void close (void);
+ void cancel ();
int index_;
Connector * connector_;
@@ -994,7 +892,6 @@ public:
int start (const ACE_INET_Addr &addr, int num);
void stop (void);
void cancel_all (void);
- void close_all (void);
// Virtual from ACE_Asynch_Connector
Sender *make_handler (void);
@@ -1036,6 +933,8 @@ Connector::~Connector (void)
void
Connector::cancel_all(void)
{
+ // This method can be called only after proactor event loop is done
+ // in all threads.
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->cancel ();
@@ -1048,21 +947,6 @@ Connector::cancel_all(void)
return;
}
-
-void
-Connector::close_all (void)
-{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
-
- for (int i = 0; i < MAX_SENDERS; ++i)
- {
- if (this->list_senders_[i] != 0)
- this->list_senders_[i]->close ();
- }
- return;
-}
-
-
void
Connector::stop (void)
{
@@ -1085,8 +969,7 @@ Connector::on_new_sender (Sender &sndr)
this->sessions_++;
this->list_senders_[sndr.index_] = &sndr;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"),
- sndr.index_,
+ ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
this->sessions_));
}
@@ -1106,9 +989,22 @@ Connector::on_delete_sender (Sender &sndr)
&& this->list_senders_[sndr.index_] == &sndr)
this->list_senders_[sndr.index_] = 0;
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"),
+ sndr.get_total_snd (),
+ sndr.get_total_w ());
+
+ ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"),
+ sndr.get_total_rcv (),
+ sndr.get_total_r ());
+
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"),
+ ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
sndr.index_,
+ bufs,
+ bufr,
this->sessions_));
}
@@ -1156,7 +1052,7 @@ Connector::start (const ACE_INET_Addr& addr, int num)
if (this->open (1, 0, 1) != 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("(%t) %p\n"),
+ ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("Connector::open failed")));
return rc;
}
@@ -1166,8 +1062,8 @@ Connector::start (const ACE_INET_Addr& addr, int num)
if (this->connect (addr) != 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
- ACE_TEXT ("Connector::connect failed")));
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("Connector::connect failed")));
break;
}
}
@@ -1192,40 +1088,6 @@ Sender::Sender (Connector * connector, int index)
Sender::~Sender (void)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ")
- ACE_TEXT ("%d recvs (%d bytes)\n"),
- this->index_,
- this->total_w_, this->total_snd_,
- this->total_r_, this->total_rcv_));
- if (this->io_count_ != 0)
- ACE_ERROR ((LM_WARNING,
- ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"),
- this->index_,
- this->io_count_));
-
- // This test bounces data back and forth between Senders and Receivers.
- // Therefore, if there was significantly more data in one direction, that's
- // a problem. Remember, the byte counts are unsigned values.
- int issue_data_warning = 0;
- if (this->total_snd_ > this->total_rcv_)
- {
- if (this->total_rcv_ == 0)
- issue_data_warning = 1;
- else if (this->total_snd_ / this->total_rcv_ > 2)
- issue_data_warning = 1;
- }
- else
- {
- if (this->total_snd_ == 0)
- issue_data_warning = 1;
- else if (this->total_rcv_ / this->total_snd_ > 2)
- issue_data_warning = 1;
- }
- if (issue_data_warning)
- ACE_DEBUG ((LM_WARNING,
- ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
-
if (this->connector_ != 0)
this->connector_->on_delete_sender (*this);
@@ -1249,62 +1111,24 @@ Sender::cancel ()
return;
}
-void
-Sender::close ()
-{
- ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Closing Sender %d; %d I/O outstanding\n"),
- this->index_, this->io_count_));
- ACE_OS::closesocket (this->handle_);
- this->handle_ = ACE_INVALID_HANDLE;
- return;
-}
-
-
-void
-Sender::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
-{
- ACE_TCHAR str[256];
- if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d connected on %s\n"),
- this->index_,
- str));
- else
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
- this->index_,
- ACE_TEXT ("addr_to_string")));
- return;
-}
-
void
Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
-
- // Don't buffer serial sends.
this->handle_ = handle;
- int nodelay = 1;
- ACE_SOCK_Stream option_setter (handle);
- if (option_setter.set_option (IPPROTO_TCP,
- TCP_NODELAY,
- &nodelay,
- sizeof (nodelay)))
- ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
// Open ACE_Asynch_Write_Stream
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open")));
// Open ACE_Asynch_Read_Stream
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open")));
else if (this->initiate_write_stream () == 0)
@@ -1361,7 +1185,7 @@ Sender::initiate_write_stream (void)
{
mb1->release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
-1);
}
@@ -1379,7 +1203,7 @@ Sender::initiate_write_stream (void)
{
mb->release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Stream::write")),
-1);
}
@@ -1443,7 +1267,7 @@ Sender::initiate_read_stream (void)
{
mb1->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")),
-1);
}
@@ -1466,7 +1290,7 @@ Sender::initiate_read_stream (void)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("%p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::read")),
-1);
}
@@ -1485,12 +1309,14 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_Message_Block & mb = result.message_block ();
- if (loglevel == 0)
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
{
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"),
+ ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"),
index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -1566,31 +1392,6 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
- else if (result.error () != 0)
- {
- ACE_Log_Priority prio;
-#if defined (ACE_WIN32)
- if (result.error () == ERROR_OPERATION_ABORTED)
- prio = LM_DEBUG;
-#else
- if (result.error () == ECANCELED)
- prio = LM_DEBUG;
-#endif /* ACE_WIN32 */
- else
- prio = LM_ERROR;
- ACE_Log_Msg::instance ()->errnum (result.error ());
- ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Sender %d; %p\n"),
- this->index_,
- ACE_TEXT ("write"));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"),
- this->index_,
- result.bytes_transferred ()));
- }
mb.release ();
@@ -1598,11 +1399,9 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
this->total_snd_ += result.bytes_transferred ();
- if (duplex != 0) // full duplex, continue write
- {
- if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
- this->initiate_write_stream ();
- }
+ if (duplex != 0 && // full duplex, continue write
+ (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
+ this->initiate_write_stream ();
else // half-duplex read reply, after read we will start write
this->initiate_read_stream ();
}
@@ -1622,12 +1421,14 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_Message_Block & mb = result.message_block ();
- if (loglevel == 0)
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
{
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"),
+ ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"),
index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -1686,31 +1487,6 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
- else if (result.error () != 0)
- {
- ACE_Log_Priority prio;
-#if defined (ACE_WIN32)
- if (result.error () == ERROR_OPERATION_ABORTED)
- prio = LM_DEBUG;
-#else
- if (result.error () == ECANCELED)
- prio = LM_DEBUG;
-#endif /* ACE_WIN32 */
- else
- prio = LM_ERROR;
- ACE_Log_Msg::instance ()->errnum (result.error ());
- ACE_Log_Msg::instance ()->log (prio,
- ACE_TEXT ("(%t) Sender %d; %p\n"),
- this->index_,
- ACE_TEXT ("read"));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"),
- this->index_,
- result.bytes_transferred ()));
- }
mb.release ();
@@ -1815,22 +1591,21 @@ parse_args (int argc, ACE_TCHAR *argv[])
max_aio_operations = 512; // POSIX Proactor params
#if defined (sun)
proactor_type = SUN; // Proactor type for SunOS
- threads = 1; // aiosuspend() not MT Safe.
#else
proactor_type = DEFAULT; // Proactor type = default
- threads = 3; // size of Proactor thread pool
#endif
+ threads = 3; // size of Proactor thread pool
#if defined(__sgi) || defined (ACE_LINUX_COMMON_H)
ACE_DEBUG (( LM_DEBUG,
- "Weak AIO implementation, test will work with 3 clients"));
- senders = 3; // number of senders
+ "Weak AIO implementation, test will work with 1 client"));
+ senders = 1; // number of senders
#else
- senders = 10; // number of senders
+ senders = 20; // number of senders
#endif
loglevel = 1; // log level : 0 full/ 1 only errors
- seconds = 15; // time to run in seconds
+ seconds = 20; // time to run in seconds
return 0;
}
@@ -1931,44 +1706,29 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
ACE_OS::sleep (seconds);
}
- // Now close all the connector/senders. This should trip all the receivers
- // to close as well.
+ //Cancel all pending AIO on Connector and Senders
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Close Connector/Senders: sessions_=%d\n"),
+ ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"),
connector.get_number_sessions ()
));
- connector.close_all ();
-
- // Wait til all the sessions run down.
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Sleeping til sessions run down.\n")));
- while (acceptor.get_number_sessions () > 0 ||
- connector.get_number_sessions () > 0 )
- ACE_OS::sleep (1);
-#if 0
- // Cancel all pending AIO on Connector and Senders
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"),
- connector.get_number_sessions ()
- ));
- connector.cancel_all ();
-#endif
+ //connector.cancel_all ();
//Cancel all pending AIO on Acceptor And Receivers
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"),
+ ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"),
acceptor.get_number_sessions ()
));
- acceptor.cancel_all ();
+ //acceptor.cancel_all ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Thread Pool Task\n")
+ ACE_TEXT ("Stop Thread Pool Task\n")
));
task1.stop ();
// As Proactor event loop now is inactive it is safe to destroy all
// Senders
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"),
+ ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"),
connector.get_number_sessions ()
));
connector.stop ();
@@ -1976,7 +1736,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
// As Proactor event loop now is inactive it is safe to destroy all
// Receivers
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"),
+ ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"),
acceptor.get_number_sessions ()
));
acceptor.stop ();