summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
authorAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-09 15:39:56 +0000
committerAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-09 15:39:56 +0000
commit02b6be8e9fc42875d428cda382627512f6c04a53 (patch)
treedbf7b5750a89613bbe790660b096e99c30b8fed3 /TAO/tao/Transport.cpp
parent1168389f4bc5ac9ecb4c54042530ffea807e38fb (diff)
downloadATCD-02b6be8e9fc42875d428cda382627512f6c04a53.tar.gz
importing initial work on this ticket into the subversion branch
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp141
1 files changed, 106 insertions, 35 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 7f3c7ebf4b2..b132d4ee612 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -27,6 +27,7 @@
#include "ace/OS_NS_stdio.h"
#include "ace/Reactor.h"
#include "ace/os_include/sys/os_uio.h"
+#include "ace/High_Res_Timer.h"
/*
* Specialization hook to add include files from
@@ -478,12 +479,13 @@ TAO_Transport::handle_output (void)
}
int
-TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
+TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time)
{
if (this->messaging_object ()->format_message (stream) != 0)
return -1;
- return this->queue_message_i (stream.begin());
+ return this->queue_message_i (stream.begin (), max_wait_time);
}
int
@@ -503,7 +505,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
size_t &bytes_transferred,
ACE_Time_Value *)
{
- size_t const total_length = mb->total_length ();
+ const size_t total_length = mb->total_length ();
// We are going to block, so there is no need to clone
// the message block.
@@ -512,7 +514,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int const n = this->drain_queue_i ();
+ const int n = this->drain_queue_i ();
if (n == -1)
{
@@ -541,14 +543,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// We are going to block, so there is no need to clone
// the message block.
TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
+ const size_t message_length = synch_message.message_length ();
- // Push synch_message on to the back of the queue.
synch_message.push_back (this->head_, this->tail_);
- int const n =
- this->send_synch_message_helper_i (synch_message,
- max_wait_time);
-
+ const int n = this->send_synch_message_helper_i (synch_message,
+ 0 /*ignored*/);
if (n == -1 || n == 1)
{
return n;
@@ -558,18 +558,30 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// if (max_wait_time != 0 && errno == ETIME) return -1;
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == -1)
+ {
+ synch_message.remove_from_list (this->head_, this->tail_);
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_synchronous_message_i, ")
+ ACE_TEXT ("error while scheduling flush - %m\n"),
+ this->id ()));
+ }
+ return -1;
+ }
+
+ // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
+ // because we're always going to flush anyway.
// Release the mutex, other threads may modify the queue as we
// block for a long time writing out data.
- int result;
{
typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
- ace_mon,
- reverse,
- -1);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
result = flushing_strategy->flush_message (this,
&synch_message,
@@ -582,7 +594,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (errno == ETIME)
{
- if (this->head_ == &synch_message)
+ // If partially sent, then we must queue the remainder.
+ if (message_length != synch_message.message_length ())
{
// This is a timeout, there is only one nasty case: the
// message has been partially sent! We simply cannot take
@@ -597,13 +610,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// and figuring out the request ID would be a bit of a
// nightmare.
//
-
- synch_message.remove_from_list (this->head_, this->tail_);
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (
synch_message.current_block (),
this->orb_core_,
+ 0, // no timeout
0,
1),
-1);
@@ -682,6 +694,13 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
msg->remove_from_list (this->head_, this->tail_);
msg->destroy ();
}
+ else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_message(this, msg, 0);
+ }
return 1;
}
@@ -797,7 +816,14 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_transport (this);
+ }
}
return 0;
@@ -920,8 +946,29 @@ TAO_Transport::drain_queue_i (void)
// call.
this->sent_byte_count_ = 0;
+ // Avoid calling this expensive function each time through the loop. Instead
+ // we'll assume that the time is unlikely to change much during the loop.
+ // If we are forced to send in the loop then we'll recompute the time.
+ ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
+
while (i != 0)
{
+ if (i->is_expired (now))
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
+ ACE_TEXT ("Discarding expired queued message.\n"),
+ this->id ()));
+ }
+ i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
+ this->orb_core_->leader_follower ());
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ i = this->head_;
+ continue;
+ }
// ... each element fills the iovector ...
i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
@@ -933,6 +980,8 @@ TAO_Transport::drain_queue_i (void)
int const retval =
this->drain_queue_helper (iovcnt, iov);
+ now = ACE_High_Res_Timer::gettimeofday_hr ();
+
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -999,11 +1048,19 @@ TAO_Transport::cleanup_queue_i ()
this->id ()));
}
+ int byte_count = 0;
+ int msg_count = 0;
+
// Cleanup all messages
while (this->head_ != 0)
{
TAO_Queued_Message *i = this->head_;
+ if (TAO_debug_level > 4)
+ {
+ byte_count += i->message_length();
+ ++msg_count;
+ }
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
@@ -1012,7 +1069,15 @@ TAO_Transport::cleanup_queue_i ()
i->remove_from_list (this->head_, this->tail_);
i->destroy ();
- }
+ }
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
+ ACE_TEXT ("discarded %d messages, %d bytes.\n"),
+ this->id (), msg_count, byte_count));
+ }
}
void
@@ -1232,6 +1297,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
return 0;
}
+ // If it was partially sent, then we can't allow a timeout
+ if (byte_count > 0)
+ max_wait_time = 0;
+
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1262,14 +1331,16 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
this->id ()));
}
- if (this->queue_message_i(message_block) == -1)
+ if (this->queue_message_i (message_block, max_wait_time) == -1)
{
if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("cannot queue message for ")
- ACE_TEXT (" - %m\n"),
- this->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_asynchronous_message_i, ")
+ ACE_TEXT ("cannot queue message for - %m\n"),
+ this->id ()));
+ }
return -1;
}
@@ -1289,7 +1360,11 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (constraints_reached || try_sending_first)
{
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ must_flush = true;
+ }
}
if (must_flush)
@@ -1305,12 +1380,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
}
int
-TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
+TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
{
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (message_block,
this->orb_core_,
+ max_wait_time,
0,
1),
-1);
@@ -2387,13 +2464,7 @@ TAO_Transport::post_open (size_t id)
// If the wait strategy wants us to be registered with the reactor
// then we do so. If registeration is required and it succeeds,
// #REFCOUNT# becomes two.
- if (this->wait_strategy ()->register_handler () == 0)
- {
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
- }
- else
+ if (this->wait_strategy ()->register_handler () != 0)
{
// Registration failures.