summaryrefslogtreecommitdiff
path: root/TAO/tao/GIOP_Message_Base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp353
1 files changed, 301 insertions, 52 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 7ada6c903ee..76aeeaee51e 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
, message_state_ (orb_core,
- this)
+ this)
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -295,7 +295,6 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (
TAO_GIOP_Message_State &msg_state)
- const
{
// Convert to the right type of Pluggable Messaging message type.
@@ -330,6 +329,264 @@ TAO_GIOP_Message_Base::message_type (
}
int
+TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
+{
+
+ if (this->message_state_.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+ssize_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
+{
+ // Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_state_.message_size ();
+
+ size_t len = incoming.length ();
+
+ // If we have too many messages or if we have less than even a size
+ // of the GIOP header then ..
+ if (len > msg_size ||
+ len < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ return -1;
+ }
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
+}
+
+
+int
+TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd)
+{
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ if (incoming.length () > 0)
+ {
+ // Make a node which has a message block of the size of
+ // MESSAGE_HEADER_LEN.
+ qd =
+ this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ qd->missing_data_ = -1;
+ }
+ return 0;
+ }
+
+ if (state.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ size_t copying_len = state.message_size ();
+
+ qd = this->make_queued_data (copying_len);
+
+ if (copying_len > incoming.length ())
+ {
+ qd->missing_data_ =
+ copying_len - incoming.length ();
+
+ copying_len = incoming.length ();
+ }
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copying_len);
+
+ incoming.rd_ptr (copying_len);
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ return 1;
+}
+
+int
+TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming)
+{
+ // Look to see whether we had atleast parsed the GIOP header ...
+ if (qd->missing_data_ == -1)
+ {
+ // The data length that has been stuck in there during the last
+ // read ....
+ size_t len =
+ qd->msg_block_->length ();
+
+ // We know that we would have space for
+ // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
+ // from the <incoming> into the message block in <qd>
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ // Move the rd_ptr () in the incoming message block..
+ incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ // Parse the message header now...
+ if (state.parse_message_header (*qd->msg_block_) == -1)
+ return -1;
+
+ // Now grow the message block so that we can copy the rest of
+ // the data...
+ if (qd->msg_block_->space () < state.message_size ())
+ {
+ ACE_CDR::grow (qd->msg_block_,
+ state.message_size ());
+ }
+
+ // Copy the pay load..
+ // Calculate the bytes that needs to be copied in the queue...
+ size_t copy_len =
+ state.payload_size ();
+
+ // If the data that needs to be copied is more than that is
+ // available to us ..
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+ else
+ {
+ qd->missing_data_ = 0;
+ }
+
+ // ..now we are set to copy the right amount of data to the
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ incoming.rd_ptr (copy_len);
+
+ // Get the other details...
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ }
+ else
+ {
+ // @@todo: Need to abstract this out to a seperate method...
+ size_t copy_len = qd->missing_data_;
+
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+
+ // Copy the right amount of data in to the node...
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ qd->msg_block_->rd_ptr (copy_len);
+
+ }
+
+
+ return 0;
+}
+
+
+int
+TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
+ const TAO_Queued_Data *sqd)
+{
+ if (dqd->byte_order_ != sqd->byte_order_
+ || dqd->major_version_ != sqd->major_version_
+ || dqd->minor_version_ != sqd->minor_version_)
+ {
+ // Yes, print it out in all debug levels!. This is an error by
+ // CORBA 2.4 spec
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
+ ACE_TEXT ("different GIOP versions or byte order\n")));
+ return -1;
+ }
+
+ // Skip the header in the incoming message
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ // If we have a fragment header skip the header length too..
+ if (sqd->minor_version_ == 2 &&
+ sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+
+ // Get the length of the incoming message block..
+ size_t incoming_length =
+ sqd->msg_block_->length ();
+
+ // Increase the size of the destination message block if we need
+ // to.
+ ACE_Message_Block *mb =
+ dqd->msg_block_;
+
+ // Check space before growing.
+ if (mb->space () < incoming_length)
+ {
+ ACE_CDR::grow (mb,
+ mb->length () + incoming_length);
+ }
+
+ // Copy the data
+ dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
+ incoming_length);
+ return 0;
+}
+
+void
+TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
+{
+ // Get the message information
+ qd->byte_order_ =
+ this->message_state_.byte_order_;
+ qd->major_version_ =
+ this->message_state_.giop_version_.major;
+ qd->minor_version_ =
+ this->message_state_.giop_version_.minor;
+
+ //qd->more_fragments_ = this->message_state_.more_fragments_;
+
+ if (this->message_state_.more_fragments_)
+ qd->more_fragments_ = 1;
+ else
+ qd->more_fragments_ = 0;
+
+ qd->msg_type_=
+ this->message_type (this->message_state_);
+
+ // Reset the message_state
+ this->message_state_.reset ();
+}
+
+int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -507,13 +764,13 @@ TAO_GIOP_Message_Base::process_reply_message (
// Should be taken care by the state specific parsing
retval =
generator_parser->parse_reply (input_cdr,
- params);
+ params);
break;
case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
retval =
generator_parser->parse_locate_reply (input_cdr,
- params);
+ params);
break;
default:
retval = -1;
@@ -641,9 +898,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
parse_error =
parser->parse_request_header (request);
- TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
- if (csm)
- csm->process_service_context(request);
+ request.orb_core()->codeset_manager()->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -843,9 +1098,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
}
TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
- locate_request.object_key ().length (),
- locate_request.object_key ().get_buffer (),
- 0);
+ locate_request.object_key ().length (),
+ locate_request.object_key ().get_buffer (),
+ 0);
// Set it to an error state
parse_error = 1;
@@ -1069,7 +1324,6 @@ TAO_GIOP_Message_Base::set_state (
}
-#if 0
// Server sends an "I'm shutting down now, any requests you've sent me
// can be retried" message to the server. The message is prefab, for
// simplicity.
@@ -1165,7 +1419,7 @@ TAO_GIOP_Message_Base::
transport-> id ()));
}
-#endif
+
int
TAO_GIOP_Message_Base::send_reply_exception (
@@ -1322,49 +1576,44 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
-void
-TAO_GIOP_Message_Base::set_queued_data_from_message_header (
- TAO_Queued_Data *qd,
- const ACE_Message_Block &mb
- ) const
+TAO_Queued_Data *
+TAO_GIOP_Message_Base::make_queued_data (size_t sz)
{
- // @@CJC: Try leaving out the declaration for this->message_state_
- // and see what pukes. I don't think we need it any more.
- TAO_GIOP_Message_State state;
- if (state.take_values_from_message_block (mb) == -1)
- {
- // what the heck do we do here?!
- qd->current_state_ = TAO_Queued_Data::INVALID;
- return;
- }
+ // Get a node for the queue..
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data (
+ this->orb_core_->transport_message_buffer_allocator ());
- // It'd be nice to have an abstract base for GIOP_Message_State
- // so that there could just be a line like:
- // qd->take_values_from (state);
- // Get the message information
- qd->byte_order_ = state.byte_order ();
- qd->major_version_ = state.giop_version ().major;
- qd->minor_version_ = state.giop_version ().minor;
- qd->more_fragments_ = state.more_fragments () ? 1 : 0;
- qd->request_id_ = state.request_id_;
- qd->msg_type_= message_type (state);
- qd->missing_data_bytes_ = state.payload_size ();
+ // @@todo: We have a similar method in Transport.cpp. Need to see how
+ // we can factor them out..
+ // Make a datablock for the size requested + something. The
+ // "something" is required because we are going to align the data
+ // block in the message block. During alignment we could loose some
+ // bytes. As we may not know how many bytes will be lost, we will
+ // allocate ACE_CDR::MAX_ALIGNMENT extra.
+ ACE_Data_Block *db =
+ this->orb_core_->create_input_cdr_data_block (sz +
+ ACE_CDR::MAX_ALIGNMENT);
+
+ ACE_Allocator *alloc =
+ this->orb_core_->input_cdr_msgblock_allocator ();
+
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ ACE_Message_Block *new_mb = mb.duplicate ();
+
+ ACE_CDR::mb_align (new_mb);
+
+ qd->msg_block_ = new_mb;
+
+
+ return qd;
}
-int
-TAO_GIOP_Message_Base::check_for_valid_header (
- const ACE_Message_Block &mb
- ) const
+size_t
+TAO_GIOP_Message_Base::header_length (void) const
{
- // NOTE! We don't hardcode the length of the header b/c header_length should
- // be eligible for inlining by pretty much any compiler, and it should return
- // a constant. The rest of this method is hard-coded and hand-optimized because
- // this method gets called A LOT.
- if (mb.length () < this->header_length ())
- return -1;
-
- // Is finding that it's the right length and the magic bytes present
- // enough to declare it a valid header? I think so...
- register const char* h = mb.rd_ptr ();
- return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0;
+ return TAO_GIOP_MESSAGE_HEADER_LEN;
}