summaryrefslogtreecommitdiff
path: root/TAO/tao/Sync_Strategies.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Sync_Strategies.cpp')
-rw-r--r--TAO/tao/Sync_Strategies.cpp229
1 files changed, 62 insertions, 167 deletions
diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp
index c679fc3405c..36c923f1ee0 100644
--- a/TAO/tao/Sync_Strategies.cpp
+++ b/TAO/tao/Sync_Strategies.cpp
@@ -14,129 +14,47 @@ TAO_Sync_Strategy::~TAO_Sync_Strategy (void)
{
}
-ssize_t
-TAO_Transport_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
-{
- // Immediate delegation to the transport.
- return transport.send (message_block,
- max_wait_time);
-}
+// ****************************************************************
-#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
-
-ssize_t
-TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *mb,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ must_queue (int)
{
- ACE_Message_Block *message_block =
- ACE_const_cast (ACE_Message_Block *, mb);
-
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Check if there are messages already in the queue.
- if (!buffering_queue.is_empty ())
- return TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
-
- //
- // Otherwise there were no queued messages. We first try to send
- // the message right away.
- //
-
- // Actual network send.
- size_t bytes_transferred = 0;
- result = transport.send (message_block,
- max_wait_time,
- &bytes_transferred);
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // message block appropriately.
- transport.reset_sent_message (message_block,
- bytes_transferred);
- }
-
- // Queue the rest.
- return bytes_transferred +
- TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
- }
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- return -1;
-
- // Everything was successfully delivered.
- return result;
+ return 0;
}
-ssize_t
-TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *,
+ size_t ,
+ size_t ,
+ int &,
+ ACE_Time_Value &)
{
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Copy the message.
- ACE_Message_Block *copy = message_block->clone ();
-
- // Enqueue current message.
- result = buffering_queue.enqueue_tail (copy);
-
- // Enqueuing error.
- if (result == -1)
- {
- // Eliminate the copy.
- copy->release ();
+ return 0;
+}
- // Return error.
- return -1;
- }
+#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
- // Check if upper bound has been reached.
- if (this->buffering_constraints_reached (transport,
- stub,
- buffering_queue))
- {
- return transport.send_buffered_messages (max_wait_time);
- }
+// ****************************************************************
- // Hoping that this return value is meaningful or at least
- // acceptable.
- return message_block->total_length ();
+int
+TAO_Eager_Buffering_Sync_Strategy::
+ must_queue (int)
+{
+ return 1;
}
int
-TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport,
- TAO_Stub &stub,
- TAO_Transport_Buffering_Queue &buffering_queue)
+TAO_Eager_Buffering_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &set_timer,
+ ACE_Time_Value &interval)
{
TAO_Buffering_Constraint_Policy *buffering_constraint_policy =
- stub.buffering_constraint ();
+ stub->buffering_constraint ();
if (buffering_constraint_policy == 0)
return 1;
@@ -147,86 +65,53 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport
TAO::BufferingConstraint buffering_constraint;
buffering_constraint_policy->get_buffering_constraint (buffering_constraint);
- this->timer_check (transport,
- buffering_constraint);
+ this->timer_check (buffering_constraint, set_timer, interval);
if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
return 1;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_COUNT) &&
- buffering_queue.message_count () >= buffering_constraint.message_count)
+ TAO::BUFFER_MESSAGE_COUNT)
+ && msg_count >= buffering_constraint.message_count)
return 1;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_BYTES) &&
- buffering_queue.message_length () >= buffering_constraint.message_bytes)
+ TAO::BUFFER_MESSAGE_BYTES)
+ && total_bytes >= buffering_constraint.message_bytes)
return 1;
return 0;
}
void
-TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport,
- const TAO::BufferingConstraint &buffering_constraint)
+TAO_Eager_Buffering_Sync_Strategy::
+ timer_check (const TAO::BufferingConstraint &buffering_constraint,
+ int &set_timer,
+ ACE_Time_Value &interval)
{
- if (transport.buffering_timer_id () != 0)
+ if (!ACE_BIT_ENABLED (buffering_constraint.mode,
+ TAO::BUFFER_TIMEOUT))
{
- //
- // There is a timeout set by us, though we are not sure if we
- // still need the timeout or if the timeout value is correct or
- // not.
- //
-
- // Get our reactor.
- ACE_Reactor *reactor = transport.orb_core ()->reactor ();
-
- if (!ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
- {
- // Timeouts are no longer needed. Cancel existing one.
- reactor->cancel_timer (transport.buffering_timer_id ());
- transport.buffering_timer_id (0);
- }
- else
- {
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
-
- if (transport.buffering_timeout_value () == timeout)
- {
- // Timeout value is the same, nothing to be done.
- }
- else
- {
- // Timeout value has changed, reset the old timer.
- reactor->reset_timer_interval (transport.buffering_timer_id (),
- timeout);
- }
- }
+ set_timer = 0;
+ return;
}
- else if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
- {
- // We didn't have timeouts before, but we want them now.
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
- // Get our reactor.
- ACE_Reactor *reactor = transport.orb_core ()->reactor ();
+ ACE_Time_Value timeout =
+ this->time_conversion (buffering_constraint.timeout);
- long timer_id = reactor->schedule_timer (transport.event_handler (),
- 0,
- timeout,
- timeout);
-
- transport.buffering_timer_id (timer_id);
- transport.buffering_timeout_value (timeout);
+ if (interval == timeout)
+ {
+ set_timer = 0;
+ return;
}
+
+ set_timer = 1;
+ interval = timeout;
}
ACE_Time_Value
-TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
+TAO_Eager_Buffering_Sync_Strategy::
+ time_conversion (const TimeBase::TimeT &time)
{
TimeBase::TimeT seconds = time / 10000000u;
TimeBase::TimeT microseconds = (time % 10000000u) / 10;
@@ -234,4 +119,14 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
ACE_U64_TO_U32 (microseconds));
}
+// ****************************************************************
+
+int
+TAO_Delayed_Buffering_Sync_Strategy::
+ must_queue (int queue_empty)
+{
+ // If the queue is empty we want to send immediately
+ return !queue_empty;
+}
+
#endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */