summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp257
1 files changed, 91 insertions, 166 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 94beb0d4f9b..d3ca90cee6f 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -51,169 +51,20 @@ TAO_Transport::~TAO_Transport (void)
// delete this->buffering_queue_;
- for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ TAO_Queued_Message *i = this->head_;
+ while (i != 0)
{
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->connection_closed ();
- i->destroy ();
- }
-}
-
-#if 0
-ssize_t
-TAO_Transport::send_or_buffer (TAO_Stub *stub,
- int two_way,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
-{
- if (stub == 0 || two_way)
- return this->send (message_block, max_wait_time);
+ TAO_Queued_Message *tmp = i;
+ i = i->next ();
- TAO_Sync_Strategy &sync_strategy = stub->sync_strategy ();
-
- return sync_strategy.send (*this,
- *stub,
- message_block,
- max_wait_time);
-}
-
-ssize_t
-TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time)
-{
- // Make sure we have a buffering queue and there are messages in it.
- if (this->buffering_queue_ == 0 ||
- this->buffering_queue_->is_empty ())
- return 1;
-
- // Get the first message from the queue.
- ACE_Message_Block *queued_message = 0;
- ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message);
-
- // @@ What to do here on failures?
- ACE_ASSERT (result != -1);
-
- // Actual network send.
- size_t bytes_transferred = 0;
- result = this->send (queued_message,
- max_wait_time,
- &bytes_transferred);
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Indicate some success.
- return bytes_transferred;
- }
-
- // Since we queue up the message, this is not an error. We can
- // try next time around.
- return 1;
+ tmp->destroy ();
}
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- {
- this->dequeue_all ();
- return -1;
- }
-
- // If successful in sending data, reset the queue appropriately.
- this->reset_queued_message (queued_message,
- bytes_transferred);
-
- // Everything was successfully delivered.
- return result;
-}
-
-void
-TAO_Transport::reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered)
-{
- this->reset_message (message_block,
- bytes_delivered,
- 0);
-}
-
-void
-TAO_Transport::reset_queued_message (ACE_Message_Block *message_block,
- size_t bytes_delivered)
-{
- this->reset_message (message_block,
- bytes_delivered,
- 1);
}
-void
-TAO_Transport::reset_message (ACE_Message_Block *message_block,
- size_t bytes_delivered,
- int queued_message)
-{
- while (bytes_delivered != 0)
- {
- // Our current message block chain.
- ACE_Message_Block *current_message_block = message_block;
-
- int completely_delivered_current_message_block_chain = 0;
-
- while (current_message_block != 0 &&
- bytes_delivered != 0)
- {
- size_t current_message_block_length =
- current_message_block->length ();
-
- int completely_delivered_current_message_block =
- bytes_delivered >= current_message_block_length;
-
- size_t adjustment_size =
- ACE_MIN (current_message_block_length, bytes_delivered);
-
- // Reset according to send size.
- current_message_block->rd_ptr (adjustment_size);
-
- // If queued message, adjust the queue.
- if (queued_message)
- // Hand adjust <message_length>.
- this->buffering_queue_->message_length (
- this->buffering_queue_->message_length () - adjustment_size);
-
- // Adjust <bytes_delivered>.
- bytes_delivered -= adjustment_size;
-
- if (completely_delivered_current_message_block)
- {
- // Next message block in the continuation chain.
- current_message_block = current_message_block->cont ();
-
- if (current_message_block == 0)
- completely_delivered_current_message_block_chain = 1;
- }
- }
-
- if (completely_delivered_current_message_block_chain)
- {
- // Go to the next message block chain.
- message_block = message_block->next ();
-
- // If queued message, adjust the queue.
- if (queued_message)
- // Release this <current_message_block>.
- this->dequeue_head ();
- }
- }
-}
-#endif /* 0 */
-
int
TAO_Transport::handle_output ()
{
@@ -252,16 +103,90 @@ TAO_Transport::handle_output ()
}
int
+TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *timeout)
+{
+ bytes_transferred = 0;
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 0;
+
+ while (message_block != 0)
+ {
+ size_t message_block_length =
+ message_block->length ();
+
+ // Check if this block has any data to be sent.
+ if (message_block_length > 0)
+ {
+ // Collect the data in the iovec.
+ iov[iovcnt].iov_base = message_block->rd_ptr ();
+ iov[iovcnt].iov_len = message_block_length;
+
+ // Increment iovec counter.
+ iovcnt++;
+
+ // The buffer is full make a OS call. @@ TODO find a way to
+ // find IOV_MAX for platforms that do not define it rather
+ // than simply setting IOV_MAX to some arbitrary value such
+ // as 16.
+ if (iovcnt == IOV_MAX)
+ {
+ size_t current_transfer = 0;
+
+ ssize_t result =
+ this->send (iov, iovcnt, current_transfer, timeout);
+
+
+ // Errors.
+ if (result == -1 || result == 0)
+ return result;
+
+ // Add to total bytes transferred.
+ bytes_transferred += current_transfer;
+
+ // Reset iovec counter.
+ iovcnt = 0;
+ }
+ }
+
+ // Select the next message block in the chain.
+ message_block = message_block->cont ();
+ }
+
+ // Check for remaining buffers to be sent. This will happen when
+ // IOV_MAX is not a multiple of the number of message blocks.
+ if (iovcnt != 0)
+ {
+ size_t current_transfer = 0;
+
+ ssize_t result =
+ this->send (iov, iovcnt, current_transfer, timeout);
+ // Errors.
+ if (result == -1 || result == 0)
+ return result;
+
+ // Add to total bytes transferred.
+ bytes_transferred += current_transfer;
+ }
+
+ // Return total bytes transferred.
+ return bytes_transferred;
+}
+
+int
TAO_Transport::send_current_message (void)
{
if (this->current_message_ == 0)
return 1;
- size_t byte_count;
- ssize_t n = this->send (this->current_message_->mb (),
- 0, /* it is non-blocking */
- &byte_count);
- if (n == 0)
+ size_t bytes_transferred;
+
+ ssize_t retval =
+ this->send_message_block_chain (this->current_message_->mb (),
+ bytes_transferred);
+ if (retval == 0)
{
// The connection was closed, return -1 to have the Reactor
// close this transport and event handler
@@ -270,7 +195,7 @@ TAO_Transport::send_current_message (void)
// Because there can be a partial transfer we need to adjust the
// number of bytes sent.
- this->current_message_->bytes_transferred (byte_count);
+ this->current_message_->bytes_transferred (bytes_transferred);
if (this->current_message_->done ())
{
// Remove the current message....
@@ -279,13 +204,13 @@ TAO_Transport::send_current_message (void)
this->current_message_ = 0;
- if (n == -1)
+ if (retval == -1)
return -1;
return 1;
}
- if (n == -1)
+ if (retval == -1)
{
// ... timeouts and flow control are not real errors, the
// connection is still valid and we must continue sending the
@@ -347,16 +272,16 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
if (non_queued_message)
{
// ... in this case we must try to send the message first ...
-
+
size_t byte_count;
// @@ I don't think we want to hold the mutex here, however if
// we release it we need to recheck the status of the transport
// after we return... once I understand the final form for this
// code I will re-visit this stuff.
- ssize_t n = this->send (message_block,
- 0, // non-blocking
- &byte_count);
+ ssize_t n = this->send_message_block_chain (message_block,
+ byte_count,
+ 0 /* non-blocking */);
if (n == 0)
return -1;
else if (n == -1 && errno != EWOULDBLOCK)