summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2001-06-25 12:57:34 +0000
committerbala <balanatarajan@users.noreply.github.com>2001-06-25 12:57:34 +0000
commit9f15ca5b47d4851db1315591c15a36c15b5023c8 (patch)
treec8c3c67b72362662d2c64b505a0b34184a4f0079
parent6d08dada7e53cf332656492ad7d463f0a8b6203d (diff)
downloadATCD-9f15ca5b47d4851db1315591c15a36c15b5023c8.tar.gz
ChangeLogTag: Mon Jun 25 07:54:31 2001 Balachandran Natarajan <bala@cs.wustl.edu>
-rw-r--r--TAO/tao/Acceptor_Filter.cpp3
-rw-r--r--TAO/tao/Adapter.cpp1
-rw-r--r--TAO/tao/Any.cpp2
-rw-r--r--TAO/tao/Asynch_Reply_Dispatcher.cpp14
-rw-r--r--TAO/tao/CDR.cpp2
-rw-r--r--TAO/tao/CORBALOC_Parser.cpp1
-rw-r--r--TAO/tao/ClientRequestInfo.cpp1
-rw-r--r--TAO/tao/Connection_Handler.cpp3
-rw-r--r--TAO/tao/Connection_Handler.h6
-rw-r--r--TAO/tao/DomainC.cpp1
-rw-r--r--TAO/tao/DomainC.i1
-rw-r--r--TAO/tao/DynamicC.cpp1
-rw-r--r--TAO/tao/DynamicC.i153
-rw-r--r--TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp10
-rw-r--r--TAO/tao/Exception.cpp2
-rw-r--r--TAO/tao/Exclusive_TMS.cpp4
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp137
-rw-r--r--TAO/tao/GIOP_Message_Base.h39
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.cpp574
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.h31
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.inl4
-rw-r--r--TAO/tao/GIOP_Message_State.cpp305
-rw-r--r--TAO/tao/GIOP_Message_State.h115
-rw-r--r--TAO/tao/GIOP_Message_State.i1
-rw-r--r--TAO/tao/IIOP_Acceptor.cpp1
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp39
-rw-r--r--TAO/tao/IIOP_Connection_Handler.h6
-rw-r--r--TAO/tao/IIOP_Connector.cpp1
-rw-r--r--TAO/tao/IIOP_Transport.cpp22
-rw-r--r--TAO/tao/IIOP_Transport.h4
-rw-r--r--TAO/tao/IORInfo.cpp22
-rw-r--r--TAO/tao/Invocation.cpp3
-rw-r--r--TAO/tao/Invocation_Endpoint_Selectors.cpp1
-rw-r--r--TAO/tao/LocalObject.cpp2
-rw-r--r--TAO/tao/Makefile5
-rw-r--r--TAO/tao/MessagingC.h1
-rw-r--r--TAO/tao/Muxed_TMS.cpp2
-rw-r--r--TAO/tao/ORB.cpp10
-rw-r--r--TAO/tao/Object_Ref_Table.cpp2
-rw-r--r--TAO/tao/Pluggable_Messaging.h18
-rw-r--r--TAO/tao/Pluggable_Messaging_Utils.cpp2
-rw-r--r--TAO/tao/PolicyC.cpp1
-rw-r--r--TAO/tao/PolicyC.h1
-rw-r--r--TAO/tao/PolicyC.i1
-rw-r--r--TAO/tao/PolicyFactory_Registry.cpp1
-rw-r--r--TAO/tao/PortableInterceptor.pidl1
-rw-r--r--TAO/tao/Profile.cpp20
-rw-r--r--TAO/tao/Synch_Reply_Dispatcher.cpp24
-rw-r--r--TAO/tao/TAO_Server_Request.cpp1
-rw-r--r--TAO/tao/Transport.cpp412
-rw-r--r--TAO/tao/Transport.h105
-rw-r--r--TAO/tao/Wait_On_Read.cpp4
52 files changed, 1287 insertions, 836 deletions
diff --git a/TAO/tao/Acceptor_Filter.cpp b/TAO/tao/Acceptor_Filter.cpp
index 8ba9afd45ee..98a3efb01d2 100644
--- a/TAO/tao/Acceptor_Filter.cpp
+++ b/TAO/tao/Acceptor_Filter.cpp
@@ -1,13 +1,16 @@
// $Id$
+
#include "tao/Acceptor_Filter.h"
#if !defined (__ACE_INLINE__)
# include "Acceptor_Filter.i"
#endif /* __ACE_INLINE__ */
+
ACE_RCSID(tao, Acceptor_Filter, "$Id$")
+
TAO_Acceptor_Filter::~TAO_Acceptor_Filter (void)
{
}
diff --git a/TAO/tao/Adapter.cpp b/TAO/tao/Adapter.cpp
index 41c9e3a8bd9..1f363431999 100644
--- a/TAO/tao/Adapter.cpp
+++ b/TAO/tao/Adapter.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "tao/Adapter.h"
#include "tao/Object.h"
#include "tao/Object_KeyC.h"
diff --git a/TAO/tao/Any.cpp b/TAO/tao/Any.cpp
index 4a5c307d38e..35aadd83e34 100644
--- a/TAO/tao/Any.cpp
+++ b/TAO/tao/Any.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// Portions of this file are:
// Copyright 1994-1995 by Sun Microsystems Inc.
// All Rights Reserved
@@ -18,6 +19,7 @@
ACE_RCSID(tao, Any, "$Id$")
+
CORBA::TypeCode_ptr
CORBA_Any::type (void) const
{
diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp
index b4a20a82253..5e7b2adb3aa 100644
--- a/TAO/tao/Asynch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp
@@ -1,6 +1,5 @@
// $Id$
-
#include "tao/Asynch_Reply_Dispatcher.h"
#include "tao/Pluggable_Messaging_Utils.h"
@@ -48,11 +47,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply (
return 0;
}
-/*TAO_GIOP_Message_State *
-TAO_Asynch_Reply_Dispatcher_Base::message_state (void)
-{
- return this->message_state_;
-} */
void
TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *)
@@ -113,11 +107,11 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (
this->reply_status_ = params.reply_status_;
- // this->message_state_ = message_state;
-
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
+ ACE_UNUSED_ARG (db);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
CORBA::ULong max = params.svc_ctx_.maximum ();
diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp
index 07eb03a7cf2..a08222e531f 100644
--- a/TAO/tao/CDR.cpp
+++ b/TAO/tao/CDR.cpp
@@ -41,9 +41,11 @@
# include "tao/CDR.i"
#endif /* ! __ACE_INLINE__ */
+
ACE_RCSID(tao, CDR, "$Id$")
+
#if defined (ACE_ENABLE_TIMEPROBES)
static const char *TAO_CDR_Timeprobe_Description[] =
diff --git a/TAO/tao/CORBALOC_Parser.cpp b/TAO/tao/CORBALOC_Parser.cpp
index 13f09350804..ab4b228771a 100644
--- a/TAO/tao/CORBALOC_Parser.cpp
+++ b/TAO/tao/CORBALOC_Parser.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "CORBALOC_Parser.h"
#include "ORB_Core.h"
#include "Stub.h"
diff --git a/TAO/tao/ClientRequestInfo.cpp b/TAO/tao/ClientRequestInfo.cpp
index 8dab949ae98..281ca1ed6a7 100644
--- a/TAO/tao/ClientRequestInfo.cpp
+++ b/TAO/tao/ClientRequestInfo.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "ClientRequestInfo.h"
#include "Invocation.h"
#include "Stub.h"
diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp
index 985dbbf44c8..099e79d5eef 100644
--- a/TAO/tao/Connection_Handler.cpp
+++ b/TAO/tao/Connection_Handler.cpp
@@ -93,7 +93,8 @@ TAO_Connection_Handler::svc_i (void)
while (!this->orb_core_->has_shutdown ()
&& result >= 0)
{
- result = this->handle_input_i (ACE_INVALID_HANDLE, max_wait_time);
+ result =
+ this->transport ()->handle_input_i (ACE_INVALID_HANDLE, max_wait_time);
if (result == -1 && errno == ETIME)
{
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h
index 7caeccecd4b..6ca525bd1b0 100644
--- a/TAO/tao/Connection_Handler.h
+++ b/TAO/tao/Connection_Handler.h
@@ -101,9 +101,9 @@ protected:
/// Object.
int svc_i (void);
- /// Need to be implemented by the underlying protocol objects
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0) = 0;
+#if !defined (TAO_CONNECTION_HANDLER_BUF_SIZE)
+# define TAO_CONNECTION_HANDLER_BUF_SIZE 1024
+#endif /*TAO_CONNECTION_HANDLER_BUF_SIZE */
private:
diff --git a/TAO/tao/DomainC.cpp b/TAO/tao/DomainC.cpp
index 7dfa513c009..f2e9509a4f6 100644
--- a/TAO/tao/DomainC.cpp
+++ b/TAO/tao/DomainC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DomainC.i b/TAO/tao/DomainC.i
index 08c89453f73..b86dfb17929 100644
--- a/TAO/tao/DomainC.i
+++ b/TAO/tao/DomainC.i
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DynamicC.cpp b/TAO/tao/DynamicC.cpp
index 310487d401c..3ba3d039091 100644
--- a/TAO/tao/DynamicC.cpp
+++ b/TAO/tao/DynamicC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DynamicC.i b/TAO/tao/DynamicC.i
index 09fd7a3ff51..b45699ab5bb 100644
--- a/TAO/tao/DynamicC.i
+++ b/TAO/tao/DynamicC.i
@@ -70,7 +70,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p)
{
Parameter *deep_copy =
new Parameter (*p.ptr_);
-
+
if (deep_copy != 0)
{
Parameter *tmp = deep_copy;
@@ -80,7 +80,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p)
}
}
}
-
+
return *this;
}
@@ -103,20 +103,20 @@ Dynamic::Parameter_var::operator const ::Dynamic::Parameter &() const // cast
}
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast
{
return *this->ptr_;
}
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast
{
return this->ptr_;
}
@@ -133,7 +133,7 @@ Dynamic::Parameter_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::Parameter *&
Dynamic::Parameter_var::out (void)
{
@@ -194,7 +194,7 @@ Dynamic::Parameter_out::operator= (Parameter *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::Parameter_out::operator ::Dynamic::Parameter *&() // cast
{
return this->ptr_;
@@ -214,7 +214,7 @@ Dynamic::Parameter_out::operator-> (void)
#if !defined (TAO_USE_SEQUENCE_TEMPLATES)
-
+
#if !defined (__TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_)
#define __TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_
@@ -227,24 +227,24 @@ Dynamic::Parameter_out::operator-> (void)
ACE_NEW_RETURN (retval, Dynamic::Parameter[size], 0);
return retval;
}
-
+
ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::freebuf (Dynamic::Parameter *buffer)
// Free the sequence.
{
delete [] buffer;
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (void) // Default constructor.
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum) // Constructor using a maximum length value.
: TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (maximum))
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum,
CORBA::ULong length,
@@ -253,7 +253,7 @@ Dynamic::Parameter_out::operator-> (void)
: TAO_Unbounded_Base_Sequence (maximum, length, data, release)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs)
// Copy constructor.
@@ -263,10 +263,10 @@ Dynamic::Parameter_out::operator-> (void)
{
Dynamic::Parameter *tmp1 = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (this->maximum_);
Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
tmp1[i] = tmp2[i];
-
+
this->buffer_ = tmp1;
}
else
@@ -274,14 +274,14 @@ Dynamic::Parameter_out::operator-> (void)
this->buffer_ = 0;
}
}
-
+
ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator= (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs)
// Assignment operator.
{
if (this == &rhs)
return *this;
-
+
if (this->release_)
{
if (this->maximum_ < rhs.maximum_)
@@ -294,18 +294,18 @@ Dynamic::Parameter_out::operator-> (void)
}
else
this->buffer_ = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (rhs.maximum_);
-
+
TAO_Unbounded_Base_Sequence::operator= (rhs);
-
+
Dynamic::Parameter *tmp1 = ACE_reinterpret_cast (Dynamic::Parameter *, this->buffer_);
Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
tmp1[i] = tmp2[i];
-
+
return *this;
}
-
+
// = Accessors.
ACE_INLINE Dynamic::Parameter &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i)
@@ -315,7 +315,7 @@ Dynamic::Parameter_out::operator-> (void)
Dynamic::Parameter* tmp = ACE_reinterpret_cast(Dynamic::Parameter*,this->buffer_);
return tmp[i];
}
-
+
ACE_INLINE const Dynamic::Parameter &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) const
// operator []
@@ -324,9 +324,9 @@ Dynamic::Parameter_out::operator-> (void)
Dynamic::Parameter * const tmp = ACE_reinterpret_cast (Dynamic::Parameter* ACE_CAST_CONST, this->buffer_);
return tmp[i];
}
-
+
// Implement the TAO_Base_Sequence methods (see Sequence.h)
-
+
ACE_INLINE Dynamic::Parameter *
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (CORBA::Boolean orphan)
{
@@ -360,13 +360,13 @@ Dynamic::Parameter_out::operator-> (void)
}
return result;
}
-
+
ACE_INLINE const Dynamic::Parameter *
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (void) const
{
return ACE_reinterpret_cast(const Dynamic::Parameter * ACE_CAST_CONST, this->buffer_);
}
-
+
ACE_INLINE void
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::replace (CORBA::ULong max,
CORBA::ULong length,
@@ -383,11 +383,11 @@ Dynamic::Parameter_out::operator-> (void)
this->buffer_ = data;
this->release_ = release;
}
-
+
#endif /* end #if !defined */
-#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
+#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
#if !defined (_DYNAMIC_PARAMETERLIST_CI_)
#define _DYNAMIC_PARAMETERLIST_CI_
@@ -443,7 +443,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p)
{
ParameterList *deep_copy =
new ParameterList (*p.ptr_);
-
+
if (deep_copy != 0)
{
ParameterList *tmp = deep_copy;
@@ -453,7 +453,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p)
}
}
}
-
+
return *this;
}
@@ -469,27 +469,27 @@ Dynamic::ParameterList_var::operator-> (void)
return this->ptr_;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ParameterList_var::operator const ::Dynamic::ParameterList &() const // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast
+ACE_INLINE
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast
+ACE_INLINE
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast
{
return this->ptr_;
}
@@ -518,7 +518,7 @@ Dynamic::ParameterList_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::ParameterList *&
Dynamic::ParameterList_var::out (void)
{
@@ -579,7 +579,7 @@ Dynamic::ParameterList_out::operator= (ParameterList *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ParameterList_out::operator ::Dynamic::ParameterList *&() // cast
{
return this->ptr_;
@@ -608,7 +608,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
#if !defined (TAO_USE_SEQUENCE_TEMPLATES)
-
+
#if !defined (__TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_)
#define __TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_
@@ -616,36 +616,36 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (CORBA::ULong nelems)
{
CORBA::TypeCode **buf = 0;
-
+
ACE_NEW_RETURN (buf, CORBA::TypeCode*[nelems], 0);
-
+
for (CORBA::ULong i = 0; i < nelems; i++)
{
buf[i] = CORBA::TypeCode::_nil ();
}
-
+
return buf;
}
-
- ACE_INLINE void
+
+ ACE_INLINE void
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::freebuf (CORBA::TypeCode **buffer)
{
if (buffer == 0)
return;
delete[] buffer;
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (void)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum)
: TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (maximum))
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum,
CORBA::ULong length,
@@ -654,7 +654,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
: TAO_Unbounded_Base_Sequence (maximum, length, value, release)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList(const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs)
: TAO_Unbounded_Base_Sequence (rhs)
@@ -663,12 +663,12 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
{
CORBA::TypeCode **tmp1 = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (this->maximum_);
CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < rhs.length_; ++i)
{
tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]);
}
-
+
this->buffer_ = tmp1;
}
else
@@ -676,17 +676,17 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
this->buffer_ = 0;
}
}
-
+
ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator= (const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs)
{
if (this == &rhs)
return *this;
-
+
if (this->release_)
{
CORBA::TypeCode **tmp = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
{
CORBA::release (tmp[i]);
@@ -700,20 +700,20 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
}
else
this->buffer_ = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (rhs.maximum_);
-
+
TAO_Unbounded_Base_Sequence::operator= (rhs);
-
+
CORBA::TypeCode **tmp1 = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_);
CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < rhs.length_; ++i)
{
tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]);
}
-
+
return *this;
}
-
+
ACE_INLINE TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var>
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator[] (CORBA::ULong index) const
// read-write accessor
@@ -722,7 +722,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
CORBA::TypeCode ** const tmp = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_);
return TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var> (tmp + index, this->release_);
}
-
+
ACE_INLINE CORBA::TypeCode* *
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (CORBA::Boolean orphan)
{
@@ -756,18 +756,18 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
}
return result;
}
-
+
ACE_INLINE const CORBA::TypeCode* *
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (void) const
{
return ACE_reinterpret_cast(const CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_);
}
-
-
+
+
#endif /* end #if !defined */
-#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
+#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
#if !defined (_DYNAMIC_EXCEPTIONLIST_CI_)
#define _DYNAMIC_EXCEPTIONLIST_CI_
@@ -823,7 +823,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p)
{
ExceptionList *deep_copy =
new ExceptionList (*p.ptr_);
-
+
if (deep_copy != 0)
{
ExceptionList *tmp = deep_copy;
@@ -833,7 +833,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p)
}
}
}
-
+
return *this;
}
@@ -849,27 +849,27 @@ Dynamic::ExceptionList_var::operator-> (void)
return this->ptr_;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ExceptionList_var::operator const ::Dynamic::ExceptionList &() const // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast
+ACE_INLINE
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast
+ACE_INLINE
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast
{
return this->ptr_;
}
@@ -892,7 +892,7 @@ Dynamic::ExceptionList_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::ExceptionList *&
Dynamic::ExceptionList_var::out (void)
{
@@ -953,7 +953,7 @@ Dynamic::ExceptionList_out::operator= (ExceptionList *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ExceptionList_out::operator ::Dynamic::ExceptionList *&() // cast
{
return this->ptr_;
@@ -989,7 +989,7 @@ ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &strm, const Dynamic::Parame
return 1;
else
return 0;
-
+
}
ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_tao_aggregate)
@@ -1001,7 +1001,7 @@ ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_t
return 1;
else
return 0;
-
+
}
@@ -1033,4 +1033,3 @@ CORBA::Boolean TAO_Export operator>> (
);
#endif /* _TAO_CDR_OP_Dynamic_ExceptionList_I_ */
-
diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
index 5471cd4d1f3..ff807dfb5fe 100644
--- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
+++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
@@ -2,11 +2,14 @@
+
#include "DII_Reply_Dispatcher.h"
+
ACE_RCSID(DynamicInterface, DII_Reply_Dispatcher, "$Id$")
+
#include "Request.h"
#include "tao/Pluggable.h"
#include "tao/Environment.h"
@@ -44,8 +47,11 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply (
{
this->reply_status_ = params.reply_status_;
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.steal_from (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
+
+ ACE_UNUSED_ARG (db);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
diff --git a/TAO/tao/Exception.cpp b/TAO/tao/Exception.cpp
index 9d9f12fe3e9..d33b1ebc4b8 100644
--- a/TAO/tao/Exception.cpp
+++ b/TAO/tao/Exception.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// THREADING NOTE: calling thread handles mutual exclusion policy
// on all of these data structures.
@@ -26,6 +27,7 @@ ACE_RCSID (TAO,
Exception,
"$Id$")
+
// Static initializers.
ACE_Allocator *TAO_Exceptions::global_allocator_;
diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp
index 3645be52122..47eff9ca3be 100644
--- a/TAO/tao/Exclusive_TMS.cpp
+++ b/TAO/tao/Exclusive_TMS.cpp
@@ -43,10 +43,6 @@ TAO_Exclusive_TMS::bind_dispatcher (CORBA::ULong request_id,
this->request_id_ = request_id;
this->rd_ = rd;
- // If there was a previous reply, cleanup its state first.
- // if (this->message_state_.message_size != 0)
- // this->message_state_.reset (0);
-
return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id,
rd);
}
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 9ffb54cc572..e3882c63802 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -17,11 +17,11 @@
ACE_RCSID (tao, GIOP_Message_Base, "$Id$")
+
TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
- size_t input_cdr_size)
- : message_handler_ (orb_core,
- this,
- input_cdr_size),
+ size_t /*input_cdr_size*/)
+ : message_state_ (orb_core,
+ this),
output_ (0),
generator_parser_ (0)
{
@@ -62,10 +62,10 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major,
void
-TAO_GIOP_Message_Base::reset (int reset_flag)
+TAO_GIOP_Message_Base::reset (int /*reset_flag*/)
{
// Reset the message state
- this->message_handler_.reset (reset_flag);
+ // this->message_handler_.reset (reset_flag);
}
int
@@ -184,10 +184,11 @@ TAO_GIOP_Message_Base::generate_reply_header (
int
-TAO_GIOP_Message_Base::read_message (TAO_Transport *transport,
+TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/,
int /*block */,
ACE_Time_Value * /*max_wait_time*/)
{
+#if 0
// Call the handler to read and do a simple parse of the header of
// the message.
int retval =
@@ -217,6 +218,7 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport,
state.giop_version.minor);
}
+#endif /* if 0*/
// We return 2, it is ugly. But the reactor semantics has made us to
// limp :(
return 2;
@@ -286,7 +288,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
TAO_Pluggable_Message_Type
TAO_GIOP_Message_Base::message_type (void)
{
- switch (this->message_handler_.message_state ().message_type)
+ switch (this->message_state_.message_type_)
{
case TAO_GIOP_REQUEST:
case TAO_GIOP_LOCATEREQUEST:
@@ -313,11 +315,61 @@ TAO_GIOP_Message_Base::message_type (void)
}
int
+TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
+{
+
+
+ if (this->message_state_.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ // Set the state internally for parsing and generating messages
+ this->set_state (this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor);
+ return 0;
+}
+
+size_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
+{
+ // @@Bala: Look for fragmentation here..
+ // If we had recd. fragmented messages and if the GIOP minor version
+ // is greater than 1, then include the FRAGMENT HEADER to calculate
+ // the effective length of the message
+ /*if (this->message_state_.more_fragments_ &&
+ this->message_state_.giop_version_.minor > 1)
+ len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ */
+
+ size_t len = incoming.length ();
+
+ if (len >= this->message_state_.message_size ())
+ return 0;
+
+ return this->message_state_.message_size () - len;
+}
+
+CORBA::Octet
+TAO_GIOP_Message_Base::byte_order (void)
+{
+ return this->message_state_.byte_order_;
+}
+
+int
+TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block & /*incoming*/)
+{
+ // @@Bala: Implement other cases
+ return 0;
+}
+
+int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core)
+ TAO_ORB_Core *orb_core,
+ ACE_Message_Block &incoming,
+ CORBA::Octet byte_order)
{
// Set the upcall thread
-
orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ());
// Reset the output CDR stream.
@@ -325,31 +377,36 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
this->output_->reset ();
// Get the read and write positions before we steal data.
- size_t rd_pos = this->message_handler_.rd_pos ();
- size_t wr_pos = this->message_handler_.wr_pos ();
+ size_t rd_pos = incoming.rd_ptr () - incoming.base ();
+ size_t wr_pos = incoming.wr_ptr () - incoming.base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()),
+ incoming.length ());
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
// Create a input CDR stream.
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is also done in the higher layers.
- TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (),
- 0,
+
+ TAO_InputCDR input_cdr (incoming.data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
- this->message_handler_.message_state ().byte_order,
- state.giop_version.major,
- state.giop_version.minor,
+ byte_order,
+ this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor,
orb_core);
// Set giop version info for the outstream so that server replies
// in correct GIOP version
- output_->set_version (state.giop_version.major, state.giop_version.minor);
+ this->output_->set_version (this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor);
// Reset the message handler to receive upcalls if any
- this->message_handler_.reset (0);
+ // this->message_handler_.reset (0);
// We know we have some request message. Check whether it is a
// GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
@@ -358,7 +415,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
// the stream and never touch that again for anything. We basically
// loose ownership of the data_block.
- switch (this->message_handler_.message_state ().message_type)
+ switch (this->message_state_.message_type_)
{
case TAO_GIOP_REQUEST:
// Should be taken care by the state specific invocations. They
@@ -379,31 +436,37 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
int
TAO_GIOP_Message_Base::process_reply_message (
- TAO_Pluggable_Reply_Params &params
+ TAO_Pluggable_Reply_Params &params,
+ ACE_Message_Block &incoming,
+ CORBA::Octet byte_order
)
{
+
// Get the read and write positions before we steal data.
- size_t rd_pos = this->message_handler_.rd_pos ();
- size_t wr_pos = this->message_handler_.wr_pos ();
+ size_t rd_pos = incoming.rd_ptr () - incoming.base ();
+ size_t wr_pos = incoming.wr_ptr () - incoming.base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()),
+ incoming.length ());
- // Create a input CDR stream.
+
+ // Create a empty buffer on stack
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is alos done in the higher layers.
- TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (),
- 0,
+ TAO_InputCDR input_cdr (incoming.data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
- this->message_handler_.message_state ().byte_order,
- state.giop_version.major,
- state.giop_version.minor);
+ this->message_state_.giop_version_.major,
+ this->message_state_.giop_version_.minor,
+ byte_order);
// Reset the message state. Now, we are ready for the next nested
// upcall if any.
- this->message_handler_.reset (0);
+ // this->message_handler_.reset (0);
// We know we have some reply message. Check whether it is a
// GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
@@ -412,7 +475,7 @@ TAO_GIOP_Message_Base::process_reply_message (
// the stream and never touch that again for anything. We basically
// loose ownership of the data_block.
- switch (this->message_handler_.message_state ().message_type)
+ switch (this->message_state_.message_type_)
{
case TAO_GIOP_REPLY:
// Should be taken care by the state specific parsing
@@ -1182,6 +1245,7 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void)
int
TAO_GIOP_Message_Base::more_messages (void)
{
+# if 0
int retval =
this->message_handler_.is_message_ready ();
@@ -1198,4 +1262,7 @@ TAO_GIOP_Message_Base::more_messages (void)
state.giop_version.minor);
return retval;
+#endif
+
+ return 0;
}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index ecb194b1148..1ca368005fe 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -22,8 +22,9 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "tao/GIOP_Message_Generator_Parser_Impl.h"
-#include "tao/GIOP_Message_Reactive_Handler.h"
#include "tao/GIOP_Utils.h"
+#include "tao/GIOP_Message_State.h"
+
class TAO_Pluggable_Reply_Params;
@@ -41,7 +42,7 @@ class TAO_Pluggable_Reply_Params;
class TAO_Export TAO_GIOP_Message_Base : public TAO_Pluggable_Messaging
{
public:
- friend class TAO_GIOP_Message_Reactive_Handler;
+ // friend class TAO_GIOP_Message_Reactive_Handler;
/// Constructor
TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
@@ -100,18 +101,31 @@ public:
/// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
virtual TAO_Pluggable_Message_Type message_type (void);
+ /// @@Bala:Documentation please...
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block);
+
+ /// @@Bala:Documentation please..
+ virtual int is_message_complete (ACE_Message_Block &message_block);
+ /// @@Bala:Documentation please..
+ virtual size_t missing_data (ACE_Message_Block &message_block);
+
+ virtual CORBA::Octet byte_order (void);
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core);
+ TAO_ORB_Core *orb_core,
+ ACE_Message_Block &block,
+ CORBA::Octet byte_order);
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
- TAO_Pluggable_Reply_Params &reply_info
- );
+ TAO_Pluggable_Reply_Params &reply_info,
+ ACE_Message_Block &block,
+ CORBA::Octet byte_order);
+
/// Generate a reply message with the exception <ex>.
virtual int generate_exception_reply (
@@ -191,22 +205,11 @@ private:
/// Thr message handler object that does reading and parsing of the
/// incoming messages
- TAO_GIOP_Message_Reactive_Handler message_handler_;
+ TAO_GIOP_Message_State message_state_;
/// Output CDR
TAO_OutputCDR *output_;
- /// Allocators for the output CDR that we hold. As we cannot rely on
- /// the resources from ORB Core we reserve our own resources. The
- /// reason that we cannot believe the ORB core is that, for a
- /// multi-threaded servers it dishes out resources cached in
- /// TSS. This would be dangerous as TSS gets destroyed before we
- /// would. So we have our own memory that we can rely on.
- /// Implementations of GIOP that we have
- ACE_Allocator *cdr_buffer_alloc_;
- ACE_Allocator *cdr_dblock_alloc_;
- ACE_Allocator *cdr_msgblock_alloc_;
-
/// A buffer that we will use to initialise the CDR stream
char repbuf_[ACE_CDR::DEFAULT_BUFSIZE];
@@ -214,9 +217,9 @@ private:
TAO_GIOP_Message_Generator_Parser_Impl tao_giop_impl_;
protected:
-
/// The generator and parser state.
TAO_GIOP_Message_Generator_Parser *generator_parser_;
+
};
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
index a8af560081a..da22b2e93ab 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.cpp
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
@@ -8,393 +8,95 @@
#include "tao/GIOP_Message_Base.h"
#include "Transport.h"
+#if 0
#if !defined (__ACE_INLINE__)
# include "tao/GIOP_Message_Reactive_Handler.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+#endif /*if 0*/
-TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core,
- TAO_GIOP_Message_Base *base,
- size_t input_cdr_size)
- : message_state_ (orb_core),
- mesg_base_ (base),
- message_status_ (TAO_GIOP_WAITING_FOR_HEADER),
- message_size_ (input_cdr_size),
- current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)),
- supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size))
+ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
{
- // NOTE: The message blocks here use a locked allocator which is not
- // from the TSS even if there is one. We are getting the allocators
- // from the global memory. We shouldn't be using the TSS stuff for
- // the following reason
- // (a) The connection handlers are per-connection and not
- // per-thread.
- // (b) The order of cleaning is important if we use allocators from
- // TSS. The TSS goes away when the threads go away. But the
- // connection handlers go away only when the ORB decides to shut
- // it down.
- ACE_CDR::mb_align (&this->current_buffer_);
-
- // Calculate the effective message after alignment
- this->message_size_ -= this->rd_pos ();
-}
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport)
-{
- // Read the message from the transport. The size of the message read
- // is the maximum size of the buffer that we have less the amount of
- // data that has already been read in to the buffer.
- ssize_t n = transport->recv (this->current_buffer_.wr_ptr (),
- this->current_buffer_.space ());
-
- if (n == -1)
- {
- if (errno == EWOULDBLOCK)
- return 0;
-
- return -1;
- }
- // @@ What are the other error handling here??
- else if (n == 0)
- {
- return -1;
- }
-
- if (TAO_debug_level == 5)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages"
- " received %d bytes\n",
- n));
-
- size_t len;
- for (size_t offset = 0; offset < size_t(n); offset += len)
- {
- len = n - offset;
- if (len > 512)
- len = 512;
- ACE_HEX_DUMP ((LM_DEBUG,
- this->current_buffer_.wr_ptr () + offset,
- len,
- "TAO (%P|%t) - read_messages "));
- }
- ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n));
- }
-
-
- // Now we have a succesful read. First adjust the write pointer
- this->current_buffer_.wr_ptr (n);
-
- // Success
- return 1;
}
-int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header (void)
-{
- // Check what message are we waiting for and take suitable action
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // Parse the GIOP header
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
- len);
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (retval)
- {
- // We had a fragment header, so the position should be
- // beyond that
- pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
- }
-
- this->current_buffer_.rd_ptr (pos);
- buf = this->current_buffer_.rd_ptr ();
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return 1;
- }
-
- // We dont have sufficient information to decipher the GIOP
- // header. Make sure that the reactor calls us back.
- return -2;
- }
-
- // The last read just "read" left-over messages
- return 0;
-}
int
-TAO_GIOP_Message_Reactive_Handler::is_message_ready (void)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- // Set the buf pointer to the start of the GIOP header
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // Dump the incoming message . It will be dumped only if the
- // debug level is greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len + TAO_GIOP_MESSAGE_HEADER_LEN);
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
-
- // Check whether we have received only the first part of the
- // fragment.
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- // Clone the data that we read in.
- this->supp_buffer_.data_block (
- this->current_buffer_.data_block ()->clone ());
-
- // Set the read and write pointer for the supplementary
- // buffer.
- size_t rd_pos = this->rd_pos ();
- this->supp_buffer_.rd_ptr (rd_pos +
- this->message_state_.message_size);
- this->supp_buffer_.wr_ptr (this->wr_pos ());
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- // Set the read and write pointers again for the current
- // buffer. We change the write pointer settings as we would
- // like to process a single message.
- this->current_buffer_.rd_ptr (rd_pos);
- this->current_buffer_.wr_ptr (rd_pos +
- this->message_state_.message_size);
-
- return this->message_state_.is_complete (this->current_buffer_);
- }
- }
- else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES)
+ // @@Bala: Need to make a check whether we are waiting for the
+ // header...
+ if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN)
{
- size_t len = this->supp_buffer_.length ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // @@ What about fragment headers???
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return this->get_message ();
- }
- else
- {
- // We have smaller than the header size left here. We
- // just copy the rest of the stuff and reset things so that
- // we can read the rest of the stuff from the socket.
- this->current_buffer_.copy (
- this->supp_buffer_.rd_ptr (),
- len);
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
-
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- }
-
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming, state) == -1)
+ return -1;
}
- // Just send us back to the reactor so that we can for more data to
- // come in .
return 0;
}
-ACE_Data_Block *
-TAO_GIOP_Message_Reactive_Handler::steal_data_block (void)
-{
- ACE_Data_Block *db =
- this->current_buffer_.data_block ()->clone_nocopy ();
-
- ACE_Data_Block *old_db =
- this->current_buffer_.replace_data_block (db);
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- return old_db;
-}
-
-
-void
-TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag)
-{
- // Reset the contents of the message state
- this->message_state_.reset (reset_flag);
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES)
- {
- this->supp_buffer_.reset ();
- ACE_CDR::mb_align (&this->supp_buffer_);
- }
-
-}
-
-
int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
if (TAO_debug_level > 8)
{
ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n"));
}
- // Check whether we have a GIOP Message in the first place
- if (this->parse_magic_bytes (buf) == -1)
- return -1;
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
- // Let us be specific that this is for 1.0
- if (this->message_state_.giop_version.minor == 0 &&
- this->message_state_.giop_version.major == 1)
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
{
- this->message_state_.byte_order =
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
-
- if (this->message_state_.byte_order != 0 &&
- this->message_state_.byte_order != 1)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
- ACE_TEXT (" for version <1.0>\n"),
- this->message_state_.byte_order));
- return -1;
- }
+ return -1;
}
- else
- {
- // Read the byte ORDER
- this->message_state_.byte_order =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
- // Read the fragment bit
- this->message_state_.more_fragments =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+ // Get the version information
+ if (this->get_version_info (buf, state) == -1)
+ return -1;
- if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
- ACE_TEXT (" for version <%d %d> \n"),
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor));
- return -1;
- }
- }
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf, state) == -1)
+ return -1;
// Get the message type
- this->message_state_.message_type =
- buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
- // Get the payload size. If the payload size is greater than the
- // length then set the length of the message block to that
- // size. Move the rd_ptr to the end of the GIOP header
- this->message_state_.message_size = this->get_payload_size (buf);
+ state.message_type = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
- // If the message_size or the payload_size is zero then something
- // is fishy. So return an error.
- if (this->message_state_.message_size == 0)
- return -1;
+ // Get the payload size
+ this->get_payload_size (buf, state);
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"),
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor,
- this->message_state_.byte_order,
- this->message_state_.message_type,
- this->message_state_.message_size));
- }
+ // Parse the
+ int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
+ len);
- return 1;
-}
+ // Set the pointer read pointer position in the
+ // <current_buffer_>
+ size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
+ if (retval)
+ {
+ // We had a fragment header, so the position should be
+ // beyond that
+ pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ }
-int
-TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf,
- size_t length)
-{
- size_t len =
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // By this point we are doubly sure that we have a more or less
- // valid GIOP message with a valid major revision number.
- if (this->message_state_.more_fragments &&
- this->message_state_.giop_version.minor == 2 &&
- length > len)
- {
- // Fragmented message in GIOP 1.2 should have a fragment header
- // following the GIOP header. Grab the rd_ptr to get that
- // info.
- this->message_state_.request_id = this->read_ulong (buf);
+ this->current_buffer_.rd_ptr (pos);
+ buf = this->current_buffer_.rd_ptr ();
- // As we parsed the header
- return 1;
+ // The GIOP header has been parsed. Set the status to wait for
+ // payload
+ this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
}
+ return 1;
- return 0;
-}
int
@@ -406,7 +108,7 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
&& buf [2] == 0x4f // 'O'
&& buf [3] == 0x50)) // 'P'
{
- // For the present...
+ // For the present...
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) bad header, "
@@ -418,12 +120,25 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
return -1;
}
+ return 0;
+}
+
+int
+TAO_GIOP_Message_Reactive_Handler::get_version_info (char *buf,
+ TAO_GIOP_Message_State &state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n"));
+ }
+
// We have a GIOP message on hand. Get its revision numbers
CORBA::Octet incoming_major =
buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
CORBA::Octet incoming_minor =
buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ // Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
incoming_major,
incoming_minor) == 0)
@@ -439,55 +154,81 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
}
// Set the version
- this->message_state_.giop_version.minor = incoming_minor;
- this->message_state_.giop_version.major = incoming_major;
+ state.giop_version.minor = incoming_minor;
+ state.giop_version.major = incoming_major;
return 0;
}
+int
+TAO_GIOP_Message_Reactive_Handler::get_byte_order_info (char *buf,
+ TAO_GIOP_Message_State &message_state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n"));
+ }
+
+ // Let us be specific that this is for 1.0
+ if (message_state.giop_version.minor == 0 &&
+ message_state.giop_version.major == 1)
+ {
+ message_state_.byte_order =
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
+
+ if (message_state.byte_order != 0 &&
+ message_state.byte_order != 1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
+ ACE_TEXT (" for version <1.0>\n"),
+ message_state.byte_order));
+ return -1;
+ }
+ }
+ else
+ {
+ // Read the byte ORDER
+ message_state.byte_order =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
+
+ // Read the fragment bit
+ message_state.more_fragments =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+
+ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ message_state_.giop_version.major,
+ message_state_.giop_version.minor));
+ return -1;
+ }
+ }
+
+ return 0;
+}
CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr)
+TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr,
+ TAO_GIOP_Message_State &message_state)
{
// Move the read pointer
rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- CORBA::ULong x = this->read_ulong (rd_ptr);
-
- if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_)
- {
- if (ACE_CDR::grow (&this->current_buffer_,
- x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P | %t) Unable to increase the size \n")
- ACE_TEXT ("of the buffer \n")));
- return 0;
- }
-
- // New message size is the size of the now larger buffer.
- this->message_size_ = x +
- TAO_GIOP_MESSAGE_HEADER_LEN +
- ACE_CDR::MAX_ALIGNMENT;
- }
-
- // Set the read pointer to the end of the GIOP message
- // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
- return x;
-}
+ CORBA::ULong x = 0;
-CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
-{
size_t msg_size = 4;
- char *buf = ACE_ptr_align_binary (ptr,
+ char *buf = ACE_ptr_align_binary (rd_ptr,
msg_size);
- CORBA::ULong x;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER))
+ if (!(state.byte_order != ACE_CDR_BYTE_ORDER))
{
x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
}
@@ -499,76 +240,5 @@ TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
#endif /* ACE_DISABLE_SWAP_ON_READ */
- return x;
-}
-
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::get_message (void)
-{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->supp_buffer_.length ();
- char * buf =
- this->current_buffer_.rd_ptr ();
-
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message. Check whether we have received
- // only the first part of the fragment.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else
- {
- // The remaining message in the supp buffer
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->supp_buffer_.length ());
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
- }
- }
-
- return 0;
+ message_state.message_size = x;
}
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.h b/TAO/tao/GIOP_Message_Reactive_Handler.h
index b38a74c27c5..16c0503851e 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.h
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.h
@@ -13,29 +13,19 @@
#ifndef TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H
#define TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H
#include "ace/pre.h"
-#include "ace/Message_Block.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "tao/GIOP_Message_State.h"
-class TAO_Transport;
+
class TAO_ORB_Core;
class TAO_GIOP_Message_Base;
+class TAO_GIOP_Message_State;
+class ACE_Message_Block;
-enum TAO_GIOP_Message_Status
-{
- /// The buffer is waiting for the header of the message yet
- TAO_GIOP_WAITING_FOR_HEADER = 0,
-
- /// The buffer is waiting for the payload to appear on the socket
- TAO_GIOP_WAITING_FOR_PAYLOAD,
-
- /// The buffer has got multiple messages
- TAO_GIOP_MULTIPLE_MESSAGES
-};
/**
* @class TAO_GIOP_Message_Reactive_Handler
@@ -54,6 +44,9 @@ enum TAO_GIOP_Message_Status
* reading the header and the payload seperately.
*/
+
+
+#if 0
class TAO_Export TAO_GIOP_Message_Reactive_Handler
{
public:
@@ -79,7 +72,7 @@ public:
/// - We have sufficient info for processing the header and we
/// processed it succesfully. (return 1);
/// - Any errors in processing will return a -1.
- int parse_message_header (void);
+ int parse_message_header (ACE_Message_Block &message_block);
/// Check whether we have atleast one complete message ready for
/// processing.
@@ -173,7 +166,11 @@ private:
/// is then sent to the higher layers of the ORB.
ACE_Message_Block supp_buffer_;
};
+#if defined (__ACE_INLINE__)
+# include "tao/GIOP_Message_Reactive_Handler.inl"
+#endif /* __ACE_INLINE__ */
+#endif
const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12;
const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8;
@@ -183,9 +180,7 @@ const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5;
const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4;
const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4;
-#if defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_Reactive_Handler.inl"
-#endif /* __ACE_INLINE__ */
+
#include "ace/post.h"
#endif /*TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H*/
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.inl b/TAO/tao/GIOP_Message_Reactive_Handler.inl
index 91211b34464..66b856377dc 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.inl
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.inl
@@ -3,6 +3,8 @@
+
+#if 0
ACE_INLINE TAO_GIOP_Message_State &
TAO_GIOP_Message_Reactive_Handler::message_state (void)
{
@@ -28,3 +30,5 @@ TAO_GIOP_Message_Reactive_Handler::wr_pos (void) const
return
this->current_buffer_.wr_ptr () - this->current_buffer_.base ();
}
+
+#endif
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index c71c5a638e5..3171f3de61b 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -1,115 +1,278 @@
-// -*- C++ -*-
-
-//$Id$
+// $Id$
#include "tao/GIOP_Message_State.h"
-#include "tao/GIOP_Utils.h"
+#include "tao/GIOP_Message_Generator_Parser_Impl.h"
#include "tao/ORB_Core.h"
+#include "tao/Pluggable.h"
+#include "tao/debug.h"
+#include "tao/GIOP_Message_Base.h"
+//#include "Transport.h"
#if !defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_State.i"
+# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
+
ACE_RCSID(tao, GIOP_Message_State, "$Id$")
- TAO_GIOP_Message_State::TAO_GIOP_Message_State (TAO_ORB_Core* /*orb_core*/)
- : byte_order (TAO_ENCAP_BYTE_ORDER),
- message_type (TAO_GIOP_MESSAGERROR),
- message_size (0),
- request_id (0),
- // Problem similar to GIOP_Message_handler.cpp - Bala
- fragmented_messages (ACE_CDR::DEFAULT_BUFSIZE),
- more_fragments (0)
+TAO_GIOP_Message_State::TAO_GIOP_Message_State (
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
+ : giop_version_ (TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR),
+ byte_order_ (0),
+ message_type_ (0),
+ message_size_ (0),
+ request_id_ (0),
+ more_fragments_ (0),
+ missing_data_ (0),
+ message_status_ (TAO_GIOP_WAITING_FOR_HEADER)
{
- //giop_version.major = TAO_DEF_GIOP_MAJOR;
- //giop_version.minor = TAO_DEF_GIOP_MINOR;
+
+}
+
+
+int
+TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
+{
+ // @@Bala: Need to make a check whether we are waiting for the
+ // header...
+ if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming) == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n"));
+ }
+
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
+
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
+ {
+ return -1;
+ }
+
+ // Get the version information
+ if (this->get_version_info (buf) == -1)
+ return -1;
+
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf) == -1)
+ return -1;
+
+ // Get the message type
+ this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
+
+ // Get the size of the message..
+ this->get_payload_size (buf);
+
+ if (this->message_size_ == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Message with size 0 recd.. \n")));
+ return -1;
+ }
+
+ if (this->more_fragments_)
+ {
+ // Parse the
+ /*int retval = */
+ this->parse_fragment_header (buf,
+ incoming.length ());
+ }
+
+ // The GIOP header has been parsed. Set the status to wait for
+ // payload
+ this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
+
+ return 0;
}
-TAO_GIOP_Message_State::~TAO_GIOP_Message_State (void)
+
+
+
+int
+TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
{
- // @@ Bala: this is not a very useful comment, is it?
- //no-op
+ // The values are hard-coded to support non-ASCII platforms.
+ if (!(buf [0] == 0x47 // 'G'
+ && buf [1] == 0x49 // 'I'
+ && buf [2] == 0x4f // 'O'
+ && buf [3] == 0x50)) // 'P'
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) bad header, "
+ "magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"),
+ buf[0],
+ buf[1],
+ buf[2],
+ buf[3]));
+ return -1;
+ }
+
+ return 0;
}
int
-TAO_GIOP_Message_State::is_complete (ACE_Message_Block &current_buf)
+TAO_GIOP_Message_State::get_version_info (char *buf)
{
- if (this->more_fragments)
+ if (TAO_debug_level > 8)
{
- if (this->fragmented_messages.length () == 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n"));
+ }
+
+ // We have a GIOP message on hand. Get its revision numbers
+ CORBA::Octet incoming_major =
+ buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet incoming_minor =
+ buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+
+ // Check the revision information
+ if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
+ incoming_major,
+ incoming_minor) == 0)
+ {
+ if (TAO_debug_level > 0)
{
- this->first_fragment_byte_order = this->byte_order;
- this->first_fragment_giop_version = this->giop_version;
- this->first_fragment_message_type = this->message_type;
- // this->fragments_end = this->fragments_begin = current;
- this->fragmented_messages.copy (current_buf.rd_ptr (),
- current_buf.length ());
-
- // Reset the buffer
- current_buf.reset ();
-
- // Reset our state
- this->reset ();
- return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"),
+ incoming_major, incoming_minor));
}
- return this->append_fragment (current_buf);
+ return -1;
}
- if (this->fragmented_messages.length () != 0)
+ // Set the version
+ this->giop_version_.minor = incoming_minor;
+ this->giop_version_.major = incoming_major;
+
+ return 0;
+}
+
+int
+TAO_GIOP_Message_State::get_byte_order_info (char *buf)
+{
+ if (TAO_debug_level > 8)
{
- // This is the last message, but we must defragment before
- // sending
- if (this->append_fragment (current_buf) == -1)
- return -1;
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n"));
+ }
- // Copy the entire message block into <current_buf>
- current_buf.data_block (this->fragmented_messages.data_block ()->clone ());
+ // Let us be specific that this is for 1.0
+ if (this->giop_version_.minor == 0 &&
+ this->giop_version_.major == 1)
+ {
+ this->byte_order_ =
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
- this->fragmented_messages.reset ();
+ if (this->byte_order_ != 0 &&
+ this->byte_order_ != 1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
+ ACE_TEXT (" for version <1.0>\n"),
+ this->byte_order_));
+ return -1;
+ }
+ }
+ else
+ {
+ // Read the byte ORDER
+ this->byte_order_ =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
- this->byte_order = this->first_fragment_byte_order;
- this->giop_version = this->first_fragment_giop_version;
- this->message_type = this->first_fragment_message_type;
+ // Read the fragment bit
+ this->more_fragments_ =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
- // This message has no more fragments, and there where no fragments
- // before it, just return. Notice that current_buf has the
- // *right* contents
+ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ this->giop_version_.major,
+ this->giop_version_.minor));
+ return -1;
+ }
}
+ return 0;
+}
- return 1;
+void
+TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
+{
+ // Move the read pointer
+ rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
+
+ this->message_size_ = this->read_ulong (rd_ptr);
}
+
+
int
-TAO_GIOP_Message_State::append_fragment (ACE_Message_Block& current)
+TAO_GIOP_Message_State::parse_fragment_header (char *buf,
+ size_t length)
{
- if (this->first_fragment_byte_order != this->byte_order
- || this->first_fragment_giop_version.major != this->giop_version.major
- || this->first_fragment_giop_version.minor != this->giop_version.minor)
+ size_t len =
+ TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ buf += TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ // By this point we are doubly sure that we have a more or less
+ // valid GIOP message with a valid major revision number.
+ if (this->giop_version_.minor == 2 && length > len)
{
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:\n")
- ACE_TEXT (" Different GIOP versions or byte order\n")));
- this->reset ();
- return -1;
+ // Fragmented message in GIOP 1.2 should have a fragment header
+ // following the GIOP header. Grab the rd_ptr to get that
+ // info.
+ this->request_id_ = this->read_ulong (buf);
+
+ // As we parsed the header
+ return 1;
}
- size_t req_size =
- this->fragmented_messages.size () + current.length ();
+ return 0;
+}
- this->fragmented_messages.size (req_size);
+CORBA::ULong
+TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
+{
+ CORBA::ULong x = 0;
- // Copy the message
- this->fragmented_messages.copy (current.rd_ptr (),
- current.length ());
+ size_t msg_size = 4;
- current.reset ();
+ char *buf = ACE_ptr_align_binary (rd_ptr,
+ msg_size);
- // Reset our state
- this->reset ();
+#if !defined (ACE_DISABLE_SWAP_ON_READ)
+ if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER))
+ {
+ x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x));
+ }
+#else
+ x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
+#endif /* ACE_DISABLE_SWAP_ON_READ */
- return 0;
+ return x;
}
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index a71d53869ac..f6d7cd2d4a9 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -11,23 +11,19 @@
*
* @author Chris Cleeland <cleeland@cs.wustl.edu>
* @author Carlos O' Ryan <coryan@uci.edu>
+ * @author modified by Balachandran Natarajan <bala@cs.wustl.edu>
*/
//=============================================================================
-
-
#ifndef TAO_GIOP_MESSAGE_STATE_H
#define TAO_GIOP_MESSAGE_STATE_H
#include "ace/pre.h"
-#include "tao/TAO_Export.h"
+#include "tao/GIOP_Message_Version.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "tao/corbafwd.h"
-#include "tao/CDR.h"
-#include "tao/GIOP_Message_Version.h"
class TAO_ORB_Core;
@@ -37,49 +33,98 @@ class TAO_ORB_Core;
*
* @brief Generic definitions for Message States.
*
+ * @@Bala: More documentation please...
+ *
* This represents the state of the incoming GIOP message
* As the ORB processes incoming messages it needs to keep track of
* how much of the message has been read, if there are any
* fragments following this message etc.
*/
+
+
+
class TAO_Export TAO_GIOP_Message_State
{
public:
+ friend class TAO_GIOP_Message_Base;
+
+
+ enum TAO_GIOP_Message_Status
+ {
+ /// The header of the message hasn't shown up yet.
+ TAO_GIOP_WAITING_FOR_HEADER = 0,
+
+ /// The payload hasn't fully shown up in the yet
+ TAO_GIOP_WAITING_FOR_PAYLOAD,
+
+ /// The message read has got multiple requests
+ TAO_GIOP_MULTIPLE_MESSAGES
+ };
+
+ /// @@Bala: Documentation please...
+ /// Parse the message header.
+ int parse_message_header (ACE_Message_Block &incoming);
+
+ /// Return the message size
+ CORBA::ULong message_size (void) const;
+
+ /// Return the message size
+ CORBA::ULong payload_size (void) const;
+
+ /// Return the byte order information
+ CORBA::Octet byte_order (void) const;
+
+private:
+
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core);
+ TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
+ TAO_GIOP_Message_Base *base);
- /// Dtor
- ~TAO_GIOP_Message_State (void);
+ /// @@Bala: Documentation please...
+ int parse_message_header_i (ACE_Message_Block &incoming);
- ///Reset the message header state and prepare it to receive the next
- /// event.
- void reset (int reset_contents = 1);
+ /// Checks for the magic word 'GIOP' in the start of the incoing
+ /// stream
+ int parse_magic_bytes (char *buf);
- /// Has the header been received?
- CORBA::Boolean header_received (void) const;
+ /// Extracts the version information from the incoming
+ /// stream. Performs a check for whether the version information is
+ /// right and sets the information in the <state>
+ int get_version_info (char *buf);
- /// Check if the current message is complete, adjusting the fragments
- /// if required...
- int is_complete (ACE_Message_Block &current_buf);
+ /// Extracts the byte order information from the incoming
+ /// stream. Performs a check for whether the byte order information
+ /// right and sets the information in the <state>
+ int get_byte_order_info (char *buf);
- /// Did we get fragmented messages?
- int message_fragmented (void);
+ /// Gets the size of the payload and set the size in the <state>
+ void get_payload_size (char *buf);
- /// Version info
- TAO_GIOP_Message_Version giop_version;
+ /// Parses the GIOP FRAGMENT_HEADER information from the incoming
+ /// stream.
+ int parse_fragment_header (char *buf,
+ size_t length);
+
+ /// Read the unsigned long from the buffer. The <buf> should just
+ /// point to the next 4 bytes data that represent the ULong
+ CORBA::ULong read_ulong (char *buf);
+
+private:
+
+ TAO_GIOP_Message_Version giop_version_;
/// 0 = big, 1 = little
- CORBA::Octet byte_order;
+ CORBA::Octet byte_order_;
/// MsgType above
- CORBA::Octet message_type;
+ CORBA::Octet message_type_;
/// in byte_order!
- CORBA::ULong message_size;
+ CORBA::ULong message_size_;
/// Request Id from the Fragment header
- CORBA::ULong request_id;
+ CORBA::ULong request_id_;
/**
* The fragments are collected in a chain of message blocks (using
@@ -116,20 +161,26 @@ public:
CORBA::Octet first_fragment_message_type;
/// (Requests and Replys)
- CORBA::Octet more_fragments;
-
-private:
- /// Append <current> to the list of fragments
- /// Also resets the state, because the current message was consumed.
- int append_fragment (ACE_Message_Block &current);
+ CORBA::Octet more_fragments_;
+ /// Missing data
+ CORBA::ULong missing_data_;
+ /// @@Bala: Documentation??
+ TAO_GIOP_Message_Status message_status_;
};
+const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12;
+const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8;
+const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6;
+const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7;
+const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5;
+const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4;
+const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4;
#if defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_State.i"
+# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i
index 653bf47d6a8..b2633815ebf 100644
--- a/TAO/tao/GIOP_Message_State.i
+++ b/TAO/tao/GIOP_Message_State.i
@@ -1,6 +1,7 @@
// -*- C++ -*-
//$Id$
+
// ****************************************************************
// @@ Bala: we use the stars to separate classes in ACE+TAO
diff --git a/TAO/tao/IIOP_Acceptor.cpp b/TAO/tao/IIOP_Acceptor.cpp
index 1a0e99030e2..4cfe224bae8 100644
--- a/TAO/tao/IIOP_Acceptor.cpp
+++ b/TAO/tao/IIOP_Acceptor.cpp
@@ -2,6 +2,7 @@
// $Id$
+
#include "tao/IIOP_Acceptor.h"
#include "tao/IIOP_Profile.h"
#include "tao/MProfile.h"
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index c05a558efd3..28891e5e092 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -45,7 +45,7 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core
{
TAO_IIOP_Transport* specific_transport = 0;
ACE_NEW(specific_transport,
- TAO_IIOP_Transport(this, orb_core, 0));
+ TAO_IIOP_Transport (this, orb_core, 0));
// store this pointer (indirectly increment ref count)
this->transport(specific_transport);
@@ -244,6 +244,12 @@ TAO_IIOP_Connection_Handler::fetch_handle (void)
}
int
+TAO_IIOP_Connection_Handler::resume_handler (void)
+{
+ return 1;
+}
+
+int
TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE)
{
return this->transport ()->handle_output ();
@@ -313,45 +319,22 @@ int
TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h)
{
- return this->handle_input_i (h);
-}
-
-
-int
-TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
-{
+ // Increase the reference count on the upcall that have passed us.
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
-
- // Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("IIOP_Connection_Handler::read_message \n")));
-
- }
+ int retval = this->transport ()->handle_input_i (h);
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
+ retval = -1;
- if (result == -1 ||
- result == 1)
- return result;
-
- return 0;
+ return retval;
}
-
-
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h
index fe220557d87..40a53456a4f 100644
--- a/TAO/tao/IIOP_Connection_Handler.h
+++ b/TAO/tao/IIOP_Connection_Handler.h
@@ -111,6 +111,10 @@ public:
/// Return the underlying handle
virtual ACE_HANDLE fetch_handle (void);
+ /// Send a TRUE value to the reactor, so that the reactor does not
+ /// resume the handler
+ virtual int resume_handler (void);
+
/// Use peer() to drain the outgoing message queue
virtual int handle_output (ACE_HANDLE);
@@ -131,8 +135,6 @@ protected:
/// ensure that server threads eventually exit.
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0);
private:
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp
index 0561c25189a..d3e5eb60938 100644
--- a/TAO/tao/IIOP_Connector.cpp
+++ b/TAO/tao/IIOP_Connector.cpp
@@ -17,7 +17,6 @@ ACE_RCSID (TAO,
IIOP_Connector,
"$Id$")
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Node<ACE_INET_Addr>;
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index a680226fd1e..997226316cc 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -16,7 +16,7 @@
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
+//#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "tao/IIOP_Transport.i"
@@ -26,12 +26,13 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$")
TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_IIOP_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
+#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -39,6 +40,7 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_GIOP_Message_Lite (orb_core));
}
else
+#endif
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
@@ -87,6 +89,7 @@ TAO_IIOP_Transport::recv_i (char *buf,
}
+#if 0
int
TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
int block)
@@ -120,12 +123,13 @@ TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
// See we use the reactor semantics again
while (result > 0)
{
- result = this->process_message ();
+ // result = this->process_message ();
}
return result;
}
+#endif
int
TAO_IIOP_Transport::register_handler_i (void)
@@ -261,6 +265,16 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
return this->connection_handler_->process_listen_point_list (listen_list);
}
+
+
+
+#if 0
+int
+TAO_IIOP_Transport::process_message (ACE_Message_Block &message_block)
+{
+ // Call the messaging object to process the read data
+}
+
int
TAO_IIOP_Transport::process_message (void)
{
@@ -384,7 +398,7 @@ TAO_IIOP_Transport::process_message (void)
return 1;
}
-
+#endif /*if 0 */
void
TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 940ab085b2e..9de0e2c6c01 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -82,8 +82,10 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
+#if 0
virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
int block =0);
+#endif
virtual int register_handler_i (void);
@@ -119,7 +121,7 @@ public:
private:
/// Process the message that we have read
- int process_message (void);
+ int process_message (ACE_Message_Block &message);
/// Set the Bidirectional context info in the service context list
void set_bidir_context_info (TAO_Operation_Details &opdetails);
diff --git a/TAO/tao/IORInfo.cpp b/TAO/tao/IORInfo.cpp
index ab7edb16ccb..0e3e2071078 100644
--- a/TAO/tao/IORInfo.cpp
+++ b/TAO/tao/IORInfo.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "IORInfo.h"
#include "PolicyC.h"
#include "IOPC.h"
@@ -9,10 +10,9 @@
ACE_RCSID (tao, IORInfo, "$Id$")
-
TAO_IORInfo::TAO_IORInfo (TAO_ORB_Core *orb_core,
- TAO_MProfile &mp,
- CORBA::PolicyList *policy_list)
+ TAO_MProfile &mp,
+ CORBA::PolicyList *policy_list)
: orb_core_ (orb_core),
mp_ (mp),
policy_list_ (policy_list)
@@ -25,7 +25,7 @@ TAO_IORInfo::~TAO_IORInfo (void)
CORBA::Policy_ptr
TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Check the policy list supplied by the POA.
@@ -34,12 +34,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
for (CORBA::ULong i = 0; i < policy_count; ++i)
{
CORBA::PolicyType pt =
- (*(this->policy_list_))[i]->policy_type (
+ (*(this->policy_list_))[i]->policy_type (
TAO_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (CORBA::Policy::_nil ());
if (pt == type)
- return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]);
+ return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]);
}
// TODO: Now check the global ORB policies.
@@ -47,12 +47,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
ACE_THROW_RETURN (CORBA::INV_POLICY (TAO_OMG_VMCID | 2,
CORBA::COMPLETED_NO),
- CORBA::Policy::_nil ());
+ CORBA::Policy::_nil ());
}
void
TAO_IORInfo::add_ior_component (const IOP::TaggedComponent &component,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Add the given tagged component to all profiles.
@@ -85,12 +85,12 @@ TAO_IORInfo::add_ior_component_to_profile (
TAO_Profile *profile = this->mp_.get_profile (i);
if (profile->tag () == profile_id)
- {
- profile->add_tagged_component (component, ACE_TRY_ENV);
+ {
+ profile->add_tagged_component (component, ACE_TRY_ENV);
ACE_CHECK;
found_profile = 1;
- }
+ }
}
// According to the Portable Interceptor specification, we're
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index 5f975f1e7b5..c50a3166851 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "Invocation.h"
#include "Principal.h"
#include "Stub.h"
@@ -29,8 +30,10 @@
# include "Invocation.i"
#endif /* ! __ACE_INLINE__ */
+
ACE_RCSID(tao, Invocation, "$Id$")
+
#if defined (ACE_ENABLE_TIMEPROBES)
static const char *TAO_Invocation_Timeprobe_Description[] =
diff --git a/TAO/tao/Invocation_Endpoint_Selectors.cpp b/TAO/tao/Invocation_Endpoint_Selectors.cpp
index fafe05a84aa..1e9a83da328 100644
--- a/TAO/tao/Invocation_Endpoint_Selectors.cpp
+++ b/TAO/tao/Invocation_Endpoint_Selectors.cpp
@@ -14,6 +14,7 @@
ACE_RCSID(tao, Invocation_Endpoint_Selectors, "$Id$")
+
TAO_Invocation_Endpoint_Selector::~TAO_Invocation_Endpoint_Selector (void)
{
}
diff --git a/TAO/tao/LocalObject.cpp b/TAO/tao/LocalObject.cpp
index e8952ac76b2..2070ba6e8d1 100644
--- a/TAO/tao/LocalObject.cpp
+++ b/TAO/tao/LocalObject.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "LocalObject.h"
#if !defined (__ACE_INLINE__)
@@ -14,6 +15,7 @@ ACE_RCSID (tao,
LocalObject,
"$Id$")
+
CORBA_LocalObject::~CORBA_LocalObject (void)
{
}
diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile
index 980e6c4d87e..df75fcf4faf 100644
--- a/TAO/tao/Makefile
+++ b/TAO/tao/Makefile
@@ -1,6 +1,6 @@
#----------------------------------------------------------------------------
-
+#
# $Id$
#
# Makefile for TAO
@@ -106,8 +106,6 @@ PLUGGABLE_MESSAGING_FILES = \
Pluggable_Messaging \
Pluggable_Messaging_Utils \
GIOP_Message_Base \
- GIOP_Message_Lite \
- GIOP_Message_Reactive_Handler \
GIOP_Message_Generator_Parser \
GIOP_Message_Generator_Parser_10 \
GIOP_Message_Generator_Parser_11 \
@@ -118,6 +116,7 @@ PLUGGABLE_MESSAGING_FILES = \
target_specification \
GIOP_Message_State \
GIOP_Message_Version \
+ Incoming_Message_Queue \
Tagged_Profile
DEFAULT_RESOURCES_FILES = \
diff --git a/TAO/tao/MessagingC.h b/TAO/tao/MessagingC.h
index 8795ab51435..7b35f808d59 100644
--- a/TAO/tao/MessagingC.h
+++ b/TAO/tao/MessagingC.h
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp
index ee5f9647379..5adb6ae2b7c 100644
--- a/TAO/tao/Muxed_TMS.cpp
+++ b/TAO/tao/Muxed_TMS.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "tao/Muxed_TMS.h"
#include "tao/Reply_Dispatcher.h"
#include "tao/GIOP_Message_Version.h"
@@ -9,6 +10,7 @@
ACE_RCSID(tao, Muxed_TMS, "$Id$")
+
TAO_Muxed_TMS::TAO_Muxed_TMS (TAO_Transport *transport)
: TAO_Transport_Mux_Strategy (transport),
request_id_generator_ (0),
diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp
index f54ba3a1d1f..0c0efbaf6db 100644
--- a/TAO/tao/ORB.cpp
+++ b/TAO/tao/ORB.cpp
@@ -1,6 +1,7 @@
// $Id$
+
#include "ORB.h"
#include "ORB_Table.h"
#include "Connector_Registry.h"
@@ -70,6 +71,7 @@ using std::set_unexpected;
ACE_RCSID(tao, ORB, "$Id$")
+
static const char ior_prefix [] = "IOR:";
// = Static initialization.
@@ -1755,6 +1757,7 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj,
CORBA::COMPLETED_NO),
0);
+
// Application writer controls what kind of objref strings they get,
// maybe along with other things, by how they initialize the ORB.
@@ -1981,8 +1984,11 @@ CORBA_ORB::ior_string_to_object (const char *str,
int byte_order = *(mb.rd_ptr ());
mb.rd_ptr (1);
mb.wr_ptr (len);
- TAO_InputCDR stream (&mb, byte_order, TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR, this->orb_core_);
+ TAO_InputCDR stream (&mb,
+ byte_order,
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
+ this->orb_core_);
CORBA::Object_ptr objref = CORBA::Object::_nil ();
stream >> objref;
diff --git a/TAO/tao/Object_Ref_Table.cpp b/TAO/tao/Object_Ref_Table.cpp
index c883296595d..f5892459696 100644
--- a/TAO/tao/Object_Ref_Table.cpp
+++ b/TAO/tao/Object_Ref_Table.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "Object_Ref_Table.h"
#include "Object.h"
#include "Exception.h"
@@ -13,6 +14,7 @@ ACE_RCSID (tao,
Object_Ref_Table,
"$Id$")
+
// ****************************************************************
TAO_Object_Ref_Table::TAO_Object_Ref_Table (void)
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index eb8fda573b0..288d946999e 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -120,15 +120,29 @@ public:
virtual void reset (int reset_flag = 1) = 0;
// Reset the messaging object
+ /// @@ Bala: Documentation
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
+
+ /// @@Bala: Documentation please...
+ virtual int is_message_complete (ACE_Message_Block &message_block) = 0;
+
+ virtual size_t missing_data (ACE_Message_Block &incoming) = 0;
+
+ virtual CORBA::Octet byte_order (void) = 0;
+
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core) = 0;
+ TAO_ORB_Core *orb_core,
+ ACE_Message_Block &m,
+ CORBA::Octet byte_order) = 0;
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
- TAO_Pluggable_Reply_Params &reply_info) = 0;
+ TAO_Pluggable_Reply_Params &reply_info,
+ ACE_Message_Block &m,
+ CORBA::Octet byte_order) = 0;
/// Generate a reply message with the exception <ex>.
virtual int generate_exception_reply (
diff --git a/TAO/tao/Pluggable_Messaging_Utils.cpp b/TAO/tao/Pluggable_Messaging_Utils.cpp
index 8c6420c3694..fa73c68fa55 100644
--- a/TAO/tao/Pluggable_Messaging_Utils.cpp
+++ b/TAO/tao/Pluggable_Messaging_Utils.cpp
@@ -1,4 +1,5 @@
//$Id$
+
#include "tao/Pluggable_Messaging_Utils.h"
#include "tao/ORB_Core.h"
@@ -8,6 +9,7 @@
ACE_RCSID(tao, Pluggable_Messaging_Utils, "$Id$")
+
TAO_Pluggable_Reply_Params::TAO_Pluggable_Reply_Params (
TAO_ORB_Core *orb_core
)
diff --git a/TAO/tao/PolicyC.cpp b/TAO/tao/PolicyC.cpp
index 6eb368913ae..84cec53c08d 100644
--- a/TAO/tao/PolicyC.cpp
+++ b/TAO/tao/PolicyC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/PolicyC.h b/TAO/tao/PolicyC.h
index 1adfc2f1126..f24161fff61 100644
--- a/TAO/tao/PolicyC.h
+++ b/TAO/tao/PolicyC.h
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/PolicyC.i b/TAO/tao/PolicyC.i
index ceb80909d8b..1b2b0c267c7 100644
--- a/TAO/tao/PolicyC.i
+++ b/TAO/tao/PolicyC.i
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/PolicyFactory_Registry.cpp b/TAO/tao/PolicyFactory_Registry.cpp
index 29910a7f3c7..814f6e0d1c4 100644
--- a/TAO/tao/PolicyFactory_Registry.cpp
+++ b/TAO/tao/PolicyFactory_Registry.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "PolicyFactory_Registry.h"
ACE_RCSID(tao, PolicyFactory_Registry, "$Id$")
diff --git a/TAO/tao/PortableInterceptor.pidl b/TAO/tao/PortableInterceptor.pidl
index e8421db4f8e..b2e4e8980a4 100644
--- a/TAO/tao/PortableInterceptor.pidl
+++ b/TAO/tao/PortableInterceptor.pidl
@@ -2,6 +2,7 @@
//
// $Id$
+
// ================================================================
//
// This file was used to generate the code in PortableInterceptorC.*
diff --git a/TAO/tao/Profile.cpp b/TAO/tao/Profile.cpp
index b7b4f9b17a2..a6b816f6855 100644
--- a/TAO/tao/Profile.cpp
+++ b/TAO/tao/Profile.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "Profile.h"
#include "Object_KeyC.h"
@@ -13,6 +14,7 @@
ACE_RCSID(tao, Profile, "$Id$")
+
// ****************************************************************
TAO_Profile::~TAO_Profile (void)
@@ -261,9 +263,9 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV)
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Cannot add ")
- ACE_TEXT ("IOP::TaggedComponent to profile.\n")
- ACE_TEXT ("(%P|%t) Standard profile components ")
- ACE_TEXT ("have been disabled or URL style IORs\n")
+ ACE_TEXT ("IOP::TaggedComponent to profile.\n")
+ ACE_TEXT ("(%P|%t) Standard profile components ")
+ ACE_TEXT ("have been disabled or URL style IORs\n")
ACE_TEXT ("(%P|%t) are in use. Try ")
ACE_TEXT ("\"-ORBStdProfileComponents 1\" and/or\n")
ACE_TEXT ("(%P|%t) \"-ORBObjRefStyle IOR\".\n")));
@@ -276,8 +278,8 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW (CORBA::BAD_PARAM (
CORBA_SystemException::_tao_minor_code (
TAO_DEFAULT_MINOR_CODE,
- EINVAL),
- CORBA::COMPLETED_NO));
+ EINVAL),
+ CORBA::COMPLETED_NO));
}
}
@@ -292,8 +294,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV)
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Cannot add ")
- ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0")
- ACE_TEXT ("IOR profile.\n")
+ ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0")
+ ACE_TEXT ("IOR profile.\n")
ACE_TEXT ("(%P|%t) Try using a GIOP 1.1 or ")
ACE_TEXT ("greater endpoint.\n")));
@@ -305,8 +307,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW (CORBA::BAD_PARAM (
CORBA_SystemException::_tao_minor_code (
TAO_DEFAULT_MINOR_CODE,
- EINVAL),
- CORBA::COMPLETED_NO));
+ EINVAL),
+ CORBA::COMPLETED_NO));
}
}
diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp
index 70c17a67629..a2334851139 100644
--- a/TAO/tao/Synch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Synch_Reply_Dispatcher.cpp
@@ -8,6 +8,7 @@
ACE_RCSID(tao, Synch_Reply_Dispatcher, "$Id$")
+
// Constructor.
TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (
TAO_ORB_Core *orb_core,
@@ -61,20 +62,11 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
// dispatcher is used because the request must be re-sent.
//this->message_state_.reset (0);
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
-
- /*if (&this->message_state_ != message_state)
- {
- // The Transport Mux Strategy did not use our Message_State to
- // receive the event, possibly because it is muxing multiple
- // requests over the same connection.
-
- // Steal the buffer so that no copying is done.
- this->message_state_.cdr.steal_from (message_state->cdr);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
- // There is no need to copy the other fields!
- }*/
+ ACE_UNUSED_ARG (db);
if (this->wait_strategy_ != 0)
{
@@ -91,12 +83,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
return 1;
}
-/*TAO_GIOP_Message_State *
-TAO_Synch_Reply_Dispatcher::message_state (void)
-{
- return &this->message_state_;
-}*/
-
void
TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport)
{
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp
index c17d1f68ce1..bf1b278f813 100644
--- a/TAO/tao/TAO_Server_Request.cpp
+++ b/TAO/tao/TAO_Server_Request.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// Implementation of the Dynamic Server Skeleton Interface (for GIOP)
#include "TAO_Server_Request.h"
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index eff531ec2df..cd15fe869d3 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -1,6 +1,7 @@
// -*- C++ -*-
// $Id$
+
#include "Transport.h"
#include "Exception.h"
@@ -26,6 +27,7 @@
ACE_RCSID(tao, Transport, "$Id$")
+
TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount)
: ACE_Refcountable (refcount)
, refcount_lock_ (lock)
@@ -67,6 +69,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
+ , incoming_message_queue_ (orb_core)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -735,7 +738,37 @@ TAO_Transport::recv (char *buffer,
return -1;
// now call the template method
- return this->recv_i (buffer, len, timeout);
+ ssize_t n =
+ this->recv_i (buffer, len, timeout);
+
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure \n")
+ ACE_TEXT ("TAO - handle_input () \n")));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ // Close the connection
+ this->tms_->connection_closed ();
+
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
+ return n;
}
@@ -783,6 +816,383 @@ TAO_Transport::generate_request_header (
}
int
+TAO_Transport::handle_input_i (ACE_HANDLE h,
+ ACE_Time_Value * max_wait_time,
+ int /*block*/)
+{
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [TAO_CONNECTION_HANDLER_BUF_SIZE];
+
+#if defined (ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_HAS_PURIFY */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_msgblock_allocator ());
+
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n =
+ this->recv (message_block.rd_ptr (),
+ message_block.space (),
+ max_wait_time);
+
+
+ if (n <= 0)
+ return n;
+
+
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
+
+ if (this->parse_incoming_messages (message_block) == -1)
+ return -1;
+
+ // Check whether we have a complete message for processing
+ size_t missing_data =
+ this->missing_data (message_block);
+
+ if (missing_data)
+ {
+ return this->consolidate_message (message_block,
+ missing_data,
+ h,
+ max_wait_time);
+ }
+
+
+ // @@Bala:
+ return this->process_parsed_messages (
+ message_block,
+ this->messaging_object ()->byte_order (),
+ h);
+}
+
+
+
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (!this->incoming_message_queue_.is_complete_message ())
+ {
+ return 0;
+ }
+
+ // Now that a new message has been read, process the message. Call
+ // the messaging object to do the parsing
+ if (this->messaging_object ()->parse_incoming_messages (message_block) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("TAO (%P|%t) - error in incoming message \n")));
+
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ return 0;
+}
+
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have message in the queue then find out how much of data
+ // is required to get a complete message
+ if (this->incoming_message_queue_.queue_length ())
+ {
+ return this->incoming_message_queue_.missing_data ();
+ }
+
+ return this->messaging_object ()->missing_data (incoming);
+}
+
+
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time)
+{
+ // The write pointer which will be used for reading data from the
+ // socket.
+ if (!this->incoming_message_queue_.is_complete_message ())
+ {
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ h,
+ max_wait_time);
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t n = this->recv (incoming.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // If we got an EWOULDBLOCK or some other error..
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // ..Decrement
+ missing_data -= n;
+
+ // Get the byte order information
+ CORBA::Octet byte_order =
+ this->messaging_object ()->byte_order ();
+
+ if (missing_data > 0)
+ {
+ // Duplicate the message block
+ ACE_Message_Block *mb =
+ incoming.duplicate ();
+
+ // Stick the message in queue with the byte order information
+ if (this->incoming_message_queue_.add_message (mb,
+ missing_data,
+ byte_order) == -1)
+ {
+ return -1;
+ }
+ return 0;
+ }
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (incoming,
+ byte_order,
+ h);
+}
+
+int
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time)
+{
+ // If the queue did not have a complete message put this piece of
+ // message in the queue
+ this->incoming_message_queue_.copy_message (incoming);
+ missing_data = this->incoming_message_queue_.missing_data ();
+
+ if (missing_data > 0)
+ {
+ // Read the message into the last node of the message queue..
+ ssize_t n = this->recv (this->incoming_message_queue_.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // Error...
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // Decrement the missing data
+ this->incoming_message_queue_.queued_data_->missing_data_ -= n;
+ }
+
+
+ if (!this->incoming_message_queue_.is_complete_message ())
+ {
+ return 0;
+ }
+
+ CORBA::Octet byte_order = 0;
+
+ // Get the message on the head of the queue..
+ ACE_Message_Block *msg_block =
+ this->incoming_message_queue_.dequeue_head (byte_order);
+
+ // Process the message...
+ if (this->process_parsed_messages (*msg_block,
+ byte_order,
+ h) == -1)
+ return -1;
+
+ // Delete the message block...
+ delete msg_block;
+
+ return 0;
+}
+int
+TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block,
+ CORBA::Octet byte_order,
+ ACE_HANDLE h)
+{
+ // If we have a complete message, just resume the handler
+ // Resume the handler.
+ // @@Bala: Try to solve this issue of reactor resumptions..
+ this->orb_core_->reactor ()->resume_handler (h);
+
+ // Get the <message_type> that we have received
+ TAO_Pluggable_Message_Type t =
+ this->messaging_object ()->message_type ();
+
+
+ int result = 0;
+ if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("Close Connection Message recd \n")));
+
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
+ {
+ if (this->messaging_object ()->process_request_message (
+ this,
+ this->orb_core (),
+ message_block,
+ byte_order) == -1)
+ {
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ // @@Bala: Maybe the input_cdr can be constructed from the
+ // message_block
+ TAO_Pluggable_Reply_Params params (this->orb_core ());
+
+ if (this->messaging_object ()->process_reply_message (params,
+ message_block,
+ byte_order) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("IIOP_Transport::process_message, ")
+ ACE_TEXT ("process_reply_message ()")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ result = this->tms ()->dispatch_reply (params);
+
+ // @@ Somehow it seems dangerous to reset the state *after*
+ // dispatching the request, what if another threads receives
+ // another reply in the same connection?
+ // My guess is that it works as follows:
+ // - For the exclusive case there can be no such thread.
+ // - The the muxed case each thread has its own message_state.
+ // I'm pretty sure this comment is right. Could somebody else
+ // please look at it and confirm my guess?
+
+ // @@ The above comment was found in the older versions of the
+ // code. The code was also written in such a way that, when
+ // the client thread on a call from handle_input () from the
+ // reactor a call would be made on the handle_client_input
+ // (). The implementation of handle_client_input () looked so
+ // flaky. It used to create a message state upon entry in to
+ // the function using the TMS and destroy that on exit. All
+ // this was fine _theoretically_ for multiple threads. But
+ // the flakiness was originating in the implementation of
+ // get_message_state () where we were creating message state
+ // only once and dishing it out for every thread till one of
+ // them destroy's it. So, it looked broken. That has been
+ // changed. Why?. To my knowledge, the reactor does not call
+ // handle_input () on two threads at the same time. So, IMHO
+ // that defeats the purpose of creating a message state for
+ // every thread. This is just my guess. If we run in to
+ // problems this place needs to be revisited. If someone else
+ // is going to take a look please contact bala@cs.wustl.edu
+ // for details on this-- Bala
+
+ if (result == -1)
+ {
+ // Something really critical happened, we will forget about
+ // every reply on this connection.
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::")
+ ACE_TEXT ("process_message - ")
+ ACE_TEXT ("dispatch reply failed\n")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ if (result == 0)
+ {
+ this->messaging_object ()->reset ();
+
+ // The reply dispatcher was no longer registered.
+ // This can happened when the request/reply
+ // times out.
+ // To throw away all registered reply handlers is
+ // not the right thing, as there might be just one
+ // old reply coming in and several valid new ones
+ // pending. If we would invoke <connection_closed>
+ // we would throw away also the valid ones.
+ //return 0;
+ }
+
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ //this->tms_->destroy_message_state (message_state);
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ {
+ return -1;
+ }
+
+ // If not, just return back..
+ return 0;
+}
+
+int
TAO_Transport::queue_is_empty (void)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 54f8539ae1d..789761ee4e0 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -19,15 +19,18 @@
#include "ace/pre.h"
#include "corbafwd.h"
+
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
#include "Exception.h"
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
#include "Transport_Timer.h"
#include "ace/Strategies.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "Incoming_Message_Queue.h"
class TAO_ORB_Core;
class TAO_Target_Specification;
@@ -72,7 +75,7 @@ protected:
*
* <H3>The outgoing data path:</H3>
*
- * One of the responsabilities of the TAO_Transport class is to send
+ * One of the responsibilities of the TAO_Transport class is to send
* out GIOP messages as efficiently as possible. In most cases
* messages are put out in FIFO order, the transport object will put
* out the message using a single system call and return control to
@@ -136,7 +139,19 @@ protected:
*
* <H3>The incoming data path:</H3>
*
- * @todo Document the incoming data path design forces.
+ * One of the main responsibilities of the transport is to read and
+ * process the incoming GIOP message as quickly and efficiently as
+ * possible. There are other forces that needs to be given due
+ * consideration. They are
+ * - Multiple threads should read from the same handle
+ * - Reads on the handle could give one or more messages.
+ * - Minimise locking and copying overhead when trying to attack the
+ * above.
+ * <H3> Parsing messages (GIOP) & processing the message:</H3>
+ *
+ * The messages should be checked for validity and the right
+ * information should be sent to the higher layer for processing.
+ *
*
*
* <B>See Also:</B>
@@ -451,6 +466,33 @@ public:
TAO_Target_Specification &spec,
TAO_OutputCDR &msg);
+ /// Callback to read incoming data
+ /// @@ Bala: Change documentation here.... They dont make sense
+ /// anymore
+ /**
+ * The ACE_Event_Handler adapter invokes this method as part of its
+ * handle_input() operation.
+ *
+ * @todo: the method name is confusing! Calling it handle_input()
+ * would probably make things easier to understand and follow!
+ *
+ * Once a complete message is read the Transport class delegates on
+ * the Messaging layer to invoke the right upcall (on the server) or
+ * the TAO_Reply_Dispatcher (on the client side).
+ *
+ * @param max_wait_time In some cases the I/O is synchronous, e.g. a
+ * thread-per-connection server or when Wait_On_Read is enabled. In
+ * those cases a maximum read time can be specified.
+ *
+ * @param block Is deprecated and ignored.
+ *
+ */
+ // @@ lockme
+ virtual int handle_input_i (ACE_HANDLE h = ACE_INVALID_HANDLE,
+ ACE_Time_Value *max_wait_time = 0,
+ int block = 0);
+
+
/// Prepare the waiting and demuxing strategy to receive a reply for
/// a new request.
/**
@@ -501,29 +543,6 @@ public:
int is_synchronous = 1,
ACE_Time_Value *max_time_wait = 0) = 0;
- /// Callback to read incoming data
- /**
- * The ACE_Event_Handler adapter invokes this method as part of its
- * handle_input() operation.
- *
- * @todo: the method name is confusing! Calling it handle_input()
- * would probably make things easier to understand and follow!
- *
- * Once a complete message is read the Transport class delegates on
- * the Messaging layer to invoke the right upcall (on the server) or
- * the TAO_Reply_Dispatcher (on the client side).
- *
- * @param max_wait_time In some cases the I/O is synchronous, e.g. a
- * thread-per-connection server or when Wait_On_Read is enabled. In
- * those cases a maximum read time can be specified.
- *
- * @param block Is deprecated and ignored.
- *
- */
- // @@ lockme
- virtual int read_process_message (ACE_Time_Value *max_wait_time = 0,
- int block = 0) = 0;
-
protected:
/// Register the handler with the reactor.
/**
@@ -548,6 +567,28 @@ protected:
*/
virtual void transition_handler_state_i (void) = 0;
+ /// @@ Bala: Documentation
+ int parse_incoming_messages (ACE_Message_Block &message_block);
+
+ size_t missing_data (ACE_Message_Block &message_block);
+
+ int check_message_integrity (ACE_Message_Block &message_block);
+
+ int consolidate_message (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time);
+
+ int consolidate_message_queue (ACE_Message_Block &incoming,
+ size_t missing_data,
+ ACE_HANDLE h,
+ ACE_Time_Value *max_wait_time);
+
+ /// @@ Bala: Documentation
+ virtual int process_parsed_messages (ACE_Message_Block &message_block,
+ CORBA::Octet byte_order,
+ ACE_HANDLE h = ACE_INVALID_HANDLE);
+
public:
/// Method for the connection handler to signify that it
/// is being closed and destroyed.
@@ -622,6 +663,9 @@ public:
int handle_timeout (const ACE_Time_Value &current_time,
const void* act);
+ // @@ Bala : Add documentation
+ // int process_message (ACE_Message_Block &message_block) = 0;
+
private:
/// Send some of the data in the queue.
/**
@@ -747,6 +791,9 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
+ /// @@Bala: Docu??
+ TAO_Incoming_Message_Queue incoming_message_queue_;
+
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;
diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp
index 3320298e49d..f582e02d496 100644
--- a/TAO/tao/Wait_On_Read.cpp
+++ b/TAO/tao/Wait_On_Read.cpp
@@ -29,7 +29,9 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time,
while (1)
{
retval =
- this->transport_->read_process_message (max_wait_time, 1);
+ this->transport_->handle_input_i (ACE_INVALID_HANDLE,
+ max_wait_time,
+ 1);
// If we got our reply, no need to run the loop any
// further.