summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.cpp')
-rw-r--r--TAO/tao/Transport.cpp1306
1 files changed, 648 insertions, 658 deletions
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index df1e0ffcfe9..6449173e522 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -18,6 +18,7 @@
#include "Resume_Handle.h"
#include "Codeset_Manager.h"
#include "Codeset_Translator_Factory.h"
+#include "GIOP_Message_State.h"
#include "debug.h"
#include "ace/OS_NS_sys_time.h"
@@ -115,6 +116,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, incoming_message_queue_ (orb_core)
+ , uncompleted_message_ (0)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -200,7 +202,7 @@ TAO_Transport::register_handler (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::register_handler\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::register_handler\n"),
this->id ()));
}
@@ -240,12 +242,10 @@ TAO_Transport::generate_locate_request (
== -1)
{
if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
- "error while marshalling the LocateRequest header\n",
- this->id ()));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
+ ACE_TEXT("error while marshalling the LocateRequest header\n"),
+ this->id ()));
return -1;
}
@@ -261,25 +261,19 @@ TAO_Transport::generate_request_header (
{
// codeset service context is only supposed to be sent in the first request
// on a particular connection.
- if (this->first_request_)
- {
- this->orb_core ()->codeset_manager ()->generate_service_context (
- opdetails,
- *this
- );
- }
+ TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager();
+ if (csm && this->first_request_)
+ csm->generate_service_context( opdetails, *this );
if (this->messaging_object ()->generate_request_header (opdetails,
spec,
output) == -1)
{
if (TAO_debug_level > 0)
- {
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) - Transport[%d]::generate_request_header, "
- "error while marshalling the Request header\n",
- this->id()));
- }
+ ACE_TEXT("(%P|%t) - Transport[%d]::generate_request_header, ")
+ ACE_TEXT("error while marshalling the Request header\n"),
+ this->id()));
return -1;
}
@@ -336,7 +330,7 @@ TAO_Transport::handle_output (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_output\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_output\n"),
this->id ()));
}
@@ -348,8 +342,8 @@ TAO_Transport::handle_output (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_output, "
- "drain_queue returns %d/%d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_output, ")
+ ACE_TEXT("drain_queue returns %d/%d\n"),
this->id (),
retval, errno));
}
@@ -487,7 +481,6 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
&synch_message,
max_wait_time);
}
-
if (result == -1)
{
synch_message.remove_from_list (this->head_, this->tail_);
@@ -525,9 +518,9 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, "
- "error while flushing message - %m\n",
- this->id ()));
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
+ ACE_TEXT("error while flushing message %p\n"),
+ this->id (), ""));
}
return -1;
@@ -568,8 +561,8 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_reply_message_i, "
- "preparing to add to queue before leaving \n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
+ ACE_TEXT("preparing to add to queue before leaving \n"),
this->id ()));
}
@@ -644,7 +637,7 @@ TAO_Transport::schedule_output_i (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::schedule_output_i\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::schedule_output\n"),
this->id ()));
}
@@ -660,7 +653,7 @@ TAO_Transport::cancel_output_i (void)
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cancel_output_i\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::cancel_output\n"),
this->id ()));
}
@@ -674,8 +667,8 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
- "timer expired\n",
+ ACE_TEXT("TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, ")
+ ACE_TEXT("timer expired\n"),
this->id ()));
}
@@ -703,6 +696,7 @@ int
TAO_Transport::drain_queue (void)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
+
int retval = this->drain_queue_i ();
if (retval == 1)
@@ -746,8 +740,8 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "send() returns 0\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
+ ACE_TEXT("send() returns 0"),
this->id ()));
}
return -1;
@@ -757,9 +751,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "error during %p\n",
- this->id (), ACE_TEXT ("send()")));
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
+ ACE_TEXT("error during %p\n"),
+ this->id (), "send()"));
}
if (errno == EWOULDBLOCK || errno == EAGAIN)
@@ -780,8 +774,8 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "byte_count = %d, head_is_empty = %d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
+ ACE_TEXT("byte_count = %d, head_is_empty = %d\n"),
this->id(), byte_count, (this->head_ == 0)));
}
@@ -820,8 +814,8 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
- "helper retval = %d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
+ ACE_TEXT("helper retval = %d\n"),
this->id (), retval));
}
@@ -846,8 +840,8 @@ TAO_Transport::drain_queue_i (void)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
- "helper retval = %d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
+ ACE_TEXT("helper retval = %d\n"),
this->id (), retval));
}
@@ -909,8 +903,8 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
- "byte_count = %d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
+ ACE_TEXT("byte_count = %d\n"),
this->id (), byte_count));
}
@@ -920,8 +914,8 @@ TAO_Transport::cleanup_queue (size_t byte_count)
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
- "after transfer, bc = %d, all_sent = %d, ml = %d\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
+ ACE_TEXT("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
this->id (), byte_count, i->all_data_sent (),
i->message_length ()));
}
@@ -991,9 +985,9 @@ TAO_Transport::report_invalid_event_handler (const char *caller)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::report_invalid_event_handler"
- "(%s) no longer associated with handler [tag=%d]\n",
- this->id (), ACE_TEXT_CHAR_TO_TCHAR (caller), this->tag_));
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
+ ACE_TEXT("(%s) no longer associated with handler [tag=%d]\n"),
+ this->id (), caller, this->tag_));
}
}
@@ -1068,8 +1062,8 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_shared_i, "
- "trying to send the message (ml = %d)\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
+ ACE_TEXT("trying to send the message (ml = %d)\n"),
this->id (), total_length));
}
@@ -1091,11 +1085,11 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
{
if (TAO_debug_level > 0)
{
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport[%d]::send_message_shared_i, "
- "fatal error in "
- "send_message_block_chain_i - %m\n",
- this->id ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
+ ACE_TEXT("fatal error in ")
+ ACE_TEXT("send_message_block_chain_i %p\n"),
+ this->id (), ""));
}
return -1;
}
@@ -1115,8 +1109,8 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_shared_i, "
- "partial send %d / %d bytes\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
+ ACE_TEXT("partial send %d / %d bytes\n"),
this->id (), byte_count, total_length));
}
@@ -1138,8 +1132,8 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_shared_i, "
- "message is queued\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::send_message_i, ")
+ ACE_TEXT("message is queued\n"),
this->id ()));
}
@@ -1194,6 +1188,16 @@ TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
return 0;
}
+class CTHack
+{
+public:
+ CTHack() { enter(); }
+ ~CTHack() { leave(); }
+private:
+ void enter() { x = 1; }
+ void leave() { x = 0; }
+ int x;
+};
/*
*
@@ -1206,27 +1210,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
int /*block*/)
{
+ CTHack cthack;
+
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input\n"),
this->id ()));
}
- // First try to process messages of the head of the incoming queue.
+ // First try to process messages off the head of the incoming queue.
int retval = this->process_queue_head (rh);
-
if (retval <= 0)
{
if (retval == -1)
{
if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input, "
- "error while parsing the head of the queue\n",
- this->id()));
- }
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input, ")
+ ACE_TEXT("error while parsing the head of the queue\n"),
+ this->id()));
}
return retval;
@@ -1237,7 +1240,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// The buffer on the stack which will be used to hold the input
// messages
- char buf [TAO_MAXBUFSIZE];
+ char buf[TAO_MAXBUFSIZE];
#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
(void) ACE_OS::memset (buf,
@@ -1259,21 +1262,23 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
+ // We'll loop trying to complete the message this number of times,
+ // and that's it.
+ unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
+
+ unsigned int did_queue_message = 0;
// Align the message block
ACE_CDR::mb_align (&message_block);
size_t recv_size = 0;
-
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- recv_size =
- message_block.space ();
+ recv_size = message_block.space ();
}
else
{
- recv_size =
- this->messaging_object ()->header_length ();
+ recv_size = this->messaging_object ()->header_length ();
}
// Saving the size of the received buffer in case any one needs to
@@ -1285,7 +1290,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.rd_ptr (),
+ ssize_t n = this->recv (message_block.wr_ptr (),
recv_size,
max_wait_time);
@@ -1298,183 +1303,413 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input, "
- "read %d bytes\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input: ")
+ ACE_TEXT("read %d bytes\n"),
this->id (), n));
}
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- // Parse the message and try consolidating the message if
- // needed.
- retval = this->parse_consolidate_messages (message_block,
- rh,
- max_wait_time);
+ if (TAO_debug_level >= 10)
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (const char *) message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT ("TAO (%P|%t) Transport::handle_input(): bytes read from socket")));
- if (retval <= 0)
+
+complete_message_and_possibly_enqueue:
+ // Check to see if we're still working to complete a message
+ if (this->uncompleted_message_)
{
- if (retval == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input, "
- "error while parsing and consolidating\n",
- this->id ()));
- }
- return retval;
- }
+ // try to complete it
- // Make a node of the message block..
- TAO_Queued_Data qd (&message_block,
- this->orb_core_->transport_message_buffer_allocator ());
+ // on exit from this frame we have one of the following states:
+ //
+ // (a) an uncompleted message still in uncompleted_message_
+ // AND message_block is empty
+ //
+ // (b) uncompleted_message_ zero, the completed message at the
+ // tail of the incoming queue; message_block could be empty
+ // or still contain bytes
- // Extract the data for the node..
- this->messaging_object ()->get_message_data (&qd);
+ // ==> repeat
+ do
+ {
+ /*
+ * Append the "right number of bytes" to uncompleted_message_
+ */
+ // ==> right_number_of_bytes = MIN(bytes missing from
+ // uncompleted_message_, length of message_block);
+ size_t right_number_of_bytes =
+ ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
+ message_block.length () );
- // Check whether the message was fragmented..
- if (qd.more_fragments_ ||
- (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the node that we have as the node is on stack..
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("trying to use %u (of %u) ")
+ ACE_TEXT("bytes to complete message missing %u bytes\n"),
+ this->id (),
+ right_number_of_bytes,
+ message_block.length (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
- return this->consolidate_fragments (nqd, rh);
- }
+ // ==> append right_number_of_bytes from message_block
+ // to uncomplete_message_ & update read pointer of
+ // message_block;
+
+ // 1. we assume that uncompleted_message_.msg_block_'s
+ // wr_ptr is properly maintained
+ // 2. we presume that uncompleted_message_.msg_block was
+ // allocated with enough space to contain the *entire*
+ // expected GIOP message, so this copy shouldn't involve an
+ // additional allocation
+ this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
+ right_number_of_bytes);
+ this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
+ message_block.rd_ptr (right_number_of_bytes);
+
+ switch (this->uncompleted_message_->current_state_)
+ {
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
+ {
+ int hdrvalidity = this->messaging_object()->check_for_valid_header (
+ *this->uncompleted_message_->msg_block_);
+ if (hdrvalidity == 0)
+ {
+ // According to the spec, Section 15.4.8, we should send
+ // the MessageError GIOP message on receipt of "any message...whose
+ // header is not properly formed (e.g., has the wrong magic value)".
+ //
+ // So, rather than returning -1, what we REALLY need to do is
+ // send a MessageError in reply.
+ //
+ // I'm not sure what the best way to trigger that is...probably to
+ // queue up a special internal-only COMPLETED message that, when
+ // processed, sends the MessageError as part of its processing.
+ return -1;
+ }
+ else if (hdrvalidity == 1)
+ {
+ // ==> update bytes missing from uncompleted_message_
+ // with size of message from valid header;
+ this->messaging_object()->set_queued_data_from_message_header (
+ this->uncompleted_message_,
+ *this->uncompleted_message_->msg_block_);
+ // ==> change state of uncompleted_event_ to
+ // WAITING_TO_COMPLETE_PAYLOAD;
+ this->uncompleted_message_->current_state_ =
+ TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
+
+ // ==> Resize the message block to have capacity for
+ // the rest of the incoming message
+ ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
+ ACE_CDR::grow (&mb,
+ mb.size ()
+ + this->uncompleted_message_->missing_data_bytes_);
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("found a valid header in the message; ")
+ ACE_TEXT("waiting for %u bytes to complete payload\n"),
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+
+ // Continue the loop...
+ continue;
+ }
+
+ // In the case where we don't have enough information
+ // (hdrvalidity == -1), we just have to fall through
+ // and collect more information, i.e., bytes.
+
+ }
+ break;
+
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
+ // Here we have an opportunity to try to finish reading the
+ // uncompleted message. This is a Good Idea(TM) because there are
+ // good odds that either more data came available since the last
+ // time we read, or that we simply didn't read the whole message on
+ // the first read. So, we try to read again.
+ //
+ // NOTE! this changes this->uncompleted_message_!
+ this->try_to_complete (max_wait_time);
- // Process the message
- return this->process_parsed_messages (&qd,
- rh);
-}
+ // ==> if (bytes missing from uncompleted_message_ == 0)
+ if (this->uncompleted_message_->missing_data_bytes_ == 0)
+ {
+ /*
+ * We completed the message! Hooray!
+ */
+ // ==> place uncompleted_message_ (which is now
+ // complete!) at the tail of the incoming message
+ // queue;
+
+ // ---> NOTE: whoever pulls this off the queue must delete it!
+ this->uncompleted_message_->current_state_
+ = TAO_Queued_Data::COMPLETED;
+
+ // @@CJC NEED TO CHECK RETURN VALUE HERE!
+ this->enqueue_incoming_message (this->uncompleted_message_);
+ did_queue_message = 1;
+ // zero out uncompleted_message_;
+ this->uncompleted_message_ = 0;
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("completed and queued message for processing!\n"),
+ this->id ()));
+ }
-int
-TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
-{
- // Parse the incoming message for validity. The check needs to be
- // performed by the messaging objects.
- if (this->parse_incoming_messages (block) == -1)
- {
- return -1;
- }
+ }
+ else
+ {
- // Check whether we have a complete message for processing
- const ssize_t missing_data = this->missing_data (block);
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("still need %u bytes to complete uncompleted message.\n"),
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+ }
+ break;
+ default:
+ // @@CJC What do we do here?!
+ ACE_ASSERT (! ACE_TEXT("Transport::handle_input: unexpected state")
+ ACE_TEXT("in uncompleted_message_"));
+ }
+ }
+ // Does the order of the checks matter? In both (a) and (b),
+ // message_block is empty, but only in (b) is there no
+ // uncompleted_message_.
+ // ==> until (message_block is empty || there is no uncompleted_message_);
+ // or, rewritten in C++ looping constructs
+ // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
+ while (message_block.length() != 0 && this->uncompleted_message_);
+ }
+
+ // *****************************
+ // @@ CJC
+ //
+ // Once upon a time we tried to complete reading the uncompleted
+ // message here, but testing found that completing later worked
+ // better.
+ // *****************************
+
+
+ // At this point, there should be nothing in uncompleted_message_.
+ // We now need to chop up the bytes in message_block and store any
+ // complete messages in the incoming message queue.
+ //
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
+ {
+ TAO_Queued_Data *complete_message = 0;
+ do
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) Transport::handle_input: ")
+ ACE_TEXT("extracting complete messages\n")));
+ ACE_HEX_DUMP ((LM_DEBUG,
+ message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT (" from this message buffer")));
+ }
- if (missing_data < 0)
- {
- // If we have more than one message
- return this->consolidate_extra_messages (block,
- rh);
+ complete_message =
+ TAO_Queued_Data::make_completed_message (
+ message_block, *this->messaging_object ());
+ if ((complete_message == TAO_Queued_Data::COULD_NOT_FIND_VALID_HEADER.get()) ||
+ (complete_message == TAO_Queued_Data::DYNAMIC_ALLOCATION_FAILED.get()) ||
+ (complete_message == TAO_Queued_Data::COULD_NOT_UNDERSTAND_HEADER.get()))
+ {
+ TAO_Queued_Data* qd = TAO_Queued_Data::make_close_connection ();
+ this->enqueue_incoming_message (qd);
+ did_queue_message = 1;
+ message_block.rd_ptr (message_block.length());
+ }
+ else if (complete_message != TAO_Queued_Data::GENERAL_FAILURE.get())
+ {
+ // If it was a GENERAL FAILURE, we let it go because it might
+ // be that we can complete it later.
+ this->enqueue_incoming_message (complete_message);
+ did_queue_message = 1;
+ }
+ }
+ while (complete_message != TAO_Queued_Data::GENERAL_FAILURE.get());
+ // On exit from this frame we have one of the following states:
+ // (a) message_block is empty
+ // (b) message_block contains bytes from a partial message
}
- else if (missing_data > 0)
+
+ // If, at this point, there's still data in message_block, it's
+ // an incomplete message. Therefore, we stuff it into the
+ // uncompleted_message_ and clear out message_block.
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
{
- // If we have missing data then try doing a read or try queueing
- // them.
- return this->consolidate_message (block,
- missing_data,
- rh,
- max_wait_time);
- }
+ if (this->uncompleted_message_ != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input:%d ")
+ ACE_TEXT("have an uncompleted message I didn't expect.\n"),
+ this->id (), __LINE__
+ ));
+ }
- return 1;
-}
+ // duplicate message_block remainder into this->uncompleted_message_
+ ACE_ASSERT (this->uncompleted_message_ == 0);
+ this->uncompleted_message_ =
+ TAO_Queued_Data::make_uncompleted_message (&message_block,
+ *this->messaging_object ());
+ if (this->uncompleted_message_ == TAO_Queued_Data::GENERAL_FAILURE.get())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input:%d ")
+ ACE_TEXT("failed to create an uncompleted message.\n"),
+ this->id (), __LINE__
+ ));
+ }
+ else if ((this->uncompleted_message_ == TAO_Queued_Data::COULD_NOT_FIND_VALID_HEADER.get()) ||
+ (this->uncompleted_message_ == TAO_Queued_Data::DYNAMIC_ALLOCATION_FAILED.get()) ||
+ (this->uncompleted_message_ == TAO_Queued_Data::COULD_NOT_UNDERSTAND_HEADER.get()))
+ {
+ TAO_Queued_Data* qd = TAO_Queued_Data::make_close_connection ();
+ this->enqueue_incoming_message (qd);
+ did_queue_message = 1;
+ message_block.rd_ptr (message_block.length());
+ }
+ ACE_ASSERT (this->uncompleted_message_ != 0);
+
+ // In a debug build, we won't reach this point if we couldn't
+ // create an uncompleted message because the above ASSERT will
+ // trip. However, in an optimized build, the ASSERT isn't
+ // there, so we'll go past here.
+ //
+ // We could put a check in here similar to the ASSERT condition,
+ // but doing that would terminate this loop early and result in
+ // our never processing any completed messages that were received
+ // in this trip to handle_input.
+ //
+ // Maybe we could instead queue up a special completed message that,
+ // when processed, causes the connection to get closed in a non-graceful
+ // termination scenario.
+ }
+
+ // We should have consumed ALL the bytes by now.
+ ACE_ASSERT (message_block.length () == 0);
+
+ //
+ // We don't want to try to re-read earlier because we may not have
+ // an uncompleted message until we get to this point. So, if we did
+ // it earlier, we could have missed the opportunity to complete it
+ // and dispatch.
+ //
+ // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
+ // to increase throughput!
+
+ if (this->uncompleted_message_)
+ {
+ if (number_of_read_attempts--)
+ {
+ // We try to read again just in case more data arrived while
+ // we were doing the stuff above. This way, we can increase
+ // throughput without much of a penalty.
-int
-TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
-{
- // If we have a queue and if the last message is not complete a
- // complete one, then this read will get us the remaining data. So
- // do not try to parse the header if we have an incomplete message
- // in the queue.
- if (this->incoming_message_queue_.is_tail_complete () != 0)
- {
- // As it looks like a new message has been read, process the
- // message. Call the messaging object to do the parsing..
- int retval =
- this->messaging_object ()->parse_incoming_messages (block);
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("still have an uncompleted message; ")
+ ACE_TEXT("will try %d more times before letting ")
+ ACE_TEXT("somebody else have a chance.\n"),
+ this->id (),
+ number_of_read_attempts));
+ }
- if (retval == -1)
+ // We only bother trying to complete payload, not header, because the
+ // retry only happens in the complete-the-payload clause above.
+ if (this->uncompleted_message_->current_state_ ==
+ TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD)
+ goto complete_message_and_possibly_enqueue;
+ }
+ else
{
+ // The queue should be empty because it should have been processed
+ // above. But I wonder if I should put a check in here anyway.
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
- "error in incoming message\n",
- this->id ()));
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("giving up reading for now and returning ")
+ ACE_TEXT("with incoming queue length = %d\n"),
+ this->id (),
+ this->incoming_message_queue_.queue_length ()));
+ if (this->uncompleted_message_)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport[%d]::handle_input: ")
+ ACE_TEXT("missing bytes from uncompleted message = %u\n"),
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
}
-
- return -1;
+ // tell the upper layer not to resume the handle
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+ return 1;
}
}
- return 0;
-}
+ // **** END CJC PMG CHANGES ****
+ retval = did_queue_message ? this->process_queue_head (rh) : 1;
-size_t
-TAO_Transport::missing_data (ACE_Message_Block &incoming)
-{
- // If we have a incomplete message in the queue then find out how
- // much of data is required to get a complete message.
- if (this->incoming_message_queue_.is_tail_complete () == 0)
+ if (retval == 1)
{
- return this->incoming_message_queue_.missing_data_tail ();
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
}
- return this->messaging_object ()->missing_data (incoming);
+ return retval;
}
-int
-TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
+void
+TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
{
- // Check whether the last message in the queue is complete..
- if (this->incoming_message_queue_.is_tail_complete () == 0)
- {
- return this->consolidate_message_queue (incoming,
- missing_data,
- rh,
- max_wait_time);
- }
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
- this->id ()));
- }
-
- // Calculate the actual length of the load that we are supposed to
- // read which is equal to the <missing_data> + length of the buffer
- // that we have..
- const size_t payload = missing_data + incoming.size ();
-
- // Grow the buffer to the size of the message
- ACE_CDR::grow (&incoming,
- payload);
+ if (this->uncompleted_message_ == 0)
+ return;
ssize_t n = 0;
+ size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
+ ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
- // As this used for transports where things are available in one
- // shot this looping should not create any problems.
- for (ssize_t bytes = missing_data; bytes != 0; bytes -= n)
+ // Try to complete this until we error or block right here...
+ for (ssize_t bytes = missing_data;
+ bytes != 0;
+ bytes -= n)
{
// .. do a read on the socket again.
- n = this->recv (incoming.wr_ptr (),
+ n = this->recv (mb.wr_ptr (),
bytes,
max_wait_time);
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "read %d bytes on attempt\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input_i, ")
+ ACE_TEXT("read %d bytes on attempt\n"),
this->id(), n));
}
@@ -1483,381 +1718,175 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
break;
}
- incoming.wr_ptr (n);
+ mb.wr_ptr (n);
missing_data -= n;
}
-
- // If we got an error..
- if (n == -1)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "error while trying to consolidate\n",
- this->id ()));
- }
-
- return -1;
- }
-
- // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
- // have to put the message in the queue anyway. So let us proceed
- // to do that and return...
-
- // Check to see if we have messages in queue or if we have missing
- // data . AT this point we cannot have have semi-complete messages
- // in the queue as they would have been taken care before. Put
- // ourselves in the queue and then try processing one of the
- // messages..
- if ((missing_data > 0
- ||this->incoming_message_queue_.queue_length ())
- && this->incoming_message_queue_.is_tail_fragmented () == 0)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "queueing up the message\n",
- this->id ()));
- }
-
- // Get a queued data
- TAO_Queued_Data *qd =
- this->make_queued_data (incoming);
-
- // Add the missing data to the queue
- qd->missing_data_ = missing_data;
-
- // Get the rest of the messaging data
- this->messaging_object ()->get_message_data (qd);
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- if (this->incoming_message_queue_.is_head_complete ())
- {
- return this->process_queue_head (rh);
- }
-
- return 0;
- }
-
- // We dont have any missing data. Just make a queued_data node with
- // the existing message block and send it to the higher layers of
- // the ORB.
- TAO_Queued_Data pqd (&incoming,
- this->orb_core_->transport_message_buffer_allocator ());
- pqd.missing_data_ = missing_data;
- this->messaging_object ()->get_message_data (&pqd);
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (pqd.more_fragments_ ||
- (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the queued data as it is on stack..
- TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
-
- return this->consolidate_fragments (nqd, rh);
- }
-
- // Now we have a full message in our buffer. Just go ahead and
- // process that
- return this->process_parsed_messages (&pqd,
- rh);
}
-int
-TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
- TAO_Resume_Handle &rh)
-{
- // If we have received a fragment message then we have to
- // consolidate <qd> with the last message in queue
- // @@todo: this piece of logic follows GIOP a bit... Need to revisit
- // if we have protocols other than GIOP
-
- // @@todo: Fragments now have too much copying overhead. Need to get
- // them out if we want to have some reasonable performance metrics
- // in future.. Post 1.2 seems a nice time..
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- TAO_Queued_Data *tqd =
- this->incoming_message_queue_.dequeue_tail ();
-
- tqd->more_fragments_ = qd->more_fragments_;
- tqd->missing_data_ = qd->missing_data_;
-
- if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1)
- {
- return -1;
- }
-
- TAO_Queued_Data::release (qd);
- this->incoming_message_queue_.enqueue_tail (tqd);
- this->process_queue_head (rh);
- }
- else
- {
- // if we dont have a fragment already in the queue just add it in
- // the queue
- this->incoming_message_queue_.enqueue_tail (qd);
- }
-
- return 0;
-}
int
-TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
+TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
- this->id ()));
- }
-
- // If the queue did not have a complete message put this piece of
- // message in the queue. We know it did not have a complete
- // message. That is why we are here.
- const size_t n =
- this->incoming_message_queue_.copy_tail (incoming);
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "copied [%d] bytes to the tail\n",
- this->id (),
- n));
- }
-
- // Update the missing data...
- missing_data =
- this->incoming_message_queue_.missing_data_tail ();
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "missing [%d] bytes in the tail message\n",
- this->id (),
- missing_data));
- }
-
- // Move the read pointer of the <incoming> message block to the end
- // of the copied message and process the remaining portion...
- incoming.rd_ptr (n);
-
- // If we have some more information left in the message block..
- if (incoming.length ())
- {
- // We may have to parse & consolidate. This part of the message
- // doesn't seem to be part of the last message in the queue (as
- // the copy () hasn't taken away this message).
- const int retval = this->parse_consolidate_messages (incoming,
- rh,
- max_wait_time);
-
- // If there is an error return
- if (retval == -1)
- {
- if (TAO_debug_level)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "error while consolidating, part of the read message\n",
- this->id ()));
- }
- return retval;
- }
- else if (retval == 1)
+ // Get the GIOP version
+ CORBA::Octet major = queueable_message->major_version_;
+ CORBA::Octet minor = queueable_message->minor_version_;
+ CORBA::UShort whole = major << 8 | minor;
+
+ // Set up a couple of pointers that are shared by the code
+ // for the different GIOP versions.
+ ACE_Message_Block *mb = 0;
+ TAO_Queued_Data *fragment_message = 0;
+
+ switch(whole)
+ {
+ case 0x0100: // GIOP 1.0
+ if (!queueable_message->more_fragments_)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ // Fragments aren't supported in 1.0. This is an error and
+ // we should reject it somehow. What do we do here? Do we throw
+ // an exception to the receiving side? Do we throw an exception
+ // to the sending side?
+ //
+ // At the very least, we need to log the fact that we received
+ // nonsense.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::enqueue_incoming_message ")
+ ACE_TEXT("detected a fragmented GIOP 1.0 message\n")),
+ -1);
+ break;
+ case 0x0101: // GIOP 1.1
+ // In 1.1, fragments kinda suck because they don't have they're
+ // own message-specific header. Therefore, we have to do the
+ // following:
+ fragment_message =
+ this->incoming_message_queue_.find_fragment (major, minor);
+
+ // No fragment was found
+ if (fragment_message == 0)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ if (queueable_message->more_fragments_)
{
- // If the message in the <incoming> message block has only
- // one message left we need to process that seperately.
-
- // Get a queued data
- TAO_Queued_Data *qd = this->make_queued_data (incoming);
-
- // Get the rest of the message data
- this->messaging_object ()->get_message_data (qd);
-
- // Add the missing data to the queue
- qd->missing_data_ = 0;
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (qd->more_fragments_
- || (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- return this->consolidate_fragments (qd, rh);
- }
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // We should surely have a message in queue now. So just
- // process that.
- return this->process_queue_head (rh);
+ // Find the last message block in the continuation
+ mb = fragment_message->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the GIOP header
+ queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN);
+ mb->cont (queueable_message->msg_block_);
+
+ // Get rid of the queuable message but save the message block
+ queueable_message->msg_block_ = 0;
+ queueable_message->release ();
+
+ // One note is that TAO_Queued_Data contains version numbers,
+ // but doesn't indicate the actual protocol to which those
+ // version numbers refer. That's not a problem, though, because
+ // instances of TAO_Queued_Data live in a queue, and that queue
+ // lives in a particular instance of a Transport, and the
+ // transport instance has an association with a particular
+ // messaging_object. The concrete messaging object embodies a
+ // messaging protocol, and must cover all versions of that
+ // protocol. Therefore, we just need to cover the bases of all
+ // versions of that one protocol.
}
-
- // parse_consolidate_messages () would have processed one of the
- // messages, so we better return as we dont want to starve other
- // threads.
- return 0;
- }
-
- // If we still have some missing data..
- if (missing_data > 0)
- {
- // Get the last message from the Queue
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (TAO_debug_level > 5)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "trying recv, again\n",
- this->id ()));
- }
-
- // Try to do a read again. If we have some luck it would be
- // great..
- const ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
- missing_data,
- max_wait_time);
-
- if (TAO_debug_level > 5)
+ else
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "recv retval [%d]\n",
- this->id (),
- n));
- }
+ // There is a complete chain of fragments
+ fragment_message->consolidate ();
- // Error...
- if (n < 0)
- {
- return n;
+ // Go ahead and enqueue this message
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
}
-
- // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
- // message in queue before returning..
- // Move the write pointer
- qd->msg_block_->wr_ptr (n);
-
- // Decrement the missing data
- qd->missing_data_ -= n;
-
- // Now put the TAO_Queued_Data back in the queue
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // Any way as we have come this far and are about to return,
- // just try to process a message if it is there in the queue.
- if (this->incoming_message_queue_.is_head_complete ())
+ break;
+ case 0x0102: // GIOP 1.2
+ // In 1.2, we get a little more context. There's a
+ // FRAGMENT message-specific header, and inside that is the
+ // request id with which the fragment is associated.
+ fragment_message =
+ this->incoming_message_queue_.find_fragment (
+ queueable_message->request_id_);
+
+ // No fragment was found
+ if (fragment_message == 0)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ if (fragment_message->major_version_ != major ||
+ fragment_message->minor_version_ != minor)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::enqueue_incoming_message ")
+ ACE_TEXT("GIOP versions do not match ")
+ ACE_TEXT("(%d.%d != %d.%d\n"),
+ fragment_message->major_version_,
+ fragment_message->minor_version_,
+ major, minor),
+ -1);
+
+ // Find the last message block in the continuation
+ mb = fragment_message->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the GIOP header
+ queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN +
+ TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+ mb->cont (queueable_message->msg_block_);
+
+ // Remove our reference to the message block. At this point
+ // the message block of the fragment head owns it as part of a
+ // chain
+ queueable_message->msg_block_ = 0;
+
+ if (!queueable_message->more_fragments_)
{
- return this->process_queue_head (rh);
+ // This is the end of the fragments for this request
+ fragment_message->consolidate ();
}
- return 0;
+ // Get rid of the queuable message
+ queueable_message->release ();
+ break;
+ default:
+ if (!queueable_message->more_fragments_)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+ // This is an unknown GIOP version
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::enqueue_incoming_message ")
+ ACE_TEXT("can not handle a fragmented GIOP %d.%d ")
+ ACE_TEXT("message\n"), major, minor),
+ -1);
}
- // Process a message in the head of the queue if we have one..
- return this->process_queue_head (rh);
+ return 0;
}
int
-TAO_Transport::consolidate_extra_messages (ACE_Message_Block
- &incoming,
- TAO_Resume_Handle &rh)
-{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
- this->id ()));
- }
-
- // Pick the tail of the queue
- TAO_Queued_Data *tail =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (tail)
- {
- // If we have a node in the tail, checek to see whether it needs
- // consolidation. If so, just consolidate it.
- if (this->messaging_object ()->consolidate_node (tail, incoming) == -1)
- {
- return -1;
- }
-
- // .. put the tail back in queue..
- this->incoming_message_queue_.enqueue_tail (tail);
- }
-
- int retval = 1;
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
- "extracting extra messages\n",
- this->id ()));
- }
-
- // Extract messages..
- while (retval == 1)
- {
- TAO_Queued_Data *q_data = 0;
-
- retval =
- this->messaging_object ()->extract_next_message (incoming,
- q_data);
- if (q_data)
- {
- // If we have read a framented message then...
- if (q_data->more_fragments_ ||
- q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- this->consolidate_fragments (q_data, rh);
- }
- else
- {
- this->incoming_message_queue_.enqueue_tail (q_data);
- }
- }
- }
-
- // In case of error return..
- if (retval == -1)
- {
- return retval;
- }
-
- return this->process_queue_head (rh);
-}
-
-int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh)
{
// Get the <message_type> that we have received
- const TAO_Pluggable_Message_Type t = qd->msg_type_;
-
- // int result = 0;
+ TAO_Pluggable_Message_Type t = qd->msg_type_;
+ int result = 0;
if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
- "received CloseConnection message - %m\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+ ACE_TEXT("received CloseConnection message\n"),
this->id()));
// Return a "-1" so that the next stage can take care of
@@ -1885,15 +1914,32 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
rh.resume_handle ();
+ // @@todo: Maybe the input_cdr can be constructed from the
+ // message_block
TAO_Pluggable_Reply_Params params (this);
+
if (this->messaging_object ()->process_reply_message (params,
qd) == -1)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
- "error in process_reply_message - %m\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+ ACE_TEXT("error in process_reply_message %p\n"),
+ this->id (), ""));
+ return -1;
+ }
+
+ result = this->tms ()->dispatch_reply (params);
+
+ if (result == -1)
+ {
+ // Something really critical happened, we will forget about
+ // every reply on this connection.
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+ ACE_TEXT("dispatch reply failed\n"),
this->id ()));
return -1;
@@ -1904,137 +1950,81 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
{
if (TAO_debug_level)
{
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport[%d]::process_parsed_messages, "
- "received MessageError, closing connection\n",
- this->id ()));
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
+ ACE_TEXT("received MessageError, closing connection\n"),
+ this->id ()),
+ -1);
}
- return -1;
}
// If not, just return back..
return 0;
}
-TAO_Queued_Data *
-TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
{
- // Get an instance of TAO_Queued_Data
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // Get the flag for the details of the data block...
- ACE_Message_Block::Message_Flags flg =
- incoming.self_flags ();
-
- if (ACE_BIT_DISABLED (flg,
- ACE_Message_Block::DONT_DELETE))
+ if (TAO_debug_level > 3)
{
- // Duplicate the data block before putting it in the queue.
- qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head\n"),
+ this->id ()));
}
- else
- {
- // As we are in CORBA mode, all the data blocks would be aligned
- // on an 8 byte boundary. Hence create a data block for more
- // than the actual length
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (incoming.length ()+
- ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocator..
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- // Make message block..
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- // Duplicate the block..
- qd->msg_block_ = mb.duplicate ();
-
- // Align the message block
- ACE_CDR::mb_align (qd->msg_block_);
- // Copy the data..
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- }
+ if (this->incoming_message_queue_.is_head_complete () != 1)
+ return 1;
- return qd;
-}
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
-int
-TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
-{
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head\n",
- this->id ()));
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
+ ACE_TEXT("the size of the queue is [%d]\n"),
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
}
-
- // See if the message in the head of the queue is complete...
- if (this->incoming_message_queue_.is_head_complete () > 0)
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.queue_length () > 0)
{
- // Get the message on the head of the queue..
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_head ();
-
- if (TAO_debug_level > 3)
+ if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "the size of the queue is [%d]\n",
- this->id (),
- this->incoming_message_queue_.queue_length()));
- }
- // Now that we have pulled out out one message out of the queue,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.is_head_complete () > 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "notify reactor\n",
- this->id ()));
-
- }
-
- const int retval = this->notify_reactor ();
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
+ ACE_TEXT("notify reactor\n"),
+ this->id ()));
- if (retval == 1)
- {
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
- }
- else if (retval < 0)
- return -1;
- }
- else
- {
- // As we are ready to process the last message just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
}
+ int retval =
+ this->notify_reactor ();
- // Process the message...
- if (this->process_parsed_messages (qd, rh) == -1)
+ if (retval == 1)
{
- return -1;
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
}
+ else if (retval < 0)
+ return -1;
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ }
- // Delete the Queued_Data..
- TAO_Queued_Data::release (qd);
+ // Process the message...
+ int retval = this->process_parsed_messages (qd, rh);
- return 0;
- }
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
- return 1;
+ return (retval == -1) ? -1 : 0;
}
int
@@ -2053,8 +2043,8 @@ TAO_Transport::notify_reactor (void)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::notify_reactor, "
- "notify to Reactor\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
+ ACE_TEXT("notify to Reactor\n"),
this->id ()));
}
@@ -2068,8 +2058,8 @@ TAO_Transport::notify_reactor (void)
// @@todo: need to think about what is the action that
// we can take when we get here.
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::notify_reactor, "
- "notify to the reactor failed..\n",
+ ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
+ ACE_TEXT("notify to the reactor failed..\n"),
this->id ()));
}