summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Cleeland <chris.cleeland@gmail.com>2003-12-15 22:31:47 +0000
committerChris Cleeland <chris.cleeland@gmail.com>2003-12-15 22:31:47 +0000
commitaa8cdced83cdfdf41381ed556543afc87658ec24 (patch)
tree1417384e5bb13a7370994d3958056aed19af2e64
parent26b202ad383751b37987343fbd8a70398c966720 (diff)
downloadATCD-unlabeled-1.97.2.tar.gz
Tag: pmb_integrationunlabeled-1.97.2
Started work on performance enhancements for PMB.
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp353
1 files changed, 52 insertions, 301 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 76aeeaee51e..7ada6c903ee 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
, message_state_ (orb_core,
- this)
+ this)
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -295,6 +295,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (
TAO_GIOP_Message_State &msg_state)
+ const
{
// Convert to the right type of Pluggable Messaging message type.
@@ -329,264 +330,6 @@ TAO_GIOP_Message_Base::message_type (
}
int
-TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
-{
-
- if (this->message_state_.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- return 0;
-}
-
-ssize_t
-TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
-{
- // Actual message size including the header..
- CORBA::ULong msg_size =
- this->message_state_.message_size ();
-
- size_t len = incoming.length ();
-
- // If we have too many messages or if we have less than even a size
- // of the GIOP header then ..
- if (len > msg_size ||
- len < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- return -1;
- }
- else if (len == msg_size)
- return 0;
-
- return msg_size - len;
-}
-
-
-int
-TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd)
-{
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- if (incoming.length () > 0)
- {
- // Make a node which has a message block of the size of
- // MESSAGE_HEADER_LEN.
- qd =
- this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- qd->missing_data_ = -1;
- }
- return 0;
- }
-
- if (state.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- size_t copying_len = state.message_size ();
-
- qd = this->make_queued_data (copying_len);
-
- if (copying_len > incoming.length ())
- {
- qd->missing_data_ =
- copying_len - incoming.length ();
-
- copying_len = incoming.length ();
- }
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- copying_len);
-
- incoming.rd_ptr (copying_len);
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- return 1;
-}
-
-int
-TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming)
-{
- // Look to see whether we had atleast parsed the GIOP header ...
- if (qd->missing_data_ == -1)
- {
- // The data length that has been stuck in there during the last
- // read ....
- size_t len =
- qd->msg_block_->length ();
-
- // We know that we would have space for
- // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
- // from the <incoming> into the message block in <qd>
- qd->msg_block_->copy (incoming.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- // Move the rd_ptr () in the incoming message block..
- incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- // Parse the message header now...
- if (state.parse_message_header (*qd->msg_block_) == -1)
- return -1;
-
- // Now grow the message block so that we can copy the rest of
- // the data...
- if (qd->msg_block_->space () < state.message_size ())
- {
- ACE_CDR::grow (qd->msg_block_,
- state.message_size ());
- }
-
- // Copy the pay load..
- // Calculate the bytes that needs to be copied in the queue...
- size_t copy_len =
- state.payload_size ();
-
- // If the data that needs to be copied is more than that is
- // available to us ..
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
- else
- {
- qd->missing_data_ = 0;
- }
-
- // ..now we are set to copy the right amount of data to the
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- incoming.rd_ptr (copy_len);
-
- // Get the other details...
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- }
- else
- {
- // @@todo: Need to abstract this out to a seperate method...
- size_t copy_len = qd->missing_data_;
-
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
-
- // Copy the right amount of data in to the node...
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- qd->msg_block_->rd_ptr (copy_len);
-
- }
-
-
- return 0;
-}
-
-
-int
-TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd)
-{
- if (dqd->byte_order_ != sqd->byte_order_
- || dqd->major_version_ != sqd->major_version_
- || dqd->minor_version_ != sqd->minor_version_)
- {
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
- ACE_TEXT ("different GIOP versions or byte order\n")));
- return -1;
- }
-
- // Skip the header in the incoming message
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // If we have a fragment header skip the header length too..
- if (sqd->minor_version_ == 2 &&
- sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
-
- // Get the length of the incoming message block..
- size_t incoming_length =
- sqd->msg_block_->length ();
-
- // Increase the size of the destination message block if we need
- // to.
- ACE_Message_Block *mb =
- dqd->msg_block_;
-
- // Check space before growing.
- if (mb->space () < incoming_length)
- {
- ACE_CDR::grow (mb,
- mb->length () + incoming_length);
- }
-
- // Copy the data
- dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
- incoming_length);
- return 0;
-}
-
-void
-TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
-{
- // Get the message information
- qd->byte_order_ =
- this->message_state_.byte_order_;
- qd->major_version_ =
- this->message_state_.giop_version_.major;
- qd->minor_version_ =
- this->message_state_.giop_version_.minor;
-
- //qd->more_fragments_ = this->message_state_.more_fragments_;
-
- if (this->message_state_.more_fragments_)
- qd->more_fragments_ = 1;
- else
- qd->more_fragments_ = 0;
-
- qd->msg_type_=
- this->message_type (this->message_state_);
-
- // Reset the message_state
- this->message_state_.reset ();
-}
-
-int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -764,13 +507,13 @@ TAO_GIOP_Message_Base::process_reply_message (
// Should be taken care by the state specific parsing
retval =
generator_parser->parse_reply (input_cdr,
- params);
+ params);
break;
case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
retval =
generator_parser->parse_locate_reply (input_cdr,
- params);
+ params);
break;
default:
retval = -1;
@@ -898,7 +641,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
parse_error =
parser->parse_request_header (request);
- request.orb_core()->codeset_manager()->process_service_context(request);
+ TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
+ if (csm)
+ csm->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -1098,9 +843,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
}
TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
- locate_request.object_key ().length (),
- locate_request.object_key ().get_buffer (),
- 0);
+ locate_request.object_key ().length (),
+ locate_request.object_key ().get_buffer (),
+ 0);
// Set it to an error state
parse_error = 1;
@@ -1324,6 +1069,7 @@ TAO_GIOP_Message_Base::set_state (
}
+#if 0
// Server sends an "I'm shutting down now, any requests you've sent me
// can be retried" message to the server. The message is prefab, for
// simplicity.
@@ -1419,7 +1165,7 @@ TAO_GIOP_Message_Base::
transport-> id ()));
}
-
+#endif
int
TAO_GIOP_Message_Base::send_reply_exception (
@@ -1576,44 +1322,49 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
-TAO_Queued_Data *
-TAO_GIOP_Message_Base::make_queued_data (size_t sz)
+void
+TAO_GIOP_Message_Base::set_queued_data_from_message_header (
+ TAO_Queued_Data *qd,
+ const ACE_Message_Block &mb
+ ) const
{
- // Get a node for the queue..
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // @@todo: We have a similar method in Transport.cpp. Need to see how
- // we can factor them out..
- // Make a datablock for the size requested + something. The
- // "something" is required because we are going to align the data
- // block in the message block. During alignment we could loose some
- // bytes. As we may not know how many bytes will be lost, we will
- // allocate ACE_CDR::MAX_ALIGNMENT extra.
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (sz +
- ACE_CDR::MAX_ALIGNMENT);
-
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- ACE_Message_Block *new_mb = mb.duplicate ();
-
- ACE_CDR::mb_align (new_mb);
-
- qd->msg_block_ = new_mb;
-
+ // @@CJC: Try leaving out the declaration for this->message_state_
+ // and see what pukes. I don't think we need it any more.
+ TAO_GIOP_Message_State state;
+ if (state.take_values_from_message_block (mb) == -1)
+ {
+ // what the heck do we do here?!
+ qd->current_state_ = TAO_Queued_Data::INVALID;
+ return;
+ }
- return qd;
+ // It'd be nice to have an abstract base for GIOP_Message_State
+ // so that there could just be a line like:
+ // qd->take_values_from (state);
+ // Get the message information
+ qd->byte_order_ = state.byte_order ();
+ qd->major_version_ = state.giop_version ().major;
+ qd->minor_version_ = state.giop_version ().minor;
+ qd->more_fragments_ = state.more_fragments () ? 1 : 0;
+ qd->request_id_ = state.request_id_;
+ qd->msg_type_= message_type (state);
+ qd->missing_data_bytes_ = state.payload_size ();
}
-size_t
-TAO_GIOP_Message_Base::header_length (void) const
+int
+TAO_GIOP_Message_Base::check_for_valid_header (
+ const ACE_Message_Block &mb
+ ) const
{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
+ // NOTE! We don't hardcode the length of the header b/c header_length should
+ // be eligible for inlining by pretty much any compiler, and it should return
+ // a constant. The rest of this method is hard-coded and hand-optimized because
+ // this method gets called A LOT.
+ if (mb.length () < this->header_length ())
+ return -1;
+
+ // Is finding that it's the right length and the magic bytes present
+ // enough to declare it a valid header? I think so...
+ register const char* h = mb.rd_ptr ();
+ return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0;
}