diff options
Diffstat (limited to 'TAO/tao')
32 files changed, 1348 insertions, 368 deletions
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp new file mode 100644 index 00000000000..bd49b02ad86 --- /dev/null +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -0,0 +1,33 @@ +// -*- C++ -*- +// $Id$ + +#include "Block_Flushing_Strategy.h" +#include "Transport.h" +#include "Queued_Message.h" + +ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$") + +int +TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *) +{ + return 0; +} + +int +TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *) +{ + return 0; +} + +int +TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg) +{ + while (!msg->done ()) + { + int result = transport->handle_output (); + if (result == -1) + return -1; + } + return 0; +} diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h new file mode 100644 index 00000000000..82af3a0cd3e --- /dev/null +++ b/TAO/tao/Block_Flushing_Strategy.h @@ -0,0 +1,38 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Block_Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_BLOCK_FLUSHING_STRATEGY_H +#define TAO_BLOCK_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "Flushing_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Block_Flushing_Strategy + * + * @brief Implement a flushing strategy that blocks on write to flush + */ +class TAO_Export TAO_Block_Flushing_Strategy : public TAO_Flushing_Strategy +{ +public: + virtual int schedule_output (TAO_Transport *transport); + virtual int cancel_output (TAO_Transport *transport); + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg); +}; + +#include "ace/post.h" +#endif /* TAO_BLOCK_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/Flushing_Strategy.cpp b/TAO/tao/Flushing_Strategy.cpp new file mode 100644 index 00000000000..82a66ec1427 --- /dev/null +++ b/TAO/tao/Flushing_Strategy.cpp @@ -0,0 +1,10 @@ +// -*- C++ -*- +// $Id$ + +#include "Flushing_Strategy.h" + +ACE_RCSID(tao, Flushing_Strategy, "$Id$") + +TAO_Flushing_Strategy::~TAO_Flushing_Strategy (void) +{ +} diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h new file mode 100644 index 00000000000..11d6cda2983 --- /dev/null +++ b/TAO/tao/Flushing_Strategy.h @@ -0,0 +1,65 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_FLUSHING_STRATEGY_H +#define TAO_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Transport; +class TAO_Queued_Message; + +/** + * @class TAO_Flushing_Strategy + * + * @brief Define the interface for the flushing strategy, i.e. the + * algorithm that controls how does the ORB flush outgoing + * data. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + * Some applications can block the current thread whenever they are + * sending out data. In those cases they can obtain better + * performance by blocking in calls to write() than by participating + * in the Leader/Followers protocol to shared the ORB Reactor. + * + * This strategy controls how does the ORB schedule and cancel + * reactive I/O, if there is no reactive I/O the strategy is just a + * no-op. + * + */ +class TAO_Export TAO_Flushing_Strategy +{ +public: + /// Destructor + virtual ~TAO_Flushing_Strategy (void); + + /// Schedule the transport argument to be flushed + virtual int schedule_output (TAO_Transport *transport) = 0; + + /// Cancel all scheduled output for the transport argument + virtual int cancel_output (TAO_Transport *transport) = 0; + + /// Wait until msg is sent out. Potentially other messages are + /// flushed too, for example, because there are ahead in the queue. + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg) = 0; +}; + +#include "ace/post.h" +#endif /* TAO_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 28135d4f8ea..de84da9427f 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -875,7 +875,7 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) 0); ACE_Message_Block message_block(&data_block); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - + int result = transport->send (&message_block); if (result == -1) { diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 81ca246d8b8..2aa95f41a7a 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -53,20 +53,8 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core TAO_IIOP_Connection_Handler::~TAO_IIOP_Connection_Handler (void) { - // If the socket has not already been closed. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Cannot deal with errors, and therefore they are ignored. - this->transport_.send_buffered_messages (); - } - else - { - // Dequeue messages and delete message blocks. - this->transport_.dequeue_all (); - } } - int TAO_IIOP_Connection_Handler::open (void*) { @@ -206,28 +194,17 @@ TAO_IIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } - int TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // This method is called when buffering timer expires. - // - ACE_Time_Value *max_wait_time = 0; - - TAO_Stub *stub = 0; - int has_timeout; - this->orb_core ()->call_timeout_hook (stub, - has_timeout, - *max_wait_time); - // Cannot deal with errors, and therefore they are ignored. - this->transport ()->send_buffered_messages (max_wait_time); + if (this->transport ()->handle_output () == -1) + return -1; return 0; } - int TAO_IIOP_Connection_Handler::close (u_long) { diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index fea6cdfdb40..25de3704e1f 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -90,10 +90,9 @@ TAO_IIOP_Transport::send (const ACE_Message_Block *message_block, const ACE_Time_Value *max_wait_time, size_t *bytes_transferred) { - return ACE::send_n (this->handle (), - message_block, - max_wait_time, - bytes_transferred); + return ACE::send (this->handle (), + message_block, + bytes_transferred); } ssize_t @@ -197,12 +196,8 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, if (this->messaging_object_->format_message (stream) != 0) return -1; - // Strictly speaking, should not need to loop here because the - // socket never gets set to a nonblocking mode ... some Linux - // versions seem to need it though. Leaving it costs little. - // This guarantees to send all data (bytes) or return an error. - ssize_t n = this->send_or_buffer (stub, + ssize_t n = this->send_message_i (stub, twoway, stream.begin (), max_wait_time); @@ -218,17 +213,6 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream, return -1; } - // EOF. - if (n == 0) - { - if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n") - ACE_TEXT ("EOF, closing conn %d\n"), - this->handle())); - return -1; - } - return 1; } diff --git a/TAO/tao/Message_Sent_Callback.cpp b/TAO/tao/Message_Sent_Callback.cpp new file mode 100644 index 00000000000..5508945fbde --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.cpp @@ -0,0 +1,14 @@ +// -*- C++ -*- +// $Id$ + +#include "Message_Sent_Callback.h" + +#if !defined (__ACE_INLINE__) +# include "Message_Sent_Callback.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Message_Sent_Callback, "$Id$") + +TAO_Message_Sent_Callback::~TAO_Message_Sent_Callback (void) +{ +} diff --git a/TAO/tao/Message_Sent_Callback.h b/TAO/tao/Message_Sent_Callback.h new file mode 100644 index 00000000000..5a3bccda8ed --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.h @@ -0,0 +1,60 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Message_Sent_Callback.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_MESSAGE_SENT_CALLBACK_H +#define TAO_MESSAGE_SENT_CALLBACK_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Message_Sent_Callback + * + * @brief Encapsulate the signaling mechanism used to wake up threads + * waiting for a message to be sent out. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + */ +class TAO_Export TAO_Message_Sent_Callback +{ +public: + /// Constructor + TAO_Message_Sent_Callback (void); + + /// Destructor + virtual ~TAO_Message_Sent_Callback (void); + + /// The message has been successfully sent + virtual void send_completed (void) = 0; + + /// The message has failed + virtual void send_failed (void) = 0; + + /// The message has timedout + virtual void send_timeout (void) = 0; + + /// The connection was closed before the message was sent + virtual void connection_closed (void) = 0; +}; + +#if defined (__ACE_INLINE__) +# include "Message_Sent_Callback.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_MESSAGE_SENT_CALLBACK_H */ diff --git a/TAO/tao/Message_Sent_Callback.inl b/TAO/tao/Message_Sent_Callback.inl new file mode 100644 index 00000000000..e7b8f4d6668 --- /dev/null +++ b/TAO/tao/Message_Sent_Callback.inl @@ -0,0 +1,6 @@ +// $Id$ + +ACE_INLINE +TAO_Message_Sent_Callback::TAO_Message_Sent_Callback (void) +{ +} diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index 21b06de9766..f352135f555 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -35,6 +35,8 @@ #include "IORInfo.h" +#include "Flushing_Strategy.h" + #if defined(ACE_MVS) #include "ace/Codeset_IBM1047.h" #endif /* ACE_MVS */ @@ -158,6 +160,7 @@ TAO_ORB_Core::TAO_ORB_Core (const char *orbid) parser_registry_ (), connection_cache_ (), bidir_giop_policy_ (0) + , flushing_strategy_ (0) { #if defined(ACE_MVS) ACE_NEW (this->from_iso8859_, ACE_IBM1047_ISO8859); @@ -221,6 +224,8 @@ TAO_ORB_Core::TAO_ORB_Core (const char *orbid) TAO_ORB_Core::~TAO_ORB_Core (void) { + delete this->flushing_strategy_; + ACE_OS::free (this->orbid_); delete this->from_iso8859_; @@ -1043,6 +1048,9 @@ TAO_ORB_Core::init (int &argc, char *argv[], CORBA::Environment &ACE_TRY_ENV) // init the ORB core's pointer this->protocol_factories_ = trf->get_protocol_factories (); + // Initialize the flushing strategy + this->flushing_strategy_ = trf->create_flushing_strategy (); + // Now that we have a complete list of available protocols and their // related factory objects, set default policies and initialize the // registries! diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h index 163294e9e17..17113eccd7b 100644 --- a/TAO/tao/ORB_Core.h +++ b/TAO/tao/ORB_Core.h @@ -82,6 +82,8 @@ class TAO_Message_State_Factory; class TAO_ServerRequest; class TAO_Protocols_Hooks; +class TAO_Flushing_Strategy; + #if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) class TAO_Eager_Buffering_Sync_Strategy; @@ -861,13 +863,21 @@ public: CORBA::Boolean bidir_giop_policy (void); void bidir_giop_policy (CORBA::Boolean); - - /// Return the table that maps object key/name to de-stringified /// object reference. It is needed for supporting local objects in /// the resolve_initial_references() mechanism. TAO_Object_Ref_Table &object_ref_table (void); + /// Return the flushing strategy + /** + * The flushing strategy is created by the resource factory, and it + * is used by the ORB to control the mechanism used to flush the + * outgoing data queues. + * The flushing strategies are stateless, therefore, there is only + * one per ORB. + */ + TAO_Flushing_Strategy *flushing_strategy (void); + protected: /// Destructor is protected since the ORB Core should only be @@ -1204,6 +1214,9 @@ protected: /// Bir Dir GIOP policy value CORBA::Boolean bidir_giop_policy_; + + /// Hold the flushing strategy + TAO_Flushing_Strategy *flushing_strategy_; }; // **************************************************************** diff --git a/TAO/tao/ORB_Core.i b/TAO/tao/ORB_Core.i index 2a3b815aec1..fce8b81dffe 100644 --- a/TAO/tao/ORB_Core.i +++ b/TAO/tao/ORB_Core.i @@ -47,6 +47,11 @@ TAO_ORB_Core::object_ref_table (void) return this->object_ref_table_; } +ACE_INLINE TAO_Flushing_Strategy * +TAO_ORB_Core::flushing_strategy (void) +{ + return this->flushing_strategy_; +} ACE_INLINE CORBA::Boolean TAO_ORB_Core::service_profile_selection (TAO_MProfile &mprofile, diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp new file mode 100644 index 00000000000..f3d4a2ed8f8 --- /dev/null +++ b/TAO/tao/Queued_Message.cpp @@ -0,0 +1,96 @@ +// -*- C++ -*- +// $Id$ + +#include "Queued_Message.h" +#include "Message_Sent_Callback.h" + +#if !defined (__ACE_INLINE__) +# include "Queued_Message.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Queued_Message, "$Id$") + +TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents, + TAO_Message_Sent_Callback *callback) + : contents_ (contents) + , callback_ (callback) + , next_ (0) + , prev_ (0) +{ +} + +TAO_Queued_Message::~TAO_Queued_Message (void) +{ + ACE_Message_Block::release (this->contents_); +} + +void +TAO_Queued_Message::connection_closed (void) +{ + if (this->callback_ != 0) + this->callback_->connection_closed (); +} + +void +TAO_Queued_Message::destroy (void) +{ + delete this; +} + +void +TAO_Queued_Message::bytes_transferred (size_t byte_count) +{ + while (byte_count > 0 && !this->done ()) + { + size_t l = this->contents_->length (); + if (byte_count < l) + { + this->contents_->rd_ptr (byte_count); + return; + } + ACE_Message_Block *cont = this->contents_->cont (); + byte_count -= l; + ACE_Message_Block::release (this->contents_); + this->contents_ = cont; + } +} + +void +TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (this->prev_ != 0) + this->prev_->next_ = this->next_; + else + head = this->next_; + + if (this->next_ != 0) + this->next_->prev_ = this->prev_; + else + tail = this->prev_; + + this->next_ = 0; + this->prev_ = 0; +} + +void +TAO_Queued_Message::push_back (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail) +{ + if (tail == 0) + { + tail = this; + head = this; + this->next_ = 0; + this->prev_ = 0; + return; + } + + this->prev_ = tail; + this->next_ = 0; + if (tail->prev_ != 0) + { + tail->prev_->next_ = this; + } + tail = this; +} diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h new file mode 100644 index 00000000000..ddb66a92e7f --- /dev/null +++ b/TAO/tao/Queued_Message.h @@ -0,0 +1,170 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Queued_Message.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_QUEUED_MESSAGE_H +#define TAO_QUEUED_MESSAGE_H +#include "ace/pre.h" + +#include "corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_Message_Block; +class TAO_Message_Sent_Callback; + +/** + * @class TAO_Queued_Message + * + * @brief Implement an queued message for the outgoing path in the + * TAO_Transport class + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the outgoing data path. + * + * In some configurations TAO needs to maintain a per-connection queue + * of outgoing messages. This queue is drained by the pluggable + * protocols framework, normally under control of the ACE_Reactor, but + * other configurations are conceivable. The elements in the queue + * may may removed early, for example, because the application can + * specify timeouts for each message, or because the underlying + * connection is broken. + * + * In many cases the message corresponds to some application request, + * the application may be blocked waiting for the request to be sent, + * even more importantlyl, the ORB can be configured to use the + * Leader/Followers strategy, in which case one of the waiting threads + * can be required to wake up before its message completes + * each message may contain a 'Sent_Notifier' + * + * <H4>NOTE:</H4> The contents of the ACE_Message_Block may have been + * allocated from TSS storage, in that case we cannot steal them. + * However, we do not need to perform a deep copy all the time, for + * example, in a twoway request the sending thread blocks until the + * data goes out. The queued message can borrow the memory as it will + * be deallocated by the sending thread when it finishes. + * Oneways and asynchronous calls are another story. + * + * @todo: Change the ORB to allocate oneway and AMI buffer from global + * memory, to avoid the data copy in this path. What happens + * if the there is no queueing? Can we check that before + * allocating the memory? + * + */ +class TAO_Export TAO_Queued_Message +{ +public: + /// Constructor + /** + * @param contents The message block chain that must be sent, this + * class <B>always</B> assumes ownership of the chain. + * + * @param callback A callback interface to signal any waiting + * threads about the status of the message. It is null if there are + * no waiting threads. + */ + TAO_Queued_Message (ACE_Message_Block *contents, + TAO_Message_Sent_Callback *callback = 0); + + /// Destructor + virtual ~TAO_Queued_Message (void); + + /// Get the message block + ACE_Message_Block *mb (void) const; + + /// The transport has successfully sent more data, adjust internal + /// status + void bytes_transferred (size_t byte_count); + + /// Return 0 if the message has not been completely sent + int done (void) const; + + /// The underlying connection has been closed, release resources and + /// signal waiting threads. + void connection_closed (void); + + /// Reclaim resources + void destroy (void); + + /** @name Intrusive list manipulation + * + * The messages are put in a doubled linked list (for easy insertion + * and removal). To minimize memory allocations the list is + * intrusive, i.e. each element in the list contains the pointers + * for the next and previous element. + * + * The following methods are used to manipulate this implicit list. + * + * @todo: We should implement this as a base template, something + * like:<BR> + * template<class T> Intrusive_Node {<BR> + * public:<BR><BR> + * void next (T *);<BR> + * T* next () const;<BR><BR> + * private:<BR> + * T* next_;<BR> + * };<BR> + * and use it as follows:<BR> + * class TAO_Queued_Message : public Intrusive_Node<TAO_Queued_Message><BR> + * {<BR> + * };<BR> + * + */ + //@{ + /// Set/get the next element in the list + virtual TAO_Queued_Message *next (void) const; + + /// Set/get the previous element in the list + virtual TAO_Queued_Message *prev (void) const; + + /// Remove this element from the list + virtual void remove_from_list (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); + + /// Insert the current element after position. + /** + * If position is null then we assume that we are inserting the + * current element into an empty list. + */ + virtual void push_back (TAO_Queued_Message *&head, + TAO_Queued_Message *&tail); + //@} + +private: + /// The contents of the message. + /** + * The message is normally generated by a TAO_OutputCDR stream. The + * application marshals the payload, possibly generating a chain of + * message block connected via the 'cont()' field. + */ + ACE_Message_Block *contents_; + + /// If not null, this is the object that we signal to indicate that + /// the message was sent. + /** + * The signaling mechanism used to wakeup the thread waiting for + * this message to complete changes + */ + TAO_Message_Sent_Callback *callback_; + + /// Implement an intrusive double-linked list for the message queue + TAO_Queued_Message *next_; + TAO_Queued_Message *prev_; +}; + +#if defined (__ACE_INLINE__) +# include "Queued_Message.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_QUEUED_MESSAGE_H */ diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl new file mode 100644 index 00000000000..76a86e62d1c --- /dev/null +++ b/TAO/tao/Queued_Message.inl @@ -0,0 +1,25 @@ +// $Id$ + +ACE_INLINE ACE_Message_Block * +TAO_Queued_Message::mb (void) const +{ + return this->contents_; +} + +ACE_INLINE int +TAO_Queued_Message::done (void) const +{ + return this->contents_ != 0; +} + +ACE_INLINE TAO_Queued_Message * +TAO_Queued_Message::next (void) const +{ + return this->next_; +} + +ACE_INLINE TAO_Queued_Message * +TAO_Queued_Message::prev (void) const +{ + return this->prev_; +} diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp new file mode 100644 index 00000000000..58df0474633 --- /dev/null +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -0,0 +1,56 @@ +// -*- C++ -*- +// $Id$ + +#include "Reactive_Flushing_Strategy.h" +#include "Transport.h" +#include "ORB_Core.h" +#include "Queued_Message.h" + +ACE_RCSID(tao, Reactive_Flushing_Strategy, "$Id$") + +int +TAO_Reactive_Flushing_Strategy::schedule_output (TAO_Transport *transport) +{ + ACE_Reactor *reactor = + transport->orb_core ()->reactor (); + + return reactor->register_handler (transport->event_handler (), + ACE_Event_Handler::READ_MASK + | ACE_Event_Handler::WRITE_MASK); +} + +int +TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport) +{ + ACE_Reactor *reactor = + transport->orb_core ()->reactor (); + + return reactor->register_handler (transport->event_handler (), + ACE_Event_Handler::READ_MASK); +} + +int +TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg) +{ + TAO_ORB_Core *orb_core = transport->orb_core (); + + int result = 0; + // @@ Should we pass this down? Can we? + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + while (!msg->done () && result > 0) + { + result = orb_core->run (0, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + + return result; +} diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h new file mode 100644 index 00000000000..c1a33aaef5b --- /dev/null +++ b/TAO/tao/Reactive_Flushing_Strategy.h @@ -0,0 +1,38 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Reactive_Flushing_Strategy.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_REACTIVE_FLUSHING_STRATEGY_H +#define TAO_REACTIVE_FLUSHING_STRATEGY_H +#include "ace/pre.h" + +#include "Flushing_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_Reactive_Flushing_Strategy + * + * @brief Implement a flushing strategy that uses the reactor. + */ +class TAO_Export TAO_Reactive_Flushing_Strategy : public TAO_Flushing_Strategy +{ +public: + virtual int schedule_output (TAO_Transport *transport); + virtual int cancel_output (TAO_Transport *transport); + virtual int flush_message (TAO_Transport *transport, + TAO_Queued_Message *msg); +}; + +#include "ace/post.h" +#endif /* TAO_REACTIVE_FLUSHING_STRATEGY_H */ diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h index 523aaaea637..a3e66b87d53 100644 --- a/TAO/tao/Resource_Factory.h +++ b/TAO/tao/Resource_Factory.h @@ -32,6 +32,8 @@ class TAO_Connector_Registry; class TAO_Reactor_Registry; class TAO_Priority_Mapping; +class TAO_Flushing_Strategy; + // **************************************************************** class TAO_Export TAO_Protocol_Item @@ -181,6 +183,10 @@ public: /// Creates the lock for the lock needed in the Cache Map virtual ACE_Lock *create_cached_connection_lock (void); + /// Creates the flushing strategy. The new instance is owned by the + /// caller. + virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0; + protected: /** * Loads the default protocols. This method is used so that the diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp index 0b87d02ba5c..4abbb20eea1 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -52,21 +52,8 @@ TAO_SHMIOP_Connection_Handler::TAO_SHMIOP_Connection_Handler (TAO_ORB_Core *orb_ TAO_SHMIOP_Connection_Handler::~TAO_SHMIOP_Connection_Handler (void) { - - // If the socket has not already been closed. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Cannot deal with errors, and therefore they are ignored. - this->transport_.send_buffered_messages (); - } - else - { - // Dequeue messages and delete message blocks. - this->transport_.dequeue_all (); - } } - int TAO_SHMIOP_Connection_Handler::open (void*) { @@ -211,28 +198,17 @@ TAO_SHMIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } - int TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // This method is called when buffering timer expires. - // - ACE_Time_Value *max_wait_time = 0; - - TAO_Stub *stub = 0; - int has_timeout; - this->orb_core ()->call_timeout_hook (stub, - has_timeout, - *max_wait_time); - // Cannot deal with errors, and therefore they are ignored. - this->transport ()->send_buffered_messages (max_wait_time); + if (this->transport ()->handle_output () == -1) + return -1; return 0; } - int TAO_SHMIOP_Connection_Handler::close (u_long) { diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 27fd2d23b68..3931ecfc0b7 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -204,7 +204,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, // versions seem to need it though. Leaving it costs little. // This guarantees to send all data (bytes) or return an error. - ssize_t n = this->send_or_buffer (stub, + ssize_t n = this->send_message_i (stub, twoway, stream.begin (), max_wait_time); @@ -220,17 +220,6 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream, return -1; } - // EOF. - if (n == 0) - { - if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n") - ACE_TEXT ("EOF, closing conn %d\n"), - this->handle())); - return -1; - } - return 1; } diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp index 03fd6a1ae99..58c61ec1947 100644 --- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp @@ -56,22 +56,8 @@ TAO_UIOP_Connection_Handler::TAO_UIOP_Connection_Handler (TAO_ORB_Core *orb_core TAO_UIOP_Connection_Handler::~TAO_UIOP_Connection_Handler (void) { - - // If the socket has not already been closed. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Cannot deal with errors, and therefore they are ignored. - this->transport_.send_buffered_messages (); - } - else - { - // Dequeue messages and delete message blocks. - this->transport_.dequeue_all (); - } } - - int TAO_UIOP_Connection_Handler::open (void*) { @@ -198,23 +184,13 @@ int TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // This method is called when buffering timer expires. - // - ACE_Time_Value *max_wait_time = 0; - - TAO_Stub *stub = 0; - int has_timeout; - this->orb_core ()->call_timeout_hook (stub, - has_timeout, - *max_wait_time); - // Cannot deal with errors, and therefore they are ignored. - this->transport ()->send_buffered_messages (max_wait_time); + if (this->transport ()->handle_output () == -1) + return -1; return 0; } - int TAO_UIOP_Connection_Handler::close (u_long) { diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index ac762b53c9e..54ad2408b58 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -206,7 +206,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, // versions seem to need it though. Leaving it costs little. // This guarantees to send all data (bytes) or return an error. - ssize_t n = this->send_or_buffer (stub, + ssize_t n = this->send_message_i (stub, twoway, stream.begin (), max_wait_time); @@ -222,17 +222,6 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream, return -1; } - // EOF. - if (n == 0) - { - if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n") - ACE_TEXT ("EOF, closing conn %d\n"), - this->handle())); - return -1; - } - return 1; } diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp index c679fc3405c..d79f1109e27 100644 --- a/TAO/tao/Sync_Strategies.cpp +++ b/TAO/tao/Sync_Strategies.cpp @@ -14,129 +14,47 @@ TAO_Sync_Strategy::~TAO_Sync_Strategy (void) { } -ssize_t -TAO_Transport_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) -{ - // Immediate delegation to the transport. - return transport.send (message_block, - max_wait_time); -} +// **************************************************************** -#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) - -ssize_t -TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *mb, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + must_queue (TAO_Stub *, int) { - ACE_Message_Block *message_block = - ACE_const_cast (ACE_Message_Block *, mb); - - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Check if there are messages already in the queue. - if (!buffering_queue.is_empty ()) - return TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - - // - // Otherwise there were no queued messages. We first try to send - // the message right away. - // - - // Actual network send. - size_t bytes_transferred = 0; - result = transport.send (message_block, - max_wait_time, - &bytes_transferred); - - // Cannot send completely: timed out. - if (result == -1 && - errno == ETIME) - { - if (bytes_transferred > 0) - { - // If successful in sending some of the data, reset the - // message block appropriately. - transport.reset_sent_message (message_block, - bytes_transferred); - } - - // Queue the rest. - return bytes_transferred + - TAO_Eager_Buffering_Sync_Strategy::send (transport, - stub, - message_block, - max_wait_time); - } - - // EOF or other errors. - if (result == -1 || - result == 0) - return -1; - - // Everything was successfully delivered. - return result; + return 0; } -ssize_t -TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) +int +TAO_Transport_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *, + size_t , + size_t , + int &, + ACE_Time_Value &) { - ssize_t result = 0; - - // Get the message queue from the transport. - TAO_Transport_Buffering_Queue &buffering_queue = - transport.buffering_queue (); - - // Copy the message. - ACE_Message_Block *copy = message_block->clone (); - - // Enqueue current message. - result = buffering_queue.enqueue_tail (copy); - - // Enqueuing error. - if (result == -1) - { - // Eliminate the copy. - copy->release (); + return 0; +} - // Return error. - return -1; - } +#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) - // Check if upper bound has been reached. - if (this->buffering_constraints_reached (transport, - stub, - buffering_queue)) - { - return transport.send_buffered_messages (max_wait_time); - } +// **************************************************************** - // Hoping that this return value is meaningful or at least - // acceptable. - return message_block->total_length (); +int +TAO_Eager_Buffering_Sync_Strategy:: + must_queue (TAO_Stub *, int) +{ + return 1; } int -TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport, - TAO_Stub &stub, - TAO_Transport_Buffering_Queue &buffering_queue) +TAO_Eager_Buffering_Sync_Strategy:: + buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &set_timer, + ACE_Time_Value &interval) { TAO_Buffering_Constraint_Policy *buffering_constraint_policy = - stub.buffering_constraint (); + stub->buffering_constraint (); if (buffering_constraint_policy == 0) return 1; @@ -147,86 +65,53 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport TAO::BufferingConstraint buffering_constraint; buffering_constraint_policy->get_buffering_constraint (buffering_constraint); - this->timer_check (transport, - buffering_constraint); + this->timer_check (buffering_constraint, set_timer, interval); if (buffering_constraint.mode == TAO::BUFFER_FLUSH) return 1; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_COUNT) && - buffering_queue.message_count () >= buffering_constraint.message_count) + TAO::BUFFER_MESSAGE_COUNT) + && msg_count >= buffering_constraint.message_count) return 1; if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_MESSAGE_BYTES) && - buffering_queue.message_length () >= buffering_constraint.message_bytes) + TAO::BUFFER_MESSAGE_BYTES) + && total_bytes >= buffering_constraint.message_bytes) return 1; return 0; } void -TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport, - const TAO::BufferingConstraint &buffering_constraint) +TAO_Eager_Buffering_Sync_Strategy:: + timer_check (const TAO::BufferingConstraint &buffering_constraint, + int &set_timer, + ACE_Time_Value &interval) { - if (transport.buffering_timer_id () != 0) + if (!ACE_BIT_ENABLED (buffering_constraint.mode, + TAO::BUFFER_TIMEOUT)) { - // - // There is a timeout set by us, though we are not sure if we - // still need the timeout or if the timeout value is correct or - // not. - // - - // Get our reactor. - ACE_Reactor *reactor = transport.orb_core ()->reactor (); - - if (!ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) - { - // Timeouts are no longer needed. Cancel existing one. - reactor->cancel_timer (transport.buffering_timer_id ()); - transport.buffering_timer_id (0); - } - else - { - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); - - if (transport.buffering_timeout_value () == timeout) - { - // Timeout value is the same, nothing to be done. - } - else - { - // Timeout value has changed, reset the old timer. - reactor->reset_timer_interval (transport.buffering_timer_id (), - timeout); - } - } + set_timer = 0; + return; } - else if (ACE_BIT_ENABLED (buffering_constraint.mode, - TAO::BUFFER_TIMEOUT)) - { - // We didn't have timeouts before, but we want them now. - ACE_Time_Value timeout = - this->time_conversion (buffering_constraint.timeout); - // Get our reactor. - ACE_Reactor *reactor = transport.orb_core ()->reactor (); + ACE_Time_Value timeout = + this->time_conversion (buffering_constraint.timeout); - long timer_id = reactor->schedule_timer (transport.event_handler (), - 0, - timeout, - timeout); - - transport.buffering_timer_id (timer_id); - transport.buffering_timeout_value (timeout); + if (interval == timeout) + { + set_timer = 0; + return; } + + set_timer = 1; + interval = timeout; } ACE_Time_Value -TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) +TAO_Eager_Buffering_Sync_Strategy:: + time_conversion (const TimeBase::TimeT &time) { TimeBase::TimeT seconds = time / 10000000u; TimeBase::TimeT microseconds = (time % 10000000u) / 10; @@ -234,4 +119,15 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time) ACE_U64_TO_U32 (microseconds)); } +// **************************************************************** + +int +TAO_Delayed_Buffering_Sync_Strategy:: + must_queue (TAO_Stub *, + int queue_empty) +{ + // If the queue is empty we want to send immediately + return !queue_empty; +} + #endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */ diff --git a/TAO/tao/Sync_Strategies.h b/TAO/tao/Sync_Strategies.h index 5b273dac3e5..231d81a9624 100644 --- a/TAO/tao/Sync_Strategies.h +++ b/TAO/tao/Sync_Strategies.h @@ -27,24 +27,59 @@ #include "tao/Transport.h" #include "tao/TAOC.h" +/// Define the interface for the Queueing Strategy +/** + * The low-level I/O components in the ORB use this strategy to + * determine when messages must be queued, immediately sent or + * flushed. + * + * The strategy isolates this low-level components from the higher + * level strategies used by the application developer. + * + * @todo The class name (Sync_Strategy) is inherited from the policies + * (SyncScopePolicy), but Queueing_Strategy probably captures its + * intent better. It should be changed in a future revision of the + * ORB. + */ class TAO_Export TAO_Sync_Strategy { public: + /// Destructor virtual ~TAO_Sync_Strategy (void); - virtual ssize_t send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time) = 0; + /// Return 1 if a message must be queued + virtual int must_queue (TAO_Stub *stub, + int queue_empty) = 0; + + /// Return 1 if it is time to start + /** + * @param stub The object used to make the request, this is used to + * obtain the policies currently in effect for the request + * @param msg_count The number of messages currently queued + * @param total_bytes Number of bytes currently queued + * @param set_timer Returns 1 if a timer should be set to drain the + * queue + * @param interval If set_timer returns 1, this parameter contains + * the timer interval + */ + virtual int buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &set_timer, + ACE_Time_Value &interval) = 0; }; class TAO_Export TAO_Transport_Sync_Strategy : public TAO_Sync_Strategy { public: - ssize_t send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time); + virtual int must_queue (TAO_Stub *stub, + int queue_empty); + + virtual int buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &set_timer, + ACE_Time_Value &interval); }; #if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1) @@ -52,28 +87,41 @@ public: class TAO_Export TAO_Eager_Buffering_Sync_Strategy : public TAO_Sync_Strategy { public: - ssize_t send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time); - - virtual int buffering_constraints_reached (TAO_Transport &transport, - TAO_Stub &stub, - TAO_Transport_Buffering_Queue &buffering_queue); - - void timer_check (TAO_Transport &transport, - const TAO::BufferingConstraint &buffering_constraint); - + virtual int must_queue (TAO_Stub *stub, + int queue_empty); + + virtual int buffering_constraints_reached (TAO_Stub *stub, + size_t msg_count, + size_t total_bytes, + int &set_timer, + ACE_Time_Value &interval); + +private: + /// Check if the buffering constraint includes any timeouts and + /// compute the right timeout interval if needed. + /** + * @param buffering_constraint The constraints defined by the + * application + * @param set_timer Return 1 if the timer should be set + * @param interval Return the timer interval value + */ + void timer_check (const TAO::BufferingConstraint &buffering_constraint, + int &set_timer, + ACE_Time_Value &interval); + + /// Convert from standard CORBA time units to seconds/microseconds. ACE_Time_Value time_conversion (const TimeBase::TimeT &time); }; +/// Delay the buffering decision until the transport blocks +/** + * If the queue is empty the transport will try to send immediately. + */ class TAO_Export TAO_Delayed_Buffering_Sync_Strategy : public TAO_Eager_Buffering_Sync_Strategy { public: - ssize_t send (TAO_Transport &transport, - TAO_Stub &stub, - const ACE_Message_Block *message_block, - const ACE_Time_Value *max_wait_time); + virtual int must_queue (TAO_Stub *stub, + int queue_empty); }; #endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */ diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 658b09cf1b0..9b379316e0e 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -499,6 +499,25 @@ SOURCE=.\Bind_Dispatcher_Guard.cpp # End Source File
# Begin Source File
+SOURCE=.\Block_Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\BoundsC.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -1202,6 +1221,25 @@ SOURCE=.\FILE_Parser.cpp # End Source File
# Begin Source File
+SOURCE=.\Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Base.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -1867,6 +1905,25 @@ SOURCE=.\Marshal.cpp # End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -2646,6 +2703,44 @@ SOURCE=.\Protocols_Hooks.cpp # End Source File
# Begin Source File
+SOURCE=.\Queued_Message.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
+SOURCE=.\Reactive_Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -3694,6 +3789,10 @@ SOURCE=.\Bind_Dispatcher_Guard.h # End Source File
# Begin Source File
+SOURCE=.\Block_Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\BoundsC.h
# End Source File
# Begin Source File
@@ -3854,6 +3953,10 @@ SOURCE=.\FILE_Parser.h # End Source File
# Begin Source File
+SOURCE=.\Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\giop.h
# End Source File
# Begin Source File
@@ -4010,6 +4113,14 @@ SOURCE=.\marshal.h # End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Message_Sent_Callback.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.h
# End Source File
# Begin Source File
@@ -4190,6 +4301,18 @@ SOURCE=.\Protocols_Hooks.h # End Source File
# Begin Source File
+SOURCE=.\Queued_Message.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Queued_Message.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\Reactive_Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.h
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp index 214ac0cc0ae..f1f24e4dd47 100644 --- a/TAO/tao/TAO_Static.dsp +++ b/TAO/tao/TAO_Static.dsp @@ -40,8 +40,8 @@ RSC=rc.exe # PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Release"
# PROP Target_Dir ""
-LINK32=link.exe -lib
MTL=midl.exe
+LINK32=link.exe -lib
# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MD /W3 /GX /O2 /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "NDEBUG" /D "WIN32" /D "TAO_AS_STATIC_LIBS" /D "ACE_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -66,8 +66,8 @@ LIB32=link.exe -lib # PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Debug"
# PROP Target_Dir ""
-LINK32=link.exe -lib
MTL=midl.exe
+LINK32=link.exe -lib
# ADD BASE CPP /nologo /W3 /GX /Z7 /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MDd /W3 /GX /Zi /Od /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "_DEBUG" /D "WIN32" /D "ACE_AS_STATIC_LIBS" /D "TAO_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -451,6 +451,10 @@ SOURCE=.\marshal.h # End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.h
# End Source File
# Begin Source File
@@ -631,6 +635,10 @@ SOURCE=.\Protocols_Hooks.h # End Source File
# Begin Source File
+SOURCE=.\Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.h
# End Source File
# Begin Source File
@@ -1163,6 +1171,10 @@ SOURCE=.\marshal.i # End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_Policy_i.i
# End Source File
# Begin Source File
@@ -1299,6 +1311,10 @@ SOURCE=.\Profile.i # End Source File
# Begin Source File
+SOURCE=.\Queued_Message.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.i
# End Source File
# Begin Source File
@@ -1815,6 +1831,10 @@ SOURCE=.\Marshal.cpp # End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.cpp
# End Source File
# Begin Source File
@@ -1979,6 +1999,10 @@ SOURCE=.\Protocols_Hooks.cpp # End Source File
# Begin Source File
+SOURCE=.\Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 98bcc63b319..94beb0d4f9b 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -10,6 +10,10 @@ #include "Transport_Mux_Strategy.h" #include "Stub.h" #include "Sync_Strategies.h" +#include "Queued_Message.h" +#include "Flushing_Strategy.h" + +#include "ace/Message_Block.h" #if !defined (__ACE_INLINE__) # include "Transport.inl" @@ -22,9 +26,10 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) : tag_ (tag) , orb_core_ (orb_core) - , buffering_queue_ (0) - , buffering_timer_id_ (0) , bidirectional_flag_ (-1) + , head_ (0) + , tail_ (0) + , current_message_ (0) { TAO_Client_Strategy_Factory *cf = this->orb_core_->client_factory (); @@ -44,9 +49,19 @@ TAO_Transport::~TAO_Transport (void) delete this->tms_; this->tms_ = 0; - delete this->buffering_queue_; + // delete this->buffering_queue_; + + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + // @@ This is a good point to insert a flag to indicate that a + // CloseConnection message was successfully received. + i->connection_closed (); + + i->destroy (); + } } +#if 0 ssize_t TAO_Transport::send_or_buffer (TAO_Stub *stub, int two_way, @@ -197,6 +212,246 @@ TAO_Transport::reset_message (ACE_Message_Block *message_block, } } } +#endif /* 0 */ + +int +TAO_Transport::handle_output () +{ + // The reactor is asking us to send more data, first check if + // there is a current message that needs more sending: + int result = this->send_current_message (); + if (result == 0) + return 0; + + if (result > 0) + { + // ... there is no current message or it was completely + // sent, time to check the queue.... + result = this->dequeue_next_message (); + if (result == 0) + return 0; + } + // else { there was an error.... } + + if (result > 0) + { + // ... no more data to send ... remove ourselves from the + // reactor ... + result = this->cancel_output (); + } + + if (result == -1) + return -1; // There was an error, return -1 so the Reactor + else if (result == 0) + return 0; // There is no more data or socket blocked, just return 0 + + // else if (result > 0) + // There is more data to be sent, don't try right now, let the + // reactor handle it, because it can handle starvation better + return 0; +} + +int +TAO_Transport::send_current_message (void) +{ + if (this->current_message_ == 0) + return 1; + + size_t byte_count; + ssize_t n = this->send (this->current_message_->mb (), + 0, /* it is non-blocking */ + &byte_count); + if (n == 0) + { + // The connection was closed, return -1 to have the Reactor + // close this transport and event handler + return -1; + } + + // Because there can be a partial transfer we need to adjust the + // number of bytes sent. + this->current_message_->bytes_transferred (byte_count); + if (this->current_message_->done ()) + { + // Remove the current message.... + // @@ We should be using a pool for these guys! + this->current_message_->destroy (); + + this->current_message_ = 0; + + if (n == -1) + return -1; + + return 1; + } + + if (n == -1) + { + // ... timeouts and flow control are not real errors, the + // connection is still valid and we must continue sending the + // current message ... + if (errno == EWOULDBLOCK + || errno == ETIME) + return 0; + + return -1; + } + + return 0; +} + +int +TAO_Transport::dequeue_next_message (void) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); + if (this->head_ == 0) + return 1; + + this->current_message_ = this->head_; + this->head_->remove_from_list (this->head_, this->tail_); + + return 0; +} + +int +TAO_Transport::cancel_output (void) +{ + return 0; +} + +int +TAO_Transport::schedule_output (void) +{ + return 0; +} + +int +TAO_Transport::send_message_i (TAO_Stub *stub, + int twoway_flag, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); + + int queue_empty = (this->head_ == 0); + + // Let's figure out if the message should be queued without trying + // to send first: + int non_queued_message = + (this->current_message_ == 0) // There is an outgoing message already + && (twoway_flag + || !stub->sync_strategy ().must_queue (stub, + queue_empty)); + + TAO_Queued_Message *queued_message = 0; + if (non_queued_message) + { + // ... in this case we must try to send the message first ... + + size_t byte_count; + + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this stuff. + ssize_t n = this->send (message_block, + 0, // non-blocking + &byte_count); + if (n == 0) + return -1; + else if (n == -1 && errno != EWOULDBLOCK) + return -1; + + // ... let's figure out if the complete message was sent ... + if (message_block->total_length () == byte_count) + { + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This should be the normal path in the ORB, it should be + // fast. + return 0; + } + + // ... the message was only partially sent, set it as the + // current message ... + queued_message = + new TAO_Queued_Message (message_block->clone ()); + // @@ Revisit message queue allocations + + queued_message->bytes_transferred (byte_count); + this->current_message_ = queued_message; + } + else + { + // ... otherwise simply queue the message ... + queued_message = + new TAO_Queued_Message (message_block->clone ()); + queued_message->push_back (this->head_, this->tail_); + } + + // ... two choices, this is a twoway request or not, if it is + // then we must only return once the complete message has been + // sent: + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + if (twoway_flag) + { + // Release the mutex, other threads may modify the queue as we + // block for a long time writing out data. + ace_mon.release (); + int result = flushing_strategy->flush_message (this, queued_message); + queued_message->destroy (); + return result; + } + + // ... this is not a twoway. We must check if the buffering + // constraints have been reached, if so, then we should start + // flushing out data.... + + // First let's compute the size of the queue: + size_t msg_count = 0; + size_t total_bytes = 0; + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) + { + msg_count++; + total_bytes += i->mb ()->total_length (); + } + if (this->current_message_ != 0) + { + msg_count++; + total_bytes += this->current_message_->mb ()->total_length (); + } + + int set_timer; + ACE_Time_Value interval; + + if (stub->sync_strategy ().buffering_constraints_reached (stub, + msg_count, + total_bytes, + set_timer, + interval)) + { + ace_mon.release (); + // @@ memory management of the queued messages is getting tricky. + int result = flushing_strategy->flush_message (this, + this->tail_); + return result; + } + else + { + // ... it is not time to flush yet, but maybe we need to set a + // timer ... + if (set_timer) + { + // @@ We need to schedule the timer. We should also be + // careful not to schedule one if there is one scheduled + // already. + } + } + return 0; +} int TAO_Transport::idle_after_send (void) diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 2b5484a5abd..1505925b4e9 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -32,9 +32,7 @@ class TAO_Operation_Details; class TAO_Transport_Mux_Strategy; class TAO_Wait_Strategy; -#include "ace/Message_Queue.h" - -typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; +class TAO_Queued_Message; /** * @class TAO_Transport @@ -89,7 +87,7 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; * Also some applications may choose, for performance reasons or to * avoid complex concurrency scenarios due to nested upcalls, to * using blocking I/O - * block the + * block the * * <H4>Timeouts:</H4> Some or all messages could have a timeout period * attached to them. The timeout source could either be some @@ -111,7 +109,7 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; * * The Transport object provides a single method to send messages * (send_message ()). - * + * * <H3>The incoming data path:</H3> * * @todo Document the incoming data path design forces. @@ -120,15 +118,10 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue; * <B>See Also:</B> * * http://ace.cs.wustl.edu/cvsweb/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html - * + * */ class TAO_Export TAO_Transport { - - friend class TAO_Transport_Sync_Strategy; - friend class TAO_Eager_Buffering_Sync_Strategy; - friend class TAO_Delayed_Buffering_Sync_Strategy; - public: /// default creator, requres the tag value be supplied. TAO_Transport (CORBA::ULong tag, @@ -171,6 +164,17 @@ public: */ TAO_Wait_Strategy *wait_strategy (void) const; + /// Callback method to reactively drain the outgoing data queue + int handle_output (void); + + /** + * Return the TSS leader follower condition variable used in the + * Wait Strategy. Muxed Leader Follower implementation returns a + * valid condition variable, others return 0. + */ + virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); + +#if 0 /// Send a request or queue it for later. /** * If the right policies are set queue the request for later. @@ -188,13 +192,6 @@ public: const ACE_Message_Block *mblk, const ACE_Time_Value *s = 0); - /** - * Return the TSS leader follower condition variable used in the - * Wait Strategy. Muxed Leader Follower implementation returns a - * valid condition variable, others return 0. - */ - virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void); - /// Queue for buffering transport messages. virtual TAO_Transport_Buffering_Queue &buffering_queue (void); @@ -208,14 +205,7 @@ public: /// Send any messages that have been buffered. ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0); - - /** - * Initialising the messaging object. This would be used by the - * connector side. On the acceptor side the connection handler - * would take care of the messaging objects. - */ - virtual int messaging_init (CORBA::Octet major, - CORBA::Octet minor) = 0; +#endif /* 0 */ /// Get/Set the bidirectional flag virtual int bidirectional_flag (void) const; @@ -255,7 +245,7 @@ public: * * The Transport class uses the Template Method Pattern to implement * the protocol specific functionality. - * Implementors of a pluggable protocol should override the + * Implementors of a pluggable protocol should override the * following methods with the semantics documented below. */ //@{ @@ -294,7 +284,7 @@ public: * framework and the TAO pluggable protocol framework. * In all the protocols implemented so far this role is fullfilled * by an instance of ACE_Svc_Handler. - * + * * @todo Since we only use a limited functionality of * ACE_Svc_Handler we could probably implement a generic * adapter class (TAO_Transport_Event_Handler or something), this @@ -340,7 +330,7 @@ public: // Read len bytes from into buf. /** - * @param buffer ORB allocated buffer where the data should be + * @param buffer ORB allocated buffer where the data should be * @@ The ACE_Time_Value *s is just a place holder for now. It is * not clear this this is the best place to specify this. The actual * timeout values will be kept in the Policies. @@ -455,7 +445,7 @@ public: virtual int read_process_message (ACE_Time_Value *max_wait_time = 0, int block = 0) = 0; - /// Register the handler with the reactor. + /// Register the handler with the reactor. /** * This method is used by the Wait_On_Reactor strategy. The * transport must register its event handler with the ORB's Reactor. @@ -467,9 +457,17 @@ public: */ virtual int register_handler (void) = 0; + /** + * Initialising the messaging object. This would be used by the + * connector side. On the acceptor side the connection handler + * would take care of the messaging objects. + */ + virtual int messaging_init (CORBA::Octet major, + CORBA::Octet minor) = 0; //@} protected: +#if 0 /// Remove the first message from the outgoing queue. void dequeue_head (void); @@ -487,7 +485,46 @@ protected: void reset_message (ACE_Message_Block *message_block, size_t bytes_delivered, int queued_message); +#endif /* 0 */ + +protected: + /// Sent the contents of <message_block>, blocking if required by + /// the twoway flag or by the current policies in the stub. + int send_message_i (TAO_Stub *stub, + int twoway_flag, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); + private: + + /// Try to send the current message. + /** + * As the outgoing data is drained this method is invoked to send as + * much of the current message as possible. + * + * Returns 0 if there is more data to send, -1 if there was an error + * and 1 if the message was completely sent. + */ + int send_current_message (void); + + /// Dequeue the next message, if any, and continue sending data + /** + * Once a message is completely sent, a new message is dequeued and + * setup as the current message. + * + * Returns 0 if there is more data to send, -1 if there was an error + * and 1 if the message was completely sent. + */ + int dequeue_next_message (void); + + /// There is data queued or pending data in the current + /// message. Enable the reactive calls through the reactor + int schedule_output (void); + + /// There is no more data to send, cancel any reactive calls through + /// the reactor + int cancel_output (void); + /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) @@ -506,6 +543,7 @@ protected: /// Strategy for waiting for the reply after sending the request. TAO_Wait_Strategy *ws_; +#if 0 /// Queue for buffering transport messages. TAO_Transport_Buffering_Queue *buffering_queue_; @@ -514,6 +552,7 @@ protected: /// Buffering timeout value. ACE_Time_Value buffering_timeout_value_; +#endif /* 0 */ /// Use to check if bidirectional info has been synchronized with /// the peer. @@ -536,6 +575,17 @@ protected: * if the server receives the info. */ int bidirectional_flag_; + + /// Synchronize access to the outgoing data queue + TAO_SYNCH_MUTEX queue_mutex_; + + /// Implement the outgoing data queue + TAO_Queued_Message *head_; + TAO_Queued_Message *tail_; + + /// Once part of a message has been sent it is kept here until it is + /// completely sent + TAO_Queued_Message *current_message_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl index 6bc1316cb35..f9c281342c9 100644 --- a/TAO/tao/Transport.inl +++ b/TAO/tao/Transport.inl @@ -1,5 +1,11 @@ // $Id$ +ACE_INLINE CORBA::ULong +TAO_Transport::tag (void) const +{ + return this->tag_; +} + ACE_INLINE TAO_ORB_Core * TAO_Transport::orb_core (void) const { @@ -19,12 +25,7 @@ TAO_Transport::wait_strategy (void) const return this->ws_; } -ACE_INLINE CORBA::ULong -TAO_Transport::tag (void) const -{ - return this->tag_; -} - +#if 0 ACE_INLINE long TAO_Transport::buffering_timer_id (void) const { @@ -77,6 +78,19 @@ TAO_Transport::dequeue_head (void) message_block->release (); } +ACE_INLINE void +TAO_Transport::dequeue_all (void) +{ + // Flush all queued messages. + if (this->buffering_queue_) + { + while (!this->buffering_queue_->is_empty ()) + this->dequeue_head (); + } +} + +#endif /* 0 */ + ACE_INLINE int TAO_Transport::bidirectional_flag (void) const { @@ -88,14 +102,3 @@ TAO_Transport::bidirectional_flag (int flag) { this->bidirectional_flag_ = flag; } - -ACE_INLINE void -TAO_Transport::dequeue_all (void) -{ - // Flush all queued messages. - if (this->buffering_queue_) - { - while (!this->buffering_queue_->is_empty ()) - this->dequeue_head (); - } -} diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp index eb39187d3ba..d7920738bf8 100644 --- a/TAO/tao/default_resource.cpp +++ b/TAO/tao/default_resource.cpp @@ -11,6 +11,9 @@ #include "tao/Single_Reactor.h" #include "tao/Priority_Mapping.h" +#include "tao/Reactive_Flushing_Strategy.h" +#include "tao/Block_Flushing_Strategy.h" + #include "ace/TP_Reactor.h" #include "ace/Dynamic_Service.h" #include "ace/Arg_Shifter.h" @@ -33,6 +36,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void) reactor_mask_signals_ (1), dynamically_allocated_reactor_ (0), cached_connection_lock_type_ (TAO_THREAD_LOCK) + , flushing_strategy_type_ (TAO_REACTIVE_FLUSHING) { } @@ -193,6 +197,7 @@ TAO_Default_Resource_Factory::init (int argc, char **argv) this->add_to_ior_parser_names (argv[curarg]); } } + else if (ACE_OS::strcasecmp (argv[curarg], "-ORBConnectionLock") == 0) { @@ -210,6 +215,23 @@ TAO_Default_Resource_Factory::init (int argc, char **argv) } } + else if (ACE_OS::strcasecmp (argv[curarg], + "-ORBFlushingStrategy") == 0) + { + curarg++; + if (curarg < argc) + { + char *name = argv[curarg]; + + if (ACE_OS::strcasecmp (name, + "reactive") == 0) + this->flushing_strategy_type_ = TAO_REACTIVE_FLUSHING; + else if (ACE_OS::strcasecmp (name, + "blocking") == 0) + this->flushing_strategy_type_ = TAO_BLOCKING_FLUSHING; + } + } + return 0; } @@ -609,7 +631,7 @@ TAO_Default_Resource_Factory::input_cdr_buffer_allocator (void) ACE_NEW_RETURN (allocator, LOCKED_ALLOCATOR, 0); - + return allocator; } @@ -658,6 +680,21 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void) return the_lock; } +TAO_Flushing_Strategy * +TAO_Default_Resource_Factory::create_flushing_strategy (void) +{ + TAO_Flushing_Strategy *strategy = 0; + if (this->flushing_strategy_type_ == TAO_REACTIVE_FLUSHING) + ACE_NEW_RETURN (strategy, + TAO_Reactive_Flushing_Strategy, + 0); + else + ACE_NEW_RETURN (strategy, + TAO_Block_Flushing_Strategy, + 0); + return strategy; +} + TAO_Priority_Mapping * TAO_Default_Resource_Factory::get_priority_mapping (void) { diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h index a9b2c85e2ee..674f676148b 100644 --- a/TAO/tao/default_resource.h +++ b/TAO/tao/default_resource.h @@ -97,6 +97,7 @@ public: virtual double purge_percentage (void) const; virtual TAO_Priority_Mapping *get_priority_mapping (void); virtual ACE_Lock *create_cached_connection_lock (void); + virtual TAO_Flushing_Strategy *create_flushing_strategy (void); protected: /// Obtain the reactor implementation @@ -167,6 +168,15 @@ private: /// Type of lock used by the cached connector. Lock_Type cached_connection_lock_type_; + + enum Flushing_Strategy_Type + { + TAO_REACTIVE_FLUSHING, + TAO_BLOCKING_FLUSHING + }; + + /// Type of flushing strategy configured + int flushing_strategy_type_; }; #if defined (__ACE_INLINE__) |