summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp')
-rw-r--r--trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp452
1 files changed, 452 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp b/trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp
new file mode 100644
index 00000000000..1da0a14348d
--- /dev/null
+++ b/trunk/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.cpp
@@ -0,0 +1,452 @@
+// $Id$
+
+#include "orbsvcs/FaultTolerance/FT_ClientRequest_Interceptor.h"
+#include "orbsvcs/FaultTolerance/FT_Service_Callbacks.h"
+#include "orbsvcs/FT_CORBA_ORBC.h"
+
+#include "tao/CORBA_String.h"
+#include "tao/debug.h"
+#include "tao/ORB_Constants.h"
+#include "tao/CDR.h"
+#include "tao/PI/ClientRequestInfo.h"
+
+#include "ace/UUID.h"
+#include "ace/Lock_Adapter_T.h"
+#include "ace/Lock.h"
+#include "ace/Synch_Traits.h"
+
+ACE_RCSID (FaultTolerance,
+ FT_ORBInitializer,
+ "$Id$")
+
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+namespace TAO
+{
+ FT_ClientRequest_Interceptor::FT_ClientRequest_Interceptor (void)
+ : name_ ("TAO_FT_ClientRequest_Interceptor")
+ , uuid_ (0)
+ , lock_ (0)
+ , retention_id_ (0)
+ {
+ ACE_Utils::UUID_GENERATOR::instance ()->init ();
+
+ this->uuid_ =
+ ACE_Utils::UUID_GENERATOR::instance ()->generateUUID ();
+
+ // Would be nice to have runtime option.
+ ACE_NEW (this->lock_,
+ ACE_Lock_Adapter<ACE_SYNCH_MUTEX>);
+
+ }
+
+ FT_ClientRequest_Interceptor::~FT_ClientRequest_Interceptor (void)
+ {
+ delete this->uuid_;
+ delete this->lock_;
+ }
+
+ char *
+ FT_ClientRequest_Interceptor::name (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ return CORBA::string_dup (this->name_);
+ }
+
+ void
+ FT_ClientRequest_Interceptor::destroy (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ }
+
+ void
+ FT_ClientRequest_Interceptor::send_poll (
+ PortableInterceptor::ClientRequestInfo_ptr
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ // Do Nothing
+ }
+
+ void
+ FT_ClientRequest_Interceptor::send_request (
+ PortableInterceptor::ClientRequestInfo_ptr ri
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ PortableInterceptor::ForwardRequest))
+ {
+ if (TAO_debug_level > 3)
+ {
+ CORBA::String_var op =
+ ri->operation (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO_FT (%P|%t) - %s called for %s\n"),
+ this->name_,
+ op.in ()));
+ }
+
+ IOP::TaggedComponent_var tp;
+ ACE_TRY
+ {
+ tp =
+ ri->get_effective_component (IOP::TAG_FT_GROUP
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ return;
+ }
+ ACE_ENDTRY;
+
+ this->group_version_context (ri,
+ tp
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->request_service_context (ri
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+ void
+ FT_ClientRequest_Interceptor::receive_reply (
+ PortableInterceptor::ClientRequestInfo_ptr
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ }
+
+ void
+ FT_ClientRequest_Interceptor::receive_other (
+ PortableInterceptor::ClientRequestInfo_ptr ri
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ PortableInterceptor::ForwardRequest))
+ {
+ TAO_ClientRequestInfo* tao_ri = dynamic_cast<TAO_ClientRequestInfo*> (ri);
+
+ if (!tao_ri)
+ {
+ ACE_THROW (CORBA::INTERNAL ());
+ }
+
+ TimeBase::TimeT expires = tao_ri->tao_ft_expiration_time ();
+
+ if (!expires)
+ {
+ // Not an FT request
+ return;
+ }
+
+ PortableInterceptor::ReplyStatus status = -1;
+
+ ACE_TRY
+ {
+ status = ri->reply_status(ACE_ENV_SINGLE_ARG_PARAMETER);
+ }
+ ACE_CATCHANY
+ {
+ // No reply status => Not a location forward.
+ return;
+ }
+ ACE_ENDTRY;
+
+ if (status == PortableInterceptor::LOCATION_FORWARD)
+ {
+ // We are in an FT request and a location forward has been received.
+
+ if (expires < TAO_FT_Service_Callbacks::now ())
+ {
+ // The request has already expired...
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_FT (%P|%t): FT_ClientRequest_Interceptor::receive_other - LOCATION_FORWARD received after request expiration.\n"));
+ }
+
+ // The spec says throw a SYSTEM_EXCEPTION, but doesn't specify which one.
+ // I think a TRANSIENT is the most suitable.
+ ACE_THROW (CORBA::TRANSIENT (
+ CORBA::SystemException::_tao_minor_code (
+ TAO_INVOCATION_LOCATION_FORWARD_MINOR_CODE,
+ errno),
+ CORBA::COMPLETED_NO));
+ }
+ }
+ }
+
+ void
+ FT_ClientRequest_Interceptor::receive_exception (
+ PortableInterceptor::ClientRequestInfo_ptr
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ PortableInterceptor::ForwardRequest))
+ {
+#if 0
+ // @@ Will be used later.
+ // Do a check for policy in which this can be done..
+ PortableInterceptor::ReplyStatus rs =
+ ri->reply_status (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (rs != PortableInterceptor::SYSTEM_EXCEPTION)
+ {
+ return;
+ }
+
+ CORBA::Any_var ex =
+ ri->received_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ TAO_OutputCDR cdr;
+
+ if (!ex->impl ()->marshal_value (cdr))
+ return;
+
+ // Place for optimization..
+ TAO_InputCDR icdr (cdr);
+
+ CORBA::String_var rep_id;
+
+ CORBA::ULong min, cs = 0;
+
+ if (!(icdr.read_string (rep_id.out ()) &&
+ icdr.read_ulong (min) &&
+ icdr.read_ulong (cs)))
+ ACE_THROW (CORBA::MARSHAL (TAO::VMCID,
+ CORBA::COMPLETED_MAYBE));
+
+ cs = CORBA::CompletionStatus (cs);
+
+ // ORB would take care of it
+ if (cs != CORBA::COMPLETED_MAYBE)
+ return;
+
+ // If it is COMPLETED_MAYBE, then we will throw a COMPLETED_NO,
+ // which the ORB should understand
+ if (ACE_OS_String::strcmp (rep_id.in (),
+ "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)
+ ACE_THROW (CORBA::TRANSIENT (min,
+ CORBA::COMPLETED_NO));
+ else if (ACE_OS_String::strcmp (rep_id.in (),
+ "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)
+ ACE_THROW (CORBA::COMM_FAILURE (min,
+ CORBA::COMPLETED_NO));
+ else if (ACE_OS_String::strcmp (rep_id.in (),
+ "IDL:omg.org/CORBA/NO_REPONSE:1.0") == 0)
+ ACE_THROW (CORBA::NO_RESPONSE (min,
+ CORBA::COMPLETED_NO));
+
+ else if (ACE_OS_String::strcmp (rep_id.in (),
+ "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0)
+ ACE_THROW (CORBA::OBJ_ADAPTER (min,
+ CORBA::COMPLETED_NO));
+ cout << "Didnt throw exception " << endl;
+#endif /*if 0*/
+ return;
+ }
+
+
+ void
+ FT_ClientRequest_Interceptor::group_version_context (
+ PortableInterceptor::ClientRequestInfo_ptr ri,
+ IOP::TaggedComponent* tp
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ ACE_TRY
+ {
+ // Grab the object group version
+ // @@ NOTE: This involves an allocation and a dellocation. This is
+ // really bad.
+ TAO_InputCDR cdr (reinterpret_cast<const char*> (tp->component_data.get_buffer ()),
+ tp->component_data.length ());
+ CORBA::Boolean byte_order;
+
+ if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
+ return;
+
+ cdr.reset_byte_order (static_cast<int> (byte_order));
+
+ FT::TagFTGroupTaggedComponent gtc;
+
+ if ((cdr >> gtc) == 0)
+ ACE_THROW (CORBA::BAD_PARAM (CORBA::OMGVMCID | 28,
+ CORBA::COMPLETED_NO));
+
+ IOP::ServiceContext sc;
+ sc.context_id = IOP::FT_GROUP_VERSION;
+
+ TAO_OutputCDR ocdr;
+ if (!(ocdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)))
+ return;
+ //ACE_THROW (CORBA::MARSHAL ());
+
+ if (!(ocdr << gtc.object_group_ref_version))
+ return;
+ //ACE_THROW (CORBA::MARSHAL ());
+
+ CORBA::ULong length =
+ static_cast<CORBA::ULong> (ocdr.total_length ());
+ sc.context_data.length (length);
+ CORBA::Octet *buf = sc.context_data.get_buffer ();
+
+ for (const ACE_Message_Block *i = ocdr.begin ();
+ i != 0;
+ i = i->cont ())
+ {
+ ACE_OS::memcpy (buf, i->rd_ptr (), i->length ());
+ buf += i->length ();
+ }
+
+ // Add this context to the service context list.
+ ri->add_request_service_context (sc,
+ 0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ }
+ ACE_CATCHANY
+ {
+ // Not much can be done anyway. Just keep quiet
+ // ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+
+ return;
+ }
+
+ void
+ FT_ClientRequest_Interceptor::request_service_context (
+ PortableInterceptor::ClientRequestInfo_ptr ri
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ ACE_TRY
+ {
+ IOP::ServiceContext sc;
+ sc.context_id = IOP::FT_REQUEST;
+
+ CORBA::Policy_var policy =
+ ri->get_request_policy (FT::REQUEST_DURATION_POLICY
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ FT::FTRequestServiceContext ftrsc;
+ ftrsc.client_id =
+ CORBA::string_dup (this->uuid_->to_string ()->c_str ());
+
+ TAO_ClientRequestInfo* tao_ri = dynamic_cast<TAO_ClientRequestInfo*> (ri);
+
+ if (!tao_ri)
+ {
+ ACE_THROW (CORBA::INTERNAL ());
+ }
+
+ if (tao_ri->tao_ft_expiration_time ())
+ {
+ ftrsc.retention_id = tao_ri->tao_ft_retention_id ();
+ ftrsc.expiration_time = tao_ri->tao_ft_expiration_time ();
+ }
+ else
+ {
+ ACE_GUARD (ACE_Lock,
+ guard,
+ *this->lock_);
+
+ ftrsc.retention_id = ++this->retention_id_;
+ ftrsc.expiration_time =
+ this->request_expiration_time (policy.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ tao_ri->tao_ft_retention_id (ftrsc.retention_id);
+ tao_ri->tao_ft_expiration_time (ftrsc.expiration_time);
+ }
+
+ TAO_OutputCDR ocdr;
+ if (!(ocdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)))
+ return;
+ //ACE_THROW (CORBA::MARSHAL ());
+
+ if (!(ocdr << ftrsc))
+ return;
+ // ACE_THROW (CORBA::MARSHAL ());
+
+ // Make a *copy* of the CDR stream...
+ CORBA::ULong length =
+ static_cast<CORBA::ULong> (ocdr.total_length ());
+ sc.context_data.length (length);
+ CORBA::Octet *buf = sc.context_data.get_buffer ();
+
+ for (const ACE_Message_Block *i = ocdr.begin ();
+ i != 0;
+ i = i->cont ())
+ {
+ ACE_OS::memcpy (buf, i->rd_ptr (), i->length ());
+ buf += i->length ();
+ }
+
+ // Add this context to the service context list.
+ ri->add_request_service_context (sc,
+ 0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // ACE_RE_THROW;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+ return;
+ }
+
+ TimeBase::TimeT
+ FT_ClientRequest_Interceptor::request_expiration_time (
+ CORBA::Policy *policy
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+ {
+ FT::RequestDurationPolicy_var p;
+
+ if (policy != 0)
+ {
+ p = FT::RequestDurationPolicy::_narrow (policy
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+ }
+
+ TimeBase::TimeT t = 0;
+
+ if (p.in ())
+ {
+ t =
+ p->request_duration_policy_value (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+ }
+ else
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO_FT (%P|%t) - Using default value")
+ ACE_TEXT ("of 1.5 seconds\n")));
+ }
+ t = 15 * 1000000;
+ }
+
+ // Calculaton of the expiration time
+ t += TAO_FT_Service_Callbacks::now ();
+
+ return t;
+ }
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL