diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 888 |
1 files changed, 720 insertions, 168 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 32733dee5c5..ba8108218f1 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -12,36 +12,39 @@ #include "Sync_Strategies.h" #include "Connection_Handler.h" #include "Pluggable_Messaging.h" +#include "Synch_Queued_Message.h" +#include "Asynch_Queued_Message.h" +#include "Flushing_Strategy.h" #include "debug.h" +#include "ace/Message_Block.h" + #if !defined (__ACE_INLINE__) # include "Transport.inl" #endif /* __ACE_INLINE__ */ ACE_RCSID(tao, Transport, "$Id$") -TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) +TAO_Synch_Refcountable::TAO_Synch_Refcountable (int refcount) : ACE_Refcountable (refcount) - , refcount_lock_ (lock) { } TAO_Synch_Refcountable::~TAO_Synch_Refcountable (void) { - delete this->refcount_lock_; } int TAO_Synch_Refcountable::increment (void) { - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0); return ACE_Refcountable::increment (); } int TAO_Synch_Refcountable::decrement (void) { - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0); return ACE_Refcountable::decrement (); } @@ -54,13 +57,16 @@ TAO_Synch_Refcountable::refcount (void) const // Constructor. TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) - : TAO_Synch_Refcountable (orb_core->resource_factory ()->create_cached_connection_lock (), 1) + : TAO_Synch_Refcountable (1) , tag_ (tag) , orb_core_ (orb_core) , cache_map_entry_ (0) - , buffering_queue_ (0) - , buffering_timer_id_ (0) , bidirectional_flag_ (-1) + , head_ (0) + , tail_ (0) + , current_deadline_ (ACE_Time_Value::zero) + , flush_timer_id_ (-1) + , transport_timer_ (this) , id_ ((long) this) { TAO_Client_Strategy_Factory *cf = @@ -85,31 +91,76 @@ TAO_Transport::~TAO_Transport (void) delete this->tms_; this->tms_ = 0; - delete this->buffering_queue_; - delete this->handler_lock_; -} + TAO_Queued_Message *i = this->head_; + while (i != 0) + { + // @@ This is a good point to insert a flag to indicate that a + // CloseConnection message was successfully received. + i->connection_closed (); + TAO_Queued_Message *tmp = i; + i = i->next (); -ssize_t -TAO_Transport::send_or_buffer (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) + tmp->destroy (); + } +} + +int +TAO_Transport::handle_output () { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_output\n", + this->id ())); + } - if (stub == 0 || two_way) + // The flushing strategy (potentially via the Reactor) wants to send + // more data, first check if there is a current message that needs + // more sending... + int retval = this->drain_queue (); + + if (TAO_debug_level > 4) { - return this->send (message_block, max_wait_time); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_output, " + "drain_queue returns %d/%d\n", + this->id (), + retval, errno)); } - TAO_Sync_Strategy &sync_strategy = stub->sync_strategy (); + if (retval == 1) + { + // ... there is no current message or it was completely + // sent, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, -1)); + + flushing_strategy->cancel_output (this); - return sync_strategy.send (*this, - *stub, - message_block, - max_wait_time); + if (this->flush_timer_id_ != -1) + { + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } + this->current_deadline_ = ACE_Time_Value::zero; + this->flush_timer_id_ = -1; + } + return 0; + } + + // Any errors are returned directly to the Reactor + return retval; } void @@ -124,150 +175,342 @@ TAO_Transport::provide_handle (ACE_Handle_Set &handle_set) handle_set.set_bit (eh->get_handle ()); } -ssize_t -TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) +static void +dump_iov (iovec *iov, int iovcnt, int id, + size_t current_transfer, + const char *location) { - // Make sure we have a buffering queue and there are messages in it. - if (this->buffering_queue_ == 0 || - this->buffering_queue_->is_empty ()) - return 1; + ACE_Log_Msg::instance ()->acquire (); - // Now, we can take the lock and try to do something. - // - // @@CJC We might be able to reduce the length of time we hold - // the lock depending on whether or not we need to hold the - // hold the lock while we're doing queueing activities. - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s" + " sending %d buffers\n", + id, location, iovcnt)); + for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) + { + size_t iov_len = iov[i].iov_len; + + // Possibly a partially sent iovec entry. + if (current_transfer < iov_len) + iov_len = current_transfer; + + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s" + " buffer %d/%d has %d bytes\n", + id, location, + i, iovcnt, + iov_len)); + + size_t len; + for (size_t offset = 0; offset < iov_len; offset += len) + { + char header[1024]; + ACE_OS::sprintf (header, + "TAO - Transport[%d]::%s (%d/%d)\n", + id, location, offset, iov_len); + + len = iov_len - offset; + if (len > 512) + len = 512; + ACE_HEX_DUMP ((LM_DEBUG, + ACE_static_cast(char*,iov[i].iov_base) + offset, + len, + header)); + } + current_transfer -= iov_len; + } + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s" + " end of data\n", + id, location)); + + ACE_Log_Msg::instance ()->release (); +} + +int +TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, + size_t &bytes_transferred, + ACE_Time_Value *max_wait_time) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + if (this->check_event_handler_i ("TAO_Transport::send_message_block_chain") == -1) + return -1; + + return this->send_message_block_chain_i (mb, + bytes_transferred, + max_wait_time); +} + +int +TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, + size_t &bytes_transferred, + ACE_Time_Value *) +{ + size_t total_length = mb->total_length (); + + // We are going to block, so there is no need to clone + // the message block. + TAO_Synch_Queued_Message synch_message (mb); + + synch_message.push_back (this->head_, this->tail_); + + int n = this->drain_queue_i (); + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return -1; // Error while sending... + } + else if (n == 1) + { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; // Empty queue, message was sent.. + } + + ACE_ASSERT (n == 0); // Some data sent, but data remains. + + // Remove the temporary message from the queue... + synch_message.remove_from_list (this->head_, this->tail_); + + bytes_transferred = + total_length - synch_message.message_length (); + + return 0; +} + +int +TAO_Transport::send_message_i (TAO_Stub *stub, + int is_synchronous, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + if (this->check_event_handler_i ("TAO_Transport::send_message_i") == -1) + return -1; + + if (is_synchronous) + { + return this->send_synchronous_message_i (message_block, + max_wait_time); + } + + // Let's figure out if the message should be queued without trying + // to send first: + int try_sending_first = 1; + + int queue_empty = (this->head_ == 0); - // Get the first message from the queue. - ACE_Message_Block *queued_message = 0; - ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message); + if (!queue_empty) + try_sending_first = 0; + else if (stub->sync_strategy ().must_queue (queue_empty)) + try_sending_first = 0; - // @@ What to do here on failures? - ACE_ASSERT (result != -1); + size_t byte_count = 0; + ssize_t n; - // @@CJC take lock?? - // Actual network send. - size_t bytes_transferred = 0; - result = this->send_i (queued_message, - max_wait_time, - &bytes_transferred); - // @@CJC release lock?? + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) + if (try_sending_first) { - if (bytes_transferred > 0) + // ... in this case we must try to send the message first ... + + if (TAO_debug_level > 6) { - // If successful in sending some of the data, reset the - // queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "trying to send the message\n", + this->id ())); + } - // Indicate some success. - return bytes_transferred; + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + n = this->send_message_block_chain_i (message_block, + byte_count, + max_wait_time); + if (n == 0) + return -1; // EOF + else if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later, if it is ETIME we still have to send + // the complete message, because cutting off the message at + // this point will destroy the synchronization with the + // server ... + if (errno != EWOULDBLOCK && errno != ETIME) + { + return -1; + } } - // Since we queue up the message, this is not an error. We can - // try next time around. - return 1; + // ... let's figure out if the complete message was sent ... + if (message_block->total_length () == byte_count) + { + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; + } } - // EOF or other errors. - if (result == -1 || - result == 0) + // ... either the message must be queued or we need to queue it + // because it was not completely sent out ... + + if (TAO_debug_level > 6) { - this->dequeue_all (); - return -1; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "message is queued\n", + this->id ())); } - // If successful in sending data, reset the queue appropriately. - this->reset_queued_message (queued_message, - bytes_transferred); + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); + queued_message->bytes_transferred (byte_count); + queued_message->push_back (this->head_, this->tail_); + + // ... if the queue is full we need to activate the output on the + // queue ... + int must_flush = 0; + int constraints_reached = + this->check_buffering_constraints_i (stub, + must_flush); + + // ... but we also want to activate it if the message was partially + // sent.... Plus, when we use the blocking flushing strategy the + // queue is flushed as a side-effect of 'schedule_output()' + + if (constraints_reached || try_sending_first) + { + (void) flushing_strategy->schedule_output (this); + } - // Everything was successfully delivered. - return result; -} + if (must_flush) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); -void -TAO_Transport::reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 0); -} + (void) flushing_strategy->flush_transport (this); + } -void -TAO_Transport::reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered) -{ - this->reset_message (message_block, - bytes_delivered, - 1); + return 0; } -void -TAO_Transport::reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message) +int +TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) { - while (bytes_delivered != 0) - { - // Our current message block chain. - ACE_Message_Block *current_message_block = message_block; + // We are going to block, so there is no need to clone + // the message block. + TAO_Synch_Queued_Message synch_message (mb); - int completely_delivered_current_message_block_chain = 0; + synch_message.push_back (this->head_, this->tail_); - while (current_message_block != 0 && - bytes_delivered != 0) - { - size_t current_message_block_length = - current_message_block->length (); + int n = this->drain_queue_i (); + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return -1; // Error while sending... + } + else if (n == 1) + { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; // Empty queue, message was sent.. + } - int completely_delivered_current_message_block = - bytes_delivered >= current_message_block_length; + ACE_ASSERT (n == 0); // Some data sent, but data remains. - size_t adjustment_size = - ACE_MIN (current_message_block_length, bytes_delivered); + if (synch_message.all_data_sent ()) + { + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; + } - // Reset according to send size. - current_message_block->rd_ptr (adjustment_size); + // @todo: Check for timeouts! + // if (max_wait_time != 0 && errno == ETIME) return -1; - // If queued message, adjust the queue. - if (queued_message) - // Hand adjust <message_length>. - this->buffering_queue_->message_length ( - this->buffering_queue_->message_length () - adjustment_size); + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); - // Adjust <bytes_delivered>. - bytes_delivered -= adjustment_size; + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + int result; + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); - if (completely_delivered_current_message_block) + result = flushing_strategy->flush_message (this, + &synch_message, + max_wait_time); + } + if (result == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + if (errno == ETIME) + { + if (this->head_ == &synch_message) { - // Next message block in the continuation chain. - current_message_block = current_message_block->cont (); - - if (current_message_block == 0) - completely_delivered_current_message_block_chain = 1; + // This is a timeout, there is only one nasty case: the + // message has been partially sent! We simply cannot take + // the message out of the queue, because that would corrupt + // the connection. + // + // What we do is replace the queued message with an + // asynchronous message, that contains only what remains of + // the timed out request. If you think about sending + // CancelRequests in this case: there is no much point in + // doing that: the receiving ORB would probably ignore it, + // and figuring out the request ID would be a bit of a + // nightmare. + // + + synch_message.remove_from_list (this->head_, this->tail_); + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message ( + synch_message.current_block ()), + -1); + queued_message->push_front (this->head_, this->tail_); } } - if (completely_delivered_current_message_block_chain) + if (TAO_debug_level > 0) { - // Go to the next message block chain. - message_block = message_block->next (); - - // If queued message, adjust the queue. - if (queued_message) - // Release this <current_message_block>. - this->dequeue_head (); + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) TAO_Transport::send_synchronous_message_i, " + "error while flushing message %p\n", "")); } + + return -1; } + + else + { + ACE_ASSERT (synch_message.all_data_sent () != 0); + } + + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; } int @@ -320,14 +563,6 @@ TAO_Transport::connection_handler_closing (void) TAO_Transport::release(this); } -#if 0 -TAO_Connection_Handler* -TAO_Transport::connection_handler (void) const -{ - return 0; -} -#endif - TAO_Transport* TAO_Transport::_duplicate (TAO_Transport* transport) { @@ -377,9 +612,6 @@ TAO_Transport::mark_invalid (void) // @@ Do we need this method at all?? this->orb_core_->transport_cache ().mark_invalid ( this->cache_map_entry_); - - - } int @@ -429,32 +661,28 @@ TAO_Transport::close_connection (void) // work, for some reason they hold the mutex while they do // that work though. this->orb_core_->transport_cache ().purge_entry (this->cache_map_entry_); + + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + i->connection_closed (); + } } ssize_t -TAO_Transport::send (const ACE_Message_Block *mblk, - const ACE_Time_Value *timeout, - size_t *bytes_transferred) +TAO_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout) { ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, -1)); - // if there's no associated event handler, then we act like a null transport - if (this->event_handler_i () == 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) transport %d (tag=%d) send() ") - ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), - this->id (), - this->tag_)); - errno = ENOENT; - return -1; - } + if (this->check_event_handler_i ("TAO_Transport::send") == -1) + return -1; // now call the template method - return this->send_i (mblk, timeout, bytes_transferred); + return this->send_i (iov, iovcnt, bytes_transferred, timeout); } ssize_t @@ -467,18 +695,8 @@ TAO_Transport::recv (char *buffer, *this->handler_lock_, -1)); - // if there's no associated event handler, then we act like a null transport - if (this->event_handler_i () == 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) transport %d (tag=%d) recv() ") - ACE_TEXT ("no longer associated with handler, returning -1 with errno = ENOENT\n"), - this->id (), - this->tag_)); - // @@CJC Should we return -1, like an error, or should we return 0, like an EOF? - errno = ENOENT; - return -1; - } + if (this->check_event_handler_i ("TAO_Transport::recv") == -1) + return -1; // now call the template method return this->recv_i (buffer, len, timeout); @@ -546,6 +764,19 @@ TAO_Transport::register_for_timer_event (const void* arg, } int +TAO_Transport::queue_is_empty (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + return this->queue_is_empty_i (); +} + +int +TAO_Transport::queue_is_empty_i (void) +{ + return (this->head_ == 0); +} + +int TAO_Transport::register_handler (void) { ACE_MT (ACE_GUARD_RETURN (ACE_Lock, @@ -566,3 +797,324 @@ TAO_Transport::id (int id) { this->id_ = id; } + +int +TAO_Transport::schedule_output_i (void) +{ + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::schedule_output\n", + this->id ())); + } + + return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); +} + +int +TAO_Transport::cancel_output_i (void) +{ + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::cancel output\n", + this->id ())); + } + + return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); +} + +int +TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, + const void *act) +{ + /// This is the only legal ACT in the current configuration.... + if (act != &this->current_deadline_) + return -1; + + if (this->flush_timer_pending ()) + { + // The timer is always a oneshot timer, so mark is as not + // pending. + this->reset_flush_timer (); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + } + return 0; +} + +int +TAO_Transport::drain_queue (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + return this->drain_queue_i (); +} + +int +TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) +{ + if (this->check_event_handler_i ("TAO_Transport::drain_queue_helper") == -1) + return -1; + + size_t byte_count = 0; + + // ... send the message ... + ssize_t retval = + this->send_i (iov, iovcnt, byte_count); + + if (TAO_debug_level == 5) + { + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_helper"); + } + + // ... now we need to update the queue, removing elements + // that have been sent, and updating the last element if it + // was only partially sent ... + this->cleanup_queue (byte_count); + iovcnt = 0; + + if (retval == 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::drain_queue_helper, " + "send() returns 0")); + } + return -1; + } + else if (retval == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::drain_queue_helper, " + "%p", "send()")); + } + if (errno == EWOULDBLOCK) + return 0; + return -1; + } + + // ... start over, how do we guarantee progress? Because if + // no bytes are sent send() can only return 0 or -1 + ACE_ASSERT (byte_count != 0); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::drain_queue_helper, " + "byte_count = %d, head_is_empty = %d\n", + byte_count, (this->head_ == 0))); + } + return 1; +} + +int +TAO_Transport::drain_queue_i (void) +{ + if (this->head_ == 0) + return 1; + + // This is the vector used to send data, it must be declared outside + // the loop because after the loop there may still be data to be + // sent + int iovcnt = 0; + iovec iov[IOV_MAX]; + + // We loop over all the elements in the queue ... + TAO_Queued_Message *i = this->head_; + while (i != 0) + { + // ... each element fills the iovector ... + i->fill_iov (IOV_MAX, iovcnt, iov); + + // ... the vector is full, no choice but to send some data out. + // We need to loop because a single message can span multiple + // IOV_MAX elements ... + if (iovcnt == IOV_MAX) + { + int retval = + this->drain_queue_helper (iovcnt, iov); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::drain_queue_i, " + "helper retval = %d\n", + retval)); + } + if (retval != 1) + return retval; + + i = this->head_; + continue; + } + // ... notice that this line is only reached if there is still + // room in the iovector ... + i = i->next (); + } + + + if (iovcnt != 0) + { + int retval = + this->drain_queue_helper (iovcnt, iov); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::drain_queue_i, " + "helper retval = %d\n", + retval)); + } + if (retval != 1) + return retval; + } + + if (this->head_ == 0) + return 1; + + return 0; +} + +void +TAO_Transport::cleanup_queue (size_t byte_count) +{ + while (this->head_ != 0 && byte_count > 0) + { + TAO_Queued_Message *i = this->head_; + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::cleanup_queue, " + "byte_count = %d, head_is_empty = %d\n", + byte_count, (this->head_ == 0))); + } + + // Update the state of the first message + i->bytes_transferred (byte_count); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport::cleanup_queue, " + "after transfer, byte_count = %d, all_sent = %d\n", + byte_count, i->all_data_sent ())); + } + + // ... if all the data was sent the message must be removed from + // the queue... + if (i->all_data_sent ()) + { + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + } + } +} + +int +TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, + int &must_flush) +{ + // First let's compute the size of the queue: + size_t msg_count = 0; + size_t total_bytes = 0; + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + msg_count++; + total_bytes += i->message_length (); + } + + int set_timer; + ACE_Time_Value new_deadline; + + int constraints_reached = + stub->sync_strategy ().buffering_constraints_reached (stub, + msg_count, + total_bytes, + must_flush, + this->current_deadline_, + set_timer, + new_deadline); + + // ... set the new timer, also cancel any previous timers ... + if (set_timer) + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); + + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } + } + + return constraints_reached; +} + +void +TAO_Transport::report_invalid_event_handler (const char *caller) +{ + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) transport %d (tag=%d) %s " + "no longer associated with handler, " + "returning -1 with errno = ENOENT\n", + this->id (), + this->tag_, + caller)); + } +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Reverse_Lock<ACE_Lock>; +template class ACE_Guard<ACE_Reverse_Lock<ACE_Lock> >; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Reverse_Lock<ACE_Lock> +#pragma instantiate ACE_Guard<ACE_Reverse_Lock<ACE_Lock> > + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |