summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-02-12 18:23:32 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-02-12 18:23:32 +0000
commit52a75283d2a03b8ac4be54000614c611685604c7 (patch)
tree4b82492c75fbb801070c482fdc1a759a51f3537b
parent71961c9812e2f01a9842b0109de5a988bf1aca25 (diff)
downloadATCD-52a75283d2a03b8ac4be54000614c611685604c7.tar.gz
ChangeLogTag:Mon Feb 12 10:15:47 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a96
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp7
-rw-r--r--TAO/tao/IIOP_Transport.cpp15
-rw-r--r--TAO/tao/IIOP_Transport.h7
-rw-r--r--TAO/tao/Queued_Message.cpp8
-rw-r--r--TAO/tao/Transport.cpp257
-rw-r--r--TAO/tao/Transport.h30
7 files changed, 179 insertions, 241 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 16d4123403c..a6d4ffad94d 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,19 +1,37 @@
+Mon Feb 12 10:15:47 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ Add new method to send a message block chain.
+ The template method to send data takes an iovec argument.
+
+ * tao/IIOP_Transport.h:
+ * tao/IIOP_Transport.cpp:
+ Implement the iovec-based send() template method.
+
+ * tao/Queued_Message.cpp:
+ The cleanup code was broken.
+
+ * tao/GIOP_Message_Base.cpp:
+ Use the message block chain method to send short critical
+ messages.
+
Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Queued_Message.h:
* tao/Queued_Message.inl:
* tao/Queued_Message.cpp:
- Add new class to represent a queued message in the outgoing
- path. This class has to keep more than just the message block,
- it also takes care of timeouts and signaling any waiting thread
- when the message is sent.
+ Add new class to represent a queued message in the outgoing
+ path. This class has to keep more than just the message block,
+ it also takes care of timeouts and signaling any waiting thread
+ when the message is sent.
* tao/Message_Sent_Callback.h:
* tao/Message_Sent_Callback.inl:
* tao/Message_Sent_Callback.cpp:
- Define interface to signal threads waiting for a message to be
- sent out, e.g. twoway requests blocked waiting for a queued
- message.
+ Define interface to signal threads waiting for a message to be
+ sent out, e.g. twoway requests blocked waiting for a queued
+ message.
* tao/Flushing_Strategy.h:
* tao/Flushing_Strategy.cpp:
@@ -21,40 +39,40 @@ Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Block_Flushing_Strategy.cpp:
* tao/Reactive_Flushing_Strategy.h
* tao/Reactive_Flushing_Strategy.cpp:
- New classes to control how the outgoing data is flushed,
- either by blocking on write() or by using the reactor.
+ New classes to control how the outgoing data is flushed,
+ either by blocking on write() or by using the reactor.
- * tao/Resource_Factory.h:
+ * tao/Resource_Factory.h:
* tao/default_resource.h:
* tao/default_resource.cpp:
- Add new methods to create the flushing strategy.
+ Add new methods to create the flushing strategy.
* tao/ORB_Core.h:
* tao/ORB_Core.i:
* tao/ORB_Core.cpp:
- Add accessor for the flushing strategy. Notice that the
- strategy is stateless so a single instance (per-ORB) is needed.
+ Add accessor for the flushing strategy. Notice that the
+ strategy is stateless so a single instance (per-ORB) is needed.
* tao/Sync_Strategies.h:
* tao/Sync_Strategies.cpp:
- The Sync_Strategies have been simplified. They are now
- stateless, and they only need to answer a couple of questions
- (1) should a message be buffered, (2) should the ORB flush a
- queue.
+ The Sync_Strategies have been simplified. They are now
+ stateless, and they only need to answer a couple of questions
+ (1) should a message be buffered, (2) should the ORB flush a
+ queue.
* performance-tests/Latency/st_client.cpp:
- Destroy the ORB on shutdown.
+ Destroy the ORB on shutdown.
* tao/TAO.dsp:
* tao/TAO_Static.dsp:
- Add the new files to the TAO project files.
+ Add the new files to the TAO project files.
* tao/Transport.h:
* tao/Transport.inl:
* tao/Transport.cpp:
- Move much of the functionality of sending and outgoing message
- queue up to the base transport class. Remove a lot of code
- deailing with the previous (blocking) queues.
+ Move much of the functionality of sending and outgoing message
+ queue up to the base transport class. Remove a lot of code
+ deailing with the previous (blocking) queues.
* tao/IIOP_Transport.cpp:
* tao/IIOP_Connection_Handler.cpp:
@@ -64,18 +82,18 @@ Fri Feb 09 10:50:47 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Strategies/UIOP_Connection_Handler.cpp:
* orbsvcs/orbsvcs/SSLIOP/SSLIOP_Transport.cpp:
* orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp:
- Change the transport and connection handlers to use the new
- outgoing message queue, the flushing strategy and the changes to
- the SyncStrategy.
+ Change the transport and connection handlers to use the new
+ outgoing message queue, the flushing strategy and the changes to
+ the SyncStrategy.
* tao/GIOP_Message_Base.cpp:
- Propagate a few interface changes.
+ Propagate a few interface changes.
* tests/LongWrites/LongWrites.dsw:
* tests/LongWrites/client.dsp:
* tests/LongWrites/server.dsp:
* tests/LongWrites/run_test.pl:
- Got the test to compile (and run) under NT.
+ Got the test to compile (and run) under NT.
Wed Feb 7 14:31:48 2001 Jeff Parsons <parsons@cs.wustl.edu>
@@ -94,18 +112,18 @@ Wed Feb 7 14:31:48 2001 Jeff Parsons <parsons@cs.wustl.edu>
Wed Feb 7 12:57:35 2001 Frank Hunleth <fhunleth@cs.wustl.edu>
- * tao/Acceptor_Registry.cpp:
- Fixed redundant call to strtok_r in open_i when a protocol is
- specified without an endpoint. This was causing core dumps on
- Linux platforms. Thanks to Christian Ewald
- <christian.ewald@zuehlke.com> for reporting this.
+ * tao/Acceptor_Registry.cpp:
+ Fixed redundant call to strtok_r in open_i when a protocol is
+ specified without an endpoint. This was causing core dumps on
+ Linux platforms. Thanks to Christian Ewald
+ <christian.ewald@zuehlke.com> for reporting this.
Tue Feb 6 22:39:06 2001 Frank Hunleth <fhunleth@cs.wustl.edu>
- * tao/Connection_Cache_Manager.cpp:
- Fixed core dump in close_i () that was being caused by
- unbinding hash map entries while iterating over the
- hash map.
+ * tao/Connection_Cache_Manager.cpp:
+ Fixed core dump in close_i () that was being caused by
+ unbinding hash map entries while iterating over the
+ hash map.
Tue Feb 06 15:49:41 2001 Carlos O'Ryan <coryan@uci.edu>
@@ -117,9 +135,9 @@ Tue Feb 06 15:49:41 2001 Carlos O'Ryan <coryan@uci.edu>
Tue Feb 6 16:34:16 2001 Yamuna Krishnamurthy <yamuna@cs.wustl.edu>
- * orbsvcs/tests/AVStreams/Multicast/ftp.cpp:
-
- Fixed compile warnings on SunCC42.
+ * orbsvcs/tests/AVStreams/Multicast/ftp.cpp:
+
+ Fixed compile warnings on SunCC42.
Tue Feb 6 10:20:02 2001 Jeff Parsons <parsons@cs.wustl.edu>
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index de84da9427f..ab0dd9677a9 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -876,7 +876,8 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
ACE_Message_Block message_block(&data_block);
message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
- int result = transport->send (&message_block);
+ size_t bt;
+ int result = transport->send_message_block_chain (&message_block, bt);
if (result == -1)
{
if (TAO_debug_level > 0)
@@ -1001,7 +1002,9 @@ TAO_GIOP_Message_Base::
ACE_Message_Block message_block(&data_block);
message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
- if (transport->send (&message_block) == -1)
+ size_t bt;
+ int result = transport->send_message_block_chain (&message_block, bt);
+ if (result == -1)
{
if (TAO_orbdebug)
ACE_ERROR ((LM_ERROR,
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 25de3704e1f..858ce766fee 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -86,13 +86,16 @@ TAO_IIOP_Transport::idle (void)
}
ssize_t
-TAO_IIOP_Transport::send (const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time,
- size_t *bytes_transferred)
+TAO_IIOP_Transport::send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *max_wait_time)
{
- return ACE::send (this->handle (),
- message_block,
- bytes_transferred);
+ ssize_t retval = this->service_handler ()->peer ().sendv (iov, iovcnt,
+ max_wait_time);
+ if (retval > 0)
+ bytes_transferred = retval;
+
+ return retval;
}
ssize_t
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index e28be5436a8..df20d162ab5 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -75,10 +75,9 @@ public:
virtual int idle (void);
/// Write the complete Message_Block chain to the connection.
- virtual ssize_t send (const ACE_Message_Block *mblk,
- const ACE_Time_Value *s = 0,
- size_t *bytes_transferred = 0);
-
+ virtual ssize_t send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout = 0);
/// Read len bytes from into buf.
virtual ssize_t recv (char *buf,
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index f3d4a2ed8f8..d2613ca902b 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -21,7 +21,13 @@ TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents,
TAO_Queued_Message::~TAO_Queued_Message (void)
{
- ACE_Message_Block::release (this->contents_);
+ ACE_Message_Block *i = this->contents_;
+ while (i != 0)
+ {
+ ACE_Message_Block *cont = i->cont (); i->cont (0);
+ ACE_Message_Block::release (i);
+ i = cont;
+ }
}
void
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 94beb0d4f9b..d3ca90cee6f 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -51,169 +51,20 @@ TAO_Transport::~TAO_Transport (void)
// delete this->buffering_queue_;
- for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ TAO_Queued_Message *i = this->head_;
+ while (i != 0)
{
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->connection_closed ();
- i->destroy ();
- }
-}
-
-#if 0
-ssize_t
-TAO_Transport::send_or_buffer (TAO_Stub *stub,
- int two_way,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
-{
- if (stub == 0 || two_way)
- return this->send (message_block, max_wait_time);
+ TAO_Queued_Message *tmp = i;
+ i = i->next ();
- TAO_Sync_Strategy &sync_strategy = stub->sync_strategy ();
-
- return sync_strategy.send (*this,
- *stub,
- message_block,
- max_wait_time);
-}
-
-ssize_t
-TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
-{
- // Make sure we have a buffering queue and there are messages in it.
- if (this->buffering_queue_ == 0 ||
- this->buffering_queue_->is_empty ())
- return 1;
-
- // Get the first message from the queue.
- ACE_Message_Block *queued_message = 0;
- ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message);
-
- // @@ What to do here on failures?
- ACE_ASSERT (result != -1);
-
- // Actual network send.
- size_t bytes_transferred = 0;
- result = this->send (queued_message,
- max_wait_time,
- &bytes_transferred);
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Indicate some success.
- return bytes_transferred;
- }
-
- // Since we queue up the message, this is not an error. We can
- // try next time around.
- return 1;
+ tmp->destroy ();
}
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- {
- this->dequeue_all ();
- return -1;
- }
-
- // If successful in sending data, reset the queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Everything was successfully delivered.
- return result;
-}
-
-void
-TAO_Transport::reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered)
-{
- this->reset_message (message_block,
- bytes_delivered,
- 0);
-}
-
-void
-TAO_Transport::reset_queued_message (ACE_Message_Block *message_block,
- size_t bytes_delivered)
-{
- this->reset_message (message_block,
- bytes_delivered,
- 1);
}
-void
-TAO_Transport::reset_message (ACE_Message_Block *message_block,
- size_t bytes_delivered,
- int queued_message)
-{
- while (bytes_delivered != 0)
- {
- // Our current message block chain.
- ACE_Message_Block *current_message_block = message_block;
-
- int completely_delivered_current_message_block_chain = 0;
-
- while (current_message_block != 0 &&
- bytes_delivered != 0)
- {
- size_t current_message_block_length =
- current_message_block->length ();
-
- int completely_delivered_current_message_block =
- bytes_delivered >= current_message_block_length;
-
- size_t adjustment_size =
- ACE_MIN (current_message_block_length, bytes_delivered);
-
- // Reset according to send size.
- current_message_block->rd_ptr (adjustment_size);
-
- // If queued message, adjust the queue.
- if (queued_message)
- // Hand adjust <message_length>.
- this->buffering_queue_->message_length (
- this->buffering_queue_->message_length () - adjustment_size);
-
- // Adjust <bytes_delivered>.
- bytes_delivered -= adjustment_size;
-
- if (completely_delivered_current_message_block)
- {
- // Next message block in the continuation chain.
- current_message_block = current_message_block->cont ();
-
- if (current_message_block == 0)
- completely_delivered_current_message_block_chain = 1;
- }
- }
-
- if (completely_delivered_current_message_block_chain)
- {
- // Go to the next message block chain.
- message_block = message_block->next ();
-
- // If queued message, adjust the queue.
- if (queued_message)
- // Release this <current_message_block>.
- this->dequeue_head ();
- }
- }
-}
-#endif /* 0 */
-
int
TAO_Transport::handle_output ()
{
@@ -252,16 +103,90 @@ TAO_Transport::handle_output ()
}
int
+TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *timeout)
+{
+ bytes_transferred = 0;
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 0;
+
+ while (message_block != 0)
+ {
+ size_t message_block_length =
+ message_block->length ();
+
+ // Check if this block has any data to be sent.
+ if (message_block_length > 0)
+ {
+ // Collect the data in the iovec.
+ iov[iovcnt].iov_base = message_block->rd_ptr ();
+ iov[iovcnt].iov_len = message_block_length;
+
+ // Increment iovec counter.
+ iovcnt++;
+
+ // The buffer is full make a OS call. @@ TODO find a way to
+ // find IOV_MAX for platforms that do not define it rather
+ // than simply setting IOV_MAX to some arbitrary value such
+ // as 16.
+ if (iovcnt == IOV_MAX)
+ {
+ size_t current_transfer = 0;
+
+ ssize_t result =
+ this->send (iov, iovcnt, current_transfer, timeout);
+
+
+ // Errors.
+ if (result == -1 || result == 0)
+ return result;
+
+ // Add to total bytes transferred.
+ bytes_transferred += current_transfer;
+
+ // Reset iovec counter.
+ iovcnt = 0;
+ }
+ }
+
+ // Select the next message block in the chain.
+ message_block = message_block->cont ();
+ }
+
+ // Check for remaining buffers to be sent. This will happen when
+ // IOV_MAX is not a multiple of the number of message blocks.
+ if (iovcnt != 0)
+ {
+ size_t current_transfer = 0;
+
+ ssize_t result =
+ this->send (iov, iovcnt, current_transfer, timeout);
+ // Errors.
+ if (result == -1 || result == 0)
+ return result;
+
+ // Add to total bytes transferred.
+ bytes_transferred += current_transfer;
+ }
+
+ // Return total bytes transferred.
+ return bytes_transferred;
+}
+
+int
TAO_Transport::send_current_message (void)
{
if (this->current_message_ == 0)
return 1;
- size_t byte_count;
- ssize_t n = this->send (this->current_message_->mb (),
- 0, /* it is non-blocking */
- &byte_count);
- if (n == 0)
+ size_t bytes_transferred;
+
+ ssize_t retval =
+ this->send_message_block_chain (this->current_message_->mb (),
+ bytes_transferred);
+ if (retval == 0)
{
// The connection was closed, return -1 to have the Reactor
// close this transport and event handler
@@ -270,7 +195,7 @@ TAO_Transport::send_current_message (void)
// Because there can be a partial transfer we need to adjust the
// number of bytes sent.
- this->current_message_->bytes_transferred (byte_count);
+ this->current_message_->bytes_transferred (bytes_transferred);
if (this->current_message_->done ())
{
// Remove the current message....
@@ -279,13 +204,13 @@ TAO_Transport::send_current_message (void)
this->current_message_ = 0;
- if (n == -1)
+ if (retval == -1)
return -1;
return 1;
}
- if (n == -1)
+ if (retval == -1)
{
// ... timeouts and flow control are not real errors, the
// connection is still valid and we must continue sending the
@@ -347,16 +272,16 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
if (non_queued_message)
{
// ... in this case we must try to send the message first ...
-
+
size_t byte_count;
// @@ I don't think we want to hold the mutex here, however if
// we release it we need to recheck the status of the transport
// after we return... once I understand the final form for this
// code I will re-visit this stuff.
- ssize_t n = this->send (message_block,
- 0, // non-blocking
- &byte_count);
+ ssize_t n = this->send_message_block_chain (message_block,
+ byte_count,
+ 0 /* non-blocking */);
if (n == 0)
return -1;
else if (n == -1 && errno != EWOULDBLOCK)
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 1505925b4e9..04dfef8f626 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -324,9 +324,9 @@ public:
* bytes already on the OS I/O subsystem.
*
*/
- virtual ssize_t send (const ACE_Message_Block *mblk,
- const ACE_Time_Value *timeout = 0,
- size_t *bytes_transferred = 0) = 0;
+ virtual ssize_t send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout = 0) = 0;
// Read len bytes from into buf.
/**
@@ -466,26 +466,10 @@ public:
CORBA::Octet minor) = 0;
//@}
-protected:
-#if 0
- /// Remove the first message from the outgoing queue.
- void dequeue_head (void);
-
- /// Update the state of the outgoing queue, assuming that
- /// bytes_delivered bytes have been sent already.
- void reset_queued_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
-
- /// Update the state of the outgoing queue, this time a complete
- /// message was sent.
- void reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
-
- /// Helper function used to implement the two methods above.
- void reset_message (ACE_Message_Block *message_block,
- size_t bytes_delivered,
- int queued_message);
-#endif /* 0 */
+ /// Send a message block chain,
+ int send_message_block_chain (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *max_wait_time = 0);
protected:
/// Sent the contents of <message_block>, blocking if required by