summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp318
1 files changed, 179 insertions, 139 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 814bd170f97..68893083f9a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -155,12 +155,16 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set)
}
static void
-dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer)
+dump_iov (iovec *iov, int iovcnt, int id,
+ size_t current_transfer,
+ const char *location)
{
+ ACE_Log_Msg::instance ()->acquire ();
+
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" sending %d buffers\n",
- id, iovcnt));
+ id, location, iovcnt));
for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
{
size_t iov_len = iov[i].iov_len;
@@ -170,28 +174,36 @@ dump_iov (iovec *iov, int iovcnt, int id, size_t current_transfer)
iov_len = current_transfer;
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" buffer %d/%d has %d bytes\n",
- id,
+ id, location,
i, iovcnt,
iov_len));
+
size_t len;
for (size_t offset = 0; offset < iov_len; offset += len)
{
+ char header[1024];
+ ACE_OS::sprintf (header,
+ "TAO - Transport[%d]::%s (%d/%d)\n",
+ id, location, offset, iov_len);
+
len = iov_len - offset;
if (len > 512)
len = 512;
ACE_HEX_DUMP ((LM_DEBUG,
ACE_static_cast(char*,iov[i].iov_base) + offset,
len,
- "TAO (%P|%t) - Transport::send_message_block_chain "));
+ header));
}
current_transfer -= iov_len;
}
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_mesage_block_chain"
+ "TAO (%P|%t) - Transport[%d]::%s"
" end of data\n",
- id));
+ id, location));
+
+ ACE_Log_Msg::instance ()->release ();
}
int
@@ -230,9 +242,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
ssize_t result =
this->send (iov, iovcnt, current_transfer, timeout);
- if (TAO_debug_level > 6)
+ if (TAO_debug_level == 2)
{
- dump_iov (iov, iovcnt, this->id (), current_transfer);
+ dump_iov (iov, iovcnt, this->id (),
+ current_transfer, "send_message_block_chain");
}
// Add to total bytes transferred.
@@ -260,9 +273,10 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
ssize_t result =
this->send (iov, iovcnt, current_transfer, timeout);
- if (TAO_debug_level > 6)
+ if (TAO_debug_level == 2)
{
- dump_iov (iov, iovcnt, this->id (), current_transfer);
+ dump_iov (iov, iovcnt, this->id (),
+ current_transfer, "send_message_block_chain");
}
// Add to total bytes transferred.
@@ -279,162 +293,155 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *message_block,
int
TAO_Transport::send_message_i (TAO_Stub *stub,
- int twoway_flag,
+ int is_synchronous,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
- int queue_empty = (this->head_ == 0);
+ if (is_synchronous)
+ {
+ return this->send_synchronous_message_i (stub,
+ message_block,
+ max_wait_time);
+ }
// Let's figure out if the message should be queued without trying
// to send first:
- int must_queue = 0;
- if (this->head_ != 0)
- must_queue = 1;
- else if (!twoway_flag
- && stub->sync_strategy ().must_queue (queue_empty))
- {
- must_queue = 1;
- }
+ int try_sending_first = 1;
- if (must_queue)
+ int queue_empty = (this->head_ == 0);
+
+ if (!queue_empty)
+ try_sending_first = 0;
+ else if (stub->sync_strategy ().must_queue (queue_empty))
+ try_sending_first = 0;
+
+ size_t byte_count = 0;
+ ssize_t n;
+ if (try_sending_first)
{
- // ... simply queue the message ...
+ // ... in this case we must try to send the message first ...
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::send_message_i, "
- "message is queued\n",
+ "trying to send the message\n",
this->id ()));
}
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
- queued_message->push_back (this->head_, this->tail_);
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this decision
+ n = this->send_message_block_chain (message_block,
+ byte_count,
+ max_wait_time);
+ if (n == 0)
+ return -1; // EOF
+ else if (n == -1)
+ {
+ // ... if this is just an EWOULDBLOCK we must schedule the
+ // message for later, if it is ETIME we still have to send
+ // the complete message, because cutting off the message at
+ // this point will destroy the synchronization with the
+ // server ...
+ if (errno != EWOULDBLOCK && errno != ETIME)
+ {
+ return -1;
+ }
+ }
- if (this->must_flush_queue_i (stub))
+ // ... let's figure out if the complete message was sent ...
+ if (message_block->total_length () == byte_count)
{
- ace_mon.release ();
- int result = flushing_strategy->flush_message (this,
- this->tail_);
- return result;
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This is the common case for the critical path, it should
+ // be fast.
+ return 0;
}
- return 0;
}
- // ... in this case we must try to send the message first ...
+ // ... either the message must be queued or we need to queue it
+ // because it was not completely sent out ...
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::send_message_i, "
- "trying to send the message\n",
+ "message is queued\n",
this->id ()));
}
- size_t byte_count;
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
+ queued_message->bytes_transferred (byte_count);
+ queued_message->push_back (this->head_, this->tail_);
- // @@ I don't think we want to hold the mutex here, however if
- // we release it we need to recheck the status of the transport
- // after we return... once I understand the final form for this
- // code I will re-visit this decision
- ssize_t n = this->send_message_block_chain (message_block,
- byte_count,
- max_wait_time);
- if (n == 0)
- return -1; // EOF
- else if (n == -1)
+ // ... if the queue is full we need to activate the output on the
+ // queue ...
+ if (this->must_flush_queue_i (stub))
{
- // ... if this is just an EWOULDBLOCK we must schedule the
- // message for later ...
- if (errno != EWOULDBLOCK)
- {
- return -1;
- }
+ this->orb_core ()->flushing_strategy ()->schedule_output (this);
}
- // ... let's figure out if the complete message was sent ...
- if (message_block->total_length () == byte_count)
- {
- // Done, just return. Notice that there are no allocations
- // or copies up to this point (though some fancy calling
- // back and forth).
- // This is the common case for the critical path, it should
- // be fast.
- return 0;
- }
+ // ... in any case, check for timeouts and report them to the
+ // application ...
+ if (max_wait_time != 0 && n == -1 && errno == ETIME)
+ return -1;
- // ... the message was only partially sent, schedule reactive
- // output...
- flushing_strategy->schedule_output (this);
+ return 0;
+}
- // ... and set it as the current message ...
- if (twoway_flag)
- {
- // ... we are going to block, so there is no need to clone
- // the message block...
- // @@ It seems wasteful to allocate a TAO_Queued_Message in
- // this case, but it is simpler to do it this way.
- TAO_Synch_Queued_Message synch_message (message_block);
-
- synch_message.bytes_transferred (byte_count);
- synch_message.push_back (this->head_, this->tail_);
-
- // Release the mutex, other threads may modify the queue as we
- // block for a long time writing out data.
- int result;
- {
- ace_mon.release ();
- result = flushing_strategy->flush_message (this,
- &synch_message);
-
- ace_mon.acquire ();
- }
- ACE_ASSERT (synch_message.next () == 0);
- ACE_ASSERT (synch_message.prev () == 0);
- synch_message.destroy ();
- return result;
- }
+int
+TAO_Transport::send_synchronous_message_i (TAO_Stub *stub,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ // We are going to block, so there is no need to clone
+ // the message block.
+ TAO_Synch_Queued_Message synch_message (message_block);
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
+ synch_message.push_back (this->head_, this->tail_);
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- " queued anyway, %d bytes sent\n",
- this->id (),
- byte_count));
- }
+ int n = this->drain_queue_i ();
+ if (n == -1)
+ return -1; // Error while sending...
+ else if (n == 1)
+ return 1; // Empty queue, message was sent..
- // ... insert at the head of the queue, we can use push_back()
- // because the queue is empty ...
+ ACE_ASSERT (n == 0); // Some data sent, but data remains.
- queued_message->bytes_transferred (byte_count);
- queued_message->push_back (this->head_, this->tail_);
+ if (synch_message.all_data_sent ())
+ return 1;
- // ... this is not a twoway. We must check if the buffering
- // constraints have been reached, if so, then we should start
- // flushing out data....
+ // @todo: Check for timeouts!
+ // if (max_wait_time != 0 && errno == ETIME) return -1;
- if (this->must_flush_queue_i (stub))
- {
- ace_mon.release ();
- int result = flushing_strategy->flush_message (this,
- this->tail_);
- return result;
- }
- return 0;
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ (void) flushing_strategy->schedule_output (this);
+
+ // Release the mutex, other threads may modify the queue as we
+ // block for a long time writing out data.
+ int result;
+ {
+ typedef ACE_Reverse_Lock<TAO_SYNCH_MUTEX> TAO_REVERSE_SYNCH_MUTEX;
+ TAO_REVERSE_SYNCH_MUTEX reverse (this->queue_mutex_);
+
+ ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1);
+ result = flushing_strategy->flush_message (this,
+ &synch_message);
+
+ }
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
+ return result;
}
int
@@ -547,18 +554,29 @@ TAO_Transport::make_idle (void)
void
TAO_Transport::close_connection (void)
{
- ACE_MT (ACE_GUARD (ACE_Lock,
- guard,
- *this->handler_lock_));
+ ACE_Event_Handler *eh = 0;
+ {
+ ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
- // Call handle close on the handler.
- // The event handler is as common as we can get
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh)
- eh->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::ALL_EVENTS_MASK);
+ eh = this->event_handler_i ();
+
+ this->transition_handler_state_i ();
+
+ if (eh == 0)
+ return;
+ }
+
+ // Close the underlying connection, it is enough to get an
+ // Event_Handler pointer to do this, so we can factor out the code
+ // in the base TAO_Transport class.
+ (void) eh->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::ALL_EVENTS_MASK);
// Purge the entry
+ // @todo This is redundant, handle_close() eventually calls
+ // this->connection_handler_closing(), that performs the same
+ // work, for some reason they hold the mutex while they do
+ // that work though.
this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_);
for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
@@ -580,11 +598,14 @@ TAO_Transport::send (iovec *iov, int iovcnt,
// if there's no associated event handler, then we act like a null transport
if (this->event_handler_i () == 0)
{
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ")
- ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"),
- this->id (),
- this->tag_));
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ")
+ ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"),
+ this->id (),
+ this->tag_));
+ }
errno = ENOENT;
return -1;
}
@@ -716,6 +737,12 @@ TAO_Transport::drain_queue (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
+ return this->drain_queue_i ();
+}
+
+int
+TAO_Transport::drain_queue_i (void)
+{
if (this->head_ == 0)
return 1;
@@ -743,6 +770,12 @@ TAO_Transport::drain_queue (void)
ssize_t retval =
this->send (iov, iovcnt, byte_count);
+ if (TAO_debug_level == 2)
+ {
+ dump_iov (iov, iovcnt, this->id (),
+ byte_count, "drain_queue_i");
+ }
+
// ... now we need to update the queue, removing elements
// that have been sent, and updating the last element if it
// was only partially sent ...
@@ -775,6 +808,12 @@ TAO_Transport::drain_queue (void)
ssize_t retval =
this->send (iov, iovcnt, byte_count);
+ if (TAO_debug_level == 2)
+ {
+ dump_iov (iov, iovcnt, this->id (),
+ byte_count, "drain_queue_i");
+ }
+
this->cleanup_queue (byte_count);
iovcnt = 0;
@@ -788,6 +827,7 @@ TAO_Transport::drain_queue (void)
return 0;
return -1;
}
+ ACE_ASSERT (byte_count != 0);
if (this->head_ == 0)
return 1;