summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authormsmit <msmit@remedy.nl>2009-02-13 14:52:13 +0000
committermsmit <msmit@remedy.nl>2009-02-13 14:52:13 +0000
commit97dd3923a92f571f4bd7d29e0739d68b3e658377 (patch)
treead8496cbe0f0caa5554879dff794e9d9f3eb202e /TAO/tao
parent9f1e1dcd68536ed4a6407ec3b0fcb95bcd8424fa (diff)
downloadATCD-97dd3923a92f571f4bd7d29e0739d68b3e658377.tar.gz
Fri Feb 13 14:35:39 UTC 2009 Marcel Smit <msmit@remedy.nl>
Implementation of the ZIOP Beta 1 spec * tao/CDR.cpp: * tao/CDR.h: * tao/CDR.inl: * tao/Messaging/Asynch_Invocation.cpp: * tao/TAO_Server_Request.cpp: Removed obsolete compression flag. * tao/Compression/zlib/ZlibCompressor.cpp: * tao/Compression/bzip2/Bzip2Compressor.cpp: * tao/Compression/Compression.pidl: Added description to Compression exception in order to meet the ZIOP Beta 1 specification * tao/Compression/Compression_Manager.cpp: No major changes made. * tao/GIOP_Message_Base.cpp: * tao/GIOP_Message_Base.h: * tao/GIOP_Message_State.cpp: Implementation of compression and decompression methods. * tao/ORB_Core.h: * tao/ORB_Core.inl: Removed ziop_enabled method since it became obsolete. * tao/orbconf.h: Implemented compression policies. * tao/Remote_Invocation.cpp: * tao/PortableServer/Upcall_Wrapper.cpp: Removed compression and decompression methods here (moved to GIOP_Message_Base. * tao/Synch_Invocation.cpp: Due to interface change of format_message method in GIOP_Message_Base. Removed obsolete compression flag. * tao/ZIOP_Adapter.h: * tao/ZIOP/ZIOP.cpp: * tao/ZIOP/ZIOP.h: * tao/ZIOP/ZIOP.pidl: Refactored current ZIOP implementation in order to meet the ZIOP Beta 1 specification. * tao/ZIOP/ZIOP_Policy_i.cpp: * tao/ZIOP/ZIOP_Policy_i.h: * tao/ZIOP/ZIOP_Policy_Validator.cpp: * tao/ZIOP/ZIOP_PolicyFactory.cpp: Implemented compression policies. * tao/Transport.cpp: * tao/Transport.h: * tao/IIOP_Transport.cpp: * orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp: * orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport.cpp: * tao/Strategies/DIOP_Transport.cpp: * tao/Strategies/SHMIOP_Transport.cpp: Due to interface change of format_message method in GIOP_Message_Base.
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/CDR.cpp18
-rw-r--r--TAO/tao/CDR.h28
-rw-r--r--TAO/tao/CDR.inl66
-rw-r--r--TAO/tao/Compression/Compression.pidl18
-rw-r--r--TAO/tao/Compression/Compression_Manager.cpp2
-rw-r--r--TAO/tao/Compression/bzip2/Bzip2Compressor.cpp7
-rw-r--r--TAO/tao/Compression/zlib/ZlibCompressor.cpp8
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp209
-rw-r--r--TAO/tao/GIOP_Message_Base.h13
-rw-r--r--TAO/tao/GIOP_Message_State.cpp10
-rw-r--r--TAO/tao/IIOP_Transport.cpp2
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation.cpp3
-rw-r--r--TAO/tao/ORB_Core.h3
-rw-r--r--TAO/tao/ORB_Core.inl12
-rw-r--r--TAO/tao/PortableServer/Upcall_Wrapper.cpp32
-rw-r--r--TAO/tao/Remote_Invocation.cpp22
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp2
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp2
-rw-r--r--TAO/tao/Synch_Invocation.cpp18
-rw-r--r--TAO/tao/TAO_Server_Request.cpp9
-rw-r--r--TAO/tao/Transport.cpp7
-rw-r--r--TAO/tao/Transport.h3
-rw-r--r--TAO/tao/ZIOP/ZIOP.cpp504
-rw-r--r--TAO/tao/ZIOP/ZIOP.h44
-rw-r--r--TAO/tao/ZIOP/ZIOP.pidl12
-rw-r--r--TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp3
-rw-r--r--TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp64
-rw-r--r--TAO/tao/ZIOP/ZIOP_Policy_i.cpp4
-rw-r--r--TAO/tao/ZIOP/ZIOP_Policy_i.h4
-rw-r--r--TAO/tao/ZIOP_Adapter.h10
-rw-r--r--TAO/tao/orbconf.h2
31 files changed, 589 insertions, 552 deletions
diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp
index 72aaae5a00a..ab398fabd52 100644
--- a/TAO/tao/CDR.cpp
+++ b/TAO/tao/CDR.cpp
@@ -76,9 +76,6 @@ TAO_OutputCDR::TAO_OutputCDR (size_t size,
, stub_ (0)
, message_semantics_ (TAO_TWOWAY_REQUEST)
, timeout_ (0)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR1_ENTER);
@@ -115,9 +112,6 @@ TAO_OutputCDR::TAO_OutputCDR (char *data,
, stub_ (0)
, message_semantics_ (TAO_TWOWAY_REQUEST)
, timeout_ (0)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR2_ENTER);
}
@@ -147,9 +141,6 @@ TAO_OutputCDR::TAO_OutputCDR (char *data,
, stub_ (0)
, message_semantics_ (TAO_TWOWAY_REQUEST)
, timeout_ (0)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR3_ENTER);
}
@@ -170,9 +161,6 @@ TAO_OutputCDR::TAO_OutputCDR (ACE_Message_Block *data,
, stub_ (0)
, message_semantics_ (TAO_TWOWAY_REQUEST)
, timeout_ (0)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR4_ENTER);
}
@@ -196,9 +184,6 @@ TAO_OutputCDR::TAO_OutputCDR (ACE_Data_Block *data_block,
, stub_ (0)
, message_semantics_ (TAO_TWOWAY_REQUEST)
, timeout_ (0)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR5_ENTER);
}
@@ -292,9 +277,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_OutputCDR& rhs,
: (orb_core ?
orb_core->output_cdr_msgblock_allocator () : 0)),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
diff --git a/TAO/tao/CDR.h b/TAO/tao/CDR.h
index a69df9d28c5..8d3125cf334 100644
--- a/TAO/tao/CDR.h
+++ b/TAO/tao/CDR.h
@@ -180,20 +180,11 @@ public:
/// Specify whether there are more data fragments to come.
void more_fragments (bool more);
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- /// Are we containing compressed data?
- bool compressed (void) const;
-
- /// Specify whether we have compressed data.
- void compressed (bool compressed);
-#endif
-
/// Set fragmented message attributes.
void message_attributes (CORBA::ULong request_id,
TAO_Stub * stub,
TAO_Message_Semantics message_semantics,
- ACE_Time_Value * timeout,
- bool compressed);
+ ACE_Time_Value * timeout);
/// Fragmented message request ID.
CORBA::ULong request_id (void) const;
@@ -244,11 +235,6 @@ private:
/// Request/reply send timeout.
ACE_Time_Value * timeout_;
-
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- /// Do we contain compressed data
- bool compressed_;
-#endif
//@}
};
@@ -380,21 +366,9 @@ public:
static void throw_stub_exception (int error_num);
static void throw_skel_exception (int error_num);
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- /// Are we containing compressed data?
- bool compressed (void) const;
-
- /// Specify whether we have compressed data.
- void compressed (bool compressed);
-#endif
-
private:
/// The ORB_Core, required to extract object references.
TAO_ORB_Core* orb_core_;
-
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- CORBA::Boolean compressed_;
-#endif
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/CDR.inl b/TAO/tao/CDR.inl
index 25611598b02..bbcbbc5a40f 100644
--- a/TAO/tao/CDR.inl
+++ b/TAO/tao/CDR.inl
@@ -23,36 +23,16 @@ TAO_OutputCDR::more_fragments (bool more)
this->more_fragments_ = more;
}
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
-ACE_INLINE bool
-TAO_OutputCDR::compressed (void) const
-{
- return this->compressed_;
-}
-
-ACE_INLINE void
-TAO_OutputCDR::compressed (bool compressed)
-{
- this->compressed_ = compressed;
-}
-#endif
-
ACE_INLINE void
TAO_OutputCDR::message_attributes (CORBA::ULong request_id,
TAO_Stub * stub,
TAO_Message_Semantics message_semantics,
- ACE_Time_Value * timeout,
- bool compressed)
+ ACE_Time_Value * timeout)
{
this->request_id_ = request_id;
this->stub_ = stub;
this->message_semantics_ = message_semantics;
this->timeout_ = timeout;
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- this->compressed_ = compressed;
-#else
- ACE_UNUSED_ARG (compressed);
-#endif
}
ACE_INLINE CORBA::ULong
@@ -100,9 +80,6 @@ TAO_InputCDR::TAO_InputCDR (const char *buf,
major_version,
minor_version),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -117,9 +94,6 @@ TAO_InputCDR::TAO_InputCDR (size_t bufsiz,
major_version,
minor_version),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -134,9 +108,6 @@ TAO_InputCDR::TAO_InputCDR (const ACE_Message_Block *data,
major_version,
minor_version),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -153,9 +124,6 @@ TAO_InputCDR::TAO_InputCDR (const ACE_Message_Block *data,
minor_version,
lock),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -172,9 +140,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_Data_Block *data,
major_version,
minor_version),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -196,9 +161,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_Data_Block *data,
major_version,
minor_version),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -211,9 +173,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs,
size,
offset),
orb_core_ (rhs.orb_core_)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -223,9 +182,6 @@ TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs,
: ACE_InputCDR (rhs,
size),
orb_core_ (rhs.orb_core_)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -233,9 +189,6 @@ ACE_INLINE
TAO_InputCDR::TAO_InputCDR (const TAO_InputCDR& rhs)
: ACE_InputCDR (rhs),
orb_core_ (rhs.orb_core_)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -244,9 +197,6 @@ TAO_InputCDR::TAO_InputCDR (ACE_InputCDR::Transfer_Contents rhs,
TAO_ORB_Core* orb_core)
: ACE_InputCDR (rhs),
orb_core_ (orb_core)
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- , compressed_ (false)
-#endif
{
}
@@ -261,20 +211,6 @@ TAO_InputCDR::orb_core (void) const
return this->orb_core_;
}
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
-ACE_INLINE bool
-TAO_InputCDR::compressed (void) const
-{
- return this->compressed_;
-}
-
-ACE_INLINE void
-TAO_InputCDR::compressed (bool compressed)
-{
- this->compressed_ = compressed;
-}
-#endif
-
// ****************************************************************
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
diff --git a/TAO/tao/Compression/Compression.pidl b/TAO/tao/Compression/Compression.pidl
index 41fa649a165..0fed028ce7c 100644
--- a/TAO/tao/Compression/Compression.pidl
+++ b/TAO/tao/Compression/Compression.pidl
@@ -22,6 +22,7 @@ module Compression
exception CompressionException
{
unsigned long reason;
+ string description;
};
/**
@@ -42,7 +43,7 @@ module Compression
/**
* CompressorId type.
*/
- typedef unsigned long CompressorId;
+ typedef unsigned short CompressorId;
const CompressorId COMPRESSORID_NONE = 0;
const CompressorId COMPRESSORID_GZIP = 1;
const CompressorId COMPRESSORID_PKZIP = 2;
@@ -55,19 +56,18 @@ module Compression
const CompressorId COMPRESSORID_XAR = 9;
/**
- * CompressorId list
+ * CompressionLevel type.
*/
- typedef sequence <CompressorId> CompressorIdList;
+ typedef unsigned short CompressionLevel;
/**
- * CompressionLevel type.
+ * CompressionRatio type.
*/
- typedef unsigned long CompressionLevel;
-
typedef long CompressionRatio;
- local interface CompressorFactory;
-
+ /**
+ * CompressionLevelId struc.
+ */
struct CompressorIdLevel {
CompressorId compressor_id;
CompressionLevel compression_level;
@@ -76,6 +76,8 @@ module Compression
typedef CORBA::OctetSeq Buffer;
+ local interface CompressorFactory;
+
/**
* Compressor - abstraction of a compressor and decompressor.
*/
diff --git a/TAO/tao/Compression/Compression_Manager.cpp b/TAO/tao/Compression/Compression_Manager.cpp
index d9ea7e2dbfc..035a9c2a95e 100644
--- a/TAO/tao/Compression/Compression_Manager.cpp
+++ b/TAO/tao/Compression/Compression_Manager.cpp
@@ -18,7 +18,6 @@ namespace TAO
if (!::CORBA::is_nil (compressor_factory))
{
ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
-
CORBA::ULong const length = this->factories_.length ();
for (CORBA::ULong i = 0; i < length; ++i)
@@ -62,7 +61,6 @@ namespace TAO
this->factories_[i] = ::Compression::CompressorFactory::_nil ();
// make sequence smaller
-
return;
}
diff --git a/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp b/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp
index 357e4eaf4ff..fcfd6a83a0d 100644
--- a/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp
+++ b/TAO/tao/Compression/bzip2/Bzip2Compressor.cpp
@@ -23,7 +23,8 @@ Bzip2Compressor::compress (
::Compression::Buffer & target
)
{
- unsigned int max_length = static_cast <unsigned int> (source.length () * 1.1) + 12;
+ unsigned int max_length =
+ static_cast <unsigned int> (source.length () * 1.1) + TAO_GIOP_MESSAGE_HEADER_LEN;
target.length (static_cast <CORBA::ULong> (max_length));
// todo, check 0,1 values
@@ -37,7 +38,7 @@ Bzip2Compressor::compress (
if (retval != BZ_OK)
{
- throw ::Compression::CompressionException (retval);
+ throw ::Compression::CompressionException (retval, "");
}
else
{
@@ -64,7 +65,7 @@ Bzip2Compressor::decompress (
if (retval != BZ_OK)
{
- throw ::Compression::CompressionException (retval);
+ throw ::Compression::CompressionException (retval, "");
}
else
{
diff --git a/TAO/tao/Compression/zlib/ZlibCompressor.cpp b/TAO/tao/Compression/zlib/ZlibCompressor.cpp
index 4a22a0f5fe5..45c27e26765 100644
--- a/TAO/tao/Compression/zlib/ZlibCompressor.cpp
+++ b/TAO/tao/Compression/zlib/ZlibCompressor.cpp
@@ -23,7 +23,8 @@ ZlibCompressor::compress (
::Compression::Buffer & target
)
{
- uLongf max_length = static_cast <uLongf> (source.length () * 1.1) + 12;
+ uLongf max_length =
+ static_cast <uLongf> (source.length () * 1.1) + TAO_GIOP_MESSAGE_HEADER_LEN;
target.length (static_cast <CORBA::ULong> (max_length));
int const retval = ::compress2 (reinterpret_cast <Bytef*>(target.get_buffer ()),
@@ -34,7 +35,8 @@ ZlibCompressor::compress (
if (retval != Z_OK)
{
- throw ::Compression::CompressionException (retval);
+ throw ::Compression::CompressionException (retval,
+ CORBA::string_dup(""));
}
else
{
@@ -58,7 +60,7 @@ ZlibCompressor::decompress (
if (retval != Z_OK)
{
- throw ::Compression::CompressionException (retval);
+ throw ::Compression::CompressionException (retval, "");
}
else
{
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index d0350c5c591..a02ffca1f1e 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -236,14 +236,66 @@ TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
return 0;
}
-int
-TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
+int
+TAO_GIOP_Message_Base::dump_consolidated_msg (TAO_OutputCDR &stream, bool hex_dump_only)
{
- // Ptr to first buffer.
+ // Check whether the output cdr stream is build up of multiple
+ // messageblocks. If so, consolidate them to one block that can be
+ // dumped
+ ACE_Message_Block* consolidated_block = 0;
char *buf = const_cast <char*> (stream.buffer ());
+ size_t const total_len = stream.total_length ();
+ if (stream.begin()->cont () != 0)
+ {
+ ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
+ ACE_CDR::consolidate (consolidated_block, stream.begin ());
+ buf = (char *) (consolidated_block->rd_ptr ());
+ }
+ ///
+ this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len, hex_dump_only);
+ //
+ delete consolidated_block;
+ consolidated_block = 0;
+ //
+ return 0;
+}
+
+int
+TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream, TAO_Stub& stub)
+{
this->set_giop_flags (stream);
+#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
+ TAO_ZIOP_Adapter* ziop_adapter = this->orb_core_->ziop_adapter ();
+
+ //ziop adapter found and not compressed yet
+ if (ziop_adapter)
+ {
+ if (TAO_debug_level >= 5)
+ {
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Before compression: ")));
+ this->dump_consolidated_msg (stream, true);
+ }
+ bool compressed;
+ if (&stub == 0)
+ {
+ compressed = ziop_adapter->marshal_data (stream, *this->orb_core_);
+ }
+ else
+ {
+ compressed = ziop_adapter->marshal_data (stream, stub);
+ }
+
+ if (TAO_debug_level >= 5)
+ {
+ if (!compressed)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("GIOP message not compressed")));
+ }
+ }
+#endif
+
// Length of all buffers.
size_t const total_len = stream.total_length ();
@@ -254,6 +306,8 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
// this particular environment and that isn't handled by the
// networking infrastructure (e.g., IPSEC).
+ char *buf = const_cast <char*> (stream.buffer ());
+
CORBA::ULong bodylen = static_cast <CORBA::ULong>
(total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
@@ -271,23 +325,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
if (TAO_debug_level >= 5)
{
- // Check whether the output cdr stream is build up of multiple
- // messageblocks. If so, consolidate them to one block that can be
- // dumped
- ACE_Message_Block* consolidated_block = 0;
- if (stream.begin()->cont () != 0)
- {
- ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
- ACE_CDR::consolidate (consolidated_block, stream.begin ());
- buf = (char *) (consolidated_block->rd_ptr ());
- }
- ///
- this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
-
- //
- delete consolidated_block;
- consolidated_block = 0;
- //
+ this->dump_consolidated_msg (stream, false);
}
return 0;
@@ -600,9 +638,9 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
qd->giop_version ().major_version (),
qd->giop_version ().minor_version ());
- // Get the read and write positions before we steal data.
+ // Get the read and write positions and header before we steal data.
size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
- size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
+ size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level >= 5)
@@ -639,7 +677,12 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
db = qd->msg_block ()->data_block ()->duplicate ();
}
- TAO_InputCDR input_cdr (db,
+#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
+ if (!decompress (&db, *qd, rd_pos, wr_pos))
+ return -1;
+#endif
+
+ TAO_InputCDR input_cdr (db,
flg,
rd_pos,
wr_pos,
@@ -648,9 +691,6 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
qd->giop_version ().minor_version (),
this->orb_core_);
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- input_cdr.compressed (qd->state().compressed ());
-#endif
transport->assign_translators(&input_cdr,&output);
@@ -682,6 +722,42 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
}
}
+bool
+TAO_GIOP_Message_Base::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd,
+ size_t& rd_pos, size_t& wr_pos)
+{
+ if (qd.state().compressed ())
+ {
+ TAO_ZIOP_Adapter* adapter = this->orb_core_->ziop_adapter ();
+ if (adapter)
+ {
+ if (!adapter->decompress (db, qd, *this->orb_core_))
+ return false;
+ rd_pos = TAO_GIOP_MESSAGE_HEADER_LEN;
+ ACE_Data_Block *tmp = *db;
+ wr_pos = tmp->size();
+ if (TAO_debug_level >= 5)
+ {
+ ACE_HEX_DUMP ((LM_DEBUG,
+ const_cast <char*> (tmp->base ()),
+ tmp->size (),
+ ACE_TEXT ("GIOP message after decompression")));
+ }
+ }
+ else
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ")
+ ACE_TEXT ("data.\n")));
+
+ return false;
+ }
+ }
+ return true;
+}
+
+
int
TAO_GIOP_Message_Base::process_reply_message (
TAO_Pluggable_Reply_Params &params,
@@ -693,7 +769,7 @@ TAO_GIOP_Message_Base::process_reply_message (
// Get the read and write positions before we steal data.
size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
- size_t const wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
+ size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level >= 5)
@@ -703,11 +779,17 @@ TAO_GIOP_Message_Base::process_reply_message (
qd->msg_block ()->length ());
}
+ ACE_Data_Block *db = qd->msg_block ()->data_block ();;
+
+#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
+ if (!decompress (&db, *qd, rd_pos, wr_pos))
+ return -1;
+#endif
// Create a empty buffer on stack
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here.
- TAO_InputCDR input_cdr (qd->msg_block ()->data_block (),
+ TAO_InputCDR input_cdr (db,
ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
@@ -811,7 +893,7 @@ TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type,
0x4f, // 'O'
0x50 // 'P'
};
-
+
header[4] = version.major;
header[5] = version.minor;
@@ -870,26 +952,6 @@ TAO_GIOP_Message_Base::process_request (
CORBA::Object_var forward_to;
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- if (cdr.compressed ())
- {
- TAO_ZIOP_Adapter* adapter = this->orb_core_->ziop_adapter ();
- if (adapter)
- {
- adapter->decompress (request);
- }
- else
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ")
- ACE_TEXT ("data.\n")));
-
- return -1;
- }
- }
-#endif
-
/*
* Hook to specialize request processing within TAO
* This hook will be replaced by specialized request
@@ -928,8 +990,7 @@ TAO_GIOP_Message_Base::process_request (
output.message_attributes (request_id,
0,
TAO_REPLY,
- 0,
- false);
+ 0);
// Make the GIOP header and Reply header
this->generate_reply_header (output, reply_params);
@@ -1443,7 +1504,8 @@ TAO_GIOP_Message_Base::send_reply_exception (
void
TAO_GIOP_Message_Base::dump_msg (const char *label,
const u_char *ptr,
- size_t len)
+ size_t len,
+ bool hex_dump_only)
{
if (TAO_debug_level < 10)
{
@@ -1471,7 +1533,9 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
// Byte order.
int const byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
- int const compressed = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x04;
+ int const compressed = ptr[0] == 0x5A;
+ ACE_TCHAR message_type[15];
+ ACE_OS::sprintf(message_type, "%c%c%c%c message", ptr[0], ptr[1], ptr[2], ptr[3]);
// Get the version info
CORBA::Octet const major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
@@ -1511,23 +1575,25 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
}
// Print.
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
- "%C GIOP v%c.%c msg, %d data bytes, %s endian, "
- "%s compressed, Type %C[%u]\n",
- label,
- digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
- digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
- len - TAO_GIOP_MESSAGE_HEADER_LEN ,
- (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
- (compressed == 0) ? ACE_TEXT("not") : ACE_TEXT("is"),
- message_name,
- *id));
-
+ if (!hex_dump_only)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - GIOP_Message_Base::dump_msg, "
+ "%C %s v%c.%c, %d data bytes, %s endian, "
+ "Type %C[%u]\n",
+ label,
+ message_type,
+ digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
+ digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
+ len - TAO_GIOP_MESSAGE_HEADER_LEN ,
+ (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
+ message_name,
+ *id));
+ }
ACE_HEX_DUMP ((LM_DEBUG,
- (const char *) ptr,
- len,
- ACE_TEXT ("GIOP message")));
+ (const char *) ptr,
+ len,
+ ACE_TEXT (message_type)));
}
int
@@ -1953,11 +2019,6 @@ TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
// Only supported in GIOP 1.1 or better.
if (!(major <= 1 && minor == 0))
ACE_SET_BITS (flags, msg.more_fragments () << 1);
-
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- if (!(major <= 1 && minor < 2))
- ACE_SET_BITS (flags, msg.compressed () << 2);
-#endif
}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 701a0f110d6..68f2e232f46 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -85,7 +85,7 @@ public:
/// Format the message. As we have not written the message length in
/// the header, we make use of this oppurtunity to insert and format
/// the message.
- int format_message (TAO_OutputCDR &cdr);
+ int format_message (TAO_OutputCDR &cdr, TAO_Stub& stub);
/**
* Parse the details of the next message from the @a incoming
@@ -157,6 +157,11 @@ public:
bool is_ready_for_bidirectional (TAO_OutputCDR &msg) const;
private:
+ /// Decompresses a ZIOP message and turns it into a GIOP message
+ bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd,
+ size_t& rd_pos, size_t& wr_pos);
+
+
/// Processes the GIOP_REQUEST messages
int process_request (TAO_Transport *transport,
TAO_InputCDR &input,
@@ -173,8 +178,12 @@ private:
TAO_GIOP_Message_Generator_Parser *get_parser (
const TAO_GIOP_Message_Version &version) const;
+ /// Print out consolidate messages
+ int dump_consolidated_msg (TAO_OutputCDR &stream, bool hex_dump_only);
+
/// Print out a debug messages..
- void dump_msg (const char *label, const u_char *ptr, size_t len);
+ void dump_msg (const char *label, const u_char *ptr,
+ size_t len, bool hex_dump_only = false);
/// Writes the GIOP header in to @a msg
/// @note If the GIOP header happens to change in the future, we can
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index 45511f0150a..6826fef374a 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -87,8 +87,7 @@ TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
int
TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
{
- // The values are hard-coded to support non-ASCII platforms.
- if (!(buf [0] == 0x47 // 'G'
+ if (!((buf [0] == 0x5A || buf [0] == 0x47) // 'Z' or 'G' (depending on compression)
&& buf [1] == 0x49 // 'I'
&& buf [2] == 0x4f // 'O'
&& buf [3] == 0x50)) // 'P'
@@ -97,15 +96,15 @@ TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - ")
ACE_TEXT ("TAO_GIOP_Message_State::parse_magic_bytes, ")
- ACE_TEXT ("bad header: ")
+ ACE_TEXT ("bad %cIOP header: ")
ACE_TEXT ("magic word [%02x,%02x,%02x,%02x]\n"),
buf[0],
+ buf[0],
buf[1],
buf[2],
buf[3]));
return -1;
}
-
return 0;
}
@@ -183,8 +182,7 @@ TAO_GIOP_Message_State::get_byte_order_info (char *buf)
#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
// Read the compressed flag
- this->compressed_ =
- ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x04) == 4);
+ this->compressed_ = (buf[0] == 0x5A);
#endif
}
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index a554b36c987..e215d8cc623 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -231,7 +231,7 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
- if (this->messaging_object ()->format_message (stream) != 0)
+ if (this->messaging_object ()->format_message (stream, *stub) != 0)
return -1;
// This guarantees to send all data (bytes) or return an error.
diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp
index 12f2223ebc4..7243dc28e76 100644
--- a/TAO/tao/Messaging/Asynch_Invocation.cpp
+++ b/TAO/tao/Messaging/Asynch_Invocation.cpp
@@ -76,8 +76,7 @@ namespace TAO
cdr.message_attributes (this->details_.request_id (),
this->resolver_.stub (),
TAO_ONEWAY_REQUEST,
- max_wait_time,
- false);
+ max_wait_time);
this->write_header (cdr);
diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h
index b2f3d7172a3..d1324f2ffdf 100644
--- a/TAO/tao/ORB_Core.h
+++ b/TAO/tao/ORB_Core.h
@@ -205,9 +205,6 @@ public:
TAO_ZIOP_Adapter *ziop_adapter () const;
void ziop_adapter (TAO_ZIOP_Adapter *adapter);
- CORBA::Boolean ziop_enabled () const;
- void ziop_enabled (CORBA::Boolean value);
-
TAO_Service_Context_Registry &service_context_registry (void);
/// Get the protocol factories
diff --git a/TAO/tao/ORB_Core.inl b/TAO/tao/ORB_Core.inl
index 9a2f70ce16d..491012809cd 100644
--- a/TAO/tao/ORB_Core.inl
+++ b/TAO/tao/ORB_Core.inl
@@ -454,18 +454,6 @@ TAO_ORB_Core::ziop_adapter (TAO_ZIOP_Adapter *adapter)
this->ziop_adapter_ = adapter;
}
-ACE_INLINE CORBA::Boolean
-TAO_ORB_Core::ziop_enabled () const
-{
- return this->ziop_enabled_;
-}
-
-ACE_INLINE void
-TAO_ORB_Core::ziop_enabled (CORBA::Boolean value)
-{
- this->ziop_enabled_ = value;
-}
-
ACE_INLINE TAO::ORBInitializer_Registry_Adapter *
TAO_ORB_Core::orbinitializer_registry ()
{
diff --git a/TAO/tao/PortableServer/Upcall_Wrapper.cpp b/TAO/tao/PortableServer/Upcall_Wrapper.cpp
index 9e7ce66d489..fe652ebf4c2 100644
--- a/TAO/tao/PortableServer/Upcall_Wrapper.cpp
+++ b/TAO/tao/PortableServer/Upcall_Wrapper.cpp
@@ -4,7 +4,6 @@
#include "tao/PortableServer/Upcall_Command.h"
#include "tao/PortableServer/Collocated_Arguments_Converter.h"
#include "tao/SystemException.h"
-#include "tao/ZIOP_Adapter.h"
#if TAO_HAS_INTERCEPTORS == 1
# include "tao/ServerRequestInterceptor_Adapter.h"
@@ -246,33 +245,20 @@ TAO::Upcall_Wrapper::post_upcall (TAO_ServerRequest& server_request,
TAO::Argument * const * args,
size_t nargs)
{
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- // Marshal the operation "inout" and "out" arguments and return
- // value, if any.
- TAO_ZIOP_Adapter* ziop_adapter = server_request.orb_core ()->ziop_adapter ();
+ TAO_OutputCDR & cdr = (*server_request.outgoing ());
+ TAO::Argument * const * const begin = args;
+ TAO::Argument * const * const end = args + nargs;
- if (ziop_adapter)
- {
- ziop_adapter->marshal_reply_data (server_request, args, nargs);
- }
- else
-#endif
+ for (TAO::Argument * const * i = begin; i != end; ++i)
{
- TAO_OutputCDR & cdr = (*server_request.outgoing ());
- TAO::Argument * const * const begin = args;
- TAO::Argument * const * const end = args + nargs;
-
- for (TAO::Argument * const * i = begin; i != end; ++i)
+ if (!(*i)->marshal (cdr))
{
- if (!(*i)->marshal (cdr))
- {
- TAO_OutputCDR::throw_skel_exception (errno);
- }
+ TAO_OutputCDR::throw_skel_exception (errno);
}
-
- // Reply body marshaling completed. No other fragments to send.
- cdr.more_fragments (false);
}
+
+ // Reply body marshaling completed. No other fragments to send.
+ cdr.more_fragments (false);
}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Remote_Invocation.cpp b/TAO/tao/Remote_Invocation.cpp
index 3533769987c..e937eb30d7d 100644
--- a/TAO/tao/Remote_Invocation.cpp
+++ b/TAO/tao/Remote_Invocation.cpp
@@ -11,7 +11,6 @@
#include "tao/Network_Priority_Protocols_Hooks.h"
#include "tao/debug.h"
#include "tao/SystemException.h"
-#include "tao/ZIOP_Adapter.h"
ACE_RCSID (tao,
Remote_Invocation,
@@ -120,22 +119,11 @@ namespace TAO
void
Remote_Invocation::marshal_data (TAO_OutputCDR &out_stream)
{
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
- TAO_ZIOP_Adapter* ziop_adapter = this->stub()->orb_core()->ziop_adapter ();
-
- if (ziop_adapter)
- {
- ziop_adapter->marshal_data (this->details_, out_stream, this->resolver_);
- }
- else
-#endif
- {
- // Marshal application data
- if (this->details_.marshal_args (out_stream) == false)
- {
- throw ::CORBA::MARSHAL ();
- }
- }
+ // Marshal application data
+ if (this->details_.marshal_args (out_stream) == false)
+ {
+ throw ::CORBA::MARSHAL ();
+ }
}
Invocation_Status
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 48336b9d08e..b57595ee8e7 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -251,7 +251,7 @@ TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
- if (this->messaging_object ()->format_message (stream) != 0)
+ if (this->messaging_object ()->format_message (stream, *stub) != 0)
return -1;
// Strictly speaking, should not need to loop here because the
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index ea571797386..fad20ed3f2e 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -298,7 +298,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
ACE_Time_Value *max_wait_time)
{
// Format the message in the stream first
- if (this->messaging_object ()->format_message (stream) != 0)
+ if (this->messaging_object ()->format_message (stream, *stub) != 0)
return -1;
// Strictly speaking, should not need to loop here because the
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index 45690f51c54..703da9b2180 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -91,13 +91,12 @@ namespace TAO
cdr.message_attributes (this->details_.request_id (),
this->resolver_.stub (),
TAO_TWOWAY_REQUEST,
- max_wait_time,
- false);
+ max_wait_time);
this->write_header (cdr);
this->marshal_data (cdr);
-
+
// Register a reply dispatcher for this invocation. Use the
// preallocated reply dispatcher.
TAO_Bind_Dispatcher_Guard dispatch_guard (
@@ -666,8 +665,7 @@ namespace TAO
cdr.message_attributes (this->details_.request_id (),
this->resolver_.stub (),
TAO_ONEWAY_REQUEST,
- max_wait_time,
- false);
+ max_wait_time);
this->write_header (cdr);
@@ -685,12 +683,12 @@ namespace TAO
else
{
if (TAO_debug_level > 4)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Oneway_Invocation::"
- "remote_oneway, queueing message\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Synch_Oneway_Invocation::"
+ "remote_oneway, queueing message\n"));
- if (transport->format_queue_message (cdr, max_wait_time) != 0)
- s = TAO_INVOKE_FAILURE;
+ if (transport->format_queue_message (cdr, max_wait_time, *this->resolver_.stub()) != 0)
+ s = TAO_INVOKE_FAILURE;
}
}
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp
index d26faba992b..7a133e38e47 100644
--- a/TAO/tao/TAO_Server_Request.cpp
+++ b/TAO/tao/TAO_Server_Request.cpp
@@ -270,8 +270,7 @@ TAO_ServerRequest::init_reply (void)
this->outgoing_->message_attributes (this->request_id_,
0,
TAO_REPLY,
- 0,
- false);
+ 0);
// Construct a REPLY header.
this->mesg_base_->generate_reply_header (*this->outgoing_, reply_params);
@@ -318,8 +317,7 @@ TAO_ServerRequest::send_no_exception_reply (void)
this->outgoing_->message_attributes (this->request_id_,
0,
TAO_REPLY,
- 0,
- false);
+ 0);
// Construct a REPLY header.
this->mesg_base_->generate_reply_header (*this->outgoing_, reply_params);
@@ -526,8 +524,7 @@ TAO_ServerRequest::send_cached_reply (CORBA::OctetSeq &s)
this->outgoing_->message_attributes (this->request_id_,
0,
TAO_REPLY,
- 0,
- false);
+ 0);
// Make the reply message
if (this->mesg_base_->generate_reply_header (*this->outgoing_,
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index ecfa26e9042..45fde50d47f 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -559,9 +559,10 @@ TAO_Transport::handle_output (ACE_Time_Value *max_wait_time)
int
TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
- ACE_Time_Value *max_wait_time)
+ ACE_Time_Value *max_wait_time,
+ TAO_Stub& stub)
{
- if (this->messaging_object ()->format_message (stream) != 0)
+ if (this->messaging_object ()->format_message (stream, stub) != 0)
return -1;
return this->queue_message_i (stream.begin (), max_wait_time);
@@ -2259,7 +2260,6 @@ TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
// putting them into queue. When this is done we can return
// to process this message, and notifying other threads to
// process the messages in queue.
-
char * end_marker = message_block.rd_ptr ()
+ mesg_length;
@@ -2329,7 +2329,6 @@ TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
{
return -1;
}
-
// move the rd_ptr tp position of end_marker
message_block.rd_ptr (end_marker);
}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 6c6c01dd703..712da3f1f89 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -703,7 +703,8 @@ public:
/// @param max_wait_time The maximum time that the operation can
/// block, used in the implementation of timeouts.
int format_queue_message (TAO_OutputCDR &stream,
- ACE_Time_Value *max_wait_time);
+ ACE_Time_Value *max_wait_time,
+ TAO_Stub& stub);
/// Send a message block chain,
int send_message_block_chain (const ACE_Message_Block *message_block,
diff --git a/TAO/tao/ZIOP/ZIOP.cpp b/TAO/tao/ZIOP/ZIOP.cpp
index 945386be7fb..44b94319b96 100644
--- a/TAO/tao/ZIOP/ZIOP.cpp
+++ b/TAO/tao/ZIOP/ZIOP.cpp
@@ -103,11 +103,35 @@ TAO_ZIOP_Loader::Initializer (void)
return ACE_Service_Config::process_directive (ace_svc_desc_TAO_ZIOP_Loader);
}
+
+bool
+TAO_ZIOP_Loader::decompress (Compression::Compressor_ptr compressor,
+ const ::Compression::Buffer &source,
+ ::Compression::Buffer &target)
+{
+ try
+ {
+ compressor->decompress (source, target);
+ }
+ catch (::Compression::CompressionException &e)
+ {
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("Decompression failed. Reason: %s. ")
+ ACE_TEXT ("Description : %s "),
+ ACE_TEXT ("Summary : %s\n"),
+ e.reason, e.description, e._info ()),
+ false);
+ }
+
+ return true;
+}
+
bool
-TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request)
+TAO_ZIOP_Loader::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd,
+ TAO_ORB_Core& orb_core)
{
CORBA::Object_var compression_manager =
- server_request.orb_core()->resolve_compression_manager();
+ orb_core.resolve_compression_manager();
Compression::CompressionManager_var manager =
Compression::CompressionManager::_narrow (compression_manager.in ());
@@ -115,16 +139,53 @@ TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request)
if (!CORBA::is_nil(manager.in ()))
{
ZIOP::CompressedData data;
- if (!(*(server_request.incoming()) >> data))
+ //first set the read pointer after the header
+ size_t begin = qd.msg_block ()-> rd_ptr() - qd.msg_block ()->base ();
+ char * initial_rd_ptr = qd.msg_block ()-> rd_ptr();
+ size_t const wr = qd.msg_block ()->wr_ptr () - qd.msg_block ()->base ();
+
+ TAO_InputCDR cdr (*db,
+ qd.msg_block ()->self_flags (),
+ begin + TAO_GIOP_MESSAGE_HEADER_LEN,
+ wr,
+ qd.byte_order (),
+ qd.giop_version ().major_version (),
+ qd.giop_version ().minor_version (),
+ &orb_core);
+
+ if (!(cdr >> data))
return false;
-
- Compression::Compressor_var compressor = manager->get_compressor (data.compressorid, 6);
+
+ Compression::Compressor_var compressor =
+ manager->get_compressor (data.compressorid, 6);
CORBA::OctetSeq myout;
myout.length (data.original_length);
- compressor->decompress (data.data, myout);
- TAO_InputCDR* newstream = new TAO_InputCDR ((char*)myout.get_buffer(true), (size_t)data.original_length);
- server_request.incoming()->steal_from (*newstream);
+ if (decompress(compressor.in(), data.data, myout))
+ {
+ ACE_Message_Block *mb = new ACE_Message_Block();
+
+ mb->size ((size_t)(data.original_length +
+ TAO_GIOP_MESSAGE_HEADER_LEN));
+
+ qd.msg_block ()->rd_ptr (initial_rd_ptr);
+
+ mb->copy(qd.msg_block ()->base () + begin,
+ TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ if (mb->copy((char*)myout.get_buffer(true),
+ (size_t)data.original_length) != 0)
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("TAO - (%P|%t) - ")
+ ACE_TEXT ("Failed to copy decompressed data : ")
+ ACE_TEXT ("Buffer too small\n")),
+ false);
+ //change it into a GIOP message..
+ mb->base ()[0] = 0x47;
+ ACE_CDR::mb_align (mb);
+ *db = mb->data_block ();
+ return true;
+ }
}
else
{
@@ -135,26 +196,15 @@ TAO_ZIOP_Loader::decompress (TAO_ServerRequest& server_request)
}
CORBA::ULong
-TAO_ZIOP_Loader::compression_low_value (TAO::Profile_Transport_Resolver &resolver) const
+TAO_ZIOP_Loader::compression_policy_value (CORBA::Policy_ptr policy) const
{
CORBA::ULong result = 0;
#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
- CORBA::Policy_var policy = CORBA::Policy::_nil ();
- if (resolver.stub () == 0)
- {
- policy =
- resolver.stub()->orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
- }
- else
- {
- policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
- }
-
- if (!CORBA::is_nil (policy.in ()))
+ if (!CORBA::is_nil (policy))
{
ZIOP::CompressionLowValuePolicy_var srp =
- ZIOP::CompressionLowValuePolicy::_narrow (policy.in ());
+ ZIOP::CompressionLowValuePolicy::_narrow (policy);
if (!CORBA::is_nil (srp.in ()))
{
@@ -162,7 +212,7 @@ TAO_ZIOP_Loader::compression_low_value (TAO::Profile_Transport_Resolver &resolve
}
}
#else
- ACE_UNUSED_ARG (resolver);
+ ACE_UNUSED_ARG (policy);
#endif
return result;
}
@@ -176,270 +226,296 @@ TAO_ZIOP_Loader::compress (Compression::Compressor_ptr compressor,
{
compressor->compress (source, target);
}
- catch (...)
+ catch (::Compression::CompressionException &e)
{
- return false;
+ ACE_ERROR_RETURN((LM_ERROR,
+ ACE_TEXT ("Compression failed. Reason: %s. ")
+ ACE_TEXT ("Description : %s "),
+ ACE_TEXT ("Summary : %s\n"),
+ e.reason, e.description, e._info ()),
+ false);
}
return true;
}
bool
-TAO_ZIOP_Loader::check_min_ratio (CORBA::ULong /* original_data_length */, CORBA::ULong /*compressed_length*/) const
+TAO_ZIOP_Loader::check_min_ratio (::Compression::CompressionRatio ratio,
+ CORBA::Long min_ratio) const
{
-/* ::Compression::CompressionRatio ratio = 100 - (compressed_length /original_length) * 100;
- if (resolver.stub () == 0)
+ bool accepted = min_ratio == 0 || ratio > min_ratio;
+ if (TAO_debug_level > 8)
{
- policy =
- resolver.stub()->orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_ZIOP_Loader::check_min_ratio: ")
+ ACE_TEXT ("Ratio:%d Accepted:%d\n"),
+ ratio, accepted));
+ }
+ return accepted;
+}
+
+
+bool
+TAO_ZIOP_Loader::get_compressor_details (
+ ::Compression::CompressorIdLevelList *list,
+ Compression::CompressorId &compressor_id,
+ Compression::CompressionLevel &compression_level)
+
+{
+ if (list)
+ {
+ compressor_id = (*list)[0].compressor_id;
+ compression_level = (*list)[0].compression_level;
}
else
{
- policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
- }*/
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - ")
+ ACE_TEXT ("TAO_ZIOP_Loader::get_compressor_details: ")
+ ACE_TEXT ("No appropriate compressor found\n")));
+ return false;
+ }
return true;
}
bool
-TAO_ZIOP_Loader::marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver)
+TAO_ZIOP_Loader::get_compression_details(
+ CORBA::Policy_ptr compression_enabling_policy,
+ CORBA::Policy_ptr compression_level_list_policy,
+ Compression::CompressorId &compressor_id,
+ Compression::CompressionLevel &compression_level)
{
-#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0
- ACE_UNUSED_ARG (details);
- ACE_UNUSED_ARG (stream);
- ACE_UNUSED_ARG (resolver);
-#else
- CORBA::Boolean use_ziop = false;
- Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB;
- Compression::CompressionLevel compression_level = 0;
-
- CORBA::Policy_var policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
-
- if (!CORBA::is_nil (policy.in ()))
+ bool use_ziop = false;
+ if (!CORBA::is_nil (compression_enabling_policy))
{
ZIOP::CompressionEnablingPolicy_var srp =
- ZIOP::CompressionEnablingPolicy::_narrow (policy.in ());
+ ZIOP::CompressionEnablingPolicy::_narrow (compression_enabling_policy);
if (!CORBA::is_nil (srp.in ()))
{
use_ziop = srp->compression_enabled ();
+ if (!use_ziop && TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - ")
+ ACE_TEXT ("TAO_ZIOP_Loader::get_compression_details: ")
+ ACE_TEXT ("No ZIOP policy set\n")));
+ }
}
}
-
+ else
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_ZIOP_Loader::get_compression_details : ")
+ ACE_TEXT("compression_enabling_policy is NIL. No ZIOP\n")));
+ }
+
if (use_ziop)
{
- policy = resolver.stub ()->get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
-
- if (!CORBA::is_nil (policy.in ()))
+ if (!CORBA::is_nil (compression_level_list_policy))
{
ZIOP::CompressorIdLevelListPolicy_var srp =
- ZIOP::CompressorIdLevelListPolicy::_narrow (policy.in ());
+ ZIOP::CompressorIdLevelListPolicy::_narrow (compression_level_list_policy);
if (!CORBA::is_nil (srp.in ()))
{
- ::Compression::CompressorIdLevelList* list = srp->compressor_ids ();
- if (list)
- {
- compressor_id = (*list)[0].compressor_id;
- compression_level = (*list)[0].compression_level;
- }
- else
- {
- // No compatible compressor found
- use_ziop = false;
- }
+ use_ziop = get_compressor_details (srp->compressor_ids (),
+ compressor_id, compression_level);
}
}
+ else
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT("TAO (%P|%t) - ")
+ ACE_TEXT("TAO_ZIOP_Loader::get_compression_details : ")
+ ACE_TEXT("Compression level policy not found\n")));
+ }
}
+ return use_ziop;
+}
- ACE_Message_Block* current = const_cast <ACE_Message_Block*> (stream.current ());
-
- if (use_ziop)
+void
+TAO_ZIOP_Loader::complete_compression (Compression::Compressor_ptr compressor,
+ TAO_OutputCDR &cdr,
+ ACE_Message_Block& mb,
+ char *initial_rd_ptr,
+ CORBA::ULong low_value,
+ CORBA::Long min_ratio,
+ CORBA::ULong original_data_length,
+ Compression::CompressorId compressor_id)
+{
+ if (low_value > 0 && original_data_length > low_value)
{
- // Set the read pointer to the point where the application data starts
- current->rd_ptr (current->wr_ptr());
- }
+ CORBA::OctetSeq myout;
+ CORBA::OctetSeq input (original_data_length, &mb);
+ myout.length (original_data_length);
- // Marshal application data
- if (!details.marshal_args (stream))
- {
- throw ::CORBA::MARSHAL ();
+ bool compressed = this->compress (compressor, input, myout);
+
+ if (compressed &&
+ (myout.length () < original_data_length) &&
+ (this->check_min_ratio (compressor->compression_ratio(),
+ min_ratio)))
+ {
+ mb.wr_ptr (mb.rd_ptr ());
+ cdr.current_alignment (mb.wr_ptr() - mb.base ());
+ ZIOP::CompressedData data;
+ data.compressorid = compressor_id;
+ data.original_length = input.length();
+ data.data = myout;
+ cdr << data;
+ mb.rd_ptr(initial_rd_ptr);
+ int begin = (mb.rd_ptr() - mb.base ());
+ mb.data_block ()->base ()[0 + begin] = 0x5A;
+ mb.data_block ()->base ()[TAO_GIOP_MESSAGE_SIZE_OFFSET + begin] =
+ cdr.length() - TAO_GIOP_MESSAGE_HEADER_LEN;
+ }
}
+ else if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - ")
+ ACE_TEXT ("TAO_ZIOP_Loader::compress_data: ")
+ ACE_TEXT ("No compression used")
+ ACE_TEXT ("->Low Value Policy applied\n")));
+ }
+}
- current = const_cast <ACE_Message_Block*> (stream.current());
- CORBA::ULong const original_data_length =(CORBA::ULong)(current->wr_ptr() - current->rd_ptr());
+bool
+TAO_ZIOP_Loader::compress_data (TAO_OutputCDR &cdr,
+ CORBA::Object_ptr compression_manager,
+ CORBA::ULong low_value,
+ CORBA::Long min_ratio,
+ Compression::CompressorId compressor_id,
+ Compression::CompressionLevel compression_level)
+{
+ cdr.consolidate ();
+
+ ACE_Message_Block* current = const_cast <ACE_Message_Block*> (cdr.current ());
- if (use_ziop && original_data_length > 0)
- {
- // We can only compress one message block, so when compression is enabled first do
- // a consolidate.
- stream.consolidate ();
+ char* initial_rd_ptr = current->rd_ptr();
- CORBA::Object_var compression_manager =
- resolver.stub()->orb_core()->resolve_compression_manager();
+ // Set the read pointer to the point where the data starts
+ current->rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ current = const_cast <ACE_Message_Block*> (cdr.current());
+ CORBA::ULong const original_data_length =
+ (CORBA::ULong)(current->wr_ptr() - current->rd_ptr());
+ if (original_data_length > 0)
+ {
Compression::CompressionManager_var manager =
- Compression::CompressionManager::_narrow (compression_manager.in ());
+ Compression::CompressionManager::_narrow (compression_manager);
if (!CORBA::is_nil(manager.in ()))
{
- Compression::Compressor_var compressor = manager->get_compressor (compressor_id, compression_level);
-
- if (original_data_length > this->compression_low_value (resolver))
- {
- CORBA::OctetSeq myout;
- CORBA::OctetSeq input (original_data_length, current);
- myout.length (original_data_length);
-
- bool compressed = this->compress (compressor.in (), input, myout);
-
- if (compressed && (myout.length () < original_data_length) && (this->check_min_ratio (original_data_length, myout.length())))
- {
- stream.compressed (true);
- current->wr_ptr (current->rd_ptr ());
- stream.current_alignment (current->wr_ptr() - current->base ());
- ZIOP::CompressedData data;
- data.compressorid = compressor_id;
- data.original_length = input.length();
- data.data = myout;
- stream << data;
- }
- }
+ Compression::Compressor_var compressor =
+ manager->get_compressor (compressor_id, compression_level);
+
+ complete_compression(compressor.in (), cdr, *current,
+ initial_rd_ptr, low_value, min_ratio,
+ original_data_length, compressor_id);
}
}
-
- // Set the read pointer back to the starting point
- current->rd_ptr (current->base ());
-#endif
-
+ //set back read pointer in case no compression was done...
+ current->rd_ptr(initial_rd_ptr);
return true;
}
bool
-TAO_ZIOP_Loader::marshal_reply_data (TAO_ServerRequest& server_request,
- TAO::Argument * const * args,
- size_t nargs)
+TAO_ZIOP_Loader::marshal_data (TAO_OutputCDR &cdr, TAO_Stub& stub)
{
#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0
- ACE_UNUSED_ARG (server_request);
- ACE_UNUSED_ARG (args);
- ACE_UNUSED_ARG (nargs);
+ ACE_UNUSED_ARG (cdr);
+ ACE_UNUSED_ARG (stub);
+ return true;
#else
CORBA::Boolean use_ziop = false;
Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB;
Compression::CompressionLevel compression_level = 0;
- TAO_Transport& transport = *server_request.transport ();
-
- CORBA::Policy_var policy = transport.orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
-
- if (!CORBA::is_nil (policy.in ()))
- {
- ZIOP::CompressionEnablingPolicy_var srp =
- ZIOP::CompressionEnablingPolicy::_narrow (policy.in ());
-
- if (!CORBA::is_nil (srp.in ()))
- {
- use_ziop = srp->compression_enabled ();
- }
- }
+
+ CORBA::Policy_var compression_enabling_policy =
+ stub.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
+ CORBA::Policy_var compression_level_list_policy =
+ stub.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
+
+ use_ziop = get_compression_details(compression_enabling_policy.in (),
+ compression_level_list_policy.in (),
+ compressor_id, compression_level);
if (use_ziop)
{
- policy = transport.orb_core()->get_cached_policy_including_current (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
+ CORBA::Object_var compression_manager =
+ stub.orb_core ()->resolve_compression_manager();
- if (!CORBA::is_nil (policy.in ()))
- {
- ZIOP::CompressorIdLevelListPolicy_var srp =
- ZIOP::CompressorIdLevelListPolicy::_narrow (policy.in ());
+ CORBA::Policy_var policy_low_value =
+ stub.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
+ CORBA::Policy_var policy_min_ratio =
+ stub.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY);
- if (!CORBA::is_nil (srp.in ()))
- {
- ::Compression::CompressorIdLevelList* list = srp->compressor_ids ();
- if (list)
- {
- compressor_id = (*list)[0].compressor_id;
- compression_level = (*list)[0].compression_level;
- }
- else
- {
- // No compatible compressor found
- use_ziop = false;
- }
- }
- }
- }
+ CORBA::ULong low_value =
+ this->compression_policy_value (policy_low_value.in ());
+ CORBA::Long min_ratio =
+ this->compression_policy_value (policy_min_ratio.in ());
- TAO_OutputCDR & stream = (*server_request.outgoing ());
- ACE_Message_Block* current = const_cast <ACE_Message_Block*> (stream.current ());
-
- if (use_ziop)
- {
- // Set the read pointer to the point where the application data starts
- current->rd_ptr (current->wr_ptr());
+ return compress_data(cdr, compression_manager.in (),
+ low_value, min_ratio,
+ compressor_id, compression_level);
}
+ return false;
+#endif
+}
- // Marshal application data
- TAO::Argument * const * const begin = args;
- TAO::Argument * const * const end = args + nargs;
-
- for (TAO::Argument * const * i = begin; i != end; ++i)
- {
- if (!(*i)->marshal (stream))
- {
- TAO_OutputCDR::throw_skel_exception (errno);
- }
- }
-
- // Reply body marshaling completed. No other fragments to send.
- stream.more_fragments (false);
-
- current = const_cast <ACE_Message_Block*> (stream.current());
- CORBA::ULong const original_data_length =(CORBA::ULong)(current->wr_ptr() - current->rd_ptr());
+bool
+TAO_ZIOP_Loader::marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core)
+{
+#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP == 0
+ ACE_UNUSED_ARG (cdr);
+ ACE_UNUSED_ARG (orb_core);
+ return true;
+#else
+ CORBA::Boolean use_ziop = false;
+ Compression::CompressorId compressor_id = Compression::COMPRESSORID_ZLIB;
+ Compression::CompressionLevel compression_level = 0;
- if (use_ziop && original_data_length > 0)
+ CORBA::Policy_var compression_enabling_policy =
+ orb_core.get_cached_policy_including_current
+ (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
+
+ CORBA::Policy_var compression_level_list_policy =
+ orb_core.get_cached_policy_including_current
+ (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
+
+ use_ziop = get_compression_details(compression_enabling_policy.in (),
+ compression_level_list_policy.in (),
+ compressor_id, compression_level);
+
+ if (use_ziop)
{
- // We can only compress one message block, so when compression is enabled first do
- // a consolidate.
- stream.consolidate ();
-
CORBA::Object_var compression_manager =
- server_request.transport ()->orb_core()->resolve_compression_manager();
-
- Compression::CompressionManager_var manager =
- Compression::CompressionManager::_narrow (compression_manager.in ());
-
- if (!CORBA::is_nil(manager.in ()))
- {
- Compression::Compressor_var compressor = manager->get_compressor (compressor_id, compression_level);
-
-// if (original_data_length > this->compression_low_value (resolver)
- if (1)
- {
- CORBA::OctetSeq myout;
- CORBA::OctetSeq input (original_data_length, current);
- myout.length (original_data_length);
-
- bool compressed = this->compress (compressor.in (), input, myout);
-
- if (compressed && (myout.length () < original_data_length)) // && (this->check_min_ratio (original_data_length, myout.length())))
- {
- stream.compressed (true);
- current->wr_ptr (current->rd_ptr ());
- stream.current_alignment (current->wr_ptr() - current->base ());
- ZIOP::CompressedData data;
- data.compressorid = compressor_id;
- data.original_length = input.length();
- data.data = myout;
- stream << data;
- }
- }
- }
+ orb_core.resolve_compression_manager();
+
+ CORBA::Policy_var policy_low_value =
+ orb_core.get_cached_policy_including_current
+ (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
+
+ CORBA::Policy_var policy_min_ratio =
+ orb_core.get_cached_policy_including_current
+ (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY);
+
+ CORBA::ULong low_value =
+ this->compression_policy_value (policy_low_value.in ());
+ CORBA::Long min_ratio =
+ this->compression_policy_value (policy_min_ratio.in ());
+
+ return compress_data(cdr, compression_manager.in (),
+ low_value, min_ratio,
+ compressor_id, compression_level);
}
+ return false;
- // Set the read pointer back to the starting point
- current->rd_ptr (current->base ());
#endif
-
- return true;
}
diff --git a/TAO/tao/ZIOP/ZIOP.h b/TAO/tao/ZIOP/ZIOP.h
index c21dc518c51..310117f45fb 100644
--- a/TAO/tao/ZIOP/ZIOP.h
+++ b/TAO/tao/ZIOP/ZIOP.h
@@ -47,10 +47,11 @@ public:
/// Destructor
virtual ~TAO_ZIOP_Loader (void);
- virtual bool decompress (TAO_ServerRequest& server_request);
+ virtual bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, TAO_ORB_Core& orb_core);
// Compress the @a stream. Starting point of the compression is rd_ptr()
- virtual bool marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver);
+ virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_Stub& stub);
+ virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core);
/// Initialize the BiDIR loader hooks.
virtual int init (int argc, ACE_TCHAR* []);
@@ -60,10 +61,6 @@ public:
/// Used to force the initialization of the ORB code.
static int Initializer (void);
- bool marshal_reply_data (TAO_ServerRequest& server_request,
- TAO::Argument * const * args,
- size_t nargs);
-
private:
/// Flag to indicate whether the ZIOP library has been
@@ -71,13 +68,44 @@ private:
static bool is_activated_;
/// Get the compression low value, returns 0 when it is not set
- CORBA::ULong compression_low_value (TAO::Profile_Transport_Resolver &resolver) const;
+ CORBA::ULong compression_policy_value (CORBA::Policy_ptr policy) const;
+
+ bool get_compressor_details (
+ ::Compression::CompressorIdLevelList *list,
+ Compression::CompressorId &compressor_id,
+ Compression::CompressionLevel &compression_level);
+
+ bool get_compression_details(CORBA::Policy_ptr compression_enabling_policy,
+ CORBA::Policy_ptr compression_level_list_policy,
+ Compression::CompressorId &compressor_id,
+ Compression::CompressionLevel &compression_level);
+
+ void complete_compression (Compression::Compressor_ptr compressor,
+ TAO_OutputCDR &cdr,
+ ACE_Message_Block& mb,
+ char *initial_rd_ptr,
+ CORBA::ULong low_value,
+ CORBA::Long min_ratio,
+ CORBA::ULong original_data_length,
+ Compression::CompressorId compressor_id);
+
+ bool compress_data (TAO_OutputCDR &cdr,
+ CORBA::Object_ptr compression_manager,
+ CORBA::ULong low_value,
+ CORBA::Long min_ratio,
+ ::Compression::CompressorId compressor_id,
+ ::Compression::CompressionLevel compression_level);
bool compress (Compression::Compressor_ptr compressor,
const ::Compression::Buffer &source,
::Compression::Buffer &target);
+
+ bool decompress (Compression::Compressor_ptr compressor,
+ const ::Compression::Buffer &source,
+ ::Compression::Buffer &target);
- bool check_min_ratio (CORBA::ULong original_data_length, CORBA::ULong compressed_length) const;
+ bool check_min_ratio (::Compression::CompressionRatio ratio,
+ CORBA::Long min_ratio) const;
};
static int
diff --git a/TAO/tao/ZIOP/ZIOP.pidl b/TAO/tao/ZIOP/ZIOP.pidl
index 35cee2379f3..d3b49079881 100644
--- a/TAO/tao/ZIOP/ZIOP.pidl
+++ b/TAO/tao/ZIOP/ZIOP.pidl
@@ -19,7 +19,7 @@ module ZIOP
/**
* Tag Id for CompressionEnablingPolicy
*/
- const CORBA::PolicyType COMPRESSION_ENABLING_POLICY_ID = 5555;
+ const CORBA::PolicyType COMPRESSION_ENABLING_POLICY_ID = 64;
/**
* The ZIOP CompressionEnablingPolicy. Has an boolean attribute indicating
@@ -35,7 +35,7 @@ module ZIOP
/**
* Tag Id for CompressorIdPolicy
*/
- const CORBA::PolicyType COMPRESSOR_ID_LEVEL_LIST_POLICY_ID = 5556;
+ const CORBA::PolicyType COMPRESSOR_ID_LEVEL_LIST_POLICY_ID = 65;
/**
* The ZIOP CompressorIdListPolicy. Has an CompressorId attribute indicating
@@ -46,7 +46,7 @@ module ZIOP
readonly attribute Compression::CompressorIdLevelList compressor_ids;
};
- const CORBA::PolicyType COMPRESSION_LOW_VALUE_POLICY_ID = 5557;
+ const CORBA::PolicyType COMPRESSION_LOW_VALUE_POLICY_ID = 66;
typedef unsigned long CompressionLowValuePolicyValue;
@@ -55,7 +55,11 @@ module ZIOP
readonly attribute CompressionLowValuePolicyValue low_value;
};
- const CORBA::PolicyType COMPRESSION_MIN_RATIO_POLICY_ID = 5558;
+ /**
+ * The ZIOP CompressionMinRatioPolicy. If the compression ratio is smaller
+ * than this setting, a message is send uncompressed.
+ */
+ const CORBA::PolicyType COMPRESSION_MIN_RATIO_POLICY_ID = 67;
local interface CompressionMinRatioPolicy : CORBA::Policy
{
diff --git a/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp b/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp
index e5e49a738ac..dcf763a285b 100644
--- a/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp
+++ b/TAO/tao/ZIOP/ZIOP_PolicyFactory.cpp
@@ -25,7 +25,6 @@ TAO_ZIOP_PolicyFactory::create_policy (
::CORBA::Boolean val;
// Extract the value from the any.
-
if (!(value >>= CORBA::Any::to_boolean (val)))
{
throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE);
@@ -66,7 +65,6 @@ TAO_ZIOP_PolicyFactory::create_policy (
::CORBA::ULong val;
// Extract the value from the any.
-
if (!(value >>= val))
{
throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE);
@@ -87,7 +85,6 @@ TAO_ZIOP_PolicyFactory::create_policy (
::Compression::CompressionRatio val;
// Extract the value from the any.
-
if (!(value >>= val))
{
throw CORBA::PolicyError (CORBA::BAD_POLICY_VALUE);
diff --git a/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp b/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp
index bec32269420..9d079b1fe2d 100644
--- a/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp
+++ b/TAO/tao/ZIOP/ZIOP_Policy_Validator.cpp
@@ -32,65 +32,80 @@ TAO_ZIOPPolicy_Validator::validate_impl (TAO_Policy_Set &policies)
if (srp.in () == 0)
return;
-
- // Set the flag in the ORB_Core
- orb_core_.ziop_enabled (srp->compression_enabled ());
}
void
TAO_ZIOPPolicy_Validator::merge_policies_impl (TAO_Policy_Set &policies)
{
- // Check if the user has specified the priority model policy.
- CORBA::Policy_var priority_model =
+ // Check if the user has specified the compression enabled policy.
+ CORBA::Policy_var compression_enabled =
policies.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
- if (CORBA::is_nil (priority_model.in ()))
+ if (CORBA::is_nil (compression_enabled.in ()))
{
- // If not, check if the priority model policy has been specified
+ // If not, check if the compression enabled policy has been specified
// at the ORB level.
- priority_model =
+ compression_enabled =
this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_ENABLING_POLICY);
- if (!CORBA::is_nil (priority_model.in ()))
+ if (!CORBA::is_nil (compression_enabled.in ()))
{
// If so, we'll use that policy.
- policies.set_policy (priority_model.in ());
+ policies.set_policy (compression_enabled.in ());
}
}
- // Check if the user has specified the server protocol policy.
- CORBA::Policy_var server_protocol =
+ // Check if the user has specified the compression low value policy.
+ CORBA::Policy_var low_value_policy =
policies.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
- if (CORBA::is_nil (server_protocol.in ()))
+ if (CORBA::is_nil (low_value_policy.in ()))
{
- // If not, check if the server protocol policy has been
+ // If not, check if the compression low value policy has been
// specified at the ORB level.
- server_protocol =
+ low_value_policy =
this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY);
- if (!CORBA::is_nil (server_protocol.in ()))
+ if (!CORBA::is_nil (low_value_policy.in ()))
+ {
+ // If so, we'll use that policy.
+ policies.set_policy (low_value_policy.in ());
+ }
+ }
+
+ // Check if the user has specified the minimum compression ratio policy.
+ CORBA::Policy_var min_ratio_policy =
+ policies.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY);
+
+ if (CORBA::is_nil (min_ratio_policy.in ()))
+ {
+ // If not, check if the minimum compression ratio policy has been
+ // specified at the ORB level.
+ min_ratio_policy =
+ this->orb_core_.get_cached_policy (TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY);
+
+ if (!CORBA::is_nil (min_ratio_policy.in ()))
{
// If so, we'll use that policy.
- policies.set_policy (server_protocol.in ());
+ policies.set_policy (min_ratio_policy.in ());
}
}
- // Check if the user has specified the server protocol policy.
- CORBA::Policy_var x =
+ // Check if the user has specified the compression list policy.
+ CORBA::Policy_var compressior_list_policy =
policies.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
- if (CORBA::is_nil (x.in ()))
+ if (CORBA::is_nil (compressior_list_policy.in ()))
{
- // If not, check if the server protocol policy has been
+ // If not, check if the compression list policy has been
// specified at the ORB level.
- x =
+ compressior_list_policy =
this->orb_core_.get_cached_policy (TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY);
- if (!CORBA::is_nil (x.in ()))
+ if (!CORBA::is_nil (compressior_list_policy.in ()))
{
// If so, we'll use that policy.
- policies.set_policy (x.in ());
+ policies.set_policy (compressior_list_policy.in ());
}
}
}
@@ -100,6 +115,7 @@ TAO_ZIOPPolicy_Validator::legal_policy_impl (CORBA::PolicyType type)
{
return (type == ZIOP::COMPRESSION_ENABLING_POLICY_ID ||
type == ZIOP::COMPRESSION_LOW_VALUE_POLICY_ID ||
+ type == ZIOP::COMPRESSION_MIN_RATIO_POLICY_ID ||
type == ZIOP::COMPRESSOR_ID_LEVEL_LIST_POLICY_ID);
}
diff --git a/TAO/tao/ZIOP/ZIOP_Policy_i.cpp b/TAO/tao/ZIOP/ZIOP_Policy_i.cpp
index fc7fb5c8237..c6306341d7a 100644
--- a/TAO/tao/ZIOP/ZIOP_Policy_i.cpp
+++ b/TAO/tao/ZIOP/ZIOP_Policy_i.cpp
@@ -307,7 +307,7 @@ CompressionMinRatioPolicy::policy_type (void)
{
// Future policy implementors: notice how this minimizes the
// footprint of the class.
- return ZIOP::COMPRESSION_LOW_VALUE_POLICY_ID;
+ return ZIOP::COMPRESSION_MIN_RATIO_POLICY_ID;
}
@@ -350,7 +350,7 @@ CompressionMinRatioPolicy::ratio (void)
TAO_Cached_Policy_Type
CompressionMinRatioPolicy::_tao_cached_type (void) const
{
- return TAO_CACHED_POLICY_UNCACHED;
+ return TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY;
}
}
diff --git a/TAO/tao/ZIOP/ZIOP_Policy_i.h b/TAO/tao/ZIOP/ZIOP_Policy_i.h
index 505d23a064f..a5b992bf864 100644
--- a/TAO/tao/ZIOP/ZIOP_Policy_i.h
+++ b/TAO/tao/ZIOP/ZIOP_Policy_i.h
@@ -161,9 +161,9 @@ private:
};
/**
- * @class CompressionLowValuePolicy
+ * @class CompressionMinRatioPolicy
*
- * @brief Implementation of the ZIOP::CompressionLowValuePolicy
+ * @brief Implementation of the ZIOP::CompressionMinRatioPolicy
*/
class CompressionMinRatioPolicy
: public virtual ::ZIOP::CompressionMinRatioPolicy
diff --git a/TAO/tao/ZIOP_Adapter.h b/TAO/tao/ZIOP_Adapter.h
index 73be3fad27c..1fbc4d41a6b 100644
--- a/TAO/tao/ZIOP_Adapter.h
+++ b/TAO/tao/ZIOP_Adapter.h
@@ -27,6 +27,7 @@
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
class TAO_Policy_Validator;
+class TAO_Queued_Data;
/**
* @class TAO_ZIOP_Adapter
@@ -39,16 +40,13 @@ class TAO_Policy_Validator;
class TAO_Export TAO_ZIOP_Adapter : public ACE_Service_Object
{
public:
- virtual bool decompress (TAO_ServerRequest& server_request) = 0;
+ virtual bool decompress (ACE_Data_Block **db, TAO_Queued_Data& qd, TAO_ORB_Core& orb_core) = 0;
- virtual bool marshal_data (TAO_Operation_Details &details, TAO_OutputCDR &stream, TAO::Profile_Transport_Resolver &resolver_) = 0;
+ virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_Stub& stub) = 0;
+ virtual bool marshal_data (TAO_OutputCDR& cdr, TAO_ORB_Core& orb_core) = 0;
virtual void load_policy_validators (TAO_Policy_Validator &validator) = 0;
- virtual bool marshal_reply_data (TAO_ServerRequest& server_request,
- TAO::Argument * const * args,
- size_t nargs) = 0;
-
/// The virtual destructor
virtual ~TAO_ZIOP_Adapter (void);
};
diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h
index 744499b3726..2bfc004ed98 100644
--- a/TAO/tao/orbconf.h
+++ b/TAO/tao/orbconf.h
@@ -773,6 +773,8 @@ enum TAO_Cached_Policy_Type
TAO_CACHED_COMPRESSION_LOW_VALUE_POLICY,
+ TAO_CACHED_MIN_COMPRESSION_RATIO_POLICY,
+
TAO_CACHED_COMPRESSION_ID_LEVEL_LIST_POLICY,
/// NOTE: The "TAO_CACHED_POLICY_MAX_CACHED" should always be the last.