From 4b7ada9222c580924e1d4ed6d96047ee5b66973b Mon Sep 17 00:00:00 2001 From: coryan Date: Sat, 14 Apr 2001 01:52:18 +0000 Subject: ChangeLogTag:Fri Apr 13 18:49:32 2001 Carlos O'Ryan --- TAO/ChangeLogs/ChangeLog-02a | 21 ++++++++++++++++ TAO/tao/Block_Flushing_Strategy.cpp | 3 ++- TAO/tao/Block_Flushing_Strategy.h | 3 ++- TAO/tao/Flushing_Strategy.h | 3 ++- TAO/tao/Queued_Message.cpp | 21 +++++++++++++++- TAO/tao/Queued_Message.h | 10 ++++---- TAO/tao/Reactive_Flushing_Strategy.cpp | 5 ++-- TAO/tao/Reactive_Flushing_Strategy.h | 3 ++- TAO/tao/Synch_Queued_Message.cpp | 6 +++++ TAO/tao/Synch_Queued_Message.h | 2 ++ TAO/tao/Transport.cpp | 45 +++++++++++++++++++++++++++++++--- 11 files changed, 106 insertions(+), 16 deletions(-) diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 891d219d886..618a89ff974 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,24 @@ +Fri Apr 13 18:49:32 2001 Carlos O'Ryan + + * tao/Flushing_Strategy.h: + * tao/Block_Flushing_Strategy.h: + * tao/Block_Flushing_Strategy.cpp: + * tao/Reactive_Flushing_Strategy.h: + * tao/Reactive_Flushing_Strategy.cpp: + Add timeout to the flush_message() operation. + + * tao/Transport.cpp: + Use the new timeout of flush_message() to implement timeouts + for twoways blocked during writes. + + * tao/Queued_Message.h: + * tao/Queued_Message.cpp: + Add method to insert message at the head of the queue. + + * tao/Synch_Queued_Message.h: + * tao/Synch_Queued_Message.cpp: + Add accessor to obtain the current message block. + Thu Apr 12 20:15:22 2001 Carlos O'Ryan * tao/Sync_Strategies.h: diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index c5e680f7290..fc402ec48bf 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -21,7 +21,8 @@ TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *) int TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, - TAO_Queued_Message *msg) + TAO_Queued_Message *msg, + ACE_Time_Value *) { while (!msg->all_data_sent ()) { diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h index f2524ac2e7b..9b41ef8dd17 100644 --- a/TAO/tao/Block_Flushing_Strategy.h +++ b/TAO/tao/Block_Flushing_Strategy.h @@ -31,7 +31,8 @@ public: virtual int schedule_output (TAO_Transport *transport); virtual int cancel_output (TAO_Transport *transport); virtual int flush_message (TAO_Transport *transport, - TAO_Queued_Message *msg); + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time); virtual int flush_transport (TAO_Transport *transport); }; diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h index 5d1257a802c..0873936ccf4 100644 --- a/TAO/tao/Flushing_Strategy.h +++ b/TAO/tao/Flushing_Strategy.h @@ -58,7 +58,8 @@ public: /// Wait until msg is sent out. Potentially other messages are /// flushed too, for example, because there are ahead in the queue. virtual int flush_message (TAO_Transport *transport, - TAO_Queued_Message *msg) = 0; + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time) = 0; /// Wait until the transport has no messages queued. virtual int flush_transport (TAO_Transport *transport) = 0; diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 7d3b19c45c5..0b84413bd46 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -39,7 +39,7 @@ void TAO_Queued_Message::send_failure (void) { this->send_failure_ = 1; - + if (this->callback_ != 0) { this->callback_->send_failed (); @@ -82,3 +82,22 @@ TAO_Queued_Message::push_back (TAO_Queued_Message *&head, this->next_ = 0; tail = this; } + +void +TAO_Queued_Message::push_front (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (head == 0) + { + tail = this; + head = this; + this->next_ = 0; + this->prev_ = 0; + return; + } + + head->prev_ = this; + this->next_ = head; + this->prev_ = 0; + head = this; +} diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index b050d1a83c8..e403df1d8d1 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -117,13 +117,13 @@ public: virtual void remove_from_list (TAO_Queued_Message *&head, TAO_Queued_Message *&tail); - /// Insert the current element after position. - /** - * If position is null then we assume that we are inserting the - * current element into an empty list. - */ + /// Insert the current element at the tail of the queue. virtual void push_back (TAO_Queued_Message *&head, TAO_Queued_Message *&tail); + + /// Insert the current element at the head of the queue. + virtual void push_front (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); //@} /** @name Template Methods diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp index ef3d1521924..ed2cdd1da84 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.cpp +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -23,7 +23,8 @@ TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport) int TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, - TAO_Queued_Message *msg) + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time) { int result = 0; @@ -35,7 +36,7 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, while (!msg->all_data_sent () && result >= 0) { - result = orb_core->run (0, 1, ACE_TRY_ENV); + result = orb_core->run (max_wait_time, 1, ACE_TRY_ENV); ACE_TRY_CHECK; } } diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h index 16371c75253..e1aba16bbb0 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.h +++ b/TAO/tao/Reactive_Flushing_Strategy.h @@ -31,7 +31,8 @@ public: virtual int schedule_output (TAO_Transport *transport); virtual int cancel_output (TAO_Transport *transport); virtual int flush_message (TAO_Transport *transport, - TAO_Queued_Message *msg); + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time); virtual int flush_transport (TAO_Transport *transport); }; diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index e6aae104b00..545a1fdadcc 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -16,6 +16,12 @@ TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message (void) { } +const ACE_Message_Block * +TAO_Synch_Queued_Message::current_block (void) const +{ + return this->current_block_; +} + size_t TAO_Synch_Queued_Message::message_length (void) const { diff --git a/TAO/tao/Synch_Queued_Message.h b/TAO/tao/Synch_Queued_Message.h index 75f2c258e54..3e0ea5a7610 100644 --- a/TAO/tao/Synch_Queued_Message.h +++ b/TAO/tao/Synch_Queued_Message.h @@ -53,6 +53,8 @@ public: /// Destructor virtual ~TAO_Synch_Queued_Message (void); + const ACE_Message_Block *current_block (void) const; + /** Implement the Template Methods from TAO_Queued_Message */ //@{ diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 9819664e789..495ed846039 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -179,7 +179,7 @@ dump_iov (iovec *iov, int iovcnt, int id, id, location, i, iovcnt, iov_len)); - + size_t len; for (size_t offset = 0; offset < iov_len; offset += len) { @@ -463,10 +463,47 @@ TAO_Transport::send_synchronous_message_i (TAO_Stub *stub, ACE_GUARD_RETURN (TAO_REVERSE_SYNCH_MUTEX, ace_mon, reverse, -1); result = flushing_strategy->flush_message (this, - &synch_message); - + &synch_message, + max_wait_time); } - ACE_ASSERT (synch_message.all_data_sent () != 0); + if (result == -1&& errno == ETIME) + { + if (this->head_ != &synch_message) + { + synch_message.remove_from_list (this->head_, this->tail_); + } + + else + { + // This is a timeout, there is only one nasty case: the + // message has been partially sent! We simply cannot take + // the message out of the queue, because that would corrupt + // the connection. + // + // What we do is replace the queued message with an + // asynchronous message, that contains only what remains of + // the timed out request. If you think about sending + // CancelRequests in this case: there is no much point in + // doing that: the receiving ORB would probably ignore it, + // and figuring out the request ID would be a bit of a + // nightmare. + // + + synch_message.remove_from_list (this->head_, this->tail_); + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message ( + synch_message.current_block ()), + -1); + queued_message->push_front (this->head_, this->tail_); + } + } + + else + { + ACE_ASSERT (synch_message.all_data_sent () != 0); + } + ACE_ASSERT (synch_message.next () == 0); ACE_ASSERT (synch_message.prev () == 0); return result; -- cgit v1.2.1