diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-14 20:02:29 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-14 20:02:29 +0000 |
commit | 8eea1d472d53c2bc5cafde8e4f66503a092aca30 (patch) | |
tree | ff07cd7297cba5786be142a390acc2a1ed559319 /TAO/tao | |
parent | e52bcc5027d3ba4d24787c71eff791a15385e13a (diff) | |
download | ATCD-8eea1d472d53c2bc5cafde8e4f66503a092aca30.tar.gz |
ChangeLogTag:Sat Apr 14 12:59:39 2001 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'TAO/tao')
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.cpp | 46 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.h | 40 | ||||
-rw-r--r-- | TAO/tao/Flushing_Strategy.cpp | 10 | ||||
-rw-r--r-- | TAO/tao/Flushing_Strategy.h | 69 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Message_Sent_Callback.cpp | 14 | ||||
-rw-r--r-- | TAO/tao/Message_Sent_Callback.h | 60 | ||||
-rw-r--r-- | TAO/tao/Message_Sent_Callback.inl | 0 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.cpp | 103 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 214 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.inl | 13 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.cpp | 77 | ||||
-rw-r--r-- | TAO/tao/Reactive_Flushing_Strategy.h | 40 |
13 files changed, 692 insertions, 0 deletions
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp new file mode 100644 index 00000000000..fc402ec48bf --- /dev/null +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -0,0 +1,46 @@ +// -*- C++ -*- +// $Id$ + +#include "Block_Flushing_Strategy.h" +#include "Transport.h" +#include "Queued_Message.h" + +ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$") + +int +TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *) +{ + return 0; +} + +int +TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *) +{ + return 0; +} + +int +TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *) +{ + while (!msg->all_data_sent ()) + { + int result = transport->handle_output (); + if (result == -1) + return -1; + } + return 0; +} + +int +TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport) +{ + while (!transport->queue_is_empty ()) + { + int result = transport->handle_output (); + if (result == -1) + return -1; + } + return 0; +} diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h new file mode 100644 index 00000000000..9b41ef8dd17 --- /dev/null +++ b/TAO/tao/Block_Flushing_Strategy.h @@ -0,0 +1,40 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Block_Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_BLOCK_FLUSHING_STRATEGY_H +#define TAO_BLOCK_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "Flushing_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Block_Flushing_Strategy + * + * @brief Implement a flushing strategy that blocks on write to flush + */ +class TAO_Export TAO_Block_Flushing_Strategy : public TAO_Flushing_Strategy +{ +public: + virtual int schedule_output (TAO_Transport *transport); + virtual int cancel_output (TAO_Transport *transport); + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time); + virtual int flush_transport (TAO_Transport *transport); +}; + +#include "ace/post.h" +#endif /* TAO_BLOCK_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/Flushing_Strategy.cpp b/TAO/tao/Flushing_Strategy.cpp new file mode 100644 index 00000000000..82a66ec1427 --- /dev/null +++ b/TAO/tao/Flushing_Strategy.cpp @@ -0,0 +1,10 @@ +// -*- C++ -*- +// $Id$ + +#include "Flushing_Strategy.h" + +ACE_RCSID(tao, Flushing_Strategy, "$Id$") + +TAO_Flushing_Strategy::~TAO_Flushing_Strategy (void) +{ +} diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h new file mode 100644 index 00000000000..0873936ccf4 --- /dev/null +++ b/TAO/tao/Flushing_Strategy.h @@ -0,0 +1,69 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_FLUSHING_STRATEGY_H +#define TAO_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Transport; +class TAO_Queued_Message; + +/** + * @class TAO_Flushing_Strategy + * + * @brief Define the interface for the flushing strategy, i.e. the + * algorithm that controls how does the ORB flush outgoing + * data. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + * Some applications can block the current thread whenever they are + * sending out data. In those cases they can obtain better + * performance by blocking in calls to write() than by participating + * in the Leader/Followers protocol to shared the ORB Reactor. + * + * This strategy controls how does the ORB schedule and cancel + * reactive I/O, if there is no reactive I/O the strategy is just a + * no-op. + * + */ +class TAO_Export TAO_Flushing_Strategy +{ +public: + /// Destructor + virtual ~TAO_Flushing_Strategy (void); + + /// Schedule the transport argument to be flushed + virtual int schedule_output (TAO_Transport *transport) = 0; + + /// Cancel all scheduled output for the transport argument + virtual int cancel_output (TAO_Transport *transport) = 0; + + /// Wait until msg is sent out. Potentially other messages are + /// flushed too, for example, because there are ahead in the queue. + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time) = 0; + + /// Wait until the transport has no messages queued. + virtual int flush_transport (TAO_Transport *transport) = 0; +}; + +#include "ace/post.h" +#endif /* TAO_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index 34a6d41d9b5..b9dd65ee1ff 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -65,6 +65,12 @@ TAO_IIOP_Transport::event_handler_i (void) return this->connection_handler_; } +TAO_Pluggable_Messaging * +TAO_IIOP_Transport::messaging_object (void) +{ + return this->messaging_object_; +} + ssize_t TAO_IIOP_Transport::send_i (iovec *iov, int iovcnt, size_t &bytes_transferred, diff --git a/TAO/tao/Message_Sent_Callback.cpp b/TAO/tao/Message_Sent_Callback.cpp new file mode 100644 index 00000000000..5508945fbde --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.cpp @@ -0,0 +1,14 @@ +// -*- C++ -*- +// $Id$ + +#include "Message_Sent_Callback.h" + +#if !defined (__ACE_INLINE__) +# include "Message_Sent_Callback.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Message_Sent_Callback, "$Id$") + +TAO_Message_Sent_Callback::~TAO_Message_Sent_Callback (void) +{ +} diff --git a/TAO/tao/Message_Sent_Callback.h b/TAO/tao/Message_Sent_Callback.h new file mode 100644 index 00000000000..5a3bccda8ed --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.h @@ -0,0 +1,60 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Message_Sent_Callback.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_MESSAGE_SENT_CALLBACK_H +#define TAO_MESSAGE_SENT_CALLBACK_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Message_Sent_Callback + * + * @brief Encapsulate the signaling mechanism used to wake up threads + * waiting for a message to be sent out. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + */ +class TAO_Export TAO_Message_Sent_Callback +{ +public: + /// Constructor + TAO_Message_Sent_Callback (void); + + /// Destructor + virtual ~TAO_Message_Sent_Callback (void); + + /// The message has been successfully sent + virtual void send_completed (void) = 0; + + /// The message has failed + virtual void send_failed (void) = 0; + + /// The message has timedout + virtual void send_timeout (void) = 0; + + /// The connection was closed before the message was sent + virtual void connection_closed (void) = 0; +}; + +#if defined (__ACE_INLINE__) +# include "Message_Sent_Callback.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_MESSAGE_SENT_CALLBACK_H */ diff --git a/TAO/tao/Message_Sent_Callback.inl b/TAO/tao/Message_Sent_Callback.inl new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.inl diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp new file mode 100644 index 00000000000..0b84413bd46 --- /dev/null +++ b/TAO/tao/Queued_Message.cpp @@ -0,0 +1,103 @@ +// -*- C++ -*- +// $Id$ + +#include "Queued_Message.h" +#include "Message_Sent_Callback.h" + +#if !defined (__ACE_INLINE__) +# include "Queued_Message.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Queued_Message, "$Id$") + +TAO_Queued_Message::TAO_Queued_Message (TAO_Message_Sent_Callback *callback) + : connection_closed_ (0) + , send_failure_ (0) + , timeout_ (0) + , callback_ (callback) + , next_ (0) + , prev_ (0) +{ +} + +TAO_Queued_Message::~TAO_Queued_Message (void) +{ +} + +void +TAO_Queued_Message::connection_closed (void) +{ + this->connection_closed_ = 1; + + if (this->callback_ != 0) + { + this->callback_->connection_closed (); + } +} + +void +TAO_Queued_Message::send_failure (void) +{ + this->send_failure_ = 1; + + if (this->callback_ != 0) + { + this->callback_->send_failed (); + } +} + +void +TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (this->prev_ != 0) + this->prev_->next_ = this->next_; + else + head = this->next_; + + if (this->next_ != 0) + this->next_->prev_ = this->prev_; + else + tail = this->prev_; + + this->next_ = 0; + this->prev_ = 0; +} + +void +TAO_Queued_Message::push_back (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (tail == 0) + { + tail = this; + head = this; + this->next_ = 0; + this->prev_ = 0; + return; + } + + tail->next_ = this; + this->prev_ = tail; + this->next_ = 0; + tail = this; +} + +void +TAO_Queued_Message::push_front (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (head == 0) + { + tail = this; + head = this; + this->next_ = 0; + this->prev_ = 0; + return; + } + + head->prev_ = this; + this->next_ = head; + this->prev_ = 0; + head = this; +} diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h new file mode 100644 index 00000000000..e403df1d8d1 --- /dev/null +++ b/TAO/tao/Queued_Message.h @@ -0,0 +1,214 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Queued_Message.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_QUEUED_MESSAGE_H +#define TAO_QUEUED_MESSAGE_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_Message_Block; +class TAO_Message_Sent_Callback; + +/** + * @class TAO_Queued_Message + * + * @brief Represent messages queued in the outgoing data path of the + * TAO_Transport class. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + * In some configurations TAO needs to maintain a per-connection queue + * of outgoing messages. This queue is drained by the pluggable + * protocols framework, normally under control of the ACE_Reactor, but + * other configurations are conceivable. The elements in the queue + * may may removed early, for example, because the application can + * specify timeouts for each message, or because the underlying + * connection is broken. + * + * In many cases the message corresponds to some application request, + * the application may be blocked waiting for the request to be sent, + * even more importantlyl, the ORB can be configured to use the + * Leader/Followers strategy, in which case one of the waiting threads + * can be required to wake up before its message completes + * each message may contain a 'Sent_Notifier' + * + * <H4>NOTE:</H4> The contents of the ACE_Message_Block may have been + * allocated from TSS storage, in that case we cannot steal them. + * However, we do not need to perform a deep copy all the time, for + * example, in a twoway request the sending thread blocks until the + * data goes out. The queued message can borrow the memory as it will + * be deallocated by the sending thread when it finishes. + * Oneways and asynchronous calls are another story. + * + * @todo: Change the ORB to allocate oneway and AMI buffer from global + * memory, to avoid the data copy in this path. What happens + * if the there is no queueing? Can we check that before + * allocating the memory? + * + */ +class TAO_Export TAO_Queued_Message +{ +public: + /// Constructor + /** + * @param callback A callback interface to signal any waiting + * threads about the status of the message. It is null if there are + * no waiting threads. + */ + TAO_Queued_Message (TAO_Message_Sent_Callback *callback = 0); + + /// Destructor + virtual ~TAO_Queued_Message (void); + + /// The underlying connection has been closed, release resources and + /// signal waiting threads. + void connection_closed (void); + + /// There was an error while sending the data. + void send_failure (void); + + /** @name Intrusive list manipulation + * + * The messages are put in a doubled linked list (for easy insertion + * and removal). To minimize memory allocations the list is + * intrusive, i.e. each element in the list contains the pointers + * for the next and previous element. + * + * The following methods are used to manipulate this implicit list. + * + * @todo: We should implement this as a base template, something + * like:<BR> + * template<class T> Intrusive_Node {<BR> + * public:<BR><BR> + * void next (T *);<BR> + * T* next () const;<BR><BR> + * private:<BR> + * T* next_;<BR> + * };<BR> + * and use it as follows:<BR> + * class TAO_Queued_Message : public Intrusive_Node<TAO_Queued_Message><BR> + * {<BR> + * };<BR> + * + */ + //@{ + /// Set/get the next element in the list + virtual TAO_Queued_Message *next (void) const; + + /// Set/get the previous element in the list + virtual TAO_Queued_Message *prev (void) const; + + /// Remove this element from the list + virtual void remove_from_list (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); + + /// Insert the current element at the tail of the queue. + virtual void push_back (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); + + /// Insert the current element at the head of the queue. + virtual void push_front (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); + //@} + + /** @name Template Methods + */ + //@{ + + /// Return the length of the message + /** + * If the message has been partially sent it returns the number of + * bytes that are still not sent. + */ + virtual size_t message_length (void) const = 0; + + /// Return 1 if all the data has been sent + virtual int all_data_sent (void) const = 0; + + /// Fill up an io vector using the connects of the message + /** + * Different versions of this class represent the message using + * either a single buffer, or a message block. + * This method allows a derived class to fill up the contents of an + * io vector, the TAO_Transport class uses this method to group as + * many messages as possible in an iovector before sending them to + * the OS I/O subsystem. + * + * @param iovcnt_max The number of elements in iov + * @param iovcnt The number of elements already used by iov, this + * method should update this counter + * @param iov The io vector + */ + virtual void fill_iov (int iovcnt_max, int &iovcnt, iovec iov[]) const = 0; + + /// Update the internal state, data has been sent. + /** + * After the TAO_Transport class completes a successful (or + * partially successful) I/O operation it must update the state of + * all the messages queued. This callback method is used by each + * message to update its state and determine if all the data has + * been sent already. + * + * @param byte_count The number of bytes succesfully sent. The + * TAO_Queued_Message should decrement this value + * by the number of bytes that must still be sent. + * @return Returns 1 if the TAO_Queued_Message has any more data to + * send. + */ + virtual int bytes_transferred (size_t &byte_count) = 0; + + /// Reclaim resources + /** + * Reliable messages are allocated from the stack, thus they do not + * be deallocated. + * Asynchronous (SYNC_NONE) messages are allocated from the heap (or + * a pool), they need to be reclaimed explicitly. + */ + virtual void destroy (void) = 0; + //@} + +protected: + /// Set to 1 if the connection was closed + int connection_closed_; + + /// Set to 1 if there was a failure while sending the data + int send_failure_; + + /// Set to 1 if there was a timeout while sending the data + int timeout_; + +private: + /// If not null, this is the object that we signal to indicate that + /// the message was sent. + /** + * The signaling mechanism used to wakeup the thread waiting for + * this message to complete changes + */ + TAO_Message_Sent_Callback *callback_; + + /// Implement an intrusive double-linked list for the message queue + TAO_Queued_Message *next_; + TAO_Queued_Message *prev_; +}; + +#if defined (__ACE_INLINE__) +# include "Queued_Message.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_QUEUED_MESSAGE_H */ diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl new file mode 100644 index 00000000000..e9cd0a9ff4b --- /dev/null +++ b/TAO/tao/Queued_Message.inl @@ -0,0 +1,13 @@ +// $Id$ + +ACE_INLINE TAO_Queued_Message * +TAO_Queued_Message::next (void) const +{ + return this->next_; +} + +ACE_INLINE TAO_Queued_Message * +TAO_Queued_Message::prev (void) const +{ + return this->prev_; +} diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp new file mode 100644 index 00000000000..ed2cdd1da84 --- /dev/null +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -0,0 +1,77 @@ +// -*- C++ -*- +// $Id$ + +#include "Reactive_Flushing_Strategy.h" +#include "Transport.h" +#include "ORB_Core.h" +#include "Queued_Message.h" +#include "debug.h" + +ACE_RCSID(tao, Reactive_Flushing_Strategy, "$Id$") + +int +TAO_Reactive_Flushing_Strategy::schedule_output (TAO_Transport *transport) +{ + return transport->schedule_output (); +} + +int +TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport) +{ + return transport->cancel_output (); +} + +int +TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time) +{ + int result = 0; + + // @@ Should we pass this down? Can we? + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + TAO_ORB_Core *orb_core = transport->orb_core (); + + while (!msg->all_data_sent () && result >= 0) + { + result = orb_core->run (max_wait_time, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + + return result; +} + +int +TAO_Reactive_Flushing_Strategy::flush_transport (TAO_Transport *transport) +{ + // @@ Should we pass this down? Can we? + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + TAO_ORB_Core *orb_core = transport->orb_core (); + + while (!transport->queue_is_empty ()) + { + int result = orb_core->run (0, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (result == -1) + return -1; + } + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h new file mode 100644 index 00000000000..e1aba16bbb0 --- /dev/null +++ b/TAO/tao/Reactive_Flushing_Strategy.h @@ -0,0 +1,40 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Reactive_Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_REACTIVE_FLUSHING_STRATEGY_H +#define TAO_REACTIVE_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "Flushing_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Reactive_Flushing_Strategy + * + * @brief Implement a flushing strategy that uses the reactor. + */ +class TAO_Export TAO_Reactive_Flushing_Strategy : public TAO_Flushing_Strategy +{ +public: + virtual int schedule_output (TAO_Transport *transport); + virtual int cancel_output (TAO_Transport *transport); + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg, + ACE_Time_Value *max_wait_time); + virtual int flush_transport (TAO_Transport *transport); +}; + +#include "ace/post.h" +#endif /* TAO_REACTIVE_FLUSHING_STRATEGY_H */ |