summaryrefslogtreecommitdiff
path: root/TAO
diff options
context:
space:
mode:
authorelliott_c <elliott_c@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-01-19 14:27:41 +0000
committerelliott_c <elliott_c@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-01-19 14:27:41 +0000
commitbcb05bfa515a6caf4fff170ab03d70bfa28a0998 (patch)
tree2155c284d23c8235ddda51e4f36f7622916abf47 /TAO
parent5a3732aa360540887df0dc2fe5d9af9edfc953b2 (diff)
downloadATCD-bcb05bfa515a6caf4fff170ab03d70bfa28a0998.tar.gz
ChangeLogTag: Wed Jan 19 08:24:36 2005 Chad Elliott <elliott_c@ociweb.com>
Diffstat (limited to 'TAO')
-rw-r--r--TAO/ChangeLog201
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp166
-rw-r--r--TAO/tao/GIOP_Message_Base.h23
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser.h3
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_10.cpp6
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_10.h2
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_12.cpp7
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_12.h3
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_Impl.inl19
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp164
-rw-r--r--TAO/tao/GIOP_Message_Lite.h18
-rw-r--r--TAO/tao/GIOP_Message_State.cpp57
-rw-r--r--TAO/tao/GIOP_Message_State.h19
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp192
-rw-r--r--TAO/tao/Incoming_Message_Queue.h86
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl17
-rw-r--r--TAO/tao/Pluggable_Messaging.h8
-rw-r--r--TAO/tao/Transport.cpp398
-rw-r--r--TAO/tao/Transport.h15
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.cpp60
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.h38
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/Java_Big_Reply.mpc45
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/Test.idl18
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/client.cpp95
-rwxr-xr-xTAO/tests/GIOP_Fragments/Java_Big_Reply/run_test.pl62
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Reply/server.java62
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/Java_Big_Request.mpc42
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.cpp45
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.h51
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/Test.idl21
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/client.java47
-rwxr-xr-xTAO/tests/GIOP_Fragments/Java_Big_Request/run_test.pl53
-rw-r--r--TAO/tests/GIOP_Fragments/Java_Big_Request/server.cpp119
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/PMB_With_Fragments.mpc2
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.cpp45
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.h51
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/Test.idl21
-rwxr-xr-xTAO/tests/GIOP_Fragments/PMB_With_Fragments/dribble.pl147
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.datbin0 -> 2032304 bytes
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.layout86
-rwxr-xr-xTAO/tests/GIOP_Fragments/PMB_With_Fragments/run_test.pl47
-rw-r--r--TAO/tests/GIOP_Fragments/PMB_With_Fragments/server.cpp137
42 files changed, 2251 insertions, 447 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index c02c4a27077..4b36a778365 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,64 @@
+Wed Jan 19 08:24:36 2005 Chad Elliott <elliott_c@ociweb.com>
+
+ * tao/GIOP_Message_Base.h:
+ * tao/GIOP_Message_Base.cpp:
+ * tao/GIOP_Message_Generator_Parser.h:
+ * tao/GIOP_Message_Generator_Parser_10.h:
+ * tao/GIOP_Message_Generator_Parser_10.cpp:
+ * tao/GIOP_Message_Generator_Parser_12.h:
+ * tao/GIOP_Message_Generator_Parser_12.cpp:
+ * tao/GIOP_Message_Generator_Parser_Impl.inl:
+ * tao/GIOP_Message_Lite.h:
+ * tao/GIOP_Message_Lite.cpp:
+ * tao/GIOP_Message_State.h:
+ * tao/GIOP_Message_State.cpp:
+ * tao/Incoming_Message_Queue.h:
+ * tao/Incoming_Message_Queue.inl:
+ * tao/Incoming_Message_Queue.cpp:
+ * tao/Pluggable_Messaging.h:
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+
+ Adding an alternate implementation of PMB and an implementation of
+ GIOP fragment handling that only consolidates fragments when the
+ last one is received.
+
+ Partial GIOP messages are accumulated in a message block within
+ the TAO_Transport until a sufficient amount of data is collected.
+ At that point, processing occurs as usual.
+
+ * tests/GIOP_Fragments/Java_Big_Reply/Client_Task.h:
+ * tests/GIOP_Fragments/Java_Big_Reply/Client_Task.cpp:
+ * tests/GIOP_Fragments/Java_Big_Reply/Java_Big_Reply.mpc:
+ * tests/GIOP_Fragments/Java_Big_Reply/Test.idl:
+ * tests/GIOP_Fragments/Java_Big_Reply/client.cpp:
+ * tests/GIOP_Fragments/Java_Big_Reply/run_test.pl:
+ * tests/GIOP_Fragments/Java_Big_Reply/server.java:
+ * tests/GIOP_Fragments/Java_Big_Request/Java_Big_Request.mpc:
+ * tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.h:
+ * tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.cpp:
+ * tests/GIOP_Fragments/Java_Big_Request/Test.idl:
+ * tests/GIOP_Fragments/Java_Big_Request/client.java:
+ * tests/GIOP_Fragments/Java_Big_Request/run_test.pl:
+ * tests/GIOP_Fragments/Java_Big_Request/server.cpp:
+
+ When the Java client/server is run using the JDK ORB, it will
+ fragment the GIOP messages to exercise the GIOP fragment handling
+ code in TAO.
+
+ * tests/GIOP_Fragments/PMB_With_Fragments/PMB_With_Fragments.mpc:
+ * tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.h:
+ * tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.cpp:
+ * tests/GIOP_Fragments/PMB_With_Fragments/Test.idl:
+ * tests/GIOP_Fragments/PMB_With_Fragments/dribble.pl:
+ * tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.dat:
+ * tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.layout:
+ * tests/GIOP_Fragments/PMB_With_Fragments/run_test.pl:
+ * tests/GIOP_Fragments/PMB_With_Fragments/server.cpp:
+
+ This test uses canned GIOP 1.2 fragment data and exercises the PMB
+ code as well as GIOP fragment handling code.
+
Wed Jan 19 12:26:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl>
* tao/Strategies/SHMIOP_Endpoint.cpp:
@@ -73,120 +134,120 @@ Thu Jan 13 13:50:26 2005 Jeff Parsons <j.parsons@vanderbilt.edu>
Thu Jan 13 11:55:55 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
- * orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp:
- Include "ace/OS_NS_unistd.h" to pull in ACE_OS::getpid()
- declaration.
+ Include "ace/OS_NS_unistd.h" to pull in ACE_OS::getpid()
+ declaration.
- Replaced ACE_static_cast macro usage with standard C++
- static_cast<>.
+ Replaced ACE_static_cast macro usage with standard C++
+ static_cast<>.
Thu Jan 13 11:52:55 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
- * orbsvcs/tests/Concurrency/CC_command.cpp (execute):
+ * orbsvcs/tests/Concurrency/CC_command.cpp (execute):
- Use "fgetc (stdin)" instead of "getchar ()". ACE doesn't wrap
- the latter. Fixes a build error.
+ Use "fgetc (stdin)" instead of "getchar ()". ACE doesn't wrap
+ the latter. Fixes a build error.
Thu Jan 13 01:07:11 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
- * tao/PortableServer/POA.h (objectkey_prefix):
- * tao/PortableServer/POA.cpp (objectkey_prefix):
+ * tao/PortableServer/POA.h (objectkey_prefix):
+ * tao/PortableServer/POA.cpp (objectkey_prefix):
- Changed this static array is to be an array of constant
- CORBA::Octets of instead of an array of non-constant
- CORBA::Octets. The former is what was intended. Also provides
- compiler with additional optimization opportunities.
+ Changed this static array is to be an array of constant
+ CORBA::Octets of instead of an array of non-constant
+ CORBA::Octets. The former is what was intended. Also provides
+ compiler with additional optimization opportunities.
Wed Jan 12 23:01:24 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
- * tests/Portable_Interceptors/PolicyFactory/IORInterceptor.cpp:
+ * tests/Portable_Interceptors/PolicyFactory/IORInterceptor.cpp:
- Fixed spelling typo.
+ Fixed spelling typo.
- * tests/Portable_Interceptors/PolicyFactory/server.cpp:
+ * tests/Portable_Interceptors/PolicyFactory/server.cpp:
- Cosmetic change.
+ Cosmetic change.
Wed Jan 12 22:36:58 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
- * tao/ObjRefTemplate/ORT_Adapter_Impl.h (tao_ort_template_):
+ * tao/ObjRefTemplate/ORT_Adapter_Impl.h (tao_ort_template_):
- Removed this attribute. A local variable within the activate()
- method is all that is needed.
+ Removed this attribute. A local variable within the activate()
+ method is all that is needed.
- (ORT_Adapter_Impl):
+ (ORT_Adapter_Impl):
- Removed default constructor declaration. The compiler generated
- one will now suffice since all remaining class members define
- suitable default constructors.
+ Removed default constructor declaration. The compiler generated
+ one will now suffice since all remaining class members define
+ suitable default constructors.
- * tao/ObjRefTemplate/ORT_Adapter_Impl.cpp:
+ * tao/ObjRefTemplate/ORT_Adapter_Impl.cpp:
- Coding convention improvements.
+ Coding convention improvements.
- (ORT_Adapter_Impl):
+ (ORT_Adapter_Impl):
- Removed default constructor implementation. See default
- constructor declaration removal description above.
+ Removed default constructor implementation. See default
+ constructor declaration removal description above.
- (activate):
+ (activate):
- Fixed invalid widening assignment from
- PortableInterceptor::ObjectReferenceTemplate_var to
- PortableInterceptor::ObjectReferenceFactory_var.
+ Fixed invalid widening assignment from
+ PortableInterceptor::ObjectReferenceTemplate_var to
+ PortableInterceptor::ObjectReferenceFactory_var.
- * tao/PortableServer/ORT_Adapter.h (release):
+ * tao/PortableServer/ORT_Adapter.h (release):
- Added this new pure virtual method. It allows the POA to
- explicitly release an ObjectReferenceTemplate instance without
- pulling in valuetype related code.
+ Added this new pure virtual method. It allows the POA to
+ explicitly release an ObjectReferenceTemplate instance without
+ pulling in valuetype related code.
- * tao/PortableServer/POA.cpp (destroy_i):
+ * tao/PortableServer/POA.cpp (destroy_i):
- Fixed memory leak related to tricky ObjectReferenceTemplate
- memory management.
+ Fixed memory leak related to tricky ObjectReferenceTemplate
+ memory management.
- * tao/PortableServer/POA.h:
+ * tao/PortableServer/POA.h:
- Cosmetic improvement.
+ Cosmetic improvement.
- * tao/RTCORBA/Network_Priority_Mapping_Manager.h:
- * tao/RTCORBA/Priority_Mapping_Manager.h:
- * tao/RTScheduling/Current.h:
- * tao/RTScheduling/RTScheduler_Manager.h:
+ * tao/RTCORBA/Network_Priority_Mapping_Manager.h:
+ * tao/RTCORBA/Priority_Mapping_Manager.h:
+ * tao/RTScheduling/Current.h:
+ * tao/RTScheduling/RTScheduler_Manager.h:
- Changed all TAO-specific "_var" classes to inherit privately,
- rather than publicly, from TAO_Base_var. Public inheritance in
- this case is not correct since (1) the TAO-specific "_var"
- classes are not meant to satisfy an "IS-A" relationship with
- TAO_Base_var, and (2) since it allows some invalid widening
- assignments to occur.
+ Changed all TAO-specific "_var" classes to inherit privately,
+ rather than publicly, from TAO_Base_var. Public inheritance in
+ this case is not correct since (1) the TAO-specific "_var"
+ classes are not meant to satisfy an "IS-A" relationship with
+ TAO_Base_var, and (2) since it allows some invalid widening
+ assignments to occur.
- * tao/Valuetype/Value_VarOut_T.h (TAO_Value_Var_T):
+ * tao/Valuetype/Value_VarOut_T.h (TAO_Value_Var_T):
- Inherit privately from TAO_Base_var and declare a private copy
- constructor and assignment operator that accept a reference to a
- constant TAO_Base_var. Prevents invalid widening assignments.
+ Inherit privately from TAO_Base_var and declare a private copy
+ constructor and assignment operator that accept a reference to a
+ constant TAO_Base_var. Prevents invalid widening assignments.
- * tao/Valuetype/Value_VarOut_T.cpp (TAO_Value_Var_T):
+ * tao/Valuetype/Value_VarOut_T.cpp (TAO_Value_Var_T):
- Explicitly initialize TAO_Base_var base class in base member
- initializer list of copy constructor.
+ Explicitly initialize TAO_Base_var base class in base member
+ initializer list of copy constructor.
- * orbsvcs/tests/Concurrency/CC_command.cpp (execute):
+ * orbsvcs/tests/Concurrency/CC_command.cpp (execute):
- Replaced use of insecure gets() function call with one to
- getchar(). The latter is all that is needed since only one
- character is needed from stdin.
+ Replaced use of insecure gets() function call with one to
+ getchar(). The latter is all that is needed since only one
+ character is needed from stdin.
Wed Jan 12 19:53:06 2005 J.T. Conklin <jtc@acorntoolworks.com>
- * orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Event_Loader.cpp:
- Changed to not crash if ior file can not be opened.
+ Changed to not crash if ior file can not be opened.
- Added '-p' command line option to write process id to file.
+ Added '-p' command line option to write process id to file.
Wed Jan 12 15:57:30 2005 Huang-Ming Huang <hh1@cse.wustl.edu>
@@ -197,10 +258,10 @@ Wed Jan 12 15:57:30 2005 Huang-Ming Huang <hh1@cse.wustl.edu>
Tue Jan 11 23:19:33 2005 Balachandran Natarajan <bala@dre.vanderbilt.edu>
- * tao/GIOP_Message_Base.cpp:
+ * tao/GIOP_Message_Base.cpp:
- Fixed a debug output which was appearing without the debug level
- being set. Thanks to Lother Werzinger for reporting this.
+ Fixed a debug output which was appearing without the debug level
+ being set. Thanks to Lother Werzinger for reporting this.
Tue Jan 11 20:44:30 2005 Huang-Ming Huang <hh1@cse.wustl.edu>
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 643df2a84f3..d37a4f06f15 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -20,8 +20,7 @@ ACE_RCSID (tao,
TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
size_t /*input_cdr_size*/)
: orb_core_ (orb_core)
- , message_state_ (orb_core,
- this)
+ , message_state_ ()
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -40,7 +39,6 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
{
-
}
@@ -293,7 +291,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (
- TAO_GIOP_Message_State &msg_state)
+ const TAO_GIOP_Message_State &msg_state) const
{
// Convert to the right type of Pluggable Messaging message type.
@@ -330,13 +328,7 @@ TAO_GIOP_Message_Base::message_type (
int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
-
- if (this->message_state_.parse_message_header (incoming) == -1)
- {
- return -1;
- }
-
- return 0;
+ return this->message_state_.parse_message_header (incoming);
}
ssize_t
@@ -366,9 +358,6 @@ int
TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
TAO_Queued_Data *&qd)
{
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
-
if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
{
if (incoming.length () > 0)
@@ -385,6 +374,7 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
return 0;
}
+ TAO_GIOP_Message_State state;
if (state.parse_message_header (incoming) == -1)
{
return -1;
@@ -396,8 +386,7 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
if (copying_len > incoming.length ())
{
- qd->missing_data_ =
- copying_len - incoming.length ();
+ qd->missing_data_ = copying_len - incoming.length ();
copying_len = incoming.length ();
}
@@ -406,10 +395,8 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
copying_len);
incoming.rd_ptr (copying_len);
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
+ this->init_queued_data (qd, state);
+
return 1;
}
@@ -434,8 +421,7 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
// Move the rd_ptr () in the incoming message block..
incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
- TAO_GIOP_Message_State state (this->orb_core_,
- this);
+ TAO_GIOP_Message_State state;
// Parse the message header now...
if (state.parse_message_header (*qd->msg_block_) == -1)
@@ -451,16 +437,14 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
// Copy the pay load..
// Calculate the bytes that needs to be copied in the queue...
- size_t copy_len =
- state.payload_size ();
+ size_t copy_len = state.payload_size ();
// If the data that needs to be copied is more than that is
// available to us ..
if (copy_len > incoming.length ())
{
// Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
+ qd->missing_data_ = copy_len - incoming.length ();
// Set the actual possible copy_len that is available...
copy_len = incoming.length ();
@@ -479,10 +463,7 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
incoming.rd_ptr (copy_len);
// Get the other details...
- qd->byte_order_ = state.byte_order_;
- qd->major_version_ = state.giop_version_.major;
- qd->minor_version_ = state.giop_version_.minor;
- qd->msg_type_ = this->message_type (state);
+ this->init_queued_data (qd, state);
}
else
{
@@ -492,8 +473,7 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
if (copy_len > incoming.length ())
{
// Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
+ qd->missing_data_ = copy_len - incoming.length ();
// Set the actual possible copy_len that is available...
copy_len = incoming.length ();
@@ -509,54 +489,6 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
}
-
- return 0;
-}
-
-
-int
-TAO_GIOP_Message_Base::consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd)
-{
- if (dqd->byte_order_ != sqd->byte_order_
- || dqd->major_version_ != sqd->major_version_
- || dqd->minor_version_ != sqd->minor_version_)
- {
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:")
- ACE_TEXT ("different GIOP versions or byte order\n")));
- return -1;
- }
-
- // Skip the header in the incoming message
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // If we have a fragment header skip the header length too..
- if (sqd->minor_version_ == 2 &&
- sqd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
- sqd->msg_block_->rd_ptr (TAO_GIOP_MESSAGE_FRAGMENT_HEADER);
-
- // Get the length of the incoming message block..
- size_t incoming_length =
- sqd->msg_block_->length ();
-
- // Increase the size of the destination message block if we need
- // to.
- ACE_Message_Block *mb =
- dqd->msg_block_;
-
- // Check space before growing.
- if (mb->space () < incoming_length)
- {
- ACE_CDR::grow (mb,
- mb->length () + incoming_length);
- }
-
- // Copy the data
- dqd->msg_block_->copy (sqd->msg_block_->rd_ptr (),
- incoming_length);
return 0;
}
@@ -564,22 +496,7 @@ void
TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
{
// Get the message information
- qd->byte_order_ =
- this->message_state_.byte_order_;
- qd->major_version_ =
- this->message_state_.giop_version_.major;
- qd->minor_version_ =
- this->message_state_.giop_version_.minor;
-
- //qd->more_fragments_ = this->message_state_.more_fragments_;
-
- if (this->message_state_.more_fragments_)
- qd->more_fragments_ = 1;
- else
- qd->more_fragments_ = 0;
-
- qd->msg_type_=
- this->message_type (this->message_state_);
+ this->init_queued_data (qd, this->message_state_);
// Reset the message_state
this->message_state_.reset ();
@@ -588,7 +505,6 @@ TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
-
{
// Set the upcall thread
this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
@@ -602,15 +518,15 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
generator_parser);
// A buffer that we will use to initialise the CDR stream
-#if defined (ACE_HAS_PURIFY)
+#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
#else
char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
-#endif /* ACE_HAS_PURIFY */
+#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
// Initialze an output CDR on the stack
// NOTE: Don't jump to a conclusion as to why we are using the
- // inpout_cdr and hence the global pool here. These pools will move
+ // input_cdr and hence the global pool here. These pools will move
// to the lanes anyway at some point of time. Further, it would have
// been awesome to have this in TSS. But for some reason the cloning
// that happens when the ORB gets flow controlled while writing a
@@ -823,7 +739,7 @@ TAO_GIOP_Message_Base::generate_exception_reply (
// Close the handle.
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
- ACE_TEXT ("generate_exception_reply ()")));
+ ACE_TEXT ("generate_exception_reply ()\n")));
return -1;
}
ACE_ENDTRY;
@@ -833,7 +749,7 @@ TAO_GIOP_Message_Base::generate_exception_reply (
}
int
-TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type t,
+TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
TAO_OutputCDR &msg)
{
// Reset the message type
@@ -862,7 +778,7 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type t,
// version info , Bala
header[6] = (TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
- header[7] = CORBA::Octet(t);
+ header[7] = CORBA::Octet(type);
static int header_size = sizeof (header) / sizeof (header[0]);
msg.write_octet_array (header, header_size);
@@ -1298,7 +1214,7 @@ void
TAO_GIOP_Message_Base::set_state (
CORBA::Octet def_major,
CORBA::Octet def_minor,
- TAO_GIOP_Message_Generator_Parser *&gen_parser)
+ TAO_GIOP_Message_Generator_Parser *&gen_parser) const
{
switch (def_major)
{
@@ -1307,15 +1223,18 @@ TAO_GIOP_Message_Base::set_state (
{
case 0:
gen_parser =
- &this->tao_giop_impl_.tao_giop_10;
+ const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
+ &this->tao_giop_impl_.tao_giop_10);
break;
case 1:
gen_parser =
- &this->tao_giop_impl_.tao_giop_11;
+ const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
+ &this->tao_giop_impl_.tao_giop_11);
break;
case 2:
gen_parser =
- &this->tao_giop_impl_.tao_giop_12;
+ const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
+ &this->tao_giop_impl_.tao_giop_12);
break;
default:
break;
@@ -1492,7 +1411,7 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
int byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
// Get the version info
- // CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
CORBA::Octet minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
// request/reply id.
@@ -1503,7 +1422,7 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY)
{
- if (minor < 2)
+ if (major == 1 && minor < 2)
{
// @@ Only works if ServiceContextList is empty....
tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
@@ -1584,7 +1503,7 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
{
// Get a node for the queue..
TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
+ TAO_Queued_Data::make_queued_data (
this->orb_core_->transport_message_buffer_allocator ());
// @@todo: We have a similar method in Transport.cpp. Need to see how
@@ -1620,3 +1539,30 @@ TAO_GIOP_Message_Base::header_length (void) const
{
return TAO_GIOP_MESSAGE_HEADER_LEN;
}
+
+size_t
+TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major,
+ CORBA::Octet minor) const
+{
+ TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
+
+ // Get the state information that we need to use
+ this->set_state (major,
+ minor,
+ generator_parser);
+
+ return generator_parser->fragment_header_length ();
+}
+
+void
+TAO_GIOP_Message_Base::init_queued_data (
+ TAO_Queued_Data* qd,
+ const TAO_GIOP_Message_State& state) const
+{
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->more_fragments_ = state.more_fragments_;
+ qd->request_id_ = state.request_id_;
+ qd->msg_type_ = this->message_type (state);
+}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index f50ba9c4c0d..52e0ff0fdad 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -94,6 +94,11 @@ public:
virtual int format_message (TAO_OutputCDR &cdr);
/// Parse the incoming messages..
+ ///
+ /// \return -1 There was some error parsing the GIOP header
+ /// \return 0 The GIOP header was parsed correctly
+ /// \return 1 There was not enough data in the message block to
+ /// parse the header
virtual int parse_incoming_messages (ACE_Message_Block &message_block);
/// Calculate the amount of data that is missing in the <incoming>
@@ -114,10 +119,6 @@ public:
/// Get the details of the message parsed through the @a qd.
virtual void get_message_data (TAO_Queued_Data *qd);
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
-
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
@@ -140,6 +141,10 @@ public:
/// Header length
virtual size_t header_length (void) const;
+ /// The header length of a fragment
+ virtual size_t fragment_header_length (CORBA::Octet major,
+ CORBA::Octet minor) const;
+
virtual TAO_OutputCDR &out_stream (void);
protected:
@@ -158,7 +163,7 @@ protected:
/// Set the state
void set_state (CORBA::Octet major,
CORBA::Octet minor,
- TAO_GIOP_Message_Generator_Parser *&);
+ TAO_GIOP_Message_Generator_Parser *&) const;
/// Print out a debug messages..
void dump_msg (const char *label,
@@ -171,7 +176,8 @@ protected:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
+ TAO_Pluggable_Message_Type message_type (
+ const TAO_GIOP_Message_State &state) const;
private:
@@ -221,6 +227,11 @@ private:
/// node of size @a sz.
TAO_Queued_Data *make_queued_data (size_t sz);
+ /// Initialize the TAO_Queued_Data from the relevant portions of
+ /// a GIOP_Message_State.
+ void init_queued_data (TAO_Queued_Data* qd,
+ const TAO_GIOP_Message_State& state) const;
+
private:
/// Cached ORB_Core pointer...
TAO_ORB_Core *orb_core_;
diff --git a/TAO/tao/GIOP_Message_Generator_Parser.h b/TAO/tao/GIOP_Message_Generator_Parser.h
index a0ccd7cf543..e5ace6e2579 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser.h
@@ -96,6 +96,9 @@ public:
/// request/response?
virtual int is_ready_for_bidirectional (void);
+ /// The header length of a fragment
+ virtual size_t fragment_header_length (void) const = 0;
+
protected:
/// Marshall the reply status
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_10.cpp b/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
index 4e7d60ed2d8..dad464a724c 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
+++ b/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
@@ -476,3 +476,9 @@ TAO_GIOP_Message_Generator_Parser_10::minor_version (void)
// Any harm in hardcoding??
return 0;
}
+
+size_t
+TAO_GIOP_Message_Generator_Parser_10::fragment_header_length (void) const
+{
+ return 0;
+}
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_10.h b/TAO/tao/GIOP_Message_Generator_Parser_10.h
index 1d163e8654a..848f83fffd2 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_10.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser_10.h
@@ -86,6 +86,8 @@ public:
virtual CORBA::Octet major_version (void);
virtual CORBA::Octet minor_version (void);
+ /// The header length of a fragment
+ virtual size_t fragment_header_length (void) const;
};
#include /**/ "ace/post.h"
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
index e39b5125a11..04daba08086 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
+++ b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
@@ -5,6 +5,7 @@
#include "tao/operation_details.h"
#include "tao/debug.h"
#include "tao/Pluggable_Messaging_Utils.h"
+#include "tao/GIOP_Message_State.h"
#include "tao/TAO_Server_Request.h"
#include "tao/TAOC.h"
#include "tao/ORB_Core.h"
@@ -558,3 +559,9 @@ TAO_GIOP_Message_Generator_Parser_12::process_bidir_context (
return transport->tear_listen_point_list (cdr);
}
+
+size_t
+TAO_GIOP_Message_Generator_Parser_12::fragment_header_length (void) const
+{
+ return TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+}
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.h b/TAO/tao/GIOP_Message_Generator_Parser_12.h
index 92f28a0d727..5fa4d531ba9 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_12.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser_12.h
@@ -92,6 +92,9 @@ public:
/// request/response?
virtual int is_ready_for_bidirectional (void);
+ /// The header length of a fragment
+ virtual size_t fragment_header_length (void) const;
+
private:
/// Marshall the TargetSpecification
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
index 18bb7936ffd..19c5c70d086 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
+++ b/TAO/tao/GIOP_Message_Generator_Parser_Impl.inl
@@ -5,9 +5,22 @@ TAO_GIOP_Message_Generator_Parser_Impl::
check_revision (CORBA::Octet incoming_major,
CORBA::Octet incoming_minor)
{
- if (incoming_major > TAO_DEF_GIOP_MAJOR ||
- incoming_minor > TAO_DEF_GIOP_MINOR)
+ CORBA::UShort version_as_whole_num = incoming_major << 8 | incoming_minor;
+ CORBA::UShort max_allowable_version = TAO_DEF_GIOP_MAJOR << 8 | TAO_DEF_GIOP_MINOR;
+
+ // If it's greater than the max, we know it's not allowed.
+ if (version_as_whole_num > max_allowable_version)
return 0;
- return 1;
+ // If it's less than the max, though, we still have to check for
+ // each explicit version and only allow the ones we know work.
+ switch (version_as_whole_num)
+ {
+ case 0x0100:
+ case 0x0101:
+ case 0x0102:
+ return 1;
+ }
+
+ return 0;
}
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index c7e68a6bd8a..96135ffeaa4 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -29,7 +29,7 @@ TAO_GIOP_Message_Lite::TAO_GIOP_Message_Lite (TAO_ORB_Core *orb_core,
: orb_core_ (orb_core),
message_type_ (0),
message_size_ (0),
- byte_order_ (ACE_CDR_BYTE_ORDER)
+ byte_order_ (TAO_ENCAP_BYTE_ORDER)
{
}
@@ -107,7 +107,7 @@ TAO_GIOP_Message_Lite::generate_locate_request_header (
if (!this->write_protocol_header (TAO_GIOP_LOCATEREQUEST,
cdr))
{
- if (TAO_debug_level > 3)
+ if (TAO_debug_level)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Error in writing GIOPLite header \n")));
@@ -121,7 +121,7 @@ TAO_GIOP_Message_Lite::generate_locate_request_header (
spec,
cdr))
{
- if (TAO_debug_level > 4)
+ if (TAO_debug_level)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Error in writing locate request header \n")));
@@ -142,7 +142,7 @@ TAO_GIOP_Message_Lite::generate_reply_header (
if (!this->write_protocol_header (TAO_GIOP_REPLY,
cdr))
{
- if (TAO_debug_level > 3)
+ if (TAO_debug_level)
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Error in writing GIOPLite header \n")));
@@ -248,11 +248,18 @@ TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
int
TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
{
+ // Make sure we have enough bytes in the header to read all
+ // of the information.
+ if (block.length () < TAO_GIOP_LITE_HEADER_LEN)
+ {
+ return 1;
+ }
+
// Get the read pointer
char *rd_ptr = block.rd_ptr ();
- // We dont need to do this sort of copy. But some compilers (read it
- // as solaris ones) have a problem in deferencing from the
+ // We don't need to do this sort of copy. But some compilers (read it
+ // as SunCC) have a problem in deferencing from the
// reinterpret_cast pointer of the <rd_ptr>, as the <rd_ptr> can be
// on stack. So let us go ahead with this copying...
char buf [4];
@@ -263,7 +270,7 @@ TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
CORBA::ULong x = 0;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER))
+ if (!(this->byte_order_ != TAO_ENCAP_BYTE_ORDER))
{
x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
}
@@ -284,7 +291,7 @@ TAO_GIOP_Message_Lite::parse_incoming_messages (ACE_Message_Block &block)
}
TAO_Pluggable_Message_Type
-TAO_GIOP_Message_Lite::message_type (void)
+TAO_GIOP_Message_Lite::message_type (void) const
{
switch (this->message_type_)
{
@@ -368,8 +375,7 @@ TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming,
if (copying_len > incoming.length ())
{
- qd->missing_data_ =
- copying_len - incoming.length ();
+ qd->missing_data_ = copying_len - incoming.length ();
copying_len = incoming.length ();
}
@@ -378,10 +384,8 @@ TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming,
copying_len);
incoming.rd_ptr (copying_len);
- qd->byte_order_ = TAO_ENCAP_BYTE_ORDER;
- qd->major_version_ = TAO_DEF_GIOP_MAJOR;
- qd->minor_version_ = TAO_DEF_GIOP_MINOR;
- qd->msg_type_ = this->message_type ();
+ this->init_queued_data (qd);
+
return 1;
}
@@ -420,13 +424,12 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
// Calculate the bytes that needs to be copied in the queue...
size_t copy_len = this->message_size_;
- // If teh data that needs to be copied is more than that is
+ // If the data that needs to be copied is more than that is
// available to us ..
if (copy_len > incoming.length ())
{
// Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
+ qd->missing_data_ = copy_len - incoming.length ();
// Set the actual possible copy_len that is available...
copy_len = incoming.length ();
@@ -445,10 +448,7 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
incoming.rd_ptr (copy_len);
// Get the other details...
- qd->byte_order_ = TAO_ENCAP_BYTE_ORDER;
- qd->major_version_ = TAO_DEF_GIOP_MAJOR;
- qd->minor_version_ = TAO_DEF_GIOP_MINOR;
- qd->msg_type_ = this->message_type ();
+ this->init_queued_data (qd);
}
else
{
@@ -458,8 +458,7 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
if (copy_len > incoming.length ())
{
// Calculate the missing data..
- qd->missing_data_ =
- copy_len - incoming.length ();
+ qd->missing_data_ = copy_len - incoming.length ();
// Set the actual possible copy_len that is available...
copy_len = incoming.length ();
@@ -483,28 +482,13 @@ void
TAO_GIOP_Message_Lite::get_message_data (TAO_Queued_Data *qd)
{
// Get the message information
- qd->byte_order_ =
- ACE_CDR_BYTE_ORDER;
- qd->major_version_ =
- TAO_DEF_GIOP_MAJOR;
- qd->minor_version_ =
- TAO_DEF_GIOP_MINOR;
-
- qd->msg_type_=
- this->message_type ();
+ this->init_queued_data (qd);
+ // Reset the message_state
this->reset ();
}
int
-TAO_GIOP_Message_Lite::consolidate_fragments (TAO_Queued_Data * /*dqd*/,
- const TAO_Queued_Data * /*sqd*/)
-{
- // We dont know what fragments are???
- return -1;
-}
-
-int
TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
{
@@ -539,10 +523,11 @@ TAO_GIOP_Message_Lite::process_request_message (TAO_Transport *transport,
size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_LITE_HEADER_LEN;
- this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *,
- qd->msg_block_->rd_ptr ()),
- qd->msg_block_->length ());
+ if (TAO_debug_level > 0)
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
// Create a input CDR stream.
@@ -593,16 +578,17 @@ TAO_GIOP_Message_Lite::process_reply_message (
size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_LITE_HEADER_LEN;
- this->dump_msg ("recv",
- ACE_reinterpret_cast (u_char *,
- qd->msg_block_->rd_ptr ()),
- qd->msg_block_->length ());
+ if (TAO_debug_level > 0)
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
// Create a empty buffer on stack
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
- // copies at all here. The same is alos done in the higher layers.
+ // copies at all here. The same is also done in the higher layers.
TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
ACE_Message_Block::DONT_DELETE,
rd_pos,
@@ -627,11 +613,11 @@ TAO_GIOP_Message_Lite::process_reply_message (
// GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
switch (qd->msg_type_)
{
- case TAO_GIOP_REPLY:
+ case TAO_PLUGGABLE_MESSAGE_REPLY:
// Should be taken care by the state specific parsing
return this->parse_reply (input_cdr,
params);
- case TAO_GIOP_LOCATEREPLY:
+ case TAO_PLUGGABLE_MESSAGE_LOCATEREPLY:
// We call parse_reply () here because, the message format for
// the LOCATEREPLY & REPLY are same.
return this->parse_reply (input_cdr,
@@ -680,7 +666,7 @@ TAO_GIOP_Message_Lite::generate_exception_reply (
int
TAO_GIOP_Message_Lite::write_protocol_header (
- TAO_GIOP_Message_Type t,
+ TAO_GIOP_Message_Type type,
TAO_OutputCDR &msg)
{
// Reset the message type
@@ -700,7 +686,7 @@ TAO_GIOP_Message_Lite::write_protocol_header (
CORBA::ULong size = 0;
msg.write_ulong (size);
- msg.write_octet ((CORBA::Octet) t);
+ msg.write_octet ((CORBA::Octet) type);
return 1;
}
@@ -811,15 +797,16 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
ACE_TEXT ("cannot send exception\n"),
ACE_TEXT ("process_request ()")));
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- "TAO: ");
+ ACE_PRINT_EXCEPTION (
+ ACE_ANY_EXCEPTION,
+ "TAO_GIOP_Message_Lite::process_request[2]");
}
}
}
else if (TAO_debug_level > 0)
{
- // It is unfotunate that an exception (probably a system
+ // It is unfortunate that an exception (probably a system
// exception) was thrown by the upcall code (even by the
// user) when the client was not expecting a response.
// However, in this case, we cannot close the connection
@@ -867,10 +854,13 @@ TAO_GIOP_Message_Lite::process_request (TAO_Transport *transport,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Lite::process_request[3], ")
+ ACE_TEXT ("%p: ")
ACE_TEXT ("cannot send exception\n"),
ACE_TEXT ("process_request ()")));
- ACE_PRINT_EXCEPTION (exception, "TAO: ");
+ ACE_PRINT_EXCEPTION (
+ exception,
+ "TAO_GIOP_Message_Lite::process_request[3]");
}
}
}
@@ -973,7 +963,8 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
status_info.status = TAO_GIOP_OBJECT_FORWARD;
status_info.forward_location_var = forward_to;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("handle_locate has been called: forwarding\n")));
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Lite::process_locate_request, ")
+ ACE_TEXT ("called: forwarding\n")));
}
else if (server_request.exception_type () == TAO_GIOP_NO_EXCEPTION)
{
@@ -981,7 +972,8 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
status_info.status = TAO_GIOP_OBJECT_HERE;
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t) handle_locate() : found\n")));
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Lite::process_locate_request, ")
+ ACE_TEXT ("found\n")));
}
else
{
@@ -991,14 +983,16 @@ TAO_GIOP_Message_Lite::process_locate_request (TAO_Transport *transport,
{
status_info.status = TAO_GIOP_OBJECT_FORWARD;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("handle_locate has been called: forwarding\n")));
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Lite::process_locate_request, ")
+ ACE_TEXT ("forwarding\n")));
}
else
{
// Normal exception, so the object is not here
status_info.status = TAO_GIOP_UNKNOWN_OBJECT;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("handle_locate has been called: not here\n")));
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Lite::process_locate_request, ")
+ ACE_TEXT ("not here\n")));
}
}
}
@@ -1416,11 +1410,11 @@ TAO_GIOP_Message_Lite::send_reply_exception (
// Create a new output CDR stream
char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
-#if defined(ACE_HAS_PURIFY)
+#if defined(ACE_INITIALIZE_MEMORY_BEFORE_USE)
(void) ACE_OS::memset (repbuf,
'\0',
sizeof repbuf);
-#endif /* ACE_HAS_PURIFY */
+#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
TAO_OutputCDR output (repbuf,
sizeof repbuf,
TAO_ENCAP_BYTE_ORDER,
@@ -1569,20 +1563,20 @@ TAO_GIOP_Message_Lite::dump_msg (const char *label,
const u_char *ptr,
size_t len)
{
- static const char *names [] =
- {
- "Request",
- "Reply",
- "CancelRequest",
- "LocateRequest",
- "LocateReply",
- "CloseConnection",
- "MessageError"
- "Fragment"
- };
-
if (TAO_debug_level >= 5)
{
+ static const char *names [] =
+ {
+ "Request",
+ "Reply",
+ "CancelRequest",
+ "LocateRequest",
+ "LocateReply",
+ "CloseConnection",
+ "MessageError",
+ "Fragment"
+ };
+
// Message name.
const char *message_name = "UNKNOWN MESSAGE";
u_long slot = ptr[TAO_GIOP_LITE_MESSAGE_TYPE_OFFSET];
@@ -1633,7 +1627,7 @@ TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
{
// Get a node for the queue..
TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data ();
+ TAO_Queued_Data::make_queued_data ();
// Make a datablock for the size requested + something. The
// "something" is required because we are going to align the data
@@ -1681,3 +1675,19 @@ TAO_GIOP_Message_Lite::header_length (void) const
{
return TAO_GIOP_LITE_HEADER_LEN;
}
+
+size_t
+TAO_GIOP_Message_Lite::fragment_header_length (CORBA::Octet,
+ CORBA::Octet) const
+{
+ return 0;
+}
+
+void
+TAO_GIOP_Message_Lite::init_queued_data (TAO_Queued_Data* qd) const
+{
+ qd->byte_order_ = TAO_ENCAP_BYTE_ORDER;
+ qd->major_version_ = TAO_DEF_GIOP_MAJOR;
+ qd->minor_version_ = TAO_DEF_GIOP_MINOR;
+ qd->msg_type_ = this->message_type ();
+}
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index 2b1d2b03a70..8ddaa6ef89b 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -90,6 +90,11 @@ public:
virtual int format_message (TAO_OutputCDR &cdr);
/// Parse the incoming messages..
+ ///
+ /// \return -1 There was some error parsing the GIOP header
+ /// \return 0 The GIOP header was parsed correctly
+ /// \return 1 There was not enough data in the message block to
+ /// parse the header
virtual int parse_incoming_messages (ACE_Message_Block &message_block);
/// Get the message type. The return value would be one of the
@@ -98,7 +103,7 @@ public:
/// TAO_PLUGGABLE_MESSAGE_REPLY,
/// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- TAO_Pluggable_Message_Type message_type (void);
+ TAO_Pluggable_Message_Type message_type (void) const;
/// Calculate the amount of data that is missing in the <incoming>
@@ -119,10 +124,6 @@ public:
/// Get the details of the message parsed through the <qd>.
virtual void get_message_data (TAO_Queued_Data *qd);
- /// @@Bala: Docu???
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd);
-
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
@@ -203,6 +204,10 @@ private:
/// Header length
virtual size_t header_length (void) const;
+ /// Fragment header length
+ virtual size_t fragment_header_length (CORBA::Octet major,
+ CORBA::Octet minor) const;
+
virtual TAO_OutputCDR &out_stream (void);
private:
@@ -249,6 +254,9 @@ private:
int parse_locate_reply (TAO_InputCDR &input,
TAO_Pluggable_Reply_Params &params);
+ /// Initialize the relevant portions of a TAO_Queued_Data
+ void init_queued_data (TAO_Queued_Data* qd) const;
+
private:
/// Our copy of the ORB core...
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index 90449d1b4f5..6100e1fb4fe 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -14,11 +14,8 @@ ACE_RCSID (tao,
GIOP_Message_State,
"$Id$")
-TAO_GIOP_Message_State::TAO_GIOP_Message_State (
- TAO_ORB_Core * /*orb_core*/,
- TAO_GIOP_Message_Base *base)
- : base_ (base),
- giop_version_ (TAO_DEF_GIOP_MAJOR,
+TAO_GIOP_Message_State::TAO_GIOP_Message_State (void)
+ : giop_version_ (TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR),
byte_order_ (0),
message_type_ (0),
@@ -36,11 +33,12 @@ TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN)
{
// Parse the GIOP header
- if (this->parse_message_header_i (incoming) == -1)
- return -1;
+ return this->parse_message_header_i (incoming);
}
- return 0;
+ // One indicates that we didn't have enough data in the message to
+ // parse the header
+ return 1;
}
int
@@ -73,7 +71,6 @@ TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
// Get the message type
this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
// Get the size of the message..
this->get_payload_size (buf);
@@ -99,13 +96,8 @@ TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
}
}
- if (this->more_fragments_)
- {
- (void) this->parse_fragment_header (buf,
- incoming.length ());
- }
-
- return 0;
+ // Get the request id
+ return this->parse_fragment_header (buf, incoming.length ());
}
@@ -240,39 +232,38 @@ TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
int
-TAO_GIOP_Message_State::parse_fragment_header (char *buf,
+TAO_GIOP_Message_State::parse_fragment_header (const char *buf,
size_t length)
{
- size_t len =
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
-
- buf += TAO_GIOP_MESSAGE_HEADER_LEN;
-
// By this point we are doubly sure that we have a more or less
// valid GIOP message with a valid major revision number.
- if (this->giop_version_.minor == 2 &&
- this->message_type_ == TAO_GIOP_FRAGMENT &&
- length > len)
+ if ((this->giop_version_.major > 1 || this->giop_version_.minor >= 2) &&
+ (this->more_fragments_ || this->message_type_ == TAO_GIOP_FRAGMENT))
{
+ static const size_t len =
+ TAO_GIOP_MESSAGE_HEADER_LEN + TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+
+ // If there is not enough data in the header to get the request
+ // id, then we need to indicate that by returning 1.
+ if (length < len)
+ return 1;
+
// Fragmented message in GIOP 1.2 should have a fragment header
- // following the GIOP header. Grab the rd_ptr to get that
- // info.
+ // following the GIOP header.
+ buf += TAO_GIOP_MESSAGE_HEADER_LEN;
this->request_id_ = this->read_ulong (buf);
-
- // As we parsed the header
- return 1;
}
return 0;
}
CORBA::ULong
-TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
+TAO_GIOP_Message_State::read_ulong (const char *rd_ptr)
{
CORBA::ULong x = 0;
- // We dont need to do this sort of copy. But some compilers (read it
- // as solaris ones) have a problem in deferencing from the
+ // We don't need to do this sort of copy. But some compilers (read it
+ // as SunCC) have a problem in deferencing from the
// reinterpret_cast pointer of the <rd_ptr>, as the <rd_ptr> can be
// on stack. So let us go ahead with this copying...
char buf [4];
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index f902fa03a0e..16d9c11d14c 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -41,10 +41,14 @@ class TAO_Export TAO_GIOP_Message_State
public:
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
- TAO_GIOP_Message_Base *base);
+ TAO_GIOP_Message_State (void);
/// Parse the message header.
+ ///
+ /// \return -1 There was some error parsing the GIOP header
+ /// \return 0 The GIOP header was parsed correctly
+ /// \return 1 There was not enough data in the message block to
+ /// parse the header
int parse_message_header (ACE_Message_Block &incoming);
/// Return the message size
@@ -85,18 +89,14 @@ private:
/// Parses the GIOP FRAGMENT_HEADER information from the incoming
/// stream.
- int parse_fragment_header (char *buf,
+ int parse_fragment_header (const char *buf,
size_t length);
/// Read the unsigned long from the buffer. The <buf> should just
/// point to the next 4 bytes data that represent the ULong
- CORBA::ULong read_ulong (char *buf);
+ CORBA::ULong read_ulong (const char *buf);
private:
-
- /// The GIOP base class..
- TAO_GIOP_Message_Base *base_;
-
// GIOP version information..
TAO_GIOP_Message_Version giop_version_;
@@ -113,6 +113,9 @@ private:
CORBA::ULong request_id_;
/// (Requests and Replys)
+ /// A value of zero indicates that this message does not have any
+ /// fragments. A value of non-zero indicates that it does have
+ /// fragments.
CORBA::Octet more_fragments_;
/// Missing data
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index 662ee3afbec..61999fb1a11 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -15,7 +15,7 @@ ACE_RCSID (tao,
"$Id$")
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
- : queued_data_ (0),
+ : last_added_ (0),
size_ (0),
orb_core_ (orb_core)
{
@@ -45,21 +45,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
+ if ((CORBA::Long)block.length () < this->last_added_->missing_data_)
{
n = block.length ();
}
else
{
- n = this->queued_data_->missing_data_;
+ n = this->last_added_->missing_data_;
}
// Do the copy
- this->queued_data_->msg_block_->copy (block.rd_ptr (),
+ this->last_added_->msg_block_->copy (block.rd_ptr (),
n);
// Decerement the missing data
- this->queued_data_->missing_data_ -= n;
+ this->last_added_->missing_data_ -= n;
}
return n;
@@ -68,17 +68,20 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
TAO_Queued_Data *
TAO_Incoming_Message_Queue::dequeue_head (void)
{
+ if (this->size_ == 0)
+ return 0;
+
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head = this->last_added_->next_;
// Reset the head node..
- this->queued_data_->next_ = tmp->next_;
+ this->last_added_->next_ = head->next_;
- // Decrease the size
- --this->size_;
+ // Decrease the size and reset last_added_ if empty
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
- return tmp;
+ return head;
}
TAO_Queued_Data *
@@ -89,52 +92,158 @@ TAO_Incoming_Message_Queue::dequeue_tail (void)
return 0;
// Get the node on the head of the queue...
- TAO_Queued_Data *tmp =
- this->queued_data_->next_;
+ TAO_Queued_Data *head =
+ this->last_added_->next_;
- while (tmp->next_ != this->queued_data_)
+ while (head->next_ != this->last_added_)
{
- tmp = tmp->next_;
+ head = head->next_;
}
// Put the head in tmp.
- tmp->next_ = this->queued_data_->next_;
+ head->next_ = this->last_added_->next_;
- TAO_Queued_Data *ret_qd = this->queued_data_;
+ TAO_Queued_Data *ret_qd = this->last_added_;
- this->queued_data_ = tmp;
+ this->last_added_ = head;
// Decrease the size
- --this->size_;
+ if (--this->size_ == 0)
+ this->last_added_ = 0;
return ret_qd;
}
-
int
TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
{
if (this->size_ == 0)
{
- this->queued_data_ = nd;
- this->queued_data_->next_ = this->queued_data_;
+ this->last_added_ = nd;
+ this->last_added_->next_ = this->last_added_;
}
else
{
- nd->next_ = this->queued_data_->next_;
- this->queued_data_->next_ = nd;
- this->queued_data_ = nd;
+ nd->next_ = this->last_added_->next_;
+ this->last_added_->next_ = nd;
+ this->last_added_ = nd;
}
++ this->size_;
return 0;
}
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::find_fragment_chain (CORBA::Octet major,
+ CORBA::Octet minor) const
+{
+ TAO_Queued_Data *found = 0;
+ if (this->last_added_ != 0)
+ {
+ TAO_Queued_Data *qd = this->last_added_->next_;
+
+ do {
+ if (qd->more_fragments_ &&
+ qd->major_version_ == major && qd->minor_version_ == minor)
+ {
+ found = qd;
+ }
+ else
+ {
+ qd = qd->next_;
+ }
+ } while (found == 0 && qd != this->last_added_->next_);
+ }
+
+ return found;
+}
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::find_fragment_chain (CORBA::ULong request_id) const
+{
+ TAO_Queued_Data *found = 0;
+ if (this->last_added_ != 0)
+ {
+ TAO_Queued_Data *qd = this->last_added_->next_;
+
+ do {
+ if (qd->more_fragments_ && qd->request_id_ == request_id)
+ {
+ found = qd;
+ }
+ else
+ {
+ qd = qd->next_;
+ }
+ } while (found == 0 && qd != this->last_added_->next_);
+ }
+
+ return found;
+}
+
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
+/*!
+ \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb.
+
+ This function allocates a new aligned message block using the same
+ allocators and flags as found in \a mb. The size of the new message
+ block is at least \a new_size; the size may be adjusted up in order
+ to accomodate alignment requirements and still fit \a new_size bytes
+ into the aligned buffer.
+
+ \param mb message block whose parameters should be mimicked
+ \param new_size size of the new message block (will be adjusted for proper alignment)
+ \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure
+
+ \author Thanks to Rich Seibel for helping implement the public API for ACE_Message_Block!
+ */
+static ACE_Message_Block*
+clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
+{
+ // Calculate the required size of the cloned block with alignment
+ size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
+
+ // Get the allocators
+ ACE_Allocator *data_allocator;
+ ACE_Allocator *data_block_allocator;
+ ACE_Allocator *message_block_allocator;
+ mb->access_allocators (data_allocator,
+ data_block_allocator,
+ message_block_allocator);
+
+ // Create a new Message Block
+ ACE_Message_Block *nb;
+ ACE_NEW_MALLOC_RETURN (nb,
+ static_cast<ACE_Message_Block*> (
+ message_block_allocator->malloc (
+ sizeof (ACE_Message_Block))),
+ ACE_Message_Block(aligned_size,
+ mb->msg_type(),
+ mb->cont(),
+ 0, //we want the data block created
+ data_allocator,
+ mb->locking_strategy(),
+ mb->msg_priority(),
+ mb->msg_execution_time (),
+ mb->msg_deadline_time (),
+ data_block_allocator,
+ message_block_allocator),
+ 0);
+
+ ACE_CDR::mb_align (nb);
+
+ // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
+ // we just dynamically allocated the two things.
+ nb->set_flags (mb->flags());
+ nb->clr_flags (ACE_Message_Block::DONT_DELETE);
+
+ return nb;
+}
+
TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
: msg_block_ (0),
missing_data_ (0),
@@ -142,6 +251,7 @@ TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
major_version_ (0),
minor_version_ (0),
more_fragments_ (0),
+ request_id_ (0),
msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
next_ (0),
allocator_ (alloc)
@@ -156,6 +266,7 @@ TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
major_version_ (0),
minor_version_ (0),
more_fragments_ (0),
+ request_id_ (0),
msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
next_ (0),
allocator_ (alloc)
@@ -169,6 +280,7 @@ TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
major_version_ (qd.major_version_),
minor_version_ (qd.minor_version_),
more_fragments_ (qd.more_fragments_),
+ request_id_ (qd.request_id_),
msg_type_ (qd.msg_type_),
next_ (0),
allocator_ (qd.allocator_)
@@ -177,7 +289,7 @@ TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
/*static*/
TAO_Queued_Data *
-TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
+TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
{
TAO_Queued_Data *qd = 0;
@@ -284,3 +396,31 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
return qd;
}
+
+void
+TAO_Queued_Data::consolidate (void)
+{
+ // Is this a chain of fragments?
+ if (this->more_fragments_ && this->msg_block_->cont () != 0)
+ {
+ // Create a message block big enough to hold the entire chain
+ ACE_Message_Block *dest = clone_mb_nocopy_size (
+ this->msg_block_,
+ this->msg_block_->total_length ());
+
+ // Reset the cont() parameter. We have cloned the message
+ // block but not the chain as we will no longer have chain.
+ dest->cont (0);
+
+ // Use ACE_CDR to consolidate the chain for us
+ ACE_CDR::consolidate (dest, this->msg_block_);
+
+ // free the original message block chain
+ this->msg_block_->release ();
+
+ // Set the message block to the new consolidated message block
+ this->msg_block_ = dest;
+ this->more_fragments_ = 0;
+ }
+}
+
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 9d600afb16e..0949b673e76 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -75,30 +75,67 @@ public:
/// Return the length of the queue..
CORBA::ULong queue_length (void);
- /// Methods for sanity check. Checks to see whether the node on the
- /// head or tail is complete or not and ready for further
- /// processing.
+ /*!
+ @name Node Inspection Predicates
+
+ \brief These methods allow inspection of head and tail nodes for "completeness".
+
+ These methods check to see whether the node on the head or tail is
+ "complete" and ready for further processing. See each method's
+ documentation for its definition of "complete".
+ */
+ //@{
+ /*!
+ "complete" == the GIOP message at the tail is not missing any data (it may be a complete GIOP Fragment, though)
+
+ \return -1 queue is empty
+ \return 0 tail is not "complete"
+ \return 1 tail is "complete"
+ */
int is_tail_complete (void);
+
+ /*!
+
+ "complete" == the GIOP message at the head is not missing any data
+ AND, if it's the first message in a series of GIOP fragments, all
+ the fragments have been received, parsed, and placed into the
+ queue
+
+ \return -1 if queue is empty
+ \return 0 if head is not "complete"
+ \return 1 if head is "complete"
+ */
int is_head_complete (void);
+ //@}
- /// This method checks whether the last message that was queued up
- /// was fragmented...
+ /*!
+ \brief Check to see if the message at the tail (complete or incomplete) is a GIOP Fragment.
+ */
int is_tail_fragmented (void);
/// Return the size of data that is missing in tail of the queue.
size_t missing_data_tail (void) const;
+ /// Find the first fragment that matches the GIOP version
+ TAO_Queued_Data *find_fragment_chain (CORBA::Octet major,
+ CORBA::Octet minor) const;
+
+ /// Find the first fragment that matches the request id
+ TAO_Queued_Data *find_fragment_chain (CORBA::ULong request_id) const;
+
private:
friend class TAO_Transport;
- /// Make a node for the queue.
- TAO_Queued_Data *get_node (void);
-
private:
+ /*!
+ \brief A circular linked list of messages awaiting processing.
- /// A linked listof messages that await processing
- TAO_Queued_Data *queued_data_;
+ \a last_message_added_ points to the most recent message added to
+ the list. The earliest addition can be easily accessed via
+ \a last_message_added_->next_.
+ */
+ TAO_Queued_Data *last_added_;
/// The size of the queue
CORBA::ULong size_;
@@ -118,6 +155,11 @@ private:
* stored in the queue. Such a node can be used by the incoming thread
* from the reactor to dequeue and process the message by sending it
* to the higher layers of the ORB.
+ *
+ * The ACE_Message_Block contained within this class may contain a chain
+ * of message blocks (usually when GIOP fragments are involved). In that
+ * case consolidate () needs to be called prior to being sent to higher
+ * layers of the ORB when the GIOP fragment chain is complete.
*/
class TAO_Export TAO_Queued_Data
@@ -132,24 +174,35 @@ public:
/// Copy constructor.
TAO_Queued_Data (const TAO_Queued_Data &qd);
- /// Creation and deletion of a node in the queue.
- static TAO_Queued_Data* get_queued_data (ACE_Allocator *alloc = 0);
+ /// Creation of a node in the queue.
+ static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0);
+ /// Deletion of a node from the queue.
static void release (TAO_Queued_Data *qd);
/// Duplicate ourselves. This creates a copy of ourselves on the
/// heap and returns a pointer to the duplicated node.
static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd);
+ /// Consolidate this fragments chained message blocks into one.
+ void consolidate (void);
+
public:
/// The message block that contains the message.
ACE_Message_Block *msg_block_;
- /// Data missing in the above message that hasn't been read or
- /// processed yet.
+ /*!
+ @name Missing Data details
+
+ The \a missing_data_ member contains the number of bytes of
+ data missing from \a msg_block_.
+ */
+ //@{
+ /*! Data missing in the above message that hasn't been read or processed yet. */
CORBA::Long missing_data_;
+ //@}
- /// The byte order of the message that is stored in the node..
+ /// The byte order of the message that is stored in the node.
CORBA::Octet byte_order_;
/// Many protocols like GIOP have a major and minor version
@@ -164,6 +217,9 @@ public:
/// queue already has more fragments that is missing..
CORBA::Octet more_fragments_;
+ /// The fragment request id
+ CORBA::ULong request_id_;
+
/// The message type of the message
TAO_Pluggable_Message_Type msg_type_;
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
index d67bd485383..99bcb4978d3 100644
--- a/TAO/tao/Incoming_Message_Queue.inl
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -18,7 +18,7 @@ TAO_Incoming_Message_Queue::is_tail_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->missing_data_ == 0)
+ this->last_added_->missing_data_ == 0)
return 1;
return 0;
@@ -31,8 +31,8 @@ TAO_Incoming_Message_Queue::is_head_complete (void)
return -1;
if (this->size_ &&
- this->queued_data_->next_->missing_data_ == 0 &&
- this->queued_data_->next_->more_fragments_ == 0)
+ this->last_added_->next_->missing_data_ == 0 &&
+ !this->last_added_->next_->more_fragments_)
return 1;
return 0;
@@ -45,7 +45,7 @@ TAO_Incoming_Message_Queue::is_tail_fragmented (void)
return 0;
if (this->size_ &&
- this->queued_data_->more_fragments_ == 1)
+ this->last_added_->more_fragments_)
return 1;
return 0;
@@ -55,19 +55,12 @@ ACE_INLINE size_t
TAO_Incoming_Message_Queue::missing_data_tail (void) const
{
if (this->size_ != 0)
- return this->queued_data_->missing_data_;
+ return this->last_added_->missing_data_;
return 0;
}
-
-ACE_INLINE TAO_Queued_Data *
-TAO_Incoming_Message_Queue::get_node (void)
-{
- return TAO_Queued_Data::get_queued_data ();
-}
-
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index c2f04e0c9c4..86989c0865b 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -141,10 +141,6 @@ public:
virtual int consolidate_node (TAO_Queued_Data *qd,
ACE_Message_Block &incoming) = 0;
- /// @@Bala:Docu??
- virtual int consolidate_fragments (TAO_Queued_Data *dqd,
- const TAO_Queued_Data *sqd) = 0;
-
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
@@ -174,6 +170,10 @@ public:
/// Header length
virtual size_t header_length (void) const = 0;
+ /// Fragment header length
+ virtual size_t fragment_header_length (CORBA::Octet major,
+ CORBA::Octet minor) const = 0;
+
/// Accessor for the output CDR stream
virtual TAO_OutputCDR &out_stream (void) = 0;
};
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 16f5fcd1c3c..d4d6f4c520d 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -129,6 +129,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, wchar_translator_ (0)
, tcs_set_ (0)
, first_request_ (1)
+ , partial_message_ (0)
{
TAO_Client_Strategy_Factory *cf =
this->orb_core_->client_factory ();
@@ -158,6 +159,10 @@ TAO_Transport::~TAO_Transport (void)
this->purge_entry();
}
+ // Release the partial message block, however we may
+ // have never allocated one.
+ ACE_Message_Block::release (this->partial_message_);
+
// By the time the destructor is reached here all the connection stuff
// *must* have been cleaned up.
ACE_ASSERT (this->head_ == 0);
@@ -1277,6 +1282,26 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
this->messaging_object ()->header_length ();
}
+ // If we have a partial message, copy it into our message block
+ // and clear out the partial message.
+ if (this->partial_message_ != 0 && this->partial_message_->length () != 0)
+ {
+ if (message_block.copy (this->partial_message_->rd_ptr (),
+ this->partial_message_->length ()) == 0)
+ {
+ recv_size -= this->partial_message_->length ();
+ this->partial_message_->reset ();
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::handle_input, "
+ "unable to copy the partial message\n",
+ this->id ()),
+ -1);
+ }
+ }
+
// Saving the size of the received buffer in case any one needs to
// get the size of the message thats received in the
// context. Obviously the value will be changed for each recv call
@@ -1286,7 +1311,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
// Read the message into the message block that we have created on
// the stack.
- ssize_t n = this->recv (message_block.rd_ptr (),
+ ssize_t n = this->recv (message_block.wr_ptr (),
recv_size,
max_wait_time);
@@ -1325,27 +1350,32 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh,
return retval;
}
- // Make a node of the message block..
- TAO_Queued_Data qd (&message_block,
- this->orb_core_->transport_message_buffer_allocator ());
+ if (message_block.length () > 0)
+ {
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block,
+ this->orb_core_->transport_message_buffer_allocator ());
- // Extract the data for the node..
- this->messaging_object ()->get_message_data (&qd);
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
- // Check whether the message was fragmented..
- if (qd.more_fragments_ ||
- (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
- {
- // Duplicate the node that we have as the node is on stack..
- TAO_Queued_Data *nqd =
- TAO_Queued_Data::duplicate (qd);
+ // Check whether the message was fragmented..
+ if (qd.more_fragments_ ||
+ (qd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ // Duplicate the node that we have as the node is on stack..
+ TAO_Queued_Data *nqd =
+ TAO_Queued_Data::duplicate (qd);
+
+ return this->consolidate_fragments (nqd, rh);
+ }
- return this->consolidate_fragments (nqd, rh);
+ // Process the message
+ return this->process_parsed_messages (&qd,
+ rh);
}
- // Process the message
- return this->process_parsed_messages (&qd,
- rh);
+ return 0;
}
int
@@ -1355,9 +1385,47 @@ TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
{
// Parse the incoming message for validity. The check needs to be
// performed by the messaging objects.
- if (this->parse_incoming_messages (block) == -1)
+ switch (this->parse_incoming_messages (block))
{
+ // An error has occurred during message parsing
+ case -1:
return -1;
+
+ // This message block does not contain enough data to
+ // parse the header. We do not need to grow the partial
+ // message block since we are guaranteed that it can hold
+ // at least a GIOP header plus a GIOP fragment header.
+ case 1:
+ if (this->partial_message_ == 0)
+ {
+ this->allocate_partial_message_block ();
+ }
+
+ if (this->partial_message_ != 0 &&
+ this->partial_message_->copy (block.rd_ptr (),
+ block.length ()) == 0)
+ {
+ block.rd_ptr (block.length ());
+ return 0;
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, "
+ "unable to save the partial message\n",
+ this->id ()),
+ -1);
+ }
+
+ case 0: // The normal case
+ break;
+
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::parse_consolidate_messages, "
+ "impossible return value from parse_incoming_messages\n",
+ this->id ()),
+ -1);
}
// Check whether we have a complete message for processing
@@ -1397,18 +1465,15 @@ TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
int retval =
this->messaging_object ()->parse_incoming_messages (block);
- if (retval == -1)
+ if (retval == -1 && TAO_debug_level > 2)
{
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
- "error in incoming message\n",
- this->id ()));
- }
-
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::parse_incoming_messages, "
+ "error in incoming message\n",
+ this->id ()));
}
+
+ return retval;
}
return 0;
@@ -1511,40 +1576,98 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
// in the queue as they would have been taken care before. Put
// ourselves in the queue and then try processing one of the
// messages..
- if ((missing_data > 0
- ||this->incoming_message_queue_.queue_length ())
- && this->incoming_message_queue_.is_tail_fragmented () == 0)
+ if (missing_data >= 0 ||
+ this->incoming_message_queue_.queue_length () != 0)
{
- if (TAO_debug_level > 4)
+ if (missing_data == 0 ||
+ !this->incoming_message_queue_.is_tail_fragmented ())
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::consolidate_message, "
- "queueing up the message\n",
- this->id ()));
- }
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "queueing up the message\n",
+ this->id ()));
+ }
- // Get a queued data
- TAO_Queued_Data *qd =
- this->make_queued_data (incoming);
+ // Get a queued data
+ TAO_Queued_Data *qd =
+ this->make_queued_data (incoming);
- // Add the missing data to the queue
- qd->missing_data_ = missing_data;
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
- // Get the rest of the messaging data
- this->messaging_object ()->get_message_data (qd);
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
- // Add it to the tail of the queue..
- this->incoming_message_queue_.enqueue_tail (qd);
+ // If this is a full GIOP fragment, then we need only
+ // to consolidate the fragments
+ if (missing_data == 0 &&
+ (qd->more_fragments_ ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
+ {
+ this->consolidate_fragments (qd, rh);
+ }
+ else
+ {
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
- if (this->incoming_message_queue_.is_head_complete ())
+ if (this->incoming_message_queue_.is_head_complete ())
+ {
+ return this->process_queue_head (rh);
+ }
+ }
+ }
+ else
{
- return this->process_queue_head (rh);
+ // This block of code will only come into play when GIOP
+ // message fragmentation is employed. If we have a fragment
+ // in the message queue, we can only chain message blocks
+ // onto the TAO_Queued_Data for that fragment. Unless we have
+ // a full GIOP fragment, and since we know we're missing data,
+ // we need to save what we have until we can read in some more of
+ // the fragment until we get it all. This bit of data could be
+ // larger than what the partial message block can hold, so we may
+ // need to grow the partial message block.
+ if (this->partial_message_ == 0)
+ {
+ this->allocate_partial_message_block ();
+ }
+
+ if (this->partial_message_ != 0)
+ {
+ const size_t incoming_length = incoming.length ();
+ ACE_CDR::grow (this->partial_message_,
+ incoming_length);
+ if (this->partial_message_->copy (incoming.rd_ptr (),
+ incoming_length) == 0)
+ {
+ incoming.rd_ptr (incoming_length);
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "unable to save the partial message\n",
+ this->id ()),
+ -1);
+ }
+ }
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::consolidate_message, "
+ "unable to allocate the partial message\n",
+ this->id ()),
+ -1);
+ }
}
return 0;
}
- // We dont have any missing data. Just make a queued_data node with
+ // We don't have any missing data. Just make a queued_data node with
// the existing message block and send it to the higher layers of
// the ORB.
TAO_Queued_Data pqd (&incoming,
@@ -1557,10 +1680,10 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
if (pqd.more_fragments_ ||
(pqd.msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT))
{
- // Duplicate the queued data as it is on stack..
- TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
+ // Duplicate the queued data as it is on stack..
+ TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (pqd);
- return this->consolidate_fragments (nqd, rh);
+ return this->consolidate_fragments (nqd, rh);
}
// Now we have a full message in our buffer. Just go ahead and
@@ -1570,42 +1693,148 @@ TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
}
int
-TAO_Transport::consolidate_fragments (TAO_Queued_Data *qd,
+TAO_Transport::consolidate_fragments (TAO_Queued_Data *queueable_message,
TAO_Resume_Handle &rh)
{
- // If we have received a fragment message then we have to
- // consolidate <qd> with the last message in queue
- // @@todo: this piece of logic follows GIOP a bit... Need to revisit
- // if we have protocols other than GIOP
+ // Get the version numbers
+ CORBA::Octet major = queueable_message->major_version_;
+ CORBA::Octet minor = queueable_message->minor_version_;
+ CORBA::UShort whole = major << 8 | minor;
- // @@todo: Fragments now have too much copying overhead. Need to get
- // them out if we want to have some reasonable performance metrics
- // in future.. Post 1.2 seems a nice time..
- if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT)
+ switch(whole)
{
- TAO_Queued_Data *tqd =
- this->incoming_message_queue_.dequeue_tail ();
-
- tqd->more_fragments_ = qd->more_fragments_;
- tqd->missing_data_ = qd->missing_data_;
-
- if (this->messaging_object ()->consolidate_fragments (tqd, qd) == -1)
+ case 0x0100:
+ if (!queueable_message->more_fragments_)
{
- return -1;
+ this->incoming_message_queue_.enqueue_tail (queueable_message);
+ }
+ else
+ {
+ // Fragments aren't supported in 1.0. This is an error and
+ // we should reject it somehow. What do we do here? Do we throw
+ // an exception to the receiving side? Do we throw an exception
+ // to the sending side?
+ //
+ // At the very least, we need to log the fact that we received
+ // nonsense.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::enqueue_incoming_message ")
+ ACE_TEXT("detected a fragmented GIOP 1.0 message\n")),
+ -1);
}
+ break;
+ case 0x0101:
+ {
+ // One note is that TAO_Queued_Data contains version numbers,
+ // but doesn't indicate the actual protocol to which those
+ // version numbers refer. That's not a problem, though, because
+ // instances of TAO_Queued_Data live in a queue, and that queue
+ // lives in a particular instance of a Transport, and the
+ // transport instance has an association with a particular
+ // messaging_object. The concrete messaging object embodies a
+ // messaging protocol, and must cover all versions of that
+ // protocol. Therefore, we just need to cover the bases of all
+ // versions of that one protocol.
+
+ // In 1.1, fragments kinda suck because they don't have they're
+ // own message-specific header. Therefore, we have to find the
+ // fragment based on the major and minor version.
+ TAO_Queued_Data* fragment_message_chain =
+ this->incoming_message_queue_.find_fragment_chain (major, minor);
+
+ // Deal with the fragment and the queueable message
+ this->process_fragment (fragment_message_chain,
+ queueable_message,
+ major, minor, rh);
+ break;
+ }
+ case 0x0102:
+ {
+ // In 1.2, we get a little more context. There's a
+ // FRAGMENT message-specific header, and inside that is the
+ // request id with which the fragment is associated.
+ TAO_Queued_Data* fragment_message_chain =
+ this->incoming_message_queue_.find_fragment_chain (
+ queueable_message->request_id_);
+
+ // Deal with the fragment and the queueable message
+ this->process_fragment (fragment_message_chain,
+ queueable_message,
+ major, minor, rh);
+ break;
+ }
+ default:
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::consolidate_fragments ")
+ ACE_TEXT("can not handle a GIOP %d.%d ")
+ ACE_TEXT("message\n"), major, minor));
+ ACE_HEX_DUMP ((LM_DEBUG,
+ queueable_message->msg_block_->rd_ptr (),
+ queueable_message->msg_block_->length ()));
+ return -1;
+ }
- TAO_Queued_Data::release (qd);
- this->incoming_message_queue_.enqueue_tail (tqd);
- this->process_queue_head (rh);
+ return 0;
+}
+
+void
+TAO_Transport::process_fragment (TAO_Queued_Data* fragment_message_chain,
+ TAO_Queued_Data* queueable_message,
+ CORBA::Octet major,
+ CORBA::Octet minor,
+ TAO_Resume_Handle &rh)
+{
+ // No fragment was found
+ if (fragment_message_chain == 0)
+ {
+ this->incoming_message_queue_.enqueue_tail (queueable_message);
}
else
{
- // if we dont have a fragment already in the queue just add it in
- // the queue
- this->incoming_message_queue_.enqueue_tail (qd);
- }
+ if (fragment_message_chain->major_version_ != major ||
+ fragment_message_chain->minor_version_ != minor)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_Transport::process_fragment ")
+ ACE_TEXT("GIOP versions do not match ")
+ ACE_TEXT("(%d.%d != %d.%d\n"),
+ fragment_message_chain->major_version_,
+ fragment_message_chain->minor_version_,
+ major, minor));
+
+ // Find the last message block in the continuation
+ ACE_Message_Block* mb = fragment_message_chain->msg_block_;
+ while (mb->cont () != 0)
+ mb = mb->cont ();
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the header(s)
+ const size_t header_adjustment =
+ this->messaging_object ()->header_length () +
+ this->messaging_object ()->fragment_header_length (major, minor);
+ queueable_message->msg_block_->rd_ptr(header_adjustment);
+ mb->cont (queueable_message->msg_block_);
+
+ // Remove our reference to the message block. At this point
+ // the message block of the fragment head owns it as part of a
+ // chain
+ queueable_message->msg_block_ = 0;
+
+ if (!queueable_message->more_fragments_)
+ {
+ // This is the end of the fragments for this request
+ fragment_message_chain->consolidate ();
- return 0;
+ // Process the queue head to make sure that the newly
+ // consolidated fragments get handled
+ this->process_queue_head (rh);
+ }
+
+ // Get rid of the queuable message
+ TAO_Queued_Data::release (queueable_message);
+ }
}
int
@@ -1922,7 +2151,7 @@ TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
{
// Get an instance of TAO_Queued_Data
TAO_Queued_Data *qd =
- TAO_Queued_Data::get_queued_data (
+ TAO_Queued_Data::make_queued_data (
this->orb_core_->transport_message_buffer_allocator ());
// Get the flag for the details of the data block...
@@ -2180,6 +2409,19 @@ TAO_Transport::post_open (size_t id)
return true;
}
+void
+TAO_Transport::allocate_partial_message_block (void)
+{
+ if (this->partial_message_ == 0)
+ {
+ // This value must be at least large enough to hold a GIOP message
+ // header plus a GIOP fragment header
+ const size_t partial_message_size = 16;
+ ACE_NEW (this->partial_message_,
+ ACE_Message_Block (partial_message_size));
+ }
+}
+
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Reverse_Lock<ACE_Lock>;
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index ff9e669037d..265e9c0a28c 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -847,6 +847,18 @@ private:
/// Assume the lock is held
void send_connection_closed_notifications_i (void);
+ /// Process a non-version specific fragment by either consolidating
+ /// the fragments or enqueuing the queueable message
+ void process_fragment (TAO_Queued_Data* fragment_message,
+ TAO_Queued_Data* queueable_message,
+ CORBA::Octet major,
+ CORBA::Octet minor,
+ TAO_Resume_Handle &rh);
+
+ /// Allocate a partial message block and store it in our
+ /// partial_message_ data member.
+ void allocate_partial_message_block (void);
+
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
@@ -973,6 +985,9 @@ private:
/// first request. After that, the translators are fixed for the life of the
/// connection.
CORBA::Boolean first_request_;
+
+ /// Holds the partial GIOP message (if there is one)
+ ACE_Message_Block* partial_message_;
};
/**
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.cpp b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.cpp
new file mode 100644
index 00000000000..a6a92dab129
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.cpp
@@ -0,0 +1,60 @@
+//
+// $Id$
+//
+
+#include "Client_Task.h"
+
+ACE_RCSID(Muxing, Client_Task, "$Id$")
+
+Client_Task::Client_Task (Test::Big_Reply_ptr reply_gen,
+ int event_count,
+ ACE_Thread_Manager *thr_mgr)
+ : ACE_Task_Base (thr_mgr)
+ , reply_gen_ (Test::Big_Reply::_duplicate (reply_gen))
+ , event_count_ (event_count)
+
+{
+}
+
+int
+Client_Task::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Starting client task\n"));
+
+ ACE_DECLARE_NEW_CORBA_ENV;
+
+ // Now get the big replies..
+ ACE_TRY
+ {
+ for (int i = 0; i != this->event_count_; ++i)
+ {
+ Test::Octet_Seq_var seq =
+ this->reply_gen_->get_big_reply (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::ULong length = seq->length ();
+ for(CORBA::ULong i = 0; i < length; ++i)
+ {
+ if (seq[i] != 'A')
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid data '%c' at %d\n", seq[i], i),
+ -1);
+ }
+ }
+
+// ACE_Time_Value tv(0, 10000000);
+// ACE_OS::sleep(tv);
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception Caught \n");
+ return -1;
+ }
+ ACE_ENDTRY;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Client task finished\n"));
+ return 0;
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.h b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.h
new file mode 100644
index 00000000000..40c669d8344
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Client_Task.h
@@ -0,0 +1,38 @@
+//
+// $Id$
+//
+
+#ifndef BIG_REPLY_CLIENT_TASK_H
+#define BIG_REPLY_CLIENT_TASK_H
+#include "ace/pre.h"
+
+#include "TestC.h"
+#include "ace/Task.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/// Implement a Task to run the experiments using multiple threads.
+class Client_Task : public ACE_Task_Base
+{
+public:
+ /// Constructor
+ Client_Task (Test::Big_Reply_ptr receiver,
+ CORBA::Long event_count,
+ ACE_Thread_Manager *thr_mgr);
+
+ /// Thread entry point
+ int svc (void);
+
+private:
+
+ /// Reference to the test interface
+ Test::Big_Reply_var reply_gen_;
+
+ /// Number of remote calls
+ int event_count_;
+};
+
+#include "ace/post.h"
+#endif /* BIG_REPLY_CLIENT_TASK_H */
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/Java_Big_Reply.mpc b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Java_Big_Reply.mpc
new file mode 100644
index 00000000000..627a8a61136
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Java_Big_Reply.mpc
@@ -0,0 +1,45 @@
+project(*IDL) {
+ requires += java
+
+ // Define the Java IDL with a hack output extension
+ // Since our idl file has a module, the generated java files
+ // will go int the module name so we set the extension to /
+ Define_Custom(JAVA_IDL) {
+ command = idlj
+ inputext = .idl
+ generic_outputext = /
+ }
+
+ // Make sure we don't get any of the defaults
+ Source_Files {
+ }
+ Inline_Files {
+ }
+ Header_Files {
+ }
+
+ JAVA_IDL_Files {
+ // The idlj will automatically put the generated files
+ // in the Test directory
+ gendir = Test
+
+ // We want both server and client side
+ commandflags += -fall
+
+ Test.idl
+ }
+}
+
+// This project will build the java server and C++ client
+project(Java_Big_Reply): taoexe, portableserver {
+ requires += java
+ after += *IDL
+
+ // Define the java type
+ Define_Custom(JAVA) {
+ command = javac
+ commandflags = -d .
+ inputext = .java
+ generic_outputext = .class
+ }
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/Test.idl b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Test.idl
new file mode 100644
index 00000000000..442d6a746be
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/Test.idl
@@ -0,0 +1,18 @@
+//$Id$
+module Test
+{
+
+ typedef sequence<octet> Octet_Seq;
+
+ interface Big_Reply
+ {
+ /// Receive a big reply
+ Octet_Seq get_big_reply ();
+
+ /// Ping message
+ void ping ();
+
+ /// Shudown the remote ORB
+ oneway void shutdown ();
+ };
+};
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/client.cpp b/TAO/tests/GIOP_Fragments/Java_Big_Reply/client.cpp
new file mode 100644
index 00000000000..e900b46cdc1
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/client.cpp
@@ -0,0 +1,95 @@
+// $Id$
+
+#include "Client_Task.h"
+#include "ace/Get_Opt.h"
+
+ACE_RCSID(Muxing, client, "$Id$")
+
+static const char *ior = "file://server.ior";
+static size_t nthreads = 1;
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "k:n:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'k':
+ ior = get_opts.opt_arg ();
+ break;
+ case 'n':
+ nthreads = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-k <ior> "
+ "-n <nthreads> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ CORBA::Object_var tmp =
+ orb->string_to_object(ior ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ Test::Big_Reply_var reply_gen =
+ Test::Big_Reply::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (reply_gen.in ()))
+ {
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Nil coordinator reference <%s>\n",
+ ior),
+ 1);
+ }
+
+
+ Client_Task client_task (reply_gen.in (),
+ 10,
+ ACE_Thread_Manager::instance ());
+
+ if (client_task.activate (THR_NEW_LWP | THR_JOINABLE,
+ nthreads, 1) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "Error activating client task\n"));
+ }
+ ACE_Thread_Manager::instance ()->wait ();
+
+ reply_gen->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/run_test.pl b/TAO/tests/GIOP_Fragments/Java_Big_Reply/run_test.pl
new file mode 100755
index 00000000000..d51fafed5e6
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/run_test.pl
@@ -0,0 +1,62 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../../bin';
+use PerlACE::Run_Test;
+use Config;
+sub which {
+ my($prog) = shift;
+ my($exec) = $prog;
+
+ if (defined $ENV{'PATH'}) {
+ my($part) = '';
+ foreach $part (split($Config{'path_sep'}, $ENV{'PATH'})) {
+ $part .= "/$prog";
+ if ( -x $part ) {
+ $exec = $part;
+ last;
+ }
+ }
+ }
+
+ return $exec;
+}
+
+$iorfile = PerlACE::LocalFile ('server.ior');
+unlink $iorfile;
+
+$SV = new PerlACE::Process (which('java'), 'server');
+$CL = new PerlACE::Process ('client');
+
+$SV->Spawn ();
+
+if (PerlACE::waitforfile_timed ($iorfile, 15) == -1) {
+ print STDERR "ERROR: cannot find file <$iorfile>\n";
+ $SV->Kill (); $SV->TimedWait (1);
+ exit 1;
+}
+
+$CL->Spawn (60);
+
+$client = $CL->WaitKill (60);
+
+if ($client1 != 0) {
+ print STDERR "ERROR: client 1 returned $client1\n";
+ $status = 1;
+}
+
+
+$server = $SV->WaitKill (10);
+
+if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ $status = 1;
+}
+
+unlink $iorfile;
+
+exit $status;
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Reply/server.java b/TAO/tests/GIOP_Fragments/Java_Big_Reply/server.java
new file mode 100644
index 00000000000..f23d552c542
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Reply/server.java
@@ -0,0 +1,62 @@
+// $Id$
+
+// If this server is compiled and run with the JDK ORB, it will
+// fragment the GIOP Messages sent when get_big_reply() is called.
+
+import org.omg.CORBA.*;
+import org.omg.PortableServer.*;
+import Test.Big_ReplyPOA;
+
+class Big_ReplyImpl extends Big_ReplyPOA
+{
+ private org.omg.CORBA.ORB orb_;
+
+ public byte[] get_big_reply () {
+ byte [] seq = new byte [1000000];
+ for (int i = 0; i < seq.length; i++)
+ seq [i] = 'A';
+ return seq;
+ }
+
+ public void setORB (org.omg.CORBA.ORB orb_val) {
+ orb_ = orb_val;
+ }
+
+ public void ping () {
+ }
+
+ public void shutdown () {
+ orb_.shutdown (false);
+ }
+}
+
+
+public class server
+{
+ public static void main (String args[]) {
+ try {
+ ORB orb = ORB.init (args, null);
+ POA poa = org.omg.PortableServer.POAHelper.narrow (
+ orb.resolve_initial_references ("RootPOA"));
+
+ Big_ReplyImpl servant = new Big_ReplyImpl ();
+ servant.setORB (orb);
+ poa.activate_object (servant);
+
+ String filename = new String ("server.ior");
+ String ior = orb.object_to_string (servant._this ());
+ java.io.FileWriter file = new java.io.FileWriter (filename);
+ file.write (ior);
+ file.flush ();
+ file.close ();
+
+ poa.the_POAManager ().activate ();
+ System.out.println ("Ready...");
+ orb.run ();
+ }
+ catch (Exception e) {
+ System.err.println ("ERROR: " + e);
+ e.printStackTrace (System.out);
+ }
+ }
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/Java_Big_Request.mpc b/TAO/tests/GIOP_Fragments/Java_Big_Request/Java_Big_Request.mpc
new file mode 100644
index 00000000000..6503fb95c9b
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/Java_Big_Request.mpc
@@ -0,0 +1,42 @@
+project(*IDL) {
+ requires += java
+
+ // Define the Java IDL with a hack output extension
+ // Since our idl file has a module, the generated java files
+ // will go int the module name so we set the extension to /
+ Define_Custom(JAVA_IDL) {
+ command = idlj
+ inputext = .idl
+ generic_outputext = /
+ }
+
+ // Make sure we don't get any of the defaults
+ Source_Files {
+ }
+ Inline_Files {
+ }
+ Header_Files {
+ }
+
+ JAVA_IDL_Files {
+ // The idlj will automatically put the generated files
+ // in the Test directory
+ gendir = Test
+
+ Test.idl
+ }
+}
+
+// This project will build the java server and C++ client
+project(Java_Big_Request): taoexe, portableserver {
+ requires += java
+ after += *IDL
+
+ // Define the java type
+ Define_Custom(JAVA) {
+ command = javac
+ commandflags = -d .
+ inputext = .java
+ generic_outputext = .class
+ }
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.cpp b/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.cpp
new file mode 100644
index 00000000000..41bb21bca37
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.cpp
@@ -0,0 +1,45 @@
+//
+// $Id$
+//
+#include "Payload_Receiver.h"
+
+ACE_RCSID(Big_Request_Muxing, Payload_Receiver, "$Id$")
+
+Payload_Receiver::Payload_Receiver (CORBA::ORB_ptr orb)
+ : count_ (0),
+ orb_(CORBA::ORB::_duplicate (orb))
+{
+}
+
+void
+Payload_Receiver::more_data (const Test::Payload &payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Payload_Receiver::Invalid_Payload))
+{
+ ++this->count_;
+
+ CORBA::ULong length = payload.length ();
+ for (CORBA::ULong i = 0; i < length; ++i)
+ {
+ if (payload[i] != 'A')
+ {
+ ACE_THROW (Test::Payload_Receiver::Invalid_Payload ());
+ }
+ }
+}
+
+
+void
+Payload_Receiver::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->orb_->shutdown ();
+}
+
+
+int
+Payload_Receiver::get_count () const
+{
+ return count_;
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.h b/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.h
new file mode 100644
index 00000000000..8a0c4eafe74
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/Payload_Receiver.h
@@ -0,0 +1,51 @@
+//
+// $Id$
+//
+
+#ifndef BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H
+#define BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H
+#include "ace/pre.h"
+
+#include "TestS.h"
+
+#if defined (_MSC_VER)
+# if (_MSC_VER >= 1200)
+# pragma warning(push)
+# endif /* _MSC_VER >= 1200 */
+# pragma warning (disable:4250)
+#endif /* _MSC_VER */
+
+/// Implement the Test::Payload_Receiver interface
+/**
+ * Simply print count how many bytes were received.
+ */
+class Payload_Receiver
+ : public virtual POA_Test::Payload_Receiver
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ Payload_Receiver (CORBA::ORB_ptr orb);
+
+ // = The skeleton methods
+ virtual void more_data (const Test::Payload &payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Payload_Receiver::Invalid_Payload));
+
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ int get_count (void) const;
+
+public:
+ int count_;
+ CORBA::ORB_var orb_;
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+# pragma warning(pop)
+#endif /* _MSC_VER */
+
+#include "ace/post.h"
+#endif /* BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H */
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/Test.idl b/TAO/tests/GIOP_Fragments/Java_Big_Request/Test.idl
new file mode 100644
index 00000000000..902aed29cf4
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/Test.idl
@@ -0,0 +1,21 @@
+//
+// $Id$
+//
+
+module Test
+{
+ typedef sequence<octet> Payload;
+
+ interface Payload_Receiver
+ {
+ exception Invalid_Payload {
+ };
+
+ /// Send the data using a twoway operation
+ void more_data (in Payload the_payload)
+ raises (Invalid_Payload);
+
+ /// Shudown the remote ORB
+ oneway void shutdown ();
+ };
+};
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/client.java b/TAO/tests/GIOP_Fragments/Java_Big_Request/client.java
new file mode 100644
index 00000000000..b3d1dd3eafb
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/client.java
@@ -0,0 +1,47 @@
+// $Id$
+
+// If this server is compiled and run with the JDK ORB, it will
+// fragment the GIOP Messages sent when more_data() is called.
+
+import java.io.FileReader;
+import java.io.BufferedReader;
+import org.omg.CORBA.*;
+import org.omg.PortableServer.*;
+import Test.Payload_Receiver;
+import Test.Payload_ReceiverHelper;
+
+public class client
+{
+ public static void main (String args[]) {
+ try {
+ ORB orb = ORB.init (args, null);
+
+ // Get the object reference
+ BufferedReader reader = new BufferedReader (
+ new FileReader ("server.ior"));
+ StringBuffer ior = new StringBuffer();
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ ior.append(line);
+ }
+
+ org.omg.CORBA.Object obj = orb.string_to_object (ior.toString ());
+ Payload_Receiver receiver = Payload_ReceiverHelper.narrow (obj);
+
+ // Set up the payload
+ byte [] seq = new byte [1000000];
+ for (int i = 0; i < seq.length; ++i)
+ seq [i] = 'A';
+
+ // Invoke the method on the server
+ for (int i = 0; i < 20; ++i)
+ receiver.more_data (seq);
+
+ receiver.shutdown ();
+ }
+ catch (Exception e) {
+ System.err.println ("ERROR: " + e);
+ e.printStackTrace (System.out);
+ }
+ }
+}
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/run_test.pl b/TAO/tests/GIOP_Fragments/Java_Big_Request/run_test.pl
new file mode 100755
index 00000000000..e8514914409
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/run_test.pl
@@ -0,0 +1,53 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../../bin';
+use PerlACE::Run_Test;
+
+$status = 0;
+$iorfile = PerlACE::LocalFile ('server.ior');
+$TARGETHOSTNAME = 'localhost';
+$port = PerlACE::uniqueid () + 12000;
+
+## No ORB fragments GIOP 1.0 messages.
+## The JDK ORB only fragments GIOP 1.2 messages.
+foreach my $giop ('1.2') {
+ print "Testing GIOP $giop Fragmentation\n";
+ unlink $iorfile;
+
+ $SV = new PerlACE::Process ('server',
+ '-ORBEndpoint ' .
+ "iiop://$giop\@$TARGETHOSTNAME" . ":$port");
+ $SV->Spawn ();
+
+ if (PerlACE::waitforfile_timed ($iorfile, 15) == -1) {
+ print STDERR "ERROR: cannot find file <$iorfile>\n";
+ $SV->Kill (); $SV->TimedWait (1);
+ exit 1;
+ }
+
+ my($cl) = system('java client');
+ if ($cl != 0) {
+ print STDERR "ERROR: client returned $cl\n";
+ ++$status;
+ }
+
+ $server = $SV->WaitKill (20);
+
+ if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ ++$status;
+ }
+
+ unlink $iorfile;
+
+ if ($status) {
+ last;
+ }
+}
+
+exit $status;
diff --git a/TAO/tests/GIOP_Fragments/Java_Big_Request/server.cpp b/TAO/tests/GIOP_Fragments/Java_Big_Request/server.cpp
new file mode 100644
index 00000000000..8a7bbbd6c4b
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/Java_Big_Request/server.cpp
@@ -0,0 +1,119 @@
+// $Id$
+
+#include "Payload_Receiver.h"
+#include "ace/Get_Opt.h"
+
+ACE_RCSID(Big_Request_Muxing, server, "$Id$")
+
+const char *ior_output_file = "server.ior";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "o:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile>"
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ int status = 0;
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize the POA.\n"),
+ 1);
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ Payload_Receiver *payload_receiver_impl;
+ ACE_NEW_RETURN (payload_receiver_impl,
+ Payload_Receiver (orb.in ()),
+ 1);
+ PortableServer::ServantBase_var receiver_owner_transfer(payload_receiver_impl);
+
+ Test::Payload_Receiver_var payload_receiver =
+ payload_receiver_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (payload_receiver.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // If the ior_output_file exists, output the ior to it
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (payload_receiver_impl->get_count () != 20)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: %d is not the correct "
+ "number of calls\n",
+ payload_receiver_impl->get_count ()));
+ ++status;
+ }
+
+ root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ ++status;
+ }
+ ACE_ENDTRY;
+
+ return status;
+}
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/PMB_With_Fragments.mpc b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/PMB_With_Fragments.mpc
new file mode 100644
index 00000000000..adbd1fd9303
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/PMB_With_Fragments.mpc
@@ -0,0 +1,2 @@
+project: taoexe, portableserver {
+}
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.cpp b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.cpp
new file mode 100644
index 00000000000..41bb21bca37
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.cpp
@@ -0,0 +1,45 @@
+//
+// $Id$
+//
+#include "Payload_Receiver.h"
+
+ACE_RCSID(Big_Request_Muxing, Payload_Receiver, "$Id$")
+
+Payload_Receiver::Payload_Receiver (CORBA::ORB_ptr orb)
+ : count_ (0),
+ orb_(CORBA::ORB::_duplicate (orb))
+{
+}
+
+void
+Payload_Receiver::more_data (const Test::Payload &payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Payload_Receiver::Invalid_Payload))
+{
+ ++this->count_;
+
+ CORBA::ULong length = payload.length ();
+ for (CORBA::ULong i = 0; i < length; ++i)
+ {
+ if (payload[i] != 'A')
+ {
+ ACE_THROW (Test::Payload_Receiver::Invalid_Payload ());
+ }
+ }
+}
+
+
+void
+Payload_Receiver::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->orb_->shutdown ();
+}
+
+
+int
+Payload_Receiver::get_count () const
+{
+ return count_;
+}
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.h b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.h
new file mode 100644
index 00000000000..8a0c4eafe74
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Payload_Receiver.h
@@ -0,0 +1,51 @@
+//
+// $Id$
+//
+
+#ifndef BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H
+#define BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H
+#include "ace/pre.h"
+
+#include "TestS.h"
+
+#if defined (_MSC_VER)
+# if (_MSC_VER >= 1200)
+# pragma warning(push)
+# endif /* _MSC_VER >= 1200 */
+# pragma warning (disable:4250)
+#endif /* _MSC_VER */
+
+/// Implement the Test::Payload_Receiver interface
+/**
+ * Simply print count how many bytes were received.
+ */
+class Payload_Receiver
+ : public virtual POA_Test::Payload_Receiver
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ Payload_Receiver (CORBA::ORB_ptr orb);
+
+ // = The skeleton methods
+ virtual void more_data (const Test::Payload &payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Payload_Receiver::Invalid_Payload));
+
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ int get_count (void) const;
+
+public:
+ int count_;
+ CORBA::ORB_var orb_;
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+# pragma warning(pop)
+#endif /* _MSC_VER */
+
+#include "ace/post.h"
+#endif /* BIG_REQUEST_MUXING_PAYLOAD_RECEIVER_H */
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Test.idl b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Test.idl
new file mode 100644
index 00000000000..902aed29cf4
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/Test.idl
@@ -0,0 +1,21 @@
+//
+// $Id$
+//
+
+module Test
+{
+ typedef sequence<octet> Payload;
+
+ interface Payload_Receiver
+ {
+ exception Invalid_Payload {
+ };
+
+ /// Send the data using a twoway operation
+ void more_data (in Payload the_payload)
+ raises (Invalid_Payload);
+
+ /// Shudown the remote ORB
+ oneway void shutdown ();
+ };
+};
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/dribble.pl b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/dribble.pl
new file mode 100755
index 00000000000..248b60d6a11
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/dribble.pl
@@ -0,0 +1,147 @@
+eval '(exit $?0)' && eval 'exec perl -w -S $0 ${1+"$@"}'
+ & eval 'exec perl -w -S $0 $argv:q'
+ if 0;
+
+# ******************************************************************
+# Author: Chris Cleeland
+# Date: 10/26/2002
+# $Id$
+# ******************************************************************
+
+use Getopt::Long;
+use File::Basename;
+use IO::Socket::INET;
+use IO::File;
+use Carp;
+
+sub getnum {
+ use POSIX qw(strtoul);
+ my $str = shift;
+ $str =~ s/^\s+//;
+ $str =~ s/\s+$//;
+ $! = 0;
+ my($num, $unparsed) = strtoul($str);
+ if (($str eq '') || ($unparsed != 0) || $!) {
+ return undef;
+ } else {
+ return $num;
+ }
+}
+
+my $corba_server_addr = "127.0.0.1";
+my $corba_server_port = 12345;
+my $progress_interval = 10;
+my $verbosity = 0;
+my $interactive = undef;
+my $datastreamfile = 'datastream.dat';
+my $dataxmitlayoutfile = undef;
+my $delay = .25;
+
+my %options = ('host|h=s' => \$corba_server_addr,
+ 'port|p=s' => \$corba_server_port,
+ 'interval|i=i' => \$progress_interval,
+ 'verbose|verbosity|v+' => \$verbosity,
+ 'interactive' => \$interactive,
+ 'stream|s=s' => \$datastreamfile,
+ 'layout|l=s' => \$dataxmitlayoutfile,
+ 'delay|d=f' => \$delay,
+ );
+my $result = GetOptions (%options);
+
+if (!$result) {
+ my($str) = 'Usage: ' . basename($0);
+ my($initial) = length($str);
+ my($length) = $initial;
+ my($maxLine) = 78;
+
+ print $str;
+
+ foreach my $key (sort keys %options) {
+ my($opt, $type) = split(/[|=]/, $key);
+ my($str) = " [--$opt" . (defined $type ? " <$opt>" : "") . "]";
+ my($len) = length($str);
+ if ($length + $len > $maxLine) {
+ print "\n" . (" " x $initial);
+ $length = $initial;
+ }
+ print $str;
+ $length += $len;
+ }
+ print "\n";
+ exit(0);
+}
+
+croak "Must supply a data file using --stream and a layout file using --layout"
+ unless defined $datastreamfile && defined $dataxmitlayoutfile;
+
+my $sock = IO::Socket::INET->new(PeerAddr => $corba_server_addr,
+ PeerPort => $corba_server_port,
+ Proto => 'tcp')
+ or croak "Unable to establish connection to $corba_server_addr:$corba_server_port: $!\n";
+
+#
+# Infer a name for the layout file
+#
+if (! defined $dataxmitlayoutfile) {
+ $dataxmitlayoutfile = $datastreamfile;
+ $dataxmitlayoutfile =~ s/\.dat$/\.layout/;
+}
+
+# Now we just start reading from <> and writing to the socket We
+# currently assume all are oneways, so we don't worry about reading
+# from the socket.
+
+my $stream = new IO::File $datastreamfile, "r"
+ or croak "Unable to open $datastreamfile for reading: $!";
+my $layout = new IO::File $dataxmitlayoutfile, "r"
+ or croak "Unable to open $dataxmitlayoutfile for reading: $!";
+
+$| = 1;
+print "Sending...(1 dot every $progress_interval hunks)\n";
+print "Hit <RETURN> key to send a packet\n" if defined $interactive;
+my $numread;
+do {
+
+ #
+ # Determine the size of the hunk we have to read/send
+ #
+ $_ = $layout->getline;
+ s/#.*$//;
+ chomp $_;
+ next if ($_ =~ /^\s*$/);
+
+ my $requested_hunksize = getnum($_);
+ my $l = $layout->input_line_number;
+
+ my $hunk;
+
+ #
+ # Read in the hunk size we want
+ #
+ $numread = $stream->sysread($hunk, $requested_hunksize);
+ if (! defined $numread) {
+ carp "\nHunk $l: Error reading from stream: $!";
+ next;
+ }
+ elsif ($numread != $requested_hunksize) {
+ carp "\nHunk $l: short read (expected $requested_hunksize, got $numread); going on.";
+ }
+
+ print "Hunk $l: length ", length($hunk), "\n" if ($verbosity >= 1);
+ if (defined $interactive) {
+ <STDIN>;
+ }
+ else {
+ select(undef, undef, undef, $delay);
+ }
+
+ my $ret = $sock->send($hunk, 0);
+ if ($ret != length($hunk)) {
+ carp "\nHunk $l: problem sending hunk $.: $!\n";
+ exit(1);
+ }
+
+ print "." if ($. % $progress_interval == 0);
+} until $numread == 0 || $layout->eof;
+print "\nDone.\n"
+
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.dat b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.dat
new file mode 100644
index 00000000000..09582c5f2c4
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.dat
Binary files differ
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.layout b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.layout
new file mode 100644
index 00000000000..aa5766e5a62
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/giop1.2_fragments.layout
@@ -0,0 +1,86 @@
+2
+2
+6
+2
+3
+9
+272
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+25815
+18438
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/run_test.pl b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/run_test.pl
new file mode 100755
index 00000000000..41ae198c47f
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/run_test.pl
@@ -0,0 +1,47 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../../bin';
+use PerlACE::Run_Test;
+
+$status = 0;
+$iorfile = PerlACE::LocalFile ('server.ior');
+$TARGETHOSTNAME = 'localhost';
+$port = PerlACE::uniqueid () + 12000;
+$debug = 0;
+
+unlink $iorfile;
+
+$SV = new PerlACE::Process ('server',
+ '-ORBEndpoint ' .
+ "iiop://$TARGETHOSTNAME" . ":$port " .
+ "-ORBDebugLevel $debug");
+$SV->Spawn ();
+
+if (PerlACE::waitforfile_timed ($iorfile, 15) == -1) {
+ print STDERR "ERROR: cannot find file <$iorfile>\n";
+ $SV->Kill (); $SV->TimedWait (1);
+ exit 1;
+}
+
+my($cl) = system("$^X dribble.pl --port=$port " .
+ "--stream=giop1.2_fragments.dat " .
+ "--layout=giop1.2_fragments.layout");
+if ($cl != 0) {
+ print STDERR "ERROR: client returned $cl\n";
+ ++$status;
+}
+
+$server = $SV->WaitKill (10);
+
+if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ ++$status;
+}
+
+unlink $iorfile;
+exit $status;
diff --git a/TAO/tests/GIOP_Fragments/PMB_With_Fragments/server.cpp b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/server.cpp
new file mode 100644
index 00000000000..b130d8c5747
--- /dev/null
+++ b/TAO/tests/GIOP_Fragments/PMB_With_Fragments/server.cpp
@@ -0,0 +1,137 @@
+// $Id$
+
+#include "Payload_Receiver.h"
+#include "ace/Get_Opt.h"
+
+ACE_RCSID(Big_Request_Muxing, server, "$Id$")
+
+const char *ior_output_file = "server.ior";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "o:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile>"
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ int status = 0;
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize the POA.\n"),
+ 1);
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ PortableServer::LifespanPolicy_var lifespan =
+ root_poa->create_lifespan_policy (PortableServer::PERSISTENT);
+ CORBA::PolicyList policy_list;
+ policy_list.length (1);
+ policy_list[0] = PortableServer::LifespanPolicy::_duplicate (
+ lifespan.in ());
+ PortableServer::POA_var persistent_poa =
+ root_poa->create_POA ("PersistentPOA", poa_manager.in (),
+ policy_list);
+ lifespan->destroy ();
+
+
+ Payload_Receiver *payload_receiver_impl;
+ ACE_NEW_RETURN (payload_receiver_impl,
+ Payload_Receiver (orb.in ()),
+ 1);
+ PortableServer::ServantBase_var receiver_owner_transfer(payload_receiver_impl);
+
+ PortableServer::ObjectId_var id =
+ persistent_poa->activate_object (payload_receiver_impl
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var obj =
+ persistent_poa->id_to_reference (id.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (obj.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // If the ior_output_file exists, output the ior to it
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (payload_receiver_impl->get_count () != 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: %d is not the correct "
+ "number of calls\n",
+ payload_receiver_impl->get_count ()));
+ ++status;
+ }
+
+ root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ ++status;
+ }
+ ACE_ENDTRY;
+
+ return status;
+}