diff options
author | msmit <msmit@remedy.nl> | 2009-02-13 14:52:13 +0000 |
---|---|---|
committer | msmit <msmit@remedy.nl> | 2009-02-13 14:52:13 +0000 |
commit | 97dd3923a92f571f4bd7d29e0739d68b3e658377 (patch) | |
tree | ad8496cbe0f0caa5554879dff794e9d9f3eb202e /TAO/tao | |
parent | 9f1e1dcd68536ed4a6407ec3b0fcb95bcd8424fa (diff) | |
download | ATCD-97dd3923a92f571f4bd7d29e0739d68b3e658377.tar.gz |
Fri Feb 13 14:35:39 UTC 2009 Marcel Smit <msmit@remedy.nl>
Implementation of the ZIOP Beta 1 spec
* tao/CDR.cpp:
* tao/CDR.h:
* tao/CDR.inl:
* tao/Messaging/Asynch_Invocation.cpp:
* tao/TAO_Server_Request.cpp:
Removed obsolete compression flag.
* tao/Compression/zlib/ZlibCompressor.cpp:
* tao/Compression/bzip2/Bzip2Compressor.cpp:
* tao/Compression/Compression.pidl:
Added description to Compression exception in order to
meet the ZIOP Beta 1 specification
* tao/Compression/Compression_Manager.cpp:
No major changes made.
* tao/GIOP_Message_Base.cpp:
* tao/GIOP_Message_Base.h:
* tao/GIOP_Message_State.cpp:
Implementation of compression and decompression methods.
* tao/ORB_Core.h:
* tao/ORB_Core.inl:
Removed ziop_enabled method since it became obsolete.
* tao/orbconf.h:
Implemented compression policies.
* tao/Remote_Invocation.cpp:
* tao/PortableServer/Upcall_Wrapper.cpp:
Removed compression and decompression methods here (moved
to GIOP_Message_Base.
* tao/Synch_Invocation.cpp:
Due to interface change of format_message method in GIOP_Message_Base.
Removed obsolete compression flag.
* tao/ZIOP_Adapter.h:
* tao/ZIOP/ZIOP.cpp:
* tao/ZIOP/ZIOP.h:
* tao/ZIOP/ZIOP.pidl:
Refactored current ZIOP implementation in order to meet
the ZIOP Beta 1 specification.
* tao/ZIOP/ZIOP_Policy_i.cpp:
* tao/ZIOP/ZIOP_Policy_i.h:
* tao/ZIOP/ZIOP_Policy_Validator.cpp:
* tao/ZIOP/ZIOP_PolicyFactory.cpp:
Implemented compression policies.
* tao/Transport.cpp:
* tao/Transport.h:
* tao/IIOP_Transport.cpp:
* orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp:
* orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp:
* tao/Strategies/DIOP_Transport.cpp:
* tao/Strategies/SHMIOP_Transport.cpp:
Due to interface change of format_message method in GIOP_Message_Base.
Diffstat (limited to 'TAO/tao')
31 files changed, 589 insertions, 552 deletions
diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp index 72aaae5a00a..ab398fabd52 100644 --- a/TAO/tao/CDR.cpp +++ b/TAO/tao/CDR.cpp @@ -76,9 +76,6 @@ TAO_OutputCDR::TAO_OutputCDR (size_t size, , stub_ (0) , message_semantics_ (TAO_TWOWAY_REQUEST) , timeout_ (0) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR1_ENTER); @@ -115,9 +112,6 @@ TAO_OutputCDR::TAO_OutputCDR (char *data, , stub_ (0) , message_semantics_ (TAO_TWOWAY_REQUEST) , timeout_ (0) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR2_ENTER); } @@ -147,9 +141,6 @@ TAO_OutputCDR::TAO_OutputCDR (char *data, , stub_ (0) , message_semantics_ (TAO_TWOWAY_REQUEST) , timeout_ (0) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR3_ENTER); } @@ -170,9 +161,6 @@ TAO_OutputCDR::TAO_OutputCDR (ACE_Message_Block *data, , stub_ (0) , message_semantics_ (TAO_TWOWAY_REQUEST) , timeout_ (0) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR4_ENTER); } @@ -196,9 +184,6 @@ TAO_OutputCDR::TAO_OutputCDR (ACE_Data_Block *data_block, , stub_ (0) , message_semantics_ (TAO_TWOWAY_REQUEST) , timeout_ (0) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR5_ENTER); } @@ -292,9 +277,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_OutputCDR& rhs, : (orb_core ? orb_core->output_cdr_msgblock_allocator () : 0)), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } diff --git a/TAO/tao/CDR.h b/TAO/tao/CDR.h index a69df9d28c5..8d3125cf334 100644 --- a/TAO/tao/CDR.h +++ b/TAO/tao/CDR.h @@ -180,20 +180,11 @@ public: /// Specify whether there are more data fragments to come. void more_fragments (bool more); -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - /// Are we containing compressed data? - bool compressed (void) const; - - /// Specify whether we have compressed data. - void compressed (bool compressed); -#endif - /// Set fragmented message attributes. void message_attributes (CORBA::ULong request_id, TAO_Stub * stub, TAO_Message_Semantics message_semantics, - ACE_Time_Value * timeout, - bool compressed); + ACE_Time_Value * timeout); /// Fragmented message request ID. CORBA::ULong request_id (void) const; @@ -244,11 +235,6 @@ private: /// Request/reply send timeout. ACE_Time_Value * timeout_; - -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - /// Do we contain compressed data - bool compressed_; -#endif //@} }; @@ -380,21 +366,9 @@ public: static void throw_stub_exception (int error_num); static void throw_skel_exception (int error_num); -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - /// Are we containing compressed data? - bool compressed (void) const; - - /// Specify whether we have compressed data. - void compressed (bool compressed); -#endif - private: /// The ORB_Core, required to extract object references. TAO_ORB_Core* orb_core_; - -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - CORBA::Boolean compressed_; -#endif }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CDR.inl b/TAO/tao/CDR.inl index 25611598b02..bbcbbc5a40f 100644 --- a/TAO/tao/CDR.inl +++ b/TAO/tao/CDR.inl @@ -23,36 +23,16 @@ TAO_OutputCDR::more_fragments (bool more) this->more_fragments_ = more; } -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 -ACE_INLINE bool -TAO_OutputCDR::compressed (void) const -{ - return this->compressed_; -} - -ACE_INLINE void -TAO_OutputCDR::compressed (bool compressed) -{ - this->compressed_ = compressed; -} -#endif - ACE_INLINE void TAO_OutputCDR::message_attributes (CORBA::ULong request_id, TAO_Stub * stub, TAO_Message_Semantics message_semantics, - ACE_Time_Value * timeout, - bool compressed) + ACE_Time_Value * timeout) { this->request_id_ = request_id; this->stub_ = stub; this->message_semantics_ = message_semantics; this->timeout_ = timeout; -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - this->compressed_ = compressed; -#else - ACE_UNUSED_ARG (compressed); -#endif } ACE_INLINE CORBA::ULong @@ -100,9 +80,6 @@ TAO_InputCDR::TAO_InputCDR (const char *buf, major_version, minor_version), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -117,9 +94,6 @@ TAO_InputCDR::TAO_InputCDR (size_t bufsiz, major_version, minor_version), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -134,9 +108,6 @@ TAO_InputCDR::TAO_InputCDR (const ACE_Message_Block *data, major_version, minor_version), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -153,9 +124,6 @@ TAO_InputCDR::TAO_InputCDR (const ACE_Message_Block *data, minor_version, lock), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -172,9 +140,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_Data_Block *data, major_version, minor_version), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -196,9 +161,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_Data_Block *data, major_version, minor_version), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -211,9 +173,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs, size, offset), orb_core_ (rhs.orb_core_) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -223,9 +182,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs, : ACE_InputCDR (rhs, size), orb_core_ (rhs.orb_core_) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -233,9 +189,6 @@ ACE_INLINE TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs) : ACE_InputCDR (rhs), orb_core_ (rhs.orb_core_) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -244,9 +197,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_InputCDR::Transfer_Contents rhs, TAO_ORB_Core* orb_core) : ACE_InputCDR (rhs), orb_core_ (orb_core) -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - , compressed_ (false) -#endif { } @@ -261,20 +211,6 @@ TAO_InputCDR::orb_core (void) const return this->orb_core_; } -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 -ACE_INLINE bool -TAO_InputCDR::compressed (void) const -{ - return this->compressed_; -} - -ACE_INLINE void -TAO_InputCDR::compressed (bool compressed) -{ - this->compressed_ = compressed; -} -#endif - // **************************************************************** ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os, diff --git a/TAO/tao/Compression/Compression.pidl b/TAO/tao/Compression/Compression.pidl index 41fa649a165..0fed028ce7c 100644 --- a/TAO/tao/Compression/Compression.pidl +++ b/TAO/tao/Compression/Compression.pidl @@ -22,6 +22,7 @@ module Compression exception CompressionException { unsigned long reason; + string description; }; /** @@ -42,7 +43,7 @@ module Compression /** * CompressorId type. */ - typedef unsigned long CompressorId; + typedef unsigned short CompressorId; const CompressorId COMPRESSORID_NONE = 0; const CompressorId COMPRESSORID_GZIP = 1; const CompressorId COMPRESSORID_PKZIP = 2; @@ -55,19 +56,18 @@ module Compression const CompressorId COMPRESSORID_XAR = 9; /** - * CompressorId list + * CompressionLevel type. */ - typedef sequence <CompressorId> CompressorIdList; + typedef unsigned short CompressionLevel; /** - * CompressionLevel type. + * CompressionRatio type. */ - typedef unsigned long CompressionLevel; - typedef long CompressionRatio; - local interface CompressorFactory; - + /** + * CompressionLevelId struc. + */ struct CompressorIdLevel { CompressorId compressor_id; CompressionLevel compression_level; @@ -76,6 +76,8 @@ module Compression typedef CORBA::OctetSeq Buffer; + local interface CompressorFactory; + /** * Compressor - abstraction of a compressor and decompressor. */ diff --git a/TAO/tao/Compression/Compression_Manager.cpp b/TAO/tao/Compression/Compression_Manager.cpp index d9ea7e2dbfc..035a9c2a95e 100644 --- a/TAO/tao/Compression/Compression_Manager.cpp +++ b/TAO/tao/Compression/Compression_Manager.cpp @@ -18,7 +18,6 @@ namespace TAO if (!::CORBA::is_nil (compressor_factory)) { ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); - CORBA::ULong const length = this->factories_.length (); for (CORBA::ULong i = 0; i < length; ++i) @@ -62,7 +61,6 @@ namespace TAO this->factories_[i] = ::Compression::CompressorFactory::_nil (); // make sequence smaller - return; } diff --git a/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp b/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp index 357e4eaf4ff..fcfd6a83a0d 100644 --- a/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp +++ b/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp @@ -23,7 +23,8 @@ Bzip2Compressor::compress ( ::Compression::Buffer & target ) { - unsigned int max_length = static_cast <unsigned int> (source.length () * 1.1) + 12; + unsigned int max_length = + static_cast <unsigned int> (source.length () * 1.1) + TAO_GIOP_MESSAGE_HEADER_LEN; target.length (static_cast <CORBA::ULong> (max_length)); // todo, check 0,1 values @@ -37,7 +38,7 @@ Bzip2Compressor::compress ( if (retval != BZ_OK) { - throw ::Compression::CompressionException (retval); + throw ::Compression::CompressionException (retval, ""); } else { @@ -64,7 +65,7 @@ Bzip2Compressor::decompress ( if (retval != BZ_OK) { - throw ::Compression::CompressionException (retval); + throw ::Compression::CompressionException (retval, ""); } else { diff --git a/TAO/tao/Compression/zlib/ZlibCompressor.cpp b/TAO/tao/Compression/zlib/ZlibCompressor.cpp index 4a22a0f5fe5..45c27e26765 100644 --- a/TAO/tao/Compression/zlib/ZlibCompressor.cpp +++ b/TAO/tao/Compression/zlib/ZlibCompressor.cpp @@ -23,7 +23,8 @@ ZlibCompressor::compress ( ::Compression::Buffer & target ) { - uLongf max_length = static_cast <uLongf> (source.length () * 1.1) + 12; + uLongf max_length = + static_cast <uLongf> (source.length () * 1.1) + TAO_GIOP_MESSAGE_HEADER_LEN; target.length (static_cast <CORBA::ULong> (max_length)); int const retval = ::compress2 (reinterpret_cast <Bytef*>(target.get_buffer ()), @@ -34,7 +35,8 @@ ZlibCompressor::compress ( if (retval != Z_OK) { - throw ::Compression::CompressionException (retval); + throw ::Compression::CompressionException (retval, + CORBA::string_dup("")); } else { @@ -58,7 +60,7 @@ ZlibCompressor::decompress ( if (retval != Z_OK) { - throw ::Compression::CompressionException (retval); + throw ::Compression::CompressionException (retval, ""); } else { diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index d0350c5c591..a02ffca1f1e 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -236,14 +236,66 @@ TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr, return 0; } -int -TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) +int +TAO_GIOP_Message_Base::dump_consolidated_msg (TAO_OutputCDR &stream, bool hex_dump_only) { - // Ptr to first buffer. + // Check whether the output cdr stream is build up of multiple + // messageblocks. If so, consolidate them to one block that can be + // dumped + ACE_Message_Block* consolidated_block = 0; char *buf = const_cast <char*> (stream.buffer ()); + size_t const total_len = stream.total_length (); + if (stream.begin()->cont () != 0) + { + ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0); + ACE_CDR::consolidate (consolidated_block, stream.begin ()); + buf = (char *) (consolidated_block->rd_ptr ()); + } + /// + this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len, hex_dump_only); + // + delete consolidated_block; + consolidated_block = 0; + // + return 0; +} + +int +TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream, TAO_Stub& stub) +{ this->set_giop_flags (stream); +#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 + TAO_ZIOP_Adapter* ziop_adapter = this->orb_core_->ziop_adapter (); + + //ziop adapter found and not compressed yet + if (ziop_adapter) + { + if (TAO_debug_level >= 5) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Before compression: "))); + this->dump_consolidated_msg (stream, true); + } + bool compressed; + if (&stub == 0) + { + compressed = ziop_adapter->marshal_data (stream, *this->orb_core_); + } + else + { + compressed = ziop_adapter->marshal_data (stream, stub); + } + + if (TAO_debug_level >= 5) + { + if (!compressed) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("GIOP message not compressed"))); + } + } +#endif + // Length of all buffers. size_t const total_len = stream.total_length (); @@ -254,6 +306,8 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) // this particular environment and that isn't handled by the // networking infrastructure (e.g., IPSEC). + char *buf = const_cast <char*> (stream.buffer ()); + CORBA::ULong bodylen = static_cast <CORBA::ULong> (total_len - TAO_GIOP_MESSAGE_HEADER_LEN); @@ -271,23 +325,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) if (TAO_debug_level >= 5) { - // Check whether the output cdr stream is build up of multiple - // messageblocks. If so, consolidate them to one block that can be - // dumped - ACE_Message_Block* consolidated_block = 0; - if (stream.begin()->cont () != 0) - { - ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0); - ACE_CDR::consolidate (consolidated_block, stream.begin ()); - buf = (char *) (consolidated_block->rd_ptr ()); - } - /// - this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len); - - // - delete consolidated_block; - consolidated_block = 0; - // + this->dump_consolidated_msg (stream, false); } return 0; @@ -600,9 +638,9 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, qd->giop_version ().major_version (), qd->giop_version ().minor_version ()); - // Get the read and write positions before we steal data. + // Get the read and write positions and header before we steal data. size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base (); - size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base (); + size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level >= 5) @@ -639,7 +677,12 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, db = qd->msg_block ()->data_block ()->duplicate (); } - TAO_InputCDR input_cdr (db, +#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 + if (!decompress (&db, *qd, rd_pos, wr_pos)) + return -1; +#endif + + TAO_InputCDR input_cdr (db, flg, rd_pos, wr_pos, @@ -648,9 +691,6 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, qd->giop_version ().minor_version (), this->orb_core_); -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - input_cdr.compressed (qd->state().compressed ()); -#endif transport->assign_translators(&input_cdr,&output); @@ -682,6 +722,42 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, } } +bool +TAO_GIOP_Message_Base::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, + size_t& rd_pos, size_t& wr_pos) +{ + if (qd.state().compressed ()) + { + TAO_ZIOP_Adapter* adapter = this->orb_core_->ziop_adapter (); + if (adapter) + { + if (!adapter->decompress (db, qd, *this->orb_core_)) + return false; + rd_pos = TAO_GIOP_MESSAGE_HEADER_LEN; + ACE_Data_Block *tmp = *db; + wr_pos = tmp->size(); + if (TAO_debug_level >= 5) + { + ACE_HEX_DUMP ((LM_DEBUG, + const_cast <char*> (tmp->base ()), + tmp->size (), + ACE_TEXT ("GIOP message after decompression"))); + } + } + else + { + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ") + ACE_TEXT ("data.\n"))); + + return false; + } + } + return true; +} + + int TAO_GIOP_Message_Base::process_reply_message ( TAO_Pluggable_Reply_Params ¶ms, @@ -693,7 +769,7 @@ TAO_GIOP_Message_Base::process_reply_message ( // Get the read and write positions before we steal data. size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base (); - size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base (); + size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base (); rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; if (TAO_debug_level >= 5) @@ -703,11 +779,17 @@ TAO_GIOP_Message_Base::process_reply_message ( qd->msg_block ()->length ()); } + ACE_Data_Block *db = qd->msg_block ()->data_block ();; + +#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 + if (!decompress (&db, *qd, rd_pos, wr_pos)) + return -1; +#endif // Create a empty buffer on stack // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. - TAO_InputCDR input_cdr (qd->msg_block ()->data_block (), + TAO_InputCDR input_cdr (db, ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, @@ -811,7 +893,7 @@ TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type, 0x4f, // 'O' 0x50 // 'P' }; - + header[4] = version.major; header[5] = version.minor; @@ -870,26 +952,6 @@ TAO_GIOP_Message_Base::process_request ( CORBA::Object_var forward_to; -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - if (cdr.compressed ()) - { - TAO_ZIOP_Adapter* adapter = this->orb_core_->ziop_adapter (); - if (adapter) - { - adapter->decompress (request); - } - else - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ") - ACE_TEXT ("data.\n"))); - - return -1; - } - } -#endif - /* * Hook to specialize request processing within TAO * This hook will be replaced by specialized request @@ -928,8 +990,7 @@ TAO_GIOP_Message_Base::process_request ( output.message_attributes (request_id, 0, TAO_REPLY, - 0, - false); + 0); // Make the GIOP header and Reply header this->generate_reply_header (output, reply_params); @@ -1443,7 +1504,8 @@ TAO_GIOP_Message_Base::send_reply_exception ( void TAO_GIOP_Message_Base::dump_msg (const char *label, const u_char *ptr, - size_t len) + size_t len, + bool hex_dump_only) { if (TAO_debug_level < 10) { @@ -1471,7 +1533,9 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, // Byte order. int const byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01; - int const compressed = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x04; + int const compressed = ptr[0] == 0x5A; + ACE_TCHAR message_type[15]; + ACE_OS::sprintf(message_type, "%c%c%c%c message", ptr[0], ptr[1], ptr[2], ptr[3]); // Get the version info CORBA::Octet const major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]; @@ -1511,23 +1575,25 @@ TAO_GIOP_Message_Base::dump_msg (const char *label, } // Print. - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " - "%C GIOP v%c.%c msg, %d data bytes, %s endian, " - "%s compressed, Type %C[%u]\n", - label, - digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], - digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], - len - TAO_GIOP_MESSAGE_HEADER_LEN , - (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), - (compressed == 0) ? ACE_TEXT("not") : ACE_TEXT("is"), - message_name, - *id)); - + if (!hex_dump_only) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - GIOP_Message_Base::dump_msg, " + "%C %s v%c.%c, %d data bytes, %s endian, " + "Type %C[%u]\n", + label, + message_type, + digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]], + digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]], + len - TAO_GIOP_MESSAGE_HEADER_LEN , + (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"), + message_name, + *id)); + } ACE_HEX_DUMP ((LM_DEBUG, - (const char *) ptr, - len, - ACE_TEXT ("GIOP message"))); + (const char *) ptr, + len, + ACE_TEXT (message_type))); } int @@ -1953,11 +2019,6 @@ TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const // Only supported in GIOP 1.1 or better. if (!(major <= 1 && minor == 0)) ACE_SET_BITS (flags, msg.more_fragments () << 1); - -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - if (!(major <= 1 && minor < 2)) - ACE_SET_BITS (flags, msg.compressed () << 2); -#endif } TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index 701a0f110d6..68f2e232f46 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -85,7 +85,7 @@ public: /// Format the message. As we have not written the message length in /// the header, we make use of this oppurtunity to insert and format /// the message. - int format_message (TAO_OutputCDR &cdr); + int format_message (TAO_OutputCDR &cdr, TAO_Stub& stub); /** * Parse the details of the next message from the @a incoming @@ -157,6 +157,11 @@ public: bool is_ready_for_bidirectional (TAO_OutputCDR &msg) const; private: + /// Decompresses a ZIOP message and turns it into a GIOP message + bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, + size_t& rd_pos, size_t& wr_pos); + + /// Processes the GIOP_REQUEST messages int process_request (TAO_Transport *transport, TAO_InputCDR &input, @@ -173,8 +178,12 @@ private: TAO_GIOP_Message_Generator_Parser *get_parser ( const TAO_GIOP_Message_Version &version) const; + /// Print out consolidate messages + int dump_consolidated_msg (TAO_OutputCDR &stream, bool hex_dump_only); + /// Print out a debug messages.. - void dump_msg (const char *label, const u_char *ptr, size_t len); + void dump_msg (const char *label, const u_char *ptr, + size_t len, bool hex_dump_only = false); /// Writes the GIOP header in to @a msg /// @note If the GIOP header happens to change in the future, we can diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index 45511f0150a..6826fef374a 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -87,8 +87,7 @@ TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) int TAO_GIOP_Message_State::parse_magic_bytes (char *buf) { - // The values are hard-coded to support non-ASCII platforms. - if (!(buf [0] == 0x47 // 'G' + if (!((buf [0] == 0x5A || buf [0] == 0x47) // 'Z' or 'G' (depending on compression) && buf [1] == 0x49 // 'I' && buf [2] == 0x4f // 'O' && buf [3] == 0x50)) // 'P' @@ -97,15 +96,15 @@ TAO_GIOP_Message_State::parse_magic_bytes (char *buf) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") ACE_TEXT ("TAO_GIOP_Message_State::parse_magic_bytes, ") - ACE_TEXT ("bad header: ") + ACE_TEXT ("bad %cIOP header: ") ACE_TEXT ("magic word [%02x,%02x,%02x,%02x]\n"), buf[0], + buf[0], buf[1], buf[2], buf[3])); return -1; } - return 0; } @@ -183,8 +182,7 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf) #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 // Read the compressed flag - this->compressed_ = - ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x04) == 4); + this->compressed_ = (buf[0] == 0x5A); #endif } diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index a554b36c987..e215d8cc623 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -231,7 +231,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) { // Format the message in the stream first - if (this->messaging_object ()->format_message (stream) != 0) + if (this->messaging_object ()->format_message (stream, *stub) != 0) return -1; // This guarantees to send all data (bytes) or return an error. diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp index 12f2223ebc4..7243dc28e76 100644 --- a/TAO/tao/Messaging/Asynch_Invocation.cpp +++ b/TAO/tao/Messaging/Asynch_Invocation.cpp @@ -76,8 +76,7 @@ namespace TAO cdr.message_attributes (this->details_.request_id (), this->resolver_.stub (), TAO_ONEWAY_REQUEST, - max_wait_time, - false); + max_wait_time); this->write_header (cdr); diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index b2f3d7172a3..d1324f2ffdf 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -205,9 +205,6 @@ public: TAO_ZIOP_Adapter *ziop_adapter () const; void ziop_adapter (TAO_ZIOP_Adapter *adapter); - CORBA::Boolean ziop_enabled () const; - void ziop_enabled (CORBA::Boolean value); - TAO_Service_Context_Registry &service_context_registry (void); /// Get the protocol factories diff --git a/TAO/tao/ORB_Core.inl b/TAO/tao/ORB_Core.inl index 9a2f70ce16d..491012809cd 100644 --- a/TAO/tao/ORB_Core.inl +++ b/TAO/tao/ORB_Core.inl @@ -454,18 +454,6 @@ TAO_ORB_Core::ziop_adapter (TAO_ZIOP_Adapter *adapter) this->ziop_adapter_ = adapter; } -ACE_INLINE CORBA::Boolean -TAO_ORB_Core::ziop_enabled () const -{ - return this->ziop_enabled_; -} - -ACE_INLINE void -TAO_ORB_Core::ziop_enabled (CORBA::Boolean value) -{ - this->ziop_enabled_ = value; -} - ACE_INLINE TAO::ORBInitializer_Registry_Adapter * TAO_ORB_Core::orbinitializer_registry () { diff --git a/TAO/tao/PortableServer/Upcall_Wrapper.cpp b/TAO/tao/PortableServer/Upcall_Wrapper.cpp index 9e7ce66d489..fe652ebf4c2 100644 --- a/TAO/tao/PortableServer/Upcall_Wrapper.cpp +++ b/TAO/tao/PortableServer/Upcall_Wrapper.cpp @@ -4,7 +4,6 @@ #include "tao/PortableServer/Upcall_Command.h" #include "tao/PortableServer/Collocated_Arguments_Converter.h" #include "tao/SystemException.h" -#include "tao/ZIOP_Adapter.h" #if TAO_HAS_INTERCEPTORS == 1 # include "tao/ServerRequestInterceptor_Adapter.h" @@ -246,33 +245,20 @@ TAO::Upcall_Wrapper::post_upcall (TAO_ServerRequest& server_request, TAO::Argument * const * args, size_t nargs) { -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - // Marshal the operation "inout" and "out" arguments and return - // value, if any. - TAO_ZIOP_Adapter* ziop_adapter = server_request.orb_core ()->ziop_adapter (); + TAO_OutputCDR & cdr = (*server_request.outgoing ()); + TAO::Argument * const * const begin = args; + TAO::Argument * const * const end = args + nargs; - if (ziop_adapter) - { - ziop_adapter->marshal_reply_data (server_request, args, nargs); - } - else -#endif + for (TAO::Argument * const * i = begin; i != end; ++i) { - TAO_OutputCDR & cdr = (*server_request.outgoing ()); - TAO::Argument * const * const begin = args; - TAO::Argument * const * const end = args + nargs; - - for (TAO::Argument * const * i = begin; i != end; ++i) + if (!(*i)->marshal (cdr)) { - if (!(*i)->marshal (cdr)) - { - TAO_OutputCDR::throw_skel_exception (errno); - } + TAO_OutputCDR::throw_skel_exception (errno); } - - // Reply body marshaling completed. No other fragments to send. - cdr.more_fragments (false); } + + // Reply body marshaling completed. No other fragments to send. + cdr.more_fragments (false); } TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Remote_Invocation.cpp b/TAO/tao/Remote_Invocation.cpp index 3533769987c..e937eb30d7d 100644 --- a/TAO/tao/Remote_Invocation.cpp +++ b/TAO/tao/Remote_Invocation.cpp @@ -11,7 +11,6 @@ #include "tao/Network_Priority_Protocols_Hooks.h" #include "tao/debug.h" #include "tao/SystemException.h" -#include "tao/ZIOP_Adapter.h" ACE_RCSID (tao, Remote_Invocation, @@ -120,22 +119,11 @@ namespace TAO void Remote_Invocation::marshal_data (TAO_OutputCDR &out_stream) { -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1 - TAO_ZIOP_Adapter* ziop_adapter = this->stub()->orb_core()->ziop_adapter (); - - if (ziop_adapter) - { - ziop_adapter->marshal_data (this->details_, out_stream, this->resolver_); - } - else -#endif - { - // Marshal application data - if (this->details_.marshal_args (out_stream) == false) - { - throw ::CORBA::MARSHAL (); - } - } + // Marshal application data + if (this->details_.marshal_args (out_stream) == false) + { + throw ::CORBA::MARSHAL (); + } } Invocation_Status diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index 48336b9d08e..b57595ee8e7 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -251,7 +251,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) { // Format the message in the stream first - if (this->messaging_object ()->format_message (stream) != 0) + if (this->messaging_object ()->format_message (stream, *stub) != 0) return -1; // Strictly speaking, should not need to loop here because the diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index ea571797386..fad20ed3f2e 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -298,7 +298,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, ACE_Time_Value *max_wait_time) { // Format the message in the stream first - if (this->messaging_object ()->format_message (stream) != 0) + if (this->messaging_object ()->format_message (stream, *stub) != 0) return -1; // Strictly speaking, should not need to loop here because the diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp index 45690f51c54..703da9b2180 100644 --- a/TAO/tao/Synch_Invocation.cpp +++ b/TAO/tao/Synch_Invocation.cpp @@ -91,13 +91,12 @@ namespace TAO cdr.message_attributes (this->details_.request_id (), this->resolver_.stub (), TAO_TWOWAY_REQUEST, - max_wait_time, - false); + max_wait_time); this->write_header (cdr); this->marshal_data (cdr); - + // Register a reply dispatcher for this invocation. Use the // preallocated reply dispatcher. TAO_Bind_Dispatcher_Guard dispatch_guard ( @@ -666,8 +665,7 @@ namespace TAO cdr.message_attributes (this->details_.request_id (), this->resolver_.stub (), TAO_ONEWAY_REQUEST, - max_wait_time, - false); + max_wait_time); this->write_header (cdr); @@ -685,12 +683,12 @@ namespace TAO else { if (TAO_debug_level > 4) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Synch_Oneway_Invocation::" - "remote_oneway, queueing message\n")); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Synch_Oneway_Invocation::" + "remote_oneway, queueing message\n")); - if (transport->format_queue_message (cdr, max_wait_time) != 0) - s = TAO_INVOKE_FAILURE; + if (transport->format_queue_message (cdr, max_wait_time, *this->resolver_.stub()) != 0) + s = TAO_INVOKE_FAILURE; } } diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp index d26faba992b..7a133e38e47 100644 --- a/TAO/tao/TAO_Server_Request.cpp +++ b/TAO/tao/TAO_Server_Request.cpp @@ -270,8 +270,7 @@ TAO_ServerRequest::init_reply (void) this->outgoing_->message_attributes (this->request_id_, 0, TAO_REPLY, - 0, - false); + 0); // Construct a REPLY header. this->mesg_base_->generate_reply_header (*this->outgoing_, reply_params); @@ -318,8 +317,7 @@ TAO_ServerRequest::send_no_exception_reply (void) this->outgoing_->message_attributes (this->request_id_, 0, TAO_REPLY, - 0, - false); + 0); // Construct a REPLY header. this->mesg_base_->generate_reply_header (*this->outgoing_, reply_params); @@ -526,8 +524,7 @@ TAO_ServerRequest::send_cached_reply (CORBA::OctetSeq &s) this->outgoing_->message_attributes (this->request_id_, 0, TAO_REPLY, - 0, - false); + 0); // Make the reply message if (this->mesg_base_->generate_reply_header (*this->outgoing_, diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index ecfa26e9042..45fde50d47f 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -559,9 +559,10 @@ TAO_Transport::handle_output (ACE_Time_Value *max_wait_time) int TAO_Transport::format_queue_message (TAO_OutputCDR &stream, - ACE_Time_Value *max_wait_time) + ACE_Time_Value *max_wait_time, + TAO_Stub& stub) { - if (this->messaging_object ()->format_message (stream) != 0) + if (this->messaging_object ()->format_message (stream, stub) != 0) return -1; return this->queue_message_i (stream.begin (), max_wait_time); @@ -2259,7 +2260,6 @@ TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, // putting them into queue. When this is done we can return // to process this message, and notifying other threads to // process the messages in queue. - char * end_marker = message_block.rd_ptr () + mesg_length; @@ -2329,7 +2329,6 @@ TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, { return -1; } - // move the rd_ptr tp position of end_marker message_block.rd_ptr (end_marker); } diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 6c6c01dd703..712da3f1f89 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -703,7 +703,8 @@ public: /// @param max_wait_time The maximum time that the operation can /// block, used in the implementation of timeouts. int format_queue_message (TAO_OutputCDR &stream, - ACE_Time_Value *max_wait_time); + ACE_Time_Value *max_wait_time, + TAO_Stub& stub); /// Send a message block chain, int send_message_block_chain (const ACE_Message_Block *message_block, diff --git a/TAO/tao/ZIOP/ZIOP.cpp b/TAO/tao/ZIOP/ZIOP.cpp index 945386be7fb..44b94319b96 100644 --- a/TAO/tao/ZIOP/ZIOP.cpp +++ b/TAO/tao/ZIOP/ZIOP.cpp @@ -103,11 +103,35 @@ TAO_ZIOP_Loader::Initializer (void) return ACE_Service_Config::process_directive (ace_svc_desc_TAO_ZIOP_Loader); } + +bool +TAO_ZIOP_Loader::decompress (Compression::Compressor_ptr compressor, + const ::Compression::Buffer &source, + ::Compression::Buffer &target) +{ + try + { + compressor->decompress (source, target); + } + catch (::Compression::CompressionException &e) + { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("Decompression failed. Reason: %s. ") + ACE_TEXT ("Description : %s "), + ACE_TEXT ("Summary : %s\n"), + e.reason, e.description, e._info ()), + false); + } + + return true; +} + bool -TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request) +TAO_ZIOP_Loader::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, + TAO_ORB_Core& orb_core) { CORBA::Object_var compression_manager = - server_request.orb_core()->resolve_compression_manager(); + orb_core.resolve_compression_manager(); Compression::CompressionManager_var manager = Compression::CompressionManager::_narrow (compression_manager.in ()); @@ -115,16 +139,53 @@ TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request) if (!CORBA::is_nil(manager.in ())) { ZIOP::CompressedData data; - if (!(*(server_request.incoming()) >> data)) + //first set the read pointer after the header + size_t begin = qd.msg_block ()-> rd_ptr() - qd.msg_block ()->base (); + char * initial_rd_ptr = qd.msg_block ()-> rd_ptr(); + size_t const wr = qd.msg_block ()->wr_ptr () - qd.msg_block ()->base (); + + TAO_InputCDR cdr (*db, + qd.msg_block ()->self_flags (), + begin + TAO_GIOP_MESSAGE_HEADER_LEN, + wr, + qd.byte_order (), + qd.giop_version ().major_version (), + qd.giop_version ().minor_version (), + &orb_core); + + if (!(cdr >> data)) return false; - - Compression::Compressor_var compressor = manager->get_compressor (data.compressorid, 6); + + Compression::Compressor_var compressor = + manager->get_compressor (data.compressorid, 6); CORBA::OctetSeq myout; myout.length (data.original_length); - compressor->decompress (data.data, myout); - TAO_InputCDR* newstream = new TAO_InputCDR ((char*)myout.get_buffer(true), (size_t)data.original_length); - server_request.incoming()->steal_from (*newstream); + if (decompress(compressor.in(), data.data, myout)) + { + ACE_Message_Block *mb = new ACE_Message_Block(); + + mb->size ((size_t)(data.original_length + + TAO_GIOP_MESSAGE_HEADER_LEN)); + + qd.msg_block ()->rd_ptr (initial_rd_ptr); + + mb->copy(qd.msg_block ()->base () + begin, + TAO_GIOP_MESSAGE_HEADER_LEN); + + if (mb->copy((char*)myout.get_buffer(true), + (size_t)data.original_length) != 0) + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("TAO - (%P|%t) - ") + ACE_TEXT ("Failed to copy decompressed data : ") + ACE_TEXT ("Buffer too small\n")), + false); + //change it into a GIOP message.. + mb->base ()[0] = 0x47; + ACE_CDR::mb_align (mb); + *db = mb->data_block (); + return true; + } } else { @@ -135,26 +196,15 @@ TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request) } CORBA::ULong -TAO_ZIOP_Loader::compression_low_value (TAO::Profile_Transport_Resolver &resolver) const +TAO_ZIOP_Loader::compression_policy_value (CORBA::Policy_ptr policy) const { CORBA::ULong result = 0; #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 - CORBA::Policy_var policy = CORBA::Policy::_nil (); - if (resolver.stub () == 0) - { - policy = - resolver.stub()->orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); - } - else - { - policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); - } - - if (!CORBA::is_nil (policy.in ())) + if (!CORBA::is_nil (policy)) { ZIOP::CompressionLowValuePolicy_var srp = - ZIOP::CompressionLowValuePolicy::_narrow (policy.in ()); + ZIOP::CompressionLowValuePolicy::_narrow (policy); if (!CORBA::is_nil (srp.in ())) { @@ -162,7 +212,7 @@ TAO_ZIOP_Loader::compression_low_value (TAO::Profile_Transport_Resolver &resolve } } #else - ACE_UNUSED_ARG (resolver); + ACE_UNUSED_ARG (policy); #endif return result; } @@ -176,270 +226,296 @@ TAO_ZIOP_Loader::compress (Compression::Compressor_ptr compressor, { compressor->compress (source, target); } - catch (...) + catch (::Compression::CompressionException &e) { - return false; + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT ("Compression failed. Reason: %s. ") + ACE_TEXT ("Description : %s "), + ACE_TEXT ("Summary : %s\n"), + e.reason, e.description, e._info ()), + false); } return true; } bool -TAO_ZIOP_Loader::check_min_ratio (CORBA::ULong /* original_data_length */, CORBA::ULong /*compressed_length*/) const +TAO_ZIOP_Loader::check_min_ratio (::Compression::CompressionRatio ratio, + CORBA::Long min_ratio) const { -/* ::Compression::CompressionRatio ratio = 100 - (compressed_length /original_length) * 100; - if (resolver.stub () == 0) + bool accepted = min_ratio == 0 || ratio > min_ratio; + if (TAO_debug_level > 8) { - policy = - resolver.stub()->orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ENABLING_POLICY); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - TAO_ZIOP_Loader::check_min_ratio: ") + ACE_TEXT ("Ratio:%d Accepted:%d\n"), + ratio, accepted)); + } + return accepted; +} + + +bool +TAO_ZIOP_Loader::get_compressor_details ( + ::Compression::CompressorIdLevelList *list, + Compression::CompressorId &compressor_id, + Compression::CompressionLevel &compression_level) + +{ + if (list) + { + compressor_id = (*list)[0].compressor_id; + compression_level = (*list)[0].compression_level; } else { - policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY); - }*/ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("TAO_ZIOP_Loader::get_compressor_details: ") + ACE_TEXT ("No appropriate compressor found\n"))); + return false; + } return true; } bool -TAO_ZIOP_Loader::marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver) +TAO_ZIOP_Loader::get_compression_details( + CORBA::Policy_ptr compression_enabling_policy, + CORBA::Policy_ptr compression_level_list_policy, + Compression::CompressorId &compressor_id, + Compression::CompressionLevel &compression_level) { -#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0 - ACE_UNUSED_ARG (details); - ACE_UNUSED_ARG (stream); - ACE_UNUSED_ARG (resolver); -#else - CORBA::Boolean use_ziop = false; - Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB; - Compression::CompressionLevel compression_level = 0; - - CORBA::Policy_var policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY); - - if (!CORBA::is_nil (policy.in ())) + bool use_ziop = false; + if (!CORBA::is_nil (compression_enabling_policy)) { ZIOP::CompressionEnablingPolicy_var srp = - ZIOP::CompressionEnablingPolicy::_narrow (policy.in ()); + ZIOP::CompressionEnablingPolicy::_narrow (compression_enabling_policy); if (!CORBA::is_nil (srp.in ())) { use_ziop = srp->compression_enabled (); + if (!use_ziop && TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("TAO_ZIOP_Loader::get_compression_details: ") + ACE_TEXT ("No ZIOP policy set\n"))); + } } } - + else + { + ACE_ERROR((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_ZIOP_Loader::get_compression_details : ") + ACE_TEXT("compression_enabling_policy is NIL. No ZIOP\n"))); + } + if (use_ziop) { - policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); - - if (!CORBA::is_nil (policy.in ())) + if (!CORBA::is_nil (compression_level_list_policy)) { ZIOP::CompressorIdLevelListPolicy_var srp = - ZIOP::CompressorIdLevelListPolicy::_narrow (policy.in ()); + ZIOP::CompressorIdLevelListPolicy::_narrow (compression_level_list_policy); if (!CORBA::is_nil (srp.in ())) { - ::Compression::CompressorIdLevelList* list = srp->compressor_ids (); - if (list) - { - compressor_id = (*list)[0].compressor_id; - compression_level = (*list)[0].compression_level; - } - else - { - // No compatible compressor found - use_ziop = false; - } + use_ziop = get_compressor_details (srp->compressor_ids (), + compressor_id, compression_level); } } + else + { + ACE_ERROR((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - ") + ACE_TEXT("TAO_ZIOP_Loader::get_compression_details : ") + ACE_TEXT("Compression level policy not found\n"))); + } } + return use_ziop; +} - ACE_Message_Block* current = const_cast <ACE_Message_Block*> (stream.current ()); - - if (use_ziop) +void +TAO_ZIOP_Loader::complete_compression (Compression::Compressor_ptr compressor, + TAO_OutputCDR &cdr, + ACE_Message_Block& mb, + char *initial_rd_ptr, + CORBA::ULong low_value, + CORBA::Long min_ratio, + CORBA::ULong original_data_length, + Compression::CompressorId compressor_id) +{ + if (low_value > 0 && original_data_length > low_value) { - // Set the read pointer to the point where the application data starts - current->rd_ptr (current->wr_ptr()); - } + CORBA::OctetSeq myout; + CORBA::OctetSeq input (original_data_length, &mb); + myout.length (original_data_length); - // Marshal application data - if (!details.marshal_args (stream)) - { - throw ::CORBA::MARSHAL (); + bool compressed = this->compress (compressor, input, myout); + + if (compressed && + (myout.length () < original_data_length) && + (this->check_min_ratio (compressor->compression_ratio(), + min_ratio))) + { + mb.wr_ptr (mb.rd_ptr ()); + cdr.current_alignment (mb.wr_ptr() - mb.base ()); + ZIOP::CompressedData data; + data.compressorid = compressor_id; + data.original_length = input.length(); + data.data = myout; + cdr << data; + mb.rd_ptr(initial_rd_ptr); + int begin = (mb.rd_ptr() - mb.base ()); + mb.data_block ()->base ()[0 + begin] = 0x5A; + mb.data_block ()->base ()[TAO_GIOP_MESSAGE_SIZE_OFFSET + begin] = + cdr.length() - TAO_GIOP_MESSAGE_HEADER_LEN; + } } + else if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT ("TAO_ZIOP_Loader::compress_data: ") + ACE_TEXT ("No compression used") + ACE_TEXT ("->Low Value Policy applied\n"))); + } +} - current = const_cast <ACE_Message_Block*> (stream.current()); - CORBA::ULong const original_data_length =(CORBA::ULong)(current->wr_ptr() - current->rd_ptr()); +bool +TAO_ZIOP_Loader::compress_data (TAO_OutputCDR &cdr, + CORBA::Object_ptr compression_manager, + CORBA::ULong low_value, + CORBA::Long min_ratio, + Compression::CompressorId compressor_id, + Compression::CompressionLevel compression_level) +{ + cdr.consolidate (); + + ACE_Message_Block* current = const_cast <ACE_Message_Block*> (cdr.current ()); - if (use_ziop && original_data_length > 0) - { - // We can only compress one message block, so when compression is enabled first do - // a consolidate. - stream.consolidate (); + char* initial_rd_ptr = current->rd_ptr(); - CORBA::Object_var compression_manager = - resolver.stub()->orb_core()->resolve_compression_manager(); + // Set the read pointer to the point where the data starts + current->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); + + current = const_cast <ACE_Message_Block*> (cdr.current()); + CORBA::ULong const original_data_length = + (CORBA::ULong)(current->wr_ptr() - current->rd_ptr()); + if (original_data_length > 0) + { Compression::CompressionManager_var manager = - Compression::CompressionManager::_narrow (compression_manager.in ()); + Compression::CompressionManager::_narrow (compression_manager); if (!CORBA::is_nil(manager.in ())) { - Compression::Compressor_var compressor = manager->get_compressor (compressor_id, compression_level); - - if (original_data_length > this->compression_low_value (resolver)) - { - CORBA::OctetSeq myout; - CORBA::OctetSeq input (original_data_length, current); - myout.length (original_data_length); - - bool compressed = this->compress (compressor.in (), input, myout); - - if (compressed && (myout.length () < original_data_length) && (this->check_min_ratio (original_data_length, myout.length()))) - { - stream.compressed (true); - current->wr_ptr (current->rd_ptr ()); - stream.current_alignment (current->wr_ptr() - current->base ()); - ZIOP::CompressedData data; - data.compressorid = compressor_id; - data.original_length = input.length(); - data.data = myout; - stream << data; - } - } + Compression::Compressor_var compressor = + manager->get_compressor (compressor_id, compression_level); + + complete_compression(compressor.in (), cdr, *current, + initial_rd_ptr, low_value, min_ratio, + original_data_length, compressor_id); } } - - // Set the read pointer back to the starting point - current->rd_ptr (current->base ()); -#endif - + //set back read pointer in case no compression was done... + current->rd_ptr(initial_rd_ptr); return true; } bool -TAO_ZIOP_Loader::marshal_reply_data (TAO_ServerRequest& server_request, - TAO::Argument * const * args, - size_t nargs) +TAO_ZIOP_Loader::marshal_data (TAO_OutputCDR &cdr, TAO_Stub& stub) { #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0 - ACE_UNUSED_ARG (server_request); - ACE_UNUSED_ARG (args); - ACE_UNUSED_ARG (nargs); + ACE_UNUSED_ARG (cdr); + ACE_UNUSED_ARG (stub); + return true; #else CORBA::Boolean use_ziop = false; Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB; Compression::CompressionLevel compression_level = 0; - TAO_Transport& transport = *server_request.transport (); - - CORBA::Policy_var policy = transport.orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ENABLING_POLICY); - - if (!CORBA::is_nil (policy.in ())) - { - ZIOP::CompressionEnablingPolicy_var srp = - ZIOP::CompressionEnablingPolicy::_narrow (policy.in ()); - - if (!CORBA::is_nil (srp.in ())) - { - use_ziop = srp->compression_enabled (); - } - } + + CORBA::Policy_var compression_enabling_policy = + stub.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY); + CORBA::Policy_var compression_level_list_policy = + stub.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); + + use_ziop = get_compression_details(compression_enabling_policy.in (), + compression_level_list_policy.in (), + compressor_id, compression_level); if (use_ziop) { - policy = transport.orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); + CORBA::Object_var compression_manager = + stub.orb_core ()->resolve_compression_manager(); - if (!CORBA::is_nil (policy.in ())) - { - ZIOP::CompressorIdLevelListPolicy_var srp = - ZIOP::CompressorIdLevelListPolicy::_narrow (policy.in ()); + CORBA::Policy_var policy_low_value = + stub.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); + CORBA::Policy_var policy_min_ratio = + stub.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY); - if (!CORBA::is_nil (srp.in ())) - { - ::Compression::CompressorIdLevelList* list = srp->compressor_ids (); - if (list) - { - compressor_id = (*list)[0].compressor_id; - compression_level = (*list)[0].compression_level; - } - else - { - // No compatible compressor found - use_ziop = false; - } - } - } - } + CORBA::ULong low_value = + this->compression_policy_value (policy_low_value.in ()); + CORBA::Long min_ratio = + this->compression_policy_value (policy_min_ratio.in ()); - TAO_OutputCDR & stream = (*server_request.outgoing ()); - ACE_Message_Block* current = const_cast <ACE_Message_Block*> (stream.current ()); - - if (use_ziop) - { - // Set the read pointer to the point where the application data starts - current->rd_ptr (current->wr_ptr()); + return compress_data(cdr, compression_manager.in (), + low_value, min_ratio, + compressor_id, compression_level); } + return false; +#endif +} - // Marshal application data - TAO::Argument * const * const begin = args; - TAO::Argument * const * const end = args + nargs; - - for (TAO::Argument * const * i = begin; i != end; ++i) - { - if (!(*i)->marshal (stream)) - { - TAO_OutputCDR::throw_skel_exception (errno); - } - } - - // Reply body marshaling completed. No other fragments to send. - stream.more_fragments (false); - - current = const_cast <ACE_Message_Block*> (stream.current()); - CORBA::ULong const original_data_length =(CORBA::ULong)(current->wr_ptr() - current->rd_ptr()); +bool +TAO_ZIOP_Loader::marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core) +{ +#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0 + ACE_UNUSED_ARG (cdr); + ACE_UNUSED_ARG (orb_core); + return true; +#else + CORBA::Boolean use_ziop = false; + Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB; + Compression::CompressionLevel compression_level = 0; - if (use_ziop && original_data_length > 0) + CORBA::Policy_var compression_enabling_policy = + orb_core.get_cached_policy_including_current + (TAO_CACHED_COMPRESSION_ENABLING_POLICY); + + CORBA::Policy_var compression_level_list_policy = + orb_core.get_cached_policy_including_current + (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); + + use_ziop = get_compression_details(compression_enabling_policy.in (), + compression_level_list_policy.in (), + compressor_id, compression_level); + + if (use_ziop) { - // We can only compress one message block, so when compression is enabled first do - // a consolidate. - stream.consolidate (); - CORBA::Object_var compression_manager = - server_request.transport ()->orb_core()->resolve_compression_manager(); - - Compression::CompressionManager_var manager = - Compression::CompressionManager::_narrow (compression_manager.in ()); - - if (!CORBA::is_nil(manager.in ())) - { - Compression::Compressor_var compressor = manager->get_compressor (compressor_id, compression_level); - -// if (original_data_length > this->compression_low_value (resolver) - if (1) - { - CORBA::OctetSeq myout; - CORBA::OctetSeq input (original_data_length, current); - myout.length (original_data_length); - - bool compressed = this->compress (compressor.in (), input, myout); - - if (compressed && (myout.length () < original_data_length)) // && (this->check_min_ratio (original_data_length, myout.length()))) - { - stream.compressed (true); - current->wr_ptr (current->rd_ptr ()); - stream.current_alignment (current->wr_ptr() - current->base ()); - ZIOP::CompressedData data; - data.compressorid = compressor_id; - data.original_length = input.length(); - data.data = myout; - stream << data; - } - } - } + orb_core.resolve_compression_manager(); + + CORBA::Policy_var policy_low_value = + orb_core.get_cached_policy_including_current + (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); + + CORBA::Policy_var policy_min_ratio = + orb_core.get_cached_policy_including_current + (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY); + + CORBA::ULong low_value = + this->compression_policy_value (policy_low_value.in ()); + CORBA::Long min_ratio = + this->compression_policy_value (policy_min_ratio.in ()); + + return compress_data(cdr, compression_manager.in (), + low_value, min_ratio, + compressor_id, compression_level); } + return false; - // Set the read pointer back to the starting point - current->rd_ptr (current->base ()); #endif - - return true; } diff --git a/TAO/tao/ZIOP/ZIOP.h b/TAO/tao/ZIOP/ZIOP.h index c21dc518c51..310117f45fb 100644 --- a/TAO/tao/ZIOP/ZIOP.h +++ b/TAO/tao/ZIOP/ZIOP.h @@ -47,10 +47,11 @@ public: /// Destructor virtual ~TAO_ZIOP_Loader (void); - virtual bool decompress (TAO_ServerRequest& server_request); + virtual bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, TAO_ORB_Core& orb_core); // Compress the @a stream. Starting point of the compression is rd_ptr() - virtual bool marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver); + virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_Stub& stub); + virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core); /// Initialize the BiDIR loader hooks. virtual int init (int argc, ACE_TCHAR* []); @@ -60,10 +61,6 @@ public: /// Used to force the initialization of the ORB code. static int Initializer (void); - bool marshal_reply_data (TAO_ServerRequest& server_request, - TAO::Argument * const * args, - size_t nargs); - private: /// Flag to indicate whether the ZIOP library has been @@ -71,13 +68,44 @@ private: static bool is_activated_; /// Get the compression low value, returns 0 when it is not set - CORBA::ULong compression_low_value (TAO::Profile_Transport_Resolver &resolver) const; + CORBA::ULong compression_policy_value (CORBA::Policy_ptr policy) const; + + bool get_compressor_details ( + ::Compression::CompressorIdLevelList *list, + Compression::CompressorId &compressor_id, + Compression::CompressionLevel &compression_level); + + bool get_compression_details(CORBA::Policy_ptr compression_enabling_policy, + CORBA::Policy_ptr compression_level_list_policy, + Compression::CompressorId &compressor_id, + Compression::CompressionLevel &compression_level); + + void complete_compression (Compression::Compressor_ptr compressor, + TAO_OutputCDR &cdr, + ACE_Message_Block& mb, + char *initial_rd_ptr, + CORBA::ULong low_value, + CORBA::Long min_ratio, + CORBA::ULong original_data_length, + Compression::CompressorId compressor_id); + + bool compress_data (TAO_OutputCDR &cdr, + CORBA::Object_ptr compression_manager, + CORBA::ULong low_value, + CORBA::Long min_ratio, + ::Compression::CompressorId compressor_id, + ::Compression::CompressionLevel compression_level); bool compress (Compression::Compressor_ptr compressor, const ::Compression::Buffer &source, ::Compression::Buffer &target); + + bool decompress (Compression::Compressor_ptr compressor, + const ::Compression::Buffer &source, + ::Compression::Buffer &target); - bool check_min_ratio (CORBA::ULong original_data_length, CORBA::ULong compressed_length) const; + bool check_min_ratio (::Compression::CompressionRatio ratio, + CORBA::Long min_ratio) const; }; static int diff --git a/TAO/tao/ZIOP/ZIOP.pidl b/TAO/tao/ZIOP/ZIOP.pidl index 35cee2379f3..d3b49079881 100644 --- a/TAO/tao/ZIOP/ZIOP.pidl +++ b/TAO/tao/ZIOP/ZIOP.pidl @@ -19,7 +19,7 @@ module ZIOP /** * Tag Id for CompressionEnablingPolicy */ - const CORBA::PolicyType COMPRESSION_ENABLING_POLICY_ID = 5555; + const CORBA::PolicyType COMPRESSION_ENABLING_POLICY_ID = 64; /** * The ZIOP CompressionEnablingPolicy. Has an boolean attribute indicating @@ -35,7 +35,7 @@ module ZIOP /** * Tag Id for CompressorIdPolicy */ - const CORBA::PolicyType COMPRESSOR_ID_LEVEL_LIST_POLICY_ID = 5556; + const CORBA::PolicyType COMPRESSOR_ID_LEVEL_LIST_POLICY_ID = 65; /** * The ZIOP CompressorIdListPolicy. Has an CompressorId attribute indicating @@ -46,7 +46,7 @@ module ZIOP readonly attribute Compression::CompressorIdLevelList compressor_ids; }; - const CORBA::PolicyType COMPRESSION_LOW_VALUE_POLICY_ID = 5557; + const CORBA::PolicyType COMPRESSION_LOW_VALUE_POLICY_ID = 66; typedef unsigned long CompressionLowValuePolicyValue; @@ -55,7 +55,11 @@ module ZIOP readonly attribute CompressionLowValuePolicyValue low_value; }; - const CORBA::PolicyType COMPRESSION_MIN_RATIO_POLICY_ID = 5558; + /** + * The ZIOP CompressionMinRatioPolicy. If the compression ratio is smaller + * than this setting, a message is send uncompressed. + */ + const CORBA::PolicyType COMPRESSION_MIN_RATIO_POLICY_ID = 67; local interface CompressionMinRatioPolicy : CORBA::Policy { diff --git a/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp b/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp index e5e49a738ac..dcf763a285b 100644 --- a/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp +++ b/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp @@ -25,7 +25,6 @@ TAO_ZIOP_PolicyFactory::create_policy ( ::CORBA::Boolean val; // Extract the value from the any. - if (!(value >>= CORBA::Any::to_boolean (val))) { throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE); @@ -66,7 +65,6 @@ TAO_ZIOP_PolicyFactory::create_policy ( ::CORBA::ULong val; // Extract the value from the any. - if (!(value >>= val)) { throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE); @@ -87,7 +85,6 @@ TAO_ZIOP_PolicyFactory::create_policy ( ::Compression::CompressionRatio val; // Extract the value from the any. - if (!(value >>= val)) { throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE); diff --git a/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp b/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp index bec32269420..9d079b1fe2d 100644 --- a/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp +++ b/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp @@ -32,65 +32,80 @@ TAO_ZIOPPolicy_Validator::validate_impl (TAO_Policy_Set &policies) if (srp.in () == 0) return; - - // Set the flag in the ORB_Core - orb_core_.ziop_enabled (srp->compression_enabled ()); } void TAO_ZIOPPolicy_Validator::merge_policies_impl (TAO_Policy_Set &policies) { - // Check if the user has specified the priority model policy. - CORBA::Policy_var priority_model = + // Check if the user has specified the compression enabled policy. + CORBA::Policy_var compression_enabled = policies.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY); - if (CORBA::is_nil (priority_model.in ())) + if (CORBA::is_nil (compression_enabled.in ())) { - // If not, check if the priority model policy has been specified + // If not, check if the compression enabled policy has been specified // at the ORB level. - priority_model = + compression_enabled = this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY); - if (!CORBA::is_nil (priority_model.in ())) + if (!CORBA::is_nil (compression_enabled.in ())) { // If so, we'll use that policy. - policies.set_policy (priority_model.in ()); + policies.set_policy (compression_enabled.in ()); } } - // Check if the user has specified the server protocol policy. - CORBA::Policy_var server_protocol = + // Check if the user has specified the compression low value policy. + CORBA::Policy_var low_value_policy = policies.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); - if (CORBA::is_nil (server_protocol.in ())) + if (CORBA::is_nil (low_value_policy.in ())) { - // If not, check if the server protocol policy has been + // If not, check if the compression low value policy has been // specified at the ORB level. - server_protocol = + low_value_policy = this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY); - if (!CORBA::is_nil (server_protocol.in ())) + if (!CORBA::is_nil (low_value_policy.in ())) + { + // If so, we'll use that policy. + policies.set_policy (low_value_policy.in ()); + } + } + + // Check if the user has specified the minimum compression ratio policy. + CORBA::Policy_var min_ratio_policy = + policies.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY); + + if (CORBA::is_nil (min_ratio_policy.in ())) + { + // If not, check if the minimum compression ratio policy has been + // specified at the ORB level. + min_ratio_policy = + this->orb_core_.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY); + + if (!CORBA::is_nil (min_ratio_policy.in ())) { // If so, we'll use that policy. - policies.set_policy (server_protocol.in ()); + policies.set_policy (min_ratio_policy.in ()); } } - // Check if the user has specified the server protocol policy. - CORBA::Policy_var x = + // Check if the user has specified the compression list policy. + CORBA::Policy_var compressior_list_policy = policies.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); - if (CORBA::is_nil (x.in ())) + if (CORBA::is_nil (compressior_list_policy.in ())) { - // If not, check if the server protocol policy has been + // If not, check if the compression list policy has been // specified at the ORB level. - x = + compressior_list_policy = this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY); - if (!CORBA::is_nil (x.in ())) + if (!CORBA::is_nil (compressior_list_policy.in ())) { // If so, we'll use that policy. - policies.set_policy (x.in ()); + policies.set_policy (compressior_list_policy.in ()); } } } @@ -100,6 +115,7 @@ TAO_ZIOPPolicy_Validator::legal_policy_impl (CORBA::PolicyType type) { return (type == ZIOP::COMPRESSION_ENABLING_POLICY_ID || type == ZIOP::COMPRESSION_LOW_VALUE_POLICY_ID || + type == ZIOP::COMPRESSION_MIN_RATIO_POLICY_ID || type == ZIOP::COMPRESSOR_ID_LEVEL_LIST_POLICY_ID); } diff --git a/TAO/tao/ZIOP/ZIOP_Policy_i.cpp b/TAO/tao/ZIOP/ZIOP_Policy_i.cpp index fc7fb5c8237..c6306341d7a 100644 --- a/TAO/tao/ZIOP/ZIOP_Policy_i.cpp +++ b/TAO/tao/ZIOP/ZIOP_Policy_i.cpp @@ -307,7 +307,7 @@ CompressionMinRatioPolicy::policy_type (void) { // Future policy implementors: notice how this minimizes the // footprint of the class. - return ZIOP::COMPRESSION_LOW_VALUE_POLICY_ID; + return ZIOP::COMPRESSION_MIN_RATIO_POLICY_ID; } @@ -350,7 +350,7 @@ CompressionMinRatioPolicy::ratio (void) TAO_Cached_Policy_Type CompressionMinRatioPolicy::_tao_cached_type (void) const { - return TAO_CACHED_POLICY_UNCACHED; + return TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY; } } diff --git a/TAO/tao/ZIOP/ZIOP_Policy_i.h b/TAO/tao/ZIOP/ZIOP_Policy_i.h index 505d23a064f..a5b992bf864 100644 --- a/TAO/tao/ZIOP/ZIOP_Policy_i.h +++ b/TAO/tao/ZIOP/ZIOP_Policy_i.h @@ -161,9 +161,9 @@ private: }; /** - * @class CompressionLowValuePolicy + * @class CompressionMinRatioPolicy * - * @brief Implementation of the ZIOP::CompressionLowValuePolicy + * @brief Implementation of the ZIOP::CompressionMinRatioPolicy */ class CompressionMinRatioPolicy : public virtual ::ZIOP::CompressionMinRatioPolicy diff --git a/TAO/tao/ZIOP_Adapter.h b/TAO/tao/ZIOP_Adapter.h index 73be3fad27c..1fbc4d41a6b 100644 --- a/TAO/tao/ZIOP_Adapter.h +++ b/TAO/tao/ZIOP_Adapter.h @@ -27,6 +27,7 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL class TAO_Policy_Validator; +class TAO_Queued_Data; /** * @class TAO_ZIOP_Adapter @@ -39,16 +40,13 @@ class TAO_Policy_Validator; class TAO_Export TAO_ZIOP_Adapter : public ACE_Service_Object { public: - virtual bool decompress (TAO_ServerRequest& server_request) = 0; + virtual bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, TAO_ORB_Core& orb_core) = 0; - virtual bool marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver_) = 0; + virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_Stub& stub) = 0; + virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core) = 0; virtual void load_policy_validators (TAO_Policy_Validator &validator) = 0; - virtual bool marshal_reply_data (TAO_ServerRequest& server_request, - TAO::Argument * const * args, - size_t nargs) = 0; - /// The virtual destructor virtual ~TAO_ZIOP_Adapter (void); }; diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h index 744499b3726..2bfc004ed98 100644 --- a/TAO/tao/orbconf.h +++ b/TAO/tao/orbconf.h @@ -773,6 +773,8 @@ enum TAO_Cached_Policy_Type TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY, + TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY, + TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY, /// NOTE: The "TAO_CACHED_POLICY_MAX_CACHED" should always be the last. |