diff options
Diffstat (limited to 'TAO/tao/Sync_Strategies.cpp')
-rw-r--r-- | TAO/tao/Sync_Strategies.cpp | 277 |
1 files changed, 111 insertions, 166 deletions
diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp index d09bcd1a5a5..a64223da662 100644 --- a/TAO/tao/Sync_Strategies.cpp +++ b/TAO/tao/Sync_Strategies.cpp @@ -5,138 +5,68 @@ #include "tao/Buffering_Constraint_Policy.h" #include "tao/Stub.h" #include "tao/ORB_Core.h" +#include "tao/debug.h" #if !defined (__ACE_INLINE__) # include "tao/Sync_Strategies.i" #endif /* ! __ACE_INLINE__ */ -TAO_Sync_Strategy::~TAO_Sync_Strategy (void) -{ -} +ACE_RCSID(tao, Sync_Strategies, "$Id$") -ssize_t -TAO_Transport_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) +TAO_Sync_Strategy::~TAO_Sync_Strategy (void) { - // Immediate delegation to the transport. - return transport.send (message_block, - max_wait_time); } -#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) +// **************************************************************** -ssize_t -TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *mb, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + must_queue (int) { - ACE_Message_Block *message_block = - ACE_const_cast (ACE_Message_Block *, mb); - - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Check if there are messages already in the queue. - if (!buffering_queue.is_empty ()) - return TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - - // - // Otherwise there were no queued messages. We first try to send - // the message right away. - // - - // Actual network send. - size_t bytes_transferred = 0; - result = transport.send (message_block, - max_wait_time, - &bytes_transferred); - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // message block appropriately. - transport.reset_sent_message (message_block, - bytes_transferred); - } - - // Queue the rest. - return bytes_transferred + - TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - } - - // EOF or other errors. - if (result == -1 || - result == 0) - return -1; - - // Everything was successfully delivered. - return result; + return 0; } -ssize_t -TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *, + size_t , + size_t , + int &must_flush, + const ACE_Time_Value &, + int &set_timer, + ACE_Time_Value &) { - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Copy the message. - ACE_Message_Block *copy = message_block->clone (); - - // Enqueue current message. - result = buffering_queue.enqueue_tail (copy); - - // Enqueuing error. - if (result == -1) - { - // Eliminate the copy. - copy->release (); + set_timer = 0; + must_flush = 1; + return 1; +} - // Return error. - return -1; - } +#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) - // Check if upper bound has been reached. - if (this->buffering_constraints_reached (transport, - stub, - buffering_queue)) - { - return transport.send_buffered_messages (max_wait_time); - } +// **************************************************************** - // Hoping that this return value is meaningful or at least - // acceptable. - return message_block->total_length (); +int +TAO_Eager_Buffering_Sync_Strategy:: + must_queue (int) +{ + return 1; } int -TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport, - TAO_Stub &stub, - TAO_Transport_Buffering_Queue &buffering_queue) +TAO_Eager_Buffering_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &must_flush, + const ACE_Time_Value ¤t_deadline, + int &set_timer, + ACE_Time_Value &new_deadline) { + must_flush = 0; + set_timer = 0; + TAO_Buffering_Constraint_Policy *buffering_constraint_policy = - stub.buffering_constraint (); + stub->buffering_constraint (); if (buffering_constraint_policy == 0) return 1; @@ -147,80 +77,85 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport TAO::BufferingConstraint buffering_constraint; buffering_constraint_policy->get_buffering_constraint (buffering_constraint); - this->timer_check (transport, - buffering_constraint); - if (buffering_constraint.mode == TAO::BUFFER_FLUSH) - return 1; + { + must_flush = 1; + return 1; + } + int constraints_reached = 0; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_COUNT) && - buffering_queue.message_count () >= buffering_constraint.message_count) - return 1; + TAO::BUFFER_MESSAGE_COUNT) + && msg_count >= buffering_constraint.message_count) + constraints_reached = 1; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_BYTES) && - buffering_queue.message_length () >= buffering_constraint.message_bytes) - return 1; + TAO::BUFFER_MESSAGE_BYTES) + && total_bytes >= buffering_constraint.message_bytes) + constraints_reached = 1; - return 0; + if (this->timer_check (buffering_constraint, + current_deadline, + set_timer, + new_deadline) != 0) + constraints_reached = 1; + + return constraints_reached; } -void -TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport, - const TAO::BufferingConstraint &buffering_constraint) +int +TAO_Eager_Buffering_Sync_Strategy:: + timer_check (const TAO::BufferingConstraint &buffering_constraint, + const ACE_Time_Value ¤t_deadline, + int &set_timer, + ACE_Time_Value &new_deadline) { - if (transport.buffering_timer_id () != 0) + set_timer = 0; + if (!ACE_BIT_ENABLED (buffering_constraint.mode, + TAO::BUFFER_TIMEOUT)) { - // - // There is a timeout set by us, though we are not sure if we - // still need the timeout or if the timeout value is correct or - // not. - // - - // Get our reactor. - ACE_Reactor *reactor = transport.orb_core ()->reactor (); - - if (!ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) - { - // Timeouts are no longer needed. Cancel existing one. - reactor->cancel_timer (transport.buffering_timer_id ()); - transport.buffering_timer_id (0); - } - else - { - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); - - if (transport.buffering_timeout_value () == timeout) - { - // Timeout value is the same, nothing to be done. - } - else - { - // Timeout value has changed, reset the old timer. - reactor->reset_timer_interval (transport.buffering_timer_id (), - timeout); - } - } + return 0; } - else if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) + + // Compute the next deadline... + ACE_Time_Value now = ACE_OS::gettimeofday (); + ACE_Time_Value timeout = + this->time_conversion (buffering_constraint.timeout); + new_deadline = now + timeout; + + // Check if the new deadline is more stringent, or if the deadline + // has expired and thus must be reset anyway. + if (current_deadline > new_deadline + || current_deadline < now) { - // We didn't have timeouts before, but we want them now. - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); + set_timer = 1; + } - long timer_id = transport.register_for_timer_event (0, timeout, timeout); + // ... if there is no deadline we don't want to schedule output (the + // deadline will be set because set_timer is set to 1 in that case). + // If there is a deadline but but it has not been reached, we + // don't want to schedule any output either... + if (current_deadline == ACE_Time_Value::zero + || current_deadline >= now) + { + return 0; + } - transport.buffering_timer_id (timer_id); - transport.buffering_timeout_value (timeout); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Eager_Buffering_Sync_Strategy::timer_check, " + "Now = %u, Current = %u, New = %u\n", + now.msec (), current_deadline.msec (), + new_deadline.msec ())); } + + return 1; } ACE_Time_Value -TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) +TAO_Eager_Buffering_Sync_Strategy:: + time_conversion (const TimeBase::TimeT &time) { TimeBase::TimeT seconds = time / 10000000u; TimeBase::TimeT microseconds = (time % 10000000u) / 10; @@ -228,4 +163,14 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) ACE_U64_TO_U32 (microseconds)); } +// **************************************************************** + +int +TAO_Delayed_Buffering_Sync_Strategy:: + must_queue (int queue_empty) +{ + // If the queue is empty we want to send immediately + return !queue_empty; +} + #endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */ |