From 0cb8c8eb9307f0c8043219ac06660524d21285f9 Mon Sep 17 00:00:00 2001 From: martinezm Date: Mon, 6 Dec 2004 19:50:15 +0000 Subject: Mon Dec 6 13:44:51 2004 Mike Martinez --- TAO/ChangeLog | 37 ++++- TAO/tao/GIOP_Message_Base.cpp | 373 +++++++++++++++++------------------------- TAO/tao/GIOP_Message_Base.h | 5 + TAO/tao/GIOP_Message_Lite.cpp | 10 ++ TAO/tao/GIOP_Message_Lite.h | 5 + TAO/tao/Pluggable_Messaging.h | 5 + TAO/tao/Transport.cpp | 140 ++++------------ TAO/tao/Transport.h | 2 - 8 files changed, 242 insertions(+), 335 deletions(-) diff --git a/TAO/ChangeLog b/TAO/ChangeLog index bc74f78880a..6f4a2e4dc65 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,11 +1,38 @@ +Mon Dec 6 13:44:51 2004 Mike Martinez + + * tao/GIOP_Message_Base.h: + * tao/GIOP_Message_Base.cpp: + * tao/GIOP_Message_Lite.h: + * tao/GIOP_Message_Lite.cpp: + * tao/Pluggable_Messaging.h: + + Added set_request_id_from_peek() method to perform actual + request ID parsing. Maintained current mechanism (which + incorrectly estimates the GIOP1.0 and GIOP1.1 request ID + value). + + * tao/GIOP_Message_Base.cpp: + + More cleanup. Moved InputCDR from heap to stack. + + * tao/Transport.h: + + Removed locking strategy to let the default ORB strategy be + used only. + + * tao/Transport.cpp: + + More cleanup. Pushed request ID parsing down into messaging + object. + Wed Dec 1 13:06:29 2004 Mike Martinez * tao/GIOP_Message_Base.cpp ( process_request, process_locate_request): - + Added new diagnostics. * tao/GIOP_Message_Base.cpp ( process_request_message, process_reply_message): - + Added new diagnostics. Added check for alignment with respect to original header instead of message data after GIOP header. When re-aligning, force the intial offset to correspond to the @@ -23,17 +50,17 @@ Wed Dec 1 13:06:29 2004 Mike Martinez inlined read_ulong. * tao/Transport.cpp( handle_input): - + Aligned read buffer before use. Cleaned up and added diagnostic messages. Conditional debug code. Removed some dead code. * tao/Transport.cpp( enqueue_incoming_message): - + Peek at request Id to initialize TAO_Queued_Data object. * tao/Transport.cpp( parse_queue_head, process_parsed_messages): - + Cleaned up diagnostics, added more conditions to diagnostic generation. diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 5a3d1f552e5..ae217ec5c26 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -23,6 +23,7 @@ ACE_RCSID (tao, "$Id$") namespace { enum { GIOP_HEADER_ALIGNMENT_OFFSET = 4 } ; } +// namespace { TAO_InputCDR placeholder_cdr( static_cast(0)) ; } TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, size_t /*input_cdr_size*/) @@ -342,42 +343,17 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, ACE_TEXT("TAO (%P|%t) - Transport[%d], ") ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") ACE_TEXT("entered with chained data: %s, refcount: %d, total length: %d, ") - ACE_TEXT("base: %x should be aligned to %d bytes.\n"), + ACE_TEXT("start: %x should be aligned to %d bytes.\n"), transport->id (), ((qd->msg_block_->cont()==0)? ACE_TEXT("no"): ACE_TEXT("yes")), qd->msg_block_->data_block()->reference_count(), qd->msg_block_->total_length(), reinterpret_cast(qd->msg_block_->rd_ptr()), // ACE_CDR::LONG_ALIGN - ACE_CDR::MAX_ALIGNMENT + (ACE_CDR::MAX_ALIGNMENT + GIOP_HEADER_ALIGNMENT_OFFSET) )); } -#define DEBUG_PMB_CODE -#ifdef DEBUG_PMB_CODE - if( qd == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d], "), - ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: "), - ACE_TEXT("entered with chain: %d, refcount: %d?\n"), - transport->id (), (qd->msg_block_->cont()?"yes":"no"), qd->msg_block_->data_block()->reference_count() )); - - } else if( qd->msg_block_ == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d], "), - ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: "), - ACE_TEXT("NULL CHAIN after processing!"), - transport->id () )) ; - - } else if( qd->msg_block_->data_block()->reference_count() == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d], "), - ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: "), - ACE_TEXT("UNREFERENCED DATA BLOCK before processing!"), - transport->id () )) ; - } -#endif // DEBUG_PMB_CODE - // Set the upcall thread this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ()); @@ -419,135 +395,82 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); -// We are now eliding the header before calling this method. -#ifdef BOGUS -rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; -#endif // BOGUS if (TAO_debug_level >= 10) qd->dump_msg ("request message") ; - - // Create a input CDR stream. - -#ifdef MIKE_SEZ_COPYIT // - // We force a copy out of the receive buffer here, which allows us to - // use the NULL locking strategy within the Transport processing. Do - // we need to change the locking back here? + // I tried making the placeholder static, but there were issues when + // the static destructors ran, so I waste time here. I really need + // just a static nil CDR to use while I determine which CDR to + // actually create for demarshaling. // - TAO_InputCDR input_cdr (qd->msg_block_, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_); - if( TAO_debug_level > 3) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d], ") - ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") - ACE_TEXT("consolidated, refcount: %d, total length: %d\n"), - transport->id (), - input_cdr.start()->data_block()->reference_count(), - input_cdr.start()->total_length() - )); - } + TAO_InputCDR placeholder_cdr( static_cast(0)) ; + TAO_InputCDR& input_cdr = placeholder_cdr ; -#else // MIKE_SEZ_COPYIT -TAO_InputCDR* real_cdr = 0 ; if( qd->msg_block_->cont() == 0 - && ((reinterpret_cast(qd->msg_block_->rd_ptr()) % ACE_CDR::MAX_ALIGNMENT) == GIOP_HEADER_ALIGNMENT_OFFSET) - ) { + && ((reinterpret_cast(qd->msg_block_->rd_ptr()) + % ACE_CDR::MAX_ALIGNMENT + ) == GIOP_HEADER_ALIGNMENT_OFFSET)) { // - // The message block is not chained, use the data block directly. - // Indicate the read and write pointers since we are likely to only - // be using a portion of the underlying buffer. + // Message is aligned and in a single block, no copy required. // - ACE_NEW_RETURN( - real_cdr, - TAO_InputCDR(qd->msg_block_->data_block ()->duplicate(), - qd->msg_block_->self_flags (), - rd_pos, - wr_pos, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_ - ), - -1 - ) ; + TAO_InputCDR not_copied_cdr(qd->msg_block_->data_block ()->duplicate(), + qd->msg_block_->self_flags (), + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + input_cdr = not_copied_cdr ; } else { // // Prepend a stack buffer with enough data to ensure correct // alignment of the data. // +#if defined (ACE_HAS_PURIFY) + char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] = { 0 } ; +#else char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] ; +#endif /* ACE_HAS_PURIFY */ ACE_Message_Block header_proxy( buffer, GIOP_HEADER_ALIGNMENT_OFFSET) ; ACE_CDR::mb_align( &header_proxy) ; header_proxy.wr_ptr( GIOP_HEADER_ALIGNMENT_OFFSET) ; header_proxy.cont( qd->msg_block_) ; // - // The message block is chained or misaligned, consolidate the whole - // thing by copying, since the data conversion expects a single - // contiguous buffer. + // Let the constructor to the copying. // - ACE_NEW_RETURN( - real_cdr, - TAO_InputCDR( &header_proxy, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_ - ), - -1 - ) ; - if( TAO_debug_level > 3) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d], ") - ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") - ACE_TEXT("consolidated, refcount: %d, total length: %d\n"), - transport->id (), - real_cdr->start()->data_block()->reference_count(), - real_cdr->start()->total_length() - )); - } + TAO_InputCDR copied_cdr( &header_proxy, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_) ; + input_cdr = copied_cdr ; // // Skip the empty header alignment bytes. // - real_cdr->skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; - } + input_cdr.skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; - TAO_InputCDR& input_cdr = *real_cdr ; - -#endif // MIKE_SEZ_COPYIT - -#ifdef DO_NOT_COMPILE - - TAO_InputCDR input_cdr (qd->msg_block_->data_block ()->duplicate(), - qd->msg_block_->self_flags (), - rd_pos, - wr_pos, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_); - - if( qd->msg_block_->cont() != 0) { - // - // The message block is chained, consolidate the whole thing by - // copying, since the data conversion expects a single contiguous - // buffer. - // - // NOTE: This may leak the previous flavor, so we may need to manage - // that if we pursue this change. Do we need to reset the - // pointers here as well? - // - input_cdr.reset( qd->msg_block_, qd->byte_order_) ; } -#endif // DO_NOT_COMPILE + if( TAO_debug_level > 3) { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport[%d], ") + ACE_TEXT("TAO_GIOP_Message_Base::process_request_message: ") + ACE_TEXT("cdr starts at: %x\n"), + transport->id (), + input_cdr.start()->rd_ptr() + )); + } + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) input_cdr.start()->rd_ptr(), + input_cdr.start()->length(), + ACE_TEXT ("Request message"))); transport->assign_translators(&input_cdr,&output); @@ -568,20 +491,19 @@ TAO_InputCDR* real_cdr = 0 ; input_cdr, output, generator_parser); + break ; case TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST: return this->process_locate_request (transport, input_cdr, output, generator_parser); + break ; + default: return -1; + break ; } - -#ifndef MIKE_SEZ_COPYIT - delete real_cdr ; -#endif // MIKE_SEZ_COPYIT - } int @@ -592,9 +514,10 @@ TAO_GIOP_Message_Base::process_reply_message ( if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") - ACE_TEXT("TAO_GIOP_Message_Base::process_reply_message: ") + ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("entered with chained data: %s, refcount: %d, total length: %d, ") ACE_TEXT("base: %x should be aligned to %d bytes.\n"), + params.transport_->id (), ((qd->msg_block_->cont()==0)? ACE_TEXT("no"): ACE_TEXT("yes")), qd->msg_block_->data_block()->reference_count(), qd->msg_block_->total_length(), @@ -621,115 +544,78 @@ TAO_GIOP_Message_Base::process_reply_message ( if (TAO_debug_level >= 10) qd->dump_msg ("reply message"); -#ifdef MIKE_SEZ_COPYIT // - // We force a copy out of the receive buffer here, which allows us to - // use the NULL locking strategy within the Transport processing. Do - // we need to change the locking back here? + // I tried making the placeholder static, but there were issues when + // the static destructors ran, so I waste time here. I really need + // just a static nil CDR to use while I determine which CDR to + // actually create for demarshaling. // - TAO_InputCDR input_cdr (qd->msg_block_, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_); - -#else // MIKE_SEZ_COPYIT - - TAO_InputCDR* real_cdr = 0 ; + TAO_InputCDR placeholder_cdr( static_cast(0)) ; + TAO_InputCDR& input_cdr = placeholder_cdr ; - if( (qd->msg_block_->cont() == 0) - && ((reinterpret_cast(qd->msg_block_->rd_ptr()) % ACE_CDR::MAX_ALIGNMENT) == GIOP_HEADER_ALIGNMENT_OFFSET) - ) { + if( qd->msg_block_->cont() == 0 + && ((reinterpret_cast(qd->msg_block_->rd_ptr()) + % ACE_CDR::MAX_ALIGNMENT + ) == GIOP_HEADER_ALIGNMENT_OFFSET)) { // - // The message block is not chained, use the data block directly. - // Indicate the read and write pointers since we are likely to only - // be using a portion of the underlying buffer. + // Message is aligned and in a single block, no copy required. // - ACE_NEW_RETURN( - real_cdr, - TAO_InputCDR(qd->msg_block_->data_block ()->duplicate(), - qd->msg_block_->self_flags (), - rd_pos, - wr_pos, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_ - ), - -1 - ) ; + TAO_InputCDR not_copied_cdr(qd->msg_block_->data_block ()->duplicate(), + qd->msg_block_->self_flags (), + rd_pos, + wr_pos, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); + input_cdr = not_copied_cdr ; } else { // // Prepend a stack buffer with enough data to ensure correct // alignment of the data. // +#if defined (ACE_HAS_PURIFY) + char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] = { 0 } ; +#else char buffer[ GIOP_HEADER_ALIGNMENT_OFFSET + ACE_CDR::MAX_ALIGNMENT] ; +#endif /* ACE_HAS_PURIFY */ ACE_Message_Block header_proxy( buffer, GIOP_HEADER_ALIGNMENT_OFFSET) ; ACE_CDR::mb_align( &header_proxy) ; header_proxy.wr_ptr( GIOP_HEADER_ALIGNMENT_OFFSET) ; header_proxy.cont( qd->msg_block_) ; // - // The message block is chained, consolidate the whole thing by - // copying, since the data conversion expects a single contiguous - // buffer. + // Let the constructor to the copying. // - ACE_NEW_RETURN( - real_cdr, - TAO_InputCDR( &header_proxy, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_ - ), - -1 - ) ; - if( TAO_debug_level > 3) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - ") - ACE_TEXT("TAO_GIOP_Message_Base::process_reply_message: ") - ACE_TEXT("consolidated, refcount: %d, total length: %d\n"), - real_cdr->start()->data_block()->reference_count(), - real_cdr->start()->total_length() - )); - } + TAO_InputCDR copied_cdr( &header_proxy, + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_) ; + input_cdr = copied_cdr ; // // Skip the empty header alignment bytes. // - real_cdr->skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; - } - - TAO_InputCDR& input_cdr = *real_cdr ; - -#endif // MIKE_SEZ_COPYIT + input_cdr.skip_bytes( GIOP_HEADER_ALIGNMENT_OFFSET) ; -#ifdef DO_NOT_COMPILE - - TAO_InputCDR input_cdr (qd->msg_block_->data_block ()->duplicate(), - qd->msg_block_->self_flags (), - rd_pos, - wr_pos, - qd->byte_order_, - qd->major_version_, - qd->minor_version_, - this->orb_core_); - - if( qd->msg_block_->cont() != 0) { - // - // The message block is chained, consolidate the whole thing by - // copying, since the data conversion expects a single contiguous - // buffer. - // - // NOTE: This may leak the previous flavor, so we may need to manage - // that if we pursue this change. Do we need to reset the - // pointers here as well? - // - input_cdr.reset( qd->msg_block_, qd->byte_order_) ; } -#endif // DO_NOT_COMPILE + if( TAO_debug_level > 3) { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") + ACE_TEXT("cdr starts at: %x\n"), + params.transport_->id (), + input_cdr.start()->rd_ptr() + )); + } + if (TAO_debug_level >= 10) + ACE_HEX_DUMP ((LM_DEBUG, + (const char *) input_cdr.start()->rd_ptr(), + input_cdr.start()->length(), + ACE_TEXT ("Reply message"))); // We know we have some reply message. Check whether it is a @@ -761,8 +647,9 @@ TAO_GIOP_Message_Base::process_reply_message ( if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") - ACE_TEXT("TAO_GIOP_Message_Base::process_reply_message: ") + ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("parsed reply, return value: %d\n"), + params.transport_->id (), retval )); } @@ -778,8 +665,9 @@ TAO_GIOP_Message_Base::process_reply_message ( if( TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - ") - ACE_TEXT("TAO_GIOP_Message_Base::process_reply_message: ") + ACE_TEXT("TAO_GIOP_Message_Base[%d]::process_reply_message: ") ACE_TEXT("dispatched reply, return value: %d\n"), + params.transport_->id (), retval )); } @@ -795,10 +683,6 @@ TAO_GIOP_Message_Base::process_reply_message ( params.transport_->id ())); } -#ifndef MIKE_SEZ_COPYIT - delete real_cdr ; -#endif // MIKE_SEZ_COPYIT - return retval; } @@ -1658,6 +1542,55 @@ TAO_GIOP_Message_Base::set_queued_data_from_message_header ( qd->missing_data_bytes_ = state.payload_size (); } +void +TAO_GIOP_Message_Base::set_request_id_from_peek ( + TAO_Queued_Data* qd, + const ACE_Message_Block* mb +) const +{ + // + // No message data means no request header means no request ID. + // + if( mb == 0) + { + return ; + } + + // + // Each protocol version has the request ID located differently. + // + switch( qd->major_version_<<8 | qd->minor_version_) { + case 0x0100: + case 0x0101: // GIOP 1.0 and 1.1 have same location for request ID. + // IOP { + // typedef unsigned long ServiceId; + // struct ServiceContext { + // ServiceId context_id; + // sequence context_data; + // }; + // typedef sequence ServiceContextList; + // }; + // + // struct RequestHeader_1_0 { // and struct RequestHeader_1_1 { + // IOP::ServiceContextList service_context; + // unsigned long request_id; + + /// @todo: Need to skip the sequence thingies here. + + /// @todo: Without the skip, this is what the code does today. + qd->request_id_ + = TAO_GIOP_Message_State::read4( mb->rd_ptr(), qd->byte_order_) ; + break ; + + case 0x0102: // GIOP 1.2 has request ID first. + // struct RequestHeader_1_2 { + // unsigned long request_id ; + qd->request_id_ + = TAO_GIOP_Message_State::read4( mb->rd_ptr(), qd->byte_order_) ; + break ; + } +} + int TAO_GIOP_Message_Base::check_for_valid_header ( const ACE_Message_Block &mb diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index d6dd76fc73e..99d7f3e3f9b 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -125,6 +125,11 @@ public: const ACE_Message_Block &mb) const; + /// Peek at the request ID and load it into the queued data. + virtual void set_request_id_from_peek ( + TAO_Queued_Data* qd, + const ACE_Message_Block* mb) const; + /// Parse the reply message that we received and return the reply /// information through @a reply_info diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp index e0d1010a201..7e1a225f177 100644 --- a/TAO/tao/GIOP_Message_Lite.cpp +++ b/TAO/tao/GIOP_Message_Lite.cpp @@ -1669,6 +1669,16 @@ TAO_GIOP_Message_Lite::set_queued_data_from_message_header ( ACE_UNUSED_ARG (mb); } +void +TAO_GIOP_Message_Lite::set_request_id_from_peek ( + TAO_Queued_Data* qd, + const ACE_Message_Block* mb +) const +{ + ACE_UNUSED_ARG (qd); + ACE_UNUSED_ARG (mb); +} + int TAO_GIOP_Message_Lite::check_for_valid_header ( const ACE_Message_Block &mb diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h index cb28abf36cc..d28f7094d4f 100644 --- a/TAO/tao/GIOP_Message_Lite.h +++ b/TAO/tao/GIOP_Message_Lite.h @@ -115,6 +115,11 @@ public: TAO_Queued_Data *, const ACE_Message_Block &mb) const; + /// Peek at the request ID and load it into the queued data. + virtual void set_request_id_from_peek ( + TAO_Queued_Data* qd, + const ACE_Message_Block* mb) const; + /// Parse the reply message that we received and return the reply /// information through @a reply_info virtual int process_reply_message ( diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index 87db01ea50e..36e874746db 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -146,6 +146,11 @@ public: TAO_Queued_Data *, const ACE_Message_Block &mb) const = 0; + /// Peek at the request ID and load it into the queued data. + virtual void set_request_id_from_peek ( + TAO_Queued_Data* qd, + const ACE_Message_Block* mb) const = 0; + /// Parse the reply message that we received and return the reply /// information through @a reply_info virtual int process_reply_message ( diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index dd300986aab..39299fcda2a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -129,7 +129,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , wchar_translator_ (0) , tcs_set_ (0) , first_request_ (1) - , data_locking_strategy_ (0) { TAO_Client_Strategy_Factory *cf = this->orb_core_->client_factory (); @@ -139,12 +138,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, // Create TMS now. this->tms_ = cf->create_transport_mux_strategy (this); - -#ifdef MIKE_SEZ_COPYIT - this->data_locking_strategy_ = new ACE_Lock_Adapter ; -#else // MIKE_SEZ_COPYIT - this->data_locking_strategy_ = this->orb_core_->locking_strategy() ; -#endif // MIKE_SEZ_COPYIT } TAO_Transport::~TAO_Transport (void) @@ -169,10 +162,6 @@ TAO_Transport::~TAO_Transport (void) // *must* have been cleaned up. ACE_ASSERT (this->head_ == 0); ACE_ASSERT (this->cache_map_entry_ == 0); - -#ifdef MIKE_SEZ_COPYIT - delete this->data_locking_strategy_ ; -#endif // MIKE_SEZ_COPYIT } void @@ -1185,29 +1174,6 @@ TAO_Transport::send_message_shared_i (TAO_Stub *stub, return 0; } -#define DEBUG_PMB_CODE -#ifdef DEBUG_PMB_CODE -static void -dump_db_refcounts( - const ACE_Message_Block* b -) -{ - const ACE_Data_Block* d = b->data_block() ; - ACE_DEBUG ((LM_DEBUG, ACE_TEXT("---Chain ref count: "))) ; - for( ; b ; b = b->cont()) { - if( b->data_block() != d) { - d = b->data_block() ; - } - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%@) %d, "), - d, d->reference_count() - )) ; - - } - ACE_DEBUG ((LM_DEBUG, ACE_TEXT("\n"))) ; -} -#endif // DEBUG_PMB_CODE - int TAO_Transport::queue_message_i(const ACE_Message_Block *message_block) { @@ -1296,10 +1262,10 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ->malloc( sizeof(ACE_Data_Block)), ACE_Data_Block( 4*TAO_MAXBUFSIZE, // S/B tunable - ACE_Message_Block::MB_DATA, + ACE_Message_Block::MB_DATA, 0, // Create new buffer on heap - this->orb_core_->input_cdr_buffer_allocator (), - this->data_locking_strategy_, // SYNCH or NULL + this->orb_core_->input_cdr_buffer_allocator (), + this->orb_core_->locking_strategy (), 0, // flags -- on the heap and fully owned! this->orb_core_->input_cdr_dblock_allocator() ), @@ -1356,6 +1322,9 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // #define NO_RETRY_ON_EMPTY_HANDLE #ifdef NO_RETRY_ON_EMPTY_HANDLE } else { + // + // @todo: Determine the desired action here -- ask Chris. + // partial = false ; #endif // NO_RETRY_ON_EMPTY_HANDLE @@ -1433,22 +1402,22 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, } if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input, ") - ACE_TEXT("processed %d header bytes, header is %s.\n"), - this->id (), current_block->length(), - (partial == true) ? ACE_TEXT("incomplete") : ACE_TEXT("complete") + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input, ") + ACE_TEXT("processed %d header bytes, header is %s.\n"), + this->id (), current_block->length(), + (partial == true) ? ACE_TEXT("incomplete") : ACE_TEXT("complete") )); - } + } if (TAO_debug_level >= 10) - { + { ACE_HEX_DUMP ((LM_DEBUG, (const char *) current_block->rd_ptr(), current_block->length(), ACE_TEXT ("handle data"))); - } + } // // Install the new block appropriately. @@ -1495,7 +1464,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, // location->cont( current_block) ; } - } // @@ -1633,7 +1601,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, data_left_in_buffer -= wanted_size ; this->current_message_->missing_data_bytes_ = 0 ; this->current_message_->current_state_ = TAO_Queued_Data::COMPLETED ; - } + } if (TAO_debug_level > 3) { @@ -1644,7 +1612,7 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_TEXT("%d bytes left to read to complete message.\n"), this->id (), current_block->length(), data_left_in_buffer, this->current_message_->missing_data_bytes_ - )); + )); } if (TAO_debug_level >= 10) @@ -1692,7 +1660,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, if( this->current_message_->current_state_ == TAO_Queued_Data::COMPLETED ) { -#ifndef BOGUS // // Remove and release the header from the queued message. // This works since we are assured that the header has been @@ -1703,7 +1670,6 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, = this->current_message_->msg_block_->cont() ; current_block->cont( 0) ; current_block->release() ; -#endif // BOGUS if (TAO_debug_level > 3) { @@ -1711,21 +1677,12 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input, ") ACE_TEXT("enqueueing message for processing.\n"), this->id ())); -#ifdef BOGUS - ACE_DEBUG ((LM_DEBUG,"---Data block ref count: %d\n", - this->current_message_ - ->msg_block_->cont() - ->data_block() - ->reference_count() - )) ; -#else // BOGUS ACE_DEBUG ((LM_DEBUG,"---Data block ref count: %d\n", this->current_message_ ->msg_block_ ->data_block() ->reference_count() )) ; -#endif // BOGUS } if (TAO_debug_level >= 10) @@ -1757,8 +1714,16 @@ TAO_Transport::handle_input (TAO_Resume_Handle &rh, rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); } + // + // @todo: Need to determine whether to process the queue head if we + // are left with a partial message. If we have a partial + // message, we will process the messages on the queue, then + // wait for more input. This is regardless of wheter we have a + // partial or not -- not sure if the calling code needs to know + // that we have a partial message waiting to complete or not. + // int processing_results = 0 ; - if( (message_enqueued == true) && (partial == false)) { + if( (message_enqueued == true) ){// && (partial == false)) { // because of return val? processing_results = this->process_queue_head( rh) ; } @@ -1792,23 +1757,13 @@ TAO_Transport::enqueue_incoming_message (TAO_Queued_Data *queueable_message) TAO_Queued_Data *fragment_message = 0; // - // No message block is probably a close connection message. + // Peek at the request ID since we may need that to find any + // previous fragments. // - if( queueable_message->msg_block_ != 0) { - // - // Grab the message request ID. This should likely be moved out of - // the transport and into the protocol processing. - // - queueable_message->request_id_ - = TAO_GIOP_Message_State::read4( -#ifdef BOGUS - queueable_message->msg_block_->cont()->rd_ptr(), -#else // BOGUS - queueable_message->msg_block_->rd_ptr(), -#endif // BOGUS - queueable_message->byte_order_ - ) ; - } + this->messaging_object()->set_request_id_from_peek( + queueable_message, + queueable_message->msg_block_ + ); switch(whole) { @@ -1960,7 +1915,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, qd->dump_msg( "dispatching") ; } -// int result = 0; if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) { if (TAO_debug_level > 0) @@ -2010,8 +1964,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return -1; } -/************************************************************************ -*/ int result = this->tms ()->dispatch_reply (params); if (result == -1) @@ -2026,9 +1978,6 @@ TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd, return -1; } -/* -************************************************************************/ - } else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) { @@ -2069,14 +2018,6 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) if (this->incoming_message_queue_.is_head_complete () != 1) return 1; - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d]::process_queue_head ") - ACE_TEXT("processing a complete message.\n"), - this->id ())); - } - // Get the message on the head of the queue.. TAO_Queued_Data *qd = this->incoming_message_queue_.dequeue_head (); @@ -2138,23 +2079,6 @@ TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) // Process the message... int retval = this->process_parsed_messages (qd, rh); -#define DEBUG_PMB_CODE -#ifdef DEBUG_PMB_CODE - if( qd == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input: processed a NULL message?\n"), - this->id ())); - - } else if( qd->msg_block_ == 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport[%d]::handle_input: processed message with no data.\n"), - this->id ())); - - } else if( qd->msg_block_->data_block()->reference_count() == 0) { - dump_db_refcounts( qd->msg_block_) ; - } -#endif // DEBUG_PMB_CODE - // Delete the Queued_Data.. TAO_Queued_Data::release (qd); diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 4bbc454b7e7..46f69ea9aa7 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -967,8 +967,6 @@ private: /// first request. After that, the translators are fixed for the life of the /// connection. CORBA::Boolean first_request_; - - ACE_Lock* data_locking_strategy_; // SYNCH or NULL }; /** -- cgit v1.2.1