From 9f15ca5b47d4851db1315591c15a36c15b5023c8 Mon Sep 17 00:00:00 2001 From: bala Date: Mon, 25 Jun 2001 12:57:34 +0000 Subject: ChangeLogTag: Mon Jun 25 07:54:31 2001 Balachandran Natarajan --- TAO/tao/Acceptor_Filter.cpp | 3 + TAO/tao/Adapter.cpp | 1 + TAO/tao/Any.cpp | 2 + TAO/tao/Asynch_Reply_Dispatcher.cpp | 14 +- TAO/tao/CDR.cpp | 2 + TAO/tao/CORBALOC_Parser.cpp | 1 + TAO/tao/ClientRequestInfo.cpp | 1 + TAO/tao/Connection_Handler.cpp | 3 +- TAO/tao/Connection_Handler.h | 6 +- TAO/tao/DomainC.cpp | 1 + TAO/tao/DomainC.i | 1 + TAO/tao/DynamicC.cpp | 1 + TAO/tao/DynamicC.i | 153 +++--- TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp | 10 +- TAO/tao/Exception.cpp | 2 + TAO/tao/Exclusive_TMS.cpp | 4 - TAO/tao/GIOP_Message_Base.cpp | 137 ++++-- TAO/tao/GIOP_Message_Base.h | 39 +- TAO/tao/GIOP_Message_Reactive_Handler.cpp | 574 +++++----------------- TAO/tao/GIOP_Message_Reactive_Handler.h | 31 +- TAO/tao/GIOP_Message_Reactive_Handler.inl | 4 + TAO/tao/GIOP_Message_State.cpp | 305 +++++++++--- TAO/tao/GIOP_Message_State.h | 115 +++-- TAO/tao/GIOP_Message_State.i | 1 + TAO/tao/IIOP_Acceptor.cpp | 1 + TAO/tao/IIOP_Connection_Handler.cpp | 39 +- TAO/tao/IIOP_Connection_Handler.h | 6 +- TAO/tao/IIOP_Connector.cpp | 1 - TAO/tao/IIOP_Transport.cpp | 22 +- TAO/tao/IIOP_Transport.h | 4 +- TAO/tao/IORInfo.cpp | 22 +- TAO/tao/Invocation.cpp | 3 + TAO/tao/Invocation_Endpoint_Selectors.cpp | 1 + TAO/tao/LocalObject.cpp | 2 + TAO/tao/Makefile | 5 +- TAO/tao/MessagingC.h | 1 + TAO/tao/Muxed_TMS.cpp | 2 + TAO/tao/ORB.cpp | 10 +- TAO/tao/Object_Ref_Table.cpp | 2 + TAO/tao/Pluggable_Messaging.h | 18 +- TAO/tao/Pluggable_Messaging_Utils.cpp | 2 + TAO/tao/PolicyC.cpp | 1 + TAO/tao/PolicyC.h | 1 + TAO/tao/PolicyC.i | 1 + TAO/tao/PolicyFactory_Registry.cpp | 1 + TAO/tao/PortableInterceptor.pidl | 1 + TAO/tao/Profile.cpp | 20 +- TAO/tao/Synch_Reply_Dispatcher.cpp | 24 +- TAO/tao/TAO_Server_Request.cpp | 1 + TAO/tao/Transport.cpp | 412 +++++++++++++++- TAO/tao/Transport.h | 105 ++-- TAO/tao/Wait_On_Read.cpp | 4 +- 52 files changed, 1287 insertions(+), 836 deletions(-) diff --git a/TAO/tao/Acceptor_Filter.cpp b/TAO/tao/Acceptor_Filter.cpp index 8ba9afd45ee..98a3efb01d2 100644 --- a/TAO/tao/Acceptor_Filter.cpp +++ b/TAO/tao/Acceptor_Filter.cpp @@ -1,13 +1,16 @@ // $Id$ + #include "tao/Acceptor_Filter.h" #if !defined (__ACE_INLINE__) # include "Acceptor_Filter.i" #endif /* __ACE_INLINE__ */ + ACE_RCSID(tao, Acceptor_Filter, "$Id$") + TAO_Acceptor_Filter::~TAO_Acceptor_Filter (void) { } diff --git a/TAO/tao/Adapter.cpp b/TAO/tao/Adapter.cpp index 41c9e3a8bd9..1f363431999 100644 --- a/TAO/tao/Adapter.cpp +++ b/TAO/tao/Adapter.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "tao/Adapter.h" #include "tao/Object.h" #include "tao/Object_KeyC.h" diff --git a/TAO/tao/Any.cpp b/TAO/tao/Any.cpp index 4a5c307d38e..35aadd83e34 100644 --- a/TAO/tao/Any.cpp +++ b/TAO/tao/Any.cpp @@ -1,5 +1,6 @@ // $Id$ + // Portions of this file are: // Copyright 1994-1995 by Sun Microsystems Inc. // All Rights Reserved @@ -18,6 +19,7 @@ ACE_RCSID(tao, Any, "$Id$") + CORBA::TypeCode_ptr CORBA_Any::type (void) const { diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp index b4a20a82253..5e7b2adb3aa 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.cpp +++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp @@ -1,6 +1,5 @@ // $Id$ - #include "tao/Asynch_Reply_Dispatcher.h" #include "tao/Pluggable_Messaging_Utils.h" @@ -48,11 +47,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply ( return 0; } -/*TAO_GIOP_Message_State * -TAO_Asynch_Reply_Dispatcher_Base::message_state (void) -{ - return this->message_state_; -} */ void TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *) @@ -113,11 +107,11 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply ( this->reply_status_ = params.reply_status_; - // this->message_state_ = message_state; - - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); + // Transfer the 's content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); + ACE_UNUSED_ARG (db); // Steal the buffer, that way we don't do any unnecesary copies of // this data. CORBA::ULong max = params.svc_ctx_.maximum (); diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp index 07eb03a7cf2..a08222e531f 100644 --- a/TAO/tao/CDR.cpp +++ b/TAO/tao/CDR.cpp @@ -41,9 +41,11 @@ # include "tao/CDR.i" #endif /* ! __ACE_INLINE__ */ + ACE_RCSID(tao, CDR, "$Id$") + #if defined (ACE_ENABLE_TIMEPROBES) static const char *TAO_CDR_Timeprobe_Description[] = diff --git a/TAO/tao/CORBALOC_Parser.cpp b/TAO/tao/CORBALOC_Parser.cpp index 13f09350804..ab4b228771a 100644 --- a/TAO/tao/CORBALOC_Parser.cpp +++ b/TAO/tao/CORBALOC_Parser.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "CORBALOC_Parser.h" #include "ORB_Core.h" #include "Stub.h" diff --git a/TAO/tao/ClientRequestInfo.cpp b/TAO/tao/ClientRequestInfo.cpp index 8dab949ae98..281ca1ed6a7 100644 --- a/TAO/tao/ClientRequestInfo.cpp +++ b/TAO/tao/ClientRequestInfo.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "ClientRequestInfo.h" #include "Invocation.h" #include "Stub.h" diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index 985dbbf44c8..099e79d5eef 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -93,7 +93,8 @@ TAO_Connection_Handler::svc_i (void) while (!this->orb_core_->has_shutdown () && result >= 0) { - result = this->handle_input_i (ACE_INVALID_HANDLE, max_wait_time); + result = + this->transport ()->handle_input_i (ACE_INVALID_HANDLE, max_wait_time); if (result == -1 && errno == ETIME) { diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h index 7caeccecd4b..6ca525bd1b0 100644 --- a/TAO/tao/Connection_Handler.h +++ b/TAO/tao/Connection_Handler.h @@ -101,9 +101,9 @@ protected: /// Object. int svc_i (void); - /// Need to be implemented by the underlying protocol objects - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0) = 0; +#if !defined (TAO_CONNECTION_HANDLER_BUF_SIZE) +# define TAO_CONNECTION_HANDLER_BUF_SIZE 1024 +#endif /*TAO_CONNECTION_HANDLER_BUF_SIZE */ private: diff --git a/TAO/tao/DomainC.cpp b/TAO/tao/DomainC.cpp index 7dfa513c009..f2e9509a4f6 100644 --- a/TAO/tao/DomainC.cpp +++ b/TAO/tao/DomainC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DomainC.i b/TAO/tao/DomainC.i index 08c89453f73..b86dfb17929 100644 --- a/TAO/tao/DomainC.i +++ b/TAO/tao/DomainC.i @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DynamicC.cpp b/TAO/tao/DynamicC.cpp index 310487d401c..3ba3d039091 100644 --- a/TAO/tao/DynamicC.cpp +++ b/TAO/tao/DynamicC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DynamicC.i b/TAO/tao/DynamicC.i index 09fd7a3ff51..b45699ab5bb 100644 --- a/TAO/tao/DynamicC.i +++ b/TAO/tao/DynamicC.i @@ -70,7 +70,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p) { Parameter *deep_copy = new Parameter (*p.ptr_); - + if (deep_copy != 0) { Parameter *tmp = deep_copy; @@ -80,7 +80,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p) } } } - + return *this; } @@ -103,20 +103,20 @@ Dynamic::Parameter_var::operator const ::Dynamic::Parameter &() const // cast } ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast { return *this->ptr_; } ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast { return this->ptr_; } @@ -133,7 +133,7 @@ Dynamic::Parameter_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::Parameter *& Dynamic::Parameter_var::out (void) { @@ -194,7 +194,7 @@ Dynamic::Parameter_out::operator= (Parameter *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::Parameter_out::operator ::Dynamic::Parameter *&() // cast { return this->ptr_; @@ -214,7 +214,7 @@ Dynamic::Parameter_out::operator-> (void) #if !defined (TAO_USE_SEQUENCE_TEMPLATES) - + #if !defined (__TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_) #define __TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_ @@ -227,24 +227,24 @@ Dynamic::Parameter_out::operator-> (void) ACE_NEW_RETURN (retval, Dynamic::Parameter[size], 0); return retval; } - + ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::freebuf (Dynamic::Parameter *buffer) // Free the sequence. { delete [] buffer; } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (void) // Default constructor. { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum) // Constructor using a maximum length value. : TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (maximum)) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum, CORBA::ULong length, @@ -253,7 +253,7 @@ Dynamic::Parameter_out::operator-> (void) : TAO_Unbounded_Base_Sequence (maximum, length, data, release) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs) // Copy constructor. @@ -263,10 +263,10 @@ Dynamic::Parameter_out::operator-> (void) { Dynamic::Parameter *tmp1 = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (this->maximum_); Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) tmp1[i] = tmp2[i]; - + this->buffer_ = tmp1; } else @@ -274,14 +274,14 @@ Dynamic::Parameter_out::operator-> (void) this->buffer_ = 0; } } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator= (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs) // Assignment operator. { if (this == &rhs) return *this; - + if (this->release_) { if (this->maximum_ < rhs.maximum_) @@ -294,18 +294,18 @@ Dynamic::Parameter_out::operator-> (void) } else this->buffer_ = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (rhs.maximum_); - + TAO_Unbounded_Base_Sequence::operator= (rhs); - + Dynamic::Parameter *tmp1 = ACE_reinterpret_cast (Dynamic::Parameter *, this->buffer_); Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) tmp1[i] = tmp2[i]; - + return *this; } - + // = Accessors. ACE_INLINE Dynamic::Parameter & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) @@ -315,7 +315,7 @@ Dynamic::Parameter_out::operator-> (void) Dynamic::Parameter* tmp = ACE_reinterpret_cast(Dynamic::Parameter*,this->buffer_); return tmp[i]; } - + ACE_INLINE const Dynamic::Parameter & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) const // operator [] @@ -324,9 +324,9 @@ Dynamic::Parameter_out::operator-> (void) Dynamic::Parameter * const tmp = ACE_reinterpret_cast (Dynamic::Parameter* ACE_CAST_CONST, this->buffer_); return tmp[i]; } - + // Implement the TAO_Base_Sequence methods (see Sequence.h) - + ACE_INLINE Dynamic::Parameter * Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (CORBA::Boolean orphan) { @@ -360,13 +360,13 @@ Dynamic::Parameter_out::operator-> (void) } return result; } - + ACE_INLINE const Dynamic::Parameter * Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (void) const { return ACE_reinterpret_cast(const Dynamic::Parameter * ACE_CAST_CONST, this->buffer_); } - + ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::replace (CORBA::ULong max, CORBA::ULong length, @@ -383,11 +383,11 @@ Dynamic::Parameter_out::operator-> (void) this->buffer_ = data; this->release_ = release; } - + #endif /* end #if !defined */ -#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ +#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ #if !defined (_DYNAMIC_PARAMETERLIST_CI_) #define _DYNAMIC_PARAMETERLIST_CI_ @@ -443,7 +443,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p) { ParameterList *deep_copy = new ParameterList (*p.ptr_); - + if (deep_copy != 0) { ParameterList *tmp = deep_copy; @@ -453,7 +453,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p) } } } - + return *this; } @@ -469,27 +469,27 @@ Dynamic::ParameterList_var::operator-> (void) return this->ptr_; } -ACE_INLINE +ACE_INLINE Dynamic::ParameterList_var::operator const ::Dynamic::ParameterList &() const // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast +ACE_INLINE +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast +ACE_INLINE +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast { return this->ptr_; } @@ -518,7 +518,7 @@ Dynamic::ParameterList_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::ParameterList *& Dynamic::ParameterList_var::out (void) { @@ -579,7 +579,7 @@ Dynamic::ParameterList_out::operator= (ParameterList *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::ParameterList_out::operator ::Dynamic::ParameterList *&() // cast { return this->ptr_; @@ -608,7 +608,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) #if !defined (TAO_USE_SEQUENCE_TEMPLATES) - + #if !defined (__TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_) #define __TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_ @@ -616,36 +616,36 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (CORBA::ULong nelems) { CORBA::TypeCode **buf = 0; - + ACE_NEW_RETURN (buf, CORBA::TypeCode*[nelems], 0); - + for (CORBA::ULong i = 0; i < nelems; i++) { buf[i] = CORBA::TypeCode::_nil (); } - + return buf; } - - ACE_INLINE void + + ACE_INLINE void Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::freebuf (CORBA::TypeCode **buffer) { if (buffer == 0) return; delete[] buffer; } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (void) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum) : TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (maximum)) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum, CORBA::ULong length, @@ -654,7 +654,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) : TAO_Unbounded_Base_Sequence (maximum, length, value, release) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList(const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs) : TAO_Unbounded_Base_Sequence (rhs) @@ -663,12 +663,12 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) { CORBA::TypeCode **tmp1 = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (this->maximum_); CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < rhs.length_; ++i) { tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]); } - + this->buffer_ = tmp1; } else @@ -676,17 +676,17 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) this->buffer_ = 0; } } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList & Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator= (const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs) { if (this == &rhs) return *this; - + if (this->release_) { CORBA::TypeCode **tmp = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) { CORBA::release (tmp[i]); @@ -700,20 +700,20 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) } else this->buffer_ = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (rhs.maximum_); - + TAO_Unbounded_Base_Sequence::operator= (rhs); - + CORBA::TypeCode **tmp1 = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_); CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < rhs.length_; ++i) { tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]); } - + return *this; } - + ACE_INLINE TAO_Pseudo_Object_Manager Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator[] (CORBA::ULong index) const // read-write accessor @@ -722,7 +722,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) CORBA::TypeCode ** const tmp = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_); return TAO_Pseudo_Object_Manager (tmp + index, this->release_); } - + ACE_INLINE CORBA::TypeCode* * Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (CORBA::Boolean orphan) { @@ -756,18 +756,18 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) } return result; } - + ACE_INLINE const CORBA::TypeCode* * Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (void) const { return ACE_reinterpret_cast(const CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_); } - - + + #endif /* end #if !defined */ -#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ +#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ #if !defined (_DYNAMIC_EXCEPTIONLIST_CI_) #define _DYNAMIC_EXCEPTIONLIST_CI_ @@ -823,7 +823,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p) { ExceptionList *deep_copy = new ExceptionList (*p.ptr_); - + if (deep_copy != 0) { ExceptionList *tmp = deep_copy; @@ -833,7 +833,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p) } } } - + return *this; } @@ -849,27 +849,27 @@ Dynamic::ExceptionList_var::operator-> (void) return this->ptr_; } -ACE_INLINE +ACE_INLINE Dynamic::ExceptionList_var::operator const ::Dynamic::ExceptionList &() const // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast +ACE_INLINE +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast +ACE_INLINE +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast { return this->ptr_; } @@ -892,7 +892,7 @@ Dynamic::ExceptionList_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::ExceptionList *& Dynamic::ExceptionList_var::out (void) { @@ -953,7 +953,7 @@ Dynamic::ExceptionList_out::operator= (ExceptionList *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::ExceptionList_out::operator ::Dynamic::ExceptionList *&() // cast { return this->ptr_; @@ -989,7 +989,7 @@ ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &strm, const Dynamic::Parame return 1; else return 0; - + } ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_tao_aggregate) @@ -1001,7 +1001,7 @@ ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_t return 1; else return 0; - + } @@ -1033,4 +1033,3 @@ CORBA::Boolean TAO_Export operator>> ( ); #endif /* _TAO_CDR_OP_Dynamic_ExceptionList_I_ */ - diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp index 5471cd4d1f3..ff807dfb5fe 100644 --- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp +++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp @@ -2,11 +2,14 @@ + #include "DII_Reply_Dispatcher.h" + ACE_RCSID(DynamicInterface, DII_Reply_Dispatcher, "$Id$") + #include "Request.h" #include "tao/Pluggable.h" #include "tao/Environment.h" @@ -44,8 +47,11 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply ( { this->reply_status_ = params.reply_status_; - // Steal the buffer so that no copying is done. - this->reply_cdr_.steal_from (params.input_cdr_); + // Transfer the 's content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); + + ACE_UNUSED_ARG (db); // Steal the buffer, that way we don't do any unnecesary copies of // this data. diff --git a/TAO/tao/Exception.cpp b/TAO/tao/Exception.cpp index 9d9f12fe3e9..d33b1ebc4b8 100644 --- a/TAO/tao/Exception.cpp +++ b/TAO/tao/Exception.cpp @@ -1,5 +1,6 @@ // $Id$ + // THREADING NOTE: calling thread handles mutual exclusion policy // on all of these data structures. @@ -26,6 +27,7 @@ ACE_RCSID (TAO, Exception, "$Id$") + // Static initializers. ACE_Allocator *TAO_Exceptions::global_allocator_; diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp index 3645be52122..47eff9ca3be 100644 --- a/TAO/tao/Exclusive_TMS.cpp +++ b/TAO/tao/Exclusive_TMS.cpp @@ -43,10 +43,6 @@ TAO_Exclusive_TMS::bind_dispatcher (CORBA::ULong request_id, this->request_id_ = request_id; this->rd_ = rd; - // If there was a previous reply, cleanup its state first. - // if (this->message_state_.message_size != 0) - // this->message_state_.reset (0); - return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id, rd); } diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 9ffb54cc572..e3882c63802 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -17,11 +17,11 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") + TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, - size_t input_cdr_size) - : message_handler_ (orb_core, - this, - input_cdr_size), + size_t /*input_cdr_size*/) + : message_state_ (orb_core, + this), output_ (0), generator_parser_ (0) { @@ -62,10 +62,10 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major, void -TAO_GIOP_Message_Base::reset (int reset_flag) +TAO_GIOP_Message_Base::reset (int /*reset_flag*/) { // Reset the message state - this->message_handler_.reset (reset_flag); + // this->message_handler_.reset (reset_flag); } int @@ -184,10 +184,11 @@ TAO_GIOP_Message_Base::generate_reply_header ( int -TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, +TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, int /*block */, ACE_Time_Value * /*max_wait_time*/) { +#if 0 // Call the handler to read and do a simple parse of the header of // the message. int retval = @@ -217,6 +218,7 @@ TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, state.giop_version.minor); } +#endif /* if 0*/ // We return 2, it is ugly. But the reactor semantics has made us to // limp :( return 2; @@ -286,7 +288,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) TAO_Pluggable_Message_Type TAO_GIOP_Message_Base::message_type (void) { - switch (this->message_handler_.message_state ().message_type) + switch (this->message_state_.message_type_) { case TAO_GIOP_REQUEST: case TAO_GIOP_LOCATEREQUEST: @@ -312,12 +314,62 @@ TAO_GIOP_Message_Base::message_type (void) return TAO_PLUGGABLE_MESSAGE_MESSAGERROR; } +int +TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) +{ + + + if (this->message_state_.parse_message_header (incoming) == -1) + { + return -1; + } + + // Set the state internally for parsing and generating messages + this->set_state (this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor); + return 0; +} + +size_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) +{ + // @@Bala: Look for fragmentation here.. + // If we had recd. fragmented messages and if the GIOP minor version + // is greater than 1, then include the FRAGMENT HEADER to calculate + // the effective length of the message + /*if (this->message_state_.more_fragments_ && + this->message_state_.giop_version_.minor > 1) + len -= TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + */ + + size_t len = incoming.length (); + + if (len >= this->message_state_.message_size ()) + return 0; + + return this->message_state_.message_size () - len; +} + +CORBA::Octet +TAO_GIOP_Message_Base::byte_order (void) +{ + return this->message_state_.byte_order_; +} + +int +TAO_GIOP_Message_Base::is_message_complete (ACE_Message_Block & /*incoming*/) +{ + // @@Bala: Implement other cases + return 0; +} + int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core) + TAO_ORB_Core *orb_core, + ACE_Message_Block &incoming, + CORBA::Octet byte_order) { // Set the upcall thread - orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ()); // Reset the output CDR stream. @@ -325,31 +377,36 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, this->output_->reset (); // Get the read and write positions before we steal data. - size_t rd_pos = this->message_handler_.rd_pos (); - size_t wr_pos = this->message_handler_.wr_pos (); + size_t rd_pos = incoming.rd_ptr () - incoming.base (); + size_t wr_pos = incoming.wr_ptr () - incoming.base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), + incoming.length ()); - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); // Create a input CDR stream. // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is also done in the higher layers. - TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), - 0, + + TAO_InputCDR input_cdr (incoming.data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - this->message_handler_.message_state ().byte_order, - state.giop_version.major, - state.giop_version.minor, + byte_order, + this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor, orb_core); // Set giop version info for the outstream so that server replies // in correct GIOP version - output_->set_version (state.giop_version.major, state.giop_version.minor); + this->output_->set_version (this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor); // Reset the message handler to receive upcalls if any - this->message_handler_.reset (0); + // this->message_handler_.reset (0); // We know we have some request message. Check whether it is a // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. @@ -358,7 +415,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // the stream and never touch that again for anything. We basically // loose ownership of the data_block. - switch (this->message_handler_.message_state ().message_type) + switch (this->message_state_.message_type_) { case TAO_GIOP_REQUEST: // Should be taken care by the state specific invocations. They @@ -379,31 +436,37 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, int TAO_GIOP_Message_Base::process_reply_message ( - TAO_Pluggable_Reply_Params ¶ms + TAO_Pluggable_Reply_Params ¶ms, + ACE_Message_Block &incoming, + CORBA::Octet byte_order ) { + // Get the read and write positions before we steal data. - size_t rd_pos = this->message_handler_.rd_pos (); - size_t wr_pos = this->message_handler_.wr_pos (); + size_t rd_pos = incoming.rd_ptr () - incoming.base (); + size_t wr_pos = incoming.wr_ptr () - incoming.base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, incoming.rd_ptr ()), + incoming.length ()); - // Create a input CDR stream. + + // Create a empty buffer on stack // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is alos done in the higher layers. - TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), - 0, + TAO_InputCDR input_cdr (incoming.data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - this->message_handler_.message_state ().byte_order, - state.giop_version.major, - state.giop_version.minor); + this->message_state_.giop_version_.major, + this->message_state_.giop_version_.minor, + byte_order); // Reset the message state. Now, we are ready for the next nested // upcall if any. - this->message_handler_.reset (0); + // this->message_handler_.reset (0); // We know we have some reply message. Check whether it is a // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. @@ -412,7 +475,7 @@ TAO_GIOP_Message_Base::process_reply_message ( // the stream and never touch that again for anything. We basically // loose ownership of the data_block. - switch (this->message_handler_.message_state ().message_type) + switch (this->message_state_.message_type_) { case TAO_GIOP_REPLY: // Should be taken care by the state specific parsing @@ -1182,6 +1245,7 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void) int TAO_GIOP_Message_Base::more_messages (void) { +# if 0 int retval = this->message_handler_.is_message_ready (); @@ -1198,4 +1262,7 @@ TAO_GIOP_Message_Base::more_messages (void) state.giop_version.minor); return retval; +#endif + + return 0; } diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index ecb194b1148..1ca368005fe 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -22,8 +22,9 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "tao/GIOP_Message_Generator_Parser_Impl.h" -#include "tao/GIOP_Message_Reactive_Handler.h" #include "tao/GIOP_Utils.h" +#include "tao/GIOP_Message_State.h" + class TAO_Pluggable_Reply_Params; @@ -41,7 +42,7 @@ class TAO_Pluggable_Reply_Params; class TAO_Export TAO_GIOP_Message_Base : public TAO_Pluggable_Messaging { public: - friend class TAO_GIOP_Message_Reactive_Handler; + // friend class TAO_GIOP_Message_Reactive_Handler; /// Constructor TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, @@ -100,18 +101,31 @@ public: /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. virtual TAO_Pluggable_Message_Type message_type (void); + /// @@Bala:Documentation please... + virtual int parse_incoming_messages (ACE_Message_Block &message_block); + + /// @@Bala:Documentation please.. + virtual int is_message_complete (ACE_Message_Block &message_block); + /// @@Bala:Documentation please.. + virtual size_t missing_data (ACE_Message_Block &message_block); + + virtual CORBA::Octet byte_order (void); /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core); + TAO_ORB_Core *orb_core, + ACE_Message_Block &block, + CORBA::Octet byte_order); /// Parse the reply message that we received and return the reply /// information though virtual int process_reply_message ( - TAO_Pluggable_Reply_Params &reply_info - ); + TAO_Pluggable_Reply_Params &reply_info, + ACE_Message_Block &block, + CORBA::Octet byte_order); + /// Generate a reply message with the exception . virtual int generate_exception_reply ( @@ -191,22 +205,11 @@ private: /// Thr message handler object that does reading and parsing of the /// incoming messages - TAO_GIOP_Message_Reactive_Handler message_handler_; + TAO_GIOP_Message_State message_state_; /// Output CDR TAO_OutputCDR *output_; - /// Allocators for the output CDR that we hold. As we cannot rely on - /// the resources from ORB Core we reserve our own resources. The - /// reason that we cannot believe the ORB core is that, for a - /// multi-threaded servers it dishes out resources cached in - /// TSS. This would be dangerous as TSS gets destroyed before we - /// would. So we have our own memory that we can rely on. - /// Implementations of GIOP that we have - ACE_Allocator *cdr_buffer_alloc_; - ACE_Allocator *cdr_dblock_alloc_; - ACE_Allocator *cdr_msgblock_alloc_; - /// A buffer that we will use to initialise the CDR stream char repbuf_[ACE_CDR::DEFAULT_BUFSIZE]; @@ -214,9 +217,9 @@ private: TAO_GIOP_Message_Generator_Parser_Impl tao_giop_impl_; protected: - /// The generator and parser state. TAO_GIOP_Message_Generator_Parser *generator_parser_; + }; diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp index a8af560081a..da22b2e93ab 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.cpp +++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp @@ -8,393 +8,95 @@ #include "tao/GIOP_Message_Base.h" #include "Transport.h" +#if 0 #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_Reactive_Handler.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$") +#endif /*if 0*/ -TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core, - TAO_GIOP_Message_Base *base, - size_t input_cdr_size) - : message_state_ (orb_core), - mesg_base_ (base), - message_status_ (TAO_GIOP_WAITING_FOR_HEADER), - message_size_ (input_cdr_size), - current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)), - supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)) +ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$") +TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler ( + TAO_ORB_Core * /*orb_core*/, + TAO_GIOP_Message_Base * /*base*/) { - // NOTE: The message blocks here use a locked allocator which is not - // from the TSS even if there is one. We are getting the allocators - // from the global memory. We shouldn't be using the TSS stuff for - // the following reason - // (a) The connection handlers are per-connection and not - // per-thread. - // (b) The order of cleaning is important if we use allocators from - // TSS. The TSS goes away when the threads go away. But the - // connection handlers go away only when the ORB decides to shut - // it down. - ACE_CDR::mb_align (&this->current_buffer_); - - // Calculate the effective message after alignment - this->message_size_ -= this->rd_pos (); -} - - - -int -TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport) -{ - // Read the message from the transport. The size of the message read - // is the maximum size of the buffer that we have less the amount of - // data that has already been read in to the buffer. - ssize_t n = transport->recv (this->current_buffer_.wr_ptr (), - this->current_buffer_.space ()); - - if (n == -1) - { - if (errno == EWOULDBLOCK) - return 0; - - return -1; - } - // @@ What are the other error handling here?? - else if (n == 0) - { - return -1; - } - - if (TAO_debug_level == 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages" - " received %d bytes\n", - n)); - - size_t len; - for (size_t offset = 0; offset < size_t(n); offset += len) - { - len = n - offset; - if (len > 512) - len = 512; - ACE_HEX_DUMP ((LM_DEBUG, - this->current_buffer_.wr_ptr () + offset, - len, - "TAO (%P|%t) - read_messages ")); - } - ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n)); - } - - - // Now we have a succesful read. First adjust the write pointer - this->current_buffer_.wr_ptr (n); - - // Success - return 1; } -int -TAO_GIOP_Message_Reactive_Handler::parse_message_header (void) -{ - // Check what message are we waiting for and take suitable action - if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) - { - size_t len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - if (len > TAO_GIOP_MESSAGE_HEADER_LEN) - { - // Parse the GIOP header - if (this->parse_message_header_i (buf) == -1) - - return -1; - - int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN, - len); - - // Set the pointer read pointer position in the - // - size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN; - - if (retval) - { - // We had a fragment header, so the position should be - // beyond that - pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; - } - - this->current_buffer_.rd_ptr (pos); - buf = this->current_buffer_.rd_ptr (); - - // The GIOP header has been parsed. Set the status to wait for - // payload - this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; - - return 1; - } - - // We dont have sufficient information to decipher the GIOP - // header. Make sure that the reactor calls us back. - return -2; - } - - // The last read just "read" left-over messages - return 0; -} int -TAO_GIOP_Message_Reactive_Handler::is_message_ready (void) +TAO_GIOP_Message_Reactive_Handler::parse_message_header (ACE_Message_Block &incoming, + TAO_GIOP_Message_State &state) { - if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) - { - size_t len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - // Set the buf pointer to the start of the GIOP header - buf -= TAO_GIOP_MESSAGE_HEADER_LEN; - - // Dump the incoming message . It will be dumped only if the - // debug level is greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + TAO_GIOP_MESSAGE_HEADER_LEN); - if (len == this->message_state_.message_size) - { - // If the buffer length is equal to the size of the payload we - // have exactly one message. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - - // Check whether we have received only the first part of the - // fragment. - return this->message_state_.is_complete (this->current_buffer_); - } - else if (len > this->message_state_.message_size) - { - // If the length is greater we have received some X messages - // and a part of X + 1 messages (probably) with X varying - // from 1 to N. - this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; - - // Clone the data that we read in. - this->supp_buffer_.data_block ( - this->current_buffer_.data_block ()->clone ()); - - // Set the read and write pointer for the supplementary - // buffer. - size_t rd_pos = this->rd_pos (); - this->supp_buffer_.rd_ptr (rd_pos + - this->message_state_.message_size); - this->supp_buffer_.wr_ptr (this->wr_pos ()); - - // Reset the current buffer - this->current_buffer_.reset (); - - // Set the read and write pointers again for the current - // buffer. We change the write pointer settings as we would - // like to process a single message. - this->current_buffer_.rd_ptr (rd_pos); - this->current_buffer_.wr_ptr (rd_pos + - this->message_state_.message_size); - - return this->message_state_.is_complete (this->current_buffer_); - } - } - else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES) + // @@Bala: Need to make a check whether we are waiting for the + // header... + if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN) { - size_t len = this->supp_buffer_.length (); - - if (len > TAO_GIOP_MESSAGE_HEADER_LEN) - { - // @@ What about fragment headers??? - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - if (this->parse_message_header_i (buf) == -1) - - return -1; - - // Set the pointer read pointer position in the - // - this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // The GIOP header has been parsed. Set the status to wait for - // payload - this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; - - return this->get_message (); - } - else - { - // We have smaller than the header size left here. We - // just copy the rest of the stuff and reset things so that - // we can read the rest of the stuff from the socket. - this->current_buffer_.copy ( - this->supp_buffer_.rd_ptr (), - len); - - // Reset the supp buffer now - this->supp_buffer_.reset (); - - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - } - + // Parse the GIOP header + if (this->parse_message_header_i (incoming, state) == -1) + return -1; } - // Just send us back to the reactor so that we can for more data to - // come in . return 0; } -ACE_Data_Block * -TAO_GIOP_Message_Reactive_Handler::steal_data_block (void) -{ - ACE_Data_Block *db = - this->current_buffer_.data_block ()->clone_nocopy (); - - ACE_Data_Block *old_db = - this->current_buffer_.replace_data_block (db); - - ACE_CDR::mb_align (&this->current_buffer_); - - return old_db; -} - - -void -TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag) -{ - // Reset the contents of the message state - this->message_state_.reset (reset_flag); - - // Reset the current buffer - this->current_buffer_.reset (); - - ACE_CDR::mb_align (&this->current_buffer_); - - if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) - { - this->supp_buffer_.reset (); - ACE_CDR::mb_align (&this->supp_buffer_); - } - -} - - int -TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf) +TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (ACE_Message_Block &incoming, + TAO_GIOP_Message_State &state) { if (TAO_debug_level > 8) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n")); } - // Check whether we have a GIOP Message in the first place - if (this->parse_magic_bytes (buf) == -1) - return -1; + // Grab the rd_ptr_ from the message block.. + char *buf = incoming.rd_ptr (); - // Let us be specific that this is for 1.0 - if (this->message_state_.giop_version.minor == 0 && - this->message_state_.giop_version.major == 1) + // Parse the magic bytes first + if (this->parse_magic_bytes (buf) == -1) { - this->message_state_.byte_order = - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; - - if (this->message_state_.byte_order != 0 && - this->message_state_.byte_order != 1) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") - ACE_TEXT (" for version <1.0>\n"), - this->message_state_.byte_order)); - return -1; - } + return -1; } - else - { - // Read the byte ORDER - this->message_state_.byte_order = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); - // Read the fragment bit - this->message_state_.more_fragments = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + // Get the version information + if (this->get_version_info (buf, state) == -1) + return -1; - if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") - ACE_TEXT (" for version <%d %d> \n"), - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor)); - return -1; - } - } + // Get the byte order information... + if (this->get_byte_order_info (buf, state) == -1) + return -1; // Get the message type - this->message_state_.message_type = - buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - - // Get the payload size. If the payload size is greater than the - // length then set the length of the message block to that - // size. Move the rd_ptr to the end of the GIOP header - this->message_state_.message_size = this->get_payload_size (buf); + state.message_type = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - // If the message_size or the payload_size is zero then something - // is fishy. So return an error. - if (this->message_state_.message_size == 0) - return -1; + // Get the payload size + this->get_payload_size (buf, state); - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"), - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor, - this->message_state_.byte_order, - this->message_state_.message_type, - this->message_state_.message_size)); - } + // Parse the + int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN, + len); - return 1; -} + // Set the pointer read pointer position in the + // + size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN; + if (retval) + { + // We had a fragment header, so the position should be + // beyond that + pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + } -int -TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf, - size_t length) -{ - size_t len = - TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN; - - // By this point we are doubly sure that we have a more or less - // valid GIOP message with a valid major revision number. - if (this->message_state_.more_fragments && - this->message_state_.giop_version.minor == 2 && - length > len) - { - // Fragmented message in GIOP 1.2 should have a fragment header - // following the GIOP header. Grab the rd_ptr to get that - // info. - this->message_state_.request_id = this->read_ulong (buf); + this->current_buffer_.rd_ptr (pos); + buf = this->current_buffer_.rd_ptr (); - // As we parsed the header - return 1; + // The GIOP header has been parsed. Set the status to wait for + // payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; } + return 1; - return 0; -} int @@ -406,7 +108,7 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) && buf [2] == 0x4f // 'O' && buf [3] == 0x50)) // 'P' { - // For the present... + // For the present... if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) bad header, " @@ -418,12 +120,25 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) return -1; } + return 0; +} + +int +TAO_GIOP_Message_Reactive_Handler::get_version_info (char *buf, + TAO_GIOP_Message_State &state) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n")); + } + // We have a GIOP message on hand. Get its revision numbers CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( incoming_major, incoming_minor) == 0) @@ -439,55 +154,81 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) } // Set the version - this->message_state_.giop_version.minor = incoming_minor; - this->message_state_.giop_version.major = incoming_major; + state.giop_version.minor = incoming_minor; + state.giop_version.major = incoming_major; return 0; } +int +TAO_GIOP_Message_Reactive_Handler::get_byte_order_info (char *buf, + TAO_GIOP_Message_State &message_state) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n")); + } + + // Let us be specific that this is for 1.0 + if (message_state.giop_version.minor == 0 && + message_state.giop_version.major == 1) + { + message_state_.byte_order = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + + if (message_state.byte_order != 0 && + message_state.byte_order != 1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + message_state.byte_order)); + return -1; + } + } + else + { + // Read the byte ORDER + message_state.byte_order = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); + + // Read the fragment bit + message_state.more_fragments = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + + if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + message_state_.giop_version.major, + message_state_.giop_version.minor)); + return -1; + } + } + + return 0; +} CORBA::ULong -TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr) +TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr, + TAO_GIOP_Message_State &message_state) { // Move the read pointer rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - CORBA::ULong x = this->read_ulong (rd_ptr); - - if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_) - { - if (ACE_CDR::grow (&this->current_buffer_, - x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%P | %t) Unable to increase the size \n") - ACE_TEXT ("of the buffer \n"))); - return 0; - } - - // New message size is the size of the now larger buffer. - this->message_size_ = x + - TAO_GIOP_MESSAGE_HEADER_LEN + - ACE_CDR::MAX_ALIGNMENT; - } - - // Set the read pointer to the end of the GIOP message - // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - return x; -} + CORBA::ULong x = 0; -CORBA::ULong -TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr) -{ size_t msg_size = 4; - char *buf = ACE_ptr_align_binary (ptr, + char *buf = ACE_ptr_align_binary (rd_ptr, msg_size); - CORBA::ULong x; #if !defined (ACE_DISABLE_SWAP_ON_READ) - if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER)) + if (!(state.byte_order != ACE_CDR_BYTE_ORDER)) { x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); } @@ -499,76 +240,5 @@ TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr) x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); #endif /* ACE_DISABLE_SWAP_ON_READ */ - return x; -} - - - - -int -TAO_GIOP_Message_Reactive_Handler::get_message (void) -{ - if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) - { - size_t len = this->supp_buffer_.length (); - char * buf = - this->current_buffer_.rd_ptr (); - - buf -= TAO_GIOP_MESSAGE_HEADER_LEN; - - if (len == this->message_state_.message_size) - { - // If the buffer length is equal to the size of the payload we - // have exactly one message. Check whether we have received - // only the first part of the fragment. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->message_state_.message_size); - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (this->message_state_.message_size); - return this->message_state_.is_complete (this->current_buffer_); - } - else if (len > this->message_state_.message_size) - { - // If the length is greater we have received some X messages - // and a part of X + 1 messages (probably) with X varying - // from 1 to N. - this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; - - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->message_state_.message_size); - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (this->message_state_.message_size); - return this->message_state_.is_complete (this->current_buffer_); - } - else - { - // The remaining message in the supp buffer - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->supp_buffer_.length ()); - - // Reset the supp buffer now - this->supp_buffer_.reset (); - } - } - - return 0; + message_state.message_size = x; } diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.h b/TAO/tao/GIOP_Message_Reactive_Handler.h index b38a74c27c5..16c0503851e 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.h +++ b/TAO/tao/GIOP_Message_Reactive_Handler.h @@ -13,29 +13,19 @@ #ifndef TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H #define TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H #include "ace/pre.h" -#include "ace/Message_Block.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "tao/GIOP_Message_State.h" -class TAO_Transport; + class TAO_ORB_Core; class TAO_GIOP_Message_Base; +class TAO_GIOP_Message_State; +class ACE_Message_Block; -enum TAO_GIOP_Message_Status -{ - /// The buffer is waiting for the header of the message yet - TAO_GIOP_WAITING_FOR_HEADER = 0, - - /// The buffer is waiting for the payload to appear on the socket - TAO_GIOP_WAITING_FOR_PAYLOAD, - - /// The buffer has got multiple messages - TAO_GIOP_MULTIPLE_MESSAGES -}; /** * @class TAO_GIOP_Message_Reactive_Handler @@ -54,6 +44,9 @@ enum TAO_GIOP_Message_Status * reading the header and the payload seperately. */ + + +#if 0 class TAO_Export TAO_GIOP_Message_Reactive_Handler { public: @@ -79,7 +72,7 @@ public: /// - We have sufficient info for processing the header and we /// processed it succesfully. (return 1); /// - Any errors in processing will return a -1. - int parse_message_header (void); + int parse_message_header (ACE_Message_Block &message_block); /// Check whether we have atleast one complete message ready for /// processing. @@ -173,7 +166,11 @@ private: /// is then sent to the higher layers of the ORB. ACE_Message_Block supp_buffer_; }; +#if defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Reactive_Handler.inl" +#endif /* __ACE_INLINE__ */ +#endif const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; @@ -183,9 +180,7 @@ const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4; -#if defined (__ACE_INLINE__) -# include "tao/GIOP_Message_Reactive_Handler.inl" -#endif /* __ACE_INLINE__ */ + #include "ace/post.h" #endif /*TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H*/ diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.inl b/TAO/tao/GIOP_Message_Reactive_Handler.inl index 91211b34464..66b856377dc 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.inl +++ b/TAO/tao/GIOP_Message_Reactive_Handler.inl @@ -3,6 +3,8 @@ + +#if 0 ACE_INLINE TAO_GIOP_Message_State & TAO_GIOP_Message_Reactive_Handler::message_state (void) { @@ -28,3 +30,5 @@ TAO_GIOP_Message_Reactive_Handler::wr_pos (void) const return this->current_buffer_.wr_ptr () - this->current_buffer_.base (); } + +#endif diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index c71c5a638e5..3171f3de61b 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -1,115 +1,278 @@ -// -*- C++ -*- - -//$Id$ +// $Id$ #include "tao/GIOP_Message_State.h" -#include "tao/GIOP_Utils.h" +#include "tao/GIOP_Message_Generator_Parser_Impl.h" #include "tao/ORB_Core.h" +#include "tao/Pluggable.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Base.h" +//#include "Transport.h" #if !defined (__ACE_INLINE__) -# include "tao/GIOP_Message_State.i" +# include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ + ACE_RCSID(tao, GIOP_Message_State, "$Id$") - TAO_GIOP_Message_State::TAO_GIOP_Message_State (TAO_ORB_Core* /*orb_core*/) - : byte_order (TAO_ENCAP_BYTE_ORDER), - message_type (TAO_GIOP_MESSAGERROR), - message_size (0), - request_id (0), - // Problem similar to GIOP_Message_handler.cpp - Bala - fragmented_messages (ACE_CDR::DEFAULT_BUFSIZE), - more_fragments (0) +TAO_GIOP_Message_State::TAO_GIOP_Message_State ( + TAO_ORB_Core * /*orb_core*/, + TAO_GIOP_Message_Base * /*base*/) + : giop_version_ (TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR), + byte_order_ (0), + message_type_ (0), + message_size_ (0), + request_id_ (0), + more_fragments_ (0), + missing_data_ (0), + message_status_ (TAO_GIOP_WAITING_FOR_HEADER) { - //giop_version.major = TAO_DEF_GIOP_MAJOR; - //giop_version.minor = TAO_DEF_GIOP_MINOR; + +} + + +int +TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) +{ + // @@Bala: Need to make a check whether we are waiting for the + // header... + if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN) + { + // Parse the GIOP header + if (this->parse_message_header_i (incoming) == -1) + return -1; + } + + return 0; +} + +int +TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n")); + } + + // Grab the rd_ptr_ from the message block.. + char *buf = incoming.rd_ptr (); + + // Parse the magic bytes first + if (this->parse_magic_bytes (buf) == -1) + { + return -1; + } + + // Get the version information + if (this->get_version_info (buf) == -1) + return -1; + + // Get the byte order information... + if (this->get_byte_order_info (buf) == -1) + return -1; + + // Get the message type + this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + + // Get the size of the message.. + this->get_payload_size (buf); + + if (this->message_size_ == 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Message with size 0 recd.. \n"))); + return -1; + } + + if (this->more_fragments_) + { + // Parse the + /*int retval = */ + this->parse_fragment_header (buf, + incoming.length ()); + } + + // The GIOP header has been parsed. Set the status to wait for + // payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; + + return 0; } -TAO_GIOP_Message_State::~TAO_GIOP_Message_State (void) + + + +int +TAO_GIOP_Message_State::parse_magic_bytes (char *buf) { - // @@ Bala: this is not a very useful comment, is it? - //no-op + // The values are hard-coded to support non-ASCII platforms. + if (!(buf [0] == 0x47 // 'G' + && buf [1] == 0x49 // 'I' + && buf [2] == 0x4f // 'O' + && buf [3] == 0x50)) // 'P' + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) bad header, " + "magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), + buf[0], + buf[1], + buf[2], + buf[3])); + return -1; + } + + return 0; } int -TAO_GIOP_Message_State::is_complete (ACE_Message_Block ¤t_buf) +TAO_GIOP_Message_State::get_version_info (char *buf) { - if (this->more_fragments) + if (TAO_debug_level > 8) { - if (this->fragmented_messages.length () == 0) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n")); + } + + // We have a GIOP message on hand. Get its revision numbers + CORBA::Octet incoming_major = + buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = + buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + // Check the revision information + if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( + incoming_major, + incoming_minor) == 0) + { + if (TAO_debug_level > 0) { - this->first_fragment_byte_order = this->byte_order; - this->first_fragment_giop_version = this->giop_version; - this->first_fragment_message_type = this->message_type; - // this->fragments_end = this->fragments_begin = current; - this->fragmented_messages.copy (current_buf.rd_ptr (), - current_buf.length ()); - - // Reset the buffer - current_buf.reset (); - - // Reset our state - this->reset (); - return 0; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"), + incoming_major, incoming_minor)); } - return this->append_fragment (current_buf); + return -1; } - if (this->fragmented_messages.length () != 0) + // Set the version + this->giop_version_.minor = incoming_minor; + this->giop_version_.major = incoming_major; + + return 0; +} + +int +TAO_GIOP_Message_State::get_byte_order_info (char *buf) +{ + if (TAO_debug_level > 8) { - // This is the last message, but we must defragment before - // sending - if (this->append_fragment (current_buf) == -1) - return -1; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n")); + } - // Copy the entire message block into - current_buf.data_block (this->fragmented_messages.data_block ()->clone ()); + // Let us be specific that this is for 1.0 + if (this->giop_version_.minor == 0 && + this->giop_version_.major == 1) + { + this->byte_order_ = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; - this->fragmented_messages.reset (); + if (this->byte_order_ != 0 && + this->byte_order_ != 1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + this->byte_order_)); + return -1; + } + } + else + { + // Read the byte ORDER + this->byte_order_ = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); - this->byte_order = this->first_fragment_byte_order; - this->giop_version = this->first_fragment_giop_version; - this->message_type = this->first_fragment_message_type; + // Read the fragment bit + this->more_fragments_ = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); - // This message has no more fragments, and there where no fragments - // before it, just return. Notice that current_buf has the - // *right* contents + if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->giop_version_.major, + this->giop_version_.minor)); + return -1; + } } + return 0; +} - return 1; +void +TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; + + this->message_size_ = this->read_ulong (rd_ptr); } + + int -TAO_GIOP_Message_State::append_fragment (ACE_Message_Block& current) +TAO_GIOP_Message_State::parse_fragment_header (char *buf, + size_t length) { - if (this->first_fragment_byte_order != this->byte_order - || this->first_fragment_giop_version.major != this->giop_version.major - || this->first_fragment_giop_version.minor != this->giop_version.minor) + size_t len = + TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN; + + buf += TAO_GIOP_MESSAGE_HEADER_LEN; + + // By this point we are doubly sure that we have a more or less + // valid GIOP message with a valid major revision number. + if (this->giop_version_.minor == 2 && length > len) { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:\n") - ACE_TEXT (" Different GIOP versions or byte order\n"))); - this->reset (); - return -1; + // Fragmented message in GIOP 1.2 should have a fragment header + // following the GIOP header. Grab the rd_ptr to get that + // info. + this->request_id_ = this->read_ulong (buf); + + // As we parsed the header + return 1; } - size_t req_size = - this->fragmented_messages.size () + current.length (); + return 0; +} - this->fragmented_messages.size (req_size); +CORBA::ULong +TAO_GIOP_Message_State::read_ulong (char *rd_ptr) +{ + CORBA::ULong x = 0; - // Copy the message - this->fragmented_messages.copy (current.rd_ptr (), - current.length ()); + size_t msg_size = 4; - current.reset (); + char *buf = ACE_ptr_align_binary (rd_ptr, + msg_size); - // Reset our state - this->reset (); +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER)) + { + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); + } + else + { + ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x)); + } +#else + x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); +#endif /* ACE_DISABLE_SWAP_ON_READ */ - return 0; + return x; } diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index a71d53869ac..f6d7cd2d4a9 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -11,23 +11,19 @@ * * @author Chris Cleeland * @author Carlos O' Ryan + * @author modified by Balachandran Natarajan */ //============================================================================= - - #ifndef TAO_GIOP_MESSAGE_STATE_H #define TAO_GIOP_MESSAGE_STATE_H #include "ace/pre.h" -#include "tao/TAO_Export.h" +#include "tao/GIOP_Message_Version.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "tao/corbafwd.h" -#include "tao/CDR.h" -#include "tao/GIOP_Message_Version.h" class TAO_ORB_Core; @@ -37,49 +33,98 @@ class TAO_ORB_Core; * * @brief Generic definitions for Message States. * + * @@Bala: More documentation please... + * * This represents the state of the incoming GIOP message * As the ORB processes incoming messages it needs to keep track of * how much of the message has been read, if there are any * fragments following this message etc. */ + + + class TAO_Export TAO_GIOP_Message_State { public: + friend class TAO_GIOP_Message_Base; + + + enum TAO_GIOP_Message_Status + { + /// The header of the message hasn't shown up yet. + TAO_GIOP_WAITING_FOR_HEADER = 0, + + /// The payload hasn't fully shown up in the yet + TAO_GIOP_WAITING_FOR_PAYLOAD, + + /// The message read has got multiple requests + TAO_GIOP_MULTIPLE_MESSAGES + }; + + /// @@Bala: Documentation please... + /// Parse the message header. + int parse_message_header (ACE_Message_Block &incoming); + + /// Return the message size + CORBA::ULong message_size (void) const; + + /// Return the message size + CORBA::ULong payload_size (void) const; + + /// Return the byte order information + CORBA::Octet byte_order (void) const; + +private: + /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, + TAO_GIOP_Message_Base *base); - /// Dtor - ~TAO_GIOP_Message_State (void); + /// @@Bala: Documentation please... + int parse_message_header_i (ACE_Message_Block &incoming); - ///Reset the message header state and prepare it to receive the next - /// event. - void reset (int reset_contents = 1); + /// Checks for the magic word 'GIOP' in the start of the incoing + /// stream + int parse_magic_bytes (char *buf); - /// Has the header been received? - CORBA::Boolean header_received (void) const; + /// Extracts the version information from the incoming + /// stream. Performs a check for whether the version information is + /// right and sets the information in the + int get_version_info (char *buf); - /// Check if the current message is complete, adjusting the fragments - /// if required... - int is_complete (ACE_Message_Block ¤t_buf); + /// Extracts the byte order information from the incoming + /// stream. Performs a check for whether the byte order information + /// right and sets the information in the + int get_byte_order_info (char *buf); - /// Did we get fragmented messages? - int message_fragmented (void); + /// Gets the size of the payload and set the size in the + void get_payload_size (char *buf); - /// Version info - TAO_GIOP_Message_Version giop_version; + /// Parses the GIOP FRAGMENT_HEADER information from the incoming + /// stream. + int parse_fragment_header (char *buf, + size_t length); + + /// Read the unsigned long from the buffer. The should just + /// point to the next 4 bytes data that represent the ULong + CORBA::ULong read_ulong (char *buf); + +private: + + TAO_GIOP_Message_Version giop_version_; /// 0 = big, 1 = little - CORBA::Octet byte_order; + CORBA::Octet byte_order_; /// MsgType above - CORBA::Octet message_type; + CORBA::Octet message_type_; /// in byte_order! - CORBA::ULong message_size; + CORBA::ULong message_size_; /// Request Id from the Fragment header - CORBA::ULong request_id; + CORBA::ULong request_id_; /** * The fragments are collected in a chain of message blocks (using @@ -116,20 +161,26 @@ public: CORBA::Octet first_fragment_message_type; /// (Requests and Replys) - CORBA::Octet more_fragments; - -private: - /// Append to the list of fragments - /// Also resets the state, because the current message was consumed. - int append_fragment (ACE_Message_Block ¤t); + CORBA::Octet more_fragments_; + /// Missing data + CORBA::ULong missing_data_; + /// @@Bala: Documentation?? + TAO_GIOP_Message_Status message_status_; }; +const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; +const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; +const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6; +const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7; +const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; +const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; +const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4; #if defined (__ACE_INLINE__) -# include "tao/GIOP_Message_State.i" +# include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ #include "ace/post.h" diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i index 653bf47d6a8..b2633815ebf 100644 --- a/TAO/tao/GIOP_Message_State.i +++ b/TAO/tao/GIOP_Message_State.i @@ -1,6 +1,7 @@ // -*- C++ -*- //$Id$ + // **************************************************************** // @@ Bala: we use the stars to separate classes in ACE+TAO diff --git a/TAO/tao/IIOP_Acceptor.cpp b/TAO/tao/IIOP_Acceptor.cpp index 1a0e99030e2..4cfe224bae8 100644 --- a/TAO/tao/IIOP_Acceptor.cpp +++ b/TAO/tao/IIOP_Acceptor.cpp @@ -2,6 +2,7 @@ // $Id$ + #include "tao/IIOP_Acceptor.h" #include "tao/IIOP_Profile.h" #include "tao/MProfile.h" diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index c05a558efd3..28891e5e092 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -45,7 +45,7 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core { TAO_IIOP_Transport* specific_transport = 0; ACE_NEW(specific_transport, - TAO_IIOP_Transport(this, orb_core, 0)); + TAO_IIOP_Transport (this, orb_core, 0)); // store this pointer (indirectly increment ref count) this->transport(specific_transport); @@ -243,6 +243,12 @@ TAO_IIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } +int +TAO_IIOP_Connection_Handler::resume_handler (void) +{ + return 1; +} + int TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE) { @@ -313,45 +319,22 @@ int TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_i (h); -} - - -int -TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) -{ + // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); - - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Connection_Handler::read_message \n"))); - - } + int retval = this->transport ()->handle_input_i (h); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; + retval = -1; - if (result == -1 || - result == 1) - return result; - - return 0; + return retval; } - - // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h index fe220557d87..40a53456a4f 100644 --- a/TAO/tao/IIOP_Connection_Handler.h +++ b/TAO/tao/IIOP_Connection_Handler.h @@ -111,6 +111,10 @@ public: /// Return the underlying handle virtual ACE_HANDLE fetch_handle (void); + /// Send a TRUE value to the reactor, so that the reactor does not + /// resume the handler + virtual int resume_handler (void); + /// Use peer() to drain the outgoing message queue virtual int handle_output (ACE_HANDLE); @@ -131,8 +135,6 @@ protected: /// ensure that server threads eventually exit. virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); private: diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index 0561c25189a..d3e5eb60938 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -17,7 +17,6 @@ ACE_RCSID (TAO, IIOP_Connector, "$Id$") - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Node; diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index a680226fd1e..997226316cc 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -16,7 +16,7 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" +//#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Transport.i" @@ -26,12 +26,13 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$") TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + CORBA::Boolean /*flag*/) : TAO_Transport (TAO_TAG_IIOP_PROFILE, orb_core) , connection_handler_ (handler) , messaging_object_ (0) { +#if 0 if (flag) { // Use the lite version of the protocol @@ -39,6 +40,7 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_GIOP_Message_Lite (orb_core)); } else +#endif { // Use the normal GIOP object ACE_NEW (this->messaging_object_, @@ -87,6 +89,7 @@ TAO_IIOP_Transport::recv_i (char *buf, } +#if 0 int TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, int block) @@ -120,12 +123,13 @@ TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, // See we use the reactor semantics again while (result > 0) { - result = this->process_message (); + // result = this->process_message (); } return result; } +#endif int TAO_IIOP_Transport::register_handler_i (void) @@ -261,6 +265,16 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) return this->connection_handler_->process_listen_point_list (listen_list); } + + + +#if 0 +int +TAO_IIOP_Transport::process_message (ACE_Message_Block &message_block) +{ + // Call the messaging object to process the read data +} + int TAO_IIOP_Transport::process_message (void) { @@ -384,7 +398,7 @@ TAO_IIOP_Transport::process_message (void) return 1; } - +#endif /*if 0 */ void TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 940ab085b2e..9de0e2c6c01 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -82,8 +82,10 @@ protected: size_t len, const ACE_Time_Value *s = 0); +#if 0 virtual int read_process_message (ACE_Time_Value *max_time_value = 0, int block =0); +#endif virtual int register_handler_i (void); @@ -119,7 +121,7 @@ public: private: /// Process the message that we have read - int process_message (void); + int process_message (ACE_Message_Block &message); /// Set the Bidirectional context info in the service context list void set_bidir_context_info (TAO_Operation_Details &opdetails); diff --git a/TAO/tao/IORInfo.cpp b/TAO/tao/IORInfo.cpp index ab7edb16ccb..0e3e2071078 100644 --- a/TAO/tao/IORInfo.cpp +++ b/TAO/tao/IORInfo.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "IORInfo.h" #include "PolicyC.h" #include "IOPC.h" @@ -9,10 +10,9 @@ ACE_RCSID (tao, IORInfo, "$Id$") - TAO_IORInfo::TAO_IORInfo (TAO_ORB_Core *orb_core, - TAO_MProfile &mp, - CORBA::PolicyList *policy_list) + TAO_MProfile &mp, + CORBA::PolicyList *policy_list) : orb_core_ (orb_core), mp_ (mp), policy_list_ (policy_list) @@ -25,7 +25,7 @@ TAO_IORInfo::~TAO_IORInfo (void) CORBA::Policy_ptr TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { // Check the policy list supplied by the POA. @@ -34,12 +34,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, for (CORBA::ULong i = 0; i < policy_count; ++i) { CORBA::PolicyType pt = - (*(this->policy_list_))[i]->policy_type ( + (*(this->policy_list_))[i]->policy_type ( TAO_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); if (pt == type) - return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]); + return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]); } // TODO: Now check the global ORB policies. @@ -47,12 +47,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, ACE_THROW_RETURN (CORBA::INV_POLICY (TAO_OMG_VMCID | 2, CORBA::COMPLETED_NO), - CORBA::Policy::_nil ()); + CORBA::Policy::_nil ()); } void TAO_IORInfo::add_ior_component (const IOP::TaggedComponent &component, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { // Add the given tagged component to all profiles. @@ -85,12 +85,12 @@ TAO_IORInfo::add_ior_component_to_profile ( TAO_Profile *profile = this->mp_.get_profile (i); if (profile->tag () == profile_id) - { - profile->add_tagged_component (component, ACE_TRY_ENV); + { + profile->add_tagged_component (component, ACE_TRY_ENV); ACE_CHECK; found_profile = 1; - } + } } // According to the Portable Interceptor specification, we're diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 5f975f1e7b5..c50a3166851 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "Invocation.h" #include "Principal.h" #include "Stub.h" @@ -29,8 +30,10 @@ # include "Invocation.i" #endif /* ! __ACE_INLINE__ */ + ACE_RCSID(tao, Invocation, "$Id$") + #if defined (ACE_ENABLE_TIMEPROBES) static const char *TAO_Invocation_Timeprobe_Description[] = diff --git a/TAO/tao/Invocation_Endpoint_Selectors.cpp b/TAO/tao/Invocation_Endpoint_Selectors.cpp index fafe05a84aa..1e9a83da328 100644 --- a/TAO/tao/Invocation_Endpoint_Selectors.cpp +++ b/TAO/tao/Invocation_Endpoint_Selectors.cpp @@ -14,6 +14,7 @@ ACE_RCSID(tao, Invocation_Endpoint_Selectors, "$Id$") + TAO_Invocation_Endpoint_Selector::~TAO_Invocation_Endpoint_Selector (void) { } diff --git a/TAO/tao/LocalObject.cpp b/TAO/tao/LocalObject.cpp index e8952ac76b2..2070ba6e8d1 100644 --- a/TAO/tao/LocalObject.cpp +++ b/TAO/tao/LocalObject.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "LocalObject.h" #if !defined (__ACE_INLINE__) @@ -14,6 +15,7 @@ ACE_RCSID (tao, LocalObject, "$Id$") + CORBA_LocalObject::~CORBA_LocalObject (void) { } diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index 980e6c4d87e..df75fcf4faf 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -1,6 +1,6 @@ #---------------------------------------------------------------------------- - +# # $Id$ # # Makefile for TAO @@ -106,8 +106,6 @@ PLUGGABLE_MESSAGING_FILES = \ Pluggable_Messaging \ Pluggable_Messaging_Utils \ GIOP_Message_Base \ - GIOP_Message_Lite \ - GIOP_Message_Reactive_Handler \ GIOP_Message_Generator_Parser \ GIOP_Message_Generator_Parser_10 \ GIOP_Message_Generator_Parser_11 \ @@ -118,6 +116,7 @@ PLUGGABLE_MESSAGING_FILES = \ target_specification \ GIOP_Message_State \ GIOP_Message_Version \ + Incoming_Message_Queue \ Tagged_Profile DEFAULT_RESOURCES_FILES = \ diff --git a/TAO/tao/MessagingC.h b/TAO/tao/MessagingC.h index 8795ab51435..7b35f808d59 100644 --- a/TAO/tao/MessagingC.h +++ b/TAO/tao/MessagingC.h @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp index ee5f9647379..5adb6ae2b7c 100644 --- a/TAO/tao/Muxed_TMS.cpp +++ b/TAO/tao/Muxed_TMS.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "tao/Muxed_TMS.h" #include "tao/Reply_Dispatcher.h" #include "tao/GIOP_Message_Version.h" @@ -9,6 +10,7 @@ ACE_RCSID(tao, Muxed_TMS, "$Id$") + TAO_Muxed_TMS::TAO_Muxed_TMS (TAO_Transport *transport) : TAO_Transport_Mux_Strategy (transport), request_id_generator_ (0), diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp index f54ba3a1d1f..0c0efbaf6db 100644 --- a/TAO/tao/ORB.cpp +++ b/TAO/tao/ORB.cpp @@ -1,6 +1,7 @@ // $Id$ + #include "ORB.h" #include "ORB_Table.h" #include "Connector_Registry.h" @@ -70,6 +71,7 @@ using std::set_unexpected; ACE_RCSID(tao, ORB, "$Id$") + static const char ior_prefix [] = "IOR:"; // = Static initialization. @@ -1755,6 +1757,7 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj, CORBA::COMPLETED_NO), 0); + // Application writer controls what kind of objref strings they get, // maybe along with other things, by how they initialize the ORB. @@ -1981,8 +1984,11 @@ CORBA_ORB::ior_string_to_object (const char *str, int byte_order = *(mb.rd_ptr ()); mb.rd_ptr (1); mb.wr_ptr (len); - TAO_InputCDR stream (&mb, byte_order, TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR, this->orb_core_); + TAO_InputCDR stream (&mb, + byte_order, + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, + this->orb_core_); CORBA::Object_ptr objref = CORBA::Object::_nil (); stream >> objref; diff --git a/TAO/tao/Object_Ref_Table.cpp b/TAO/tao/Object_Ref_Table.cpp index c883296595d..f5892459696 100644 --- a/TAO/tao/Object_Ref_Table.cpp +++ b/TAO/tao/Object_Ref_Table.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "Object_Ref_Table.h" #include "Object.h" #include "Exception.h" @@ -13,6 +14,7 @@ ACE_RCSID (tao, Object_Ref_Table, "$Id$") + // **************************************************************** TAO_Object_Ref_Table::TAO_Object_Ref_Table (void) diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index eb8fda573b0..288d946999e 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -120,15 +120,29 @@ public: virtual void reset (int reset_flag = 1) = 0; // Reset the messaging object + /// @@ Bala: Documentation + virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; + + /// @@Bala: Documentation please... + virtual int is_message_complete (ACE_Message_Block &message_block) = 0; + + virtual size_t missing_data (ACE_Message_Block &incoming) = 0; + + virtual CORBA::Octet byte_order (void) = 0; + /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core) = 0; + TAO_ORB_Core *orb_core, + ACE_Message_Block &m, + CORBA::Octet byte_order) = 0; /// Parse the reply message that we received and return the reply /// information though virtual int process_reply_message ( - TAO_Pluggable_Reply_Params &reply_info) = 0; + TAO_Pluggable_Reply_Params &reply_info, + ACE_Message_Block &m, + CORBA::Octet byte_order) = 0; /// Generate a reply message with the exception . virtual int generate_exception_reply ( diff --git a/TAO/tao/Pluggable_Messaging_Utils.cpp b/TAO/tao/Pluggable_Messaging_Utils.cpp index 8c6420c3694..fa73c68fa55 100644 --- a/TAO/tao/Pluggable_Messaging_Utils.cpp +++ b/TAO/tao/Pluggable_Messaging_Utils.cpp @@ -1,4 +1,5 @@ //$Id$ + #include "tao/Pluggable_Messaging_Utils.h" #include "tao/ORB_Core.h" @@ -8,6 +9,7 @@ ACE_RCSID(tao, Pluggable_Messaging_Utils, "$Id$") + TAO_Pluggable_Reply_Params::TAO_Pluggable_Reply_Params ( TAO_ORB_Core *orb_core ) diff --git a/TAO/tao/PolicyC.cpp b/TAO/tao/PolicyC.cpp index 6eb368913ae..84cec53c08d 100644 --- a/TAO/tao/PolicyC.cpp +++ b/TAO/tao/PolicyC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/PolicyC.h b/TAO/tao/PolicyC.h index 1adfc2f1126..f24161fff61 100644 --- a/TAO/tao/PolicyC.h +++ b/TAO/tao/PolicyC.h @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/PolicyC.i b/TAO/tao/PolicyC.i index ceb80909d8b..1b2b0c267c7 100644 --- a/TAO/tao/PolicyC.i +++ b/TAO/tao/PolicyC.i @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/PolicyFactory_Registry.cpp b/TAO/tao/PolicyFactory_Registry.cpp index 29910a7f3c7..814f6e0d1c4 100644 --- a/TAO/tao/PolicyFactory_Registry.cpp +++ b/TAO/tao/PolicyFactory_Registry.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "PolicyFactory_Registry.h" ACE_RCSID(tao, PolicyFactory_Registry, "$Id$") diff --git a/TAO/tao/PortableInterceptor.pidl b/TAO/tao/PortableInterceptor.pidl index e8421db4f8e..b2e4e8980a4 100644 --- a/TAO/tao/PortableInterceptor.pidl +++ b/TAO/tao/PortableInterceptor.pidl @@ -2,6 +2,7 @@ // // $Id$ + // ================================================================ // // This file was used to generate the code in PortableInterceptorC.* diff --git a/TAO/tao/Profile.cpp b/TAO/tao/Profile.cpp index b7b4f9b17a2..a6b816f6855 100644 --- a/TAO/tao/Profile.cpp +++ b/TAO/tao/Profile.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "Profile.h" #include "Object_KeyC.h" @@ -13,6 +14,7 @@ ACE_RCSID(tao, Profile, "$Id$") + // **************************************************************** TAO_Profile::~TAO_Profile (void) @@ -261,9 +263,9 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV) if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Cannot add ") - ACE_TEXT ("IOP::TaggedComponent to profile.\n") - ACE_TEXT ("(%P|%t) Standard profile components ") - ACE_TEXT ("have been disabled or URL style IORs\n") + ACE_TEXT ("IOP::TaggedComponent to profile.\n") + ACE_TEXT ("(%P|%t) Standard profile components ") + ACE_TEXT ("have been disabled or URL style IORs\n") ACE_TEXT ("(%P|%t) are in use. Try ") ACE_TEXT ("\"-ORBStdProfileComponents 1\" and/or\n") ACE_TEXT ("(%P|%t) \"-ORBObjRefStyle IOR\".\n"))); @@ -276,8 +278,8 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV) ACE_THROW (CORBA::BAD_PARAM ( CORBA_SystemException::_tao_minor_code ( TAO_DEFAULT_MINOR_CODE, - EINVAL), - CORBA::COMPLETED_NO)); + EINVAL), + CORBA::COMPLETED_NO)); } } @@ -292,8 +294,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV) if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Cannot add ") - ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0") - ACE_TEXT ("IOR profile.\n") + ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0") + ACE_TEXT ("IOR profile.\n") ACE_TEXT ("(%P|%t) Try using a GIOP 1.1 or ") ACE_TEXT ("greater endpoint.\n"))); @@ -305,8 +307,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV) ACE_THROW (CORBA::BAD_PARAM ( CORBA_SystemException::_tao_minor_code ( TAO_DEFAULT_MINOR_CODE, - EINVAL), - CORBA::COMPLETED_NO)); + EINVAL), + CORBA::COMPLETED_NO)); } } diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp index 70c17a67629..a2334851139 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.cpp +++ b/TAO/tao/Synch_Reply_Dispatcher.cpp @@ -8,6 +8,7 @@ ACE_RCSID(tao, Synch_Reply_Dispatcher, "$Id$") + // Constructor. TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher ( TAO_ORB_Core *orb_core, @@ -61,20 +62,11 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( // dispatcher is used because the request must be re-sent. //this->message_state_.reset (0); - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); - - /*if (&this->message_state_ != message_state) - { - // The Transport Mux Strategy did not use our Message_State to - // receive the event, possibly because it is muxing multiple - // requests over the same connection. - - // Steal the buffer so that no copying is done. - this->message_state_.cdr.steal_from (message_state->cdr); + // Transfer the 's content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); - // There is no need to copy the other fields! - }*/ + ACE_UNUSED_ARG (db); if (this->wait_strategy_ != 0) { @@ -91,12 +83,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( return 1; } -/*TAO_GIOP_Message_State * -TAO_Synch_Reply_Dispatcher::message_state (void) -{ - return &this->message_state_; -}*/ - void TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport) { diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp index c17d1f68ce1..bf1b278f813 100644 --- a/TAO/tao/TAO_Server_Request.cpp +++ b/TAO/tao/TAO_Server_Request.cpp @@ -1,5 +1,6 @@ // $Id$ + // Implementation of the Dynamic Server Skeleton Interface (for GIOP) #include "TAO_Server_Request.h" diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index eff531ec2df..cd15fe869d3 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1,6 +1,7 @@ // -*- C++ -*- // $Id$ + #include "Transport.h" #include "Exception.h" @@ -26,6 +27,7 @@ ACE_RCSID(tao, Transport, "$Id$") + TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) : ACE_Refcountable (refcount) , refcount_lock_ (lock) @@ -67,6 +69,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) + , incoming_message_queue_ (orb_core) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) @@ -735,7 +738,37 @@ TAO_Transport::recv (char *buffer, return -1; // now call the template method - return this->recv_i (buffer, len, timeout); + ssize_t n = + this->recv_i (buffer, len, timeout); + + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure \n") + ACE_TEXT ("TAO - handle_input () \n"))); + } + + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + // Close the connection + this->tms_->connection_closed (); + + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + return n; } @@ -782,6 +815,383 @@ TAO_Transport::generate_request_header ( return 0; } +int +TAO_Transport::handle_input_i (ACE_HANDLE h, + ACE_Time_Value * max_wait_time, + int /*block*/) +{ + // The buffer on the stack which will be used to hold the input + // messages + char buf [TAO_CONNECTION_HANDLER_BUF_SIZE]; + +#if defined (ACE_HAS_PURIFY) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_HAS_PURIFY */ + + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_msgblock_allocator ()); + + + // Align the message block + ACE_CDR::mb_align (&message_block); + + // Read the message into the message block that we have created on + // the stack. + ssize_t n = + this->recv (message_block.rd_ptr (), + message_block.space (), + max_wait_time); + + + if (n <= 0) + return n; + + + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); + + if (this->parse_incoming_messages (message_block) == -1) + return -1; + + // Check whether we have a complete message for processing + size_t missing_data = + this->missing_data (message_block); + + if (missing_data) + { + return this->consolidate_message (message_block, + missing_data, + h, + max_wait_time); + } + + + // @@Bala: + return this->process_parsed_messages ( + message_block, + this->messaging_object ()->byte_order (), + h); +} + + + +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &message_block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (!this->incoming_message_queue_.is_complete_message ()) + { + return 0; + } + + // Now that a new message has been read, process the message. Call + // the messaging object to do the parsing + if (this->messaging_object ()->parse_incoming_messages (message_block) == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("TAO (%P|%t) - error in incoming message \n"))); + + this->tms_->connection_closed (); + return -1; + } + + return 0; +} + + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have message in the queue then find out how much of data + // is required to get a complete message + if (this->incoming_message_queue_.queue_length ()) + { + return this->incoming_message_queue_.missing_data (); + } + + return this->messaging_object ()->missing_data (incoming); +} + + +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time) +{ + // The write pointer which will be used for reading data from the + // socket. + if (!this->incoming_message_queue_.is_complete_message ()) + { + return this->consolidate_message_queue (incoming, + missing_data, + h, + max_wait_time); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t n = this->recv (incoming.wr_ptr (), + missing_data, + max_wait_time); + + // If we got an EWOULDBLOCK or some other error.. + if (n <= 0) + return n; + + // Move the write pointer + incoming.wr_ptr (n); + + // ..Decrement + missing_data -= n; + + // Get the byte order information + CORBA::Octet byte_order = + this->messaging_object ()->byte_order (); + + if (missing_data > 0) + { + // Duplicate the message block + ACE_Message_Block *mb = + incoming.duplicate (); + + // Stick the message in queue with the byte order information + if (this->incoming_message_queue_.add_message (mb, + missing_data, + byte_order) == -1) + { + return -1; + } + return 0; + } + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (incoming, + byte_order, + h); +} + +int +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time) +{ + // If the queue did not have a complete message put this piece of + // message in the queue + this->incoming_message_queue_.copy_message (incoming); + missing_data = this->incoming_message_queue_.missing_data (); + + if (missing_data > 0) + { + // Read the message into the last node of the message queue.. + ssize_t n = this->recv (this->incoming_message_queue_.wr_ptr (), + missing_data, + max_wait_time); + + // Error... + if (n <= 0) + return n; + + // Move the write pointer + incoming.wr_ptr (n); + + // Decrement the missing data + this->incoming_message_queue_.queued_data_->missing_data_ -= n; + } + + + if (!this->incoming_message_queue_.is_complete_message ()) + { + return 0; + } + + CORBA::Octet byte_order = 0; + + // Get the message on the head of the queue.. + ACE_Message_Block *msg_block = + this->incoming_message_queue_.dequeue_head (byte_order); + + // Process the message... + if (this->process_parsed_messages (*msg_block, + byte_order, + h) == -1) + return -1; + + // Delete the message block... + delete msg_block; + + return 0; +} +int +TAO_Transport::process_parsed_messages (ACE_Message_Block &message_block, + CORBA::Octet byte_order, + ACE_HANDLE h) +{ + // If we have a complete message, just resume the handler + // Resume the handler. + // @@Bala: Try to solve this issue of reactor resumptions.. + this->orb_core_->reactor ()->resume_handler (h); + + // Get the that we have received + TAO_Pluggable_Message_Type t = + this->messaging_object ()->message_type (); + + + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) + { + if (this->messaging_object ()->process_request_message ( + this, + this->orb_core (), + message_block, + byte_order) == -1) + { + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + } + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + // @@Bala: Maybe the input_cdr can be constructed from the + // message_block + TAO_Pluggable_Reply_Params params (this->orb_core ()); + + if (this->messaging_object ()->process_reply_message (params, + message_block, + byte_order) == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("IIOP_Transport::process_message, ") + ACE_TEXT ("process_reply_message ()"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + result = this->tms ()->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + // Something really critical happened, we will forget about + // every reply on this connection. + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::") + ACE_TEXT ("process_message - ") + ACE_TEXT ("dispatch reply failed\n"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + if (result == 0) + { + this->messaging_object ()->reset (); + + // The reply dispatcher was no longer registered. + // This can happened when the request/reply + // times out. + // To throw away all registered reply handlers is + // not the right thing, as there might be just one + // old reply coming in and several valid new ones + // pending. If we would invoke + // we would throw away also the valid ones. + //return 0; + } + + + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } + + // If not, just return back.. + return 0; +} + int TAO_Transport::queue_is_empty (void) { diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 54f8539ae1d..789761ee4e0 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -19,15 +19,18 @@ #include "ace/pre.h" #include "corbafwd.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + #include "Exception.h" #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" #include "Transport_Timer.h" #include "ace/Strategies.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "Incoming_Message_Queue.h" class TAO_ORB_Core; class TAO_Target_Specification; @@ -72,7 +75,7 @@ protected: * *

