summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Mitz <mitza-oci@users.noreply.github.com>2008-01-15 22:23:14 +0000
committerAdam Mitz <mitza-oci@users.noreply.github.com>2008-01-15 22:23:14 +0000
commita64ee193298479978c502a6ea98d37ac8945fc86 (patch)
tree07caae2b1f21e45394ae431604f786f0ba67250b
parenta85f3ab0fabdadcaff8a7680e06b221fb83b973b (diff)
downloadATCD-a64ee193298479978c502a6ea98d37ac8945fc86.tar.gz
ChangeLogTag: Tue Jan 15 22:19:48 UTC 2008 Adam Mitz <mitza@ociweb.com>
-rw-r--r--TAO/ChangeLog15
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp9
-rw-r--r--TAO/tao/Block_Flushing_Strategy.h3
-rw-r--r--TAO/tao/Connection_Handler.cpp2
-rw-r--r--TAO/tao/Flushing_Strategy.h2
-rw-r--r--TAO/tao/IIOP_Transport.cpp25
-rw-r--r--TAO/tao/IIOP_Transport.h5
-rw-r--r--TAO/tao/Leader_Follower_Flushing_Strategy.cpp5
-rw-r--r--TAO/tao/Leader_Follower_Flushing_Strategy.h2
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.cpp5
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.h3
-rw-r--r--TAO/tao/Transport.cpp229
-rw-r--r--TAO/tao/Transport.h10
13 files changed, 159 insertions, 156 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 9630577bd58..4bcae696385 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,18 @@
+Tue Jan 15 22:19:48 UTC 2008 Adam Mitz <mitza@ociweb.com>
+
+ * tao/Block_Flushing_Strategy.h:
+ * tao/Block_Flushing_Strategy.cpp:
+ * tao/Connection_Handler.cpp:
+ * tao/Flushing_Strategy.h:
+ * tao/IIOP_Transport.h:
+ * tao/IIOP_Transport.cpp:
+ * tao/Leader_Follower_Flushing_Strategy.h:
+ * tao/Leader_Follower_Flushing_Strategy.cpp:
+ * tao/Reactive_Flushing_Strategy.h:
+ * tao/Reactive_Flushing_Strategy.cpp:
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+
Tue Jan 15 19:49:31 UTC 2008 Ciju John <johnc at ociweb dot com>
* tests/Bug_3193_Regression/test_i.h:
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
index 87082f19819..f6ef164b948 100644
--- a/TAO/tao/Block_Flushing_Strategy.cpp
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -23,22 +23,23 @@ TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *)
int
TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg,
- ACE_Time_Value *)
+ ACE_Time_Value *max_wait_time)
{
while (!msg->all_data_sent ())
{
- if (transport->handle_output () == -1)
+ if (transport->handle_output (max_wait_time) == -1)
return -1;
}
return 0;
}
int
-TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport)
+TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport
+ , ACE_Time_Value *max_wait_time)
{
while (!transport->queue_is_empty ())
{
- if (transport->handle_output () == -1)
+ if (transport->handle_output (max_wait_time) == -1)
return -1;
}
return 0;
diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h
index 835b97755e7..b82ec8db0ee 100644
--- a/TAO/tao/Block_Flushing_Strategy.h
+++ b/TAO/tao/Block_Flushing_Strategy.h
@@ -35,7 +35,8 @@ public:
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg,
ACE_Time_Value *max_wait_time);
- virtual int flush_transport (TAO_Transport *transport);
+ virtual int flush_transport (TAO_Transport *transport
+ , ACE_Time_Value *max_wait_time);
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp
index 527378707c8..f21b278d0b7 100644
--- a/TAO/tao/Connection_Handler.cpp
+++ b/TAO/tao/Connection_Handler.cpp
@@ -201,7 +201,7 @@ TAO_Connection_Handler::handle_output_eh (
return return_value;
}
- return_value = this->transport ()->handle_output ();
+ return_value = this->transport ()->handle_output (0);
this->pos_io_hook (return_value);
diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h
index 61c9b51de10..4bcab3d5589 100644
--- a/TAO/tao/Flushing_Strategy.h
+++ b/TAO/tao/Flushing_Strategy.h
@@ -75,7 +75,7 @@ public:
ACE_Time_Value *max_wait_time) = 0;
/// Wait until the transport has no messages queued.
- virtual int flush_transport (TAO_Transport *transport) = 0;
+ virtual int flush_transport (TAO_Transport *transport, ACE_Time_Value *max_wait_time) = 0;
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 45e441d8f87..5dfcfbc8cc8 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -66,7 +66,6 @@ TAO_IIOP_Transport::send (iovec *iov, int iovcnt,
this->connection_handler_->peer ().sendv (iov,
iovcnt,
max_wait_time);
-
if (retval > 0)
bytes_transferred = retval;
else
@@ -259,30 +258,6 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
}
int
-TAO_IIOP_Transport::send_message_shared (
- TAO_Stub *stub,
- TAO_Message_Semantics message_semantics,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time)
-{
- int r;
-
- {
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
-
- r = this->send_message_shared_i (stub, message_semantics,
- message_block, max_wait_time);
- }
-
- if (r == -1)
- {
- this->close_connection ();
- }
-
- return r;
-}
-
-int
TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails,
TAO_Target_Specification &spec,
TAO_OutputCDR &msg)
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 25485689663..205aa3aee25 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -90,11 +90,6 @@ protected:
virtual ssize_t recv (char *buf, size_t len, const ACE_Time_Value *s = 0);
- virtual int send_message_shared (TAO_Stub *stub,
- TAO_Message_Semantics message_semantics,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time);
-
public:
diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.cpp b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp
index db4e9fa6831..5efc7bbd8a5 100644
--- a/TAO/tao/Leader_Follower_Flushing_Strategy.cpp
+++ b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp
@@ -40,7 +40,8 @@ TAO_Leader_Follower_Flushing_Strategy::flush_message (
int
TAO_Leader_Follower_Flushing_Strategy::flush_transport (
- TAO_Transport *transport)
+ TAO_Transport *transport,
+ ACE_Time_Value *max_wait_time)
{
// @todo This is not the right way to do this....
@@ -50,7 +51,7 @@ TAO_Leader_Follower_Flushing_Strategy::flush_transport (
while (!transport->queue_is_empty ())
{
- if (orb_core->run (0, 1) == -1)
+ if (orb_core->run (max_wait_time, 1) == -1)
return -1;
}
}
diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.h b/TAO/tao/Leader_Follower_Flushing_Strategy.h
index 421ec7a591f..fba9f0e514f 100644
--- a/TAO/tao/Leader_Follower_Flushing_Strategy.h
+++ b/TAO/tao/Leader_Follower_Flushing_Strategy.h
@@ -38,7 +38,7 @@ public:
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg,
ACE_Time_Value *max_wait_time);
- virtual int flush_transport (TAO_Transport *transport);
+ virtual int flush_transport (TAO_Transport *transport, ACE_Time_Value *max_wait_time);
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp
index 312b955192b..13060233926 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.cpp
+++ b/TAO/tao/Reactive_Flushing_Strategy.cpp
@@ -51,7 +51,8 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport,
}
int
-TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport)
+TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport
+ , ACE_Time_Value *max_wait_time)
{
// @@ Should we pass this down? Can we?
try
@@ -60,7 +61,7 @@ TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport)
while (!transport->queue_is_empty ())
{
- if (orb_core->run (0, 1) == -1)
+ if (orb_core->run (max_wait_time, 1) == -1)
return -1;
}
}
diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h
index 56896b01f95..1ac4072867a 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.h
+++ b/TAO/tao/Reactive_Flushing_Strategy.h
@@ -36,7 +36,8 @@ public:
virtual int flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg,
ACE_Time_Value *max_wait_time);
- virtual int flush_transport (TAO_Transport *transport);
+ virtual int flush_transport (TAO_Transport *transport
+ , ACE_Time_Value *max_wait_time);
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 5b8360e7cbf..6a86a410c78 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -29,6 +29,7 @@
#include "ace/Reactor.h"
#include "ace/os_include/sys/os_uio.h"
#include "ace/High_Res_Timer.h"
+#include "ace/Countdown_Time.h"
#include "ace/CORBA_macros.h"
/*
@@ -306,6 +307,10 @@ TAO_Transport::send_message_shared (TAO_Stub *stub,
if (result == -1)
{
+ // The connection needs to be closed here.
+ // In the case of a partially written message this is the only way to cleanup
+ // the physical connection as well as the Transport. An EOF on the remote end
+ // will cancel the partially received message.
this->close_connection ();
}
@@ -480,7 +485,7 @@ TAO_Transport::update_transport (void)
*
*/
int
-TAO_Transport::handle_output (void)
+TAO_Transport::handle_output (ACE_Time_Value *max_wait_time)
{
if (TAO_debug_level > 3)
{
@@ -492,7 +497,7 @@ TAO_Transport::handle_output (void)
// The flushing strategy (potentially via the Reactor) wants to send
// more data, first check if there is a current message that needs
// more sending...
- int const retval = this->drain_queue ();
+ int const retval = this->drain_queue (max_wait_time);
if (TAO_debug_level > 3)
{
@@ -532,7 +537,7 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
int
TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
size_t &bytes_transferred,
- ACE_Time_Value *)
+ ACE_Time_Value *max_wait_time)
{
size_t const total_length = mb->total_length ();
@@ -542,7 +547,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int const n = this->drain_queue_i ();
+ int const n = this->drain_queue_i (max_wait_time);
if (n == -1)
{
@@ -570,98 +575,74 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// We are going to block, so there is no need to clone
// the message block.
TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
- size_t const message_length = synch_message.message_length ();
synch_message.push_back (this->head_, this->tail_);
- int const n = this->send_synch_message_helper_i (synch_message,
- 0 /*ignored*/);
- if (n == -1 || n == 1)
+ int const result = this->send_synch_message_helper_i (synch_message,
+ max_wait_time);
+ // A timeout doesn't return -1
+ if (result == -1 || result == 1)
{
- return n;
+ return result;
}
- // @todo: Check for timeouts!
- // if (max_wait_time != 0 && errno == ETIME) return -1;
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
- int result = flushing_strategy->schedule_output (this);
- if (result == -1)
+ if (max_wait_time == 0 || errno != ETIME)
{
- synch_message.remove_from_list (this->head_, this->tail_);
- if (TAO_debug_level > 0)
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ int result = flushing_strategy->schedule_output (this);
+ if (result == -1)
{
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
- ACE_TEXT ("send_synchronous_message_i, ")
- ACE_TEXT ("error while scheduling flush - %m\n"),
- this->id ()));
+ synch_message.remove_from_list (this->head_, this->tail_);
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_synchronous_message_i, ")
+ ACE_TEXT ("error while scheduling flush - %m\n"),
+ this->id ()));
+ }
+ return -1;
}
- return -1;
- }
- // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
- // because we're always going to flush anyway.
+ // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
+ // because we're always going to flush anyway.
- // Release the mutex, other threads may modify the queue as we
- // block for a long time writing out data.
- {
- typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
- TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ // Release the mutex, other threads may modify the queue as we
+ // block for a long time writing out data.
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
- result = flushing_strategy->flush_message (this,
- &synch_message,
- max_wait_time);
- }
+ result = flushing_strategy->flush_message (this,
+ &synch_message,
+ max_wait_time);
+ }
+ }
+ // The result could be -1 from either the send_synch_message_helper_i() call
+ // or the later flush. In either case return -1.
if (result == -1)
{
synch_message.remove_from_list (this->head_, this->tail_);
- if (errno == ETIME)
- {
- // If partially sent, then we must queue the remainder.
- if (message_length != synch_message.message_length ())
- {
- // This is a timeout, there is only one nasty case: the
- // message has been partially sent! We simply cannot take
- // the message out of the queue, because that would corrupt
- // the connection.
- //
- // What we do is replace the queued message with an
- // asynchronous message, that contains only what remains of
- // the timed out request. If you think about sending
- // CancelRequests in this case: there is no much point in
- // doing that: the receiving ORB would probably ignore it,
- // and figuring out the request ID would be a bit of a
- // nightmare.
- //
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (
- synch_message.current_block (),
- this->orb_core_,
- 0, // no timeout
- 0,
- true),
- -1);
- queued_message->push_front (this->head_, this->tail_);
- }
- }
+ // We don't need to do anything special for the timeout case.
+ // The connection is going to get closed and the Transport destroyed.
+ // The only thing to do maybe is to empty the queue.
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
- ACE_TEXT ("error while flushing message - %m\n"),
+ ACE_TEXT ("error while sending message - %m\n"),
this->id ()));
}
return -1;
}
- return 1;
+ return result;
}
@@ -677,6 +658,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
int const n =
this->send_synch_message_helper_i (synch_message, max_wait_time);
+ // What about partially sent messages.
if (n == -1 || n == 1)
{
return n;
@@ -730,10 +712,9 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
int
TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
- ACE_Time_Value * /*max_wait_time*/)
+ ACE_Time_Value * max_wait_time)
{
- // @todo: Need to send timeouts for writing..
- int const n = this->drain_queue_i ();
+ int const n = this->drain_queue_i (max_wait_time);
if (n == -1)
{
@@ -851,7 +832,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
TAO_REVERSE_LOCK reverse (*this->handler_lock_);
ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
- (void) flushing_strategy->flush_transport (this);
+ if (flushing_strategy->flush_transport (this, 0) == -1) {
+ return -1;
+ }
}
}
@@ -859,10 +842,10 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
}
int
-TAO_Transport::drain_queue (void)
+TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- int const retval = this->drain_queue_i ();
+ int const retval = this->drain_queue_i (max_wait_time);
if (retval == 1)
{
@@ -880,9 +863,10 @@ TAO_Transport::drain_queue (void)
}
int
-TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
+TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time)
{
size_t byte_count = 0;
+ ACE_Countdown_Time countdown (max_wait_time);
// ... send the message ...
ssize_t retval = -1;
@@ -895,7 +879,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
byte_count);
else
#endif /* TAO_HAS_SENDFILE==1 */
- retval = this->send (iov, iovcnt, byte_count);
+ retval = this->send (iov, iovcnt, byte_count, max_wait_time);
if (TAO_debug_level == 5)
{
@@ -956,7 +940,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
}
int
-TAO_Transport::drain_queue_i (void)
+TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time)
{
// This is the vector used to send data, it must be declared outside
// the loop because after the loop there may still be data to be
@@ -1007,7 +991,8 @@ TAO_Transport::drain_queue_i (void)
// IOV_MAX elements ...
if (iovcnt == ACE_IOV_MAX)
{
- int const retval = this->drain_queue_helper (iovcnt, iov);
+ int const retval = this->drain_queue_helper (iovcnt, iov,
+ max_wait_time);
now = ACE_High_Res_Timer::gettimeofday_hr ();
@@ -1034,7 +1019,7 @@ TAO_Transport::drain_queue_i (void)
if (iovcnt != 0)
{
- int const retval = this->drain_queue_helper (iovcnt, iov);
+ int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time);
if (TAO_debug_level > 4)
{
@@ -1299,6 +1284,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
}
}
+ bool partially_sent = false;
+ bool timeout_encountered = false;
+
if (try_sending_first)
{
ssize_t n = 0;
@@ -1322,6 +1310,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
n = this->send_message_block_chain_i (message_block,
byte_count,
max_wait_time);
+
if (n == -1)
{
// ... if this is just an EWOULDBLOCK we must schedule the
@@ -1354,9 +1343,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
return 0;
}
- // If it was partially sent, then we can't allow a timeout
- if (byte_count > 0)
- max_wait_time = 0;
+ if (byte_count > 0) {
+ partially_sent = true;
+ }
+
+ // If it was partially sent, then push to front of queue and don't flush
+ if (errno == ETIME) {
+ timeout_encountered = true;
+ }
if (TAO_debug_level > 6)
{
@@ -1388,7 +1382,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
this->id ()));
}
- if (this->queue_message_i (message_block, max_wait_time) == -1)
+ bool front = (partially_sent ? true: false);
+
+ if (this->queue_message_i (message_block, max_wait_time, front) == -1)
{
if (TAO_debug_level > 0)
{
@@ -1401,44 +1397,56 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
return -1;
}
- // ... if the queue is full we need to activate the output on the
- // queue ...
- bool must_flush = false;
- const bool constraints_reached =
- this->check_buffering_constraints_i (stub,
- must_flush);
+ // We can't flush if we have already encountered a timeout
+ if (!timeout_encountered)
+ {
+ // ... if the queue is full we need to activate the output on the
+ // queue ...
+ bool must_flush = false;
+ const bool constraints_reached =
+ this->check_buffering_constraints_i (stub,
+ must_flush);
- // ... but we also want to activate it if the message was partially
- // sent.... Plus, when we use the blocking flushing strategy the
- // queue is flushed as a side-effect of 'schedule_output()'
+ // ... but we also want to activate it if the message was partially
+ // sent.... Plus, when we use the blocking flushing strategy the
+ // queue is flushed as a side-effect of 'schedule_output()'
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
- if (constraints_reached || try_sending_first)
- {
- int const result = flushing_strategy->schedule_output (this);
- if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ if (constraints_reached || try_sending_first)
{
- must_flush = true;
+ int const result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ must_flush = true;
+ }
}
- }
- if (must_flush)
- {
- typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
- TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ if (must_flush)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
- (void) flushing_strategy->flush_transport (this);
+ if (flushing_strategy->flush_transport (this, max_wait_time) == -1) {
+ // We may need to empty the transport queue here.
+ return -1;
+ }
+ }
}
+ else {
+ // We may need to empty the transport queue here as well.
+ // encountered a timeout
+ return -1;
+ }
return 0;
}
int
TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time)
+ ACE_Time_Value *max_wait_time, bool back)
{
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
@@ -1448,7 +1456,12 @@ TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
0,
true),
-1);
- queued_message->push_back (this->head_, this->tail_);
+ if (back) {
+ queued_message->push_back (this->head_, this->tail_);
+ }
+ else {
+ queued_message->push_front (this->head_, this->tail_);
+ }
return 0;
}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index b4dc1a1b5cc..217835558b3 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -289,7 +289,7 @@ public:
TAO_Wait_Strategy *wait_strategy (void) const;
/// Callback method to reactively drain the outgoing data queue
- int handle_output (void);
+ int handle_output (ACE_Time_Value *max_wait_time);
/// Get the bidirectional flag
int bidirectional_flag (void) const;
@@ -687,7 +687,7 @@ protected:
/// @param max_wait_time The maximum time that the operation can
/// block, used in the implementation of timeouts.
int queue_message_i (const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time);
+ ACE_Time_Value *max_wait_time, bool back=true);
public:
/// Format and queue a message for @a stream
@@ -786,10 +786,10 @@ private:
* Returns 0 if there is more data to send, -1 if there was an error
* and 1 if the message was completely sent.
*/
- int drain_queue (void);
+ int drain_queue (ACE_Time_Value *max_wait_time);
/// Implement drain_queue() assuming the lock is held
- int drain_queue_i (void);
+ int drain_queue_i (ACE_Time_Value *max_wait_time);
/// Check if there are messages pending in the queue
/**
@@ -801,7 +801,7 @@ private:
bool queue_is_empty_i (void);
/// A helper routine used in drain_queue_i()
- int drain_queue_helper (int &iovcnt, iovec iov[]);
+ int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time);
/// These classes need privileged access to:
/// - schedule_output_i()