summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Cleeland <chris.cleeland@gmail.com>2003-12-10 20:03:57 +0000
committerChris Cleeland <chris.cleeland@gmail.com>2003-12-10 20:03:57 +0000
commitfc9c046e4dd9b6f63bc0a1dc7d5251b36c734465 (patch)
tree3c58d861979da2c37d0a8c221b4740888c57752f
parentc869905174c8b01084f5c7583211a84dc3219327 (diff)
downloadATCD-fc9c046e4dd9b6f63bc0a1dc7d5251b36c734465.tar.gz
Wed Dec 10 13:58:41 2003 Chris Cleeland <cleeland_c@ociweb.com>
PMB changes and minor related changes.
-rw-r--r--TAO/ChangeLog81
-rw-r--r--TAO/ChangeLogs/PMBChangeLog381
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp353
-rw-r--r--TAO/tao/GIOP_Message_Base.h52
-rw-r--r--TAO/tao/GIOP_Message_Base.i8
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_Impl.inl23
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp57
-rw-r--r--TAO/tao/GIOP_Message_Lite.h45
-rw-r--r--TAO/tao/GIOP_Message_State.cpp197
-rw-r--r--TAO/tao/GIOP_Message_State.h44
-rw-r--r--TAO/tao/GIOP_Message_State.inl31
-rw-r--r--TAO/tao/IIOP_Transport.cpp2
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp459
-rw-r--r--TAO/tao/Incoming_Message_Queue.h162
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl23
-rw-r--r--TAO/tao/Pluggable_Messaging.h44
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp229
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp2
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.h4
-rw-r--r--TAO/tao/Transport.cpp1208
-rw-r--r--TAO/tao/Transport.h104
-rw-r--r--TAO/tao/default_client.cpp2
-rw-r--r--TAO/tao/orbconf.h19
23 files changed, 2208 insertions, 1322 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index a3aad3e0413..687635a1f78 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,84 @@
+Wed Dec 10 13:58:41 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/default_client.cpp (parse_args): Corrected erroneous option
+ name printed when -ORBConnectStrategy option parsing hits an
+ error.
+
+ * tao/IIOP_Transport.cpp (recv): Corrected method name printed in
+ debug message.
+
+ * tao/GIOP_Message_Base.cpp:
+ * tao/GIOP_Message_Base.h:
+ * tao/GIOP_Message_Base.i:
+ * tao/GIOP_Message_Generator_Parser_Impl.inl:
+ * tao/GIOP_Message_Lite.cpp:
+ * tao/GIOP_Message_Lite.h:
+ * tao/GIOP_Message_State.cpp:
+ * tao/GIOP_Message_State.h:
+ * tao/GIOP_Message_State.inl:
+ * tao/Incoming_Message_Queue.cpp:
+ * tao/Incoming_Message_Queue.h:
+ * tao/Incoming_Message_Queue.inl:
+ * tao/Pluggable_Messaging.h:
+ * tao/Transport.cpp:
+ * tao/Transport.h:
+ * tao/orbconf.h:
+ * tao/Strategies/DIOP_Transport.cpp
+ * tao/Strategies/SHMIOP_Transport.cpp
+ * tao/Strategies/SHMIOP_Transport.h
+
+
+ Integrated aggregate fix to infamous Parse Magic Bytes problem.
+ The fix was originally developed in OCI's repository and
+ migrated here. An overall description of the problem and
+ solution follow, and a detailed changelog that follows the
+ development of this solution is found in
+ ChangeLogs/PMBChangeLog.
+
+ The old way of handling input naively assumed that a header would
+ never be broken across multiple read()s from the socket. Back in
+ the old days, this assumption was okay, because TAO performed a
+ recv_n() specifically for the header, and thus insured that it
+ wouldn't get less than a full header. However, when it was
+ changed to use a single read, that no longer worked.
+
+ Further complicating matters was the incoming message queue, and
+ the myriad of methods that manipulated elements on the message
+ queue.
+
+ The PMB changes rework things in the following ways:
+
+ 1. The incoming message queue now holds only completely-read
+ GIOP messages awaiting processing. Each GIOP message is held in
+ a heap-allocated ACE_Message_Block. When the message has been
+ processed, the processing code is responsible for calling
+ release on the queued message to return the memory to the heap.
+
+ 2. Since GIOP, by definition, assumes an ordered byte stream,
+ there can be only one partially read, i.e., uncompleted, message;
+ this is now held explicitly in TAO_Transport::uncompleted_message_
+ as a pointer to a heap-allocated ACE_Message_Block of the exact
+ correct size for the GIOP message.
+
+ 3. States for reading from the transport have been explicitly
+ defined rather than implied from values of other data members.
+
+ This change also correctly handles correct receipt of GIOP
+ fragments:
+
+ a. In the case of a GIOP 1.2 fragment, each fragment is a complete
+ GIOP message. It is, however, unable to be processed until all
+ fragments have arrived.
+
+ b. Unlike a TCP segment, a GIOP fragment is not stateless; all
+ GIOP 1.2 fragments for a specific request must be sent in-order.
+ However, with transport multiplexing, other messages--including
+ fragments for other requests--may interleave these fragments.
+
+ c. By design, there is no way to know the total number of payload
+ bytes across all GIOP fragments until the final fragment is
+ received.
+
Wed Dec 10 15:12:34 UTC 2003 Johnny Willemsen <jwillemsen@remedy.nl>
* tests/Smart_Proxies/Collocation/Smart_Proxy_Impl.cpp:
diff --git a/TAO/ChangeLogs/PMBChangeLog b/TAO/ChangeLogs/PMBChangeLog
new file mode 100644
index 00000000000..d93c29dae06
--- /dev/null
+++ b/TAO/ChangeLogs/PMBChangeLog
@@ -0,0 +1,381 @@
+Thu Mar 20 12:34:24 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * all: First merge back to OCI.
+
+Thu Mar 20 12:17:23 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/Strategies/DIOP_Transport.cpp (handle_input_i):
+ * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp
+ (handle_input_i):
+
+ Update to use new parsing and message processing methods
+ consistent with PMB changes.
+
+ * tao/Transport.cpp (process_queue_head): Corrected a possible memory
+ leak where, if process_parsed_messages returned with an error,
+ the TAO_Queued_Data instance wouldn't be released.
+
+Tue Mar 18 14:57:07 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * VERSION:
+ * ChangeLog:
+ * tao/tao.mpc:
+ * tao/Version.h:
+ * tao/IIOP_Transport.cpp:
+ * tao/MCAST_Parser.cpp:
+ * tao/RTCORBA/Linear_Priority_Mapping.cpp:
+ * tao/RTCORBA/RT_ORBInitializer.cpp:
+ * tao/RTCORBA/RT_ORBInitializer.h:
+ * tao/RTCORBA/RT_ORB_Loader.cpp:
+ * tao/RTCORBA/RT_Protocols_Hooks.cpp:
+
+ Merged in changes from TAO 1.3.1. The actual tag for the merge
+ was pmb_branch_mainline_mergeout_1. The command was
+
+ $ cvs update -jpmb_branch_start -jpmb_branch_mainline_mergeout_1
+
+ after manually determining which files needed updates.
+
+Mon Mar 17 11:09:00 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/GIOP_Message_Base.cpp (check_for_valid_header): Optimized
+ implementation.
+
+ * tao/GIOP_Message_Base.* (header_length): Moved to inline for
+ optimization.
+
+ * tao/GIOP_Message_State.cpp (TAO_Debug_Msg_Emitter_Guard):
+ Improved implementation so it didn't make copies of the message
+ even when it was never going to output anything due to debug
+ level being lower than the value that would cause debug output.
+
+ * tao/GIOP_Message_State.cpp (take_values_from_message_block):
+ * tao/GIOP_Message_State.cpp (set_version_info_from_buffer):
+ * tao/GIOP_Message_State.cpp (set_byte_order_info_from_buffer):
+ * tao/GIOP_Message_State.cpp (set_payload_size_from_buffer):
+
+ Removed use of TAO_Debug_Msg_Emitter_Guard.
+
+ * tao/GIOP_Message_State.inl (set_payload_size_from_buffer):
+
+ Moved to inline file.
+
+ * tao/Incoming_Message_Queue.h (TAO_Incoming_Message_Queue):
+ Changed name of member 'queued_data_' to 'last_added_' to be
+ more clear about what the pointer actually points to.
+
+ Added documentation that describes the structure of the linked
+ list so that hopefully future maintainers don't have to go
+ through the same learning curve I did.
+
+ * tao/Incoming_Message_Queue.*: Updates to reflect last_added_
+ member name change.
+
+ * tao/Incoming_Message_Queue.cpp (dequeue_head): Add check to not
+ allow dequeuing when size is zero, plus we now zero-out the
+ last_added_ pointer after we've dequeued the last item in the
+ list.
+
+ * tao/Incoming_Message_Queue.cpp (clone_mb_nocopy_size): Fix
+ setting and copying of flags on the message and data blocks so
+ that the DONT_DELETE flag specifically doesn't get copied. This
+ cured a HUGE memory leak that was causing substantial
+ performance problems.
+
+ * tao/Incoming_Message_Queue.cpp (make_uncompleted_message):
+ * tao/Incoming_Message_Queue.cpp (make_completed_message):
+
+ Cache values used over and over again within these methods such
+ as header and message block lengths
+
+ * tao/Transport.cpp (handle_input_i):
+
+ Added flag noting that we enqueued a message.
+
+ Changed how re-reading is performed; see entry for
+ try_to_complete for information.
+
+ Used ACE_CDR::grow to grow the uncompleted_message message block
+ once the payload size is known. This insures that the growth
+ happens with proper alignment constraints.
+
+ * tao/Transport.* (try_to_complete): Added this method which
+ tries to complete whatever partial message is held in
+ uncompleted_message_. The difference between doing this and
+ simply revisiting the top of handle_input_i is that this will
+ only try to read enough to complete the message rather than also
+ read another partial.
+
+ * tao/Transport.cpp (process_queue_head): Added check to insure
+ that we don't try to do any processing when there's nothing in
+ the queue. Also, removed old code leftover from when the
+ incoming queue could have partial GIOP messages in it. Finally,
+ insured that the return value matched previous requirements.
+
+
+Thu Mar 6 15:27:14 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/Incoming_Message_Queue.cpp: Removed Shattering Encapsulation
+ hack around ace/Message_Block.h inclusion to make this link
+ properly on VC6.
+
+ * tao/GIOP_Message_State.cpp: Moved initialization of
+ TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH outside the class
+ declaration.
+
+Wed Mar 5 13:08:37 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * tao/GIOP_Message_Generator_Parser_Impl.inl (check_revision):
+ Modified to only explicitly allow versions of GIOP that TAO
+ supports, and no others. Note that this change means that
+ whenever a new version of GIOP gets added to TAO, this function
+ must be updated, too. Hopefully that won't happen so often that
+ this becomes horribly burdensome. Thanks to Chad Elliott and
+ Paul Calabrese for ideas in getting this right.
+
+ * tao/Incoming_Message_Queue.* (make_uncompleted_message):
+
+ This now behaves similar to make_completed_message, i.e., it
+ allocates the message block held in TAO_Queued_Data and advnaced
+ the rd_ptr past the bytes it consumes.
+
+ * tao/Incoming_Message_Queue.cpp: Eliminated hacks on
+ ACE_Message_Block and replaced with functions
+ clone_mb_nocopy_size() and copy_mb_span() that utilize ONLY the
+ public API. make_completed_message() and
+ make_uncompleted_message() now use these.
+
+ * tao/Transport.cpp (handle_input_i): Enlarged the buffer size and
+ replaced the magic number for the maximum number of re-read
+ attempts with the manifest constant
+ TAO_MAX_TRANSPORT_REREAD_ATTEMPTS, now defined in orbconf.h.
+
+ Changed uses of make_uncompleted_message() to be consistent with
+ changes outlined above.
+
+ Corrected a bug where, upon re-reading after completely emptying
+ the stack-allocated message block, I forgot to reset the
+ rd_ptr/wr_ptr in the message block.
+
+ * tao/orbconf.h: Added new manifest constant
+ TAO_MAX_TRANSPORT_REREAD_ATTEMPTS. Documentation is in the file.
+
+Wed Feb 26 21:44:34 2003 Chris Cleeland <cleeland_c@ociweb.com>
+
+ * TAO/tao/GIOP_Message_Base.cpp:
+
+ Eliminate parse_incoming_messages(), missing_data(),
+ extract_next_message(), consolidate_node(),
+ consolidate_fragments(), get_message_data(), make_queued_data().
+
+ Add new methods set_queued_data_from_message_header() and
+ check_for_valid_header().
+
+ * TAO/tao/GIOP_Message_Base.h:
+
+ Eliminate #if0 parse_incoming_messages(), missing_data(),
+ extract_next_message(), consolidate_node(),
+ consolidate_fragments(), get_message_data(), and
+ make_queued_data() declarations.
+
+ Add declarations for set_queued_data_from_message_header() and
+ check_for_valid_header().
+
+ Make message_type() static.
+
+ * TAO/tao/GIOP_Message_Generator_Parser_Impl.inl:
+
+ Reimplement check_revision() to be exact about which GIOP
+ revisions are okay. This is different from before where it
+ loosely assumed that everything was valid unless it want higher
+ than a particular range.
+
+ * TAO/tao/GIOP_Message_Lite.cpp:
+
+ Remove parse_incoming_messages() thru consolidate_fragments();
+ remove make_queued_data().
+
+ Add set_queued_data_from_message_header() and
+ check_for_valid_header().
+
+ * TAO/tao/GIOP_Message_Lite.h:
+
+ Remove declarations for parse_incoming_messages() thru
+ consolidate_fragments(); remove make_queued_data().
+
+ Add declarations for set_queued_data_from_message_header() and
+ check_for_valid_header().
+
+ * TAO/tao/GIOP_Message_State.cpp:
+
+ Add TAO_Debug_Message_Emitter_Guard class within the compilation
+ unit.
+
+ Change CTOR not to use the 2nd argument.
+
+ Change CTOR not to initialize this->base_with the 2nd arg.
+
+ Add take_values_from_message_block().
+
+ Remove parse_magic_bytes(), parse_message_header_i(),
+ parse_message_header().
+
+ Rename get_version_info() to set_version_info_from_buffer();
+ change argument type to 'const char*'.
+
+ Rename get_byte_order_info() to set_byte_order_info_from_buffer();
+ change argument type to 'const char*'.
+
+ Rename get_payload_size() to set_payload_size_from_buffer();
+ change argument type to 'const char*'.
+
+ Change argument type for read_ulong() to 'const char*'.
+
+ * TAO/tao/GIOP_Message_State.h:
+
+ Add default value for CTOR arguments so we don't have to pass
+ them.
+
+ Add declaration for take_values_from_message_block().
+
+ #if0 parse_message_header() declaration.
+
+ Add documentation for byte_order().
+
+ Add new GIOP information accessors: giop_version(),
+ more_fragments(), and message_type().
+
+ #if0 declaration for parse_message_header_i() and
+ parse_magic_bytes().
+
+ Rename delcarations for get_version_info(), get_byte_order_info(),
+ and get_payload_size().
+
+ #if0 base_ member variable.
+
+ * TAO/tao/GIOP_Message_State.inl:
+
+ Add definitions for giop_version(), more_fragments(), and
+ message_type().
+
+ * TAO/tao/Incoming_Message_Queue.cpp:
+
+ Add #include for ace/Message_Block.h, but shatter encapsulation
+ and effectively extend its interface so that we can clone spans of
+ message blocks.
+
+ Modify copy_tail() to use new TAO_Queued_Data member names.
+
+ * TAO/tao/Incoming_Message_Queue.cpp (class TAO_Queued_Data):
+
+ Update CTORs to initialize new this->current_state_ data member.
+
+ Add new make_uncompleted_message(), ACE_Message_Block hack
+ "extensions", make_completed_message().
+
+ Rename get_queued_data() to make_queued_data().
+
+ * TAO/tao/Incoming_Message_Queue.h:
+
+ Changed #include <Pluggable_Messaging_Utils.h> to #include
+ <Pluggable_Messaging.h>
+
+ Documentation for is_tail_complete() and is_head_complete() and
+ is_tail_fragmented().
+
+ #if0 get_node().
+
+ * TAO/tao/Incoming_Message_Queue.h (class TAO_Queued_Data):
+
+ Make CTORs protected so creation can only go through new factory
+ methods, i.e., make_completed_message() and
+ make_uncompleted_message().
+
+ Add new factory methods make_completed_message() and
+ make_uncompleted_message().
+
+ Add release() method [in addition to static of same name].
+
+ Add explicit definition of various states and a member to hold the
+ current state (this->current_state_).
+
+ Rename this->missing_data_ flag/value with
+ this->missing_data_bytes_ count.
+
+ Documentation for this->byte_order_.
+
+ * TAO/tao/Incoming_Message_Queue.inl:
+
+ Replace uses of this->missing_data_ with
+ this->missing_data_bytes_.
+
+ #if0 get_node().
+
+ Add release() method.
+
+ * TAO/tao/Pluggable_Messaging.h:
+
+ #if0 parse_incoming_messages() through consolidate_fragments().
+
+ Add declarations for check_for_valid_header() and
+ set_queued_data_from_message_header().
+
+ * TAO/tao/Transport.cpp:
+
+ Add #include <ace/Min_Max.h>
+
+ Add uncompleted_message_ to initialization list.
+
+ Modify what happens when reading in handle_input_i():
+
+ - number_of_read_attempts is how many times we'll try to read
+ again to complete a message before we give up.
+
+ - read_from_the_connection label added
+
+ - correct 1st argument to this->recv() to be wr_ptr() rather than
+ rd_ptr().
+
+ - lots of changes that are substantially documented in the code.
+
+ #if0 parse_consolidate_messages(), parse_incoming_messages(),
+ missing_data(), consolidate_message(), consolidate_fragments(),
+ consolidate_message_queue(), consolidate_extra_messages(), and
+ make_queued_data().
+
+ * TAO/tao/Transport.h:
+
+ #if0 declarations for parse_consolidate_messages(), parse_incoming_messages(),
+ missing_data(), consolidate_message(), consolidate_fragments(),
+ consolidate_message_queue(), consolidate_extra_messages(), and
+ make_queued_data().
+
+ Add this->uncompleted_message_ data member.
+
+ * TAO/tao/Strategies/DIOP_Transport.cpp:
+
+ Change 1st arg of this->recv() in handle_input_i() to wr_ptr()
+ from rd_ptr().
+
+ * TAO/tao/Strategies/SHMIOP_Transport.cpp:
+
+ #if0 consolidate_message().
+
+ * TAO/tao/Strategies/SHMIOP_Transport.h:
+
+ #if0 declaration for consolidate_message().
+
+ * TAO/tests/AMI/run_test.pl:
+
+ Increase default number of iterations from 1 to 10000. Earlier
+ versions of the PMB changes succeeded just fine with low numbers
+ of iterations, but began to fail miserably when the number of
+ iterations climbed.
+
+ * TAO/tests/BiDirectional/run_test.pl:
+
+ Added level 10 debug.
+
+ * TAO/tests/InterOp-Naming/INS_test_client.cpp:
+
+ Changes that don't appear to be related in any way to PMB...
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 76aeeaee51e..7ada6c903ee 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -24,7 +24,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
, message_state_ (orb_core,
- this)
+ this)
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -295,6 +295,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (
TAO_GIOP_Message_State &msg_state)
+ const
{
// Convert to the right type of Pluggable Messaging message type.
@@ -329,264 +330,6 @@ TAO_GIOP_Message_Base::message_type (
}
int
-TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
-{
-
- if (this->message_state_.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- return 0;
-}
-
-ssize_t
-TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
-{
- // Actual message size including the header..
- CORBA::ULong msg_size =
- this->message_state_.message_size ();
-
- size_t len = incoming.length ();
-
- // If we have too many messages or if we have less than even a size
- // of the GIOP header then ..
- if (len > msg_size ||
- len < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- return -1;
- }
- else if (len == msg_size)
- return 0;
-
- return msg_size - len;
-}
-
-
-int
-TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd)
-{
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- if (incoming.length () > 0)
- {
- // Make a node which has a message block of the size of
- // MESSAGE_HEADER_LEN.
- qd =
- this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- qd->missing_data_ = -1;
- }
- return 0;
- }
-
- if (state.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- size_t copying_len = state.message_size ();
-
- qd = this->make_queued_data (copying_len);
-
- if (copying_len > incoming.length ())
- {
- qd->missing_data_ =
- copying_len - incoming.length ();
-
- copying_len = incoming.length ();
- }
-
- qd->msg_block_->copy (incoming.rd_ptr (),
- copying_len);
-
- incoming.rd_ptr (copying_len);
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- return 1;
-}
-
-int
-TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming)
-{
- // Look to see whether we had atleast parsed the GIOP header ...
- if (qd->missing_data_ == -1)
- {
- // The data length that has been stuck in there during the last
- // read ....
- size_t len =
- qd->msg_block_->length ();
-
- // We know that we would have space for
- // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
- // from the <incoming> into the message block in <qd>
- qd->msg_block_->copy (incoming.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- // Move the rd_ptr () in the incoming message block..
- incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
-
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
- // Parse the message header now...
- if (state.parse_message_header (*qd->msg_block_) == -1)
- return -1;
-
- // Now grow the message block so that we can copy the rest of
- // the data...
- if (qd->msg_block_->space () < state.message_size ())
- {
- ACE_CDR::grow (qd->msg_block_,
- state.message_size ());
- }
-
- // Copy the pay load..
- // Calculate the bytes that needs to be copied in the queue...
- size_t copy_len =
- state.payload_size ();
-
- // If the data that needs to be copied is more than that is
- // available to us ..
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
- else
- {
- qd->missing_data_ = 0;
- }
-
- // ..now we are set to copy the right amount of data to the
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- incoming.rd_ptr (copy_len);
-
- // Get the other details...
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
- }
- else
- {
- // @@todo: Need to abstract this out to a seperate method...
- size_t copy_len = qd->missing_data_;
-
- if (copy_len > incoming.length ())
- {
- // Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
-
- // Set the actual possible copy_len that is available...
- copy_len = incoming.length ();
- }
-
- // Copy the right amount of data in to the node...
- // node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
-
- // Set the <rd_ptr> of the <incoming>..
- qd->msg_block_->rd_ptr (copy_len);
-
- }
-
-
- return 0;
-}
-
-
-int
-TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd)
-{
- if (dqd->byte_order_ != sqd->byte_order_
- || dqd->major_version_ != sqd->major_version_
- || dqd->minor_version_ != sqd->minor_version_)
- {
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
- ACE_TEXT ("different GIOP versions or byte order\n")));
- return -1;
- }
-
- // Skip the header in the incoming message
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // If we have a fragment header skip the header length too..
- if (sqd->minor_version_ == 2 &&
- sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
-
- // Get the length of the incoming message block..
- size_t incoming_length =
- sqd->msg_block_->length ();
-
- // Increase the size of the destination message block if we need
- // to.
- ACE_Message_Block *mb =
- dqd->msg_block_;
-
- // Check space before growing.
- if (mb->space () < incoming_length)
- {
- ACE_CDR::grow (mb,
- mb->length () + incoming_length);
- }
-
- // Copy the data
- dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
- incoming_length);
- return 0;
-}
-
-void
-TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
-{
- // Get the message information
- qd->byte_order_ =
- this->message_state_.byte_order_;
- qd->major_version_ =
- this->message_state_.giop_version_.major;
- qd->minor_version_ =
- this->message_state_.giop_version_.minor;
-
- //qd->more_fragments_ = this->message_state_.more_fragments_;
-
- if (this->message_state_.more_fragments_)
- qd->more_fragments_ = 1;
- else
- qd->more_fragments_ = 0;
-
- qd->msg_type_=
- this->message_type (this->message_state_);
-
- // Reset the message_state
- this->message_state_.reset ();
-}
-
-int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -764,13 +507,13 @@ TAO_GIOP_Message_Base::process_reply_message (
// Should be taken care by the state specific parsing
retval =
generator_parser->parse_reply (input_cdr,
- params);
+ params);
break;
case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
retval =
generator_parser->parse_locate_reply (input_cdr,
- params);
+ params);
break;
default:
retval = -1;
@@ -898,7 +641,9 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
parse_error =
parser->parse_request_header (request);
- request.orb_core()->codeset_manager()->process_service_context(request);
+ TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
+ if (csm)
+ csm->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -1098,9 +843,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
}
TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
- locate_request.object_key ().length (),
- locate_request.object_key ().get_buffer (),
- 0);
+ locate_request.object_key ().length (),
+ locate_request.object_key ().get_buffer (),
+ 0);
// Set it to an error state
parse_error = 1;
@@ -1324,6 +1069,7 @@ TAO_GIOP_Message_Base::set_state (
}
+#if 0
// Server sends an "I'm shutting down now, any requests you've sent me
// can be retried" message to the server. The message is prefab, for
// simplicity.
@@ -1419,7 +1165,7 @@ TAO_GIOP_Message_Base::
transport-> id ()));
}
-
+#endif
int
TAO_GIOP_Message_Base::send_reply_exception (
@@ -1576,44 +1322,49 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg)
}
-TAO_Queued_Data *
-TAO_GIOP_Message_Base::make_queued_data (size_t sz)
+void
+TAO_GIOP_Message_Base::set_queued_data_from_message_header (
+ TAO_Queued_Data *qd,
+ const ACE_Message_Block &mb
+ ) const
{
- // Get a node for the queue..
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // @@todo: We have a similar method in Transport.cpp. Need to see how
- // we can factor them out..
- // Make a datablock for the size requested + something. The
- // "something" is required because we are going to align the data
- // block in the message block. During alignment we could loose some
- // bytes. As we may not know how many bytes will be lost, we will
- // allocate ACE_CDR::MAX_ALIGNMENT extra.
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (sz +
- ACE_CDR::MAX_ALIGNMENT);
-
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- ACE_Message_Block *new_mb = mb.duplicate ();
-
- ACE_CDR::mb_align (new_mb);
-
- qd->msg_block_ = new_mb;
-
+ // @@CJC: Try leaving out the declaration for this->message_state_
+ // and see what pukes. I don't think we need it any more.
+ TAO_GIOP_Message_State state;
+ if (state.take_values_from_message_block (mb) == -1)
+ {
+ // what the heck do we do here?!
+ qd->current_state_ = TAO_Queued_Data::INVALID;
+ return;
+ }
- return qd;
+ // It'd be nice to have an abstract base for GIOP_Message_State
+ // so that there could just be a line like:
+ // qd->take_values_from (state);
+ // Get the message information
+ qd->byte_order_ = state.byte_order ();
+ qd->major_version_ = state.giop_version ().major;
+ qd->minor_version_ = state.giop_version ().minor;
+ qd->more_fragments_ = state.more_fragments () ? 1 : 0;
+ qd->request_id_ = state.request_id_;
+ qd->msg_type_= message_type (state);
+ qd->missing_data_bytes_ = state.payload_size ();
}
-size_t
-TAO_GIOP_Message_Base::header_length (void) const
+int
+TAO_GIOP_Message_Base::check_for_valid_header (
+ const ACE_Message_Block &mb
+ ) const
{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
+ // NOTE! We don't hardcode the length of the header b/c header_length should
+ // be eligible for inlining by pretty much any compiler, and it should return
+ // a constant. The rest of this method is hard-coded and hand-optimized because
+ // this method gets called A LOT.
+ if (mb.length () < this->header_length ())
+ return -1;
+
+ // Is finding that it's the right length and the magic bytes present
+ // enough to declare it a valid header? I think so...
+ register const char* h = mb.rd_ptr ();
+ return (h[0] == 'G' && h[1] == 'I' && h[2] == 'O' && h[3] == 'P') ? 1 : 0;
}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 461673bd359..7bf61e340f7 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -94,35 +94,37 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block);
+ /// Process the request message that we have received on the
+ /// connection
+ virtual int process_request_message (TAO_Transport *transport,
+ TAO_Queued_Data *qd);
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &message_block);
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
- */
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd);
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. If \code mb does not
+ contain less than \code header_length() bytes, this method cannot make a
+ complete evaluation, and returns a commensurate value.
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming);
+ \return 1 \code header_length() bytes found, and constitute a valid header
+ \return 0 \code header_length() bytes found, and do not constitute a valid header
+ \return -1 not enough bytes available to make a determination of header validity
+ */
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd);
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const;
- /// Process the request message that we have received on the
- /// connection
- virtual int process_request_message (TAO_Transport *transport,
- TAO_Queued_Data *qd);
/// Parse the reply message that we received and return the reply
@@ -174,7 +176,7 @@ protected:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
+ TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state) const;
private:
@@ -197,10 +199,12 @@ private:
/// Send error messages
int send_error (TAO_Transport *transport);
+#if 0
/// Close a connection, first sending GIOP::CloseConnection.
void send_close_connection (const TAO_GIOP_Message_Version &version,
TAO_Transport *transport,
void *ctx);
+#endif
/// We must send a LocateReply through <transport>, this request
/// resulted in some kind of exception.
diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i
index a589447a413..f5e39d9aa54 100644
--- a/TAO/tao/GIOP_Message_Base.i
+++ b/TAO/tao/GIOP_Message_Base.i
@@ -4,3 +4,11 @@
//
// GIOP_Message_Base
//
+
+
+ACE_INLINE size_t
+TAO_GIOP_Message_Base::header_length (void) const
+{
+ return TAO_GIOP_MESSAGE_HEADER_LEN;
+}
+
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
index 18bb7936ffd..47e4730befb 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
+++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
@@ -5,9 +5,24 @@ TAO_GIOP_Message_Generator_Parser_Impl::
check_revision (CORBA::Octet incoming_major,
CORBA::Octet incoming_minor)
{
- if (incoming_major > TAO_DEF_GIOP_MAJOR ||
- incoming_minor > TAO_DEF_GIOP_MINOR)
+ CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor;
+ CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR;
+
+ CORBA::Boolean ret = 0;
+
+ // If it's greater than the max, we know it's not allowed.
+ if (version_as_whole_num > max_allowable_version)
return 0;
-
- return 1;
+
+ // If it's less than the max, though, we still have to check for
+ // each explicit version and only allow the ones we know work.
+ switch (version_as_whole_num)
+ {
+ case 0x0100:
+ case 0x0101:
+ case 0x0102:
+ ret = 1;
+ }
+
+ return ret;
}
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index 462266dd9bf..3ff0683729a 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -243,6 +243,7 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
}
+#if 0
int
TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
{
@@ -501,6 +502,7 @@ TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/,
// We dont know what fragments are???
return -1;
}
+#endif
int
TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
@@ -728,7 +730,9 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
parse_error =
this->parse_request_header (request);
- request.orb_core()->codeset_manager()->process_service_context(request);
+ TAO_Codeset_Manager *csm = request.orb_core()->codeset_manager();
+ if (csm)
+ csm->process_service_context(request);
transport->assign_translators(&cdr,&output);
// Throw an exception if the
@@ -1626,38 +1630,6 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
}
}
-TAO_Queued_Data *
-TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
-{
- // Get a node for the queue..
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data ();
-
- // Make a datablock for the size requested + something. The
- // "something" is required because we are going to align the data
- // block in the message block. During alignment we could loose some
- // bytes. As we may not know how many bytes will be lost, we will
- // allocate ACE_CDR::MAX_ALIGNMENT extra.
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (sz +
- ACE_CDR::MAX_ALIGNMENT);
-
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- ACE_Message_Block *new_mb = mb.duplicate ();
-
- ACE_CDR::mb_align (new_mb);
-
- qd->msg_block_ = new_mb;
-
- return qd;
-}
-
int
TAO_GIOP_Message_Lite::generate_locate_reply_header (
TAO_OutputCDR & /*cdr*/,
@@ -1679,3 +1651,22 @@ TAO_GIOP_Message_Lite::header_length (void) const
{
return TAO_GIOP_LITE_HEADER_LEN;
}
+
+void
+TAO_GIOP_Message_Lite::set_queued_data_from_message_header (
+ TAO_Queued_Data *qd,
+ const ACE_Message_Block &mb
+ ) const
+{
+ ACE_UNUSED_ARG (qd);
+ ACE_UNUSED_ARG (mb);
+}
+
+int
+TAO_GIOP_Message_Lite::check_for_valid_header (
+ const ACE_Message_Block &mb
+ ) const
+{
+ ACE_UNUSED_ARG (mb);
+ return 0;
+}
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index b10ef17982c..bfc7f5f01ad 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -88,8 +88,11 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block);
+ /// Process the request message that we have received on the
+ /// connection
+ virtual int process_request_message (TAO_Transport *transport,
+ TAO_Queued_Data *qd);
+
/// Get the message type. The return value would be one of the
/// following:
@@ -99,33 +102,25 @@ public:
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
TAO_Pluggable_Message_Type message_type (void);
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &message_block);
-
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. Does
*/
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd);
-
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming);
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const;
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd);
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
- /// @@Bala: Docu???
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
-
- /// Process the request message that we have received on the
- /// connection
- virtual int process_request_message (TAO_Transport *transport,
- TAO_Queued_Data *qd);
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index e3a3ca20bf3..1d923ecb45f 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -10,15 +10,54 @@
# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID (tao,
- GIOP_Message_State,
- "$Id$")
+
+class TAO_Debug_Msg_Emitter_Guard
+{
+public:
+ TAO_Debug_Msg_Emitter_Guard (unsigned int debug_level, const char* msg)
+ : which_level_(debug_level)
+ {
+ if (TAO_debug_level < this->which_level_)
+ {
+ msg_ = 0;
+ return;
+ }
+ this->msg_ = new char[ACE_OS::strlen (msg) + MAGIC_LENGTH ];
+ ACE_OS::strcpy (this->msg_, msg);
+ ACE_OS::strcat (this->msg_, " begin\n");
+ if (TAO_debug_level >= this->which_level_)
+ ACE_DEBUG ((LM_DEBUG, this->msg_ ));
+ }
+
+ ~TAO_Debug_Msg_Emitter_Guard ()
+ {
+ if (this->msg_)
+ {
+ if (TAO_debug_level >= this->which_level_)
+ {
+ char* begin_start =
+ this->msg_ + ACE_OS::strlen(this->msg_) - MAGIC_LENGTH + 1;
+ ACE_OS::strcpy (begin_start, " end\n");
+ ACE_DEBUG ((LM_DEBUG, this->msg_));
+ }
+ delete[] this->msg_;
+ }
+ }
+
+private:
+ static const int MAGIC_LENGTH;
+ unsigned int which_level_;
+ char* msg_;
+};
+
+const int TAO_Debug_Msg_Emitter_Guard::MAGIC_LENGTH = 8; // " begin\n" + \000
+
+ACE_RCSID(tao, GIOP_Message_State, "$Id$")
TAO_GIOP_Message_State::TAO_GIOP_Message_State (
TAO_ORB_Core * /*orb_core*/,
- TAO_GIOP_Message_Base *base)
- : base_ (base),
- giop_version_ (TAO_DEF_GIOP_MAJOR,
+ TAO_GIOP_Message_Base * /*base*/)
+ : giop_version_ (TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR),
byte_order_ (0),
message_type_ (0),
@@ -29,125 +68,64 @@ TAO_GIOP_Message_State::TAO_GIOP_Message_State (
{
}
-
+// This doesn't check the message block's length, so that means that
+// the *caller* needs to do that first.
int
-TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
+TAO_GIOP_Message_State::take_values_from_message_block (
+ const ACE_Message_Block& mb
+ )
{
- if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // Parse the GIOP header
- if (this->parse_message_header_i (incoming) == -1)
- return -1;
- }
-
- return 0;
-}
-
-int
-TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
-{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::parse_message_header_i\n"
- ));
- }
-
- // Grab the rd_ptr_ from the message block..
- char *buf = incoming.rd_ptr ();
-
- // Parse the magic bytes first
- if (this->parse_magic_bytes (buf) == -1)
- {
- return -1;
- }
+ const char* buf = mb.rd_ptr ();
// Get the version information
- if (this->get_version_info (buf) == -1)
+ if (this->set_version_info_from_buffer (buf) == -1)
return -1;
// Get the byte order information...
- if (this->get_byte_order_info (buf) == -1)
+ if (this->set_byte_order_info_from_buffer (buf) == -1)
return -1;
// Get the message type
this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
// Get the size of the message..
- this->get_payload_size (buf);
+ this->set_payload_size_from_buffer (buf);
+
+ // Get the request id
+ this->parse_fragment_header (buf, mb.length ());
if (this->message_size_ == 0)
{
- if (this->message_type_ == TAO_GIOP_MESSAGERROR)
+ const char* msgname = 0;
+ switch (this->message_type_)
+ {
+ case TAO_GIOP_MESSAGERROR:
+ msgname = "GIOP_MESSAGE_ERROR"; break;
+ case TAO_GIOP_CLOSECONNECTION:
+ msgname = "GIOP_CLOSE_CONNECTION"; break;
+ }
+ if (msgname != 0)
{
if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) -"
- "GIOP_MESSAGE_ERROR received \n"));
- }
- return 0;
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: %s rcv'd.\n",
+ msgname));
}
else
{
if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - "
- "Message of size zero recd. \n"));
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) GIOP_Message_State::take_values: Message of size zero rcv'd.\n"));
return -1;
}
}
-
- if (this->more_fragments_)
- {
- (void) this->parse_fragment_header (buf,
- incoming.length ());
- }
-
return 0;
}
-
-
-
-int
-TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
-{
- // The values are hard-coded to support non-ASCII platforms.
- if (!(buf [0] == 0x47 // 'G'
- && buf [1] == 0x49 // 'I'
- && buf [2] == 0x4f // 'O'
- && buf [3] == 0x50)) // 'P'
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("TAO (%P|%t) - bad header, ")
- ACE_LIB_TEXT ("magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"),
- buf[0],
- buf[1],
- buf[2],
- buf[3]));
- return -1;
- }
-
- return 0;
-}
-
int
-TAO_GIOP_Message_State::get_version_info (char *buf)
+TAO_GIOP_Message_State::set_version_info_from_buffer (const char *buf)
{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::get_version_info\n"));
- }
-
// We have a GIOP message on hand. Get its revision numbers
- CORBA::Octet incoming_major =
- buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
- CORBA::Octet incoming_minor =
- buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
// Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
@@ -157,7 +135,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - bad version <%d.%d>\n"),
+ ACE_TEXT ("TAO (%P|%t) - ")
+ ACE_TEXT ("GIOP_Message_State::set_version_info_from_buffer:")
+ ACE_TEXT ("bad version <%d.%d>\n"),
incoming_major, incoming_minor));
}
@@ -172,15 +152,9 @@ TAO_GIOP_Message_State::get_version_info (char *buf)
}
int
-TAO_GIOP_Message_State::get_byte_order_info (char *buf)
+TAO_GIOP_Message_State::set_byte_order_info_from_buffer (const char *buf)
{
- if (TAO_debug_level > 8)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_State::get_byte_order_info\n"));
- }
-
- // Let us be specific that this is for 1.0
+ // Let us be specific that this is for 1.0
if (this->giop_version_.minor == 0 &&
this->giop_version_.major == 1)
{
@@ -224,19 +198,10 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf)
return 0;
}
-void
-TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
-{
- // Move the read pointer
- rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
-
- this->message_size_ = this->read_ulong (rd_ptr);
-}
-
int
-TAO_GIOP_Message_State::parse_fragment_header (char *buf,
+TAO_GIOP_Message_State::parse_fragment_header (const char *buf,
size_t length)
{
size_t len =
@@ -246,9 +211,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf,
// By this point we are doubly sure that we have a more or less
// valid GIOP message with a valid major revision number.
- if (this->giop_version_.minor == 2 &&
- this->message_type_ == TAO_GIOP_FRAGMENT &&
- length > len)
+ if (this->giop_version_.minor >= 2 && length > len)
{
// Fragmented message in GIOP 1.2 should have a fragment header
// following the GIOP header. Grab the rd_ptr to get that
@@ -263,7 +226,7 @@ TAO_GIOP_Message_State::parse_fragment_header (char *buf,
}
CORBA::ULong
-TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
+TAO_GIOP_Message_State::read_ulong (const char *rd_ptr)
{
CORBA::ULong x = 0;
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index f902fa03a0e..9ae3db82230 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -41,11 +41,10 @@ class TAO_Export TAO_GIOP_Message_State
public:
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
- TAO_GIOP_Message_Base *base);
+ TAO_GIOP_Message_State (TAO_ORB_Core *orb_core = 0,
+ TAO_GIOP_Message_Base *base = 0);
- /// Parse the message header.
- int parse_message_header (ACE_Message_Block &incoming);
+ int take_values_from_message_block (const ACE_Message_Block& mb);
/// Return the message size
CORBA::ULong message_size (void) const;
@@ -53,9 +52,24 @@ public:
/// Return the message size
CORBA::ULong payload_size (void) const;
- /// Return the byte order information
+ /*!
+ \brief Return the byte order information.
+ \return 0 big-endian
+ \return 1 little-endian
+ */
CORBA::Octet byte_order (void) const;
+ /*!
+ \brief Return GIOP version information.
+ */
+ const TAO_GIOP_Message_Version &giop_version () const;
+
+ /// (Requests and Replys)
+ CORBA::Octet more_fragments () const;
+
+ /// MsgType above
+ CORBA::Octet message_type () const;
+
/// Reset the state..
void reset (void);
@@ -63,40 +77,30 @@ private:
friend class TAO_GIOP_Message_Base;
- /// Parse the message header.
- int parse_message_header_i (ACE_Message_Block &incoming);
-
- /// Checks for the magic word 'GIOP' in the start of the incoing
- /// stream
- int parse_magic_bytes (char *buf);
-
/// Extracts the version information from the incoming
/// stream. Performs a check for whether the version information is
/// right and sets the information in the <state>
- int get_version_info (char *buf);
+ int set_version_info_from_buffer (const char *buf);
/// Extracts the byte order information from the incoming
/// stream. Performs a check for whether the byte order information
/// right and sets the information in the <state>
- int get_byte_order_info (char *buf);
+ int set_byte_order_info_from_buffer (const char *buf);
/// Gets the size of the payload and set the size in the <state>
- void get_payload_size (char *buf);
+ void set_payload_size_from_buffer (const char *buf);
/// Parses the GIOP FRAGMENT_HEADER information from the incoming
/// stream.
- int parse_fragment_header (char *buf,
+ int parse_fragment_header (const char *buf,
size_t length);
/// Read the unsigned long from the buffer. The <buf> should just
/// point to the next 4 bytes data that represent the ULong
- CORBA::ULong read_ulong (char *buf);
+ CORBA::ULong read_ulong (const char *buf);
private:
- /// The GIOP base class..
- TAO_GIOP_Message_Base *base_;
-
// GIOP version information..
TAO_GIOP_Message_Version giop_version_;
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
index fe076bee689..80d421c7340 100644
--- a/TAO/tao/GIOP_Message_State.inl
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -33,22 +33,29 @@ TAO_GIOP_Message_State::reset (void)
this->missing_data_ = 0;
}
-#if 0
-ACE_INLINE int
-TAO_GIOP_Message_State::message_fragmented (void)
+ACE_INLINE const TAO_GIOP_Message_Version &
+TAO_GIOP_Message_State::giop_version () const
{
- if (this->more_fragments)
- return 1;
-
- return 0;
+ return this->giop_version_;
}
+ACE_INLINE CORBA::Octet
+TAO_GIOP_Message_State::more_fragments () const
+{
+ return this->more_fragments_;
+}
-
-ACE_INLINE CORBA::Boolean
-TAO_GIOP_Message_State::header_received (void) const
+ACE_INLINE CORBA::Octet
+TAO_GIOP_Message_State::message_type () const
{
- return this->message_size != 0;
+ return this->message_type_;
}
-#endif
+ACE_INLINE void
+TAO_GIOP_Message_State::set_payload_size_from_buffer (const char *rd_ptr)
+{
+ // Move the read pointer
+ rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
+
+ this->message_size_ = this->read_ulong (rd_ptr);
+}
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 1d802aa25c7..25b16308ff1 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -95,7 +95,7 @@ TAO_IIOP_Transport::recv (char *buf,
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv_i, ")
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ")
ACE_TEXT ("read failure - %m\n"),
this->id ()));
}
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 14970106b50..a510be13f4b 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,7 +1,8 @@
#include "Incoming_Message_Queue.h"
+#include "Pluggable_Messaging.h"
#include "debug.h"
-
-#include "ace/Log_Msg.h"
+#include "ace/Malloc_T.h"
+#include "ace/Message_Block.h"
#if !defined (__ACE_INLINE__)
# include "Incoming_Message_Queue.inl"
@@ -12,7 +13,7 @@ ACE_RCSID (tao,
"$Id$")
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
- : queued_data_ (0),
+ : last_added_ (0),
size_ (0),
orb_core_ (orb_core)
{
@@ -42,21 +43,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
+ if (block.length () <= this->last_added_->missing_data_bytes_)
{
n = block.length ();
}
else
{
- n = this->queued_data_->missing_data_;
+ n = this->last_added_->missing_data_bytes_;
}
// Do the copy
- this->queued_data_->msg_block_->copy (block.rd_ptr (),
+ this->last_added_->msg_block_->copy (block.rd_ptr (),
n);
// Decerement the missing data
- this->queued_data_->missing_data_ -= n;
+ this->last_added_->missing_data_bytes_ -= n;
}
return n;
@@ -65,17 +66,20 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
TAO_Queued_Data *
TAO_Incoming_Message_Queue::dequeue_head (void)
{
+ if (this->size_ == 0)
+ return 0;
+
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head = this->last_added_->next_;
// Reset the head node..
- this->queued_data_->next_ = tmp->next_;
-
- // Decrease the size
- --this->size_;
+ this->last_added_->next_ = head->next_;
+
+ // Decrease the size and reset last_added_ if empty
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
- return tmp;
+ return head;
}
TAO_Queued_Data *
@@ -86,95 +90,412 @@ TAO_Incoming_Message_Queue::dequeue_tail (void)
return 0;
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head =
+ this->last_added_->next_;
- while (tmp->next_ != this->queued_data_)
+ while (head->next_ != this->last_added_)
{
- tmp = tmp->next_;
+ head = head->next_;
}
// Put the head in tmp.
- tmp->next_ = this->queued_data_->next_;
+ head->next_ = this->last_added_->next_;
- TAO_Queued_Data *ret_qd = this->queued_data_;
+ TAO_Queued_Data *ret_qd = this->last_added_;
- this->queued_data_ = tmp;
+ this->last_added_ = head;
// Decrease the size
- --this->size_;
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
return ret_qd;
}
-
int
TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
{
if (this->size_ == 0)
{
- this->queued_data_ = nd;
- this->queued_data_->next_ = this->queued_data_;
+ this->last_added_ = nd;
+ this->last_added_->next_ = this->last_added_;
}
else
{
- nd->next_ = this->queued_data_->next_;
- this->queued_data_->next_ = nd;
- this->queued_data_ = nd;
+ nd->next_ = this->last_added_->next_;
+ this->last_added_->next_ = nd;
+ this->last_added_ = nd;
}
++ this->size_;
return 0;
}
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major,
+ CORBA::Octet minor) const
+{
+ TAO_Queued_Data *found = 0;
+ if (this->last_added_ != 0)
+ {
+ TAO_Queued_Data *qd = this->last_added_->next_;
+
+ do {
+ if (qd->more_fragments_ &&
+ qd->major_version_ == major && qd->minor_version_ == minor)
+ {
+ found = qd;
+ }
+ else
+ {
+ qd = qd->next_;
+ }
+ } while (found == 0 && qd != this->last_added_->next_);
+ }
+
+ return found;
+}
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const
+{
+ TAO_Queued_Data *found = 0;
+ if (this->last_added_ != 0)
+ {
+ TAO_Queued_Data *qd = this->last_added_->next_;
+
+ do {
+ if (qd->more_fragments_ && qd->request_id_ == request_id)
+ {
+ found = qd;
+ }
+ else
+ {
+ qd = qd->next_;
+ }
+ } while (found == 0 && qd != this->last_added_->next_);
+ }
+
+ return found;
+}
+
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
- : msg_block_ (0),
- missing_data_ (0),
- byte_order_ (0),
- major_version_ (0),
- minor_version_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
+ : msg_block_ (0)
+ , current_state_ (INVALID)
+ , missing_data_bytes_ (0)
+ , byte_order_ (0)
+ , major_version_ (0)
+ , minor_version_ (0)
+ , more_fragments_ (0)
+ , request_id_ (0)
+ , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ , next_ (0)
+ , allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
ACE_Allocator *alloc)
- : msg_block_ (mb),
- missing_data_ (0),
- byte_order_ (0),
- major_version_ (0),
- minor_version_ (0),
- more_fragments_ (0),
- msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
- next_ (0),
- allocator_ (alloc)
+ : msg_block_ (mb)
+ , current_state_ (INVALID)
+ , missing_data_bytes_ (0)
+ , byte_order_ (0)
+ , major_version_ (0)
+ , minor_version_ (0)
+ , more_fragments_ (0)
+ , request_id_ (0)
+ , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ , next_ (0)
+ , allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
- : msg_block_ (qd.msg_block_->duplicate ()),
- missing_data_ (qd.missing_data_),
- byte_order_ (qd.byte_order_),
- major_version_ (qd.major_version_),
- minor_version_ (qd.minor_version_),
- more_fragments_ (qd.more_fragments_),
- msg_type_ (qd.msg_type_),
- next_ (0),
- allocator_ (qd.allocator_)
+ : msg_block_ (qd.msg_block_->duplicate ())
+ , current_state_ (qd.current_state_)
+ , missing_data_bytes_ (qd.missing_data_bytes_)
+ , byte_order_ (qd.byte_order_)
+ , major_version_ (qd.major_version_)
+ , minor_version_ (qd.minor_version_)
+ , more_fragments_ (qd.more_fragments_)
+ , request_id_ (qd.request_id_)
+ , msg_type_ (qd.msg_type_)
+ , next_ (0)
+ , allocator_ (qd.allocator_)
{
}
+
+/*!
+ \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb.
+
+ This function allocates a new aligned message block using the same
+ allocators and flags as found in \a mb. The size of the new message
+ block is at least \a new_size; the size may be adjusted up in order
+ to accomodate alignment requirements and still fit \a new_size bytes
+ into the aligned buffer.
+
+ \param mb message block whose parameters should be mimicked
+ \param new_size size of the new message block (will be adjusted for proper alignment)
+ \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure
+
+ \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block!
+ */
+static ACE_Message_Block*
+clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
+{
+ // Calculate the required size of the cloned block with alignment
+ size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
+
+ // Get the allocators
+ ACE_Allocator *data_allocator;
+ ACE_Allocator *data_block_allocator;
+ ACE_Allocator *message_block_allocator;
+ mb->access_allocators (data_allocator,
+ data_block_allocator,
+ message_block_allocator);
+
+ // Create a new Message Block
+ ACE_Message_Block *nb;
+ ACE_NEW_MALLOC_RETURN (nb,
+ ACE_static_cast(ACE_Message_Block*,
+ message_block_allocator->malloc (
+ sizeof (ACE_Message_Block))),
+ ACE_Message_Block(aligned_size,
+ mb->msg_type(),
+ mb->cont(),
+ 0, //we want the data block created
+ data_allocator,
+ mb->locking_strategy(),
+ mb->msg_priority(),
+ mb->msg_execution_time (),
+ mb->msg_deadline_time (),
+ data_block_allocator,
+ message_block_allocator),
+ 0);
+
+ ACE_CDR::mb_align (nb);
+
+ // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
+ // we just dynamically allocated the two things.
+ nb->set_flags (mb->flags());
+ nb->clr_flags (ACE_Message_Block::DONT_DELETE);
+
+ return nb;
+}
+
+/*!
+ \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes.
+
+ (This is similar to memcpy, although with message blocks we can be a
+ little smarter.) This function assumes that \a dst has enough space
+ for \a span_size bytes, and that \a src has at least \a span_size
+ bytes available to copy. When everything is copied \a dst->wr_ptr
+ gets updated accordingly, but \a src->rd_ptr is left to the caller
+ to update.
+
+ \param dst the destination message block
+ \param src the source message block
+ \param span_size size of the maximum span of bytes to be copied
+ \return 0 on failure, otherwise \a dst
+ */
+static ACE_Message_Block*
+copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size)
+{
+ // @todo check for enough space in dst, and src contains at least span_size
+
+ if (src == 0 || dst == 0)
+ return 0;
+
+ if (span_size == 0)
+ return dst;
+
+ dst->copy (src->rd_ptr (), span_size);
+ return dst;
+}
+
/*static*/
TAO_Queued_Data *
-TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
+TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc)
+{
+ register TAO_Queued_Data *new_qd = 0;
+ register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */
+ register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */
+
+ // Validate arguments.
+ if (mb == 0)
+ goto failure;
+
+ new_qd = make_queued_data (alloc);
+ if (new_qd == 0)
+ goto failure;
+
+ // do we have enough bytes to make a complete header?
+ if (MB_LEN >= HDR_LEN)
+ {
+ // Since we have enough bytes to make a complete header,
+ // the header needs to be valid. Check that now, and punt
+ // if it's not valid.
+ if (! msging_obj.check_for_valid_header (*mb))
+ {
+ goto failure;
+ }
+ else
+ {
+ new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
+ msging_obj.set_queued_data_from_message_header (new_qd, *mb);
+ if (new_qd->current_state_ == INVALID)
+ goto failure;
+
+ // missing_data_bytes_ now has the full GIOP message size, so we allocate
+ // a new message block of that size, plus the header.
+ new_qd->msg_block_ = clone_mb_nocopy_size (mb,
+ new_qd->missing_data_bytes_ +
+ HDR_LEN);
+ // Of course, we don't have the whole message (if we did, we
+ // wouldn't be here!), so we copy only what we've got, i.e., whatever's
+ // in the message block.
+ if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
+ goto failure;
+
+ // missing_data_bytes_ now has the full GIOP message size, but
+ // there might still be stuff in mb. Therefore, we have to adjust
+ // missing_data_bytes_, i.e., decrease it by the number of "actual
+ // payload bytes" in mb.
+ //
+ // "actual payload bytes" :== length of mb (which included the header) - header length
+ new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN);
+ mb->rd_ptr (MB_LEN);
+ }
+ }
+ else
+ {
+ new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER;
+ new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN);
+ if (new_qd->msg_block_ == 0 ||
+ copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
+ goto failure;
+ new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN;
+ mb->rd_ptr (MB_LEN);
+ }
+
+ ACE_ASSERT (new_qd->current_state_ != INVALID);
+ if (TAO_debug_level > 7)
+ {
+ const char* s = "?unk?";
+ switch (new_qd->current_state_)
+ {
+ case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break;
+ case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break;
+ case INVALID: s = "INVALID"; break;
+ case COMPLETED: s = "COMPLETED"; break;
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
+ ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:")
+ ACE_TEXT ("state=%s,missing_data_bytes=%u\n"),
+ new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_));
+ }
+ return new_qd;
+
+failure:
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
+ ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"),
+ mb, new_qd));
+ }
+ TAO_Queued_Data::release (new_qd);
+ return 0;
+}
+
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc)
+{
+ register const size_t HDR_LEN = msging_obj.header_length ();
+ register const size_t MB_LEN = mb.length ();
+
+ // Validate arguments.
+ if (MB_LEN < HDR_LEN)
+ return 0;
+
+ size_t total_msg_len = 0;
+ register TAO_Queued_Data *new_qd = make_queued_data (alloc);
+ if (new_qd == 0)
+ goto failure;
+
+ // We can assume that there are enough bytes for a header, so
+ // extract the header data. Don't assume that there's enough for
+ // the payload just yet.
+ new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
+ msging_obj.set_queued_data_from_message_header (new_qd, mb);
+ if (new_qd->current_state_ == INVALID)
+ goto failure;
+
+ // new_qd_->missing_data_bytes_ + protocol header length should be
+ // *at least* the length of the message. Verify that we have that
+ // many bytes in the message block and, if we don't, release the new
+ // qd and fail.
+ total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN;
+ if (total_msg_len > MB_LEN)
+ goto failure;
+
+ // Make a copy of the relevant portion of mb and hang on to it
+ if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0)
+ goto failure;
+
+ if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0)
+ goto failure;
+
+ // Update missing data and the current state
+ new_qd->missing_data_bytes_ = 0;
+ new_qd->current_state_ = COMPLETED;
+
+ // Advance the rd_ptr on the message block
+ mb.rd_ptr (total_msg_len);
+
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
+ ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"),
+ total_msg_len, &mb, new_qd));
+ }
+
+ return new_qd;
+
+failure:
+ if (TAO_debug_level > 7)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
+ ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"),
+ &mb, MB_LEN));
+ if (TAO_debug_level >= 10)
+ ACE_HEX_DUMP ((LM_DEBUG,
+ mb.rd_ptr (), MB_LEN,
+ ACE_TEXT (" residual bytes in buffer")));
+
+ }
+ TAO_Queued_Data::release (new_qd);
+ return 0;
+}
+
+/*static*/
+TAO_Queued_Data *
+TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
{
TAO_Queued_Data *qd = 0;
@@ -281,3 +602,29 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
return qd;
}
+
+void
+TAO_Queued_Data::consolidate (void)
+{
+ // Is this a chain of fragments?
+ if (this->more_fragments_ && this->msg_block_->cont () != 0)
+ {
+ // Create a message block big enough to hold the entire chain
+ ACE_Message_Block *dest = clone_mb_nocopy_size (
+ this->msg_block_,
+ this->msg_block_->total_length ());
+ // Reset the cont() parameter
+ dest->cont (0);
+
+ // Use ACE_CDR to consolidate the chain for us
+ ACE_CDR::consolidate (dest, this->msg_block_);
+
+ // free the original message block chain
+ this->msg_block_->release ();
+
+ // Set the message block to the new consolidated message block
+ this->msg_block_ = dest;
+ this->more_fragments_ = 0;
+ }
+}
+
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 3adfc3d24ac..660a207090b 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -27,6 +27,7 @@ class ACE_Allocator;
class TAO_ORB_Core;
class TAO_Queued_Data;
class TAO_Transport;
+class TAO_Pluggable_Messaging;
/**
* @class TAO_Incoming_Message_Queue
@@ -75,31 +76,68 @@ public:
/// Return the length of the queue..
CORBA::ULong queue_length (void);
- /// Methods for sanity check. Checks to see whether the node on the
- /// head or tail is complete or not and ready for further
- /// processing.
+ /*!
+ @name Node Inspection Predicates
+
+ \brief These methods allow inspection of head and tail nodes for "completeness".
+
+ These methods check to see whether the node on the head or tail is
+ "complete" and ready for further processing. See each method's
+ documentation for its definition of "complete".
+ */
+ //@{
+ /*!
+ "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though)
+
+ \return -1 queue is empty
+ \return 0 tail is not "complete"
+ \return 1 tail is "complete"
+ */
int is_tail_complete (void);
+
+ /*!
+
+ "complete" == the GIOP message at the head is not missing any data
+ AND, if it's the first message in a series of GIOP fragments, all
+ the fragments have been received, parsed, and placed into the
+ queue
+
+ \return -1 if queue is empty
+ \return 0 if head is not "complete"
+ \return 1 if head is "complete"
+ */
int is_head_complete (void);
+ //@}
- /// This method checks whether the last message that was queued up
- /// was fragmented...
+ /*!
+ \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment.
+ */
int is_tail_fragmented (void);
/// Return the size of data that is missing in tail of the queue.
size_t missing_data_tail (void) const;
/// void missing_data (size_t data);
+ /// Find the first fragment that matches the GIOP version
+ TAO_Queued_Data *find_fragment (CORBA::Octet major,
+ CORBA::Octet minor) const;
+
+ /// Find the first fragment that matches the request id
+ TAO_Queued_Data *find_fragment (CORBA::ULong request_id) const;
+
private:
friend class TAO_Transport;
- /// Make a node for the queue.
- TAO_Queued_Data *get_node (void);
-
private:
+ /*!
+ \brief A circular linked list of messages awaiting processing.
- /// A linked listof messages that await processing
- TAO_Queued_Data *queued_data_;
+ \a last_message_added_ points to the most recent message added to
+ the list. The earliest addition can be easily accessed via
+ \a last_message_added_->next_.
+ */
+ TAO_Queued_Data *last_added_;
/// The size of the queue
CORBA::ULong size_;
@@ -123,20 +161,73 @@ private:
class TAO_Export TAO_Queued_Data
{
-public:
+protected:
/// Default Constructor
TAO_Queued_Data (ACE_Allocator *alloc = 0);
/// Constructor.
TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0);
+public:
/// Copy constructor.
TAO_Queued_Data (const TAO_Queued_Data &qd);
- /// Creation and deletion of a node in the queue.
- static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0);
+ /*!
+ \name Factory Methods
+
+ These methods manufacture instances of TAO_Queued_Data and return
+ them. These instances should be removed via TAO_Queued_Data::release.
+
+ Instances are initialized from data in the ACE_Message_Block,
+ interpreted according to rules defined in the
+ TAO_Pluggable_Messaging object.
+
+ The manufactured instance adopts the message block \em without
+ duplicating it; therefore, the caller must duplicate or orphan the
+ message block. The caller also must insure that the message block
+ can be released via ACE_Message_Block::release, and that its life
+ endures beyond the calling scope.
+
+ For the purposes of TAO_Queued_Data, a completed message is a
+ completely received message as defined by the messaging protocol
+ object. For GIOP, that means that the number of bytes specified
+ in the general GIOP header have been completely received. It
+ specifically DOES NOT mean that all \em fragments have been
+ received. Fragment reassembly is another matter altogether.
+ */
+ //@{
+ /*!
+ \brief Make and return an instance of TAO_Queued_Data suitable for use as an uncompleted message.
+ */
+ static TAO_Queued_Data* make_uncompleted_message (ACE_Message_Block *mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc = 0);
+ /*!
+ \brief Make and return an instance of TAO_Queued_Data suitable for use as a completed message.
+ */
+ // THIS IMPLEMENTATION DOESN'T WORK THE SAME AS ITS USAGE!
+ // WE CAN'T JUST ADOPT mb, BECAUSE IT MAY CONTAIN MORE THAN
+ // ONE PROTOCOL MESSAGE. WE THEREFORE NEED TO CLONE IT. THIS
+ // MEANS UPDATING THE DOCUMENTATION, AND IT ALSO MEANS THAT IT
+ // BEHAVES DIFFERENTLY FROM make_uncompleted_message.
+ static TAO_Queued_Data* make_completed_message (ACE_Message_Block &mb,
+ TAO_Pluggable_Messaging &msging_obj,
+ ACE_Allocator *alloc = 0);
+
+ /// Consolidate this fragments chained message blocks into one.
+ void consolidate (void);
+
+ /*!
+ \brief Creation and deletion of a node in the queue.
+ \todo Maybe this should be private?
+ */
+private:
+ static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0);
+public:
+ //@}
static void release (TAO_Queued_Data *qd);
+ void release (void);
/// Duplicate ourselves. This creates a copy of ourselves on the
/// heap and returns a pointer to the duplicated node.
@@ -146,11 +237,43 @@ public:
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
- /// Data missing in the above message that hasn't been read or
- /// processed yet.
- CORBA::Long missing_data_;
-
- /// The byte order of the message that is stored in the node..
+ /*!
+ @name Missing Data details
+
+ The \a missing_data_bytes_ member contains the number of bytes of
+ data missing from \a msg_block_. However, there can be two places
+ where data is missing: header and payload. We cannot know how
+ much data is missing from the payload until we have a complete
+ header. Fortunately, headers are a fixed length, so we can know
+ how much we're missing from the header.
+
+ We use \param current_state_ to indicate which portion of the message
+ \param missing_data_bytes_ refers to, as well as the general state of
+ the message.
+ */
+ //@{
+ /*!
+ Describes the meaning given to the number stored in \a missing_data_bytes_.
+ */
+ enum Queued_Data_State
+ {
+ INVALID = -1, //!< The queued data is in an invalid/uninitialized state, and no data should be trusted.
+ COMPLETED = 0, //!< Message is complete; \a missing_data_bytes_ should be zero.
+ WAITING_TO_COMPLETE_HEADER, //!< Value in \a missing_data_bytes_ indicates part of header is missing.
+ WAITING_TO_COMPLETE_PAYLOAD //!< Value in \a missing_data_bytes_ indicates part of payload is missing.
+ };
+
+ /*!
+ Indicates the current state of the message, including hints at
+ how to interpret the value stored in \a missing_data_bytes_.
+ */
+ Queued_Data_State current_state_;
+
+ /*! Data missing in the above message that hasn't been read or processed yet. */
+ size_t missing_data_bytes_;
+ //@}
+
+ /*! The byte order of the message that is stored in the node. */
CORBA::Octet byte_order_;
/// Many protocols like GIOP have a major and minor version
@@ -165,6 +288,9 @@ public:
/// queue already has more fragments that is missing..
CORBA::Octet more_fragments_;
+ /// The fragment request id
+ CORBA::ULong request_id_;
+
/// The message type of the message
TAO_Pluggable_Message_Type msg_type_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index d67bd485383..f82267b5cea 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->missing_data_ == 0)
+ this->last_added_->missing_data_bytes_ == 0)
return 1;
return 0;
@@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->next_->missing_data_ == 0 &&
- this->queued_data_->next_->more_fragments_ == 0)
+ this->last_added_->next_->missing_data_bytes_ == 0 &&
+ this->last_added_->next_->more_fragments_ == 0)
return 1;
return 0;
@@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void)
return 0;
if (this->size_ &&
- this->queued_data_->more_fragments_ == 1)
+ this->last_added_->more_fragments_ == 1)
return 1;
return 0;
@@ -55,23 +55,22 @@ ACE_INLINE size_t
TAO_Incoming_Message_Queue::missing_data_tail (void) const
{
if (this->size_ != 0)
- return this->queued_data_->missing_data_;
+ return this->last_added_->missing_data_bytes_;
return 0;
}
-
-ACE_INLINE TAO_Queued_Data *
-TAO_Incoming_Message_Queue::get_node (void)
-{
- return TAO_Queued_Data::get_queued_data ();
-}
-
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
+ACE_INLINE void
+TAO_Queued_Data::release (void)
+{
+ TAO_Queued_Data::release (this);
+}
+
/*static*/
ACE_INLINE void
TAO_Queued_Data::replace_data_block (ACE_Message_Block &mb)
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 8ac38de01e0..3e84635d767 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -121,36 +121,30 @@ public:
virtual void init (CORBA::Octet major,
CORBA::Octet minor) = 0;
- /// Parse the incoming messages..
- virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
-
- /// Calculate the amount of data that is missing in the <incoming>
- /// message block.
- virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0;
-
- /// Get the details of the message parsed through the <qd>.
- virtual void get_message_data (TAO_Queued_Data *qd) = 0;
-
- /* Extract the details of the next message from the <incoming>
- * through <qd>. Returns 1 if there are more messages and returns a
- * 0 if there are no more messages in <incoming>.
- */
- virtual int extract_next_message (ACE_Message_Block &incoming,
- TAO_Queued_Data *&qd) = 0;
-
- /// Check whether the node <qd> needs consolidation from <incoming>
- virtual int consolidate_node (TAO_Queued_Data *qd,
- ACE_Message_Block &incoming) = 0;
-
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd) = 0;
-
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd) = 0;
+ /*!
+ \brief Inspects the bytes in \param mb to see if they "look like" the beginning of a message.
+
+ Inspects the bytes in \param mb, beginning at \code mb.rd_ptr, to
+ see if they look like the beginning of a message. Does
+ */
+ virtual int check_for_valid_header (const ACE_Message_Block &mb) const = 0;
+
+ /*!
+ \brief Set fields in \param qd based on values derived from \param mb.
+
+ This function sets fields in \param qd based on values derived
+ from \param mb. It assumes that if the length of \param mb is
+ enough to hold a header, then the data in there can be trusted to
+ make sense.
+ */
+ virtual void set_queued_data_from_message_header (
+ TAO_Queued_Data *,
+ const ACE_Message_Block &mb) const = 0;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index d4edd4c8c80..aa5a569892b 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -88,12 +88,19 @@ TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
for (int i = 0; i < iovcnt; i++)
bytes_to_send += iov[i].iov_len;
- this->connection_handler_->dgram ().send (iov,
- iovcnt,
- addr);
+ ssize_t n = this->connection_handler_->dgram ().send (iov,
+ iovcnt,
+ addr);
// @@ Michael:
// Always return a positive number of bytes sent, as we do
// not handle sending errors in DIOP.
+ if (n == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO: (%P|%t|%N|%l) Send of %d bytes failed %p\n"),
+ bytes_to_send,
+ ACE_TEXT ("send_i ()\n")));
+ }
bytes_transferred = bytes_to_send;
@@ -191,7 +198,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.rd_ptr (),
+ ssize_t n = this->recv (message_block.wr_ptr (),
message_block.space (),
max_wait_time);
@@ -199,6 +206,7 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
if (n <= 0)
{
if (n == -1)
+ // @@ Why not send_connection_closed_notifications() ?
this->tms_->connection_closed ();
return n;
@@ -207,23 +215,43 @@ TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- // Parse the incoming message for validity. The check needs to be
+ // Check the incoming message for validity. The check needs to be
// performed by the messaging objects.
- if (this->parse_incoming_messages (message_block) == -1)
- return -1;
+ if (this->messaging_object ()->check_for_valid_header (message_block) == 0)
+ {
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO: (%P|%t|%N|%l) failed to find a valid header on transport %d after fault %p\n"),
+ this->id (),
+ ACE_TEXT ("handle_input_i ()\n")));
+ }
+
+ return -1;
+ }
// NOTE: We are not performing any queueing nor any checking for
- // missing data. We are assuming that ALL the data would be got in a
+ // missing data. We are assuming that ALL the data arrives in a
// single read.
// Make a node of the message block..
- TAO_Queued_Data qd (&message_block);
-
- // Extract the data for the node..
- this->messaging_object ()->get_message_data (&qd);
-
- // Process the message
- return this->process_parsed_messages (&qd, rh);
+ //
+ // We could make this more efficient by having a fixed Queued Data
+ // allocator, i.e., it always gave back the same thing. Actually,
+ // we *could* create an allocator that took a stack-allocated object
+ // as an argument and returned that when asked an allocation is
+ // done. Something to contemplate...
+ TAO_Queued_Data* qd =
+ TAO_Queued_Data::make_completed_message (message_block,
+ *this->messaging_object ());
+ int retval = -1;
+ if (qd)
+ {
+ // Process the message
+ retval = this->process_parsed_messages (qd, rh);
+ TAO_Queued_Data::release (qd);
+ }
+ return retval;
}
@@ -310,4 +338,175 @@ TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
+// @@ Frank: Hopefully DIOP doesn't need this
+/*
+int
+TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
+{
+ CORBA::Boolean byte_order;
+ if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
+ return -1;
+
+ cdr.reset_byte_order (ACE_static_cast(int,byte_order));
+
+ DIOP::ListenPointList listen_list;
+ if ((cdr >> listen_list) == 0)
+ return -1;
+
+ // As we have received a bidirectional information, set the flag to
+ // 1
+ this->bidirectional_flag (1);
+ return this->connection_handler_->process_listen_point_list (listen_list);
+}
+*/
+
+
+
+// @@ Frank: Hopefully DIOP doesn't need this
+/*
+void
+TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
+{
+
+ // Get a handle on to the acceptor registry
+ TAO_Acceptor_Registry * ar =
+ this->orb_core ()->acceptor_registry ();
+
+
+ // Get the first acceptor in the registry
+ TAO_AcceptorSetIterator acceptor = ar->begin ();
+
+ DIOP::ListenPointList listen_point_list;
+
+ for (;
+ acceptor != ar->end ();
+ acceptor++)
+ {
+ // Check whether it is a DIOP acceptor
+ if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE)
+ {
+ this->get_listen_point (listen_point_list,
+ *acceptor);
+ }
+ }
+
+ // We have the ListenPointList at this point. Create a output CDR
+ // stream at this point
+ TAO_OutputCDR cdr;
+
+ // Marshall the information into the stream
+ if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0)
+ || (cdr << listen_point_list) == 0)
+ return;
+
+ // Add this info in to the svc_list
+ opdetails.service_context ().set_context (IOP::BI_DIR_DIOP,
+ cdr);
+
+ return;
+}
+
+
+int
+TAO_DIOP_Transport::get_listen_point (
+ DIOP::ListenPointList &listen_point_list,
+ TAO_Acceptor *acceptor)
+{
+ TAO_DIOP_Acceptor *iiop_acceptor =
+ ACE_dynamic_cast (TAO_DIOP_Acceptor *,
+ acceptor );
+
+ // Get the array of endpoints serviced by <iiop_acceptor>
+ const ACE_INET_Addr *endpoint_addr =
+ iiop_acceptor->endpoints ();
+
+ // Get the count
+ size_t count =
+ iiop_acceptor->endpoint_count ();
+
+ // Get the local address of the connection
+ ACE_INET_Addr local_addr;
+
+ if (this->connection_handler_->peer ().get_local_addr (local_addr)
+ == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Could not resolve local host")
+ ACE_TEXT (" address in set_bidir_context_info () \n")),
+ -1);
+ }
+
+
+ // Note: Looks like there is no point in sending the list of
+ // endpoints on interfaces on which this connection has not
+ // been established. If this is wrong, please correct me.
+ char *local_interface = 0;
+
+ // Get the hostname for the local address
+ if (iiop_acceptor->hostname (this->orb_core_,
+ local_addr,
+ local_interface) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Could not resolve local host")
+ ACE_TEXT (" name \n")),
+ -1);
+ }
+
+ ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *,
+ endpoint_addr);
+
+ for (size_t index = 0;
+ index <= count;
+ index++)
+ {
+ // Get the listen point on that acceptor if it has the same
+ // interface on which this connection is established
+ char *acceptor_interface = 0;
+
+ if (iiop_acceptor->hostname (this->orb_core_,
+ tmp_addr[index],
+ acceptor_interface) == -1)
+ continue;
+
+ // @@ This is very bad for performance, but it is a one time
+ // affair
+ if (ACE_OS::strcmp (local_interface,
+ acceptor_interface) == 0)
+ {
+ // We have the connection and the acceptor endpoint on the
+ // same interface
+ DIOP::ListenPoint point;
+ point.host = CORBA::string_dup (local_interface);
+ point.port = endpoint_addr[index].get_port_number ();
+
+ // Get the count of the number of elements
+ CORBA::ULong len = listen_point_list.length ();
+
+ // Increase the length by 1
+ listen_point_list.length (len + 1);
+
+ // Add the new length to the list
+ listen_point_list[len] = point;
+ }
+
+ // @@ This is bad....
+ CORBA::string_free (acceptor_interface);
+ }
+
+ CORBA::string_free (local_interface);
+ return 1;
+}
+*/
+
+#if 0
+TAO_Connection_Handler *
+TAO_DIOP_Transport::invalidate_event_handler_i (void)
+{
+ TAO_Connection_Handler * eh = this->connection_handler_;
+ this->connection_handler_ = 0;
+ return eh;
+}
+#endif
+
#endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index aaebc6860cf..c7f845eee81 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -136,6 +136,7 @@ TAO_SHMIOP_Transport::recv (char *buf,
}
+#if 0
int
TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
ssize_t missing_data,
@@ -191,6 +192,7 @@ TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
// process that
return this->process_parsed_messages (&pqd, rh);
}
+#endif
int
TAO_SHMIOP_Transport::send_request (TAO_Stub *stub,
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h
index 02c67c63116..8b54426aaa7 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.h
+++ b/TAO/tao/Strategies/SHMIOP_Transport.h
@@ -82,10 +82,14 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
+#if 0
+ // This no longer exists with the PMB-change flow. Not sure how to deal with that,
+ // so for now we ditch the method and see if things work.
virtual int consolidate_message (ACE_Message_Block &incoming,
ssize_t missing_data,
TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time);
+#endif
//@}
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 9923ff1f895..1640644edf9 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -19,7 +19,9 @@
#include "Resume_Handle.h"
#include "Codeset_Manager.h"
#include "Codeset_Translator_Factory.h"
+#include "GIOP_Message_State.h"
#include "ace/OS_NS_sys_time.h"
+#include "ace/Message_Block.h"
#include "ace/Reactor.h"
@@ -110,12 +112,15 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, incoming_message_queue_ (orb_core)
+ , uncompleted_message_ (0)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
, handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
, id_ ((size_t) this)
, purging_order_ (0)
+ , recv_buffer_size_ (0)
+ , sent_byte_count_ (0)
, char_translator_ (0)
, wchar_translator_ (0)
, tcs_set_ (0)
@@ -236,13 +241,9 @@ TAO_Transport::generate_request_header (
{
// codeset service context is only supposed to be sent in the first request
// on a particular connection.
- if (this->first_request_)
- {
- this->orb_core ()->codeset_manager ()->generate_service_context (
- opdetails,
- *this
- );
- }
+ TAO_Codeset_Manager *csm = this->orb_core()->codeset_manager();
+ if (csm && this->first_request_)
+ csm->generate_service_context( opdetails, *this );
if (this->messaging_object ()->generate_request_header (opdetails,
spec,
@@ -604,7 +605,12 @@ int
TAO_Transport::schedule_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
if (TAO_debug_level > 3)
{
@@ -620,7 +626,12 @@ int
TAO_Transport::cancel_output_i (void)
{
ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
if (TAO_debug_level > 3)
{
@@ -739,6 +750,9 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
// no bytes are sent send() can only return 0 or -1
ACE_ASSERT (byte_count != 0);
+ // Total no. of bytes sent for a send call
+ this->sent_byte_count_ += byte_count;
+
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -762,6 +776,10 @@ TAO_Transport::drain_queue_i (void)
// We loop over all the elements in the queue ...
TAO_Queued_Message *i = this->head_;
+ // reset the value so that the counting is done for each new send
+ // call.
+ this->sent_byte_count_ = 0;
+
while (i != 0)
{
// ... each element fills the iovector ...
@@ -820,8 +838,14 @@ TAO_Transport::drain_queue_i (void)
if (this->flush_timer_pending ())
{
ACE_Event_Handler *eh = this->event_handler_i ();
- ACE_Reactor *reactor = eh->reactor ();
- reactor->cancel_timer (this->flush_timer_id_);
+ if (eh != 0)
+ {
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ }
this->reset_flush_timer ();
}
@@ -898,20 +922,25 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
if (set_timer)
{
ACE_Event_Handler *eh = this->event_handler_i ();
- ACE_Reactor *reactor = eh->reactor ();
- this->current_deadline_ = new_deadline;
- ACE_Time_Value delay =
- new_deadline - ACE_OS::gettimeofday ();
-
- if (this->flush_timer_pending ())
+ if (eh != 0)
{
- reactor->cancel_timer (this->flush_timer_id_);
- }
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
- this->flush_timer_id_ =
- reactor->schedule_timer (&this->transport_timer_,
- &this->current_deadline_,
- delay);
+ if (this->flush_timer_pending ())
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
+ }
+ }
}
return constraints_reached;
@@ -1118,6 +1147,18 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub,
return 0;
}
+
+class CTHack
+{
+public:
+ CTHack() { enter(); }
+ ~CTHack() { leave(); }
+private:
+ void enter() { x = 1; }
+ void leave() { x = 0; }
+ int x;
+};
+
/*
*
* All the methods relevant to the incoming data path of the ORB are
@@ -1129,6 +1170,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
int /*block*/)
{
+ CTHack cthack;
+
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1136,9 +1179,8 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
this->id ()));
}
- // First try to process messages of the head of the incoming queue.
+ // First try to process messages off the head of the incoming queue.
int retval = this->process_queue_head (rh);
-
if (retval <= 0)
{
if (retval == -1)
@@ -1149,7 +1191,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
"error while parsing the head of the queue\n",
this->id()));
}
-
return retval;
}
@@ -1158,7 +1199,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// The buffer on the stack which will be used to hold the input
// messages
- char buf [TAO_MAXBUFSIZE];
+ char buf[TAO_MAXBUFSIZE];
#if defined (ACE_HAS_PURIFY)
(void) ACE_OS::memset (buf,
@@ -1180,26 +1221,35 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
ACE_Message_Block::DONT_DELETE,
this->orb_core_->input_cdr_msgblock_allocator ());
+ // We'll loop trying to complete the message this number of times,
+ // and that's it.
+ unsigned int number_of_read_attempts = TAO_MAX_TRANSPORT_REREAD_ATTEMPTS;
+
+ unsigned int did_queue_message = 0;
// Align the message block
ACE_CDR::mb_align (&message_block);
size_t recv_size = 0;
-
if (this->orb_core_->orb_params ()->single_read_optimization ())
{
- recv_size =
- message_block.space ();
+ recv_size = message_block.space ();
}
else
{
- recv_size =
- this->messaging_object ()->header_length ();
+ recv_size = this->messaging_object ()->header_length ();
}
+ // Saving the size of the received buffer in case any one needs to
+ // get the size of the message thats received in the
+ // context. Obviously the value will be changed for each recv call
+ // and the user is supposed to invoke the accessor only in the
+ // invocation context to get meaningful information.
+ this->recv_buffer_size_ = recv_size;
+
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.rd_ptr (),
+ ssize_t n = this->recv (message_block.wr_ptr (),
recv_size,
max_wait_time);
@@ -1212,7 +1262,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
+ "TAO (%P|%t) - Transport[%d]::handle_input_i: "
"read %d bytes\n",
this->id (), n));
}
@@ -1220,172 +1270,372 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// Set the write pointer in the stack buffer
message_block.wr_ptr (n);
- // Parse the message and try consolidating the message if
- // needed.
- retval = this->parse_consolidate_messages (message_block,
- rh,
- max_wait_time);
+ if (TAO_debug_level >= 10)
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (const char *) message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT ("TAO (%P|%t) Transport::handle_input_i(): bytes read from socket")));
- if (retval <= 0)
- {
- if (retval == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_input_i, "
- "error while parsing and consolidating\n",
- this->id ()));
- }
- return retval;
- }
-
- // Make a node of the message block..
- TAO_Queued_Data qd (&message_block,
- this->orb_core_->transport_message_buffer_allocator ());
-
- // Extract the data for the node..
- this->messaging_object ()->get_message_data (&qd);
- // Check whether the message was fragmented..
- if (qd.more_fragments_ ||
- (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+complete_message_and_possibly_enqueue:
+ // Check to see if we're still working to complete a message
+ if (this->uncompleted_message_)
{
- // Duplicate the node that we have as the node is on stack..
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
+ // try to complete it
- return this->consolidate_fragments (nqd, rh);
- }
+ // on exit from this frame we have one of the following states:
+ //
+ // (a) an uncompleted message still in uncompleted_message_
+ // AND message_block is empty
+ //
+ // (b) uncompleted_message_ zero, the completed message at the
+ // tail of the incoming queue; message_block could be empty
+ // or still contain bytes
- // Process the message
- return this->process_parsed_messages (&qd,
- rh);
-}
+ // ==> repeat
+ do
+ {
+ /*
+ * Append the "right number of bytes" to uncompleted_message_
+ */
+ // ==> right_number_of_bytes = MIN(bytes missing from
+ // uncompleted_message_, length of message_block);
+ size_t right_number_of_bytes =
+ ACE_MIN (this->uncompleted_message_->missing_data_bytes_,
+ message_block.length () );
-int
-TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
-{
- // Parse the incoming message for validity. The check needs to be
- // performed by the messaging objects.
- if (this->parse_incoming_messages (block) == -1)
- {
- return -1;
- }
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "trying to use %u (of %u) "
+ "bytes to complete message missing %u bytes\n",
+ this->id (),
+ right_number_of_bytes,
+ message_block.length (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
- // Check whether we have a complete message for processing
- ssize_t missing_data = this->missing_data (block);
+ // ==> append right_number_of_bytes from message_block
+ // to uncomplete_message_ & update read pointer of
+ // message_block;
+
+ // 1. we assume that uncompleted_message_.msg_block_'s
+ // wr_ptr is properly maintained
+ // 2. we presume that uncompleted_message_.msg_block was
+ // allocated with enough space to contain the *entire*
+ // expected GIOP message, so this copy shouldn't involve an
+ // additional allocation
+ this->uncompleted_message_->msg_block_->copy (message_block.rd_ptr (),
+ right_number_of_bytes);
+ this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
+ message_block.rd_ptr (right_number_of_bytes);
+
+ switch (this->uncompleted_message_->current_state_)
+ {
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_HEADER:
+ {
+ int hdrvalidity = this->messaging_object()->check_for_valid_header (
+ *this->uncompleted_message_->msg_block_);
+ if (hdrvalidity == 0)
+ {
+ // According to the spec, Section 15.4.8, we should send
+ // the MessageError GIOP message on receipt of "any message...whose
+ // header is not properly formed (e.g., has the wrong magic value)".
+ //
+ // So, rather than returning -1, what we REALLY need to do is
+ // send a MessageError in reply.
+ //
+ // I'm not sure what the best way to trigger that is...probably to
+ // queue up a special internal-only COMPLETED message that, when
+ // processed, sends the MessageError as part of its processing.
+ return -1;
+ }
+ else if (hdrvalidity == 1)
+ {
+ // ==> update bytes missing from uncompleted_message_
+ // with size of message from valid header;
+ this->messaging_object()->set_queued_data_from_message_header (
+ this->uncompleted_message_,
+ *this->uncompleted_message_->msg_block_);
+ // ==> change state of uncompleted_event_ to
+ // WAITING_TO_COMPLETE_PAYLOAD;
+ this->uncompleted_message_->current_state_ =
+ TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD;
+
+ // ==> Resize the message block to have capacity for
+ // the rest of the incoming message
+ ACE_Message_Block & mb = *this->uncompleted_message_->msg_block_;
+ ACE_CDR::grow (&mb,
+ mb.size ()
+ + this->uncompleted_message_->missing_data_bytes_);
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "found a valid header in the message; "
+ "waiting for %u bytes to complete payload\n",
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+
+ // Continue the loop...
+ continue;
+ }
+ // In the case where we don't have enough information (hdrvalidity == -1),
+ // we just have to fall through and collect more.
+#if 0
+ else
+ {
+ // What the heck will we do with a bad header? Just
+ // better to close the connection and let things
+ // re-train from there.
+ if (this->uncompleted_message_->msg_block_->length () ==
+ this->messaging_object()->header_length())
+ return -1;
+
+#if 0 // I don't think I need this clause, but I'm leaving it just in case.
+ // ==> bytes missing from uncompleted_message_ -= right_number_of_bytes;
+ this->uncompleted_message_->missing_data_bytes_ -= right_number_of_bytes;
+ ACE_ASSERT (this->uncompleted_message_->missing_data_bytes_ > 0);
+#endif
+ }
+#endif
+ }
+ break;
+
+ case TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD:
+ // Here we have an opportunity to try to finish reading the
+ // uncompleted message. This is a Good Idea(TM) because there are
+ // good odds that either more data came available since the last
+ // time we read, or that we simply didn't read the whole message on
+ // the first read. So, we try to read again.
+ //
+ // NOTE! this changes this->uncompleted_message_!
+ this->try_to_complete (max_wait_time);
+ // ==> if (bytes missing from uncompleted_message_ == 0)
+ if (this->uncompleted_message_->missing_data_bytes_ == 0)
+ {
+ /*
+ * We completed the message! Hooray!
+ */
+ // ==> place uncompleted_message_ (which is now
+ // complete!) at the tail of the incoming message
+ // queue;
+
+ // ---> NOTE: whoever pulls this off the queue must delete it!
+ this->uncompleted_message_->current_state_
+ = TAO_Queued_Data::COMPLETED;
+
+ // @@CJC NEED TO CHECK RETURN VALUE HERE!
+ this->enqueue_incoming_message (this->uncompleted_message_);
+ did_queue_message = 1;
+ // zero out uncompleted_message_;
+ this->uncompleted_message_ = 0;
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "completed and queued message for processing!\n",
+ this->id ()));
+ }
- if (missing_data < 0)
- {
- // If we have more than one message
- return this->consolidate_extra_messages (block,
- rh);
- }
- else if (missing_data > 0)
- {
- // If we have missing data then try doing a read or try queueing
- // them.
- return this->consolidate_message (block,
- missing_data,
- rh,
- max_wait_time);
- }
+ }
+ else
+ {
- return 1;
-}
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "still need %u bytes to complete uncompleted message.\n",
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+ }
+ break;
-int
-TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
-{
- // If we have a queue and if the last message is not complete a
- // complete one, then this read will get us the remaining data. So
- // do not try to parse the header if we have an incomplete message
- // in the queue.
- if (this->incoming_message_queue_.is_tail_complete () != 0)
- {
- // As it looks like a new message has been read, process the
- // message. Call the messaging object to do the parsing..
- int retval =
- this->messaging_object ()->parse_incoming_messages (block);
+ default:
+ // @@CJC What do we do here?!
+ ACE_ASSERT (! "Transport::handle_input_i: unexpected state"
+ "in uncompleted_message_");
+ }
+ }
+ // Does the order of the checks matter? In both (a) and (b),
+ // message_block is empty, but only in (b) is there no
+ // uncompleted_message_.
+ // ==> until (message_block is empty || there is no uncompleted_message_);
+ // or, rewritten in C++ looping constructs
+ // ==> while ( ! message_block is empty && there is an uncompleted_message_ );
+ while (message_block.length() != 0 && this->uncompleted_message_);
+ }
+
+ // *****************************
+ // @@ CJC
+ //
+ // Once upon a time we tried to complete reading the uncompleted
+ // message here, but testing found that completing later worked
+ // better.
+ // *****************************
+
+
+ // At this point, there should be nothing in uncompleted_message_.
+ // We now need to chop up the bytes in message_block and store any
+ // complete messages in the incoming message queue.
+ //
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
+ {
+ TAO_Queued_Data *complete_message = 0;
+ do
+ {
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) Transport::handle_input_i: ")
+ ACE_TEXT("extracting complete messages\n")));
+ ACE_HEX_DUMP ((LM_DEBUG,
+ message_block.rd_ptr (),
+ message_block.length (),
+ ACE_TEXT (" from this message buffer")));
+ }
- if (retval == -1)
+ complete_message =
+ TAO_Queued_Data::make_completed_message (
+ message_block, *this->messaging_object ());
+ if (complete_message)
+ {
+ this->enqueue_incoming_message (complete_message);
+ did_queue_message = 1;
+ }
+ }
+ while (complete_message != 0);
+ // On exit from this frame we have one of the following states:
+ // (a) message_block is empty
+ // (b) message_block contains bytes from a partial message
+ }
+
+ // If, at this point, there's still data in message_block, it's
+ // an incomplete message. Therefore, we stuff it into the
+ // uncompleted_message_ and clear out message_block.
+ // ==> if (message_block still has data)
+ if (message_block.length () != 0)
+ {
+ // duplicate message_block remainder into this->uncompleted_message_
+ ACE_ASSERT (this->uncompleted_message_ == 0);
+ this->uncompleted_message_ =
+ TAO_Queued_Data::make_uncompleted_message (&message_block,
+ *this->messaging_object ());
+ ACE_ASSERT (this->uncompleted_message_ != 0);
+
+ // In a debug build, we won't reach this point if we couldn't
+ // create an uncompleted message because the above ASSERT will
+ // trip. However, in an optimized build, the ASSERT isn't
+ // there, so we'll go past here.
+ //
+ // We could put a check in here similar to the ASSERT condition,
+ // but doing that would terminate this loop early and result in
+ // our never processing any completed messages that were received
+ // in this trip to handle_input_i.
+ //
+ // Maybe we could instead queue up a special completed message that,
+ // when processed, causes the connection to get closed in a non-graceful
+ // termination scenario.
+ }
+
+ // We should have consumed ALL the bytes by now.
+ ACE_ASSERT (message_block.length () == 0);
+
+ //
+ // We don't want to try to re-read earlier because we may not have
+ // an uncompleted message until we get to this point. So, if we did
+ // it earlier, we could have missed the opportunity to complete it
+ // and dispatch.
+ //
+ // Thanks to Bala <bala@cse.wustl.edu> for the idea to read again
+ // to increase throughput!
+
+ if (this->uncompleted_message_)
+ {
+ if (number_of_read_attempts--)
{
+ // We try to read again just in case more data arrived while
+ // we were doing the stuff above. This way, we can increase
+ // throughput without much of a penalty.
+
if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
- "error in incoming message\n",
- this->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "still have an uncompleted message; "
+ "will try %d more times before letting "
+ "somebody else have a chance.\n",
+ this->id (),
+ number_of_read_attempts));
+ }
- return -1;
+ // We only bother trying to complete payload, not header, because the
+ // retry only happens in the complete-the-payload clause above.
+ if (this->uncompleted_message_->current_state_ ==
+ TAO_Queued_Data::WAITING_TO_COMPLETE_PAYLOAD)
+ goto complete_message_and_possibly_enqueue;
+ }
+ else
+ {
+ // The queue should be empty because it should have been processed
+ // above. But I wonder if I should put a check in here anyway.
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "giving up reading for now and returning "
+ "with incoming queue length = %d\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length ()));
+ if (this->uncompleted_message_)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Transport[%d]::handle_input_i: "
+ "missing bytes from uncompleted message = %u\n",
+ this->id (),
+ this->uncompleted_message_->missing_data_bytes_));
+ }
+ return 1;
}
}
- return 0;
-}
-
-
-size_t
-TAO_Transport::missing_data (ACE_Message_Block &incoming)
-{
- // If we have a incomplete message in the queue then find out how
- // much of data is required to get a complete message.
- if (this->incoming_message_queue_.is_tail_complete () == 0)
- {
- return this->incoming_message_queue_.missing_data_tail ();
- }
+ // **** END CJC PMG CHANGES ****
- return this->messaging_object ()->missing_data (incoming);
+ return did_queue_message ? this->process_queue_head (rh) : 1;
}
-int
-TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
+void
+TAO_Transport::try_to_complete (ACE_Time_Value *max_wait_time)
{
- // Check whether the last message in the queue is complete..
- if (this->incoming_message_queue_.is_tail_complete () == 0)
- {
- return this->consolidate_message_queue (incoming,
- missing_data,
- rh,
- max_wait_time);
- }
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message\n",
- this->id ()));
- }
-
- // Calculate the actual length of the load that we are supposed to
- // read which is equal to the <missing_data> + length of the buffer
- // that we have..
- size_t payload = missing_data + incoming.size ();
-
- // Grow the buffer to the size of the message
- ACE_CDR::grow (&incoming,
- payload);
+ if (this->uncompleted_message_ == 0)
+ return;
ssize_t n = 0;
+ size_t &missing_data = this->uncompleted_message_->missing_data_bytes_;
+ ACE_Message_Block &mb = *this->uncompleted_message_->msg_block_;
- // As this used for transports where things are available in one
- // shot this looping should not create any problems.
- for (ssize_t bytes = missing_data; bytes != 0; bytes -= n)
+ // Try to complete this until we error or block right here...
+ for (ssize_t bytes = missing_data;
+ bytes != 0;
+ bytes -= n)
{
// .. do a read on the socket again.
- n = this->recv (incoming.wr_ptr (),
+ n = this->recv (mb.wr_ptr (),
bytes,
max_wait_time);
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "TAO (%P|%t) - Transport[%d]::handle_input_i, "
"read %d bytes on attempt\n",
this->id(), n));
}
@@ -1395,375 +1645,168 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
break;
}
- incoming.wr_ptr (n);
+ mb.wr_ptr (n);
missing_data -= n;
}
-
- // If we got an error..
- if (n == -1)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Trasport[%d]::consolidate_message, "
- "error while trying to consolidate\n",
- this->id ()));
- }
-
- return -1;
- }
-
- // If we had gotten a EWOULDBLOCK n would be equal to zero. But we
- // have to put the message in the queue anyway. So let us proceed
- // to do that and return...
-
- // Check to see if we have messages in queue or if we have missing
- // data . AT this point we cannot have have semi-complete messages
- // in the queue as they would have been taken care before. Put
- // ourselves in the queue and then try processing one of the
- // messages..
- if ((missing_data > 0
- ||this->incoming_message_queue_.queue_length ())
- && this->incoming_message_queue_.is_tail_fragmented () == 0)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "queueing up the message\n",
- this->id ()));
- }
-
- // Get a queued data
- TAO_Queued_Data *qd =
- this->make_queued_data (incoming);
-
- // Add the missing data to the queue
- qd->missing_data_ = missing_data;
-
- // Get the rest of the messaging data
- this->messaging_object ()->get_message_data (qd);
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- if (this->incoming_message_queue_.is_head_complete ())
- {
- return this->process_queue_head (rh);
- }
-
- return 0;
- }
-
- // We dont have any missing data. Just make a queued_data node with
- // the existing message block and send it to the higher layers of
- // the ORB.
- TAO_Queued_Data pqd (&incoming,
- this->orb_core_->transport_message_buffer_allocator ());
- pqd.missing_data_ = missing_data;
- this->messaging_object ()->get_message_data (&pqd);
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (pqd.more_fragments_ ||
- (pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the queued data as it is on stack..
- TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
-
- return this->consolidate_fragments (nqd, rh);
- }
-
- // Now we have a full message in our buffer. Just go ahead and
- // process that
- return this->process_parsed_messages (&pqd,
- rh);
}
-int
-TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
- TAO_Resume_Handle &rh)
-{
- // If we have received a fragment message then we have to
- // consolidate <qd> with the last message in queue
- // @@todo: this piece of logic follows GIOP a bit... Need to revisit
- // if we have protocols other than GIOP
-
- // @@todo: Fragments now have too much copying overhead. Need to get
- // them out if we want to have some reasonable performance metrics
- // in future.. Post 1.2 seems a nice time..
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- TAO_Queued_Data *tqd =
- this->incoming_message_queue_.dequeue_tail ();
-
- tqd->more_fragments_ = qd->more_fragments_;
- tqd->missing_data_ = qd->missing_data_;
-
- if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1)
- {
- return -1;
- }
-
- TAO_Queued_Data::release (qd);
- this->incoming_message_queue_.enqueue_tail (tqd);
- this->process_queue_head (rh);
- }
- else
- {
- // if we dont have a fragment already in the queue just add it in
- // the queue
- this->incoming_message_queue_.enqueue_tail (qd);
- }
-
- return 0;
-}
int
-TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time)
+TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message)
{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue\n",
- this->id ()));
- }
-
- // If the queue did not have a complete message put this piece of
- // message in the queue. We know it did not have a complete
- // message. That is why we are here.
- size_t n =
- this->incoming_message_queue_.copy_tail (incoming);
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "copied [%d] bytes to the tail\n",
- this->id (),
- n));
- }
-
- // Update the missing data...
- missing_data =
- this->incoming_message_queue_.missing_data_tail ();
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "missing [%d] bytes in the tail message\n",
- this->id (),
- missing_data));
- }
-
- // Move the read pointer of the <incoming> message block to the end
- // of the copied message and process the remaining portion...
- incoming.rd_ptr (n);
-
- // If we have some more information left in the message block..
- if (incoming.length ())
- {
- // We may have to parse & consolidate. This part of the message
- // doesn't seem to be part of the last message in the queue (as
- // the copy () hasn't taken away this message).
- int retval = this->parse_consolidate_messages (incoming,
- rh,
- max_wait_time);
-
- // If there is an error return
- if (retval == -1)
+ // Get the GIOP version
+ CORBA::Octet major = queueable_message->major_version_;
+ CORBA::Octet minor = queueable_message->minor_version_;
+ CORBA::UShort whole = major << 8 | minor;
+
+ // Set up a couple of pointers that are shared by the code
+ // for the different GIOP versions.
+ ACE_Message_Block *mb = 0;
+ TAO_Queued_Data *fragment_message = 0;
+
+ switch(whole)
+ {
+ case 0x0100: // GIOP 1.0
+ if (!queueable_message->more_fragments_)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ // Fragments aren't supported in 1.0. This is an error and
+ // we should reject it somehow. What do we do here? Do we throw
+ // an exception to the receiving side? Do we throw an exception
+ // to the sending side?
+ //
+ // At the very least, we need to log the fact that we received
+ // nonsense.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - "
+ "TAO_Transport::enqueue_incoming_message "
+ "detected a fragmented GIOP 1.0 message\n"),
+ -1);
+ break;
+ case 0x0101: // GIOP 1.1
+ // In 1.1, fragments kinda suck because they don't have they're
+ // own message-specific header. Therefore, we have to do the
+ // following:
+ fragment_message =
+ this->incoming_message_queue_.find_fragment (major, minor);
+
+ // No fragment was found
+ if (fragment_message == 0)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ if (queueable_message->more_fragments_)
{
- if (TAO_debug_level)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "error while consolidating, part of the read message\n",
- this->id ()));
- }
- return retval;
+ // Find the last message block in the continuation
+ mb = fragment_message->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the GIOP header
+ queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN);
+ mb->cont (queueable_message->msg_block_);
+
+ // Get rid of the queuable message but save the message block
+ queueable_message->msg_block_ = 0;
+ queueable_message->release ();
+
+ // One note is that TAO_Queued_Data contains version numbers,
+ // but doesn't indicate the actual protocol to which those
+ // version numbers refer. That's not a problem, though, because
+ // instances of TAO_Queued_Data live in a queue, and that queue
+ // lives in a particular instance of a Transport, and the
+ // transport instance has an association with a particular
+ // messaging_object. The concrete messaging object embodies a
+ // messaging protocol, and must cover all versions of that
+ // protocol. Therefore, we just need to cover the bases of all
+ // versions of that one protocol.
}
- else if (retval == 1)
- {
- // If the message in the <incoming> message block has only
- // one message left we need to process that seperately.
-
- // Get a queued data
- TAO_Queued_Data *qd = this->make_queued_data (incoming);
-
- // Get the rest of the message data
- this->messaging_object ()->get_message_data (qd);
-
- // Add the missing data to the queue
- qd->missing_data_ = 0;
-
- // Check whether the message was fragmented and try to consolidate
- // the fragments..
- if (qd->more_fragments_ ||
- (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- return this->consolidate_fragments (qd, rh);
- }
-
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // We should surely have a message in queue now. So just
- // process that.
- return this->process_queue_head (rh);
- }
-
- // parse_consolidate_messages () would have processed one of the
- // messages, so we better return as we dont want to starve other
- // threads.
- return 0;
- }
-
- // If we still have some missing data..
- if (missing_data > 0)
- {
- // Get the last message from the Queue
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (TAO_debug_level > 5)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "trying recv, again\n",
- this->id ()));
- }
-
- // Try to do a read again. If we have some luck it would be
- // great..
- ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
- missing_data,
- max_wait_time);
-
- if (TAO_debug_level > 5)
+ else
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message_queue, "
- "recv retval [%d]\n",
- this->id (),
- n));
- }
+ // There is a complete chain of fragments
+ fragment_message->consolidate ();
- // Error...
- if (n < 0)
- {
- return n;
+ // Go ahead and enqueue this message
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
}
-
- // If we get a EWOULDBLOCK ie. n==0, we should anyway put the
- // message in queue before returning..
- // Move the write pointer
- qd->msg_block_->wr_ptr (n);
-
- // Decrement the missing data
- qd->missing_data_ -= n;
-
- // Now put the TAO_Queued_Data back in the queue
- this->incoming_message_queue_.enqueue_tail (qd);
-
- // Any way as we have come this far and are about to return,
- // just try to process a message if it is there in the queue.
- if (this->incoming_message_queue_.is_head_complete ())
+ break;
+ case 0x0102: // GIOP 1.2
+ // In 1.2, we get a little more context. There's a
+ // FRAGMENT message-specific header, and inside that is the
+ // request id with which the fragment is associated.
+ fragment_message =
+ this->incoming_message_queue_.find_fragment (
+ queueable_message->request_id_);
+
+ // No fragment was found
+ if (fragment_message == 0)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+
+ if (fragment_message->major_version_ != major ||
+ fragment_message->minor_version_ != minor)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - "
+ "TAO_Transport::enqueue_incoming_message "
+ "GIOP versions do not match "
+ "(%d.%d != %d.%d\n",
+ fragment_message->major_version_,
+ fragment_message->minor_version_,
+ major, minor),
+ -1);
+
+ // Find the last message block in the continuation
+ mb = fragment_message->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the GIOP header
+ queueable_message->msg_block_->rd_ptr(TAO_GIOP_MESSAGE_HEADER_LEN +
+ TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
+ mb->cont (queueable_message->msg_block_);
+
+ // Remove our reference to the message block. At this point
+ // the message block of the fragment head owns it as part of a
+ // chain
+ queueable_message->msg_block_ = 0;
+
+ if (!queueable_message->more_fragments_)
{
- return this->process_queue_head (rh);
+ // This is the end of the fragments for this request
+ fragment_message->consolidate ();
}
- return 0;
+ // Get rid of the queuable message
+ queueable_message->release ();
+ break;
+ default:
+ if (!queueable_message->more_fragments_)
+ return this->incoming_message_queue_.enqueue_tail (
+ queueable_message);
+ // This is an unknown GIOP version
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - "
+ "TAO_Transport::enqueue_incoming_message "
+ "can not handle a fragmented GIOP %d.%d "
+ "message\n", major, minor),
+ -1);
}
- // Process a message in the head of the queue if we have one..
- return this->process_queue_head (rh);
+ return 0;
}
int
-TAO_Transport::consolidate_extra_messages (ACE_Message_Block
- &incoming,
- TAO_Resume_Handle &rh)
-{
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages\n",
- this->id ()));
- }
-
- // Pick the tail of the queue
- TAO_Queued_Data *tail =
- this->incoming_message_queue_.dequeue_tail ();
-
- if (tail)
- {
- // If we have a node in the tail, checek to see whether it needs
- // consolidation. If so, just consolidate it.
- if (this->messaging_object ()->consolidate_node (tail, incoming) == -1)
- {
- return -1;
- }
-
- // .. put the tail back in queue..
- this->incoming_message_queue_.enqueue_tail (tail);
- }
-
- int retval = 1;
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_extra_messages, "
- "extracting extra messages\n",
- this->id ()));
- }
-
- // Extract messages..
- while (retval == 1)
- {
- TAO_Queued_Data *q_data = 0;
-
- retval =
- this->messaging_object ()->extract_next_message (incoming,
- q_data);
- if (q_data)
- {
- // If we have read a framented message then...
- if (q_data->more_fragments_ ||
- q_data->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- {
- this->consolidate_fragments (q_data, rh);
- }
- else
- {
- this->incoming_message_queue_.enqueue_tail (q_data);
- }
- }
- }
-
- // In case of error return..
- if (retval == -1)
- {
- return retval;
- }
-
- return this->process_queue_head (rh);
-}
-
-int
TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh)
{
// Get the <message_type> that we have received
TAO_Pluggable_Message_Type t = qd->msg_type_;
- // int result = 0;
-
if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
{
if (TAO_debug_level > 0)
@@ -1828,57 +1871,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
return 0;
}
-TAO_Queued_Data *
-TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
-{
- // Get an instance of TAO_Queued_Data
- TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
- this->orb_core_->transport_message_buffer_allocator ());
-
- // Get the flag for the details of the data block...
- ACE_Message_Block::Message_Flags flg =
- incoming.self_flags ();
-
- if (ACE_BIT_DISABLED (flg,
- ACE_Message_Block::DONT_DELETE))
- {
- // Duplicate the data block before putting it in the queue.
- qd->msg_block_ = ACE_Message_Block::duplicate (&incoming);
- }
- else
- {
- // As we are in CORBA mode, all the data blocks would be aligned
- // on an 8 byte boundary. Hence create a data block for more
- // than the actual length
- ACE_Data_Block *db =
- this->orb_core_->create_input_cdr_data_block (incoming.length ()+
- ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocator..
- ACE_Allocator *alloc =
- this->orb_core_->input_cdr_msgblock_allocator ();
-
- // Make message block..
- ACE_Message_Block mb (db,
- 0,
- alloc);
-
- // Duplicate the block..
- qd->msg_block_ = mb.duplicate ();
-
- // Align the message block
- ACE_CDR::mb_align (qd->msg_block_);
-
- // Copy the data..
- qd->msg_block_->copy (incoming.rd_ptr (),
- incoming.length ());
- }
-
-
- return qd;
-}
-
int
TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
{
@@ -1889,65 +1881,59 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
this->id ()));
}
- // See if the message in the head of the queue is complete...
- if (this->incoming_message_queue_.is_head_complete () > 0)
- {
- // Get the message on the head of the queue..
- TAO_Queued_Data *qd =
- this->incoming_message_queue_.dequeue_head ();
+ if (this->incoming_message_queue_.is_head_complete () != 1)
+ return 1;
- if (TAO_debug_level > 3)
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head, "
+ "the size of the queue is [%d]\n",
+ this->id (),
+ this->incoming_message_queue_.queue_length()));
+ }
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.queue_length () > 0)
+ {
+ if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "the size of the queue is [%d]\n",
- this->id (),
- this->incoming_message_queue_.queue_length()));
- }
- // Now that we have pulled out out one message out of the queue,
- // check whether we have one more message in the queue...
- if (this->incoming_message_queue_.is_head_complete () > 0)
- {
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::process_queue_head, "
- "notify reactor\n",
- this->id ()));
-
- }
- int retval =
- this->notify_reactor ();
+ "notify reactor\n",
+ this->id ()));
- if (retval == 1)
- {
- // Let the class know that it doesn't need to resume the
- // handle..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
- }
- else if (retval < 0)
- return -1;
- }
- else
- {
- // As we are ready to process the last message just resume
- // the handle. Set the flag incase someone had reset the flag..
- rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
}
+ int retval =
+ this->notify_reactor ();
- // Process the message...
- if (this->process_parsed_messages (qd, rh) == -1)
+ if (retval == 1)
{
- return -1;
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
}
+ else if (retval < 0)
+ return -1;
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ }
- // Delete the Queued_Data..
- TAO_Queued_Data::release (qd);
+ // Process the message...
+ int retval = this->process_parsed_messages (qd, rh);
- return 0;
- }
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
- return 1;
+ return (retval == -1) ? -1 : 0;
}
int
@@ -1959,6 +1945,8 @@ TAO_Transport::notify_reactor (void)
}
ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
// Get the reactor associated with the event handler
ACE_Reactor *reactor = this->orb_core ()->reactor ();
@@ -1995,6 +1983,18 @@ TAO_Transport::transport_cache_manager (void)
return this->orb_core_->lane_resources ().transport_cache ();
}
+size_t
+TAO_Transport::recv_buffer_size (void)
+{
+ return this->recv_buffer_size_;
+}
+
+size_t
+TAO_Transport::sent_byte_count (void)
+{
+ return this->sent_byte_count_;
+}
+
void
TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
{
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index b6059b211fe..4b885eb40e5 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -447,7 +447,6 @@ public:
virtual ACE_Event_Handler * event_handler_i (void) = 0;
protected:
-
virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
public:
@@ -489,6 +488,7 @@ public:
virtual int handle_input (TAO_Resume_Handle &rh,
ACE_Time_Value *max_wait_time = 0,
int block = 0);
+ void try_to_complete (ACE_Time_Value *max_wait_time);
enum
{
@@ -568,60 +568,11 @@ public:
protected:
- /// Called by the handle_input_i (). This method is used to parse
- /// message read by the handle_input_i () call. It also decides
- /// whether the message needs consolidation before processing.
- int parse_consolidate_messages (ACE_Message_Block &bl,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *time = 0);
-
-
- /// Method does parsing of the message if we have a fresh message in
- /// the <message_block> or just returns if we have read part of the
- /// previously stored message.
- int parse_incoming_messages (ACE_Message_Block &message_block);
-
- /// Return if we have any missing data in the queue of messages
- /// or determine if we have more information left out in the
- /// presently read message to make it complete.
- size_t missing_data (ACE_Message_Block &message_block);
-
- /// Consolidate the currently read message or consolidate the last
- /// message in the queue. The consolidation of the last message in
- /// the queue is done by calling consolidate_message_queue ().
- virtual int consolidate_message (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time);
-
- /// @@Bala: Docu???
- int consolidate_fragments (TAO_Queued_Data *qd,
- TAO_Resume_Handle &rh);
-
- /// First consolidate the message queue. If the message is still not
- /// complete, try to read from the handle again to make it
- /// complete. If these dont help put the message back in the queue
- /// and try to check the queue if we have message to process. (the
- /// thread needs to do some work anyway :-))
- int consolidate_message_queue (ACE_Message_Block &incoming,
- ssize_t missing_data,
- TAO_Resume_Handle &rh,
- ACE_Time_Value *max_wait_time);
-
- /// Called by parse_consolidate_message () if we have more messages
- /// in one read. Queue up the messages and try to process one of
- /// them, atleast at the head of them.
- int consolidate_extra_messages (ACE_Message_Block &incoming,
- TAO_Resume_Handle &rh);
-
/// Process the message by sending it to the higher layers of the
/// ORB.
int process_parsed_messages (TAO_Queued_Data *qd,
TAO_Resume_Handle &rh);
- /// Make a queued data from the <incoming> message block
- TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming);
-
/// Implement send_message_shared() assuming the handler_lock_ is
/// held.
int send_message_shared_i (TAO_Stub *stub,
@@ -669,6 +620,40 @@ public:
int handle_timeout (const ACE_Time_Value &current_time,
const void* act);
+ /// Accessor to recv_buffer_size_
+ size_t recv_buffer_size (void);
+
+ /// Accessor to sent_byte_count_
+ size_t sent_byte_count (void);
+
+
+ /*!
+ \name Incoming Queue Methods
+ */
+ //@{
+ /*!
+ \brief Queue up \a queueable_message as a completely-received incoming message.
+
+ This method queues up a completely-received queueable GIOP message
+ (i.e., it must be dynamically-allocated). It does not assemble a
+ complete GIOP message; that should be done prior to calling this
+ message, and is currently done in handle_input_i.
+
+ This does, however, assure that a completely-received GIOP
+ FRAGMENT gets associated with any previously-received related
+ fragments. It does this through collaboration with the messaging
+ object (since fragment reassembly is protocol specific).
+
+ \param queueable_message instance as returned by one of the TAO_Queued_Data::make_*_message that's been completely received
+
+ \return 0 successfully enqueued \a queueable_message
+
+ \return -1 failed to enqueue \a queueable_message
+ \todo How do we indicate \em what may have failed?
+ */
+ int enqueue_incoming_message (TAO_Queued_Data *queueable_message);
+ //@}
+
/// CodeSet Negotiation - Get the char codeset translator factory
///
TAO_Codeset_Translator_Factory *char_translator (void) const;
@@ -792,10 +777,14 @@ private:
/// Print out error messages if the event handler is not valid
void report_invalid_event_handler (const char *caller);
- /*
+ /**
* Process the message that is in the head of the incoming queue.
* If there are more messages in the queue, this method calls
- * this->notify_reactor () to wake up a thread
+ * this->notify_reactor () to wake up a thread.
+ *
+ * \return -1 An error occurred; occurs independent presence of messages in the queue.
+ * \return 1 No messages in the queue to process; nothing processed.
+ * \return 0 Messages were in the queue to process and one got processed.
*/
int process_queue_head (TAO_Resume_Handle &rh);
@@ -856,9 +845,12 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
- /// Queue of the incoming messages..
+ /// Queue of the completely-received incoming messages..
TAO_Incoming_Message_Queue incoming_message_queue_;
+ /// Place to hold a partially-received (waiting-to-be-completed) message
+ TAO_Queued_Data * uncompleted_message_;
+
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;
@@ -893,8 +885,11 @@ protected:
/// Used by the LRU, LFU and FIFO Connection Purging Strategies.
unsigned long purging_order_;
-private:
+ /// Size of the buffer received.
+ size_t recv_buffer_size_;
+ /// Number of bytes sent.
+ size_t sent_byte_count_;
/// @@Phil, I think it would be nice if we could think of a way to
/// do the following.
/// We have been trying to use the transport for marking about
@@ -907,6 +902,7 @@ private:
/// we can move this to the connection_handler and it may more sense
/// with the DSCP stuff around there. Do you agree?
+private:
/// Additional member values required to support codeset translation
TAO_Codeset_Translator_Factory *char_translator_;
TAO_Codeset_Translator_Factory *wchar_translator_;
diff --git a/TAO/tao/default_client.cpp b/TAO/tao/default_client.cpp
index 0af1359c670..9d6cc79e0b8 100644
--- a/TAO/tao/default_client.cpp
+++ b/TAO/tao/default_client.cpp
@@ -179,7 +179,7 @@ TAO_Default_Client_Strategy_Factory::parse_args (int argc, ACE_TCHAR* argv[])
ACE_LIB_TEXT("LF")) == 0)
this->connect_strategy_ = TAO_LEADER_FOLLOWER_CONNECT;
else
- this->report_option_value_error (ACE_LIB_TEXT("-ORBTransportMuxStrategy"), name);
+ this->report_option_value_error (ACE_LIB_TEXT("-ORBConnectStrategy"), name);
}
}
else if (ACE_OS::strcmp (argv[curarg],
diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h
index 0abdc639a0e..f2b8d6a583b 100644
--- a/TAO/tao/orbconf.h
+++ b/TAO/tao/orbconf.h
@@ -146,6 +146,25 @@ const size_t TAO_DEFAULT_VALUE_FACTORY_TABLE_SIZE = 128;
#define TAO_MAXBUFSIZE 1024
#endif /* TAO_MAXBUFSIZE */
+/*!
+
+ The number of times the transport will try to re-read before
+ returning control to the reactor when it has an uncompleted
+ message (see TAO_Transport::handle_input()).
+
+ The idea behind re-reading is that more data may have arrived
+ while the transport was busy deciding what to do with the bytes
+ it got, so we should probably try to re-read.
+
+ This value shouldn't be too large, lest the transport starve
+ out other transports while trying to complete its message.
+
+ When choosing a value, think of the type of this as 'unsigned int'.
+ */
+#if !defined(TAO_MAX_TRANSPORT_REREAD_ATTEMPTS)
+#define TAO_MAX_TRANSPORT_REREAD_ATTEMPTS 2
+#endif
+
// This controls the alignment for TAO structs. It supports built-in
// types up to and including 16 bytes (128 bits) in size.
#if !defined (TAO_MAXIMUM_NATIVE_TYPE_SIZE)