summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport.h
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-24 08:02:58 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-24 08:02:58 +0000
commitffb827a2938294145c0c74d7e06d837236d29694 (patch)
tree8c0bc2f7aa508472a4fd98dfca9d6afd60130f52 /TAO/tao/Transport.h
parentd01a55ccd330806cac88d2e978e6d3d59a844bef (diff)
downloadATCD-ffb827a2938294145c0c74d7e06d837236d29694.tar.gz
ChangeLogTag:Tue Apr 24 00:21:54 2001 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/tao/Transport.h')
-rw-r--r--TAO/tao/Transport.h277
1 files changed, 180 insertions, 97 deletions
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 00ca18315cf..f4ec1fefb62 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -9,7 +9,6 @@
* Define the interface for the Transport component in TAO's
* pluggable protocol framework.
*
- *
* @author Fred Kuhns <fredk@cs.wustl.edu>
*/
//=============================================================================
@@ -23,6 +22,8 @@
#include "Exception.h"
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
+#include "Transport_Timer.h"
+#include "ace/Strategies.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -36,10 +37,7 @@ class TAO_Wait_Strategy;
class TAO_Connection_Handler;
class TAO_Pluggable_Messaging;
-#include "ace/Message_Queue.h"
-#include "ace/Strategies.h"
-
-typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue;
+class TAO_Queued_Message;
class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable
{
@@ -57,7 +55,6 @@ protected:
ACE_Lock *refcount_lock_;
};
-
/**
* @class TAO_Transport
*
@@ -95,10 +92,10 @@ protected:
* transport may already be sending another message in a reactive
* fashion.
*
- * Consequently, the Transport must also keep a
- * <TT>current_message</TT>, if the current message is not null any
- * new messages must be queued. Only once the current message is
- * completely sent we can take a message out of the queue.
+ * Consequently, the Transport must also know if the head of the queue
+ * has been partially sent. In that case new messages can only follow
+ * the head. Only once the head is completely sent we can start
+ * sending new messages.
*
* <H4>Waiting threads:</H4> One or more threads can be blocked
* waiting for the connection to completely send the message.
@@ -146,11 +143,6 @@ protected:
*/
class TAO_Export TAO_Transport : private TAO_Synch_Refcountable
{
-
- friend class TAO_Transport_Sync_Strategy;
- friend class TAO_Eager_Buffering_Sync_Strategy;
- friend class TAO_Delayed_Buffering_Sync_Strategy;
-
public:
/// default creator, requres the tag value be supplied.
TAO_Transport (CORBA::ULong tag,
@@ -193,26 +185,8 @@ public:
*/
TAO_Wait_Strategy *wait_strategy (void) const;
- /// Send a request or queue it for later.
- /**
- * If the right policies are set queue the request for later.
- * Otherwise, or if the queue size has reached the configured
- * limits, start draining the queue.
- *
- * If any data is to be sent it blocks until the queue is completely
- * drained.
- *
- * This method serializes on handler_lock_, guaranteeing that only
- * thread can execute it on the same instance concurrently.
- *
- * @todo: this routine will probably go away as part of the
- * reorganization to support non-blocking writes.
- */
- // @@ lockme
- ssize_t send_or_buffer (TAO_Stub *stub,
- int two_way,
- const ACE_Message_Block *mblk,
- const ACE_Time_Value *s = 0);
+ /// Callback method to reactively drain the outgoing data queue
+ int handle_output (void);
/**
* Return the TSS leader follower condition variable used in the
@@ -221,21 +195,6 @@ public:
*/
virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void);
- /// Queue for buffering transport messages.
- virtual TAO_Transport_Buffering_Queue &buffering_queue (void);
-
- /// Timer id associated with buffering.
- long buffering_timer_id (void) const;
- void buffering_timer_id (long);
-
- /// Timeout value associated with buffering.
- const ACE_Time_Value &buffering_timeout_value (void) const;
- void buffering_timeout_value (const ACE_Time_Value &time);
-
- /// Send any messages that have been buffered.
- // @@ lockme
- ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0);
-
/**
* Initialising the messaging object. This would be used by the
* connector side. On the acceptor side the connection handler
@@ -252,7 +211,8 @@ public:
/**
* Called by the cache when the cache is closing in order to fill
* in a handle_set in a lock-safe manner.
- * @param handle_set the ACE_Handle_Set into which the transport should place any handle registered with the reactor
+ * @param handle_set the ACE_Handle_Set into which the transport
+ * should place any handle registered with the reactor
*/
void provide_handle (ACE_Handle_Set &handle_set);
@@ -267,6 +227,12 @@ public:
*/
void dequeue_all (void);
+ /// Check if there are messages pending in the queue
+ /**
+ * @return 1 if the queue is empty
+ */
+ int queue_is_empty (void);
+
/// Register the handler with the reactor.
/**
* This method is used by the Wait_On_Reactor strategy. The
@@ -340,9 +306,9 @@ public:
* down). In that case, it returns -1 and sets errno to
* <code>ENOENT</code>.
*/
- ssize_t send (const ACE_Message_Block *mblk,
- const ACE_Time_Value *timeout = 0,
- size_t *bytes_transferred = 0);
+ ssize_t send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout = 0);
/// Read len bytes from into buf.
/**
@@ -433,9 +399,9 @@ protected:
* bytes already on the OS I/O subsystem.
*
*/
- virtual ssize_t send_i (const ACE_Message_Block *mblk,
- const ACE_Time_Value *timeout = 0,
- size_t *bytes_transferred = 0) = 0;
+ virtual ssize_t send_i (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout = 0) = 0;
// Read len bytes from into buf.
/**
@@ -491,7 +457,7 @@ public:
virtual int send_request (TAO_Stub *stub,
TAO_ORB_Core *orb_core,
TAO_OutputCDR &stream,
- int twoway,
+ int is_synchronous,
ACE_Time_Value *max_time_wait) = 0;
@@ -509,7 +475,7 @@ public:
// @@ lockme
virtual int send_message (TAO_OutputCDR &stream,
TAO_Stub *stub = 0,
- int twoway = 1,
+ int is_synchronous = 1,
ACE_Time_Value *max_time_wait = 0) = 0;
/// Callback to read incoming data
@@ -568,8 +534,6 @@ public:
*/
virtual int reactor_signalling (void);
- //@}
-
/// Method for the connection handler to signify that it
/// is being closed and destroyed.
virtual void connection_handler_closing (void);
@@ -577,14 +541,18 @@ public:
/// Register the associated connection handler with the reactor
/// for a timer.
/**
- * At this point, only <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code>
- * uses this, and it's unclear whether it needs to stay around. But, it's here
- * because it uses the associated protocol-specific connection handler, and accesses
- * to that must be serialized on the internal lock.
- *
- * @param arg argument passed to the handle_timeout() method of the event handler
+ * At this point, only
+ * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code>
+ * uses this, and it's unclear whether it needs to stay around.
+ * But, it's here because it uses the associated protocol-specific
+ * connection handler, and accesses to that must be serialized on
+ * the internal lock.
+ *
+ * @param arg argument passed to the handle_timeout() method of the
+ * event handler
* @param delay time interval after which the timer will expire
- * @param interval time interval after which the timer will be automatically rescheduled
+ * @param interval time interval after which the timer will be
+ * automatically rescheduled
* @return -1 on failure, a Reactor timer_id value on success
*
* @see ACE_Reactor::schedule_timer()
@@ -594,7 +562,6 @@ public:
const ACE_Time_Value &delay,
const ACE_Time_Value &interval = ACE_Time_Value::zero);
-
// Maintain reference counting with these
static TAO_Transport* _duplicate (TAO_Transport* transport);
static void release (TAO_Transport* transport);
@@ -603,34 +570,145 @@ public:
int recache_transport (TAO_Transport_Descriptor_Interface* desc);
/// Set/Get the Cache Map entry
- void cache_map_entry (
- TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
+ void cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
+ TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
+
+ /// Send a message block chain,
+ int send_message_block_chain (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *max_wait_time = 0);
+ /// Sent the contents of <message_block>
+ /**
+ * @todo This method name sucks, but send_message() was already
+ * taken by other silly methods!
+ *
+ * @param stub The object reference used for this operation, useful
+ * to obtain the current policies.
+ * @param is_synchronous If set this method will block until the
+ * operation is completely written on the wire
+ * @param message_block The CDR encapsulation of the GIOP message
+ * that must be sent. The message may consist of
+ * multiple Message Blocks chained through the cont()
+ * field.
+ * @param max_wait_time The maximum time that the operation can
+ * block, used in the implementation of timeouts.
+ *
+ */
+ /// the twoway flag or by the current policies in the stub.
+ int send_message_i (TAO_Stub *stub,
+ int is_synchronous,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// Send a message block chain, assuming the lock is held
+ int send_message_block_chain_i (const ACE_Message_Block *message_block,
+ size_t &bytes_transferred,
+ ACE_Time_Value *max_wait_time);
+ /// Cache management
void mark_invalid (void);
+ /// Cache management
int make_idle (void);
-protected:
- // @@ see if one of these calls send_message()
- /// Remove the first message from the outgoing queue.
- void dequeue_head (void);
+ /// The timeout callback, invoked when any of the timers related to
+ /// this transport expire.
+ /**
+ * @param current_time The current time as reported from the Reactor
+ * @param act The Asynchronous Completion Token. Currently it is
+ * interpreted as follows:
+ * - If the ACT is the address of this->current_deadline_ the
+ * queueing timeout has expired and the queue should start
+ * flushing.
+ *
+ * @return Returns 0 if there are no problems, -1 if there is an
+ * error
+ *
+ * @todo In the future this function could be used to expire
+ * messages (oneways) that have been sitting for too long on
+ * the queue.
+ */
+ int handle_timeout (const ACE_Time_Value &current_time,
+ const void* act);
- /// Update the state of the outgoing queue, assuming that
- /// bytes_delivered bytes have been sent already.
- void reset_queued_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
+private:
+ /// Send some of the data in the queue.
+ /**
+ * As the outgoing data is drained this method is invoked to send as
+ * much of the current message as possible.
+ *
+ * Returns 0 if there is more data to send, -1 if there was an error
+ * and 1 if the message was completely sent.
+ */
+ int drain_queue (void);
- /// Update the state of the outgoing queue, this time a complete
- /// message was sent.
- void reset_sent_message (ACE_Message_Block *message_block,
- size_t bytes_delivered);
+ /// Implement drain_queue() assuming the lock is held
+ int drain_queue_i (void);
- /// Helper function used to implement the two methods above.
- void reset_message (ACE_Message_Block *message_block,
- size_t bytes_delivered,
- int queued_message);
+ /// This class needs priviledged access to
+ /// - queue_is_empty_i()
+ /// - drain_queue_i()
+ friend class TAO_Block_Flushing_Strategy;
+
+ /// Check if there are messages pending in the queue
+ /**
+ * This version assumes that the lock is already held. Use with
+ * care!
+ *
+ * @return 1 if the queue is empty
+ */
+ int queue_is_empty_i (void);
+
+ /// A helper routine used in drain_queue_i()
+ int drain_queue_helper (int &iovcnt, iovec iov[]);
+
+ /// This class needs privileged access to:
+ /// - schedule_output_i()
+ /// - cancel_output_i()
+ friend class TAO_Reactive_Flushing_Strategy;
+
+ /// Schedule handle_output() callbacks
+ int schedule_output_i (void);
+
+ /// Cancel handle_output() callbacks
+ int cancel_output_i (void);
+
+ /// Cleanup the queue.
+ /**
+ * Exactly <byte_count> bytes have been sent, the queue must be
+ * cleaned up as potentially several messages have been completely
+ * sent out.
+ * It leaves on head_ the next message to send out.
+ */
+ void cleanup_queue (size_t byte_count);
+
+ /// Copy the contents of a message block into a Queued_Message
+ /// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb);
+
+ /// Check if the buffering constraints have been reached
+ int check_buffering_constraints_i (TAO_Stub *stub, int &must_flush);
+
+ /// Send a synchronous message, i.e. block until the message is on
+ /// the wire
+ int send_synchronous_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
+ /// Check if the flush timer is still pending
+ int flush_timer_pending (void) const;
+
+ /// The flush timer expired or was explicitly cancelled, mark it as
+ /// not pending
+ void reset_flush_timer (void);
+
+ /// Check if the underlying event handler is still valid.
+ /**
+ * @return Returns -1 if not, 0 if it is.
+ */
+ int check_event_handler_i (const char *caller);
+
+ /// Print out error messages if the event handler is not valid
+ void report_invalid_event_handler (const char *caller);
-private:
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
@@ -653,15 +731,6 @@ protected:
/// Strategy for waiting for the reply after sending the request.
TAO_Wait_Strategy *ws_;
- /// Queue for buffering transport messages.
- TAO_Transport_Buffering_Queue *buffering_queue_;
-
- /// Buffering timer id.
- long buffering_timer_id_;
-
- /// Buffering timeout value.
- ACE_Time_Value buffering_timeout_value_;
-
/// Use to check if bidirectional info has been synchronized with
/// the peer.
/**
@@ -683,6 +752,20 @@ protected:
*/
int bidirectional_flag_;
+ /// Implement the outgoing data queue
+ TAO_Queued_Message *head_;
+ TAO_Queued_Message *tail_;
+
+ /// The queue will start draining no later than <queing_deadline_>
+ /// *if* the deadline is
+ ACE_Time_Value current_deadline_;
+
+ /// The timer ID
+ long flush_timer_id_;
+
+ /// The adapter used to receive timeout callbacks from the Reactor
+ TAO_Transport_Timer transport_timer_;
+
/// Lock that insures that activities that *might* use handler-related
/// resources (such as a connection handler) get serialized.
/**