summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2003-09-03 21:53:57 +0000
committerbala <balanatarajan@users.noreply.github.com>2003-09-03 21:53:57 +0000
commit48aa2c139d2f23c318f829ece81b5197b5d0d1a6 (patch)
treed881176015f96d23a14d45bf8b1e36f9be1c7f40
parent21540d07016aa900e4345bd152f7ff6791bf9799 (diff)
downloadATCD-48aa2c139d2f23c318f829ece81b5197b5d0d1a6.tar.gz
ChangeLogTag:Wed Sep 3 16:47:23 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation.cpp181
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation.h59
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp122
-rw-r--r--TAO/tao/Messaging/Asynch_Invocation_Adapter.h96
4 files changed, 458 insertions, 0 deletions
diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp
new file mode 100644
index 00000000000..790d7de27d6
--- /dev/null
+++ b/TAO/tao/Messaging/Asynch_Invocation.cpp
@@ -0,0 +1,181 @@
+//$Id$
+#include "Asynch_Invocation.h"
+#include "Asynch_Reply_Dispatcher.h"
+#include "tao/Profile_Transport_Resolver.h"
+#include "tao/Invocation_Utils.h"
+#include "tao/operation_details.h"
+#include "tao/Bind_Dispatcher_Guard.h"
+#include "tao/Transport.h"
+#include "tao/Muxed_TMS.h"
+#include "tao/Pluggable_Messaging.h"
+
+/*
+
+#include "Stub.h"
+
+#include "TAOC.h"
+*/
+ACE_RCSID (tao,
+ Synch_Invocation,
+ "$Id$")
+
+namespace TAO
+{
+ Asynch_Remote_Invocation::Asynch_Remote_Invocation (
+ CORBA::Object_ptr otarget,
+ Profile_Transport_Resolver &resolver,
+ TAO_Operation_Details &detail,
+ TAO_Asynch_Reply_Dispatcher *rd,
+ bool response_expected)
+ : Synch_Twoway_Invocation (otarget,
+ resolver,
+ detail,
+ response_expected)
+ , rd_ (rd)
+ {
+ }
+
+ Invocation_Status
+ Asynch_Remote_Invocation::remote_invocation (ACE_Time_Value *max_wait_time
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::Exception))
+ {
+ // Register a reply dispatcher for this invocation. Use the
+ // preallocated reply dispatcher.
+ TAO_Bind_Dispatcher_Guard dispatch_guard (this->details_.request_id (),
+ this->rd_,
+ this->resolver_.transport ()->tms ());
+
+
+ if (dispatch_guard.status () != 0)
+ {
+ // @@ What is the right way to handle this error? Do we need
+ // to call the interceptors in this case?
+ ACE_THROW_RETURN (CORBA::INTERNAL (TAO_DEFAULT_MINOR_CODE,
+ CORBA::COMPLETED_NO),
+ TAO_INVOKE_FAILURE);
+ }
+
+
+ TAO_Target_Specification tspec;
+ this->init_target_spec (tspec ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
+
+ Invocation_Status s = TAO_INVOKE_FAILURE;
+
+#if TAO_HAS_INTERCEPTORS == 1
+ s =
+ this->send_request_interception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
+
+ if (s != TAO_INVOKE_SUCCESS)
+ return s;
+#endif /*TAO_HAS_INTERCEPTORS */
+
+ TAO_OutputCDR &cdr =
+ this->resolver_.transport ()->messaging_object ()->out_stream ();
+
+ // We have started the interception flow. We need to call the
+ // ending interception flow if things go wrong. The purpose of the
+ // try block is to do just this.
+ ACE_TRY
+ {
+ this->write_header (tspec,
+ cdr
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ this->marshal_data (cdr
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ s =
+ this->send_message (cdr,
+ TAO_Transport::TAO_TWOWAY_REQUEST,
+ max_wait_time
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+#if TAO_HAS_INTERCEPTORS == 1
+ // If the above call returns a restart due to connection
+ // failure then call the receive_other interception point
+ // before we leave.
+ if (s == TAO_INVOKE_RESTART)
+ {
+ s =
+ this->receive_other_interception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+#endif /*TAO_HAS_INTERCEPTORS */
+
+ if (s != TAO_INVOKE_SUCCESS)
+ return s;
+
+ // For some strategies one may want to release the transport
+ // back to cache. If the idling is successfull let the
+ // resolver about that.
+ if (this->resolver_.transport ()->idle_after_send ())
+ this->resolver_.transport_released ();
+
+ // @@ In all MT environments, there's a cancellation point lurking
+ // here; need to investigate. Client threads would frequently be
+ // canceled sometime during recv_request ... the correct action to
+ // take on being canceled is to issue a CancelRequest message to the
+ // server and then imediately let other client-side cancellation
+ // handlers do their jobs.
+ //
+ // In C++, that basically means to unwind the stack using almost
+ // normal procedures: all destructors should fire, and some "catch"
+ // blocks should probably be able to handle things like releasing
+ // pointers. (Without unwinding the C++ stack, resources that must
+ // be freed by thread cancellation won't be freed, and the process
+ // won't continue to function correctly.) The tricky part is that
+ // according to POSIX, all C stack frames must also have their
+ // (explicitly coded) handlers called. We assume a POSIX.1c/C/C++
+ // environment.
+#if TAO_HAS_INTERCEPTORS == 1
+ s =
+ this->receive_other_interception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+#endif /*TAO_HAS_INTERCEPTORS */
+ }
+ ACE_CATCHANY
+ {
+#if TAO_HAS_INTERCEPTORS == 1
+ PortableInterceptor::ReplyStatus status =
+ this->handle_any_exception (&ACE_ANY_EXCEPTION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (status == PortableInterceptor::LOCATION_FORWARD ||
+ status == PortableInterceptor::TRANSPORT_RETRY)
+ s = TAO_INVOKE_RESTART;
+ else if (status == PortableInterceptor::SYSTEM_EXCEPTION
+ || status == PortableInterceptor::USER_EXCEPTION)
+#endif /*TAO_HAS_INTERCEPTORS*/
+ ACE_RE_THROW;
+ }
+# if defined (ACE_HAS_EXCEPTIONS) \
+ && defined (ACE_HAS_BROKEN_UNEXPECTED_EXCEPTIONS)
+ ACE_CATCHALL
+ {
+#if TAO_HAS_INTERCEPTORS == 1
+ PortableInterceptor::ReplyStatus st =
+ this->handle_all_exception (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (st == PortableInterceptor::LOCATION_FORWARD ||
+ st == PortableInterceptor::TRANSPORT_RETRY)
+ s = TAO_INVOKE_RESTART;
+ else
+#endif /*TAO_HAS_INTERCEPTORS == 1*/
+ ACE_RE_THROW;
+ }
+# endif /* ACE_HAS_EXCEPTIONS &&
+ ACE_HAS_BROKEN_UNEXPECTED_EXCEPTION*/
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (TAO_INVOKE_FAILURE);
+
+ return s;
+ }
+}
diff --git a/TAO/tao/Messaging/Asynch_Invocation.h b/TAO/tao/Messaging/Asynch_Invocation.h
new file mode 100644
index 00000000000..aaa7f4a677f
--- /dev/null
+++ b/TAO/tao/Messaging/Asynch_Invocation.h
@@ -0,0 +1,59 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Asynch_Invocation.h
+ *
+ * $Id$
+ *
+ * @author Balachandran Natarajan <bala@dre.vanderbilt.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_MESSAGING_ASYNCH_INVOCATION_H
+#define TAO_MESSAGING_ASYNCH_INVOCATION_H
+#include "ace/pre.h"
+
+#include "tao/Synch_Invocation.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Global_Macros.h"
+
+class TAO_Operation_Details;
+class TAO_InputCDR;
+class ACE_Time_Value;
+class TAO_Asynch_Reply_Dispatcher;
+
+namespace CORBA
+{
+ class SystemException;
+}
+
+namespace TAO
+{
+ class Profile_Transport_Resolver;
+
+ class TAO_Export Asynch_Remote_Invocation : protected Synch_Twoway_Invocation
+ {
+ public:
+ Asynch_Remote_Invocation (CORBA::Object_ptr otarget,
+ Profile_Transport_Resolver &resolver,
+ TAO_Operation_Details &detail,
+ TAO_Asynch_Reply_Dispatcher *rd,
+ bool response_expected = true);
+
+ Invocation_Status remote_invocation (ACE_Time_Value *value
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::Exception));
+
+ private:
+ TAO_Asynch_Reply_Dispatcher *rd_;
+ };
+}
+
+
+#include "ace/post.h"
+#endif /*TAO_SYNCH_INVOCATION_H*/
diff --git a/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp b/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp
new file mode 100644
index 00000000000..30d7edf98cd
--- /dev/null
+++ b/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp
@@ -0,0 +1,122 @@
+//$Id$
+#include "Asynch_Invocation_Adapter.h"
+#include "Asynch_Reply_Dispatcher.h"
+#include "tao/Profile_Transport_Resolver.h"
+#include "tao/operation_details.h"
+#include "tao/Stub.h"
+#include "tao/corbafwd.h"
+#include "tao/Transport.h"
+#include "tao/Muxed_TMS.h"
+#include "Asynch_Invocation.h"
+
+ACE_RCSID (tao,
+ Invocation_Adapter,
+ "$Id$")
+
+
+namespace TAO
+{
+ Asynch_Invocation_Adapter::Asynch_Invocation_Adapter (CORBA::Object *target,
+ Argument **args,
+ int arg_number,
+ char *operation,
+ int op_len,
+ Collocation_Proxy_Broker *p,
+ Invocation_Mode m)
+ : Invocation_Adapter (target,
+ args,
+ arg_number,
+ operation,
+ op_len,
+ p,
+ TAO_TWOWAY_INVOCATION,
+ m)
+ , rd_ (0)
+ {
+ }
+
+ void
+ Asynch_Invocation_Adapter::invoke (Messaging::ReplyHandler_ptr reply_handler_ptr,
+ const TAO_Reply_Handler_Skeleton &reply_handler_skel
+ ACE_ENV_ARG_DECL)
+ {
+ TAO_Stub *stub =
+ this->get_stub (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // If the reply handler is nil, we do not create a reply dispatcher.
+ // The ORB will drop replies to which it cannot associate a reply
+ // dispatcher.
+ if (!CORBA::is_nil (reply_handler_ptr))
+ {
+ // New reply dispatcher on the heap, because
+ // we will go out of scope and hand over the reply dispatcher
+ // to the ORB.
+
+ // @@ Need to use memory pool here..
+ ACE_NEW_THROW_EX (this->rd_,
+ TAO_Asynch_Reply_Dispatcher (reply_handler_skel,
+ reply_handler_ptr,
+ stub->orb_core ()),
+ CORBA::NO_MEMORY ());
+ }
+
+ Invocation_Adapter::invoke (0, 0 ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+
+ void
+ Asynch_Invocation_Adapter::invoke_remote (TAO_Stub *stub,
+ TAO_Operation_Details &op
+ ACE_ENV_ARG_DECL)
+ {
+ ACE_Time_Value tmp_wait_time;
+ bool is_timeout =
+ this->get_timeout (tmp_wait_time);
+
+ ACE_Time_Value *max_wait_time = 0;
+
+ if (is_timeout)
+ max_wait_time = &tmp_wait_time;
+
+ // Initial state
+ // TAO::Invocation_Status status = TAO_INVOKE_START;
+
+ Profile_Transport_Resolver resolver (this->target_,
+ stub);
+ resolver.resolve (max_wait_time
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Update the request id now that we have a transport
+ op.request_id (resolver.transport ()->tms ()->request_id ());
+
+ if (this->rd_)
+ {
+ // Cache the transport in the reply dispatcher
+ this->rd_->transport (resolver.transport ());
+
+ // AMI Timeout Handling Begin
+ if (is_timeout)
+ {
+ this->rd_->schedule_timer (op.request_id (),
+ *max_wait_time);
+ }
+ }
+
+ // @@ NOTE:Need to change this to something better. Too many
+ // hash defines meaning the same thing..
+ op.response_flags (TAO_TWOWAY_RESPONSE_FLAG);
+ TAO::Asynch_Remote_Invocation asynch (this->target_,
+ resolver,
+ op,
+ this->rd_);
+
+ (void) asynch.remote_invocation (max_wait_time
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+
+} // End namespace TAO
diff --git a/TAO/tao/Messaging/Asynch_Invocation_Adapter.h b/TAO/tao/Messaging/Asynch_Invocation_Adapter.h
new file mode 100644
index 00000000000..ddd50e8f522
--- /dev/null
+++ b/TAO/tao/Messaging/Asynch_Invocation_Adapter.h
@@ -0,0 +1,96 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Asynch_Invocation_Adapter.h
+ *
+ * $Id$
+ *
+ *
+ * @author Balachandran Natarajan <bala@dre.vanderbilt.edu>
+ */
+//=============================================================================
+#ifndef TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H
+#define TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H
+#include "ace/pre.h"
+
+#include "tao/Messaging/messaging_export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/Invocation_Adapter.h"
+#include "ace/CORBA_macros.h"
+#include "ace/Global_Macros.h"
+#include "tao/Messaging/Messaging.h"
+
+class TAO_Operation_Details;
+class TAO_Stub;
+class ACE_Time_Value;
+class TAO_Asynch_Reply_Dispatcher;
+
+namespace CORBA
+{
+ class Object;
+ class Environment;
+ class SystemException;
+}
+
+namespace TAO
+{
+ class Argument;
+
+ class Collocation_Proxy_Broker;
+
+ /**
+ * @class Asynch_Invocation_Adapter
+ *
+ * @brief Generic interface for the invocation object visible to the
+ * IDL compiler.
+ *
+ */
+ class TAO_Messaging_Export Asynch_Invocation_Adapter : private Invocation_Adapter
+ {
+ public:
+ Asynch_Invocation_Adapter (
+ CORBA::Object *target,
+ Argument **args,
+ int arg_number,
+ char *operation,
+ int op_len,
+ Collocation_Proxy_Broker *b,
+ TAO::Invocation_Mode mode = TAO_ASYNCHRONOUS_CALLBACK_INVOCATION);
+
+ void invoke (Messaging::ReplyHandler_ptr reply_handler_ptr,
+ const TAO_Reply_Handler_Skeleton &reply_handler_skel
+ ACE_ENV_ARG_DECL);
+
+ protected:
+
+ /**
+ virtual void invoke_collocated (TAO_Stub *,
+ TAO_Operation_Details &op
+ ACE_ENV_ARG_DECL);
+ **/
+ virtual void invoke_remote (TAO_Stub *,
+ TAO_Operation_Details &op
+ ACE_ENV_ARG_DECL);
+
+ private:
+
+ private:
+ TAO_Asynch_Reply_Dispatcher *rd_;
+
+ private:
+ /// Dont allow default initializations
+ ACE_UNIMPLEMENTED_FUNC (Asynch_Invocation_Adapter (void));
+
+ ACE_UNIMPLEMENTED_FUNC (Asynch_Invocation_Adapter & operator= (const Asynch_Invocation_Adapter &));
+
+ };
+} // End namespace TAO
+
+
+#include "ace/post.h"
+#endif /*TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H*/