summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authorJohnny Willemsen <jwillemsen@remedy.nl>2006-04-20 12:40:50 +0000
committerJohnny Willemsen <jwillemsen@remedy.nl>2006-04-20 12:40:50 +0000
commit4088da9444fdfc114133c009170f53c5547a23c1 (patch)
tree8eaa757e26eff9e7ac20b3083429ff45aa4faefb /TAO/tao
parentd535deda0a24a7bbdaa76f8436ea808ea4770d12 (diff)
downloadATCD-4088da9444fdfc114133c009170f53c5547a23c1.tar.gz
ChangeLogTag: Thu Apr 20 11:50:12 UTC 2006 Johnny Willemsen <jwillemsen@remedy.nl>
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Array_Traits_T.h4
-rw-r--r--TAO/tao/CDR.cpp131
-rw-r--r--TAO/tao/CDR.h104
-rw-r--r--TAO/tao/CDR.i113
-rw-r--r--TAO/tao/GIOP_Fragmentation_Strategy.cpp9
-rw-r--r--TAO/tao/GIOP_Fragmentation_Strategy.h85
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp142
-rw-r--r--TAO/tao/GIOP_Message_Base.h35
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser.h5
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_10.cpp10
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_10.h3
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_12.cpp11
-rw-r--r--TAO/tao/GIOP_Message_Generator_Parser_12.h3
-rw-r--r--TAO/tao/GIOP_Message_Lite.cpp57
-rw-r--r--TAO/tao/GIOP_Message_Lite.h6
-rw-r--r--TAO/tao/IIOP_Transport.cpp2
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation.cpp7
-rw-r--r--TAO/tao/Null_Fragmentation_Strategy.cpp17
-rw-r--r--TAO/tao/Null_Fragmentation_Strategy.h64
-rw-r--r--TAO/tao/ORB_Core.cpp18
-rw-r--r--TAO/tao/ORB_Core.h4
-rw-r--r--TAO/tao/On_Demand_Fragmentation_Strategy.cpp93
-rw-r--r--TAO/tao/On_Demand_Fragmentation_Strategy.h77
-rw-r--r--TAO/tao/Pluggable_Messaging.h6
-rw-r--r--TAO/tao/PortableServer/Upcall_Wrapper.cpp4
-rw-r--r--TAO/tao/Range_Checking_T.h4
-rw-r--r--TAO/tao/Remote_Invocation.cpp2
-rw-r--r--TAO/tao/Resource_Factory.h8
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp1
-rw-r--r--TAO/tao/Strategies/SCIOP_Transport.cpp2
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp2
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp2
-rw-r--r--TAO/tao/Synch_Invocation.cpp11
-rw-r--r--TAO/tao/TAO_Server_Request.cpp28
-rw-r--r--TAO/tao/default_resource.cpp52
-rw-r--r--TAO/tao/default_resource.h4
-rw-r--r--TAO/tao/operation_details.cpp6
-rw-r--r--TAO/tao/params.cpp1
-rw-r--r--TAO/tao/params.h16
-rw-r--r--TAO/tao/params.i12
-rw-r--r--TAO/tao/tao.mpc3
41 files changed, 1051 insertions, 113 deletions
diff --git a/TAO/tao/Array_Traits_T.h b/TAO/tao/Array_Traits_T.h
index f7c25d4ebd5..56af536bc40 100644
--- a/TAO/tao/Array_Traits_T.h
+++ b/TAO/tao/Array_Traits_T.h
@@ -14,6 +14,8 @@
#include <algorithm>
#include "tao/Array_VarOut_T.h"
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
namespace TAO
{
namespace details
@@ -62,4 +64,6 @@ struct array_traits
} // namespace details
} // namespace CORBA
+TAO_END_VERSIONED_NAMESPACE_DECL
+
#endif // guard_array_traits_hpp
diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp
index e738a4843ad..31efb9b0ea6 100644
--- a/TAO/tao/CDR.cpp
+++ b/TAO/tao/CDR.cpp
@@ -5,6 +5,7 @@
#include "tao/ORB_Core.h"
#include "tao/Environment.h"
#include "tao/SystemException.h"
+#include "tao/GIOP_Fragmentation_Strategy.h"
#if !defined (__ACE_INLINE__)
# include "tao/CDR.i"
@@ -25,7 +26,9 @@ static const char *TAO_CDR_Timeprobe_Description[] =
"OutputCDR::ctor[2] - enter",
"OutputCDR::ctor[2] - leave",
"OutputCDR::ctor[3] - enter",
- "OutputCDR::ctor[3] - leave"
+ "OutputCDR::ctor[3] - leave",
+ "OutputCDR::ctor[4] - enter",
+ "OutputCDR::ctor[4] - leave"
};
enum
@@ -35,7 +38,9 @@ enum
TAO_OUTPUT_CDR_CTOR2_ENTER,
TAO_OUTPUT_CDR_CTOR2_LEAVE,
TAO_OUTPUT_CDR_CTOR3_ENTER,
- TAO_OUTPUT_CDR_CTOR3_LEAVE
+ TAO_OUTPUT_CDR_CTOR3_LEAVE,
+ TAO_OUTPUT_CDR_CTOR4_ENTER,
+ TAO_OUTPUT_CDR_CTOR4_LEAVE
};
// Setup Timeprobes
@@ -54,14 +59,20 @@ TAO_OutputCDR::TAO_OutputCDR (size_t size,
size_t memcpy_tradeoff,
ACE_CDR::Octet major_version,
ACE_CDR::Octet minor_version)
- : ACE_OutputCDR (size,
- byte_order,
- buffer_allocator,
- data_block_allocator,
- message_block_allocator,
- memcpy_tradeoff,
- major_version,
- minor_version)
+ : ACE_OutputCDR (size,
+ byte_order,
+ buffer_allocator,
+ data_block_allocator,
+ message_block_allocator,
+ memcpy_tradeoff,
+ major_version,
+ minor_version)
+ , fragmentation_strategy_ (0)
+ , more_fragments_ (false)
+ , request_id_ (0)
+ , stub_ (0)
+ , message_semantics_ (-1)
+ , timeout_ (0)
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR1_ENTER);
}
@@ -75,33 +86,73 @@ TAO_OutputCDR::TAO_OutputCDR (char *data,
size_t memcpy_tradeoff,
ACE_CDR::Octet major_version,
ACE_CDR::Octet minor_version)
- : ACE_OutputCDR (data,
- size,
- byte_order,
- buffer_allocator,
- data_block_allocator,
- message_block_allocator,
- memcpy_tradeoff,
- major_version,
- minor_version)
+ : ACE_OutputCDR (data,
+ size,
+ byte_order,
+ buffer_allocator,
+ data_block_allocator,
+ message_block_allocator,
+ memcpy_tradeoff,
+ major_version,
+ minor_version)
+ , fragmentation_strategy_ (0)
+ , more_fragments_ (false)
+ , request_id_ (0)
+ , stub_ (0)
+ , message_semantics_ (-1)
+ , timeout_ (0)
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR2_ENTER);
}
-TAO_OutputCDR::TAO_OutputCDR (ACE_Message_Block *data,
+TAO_OutputCDR::TAO_OutputCDR (char *data,
+ size_t size,
int byte_order,
+ ACE_Allocator* buffer_allocator,
+ ACE_Allocator* data_block_allocator,
+ ACE_Allocator* message_block_allocator,
size_t memcpy_tradeoff,
+ TAO_GIOP_Fragmentation_Strategy * fs,
ACE_CDR::Octet major_version,
ACE_CDR::Octet minor_version)
- : ACE_OutputCDR (data,
- byte_order,
- memcpy_tradeoff,
- major_version,
- minor_version)
+ : ACE_OutputCDR (data,
+ size,
+ byte_order,
+ buffer_allocator,
+ data_block_allocator,
+ message_block_allocator,
+ memcpy_tradeoff,
+ major_version,
+ minor_version)
+ , fragmentation_strategy_ (fs)
+ , more_fragments_ (false)
+ , request_id_ (0)
+ , stub_ (0)
+ , message_semantics_ (-1)
+ , timeout_ (0)
{
ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR3_ENTER);
}
+TAO_OutputCDR::TAO_OutputCDR (ACE_Message_Block *data,
+ int byte_order,
+ size_t memcpy_tradeoff,
+ ACE_CDR::Octet major_version,
+ ACE_CDR::Octet minor_version)
+ : ACE_OutputCDR (data,
+ byte_order,
+ memcpy_tradeoff,
+ major_version,
+ minor_version)
+ , fragmentation_strategy_ (0)
+ , more_fragments_ (false)
+ , request_id_ (0)
+ , stub_ (0)
+ , message_semantics_ (-1)
+ , timeout_ (0)
+{
+ ACE_FUNCTION_TIMEPROBE (TAO_OUTPUT_CDR_CTOR4_ENTER);
+}
void
TAO_OutputCDR::throw_stub_exception (int error_num ACE_ENV_ARG_DECL)
@@ -112,15 +163,16 @@ TAO_OutputCDR::throw_stub_exception (int error_num ACE_ENV_ARG_DECL)
break;
case EINVAL : // wchar from a GIOP 1.0
ACE_THROW (CORBA::MARSHAL (CORBA::OMGVMCID | 5, CORBA::COMPLETED_NO));
- ACE_NOTREACHED(break);
+ ACE_NOTREACHED (break);
#if (ERANGE != EINVAL)
case ERANGE : // untranslatable character
- ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 1, CORBA::COMPLETED_NO));
- ACE_NOTREACHED(break);
+ ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 1,
+ CORBA::COMPLETED_NO));
+ ACE_NOTREACHED (break);
#endif
case EACCES : // wchar but no codeset
- ACE_THROW(CORBA::INV_OBJREF (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO));
- ACE_NOTREACHED(break);
+ ACE_THROW (CORBA::INV_OBJREF (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO));
+ ACE_NOTREACHED (break);
default :
ACE_THROW (CORBA::MARSHAL ());
}
@@ -136,18 +188,18 @@ TAO_OutputCDR::throw_skel_exception (int error_num ACE_ENV_ARG_DECL)
case EINVAL : // wchar from a GIOP 1.0
ACE_THROW (CORBA::MARSHAL (CORBA::OMGVMCID | 5, CORBA::COMPLETED_YES));
- ACE_NOTREACHED(break);
+ ACE_NOTREACHED (break);
case EACCES : // wchar but no codeset
ACE_THROW (CORBA::BAD_PARAM (CORBA::OMGVMCID | 23,
CORBA::COMPLETED_YES));
- ACE_NOTREACHED(break);
+ ACE_NOTREACHED (break);
#if (ERANGE != EINVAL)
case ERANGE : // untranslatable character
ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 1,
CORBA::COMPLETED_YES));
- ACE_NOTREACHED(break);
+ ACE_NOTREACHED (break);
#endif
default :
@@ -156,6 +208,19 @@ TAO_OutputCDR::throw_skel_exception (int error_num ACE_ENV_ARG_DECL)
}
}
+bool
+TAO_OutputCDR::fragment_stream (ACE_CDR::ULong pending_alignment,
+ ACE_CDR::ULong pending_length)
+{
+ if (this->fragmentation_strategy_)
+ {
+ return (this->fragmentation_strategy_->fragment (*this,
+ pending_alignment,
+ pending_length) == 0);
+ }
+
+ return true; // Success.
+}
// ****************************************************************
diff --git a/TAO/tao/CDR.h b/TAO/tao/CDR.h
index 27f28c82ac2..3bfb47b2f14 100644
--- a/TAO/tao/CDR.h
+++ b/TAO/tao/CDR.h
@@ -43,21 +43,25 @@
#define TAO_CDR_H
#include /**/ "ace/pre.h"
-#include "ace/CORBA_macros.h"
+
+#include "tao/orbconf.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "ace/CDR_Stream.h"
-
#include "tao/TAO_Export.h"
#include "tao/Basic_Types.h"
-#include "tao/orbconf.h"
+
+#include "ace/CORBA_macros.h"
+#include "ace/CDR_Stream.h"
+
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
class TAO_ORB_Core;
+class TAO_GIOP_Fragmentation_Strategy;
+class TAO_Stub;
namespace CORBA
{
@@ -120,6 +124,20 @@ public:
ACE_CDR::Octet minor_version =
TAO_DEF_GIOP_MINOR);
+ /// Build a CDR stream with an initial buffer, it will *not* remove
+ /// @a data since it did not allocated it, and enable fragmentation
+ /// support.
+ TAO_OutputCDR (char *data,
+ size_t size,
+ int byte_order,
+ ACE_Allocator* buffer_allocator,
+ ACE_Allocator* data_block_allocator,
+ ACE_Allocator* message_block_allocator,
+ size_t memcpy_tradeoff,
+ TAO_GIOP_Fragmentation_Strategy * fs,
+ ACE_CDR::Octet major_version,
+ ACE_CDR::Octet minor_version);
+
/// Build a CDR stream with an initial Message_Block chain, it will *not*
/// remove <data>, since it did not allocate it.
TAO_OutputCDR (ACE_Message_Block *data,
@@ -140,10 +158,84 @@ public:
static void throw_stub_exception (int error_num ACE_ENV_ARG_DECL);
static void throw_skel_exception (int error_num ACE_ENV_ARG_DECL);
+ /**
+ * @name Outgoing GIOP Fragment Related Methods
+ *
+ * These methods are only used when fragmenting outgoing GIOP
+ * requests and replies.
+ */
+ //@{
+ /// Fragment this output CDR stream if necessary.
+ /**
+ * Fragmentation will done through GIOP fragments when the length of
+ * the CDR stream length will exceed the configured threshold.
+ */
+ bool fragment_stream (ACE_CDR::ULong pending_alignment,
+ ACE_CDR::ULong pending_length);
+
+ /// Are there more data fragments to come?
+ bool more_fragments (void) const;
+
+ /// Specify whether there are more data fragments to come.
+ void more_fragments (bool more);
+
+ /// Set fragmented message attributes.
+ void message_attributes (CORBA::ULong request_id,
+ TAO_Stub * stub,
+ int message_semantics,
+ ACE_Time_Value * timeout);
+
+ /// Fragmented message request ID.
+ CORBA::ULong request_id (void) const;
+
+ /// Stub object associated with the request.
+ TAO_Stub * stub (void) const;
+
+ /// Message semantics (twoway, oneway, reply)
+ int message_semantics (void) const;
+
+ /// Maximum time to wait for outgoing message to be sent.
+ ACE_Time_Value * timeout (void) const;
+ //@}
+
private:
- /// disallow copying...
+
+ // disallow copying...
TAO_OutputCDR (const TAO_OutputCDR& rhs);
TAO_OutputCDR& operator= (const TAO_OutputCDR& rhs);
+
+private:
+
+ /**
+ * @name Outgoing GIOP Fragment Related Attributes
+ *
+ * These attributes are only used when fragmenting outgoing GIOP
+ * requests and replies.
+ */
+ //@{
+ /// Strategy that sends data currently marshaled into this
+ /// TAO_OutputCDR stream if necessary.
+ TAO_GIOP_Fragmentation_Strategy * const fragmentation_strategy_;
+
+ /// Are there more data fragments to come?
+ bool more_fragments_;
+
+ /// Request ID for the request currently being marshaled.
+ CORBA::ULong request_id_;
+
+ /// Stub object associated with the request.
+ TAO_Stub * stub_;
+
+ /// Twoway, oneway, reply?
+ /**
+ * @see TAO_Transport
+ */
+ int message_semantics_;
+
+ /// Request/reply send timeout.
+ ACE_Time_Value * timeout_;
+ //@}
+
};
/**
@@ -204,7 +296,7 @@ public:
/// Create an input stream from an ACE_Message_Block with an optional lock
/// used to protect the data.
- TAO_InputCDR (const ACE_Message_Block *data,
+ TAO_InputCDR (const ACE_Message_Block *data,
ACE_Lock* lock,
int byte_order = ACE_CDR_BYTE_ORDER,
ACE_CDR::Octet major_version = TAO_DEF_GIOP_MAJOR,
diff --git a/TAO/tao/CDR.i b/TAO/tao/CDR.i
index a7dcf134c77..57162a0e0c4 100644
--- a/TAO/tao/CDR.i
+++ b/TAO/tao/CDR.i
@@ -9,6 +9,56 @@ TAO_OutputCDR::~TAO_OutputCDR (void)
{
}
+ACE_INLINE bool
+TAO_OutputCDR::more_fragments (void) const
+{
+ return this->more_fragments_;
+}
+
+ACE_INLINE void
+TAO_OutputCDR::more_fragments (bool more)
+{
+ this->more_fragments_ = more;
+}
+
+ACE_INLINE void
+TAO_OutputCDR::message_attributes (CORBA::ULong request_id,
+ TAO_Stub * stub,
+ int message_semantics,
+ ACE_Time_Value * timeout)
+{
+ this->request_id_ = request_id;
+ this->stub_ = stub;
+ this->message_semantics_ = message_semantics;
+ this->timeout_ = timeout;
+}
+
+ACE_INLINE CORBA::ULong
+TAO_OutputCDR::request_id (void) const
+{
+ return this->request_id_;
+}
+
+ACE_INLINE TAO_Stub *
+TAO_OutputCDR::stub (void) const
+{
+ return this->stub_;
+}
+
+ACE_INLINE int
+TAO_OutputCDR::message_semantics (void) const
+{
+ return this->message_semantics_;
+}
+
+ACE_INLINE ACE_Time_Value *
+TAO_OutputCDR::timeout (void) const
+{
+ return this->timeout_;
+}
+
+// -------------------------------------------------------------------
+
ACE_INLINE
TAO_InputCDR::TAO_InputCDR (const char *buf,
size_t bufsiz,
@@ -158,67 +208,102 @@ TAO_InputCDR::orb_core (void) const
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::Short x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::SHORT_ALIGN,
+ sizeof (CORBA::Short))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::UShort x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::SHORT_ALIGN,
+ sizeof (CORBA::UShort))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::Long x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONG_ALIGN,
+ sizeof (CORBA::Long))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::ULong x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONG_ALIGN,
+ sizeof (CORBA::ULong))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::LongLong x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONGLONG_ALIGN,
+ sizeof (CORBA::LongLong))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::ULongLong x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONGLONG_ALIGN,
+ sizeof (CORBA::ULongLong))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR& os,
CORBA::LongDouble x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONGDOUBLE_ALIGN,
+ sizeof (CORBA::LongDouble))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::Float x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONG_ALIGN,
+ sizeof (CORBA::Float))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
CORBA::Double x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::LONGLONG_ALIGN,
+ sizeof (CORBA::Double))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
- const CORBA::Char* x)
+ const char * x)
{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ return
+ os.fragment_stream (ACE_CDR::OCTET_ALIGN,
+ sizeof (char))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &os,
- const CORBA::WChar* x)
-{
- return static_cast<ACE_OutputCDR &> (os) << x;
+ const CORBA::WChar * x)
+{
+ return
+ os.fragment_stream ((sizeof (CORBA::WChar) == 2
+ ? ACE_CDR::SHORT_ALIGN
+ : ACE_CDR::LONG_ALIGN),
+ sizeof (CORBA::WChar))
+ && static_cast<ACE_OutputCDR &> (os) << x;
}
// ****************************************************************
diff --git a/TAO/tao/GIOP_Fragmentation_Strategy.cpp b/TAO/tao/GIOP_Fragmentation_Strategy.cpp
new file mode 100644
index 00000000000..d10348117a6
--- /dev/null
+++ b/TAO/tao/GIOP_Fragmentation_Strategy.cpp
@@ -0,0 +1,9 @@
+// $Id$
+
+
+#include "tao/GIOP_Fragmentation_Strategy.h"
+
+
+TAO_GIOP_Fragmentation_Strategy::~TAO_GIOP_Fragmentation_Strategy (void)
+{
+}
diff --git a/TAO/tao/GIOP_Fragmentation_Strategy.h b/TAO/tao/GIOP_Fragmentation_Strategy.h
new file mode 100644
index 00000000000..6ffc058cb6a
--- /dev/null
+++ b/TAO/tao/GIOP_Fragmentation_Strategy.h
@@ -0,0 +1,85 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file GIOP_Fragmentation_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Ossama Othman <ossama@dre.vanderbilt.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_GIOP_FRAGMENTATION_STRATEGY_H
+#define TAO_GIOP_FRAGMENTATION_STRATEGY_H
+
+#include /**/ "ace/pre.h"
+
+#include "tao/TAO_Export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/orbconf.h"
+#include "ace/CDR_Base.h"
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+class TAO_OutputCDR;
+
+/**
+ * @class TAO_GIOP_Fragmenation_Strategy
+ *
+ * @brief Abstract base class that defines TAO fragmentation strategy
+ * interface.
+ *
+ * GIOP message fragmentation is deferred to a fragmentation strategy
+ */
+class TAO_Export TAO_GIOP_Fragmentation_Strategy
+{
+public:
+
+ /// Constructor
+ TAO_GIOP_Fragmentation_Strategy (void) {}
+
+ /// Destructor.
+ virtual ~TAO_GIOP_Fragmentation_Strategy (void);
+
+ /// Fragment the (potentially partially) encoded GIOP message.
+ /**
+ * Fragmentation the contents of the CDR output stream @a cdr into
+ * smaller chunks of data of size that fits within the configured
+ * ORB fragmentation threshold, and send each fragment "over the
+ * wire."
+ *
+ * @note Fragmentation will only occur if the CDR stream length will
+ * surpass the configured fragmentation threshold when
+ * marshaling the pending set of data.
+ *
+ * @param cdr Output CDR stream.
+ * @param pending_alignment Size of alignment boundary for next data
+ * to be marshaled (e.g. 4 for a
+ * CORBA::ULong).
+ * @param pending_length Size of next data to be marshaled (e.g. 2
+ * for a CORBA::UShort).
+ *
+ * @return Zero on success.
+ */
+ virtual int fragment (TAO_OutputCDR & cdr,
+ ACE_CDR::ULong pending_alignment,
+ ACE_CDR::ULong pending_length) = 0;
+
+private:
+
+ // Disallow copying and assignment.
+ TAO_GIOP_Fragmentation_Strategy (TAO_GIOP_Fragmentation_Strategy const &);
+ void operator= (TAO_GIOP_Fragmentation_Strategy const &);
+
+};
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#include /**/ "ace/post.h"
+
+#endif /* TAO_GIOP_FRAGMENTATION_STRATEGY_H */
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 77a379e1a10..7a044309c89 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -25,9 +25,11 @@ ACE_RCSID (tao,
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
+ TAO_Transport * transport,
size_t /* input_cdr_size */)
: orb_core_ (orb_core)
, message_state_ ()
+ , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
, out_stream_ (this->buffer_,
sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
TAO_ENCAP_BYTE_ORDER,
@@ -35,6 +37,7 @@ TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
orb_core->output_cdr_dblock_allocator (),
orb_core->output_cdr_msgblock_allocator (),
orb_core->orb_params ()->cdr_memcpy_tradeoff (),
+ fragmentation_strategy_.get (),
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR)
{
@@ -228,14 +231,51 @@ TAO_GIOP_Message_Base::generate_reply_header (
}
int
+TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id)
+{
+ // Get a parser for us
+ TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
+
+ CORBA::Octet major, minor;
+
+ cdr.get_version (major, minor);
+
+ // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
+ // supports them in 1.2 or better since GIOP 1.1 fragments do not
+ // have a fragment message header.
+ if (major == 1 && minor < 2)
+ return -1;
+
+ // Get the state information that we need to use
+ this->set_state (major,
+ minor,
+ generator_parser);
+
+ // Write the GIOP header first
+ if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
+ || !generator_parser->write_fragment_header (cdr, request_id))
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int
TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
{
// Ptr to first buffer.
- char *buf = (char *) stream.buffer ();
+ char * buf = (char *) stream.buffer ();
+
+ this->set_giop_flags (stream);
// Length of all buffers.
- size_t const total_len =
- stream.total_length ();
+ size_t const total_len = stream.total_length ();
// NOTE: Here would also be a fine place to calculate a digital
// signature for the message and place it into a preallocated slot
@@ -645,6 +685,7 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
this->orb_core_->input_cdr_dblock_allocator (),
this->orb_core_->input_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ this->fragmentation_strategy_.get (),
qd->major_version_,
qd->minor_version_);
@@ -859,30 +900,33 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
msg.reset ();
CORBA::Octet header[12] =
- {
- // The following works on non-ASCII platforms, such as MVS (which
- // uses EBCDIC).
- 0x47, // 'G'
- 0x49, // 'I'
- 0x4f, // 'O'
- 0x50 // 'P'
- };
+ {
+ // The following works on non-ASCII platforms, such as MVS (which
+ // uses EBCDIC).
+ 0x47, // 'G'
+ 0x49, // 'I'
+ 0x4f, // 'O'
+ 0x50 // 'P'
+ };
CORBA::Octet major, minor = 0;
- msg.get_version (major, minor);
+
+ (void) msg.get_version (major, minor);
header[4] = major;
header[5] = minor;
- // We are putting the byte order. But at a later date if we support
- // fragmentation and when we want to use the other 6 bits in this
- // octet we can have a virtual function do this for us as the
- // version info , Bala
- header[6] = (TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
+ // "flags" octet, i.e. header[6] will be set up later when message
+ // is formatted by the transport.
+
+ header[7] = CORBA::Octet (type); // Message type
- header[7] = CORBA::Octet(type);
+ static ACE_CDR::ULong const header_size =
+ sizeof (header) / sizeof (header[0]);
- static int header_size = sizeof (header) / sizeof (header[0]);
+ // Fragmentation should not occur at this point since there are only
+ // 12 bytes in the stream, and fragmentation may only occur when
+ // the stream length >= 16.
msg.write_octet_array (header, header_size);
return msg.good_bit ();
@@ -966,6 +1010,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
// Send back the reply service context.
reply_params.service_context_notowned (&request.reply_service_info ());
+ output.message_attributes (request_id,
+ 0,
+ TAO_Transport::TAO_REPLY,
+ 0);
+
// Make the GIOP header and Reply header
this->generate_reply_header (output,
reply_params);
@@ -980,6 +1029,8 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
return -1;
}
+ output.more_fragments (false);
+
int result = transport->send_message (output,
0,
TAO_Transport::TAO_REPLY);
@@ -1259,6 +1310,8 @@ TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
request.request_id (),
status_info);
+ output.more_fragments (false);
+
// Send the message
int result = transport->send_message (output,
0,
@@ -1498,6 +1551,8 @@ TAO_GIOP_Message_Base::send_reply_exception (
*x) == -1)
return -1;
+ output.more_fragments (false);
+
return transport->send_message (output,
0,
TAO_Transport::TAO_REPLY);
@@ -1543,7 +1598,8 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
char *tmp_id = 0;
if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
- ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY)
+ ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
+ ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
{
if (major == 1 && minor < 2)
{
@@ -1555,16 +1611,16 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
}
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (byte_order == TAO_ENCAP_BYTE_ORDER)
- {
- id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
- }
- else
- {
- ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
- }
+ if (byte_order == TAO_ENCAP_BYTE_ORDER)
+ {
+ id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
+ }
#else
- id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
+ id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
#endif /* ACE_DISABLE_SWAP_ON_READ */
}
@@ -2041,7 +2097,6 @@ TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel
cancel_request_id == head_request_id)
{
TAO_Queued_Data::release (head);
-
discard_all_GIOP11_messages = true;
}
else if (head->major_version_ == 1 &&
@@ -2066,5 +2121,32 @@ TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel
return 0;
}
+TAO_GIOP_Fragmentation_Strategy *
+TAO_GIOP_Message_Base::fragmentation_strategy (void)
+{
+ return this->fragmentation_strategy_.get ();
+}
+
+void
+TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
+{
+ CORBA::Octet * const buf =
+ reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
+
+ CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+
+ // Flags for the GIOP protocol header "flags" field.
+ CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
+
+ // Least significant bit: Byte order
+ ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
+
+ // Second least significant bit: More fragments
+ //
+ // Only supported in GIOP 1.1 or better.
+ if (!(major <= 1 && minor == 0))
+ ACE_SET_BITS (flags, msg.more_fragments () << 1);
+}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index 596ea383366..9909c091983 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -26,9 +26,13 @@
#include "tao/GIOP_Message_Generator_Parser_Impl.h"
#include "tao/GIOP_Utils.h"
#include "tao/GIOP_Message_State.h"
+#include "tao/GIOP_Fragmentation_Strategy.h"
#include "tao/CDR.h"
#include "tao/Incoming_Message_Stack.h"
+#include "ace/Auto_Ptr.h"
+
+
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
class TAO_Pluggable_Reply_Params;
@@ -49,6 +53,7 @@ class TAO_Export TAO_GIOP_Message_Base : public TAO_Pluggable_Messaging
public:
/// Constructor
TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
+ TAO_Transport * transport,
size_t input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE);
/// Dtor
@@ -81,6 +86,9 @@ public:
TAO_Pluggable_Reply_Params_Base &params
);
+ virtual int generate_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id);
+
/// Format the message. As we have not written the message length in
/// the header, we make use of this oppurtunity to insert and format
/// the message.
@@ -152,6 +160,9 @@ public:
/// success, 1 no fragment on stack relating to CancelRequest.
virtual int discard_fragmented_message (const TAO_Queued_Data *cancel_request);
+ /// Outgoing GIOP message fragmentation strategy.
+ virtual TAO_GIOP_Fragmentation_Strategy * fragmentation_strategy (void);
+
protected:
/// Processes the GIOP_REQUEST messages
@@ -247,6 +258,14 @@ private:
/// @return 0 on success, otherwise -1
int parse_request_id (const TAO_InputCDR &cdr, CORBA::ULong &request_id) const;
+ /// Set GIOP message flags in message that has been marshaled into
+ /// the output CDR stream @a msg.
+ /**
+ * @note It is assumed that the GIOP message header is the first
+ * thing marshaled into the output CDR stream @a msg.
+ */
+ void set_giop_flags (TAO_OutputCDR & msg) const;
+
private:
/// Cached ORB_Core pointer...
TAO_ORB_Core *orb_core_;
@@ -263,16 +282,28 @@ private:
TAO::Incoming_Message_Stack fragment_stack_;
protected:
+
/// Buffer used for both the output and input CDR streams, this is
/// "safe" because we only one of the streams at a time.
char buffer_[ACE_CDR::DEFAULT_BUFSIZE];
+ /**
+ * @name Outgoing GIOP Fragment Related Attributes
+ *
+ * These attributes are only used when fragmenting outgoing GIOP
+ * requests and replies.
+ */
+ //@{
+ /// Strategy that sends data currently marshaled into this
+ /// TAO_OutputCDR stream if necessary.
+ auto_ptr<TAO_GIOP_Fragmentation_Strategy> fragmentation_strategy_;
+
/// Buffer where the request is placed.
TAO_OutputCDR out_stream_;
/*
- * Hook in the GIOP_Message class to add data member. This hook used in
- * speeding up the dispatch within TAO.
+ * Hook in the GIOP_Message class to add data member. This hook is
+ * used in speeding up the dispatch within TAO.
*/
//@@ GIOP_MESSAGE_BASE_DATA_MEMBER_ADD_HOOK
diff --git a/TAO/tao/GIOP_Message_Generator_Parser.h b/TAO/tao/GIOP_Message_Generator_Parser.h
index 0291c4264c3..db0c6acb11e 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser.h
@@ -75,6 +75,11 @@ public:
CORBA::ULong request_id,
TAO_GIOP_Locate_Status_Msg &status) = 0;
+ /// Write the GIOP fragment message header to the output CDR stream
+ /// @a cdr.
+ virtual bool write_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id) = 0;
+
/// Parse the Request Header from the incoming stream. This will do a
/// version specific parsing of the incoming Request header
virtual int parse_request_header (TAO_ServerRequest &) = 0;
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_10.cpp b/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
index 351ed8e21be..f6d10cd7c2d 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
+++ b/TAO/tao/GIOP_Message_Generator_Parser_10.cpp
@@ -310,6 +310,16 @@ TAO_GIOP_Message_Generator_Parser_10::write_locate_reply_mesg (
return 1;
}
+bool
+TAO_GIOP_Message_Generator_Parser_10::write_fragment_header (
+ TAO_OutputCDR & /* cdr */,
+ CORBA::ULong /* request_id */)
+{
+ // GIOP fragments are not supported in GIOP 1.0.
+ return false;
+}
+
+
int
TAO_GIOP_Message_Generator_Parser_10::parse_request_header (
TAO_ServerRequest &request)
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_10.h b/TAO/tao/GIOP_Message_Generator_Parser_10.h
index 6604c1ec09c..a8829c281ea 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_10.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser_10.h
@@ -66,6 +66,9 @@ public:
TAO_GIOP_Locate_Status_Msg &status
);
+ virtual bool write_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id);
+
/// Parse the Request Header from the incoming stream. This will do a
/// version specific parsing of the incoming Request header
virtual int parse_request_header (TAO_ServerRequest &);
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
index 5fa498544dc..086081e78d6 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
+++ b/TAO/tao/GIOP_Message_Generator_Parser_12.cpp
@@ -249,6 +249,17 @@ TAO_GIOP_Message_Generator_Parser_12::write_locate_reply_mesg (
return 1;
}
+bool
+TAO_GIOP_Message_Generator_Parser_12::write_fragment_header (
+ TAO_OutputCDR & cdr,
+ CORBA::ULong request_id)
+{
+ return (cdr << request_id);
+
+ // No need to align write pointer to an 8 byte boundary since it
+ // should already be aligned (12 for GIOP messager + 4 for fragment
+ // header = 16 -- a multiple of 8)
+}
int
TAO_GIOP_Message_Generator_Parser_12::parse_request_header (
diff --git a/TAO/tao/GIOP_Message_Generator_Parser_12.h b/TAO/tao/GIOP_Message_Generator_Parser_12.h
index 1a91095fa6b..6fd84af43a6 100644
--- a/TAO/tao/GIOP_Message_Generator_Parser_12.h
+++ b/TAO/tao/GIOP_Message_Generator_Parser_12.h
@@ -68,6 +68,9 @@ public:
TAO_GIOP_Locate_Status_Msg &status
);
+ virtual bool write_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id);
+
/// Parse the Request Header from the incoming stream. This will do a
/// version specific parsing of the incoming Request header
virtual int parse_request_header (TAO_ServerRequest &);
diff --git a/TAO/tao/GIOP_Message_Lite.cpp b/TAO/tao/GIOP_Message_Lite.cpp
index 273b3a347b8..a3c30058d59 100644
--- a/TAO/tao/GIOP_Message_Lite.cpp
+++ b/TAO/tao/GIOP_Message_Lite.cpp
@@ -166,10 +166,18 @@ TAO_GIOP_Message_Lite::generate_reply_header (
}
int
+TAO_GIOP_Message_Lite::generate_fragment_header (TAO_OutputCDR & /* cdr */,
+ CORBA::ULong /* request_id */)
+{
+ // GIOP fragmentation is not supported in GIOP lite.
+ return 0;
+}
+
+int
TAO_GIOP_Message_Lite::format_message (TAO_OutputCDR &stream)
{
// Get the header length
- const size_t header_len = TAO_GIOP_LITE_HEADER_LEN ;
+ const size_t header_len = TAO_GIOP_LITE_HEADER_LEN;
// Get the message size offset
const size_t offset = TAO_GIOP_LITE_MESSAGE_SIZE_OFFSET;
@@ -313,7 +321,7 @@ TAO_GIOP_Message_Lite::message_type (void) const
return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
}
-int
+int
TAO_GIOP_Message_Lite::parse_next_message (ACE_Message_Block &incoming,
TAO_Queued_Data &qd,
size_t &mesg_length)
@@ -322,7 +330,7 @@ TAO_GIOP_Message_Lite::parse_next_message (ACE_Message_Block &incoming,
{
qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
- return 0; /* incomplete header */
+ return 0; /* incomplete header */
}
else
{
@@ -347,7 +355,7 @@ TAO_GIOP_Message_Lite::parse_next_message (ACE_Message_Block &incoming,
mesg_length = TAO_GIOP_LITE_HEADER_LEN + this->message_size_;
- return 1; /* parsed header successfully */
+ return 1; /* parsed header successfully */
}
}
@@ -364,9 +372,9 @@ TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming,
qd =
this->make_queued_data (TAO_GIOP_LITE_HEADER_LEN);
- if (qd == 0)
+ if (qd == 0)
{
- return -1; /* out of memory */
+ return -1; /* out of memory */
}
qd->msg_block_->copy (incoming.rd_ptr (),
@@ -386,9 +394,9 @@ TAO_GIOP_Message_Lite::extract_next_message (ACE_Message_Block &incoming,
qd = this->make_queued_data (copying_len);
- if (qd == 0)
+ if (qd == 0)
{
- return -1; /* out of memory */
+ return -1; /* out of memory */
}
if (copying_len > incoming.length ())
@@ -431,9 +439,9 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
const size_t n_copy = ace_min (available, desired);
// paranoid check, but would cause endless loop
- if (n_copy == 0)
+ if (n_copy == 0)
{
- return -1;
+ return -1;
}
qd->msg_block_->copy (incoming.rd_ptr (), n_copy);
@@ -442,7 +450,7 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
incoming.rd_ptr (n_copy);
// verify there is now enough data to parse the header
- if (qd->msg_block_->length () < TAO_GIOP_LITE_HEADER_LEN)
+ if (qd->msg_block_->length () < TAO_GIOP_LITE_HEADER_LEN)
{
return 0;
}
@@ -456,7 +464,7 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
if (ACE_CDR::grow (qd->msg_block_,
this->message_size_ + TAO_GIOP_LITE_HEADER_LEN) == -1)
{
- /* memory allocation failed */
+ /* memory allocation failed */
return -1;
}
@@ -481,9 +489,9 @@ TAO_GIOP_Message_Lite::consolidate_node (TAO_Queued_Data *qd,
}
// paranoid check
- if (copy_len == 0)
+ if (copy_len == 0)
{
- return -1;
+ return -1;
}
// ..now we are set to copy the right amount of data to the
@@ -1680,7 +1688,7 @@ TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
this->orb_core_->create_input_cdr_data_block (sz +
ACE_CDR::MAX_ALIGNMENT);
- if (db == 0)
+ if (db == 0)
{
TAO_Queued_Data::release (qd);
return 0;
@@ -1695,7 +1703,7 @@ TAO_GIOP_Message_Lite::make_queued_data (size_t sz)
ACE_Message_Block *new_mb = mb.duplicate ();
- if (new_mb == 0)
+ if (new_mb == 0)
{
TAO_Queued_Data::release (qd);
db->release();
@@ -1748,9 +1756,9 @@ TAO_GIOP_Message_Lite::init_queued_data (TAO_Queued_Data* qd) const
qd->msg_type_ = this->message_type ();
}
-/* @return -1 error, 0 ok */
-int
-TAO_GIOP_Message_Lite::consolidate_fragmented_message (TAO_Queued_Data* /* qd */,
+/* @return -1 error, 0 ok */
+int
+TAO_GIOP_Message_Lite::consolidate_fragmented_message (TAO_Queued_Data* /* qd */,
TAO_Queued_Data *& /* msg */)
{
if (TAO_debug_level > 3)
@@ -1764,13 +1772,20 @@ TAO_GIOP_Message_Lite::consolidate_fragmented_message (TAO_Queued_Data* /* qd */
-/// Remove all fragments from stack corelating to CancelRequest @a qd.
+/// Remove all fragments from stack corelating to CancelRequest @a qd.
/// @return -1 on failure, 0 on success, 1 no fragment on stack
/// relating to CancelRequest.
-int
+int
TAO_GIOP_Message_Lite::discard_fragmented_message (const TAO_Queued_Data *)
{
return 1; // no fragment on stack relating to cancel-request
}
+TAO_GIOP_Fragmentation_Strategy *
+TAO_GIOP_Message_Lite::fragmentation_strategy (void)
+{
+ return 0; // Fragmentation is unsupported in GIOP lite.
+}
+
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/GIOP_Message_Lite.h b/TAO/tao/GIOP_Message_Lite.h
index 92f18c33952..7b8e35d512a 100644
--- a/TAO/tao/GIOP_Message_Lite.h
+++ b/TAO/tao/GIOP_Message_Lite.h
@@ -79,6 +79,9 @@ public:
TAO_Pluggable_Reply_Params_Base &params
);
+ virtual int generate_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id);
+
/// Format the message. As we have not written the message length in
/// the header, we make use of this oppurtunity to insert and format
/// the message.
@@ -160,6 +163,9 @@ public:
/// fragment chain exists, -1 on error
virtual int discard_fragmented_message (const TAO_Queued_Data *cancel_request);
+ /// Outgoing GIOP message fragmentation strategy.
+ virtual TAO_GIOP_Fragmentation_Strategy * fragmentation_strategy (void);
+
private:
/// Writes the GIOP header in to @a msg
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index d7e493e7402..fddb13520b5 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -30,7 +30,7 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core, this));
}
TAO_IIOP_Transport::~TAO_IIOP_Transport (void)
diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp
index 893cab4df34..619de3ed97b 100644
--- a/TAO/tao/Messaging/Asynch_Invocation.cpp
+++ b/TAO/tao/Messaging/Asynch_Invocation.cpp
@@ -66,6 +66,13 @@ namespace TAO
// try block is to take care of the cases when things go wrong.
ACE_TRY
{
+ // Oneway semantics. See comments for below send_message()
+ // call.
+ cdr.message_attributes (this->details_.request_id (),
+ this->resolver_.stub (),
+ TAO_Transport::TAO_ONEWAY_REQUEST,
+ max_wait_time);
+
this->write_header (tspec,
cdr
ACE_ENV_ARG_PARAMETER);
diff --git a/TAO/tao/Null_Fragmentation_Strategy.cpp b/TAO/tao/Null_Fragmentation_Strategy.cpp
new file mode 100644
index 00000000000..79ddde5c290
--- /dev/null
+++ b/TAO/tao/Null_Fragmentation_Strategy.cpp
@@ -0,0 +1,17 @@
+// $Id$
+
+
+#include "tao/Null_Fragmentation_Strategy.h"
+
+
+TAO_Null_Fragmentation_Strategy::~TAO_Null_Fragmentation_Strategy (void)
+{
+}
+
+int
+TAO_Null_Fragmentation_Strategy::fragment (TAO_OutputCDR &,
+ ACE_CDR::ULong,
+ ACE_CDR::ULong)
+{
+ return 0;
+}
diff --git a/TAO/tao/Null_Fragmentation_Strategy.h b/TAO/tao/Null_Fragmentation_Strategy.h
new file mode 100644
index 00000000000..ace58815d2d
--- /dev/null
+++ b/TAO/tao/Null_Fragmentation_Strategy.h
@@ -0,0 +1,64 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Null_Fragmentation_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Ossama Othman <ossama@dre.vanderbilt.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_NULL_FRAGMENTATION_STRATEGY_H
+#define TAO_NULL_FRAGMENTATION_STRATEGY_H
+
+#include /**/ "ace/pre.h"
+
+#include "tao/GIOP_Fragmentation_Strategy.h"
+#include "ace/CDR_Base.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+class TAO_OutputCDR;
+
+
+/**
+ * @class TAO_Null_Fragmenation_Strategy
+ *
+ * @brief Null GIOP message fragmentation strategy.
+ *
+ * No-op GIOP message fragmentation strategy. This strategy performs
+ * no GIOP fragmentation.
+ *
+ * @see TAO_GIOP_Fragmentation_Strategy
+ */
+class TAO_Null_Fragmentation_Strategy
+ : public TAO_GIOP_Fragmentation_Strategy
+{
+public:
+
+ TAO_Null_Fragmentation_Strategy (void) {}
+ virtual ~TAO_Null_Fragmentation_Strategy (void);
+ virtual int fragment (TAO_OutputCDR &,
+ ACE_CDR::ULong,
+ ACE_CDR::ULong);
+
+private:
+
+ // Disallow copying and assignment.
+ TAO_Null_Fragmentation_Strategy (TAO_Null_Fragmentation_Strategy const &);
+ void operator= (TAO_Null_Fragmentation_Strategy const &);
+
+};
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#include /**/ "ace/post.h"
+
+#endif /* TAO_NULL_FRAGMENTATION_STRATEGY_H */
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index 9a5ca546c90..43f0e694e8a 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -36,6 +36,7 @@
#include "tao/PolicyFactory_Registry_Factory.h"
#include "tao/ORBInitializer_Registry_Adapter.h"
#include "tao/Codeset_Manager.h"
+#include "tao/GIOP_Fragmentation_Strategy.h"
#include "tao/Valuetype_Adapter.h"
#include "tao/Valuetype_Adapter_Factory.h"
@@ -942,6 +943,14 @@ TAO_ORB_Core::init (int &argc, char *argv[] ACE_ENV_ARG_DECL)
arg_shifter.consume_arg ();
}
+ else if (0 != (current_arg = arg_shifter.get_the_parameter
+ (ACE_LIB_TEXT("-ORBMaxMessageSize"))))
+ {
+ this->orb_params_.max_message_size (ACE_OS::atoi (current_arg));
+
+ arg_shifter.consume_arg ();
+ }
+
////////////////////////////////////////////////////////////////
// catch any unknown -ORB args //
////////////////////////////////////////////////////////////////
@@ -2674,6 +2683,15 @@ TAO_ORB_Core::connector_registry (ACE_ENV_SINGLE_ARG_DECL)
return conn;
}
+auto_ptr<TAO_GIOP_Fragmentation_Strategy>
+TAO_ORB_Core::fragmentation_strategy (TAO_Transport * transport)
+{
+ return
+ this->resource_factory ()->create_fragmentation_strategy (
+ transport,
+ this->orb_params_.max_message_size ());
+}
+
ACE_Reactor *
TAO_ORB_Core::reactor (void)
{
diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h
index 61bb83b499e..d0144373b4b 100644
--- a/TAO/tao/ORB_Core.h
+++ b/TAO/tao/ORB_Core.h
@@ -893,6 +893,10 @@ public:
(const CORBA::Object_ptr obj,
const TAO_Service_Context &service_context);
+ /// Get outgoing fragmentation strategy.
+ auto_ptr<TAO_GIOP_Fragmentation_Strategy>
+ fragmentation_strategy (TAO_Transport * transport);
+
protected:
/// Destructor is protected since the ORB Core is a reference
diff --git a/TAO/tao/On_Demand_Fragmentation_Strategy.cpp b/TAO/tao/On_Demand_Fragmentation_Strategy.cpp
new file mode 100644
index 00000000000..329b6940b80
--- /dev/null
+++ b/TAO/tao/On_Demand_Fragmentation_Strategy.cpp
@@ -0,0 +1,93 @@
+// $Id$
+
+
+#include "tao/On_Demand_Fragmentation_Strategy.h"
+
+#include "tao/Transport.h"
+#include "tao/CDR.h"
+#include "tao/Pluggable_Messaging.h"
+#include "tao/debug.h"
+
+TAO_On_Demand_Fragmentation_Strategy::TAO_On_Demand_Fragmentation_Strategy (
+ TAO_Transport * transport,
+ CORBA::ULong max_message_size)
+ : transport_ (transport)
+ , max_message_size_ (max_message_size)
+{
+}
+
+TAO_On_Demand_Fragmentation_Strategy::~TAO_On_Demand_Fragmentation_Strategy (
+ void)
+{
+}
+
+int
+TAO_On_Demand_Fragmentation_Strategy::fragment (
+ TAO_OutputCDR & cdr,
+ ACE_CDR::ULong pending_alignment,
+ ACE_CDR::ULong pending_length)
+{
+ if (this->transport_ == 0)
+ return 0; // No transport. Can't fragment.
+
+ CORBA::Octet major = 0;
+ CORBA::Octet minor = 0;
+
+ (void) cdr.get_version (major, minor);
+
+ // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
+ // supports them in 1.2 or better since GIOP 1.1 fragments do not
+ // have a fragment message header.
+ if (major == 1 && minor < 2)
+ return -1;
+
+ // Determine increase in CDR stream length if pending data is
+ // marshaled, taking into account the alignment for the given data
+ // type.
+ ACE_CDR::ULong const total_pending_length =
+ ACE_align_binary (cdr.total_length (), pending_alignment)
+ + pending_length;
+
+ // Except for the last fragment, fragmented GIOP messages must
+ // always be aligned on an 8-byte boundary. Padding will be added
+ // if necessary.
+ ACE_CDR::ULong const aligned_length =
+ ACE_align_binary (total_pending_length, ACE_CDR::MAX_ALIGNMENT);
+
+ // this->max_message_size_ must be >= 24 bytes, i.e.:
+ // 12 for GIOP protocol header
+ // + 4 for GIOP fragment header
+ // + 8 for payload (including padding)
+ // since fragments must be aligned on an 8 byte boundary.
+ if (aligned_length > this->max_message_size_)
+ {
+ // Pad the outgoing fragment if necessary.
+ if (cdr.align_write_ptr (ACE_CDR::MAX_ALIGNMENT) != 0)
+ return -1;
+
+ // More fragments to come.
+ cdr.more_fragments (true);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - On_Demand_Fragmentation_Strategy::fragment, "
+ "sending fragment of size %d\n",
+ cdr.total_length ()));
+
+ // Send the current CDR stream contents through the transport,
+ // making sure to switch on the the GIOP flags "more fragments"
+ // bit.
+ if (this->transport_->send_message (cdr,
+ cdr.stub (),
+ cdr.message_semantics (),
+ cdr.timeout ()) == -1
+
+ // Now generate a fragment header.
+ || this->transport_->messaging_object ()->generate_fragment_header (
+ cdr,
+ cdr.request_id ()) != 0)
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/TAO/tao/On_Demand_Fragmentation_Strategy.h b/TAO/tao/On_Demand_Fragmentation_Strategy.h
new file mode 100644
index 00000000000..97dd35b8cd7
--- /dev/null
+++ b/TAO/tao/On_Demand_Fragmentation_Strategy.h
@@ -0,0 +1,77 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file True_Fragmentation_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Ossama Othman <ossama@dre.vanderbilt.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_TRUE_FRAGMENTATION_STRATEGY_H
+#define TAO_TRUE_FRAGMENTATION_STRATEGY_H
+
+#include /**/ "ace/pre.h"
+
+#include "tao/GIOP_Fragmentation_Strategy.h"
+#include "ace/CDR_Base.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+class TAO_OutputCDR;
+class TAO_Transport;
+
+
+/**
+ * @class TAO_On_Demand_Fragmenation_Strategy
+ *
+ * @brief On Demand GIOP message fragmentation strategy.
+ *
+ * GIOP message fragmentation strategy that performs fragmentation
+ * when the size of the CDR stream will exceed the user configured
+ * threshold when marshaling the next/pending set of data.
+ *
+ * @see TAO_GIOP_Fragmentation_Strategy
+ */
+class TAO_On_Demand_Fragmentation_Strategy
+ : public TAO_GIOP_Fragmentation_Strategy
+{
+public:
+
+ /// Constructor.
+ TAO_On_Demand_Fragmentation_Strategy (TAO_Transport * transport,
+ ACE_CDR::ULong max_message_size);
+
+ virtual ~TAO_On_Demand_Fragmentation_Strategy (void);
+ virtual int fragment (TAO_OutputCDR & cdr,
+ ACE_CDR::ULong pending_alignment,
+ ACE_CDR::ULong pending_length);
+
+private:
+
+ // Disallow copying and assignment.
+ TAO_On_Demand_Fragmentation_Strategy (TAO_On_Demand_Fragmentation_Strategy const &);
+ void operator= (TAO_On_Demand_Fragmentation_Strategy const &);
+
+private:
+
+ /// Pointer to the underlying transport object.
+ TAO_Transport * const transport_;
+
+ /// Size of GIOP message at which fragmentation will occur.
+ ACE_CDR::ULong const max_message_size_;
+
+};
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#include /**/ "ace/post.h"
+
+#endif /* TAO_TRUE_FRAGMENTATION_STRATEGY_H */
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index 89fce0721b5..c6e2e4d734f 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -45,6 +45,7 @@ class TAO_Operation_Details;
class TAO_Target_Specification;
class TAO_OutputCDR;
class TAO_Queued_Data;
+class TAO_GIOP_Fragmentation_Strategy;
// @@ The more I think I about this class, I feel that this class need
// not be a ABC as it is now. Instead we have these options
@@ -106,6 +107,8 @@ public:
TAO_OutputCDR &cdr,
TAO_Pluggable_Reply_Params_Base &params) = 0;
+ virtual int generate_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id) = 0;
/// Format the message in the @a cdr. May not be needed in
/// general.
@@ -195,6 +198,9 @@ public:
/// @return -1 on failure, 0 on success, 1 no fragment on stack
/// relating to CancelRequest.
virtual int discard_fragmented_message (const TAO_Queued_Data *cancel_request) = 0;
+
+ /// Outgoing GIOP message fragmentation strategy.
+ virtual TAO_GIOP_Fragmentation_Strategy * fragmentation_strategy (void) = 0;
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/PortableServer/Upcall_Wrapper.cpp b/TAO/tao/PortableServer/Upcall_Wrapper.cpp
index 352332e6cd0..2e9b1f1564d 100644
--- a/TAO/tao/PortableServer/Upcall_Wrapper.cpp
+++ b/TAO/tao/PortableServer/Upcall_Wrapper.cpp
@@ -293,6 +293,10 @@ TAO::Upcall_Wrapper::post_upcall (TAO_OutputCDR & cdr,
ACE_CHECK;
}
}
+
+ // Reply body marshaling completed. No other fragments to send.
+ cdr.more_fragments (false);
}
TAO_END_VERSIONED_NAMESPACE_DECL
+
diff --git a/TAO/tao/Range_Checking_T.h b/TAO/tao/Range_Checking_T.h
index bcd4aa42b5d..f5e461ef09f 100644
--- a/TAO/tao/Range_Checking_T.h
+++ b/TAO/tao/Range_Checking_T.h
@@ -13,6 +13,8 @@
#include "tao/Basic_Types.h"
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
namespace TAO
{
namespace details
@@ -139,6 +141,8 @@ struct range_checking
} // namespace details
} // namespace TAO
+TAO_END_VERSIONED_NAMESPACE_DECL
+
#if defined(TAO_USER_DEFINED_SEQUENCE_RANGE_CHECKING_INCLUDE)
# include TAO_USER_DEFINED_SEQUENCE_RANGE_CHECKING_INCLUDE
#endif // TAO_USER_DEFINED_SEQUENCE_RANGE_CHECKING_INCLUDE
diff --git a/TAO/tao/Remote_Invocation.cpp b/TAO/tao/Remote_Invocation.cpp
index 7ff6e461339..417722db92b 100644
--- a/TAO/tao/Remote_Invocation.cpp
+++ b/TAO/tao/Remote_Invocation.cpp
@@ -1,4 +1,5 @@
//$Id$
+
#include "tao/Remote_Invocation.h"
#include "tao/Profile.h"
#include "tao/Profile_Transport_Resolver.h"
@@ -113,6 +114,7 @@ namespace TAO
{
this->resolver_.transport ()->clear_translators (0,
&out_stream);
+
// Send the request for the header
if (this->resolver_.transport ()->generate_request_header (this->details_,
spec,
diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h
index 128a72667e0..1fae24ea22b 100644
--- a/TAO/tao/Resource_Factory.h
+++ b/TAO/tao/Resource_Factory.h
@@ -23,6 +23,7 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "tao/Versioned_Namespace.h"
+#include "tao/Basic_Types.h"
#include "ace/Service_Object.h"
#include "ace/Unbounded_Set.h"
@@ -44,6 +45,8 @@ class TAO_Flushing_Strategy;
class TAO_Connection_Purging_Strategy;
class TAO_LF_Strategy;
class TAO_Codeset_Manager;
+class TAO_GIOP_Fragmentation_Strategy;
+class TAO_Transport;
// ****************************************************************
@@ -239,6 +242,11 @@ public:
/// caller.
virtual TAO_LF_Strategy *create_lf_strategy (void) = 0;
+ /// Outgoing fragment creation strategy.
+ virtual auto_ptr<TAO_GIOP_Fragmentation_Strategy>
+ create_fragmentation_strategy (TAO_Transport * transport,
+ CORBA::ULong max_message_size) const = 0;
+
/// Disables the factory. When a new factory is installed and used,
/// this function should be called on the previously used (default)
/// factory. This should result in proper error reporting if the
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 876fd01d111..22de0661c8a 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -52,6 +52,7 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Base (orb_core,
+ this,
ACE_MAX_DGRAM_SIZE));
}
diff --git a/TAO/tao/Strategies/SCIOP_Transport.cpp b/TAO/tao/Strategies/SCIOP_Transport.cpp
index 72da213aa93..f306345bca0 100644
--- a/TAO/tao/Strategies/SCIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SCIOP_Transport.cpp
@@ -48,7 +48,7 @@ TAO_SCIOP_Transport::TAO_SCIOP_Transport (TAO_SCIOP_Connection_Handler *handler,
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core, this));
}
}
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 23e03b8e94b..bb99b50d002 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -45,7 +45,7 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core, this));
}
//@@ MESSAGING_SPL_COMMENT_HOOK_END
}
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index 4adc371f55a..c3cde036f2e 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -47,7 +47,7 @@ TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler,
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core, this));
}
//@@ MESSAGING_SPL_COMMENT_HOOK_END
}
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index 5f18d58eba1..bff4141a033 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -82,6 +82,11 @@ namespace TAO
{
TAO_OutputCDR &cdr = this->resolver_.transport ()->out_stream ();
+ cdr.message_attributes (this->details_.request_id (),
+ this->resolver_.stub (),
+ TAO_Transport::TAO_TWOWAY_REQUEST,
+ max_wait_time);
+
this->write_header (tspec,
cdr
ACE_ENV_ARG_PARAMETER);
@@ -91,7 +96,6 @@ namespace TAO
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
-
// Register a reply dispatcher for this invocation. Use the
// preallocated reply dispatcher.
TAO_Bind_Dispatcher_Guard dispatch_guard (
@@ -736,6 +740,11 @@ namespace TAO
ACE_TRY
{
+ cdr.message_attributes (this->details_.request_id (),
+ this->resolver_.stub (),
+ TAO_Transport::TAO_ONEWAY_REQUEST,
+ max_wait_time);
+
this->write_header (tspec,
cdr
ACE_ENV_ARG_PARAMETER);
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp
index c4b7f69adb1..726825d5dcc 100644
--- a/TAO/tao/TAO_Server_Request.cpp
+++ b/TAO/tao/TAO_Server_Request.cpp
@@ -282,6 +282,12 @@ TAO_ServerRequest::init_reply (void)
reply_params.reply_status_ = this->exception_type_;
}
+
+ this->outgoing_->message_attributes (this->request_id_,
+ 0,
+ TAO_Transport::TAO_REPLY,
+ 0);
+
// Construct a REPLY header.
this->mesg_base_->generate_reply_header (*this->outgoing_,
reply_params);
@@ -325,10 +331,17 @@ TAO_ServerRequest::send_no_exception_reply (void)
// No data anyway.
reply_params.argument_flag_ = 0;
+ this->outgoing_->message_attributes (this->request_id_,
+ 0,
+ TAO_Transport::TAO_REPLY,
+ 0);
+
// Construct a REPLY header.
this->mesg_base_->generate_reply_header (*this->outgoing_,
reply_params);
+ this->outgoing_->more_fragments (false);
+
// Send the message.
int result = this->transport_->send_message (*this->outgoing_,
0,
@@ -355,6 +368,8 @@ TAO_ServerRequest::tao_send_reply (void)
if (this->collocated ())
return; // No transport in the collocated case.
+ this->outgoing_->more_fragments (false);
+
int result = this->transport_->send_message (*this->outgoing_,
0,
TAO_Transport::TAO_REPLY);
@@ -414,6 +429,7 @@ TAO_ServerRequest::tao_send_reply_exception (CORBA::Exception &ex)
this->orb_core_->output_cdr_dblock_allocator (),
this->orb_core_->output_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ this->mesg_base_->fragmentation_strategy (),
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR);
@@ -429,6 +445,8 @@ TAO_ServerRequest::tao_send_reply_exception (CORBA::Exception &ex)
}
+ this->outgoing_->more_fragments (false);
+
// Send the message
if (this->transport_->send_message (*this->outgoing_,
0,
@@ -475,10 +493,11 @@ TAO_ServerRequest::send_cached_reply (CORBA::OctetSeq &s)
this->orb_core_->output_cdr_dblock_allocator (),
this->orb_core_->output_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ this->mesg_base_->fragmentation_strategy (),
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR);
- this->transport_->assign_translators(0,&output);
+ this->transport_->assign_translators (0, &output);
// A copy of the reply parameters
TAO_Pluggable_Reply_Params_Base reply_params;
@@ -496,6 +515,11 @@ TAO_ServerRequest::send_cached_reply (CORBA::OctetSeq &s)
// Make a default reply status
reply_params.reply_status_ = TAO_GIOP_NO_EXCEPTION;
+ this->outgoing_->message_attributes (this->request_id_,
+ 0,
+ TAO_Transport::TAO_REPLY,
+ 0);
+
// Make the reply message
if (this->mesg_base_->generate_reply_header (*this->outgoing_,
reply_params) == -1)
@@ -516,6 +540,8 @@ TAO_ServerRequest::send_cached_reply (CORBA::OctetSeq &s)
ACE_TEXT ("TAO (%P|%t) - ServerRequest::send_cached_reply, ")
ACE_TEXT ("could not marshal reply\n")));
+ this->outgoing_->more_fragments (false);
+
// Send the message
if (this->transport_->send_message (*this->outgoing_,
0,
diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp
index b53e516d9ac..b52665b7023 100644
--- a/TAO/tao/default_resource.cpp
+++ b/TAO/tao/default_resource.cpp
@@ -15,6 +15,8 @@
#include "tao/Codeset_Descriptor_Base.h"
#include "tao/Codeset_Manager_Factory_Base.h"
#include "tao/Codeset_Manager.h"
+#include "tao/Null_Fragmentation_Strategy.h"
+#include "tao/On_Demand_Fragmentation_Strategy.h"
#include "ace/TP_Reactor.h"
#include "ace/Dynamic_Service.h"
@@ -1085,6 +1087,56 @@ TAO_Default_Resource_Factory::create_lf_strategy (void)
return strategy;
}
+auto_ptr<TAO_GIOP_Fragmentation_Strategy>
+TAO_Default_Resource_Factory::create_fragmentation_strategy (
+ TAO_Transport * transport,
+ CORBA::ULong max_message_size) const
+{
+ auto_ptr<TAO_GIOP_Fragmentation_Strategy> strategy (0);
+
+ TAO_GIOP_Fragmentation_Strategy * tmp = 0;
+
+ // Minimum GIOP message size is 24 (a multiple of 8):
+ // 12 GIOP Message Header
+ // 4 GIOP Fragment Header (request ID)
+ // + 8 Smallest payload, including padding.
+ // ---
+ // 24
+ static CORBA::ULong const min_message_size = 24;
+
+ // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
+ // supports them in 1.2 or better since GIOP 1.1 fragments do not
+ // have a fragment message header.
+
+
+ if (transport) // No transport. Cannot fragment.
+ {
+ if (max_message_size < min_message_size
+ || (TAO_DEF_GIOP_MAJOR == 1 && TAO_DEF_GIOP_MINOR < 2))
+ {
+ // No maximum was set by the user.
+ ACE_NEW_RETURN (tmp,
+ TAO_Null_Fragmentation_Strategy,
+ strategy);
+
+ }
+ else
+ {
+ ACE_NEW_RETURN (tmp,
+ TAO_On_Demand_Fragmentation_Strategy (
+ transport,
+ max_message_size),
+ strategy);
+ }
+ }
+
+ ACE_AUTO_PTR_RESET (strategy,
+ tmp,
+ TAO_GIOP_Fragmentation_Strategy);
+
+ return strategy;
+}
+
void
TAO_Default_Resource_Factory::report_option_value_error (
const ACE_TCHAR* option_name,
diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h
index 2189a8ca7d7..aef5e545fd7 100644
--- a/TAO/tao/default_resource.h
+++ b/TAO/tao/default_resource.h
@@ -135,7 +135,9 @@ public:
virtual TAO_Connection_Purging_Strategy *create_purging_strategy (void);
TAO_Resource_Factory::Resource_Usage resource_usage_strategy (void) const;
virtual TAO_LF_Strategy *create_lf_strategy (void);
-
+ virtual auto_ptr<TAO_GIOP_Fragmentation_Strategy>
+ create_fragmentation_strategy (TAO_Transport * transport,
+ CORBA::ULong max_message_size) const;
virtual void disable_factory (void);
virtual bool drop_replies_during_shutdown (void) const;
//@}
diff --git a/TAO/tao/operation_details.cpp b/TAO/tao/operation_details.cpp
index faccd9fe89c..7e505f26c58 100644
--- a/TAO/tao/operation_details.cpp
+++ b/TAO/tao/operation_details.cpp
@@ -6,6 +6,7 @@
#include "tao/Exception_Data.h"
#include "tao/SystemException.h"
#include "tao/Argument.h"
+#include "tao/CDR.h"
#include "ace/OS_NS_string.h"
@@ -62,6 +63,11 @@ TAO_Operation_Details::marshal_args (TAO_OutputCDR &cdr)
return false;
}
+ // Nothing else to fragment. We're also guaranteed to have
+ // data in the CDR stream since the operation was a marshaling
+ // operation, not a fragmentation operation.
+ cdr.more_fragments (false);
+
return true;
}
diff --git a/TAO/tao/params.cpp b/TAO/tao/params.cpp
index a20fec5bb2b..3e2fca6b663 100644
--- a/TAO/tao/params.cpp
+++ b/TAO/tao/params.cpp
@@ -26,6 +26,7 @@ TAO_ORB_Parameters::TAO_ORB_Parameters (void)
, nodelay_ (1)
, sock_keepalive_ (0)
, cdr_memcpy_tradeoff_ (ACE_DEFAULT_CDR_MEMCPY_TRADEOFF)
+ , max_message_size_ (0) // Disable outgoing GIOP fragments by default
, use_lite_protocol_ (0)
, use_dotted_decimal_addresses_ (0)
, cache_incoming_by_dotted_decimal_address_ (0)
diff --git a/TAO/tao/params.h b/TAO/tao/params.h
index 3d5d51ff3ef..c65f9177e91 100644
--- a/TAO/tao/params.h
+++ b/TAO/tao/params.h
@@ -98,6 +98,15 @@ public:
int cdr_memcpy_tradeoff (void) const;
void cdr_memcpy_tradeoff (int);
+ /**
+ * Maximum size of a GIOP message before outgoing fragmentation
+ * kicks in.
+ */
+ //@{
+ ACE_CDR::ULong max_message_size (void) const;
+ void max_message_size (ACE_CDR::ULong size);
+ //@}
+
/// The ORB will use the dotted decimal notation for addresses. By
/// default we use the full ascii names.
int use_dotted_decimal_addresses (void) const;
@@ -238,6 +247,13 @@ private:
/// CDR streams.
int cdr_memcpy_tradeoff_;
+ /// Maximum GIOP message size to be sent over a given transport.
+ /**
+ * Setting a maximum message size will cause outgoing GIOP
+ * fragmentation to be enabled.
+ */
+ ACE_CDR::ULong max_message_size_;
+
/// For selecting a liteweight version of the GIOP protocol.
int use_lite_protocol_;
diff --git a/TAO/tao/params.i b/TAO/tao/params.i
index f9c2d3e889c..d900d18a967 100644
--- a/TAO/tao/params.i
+++ b/TAO/tao/params.i
@@ -40,6 +40,18 @@ TAO_ORB_Parameters::cdr_memcpy_tradeoff (int x)
this->cdr_memcpy_tradeoff_ = x;
}
+ACE_INLINE ACE_CDR::ULong
+TAO_ORB_Parameters::max_message_size (void) const
+{
+ return this->max_message_size_;
+}
+
+ACE_INLINE void
+TAO_ORB_Parameters::max_message_size (ACE_CDR::ULong size)
+{
+ this->max_message_size_ = size;
+}
+
ACE_INLINE int
TAO_ORB_Parameters::use_dotted_decimal_addresses (void) const
{
diff --git a/TAO/tao/tao.mpc b/TAO/tao/tao.mpc
index 1df2e0b5079..cecfc89cc43 100644
--- a/TAO/tao/tao.mpc
+++ b/TAO/tao/tao.mpc
@@ -68,6 +68,7 @@ project(TAO) : acelib, core, tao_output, taodefaults, pidl, extra_core {
FILE_Parser.cpp
FloatSeqC.cpp
Flushing_Strategy.cpp
+ GIOP_Fragmentation_Strategy.cpp
GIOP_Message_Base.cpp
GIOP_Message_Generator_Parser.cpp
GIOP_Message_Generator_Parser_10.cpp
@@ -128,6 +129,7 @@ project(TAO) : acelib, core, tao_output, taodefaults, pidl, extra_core {
Muxed_TMS.cpp
New_Leader_Generator.cpp
NVList_Adapter.cpp
+ Null_Fragmentation_Strategy.cpp
Object.cpp
Object_KeyC.cpp
Object_Loader.cpp
@@ -136,6 +138,7 @@ project(TAO) : acelib, core, tao_output, taodefaults, pidl, extra_core {
ObjectIdListC.cpp
ObjectKey_Table.cpp
OctetSeqC.cpp
+ On_Demand_Fragmentation_Strategy.cpp
operation_details.cpp
ORB.cpp
ORBInitializer_Registry.cpp