summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2002-05-13 13:38:54 +0000
committerbala <balanatarajan@users.noreply.github.com>2002-05-13 13:38:54 +0000
commite42bec044591a5e44f33d37da5dc2fe70b68942f (patch)
treec15e78a47ed924e77809291a3ba48253037fc19b
parent49d9287688e072454b63fead8feaa6109ae0e510 (diff)
downloadATCD-e42bec044591a5e44f33d37da5dc2fe70b68942f.tar.gz
ChangeLogTag: Mon May 13 08:36:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--ChangeLog17
-rw-r--r--ChangeLogs/ChangeLog-02a17
-rw-r--r--ChangeLogs/ChangeLog-03a17
-rw-r--r--tests/Proactor_Test.cpp200
4 files changed, 185 insertions, 66 deletions
diff --git a/ChangeLog b/ChangeLog
index 4a8f4cc9a93..7f261bd017b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,20 @@
+Mon May 13 08:36:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tests/Proactor_Test.cpp:
+
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for Proactor's future improvements.
+
+ Impoved scatter/gather branch under Win32. Instead of splitting
+ one message into 3 small blocks we now send them as a
+ chain. This helps to improve performance.
+
Mon May 13 07:33:43 2002 Chad Elliott <elliott_c@ociweb.com>
* ACEXML/examples/SAXPrint/main.cpp:
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 4a8f4cc9a93..7f261bd017b 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,20 @@
+Mon May 13 08:36:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tests/Proactor_Test.cpp:
+
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for Proactor's future improvements.
+
+ Impoved scatter/gather branch under Win32. Instead of splitting
+ one message into 3 small blocks we now send them as a
+ chain. This helps to improve performance.
+
Mon May 13 07:33:43 2002 Chad Elliott <elliott_c@ociweb.com>
* ACEXML/examples/SAXPrint/main.cpp:
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 4a8f4cc9a93..7f261bd017b 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,20 @@
+Mon May 13 08:36:38 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tests/Proactor_Test.cpp:
+
+ Improved flow-control in full-duplex mode by increasing the
+ window size, total_send - total_recv to achieve max performance
+ from TCP/IP
+
+ Improved static info which includes the number of bytes
+ sent/recv and the number of read/write operations. This allows
+ to see distribution of I/O operations between handlers. Would be
+ useful for Proactor's future improvements.
+
+ Impoved scatter/gather branch under Win32. Instead of splitting
+ one message into 3 small blocks we now send them as a
+ chain. This helps to improve performance.
+
Mon May 13 07:33:43 2002 Chad Elliott <elliott_c@ociweb.com>
* ACEXML/examples/SAXPrint/main.cpp:
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index e9966371ae3..6ca4ce243c5 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -88,21 +88,6 @@ static const size_t MIN_TIME = 1; // min 1 sec
static const size_t MAX_TIME = 3600; // max 1 hour
static size_t seconds = 2; // default time to run - 2 seconds
-#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
-static ACE_TCHAR request_line[] =
- ACE_TEXT ("GET / HTTP/1.1\r\n");
-
-static ACE_TCHAR headers[] =
- ACE_TEXT ("Accept: */*\r\n")
- ACE_TEXT ("Accept-Language: C++\r\n")
- ACE_TEXT ("Accept-Encoding: gzip, deflate\r\n")
- ACE_TEXT ("User-Agent: Proactor_Test/1.0 (non-compatible)\r\n")
- ACE_TEXT ("Connection: Keep-Alive\r\n");
-
-static ACE_TCHAR end_of_request_header[] =
- ACE_TEXT ("\r\n");
-#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
-
static ACE_TCHAR complete_message[] =
ACE_TEXT ("GET / HTTP/1.1\r\n")
ACE_TEXT ("Accept: */*\r\n")
@@ -172,7 +157,11 @@ class MyTask : public ACE_Task<ACE_MT_SYNCH>
public:
MyTask (void): lock_ (), sem_ (0), proactor_(0) {}
- virtual ~MyTask() { (void) this->stop (); }
+ virtual ~MyTask()
+ {
+ (void) this->stop ();
+ (void) this->delete_proactor();
+ }
virtual int svc (void);
@@ -207,9 +196,9 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
ACE_UNUSED_ARG (type_proactor);
ACE_UNUSED_ARG (max_op);
- ACE_WIN32_Proactor *proactor = 0;
+ ACE_WIN32_Proactor *proactor_impl = 0;
- ACE_NEW_RETURN (proactor,
+ ACE_NEW_RETURN (proactor_impl,
ACE_WIN32_Proactor,
-1);
@@ -218,12 +207,12 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
#elif defined (ACE_HAS_AIO_CALLS)
- ACE_POSIX_Proactor * proactor = 0;
+ ACE_POSIX_Proactor * proactor_impl = 0;
switch (type_proactor)
{
case AIOCB:
- ACE_NEW_RETURN (proactor,
+ ACE_NEW_RETURN (proactor_impl,
ACE_POSIX_AIOCB_Proactor (max_op),
-1);
ACE_DEBUG ((LM_DEBUG,
@@ -231,7 +220,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
break;
case SIG:
- ACE_NEW_RETURN (proactor,
+ ACE_NEW_RETURN (proactor_impl,
ACE_POSIX_SIG_Proactor (max_op),
-1);
ACE_DEBUG ((LM_DEBUG,
@@ -240,7 +229,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
# if defined (sun)
case SUN:
- ACE_NEW_RETURN (proactor,
+ ACE_NEW_RETURN (proactor_impl,
ACE_SUN_Proactor (max_op),
-1);
ACE_DEBUG ((LM_DEBUG,
@@ -250,7 +239,7 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
# if defined (__sgi)
case CB:
- ACE_NEW_RETURN (proactor,
+ ACE_NEW_RETURN (proactor_impl,
ACE_POSIX_CB_Proactor (max_op),
-1);
ACE_DEBUG ((LM_DEBUG,
@@ -266,8 +255,9 @@ MyTask::create_proactor (ProactorType type_proactor, size_t max_op)
#endif // (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+ // always delete implementation 1 , not !(proactor_impl == 0)
ACE_NEW_RETURN (this->proactor_,
- ACE_Proactor (proactor, !(proactor == 0)),
+ ACE_Proactor (proactor_impl, 1 ),
-1);
ACE_Proactor::instance (this->proactor_);
@@ -334,10 +324,11 @@ MyTask::stop ()
ACE_TEXT ("%p.\n"),
ACE_TEXT ("unable to stop thread pool")));
- if (this->delete_proactor () == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p.\n"),
- ACE_TEXT ("unable to delete proactor")));
+ // Right choice - to delete proactor in destuctor
+ //if (this->delete_proactor () == -1)
+ // ACE_ERROR ((LM_ERROR,
+ // ACE_TEXT ("%p.\n"),
+ // ACE_TEXT ("unable to delete proactor")));
return 0;
}
@@ -373,6 +364,8 @@ public:
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
/// This is called after the new connection has been accepted.
virtual void open (ACE_HANDLE handle,
@@ -409,6 +402,8 @@ private:
int flg_cancel_;
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
class Acceptor : public ACE_Asynch_Acceptor<Receiver>
@@ -418,6 +413,8 @@ public:
int get_number_sessions (void) { return this->sessions_; }
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
Acceptor (void);
virtual ~Acceptor (void);
@@ -437,13 +434,17 @@ private:
Receiver *list_receivers_[MAX_RECEIVERS];
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
// *************************************************************
Acceptor::Acceptor (void)
: sessions_ (0),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
@@ -511,6 +512,8 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
this->total_snd_ += rcvr.get_total_snd();
this->total_rcv_ += rcvr.get_total_rcv();
+ this->total_w_ += rcvr.get_total_w();
+ this->total_r_ += rcvr.get_total_r();
if (rcvr.index_ >= 0
&& rcvr.index_ < MAX_RECEIVERS
@@ -520,8 +523,13 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ());
- ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ());
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_snd(),
+ rcvr.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_rcv(),
+ rcvr.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
@@ -560,7 +568,9 @@ Receiver::Receiver (Acceptor * acceptor, int index)
io_count_ (0),
flg_cancel_(0),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
if (this->acceptor_ != 0)
this->acceptor_->on_new_receiver (*this);
@@ -637,6 +647,7 @@ Receiver::initiate_read_stream (void)
}
this->io_count_++;
+ this->total_r_++;
return 0;
}
@@ -667,6 +678,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
}
this->io_count_++;
+ this->total_w_++;
return 0;
}
@@ -840,6 +852,8 @@ public:
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous reads from the socket complete
@@ -865,6 +879,8 @@ private:
int flg_cancel_;
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
class Connector : public ACE_Asynch_Connector<Sender>
@@ -874,6 +890,8 @@ public:
int get_number_sessions (void) { return this->sessions_; }
long get_total_snd (void) { return this->total_snd_; }
long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
Connector (void);
virtual ~Connector (void);
@@ -894,6 +912,8 @@ private:
Sender *list_senders_[MAX_SENDERS];
long total_snd_;
long total_rcv_;
+ long total_w_;
+ long total_r_;
};
// *************************************************************
@@ -901,7 +921,9 @@ private:
Connector::Connector (void)
: sessions_ (0),
total_snd_(0),
- total_rcv_(0)
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
@@ -966,6 +988,8 @@ Connector::on_delete_sender (Sender &sndr)
this->sessions_--;
this->total_snd_ += sndr.get_total_snd();
this->total_rcv_ += sndr.get_total_rcv();
+ this->total_w_ += sndr.get_total_w();
+ this->total_r_ += sndr.get_total_r();
if (sndr.index_ >= 0
&& sndr.index_ < MAX_SENDERS
@@ -975,8 +999,13 @@ Connector::on_delete_sender (Sender &sndr)
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ());
- ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ());
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_snd(),
+ sndr.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_rcv(),
+ sndr.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
@@ -1056,7 +1085,9 @@ Sender::Sender (Connector * connector, int index)
io_count_ (0),
flg_cancel_(0),
total_snd_ (0),
- total_rcv_ (0)
+ total_rcv_ (0),
+ total_w_ (0),
+ total_r_ (0)
{
if (this->connector_ != 0)
this->connector_->on_new_sender (*this);
@@ -1125,25 +1156,30 @@ Sender::initiate_write_stream (void)
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
+ static const size_t complete_message_length = ACE_OS::strlen (complete_message);
+
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
- static const size_t request_line_length = ACE_OS::strlen (request_line);
- static const size_t headers_length = ACE_OS::strlen (headers);
- static const size_t end_of_request_header_length = ACE_OS::strlen (end_of_request_header);
ACE_Message_Block *mb1 = 0,
*mb2 = 0,
*mb3 = 0;
// No need to allocate +1 for proper printing - the memory includes it already
- ACE_NEW_RETURN (mb1, ACE_Message_Block ((char *) request_line,
- request_line_length), -1);
- mb1->wr_ptr (request_line_length);
- ACE_NEW_RETURN (mb2, ACE_Message_Block ((char *) headers,
- headers_length), -1);
- mb2->wr_ptr (headers_length);
- ACE_NEW_RETURN (mb3, ACE_Message_Block ((char *) end_of_request_header,
- end_of_request_header_length), -1);
- mb3->wr_ptr (end_of_request_header_length);
+ ACE_NEW_RETURN (mb1,
+ ACE_Message_Block (complete_message, complete_message_length),
+ -1);
+
+ ACE_NEW_RETURN (mb2,
+ ACE_Message_Block (complete_message, complete_message_length),
+ -1);
+
+ ACE_NEW_RETURN (mb3,
+ ACE_Message_Block (complete_message, complete_message_length),
+ -1);
+
+ mb1->wr_ptr (complete_message_length);
+ mb2->wr_ptr (complete_message_length);
+ mb3->wr_ptr (complete_message_length);
// chain them together
mb1->cont (mb2);
@@ -1158,7 +1194,6 @@ Sender::initiate_write_stream (void)
-1);
}
#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- static const size_t complete_message_length = ACE_OS::strlen (complete_message);
ACE_Message_Block *mb = 0;
@@ -1179,6 +1214,7 @@ Sender::initiate_write_stream (void)
#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
this->io_count_++;
+ this->total_w_++;
return 0;
}
@@ -1188,30 +1224,48 @@ Sender::initiate_read_stream (void)
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
-#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
- static const size_t request_line_length = ACE_OS::strlen (request_line);
- static const size_t headers_length = ACE_OS::strlen (headers);
- static const size_t end_of_request_header_length = ACE_OS::strlen (end_of_request_header);
+ static const size_t complete_message_length = ACE_OS::strlen (complete_message);
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
ACE_Message_Block *mb1 = 0,
*mb2 = 0,
- *mb3 = 0;
+ *mb3 = 0,
+ *mb4 = 0,
+ *mb5 = 0,
+ *mb6 = 0;
// We allocate +1 only for proper printing - we can just set the last byte
// to '\0' before printing out
- ACE_NEW_RETURN (mb1, ACE_Message_Block (request_line_length + 1), -1);
- ACE_NEW_RETURN (mb2, ACE_Message_Block (headers_length + 1), -1);
- ACE_NEW_RETURN (mb3, ACE_Message_Block (end_of_request_header_length + 1), -1);
+ ACE_NEW_RETURN (mb1, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb2, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb3, ACE_Message_Block (complete_message_length + 1), -1);
+
+ // Let allocate memory for one more triplet,
+ // This improves performance
+ // as we can receive more the than one block at once
+ // Generally, we can receive more triplets ....
+ ACE_NEW_RETURN (mb4, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb5, ACE_Message_Block (complete_message_length + 1), -1);
+ ACE_NEW_RETURN (mb6, ACE_Message_Block (complete_message_length + 1), -1);
mb1->cont (mb2);
mb2->cont (mb3);
+ mb3->cont (mb4);
+ mb4->cont (mb5);
+ mb5->cont (mb6);
+
+
// hide last byte in each message block, reserving it for later to set '\0'
// for proper printouts
mb1->size (mb1->size () - 1);
mb2->size (mb2->size () - 1);
mb3->size (mb3->size () - 1);
+ mb4->size (mb4->size () - 1);
+ mb5->size (mb5->size () - 1);
+ mb6->size (mb6->size () - 1);
+
// Inititiate read
if (this->rs_.readv (*mb1, mb1->total_size () - 1) == -1)
{
@@ -1222,14 +1276,17 @@ Sender::initiate_read_stream (void)
-1);
}
#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- static const size_t complete_message_length = ACE_OS::strlen (complete_message);
+
+ // Try to read more chunks
+ size_t blksize = ( complete_message_length > BUFSIZ ) ?
+ complete_message_length : BUFSIZ;
ACE_Message_Block *mb = 0;
// We allocate +1 only for proper printing - we can just set the last byte
// to '\0' before printing out
ACE_NEW_RETURN (mb,
- ACE_Message_Block (complete_message_length + 1)
+ ACE_Message_Block (blksize + 1)
, -1);
// Inititiate read
@@ -1244,6 +1301,7 @@ Sender::initiate_read_stream (void)
#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
this->io_count_++;
+ this->total_r_++;
return 0;
}
@@ -1346,7 +1404,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
this->total_snd_ += result.bytes_transferred ();
if (duplex != 0 && // full duplex, continue write
- (this->total_snd_- this->total_rcv_) < 1024 ) //flow control
+ (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
this->initiate_write_stream ();
else // half-duplex read reply, after read we will start write
this->initiate_read_stream ();
@@ -1649,14 +1707,14 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"),
connector.get_number_sessions ()
));
- connector.cancel_all ();
+ //connector.cancel_all ();
//Cancel all pending AIO on Acceptor And Receivers
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"),
acceptor.get_number_sessions ()
));
- acceptor.cancel_all ();
+ //acceptor.cancel_all ();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Stop Thread Pool Task\n")
@@ -1683,8 +1741,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
ACE_TCHAR bufs [256];
ACE_TCHAR bufr [256];
- ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd());
- ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() );
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_snd(),
+ connector.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_rcv(),
+ connector.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
@@ -1692,8 +1755,13 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
bufr
));
- ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd());
- ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() );
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_snd(),
+ acceptor.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_rcv(),
+ acceptor.get_total_r() );
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),