summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp114
1 files changed, 45 insertions, 69 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index c272e341914..24e2ba58788 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -30,7 +30,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
- , current_message_ (0)
{
TAO_Client_Strategy_Factory *cf =
this->orb_core_->client_factory ();
@@ -96,13 +95,12 @@ TAO_Transport::handle_output ()
{
// ... there is no current message or it was completely
// sent, time to check the queue....
- int dequeue = this->dequeue_next_message ();
- if (dequeue == -1)
- {
- // ... no more messages in the queue, cancel output...
- (void) this->cancel_output ();
- return 0;
- }
+ // ... no more messages in the queue, cancel output...
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ flushing_strategy->cancel_output (this);
+ return 0;
}
// ... on Win32 we must continue until we get EWOULDBLOCK ...
}
@@ -240,13 +238,13 @@ TAO_Transport::send_current_message (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
- if (this->current_message_ == 0)
+ if (this->head_ == 0)
return 1;
size_t bytes_transferred;
ssize_t retval =
- this->send_message_block_chain (this->current_message_->mb (),
+ this->send_message_block_chain (this->head_->mb (),
bytes_transferred);
if (retval == 0)
{
@@ -257,14 +255,14 @@ TAO_Transport::send_current_message (void)
// Because there can be a partial transfer we need to adjust the
// number of bytes sent.
- this->current_message_->bytes_transferred (bytes_transferred);
- if (this->current_message_->done ())
+ this->head_->bytes_transferred (bytes_transferred);
+ if (this->head_->done ())
{
// Remove the current message....
- // @@ We should be using a pool for these guys!
- this->current_message_->destroy ();
+ TAO_Queued_Message *head = this->head_;
+ head->remove_from_list (this->head_, this->tail_);
- this->current_message_ = 0;
+ head->destroy ();
}
if (retval == -1)
@@ -278,33 +276,8 @@ TAO_Transport::send_current_message (void)
return -1;
}
- if (this->current_message_ == 0)
- return 1;
- return 0;
-}
-
-int
-TAO_Transport::dequeue_next_message (void)
-{
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
if (this->head_ == 0)
- return -1;
-
- this->current_message_ = this->head_;
- this->current_message_->remove_from_list (this->head_, this->tail_);
-
- return 0;
-}
-
-int
-TAO_Transport::cancel_output (void)
-{
- return 0;
-}
-
-int
-TAO_Transport::schedule_output (void)
-{
+ return 1;
return 0;
}
@@ -314,6 +287,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
int queue_empty = (this->head_ == 0);
@@ -321,7 +297,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// Let's figure out if the message should be queued without trying
// to send first:
int must_queue = 0;
- if (this->current_message_ != 0)
+ if (this->head_ != 0)
must_queue = 1;
else if (!twoway_flag
&& stub->sync_strategy ().must_queue (queue_empty))
@@ -342,16 +318,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
this->handle ()));
}
- size_t length = message_block->total_length ();
- ACE_Message_Block *copy =
- new ACE_Message_Block (length);
- for (const ACE_Message_Block *i = message_block;
- i != 0;
- i = i->cont ())
- copy->copy (i->rd_ptr (), i->length ());
-
queued_message =
- new TAO_Queued_Message (copy, 1);
+ this->copy_message_block (message_block);
+
queued_message->push_back (this->head_, this->tail_);
}
else
@@ -400,7 +369,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// ... the message was only partially sent, schedule reactive
// output...
- this->schedule_output ();
+ flushing_strategy->schedule_output (this);
// ... and set it as the current message ...
if (twoway_flag)
@@ -414,16 +383,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
}
else
{
- size_t length = message_block->total_length ();
- ACE_Message_Block *copy =
- new ACE_Message_Block (length);
- for (const ACE_Message_Block *i = message_block;
- i != 0;
- i = i->cont ())
- copy->copy (i->rd_ptr (), i->length ());
-
queued_message =
- new TAO_Queued_Message (copy, 1);
+ this->copy_message_block (message_block);
}
// @@ Revisit message queue allocations
@@ -454,15 +415,15 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
queued_message->mb ()->total_length ()));
}
- this->current_message_ = queued_message;
+ // ... insert at the head of the queue, we can use push_back()
+ // because the queue is empty ...
+ queued_message->push_back (this->head_, this->tail_);
}
// ... two choices, this is a twoway request or not, if it is
// then we must only return once the complete message has been
// sent:
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
if (twoway_flag)
{
// Release the mutex, other threads may modify the queue as we
@@ -485,11 +446,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
msg_count++;
total_bytes += i->mb ()->total_length ();
}
- if (this->current_message_ != 0)
- {
- msg_count++;
- total_bytes += this->current_message_->mb ()->total_length ();
- }
int set_timer;
ACE_Time_Value interval;
@@ -549,3 +505,23 @@ TAO_Transport::reactor_signalling (void)
{
return 0;
}
+
+TAO_Queued_Message *
+TAO_Transport::copy_message_block (const ACE_Message_Block *message_block)
+{
+ size_t length = message_block->total_length ();
+
+ // @@ Use Auto_Ptr<> to cleanup the message block, should the second
+ // allocation fail
+ ACE_Message_Block *copy;
+ ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0);
+ for (const ACE_Message_Block *i = message_block;
+ i != 0;
+ i = i->cont ())
+ copy->copy (i->rd_ptr (), i->length ());
+
+ TAO_Queued_Message *msg;
+ ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0);
+
+ return msg;
+}