diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-24 08:02:58 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-24 08:02:58 +0000 |
commit | ffb827a2938294145c0c74d7e06d837236d29694 (patch) | |
tree | 8c0bc2f7aa508472a4fd98dfca9d6afd60130f52 /TAO/tao/Transport.h | |
parent | d01a55ccd330806cac88d2e978e6d3d59a844bef (diff) | |
download | ATCD-ffb827a2938294145c0c74d7e06d837236d29694.tar.gz |
ChangeLogTag:Tue Apr 24 00:21:54 2001 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/tao/Transport.h')
-rw-r--r-- | TAO/tao/Transport.h | 277 |
1 files changed, 180 insertions, 97 deletions
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 00ca18315cf..f4ec1fefb62 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -9,7 +9,6 @@ * Define the interface for the Transport component in TAO's * pluggable protocol framework. * - * * @author Fred Kuhns <fredk@cs.wustl.edu> */ //============================================================================= @@ -23,6 +22,8 @@ #include "Exception.h" #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" +#include "Transport_Timer.h" +#include "ace/Strategies.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -36,10 +37,7 @@ class TAO_Wait_Strategy; class TAO_Connection_Handler; class TAO_Pluggable_Messaging; -#include "ace/Message_Queue.h" -#include "ace/Strategies.h" - -typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; +class TAO_Queued_Message; class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable { @@ -57,7 +55,6 @@ protected: ACE_Lock *refcount_lock_; }; - /** * @class TAO_Transport * @@ -95,10 +92,10 @@ protected: * transport may already be sending another message in a reactive * fashion. * - * Consequently, the Transport must also keep a - * <TT>current_message</TT>, if the current message is not null any - * new messages must be queued. Only once the current message is - * completely sent we can take a message out of the queue. + * Consequently, the Transport must also know if the head of the queue + * has been partially sent. In that case new messages can only follow + * the head. Only once the head is completely sent we can start + * sending new messages. * * <H4>Waiting threads:</H4> One or more threads can be blocked * waiting for the connection to completely send the message. @@ -146,11 +143,6 @@ protected: */ class TAO_Export TAO_Transport : private TAO_Synch_Refcountable { - - friend class TAO_Transport_Sync_Strategy; - friend class TAO_Eager_Buffering_Sync_Strategy; - friend class TAO_Delayed_Buffering_Sync_Strategy; - public: /// default creator, requres the tag value be supplied. TAO_Transport (CORBA::ULong tag, @@ -193,26 +185,8 @@ public: */ TAO_Wait_Strategy *wait_strategy (void) const; - /// Send a request or queue it for later. - /** - * If the right policies are set queue the request for later. - * Otherwise, or if the queue size has reached the configured - * limits, start draining the queue. - * - * If any data is to be sent it blocks until the queue is completely - * drained. - * - * This method serializes on handler_lock_, guaranteeing that only - * thread can execute it on the same instance concurrently. - * - * @todo: this routine will probably go away as part of the - * reorganization to support non-blocking writes. - */ - // @@ lockme - ssize_t send_or_buffer (TAO_Stub *stub, - int two_way, - const ACE_Message_Block *mblk, - const ACE_Time_Value *s = 0); + /// Callback method to reactively drain the outgoing data queue + int handle_output (void); /** * Return the TSS leader follower condition variable used in the @@ -221,21 +195,6 @@ public: */ virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - /// Queue for buffering transport messages. - virtual TAO_Transport_Buffering_Queue &buffering_queue (void); - - /// Timer id associated with buffering. - long buffering_timer_id (void) const; - void buffering_timer_id (long); - - /// Timeout value associated with buffering. - const ACE_Time_Value &buffering_timeout_value (void) const; - void buffering_timeout_value (const ACE_Time_Value &time); - - /// Send any messages that have been buffered. - // @@ lockme - ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0); - /** * Initialising the messaging object. This would be used by the * connector side. On the acceptor side the connection handler @@ -252,7 +211,8 @@ public: /** * Called by the cache when the cache is closing in order to fill * in a handle_set in a lock-safe manner. - * @param handle_set the ACE_Handle_Set into which the transport should place any handle registered with the reactor + * @param handle_set the ACE_Handle_Set into which the transport + * should place any handle registered with the reactor */ void provide_handle (ACE_Handle_Set &handle_set); @@ -267,6 +227,12 @@ public: */ void dequeue_all (void); + /// Check if there are messages pending in the queue + /** + * @return 1 if the queue is empty + */ + int queue_is_empty (void); + /// Register the handler with the reactor. /** * This method is used by the Wait_On_Reactor strategy. The @@ -340,9 +306,9 @@ public: * down). In that case, it returns -1 and sets errno to * <code>ENOENT</code>. */ - ssize_t send (const ACE_Message_Block *mblk, - const ACE_Time_Value *timeout = 0, - size_t *bytes_transferred = 0); + ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0); /// Read len bytes from into buf. /** @@ -433,9 +399,9 @@ protected: * bytes already on the OS I/O subsystem. * */ - virtual ssize_t send_i (const ACE_Message_Block *mblk, - const ACE_Time_Value *timeout = 0, - size_t *bytes_transferred = 0) = 0; + virtual ssize_t send_i (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0) = 0; // Read len bytes from into buf. /** @@ -491,7 +457,7 @@ public: virtual int send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway, + int is_synchronous, ACE_Time_Value *max_time_wait) = 0; @@ -509,7 +475,7 @@ public: // @@ lockme virtual int send_message (TAO_OutputCDR &stream, TAO_Stub *stub = 0, - int twoway = 1, + int is_synchronous = 1, ACE_Time_Value *max_time_wait = 0) = 0; /// Callback to read incoming data @@ -568,8 +534,6 @@ public: */ virtual int reactor_signalling (void); - //@} - /// Method for the connection handler to signify that it /// is being closed and destroyed. virtual void connection_handler_closing (void); @@ -577,14 +541,18 @@ public: /// Register the associated connection handler with the reactor /// for a timer. /** - * At this point, only <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code> - * uses this, and it's unclear whether it needs to stay around. But, it's here - * because it uses the associated protocol-specific connection handler, and accesses - * to that must be serialized on the internal lock. - * - * @param arg argument passed to the handle_timeout() method of the event handler + * At this point, only + * <code>TAO_Eager_Buffering_Sync_Strategy::timer_check()</code> + * uses this, and it's unclear whether it needs to stay around. + * But, it's here because it uses the associated protocol-specific + * connection handler, and accesses to that must be serialized on + * the internal lock. + * + * @param arg argument passed to the handle_timeout() method of the + * event handler * @param delay time interval after which the timer will expire - * @param interval time interval after which the timer will be automatically rescheduled + * @param interval time interval after which the timer will be + * automatically rescheduled * @return -1 on failure, a Reactor timer_id value on success * * @see ACE_Reactor::schedule_timer() @@ -594,7 +562,6 @@ public: const ACE_Time_Value &delay, const ACE_Time_Value &interval = ACE_Time_Value::zero); - // Maintain reference counting with these static TAO_Transport* _duplicate (TAO_Transport* transport); static void release (TAO_Transport* transport); @@ -603,34 +570,145 @@ public: int recache_transport (TAO_Transport_Descriptor_Interface* desc); /// Set/Get the Cache Map entry - void cache_map_entry ( - TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry); + void cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry); + TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void); + + /// Send a message block chain, + int send_message_block_chain (const ACE_Message_Block *message_block, + size_t &bytes_transferred, + ACE_Time_Value *max_wait_time = 0); + /// Sent the contents of <message_block> + /** + * @todo This method name sucks, but send_message() was already + * taken by other silly methods! + * + * @param stub The object reference used for this operation, useful + * to obtain the current policies. + * @param is_synchronous If set this method will block until the + * operation is completely written on the wire + * @param message_block The CDR encapsulation of the GIOP message + * that must be sent. The message may consist of + * multiple Message Blocks chained through the cont() + * field. + * @param max_wait_time The maximum time that the operation can + * block, used in the implementation of timeouts. + * + */ + /// the twoway flag or by the current policies in the stub. + int send_message_i (TAO_Stub *stub, + int is_synchronous, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + /// Send a message block chain, assuming the lock is held + int send_message_block_chain_i (const ACE_Message_Block *message_block, + size_t &bytes_transferred, + ACE_Time_Value *max_wait_time); + /// Cache management void mark_invalid (void); + /// Cache management int make_idle (void); -protected: - // @@ see if one of these calls send_message() - /// Remove the first message from the outgoing queue. - void dequeue_head (void); + /// The timeout callback, invoked when any of the timers related to + /// this transport expire. + /** + * @param current_time The current time as reported from the Reactor + * @param act The Asynchronous Completion Token. Currently it is + * interpreted as follows: + * - If the ACT is the address of this->current_deadline_ the + * queueing timeout has expired and the queue should start + * flushing. + * + * @return Returns 0 if there are no problems, -1 if there is an + * error + * + * @todo In the future this function could be used to expire + * messages (oneways) that have been sitting for too long on + * the queue. + */ + int handle_timeout (const ACE_Time_Value ¤t_time, + const void* act); - /// Update the state of the outgoing queue, assuming that - /// bytes_delivered bytes have been sent already. - void reset_queued_message (ACE_Message_Block *message_block, - size_t bytes_delivered); +private: + /// Send some of the data in the queue. + /** + * As the outgoing data is drained this method is invoked to send as + * much of the current message as possible. + * + * Returns 0 if there is more data to send, -1 if there was an error + * and 1 if the message was completely sent. + */ + int drain_queue (void); - /// Update the state of the outgoing queue, this time a complete - /// message was sent. - void reset_sent_message (ACE_Message_Block *message_block, - size_t bytes_delivered); + /// Implement drain_queue() assuming the lock is held + int drain_queue_i (void); - /// Helper function used to implement the two methods above. - void reset_message (ACE_Message_Block *message_block, - size_t bytes_delivered, - int queued_message); + /// This class needs priviledged access to + /// - queue_is_empty_i() + /// - drain_queue_i() + friend class TAO_Block_Flushing_Strategy; + + /// Check if there are messages pending in the queue + /** + * This version assumes that the lock is already held. Use with + * care! + * + * @return 1 if the queue is empty + */ + int queue_is_empty_i (void); + + /// A helper routine used in drain_queue_i() + int drain_queue_helper (int &iovcnt, iovec iov[]); + + /// This class needs privileged access to: + /// - schedule_output_i() + /// - cancel_output_i() + friend class TAO_Reactive_Flushing_Strategy; + + /// Schedule handle_output() callbacks + int schedule_output_i (void); + + /// Cancel handle_output() callbacks + int cancel_output_i (void); + + /// Cleanup the queue. + /** + * Exactly <byte_count> bytes have been sent, the queue must be + * cleaned up as potentially several messages have been completely + * sent out. + * It leaves on head_ the next message to send out. + */ + void cleanup_queue (size_t byte_count); + + /// Copy the contents of a message block into a Queued_Message + /// TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb); + + /// Check if the buffering constraints have been reached + int check_buffering_constraints_i (TAO_Stub *stub, int &must_flush); + + /// Send a synchronous message, i.e. block until the message is on + /// the wire + int send_synchronous_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + + /// Check if the flush timer is still pending + int flush_timer_pending (void) const; + + /// The flush timer expired or was explicitly cancelled, mark it as + /// not pending + void reset_flush_timer (void); + + /// Check if the underlying event handler is still valid. + /** + * @return Returns -1 if not, 0 if it is. + */ + int check_event_handler_i (const char *caller); + + /// Print out error messages if the event handler is not valid + void report_invalid_event_handler (const char *caller); -private: /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) @@ -653,15 +731,6 @@ protected: /// Strategy for waiting for the reply after sending the request. TAO_Wait_Strategy *ws_; - /// Queue for buffering transport messages. - TAO_Transport_Buffering_Queue *buffering_queue_; - - /// Buffering timer id. - long buffering_timer_id_; - - /// Buffering timeout value. - ACE_Time_Value buffering_timeout_value_; - /// Use to check if bidirectional info has been synchronized with /// the peer. /** @@ -683,6 +752,20 @@ protected: */ int bidirectional_flag_; + /// Implement the outgoing data queue + TAO_Queued_Message *head_; + TAO_Queued_Message *tail_; + + /// The queue will start draining no later than <queing_deadline_> + /// *if* the deadline is + ACE_Time_Value current_deadline_; + + /// The timer ID + long flush_timer_id_; + + /// The adapter used to receive timeout callbacks from the Reactor + TAO_Transport_Timer transport_timer_; + /// Lock that insures that activities that *might* use handler-related /// resources (such as a connection handler) get serialized. /** |