diff options
Diffstat (limited to 'TAO')
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 14 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 30 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 3 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.cpp | 26 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 114 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 31 |
6 files changed, 85 insertions, 133 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 36a0e5af586..808b235e3cb 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,17 @@ +Sat Mar 17 17:52:27 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Transport.h: + * tao/Transport.cpp: + Removed the current_message_ field, using the head of the queue + works just as well, at least as long as we always push events to + the end of the queue. + + * tao/IIOP_Transport.h: + * tao/IIOP_Transport.cpp: + * tao/Reactive_Flushing_Strategy.cpp: + Use the Flushing Strategy to schedule output and cancel output + with the reactor. + Sat Mar 17 15:34:14 2001 Carlos O'Ryan <coryan@uci.edu> * tests/Big_Oneways/Session.h: diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 8cf934a8e3a..a27a6198ec9 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -330,36 +330,6 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) } int -TAO_IIOP_Transport::schedule_output (void) -{ - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Transport[%d]::schedule_output\n", - this->handle ())); - } - ACE_Reactor *r = - this->connection_handler_->reactor (); - return r->schedule_wakeup (this->connection_handler_, - ACE_Event_Handler::WRITE_MASK); -} - -int -TAO_IIOP_Transport::cancel_output (void) -{ - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Transport[%d]::cancel_output\n", - this->handle ())); - } - ACE_Reactor *r = - this->connection_handler_->reactor (); - return r->cancel_wakeup (this->connection_handler_, - ACE_Event_Handler::WRITE_MASK); -} - -int TAO_IIOP_Transport::process_message (void) { // Get the <message_type> that we have received diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 94b8efb38a0..68ebf14a558 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -125,9 +125,6 @@ public: CORBA::Octet minor); virtual int tear_listen_point_list (TAO_InputCDR &cdr); - - virtual int schedule_output (void); - virtual int cancel_output (void); //@} private: diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp index 58df0474633..e75f8079e6e 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.cpp +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -5,6 +5,7 @@ #include "Transport.h" #include "ORB_Core.h" #include "Queued_Message.h" +#include "debug.h" ACE_RCSID(tao, Reactive_Flushing_Strategy, "$Id$") @@ -14,19 +15,32 @@ TAO_Reactive_Flushing_Strategy::schedule_output (TAO_Transport *transport) ACE_Reactor *reactor = transport->orb_core ()->reactor (); - return reactor->register_handler (transport->event_handler (), - ACE_Event_Handler::READ_MASK - | ACE_Event_Handler::WRITE_MASK); + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Reactive_Flushing_Strategy[%d]::schedule_output\n", + transport->handle ())); + } + + return reactor->schedule_wakeup (transport->event_handler (), + ACE_Event_Handler::WRITE_MASK); } int TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport) { ACE_Reactor *reactor = - transport->orb_core ()->reactor (); + transport->event_handler ()->reactor (); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Reactive_Flushing_Strategy[%d]::cancel_output\n", + transport->handle ())); + } - return reactor->register_handler (transport->event_handler (), - ACE_Event_Handler::READ_MASK); + return reactor->cancel_wakeup (transport->event_handler (), + ACE_Event_Handler::WRITE_MASK); } int diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index c272e341914..24e2ba58788 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -30,7 +30,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) - , current_message_ (0) { TAO_Client_Strategy_Factory *cf = this->orb_core_->client_factory (); @@ -96,13 +95,12 @@ TAO_Transport::handle_output () { // ... there is no current message or it was completely // sent, time to check the queue.... - int dequeue = this->dequeue_next_message (); - if (dequeue == -1) - { - // ... no more messages in the queue, cancel output... - (void) this->cancel_output (); - return 0; - } + // ... no more messages in the queue, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + flushing_strategy->cancel_output (this); + return 0; } // ... on Win32 we must continue until we get EWOULDBLOCK ... } @@ -240,13 +238,13 @@ TAO_Transport::send_current_message (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); - if (this->current_message_ == 0) + if (this->head_ == 0) return 1; size_t bytes_transferred; ssize_t retval = - this->send_message_block_chain (this->current_message_->mb (), + this->send_message_block_chain (this->head_->mb (), bytes_transferred); if (retval == 0) { @@ -257,14 +255,14 @@ TAO_Transport::send_current_message (void) // Because there can be a partial transfer we need to adjust the // number of bytes sent. - this->current_message_->bytes_transferred (bytes_transferred); - if (this->current_message_->done ()) + this->head_->bytes_transferred (bytes_transferred); + if (this->head_->done ()) { // Remove the current message.... - // @@ We should be using a pool for these guys! - this->current_message_->destroy (); + TAO_Queued_Message *head = this->head_; + head->remove_from_list (this->head_, this->tail_); - this->current_message_ = 0; + head->destroy (); } if (retval == -1) @@ -278,33 +276,8 @@ TAO_Transport::send_current_message (void) return -1; } - if (this->current_message_ == 0) - return 1; - return 0; -} - -int -TAO_Transport::dequeue_next_message (void) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); if (this->head_ == 0) - return -1; - - this->current_message_ = this->head_; - this->current_message_->remove_from_list (this->head_, this->tail_); - - return 0; -} - -int -TAO_Transport::cancel_output (void) -{ - return 0; -} - -int -TAO_Transport::schedule_output (void) -{ + return 1; return 0; } @@ -314,6 +287,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); int queue_empty = (this->head_ == 0); @@ -321,7 +297,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // Let's figure out if the message should be queued without trying // to send first: int must_queue = 0; - if (this->current_message_ != 0) + if (this->head_ != 0) must_queue = 1; else if (!twoway_flag && stub->sync_strategy ().must_queue (queue_empty)) @@ -342,16 +318,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub, this->handle ())); } - size_t length = message_block->total_length (); - ACE_Message_Block *copy = - new ACE_Message_Block (length); - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - copy->copy (i->rd_ptr (), i->length ()); - queued_message = - new TAO_Queued_Message (copy, 1); + this->copy_message_block (message_block); + queued_message->push_back (this->head_, this->tail_); } else @@ -400,7 +369,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // ... the message was only partially sent, schedule reactive // output... - this->schedule_output (); + flushing_strategy->schedule_output (this); // ... and set it as the current message ... if (twoway_flag) @@ -414,16 +383,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub, } else { - size_t length = message_block->total_length (); - ACE_Message_Block *copy = - new ACE_Message_Block (length); - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - copy->copy (i->rd_ptr (), i->length ()); - queued_message = - new TAO_Queued_Message (copy, 1); + this->copy_message_block (message_block); } // @@ Revisit message queue allocations @@ -454,15 +415,15 @@ TAO_Transport::send_message_i (TAO_Stub *stub, queued_message->mb ()->total_length ())); } - this->current_message_ = queued_message; + // ... insert at the head of the queue, we can use push_back() + // because the queue is empty ... + queued_message->push_back (this->head_, this->tail_); } // ... two choices, this is a twoway request or not, if it is // then we must only return once the complete message has been // sent: - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); if (twoway_flag) { // Release the mutex, other threads may modify the queue as we @@ -485,11 +446,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub, msg_count++; total_bytes += i->mb ()->total_length (); } - if (this->current_message_ != 0) - { - msg_count++; - total_bytes += this->current_message_->mb ()->total_length (); - } int set_timer; ACE_Time_Value interval; @@ -549,3 +505,23 @@ TAO_Transport::reactor_signalling (void) { return 0; } + +TAO_Queued_Message * +TAO_Transport::copy_message_block (const ACE_Message_Block *message_block) +{ + size_t length = message_block->total_length (); + + // @@ Use Auto_Ptr<> to cleanup the message block, should the second + // allocation fail + ACE_Message_Block *copy; + ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0); + for (const ACE_Message_Block *i = message_block; + i != 0; + i = i->cont ()) + copy->copy (i->rd_ptr (), i->length ()); + + TAO_Queued_Message *msg; + ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0); + + return msg; +} diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index f5729acdd34..d51ff70828a 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -71,10 +71,10 @@ class TAO_Queued_Message; * transport may already be sending another message in a reactive * fashion. * - * Consequently, the Transport must also keep a - * <TT>current_message</TT>, if the current message is not null any - * new messages must be queued. Only once the current message is - * completely sent we can take a message out of the queue. + * Consequently, the Transport must also know if the head of the queue + * has been partially sent. In that case new messages can only follow + * the head. Only once the head is completely sent we can start + * sending new messages. * * <H4>Waiting threads:</H4> One or more threads can be blocked * waiting for the connection to completely send the message. @@ -483,14 +483,6 @@ public: virtual int messaging_init (CORBA::Octet major, CORBA::Octet minor) = 0; - /// There is data queued or pending data in the current - /// message. Enable the reactive calls through the reactor - virtual int schedule_output (void); - - /// There is no more data to send, cancel any reactive calls through - /// the reactor - virtual int cancel_output (void); - //@} /// Send a message block chain, @@ -518,15 +510,8 @@ private: */ int send_current_message (void); - /// Dequeue the next message, if any, and continue sending data - /** - * Once a message is completely sent, a new message is dequeued and - * setup as the current message. - * - * Returns 0 if there is more data to send, -1 if there was an error - * and 1 if the message was completely sent. - */ - int dequeue_next_message (void); + /// Copy the contents of a message block into a Queued_Message + TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb); /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) @@ -585,10 +570,6 @@ protected: /// Implement the outgoing data queue TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - - /// Once part of a message has been sent it is kept here until it is - /// completely sent - TAO_Queued_Message *current_message_; }; #if defined (__ACE_INLINE__) |