summaryrefslogtreecommitdiff
path: root/TAO/tao/Sync_Strategies.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Sync_Strategies.cpp')
-rw-r--r--TAO/tao/Sync_Strategies.cpp277
1 files changed, 111 insertions, 166 deletions
diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp
index d09bcd1a5a5..a64223da662 100644
--- a/TAO/tao/Sync_Strategies.cpp
+++ b/TAO/tao/Sync_Strategies.cpp
@@ -5,138 +5,68 @@
#include "tao/Buffering_Constraint_Policy.h"
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
+#include "tao/debug.h"
#if !defined (__ACE_INLINE__)
# include "tao/Sync_Strategies.i"
#endif /* ! __ACE_INLINE__ */
-TAO_Sync_Strategy::~TAO_Sync_Strategy (void)
-{
-}
+ACE_RCSID(tao, Sync_Strategies, "$Id$")
-ssize_t
-TAO_Transport_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
+TAO_Sync_Strategy::~TAO_Sync_Strategy (void)
{
- // Immediate delegation to the transport.
- return transport.send (message_block,
- max_wait_time);
}
-#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
+// ****************************************************************
-ssize_t
-TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *mb,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ must_queue (int)
{
- ACE_Message_Block *message_block =
- ACE_const_cast (ACE_Message_Block *, mb);
-
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Check if there are messages already in the queue.
- if (!buffering_queue.is_empty ())
- return TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
-
- //
- // Otherwise there were no queued messages. We first try to send
- // the message right away.
- //
-
- // Actual network send.
- size_t bytes_transferred = 0;
- result = transport.send (message_block,
- max_wait_time,
- &bytes_transferred);
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // message block appropriately.
- transport.reset_sent_message (message_block,
- bytes_transferred);
- }
-
- // Queue the rest.
- return bytes_transferred +
- TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
- }
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- return -1;
-
- // Everything was successfully delivered.
- return result;
+ return 0;
}
-ssize_t
-TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *,
+ size_t ,
+ size_t ,
+ int &must_flush,
+ const ACE_Time_Value &,
+ int &set_timer,
+ ACE_Time_Value &)
{
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Copy the message.
- ACE_Message_Block *copy = message_block->clone ();
-
- // Enqueue current message.
- result = buffering_queue.enqueue_tail (copy);
-
- // Enqueuing error.
- if (result == -1)
- {
- // Eliminate the copy.
- copy->release ();
+ set_timer = 0;
+ must_flush = 1;
+ return 1;
+}
- // Return error.
- return -1;
- }
+#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
- // Check if upper bound has been reached.
- if (this->buffering_constraints_reached (transport,
- stub,
- buffering_queue))
- {
- return transport.send_buffered_messages (max_wait_time);
- }
+// ****************************************************************
- // Hoping that this return value is meaningful or at least
- // acceptable.
- return message_block->total_length ();
+int
+TAO_Eager_Buffering_Sync_Strategy::
+ must_queue (int)
+{
+ return 1;
}
int
-TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport,
- TAO_Stub &stub,
- TAO_Transport_Buffering_Queue &buffering_queue)
+TAO_Eager_Buffering_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &must_flush,
+ const ACE_Time_Value &current_deadline,
+ int &set_timer,
+ ACE_Time_Value &new_deadline)
{
+ must_flush = 0;
+ set_timer = 0;
+
TAO_Buffering_Constraint_Policy *buffering_constraint_policy =
- stub.buffering_constraint ();
+ stub->buffering_constraint ();
if (buffering_constraint_policy == 0)
return 1;
@@ -147,80 +77,85 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport
TAO::BufferingConstraint buffering_constraint;
buffering_constraint_policy->get_buffering_constraint (buffering_constraint);
- this->timer_check (transport,
- buffering_constraint);
-
if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
- return 1;
+ {
+ must_flush = 1;
+ return 1;
+ }
+ int constraints_reached = 0;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_COUNT) &&
- buffering_queue.message_count () >= buffering_constraint.message_count)
- return 1;
+ TAO::BUFFER_MESSAGE_COUNT)
+ && msg_count >= buffering_constraint.message_count)
+ constraints_reached = 1;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_BYTES) &&
- buffering_queue.message_length () >= buffering_constraint.message_bytes)
- return 1;
+ TAO::BUFFER_MESSAGE_BYTES)
+ && total_bytes >= buffering_constraint.message_bytes)
+ constraints_reached = 1;
- return 0;
+ if (this->timer_check (buffering_constraint,
+ current_deadline,
+ set_timer,
+ new_deadline) != 0)
+ constraints_reached = 1;
+
+ return constraints_reached;
}
-void
-TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport,
- const TAO::BufferingConstraint &buffering_constraint)
+int
+TAO_Eager_Buffering_Sync_Strategy::
+ timer_check (const TAO::BufferingConstraint &buffering_constraint,
+ const ACE_Time_Value &current_deadline,
+ int &set_timer,
+ ACE_Time_Value &new_deadline)
{
- if (transport.buffering_timer_id () != 0)
+ set_timer = 0;
+ if (!ACE_BIT_ENABLED (buffering_constraint.mode,
+ TAO::BUFFER_TIMEOUT))
{
- //
- // There is a timeout set by us, though we are not sure if we
- // still need the timeout or if the timeout value is correct or
- // not.
- //
-
- // Get our reactor.
- ACE_Reactor *reactor = transport.orb_core ()->reactor ();
-
- if (!ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
- {
- // Timeouts are no longer needed. Cancel existing one.
- reactor->cancel_timer (transport.buffering_timer_id ());
- transport.buffering_timer_id (0);
- }
- else
- {
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
-
- if (transport.buffering_timeout_value () == timeout)
- {
- // Timeout value is the same, nothing to be done.
- }
- else
- {
- // Timeout value has changed, reset the old timer.
- reactor->reset_timer_interval (transport.buffering_timer_id (),
- timeout);
- }
- }
+ return 0;
}
- else if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
+
+ // Compute the next deadline...
+ ACE_Time_Value now = ACE_OS::gettimeofday ();
+ ACE_Time_Value timeout =
+ this->time_conversion (buffering_constraint.timeout);
+ new_deadline = now + timeout;
+
+ // Check if the new deadline is more stringent, or if the deadline
+ // has expired and thus must be reset anyway.
+ if (current_deadline > new_deadline
+ || current_deadline < now)
{
- // We didn't have timeouts before, but we want them now.
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
+ set_timer = 1;
+ }
- long timer_id = transport.register_for_timer_event (0, timeout, timeout);
+ // ... if there is no deadline we don't want to schedule output (the
+ // deadline will be set because set_timer is set to 1 in that case).
+ // If there is a deadline but but it has not been reached, we
+ // don't want to schedule any output either...
+ if (current_deadline == ACE_Time_Value::zero
+ || current_deadline >= now)
+ {
+ return 0;
+ }
- transport.buffering_timer_id (timer_id);
- transport.buffering_timeout_value (timeout);
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Eager_Buffering_Sync_Strategy::timer_check, "
+ "Now = %u, Current = %u, New = %u\n",
+ now.msec (), current_deadline.msec (),
+ new_deadline.msec ()));
}
+
+ return 1;
}
ACE_Time_Value
-TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
+TAO_Eager_Buffering_Sync_Strategy::
+ time_conversion (const TimeBase::TimeT &time)
{
TimeBase::TimeT seconds = time / 10000000u;
TimeBase::TimeT microseconds = (time % 10000000u) / 10;
@@ -228,4 +163,14 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
ACE_U64_TO_U32 (microseconds));
}
+// ****************************************************************
+
+int
+TAO_Delayed_Buffering_Sync_Strategy::
+ must_queue (int queue_empty)
+{
+ // If the queue is empty we want to send immediately
+ return !queue_empty;
+}
+
#endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */