summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp1208
1 files changed, 604 insertions, 604 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 1640644edf9..9923ff1f895 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -19,9 +19,7 @@
#include "Resume_Handle.h"
#include "Codeset_Manager.h"
#include "Codeset_Translator_Factory.h"
-#include "GIOP_Message_State.h"
#include "ace/OS_NS_sys_time.h"
-#include "ace/Message_Block.h"
#include "ace/Reactor.h"
@@ -112,15 +110,12 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, incoming_message_queue_ (orb_core)
- , uncompleted_message_ (0)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
, handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
, id_ ((size_t) this)
, purging_order_ (0)
- , recv_buffer_size_ (0)
- , sent_byte_count_ (0)
, char_translator_ (0)
, wchar_translator_ (0)
, tcs_set_ (0)
@@ -241,9 +236,13 @@ TAO_Transport::generate_request_header (
{
// codeset service context is only supposed to be sent in the first request
// on a particular connection.
- TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager();
- if (csm && this->first_request_)
- csm->generate_service_context( opdetails, *this );
+ if (this->first_request_)
+ {
+ this->orb_core ()->codeset_manager ()->generate_service_context (
+ opdetails,
+ *this
+ );
+ }
if (this->messaging_object ()->generate_request_header (opdetails,
spec,
@@ -605,12 +604,7 @@ int
TAO_Transport::schedule_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -626,12 +620,7 @@ int
TAO_Transport::cancel_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
if (TAO_debug_level > 3)
{
@@ -750,9 +739,6 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
// no bytes are sent send() can only return 0 or -1
ACE_ASSERT (byte_count != 0);
- // Total no. of bytes sent for a send call
- this->sent_byte_count_ += byte_count;
-
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -776,10 +762,6 @@ TAO_Transport::drain_queue_i (void)
// We loop over all the elements in the queue ...
TAO_Queued_Message *i = this->head_;
- // reset the value so that the counting is done for each new send
- // call.
- this->sent_byte_count_ = 0;
-
while (i != 0)
{
// ... each element fills the iovector ...
@@ -838,14 +820,8 @@ TAO_Transport::drain_queue_i (void)
if (this->flush_timer_pending ())
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- }
+ ACE_Reactor *reactor = eh->reactor ();
+ reactor->cancel_timer (this->flush_timer_id_);
this->reset_flush_timer ();
}
@@ -922,25 +898,20 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
if (set_timer)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- this->current_deadline_ = new_deadline;
- ACE_Time_Value delay =
- new_deadline - ACE_OS::gettimeofday ();
+ ACE_Reactor *reactor = eh->reactor ();
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
- if (this->flush_timer_pending ())
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- this->flush_timer_id_ =
- reactor->schedule_timer (&this->transport_timer_,
- &this->current_deadline_,
- delay);
- }
+ if (this->flush_timer_pending ())
+ {
+ reactor->cancel_timer (this->flush_timer_id_);
}
+
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
}
return constraints_reached;
@@ -1147,18 +1118,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
return 0;
}
-
-class CTHack
-{
-public:
- CTHack() { enter(); }
- ~CTHack() { leave(); }
-private:
- void enter() { x = 1; }
- void leave() { x = 0; }
- int x;
-};
-
/*
*
* All the methods relevant to the incoming data path of the ORB are
@@ -1170,8 +1129,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
int /*block*/)
{
- CTHack cthack;
-
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1179,8 +1136,9 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
this->id ()));
}
- // First try to process messages off the head of the incoming queue.
+ // First try to process messages of the head of the incoming queue.
int retval = this->process_queue_head (rh);
+
if (retval <= 0)
{
if (retval == -1)
@@ -1191,6 +1149,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
"error while parsing the head of the queue\n",
this->id()));
}
+
return retval;
}
@@ -1199,7 +1158,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// The buffer on the stack which will be used to hold the input
// messages
- char buf[TAO_MAXBUFSIZE];
+ char buf [TAO_MAXBUFSIZE];
#if defined (ACE_HAS_PURIFY)
(void) ACE_OS::memset (buf,
@@ -1221,35 +1180,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
- // We'll loop trying to complete the message this number of times,
- // and that's it.
- unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
-
- unsigned int did_queue_message = 0;
// Align the message block
ACE_CDR::mb_align (&message_block);
size_t recv_size = 0;
+
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- recv_size = message_block.space ();
+ recv_size =
+ message_block.space ();
}
else
{
- recv_size = this->messaging_object ()->header_length ();
+ recv_size =
+ this->messaging_object ()->header_length ();
}
- // Saving the size of the received buffer in case any one needs to
- // get the size of the message thats received in the
- // context. Obviously the value will be changed for each recv call
- // and the user is supposed to invoke the accessor only in the
- // invocation context to get meaningful information.
- this->recv_buffer_size_ = recv_size;
-
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.wr_ptr (),
+ ssize_t n = this->recv (message_block.rd_ptr (),
recv_size,
max_wait_time);
@@ -1262,7 +1212,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i: "
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
"read %d bytes\n",
this->id (), n));
}
@@ -1270,372 +1220,172 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- if (TAO_debug_level >= 10)
- ACE_HEX_DUMP ((LM_DEBUG,
- (const char *) message_block.rd_ptr (),
- message_block.length (),
- ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
-
+ // Parse the message and try consolidating the message if
+ // needed.
+ retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
-complete_message_and_possibly_enqueue:
- // Check to see if we're still working to complete a message
- if (this->uncompleted_message_)
+ if (retval <= 0)
{
- // try to complete it
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "error while parsing and consolidating\n",
+ this->id ()));
+ }
+ return retval;
+ }
- // on exit from this frame we have one of the following states:
- //
- // (a) an uncompleted message still in uncompleted_message_
- // AND message_block is empty
- //
- // (b) uncompleted_message_ zero, the completed message at the
- // tail of the incoming queue; message_block could be empty
- // or still contain bytes
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block,
+ this->orb_core_->transport_message_buffer_allocator ());
- // ==> repeat
- do
- {
- /*
- * Append the "right number of bytes" to uncompleted_message_
- */
- // ==> right_number_of_bytes = MIN(bytes missing from
- // uncompleted_message_, length of message_block);
- size_t right_number_of_bytes =
- ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
- message_block.length () );
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "trying to use %u (of %u) "
- "bytes to complete message missing %u bytes\n",
- this->id (),
- right_number_of_bytes,
- message_block.length (),
- this->uncompleted_message_->missing_data_bytes_));
- }
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd =
+ TAO_Queued_Data::duplicate (qd);
- // ==> append right_number_of_bytes from message_block
- // to uncomplete_message_ & update read pointer of
- // message_block;
-
- // 1. we assume that uncompleted_message_.msg_block_'s
- // wr_ptr is properly maintained
- // 2. we presume that uncompleted_message_.msg_block was
- // allocated with enough space to contain the *entire*
- // expected GIOP message, so this copy shouldn't involve an
- // additional allocation
- this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
- right_number_of_bytes);
- this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
- message_block.rd_ptr (right_number_of_bytes);
-
- switch (this->uncompleted_message_->current_state_)
- {
- case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
- {
- int hdrvalidity = this->messaging_object()->check_for_valid_header (
- *this->uncompleted_message_->msg_block_);
- if (hdrvalidity == 0)
- {
- // According to the spec, Section 15.4.8, we should send
- // the MessageError GIOP message on receipt of "any message...whose
- // header is not properly formed (e.g., has the wrong magic value)".
- //
- // So, rather than returning -1, what we REALLY need to do is
- // send a MessageError in reply.
- //
- // I'm not sure what the best way to trigger that is...probably to
- // queue up a special internal-only COMPLETED message that, when
- // processed, sends the MessageError as part of its processing.
- return -1;
- }
- else if (hdrvalidity == 1)
- {
- // ==> update bytes missing from uncompleted_message_
- // with size of message from valid header;
- this->messaging_object()->set_queued_data_from_message_header (
- this->uncompleted_message_,
- *this->uncompleted_message_->msg_block_);
- // ==> change state of uncompleted_event_ to
- // WAITING_TO_COMPLETE_PAYLOAD;
- this->uncompleted_message_->current_state_ =
- TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
-
- // ==> Resize the message block to have capacity for
- // the rest of the incoming message
- ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
- ACE_CDR::grow (&mb,
- mb.size ()
- + this->uncompleted_message_->missing_data_bytes_);
-
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "found a valid header in the message; "
- "waiting for %u bytes to complete payload\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
-
- // Continue the loop...
- continue;
- }
- // In the case where we don't have enough information (hdrvalidity == -1),
- // we just have to fall through and collect more.
-#if 0
- else
- {
- // What the heck will we do with a bad header? Just
- // better to close the connection and let things
- // re-train from there.
- if (this->uncompleted_message_->msg_block_->length () ==
- this->messaging_object()->header_length())
- return -1;
-
-#if 0 // I don't think I need this clause, but I'm leaving it just in case.
- // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes;
- this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
- ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0);
-#endif
- }
-#endif
- }
- break;
-
- case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
- // Here we have an opportunity to try to finish reading the
- // uncompleted message. This is a Good Idea(TM) because there are
- // good odds that either more data came available since the last
- // time we read, or that we simply didn't read the whole message on
- // the first read. So, we try to read again.
- //
- // NOTE! this changes this->uncompleted_message_!
- this->try_to_complete (max_wait_time);
+ return this->consolidate_fragments (nqd, rh);
+ }
- // ==> if (bytes missing from uncompleted_message_ == 0)
- if (this->uncompleted_message_->missing_data_bytes_ == 0)
- {
- /*
- * We completed the message! Hooray!
- */
- // ==> place uncompleted_message_ (which is now
- // complete!) at the tail of the incoming message
- // queue;
-
- // ---> NOTE: whoever pulls this off the queue must delete it!
- this->uncompleted_message_->current_state_
- = TAO_Queued_Data::COMPLETED;
-
- // @@CJC NEED TO CHECK RETURN VALUE HERE!
- this->enqueue_incoming_message (this->uncompleted_message_);
- did_queue_message = 1;
- // zero out uncompleted_message_;
- this->uncompleted_message_ = 0;
-
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "completed and queued message for processing!\n",
- this->id ()));
- }
+ // Process the message
+ return this->process_parsed_messages (&qd,
+ rh);
+}
- }
- else
- {
+int
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (block) == -1)
+ {
+ return -1;
+ }
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "still need %u bytes to complete uncompleted message.\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
- }
- break;
+ // Check whether we have a complete message for processing
+ ssize_t missing_data = this->missing_data (block);
- default:
- // @@CJC What do we do here?!
- ACE_ASSERT (! "Transport::handle_input_i: unexpected state"
- "in uncompleted_message_");
- }
- }
- // Does the order of the checks matter? In both (a) and (b),
- // message_block is empty, but only in (b) is there no
- // uncompleted_message_.
- // ==> until (message_block is empty || there is no uncompleted_message_);
- // or, rewritten in C++ looping constructs
- // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
- while (message_block.length() != 0 && this->uncompleted_message_);
- }
-
- // *****************************
- // @@ CJC
- //
- // Once upon a time we tried to complete reading the uncompleted
- // message here, but testing found that completing later worked
- // better.
- // *****************************
-
-
- // At this point, there should be nothing in uncompleted_message_.
- // We now need to chop up the bytes in message_block and store any
- // complete messages in the incoming message queue.
- //
- // ==> if (message_block still has data)
- if (message_block.length () != 0)
- {
- TAO_Queued_Data *complete_message = 0;
- do
- {
- if (TAO_debug_level >= 10)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ")
- ACE_TEXT("extracting complete messages\n")));
- ACE_HEX_DUMP ((LM_DEBUG,
- message_block.rd_ptr (),
- message_block.length (),
- ACE_TEXT (" from this message buffer")));
- }
- complete_message =
- TAO_Queued_Data::make_completed_message (
- message_block, *this->messaging_object ());
- if (complete_message)
- {
- this->enqueue_incoming_message (complete_message);
- did_queue_message = 1;
- }
- }
- while (complete_message != 0);
- // On exit from this frame we have one of the following states:
- // (a) message_block is empty
- // (b) message_block contains bytes from a partial message
- }
-
- // If, at this point, there's still data in message_block, it's
- // an incomplete message. Therefore, we stuff it into the
- // uncompleted_message_ and clear out message_block.
- // ==> if (message_block still has data)
- if (message_block.length () != 0)
- {
- // duplicate message_block remainder into this->uncompleted_message_
- ACE_ASSERT (this->uncompleted_message_ == 0);
- this->uncompleted_message_ =
- TAO_Queued_Data::make_uncompleted_message (&message_block,
- *this->messaging_object ());
- ACE_ASSERT (this->uncompleted_message_ != 0);
-
- // In a debug build, we won't reach this point if we couldn't
- // create an uncompleted message because the above ASSERT will
- // trip. However, in an optimized build, the ASSERT isn't
- // there, so we'll go past here.
- //
- // We could put a check in here similar to the ASSERT condition,
- // but doing that would terminate this loop early and result in
- // our never processing any completed messages that were received
- // in this trip to handle_input_i.
- //
- // Maybe we could instead queue up a special completed message that,
- // when processed, causes the connection to get closed in a non-graceful
- // termination scenario.
- }
-
- // We should have consumed ALL the bytes by now.
- ACE_ASSERT (message_block.length () == 0);
-
- //
- // We don't want to try to re-read earlier because we may not have
- // an uncompleted message until we get to this point. So, if we did
- // it earlier, we could have missed the opportunity to complete it
- // and dispatch.
- //
- // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
- // to increase throughput!
-
- if (this->uncompleted_message_)
- {
- if (number_of_read_attempts--)
- {
- // We try to read again just in case more data arrived while
- // we were doing the stuff above. This way, we can increase
- // throughput without much of a penalty.
+ if (missing_data < 0)
+ {
+ // If we have more than one message
+ return this->consolidate_extra_messages (block,
+ rh);
+ }
+ else if (missing_data > 0)
+ {
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "still have an uncompleted message; "
- "will try %d more times before letting "
- "somebody else have a chance.\n",
- this->id (),
- number_of_read_attempts));
- }
+ return 1;
+}
- // We only bother trying to complete payload, not header, because the
- // retry only happens in the complete-the-payload clause above.
- if (this->uncompleted_message_->current_state_ ==
- TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD)
- goto complete_message_and_possibly_enqueue;
- }
- else
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (this->incoming_message_queue_.is_tail_complete () != 0)
+ {
+ // As it looks like a new message has been read, process the
+ // message. Call the messaging object to do the parsing..
+ int retval =
+ this->messaging_object ()->parse_incoming_messages (block);
+
+ if (retval == -1)
{
- // The queue should be empty because it should have been processed
- // above. But I wonder if I should put a check in here anyway.
if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "giving up reading for now and returning "
- "with incoming queue length = %d\n",
- this->id (),
- this->incoming_message_queue_.queue_length ()));
- if (this->uncompleted_message_)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Transport[%d]::handle_input_i: "
- "missing bytes from uncompleted message = %u\n",
- this->id (),
- this->uncompleted_message_->missing_data_bytes_));
- }
- return 1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
+ "error in incoming message\n",
+ this->id ()));
+
+ return -1;
}
}
- // **** END CJC PMG CHANGES ****
+ return 0;
+}
+
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message.
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->incoming_message_queue_.missing_data_tail ();
+ }
- return did_queue_message ? this->process_queue_head (rh) : 1;
+ return this->messaging_object ()->missing_data (incoming);
}
-void
-TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- if (this->uncompleted_message_ == 0)
- return;
+ // Check whether the last message in the queue is complete..
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
+ this->id ()));
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.size ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
ssize_t n = 0;
- size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
- ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
- // Try to complete this until we error or block right here...
- for (ssize_t bytes = missing_data;
- bytes != 0;
- bytes -= n)
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (ssize_t bytes = missing_data; bytes != 0; bytes -= n)
{
// .. do a read on the socket again.
- n = this->recv (mb.wr_ptr (),
+ n = this->recv (incoming.wr_ptr (),
bytes,
max_wait_time);
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
"read %d bytes on attempt\n",
this->id(), n));
}
@@ -1645,168 +1395,375 @@ TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
break;
}
- mb.wr_ptr (n);
+ incoming.wr_ptr (n);
missing_data -= n;
}
+
+ // If we got an error..
+ if (n == -1)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Trasport[%d]::consolidate_message, "
+ "error while trying to consolidate\n",
+ this->id ()));
+ }
+
+ return -1;
+ }
+
+ // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
+ // have to put the message in the queue anyway. So let us proceed
+ // to do that and return...
+
+ // Check to see if we have messages in queue or if we have missing
+ // data . AT this point we cannot have have semi-complete messages
+ // in the queue as they would have been taken care before. Put
+ // ourselves in the queue and then try processing one of the
+ // messages..
+ if ((missing_data > 0
+ ||this->incoming_message_queue_.queue_length ())
+ && this->incoming_message_queue_.is_tail_fragmented () == 0)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "queueing up the message\n",
+ this->id ()));
+ }
+
+ // Get a queued data
+ TAO_Queued_Data *qd =
+ this->make_queued_data (incoming);
+
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ if (this->incoming_message_queue_.is_head_complete ())
+ {
+ return this->process_queue_head (rh);
+ }
+
+ return 0;
+ }
+
+ // We dont have any missing data. Just make a queued_data node with
+ // the existing message block and send it to the higher layers of
+ // the ORB.
+ TAO_Queued_Data pqd (&incoming,
+ this->orb_core_->transport_message_buffer_allocator ());
+ pqd.missing_data_ = missing_data;
+ this->messaging_object ()->get_message_data (&pqd);
+
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (pqd.more_fragments_ ||
+ (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd,
+ rh);
}
+int
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh)
+{
+ // If we have received a fragment message then we have to
+ // consolidate <qd> with the last message in queue
+ // @@todo: this piece of logic follows GIOP a bit... Need to revisit
+ // if we have protocols other than GIOP
+
+ // @@todo: Fragments now have too much copying overhead. Need to get
+ // them out if we want to have some reasonable performance metrics
+ // in future.. Post 1.2 seems a nice time..
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ TAO_Queued_Data *tqd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ tqd->more_fragments_ = qd->more_fragments_;
+ tqd->missing_data_ = qd->missing_data_;
+
+ if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1)
+ {
+ return -1;
+ }
+
+ TAO_Queued_Data::release (qd);
+ this->incoming_message_queue_.enqueue_tail (tqd);
+ this->process_queue_head (rh);
+ }
+ else
+ {
+ // if we dont have a fragment already in the queue just add it in
+ // the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ return 0;
+}
int
-TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- // Get the GIOP version
- CORBA::Octet major = queueable_message->major_version_;
- CORBA::Octet minor = queueable_message->minor_version_;
- CORBA::UShort whole = major << 8 | minor;
-
- // Set up a couple of pointers that are shared by the code
- // for the different GIOP versions.
- ACE_Message_Block *mb = 0;
- TAO_Queued_Data *fragment_message = 0;
-
- switch(whole)
- {
- case 0x0100: // GIOP 1.0
- if (!queueable_message->more_fragments_)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- // Fragments aren't supported in 1.0. This is an error and
- // we should reject it somehow. What do we do here? Do we throw
- // an exception to the receiving side? Do we throw an exception
- // to the sending side?
- //
- // At the very least, we need to log the fact that we received
- // nonsense.
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "detected a fragmented GIOP 1.0 message\n"),
- -1);
- break;
- case 0x0101: // GIOP 1.1
- // In 1.1, fragments kinda suck because they don't have they're
- // own message-specific header. Therefore, we have to do the
- // following:
- fragment_message =
- this->incoming_message_queue_.find_fragment (major, minor);
-
- // No fragment was found
- if (fragment_message == 0)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- if (queueable_message->more_fragments_)
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
+ this->id ()));
+ }
+
+ // If the queue did not have a complete message put this piece of
+ // message in the queue. We know it did not have a complete
+ // message. That is why we are here.
+ size_t n =
+ this->incoming_message_queue_.copy_tail (incoming);
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "copied [%d] bytes to the tail\n",
+ this->id (),
+ n));
+ }
+
+ // Update the missing data...
+ missing_data =
+ this->incoming_message_queue_.missing_data_tail ();
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "missing [%d] bytes in the tail message\n",
+ this->id (),
+ missing_data));
+ }
+
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
+
+ // If we have some more information left in the message block..
+ if (incoming.length ())
+ {
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
+
+ // If there is an error return
+ if (retval == -1)
{
- // Find the last message block in the continuation
- mb = fragment_message->msg_block_;
- while (mb->cont () != 0)
- mb = mb->cont ();
-
- // Add the current message block to the end of the chain
- // after adjusting the read pointer to skip the GIOP header
- queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN);
- mb->cont (queueable_message->msg_block_);
-
- // Get rid of the queuable message but save the message block
- queueable_message->msg_block_ = 0;
- queueable_message->release ();
-
- // One note is that TAO_Queued_Data contains version numbers,
- // but doesn't indicate the actual protocol to which those
- // version numbers refer. That's not a problem, though, because
- // instances of TAO_Queued_Data live in a queue, and that queue
- // lives in a particular instance of a Transport, and the
- // transport instance has an association with a particular
- // messaging_object. The concrete messaging object embodies a
- // messaging protocol, and must cover all versions of that
- // protocol. Therefore, we just need to cover the bases of all
- // versions of that one protocol.
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "error while consolidating, part of the read message\n",
+ this->id ()));
+ }
+ return retval;
}
- else
+ else if (retval == 1)
{
- // There is a complete chain of fragments
- fragment_message->consolidate ();
+ // If the message in the <incoming> message block has only
+ // one message left we need to process that seperately.
+
+ // Get a queued data
+ TAO_Queued_Data *qd = this->make_queued_data (incoming);
+
+ // Get the rest of the message data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add the missing data to the queue
+ qd->missing_data_ = 0;
- // Go ahead and enqueue this message
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
+ // Check whether the message was fragmented and try to consolidate
+ // the fragments..
+ if (qd->more_fragments_ ||
+ (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ return this->consolidate_fragments (qd, rh);
+ }
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // We should surely have a message in queue now. So just
+ // process that.
+ return this->process_queue_head (rh);
}
- break;
- case 0x0102: // GIOP 1.2
- // In 1.2, we get a little more context. There's a
- // FRAGMENT message-specific header, and inside that is the
- // request id with which the fragment is associated.
- fragment_message =
- this->incoming_message_queue_.find_fragment (
- queueable_message->request_id_);
-
- // No fragment was found
- if (fragment_message == 0)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
-
- if (fragment_message->major_version_ != major ||
- fragment_message->minor_version_ != minor)
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "GIOP versions do not match "
- "(%d.%d != %d.%d\n",
- fragment_message->major_version_,
- fragment_message->minor_version_,
- major, minor),
- -1);
-
- // Find the last message block in the continuation
- mb = fragment_message->msg_block_;
- while (mb->cont () != 0)
- mb = mb->cont ();
-
- // Add the current message block to the end of the chain
- // after adjusting the read pointer to skip the GIOP header
- queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN +
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
- mb->cont (queueable_message->msg_block_);
-
- // Remove our reference to the message block. At this point
- // the message block of the fragment head owns it as part of a
- // chain
- queueable_message->msg_block_ = 0;
-
- if (!queueable_message->more_fragments_)
+
+ // parse_consolidate_messages () would have processed one of the
+ // messages, so we better return as we dont want to starve other
+ // threads.
+ return 0;
+ }
+
+ // If we still have some missing data..
+ if (missing_data > 0)
+ {
+ // Get the last message from the Queue
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "trying recv, again\n",
+ this->id ()));
+ }
+
+ // Try to do a read again. If we have some luck it would be
+ // great..
+ ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
+ "recv retval [%d]\n",
+ this->id (),
+ n));
+ }
+
+ // Error...
+ if (n < 0)
+ {
+ return n;
+ }
+
+ // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
+ // message in queue before returning..
+ // Move the write pointer
+ qd->msg_block_->wr_ptr (n);
+
+ // Decrement the missing data
+ qd->missing_data_ -= n;
+
+ // Now put the TAO_Queued_Data back in the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // Any way as we have come this far and are about to return,
+ // just try to process a message if it is there in the queue.
+ if (this->incoming_message_queue_.is_head_complete ())
{
- // This is the end of the fragments for this request
- fragment_message->consolidate ();
+ return this->process_queue_head (rh);
}
- // Get rid of the queuable message
- queueable_message->release ();
- break;
- default:
- if (!queueable_message->more_fragments_)
- return this->incoming_message_queue_.enqueue_tail (
- queueable_message);
- // This is an unknown GIOP version
- ACE_ERROR_RETURN ((LM_ERROR,
- "TAO (%P|%t) - "
- "TAO_Transport::enqueue_incoming_message "
- "can not handle a fragmented GIOP %d.%d "
- "message\n", major, minor),
- -1);
+ return 0;
}
- return 0;
+ // Process a message in the head of the queue if we have one..
+ return this->process_queue_head (rh);
}
int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block
+ &incoming,
+ TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
+ this->id ()));
+ }
+
+ // Pick the tail of the queue
+ TAO_Queued_Data *tail =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (tail)
+ {
+ // If we have a node in the tail, checek to see whether it needs
+ // consolidation. If so, just consolidate it.
+ if (this->messaging_object ()->consolidate_node (tail, incoming) == -1)
+ {
+ return -1;
+ }
+
+ // .. put the tail back in queue..
+ this->incoming_message_queue_.enqueue_tail (tail);
+ }
+
+ int retval = 1;
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
+ "extracting extra messages\n",
+ this->id ()));
+ }
+
+ // Extract messages..
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
+
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ {
+ // If we have read a framented message then...
+ if (q_data->more_fragments_ ||
+ q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ {
+ this->consolidate_fragments (q_data, rh);
+ }
+ else
+ {
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+ }
+ }
+
+ // In case of error return..
+ if (retval == -1)
+ {
+ return retval;
+ }
+
+ return this->process_queue_head (rh);
+}
+
+int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh)
{
// Get the <message_type> that we have received
TAO_Pluggable_Message_Type t = qd->msg_type_;
+ // int result = 0;
+
if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
{
if (TAO_debug_level > 0)
@@ -1871,69 +1828,126 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
-int
-TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+TAO_Queued_Data *
+TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
{
- if (TAO_debug_level > 3)
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data (
+ this->orb_core_->transport_message_buffer_allocator ());
+
+ // Get the flag for the details of the data block...
+ ACE_Message_Block::Message_Flags flg =
+ incoming.self_flags ();
+
+ if (ACE_BIT_DISABLED (flg,
+ ACE_Message_Block::DONT_DELETE))
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
- this->id ()));
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
}
+ else
+ {
+ // As we are in CORBA mode, all the data blocks would be aligned
+ // on an 8 byte boundary. Hence create a data block for more
+ // than the actual length
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (incoming.length ()+
+ ACE_CDR::MAX_ALIGNMENT);
- if (this->incoming_message_queue_.is_head_complete () != 1)
- return 1;
+ // Get the allocator..
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
- // Get the message on the head of the queue..
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_head ();
+ // Make message block..
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ // Duplicate the block..
+ qd->msg_block_ = mb.duplicate ();
+
+ // Align the message block
+ ACE_CDR::mb_align (qd->msg_block_);
+
+ // Copy the data..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ }
+
+
+ return qd;
+}
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+{
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "the size of the queue is [%d]\n",
- this->id (),
- this->incoming_message_queue_.queue_length()));
+ "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
+ this->id ()));
}
- // Now that we have pulled out out one message out of the queue,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.queue_length () > 0)
+
+ // See if the message in the head of the queue is complete...
+ if (this->incoming_message_queue_.is_head_complete () > 0)
{
- if (TAO_debug_level > 0)
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
+
+ if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "notify reactor\n",
- this->id ()));
+ "the size of the queue is [%d]\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
+ }
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.is_head_complete () > 0)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "notify reactor\n",
+ this->id ()));
+ }
+ int retval =
+ this->notify_reactor ();
+
+ if (retval == 1)
+ {
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ }
+ else if (retval < 0)
+ return -1;
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
}
- int retval =
- this->notify_reactor ();
- if (retval == 1)
+ // Process the message...
+ if (this->process_parsed_messages (qd, rh) == -1)
{
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ return -1;
}
- else if (retval < 0)
- return -1;
- }
- else
- {
- // As we are ready to process the last message just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
- }
- // Process the message...
- int retval = this->process_parsed_messages (qd, rh);
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
- // Delete the Queued_Data..
- TAO_Queued_Data::release (qd);
+ return 0;
+ }
- return (retval == -1) ? -1 : 0;
+ return 1;
}
int
@@ -1945,8 +1959,6 @@ TAO_Transport::notify_reactor (void)
}
ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
// Get the reactor associated with the event handler
ACE_Reactor *reactor = this->orb_core ()->reactor ();
@@ -1983,18 +1995,6 @@ TAO_Transport::transport_cache_manager (void)
return this->orb_core_->lane_resources ().transport_cache ();
}
-size_t
-TAO_Transport::recv_buffer_size (void)
-{
- return this->recv_buffer_size_;
-}
-
-size_t
-TAO_Transport::sent_byte_count (void)
-{
- return this->sent_byte_count_;
-}
-
void
TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
{