diff options
Diffstat (limited to 'TAO/tao/Sync_Strategies.cpp')
-rw-r--r-- | TAO/tao/Sync_Strategies.cpp | 229 |
1 files changed, 62 insertions, 167 deletions
diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp index c679fc3405c..36c923f1ee0 100644 --- a/TAO/tao/Sync_Strategies.cpp +++ b/TAO/tao/Sync_Strategies.cpp @@ -14,129 +14,47 @@ TAO_Sync_Strategy::~TAO_Sync_Strategy (void) { } -ssize_t -TAO_Transport_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) -{ - // Immediate delegation to the transport. - return transport.send (message_block, - max_wait_time); -} +// **************************************************************** -#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) - -ssize_t -TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *mb, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + must_queue (int) { - ACE_Message_Block *message_block = - ACE_const_cast (ACE_Message_Block *, mb); - - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Check if there are messages already in the queue. - if (!buffering_queue.is_empty ()) - return TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - - // - // Otherwise there were no queued messages. We first try to send - // the message right away. - // - - // Actual network send. - size_t bytes_transferred = 0; - result = transport.send (message_block, - max_wait_time, - &bytes_transferred); - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // message block appropriately. - transport.reset_sent_message (message_block, - bytes_transferred); - } - - // Queue the rest. - return bytes_transferred + - TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - } - - // EOF or other errors. - if (result == -1 || - result == 0) - return -1; - - // Everything was successfully delivered. - return result; + return 0; } -ssize_t -TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *, + size_t , + size_t , + int &, + ACE_Time_Value &) { - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Copy the message. - ACE_Message_Block *copy = message_block->clone (); - - // Enqueue current message. - result = buffering_queue.enqueue_tail (copy); - - // Enqueuing error. - if (result == -1) - { - // Eliminate the copy. - copy->release (); + return 0; +} - // Return error. - return -1; - } +#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) - // Check if upper bound has been reached. - if (this->buffering_constraints_reached (transport, - stub, - buffering_queue)) - { - return transport.send_buffered_messages (max_wait_time); - } +// **************************************************************** - // Hoping that this return value is meaningful or at least - // acceptable. - return message_block->total_length (); +int +TAO_Eager_Buffering_Sync_Strategy:: + must_queue (int) +{ + return 1; } int -TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport, - TAO_Stub &stub, - TAO_Transport_Buffering_Queue &buffering_queue) +TAO_Eager_Buffering_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &set_timer, + ACE_Time_Value &interval) { TAO_Buffering_Constraint_Policy *buffering_constraint_policy = - stub.buffering_constraint (); + stub->buffering_constraint (); if (buffering_constraint_policy == 0) return 1; @@ -147,86 +65,53 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport TAO::BufferingConstraint buffering_constraint; buffering_constraint_policy->get_buffering_constraint (buffering_constraint); - this->timer_check (transport, - buffering_constraint); + this->timer_check (buffering_constraint, set_timer, interval); if (buffering_constraint.mode == TAO::BUFFER_FLUSH) return 1; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_COUNT) && - buffering_queue.message_count () >= buffering_constraint.message_count) + TAO::BUFFER_MESSAGE_COUNT) + && msg_count >= buffering_constraint.message_count) return 1; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_BYTES) && - buffering_queue.message_length () >= buffering_constraint.message_bytes) + TAO::BUFFER_MESSAGE_BYTES) + && total_bytes >= buffering_constraint.message_bytes) return 1; return 0; } void -TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport, - const TAO::BufferingConstraint &buffering_constraint) +TAO_Eager_Buffering_Sync_Strategy:: + timer_check (const TAO::BufferingConstraint &buffering_constraint, + int &set_timer, + ACE_Time_Value &interval) { - if (transport.buffering_timer_id () != 0) + if (!ACE_BIT_ENABLED (buffering_constraint.mode, + TAO::BUFFER_TIMEOUT)) { - // - // There is a timeout set by us, though we are not sure if we - // still need the timeout or if the timeout value is correct or - // not. - // - - // Get our reactor. - ACE_Reactor *reactor = transport.orb_core ()->reactor (); - - if (!ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) - { - // Timeouts are no longer needed. Cancel existing one. - reactor->cancel_timer (transport.buffering_timer_id ()); - transport.buffering_timer_id (0); - } - else - { - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); - - if (transport.buffering_timeout_value () == timeout) - { - // Timeout value is the same, nothing to be done. - } - else - { - // Timeout value has changed, reset the old timer. - reactor->reset_timer_interval (transport.buffering_timer_id (), - timeout); - } - } + set_timer = 0; + return; } - else if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) - { - // We didn't have timeouts before, but we want them now. - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); - // Get our reactor. - ACE_Reactor *reactor = transport.orb_core ()->reactor (); + ACE_Time_Value timeout = + this->time_conversion (buffering_constraint.timeout); - long timer_id = reactor->schedule_timer (transport.event_handler (), - 0, - timeout, - timeout); - - transport.buffering_timer_id (timer_id); - transport.buffering_timeout_value (timeout); + if (interval == timeout) + { + set_timer = 0; + return; } + + set_timer = 1; + interval = timeout; } ACE_Time_Value -TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) +TAO_Eager_Buffering_Sync_Strategy:: + time_conversion (const TimeBase::TimeT &time) { TimeBase::TimeT seconds = time / 10000000u; TimeBase::TimeT microseconds = (time % 10000000u) / 10; @@ -234,4 +119,14 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) ACE_U64_TO_U32 (microseconds)); } +// **************************************************************** + +int +TAO_Delayed_Buffering_Sync_Strategy:: + must_queue (int queue_empty) +{ + // If the queue is empty we want to send immediately + return !queue_empty; +} + #endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */ |