diff options
Diffstat (limited to 'TAO/tao/Transport.h')
-rw-r--r-- | TAO/tao/Transport.h | 146 |
1 files changed, 83 insertions, 63 deletions
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index f1a9ba45ac0..ca542ee7eb0 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -18,7 +18,7 @@ #include /**/ "ace/pre.h" -#include "tao/Transport_Cache_Manager.h" +#include "Transport_Cache_Manager.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -26,13 +26,8 @@ #include "tao/Transport_Timer.h" #include "tao/Incoming_Message_Queue.h" -#include "tao/Incoming_Message_Stack.h" #include "ace/Time_Value.h" -struct iovec; - -TAO_BEGIN_VERSIONED_NAMESPACE_DECL - class TAO_ORB_Core; class TAO_Target_Specification; class TAO_Operation_Details; @@ -46,6 +41,7 @@ class TAO_Queued_Message; class TAO_Synch_Queued_Message; class TAO_Resume_Handle; class TAO_Stub; +struct iovec; namespace TAO { @@ -183,20 +179,18 @@ namespace TAO * * We solve the problems as follows * - * (a) First do a read with the buffer on stack. Query the underlying - * messaging object whether the message has any incomplete - * portion. If so, data will be copied into new buffer being able - * to hold full message and is queued; succeeding events will read - * data from socket and write directly into this buffer. - * Otherwise, if if the message in local buffer is complete, we free - * the handle and then send the message to the higher layers of the - * ORB for processing. + * (a) First do a read with the buffer on stack. Query the underlying + * messaging object whether the message has any incomplete + * portion. If so, we just grow the buffer for the missing size + * and read the rest of the message. We free the handle and then + * send the message to the higher layers of the ORB for + * processing. * - * (b) If buffer with incomplete message has been enqueued, while trying - * to do the above, the reactor will call us back when the handle - * becomes read ready. The read-operation will copy data directly - * into the enqueued buffer. If the message has bee read completely - * the message is sent to the higher layers of the ORB for processing. + * (b) If we block (ie. if we receive a EWOULDBLOCK) while trying to + * do the above (ie. trying to read after growing the buffer + * size) we put the message in a queue and return back to the + * reactor. The reactor would call us back when the handle + * becomes read ready. * * (c) If we get multiple messages (possible if the client connected * to the server sends oneways or AMI requests), we parse and @@ -370,7 +364,9 @@ public: * transformations of the data, such as SSLIOP or protocols that * compress the stream. * - * @param iov contains the data that must be sent. + * @param mblk contains the data that must be sent. For each + * message block in the cont() chain all the data between rd_ptr() + * and wr_ptr() should be delivered to the remote peer. * * @param timeout is the maximum time that the application is * willing to wait for the data to be sent, useful in platforms that @@ -393,8 +389,7 @@ public: * down). In that case, it returns -1 and sets errno to * <code>ENOENT</code>. */ - virtual ssize_t send (iovec *iov, - int iovcnt, + virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, const ACE_Time_Value *timeout = 0) = 0; @@ -463,7 +458,8 @@ public: * valid certificates. There are no pre_connect_hooks () since the * transport doesn't exist before a connection establishment. :-) * - * @note The methods are not made const with a reason. + * + * @@NOTE: The methods are not made const with a reason. */ virtual bool post_connect_hook (void); @@ -647,11 +643,60 @@ public: protected: + /// Called by the handle_input_i(). This method is used to parse + /// message read by the handle_input_i() call. It also decides + /// whether the message needs consolidation before processing. + int parse_consolidate_messages (ACE_Message_Block &bl, + TAO_Resume_Handle &rh, + ACE_Time_Value *time = 0); + + + /// Method does parsing of the message if we have a fresh message in + /// the @a message_block or just returns if we have read part of the + /// previously stored message. + int parse_incoming_messages (ACE_Message_Block &message_block); + + /// Return if we have any missing data in the queue of messages + /// or determine if we have more information left out in the + /// presently read message to make it complete. + ssize_t missing_data (ACE_Message_Block &message_block); + + /// Consolidate the currently read message or consolidate the last + /// message in the queue. The consolidation of the last message in + /// the queue is done by calling consolidate_message_queue (). + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// @@Bala: Docu??? + int consolidate_fragments (TAO_Queued_Data *qd, + TAO_Resume_Handle &rh); + + /// First consolidate the message queue. If the message is still not + /// complete, try to read from the handle again to make it + /// complete. If these dont help put the message back in the queue + /// and try to check the queue if we have message to process. (the + /// thread needs to do some work anyway :-)) + int consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// Called by parse_consolidate_message () if we have more messages + /// in one read. Queue up the messages and try to process one of + /// them, atleast at the head of them. + int consolidate_extra_messages (ACE_Message_Block &incoming, + TAO_Resume_Handle &rh); + /// Process the message by sending it to the higher layers of the /// ORB. int process_parsed_messages (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); + /// Make a queued data from the @a incoming message block + TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); + /// Implement send_message_shared() assuming the handler_lock_ is /// held. int send_message_shared_i (TAO_Stub *stub, @@ -839,37 +884,10 @@ private: /// Print out error messages if the event handler is not valid void report_invalid_event_handler (const char *caller); - /// Is invoked by handle_input operation. It consolidate message on - /// top of incoming_message_stack. The amount of missing data is - /// known and recv operation copies data directly into message buffer, - /// as much as a single recv-invocation provides. - int handle_input_missing_data (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time, - TAO_Queued_Data *q_data); - - /// Is invoked by handle_input operation. It parses new messages from input stream - /// or consolidates messages whose header has been partially read, the message - /// size being unknown so far. It parses as much data as a single recv-invocation provides. - int handle_input_parse_data (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time); - - /// Is invoked by handle_input_parse_data. Parses all messages remaining - /// in @a message_block. - int handle_input_parse_extra_messages (ACE_Message_Block &message_block); - - /// @return -1 error, otherwise 0 - int consolidate_enqueue_message (TAO_Queued_Data *qd); - - /// @return -1 error, otherwise 0 - int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh); - /* * Process the message that is in the head of the incoming queue. * If there are more messages in the queue, this method calls * this->notify_reactor () to wake up a thread - * @retval -1 on error - * @retval 0 if successfully processing enqueued messages - * @retval 1 if no message present in queue */ int process_queue_head (TAO_Resume_Handle &rh); @@ -882,13 +900,21 @@ private: /// Assume the lock is held void send_connection_closed_notifications_i (void); + /// Process a non-version specific fragment by either consolidating + /// the fragments or enqueuing the queueable message + void process_fragment (TAO_Queued_Data* fragment_message, + TAO_Queued_Data* queueable_message, + CORBA::Octet major, + CORBA::Octet minor, + TAO_Resume_Handle &rh); + /// Allocate a partial message block and store it in our /// partial_message_ data member. void allocate_partial_message_block (void); - // Disallow copying and assignment. - TAO_Transport (const TAO_Transport&); - void operator= (const TAO_Transport&); + /// Prohibited + ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) + ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) /* * Specialization hook to add concrete private methods from @@ -905,7 +931,7 @@ protected: /// Global orbcore resource. TAO_ORB_Core *orb_core_; - /// Our entry in the cache. We don't own this. It is here for our + /// Our entry in the cache. We dont own this. It is here for our /// convenience. We cannot just change things around. TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_; @@ -943,14 +969,10 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; - /// Queue of the consolidated, incoming messages.. + /// Queue of the incoming messages.. TAO_Incoming_Message_Queue incoming_message_queue_; - /// Stack of incoming fragments, consolidated messages - /// are going to be enqueued in "incoming_message_queue_" - TAO::Incoming_Message_Stack incoming_message_stack_; - - /// The queue will start draining no later than <queeing_deadline_> + /// The queue will start draining no later than <queing_deadline_> /// *if* the deadline is ACE_Time_Value current_deadline_; @@ -1042,10 +1064,8 @@ private: //@@ TAO_TRANSPORT_SPL_EXTERN_ADD_HOOK -TAO_END_VERSIONED_NAMESPACE_DECL - #if defined (__ACE_INLINE__) -# include "tao/Transport.inl" +# include "Transport.inl" #endif /* __ACE_INLINE__ */ #include /**/ "ace/post.h" |