diff options
Diffstat (limited to 'TAO')
-rw-r--r-- | TAO/tao/Asynch_Invocation.cpp | 63 | ||||
-rw-r--r-- | TAO/tao/Asynch_Invocation.h | 48 | ||||
-rw-r--r-- | TAO/tao/Asynch_Invocation.i | 25 | ||||
-rw-r--r-- | TAO/tao/Object.cpp | 3 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.cpp | 99 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.h | 61 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.i | 16 | ||||
-rw-r--r-- | TAO/tao/Request.cpp | 316 | ||||
-rw-r--r-- | TAO/tao/Request.h | 91 | ||||
-rw-r--r-- | TAO/tao/Stub.cpp | 183 | ||||
-rw-r--r-- | TAO/tao/Stub.h | 16 |
11 files changed, 587 insertions, 334 deletions
diff --git a/TAO/tao/Asynch_Invocation.cpp b/TAO/tao/Asynch_Invocation.cpp index fabf37f7ae6..2e9c9246a6a 100644 --- a/TAO/tao/Asynch_Invocation.cpp +++ b/TAO/tao/Asynch_Invocation.cpp @@ -94,7 +94,8 @@ TAO_GIOP_Twoway_Asynch_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) } // Just send the request, without trying to wait for the reply. - retval = TAO_GIOP_Invocation::invoke (1, ACE_TRY_ENV); + retval = TAO_GIOP_Invocation::invoke (1, + ACE_TRY_ENV); ACE_CHECK_RETURN (retval); if (retval != TAO_INVOKE_OK) @@ -105,9 +106,63 @@ TAO_GIOP_Twoway_Asynch_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) return TAO_INVOKE_OK; } -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +//************************************************************************** + +void +TAO_GIOP_DII_Deferred_Invocation::start (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->TAO_GIOP_Invocation::start (ACE_TRY_ENV); + ACE_CHECK; + + this->transport_->start_request (this->orb_core_, + this->profile_, + this->out_stream_, + ACE_TRY_ENV); + ACE_CHECK; +} + +int +TAO_GIOP_DII_Deferred_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_ASYNCH_INVOCATION_INVOKE_START); + + return this->invoke_i (ACE_TRY_ENV); +} + + +int +TAO_GIOP_DII_Deferred_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // Register a reply dispatcher for this Asynch_Invocation. Use the + // heap allocated reply dispatcher. + + int retval = + this->transport_->tms ()->bind_dispatcher (this->request_id_, + this->rd_); + if (retval == -1) + { + // @@ What is the right way to handle this error? + this->close_connection (); + ACE_THROW_RETURN (CORBA::INTERNAL (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_NO), + TAO_INVOKE_EXCEPTION); + } + + // Just send the request, without trying to wait for the reply. + retval = TAO_GIOP_Invocation::invoke (1, + ACE_TRY_ENV); + ACE_CHECK_RETURN (retval); + + if (retval != TAO_INVOKE_OK) + return retval; + + // We do not wait for the reply. Let us return. + + return TAO_INVOKE_OK; +} #endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ #endif /* TAO_HAS_CORBA_MESSAGING */ diff --git a/TAO/tao/Asynch_Invocation.h b/TAO/tao/Asynch_Invocation.h index 352f06458a4..bacf8b97174 100644 --- a/TAO/tao/Asynch_Invocation.h +++ b/TAO/tao/Asynch_Invocation.h @@ -81,6 +81,54 @@ private: // Reply dispatcher for the current synchronous Asynch_Invocation. }; +//*********************************************************************** + +class TAO_Export TAO_GIOP_DII_Deferred_Invocation : public TAO_GIOP_Invocation +{ + // = TITLE + // Sends a two-way request does not expect the reply. + // + // = DESCRIPTION + // This class connects (or lookups a connection from the cache) to + // the remote server, builds the CDR stream for the Request, send + // the CDR stream and returns. + // +public: + TAO_GIOP_DII_Deferred_Invocation (TAO_Stub *data, + TAO_ORB_Core* orb_core, + const CORBA::Request_ptr req); + // Constructor. + + void start (CORBA_Environment &TAO_IN_ENV = + TAO_default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException)); + // Calls TAO_GIOP_Asynch_Invocation::start. + + int invoke (CORBA_Environment &TAO_IN_ENV = + TAO_default_environment ()) + ACE_THROW_SPEC ((CORBA::SystemException)); + // Send request, block until any reply comes back, and unmarshal + // reply parameters as appropriate. + + + // TAO_InputCDR &inp_stream (void); + // // Return the underlying input stream. + // + + const IOP::ServiceContextList& reply_service_info (void) const; + // Accessor to the reply ServiceContextList. + +private: + int invoke_i (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + // Implementation of the invoke() methods, handles the basic + // send/reply code and the system exceptions. + +private: + TAO_DII_Deferred_Reply_Dispatcher *rd_; + // Reply dispatcher for the current synchronous Asynch_Invocation. +}; + #if defined (__ACE_INLINE__) # include "tao/Asynch_Invocation.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/tao/Asynch_Invocation.i b/TAO/tao/Asynch_Invocation.i index 1cdd8b484da..12895c64383 100644 --- a/TAO/tao/Asynch_Invocation.i +++ b/TAO/tao/Asynch_Invocation.i @@ -10,7 +10,9 @@ TAO_GIOP_Twoway_Asynch_Invocation (TAO_Stub *stub, TAO_ORB_Core *orb_core, const TAO_Reply_Handler_Skeleton &reply_handler_skel, Messaging::ReplyHandler_ptr reply_handler_ptr) - : TAO_GIOP_Invocation (stub, operation, orb_core), + : TAO_GIOP_Invocation (stub, + operation, + orb_core), rd_ (0) { // New reply dispatcher on the heap, because @@ -22,3 +24,24 @@ TAO_GIOP_Twoway_Asynch_Invocation (TAO_Stub *stub, TAO_Asynch_Reply_Dispatcher (reply_handler_skel, reply_handler_ptr)); } + +//**************************************************************************** + +ACE_INLINE +TAO_GIOP_DII_Deferred_Invocation:: +TAO_GIOP_DII_Deferred_Invocation (TAO_Stub *stub, + TAO_ORB_Core *orb_core, + const CORBA::Request_ptr req) + : TAO_GIOP_Invocation (stub, + req->operation (), + orb_core), + rd_ (0) +{ + // New reply dispatcher on the heap, because + // we will go out of scope and hand over the + // reply dispatcher to the ORB. + // So this->rd_ is 0, because we do not need to + // hold a pointer to it. + ACE_NEW (rd_, + TAO_DII_Deferred_Reply_Dispatcher (req)); +} diff --git a/TAO/tao/Object.cpp b/TAO/tao/Object.cpp index 8abd70f267d..0454bc1c728 100644 --- a/TAO/tao/Object.cpp +++ b/TAO/tao/Object.cpp @@ -337,6 +337,7 @@ CORBA_Object::_create_request (CORBA::Context_ptr ctx, ACE_NEW_THROW_EX (request, CORBA::Request (this, + this->protocol_proxy_->orb_core ()-> orb (), operation, arg_list, result, @@ -365,6 +366,7 @@ CORBA_Object::_create_request (CORBA::Context_ptr ctx, ACE_NEW_THROW_EX (request, CORBA::Request (this, + this->protocol_proxy_->orb_core ()->orb (), operation, arg_list, result, @@ -383,6 +385,7 @@ CORBA_Object::_request (const CORBA::Char *operation, CORBA::Request_ptr req = CORBA::Request::_nil (); ACE_NEW_THROW_EX (req, CORBA::Request (this, + this->protocol_proxy_->orb_core ()->orb (), operation, ACE_TRY_ENV), CORBA::NO_MEMORY ()); diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp index 9374fc70d05..d0982203235 100644 --- a/TAO/tao/Reply_Dispatcher.cpp +++ b/TAO/tao/Reply_Dispatcher.cpp @@ -134,6 +134,7 @@ TAO_Synch_Reply_Dispatcher::leader_follower_condition_variable (TAO_Transport *t } // ********************************************************************* + #if defined (TAO_HAS_CORBA_MESSAGING) #if defined (TAO_HAS_AMI_CALLBACK) || defined (TAO_HAS_AMI_POLLER) @@ -158,8 +159,6 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, IOP::ServiceContextList &reply_ctx, TAO_GIOP_Message_State *message_state) { - // this->reply_received_ = 1; - this->reply_status_ = reply_status; this->version_ = version; this->message_state_ = message_state; @@ -200,12 +199,11 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, break; } - CORBA::Environment &ACE_TRY_ENV = TAO_default_environment (); - ACE_TRY + ACE_TRY_NEW_ENV { // Call the Reply Handler's skeleton. - reply_handler_skel_ (message_state_->cdr, - reply_handler_, + reply_handler_skel_ (this->message_state_->cdr, + this->reply_handler_, reply_error, ACE_TRY_ENV); ACE_TRY_CHECK; @@ -231,6 +229,95 @@ TAO_Asynch_Reply_Dispatcher::message_state (void) return this->message_state_; } +// ********************************************************************* + +// Constructor. +TAO_DII_Deferred_Reply_Dispatcher::TAO_DII_Deferred_Reply_Dispatcher ( + const CORBA::Request_ptr req + ) + : req_ (req) +{ +} + +// Destructor. +TAO_DII_Deferred_Reply_Dispatcher::~TAO_DII_Deferred_Reply_Dispatcher (void) +{ +} + +// Dispatch the reply. +int +TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply ( + CORBA::ULong reply_status, + const TAO_GIOP_Version &version, + IOP::ServiceContextList &reply_ctx, + TAO_GIOP_Message_State *message_state + ) +{ + this->reply_status_ = reply_status; + this->version_ = version; + this->message_state_ = message_state; + + // Steal the buffer, that way we don't do any unnecesary copies of + // this data. + CORBA::ULong max = reply_ctx.maximum (); + CORBA::ULong len = reply_ctx.length (); + IOP::ServiceContext* context_list = reply_ctx.get_buffer (1); + this->reply_service_info_.replace (max, len, context_list, 1); + + + if (TAO_debug_level >= 4) + { + ACE_DEBUG ((LM_DEBUG, + "(%P | %t):TAO_Asynch_Reply_Dispatcher::dispatch_reply:\n")); + } + + CORBA::ULong reply_error = TAO_AMI_REPLY_NOT_OK; + switch (reply_status) + { + case TAO_GIOP_NO_EXCEPTION: + reply_error = TAO_AMI_REPLY_OK; + break; + case TAO_GIOP_USER_EXCEPTION: + reply_error = TAO_AMI_REPLY_USER_EXCEPTION; + break; + case TAO_GIOP_SYSTEM_EXCEPTION: + reply_error = TAO_AMI_REPLY_SYSTEM_EXCEPTION; + break; + case TAO_GIOP_LOCATION_FORWARD: + default: + reply_error = TAO_AMI_REPLY_NOT_OK; + break; + } + + ACE_TRY_NEW_ENV + { + // Call the Request back and send the reply data. + this->req_->handle_response (this->message_state_->cdr, + reply_error, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + if (TAO_debug_level >= 4) + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception during reply handler"); + } + ACE_ENDTRY; + + // This was dynamically allocated. Now the job is done. Commit + // suicide here. + delete this; + + return 1; +} + +TAO_GIOP_Message_State * +TAO_DII_Deferred_Reply_Dispatcher::message_state (void) +{ + return this->message_state_; +} + #endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ #endif /* TAO_HAS_CORBA_MESSAGING */ diff --git a/TAO/tao/Reply_Dispatcher.h b/TAO/tao/Reply_Dispatcher.h index ddb8b989595..127d698f35c 100644 --- a/TAO/tao/Reply_Dispatcher.h +++ b/TAO/tao/Reply_Dispatcher.h @@ -21,6 +21,7 @@ #define TAO_REPLY_DISPATCHER_H #include "tao/GIOP.h" +#include "tao/Request.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -205,6 +206,66 @@ private: // Reply Handler passed in the Asynchronous Invocation. }; +// ********************************************************************* + +class TAO_Export TAO_DII_Deferred_Reply_Dispatcher : public TAO_Reply_Dispatcher +{ + // = TITLE + // + // Reply dispatcher for DII deferred requests. + // + // = DESCRIPTION + // + +public: + TAO_DII_Deferred_Reply_Dispatcher (const CORBA::Request_ptr req); + // Constructor. + + virtual ~TAO_DII_Deferred_Reply_Dispatcher (void); + // Destructor. + + CORBA::ULong reply_status (void) const; + // Get the reply status. + + const TAO_GIOP_Version& version (void) const; + // Get the GIOP version + + virtual int dispatch_reply (CORBA::ULong reply_status, + const TAO_GIOP_Version& version, + IOP::ServiceContextList& reply_ctx, + TAO_GIOP_Message_State* message_state); + // Dispatch the reply. This involves demarshalling the reply and + // calling the appropriate call back hook method on the reply + // handler. + // Return 1 on sucess, -1 on error. + + virtual TAO_GIOP_Message_State *message_state (void); + // Return the message state. + +protected: + IOP::ServiceContextList reply_service_info_; + // The service context list + // Note, that this is not a reference as in + // the synchronous case. We own the reply_service_info + // because our TAO_Asynch_Invocation will go out + // of scope before we are done. + +private: + CORBA::ULong reply_status_; + // Reply or LocateReply status. + + TAO_GIOP_Version version_; + // The version + + TAO_GIOP_Message_State *message_state_; + // CDR stream for reading the input. + // @@ Carlos : message_state should go away. All we need is the reply + // cdr. Is that right? (Alex). + + const CORBA::Request_ptr req_; + // Where the reply needs to go. +}; + # endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ #endif /* TAO_HAS_CORBA_MESSAGING */ diff --git a/TAO/tao/Reply_Dispatcher.i b/TAO/tao/Reply_Dispatcher.i index 275aa179644..f8c31c897ab 100644 --- a/TAO/tao/Reply_Dispatcher.i +++ b/TAO/tao/Reply_Dispatcher.i @@ -13,6 +13,7 @@ TAO_Synch_Reply_Dispatcher::version (void) const } #if defined (TAO_HAS_CORBA_MESSAGING) && defined (TAO_POLLER) + ACE_INLINE CORBA::ULong TAO_Asynch_Reply_Dispatcher::reply_status (void) const { @@ -24,4 +25,19 @@ TAO_Asynch_Reply_Dispatcher::version (void) const { return this->version_; } + +//********************************************************************* + +ACE_INLINE CORBA::ULong +TAO_DII_Deferred_Reply_Dispatcher::reply_status (void) const +{ + return this->reply_status_; +} + +ACE_INLINE const TAO_GIOP_Version& +TAO_DII_Deferred_Reply_Dispatcher::version (void) const +{ + return this->version_; +} + #endif /* TAO_HAS_CORBA_MESSAGING && TAO_POLLER */ diff --git a/TAO/tao/Request.cpp b/TAO/tao/Request.cpp index 5bc099e8a16..28bcddf16d2 100644 --- a/TAO/tao/Request.cpp +++ b/TAO/tao/Request.cpp @@ -16,7 +16,10 @@ ACE_RCSID(tao, Request, "$Id$") CORBA::ULong CORBA_Request::_incr_refcnt (void) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + ace_mon, + this->lock_, + 0); return refcount_++; } @@ -24,7 +27,10 @@ CORBA::ULong CORBA_Request::_decr_refcnt (void) { { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + ace_mon, + this->lock_, + 0); this->refcount_--; if (this->refcount_ != 0) return this->refcount_; @@ -45,28 +51,33 @@ CORBA_Request::_nil (void) // DII Request class implementation CORBA_Request::CORBA_Request (CORBA::Object_ptr obj, + CORBA::ORB_ptr orb, const CORBA::Char *op, CORBA::NVList_ptr args, CORBA::NamedValue_ptr result, CORBA::Flags flags, CORBA::Environment &ACE_TRY_ENV) - : args_ (CORBA::NVList::_duplicate (args)), + : orb_ (CORBA::ORB::_duplicate (orb)), + args_ (CORBA::NVList::_duplicate (args)), result_ (CORBA::NamedValue::_duplicate (result)), flags_ (flags), env_ (ACE_TRY_ENV), contexts_ (0), ctx_ (0), refcount_ (1), - lazy_evaluation_ (0) + lazy_evaluation_ (0), + response_received_ (0) { target_ = CORBA::Object::_duplicate (obj); opname_ = CORBA::string_dup (op); } CORBA_Request::CORBA_Request (CORBA::Object_ptr obj, + CORBA::ORB_ptr orb, const CORBA::Char *op, CORBA::Environment &ACE_TRY_ENV) - : flags_ (0), + : orb_ (CORBA::ORB::_duplicate (orb)), + flags_ (0), env_ (ACE_TRY_ENV), contexts_ (0), ctx_ (0), @@ -131,25 +142,93 @@ CORBA_Request::send_oneway (CORBA::Environment &ACE_TRY_ENV) void CORBA_Request::send_deferred (CORBA::Environment &ACE_TRY_ENV) { - ACE_THROW (CORBA::NO_IMPLEMENT (TAO_DEFAULT_MINOR_CODE, - CORBA::COMPLETED_NO)); + { + ACE_GUARD (ACE_SYNCH_MUTEX, + ace_mon, + this->lock_); + + this->response_received_ = 0; + } + + TAO_Stub *stub = this->target_->_stubobj (); + + stub->do_deferred_call (this, + ACE_TRY_ENV); } void CORBA_Request::get_response (CORBA::Environment &ACE_TRY_ENV) { - ACE_THROW (CORBA::NO_IMPLEMENT (TAO_DEFAULT_MINOR_CODE, - CORBA::COMPLETED_NO)); + while (!this->response_received_ && this->orb_->work_pending ()) + { + (void) this->orb_->perform_work (); + } + + if (this->lazy_evaluation_) + { + this->args_->evaluate (ACE_TRY_ENV); + ACE_CHECK; + } } CORBA::Boolean -CORBA_Request::poll_response (CORBA::Environment &ACE_TRY_ENV) +CORBA_Request::poll_response (CORBA::Environment &) { - ACE_THROW_RETURN (CORBA::NO_IMPLEMENT (TAO_DEFAULT_MINOR_CODE, - CORBA::COMPLETED_NO), + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + ace_mon, + this->lock_, 0); + + return this->response_received_; } +#if defined (TAO_HAS_CORBA_MESSAGING) + +# if defined (TAO_HAS_AMI_CALLBACK) || defined (TAO_HAS_AMI_POLLER) + +void +CORBA_Request::handle_response (TAO_InputCDR &incoming, + CORBA::ULong reply_status, + CORBA::Environment &ACE_TRY_ENV) +{ + switch (reply_status) + { + case TAO_AMI_REPLY_OK: + if (this->result_ != 0) + { + this->result_->value ()->_tao_decode (incoming, + ACE_TRY_ENV); + ACE_CHECK; + } + + this->args_->_tao_incoming_cdr (incoming, + CORBA::ARG_OUT | CORBA::ARG_INOUT, + this->lazy_evaluation_, + ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (ACE_SYNCH_MUTEX, + ace_mon, + this->lock_); + + this->response_received_ = 1; + } + + break; + case TAO_AMI_REPLY_USER_EXCEPTION: + case TAO_AMI_REPLY_SYSTEM_EXCEPTION: + case TAO_AMI_REPLY_NOT_OK: + default: + // @@ (JP) Don't know what to do about any of these yet. + ACE_ERROR ((LM_ERROR, + "(%P|%t) unhandled reply status\n")); + } +} + +# endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ + +#endif /* TAO_HAS_CORBA_MESSAGING */ // constructor. CORBA_ORB_RequestSeq::CORBA_ORB_RequestSeq (CORBA::ULong max) @@ -182,219 +261,6 @@ CORBA_ORB_RequestSeq::CORBA_ORB_RequestSeq (void) // no-op } -/* -// Constructor using a maximum length value. -CORBA_ORB_RequestSeq::CORBA_ORB_RequestSeq (CORBA::ULong maximum) - : TAO_Unbounded_Base_Sequence (maximum, allocbuf (maximum)) -{ -} - -CORBA_ORB_RequestSeq::CORBA_ORB_RequestSeq (CORBA::ULong maximum, - CORBA::ULong length, - CORBA::Request_ptr *data, - CORBA::Boolean release) - : TAO_Unbounded_Base_Sequence (maximum, length, data, release) -{ -} - -CORBA_ORB_RequestSeq::CORBA_ORB_RequestSeq (const CORBA_ORB_RequestSeq &rhs) - : TAO_Unbounded_Base_Sequence (rhs) -{ - CORBA::Request_ptr *tmp1 = allocbuf (this->maximum_); - CORBA::Request_ptr * const tmp2 = - ACE_reinterpret_cast (CORBA::Request_ptr * ACE_CAST_CONST, - rhs.buffer_); - - for (CORBA::ULong i = 0; i < this->length_; ++i) - tmp1[i] = tmp2[i]; - - this->buffer_ = tmp1; -} - -CORBA_ORB_RequestSeq & -CORBA_ORB_RequestSeq::operator= (const CORBA_ORB_RequestSeq &rhs) -{ - if (this == &rhs) - return *this; - - if (this->release_) - { - if (this->maximum_ < rhs.maximum_) - { - // free the old buffer - CORBA::Request_ptr *tmp = - ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - freebuf (tmp); - this->buffer_ = allocbuf (rhs.maximum_); - } - } - else - this->buffer_ = allocbuf (rhs.maximum_); - - TAO_Unbounded_Base_Sequence::operator= (rhs); - - CORBA::Request_ptr *tmp1 = - ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - CORBA::Request_ptr * const tmp2 = - ACE_reinterpret_cast (CORBA::Request_ptr * ACE_CAST_CONST, - rhs.buffer_); - - for (CORBA::ULong i = 0; i < this->length_; ++i) - tmp1[i] = tmp2[i]; - - return *this; -} - -CORBA_ORB_RequestSeq::~CORBA_ORB_RequestSeq (void) -{ - this->_deallocate_buffer (); -} - -CORBA::Request_ptr -CORBA_ORB_RequestSeq::operator[] (CORBA::ULong i) -{ - if (i >= this->maximum_) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) CORBA_ORB_RequestSeq %p\n", - "operator[] - subscript out of range"), - 0); - - CORBA::Request_ptr *tmp = - ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - return tmp[i]; -} - -const CORBA::Request* -CORBA_ORB_RequestSeq::operator[] (CORBA::ULong i) const -{ - if (i >= this->maximum_) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) CORBA_ORB_RequestSeq %p\n", - "operator[] - subscript out of range"), - 0); - - CORBA::Request_ptr * const tmp = - ACE_reinterpret_cast (CORBA::Request_ptr * ACE_CAST_CONST, - this->buffer_); - - return tmp[i]; -} - -CORBA::Request_ptr * -CORBA_ORB_RequestSeq::allocbuf (CORBA::ULong size) -{ - return new CORBA::Request_ptr[size]; -} - -void -CORBA_ORB_RequestSeq::freebuf (CORBA::Request_ptr *buffer) -{ - delete [] buffer; -} - -void -CORBA_ORB_RequestSeq::_allocate_buffer (CORBA::ULong length) -{ - CORBA::Request_ptr * tmp = allocbuf (length); - - if (this->buffer_ != 0) - { - CORBA::Request_ptr *old = - ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - - for (CORBA::ULong i = 0; i < this->length_; ++i) - tmp[i] = old[i]; - - if (this->release_) - freebuf (old); - } - - this->buffer_ = tmp; -} - -void -CORBA_ORB_RequestSeq::_deallocate_buffer (void) -{ - if (this->buffer_ == 0 || this->release_ == 0) - return; - - CORBA::Request_ptr *tmp = - ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - - freebuf (tmp); - - this->buffer_ = 0; -} - -CORBA::Request_ptr * -CORBA_ORB_RequestSeq::get_buffer (CORBA::Boolean orphan) -{ - CORBA::Request_ptr *result = 0; - - if (orphan == 0) - { - // We retain ownership. - if (this->buffer_ == 0) - { - result = allocbuf (this->length_); - this->buffer_ = result; - } - else - { - result = ACE_reinterpret_cast (CORBA::Request_ptr *, - this->buffer_); - } - } - else // if (orphan == 1) - { - if (this->release_ != 0) - { - // We set the state back to default and relinquish - // ownership. - result = ACE_reinterpret_cast(CORBA::Request_ptr *, - this->buffer_); - this->maximum_ = 0; - this->length_ = 0; - this->buffer_ = 0; - this->release_ = 0; - } - } - return result; -} - -const CORBA::Request_ptr * -CORBA_ORB_RequestSeq::get_buffer (void) const -{ - return ACE_reinterpret_cast (const CORBA::Request_ptr * ACE_CAST_CONST, - this->buffer_); -} - -void -CORBA_ORB_RequestSeq::replace (CORBA::ULong max, - CORBA::ULong length, - CORBA::Request_ptr *data, - CORBA::Boolean release) -{ - this->maximum_ = max; - this->length_ = length; - - if (this->buffer_ && this->release_ == 1) - { - CORBA::Request_ptr *tmp = - ACE_reinterpret_cast(CORBA::Request_ptr *, - this->buffer_); - freebuf (tmp); - } - - this->buffer_ = data; - this->release_ = release; -} -*/ #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class TAO_Unbounded_Pseudo_Sequence<CORBA_Request,CORBA_Request_var>; diff --git a/TAO/tao/Request.h b/TAO/tao/Request.h index d0d944b1a2a..1f6d34f2f57 100644 --- a/TAO/tao/Request.h +++ b/TAO/tao/Request.h @@ -31,10 +31,12 @@ #if !defined (TAO_HAS_MINIMUM_CORBA) #include "tao/corbafwd.h" +#include "tao/ORB.h" #include "tao/NVList.h" #include "tao/Environment.h" #include "tao/Context.h" #include "tao/Sequence.h" +#include "tao/MessagingC.h" class TAO_Export CORBA_Request { @@ -107,7 +109,7 @@ public: // A default argument is set, but please note that this not recommended // as the user may not be able to propagate the exceptions. - // NOT IMPLEMENTED - these next 3 will just throw CORBA::NO_IMPLEMENT. + // The 'deferred sunchronous' methods. void send_deferred (CORBA::Environment &ACE_TRY_ENV = CORBA::Environment::default_environment ()); void get_response (CORBA::Environment &ACE_TRY_ENV = @@ -115,6 +117,19 @@ public: CORBA::Boolean poll_response (CORBA::Environment &ACE_TRY_ENV = CORBA::Environment::default_environment ()); +#if defined (TAO_HAS_CORBA_MESSAGING) + +# if defined (TAO_HAS_AMI_CALLBACK) || defined (TAO_HAS_AMI_POLLER) + + void handle_response (TAO_InputCDR &incoming, + CORBA::ULong reply_status, + CORBA::Environment &ACE_TRY_ENV = + CORBA::Environment::default_environment ()); + +# endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ + +#endif /* TAO_HAS_CORBA_MESSAGING */ + // Pseudo object methods static CORBA_Request* _duplicate (CORBA_Request*); static CORBA_Request* _nil (void); @@ -138,6 +153,7 @@ private: // following are not allowed CORBA_Request (CORBA::Object_ptr obj, + CORBA::ORB_ptr orb, const CORBA::Char *op, CORBA::NVList_ptr args, CORBA::NamedValue_ptr result, @@ -146,6 +162,7 @@ private: TAO_default_environment ()); CORBA_Request (CORBA::Object_ptr obj, + CORBA::ORB_ptr orb, const CORBA::Char *op, CORBA::Environment &ACE_TRY_ENV = TAO_default_environment ()); @@ -155,6 +172,9 @@ private: CORBA::Object_ptr target_; // target object + CORBA::ORB_var orb_; + // Pointer to our ORB. + const CORBA::Char *opname_; // operation name @@ -182,11 +202,15 @@ private: CORBA::ULong refcount_; // reference counting - ACE_SYNCH_MUTEX refcount_lock_; - // protect the reference count + ACE_SYNCH_MUTEX lock_; + // protect the refcount_ and response_receieved_. int lazy_evaluation_; // If not zero then the NVList is not evaluated by default + + CORBA::Boolean response_received_; + // Set to TRUE upon completion of invoke() or + // handle_response(). }; typedef CORBA_Request* CORBA_Request_ptr; @@ -289,69 +313,8 @@ public: CORBA_ORB_RequestSeq (const CORBA_ORB_RequestSeq &); // Copy ctor, deep copies. - - //~CORBA_ORB_RequestSeq (void); - // dtor releases all the contained elements. }; -// This class definition should be removed.. But need to -// check with all the compiler guys before we have this removed - -/*class CORBA_ORB_RequestSeq : public TAO_Unbounded_Base_Sequence -{ -public: - - - // Default constructor. - CORBA_ORB_RequestSeq (void); - - // Constructor using a maximum length value. - CORBA_ORB_RequestSeq (CORBA::ULong maximum); - - // Constructor with all the sequence parameters. - CORBA_ORB_RequestSeq (CORBA::ULong maximum, - CORBA::ULong length, - CORBA::Request_ptr *data, - CORBA::Boolean release = 0); - - // Copy constructor. - CORBA_ORB_RequestSeq (const CORBA_ORB_RequestSeq &rhs); - - // Assignment operator. - CORBA_ORB_RequestSeq &operator= (const CORBA_ORB_RequestSeq &rhs); - - // Dtor. - ~CORBA_ORB_RequestSeq (void); - - // = Accessors. - CORBA::Request_ptr operator[] (CORBA::ULong i); - - const CORBA::Request* operator[] (CORBA::ULong i) const; - - // = Static operations. - - // Allocate storage for the sequence. - static CORBA::Request_ptr *allocbuf (CORBA::ULong size); - - // Free the sequence. - static void freebuf (CORBA::Request_ptr *buffer); - - virtual void _allocate_buffer (CORBA::ULong length); - - virtual void _deallocate_buffer (void); - - // Implement the TAO_Base_Sequence methods (see Sequence.h) - - CORBA::Request_ptr *get_buffer (CORBA::Boolean orphan = 0); - - const CORBA::Request_ptr *get_buffer (void) const; - - void replace (CORBA::ULong max, - CORBA::ULong length, - CORBA::Request_ptr *data, - CORBA::Boolean release); -}; -*/ class CORBA_ORB_RequestSeq_var { public: diff --git a/TAO/tao/Stub.cpp b/TAO/tao/Stub.cpp index 14ebb6012a3..2a98b5bae79 100644 --- a/TAO/tao/Stub.cpp +++ b/TAO/tao/Stub.cpp @@ -16,6 +16,7 @@ #include "tao/GIOP.h" #include "tao/NVList.h" #include "tao/Invocation.h" +#include "tao/Asynch_Invocation.h" #include "tao/ORB_Core.h" #include "tao/Client_Strategy_Factory.h" #include "tao/debug.h" @@ -195,7 +196,10 @@ TAO_Stub::is_equivalent (CORBA::Object_ptr other_obj) CORBA::ULong TAO_Stub::_incr_refcnt (void) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, guard, this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + guard, + this->refcount_lock_, + 0); return this->refcount_++; } @@ -204,7 +208,11 @@ CORBA::ULong TAO_Stub::_decr_refcnt (void) { { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->refcount_lock_, 0); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, + mon, + this->refcount_lock_, + 0); + this->refcount_--; if (this->refcount_ != 0) return this->refcount_; @@ -279,8 +287,8 @@ private: void TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, - const TAO_Call_Data *info, - void** args) + const TAO_Call_Data *info, + void** args) { ACE_FUNCTION_TIMEPROBE (TAO_STUB_OBJECT_DO_STATIC_CALL_START); @@ -315,6 +323,7 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, { TAO_GIOP_Twoway_Invocation call (this, info->opname, this->orb_core_); + ACE_TIMEPROBE (TAO_STUB_OBJECT_DO_STATIC_CALL_INVOCATION_CTOR); // We may need to loop through here more than once if we're @@ -356,7 +365,8 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, return; // Shouldn't happen if (status != TAO_INVOKE_OK) - ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_MAYBE)); + ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_MAYBE)); // The only case left is status == TAO_INVOKE_OK, exit the // loop. We cannot retry because at this point we either @@ -438,6 +448,7 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, // assert (value_size == tc->size()); ACE_NEW (*(void **)ptr, CORBA::Octet [pdp->value_size]); + (void) call.inp_stream ().decode (pdp->tc, *(void**)ptr, 0, @@ -451,6 +462,7 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, { TAO_GIOP_Oneway_Invocation call (this, info->opname, this->orb_core_); + ACE_TIMEPROBE (TAO_STUB_OBJECT_DO_STATIC_CALL_INVOCATION_CTOR); for (;;) @@ -458,13 +470,18 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, call.start (ACE_TRY_ENV); ACE_CHECK; - call.prepare_header (0, ACE_TRY_ENV); + call.prepare_header (0, + ACE_TRY_ENV); ACE_CHECK; - this->put_params (ACE_TRY_ENV, info, call, args); + this->put_params (ACE_TRY_ENV, + info, + call, + args); ACE_CHECK; ACE_TIMEPROBE (TAO_STUB_OBJECT_DO_STATIC_CALL_PUT_PARAMS); + int status = call.invoke (ACE_TRY_ENV); ACE_CHECK; @@ -475,7 +492,8 @@ TAO_Stub::do_static_call (CORBA::Environment &ACE_TRY_ENV, return; // Shouldn't happen if (status != TAO_INVOKE_OK) - ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_MAYBE)); + ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_MAYBE)); break; } @@ -508,14 +526,23 @@ TAO_Stub::put_params (CORBA::Environment &ACE_TRY_ENV, if (pdp->mode == PARAM_IN) { - (void) cdr.encode (pdp->tc, ptr, 0, ACE_TRY_ENV); + (void) cdr.encode (pdp->tc, + ptr, + 0, + ACE_TRY_ENV); } else if (pdp->mode == PARAM_INOUT) { if (pdp->value_size == 0) - (void) cdr.encode (pdp->tc, ptr, 0, ACE_TRY_ENV); + (void) cdr.encode (pdp->tc, + ptr, + 0, + ACE_TRY_ENV); else - (void) cdr.encode (pdp->tc, *(void**)ptr, 0, ACE_TRY_ENV); + (void) cdr.encode (pdp->tc, + *(void**)ptr, + 0, + ACE_TRY_ENV); } ACE_CHECK; } @@ -543,7 +570,8 @@ TAO_Stub::do_dynamic_call (const char *opname, // be forwarded. No standard way now to know. if (this->use_locate_request_ && this->first_locate_request_) { - TAO_GIOP_Locate_Request_Invocation call (this, this->orb_core_); + TAO_GIOP_Locate_Request_Invocation call (this, + this->orb_core_); // Simply let these exceptions propagate up // (if any of them occurs.) @@ -558,7 +586,9 @@ TAO_Stub::do_dynamic_call (const char *opname, if (is_roundtrip) { - TAO_GIOP_Twoway_Invocation call (this, opname, this->orb_core_); + TAO_GIOP_Twoway_Invocation call (this, + opname, + this->orb_core_); // Loop as needed for forwarding; see above. @@ -567,15 +597,19 @@ TAO_Stub::do_dynamic_call (const char *opname, call.start (ACE_TRY_ENV); ACE_CHECK; - call.prepare_header (1, ACE_TRY_ENV); + call.prepare_header (1, + ACE_TRY_ENV); ACE_CHECK; - this->put_params (call, args, ACE_TRY_ENV); + this->put_params (call, + args, + ACE_TRY_ENV); ACE_CHECK; // Make the call ... blocking for the response. int status = - call.invoke (exceptions, ACE_TRY_ENV); + call.invoke (exceptions, + ACE_TRY_ENV); ACE_CHECK; if (status == TAO_INVOKE_RESTART) @@ -585,7 +619,8 @@ TAO_Stub::do_dynamic_call (const char *opname, return; // Shouldn't happen if (status != TAO_INVOKE_OK) - ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_MAYBE)); + ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_MAYBE)); // The only case left is status == TAO_INVOKE_OK, exit the // loop. We cannot retry because at this point we either @@ -615,17 +650,22 @@ TAO_Stub::do_dynamic_call (const char *opname, } else { - TAO_GIOP_Oneway_Invocation call (this, opname, this->orb_core_); + TAO_GIOP_Oneway_Invocation call (this, + opname, + this->orb_core_); for (;;) { call.start (ACE_TRY_ENV); ACE_CHECK; - call.prepare_header (0, ACE_TRY_ENV); + call.prepare_header (0, + ACE_TRY_ENV); ACE_CHECK; - this->put_params (call, args, ACE_TRY_ENV); + this->put_params (call, + args, + ACE_TRY_ENV); ACE_CHECK; int status = call.invoke (ACE_TRY_ENV); @@ -646,6 +686,81 @@ TAO_Stub::do_dynamic_call (const char *opname, } } +#if defined (TAO_HAS_CORBA_MESSAGING) + +#if defined (TAO_HAS_AMI_CALLBACK) || defined (TAO_HAS_AMI_POLLER) + +void +TAO_Stub::do_deferred_call (const CORBA::Request_ptr req, + CORBA::Environment &ACE_TRY_ENV) +{ + + TAO_Synchronous_Cancellation_Required NOT_USED; + + // Do a locate_request if necessary/wanted. + // Suspect that you will be forwarded, so be proactive! + // strategy for reducing overhead when you think a request will + // be forwarded. No standard way now to know. + if (this->use_locate_request_ && this->first_locate_request_) + { + TAO_GIOP_Locate_Request_Invocation call (this, + this->orb_core_); + + // Simply let these exceptions propagate up + // (if any of them occurs.) + call.start (ACE_TRY_ENV); + ACE_CHECK; + + call.invoke (ACE_TRY_ENV); + ACE_CHECK; + + this->first_locate_request_ = 0; + } + + TAO_GIOP_DII_Deferred_Invocation call (this, + this->orb_core_, + req); + + // Loop as needed for forwarding; see above. + + for (;;) + { + call.start (ACE_TRY_ENV); + ACE_CHECK; + + call.prepare_header (1, + ACE_TRY_ENV); + ACE_CHECK; + + this->put_params (call, + req->arguments (), + ACE_TRY_ENV); + ACE_CHECK; + + // Make the call without blocking. + CORBA::ULong status = call.invoke (ACE_TRY_ENV); + ACE_CHECK; + + if (status == TAO_INVOKE_RESTART) + continue; + + if (status != TAO_INVOKE_OK) + ACE_THROW (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_MAYBE)); + + // The only case left is status == TAO_INVOKE_OK, exit the + // loop. We cannot retry because at this point we either + // got a reply or something with an status of + // COMPLETED_MAYBE, thus we cannot reissue the request if we + // are to satisfy the "at most once" semantics. + break; + } +} + +#endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ + +#endif /* TAO_HAS_CORBA_MESSAGING */ + void TAO_Stub::put_params (TAO_GIOP_Invocation &call, CORBA::NVList_ptr args, @@ -674,13 +789,13 @@ TAO_Stub::get_policy ( if (this->policies_ == 0) return CORBA::Policy::_nil (); - return this->policies_->get_policy (type, ACE_TRY_ENV); + return this->policies_->get_policy (type, + ACE_TRY_ENV); } CORBA::Policy_ptr -TAO_Stub::get_client_policy ( - CORBA::PolicyType type, - CORBA::Environment &ACE_TRY_ENV) +TAO_Stub::get_client_policy (CORBA::PolicyType type, + CORBA::Environment &ACE_TRY_ENV) { // No need to lock, the stub only changes its policies at // construction time... @@ -688,14 +803,16 @@ TAO_Stub::get_client_policy ( CORBA::Policy_var result; if (this->policies_ != 0) { - result = this->policies_->get_policy (type, ACE_TRY_ENV); + result = this->policies_->get_policy (type, + ACE_TRY_ENV); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); } if (CORBA::is_nil (result.in ())) { TAO_Policy_Current &policy_current = this->orb_core_->policy_current (); - result = policy_current.get_policy (type, ACE_TRY_ENV); + result = policy_current.get_policy (type, + ACE_TRY_ENV); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); } @@ -708,14 +825,16 @@ TAO_Stub::get_client_policy ( this->orb_core_->policy_manager (); if (policy_manager != 0) { - result = policy_manager->get_policy (type, ACE_TRY_ENV); + result = policy_manager->get_policy (type, + ACE_TRY_ENV); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); } } if (CORBA::is_nil (result.in ())) { - result = this->orb_core_->get_default_policy (type, ACE_TRY_ENV); + result = this->orb_core_->get_default_policy (type, + ACE_TRY_ENV); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); } @@ -840,14 +959,14 @@ TAO_Stub::set_policy_overrides ( } CORBA::PolicyList * -TAO_Stub::get_policy_overrides ( - const CORBA::PolicyTypeSeq & types, - CORBA::Environment &ACE_TRY_ENV) +TAO_Stub::get_policy_overrides (const CORBA::PolicyTypeSeq &types, + CORBA::Environment &ACE_TRY_ENV) { if (this->policies_ == 0) return 0; - return this->policies_->get_policy_overrides (types, ACE_TRY_ENV); + return this->policies_->get_policy_overrides (types, + ACE_TRY_ENV); } CORBA::Boolean diff --git a/TAO/tao/Stub.h b/TAO/tao/Stub.h index 472f3df3b48..eb0714595cd 100644 --- a/TAO/tao/Stub.h +++ b/TAO/tao/Stub.h @@ -268,6 +268,18 @@ public: // - exceptions ... list of legal user-defined exceptions // - ACE_TRY_ENV ... used for exception reporting. +#if defined (TAO_HAS_CORBA_MESSAGING) + +# if defined (TAO_HAS_AMI_CALLBACK) || defined (TAO_HAS_AMI_POLLER) + + void do_deferred_call (const CORBA::Request_ptr req, + CORBA_Environment &ACE_TRY_ENV = + TAO_default_environment ()); + +# endif /* TAO_HAS_AMI_CALLBACK || TAO_HAS_AMI_POLLER */ + +#endif /* TAO_HAS_CORBA_MESSAGING */ + #endif /* TAO_HAS_MINIMUM_CORBA */ #if defined (TAO_HAS_CORBA_MESSAGING) @@ -456,10 +468,10 @@ private: // NON-THREAD-SAFE. utility method for next_profile. private: - TAO_MProfile base_profiles_; + TAO_MProfile base_profiles_; // ordered list of profiles for this object. - TAO_MProfile *forward_profiles_; + TAO_MProfile *forward_profiles_; // The list of forwarding profiles. This is actually iimplemented as a // linked list of TAO_MProfile objects. |