diff options
author | bala <balanatarajan@users.noreply.github.com> | 1999-12-10 00:12:33 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 1999-12-10 00:12:33 +0000 |
commit | ed559b78780792371cec9d44f16512f93cfdc77d (patch) | |
tree | 5793fa9a87e58911819dc1868a675eefad86af7d | |
parent | bfca847446a661564fde0128808d992ee2964a0b (diff) | |
download | ATCD-ed559b78780792371cec9d44f16512f93cfdc77d.tar.gz |
*** empty log message ***
-rw-r--r-- | TAO/tao/GIOP_Message_Factory.cpp | 204 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Factory.h | 70 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connect.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connect.h | 8 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connector.h | 2 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 110 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 23 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 18 | ||||
-rw-r--r-- | TAO/tao/Invocation.h | 13 | ||||
-rw-r--r-- | TAO/tao/Pluggable.cpp | 80 | ||||
-rw-r--r-- | TAO/tao/Pluggable.h | 31 | ||||
-rw-r--r-- | TAO/tao/Pluggable_Messaging.h | 42 | ||||
-rw-r--r-- | TAO/tao/UIOP_Acceptor.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/UIOP_Connect.cpp | 8 | ||||
-rw-r--r-- | TAO/tao/UIOP_Connector.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/UIOP_Profile.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/UIOP_Transport.cpp | 116 | ||||
-rw-r--r-- | TAO/tao/UIOP_Transport.h | 19 | ||||
-rw-r--r-- | TAO/tao/corbafwd.h | 17 |
19 files changed, 599 insertions, 186 deletions
diff --git a/TAO/tao/GIOP_Message_Factory.cpp b/TAO/tao/GIOP_Message_Factory.cpp index 9ddba82937c..2b62bc0b6be 100644 --- a/TAO/tao/GIOP_Message_Factory.cpp +++ b/TAO/tao/GIOP_Message_Factory.cpp @@ -1,8 +1,52 @@ //$Id$ -#include "GIOP_Message.h" +// @(#)giop.cpp 1.10 95/09/21 +// Copyright 1994-1995 by Sun Microsystems Inc. +// All Rights Reserved +// +// GIOP: Utility routines for sending, receiving GIOP messages +// +// Note that the Internet IOP is just the TCP-specific mapping of the +// General IOP. Areas where other protocols may map differently +// include use of record streams (TCP has none), orderly disconnect +// (TCP has it), endpoint addressing (TCP uses host + port), security +// (Internet security should be leveraged by IIOP) and more. +// +// NOTE: There are a few places where this code knows that it's really +// talking IIOP instead of GIOP. No rush to fix this so long as we +// are really not running atop multiple connection protocols. +// +// THREADING NOTE: currently, the connection manager eliminates tricky +// threading issues by providing this code with the same programming +// model both in threaded and unthreaded environments. Since the GIOP +// APIs were all designed to be reentrant, this makes threading rather +// simple! +// +// That threading model is that the thread making (or handling) a call +// is given exclusive access to a connection for the duration of a +// call, so that no multiplexing or demultiplexing is needed. That +// is, locking is at the "connection level" rather than "message +// level". -#if defined (__ACE_INLINE__) -# include "tao/Pluggable_Messaging.i" +// The down side of this simple threading model is that utilization of +// system resources (mostly connections, but to some extent network +// I/O) in some kinds of environments can be inefficient. However, +// simpler threading models are much easier to get properly debugged, +// and often perform better. Also, such environments haven't been +// seen to be any kind of problem; the model can be changed later if +// needed, it's just an internal implementation detail. Any portable +// ORB client is not allowed to rely on semantic implications of such +// a model. +// +// @@ there is lots of unverified I/O here. In all cases, if an +// error is detected when marshaling or unmarshaling, it should be +// reported. + +#include "tao/GIOP_Message_Factory.h" +#include "tao/Any.h" + + +#if !defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Factory.i" #endif /* __ACE_INLINE__ */ TAO_GIOP_Message_Factory::TAO_GIOP_Message_Factory (void) @@ -13,7 +57,7 @@ TAO_GIOP_Message_Factory::~TAO_GIOP_Message_Factory (void) { } -CORBA::Boolean +/*CORBA::Boolean TAO_GIOP_Message_Factory::start_message (const TAO_GIOP_Version &version, TAO_GIOP_Message_Factory::Message_Type t, TAO_OutputCDR &msg) @@ -43,3 +87,155 @@ TAO_GIOP_Message_Factory::start_message (const TAO_GIOP_Version &version, return 1; } +*/ + +CORBA::Boolean +TAO_GIOP_Message_Factory::write_request_header (const IOP::ServiceContextList& /*svc_ctx*/, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub */*stub*/, + const CORBA::Short address_disposition, + const char */*opname*/, + TAO_OutputCDR &msg) +{ + // Adding only stuff that are common to all versions of GIOP. + // @@ Note: If at any stage we feel that this amount of granularity + // for factorisation is not important we can dispense with it + // anyway. + + // First the request id + msg << request_id; + + // Second the response flags + switch (response_flags) + { + // We have to use magic numbers as the actual variables are + // declared as const short. They cannot be used in switch case + // statements. Probably what we can do is to add them as an + // definitions in this class or in the parent and then use the + // definitions. Hmm. Good idea.. But we will live with this for + // the time being. + case 0: // SYNC_NONE + case 1: // SYNC_WITH_TRANSPORT + case 4: // This one corresponds to the TAO extension SYNC_FLUSH. + // No response required. + msg << CORBA::Any::from_octet (0); + break; + case 2: // SYNC_WITH_SERVER + // Return before dispatching servant. + msg << CORBA::Any::from_octet (1); + break; + case 3: // SYNC_WITH_TARGET + // Return after dispatching servant. + msg << CORBA::Any::from_octet (3); + break; + // Some cases for the DII are missing here. We can add that once + // our IDL compiler starts supporting those stuff. This is + // specific to GIOP 1.2. So some of the services would start + // using this at some point of time and we will have them here + // naturally out of a need. + + default: + // Until more flags are defined by the OMG. + return 0; + } + + return 1; +} + + + + +TAO_GIOP_Message_Factory::send_message (TAO_Transport *transport, + TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time = 0, + TAO_Stub *stub = 0) +{ + // TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_SEND_MESSAGE_START); + + // Ptr to first buffer. + char *buf = (char *) stream.buffer (); + + // Length of all buffers. + size_t total_len = + stream.total_length (); + + // NOTE: Here would also be a fine place to calculate a digital + // signature for the message and place it into a preallocated slot + // in the "ServiceContext". Similarly, this is a good spot to + // encrypt messages (or just the message bodies) if that's needed in + // this particular environment and that isn't handled by the + // networking infrastructure (e.g., IPSEC). + + // Get the header length + size_t header_len = this->get_header_len (); + + // Get the message size offset + size_t offset = this->get_message_size_offset (); + + CORBA::ULong bodylen = total_len - header_len; + +#if !defined (ACE_ENABLE_SWAP_ON_WRITE) + *ACE_reinterpret_cast (CORBA::ULong *, buf + offset) = bodylen; +#else + if (!stream->do_byte_swap ()) + *ACE_reinterpret_cast (CORBA::ULong *, + buf + offset) = bodylen; + else + ACE_CDR::swap_4 (ACE_reinterpret_cast (char *, + &bodylen), + buf + offset); +#endif /* ACE_ENABLE_SWAP_ON_WRITE */ + + // Strictly speaking, should not need to loop here because the + // socket never gets set to a nonblocking mode ... some Linux + // versions seem to need it though. Leaving it costs little. + this->dump_msg ("send", + ACE_reinterpret_cast (u_char *, + buf), + stream.length ()); + + // This guarantees to send all data (bytes) or return an error. + ssize_t n = transport->send (stub, + stream.begin (), + max_wait_time); + + if (n == -1) + { + if (TAO_orbdebug) + ACE_DEBUG ((LM_DEBUG, + "TAO: (%P|%t) closing conn %d after fault %p\n", + transport->handle (), + "GIOP_Message_Factory::send_message ()")); + + return -1; + } + + // EOF. + if (n == 0) + { + if (TAO_orbdebug) + ACE_DEBUG ((LM_DEBUG, + "TAO: (%P|%t) GIOP::send_message () " + "EOF, closing conn %d\n", + transport->handle())); + return -1; + } + + return 1; +} + +void +TAO_GIOP_Message_Factory::dump_msg (const char */*label*/, + const u_char */*ptr*/, + size_t /*len*/) +{ + if (TAO_debug_level >= 5) + { + // I will have to print out all the relevant debug messages!! + // Let me not wory about that now. I will get back to that at a + // later date!! + } +} + + diff --git a/TAO/tao/GIOP_Message_Factory.h b/TAO/tao/GIOP_Message_Factory.h index 3eb257a6ee0..e771fc5c800 100644 --- a/TAO/tao/GIOP_Message_Factory.h +++ b/TAO/tao/GIOP_Message_Factory.h @@ -13,15 +13,18 @@ // Interface for the GIOP messaging protocol // // = AUTHOR +// Copyright 1994-1995 by Sun Microsystems Inc., // Balachandran Natarajan <bala@cs.wustl.edu> // // ============================================================================ #ifndef _TAO_GIOP_MESSAGE_H_ #define _TAO_GIOP_MESSAGE_H_ +#include "tao/Pluggable_Messaging.h" +#include "tao/debug.h" -class TAO_Export TAO_GIOP_Message_Factory :public TAO_Pluggable_Message +class TAO_Export TAO_GIOP_Message_Factory :public TAO_Pluggable_Message_Factory { // = TITLE // Definitions of GIOP specific stuff @@ -30,6 +33,9 @@ class TAO_Export TAO_GIOP_Message_Factory :public TAO_Pluggable_Message // This class will hold the specific details common to all the // GIOP versions. Some of them which are here may be shifted if // things start changing between versions + + // IMPORTANT: This code was based on the GIOP.h & GIOP.cpp + public: TAO_GIOP_Message_Factory (void); @@ -55,37 +61,63 @@ class TAO_Export TAO_GIOP_Message_Factory :public TAO_Pluggable_Message }; - virtual CORBA::Boolean start_message (const TAO_GIOP_Version &version, - TAO_GIOP_Message_Factory::Message_Type t, - TAO_OutputCDR &msg); - // Build the header for a message of type <t> into stream - // <msg>. Other GIOP related protocols that do not use this can - // override this. Like GIOP_lite - + /**********************************************************/ + // Methods related to the messages that would be sent by the client + /**********************************************************/ virtual CORBA::Boolean write_request_header (const IOP::ServiceContextList& svc_ctx, CORBA::ULong request_id, CORBA::Octet response_flags, - const TAO_opaque& key, + TAO_Stub *stub, + const CORBA::Short address_disposition, const char* opname, - CORBA::Principal_ptr principal, - TAO_OutputCDR &msg, - TAO_ORB_Core *orb_core); + TAO_OutputCDR &msg); + // Write the GIOP request header. CORBA::Boolean write_locate_request_header (CORBA::ULong request_id, - const TAO_opaque &key, - TAO_OutputCDR &msg); + TAO_Stub *stub, + const CORBA::Short address_disposition, + TAO_OutputCDR &msg) = 0; // Write the GIOP locate request header. - int send_message (TAO_Transport *transport, - TAO_OutputCDR &stream, - TAO_ORB_Core* orb_core, - ACE_Time_Value *max_wait_time = 0, - TAO_Stub *stub = 0); + + virtual int send_message (TAO_Transport *transport, + TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time = 0, + TAO_Stub *stub = 0); // Send message, returns TRUE if success, else FALSE. + virtual const size_t get_header_len (void) = 0; + // This will give the size of the header for different variants of + // GIOP. + + virtual const size_t get_message_size_offset (void) = 0; + // This will give the message_size offset as specified by different + // variants of GIOP + + void dump_msg (const char *label, + const u_char *ptr, + size_t len); + // Print out a debug messages.. + + // We need to add a Cancel request. But TAO does not support. + + + /**********************************************************/ + // Methods related to the messages that would be sent by the server. + /**********************************************************/ + //virtual CORBA::Boolean start_message (const TAO_GIOP_Version &version, + // TAO_GIOP_Message_Factory::Message_Type t, + // TAO_OutputCDR &msg); + // Build the header for a message of type <t> into stream + // <msg>. Other GIOP related protocols that do not use this can + // override this. Like GIOP_lite private: }; +#if defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Factory.i" +#endif /* __ACE_INLINE__ */ + #endif /*_TAO_GIOP_MESSAGE_H_*/ diff --git a/TAO/tao/IIOP_Connect.cpp b/TAO/tao/IIOP_Connect.cpp index dfae9b98595..40d83cd8ed8 100644 --- a/TAO/tao/IIOP_Connect.cpp +++ b/TAO/tao/IIOP_Connect.cpp @@ -331,6 +331,12 @@ TAO_IIOP_Client_Connection_Handler (ACE_Thread_Manager *t, transport_ (this, orb_core), orb_core_ (orb_core) { + // OK, Here is a small twist. By now the all the objecs cached in + // this class would have been constructed. But we would like to make + // the one of the objects, precisely the transport object a pointer + // to the Messaging object. So, we set this up properly by calling + // the messaging_init method on the transport. + this->transport_.messaging_init (& this->message_factory_); } TAO_IIOP_Client_Connection_Handler::~TAO_IIOP_Client_Connection_Handler (void) diff --git a/TAO/tao/IIOP_Connect.h b/TAO/tao/IIOP_Connect.h index 1f58f5d2ab3..07112a87299 100644 --- a/TAO/tao/IIOP_Connect.h +++ b/TAO/tao/IIOP_Connect.h @@ -33,6 +33,9 @@ #include "tao/IIOP_Transport.h" +// BALA Temporray include +#include "tao/GIOP_Message_1_1.h" + // Forward Decls class TAO_ORB_Core; class TAO_ORB_Core_TSS_Resources; @@ -100,6 +103,11 @@ protected: TAO_ORB_Core *orb_core_; // Cached ORB Core. + + //@@Added by Bala for the time being. This would change to the + // actual factory at a later date + TAO_GIOP_Message_1_1 message_factory_; + // ///////////////////// }; // **************************************************************** diff --git a/TAO/tao/IIOP_Connector.h b/TAO/tao/IIOP_Connector.h index 569619ce7b5..830f3aab592 100644 --- a/TAO/tao/IIOP_Connector.h +++ b/TAO/tao/IIOP_Connector.h @@ -191,6 +191,8 @@ private: TAO_IIOP_BASE_CONNECTOR base_connector_; // The connector initiating connection requests for IIOP. + + #if defined (TAO_USES_ROBUST_CONNECTION_MGMT) TAO_CACHED_CONNECT_STRATEGY *cached_connect_strategy_; // Cached connect strategy. diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index afd9d291b68..de90e56e6ac 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -48,7 +48,7 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Transport_Timeprobe_Description, TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler, TAO_ORB_Core *orb_core) - : TAO_Transport (TAO_IOP_TAG_INTERNET_IOP, + : TAO_Transport (TAO_TAG_IIOP_PROFILE, orb_core), handler_ (handler) { @@ -56,7 +56,12 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Handler_Base *handler, TAO_IIOP_Transport::~TAO_IIOP_Transport (void) { - this->flush_buffered_messages (); + // Cannot deal with errors, and therefore they are ignored. + this->send_buffered_messages (); + + // Note that it also doesn't matter how much of the data was + // actually sent. + this->dequeue_all (); } TAO_IIOP_Handler_Base *& @@ -89,6 +94,11 @@ TAO_IIOP_Transport::event_handler (void) return this->handler_; } +void +TAO_IIOP_Transport::messaging_init (TAO_Pluggable_Message_Factory *mesg) +{ + this->giop_factory_ = mesg; +} // **************************************************************** TAO_IIOP_Server_Transport:: @@ -330,67 +340,10 @@ TAO_IIOP_Transport::send (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time) { TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); - - // @@ This code should be refactored into ACE.cpp or something - // similar! - - // For the most part this was copied from GIOP::send_request and - // friends. - - iovec iov[IOV_MAX]; - int iovcnt = 0; - ssize_t n = 0; - ssize_t nbytes = 0; - - for (const ACE_Message_Block *i = message_block; - i != 0; - i = i->cont ()) - { - // Make sure there is something to send! - if (i->length () > 0) - { - iov[iovcnt].iov_base = i->rd_ptr (); - iov[iovcnt].iov_len = i->length (); - iovcnt++; - - // The buffer is full make a OS call. @@ TODO this should - // be optimized on a per-platform basis, for instance, some - // platforms do not implement writev() there we should copy - // the data into a buffer and call send_n(). In other cases - // there may be some limits on the size of the iovec, there - // we should set IOV_MAX to that limit. - if (iovcnt == IOV_MAX) - { - if (max_wait_time == 0) - n = this->handler_->peer ().sendv_n (iov, - iovcnt); - else - n = ACE::writev (this->handler_->peer ().get_handle (), - iov, - iovcnt, - max_wait_time); - - if (n <= 0) - return n; - - nbytes += n; - iovcnt = 0; - } - } - } - - // Check for remaining buffers to be sent! - if (iovcnt != 0) - { - n = this->handler_->peer ().sendv_n (iov, - iovcnt); - if (n < 1) - return n; - - nbytes += n; - } - - return nbytes; + + return ACE::send_n (this->handle (), + message_block, + max_wait_time); } ssize_t @@ -410,10 +363,9 @@ TAO_IIOP_Transport::recv (char *buf, { TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_RECEIVE_START); - return ACE::recv_n (this->handler_->peer ().get_handle (), - buf, - len, - max_wait_time); + return this->handler_->peer ().recv_n (buf, + len, + max_wait_time); } // Default action to be taken for send request. @@ -426,3 +378,27 @@ TAO_IIOP_Transport::send_request (TAO_Stub *, { return -1; } + +CORBA::Boolean +TAO_IIOP_Transport::send_request_header (const IOP::ServiceContextList & svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const CORBA::Short address_disposition, + const char* opname, + TAO_OutputCDR & msg) +{ + // We are going to pass on this request to the underlying messaging + // layer. It should take care of this request + CORBA::Boolean retval = + this->giop_factory_->write_request_header (svc_ctx, + request_id, + response_flags, + stub, + address_disposition, + opname, + msg); + + return retval; +} + diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index a82205a4717..403a74a1307 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -22,6 +22,10 @@ #include "tao/Pluggable.h" +// BALA Temporrary inclusion +#include "tao/Pluggable_Messaging.h" + + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -77,11 +81,29 @@ public: TAO_OutputCDR &stream, int twoway, ACE_Time_Value *max_wait_time); + + CORBA::Boolean + send_request_header (const IOP::ServiceContextList &svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const short address_disposition, + const char* opname, + TAO_OutputCDR &msg); + + void messaging_init (TAO_Pluggable_Message_Factory *mesg); + // Initialising the messaging object protected: TAO_IIOP_Handler_Base *handler_; // the connection service handler used for accessing lower layer // communication protocols. + + TAO_Pluggable_Message_Factory *giop_factory_; + // The message_factor instance specific for this particular + // transport protocol. + + // @@ Shouldn't this be TAO_Pluggable_message_Factory?? }; class TAO_Export TAO_IIOP_Client_Transport : public TAO_IIOP_Transport @@ -125,6 +147,7 @@ public: TAO_OutputCDR &stream, int twoway, ACE_Time_Value *max_wait_time); + virtual int handle_client_input (int block = 0, ACE_Time_Value *max_time_value = 0); virtual int register_handler (void); diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 83ebca3edd6..fed83ab27eb 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -331,19 +331,17 @@ TAO_GIOP_Invocation::prepare_header (CORBA::Octet response_flags, // unverified user ID, and then verifying the message (i.e. a dummy // service context entry is set up to hold a digital signature for // this message, then patched shortly before it's sent). - static CORBA::Principal_ptr principal = 0; - - if (TAO_GIOP::write_request_header (this->service_info_, - this->request_id_, - response_flags, - this->profile_->object_key (), - this->opname_, - principal, - this->out_stream_, - this->orb_core_) == 0) + if (this->transport_->send_request_header (this->service_info_, + this->request_id_, + response_flags, + this->stub_, + TAO_GIOP_Invocation::Key_Addr, + this->opname_, + this->out_stream_) == 0) ACE_THROW (CORBA::MARSHAL ()); } + // Send request. int TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h index c95bfc79e1f..1bc5b10e5ad 100644 --- a/TAO/tao/Invocation.h +++ b/TAO/tao/Invocation.h @@ -83,6 +83,19 @@ public: // they want to. All the synchronous invocations <idle> the // Transport, but asynchronous invocations do not do that. + enum TAO_Target_Address + { + Key_Addr = 0, + Profile_Addr, + Reference_Addr + }; + // This enum is basically a equivalent of the addressing mechanism + // as defined by the GIOP classes. As things would be bad to get the + // GIOP specific details in to this class, we have this enum. Now it + // would like all the Messaging layers can hack on this addressing + // scheme. This would also prevent magic numbers to float around and + // this is the our main aim. + void prepare_header (CORBA::Octet response_flags, CORBA_Environment &ACE_TRY_ENV = TAO_default_environment ()) diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index 612072bb6b7..cca58ec2435 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -51,6 +51,86 @@ TAO_Transport::~TAO_Transport (void) delete this->buffering_queue_; } +ssize_t +TAO_Transport::send_buffered_messages (const ACE_Time_Value *max_wait_time) +{ + // Make sure we have a buffering queue and there are messages in it. + if (this->buffering_queue_ == 0 || + this->buffering_queue_->is_empty ()) + return 0; + + // Get the first message from the queue. + ACE_Message_Block *queued_message = 0; + ssize_t result = this->buffering_queue_->peek_dequeue_head (queued_message); + + // @@ What to do here on failures? + ACE_ASSERT (result != -1); + + // Actual network send. + result = this->send (queued_message, + max_wait_time); + + // Socket closed. + if (result == 0) + { + this->dequeue_all (); + return -1; + } + + // Cannot send. + if (result == -1) + { + // Timeout. + if (errno == ETIME) + { + // Since we queue up the message, this is not an error. We + // can try next time around. + return 0; + } + // Non-timeout error. + else + { + this->dequeue_all (); + return -1; + } + } + + // If successful in sending some or all of the data, reset the queue + // appropriately. + this->reset_queued_message (queued_message, + result); + + // Indicate success. + return result; +} + +void +TAO_Transport::dequeue_head (void) +{ + // Remove from the head of the queue. + ACE_Message_Block *message_block = 0; + int result = this->buffering_queue_->dequeue_head (message_block); + + // @@ What to do here on failures? + ACE_ASSERT (result != -1); + ACE_UNUSED_ARG (result); + + // Release the memory. + message_block->release (); +} + +void +TAO_Transport::dequeue_all (void) +{ + // Flush all queued messages. + if (this->buffering_queue_) + { + while (!this->buffering_queue_->is_empty ()) + this->dequeue_head (); + } +} + + void TAO_Transport::flush_buffered_messages (void) { diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index 84dd99368ad..af77ea67a2b 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -31,6 +31,7 @@ #include "tao/Typecode.h" #include "tao/IOPC.h" + // Forward declarations. class ACE_Addr; class ACE_Reactor; @@ -45,6 +46,8 @@ class TAO_Reply_Dispatcher; class TAO_Transport_Mux_Strategy; class TAO_Wait_Strategy; +class TAO_Pluggable_Message_Factory; + typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; class TAO_Export TAO_Transport @@ -105,6 +108,13 @@ public: // not clear this this is the best place to specify this. The actual // timeout values will be kept in the Policies. + virtual void messaging_init + (TAO_Pluggable_Message_Factory *factory) = 0; + // This is the method that would set the messaging object on the + // transport object. The transport object would be generally + // ignorant of the undrelying messaging object. This method sets a + // pointer to the right messaging object. + virtual void start_request (TAO_ORB_Core *orb_core, const TAO_Profile *profile, TAO_OutputCDR &output, @@ -133,6 +143,17 @@ public: // Using this method, instead of send(), allows the transport (and // wait strategy) to take appropiate action. + virtual CORBA::Boolean + send_request_header (const IOP::ServiceContextList &svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const short address_disposition, + const char* opname, + TAO_OutputCDR &msg) = 0; + // This is a request for the transport object to write a request + // header before it sends out a request + TAO_ORB_Core *orb_core (void) const; // Access the ORB that owns this connection. @@ -181,7 +202,17 @@ public: void flush_buffered_messages (void); // Flush any messages that have been buffered. + ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0); + // Send any messages that have been buffered. protected: + + void dequeue_head (void); + + void dequeue_all (void); + + void reset_queued_message (ACE_Message_Block *message_block, + size_t bytes_delivered); + CORBA::ULong tag_; // IOP protocol tag. diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index b7421437558..6510c495d3e 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -18,6 +18,8 @@ // ============================================================================ #ifndef _TAO_PLUGGABLE_MESSAGE_H_ #define _TAO_PLUGGABLE_MESSAGE_H_ +#include "tao/corbafwd.h" +#include "tao/Pluggable.h" class TAO_Export TAO_Pluggable_Message_Factory { @@ -26,6 +28,8 @@ class TAO_Export TAO_Pluggable_Message_Factory // // = DESCRIPTION // + +public: TAO_Pluggable_Message_Factory (void); // Ctor @@ -33,6 +37,44 @@ class TAO_Export TAO_Pluggable_Message_Factory virtual ~TAO_Pluggable_Message_Factory (void); // Dtor + /**********************************************************/ + // Methods related to the messages that would be sent by the client + /**********************************************************/ + virtual CORBA::Boolean write_request_header (const IOP::ServiceContextList& svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const CORBA::Short address_disposition, + const char* opname, + TAO_OutputCDR &msg) = 0; + + // Write the GIOP request header. + + virtual CORBA::Boolean write_locate_request_header (CORBA::ULong request_id, + TAO_Stub *stub, + const CORBA::Short address_disposition, + TAO_OutputCDR &msg) = 0; + // Write the GIOP locate request header. + + + virtual int send_message (TAO_Transport *transport, + TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time = 0, + TAO_Stub *stub = 0) = 0; + // Send message, returns TRUE if success, else FALSE. + + + + /**********************************************************/ + // Methods related to the messages that would be sent by the server. + /**********************************************************/ + //virtual CORBA::Boolean start_message (const TAO_GIOP_Version &version, + // TAO_GIOP_Message_Factory::Message_Type t, + // TAO_OutputCDR &msg); + // Build the header for a message of type <t> into stream + // <msg>. Other GIOP related protocols that do not use this can + // override this. Like GIOP_lite + }; diff --git a/TAO/tao/UIOP_Acceptor.cpp b/TAO/tao/UIOP_Acceptor.cpp index 4db6194d264..edd50b31a82 100644 --- a/TAO/tao/UIOP_Acceptor.cpp +++ b/TAO/tao/UIOP_Acceptor.cpp @@ -57,7 +57,7 @@ template class TAO_Accept_Strategy<TAO_UIOP_Server_Connection_Handler, ACE_LSOCK #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ TAO_UIOP_Acceptor::TAO_UIOP_Acceptor (void) - : TAO_Acceptor (TAO_IOP_TAG_UNIX_IOP), + : TAO_Acceptor (TAO_TAG_UIOP_PROFILE), base_acceptor_ (), creation_strategy_ (0), concurrency_strategy_ (0), diff --git a/TAO/tao/UIOP_Connect.cpp b/TAO/tao/UIOP_Connect.cpp index 73a4ff1e417..66bed11894a 100644 --- a/TAO/tao/UIOP_Connect.cpp +++ b/TAO/tao/UIOP_Connect.cpp @@ -397,8 +397,12 @@ int TAO_UIOP_Client_Connection_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // Called when buffering timer expires. - this->transport ()->flush_buffered_messages (); + // + // This method is called when buffering timer expires. + // + + // Cannot deal with errors, and therefore they are ignored. + this->transport ()->send_buffered_messages (); return 0; } diff --git a/TAO/tao/UIOP_Connector.cpp b/TAO/tao/UIOP_Connector.cpp index 30f7d1c1cc5..02c3c851c70 100644 --- a/TAO/tao/UIOP_Connector.cpp +++ b/TAO/tao/UIOP_Connector.cpp @@ -338,7 +338,7 @@ typedef ACE_Cached_Connect_Strategy<TAO_UIOP_Client_Connection_Handler, #endif /* ! TAO_USES_ROBUST_CONNECTION_MGMT */ TAO_UIOP_Connector::TAO_UIOP_Connector (void) - : TAO_Connector (TAO_IOP_TAG_UNIX_IOP), + : TAO_Connector (TAO_TAG_UIOP_PROFILE), base_connector_ (), orb_core_ (0) #if defined (TAO_USES_ROBUST_CONNECTION_MGMT) @@ -444,7 +444,7 @@ TAO_UIOP_Connector::connect (TAO_Profile *profile, TAO_Transport *& transport, ACE_Time_Value *max_wait_time) { - if (profile->tag () != TAO_IOP_TAG_UNIX_IOP) + if (profile->tag () != TAO_TAG_UIOP_PROFILE) return -1; TAO_UIOP_Profile *uiop_profile = diff --git a/TAO/tao/UIOP_Profile.cpp b/TAO/tao/UIOP_Profile.cpp index 968ce41d20a..d504d07ea64 100644 --- a/TAO/tao/UIOP_Profile.cpp +++ b/TAO/tao/UIOP_Profile.cpp @@ -27,7 +27,7 @@ TAO_UIOP_Profile::TAO_UIOP_Profile (const ACE_UNIX_Addr &addr, const TAO_ObjectKey &object_key, const TAO_GIOP_Version &version, TAO_ORB_Core *orb_core) - : TAO_Profile (TAO_IOP_TAG_UNIX_IOP), + : TAO_Profile (TAO_TAG_UIOP_PROFILE), version_ (version), object_key_ (object_key), object_addr_ (addr), @@ -41,7 +41,7 @@ TAO_UIOP_Profile::TAO_UIOP_Profile (const char *, const ACE_UNIX_Addr &addr, const TAO_GIOP_Version &version, TAO_ORB_Core *orb_core) - : TAO_Profile (TAO_IOP_TAG_UNIX_IOP), + : TAO_Profile (TAO_TAG_UIOP_PROFILE), version_ (version), object_key_ (object_key), object_addr_ (addr), @@ -63,7 +63,7 @@ TAO_UIOP_Profile::TAO_UIOP_Profile (const TAO_UIOP_Profile &pfile) TAO_UIOP_Profile::TAO_UIOP_Profile (const char *string, TAO_ORB_Core *orb_core, CORBA::Environment &ACE_TRY_ENV) - : TAO_Profile (TAO_IOP_TAG_UNIX_IOP), + : TAO_Profile (TAO_TAG_UIOP_PROFILE), version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), object_key_ (), object_addr_ (), @@ -75,7 +75,7 @@ TAO_UIOP_Profile::TAO_UIOP_Profile (const char *string, } TAO_UIOP_Profile::TAO_UIOP_Profile (TAO_ORB_Core *orb_core) - : TAO_Profile (TAO_IOP_TAG_UNIX_IOP), + : TAO_Profile (TAO_TAG_UIOP_PROFILE), version_ (TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR), object_key_ (), object_addr_ (), @@ -171,7 +171,7 @@ CORBA::Boolean TAO_UIOP_Profile::is_equivalent (const TAO_Profile *other_profile) { - if (other_profile->tag () != TAO_IOP_TAG_UNIX_IOP) + if (other_profile->tag () != TAO_TAG_UIOP_PROFILE) return 0; const TAO_UIOP_Profile *op = @@ -352,7 +352,7 @@ TAO_UIOP_Profile::encode (TAO_OutputCDR &stream) const // @@ it seems like this is not a good separation of concerns, why // do we write the TAG here? That's generic code and should be // handled by the object reference writer (IMHO). - stream.write_ulong (TAO_IOP_TAG_UNIX_IOP); + stream.write_ulong (TAO_TAG_UIOP_PROFILE); // Create the encapsulation.... TAO_OutputCDR encap (ACE_CDR::DEFAULT_BUFSIZE, diff --git a/TAO/tao/UIOP_Transport.cpp b/TAO/tao/UIOP_Transport.cpp index 255825caf6a..9da838d967f 100644 --- a/TAO/tao/UIOP_Transport.cpp +++ b/TAO/tao/UIOP_Transport.cpp @@ -51,7 +51,7 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_UIOP_Transport_Timeprobe_Description, TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Handler_Base *handler, TAO_ORB_Core *orb_core) - : TAO_Transport (TAO_IOP_TAG_UNIX_IOP, + : TAO_Transport (TAO_TAG_UIOP_PROFILE, orb_core), handler_ (handler) { @@ -59,7 +59,12 @@ TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Handler_Base *handler, TAO_UIOP_Transport::~TAO_UIOP_Transport (void) { - this->flush_buffered_messages (); + // Cannot deal with errors, and therefore they are ignored. + this->send_buffered_messages (); + + // Note that it also doesn't matter how much of the data was + // actually sent. + this->dequeue_all (); } TAO_UIOP_Handler_Base *& @@ -92,6 +97,11 @@ TAO_UIOP_Transport::event_handler (void) return this->handler_; } +void +TAO_UIOP_Transport::messaging_init (TAO_Pluggable_Message_Factory *mesg) +{ + this->giop_factory_ = mesg; +} // **************************************************************** TAO_UIOP_Server_Transport:: @@ -330,81 +340,26 @@ TAO_UIOP_Transport::send (TAO_Stub *stub, } ssize_t -TAO_UIOP_Transport::send (const ACE_Message_Block *mblk, - ACE_Time_Value *max_time_wait) +TAO_UIOP_Transport::send (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) { TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_SEND_START); - // @@ This code should be refactored into ACE.cpp or something - // similar! - - // For the most part this was copied from GIOP::send_request and - // friends. - - iovec iov[IOV_MAX]; - int iovcnt = 0; - ssize_t n = 0; - ssize_t nbytes = 0; - - for (const ACE_Message_Block *i = mblk; - i != 0; - i = i->cont ()) - { - // Make sure there is something to send! - if (i->length () > 0) - { - iov[iovcnt].iov_base = i->rd_ptr (); - iov[iovcnt].iov_len = i->length (); - iovcnt++; - - // The buffer is full make a OS call. @@ TODO this should - // be optimized on a per-platform basis, for instance, some - // platforms do not implement writev() there we should copy - // the data into a buffer and call send_n(). In other cases - // there may be some limits on the size of the iovec, there - // we should set IOV_MAX to that limit. - if (iovcnt == IOV_MAX) - { - if (max_time_wait == 0) - n = this->handler_->peer ().sendv_n (iov, - iovcnt); - else - n = ACE::writev (this->handler_->peer ().get_handle (), - iov, - iovcnt, - max_time_wait); - - if (n <= 0) - return n; - - nbytes += n; - iovcnt = 0; - } - } - } - - // Check for remaining buffers to be sent! - if (iovcnt != 0) - { - n = this->handler_->peer ().sendv_n (iov, - iovcnt); - if (n < 1) - return n; - - nbytes += n; - } - - return nbytes; + return ACE::send_n (this->handle (), + message_block, + max_wait_time); } ssize_t TAO_UIOP_Transport::send (const u_char *buf, size_t len, - ACE_Time_Value *) + ACE_Time_Value *max_wait_time) { TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_SEND_START); - return this->handler_->peer ().send_n (buf, len); + return this->handler_->peer ().send_n (buf, + len, + max_wait_time); } ssize_t @@ -414,10 +369,9 @@ TAO_UIOP_Transport::recv (char *buf, { TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_RECEIVE_START); - return ACE::recv_n (this->handler_->peer ().get_handle (), - buf, - len, - max_wait_time); + return this->handler_->peer ().recv_n (buf, + len, + max_wait_time); } // Default action to be taken for send request. @@ -431,4 +385,26 @@ TAO_UIOP_Transport::send_request (TAO_Stub *, return -1; } +CORBA::Boolean +TAO_UIOP_Transport::send_request_header (const IOP::ServiceContextList & svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const CORBA::Short address_disposition, + const char* opname, + TAO_OutputCDR & msg) +{ + // We are going to pass on this request to the underlying messaging + // layer. It should take care of this request + CORBA::Boolean retval = + this->giop_factory_->write_request_header (svc_ctx, + request_id, + response_flags, + stub, + address_disposition, + opname, + msg); + + return retval; +} #endif /* TAO_HAS_UIOP */ diff --git a/TAO/tao/UIOP_Transport.h b/TAO/tao/UIOP_Transport.h index d73779f74e3..3e06b52ad73 100644 --- a/TAO/tao/UIOP_Transport.h +++ b/TAO/tao/UIOP_Transport.h @@ -23,6 +23,9 @@ #include "tao/Pluggable.h" +// BALA Temporrary inclusion +#include "tao/Pluggable_Messaging.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -81,10 +84,26 @@ public: int twoway, ACE_Time_Value *max_wait_time); + + CORBA::Boolean + send_request_header (const IOP::ServiceContextList &svc_ctx, + CORBA::ULong request_id, + CORBA::Octet response_flags, + TAO_Stub *stub, + const short address_disposition, + const char* opname, + TAO_OutputCDR &msg); + + void messaging_init (TAO_Pluggable_Message_Factory *mesg); + // Initialising the messaging object protected: TAO_UIOP_Handler_Base *handler_; // the connection service handler used for accessing lower layer // communication protocols. + + TAO_Pluggable_Message_Factory *giop_factory_; + // The message_factor instance specific for this particular + // transport protocol. }; class TAO_Export TAO_UIOP_Client_Transport : public TAO_UIOP_Transport diff --git a/TAO/tao/corbafwd.h b/TAO/tao/corbafwd.h index 89e1a8a52e1..0c4b93b126e 100644 --- a/TAO/tao/corbafwd.h +++ b/TAO/tao/corbafwd.h @@ -644,7 +644,7 @@ template <class T,class T_var> class TAO_Unbounded_Object_Sequence; // Provide a simple function to access the TSS default environment. // We tried with CORBA_Environment::default_environment (), // CORBA::default_environment() and others. -extern TAO_Export CORBA_Environment& TAO_default_environment (void); +TAO_Export CORBA_Environment& TAO_default_environment (void); enum TAO_SERVANT_LOCATION { @@ -703,7 +703,7 @@ TAO_NAMESPACE CORBA // = String memory management. TAO_NAMESPACE_INLINE_FUNCTION Char* string_alloc (ULong len); - TAO_NAMESPACE_STORAGE_CLASS Char* string_dup (const Char *); + TAO_NAMESPACE_STORAGE_CLASS Char* string_dup (const Char *); TAO_NAMESPACE_INLINE_FUNCTION void string_free (Char *); // This is a TAO extension and must go away.... @@ -719,7 +719,7 @@ TAO_NAMESPACE CORBA // = String memory management routines. TAO_NAMESPACE_INLINE_FUNCTION WChar* wstring_alloc (ULong len); - TAO_NAMESPACE_STORAGE_CLASS WChar* wstring_dup (const WChar *const); + TAO_NAMESPACE_STORAGE_CLASS WChar* wstring_dup (const WChar *const); TAO_NAMESPACE_INLINE_FUNCTION void wstring_free (WChar *const); typedef CORBA_WString_var WString_var; @@ -1692,6 +1692,13 @@ TAO_NAMESPACE_CLOSE // end of class (namespace) CORBA // types later. #define TAO_ORB_TYPE 0x54414f00U +// The standard profile tags, they are listed here only to avoid +// putting the raw literal in the code, it is *NOT* necessary to list +// your own protocols here. +#define TAO_TAG_INVALID_PROFILE -1 +#define TAO_TAG_IIOP_PROFILE 0 +#define TAO_TAG_MULTIPLE_COMPONENT_PROFILE 1 + // We reserved the range 0x54414f00 - 0x54414f0f with the OMG to // define our own profile ids in TAO. #define TAO_TAG_UIOP_PROFILE 0x54414f00U /* Local IPC (Unix Domain) */ @@ -1806,10 +1813,10 @@ TAO_NAMESPACE_CLOSE // end of class (namespace) CORBA typedef TAO_Unbounded_Sequence<CORBA::Octet> TAO_opaque; extern TAO_Export CORBA::TypeCode_ptr TC_opaque; -extern TAO_Export CORBA::Boolean +TAO_Export CORBA::Boolean operator<< (TAO_OutputCDR&, const TAO_opaque&); -extern TAO_Export CORBA::Boolean +TAO_Export CORBA::Boolean operator>> (TAO_InputCDR&, TAO_opaque&); class TAO_ObjectKey; |