diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-09-03 21:53:57 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-09-03 21:53:57 +0000 |
commit | ba901ebfa4cc69d3e18720fa7839fc9f3a00fb1d (patch) | |
tree | d881176015f96d23a14d45bf8b1e36f9be1c7f40 /TAO | |
parent | 2b7a4ebea12a6ecc796f7c93eb64c32c4e6327b9 (diff) | |
download | ATCD-ba901ebfa4cc69d3e18720fa7839fc9f3a00fb1d.tar.gz |
ChangeLogTag:Wed Sep 3 16:47:23 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
Diffstat (limited to 'TAO')
-rw-r--r-- | TAO/tao/Messaging/Asynch_Invocation.cpp | 181 | ||||
-rw-r--r-- | TAO/tao/Messaging/Asynch_Invocation.h | 59 | ||||
-rw-r--r-- | TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp | 122 | ||||
-rw-r--r-- | TAO/tao/Messaging/Asynch_Invocation_Adapter.h | 96 |
4 files changed, 458 insertions, 0 deletions
diff --git a/TAO/tao/Messaging/Asynch_Invocation.cpp b/TAO/tao/Messaging/Asynch_Invocation.cpp new file mode 100644 index 00000000000..790d7de27d6 --- /dev/null +++ b/TAO/tao/Messaging/Asynch_Invocation.cpp @@ -0,0 +1,181 @@ +//$Id$ +#include "Asynch_Invocation.h" +#include "Asynch_Reply_Dispatcher.h" +#include "tao/Profile_Transport_Resolver.h" +#include "tao/Invocation_Utils.h" +#include "tao/operation_details.h" +#include "tao/Bind_Dispatcher_Guard.h" +#include "tao/Transport.h" +#include "tao/Muxed_TMS.h" +#include "tao/Pluggable_Messaging.h" + +/* + +#include "Stub.h" + +#include "TAOC.h" +*/ +ACE_RCSID (tao, + Synch_Invocation, + "$Id$") + +namespace TAO +{ + Asynch_Remote_Invocation::Asynch_Remote_Invocation ( + CORBA::Object_ptr otarget, + Profile_Transport_Resolver &resolver, + TAO_Operation_Details &detail, + TAO_Asynch_Reply_Dispatcher *rd, + bool response_expected) + : Synch_Twoway_Invocation (otarget, + resolver, + detail, + response_expected) + , rd_ (rd) + { + } + + Invocation_Status + Asynch_Remote_Invocation::remote_invocation (ACE_Time_Value *max_wait_time + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::Exception)) + { + // Register a reply dispatcher for this invocation. Use the + // preallocated reply dispatcher. + TAO_Bind_Dispatcher_Guard dispatch_guard (this->details_.request_id (), + this->rd_, + this->resolver_.transport ()->tms ()); + + + if (dispatch_guard.status () != 0) + { + // @@ What is the right way to handle this error? Do we need + // to call the interceptors in this case? + ACE_THROW_RETURN (CORBA::INTERNAL (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_NO), + TAO_INVOKE_FAILURE); + } + + + TAO_Target_Specification tspec; + this->init_target_spec (tspec ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + + Invocation_Status s = TAO_INVOKE_FAILURE; + +#if TAO_HAS_INTERCEPTORS == 1 + s = + this->send_request_interception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + + if (s != TAO_INVOKE_SUCCESS) + return s; +#endif /*TAO_HAS_INTERCEPTORS */ + + TAO_OutputCDR &cdr = + this->resolver_.transport ()->messaging_object ()->out_stream (); + + // We have started the interception flow. We need to call the + // ending interception flow if things go wrong. The purpose of the + // try block is to do just this. + ACE_TRY + { + this->write_header (tspec, + cdr + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + this->marshal_data (cdr + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + s = + this->send_message (cdr, + TAO_Transport::TAO_TWOWAY_REQUEST, + max_wait_time + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + +#if TAO_HAS_INTERCEPTORS == 1 + // If the above call returns a restart due to connection + // failure then call the receive_other interception point + // before we leave. + if (s == TAO_INVOKE_RESTART) + { + s = + this->receive_other_interception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } +#endif /*TAO_HAS_INTERCEPTORS */ + + if (s != TAO_INVOKE_SUCCESS) + return s; + + // For some strategies one may want to release the transport + // back to cache. If the idling is successfull let the + // resolver about that. + if (this->resolver_.transport ()->idle_after_send ()) + this->resolver_.transport_released (); + + // @@ In all MT environments, there's a cancellation point lurking + // here; need to investigate. Client threads would frequently be + // canceled sometime during recv_request ... the correct action to + // take on being canceled is to issue a CancelRequest message to the + // server and then imediately let other client-side cancellation + // handlers do their jobs. + // + // In C++, that basically means to unwind the stack using almost + // normal procedures: all destructors should fire, and some "catch" + // blocks should probably be able to handle things like releasing + // pointers. (Without unwinding the C++ stack, resources that must + // be freed by thread cancellation won't be freed, and the process + // won't continue to function correctly.) The tricky part is that + // according to POSIX, all C stack frames must also have their + // (explicitly coded) handlers called. We assume a POSIX.1c/C/C++ + // environment. +#if TAO_HAS_INTERCEPTORS == 1 + s = + this->receive_other_interception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; +#endif /*TAO_HAS_INTERCEPTORS */ + } + ACE_CATCHANY + { +#if TAO_HAS_INTERCEPTORS == 1 + PortableInterceptor::ReplyStatus status = + this->handle_any_exception (&ACE_ANY_EXCEPTION + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (status == PortableInterceptor::LOCATION_FORWARD || + status == PortableInterceptor::TRANSPORT_RETRY) + s = TAO_INVOKE_RESTART; + else if (status == PortableInterceptor::SYSTEM_EXCEPTION + || status == PortableInterceptor::USER_EXCEPTION) +#endif /*TAO_HAS_INTERCEPTORS*/ + ACE_RE_THROW; + } +# if defined (ACE_HAS_EXCEPTIONS) \ + && defined (ACE_HAS_BROKEN_UNEXPECTED_EXCEPTIONS) + ACE_CATCHALL + { +#if TAO_HAS_INTERCEPTORS == 1 + PortableInterceptor::ReplyStatus st = + this->handle_all_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (st == PortableInterceptor::LOCATION_FORWARD || + st == PortableInterceptor::TRANSPORT_RETRY) + s = TAO_INVOKE_RESTART; + else +#endif /*TAO_HAS_INTERCEPTORS == 1*/ + ACE_RE_THROW; + } +# endif /* ACE_HAS_EXCEPTIONS && + ACE_HAS_BROKEN_UNEXPECTED_EXCEPTION*/ + ACE_ENDTRY; + ACE_CHECK_RETURN (TAO_INVOKE_FAILURE); + + return s; + } +} diff --git a/TAO/tao/Messaging/Asynch_Invocation.h b/TAO/tao/Messaging/Asynch_Invocation.h new file mode 100644 index 00000000000..aaa7f4a677f --- /dev/null +++ b/TAO/tao/Messaging/Asynch_Invocation.h @@ -0,0 +1,59 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Asynch_Invocation.h + * + * $Id$ + * + * @author Balachandran Natarajan <bala@dre.vanderbilt.edu> + */ +//============================================================================= + +#ifndef TAO_MESSAGING_ASYNCH_INVOCATION_H +#define TAO_MESSAGING_ASYNCH_INVOCATION_H +#include "ace/pre.h" + +#include "tao/Synch_Invocation.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Global_Macros.h" + +class TAO_Operation_Details; +class TAO_InputCDR; +class ACE_Time_Value; +class TAO_Asynch_Reply_Dispatcher; + +namespace CORBA +{ + class SystemException; +} + +namespace TAO +{ + class Profile_Transport_Resolver; + + class TAO_Export Asynch_Remote_Invocation : protected Synch_Twoway_Invocation + { + public: + Asynch_Remote_Invocation (CORBA::Object_ptr otarget, + Profile_Transport_Resolver &resolver, + TAO_Operation_Details &detail, + TAO_Asynch_Reply_Dispatcher *rd, + bool response_expected = true); + + Invocation_Status remote_invocation (ACE_Time_Value *value + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::Exception)); + + private: + TAO_Asynch_Reply_Dispatcher *rd_; + }; +} + + +#include "ace/post.h" +#endif /*TAO_SYNCH_INVOCATION_H*/ diff --git a/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp b/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp new file mode 100644 index 00000000000..30d7edf98cd --- /dev/null +++ b/TAO/tao/Messaging/Asynch_Invocation_Adapter.cpp @@ -0,0 +1,122 @@ +//$Id$ +#include "Asynch_Invocation_Adapter.h" +#include "Asynch_Reply_Dispatcher.h" +#include "tao/Profile_Transport_Resolver.h" +#include "tao/operation_details.h" +#include "tao/Stub.h" +#include "tao/corbafwd.h" +#include "tao/Transport.h" +#include "tao/Muxed_TMS.h" +#include "Asynch_Invocation.h" + +ACE_RCSID (tao, + Invocation_Adapter, + "$Id$") + + +namespace TAO +{ + Asynch_Invocation_Adapter::Asynch_Invocation_Adapter (CORBA::Object *target, + Argument **args, + int arg_number, + char *operation, + int op_len, + Collocation_Proxy_Broker *p, + Invocation_Mode m) + : Invocation_Adapter (target, + args, + arg_number, + operation, + op_len, + p, + TAO_TWOWAY_INVOCATION, + m) + , rd_ (0) + { + } + + void + Asynch_Invocation_Adapter::invoke (Messaging::ReplyHandler_ptr reply_handler_ptr, + const TAO_Reply_Handler_Skeleton &reply_handler_skel + ACE_ENV_ARG_DECL) + { + TAO_Stub *stub = + this->get_stub (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // If the reply handler is nil, we do not create a reply dispatcher. + // The ORB will drop replies to which it cannot associate a reply + // dispatcher. + if (!CORBA::is_nil (reply_handler_ptr)) + { + // New reply dispatcher on the heap, because + // we will go out of scope and hand over the reply dispatcher + // to the ORB. + + // @@ Need to use memory pool here.. + ACE_NEW_THROW_EX (this->rd_, + TAO_Asynch_Reply_Dispatcher (reply_handler_skel, + reply_handler_ptr, + stub->orb_core ()), + CORBA::NO_MEMORY ()); + } + + Invocation_Adapter::invoke (0, 0 ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + + void + Asynch_Invocation_Adapter::invoke_remote (TAO_Stub *stub, + TAO_Operation_Details &op + ACE_ENV_ARG_DECL) + { + ACE_Time_Value tmp_wait_time; + bool is_timeout = + this->get_timeout (tmp_wait_time); + + ACE_Time_Value *max_wait_time = 0; + + if (is_timeout) + max_wait_time = &tmp_wait_time; + + // Initial state + // TAO::Invocation_Status status = TAO_INVOKE_START; + + Profile_Transport_Resolver resolver (this->target_, + stub); + resolver.resolve (max_wait_time + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Update the request id now that we have a transport + op.request_id (resolver.transport ()->tms ()->request_id ()); + + if (this->rd_) + { + // Cache the transport in the reply dispatcher + this->rd_->transport (resolver.transport ()); + + // AMI Timeout Handling Begin + if (is_timeout) + { + this->rd_->schedule_timer (op.request_id (), + *max_wait_time); + } + } + + // @@ NOTE:Need to change this to something better. Too many + // hash defines meaning the same thing.. + op.response_flags (TAO_TWOWAY_RESPONSE_FLAG); + TAO::Asynch_Remote_Invocation asynch (this->target_, + resolver, + op, + this->rd_); + + (void) asynch.remote_invocation (max_wait_time + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + +} // End namespace TAO diff --git a/TAO/tao/Messaging/Asynch_Invocation_Adapter.h b/TAO/tao/Messaging/Asynch_Invocation_Adapter.h new file mode 100644 index 00000000000..ddd50e8f522 --- /dev/null +++ b/TAO/tao/Messaging/Asynch_Invocation_Adapter.h @@ -0,0 +1,96 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Asynch_Invocation_Adapter.h + * + * $Id$ + * + * + * @author Balachandran Natarajan <bala@dre.vanderbilt.edu> + */ +//============================================================================= +#ifndef TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H +#define TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H +#include "ace/pre.h" + +#include "tao/Messaging/messaging_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Invocation_Adapter.h" +#include "ace/CORBA_macros.h" +#include "ace/Global_Macros.h" +#include "tao/Messaging/Messaging.h" + +class TAO_Operation_Details; +class TAO_Stub; +class ACE_Time_Value; +class TAO_Asynch_Reply_Dispatcher; + +namespace CORBA +{ + class Object; + class Environment; + class SystemException; +} + +namespace TAO +{ + class Argument; + + class Collocation_Proxy_Broker; + + /** + * @class Asynch_Invocation_Adapter + * + * @brief Generic interface for the invocation object visible to the + * IDL compiler. + * + */ + class TAO_Messaging_Export Asynch_Invocation_Adapter : private Invocation_Adapter + { + public: + Asynch_Invocation_Adapter ( + CORBA::Object *target, + Argument **args, + int arg_number, + char *operation, + int op_len, + Collocation_Proxy_Broker *b, + TAO::Invocation_Mode mode = TAO_ASYNCHRONOUS_CALLBACK_INVOCATION); + + void invoke (Messaging::ReplyHandler_ptr reply_handler_ptr, + const TAO_Reply_Handler_Skeleton &reply_handler_skel + ACE_ENV_ARG_DECL); + + protected: + + /** + virtual void invoke_collocated (TAO_Stub *, + TAO_Operation_Details &op + ACE_ENV_ARG_DECL); + **/ + virtual void invoke_remote (TAO_Stub *, + TAO_Operation_Details &op + ACE_ENV_ARG_DECL); + + private: + + private: + TAO_Asynch_Reply_Dispatcher *rd_; + + private: + /// Dont allow default initializations + ACE_UNIMPLEMENTED_FUNC (Asynch_Invocation_Adapter (void)); + + ACE_UNIMPLEMENTED_FUNC (Asynch_Invocation_Adapter & operator= (const Asynch_Invocation_Adapter &)); + + }; +} // End namespace TAO + + +#include "ace/post.h" +#endif /*TAO_MESSAGING_ASYNCH_INVOCATION_ADAPTER_H*/ |