diff options
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r-- | TAO/tao/Transport.cpp | 132 |
1 files changed, 98 insertions, 34 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 335a0e75c9c..194072dc587 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -479,7 +479,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, int TAO_Transport::send_message_shared (TAO_Stub *stub, - int is_synchronous, + int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { @@ -490,7 +490,7 @@ TAO_Transport::send_message_shared (TAO_Stub *stub, if (this->check_event_handler_i ("Transport::send_message_shared") == -1) return -1; - r = this->send_message_shared_i (stub, is_synchronous, + r = this->send_message_shared_i (stub, message_semantics, message_block, max_wait_time); } if (r == -1) @@ -511,34 +511,18 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int n = this->drain_queue_i (); - if (n == -1) - { - synch_message.remove_from_list (this->head_, this->tail_); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return -1; // Error while sending... - } - else if (n == 1) - { - ACE_ASSERT (synch_message.all_data_sent ()); - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; // Empty queue, message was sent.. - } + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); - ACE_ASSERT (n == 0); // Some data sent, but data remains. + if (n == -1 || + n == 1) + return n; - if (synch_message.all_data_sent ()) - { - ACE_ASSERT (synch_message.next () == 0); - ACE_ASSERT (synch_message.prev () == 0); - return 1; - } + ACE_ASSERT (n == 0); // @todo: Check for timeouts! // if (max_wait_time != 0 && errno == ETIME) return -1; - TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); (void) flushing_strategy->schedule_output (this); @@ -608,6 +592,87 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, } +int +TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, + ACE_Time_Value *max_wait_time) +{ + // Dont clone now.. We could be sent in one shot! + TAO_Synch_Queued_Message synch_message (mb); + + synch_message.push_back (this->head_, + this->tail_); + + int n = + this->send_synch_message_helper_i (synch_message, + max_wait_time); + + if (n == -1 || + n == 1) + return n; + + ACE_ASSERT (n == 0); + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_reply_message_i, " + "preparing to add to queue before leaving \n", + this->id ())); + } + + // Till this point we shouldnt have any copying and that is the + // point anyway. Now, remove the node from the list + synch_message.remove_from_list (this->head_, + this->tail_); + + // Clone the node that we have. + TAO_Queued_Message *msg = + synch_message.clone (this->orb_core_->transport_message_buffer_allocator ()); + + // Stick it in the queue + msg->push_back (this->head_, + this->tail_); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + (void) flushing_strategy->schedule_output (this); + + return 1; +} + +int +TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, + ACE_Time_Value * /*max_wait_time*/) +{ + // @@todo: Need to send timeouts for writing.. + int n = this->drain_queue_i (); + if (n == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return -1; // Error while sending... + } + else if (n == 1) + { + ACE_ASSERT (synch_message.all_data_sent ()); + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; // Empty queue, message was sent.. + } + + ACE_ASSERT (n == 0); // Some data sent, but data remains. + + if (synch_message.all_data_sent ()) + { + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); + return 1; + } + + return 0; +} void @@ -659,11 +724,6 @@ TAO_Transport::close_connection_shared (int disable_purge, this->send_connection_closed_notifications (); } - - - - - int TAO_Transport::queue_is_empty_i (void) { @@ -671,8 +731,6 @@ TAO_Transport::queue_is_empty_i (void) } - - int TAO_Transport::schedule_output_i (void) { @@ -1041,15 +1099,21 @@ TAO_Transport::send_connection_closed_notifications (void) int TAO_Transport::send_message_shared_i (TAO_Stub *stub, - int is_synchronous, + int message_semantics, const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { - if (is_synchronous) + if (message_semantics == TAO_Transport::TAO_TWOWAY_REQUEST) { return this->send_synchronous_message_i (message_block, max_wait_time); } + else if (message_semantics == TAO_Transport::TAO_REPLY) + { + return this->send_reply_message_i (message_block, + max_wait_time); + } + // Let's figure out if the message should be queued without trying // to send first: |