summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.h
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport.h')
-rw-r--r--TAO/tao/Transport.h1160
1 files changed, 1160 insertions, 0 deletions
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
new file mode 100644
index 00000000000..ad9a8a25155
--- /dev/null
+++ b/TAO/tao/Transport.h
@@ -0,0 +1,1160 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Transport.h
+ *
+ * $Id$
+ *
+ * Define the interface for the Transport component in TAO's
+ * pluggable protocol framework.
+ *
+ * @author Fred Kuhns <fredk@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_TRANSPORT_H
+#define TAO_TRANSPORT_H
+
+#include /**/ "ace/pre.h"
+
+#include "tao/Transport_Cache_Manager.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/Transport_Timer.h"
+#include "tao/Incoming_Message_Queue.h"
+#include "tao/Incoming_Message_Stack.h"
+#include "ace/Time_Value.h"
+#include "ace/Basic_Stats.h"
+
+struct iovec;
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+class TAO_ORB_Core;
+class TAO_Target_Specification;
+class TAO_Operation_Details;
+class TAO_Transport_Mux_Strategy;
+class TAO_Wait_Strategy;
+class TAO_Connection_Handler;
+class TAO_Pluggable_Messaging;
+class TAO_Codeset_Translator_Base;
+
+class TAO_Queued_Message;
+class TAO_Synch_Queued_Message;
+class TAO_Resume_Handle;
+class TAO_Stub;
+class TAO_MMAP_Allocator;
+
+namespace TAO
+{
+ /**
+ * @note Should this be in TAO namespace. Seems like a candidate
+ * that should be in the transport
+ */
+ enum Connection_Role
+ {
+ TAO_UNSPECIFIED_ROLE = 0,
+ TAO_SERVER_ROLE = 1,
+ TAO_CLIENT_ROLE = 2
+ };
+
+ namespace Transport
+ {
+ /// Transport-level statistics. Initially introduced to support
+ /// the "Transport Current" functionality.
+ class Stats;
+ }
+}
+
+/*
+ * Specialization hook for the TAO's transport implementation.
+ */
+//@@ TAO_TRANSPORT_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK
+
+/**
+ * @class TAO_Transport
+ *
+ * @brief Generic definitions for the Transport class.
+ *
+ * The transport object is created in the Service handler
+ * constructor and deleted in the Service Handler's destructor!!
+ *
+ * The main responsability of a Transport object is to encapsulate a
+ * connection, and provide a transport independent way to send and
+ * receive data. Since TAO is heavily based on the Reactor for all if
+ * not all its I/O the Transport class is usually implemented with a
+ * helper Connection Handler that adapts the generic Transport
+ * interface to the Reactor types.
+ *
+ * <H3>The outgoing data path:</H3>
+ *
+ * One of the responsibilities of the TAO_Transport class is to send
+ * out GIOP messages as efficiently as possible. In most cases
+ * messages are put out in FIFO order, the transport object will put
+ * out the message using a single system call and return control to
+ * the application. However, for oneways and AMI requests it may be
+ * more efficient (or required if the SYNC_NONE policy is in effect)
+ * to queue the messages until a large enough data set is available.
+ * Another reason to queue is that some applications cannot block for
+ * I/O, yet they want to send messages so large that a single write()
+ * operation would not be able to cope with them. In such cases we
+ * need to queue the data and use the Reactor to drain the queue.
+ *
+ * Therefore, the Transport class may need to use a queue to
+ * temporarily hold the messages, and, in some configurations, it may
+ * need to use the Reactor to concurrently drain such queues.
+ *
+ * <H4>Out of order messages:</H4> TAO provides explicit policies to
+ * send 'urgent' messages. Such messages may put at the head of the
+ * queue. However, they cannot be sent immediately because the
+ * transport may already be sending another message in a reactive
+ * fashion.
+ *
+ * Consequently, the Transport must also know if the head of the queue
+ * has been partially sent. In that case new messages can only follow
+ * the head. Only once the head is completely sent we can start
+ * sending new messages.
+ *
+ * <H4>Waiting threads:</H4> One or more threads can be blocked
+ * waiting for the connection to completely send the message.
+ * The thread should return as soon as its message has been sent, so a
+ * per-thread condition is required. This suggest that simply using a
+ * ACE_Message_Queue would not be enough: there is a significant
+ * amount of ancillary information, to keep on each message that the
+ * Message_Block class does not provide room for.
+ *
+ * Blocking I/O is still attractive for some applications. First, my
+ * eliminating the Reactor overhead performance is improved when
+ * sending large blocks of data. Second, using the Reactor to send
+ * out data opens the door for nested upcalls, yet some applications
+ * cannot deal with the reentrancy issues in this case.
+ *
+ * <H4>Timeouts:</H4> Some or all messages could have a timeout period
+ * attached to them. The timeout source could either be some
+ * high-level policy or maybe some strategy to prevent denial of
+ * service attacks. In any case the timeouts are per-message, and
+ * later messages could have shorter timeouts.
+ * In fact, some kind of scheduling (such as EDF) could be required in
+ * a few applications.
+ *
+ * <H4>Conclusions:</H4> The outgoing data path consist in several
+ * components:
+ *
+ * - A queue of pending messages
+ * - A message currently being transmitted
+ * - A per-transport 'send strategy' to choose between blocking on
+ * write, blocking on the reactor or blockin on leader/follower.
+ * - A per-message 'waiting object'
+ * - A per-message timeout
+ *
+ * The Transport object provides a single method to send request
+ * messages (send_request_message ()).
+ *
+ * <H3>The incoming data path:</H3>
+ *
+ * One of the main responsibilities of the transport is to read and
+ * process the incoming GIOP message as quickly and efficiently as
+ * possible. There are other forces that needs to be given due
+ * consideration. They are
+ * - Multiple threads should be able to traverse along the same data
+ * path but should not be able to read from the same handle at the
+ * same time ie. the handle should not be shared between threads at
+ * any instant.
+ * - Reads on the handle could give one or more messages.
+ * - Minimise locking and copying overhead when trying to attack the
+ * above.
+ *
+ * <H3> Parsing messages (GIOP) & processing the message:</H3>
+ *
+ * The messages should be checked for validity and the right
+ * information should be sent to the higher layer for processing. The
+ * process of doing a sanity check and preparing the messages for the
+ * higher layers of the ORB are done by the messaging protocol.
+ *
+ * <H3> Design forces and Challenges </H3>
+ *
+ * To keep things as efficient as possible for medium sized requests,
+ * it would be good to minimise data copying and locking along the
+ * incoming path ie. from the time of reading the data from the handle
+ * to the application. We achieve this by creating a buffer on stack
+ * and reading the data from the handle into the buffer. We then pass
+ * the same data block (the buffer is encapsulated into a data block)
+ * to the higher layers of the ORB. The problems stem from the
+ * following
+ * (a) Data is bigger than the buffer that we have on stack
+ * (b) Transports like TCP do not guarantee availability of the whole
+ * chunk of data in one shot. Data could trickle in byte by byte.
+ * (c) Single read gives multiple messages
+ *
+ * We solve the problems as follows
+ *
+ * (a) First do a read with the buffer on stack. Query the underlying
+ * messaging object whether the message has any incomplete
+ * portion. If so, data will be copied into new buffer being able
+ * to hold full message and is queued; succeeding events will read
+ * data from socket and write directly into this buffer.
+ * Otherwise, if if the message in local buffer is complete, we free
+ * the handle and then send the message to the higher layers of the
+ * ORB for processing.
+ *
+ * (b) If buffer with incomplete message has been enqueued, while trying
+ * to do the above, the reactor will call us back when the handle
+ * becomes read ready. The read-operation will copy data directly
+ * into the enqueued buffer. If the message has bee read completely
+ * the message is sent to the higher layers of the ORB for processing.
+ *
+ * (c) If we get multiple messages (possible if the client connected
+ * to the server sends oneways or AMI requests), we parse and
+ * split the messages. Every message is put in the queue. Once
+ * the messages are queued, the thread picks up one message to
+ * send to the higher layers of the ORB. Before doing that, if
+ * it finds more messages, it sends a notify to the reactor
+ * without resuming the handle. The next thread picks up a
+ * message from the queue and processes that. Once the queue
+ * is drained the last thread resumes the handle.
+ *
+ * <H3> Sending Replies </H3>
+ *
+ * We could use the outgoing path of the ORB to send replies. This
+ * would allow us to reuse most of the code in the outgoing data
+ * path. We were doing this till TAO-1.2.3. We run in to
+ * problems. When writing the reply the ORB gets flow controlled, and the
+ * ORB tries to flush the message by going into the reactor. This
+ * resulted in unnecessary nesting. The thread that gets into the
+ * Reactor could potentially handle other messages (incoming or
+ * outgoing) and the stack starts growing leading to crashes.
+ *
+ * <H4> Solution to the nesting problem </H4>
+ *
+ * The solution that we (plan to) adopt is pretty straight
+ * forward. The thread sending replies will not block to send the
+ * replies but queue the replies and return to the Reactor. (Note the
+ * careful usages of the terms "blocking in the Reactor" as opposed to
+ * "return back to the Reactor".
+ *
+ *
+ * <B>See Also:</B>
+ *
+ * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/pluggable_protocols/index.html?revision=HEAD
+ *
+ */
+class TAO_Export TAO_Transport
+{
+public:
+
+ /// Default creator, requires the tag value be supplied.
+ TAO_Transport (CORBA::ULong tag,
+ TAO_ORB_Core *orb_core);
+
+ /// Destructor
+ virtual ~TAO_Transport (void);
+
+ /// Return the protocol tag.
+ /**
+ * The OMG assigns unique tags (a 32-bit unsigned number) to each
+ * protocol. New protocol tags can be obtained free of charge from
+ * the OMG, check the documents in corbafwd.h for more details.
+ */
+ CORBA::ULong tag (void) const;
+
+ /// Access the ORB that owns this connection.
+ TAO_ORB_Core *orb_core (void) const;
+
+ /// Get the TAO_Tranport_Mux_Strategy used by this object.
+ /**
+ * The role of the TAO_Transport_Mux_Strategy is described in more
+ * detail in that class' documentation. Enough is to say that the
+ * class is used to control how many threads can have pending
+ * requests over the same connection. Multiplexing multiple threads
+ * over the same connection conserves resources and is almost
+ * required for AMI, but having only one pending request per
+ * connection is more efficient and reduces the possibilities of
+ * priority inversions.
+ */
+ TAO_Transport_Mux_Strategy *tms (void) const;
+
+ /// Return the TAO_Wait_Strategy used by this object.
+ /**
+ * The role of the TAO_Wait_Strategy is described in more detail in
+ * that class' documentation. Enough is to say that the ORB can wait
+ * for a reply blocking on read(), using the Reactor to wait for
+ * multiple events concurrently or using the Leader/Followers
+ * protocol.
+ */
+ TAO_Wait_Strategy *wait_strategy (void) const;
+
+ /// Callback method to reactively drain the outgoing data queue
+ int handle_output (void);
+
+ /// Get the bidirectional flag
+ int bidirectional_flag (void) const;
+
+ /// Set the bidirectional flag
+ void bidirectional_flag (int flag);
+
+ /// Set the Cache Map entry
+ void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
+
+ /// Get the Cache Map entry
+ TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
+
+ /// Set and Get the identifier for this transport instance.
+ /**
+ * If not set, this will return an integer representation of
+ * the <code>this</code> pointer for the instance on which
+ * it's called.
+ */
+ size_t id (void) const;
+ void id (size_t id);
+
+ /**
+ * Methods dealing with the role of the connection, e.g., CLIENT or SERVER.
+ * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
+ */
+ TAO::Connection_Role opened_as (void) const;
+ void opened_as (TAO::Connection_Role);
+
+ /// Get and Set the purging order. The purging strategy uses the set
+ /// version to set the purging order.
+ unsigned long purging_order (void) const;
+ void purging_order(unsigned long value);
+
+ /// Check if there are messages pending in the queue
+ /**
+ * @return 1 if the queue is empty
+ */
+ int queue_is_empty (void);
+
+ /// Added event handler to the handlers set.
+ /**
+ * Called by the cache when the cache is closing.
+ *
+ * @param handlers The TAO_Connection_Handler_Set into which the
+ * transport should place its handler
+ */
+ void provide_handler (TAO::Connection_Handler_Set &handlers);
+
+ /// Add event handlers corresponding to transports that have RW wait
+ /// strategy to the handlers set.
+ /**
+ * Called by the cache when the ORB is shuting down.
+ *
+ * @param handlers The TAO_Connection_Handler_Set into which the
+ * transport should place its handler if the transport has RW
+ * strategy on.
+ *
+ * @return true indicates a handler was added to the handler set.
+ * false indocates that the transport did not have a
+ * blockable handler that could be added.
+ */
+ bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers);
+
+ /// Register the handler with the reactor.
+ /**
+ * Register the handler with the reactor. This method is used by the
+ * Wait_On_Reactor strategy. The transport must register its event
+ * handler with the ORB's Reactor.
+ *
+ * @todo I think this method is pretty much useless, the
+ * connections are *always* registered with the Reactor, except in
+ * thread-per-connection mode. In that case putting the connection
+ * in the Reactor would produce unpredictable results anyway.
+ */
+ virtual int register_handler (void);
+
+ /// Write the complete Message_Block chain to the connection.
+ /**
+ * This method serializes on handler_lock_, guaranteeing that only
+ * thread can execute it on the same instance concurrently.
+ *
+ * Often the implementation simply forwards the arguments to the
+ * underlying ACE_Svc_Handler class. Using the code factored out
+ * into ACE.
+ *
+ * Be careful with protocols that perform non-trivial
+ * transformations of the data, such as SSLIOP or protocols that
+ * compress the stream.
+ *
+ * @param iov contains the data that must be sent.
+ *
+ * @param timeout is the maximum time that the application is
+ * willing to wait for the data to be sent, useful in platforms that
+ * implement timed writes.
+ * The timeout value is obtained from the policies set by the
+ * application.
+ *
+ * @param bytes_transferred should return the total number of bytes
+ * successfully transferred before the connection blocked. This is
+ * required because in some platforms and/or protocols multiple
+ * system calls may be required to send the chain of message
+ * blocks. The first few calls can work successfully, but the final
+ * one can fail or signal a flow control situation (via EAGAIN).
+ * In this case the ORB expects the function to return -1, errno to
+ * be appropriately set and this argument to return the number of
+ * bytes already on the OS I/O subsystem.
+ *
+ * This call can also fail if the transport instance is no longer
+ * associated with a connection (e.g., the connection handler closed
+ * down). In that case, it returns -1 and sets errno to
+ * <code>ENOENT</code>.
+ */
+ virtual ssize_t send (iovec *iov,
+ int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout = 0) = 0;
+
+#if TAO_HAS_SENDFILE == 1
+ /// Send data through zero-copy write mechanism, if available.
+ /**
+ * This method sends the data in the I/O vector through the platform
+ * sendfile() function to perform a zero-copy write, if available.
+ * Otherwise, the default fallback implementation simply delegates
+ * to the TAO_Transport::send() method.
+ *
+ * @note This method is best used when sending very large blocks of
+ * data.
+ */
+ virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator,
+ iovec * iov,
+ int iovcnt,
+ size_t &bytes_transferred,
+ ACE_Time_Value const * timeout = 0);
+#endif /* TAO_HAS_SENDFILE==1 */
+
+
+ /// Read len bytes from into buf.
+ /**
+ * This method serializes on handler_lock_, guaranteeing that only
+ * thread can execute it on the same instance concurrently.
+ *
+ * @param buffer ORB allocated buffer where the data should be
+ * @@ The ACE_Time_Value *s is just a place holder for now. It is
+ * not clear this this is the best place to specify this. The actual
+ * timeout values will be kept in the Policies.
+ */
+ virtual ssize_t recv (char *buffer,
+ size_t len,
+ const ACE_Time_Value *timeout = 0) = 0;
+
+ /**
+ * @name Control connection lifecycle
+ *
+ * These methods are routed through the TMS object. The TMS
+ * strategies implement them correctly.
+ */
+ //@{
+
+ /// Request has been just sent, but the reply is not received. Idle
+ /// the transport now.
+ bool idle_after_send (void);
+
+ /// Request is sent and the reply is received. Idle the transport
+ /// now.
+ bool idle_after_reply (void);
+
+ /// Call the implementation method after obtaining the lock.
+ virtual void close_connection (void);
+
+ //@}
+
+ /** @name Template methods
+ *
+ * The Transport class uses the Template Method Pattern to implement
+ * the protocol specific functionality.
+ * Implementors of a pluggable protocol should override the
+ * following methods with the semantics documented below.
+ */
+ /**
+ * Initialising the messaging object. This would be used by the
+ * connector side. On the acceptor side the connection handler
+ * would take care of the messaging objects.
+ */
+ virtual int messaging_init (CORBA::Octet major,
+ CORBA::Octet minor) = 0;
+
+ /// Extracts the list of listen points from the @a cdr stream. The
+ /// list would have the protocol specific details of the
+ /// ListenPoints
+ virtual int tear_listen_point_list (TAO_InputCDR &cdr);
+
+ /// Hooks that can be overridden in concrete transports.
+ /**
+ * These hooks are invoked just after connection establishment (or
+ * after a connection is fetched from cache). The
+ * return value signifies whether the invoker should proceed with
+ * post connection establishment activities. Protocols like SSLIOP
+ * need this to verify whether connections already established have
+ * valid certificates. There are no pre_connect_hooks () since the
+ * transport doesn't exist before a connection establishment. :-)
+ *
+ * @note The methods are not made const with a reason.
+ */
+ virtual bool post_connect_hook (void);
+
+ /// Memory management routines.
+ /*
+ * Forwards to event handler.
+ */
+ ACE_Event_Handler::Reference_Count add_reference (void);
+ ACE_Event_Handler::Reference_Count remove_reference (void);
+
+ /// Return the messaging object that is used to format the data that
+ /// needs to be sent.
+ virtual TAO_Pluggable_Messaging * messaging_object (void) = 0;
+
+ /** @name Template methods
+ *
+ * The Transport class uses the Template Method Pattern to implement
+ * the protocol specific functionality.
+ * Implementors of a pluggable protocol should override the
+ * following methods with the semantics documented below.
+ */
+ //@{
+
+ /// Return the event handler used to receive notifications from the
+ /// Reactor.
+ /**
+ * Normally a concrete TAO_Transport object has-a ACE_Event_Handler
+ * member that functions as an adapter between the ACE_Reactor
+ * framework and the TAO pluggable protocol framework.
+ * In all the protocols implemented so far this role is fullfilled
+ * by an instance of ACE_Svc_Handler.
+ *
+ * @todo Since we only use a limited functionality of
+ * ACE_Svc_Handler we could probably implement a generic
+ * adapter class (TAO_Transport_Event_Handler or something), this
+ * will reduce footprint and simplify the process of implementing a
+ * pluggable protocol.
+ *
+ * @todo This method has to be renamed to event_handler()
+ */
+ virtual ACE_Event_Handler * event_handler_i (void) = 0;
+
+ /// Is this transport really connected
+ bool is_connected (void) const;
+
+ /// Perform all the actions when this transport get opened
+ bool post_open (size_t id);
+
+ /// do what needs to be done when closing the transport
+ void pre_close (void);
+
+ /// Get the connection handler for this transport
+ TAO_Connection_Handler * connection_handler (void);
+
+ /// Accessor for the output CDR stream
+ TAO_OutputCDR &out_stream (void);
+
+ /*
+ * Specialization hook to add public methods from
+ * concrete transport implementations to TAO's transport
+ * class
+ */
+ //@@ TAO_TRANSPORT_SPL_PUBLIC_METHODS_ADD_HOOK
+
+protected:
+
+ virtual TAO_Connection_Handler * connection_handler_i (void) = 0;
+
+public:
+
+ /// This is a request for the transport object to write a
+ /// LocateRequest header before it is sent out.
+ int generate_locate_request (TAO_Target_Specification &spec,
+ TAO_Operation_Details &opdetails,
+ TAO_OutputCDR &output);
+
+ /// This is a request for the transport object to write a request
+ /// header before it sends out the request
+ virtual int generate_request_header (TAO_Operation_Details &opd,
+ TAO_Target_Specification &spec,
+ TAO_OutputCDR &msg);
+
+ /// Recache ourselves in the cache
+ int recache_transport (TAO_Transport_Descriptor_Interface* desc);
+
+ /// Callback to read incoming data
+ /**
+ * The ACE_Event_Handler adapter invokes this method as part of its
+ * handle_input() operation.
+ *
+ * @todo the method name is confusing! Calling it handle_input()
+ * would probably make things easier to understand and follow!
+ *
+ * Once a complete message is read the Transport class delegates on
+ * the Messaging layer to invoke the right upcall (on the server) or
+ * the TAO_Reply_Dispatcher (on the client side).
+ *
+ * @param max_wait_time In some cases the I/O is synchronous, e.g. a
+ * thread-per-connection server or when Wait_On_Read is enabled. In
+ * those cases a maximum read time can be specified.
+ *
+ * @param block Is deprecated and ignored.
+ *
+ */
+ virtual int handle_input (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time = 0,
+ int block = 0);
+
+ enum
+ {
+ TAO_ONEWAY_REQUEST = 0,
+ TAO_TWOWAY_REQUEST = 1,
+ TAO_REPLY
+ };
+
+ /// Prepare the waiting and demuxing strategy to receive a reply for
+ /// a new request.
+ /**
+ * Preparing the ORB to receive the reply only once the request is
+ * completely sent opens the system to some subtle race conditions:
+ * suppose the ORB is running in a multi-threaded configuration,
+ * thread A makes a request while thread B is using the Reactor to
+ * process all incoming requests.
+ * Thread A could be implemented as follows:
+ * 1) send the request
+ * 2) setup the ORB to receive the reply
+ * 3) wait for the request
+ *
+ * but in this case thread B may receive the reply between step (1)
+ * and (2), and drop it as an invalid or unexpected message.
+ * Consequently the correct implementation is:
+ * 1) setup the ORB to receive the reply
+ * 2) send the request
+ * 3) wait for the reply
+ *
+ * The following method encapsulates this idiom.
+ *
+ * @todo This is generic code, it should be factored out into the
+ * Transport class.
+ */
+ // @nolock b/c this calls send_or_buffer
+ virtual int send_request (TAO_Stub *stub,
+ TAO_ORB_Core *orb_core,
+ TAO_OutputCDR &stream,
+ int message_semantics,
+ ACE_Time_Value *max_time_wait) = 0;
+
+
+
+ /// This method formats the stream and then sends the message on the
+ /// transport.
+ /**
+ * Once the ORB is prepared to receive a reply (see send_request()
+ * above), and all the arguments have been marshaled the CDR stream
+ * must be 'formatted', i.e. the message_size field in the GIOP
+ * header can finally be set to the proper value.
+ *
+ */
+ virtual int send_message (TAO_OutputCDR &stream,
+ TAO_Stub *stub = 0,
+ int message_semantics = TAO_Transport::TAO_TWOWAY_REQUEST,
+ ACE_Time_Value *max_time_wait = 0) = 0;
+
+
+ /// Sent the contents of @a message_block
+ /**
+ * @param stub The object reference used for this operation, useful
+ * to obtain the current policies.
+ * @param message_semantics If this is set to TAO_TWO_REQUEST
+ * this method will block until the operation is completely
+ * written on the wire. If it is set to other values this
+ * operation could return.
+ * @param message_block The CDR encapsulation of the GIOP message
+ * that must be sent. The message may consist of
+ * multiple Message Blocks chained through the cont()
+ * field.
+ * @param max_wait_time The maximum time that the operation can
+ * block, used in the implementation of timeouts.
+ */
+ virtual int send_message_shared (TAO_Stub *stub,
+ int message_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+protected:
+
+ /// Process the message by sending it to the higher layers of the
+ /// ORB.
+ int process_parsed_messages (TAO_Queued_Data *qd,
+ TAO_Resume_Handle &rh);
+
+ /// Implement send_message_shared() assuming the handler_lock_ is
+ /// held.
+ int send_message_shared_i (TAO_Stub *stub,
+ int message_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// Queue a message for @a message_block
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+public:
+ /// Format and queue a message for @a stream
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time);
+
+ /// Send a message block chain,
+ int send_message_block_chain (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *max_wait_time = 0);
+
+ /// Send a message block chain, assuming the lock is held
+ int send_message_block_chain_i (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *max_wait_time);
+ /// Cache management
+ int purge_entry (void);
+
+ /// Cache management
+ int make_idle (void);
+
+ /// Cache management
+ int update_transport (void);
+
+ /// The timeout callback, invoked when any of the timers related to
+ /// this transport expire.
+ /**
+ * @param current_time The current time as reported from the Reactor
+ * @param act The Asynchronous Completion Token. Currently it is
+ * interpreted as follows:
+ * - If the ACT is the address of this->current_deadline_ the
+ * queueing timeout has expired and the queue should start
+ * flushing.
+ *
+ * @return Returns 0 if there are no problems, -1 if there is an
+ * error
+ *
+ * @todo In the future this function could be used to expire
+ * messages (oneways) that have been sitting for too long on
+ * the queue.
+ */
+ int handle_timeout (const ACE_Time_Value &current_time,
+ const void* act);
+
+ /// Accessor to recv_buffer_size_
+ size_t recv_buffer_size (void) const;
+
+ /// Accessor to sent_byte_count_
+ size_t sent_byte_count (void) const;
+
+ /// CodeSet Negotiation - Get the char codeset translator factory
+ TAO_Codeset_Translator_Base *char_translator (void) const;
+
+ /// CodeSet Negotiation - Get the wchar codeset translator factory
+ TAO_Codeset_Translator_Base *wchar_translator (void) const;
+
+ /// CodeSet negotiation - Set the char codeset translator factory
+ void char_translator (TAO_Codeset_Translator_Base *);
+
+ /// CodeSet negotiation - Set the wchar codeset translator factory
+ void wchar_translator (TAO_Codeset_Translator_Base *);
+
+ /// Use the Transport's codeset factories to set the translator for input
+ /// and output CDRs.
+ void assign_translators (TAO_InputCDR *, TAO_OutputCDR *);
+
+ /// It is necessary to clear the codeset translator when a CDR stream
+ /// is used for more than one GIOP message. This is required since the
+ /// header must not be translated, whereas the body must be.
+ void clear_translators (TAO_InputCDR *, TAO_OutputCDR *);
+
+ /// Return true if the tcs has been set
+ CORBA::Boolean is_tcs_set() const;
+
+ /// Set the state of the first_request_ flag to 0
+ void first_request_sent();
+
+ /// Notify all the components inside a Transport when the underlying
+ /// connection is closed.
+ void send_connection_closed_notifications (void);
+
+ /// Transport statistics
+ TAO::Transport::Stats* stats (void) const;
+
+private:
+
+ /// Helper method that returns the Transport Cache Manager.
+ TAO::Transport_Cache_Manager &transport_cache_manager (void);
+
+ /// Send some of the data in the queue.
+ /**
+ * As the outgoing data is drained this method is invoked to send as
+ * much of the current message as possible.
+ *
+ * Returns 0 if there is more data to send, -1 if there was an error
+ * and 1 if the message was completely sent.
+ */
+ int drain_queue (void);
+
+ /// Implement drain_queue() assuming the lock is held
+ int drain_queue_i (void);
+
+ /// This class needs priviledged access to
+ /// - queue_is_empty_i()
+ /// - drain_queue_i()
+ friend class TAO_Block_Flushing_Strategy;
+
+ /// Check if there are messages pending in the queue
+ /**
+ * This version assumes that the lock is already held. Use with
+ * care!
+ *
+ * @return 1 if the queue is empty
+ */
+ int queue_is_empty_i (void);
+
+ /// A helper routine used in drain_queue_i()
+ int drain_queue_helper (int &iovcnt, iovec iov[]);
+
+ /// These classes need privileged access to:
+ /// - schedule_output_i()
+ /// - cancel_output_i()
+ friend class TAO_Reactive_Flushing_Strategy;
+ friend class TAO_Leader_Follower_Flushing_Strategy;
+
+ /// Needs priveleged access to
+ /// event_handler_i ()
+ friend class TAO_Thread_Per_Connection_Handler;
+
+ /// Schedule handle_output() callbacks
+ int schedule_output_i (void);
+
+ /// Cancel handle_output() callbacks
+ int cancel_output_i (void);
+
+ /// Cleanup the queue.
+ /**
+ * Exactly @a byte_count bytes have been sent, the queue must be
+ * cleaned up as potentially several messages have been completely
+ * sent out.
+ * It leaves on head_ the next message to send out.
+ */
+ void cleanup_queue (size_t byte_count);
+
+ /// Cleanup the complete queue
+ void cleanup_queue_i ();
+
+ /// Check if the buffering constraints have been reached
+ int check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush);
+
+ /// Send a synchronous message, i.e. block until the message is on
+ /// the wire
+ int send_synchronous_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// Send a reply message, i.e. do not block until the message is on
+ /// the wire, but just return after adding them to the queue.
+ int send_reply_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// Send an asynchronous message, i.e. do not block until the message is on
+ /// the wire
+ int send_asynchronous_message_i (TAO_Stub *stub,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// A helper method used by send_synchronous_message_i() and
+ /// send_reply_message_i(). Reusable code that could be used by both
+ /// the methods.
+ int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
+ ACE_Time_Value *max_wait_time);
+
+ /// Check if the flush timer is still pending
+ int flush_timer_pending (void) const;
+
+ /// The flush timer expired or was explicitly cancelled, mark it as
+ /// not pending
+ void reset_flush_timer (void);
+
+ /// Print out error messages if the event handler is not valid
+ void report_invalid_event_handler (const char *caller);
+
+ /// Is invoked by handle_input operation. It consolidate message on
+ /// top of incoming_message_stack. The amount of missing data is
+ /// known and recv operation copies data directly into message buffer,
+ /// as much as a single recv-invocation provides.
+ int handle_input_missing_data (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time,
+ TAO_Queued_Data *q_data);
+
+ /// Is invoked by handle_input operation. It parses new messages from input stream
+ /// or consolidates messages whose header has been partially read, the message
+ /// size being unknown so far. It parses as much data as a single recv-invocation provides.
+ int handle_input_parse_data (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
+
+ /// Is invoked by handle_input_parse_data. Parses all messages remaining
+ /// in @a message_block.
+ int handle_input_parse_extra_messages (ACE_Message_Block &message_block);
+
+ /// @return -1 error, otherwise 0
+ int consolidate_enqueue_message (TAO_Queued_Data *qd);
+
+ /// @return -1 error, otherwise 0
+ int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh);
+
+ /*
+ * Process the message that is in the head of the incoming queue.
+ * If there are more messages in the queue, this method calls
+ * this->notify_reactor () to wake up a thread
+ * @retval -1 on error
+ * @retval 0 if successfully processing enqueued messages
+ * @retval 1 if no message present in queue
+ */
+ int process_queue_head (TAO_Resume_Handle &rh);
+
+ /*
+ * This call prepares a new handler for the notify call and sends a
+ * notify () call to the reactor.
+ */
+ int notify_reactor (void);
+
+ /// Assume the lock is held
+ void send_connection_closed_notifications_i (void);
+
+ /// Allocate a partial message block and store it in our
+ /// partial_message_ data member.
+ void allocate_partial_message_block (void);
+
+ // Disallow copying and assignment.
+ TAO_Transport (const TAO_Transport&);
+ void operator= (const TAO_Transport&);
+
+ /*
+ * Specialization hook to add concrete private methods from
+ * TAO's protocol implementation onto the base Transport class
+ */
+
+ //@@ TAO_TRANSPORT_SPL_PRIVATE_METHODS_ADD_HOOK
+
+protected:
+
+ /// IOP protocol tag.
+ CORBA::ULong const tag_;
+
+ /// Global orbcore resource.
+ TAO_ORB_Core * const orb_core_;
+
+ /// Our entry in the cache. We don't own this. It is here for our
+ /// convenience. We cannot just change things around.
+ TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_;
+
+ /// Strategy to decide whether multiple requests can be sent over the
+ /// same connection or the connection is exclusive for a request.
+ TAO_Transport_Mux_Strategy *tms_;
+
+ /// Strategy for waiting for the reply after sending the request.
+ TAO_Wait_Strategy *ws_;
+
+ /// Use to check if bidirectional info has been synchronized with
+ /// the peer.
+ /**
+ * Have we sent any info on bidirectional information or have we
+ * received any info regarding making the connection served by this
+ * transport bidirectional.
+ * The flag is used as follows:
+ * + We dont want to send the bidirectional context info more than
+ * once on the connection. Why? Waste of marshalling and
+ * demarshalling time on the client.
+ * + On the server side -- once a client that has established the
+ * connection asks the server to use the connection both ways, we
+ * *dont* want the server to pack service info to the client. That
+ * is not allowed. We need a flag to prevent such a things from
+ * happening.
+ *
+ * The value of this flag will be 0 if the client sends info and 1
+ * if the server receives the info.
+ */
+ int bidirectional_flag_;
+
+ TAO::Connection_Role opening_connection_role_;
+
+ /// Implement the outgoing data queue
+ TAO_Queued_Message *head_;
+ TAO_Queued_Message *tail_;
+
+ /// Queue of the consolidated, incoming messages..
+ TAO_Incoming_Message_Queue incoming_message_queue_;
+
+ /// Stack of incoming fragments, consolidated messages
+ /// are going to be enqueued in "incoming_message_queue_"
+ TAO::Incoming_Message_Stack incoming_message_stack_;
+
+ /// The queue will start draining no later than <queeing_deadline_>
+ /// *if* the deadline is
+ ACE_Time_Value current_deadline_;
+
+ /// The timer ID
+ long flush_timer_id_;
+
+ /// The adapter used to receive timeout callbacks from the Reactor
+ TAO_Transport_Timer transport_timer_;
+
+ /// Lock that insures that activities that *might* use handler-related
+ /// resources (such as a connection handler) get serialized.
+ /**
+ * This is an <code>ACE_Lock</code> that gets initialized from
+ * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock().
+ * This way, one can use a lock appropriate for the type of system, i.e.,
+ * a null lock for single-threaded systems, and a real lock for
+ * multi-threaded systems.
+ */
+ mutable ACE_Lock *handler_lock_;
+
+ /// A unique identifier for the transport.
+ /**
+ * This never *never* changes over the lifespan, so we don't have to worry
+ * about locking it.
+ *
+ * HINT: Protocol-specific transports that use connection handler
+ * might choose to set this to the handle for their connection.
+ */
+ size_t id_;
+
+ /// Used by the LRU, LFU and FIFO Connection Purging Strategies.
+ unsigned long purging_order_;
+
+ /// Size of the buffer received.
+ size_t recv_buffer_size_;
+
+ /// Number of bytes sent.
+ size_t sent_byte_count_;
+
+ /// Is this transport really connected or not. In case of oneways with
+ /// SYNC_NONE Policy we don't wait until the connection is ready and we
+ /// buffer the requests in this transport until the connection is ready
+ bool is_connected_;
+
+private:
+
+ /// @@Phil, I think it would be nice if we could think of a way to
+ /// do the following.
+ /// We have been trying to use the transport for marking about
+ /// translator factories and such! IMHO this is a wrong encapulation
+ /// ie. trying to populate the transport object with these
+ /// details. We should probably have a class something like
+ /// TAO_Message_Property or TAO_Message_Translator or whatever (I am
+ /// sure you get the idea) and encapsulate all these
+ /// details. Coupling these seems odd. if I have to be more cynical
+ /// we can move this to the connection_handler and it may more sense
+ /// with the DSCP stuff around there. Do you agree?
+
+ /// Additional member values required to support codeset translation
+ TAO_Codeset_Translator_Base *char_translator_;
+ TAO_Codeset_Translator_Base *wchar_translator_;
+
+ /// The tcs_set_ flag indicates that negotiation has occured and so the
+ /// translators are correct, since a null translator is valid if both ends
+ /// are using the same codeset, whatever that codeset might be.
+ CORBA::Boolean tcs_set_;
+
+ /// First_request_ is true until the first request is sent or received. This
+ /// is necessary since codeset context information is necessary only on the
+ /// first request. After that, the translators are fixed for the life of the
+ /// connection.
+ CORBA::Boolean first_request_;
+
+ /// Holds the partial GIOP message (if there is one)
+ ACE_Message_Block* partial_message_;
+
+#if TAO_HAS_SENDFILE == 1
+ /// mmap()-based allocator used to allocator output CDR buffers.
+ /**
+ * If this pointer is non-zero, sendfile() will be used to send data
+ * in a TAO_OutputCDR stream instance.
+ */
+ TAO_MMAP_Allocator * const mmap_allocator_;
+#endif /* TAO_HAS_SENDFILE==1 */
+
+ /// Statistics
+ TAO::Transport::Stats* stats_;
+
+ /*
+ * specialization hook to add class members from concrete
+ * transport class onto the base transport class. Please
+ * add any private members to this class *before* this hook.
+ */
+ //@@ TAO_TRANSPORT_SPL_DATA_MEMBERS_ADD_HOOK
+};
+
+/*
+ * Hook to add external typedefs and specializations to
+ * TAO's transport implementation.
+ */
+
+//@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK
+
+namespace TAO
+{
+ namespace Transport
+ {
+ /*
+ * @class Stats
+ *
+ * @brief Used to collect stats on a transport.
+ *
+ * The base class in (potentialy) extensible hierarchy used to
+ * specialize the information available for a specific protocol.
+ *
+ * This class is necessary for the implementation of the Transport
+ * Current feature.
+ *
+ * <B>See Also:</B>
+ *
+ * https://svn.dre.vanderbilt.edu/viewvc/Middleware/trunk/TAO/docs/transport_current/index.html?revision=HEAD
+ *
+ */
+ class TAO_Export Stats
+ {
+ public:
+ Stats ();
+
+ void messages_sent (size_t message_length);
+ CORBA::LongLong messages_sent (void) const;
+ CORBA::LongLong bytes_sent (void) const;
+
+ void messages_received (size_t message_length);
+ CORBA::LongLong messages_received (void) const;
+ CORBA::LongLong bytes_received (void) const;
+
+ void opened_since (const ACE_Time_Value& tv);
+ const ACE_Time_Value& opened_since (void) const;
+
+ private:
+ // @NOTE: I could have used bytes_rcvd_.samples_count() instead,
+ // however there was a suspicion that 32 bits would be
+ // insufficient.
+ CORBA::LongLong messages_rcvd_;
+
+ // @NOTE: I could have used bytes_sent_.samples_count() instead,
+ // however there was a suspicion that 32 bits would be
+ // insufficient.
+ CORBA::LongLong messages_sent_;
+
+ ACE_Basic_Stats bytes_rcvd_;
+ ACE_Basic_Stats bytes_sent_;
+
+ ACE_Time_Value opened_since_;
+ };
+ }
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+# include "tao/Transport.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* TAO_TRANSPORT_H */