The outgoing data path:

* - * One of the responsabilities of the TAO_Transport class is to send + * One of the responsibilities of the TAO_Transport class is to send * out GIOP messages as efficiently as possible. In most cases * messages are put out in FIFO order, the transport object will put * out the message using a single system call and return control to @@ -136,7 +139,19 @@ protected: * *

The incoming data path:

* - * @todo Document the incoming data path design forces. + * One of the main responsibilities of the transport is to read and + * process the incoming GIOP message as quickly and efficiently as + * possible. There are other forces that needs to be given due + * consideration. They are + * - Multiple threads should read from the same handle + * - Reads on the handle could give one or more messages. + * - Minimise locking and copying overhead when trying to attack the + * above. + *

Parsing messages (GIOP) & processing the message:

+ * + * The messages should be checked for validity and the right + * information should be sent to the higher layer for processing. + * * * * See Also: @@ -451,6 +466,33 @@ public: TAO_Target_Specification &spec, TAO_OutputCDR &msg); + /// Callback to read incoming data + /// @@ Bala: Change documentation here.... They dont make sense + /// anymore + /** + * The ACE_Event_Handler adapter invokes this method as part of its + * handle_input() operation. + * + * @todo: the method name is confusing! Calling it handle_input() + * would probably make things easier to understand and follow! + * + * Once a complete message is read the Transport class delegates on + * the Messaging layer to invoke the right upcall (on the server) or + * the TAO_Reply_Dispatcher (on the client side). + * + * @param max_wait_time In some cases the I/O is synchronous, e.g. a + * thread-per-connection server or when Wait_On_Read is enabled. In + * those cases a maximum read time can be specified. + * + * @param block Is deprecated and ignored. + * + */ + // @@ lockme + virtual int handle_input_i (ACE_HANDLE h = ACE_INVALID_HANDLE, + ACE_Time_Value *max_wait_time = 0, + int block = 0); + + /// Prepare the waiting and demuxing strategy to receive a reply for /// a new request. /** @@ -501,29 +543,6 @@ public: int is_synchronous = 1, ACE_Time_Value *max_time_wait = 0) = 0; - /// Callback to read incoming data - /** - * The ACE_Event_Handler adapter invokes this method as part of its - * handle_input() operation. - * - * @todo: the method name is confusing! Calling it handle_input() - * would probably make things easier to understand and follow! - * - * Once a complete message is read the Transport class delegates on - * the Messaging layer to invoke the right upcall (on the server) or - * the TAO_Reply_Dispatcher (on the client side). - * - * @param max_wait_time In some cases the I/O is synchronous, e.g. a - * thread-per-connection server or when Wait_On_Read is enabled. In - * those cases a maximum read time can be specified. - * - * @param block Is deprecated and ignored. - * - */ - // @@ lockme - virtual int read_process_message (ACE_Time_Value *max_wait_time = 0, - int block = 0) = 0; - protected: /// Register the handler with the reactor. /** @@ -548,6 +567,28 @@ protected: */ virtual void transition_handler_state_i (void) = 0; + /// @@ Bala: Documentation + int parse_incoming_messages (ACE_Message_Block &message_block); + + size_t missing_data (ACE_Message_Block &message_block); + + int check_message_integrity (ACE_Message_Block &message_block); + + int consolidate_message (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time); + + int consolidate_message_queue (ACE_Message_Block &incoming, + size_t missing_data, + ACE_HANDLE h, + ACE_Time_Value *max_wait_time); + + /// @@ Bala: Documentation + virtual int process_parsed_messages (ACE_Message_Block &message_block, + CORBA::Octet byte_order, + ACE_HANDLE h = ACE_INVALID_HANDLE); + public: /// Method for the connection handler to signify that it /// is being closed and destroyed. @@ -622,6 +663,9 @@ public: int handle_timeout (const ACE_Time_Value ¤t_time, const void* act); + // @@ Bala : Add documentation + // int process_message (ACE_Message_Block &message_block) = 0; + private: /// Send some of the data in the queue. /** @@ -747,6 +791,9 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; + /// @@Bala: Docu?? + TAO_Incoming_Message_Queue incoming_message_queue_; + /// The queue will start draining no later than /// *if* the deadline is ACE_Time_Value current_deadline_; diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index 3320298e49d..f582e02d496 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -29,7 +29,9 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, while (1) { retval = - this->transport_->read_process_message (max_wait_time, 1); + this->transport_->handle_input_i (ACE_INVALID_HANDLE, + max_wait_time, + 1); // If we got our reply, no need to run the loop any // further. -- cgit v1.2.1