diff options
author | Adam Mitz <mitza-oci@users.noreply.github.com> | 2008-01-15 22:23:14 +0000 |
---|---|---|
committer | Adam Mitz <mitza-oci@users.noreply.github.com> | 2008-01-15 22:23:14 +0000 |
commit | a64ee193298479978c502a6ea98d37ac8945fc86 (patch) | |
tree | 07caae2b1f21e45394ae431604f786f0ba67250b | |
parent | a85f3ab0fabdadcaff8a7680e06b221fb83b973b (diff) | |
download | ATCD-a64ee193298479978c502a6ea98d37ac8945fc86.tar.gz |
ChangeLogTag: Tue Jan 15 22:19:48 UTC 2008 Adam Mitz <mitza@ociweb.com>
-rw-r--r-- | TAO/ChangeLog | 15 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.cpp | 9 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.h | 3 | ||||
-rw-r--r-- | TAO/tao/Connection_Handler.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Flushing_Strategy.h | 2 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 25 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 5 | ||||
-rw-r--r-- | TAO/tao/Leader_Follower_Flushing_Strategy.cpp | 5 | ||||
-rw-r--r-- | TAO/tao/Leader_Follower_Flushing_Strategy.h | 2 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.cpp | 5 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.h | 3 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 229 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 10 |
13 files changed, 159 insertions, 156 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 9630577bd58..4bcae696385 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,18 @@ +Tue Jan 15 22:19:48 UTC 2008 Adam Mitz <mitza@ociweb.com> + + * tao/Block_Flushing_Strategy.h: + * tao/Block_Flushing_Strategy.cpp: + * tao/Connection_Handler.cpp: + * tao/Flushing_Strategy.h: + * tao/IIOP_Transport.h: + * tao/IIOP_Transport.cpp: + * tao/Leader_Follower_Flushing_Strategy.h: + * tao/Leader_Follower_Flushing_Strategy.cpp: + * tao/Reactive_Flushing_Strategy.h: + * tao/Reactive_Flushing_Strategy.cpp: + * tao/Transport.h: + * tao/Transport.cpp: + Tue Jan 15 19:49:31 UTC 2008 Ciju John <johnc at ociweb dot com> * tests/Bug_3193_Regression/test_i.h: diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index 87082f19819..f6ef164b948 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -23,22 +23,23 @@ TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *) int TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, TAO_Queued_Message *msg, - ACE_Time_Value *) + ACE_Time_Value *max_wait_time) { while (!msg->all_data_sent ()) { - if (transport->handle_output () == -1) + if (transport->handle_output (max_wait_time) == -1) return -1; } return 0; } int -TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport) +TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport + , ACE_Time_Value *max_wait_time) { while (!transport->queue_is_empty ()) { - if (transport->handle_output () == -1) + if (transport->handle_output (max_wait_time) == -1) return -1; } return 0; diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h index 835b97755e7..b82ec8db0ee 100644 --- a/TAO/tao/Block_Flushing_Strategy.h +++ b/TAO/tao/Block_Flushing_Strategy.h @@ -35,7 +35,8 @@ public: virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg, ACE_Time_Value *max_wait_time); - virtual int flush_transport (TAO_Transport *transport); + virtual int flush_transport (TAO_Transport *transport + , ACE_Time_Value *max_wait_time); }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index 527378707c8..f21b278d0b7 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -201,7 +201,7 @@ TAO_Connection_Handler::handle_output_eh ( return return_value; } - return_value = this->transport ()->handle_output (); + return_value = this->transport ()->handle_output (0); this->pos_io_hook (return_value); diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h index 61c9b51de10..4bcab3d5589 100644 --- a/TAO/tao/Flushing_Strategy.h +++ b/TAO/tao/Flushing_Strategy.h @@ -75,7 +75,7 @@ public: ACE_Time_Value *max_wait_time) = 0; /// Wait until the transport has no messages queued. - virtual int flush_transport (TAO_Transport *transport) = 0; + virtual int flush_transport (TAO_Transport *transport, ACE_Time_Value *max_wait_time) = 0; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 45e441d8f87..5dfcfbc8cc8 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -66,7 +66,6 @@ TAO_IIOP_Transport::send (iovec *iov, int iovcnt, this->connection_handler_->peer ().sendv (iov, iovcnt, max_wait_time); - if (retval > 0) bytes_transferred = retval; else @@ -259,30 +258,6 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, } int -TAO_IIOP_Transport::send_message_shared ( - TAO_Stub *stub, - TAO_Message_Semantics message_semantics, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time) -{ - int r; - - { - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - - r = this->send_message_shared_i (stub, message_semantics, - message_block, max_wait_time); - } - - if (r == -1) - { - this->close_connection (); - } - - return r; -} - -int TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails, TAO_Target_Specification &spec, TAO_OutputCDR &msg) diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 25485689663..205aa3aee25 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -90,11 +90,6 @@ protected: virtual ssize_t recv (char *buf, size_t len, const ACE_Time_Value *s = 0); - virtual int send_message_shared (TAO_Stub *stub, - TAO_Message_Semantics message_semantics, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time); - public: diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.cpp b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp index db4e9fa6831..5efc7bbd8a5 100644 --- a/TAO/tao/Leader_Follower_Flushing_Strategy.cpp +++ b/TAO/tao/Leader_Follower_Flushing_Strategy.cpp @@ -40,7 +40,8 @@ TAO_Leader_Follower_Flushing_Strategy::flush_message ( int TAO_Leader_Follower_Flushing_Strategy::flush_transport ( - TAO_Transport *transport) + TAO_Transport *transport, + ACE_Time_Value *max_wait_time) { // @todo This is not the right way to do this.... @@ -50,7 +51,7 @@ TAO_Leader_Follower_Flushing_Strategy::flush_transport ( while (!transport->queue_is_empty ()) { - if (orb_core->run (0, 1) == -1) + if (orb_core->run (max_wait_time, 1) == -1) return -1; } } diff --git a/TAO/tao/Leader_Follower_Flushing_Strategy.h b/TAO/tao/Leader_Follower_Flushing_Strategy.h index 421ec7a591f..fba9f0e514f 100644 --- a/TAO/tao/Leader_Follower_Flushing_Strategy.h +++ b/TAO/tao/Leader_Follower_Flushing_Strategy.h @@ -38,7 +38,7 @@ public: virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg, ACE_Time_Value *max_wait_time); - virtual int flush_transport (TAO_Transport *transport); + virtual int flush_transport (TAO_Transport *transport, ACE_Time_Value *max_wait_time); }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp index 312b955192b..13060233926 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.cpp +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -51,7 +51,8 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, } int -TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport) +TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport + , ACE_Time_Value *max_wait_time) { // @@ Should we pass this down? Can we? try @@ -60,7 +61,7 @@ TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport) while (!transport->queue_is_empty ()) { - if (orb_core->run (0, 1) == -1) + if (orb_core->run (max_wait_time, 1) == -1) return -1; } } diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h index 56896b01f95..1ac4072867a 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.h +++ b/TAO/tao/Reactive_Flushing_Strategy.h @@ -36,7 +36,8 @@ public: virtual int flush_message (TAO_Transport *transport, TAO_Queued_Message *msg, ACE_Time_Value *max_wait_time); - virtual int flush_transport (TAO_Transport *transport); + virtual int flush_transport (TAO_Transport *transport + , ACE_Time_Value *max_wait_time); }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 5b8360e7cbf..6a86a410c78 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -29,6 +29,7 @@ #include "ace/Reactor.h" #include "ace/os_include/sys/os_uio.h" #include "ace/High_Res_Timer.h" +#include "ace/Countdown_Time.h" #include "ace/CORBA_macros.h" /* @@ -306,6 +307,10 @@ TAO_Transport::send_message_shared (TAO_Stub *stub, if (result == -1) { + // The connection needs to be closed here. + // In the case of a partially written message this is the only way to cleanup + // the physical connection as well as the Transport. An EOF on the remote end + // will cancel the partially received message. this->close_connection (); } @@ -480,7 +485,7 @@ TAO_Transport::update_transport (void) * */ int -TAO_Transport::handle_output (void) +TAO_Transport::handle_output (ACE_Time_Value *max_wait_time) { if (TAO_debug_level > 3) { @@ -492,7 +497,7 @@ TAO_Transport::handle_output (void) // The flushing strategy (potentially via the Reactor) wants to send // more data, first check if there is a current message that needs // more sending... - int const retval = this->drain_queue (); + int const retval = this->drain_queue (max_wait_time); if (TAO_debug_level > 3) { @@ -532,7 +537,7 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, int TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, size_t &bytes_transferred, - ACE_Time_Value *) + ACE_Time_Value *max_wait_time) { size_t const total_length = mb->total_length (); @@ -542,7 +547,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = this->drain_queue_i (); + int const n = this->drain_queue_i (max_wait_time); if (n == -1) { @@ -570,98 +575,74 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // We are going to block, so there is no need to clone // the message block. TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); - size_t const message_length = synch_message.message_length (); synch_message.push_back (this->head_, this->tail_); - int const n = this->send_synch_message_helper_i (synch_message, - 0 /*ignored*/); - if (n == -1 || n == 1) + int const result = this->send_synch_message_helper_i (synch_message, + max_wait_time); + // A timeout doesn't return -1 + if (result == -1 || result == 1) { - return n; + return result; } - // @todo: Check for timeouts! - // if (max_wait_time != 0 && errno == ETIME) return -1; - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - int result = flushing_strategy->schedule_output (this); - if (result == -1) + if (max_wait_time == 0 || errno != ETIME) { - synch_message.remove_from_list (this->head_, this->tail_); - if (TAO_debug_level > 0) + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + int result = flushing_strategy->schedule_output (this); + if (result == -1) { - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") - ACE_TEXT ("send_synchronous_message_i, ") - ACE_TEXT ("error while scheduling flush - %m\n"), - this->id ())); + synch_message.remove_from_list (this->head_, this->tail_); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_synchronous_message_i, ") + ACE_TEXT ("error while scheduling flush - %m\n"), + this->id ())); + } + return -1; } - return -1; - } - // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, - // because we're always going to flush anyway. + // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, + // because we're always going to flush anyway. - // Release the mutex, other threads may modify the queue as we - // block for a long time writing out data. - { - typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; - TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); - result = flushing_strategy->flush_message (this, - &synch_message, - max_wait_time); - } + result = flushing_strategy->flush_message (this, + &synch_message, + max_wait_time); + } + } + // The result could be -1 from either the send_synch_message_helper_i() call + // or the later flush. In either case return -1. if (result == -1) { synch_message.remove_from_list (this->head_, this->tail_); - if (errno == ETIME) - { - // If partially sent, then we must queue the remainder. - if (message_length != synch_message.message_length ()) - { - // This is a timeout, there is only one nasty case: the - // message has been partially sent! We simply cannot take - // the message out of the queue, because that would corrupt - // the connection. - // - // What we do is replace the queued message with an - // asynchronous message, that contains only what remains of - // the timed out request. If you think about sending - // CancelRequests in this case: there is no much point in - // doing that: the receiving ORB would probably ignore it, - // and figuring out the request ID would be a bit of a - // nightmare. - // - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message ( - synch_message.current_block (), - this->orb_core_, - 0, // no timeout - 0, - true), - -1); - queued_message->push_front (this->head_, this->tail_); - } - } + // We don't need to do anything special for the timeout case. + // The connection is going to get closed and the Transport destroyed. + // The only thing to do maybe is to empty the queue. if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ") - ACE_TEXT ("error while flushing message - %m\n"), + ACE_TEXT ("error while sending message - %m\n"), this->id ())); } return -1; } - return 1; + return result; } @@ -677,6 +658,7 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, int const n = this->send_synch_message_helper_i (synch_message, max_wait_time); + // What about partially sent messages. if (n == -1 || n == 1) { return n; @@ -730,10 +712,9 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, int TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, - ACE_Time_Value * /*max_wait_time*/) + ACE_Time_Value * max_wait_time) { - // @todo: Need to send timeouts for writing.. - int const n = this->drain_queue_i (); + int const n = this->drain_queue_i (max_wait_time); if (n == -1) { @@ -851,7 +832,9 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); - (void) flushing_strategy->flush_transport (this); + if (flushing_strategy->flush_transport (this, 0) == -1) { + return -1; + } } } @@ -859,10 +842,10 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, } int -TAO_Transport::drain_queue (void) +TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - int const retval = this->drain_queue_i (); + int const retval = this->drain_queue_i (max_wait_time); if (retval == 1) { @@ -880,9 +863,10 @@ TAO_Transport::drain_queue (void) } int -TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) +TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time) { size_t byte_count = 0; + ACE_Countdown_Time countdown (max_wait_time); // ... send the message ... ssize_t retval = -1; @@ -895,7 +879,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) byte_count); else #endif /* TAO_HAS_SENDFILE==1 */ - retval = this->send (iov, iovcnt, byte_count); + retval = this->send (iov, iovcnt, byte_count, max_wait_time); if (TAO_debug_level == 5) { @@ -956,7 +940,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) } int -TAO_Transport::drain_queue_i (void) +TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time) { // This is the vector used to send data, it must be declared outside // the loop because after the loop there may still be data to be @@ -1007,7 +991,8 @@ TAO_Transport::drain_queue_i (void) // IOV_MAX elements ... if (iovcnt == ACE_IOV_MAX) { - int const retval = this->drain_queue_helper (iovcnt, iov); + int const retval = this->drain_queue_helper (iovcnt, iov, + max_wait_time); now = ACE_High_Res_Timer::gettimeofday_hr (); @@ -1034,7 +1019,7 @@ TAO_Transport::drain_queue_i (void) if (iovcnt != 0) { - int const retval = this->drain_queue_helper (iovcnt, iov); + int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time); if (TAO_debug_level > 4) { @@ -1299,6 +1284,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, } } + bool partially_sent = false; + bool timeout_encountered = false; + if (try_sending_first) { ssize_t n = 0; @@ -1322,6 +1310,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, n = this->send_message_block_chain_i (message_block, byte_count, max_wait_time); + if (n == -1) { // ... if this is just an EWOULDBLOCK we must schedule the @@ -1354,9 +1343,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, return 0; } - // If it was partially sent, then we can't allow a timeout - if (byte_count > 0) - max_wait_time = 0; + if (byte_count > 0) { + partially_sent = true; + } + + // If it was partially sent, then push to front of queue and don't flush + if (errno == ETIME) { + timeout_encountered = true; + } if (TAO_debug_level > 6) { @@ -1388,7 +1382,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, this->id ())); } - if (this->queue_message_i (message_block, max_wait_time) == -1) + bool front = (partially_sent ? true: false); + + if (this->queue_message_i (message_block, max_wait_time, front) == -1) { if (TAO_debug_level > 0) { @@ -1401,44 +1397,56 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, return -1; } - // ... if the queue is full we need to activate the output on the - // queue ... - bool must_flush = false; - const bool constraints_reached = - this->check_buffering_constraints_i (stub, - must_flush); + // We can't flush if we have already encountered a timeout + if (!timeout_encountered) + { + // ... if the queue is full we need to activate the output on the + // queue ... + bool must_flush = false; + const bool constraints_reached = + this->check_buffering_constraints_i (stub, + must_flush); - // ... but we also want to activate it if the message was partially - // sent.... Plus, when we use the blocking flushing strategy the - // queue is flushed as a side-effect of 'schedule_output()' + // ... but we also want to activate it if the message was partially + // sent.... Plus, when we use the blocking flushing strategy the + // queue is flushed as a side-effect of 'schedule_output()' - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); - if (constraints_reached || try_sending_first) - { - int const result = flushing_strategy->schedule_output (this); - if (result == TAO_Flushing_Strategy::MUST_FLUSH) + if (constraints_reached || try_sending_first) { - must_flush = true; + int const result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + must_flush = true; + } } - } - if (must_flush) - { - typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; - TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + if (must_flush) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); - (void) flushing_strategy->flush_transport (this); + if (flushing_strategy->flush_transport (this, max_wait_time) == -1) { + // We may need to empty the transport queue here. + return -1; + } + } } + else { + // We may need to empty the transport queue here as well. + // encountered a timeout + return -1; + } return 0; } int TAO_Transport::queue_message_i (const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time) + ACE_Time_Value *max_wait_time, bool back) { TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, @@ -1448,7 +1456,12 @@ TAO_Transport::queue_message_i (const ACE_Message_Block *message_block, 0, true), -1); - queued_message->push_back (this->head_, this->tail_); + if (back) { + queued_message->push_back (this->head_, this->tail_); + } + else { + queued_message->push_front (this->head_, this->tail_); + } return 0; } diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index b4dc1a1b5cc..217835558b3 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -289,7 +289,7 @@ public: TAO_Wait_Strategy *wait_strategy (void) const; /// Callback method to reactively drain the outgoing data queue - int handle_output (void); + int handle_output (ACE_Time_Value *max_wait_time); /// Get the bidirectional flag int bidirectional_flag (void) const; @@ -687,7 +687,7 @@ protected: /// @param max_wait_time The maximum time that the operation can /// block, used in the implementation of timeouts. int queue_message_i (const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time); + ACE_Time_Value *max_wait_time, bool back=true); public: /// Format and queue a message for @a stream @@ -786,10 +786,10 @@ private: * Returns 0 if there is more data to send, -1 if there was an error * and 1 if the message was completely sent. */ - int drain_queue (void); + int drain_queue (ACE_Time_Value *max_wait_time); /// Implement drain_queue() assuming the lock is held - int drain_queue_i (void); + int drain_queue_i (ACE_Time_Value *max_wait_time); /// Check if there are messages pending in the queue /** @@ -801,7 +801,7 @@ private: bool queue_is_empty_i (void); /// A helper routine used in drain_queue_i() - int drain_queue_helper (int &iovcnt, iovec iov[]); + int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time); /// These classes need privileged access to: /// - schedule_output_i() |