summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp33
-rw-r--r--TAO/tao/Block_Flushing_Strategy.h38
-rw-r--r--TAO/tao/Flushing_Strategy.cpp10
-rw-r--r--TAO/tao/Flushing_Strategy.h65
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp2
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp27
-rw-r--r--TAO/tao/IIOP_Transport.cpp24
-rw-r--r--TAO/tao/Message_Sent_Callback.cpp14
-rw-r--r--TAO/tao/Message_Sent_Callback.h60
-rw-r--r--TAO/tao/Message_Sent_Callback.inl6
-rw-r--r--TAO/tao/ORB_Core.cpp8
-rw-r--r--TAO/tao/ORB_Core.h17
-rw-r--r--TAO/tao/ORB_Core.i5
-rw-r--r--TAO/tao/Queued_Message.cpp96
-rw-r--r--TAO/tao/Queued_Message.h170
-rw-r--r--TAO/tao/Queued_Message.inl25
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.cpp56
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.h38
-rw-r--r--TAO/tao/Resource_Factory.h6
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp28
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp13
-rw-r--r--TAO/tao/Strategies/UIOP_Connection_Handler.cpp28
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp13
-rw-r--r--TAO/tao/Sync_Strategies.cpp230
-rw-r--r--TAO/tao/Sync_Strategies.h96
-rw-r--r--TAO/tao/TAO.dsp123
-rw-r--r--TAO/tao/TAO_Static.dsp28
-rw-r--r--TAO/tao/Transport.cpp261
-rw-r--r--TAO/tao/Transport.h110
-rw-r--r--TAO/tao/Transport.inl37
-rw-r--r--TAO/tao/default_resource.cpp39
-rw-r--r--TAO/tao/default_resource.h10
32 files changed, 1348 insertions, 368 deletions
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
new file mode 100644
index 00000000000..bd49b02ad86
--- /dev/null
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -0,0 +1,33 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Block_Flushing_Strategy.h"
+#include "Transport.h"
+#include "Queued_Message.h"
+
+ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$")
+
+int
+TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *)
+{
+ return 0;
+}
+
+int
+TAO_Block_Flushing_Strategy::cancel_output (TAO_Transport *)
+{
+ return 0;
+}
+
+int
+TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport,
+ TAO_Queued_Message *msg)
+{
+ while (!msg->done ())
+ {
+ int result = transport->handle_output ();
+ if (result == -1)
+ return -1;
+ }
+ return 0;
+}
diff --git a/TAO/tao/Block_Flushing_Strategy.h b/TAO/tao/Block_Flushing_Strategy.h
new file mode 100644
index 00000000000..82af3a0cd3e
--- /dev/null
+++ b/TAO/tao/Block_Flushing_Strategy.h
@@ -0,0 +1,38 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Block_Flushing_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_BLOCK_FLUSHING_STRATEGY_H
+#define TAO_BLOCK_FLUSHING_STRATEGY_H
+#include "ace/pre.h"
+
+#include "Flushing_Strategy.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_Block_Flushing_Strategy
+ *
+ * @brief Implement a flushing strategy that blocks on write to flush
+ */
+class TAO_Export TAO_Block_Flushing_Strategy : public TAO_Flushing_Strategy
+{
+public:
+ virtual int schedule_output (TAO_Transport *transport);
+ virtual int cancel_output (TAO_Transport *transport);
+ virtual int flush_message (TAO_Transport *transport,
+ TAO_Queued_Message *msg);
+};
+
+#include "ace/post.h"
+#endif /* TAO_BLOCK_FLUSHING_STRATEGY_H */
diff --git a/TAO/tao/Flushing_Strategy.cpp b/TAO/tao/Flushing_Strategy.cpp
new file mode 100644
index 00000000000..82a66ec1427
--- /dev/null
+++ b/TAO/tao/Flushing_Strategy.cpp
@@ -0,0 +1,10 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Flushing_Strategy.h"
+
+ACE_RCSID(tao, Flushing_Strategy, "$Id$")
+
+TAO_Flushing_Strategy::~TAO_Flushing_Strategy (void)
+{
+}
diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h
new file mode 100644
index 00000000000..11d6cda2983
--- /dev/null
+++ b/TAO/tao/Flushing_Strategy.h
@@ -0,0 +1,65 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Flushing_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_FLUSHING_STRATEGY_H
+#define TAO_FLUSHING_STRATEGY_H
+#include "ace/pre.h"
+
+#include "corbafwd.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class TAO_Transport;
+class TAO_Queued_Message;
+
+/**
+ * @class TAO_Flushing_Strategy
+ *
+ * @brief Define the interface for the flushing strategy, i.e. the
+ * algorithm that controls how does the ORB flush outgoing
+ * data.
+ *
+ * Please read the documentation in the TAO_Transport class to find
+ * out more about the design of the outgoing data path.
+ *
+ * Some applications can block the current thread whenever they are
+ * sending out data. In those cases they can obtain better
+ * performance by blocking in calls to write() than by participating
+ * in the Leader/Followers protocol to shared the ORB Reactor.
+ *
+ * This strategy controls how does the ORB schedule and cancel
+ * reactive I/O, if there is no reactive I/O the strategy is just a
+ * no-op.
+ *
+ */
+class TAO_Export TAO_Flushing_Strategy
+{
+public:
+ /// Destructor
+ virtual ~TAO_Flushing_Strategy (void);
+
+ /// Schedule the transport argument to be flushed
+ virtual int schedule_output (TAO_Transport *transport) = 0;
+
+ /// Cancel all scheduled output for the transport argument
+ virtual int cancel_output (TAO_Transport *transport) = 0;
+
+ /// Wait until msg is sent out. Potentially other messages are
+ /// flushed too, for example, because there are ahead in the queue.
+ virtual int flush_message (TAO_Transport *transport,
+ TAO_Queued_Message *msg) = 0;
+};
+
+#include "ace/post.h"
+#endif /* TAO_FLUSHING_STRATEGY_H */
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 28135d4f8ea..de84da9427f 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -875,7 +875,7 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
0);
ACE_Message_Block message_block(&data_block);
message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
+
int result = transport->send (&message_block);
if (result == -1)
{
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index 81ca246d8b8..2aa95f41a7a 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -53,20 +53,8 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core
TAO_IIOP_Connection_Handler::~TAO_IIOP_Connection_Handler (void)
{
- // If the socket has not already been closed.
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Cannot deal with errors, and therefore they are ignored.
- this->transport_.send_buffered_messages ();
- }
- else
- {
- // Dequeue messages and delete message blocks.
- this->transport_.dequeue_all ();
- }
}
-
int
TAO_IIOP_Connection_Handler::open (void*)
{
@@ -206,28 +194,17 @@ TAO_IIOP_Connection_Handler::fetch_handle (void)
return this->get_handle ();
}
-
int
TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // This method is called when buffering timer expires.
- //
- ACE_Time_Value *max_wait_time = 0;
-
- TAO_Stub *stub = 0;
- int has_timeout;
- this->orb_core ()->call_timeout_hook (stub,
- has_timeout,
- *max_wait_time);
-
// Cannot deal with errors, and therefore they are ignored.
- this->transport ()->send_buffered_messages (max_wait_time);
+ if (this->transport ()->handle_output () == -1)
+ return -1;
return 0;
}
-
int
TAO_IIOP_Connection_Handler::close (u_long)
{
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index fea6cdfdb40..25de3704e1f 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -90,10 +90,9 @@ TAO_IIOP_Transport::send (const ACE_Message_Block *message_block,
const ACE_Time_Value *max_wait_time,
size_t *bytes_transferred)
{
- return ACE::send_n (this->handle (),
- message_block,
- max_wait_time,
- bytes_transferred);
+ return ACE::send (this->handle (),
+ message_block,
+ bytes_transferred);
}
ssize_t
@@ -197,12 +196,8 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
if (this->messaging_object_->format_message (stream) != 0)
return -1;
- // Strictly speaking, should not need to loop here because the
- // socket never gets set to a nonblocking mode ... some Linux
- // versions seem to need it though. Leaving it costs little.
-
// This guarantees to send all data (bytes) or return an error.
- ssize_t n = this->send_or_buffer (stub,
+ ssize_t n = this->send_message_i (stub,
twoway,
stream.begin (),
max_wait_time);
@@ -218,17 +213,6 @@ TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
return -1;
}
- // EOF.
- if (n == 0)
- {
- if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n")
- ACE_TEXT ("EOF, closing conn %d\n"),
- this->handle()));
- return -1;
- }
-
return 1;
}
diff --git a/TAO/tao/Message_Sent_Callback.cpp b/TAO/tao/Message_Sent_Callback.cpp
new file mode 100644
index 00000000000..5508945fbde
--- /dev/null
+++ b/TAO/tao/Message_Sent_Callback.cpp
@@ -0,0 +1,14 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Message_Sent_Callback.h"
+
+#if !defined (__ACE_INLINE__)
+# include "Message_Sent_Callback.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(tao, Message_Sent_Callback, "$Id$")
+
+TAO_Message_Sent_Callback::~TAO_Message_Sent_Callback (void)
+{
+}
diff --git a/TAO/tao/Message_Sent_Callback.h b/TAO/tao/Message_Sent_Callback.h
new file mode 100644
index 00000000000..5a3bccda8ed
--- /dev/null
+++ b/TAO/tao/Message_Sent_Callback.h
@@ -0,0 +1,60 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Message_Sent_Callback.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_MESSAGE_SENT_CALLBACK_H
+#define TAO_MESSAGE_SENT_CALLBACK_H
+#include "ace/pre.h"
+
+#include "corbafwd.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_Message_Sent_Callback
+ *
+ * @brief Encapsulate the signaling mechanism used to wake up threads
+ * waiting for a message to be sent out.
+ *
+ * Please read the documentation in the TAO_Transport class to find
+ * out more about the design of the outgoing data path.
+ *
+ */
+class TAO_Export TAO_Message_Sent_Callback
+{
+public:
+ /// Constructor
+ TAO_Message_Sent_Callback (void);
+
+ /// Destructor
+ virtual ~TAO_Message_Sent_Callback (void);
+
+ /// The message has been successfully sent
+ virtual void send_completed (void) = 0;
+
+ /// The message has failed
+ virtual void send_failed (void) = 0;
+
+ /// The message has timedout
+ virtual void send_timeout (void) = 0;
+
+ /// The connection was closed before the message was sent
+ virtual void connection_closed (void) = 0;
+};
+
+#if defined (__ACE_INLINE__)
+# include "Message_Sent_Callback.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_MESSAGE_SENT_CALLBACK_H */
diff --git a/TAO/tao/Message_Sent_Callback.inl b/TAO/tao/Message_Sent_Callback.inl
new file mode 100644
index 00000000000..e7b8f4d6668
--- /dev/null
+++ b/TAO/tao/Message_Sent_Callback.inl
@@ -0,0 +1,6 @@
+// $Id$
+
+ACE_INLINE
+TAO_Message_Sent_Callback::TAO_Message_Sent_Callback (void)
+{
+}
diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp
index 21b06de9766..f352135f555 100644
--- a/TAO/tao/ORB_Core.cpp
+++ b/TAO/tao/ORB_Core.cpp
@@ -35,6 +35,8 @@
#include "IORInfo.h"
+#include "Flushing_Strategy.h"
+
#if defined(ACE_MVS)
#include "ace/Codeset_IBM1047.h"
#endif /* ACE_MVS */
@@ -158,6 +160,7 @@ TAO_ORB_Core::TAO_ORB_Core (const char *orbid)
parser_registry_ (),
connection_cache_ (),
bidir_giop_policy_ (0)
+ , flushing_strategy_ (0)
{
#if defined(ACE_MVS)
ACE_NEW (this->from_iso8859_, ACE_IBM1047_ISO8859);
@@ -221,6 +224,8 @@ TAO_ORB_Core::TAO_ORB_Core (const char *orbid)
TAO_ORB_Core::~TAO_ORB_Core (void)
{
+ delete this->flushing_strategy_;
+
ACE_OS::free (this->orbid_);
delete this->from_iso8859_;
@@ -1043,6 +1048,9 @@ TAO_ORB_Core::init (int &argc, char *argv[], CORBA::Environment &ACE_TRY_ENV)
// init the ORB core's pointer
this->protocol_factories_ = trf->get_protocol_factories ();
+ // Initialize the flushing strategy
+ this->flushing_strategy_ = trf->create_flushing_strategy ();
+
// Now that we have a complete list of available protocols and their
// related factory objects, set default policies and initialize the
// registries!
diff --git a/TAO/tao/ORB_Core.h b/TAO/tao/ORB_Core.h
index 163294e9e17..17113eccd7b 100644
--- a/TAO/tao/ORB_Core.h
+++ b/TAO/tao/ORB_Core.h
@@ -82,6 +82,8 @@ class TAO_Message_State_Factory;
class TAO_ServerRequest;
class TAO_Protocols_Hooks;
+class TAO_Flushing_Strategy;
+
#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
class TAO_Eager_Buffering_Sync_Strategy;
@@ -861,13 +863,21 @@ public:
CORBA::Boolean bidir_giop_policy (void);
void bidir_giop_policy (CORBA::Boolean);
-
-
/// Return the table that maps object key/name to de-stringified
/// object reference. It is needed for supporting local objects in
/// the resolve_initial_references() mechanism.
TAO_Object_Ref_Table &object_ref_table (void);
+ /// Return the flushing strategy
+ /**
+ * The flushing strategy is created by the resource factory, and it
+ * is used by the ORB to control the mechanism used to flush the
+ * outgoing data queues.
+ * The flushing strategies are stateless, therefore, there is only
+ * one per ORB.
+ */
+ TAO_Flushing_Strategy *flushing_strategy (void);
+
protected:
/// Destructor is protected since the ORB Core should only be
@@ -1204,6 +1214,9 @@ protected:
/// Bir Dir GIOP policy value
CORBA::Boolean bidir_giop_policy_;
+
+ /// Hold the flushing strategy
+ TAO_Flushing_Strategy *flushing_strategy_;
};
// ****************************************************************
diff --git a/TAO/tao/ORB_Core.i b/TAO/tao/ORB_Core.i
index 2a3b815aec1..fce8b81dffe 100644
--- a/TAO/tao/ORB_Core.i
+++ b/TAO/tao/ORB_Core.i
@@ -47,6 +47,11 @@ TAO_ORB_Core::object_ref_table (void)
return this->object_ref_table_;
}
+ACE_INLINE TAO_Flushing_Strategy *
+TAO_ORB_Core::flushing_strategy (void)
+{
+ return this->flushing_strategy_;
+}
ACE_INLINE CORBA::Boolean
TAO_ORB_Core::service_profile_selection (TAO_MProfile &mprofile,
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
new file mode 100644
index 00000000000..f3d4a2ed8f8
--- /dev/null
+++ b/TAO/tao/Queued_Message.cpp
@@ -0,0 +1,96 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Queued_Message.h"
+#include "Message_Sent_Callback.h"
+
+#if !defined (__ACE_INLINE__)
+# include "Queued_Message.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(tao, Queued_Message, "$Id$")
+
+TAO_Queued_Message::TAO_Queued_Message (ACE_Message_Block *contents,
+ TAO_Message_Sent_Callback *callback)
+ : contents_ (contents)
+ , callback_ (callback)
+ , next_ (0)
+ , prev_ (0)
+{
+}
+
+TAO_Queued_Message::~TAO_Queued_Message (void)
+{
+ ACE_Message_Block::release (this->contents_);
+}
+
+void
+TAO_Queued_Message::connection_closed (void)
+{
+ if (this->callback_ != 0)
+ this->callback_->connection_closed ();
+}
+
+void
+TAO_Queued_Message::destroy (void)
+{
+ delete this;
+}
+
+void
+TAO_Queued_Message::bytes_transferred (size_t byte_count)
+{
+ while (byte_count > 0 && !this->done ())
+ {
+ size_t l = this->contents_->length ();
+ if (byte_count < l)
+ {
+ this->contents_->rd_ptr (byte_count);
+ return;
+ }
+ ACE_Message_Block *cont = this->contents_->cont ();
+ byte_count -= l;
+ ACE_Message_Block::release (this->contents_);
+ this->contents_ = cont;
+ }
+}
+
+void
+TAO_Queued_Message::remove_from_list (TAO_Queued_Message *&head,
+ TAO_Queued_Message *&tail)
+{
+ if (this->prev_ != 0)
+ this->prev_->next_ = this->next_;
+ else
+ head = this->next_;
+
+ if (this->next_ != 0)
+ this->next_->prev_ = this->prev_;
+ else
+ tail = this->prev_;
+
+ this->next_ = 0;
+ this->prev_ = 0;
+}
+
+void
+TAO_Queued_Message::push_back (TAO_Queued_Message *&head,
+ TAO_Queued_Message *&tail)
+{
+ if (tail == 0)
+ {
+ tail = this;
+ head = this;
+ this->next_ = 0;
+ this->prev_ = 0;
+ return;
+ }
+
+ this->prev_ = tail;
+ this->next_ = 0;
+ if (tail->prev_ != 0)
+ {
+ tail->prev_->next_ = this;
+ }
+ tail = this;
+}
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
new file mode 100644
index 00000000000..ddb66a92e7f
--- /dev/null
+++ b/TAO/tao/Queued_Message.h
@@ -0,0 +1,170 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Queued_Message.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_QUEUED_MESSAGE_H
+#define TAO_QUEUED_MESSAGE_H
+#include "ace/pre.h"
+
+#include "corbafwd.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_Message_Block;
+class TAO_Message_Sent_Callback;
+
+/**
+ * @class TAO_Queued_Message
+ *
+ * @brief Implement an queued message for the outgoing path in the
+ * TAO_Transport class
+ *
+ * Please read the documentation in the TAO_Transport class to find
+ * out more about the design of the outgoing data path.
+ *
+ * In some configurations TAO needs to maintain a per-connection queue
+ * of outgoing messages. This queue is drained by the pluggable
+ * protocols framework, normally under control of the ACE_Reactor, but
+ * other configurations are conceivable. The elements in the queue
+ * may may removed early, for example, because the application can
+ * specify timeouts for each message, or because the underlying
+ * connection is broken.
+ *
+ * In many cases the message corresponds to some application request,
+ * the application may be blocked waiting for the request to be sent,
+ * even more importantlyl, the ORB can be configured to use the
+ * Leader/Followers strategy, in which case one of the waiting threads
+ * can be required to wake up before its message completes
+ * each message may contain a 'Sent_Notifier'
+ *
+ * <H4>NOTE:</H4> The contents of the ACE_Message_Block may have been
+ * allocated from TSS storage, in that case we cannot steal them.
+ * However, we do not need to perform a deep copy all the time, for
+ * example, in a twoway request the sending thread blocks until the
+ * data goes out. The queued message can borrow the memory as it will
+ * be deallocated by the sending thread when it finishes.
+ * Oneways and asynchronous calls are another story.
+ *
+ * @todo: Change the ORB to allocate oneway and AMI buffer from global
+ * memory, to avoid the data copy in this path. What happens
+ * if the there is no queueing? Can we check that before
+ * allocating the memory?
+ *
+ */
+class TAO_Export TAO_Queued_Message
+{
+public:
+ /// Constructor
+ /**
+ * @param contents The message block chain that must be sent, this
+ * class <B>always</B> assumes ownership of the chain.
+ *
+ * @param callback A callback interface to signal any waiting
+ * threads about the status of the message. It is null if there are
+ * no waiting threads.
+ */
+ TAO_Queued_Message (ACE_Message_Block *contents,
+ TAO_Message_Sent_Callback *callback = 0);
+
+ /// Destructor
+ virtual ~TAO_Queued_Message (void);
+
+ /// Get the message block
+ ACE_Message_Block *mb (void) const;
+
+ /// The transport has successfully sent more data, adjust internal
+ /// status
+ void bytes_transferred (size_t byte_count);
+
+ /// Return 0 if the message has not been completely sent
+ int done (void) const;
+
+ /// The underlying connection has been closed, release resources and
+ /// signal waiting threads.
+ void connection_closed (void);
+
+ /// Reclaim resources
+ void destroy (void);
+
+ /** @name Intrusive list manipulation
+ *
+ * The messages are put in a doubled linked list (for easy insertion
+ * and removal). To minimize memory allocations the list is
+ * intrusive, i.e. each element in the list contains the pointers
+ * for the next and previous element.
+ *
+ * The following methods are used to manipulate this implicit list.
+ *
+ * @todo: We should implement this as a base template, something
+ * like:<BR>
+ * template<class T> Intrusive_Node {<BR>
+ * public:<BR><BR>
+ * void next (T *);<BR>
+ * T* next () const;<BR><BR>
+ * private:<BR>
+ * T* next_;<BR>
+ * };<BR>
+ * and use it as follows:<BR>
+ * class TAO_Queued_Message : public Intrusive_Node<TAO_Queued_Message><BR>
+ * {<BR>
+ * };<BR>
+ *
+ */
+ //@{
+ /// Set/get the next element in the list
+ virtual TAO_Queued_Message *next (void) const;
+
+ /// Set/get the previous element in the list
+ virtual TAO_Queued_Message *prev (void) const;
+
+ /// Remove this element from the list
+ virtual void remove_from_list (TAO_Queued_Message *&head,
+ TAO_Queued_Message *&tail);
+
+ /// Insert the current element after position.
+ /**
+ * If position is null then we assume that we are inserting the
+ * current element into an empty list.
+ */
+ virtual void push_back (TAO_Queued_Message *&head,
+ TAO_Queued_Message *&tail);
+ //@}
+
+private:
+ /// The contents of the message.
+ /**
+ * The message is normally generated by a TAO_OutputCDR stream. The
+ * application marshals the payload, possibly generating a chain of
+ * message block connected via the 'cont()' field.
+ */
+ ACE_Message_Block *contents_;
+
+ /// If not null, this is the object that we signal to indicate that
+ /// the message was sent.
+ /**
+ * The signaling mechanism used to wakeup the thread waiting for
+ * this message to complete changes
+ */
+ TAO_Message_Sent_Callback *callback_;
+
+ /// Implement an intrusive double-linked list for the message queue
+ TAO_Queued_Message *next_;
+ TAO_Queued_Message *prev_;
+};
+
+#if defined (__ACE_INLINE__)
+# include "Queued_Message.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* TAO_QUEUED_MESSAGE_H */
diff --git a/TAO/tao/Queued_Message.inl b/TAO/tao/Queued_Message.inl
new file mode 100644
index 00000000000..76a86e62d1c
--- /dev/null
+++ b/TAO/tao/Queued_Message.inl
@@ -0,0 +1,25 @@
+// $Id$
+
+ACE_INLINE ACE_Message_Block *
+TAO_Queued_Message::mb (void) const
+{
+ return this->contents_;
+}
+
+ACE_INLINE int
+TAO_Queued_Message::done (void) const
+{
+ return this->contents_ != 0;
+}
+
+ACE_INLINE TAO_Queued_Message *
+TAO_Queued_Message::next (void) const
+{
+ return this->next_;
+}
+
+ACE_INLINE TAO_Queued_Message *
+TAO_Queued_Message::prev (void) const
+{
+ return this->prev_;
+}
diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp
new file mode 100644
index 00000000000..58df0474633
--- /dev/null
+++ b/TAO/tao/Reactive_Flushing_Strategy.cpp
@@ -0,0 +1,56 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Reactive_Flushing_Strategy.h"
+#include "Transport.h"
+#include "ORB_Core.h"
+#include "Queued_Message.h"
+
+ACE_RCSID(tao, Reactive_Flushing_Strategy, "$Id$")
+
+int
+TAO_Reactive_Flushing_Strategy::schedule_output (TAO_Transport *transport)
+{
+ ACE_Reactor *reactor =
+ transport->orb_core ()->reactor ();
+
+ return reactor->register_handler (transport->event_handler (),
+ ACE_Event_Handler::READ_MASK
+ | ACE_Event_Handler::WRITE_MASK);
+}
+
+int
+TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport)
+{
+ ACE_Reactor *reactor =
+ transport->orb_core ()->reactor ();
+
+ return reactor->register_handler (transport->event_handler (),
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport,
+ TAO_Queued_Message *msg)
+{
+ TAO_ORB_Core *orb_core = transport->orb_core ();
+
+ int result = 0;
+ // @@ Should we pass this down? Can we?
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ while (!msg->done () && result > 0)
+ {
+ result = orb_core->run (0, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ return -1;
+ }
+ ACE_ENDTRY;
+
+ return result;
+}
diff --git a/TAO/tao/Reactive_Flushing_Strategy.h b/TAO/tao/Reactive_Flushing_Strategy.h
new file mode 100644
index 00000000000..c1a33aaef5b
--- /dev/null
+++ b/TAO/tao/Reactive_Flushing_Strategy.h
@@ -0,0 +1,38 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Reactive_Flushing_Strategy.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_REACTIVE_FLUSHING_STRATEGY_H
+#define TAO_REACTIVE_FLUSHING_STRATEGY_H
+#include "ace/pre.h"
+
+#include "Flushing_Strategy.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class TAO_Reactive_Flushing_Strategy
+ *
+ * @brief Implement a flushing strategy that uses the reactor.
+ */
+class TAO_Export TAO_Reactive_Flushing_Strategy : public TAO_Flushing_Strategy
+{
+public:
+ virtual int schedule_output (TAO_Transport *transport);
+ virtual int cancel_output (TAO_Transport *transport);
+ virtual int flush_message (TAO_Transport *transport,
+ TAO_Queued_Message *msg);
+};
+
+#include "ace/post.h"
+#endif /* TAO_REACTIVE_FLUSHING_STRATEGY_H */
diff --git a/TAO/tao/Resource_Factory.h b/TAO/tao/Resource_Factory.h
index 523aaaea637..a3e66b87d53 100644
--- a/TAO/tao/Resource_Factory.h
+++ b/TAO/tao/Resource_Factory.h
@@ -32,6 +32,8 @@ class TAO_Connector_Registry;
class TAO_Reactor_Registry;
class TAO_Priority_Mapping;
+class TAO_Flushing_Strategy;
+
// ****************************************************************
class TAO_Export TAO_Protocol_Item
@@ -181,6 +183,10 @@ public:
/// Creates the lock for the lock needed in the Cache Map
virtual ACE_Lock *create_cached_connection_lock (void);
+ /// Creates the flushing strategy. The new instance is owned by the
+ /// caller.
+ virtual TAO_Flushing_Strategy *create_flushing_strategy (void) = 0;
+
protected:
/**
* Loads the default protocols. This method is used so that the
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
index 0b87d02ba5c..4abbb20eea1 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
@@ -52,21 +52,8 @@ TAO_SHMIOP_Connection_Handler::TAO_SHMIOP_Connection_Handler (TAO_ORB_Core *orb_
TAO_SHMIOP_Connection_Handler::~TAO_SHMIOP_Connection_Handler (void)
{
-
- // If the socket has not already been closed.
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Cannot deal with errors, and therefore they are ignored.
- this->transport_.send_buffered_messages ();
- }
- else
- {
- // Dequeue messages and delete message blocks.
- this->transport_.dequeue_all ();
- }
}
-
int
TAO_SHMIOP_Connection_Handler::open (void*)
{
@@ -211,28 +198,17 @@ TAO_SHMIOP_Connection_Handler::fetch_handle (void)
return this->get_handle ();
}
-
int
TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // This method is called when buffering timer expires.
- //
- ACE_Time_Value *max_wait_time = 0;
-
- TAO_Stub *stub = 0;
- int has_timeout;
- this->orb_core ()->call_timeout_hook (stub,
- has_timeout,
- *max_wait_time);
-
// Cannot deal with errors, and therefore they are ignored.
- this->transport ()->send_buffered_messages (max_wait_time);
+ if (this->transport ()->handle_output () == -1)
+ return -1;
return 0;
}
-
int
TAO_SHMIOP_Connection_Handler::close (u_long)
{
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 27fd2d23b68..3931ecfc0b7 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -204,7 +204,7 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
// versions seem to need it though. Leaving it costs little.
// This guarantees to send all data (bytes) or return an error.
- ssize_t n = this->send_or_buffer (stub,
+ ssize_t n = this->send_message_i (stub,
twoway,
stream.begin (),
max_wait_time);
@@ -220,17 +220,6 @@ TAO_SHMIOP_Transport::send_message (TAO_OutputCDR &stream,
return -1;
}
- // EOF.
- if (n == 0)
- {
- if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n")
- ACE_TEXT ("EOF, closing conn %d\n"),
- this->handle()));
- return -1;
- }
-
return 1;
}
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
index 03fd6a1ae99..58c61ec1947 100644
--- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
@@ -56,22 +56,8 @@ TAO_UIOP_Connection_Handler::TAO_UIOP_Connection_Handler (TAO_ORB_Core *orb_core
TAO_UIOP_Connection_Handler::~TAO_UIOP_Connection_Handler (void)
{
-
- // If the socket has not already been closed.
- if (this->get_handle () != ACE_INVALID_HANDLE)
- {
- // Cannot deal with errors, and therefore they are ignored.
- this->transport_.send_buffered_messages ();
- }
- else
- {
- // Dequeue messages and delete message blocks.
- this->transport_.dequeue_all ();
- }
}
-
-
int
TAO_UIOP_Connection_Handler::open (void*)
{
@@ -198,23 +184,13 @@ int
TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // This method is called when buffering timer expires.
- //
- ACE_Time_Value *max_wait_time = 0;
-
- TAO_Stub *stub = 0;
- int has_timeout;
- this->orb_core ()->call_timeout_hook (stub,
- has_timeout,
- *max_wait_time);
-
// Cannot deal with errors, and therefore they are ignored.
- this->transport ()->send_buffered_messages (max_wait_time);
+ if (this->transport ()->handle_output () == -1)
+ return -1;
return 0;
}
-
int
TAO_UIOP_Connection_Handler::close (u_long)
{
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index ac762b53c9e..54ad2408b58 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -206,7 +206,7 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
// versions seem to need it though. Leaving it costs little.
// This guarantees to send all data (bytes) or return an error.
- ssize_t n = this->send_or_buffer (stub,
+ ssize_t n = this->send_message_i (stub,
twoway,
stream.begin (),
max_wait_time);
@@ -222,17 +222,6 @@ TAO_UIOP_Transport::send_message (TAO_OutputCDR &stream,
return -1;
}
- // EOF.
- if (n == 0)
- {
- if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n")
- ACE_TEXT ("EOF, closing conn %d\n"),
- this->handle()));
- return -1;
- }
-
return 1;
}
diff --git a/TAO/tao/Sync_Strategies.cpp b/TAO/tao/Sync_Strategies.cpp
index c679fc3405c..d79f1109e27 100644
--- a/TAO/tao/Sync_Strategies.cpp
+++ b/TAO/tao/Sync_Strategies.cpp
@@ -14,129 +14,47 @@ TAO_Sync_Strategy::~TAO_Sync_Strategy (void)
{
}
-ssize_t
-TAO_Transport_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
-{
- // Immediate delegation to the transport.
- return transport.send (message_block,
- max_wait_time);
-}
+// ****************************************************************
-#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
-
-ssize_t
-TAO_Delayed_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *mb,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ must_queue (TAO_Stub *, int)
{
- ACE_Message_Block *message_block =
- ACE_const_cast (ACE_Message_Block *, mb);
-
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Check if there are messages already in the queue.
- if (!buffering_queue.is_empty ())
- return TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
-
- //
- // Otherwise there were no queued messages. We first try to send
- // the message right away.
- //
-
- // Actual network send.
- size_t bytes_transferred = 0;
- result = transport.send (message_block,
- max_wait_time,
- &bytes_transferred);
-
- // Cannot send completely: timed out.
- if (result == -1 &&
- errno == ETIME)
- {
- if (bytes_transferred > 0)
- {
- // If successful in sending some of the data, reset the
- // message block appropriately.
- transport.reset_sent_message (message_block,
- bytes_transferred);
- }
-
- // Queue the rest.
- return bytes_transferred +
- TAO_Eager_Buffering_Sync_Strategy::send (transport,
- stub,
- message_block,
- max_wait_time);
- }
-
- // EOF or other errors.
- if (result == -1 ||
- result == 0)
- return -1;
-
- // Everything was successfully delivered.
- return result;
+ return 0;
}
-ssize_t
-TAO_Eager_Buffering_Sync_Strategy::send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time)
+int
+TAO_Transport_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *,
+ size_t ,
+ size_t ,
+ int &,
+ ACE_Time_Value &)
{
- ssize_t result = 0;
-
- // Get the message queue from the transport.
- TAO_Transport_Buffering_Queue &buffering_queue =
- transport.buffering_queue ();
-
- // Copy the message.
- ACE_Message_Block *copy = message_block->clone ();
-
- // Enqueue current message.
- result = buffering_queue.enqueue_tail (copy);
-
- // Enqueuing error.
- if (result == -1)
- {
- // Eliminate the copy.
- copy->release ();
+ return 0;
+}
- // Return error.
- return -1;
- }
+#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
- // Check if upper bound has been reached.
- if (this->buffering_constraints_reached (transport,
- stub,
- buffering_queue))
- {
- return transport.send_buffered_messages (max_wait_time);
- }
+// ****************************************************************
- // Hoping that this return value is meaningful or at least
- // acceptable.
- return message_block->total_length ();
+int
+TAO_Eager_Buffering_Sync_Strategy::
+ must_queue (TAO_Stub *, int)
+{
+ return 1;
}
int
-TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport &transport,
- TAO_Stub &stub,
- TAO_Transport_Buffering_Queue &buffering_queue)
+TAO_Eager_Buffering_Sync_Strategy::
+ buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &set_timer,
+ ACE_Time_Value &interval)
{
TAO_Buffering_Constraint_Policy *buffering_constraint_policy =
- stub.buffering_constraint ();
+ stub->buffering_constraint ();
if (buffering_constraint_policy == 0)
return 1;
@@ -147,86 +65,53 @@ TAO_Eager_Buffering_Sync_Strategy::buffering_constraints_reached (TAO_Transport
TAO::BufferingConstraint buffering_constraint;
buffering_constraint_policy->get_buffering_constraint (buffering_constraint);
- this->timer_check (transport,
- buffering_constraint);
+ this->timer_check (buffering_constraint, set_timer, interval);
if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
return 1;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_COUNT) &&
- buffering_queue.message_count () >= buffering_constraint.message_count)
+ TAO::BUFFER_MESSAGE_COUNT)
+ && msg_count >= buffering_constraint.message_count)
return 1;
if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_MESSAGE_BYTES) &&
- buffering_queue.message_length () >= buffering_constraint.message_bytes)
+ TAO::BUFFER_MESSAGE_BYTES)
+ && total_bytes >= buffering_constraint.message_bytes)
return 1;
return 0;
}
void
-TAO_Eager_Buffering_Sync_Strategy::timer_check (TAO_Transport &transport,
- const TAO::BufferingConstraint &buffering_constraint)
+TAO_Eager_Buffering_Sync_Strategy::
+ timer_check (const TAO::BufferingConstraint &buffering_constraint,
+ int &set_timer,
+ ACE_Time_Value &interval)
{
- if (transport.buffering_timer_id () != 0)
+ if (!ACE_BIT_ENABLED (buffering_constraint.mode,
+ TAO::BUFFER_TIMEOUT))
{
- //
- // There is a timeout set by us, though we are not sure if we
- // still need the timeout or if the timeout value is correct or
- // not.
- //
-
- // Get our reactor.
- ACE_Reactor *reactor = transport.orb_core ()->reactor ();
-
- if (!ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
- {
- // Timeouts are no longer needed. Cancel existing one.
- reactor->cancel_timer (transport.buffering_timer_id ());
- transport.buffering_timer_id (0);
- }
- else
- {
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
-
- if (transport.buffering_timeout_value () == timeout)
- {
- // Timeout value is the same, nothing to be done.
- }
- else
- {
- // Timeout value has changed, reset the old timer.
- reactor->reset_timer_interval (transport.buffering_timer_id (),
- timeout);
- }
- }
+ set_timer = 0;
+ return;
}
- else if (ACE_BIT_ENABLED (buffering_constraint.mode,
- TAO::BUFFER_TIMEOUT))
- {
- // We didn't have timeouts before, but we want them now.
- ACE_Time_Value timeout =
- this->time_conversion (buffering_constraint.timeout);
- // Get our reactor.
- ACE_Reactor *reactor = transport.orb_core ()->reactor ();
+ ACE_Time_Value timeout =
+ this->time_conversion (buffering_constraint.timeout);
- long timer_id = reactor->schedule_timer (transport.event_handler (),
- 0,
- timeout,
- timeout);
-
- transport.buffering_timer_id (timer_id);
- transport.buffering_timeout_value (timeout);
+ if (interval == timeout)
+ {
+ set_timer = 0;
+ return;
}
+
+ set_timer = 1;
+ interval = timeout;
}
ACE_Time_Value
-TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
+TAO_Eager_Buffering_Sync_Strategy::
+ time_conversion (const TimeBase::TimeT &time)
{
TimeBase::TimeT seconds = time / 10000000u;
TimeBase::TimeT microseconds = (time % 10000000u) / 10;
@@ -234,4 +119,15 @@ TAO_Eager_Buffering_Sync_Strategy::time_conversion (const TimeBase::TimeT &time)
ACE_U64_TO_U32 (microseconds));
}
+// ****************************************************************
+
+int
+TAO_Delayed_Buffering_Sync_Strategy::
+ must_queue (TAO_Stub *,
+ int queue_empty)
+{
+ // If the queue is empty we want to send immediately
+ return !queue_empty;
+}
+
#endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */
diff --git a/TAO/tao/Sync_Strategies.h b/TAO/tao/Sync_Strategies.h
index 5b273dac3e5..231d81a9624 100644
--- a/TAO/tao/Sync_Strategies.h
+++ b/TAO/tao/Sync_Strategies.h
@@ -27,24 +27,59 @@
#include "tao/Transport.h"
#include "tao/TAOC.h"
+/// Define the interface for the Queueing Strategy
+/**
+ * The low-level I/O components in the ORB use this strategy to
+ * determine when messages must be queued, immediately sent or
+ * flushed.
+ *
+ * The strategy isolates this low-level components from the higher
+ * level strategies used by the application developer.
+ *
+ * @todo The class name (Sync_Strategy) is inherited from the policies
+ * (SyncScopePolicy), but Queueing_Strategy probably captures its
+ * intent better. It should be changed in a future revision of the
+ * ORB.
+ */
class TAO_Export TAO_Sync_Strategy
{
public:
+ /// Destructor
virtual ~TAO_Sync_Strategy (void);
- virtual ssize_t send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time) = 0;
+ /// Return 1 if a message must be queued
+ virtual int must_queue (TAO_Stub *stub,
+ int queue_empty) = 0;
+
+ /// Return 1 if it is time to start
+ /**
+ * @param stub The object used to make the request, this is used to
+ * obtain the policies currently in effect for the request
+ * @param msg_count The number of messages currently queued
+ * @param total_bytes Number of bytes currently queued
+ * @param set_timer Returns 1 if a timer should be set to drain the
+ * queue
+ * @param interval If set_timer returns 1, this parameter contains
+ * the timer interval
+ */
+ virtual int buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &set_timer,
+ ACE_Time_Value &interval) = 0;
};
class TAO_Export TAO_Transport_Sync_Strategy : public TAO_Sync_Strategy
{
public:
- ssize_t send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time);
+ virtual int must_queue (TAO_Stub *stub,
+ int queue_empty);
+
+ virtual int buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &set_timer,
+ ACE_Time_Value &interval);
};
#if (TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1)
@@ -52,28 +87,41 @@ public:
class TAO_Export TAO_Eager_Buffering_Sync_Strategy : public TAO_Sync_Strategy
{
public:
- ssize_t send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time);
-
- virtual int buffering_constraints_reached (TAO_Transport &transport,
- TAO_Stub &stub,
- TAO_Transport_Buffering_Queue &buffering_queue);
-
- void timer_check (TAO_Transport &transport,
- const TAO::BufferingConstraint &buffering_constraint);
-
+ virtual int must_queue (TAO_Stub *stub,
+ int queue_empty);
+
+ virtual int buffering_constraints_reached (TAO_Stub *stub,
+ size_t msg_count,
+ size_t total_bytes,
+ int &set_timer,
+ ACE_Time_Value &interval);
+
+private:
+ /// Check if the buffering constraint includes any timeouts and
+ /// compute the right timeout interval if needed.
+ /**
+ * @param buffering_constraint The constraints defined by the
+ * application
+ * @param set_timer Return 1 if the timer should be set
+ * @param interval Return the timer interval value
+ */
+ void timer_check (const TAO::BufferingConstraint &buffering_constraint,
+ int &set_timer,
+ ACE_Time_Value &interval);
+
+ /// Convert from standard CORBA time units to seconds/microseconds.
ACE_Time_Value time_conversion (const TimeBase::TimeT &time);
};
+/// Delay the buffering decision until the transport blocks
+/**
+ * If the queue is empty the transport will try to send immediately.
+ */
class TAO_Export TAO_Delayed_Buffering_Sync_Strategy : public TAO_Eager_Buffering_Sync_Strategy
{
public:
- ssize_t send (TAO_Transport &transport,
- TAO_Stub &stub,
- const ACE_Message_Block *message_block,
- const ACE_Time_Value *max_wait_time);
+ virtual int must_queue (TAO_Stub *stub,
+ int queue_empty);
};
#endif /* TAO_HAS_BUFFERING_CONSTRAINT_POLICY == 1 */
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index 658b09cf1b0..9b379316e0e 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -499,6 +499,25 @@ SOURCE=.\Bind_Dispatcher_Guard.cpp
# End Source File
# Begin Source File
+SOURCE=.\Block_Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\BoundsC.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -1202,6 +1221,25 @@ SOURCE=.\FILE_Parser.cpp
# End Source File
# Begin Source File
+SOURCE=.\Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\GIOP_Message_Base.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -1867,6 +1905,25 @@ SOURCE=.\Marshal.cpp
# End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -2646,6 +2703,44 @@ SOURCE=.\Protocols_Hooks.cpp
# End Source File
# Begin Source File
+SOURCE=.\Queued_Message.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
+SOURCE=.\Reactive_Flushing_Strategy.cpp
+
+!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Alpha Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 MFC Debug"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Release"
+
+!ELSEIF "$(CFG)" == "TAO DLL - Win32 Debug"
+
+!ENDIF
+
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.cpp
!IF "$(CFG)" == "TAO DLL - Win32 Alpha Release"
@@ -3694,6 +3789,10 @@ SOURCE=.\Bind_Dispatcher_Guard.h
# End Source File
# Begin Source File
+SOURCE=.\Block_Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\BoundsC.h
# End Source File
# Begin Source File
@@ -3854,6 +3953,10 @@ SOURCE=.\FILE_Parser.h
# End Source File
# Begin Source File
+SOURCE=.\Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\giop.h
# End Source File
# Begin Source File
@@ -4010,6 +4113,14 @@ SOURCE=.\marshal.h
# End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Message_Sent_Callback.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.h
# End Source File
# Begin Source File
@@ -4190,6 +4301,18 @@ SOURCE=.\Protocols_Hooks.h
# End Source File
# Begin Source File
+SOURCE=.\Queued_Message.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\Queued_Message.inl
+# End Source File
+# Begin Source File
+
+SOURCE=.\Reactive_Flushing_Strategy.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.h
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp
index 214ac0cc0ae..f1f24e4dd47 100644
--- a/TAO/tao/TAO_Static.dsp
+++ b/TAO/tao/TAO_Static.dsp
@@ -40,8 +40,8 @@ RSC=rc.exe
# PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Release"
# PROP Target_Dir ""
-LINK32=link.exe -lib
MTL=midl.exe
+LINK32=link.exe -lib
# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MD /W3 /GX /O2 /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "NDEBUG" /D "WIN32" /D "TAO_AS_STATIC_LIBS" /D "ACE_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -66,8 +66,8 @@ LIB32=link.exe -lib
# PROP Output_Dir ""
# PROP Intermediate_Dir "LIB\Debug"
# PROP Target_Dir ""
-LINK32=link.exe -lib
MTL=midl.exe
+LINK32=link.exe -lib
# ADD BASE CPP /nologo /W3 /GX /Z7 /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /YX /FD /c
# ADD CPP /nologo /MDd /W3 /GX /Zi /Od /I "../../" /I "../" /D "_WINDOWS" /D "_CONSOLE" /D "_DEBUG" /D "WIN32" /D "ACE_AS_STATIC_LIBS" /D "TAO_AS_STATIC_LIBS" /FD /c
# SUBTRACT CPP /YX
@@ -451,6 +451,10 @@ SOURCE=.\marshal.h
# End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.h
# End Source File
# Begin Source File
@@ -631,6 +635,10 @@ SOURCE=.\Protocols_Hooks.h
# End Source File
# Begin Source File
+SOURCE=.\Queued_Message.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.h
# End Source File
# Begin Source File
@@ -1163,6 +1171,10 @@ SOURCE=.\marshal.i
# End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_Policy_i.i
# End Source File
# Begin Source File
@@ -1299,6 +1311,10 @@ SOURCE=.\Profile.i
# End Source File
# Begin Source File
+SOURCE=.\Queued_Message.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.i
# End Source File
# Begin Source File
@@ -1815,6 +1831,10 @@ SOURCE=.\Marshal.cpp
# End Source File
# Begin Source File
+SOURCE=.\Message_Sent_Callback.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Messaging_ORBInitializer.cpp
# End Source File
# Begin Source File
@@ -1979,6 +1999,10 @@ SOURCE=.\Protocols_Hooks.cpp
# End Source File
# Begin Source File
+SOURCE=.\Queued_Message.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Reactor_Registry.cpp
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 98bcc63b319..94beb0d4f9b 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -10,6 +10,10 @@
#include "Transport_Mux_Strategy.h"
#include "Stub.h"
#include "Sync_Strategies.h"
+#include "Queued_Message.h"
+#include "Flushing_Strategy.h"
+
+#include "ace/Message_Block.h"
#if !defined (__ACE_INLINE__)
# include "Transport.inl"
@@ -22,9 +26,10 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
TAO_ORB_Core *orb_core)
: tag_ (tag)
, orb_core_ (orb_core)
- , buffering_queue_ (0)
- , buffering_timer_id_ (0)
, bidirectional_flag_ (-1)
+ , head_ (0)
+ , tail_ (0)
+ , current_message_ (0)
{
TAO_Client_Strategy_Factory *cf =
this->orb_core_->client_factory ();
@@ -44,9 +49,19 @@ TAO_Transport::~TAO_Transport (void)
delete this->tms_;
this->tms_ = 0;
- delete this->buffering_queue_;
+ // delete this->buffering_queue_;
+
+ for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ {
+ // @@ This is a good point to insert a flag to indicate that a
+ // CloseConnection message was successfully received.
+ i->connection_closed ();
+
+ i->destroy ();
+ }
}
+#if 0
ssize_t
TAO_Transport::send_or_buffer (TAO_Stub *stub,
int two_way,
@@ -197,6 +212,246 @@ TAO_Transport::reset_message (ACE_Message_Block *message_block,
}
}
}
+#endif /* 0 */
+
+int
+TAO_Transport::handle_output ()
+{
+ // The reactor is asking us to send more data, first check if
+ // there is a current message that needs more sending:
+ int result = this->send_current_message ();
+ if (result == 0)
+ return 0;
+
+ if (result > 0)
+ {
+ // ... there is no current message or it was completely
+ // sent, time to check the queue....
+ result = this->dequeue_next_message ();
+ if (result == 0)
+ return 0;
+ }
+ // else { there was an error.... }
+
+ if (result > 0)
+ {
+ // ... no more data to send ... remove ourselves from the
+ // reactor ...
+ result = this->cancel_output ();
+ }
+
+ if (result == -1)
+ return -1; // There was an error, return -1 so the Reactor
+ else if (result == 0)
+ return 0; // There is no more data or socket blocked, just return 0
+
+ // else if (result > 0)
+ // There is more data to be sent, don't try right now, let the
+ // reactor handle it, because it can handle starvation better
+ return 0;
+}
+
+int
+TAO_Transport::send_current_message (void)
+{
+ if (this->current_message_ == 0)
+ return 1;
+
+ size_t byte_count;
+ ssize_t n = this->send (this->current_message_->mb (),
+ 0, /* it is non-blocking */
+ &byte_count);
+ if (n == 0)
+ {
+ // The connection was closed, return -1 to have the Reactor
+ // close this transport and event handler
+ return -1;
+ }
+
+ // Because there can be a partial transfer we need to adjust the
+ // number of bytes sent.
+ this->current_message_->bytes_transferred (byte_count);
+ if (this->current_message_->done ())
+ {
+ // Remove the current message....
+ // @@ We should be using a pool for these guys!
+ this->current_message_->destroy ();
+
+ this->current_message_ = 0;
+
+ if (n == -1)
+ return -1;
+
+ return 1;
+ }
+
+ if (n == -1)
+ {
+ // ... timeouts and flow control are not real errors, the
+ // connection is still valid and we must continue sending the
+ // current message ...
+ if (errno == EWOULDBLOCK
+ || errno == ETIME)
+ return 0;
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+TAO_Transport::dequeue_next_message (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
+ if (this->head_ == 0)
+ return 1;
+
+ this->current_message_ = this->head_;
+ this->head_->remove_from_list (this->head_, this->tail_);
+
+ return 0;
+}
+
+int
+TAO_Transport::cancel_output (void)
+{
+ return 0;
+}
+
+int
+TAO_Transport::schedule_output (void)
+{
+ return 0;
+}
+
+int
+TAO_Transport::send_message_i (TAO_Stub *stub,
+ int twoway_flag,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
+
+ int queue_empty = (this->head_ == 0);
+
+ // Let's figure out if the message should be queued without trying
+ // to send first:
+ int non_queued_message =
+ (this->current_message_ == 0) // There is an outgoing message already
+ && (twoway_flag
+ || !stub->sync_strategy ().must_queue (stub,
+ queue_empty));
+
+ TAO_Queued_Message *queued_message = 0;
+ if (non_queued_message)
+ {
+ // ... in this case we must try to send the message first ...
+
+ size_t byte_count;
+
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this stuff.
+ ssize_t n = this->send (message_block,
+ 0, // non-blocking
+ &byte_count);
+ if (n == 0)
+ return -1;
+ else if (n == -1 && errno != EWOULDBLOCK)
+ return -1;
+
+ // ... let's figure out if the complete message was sent ...
+ if (message_block->total_length () == byte_count)
+ {
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This should be the normal path in the ORB, it should be
+ // fast.
+ return 0;
+ }
+
+ // ... the message was only partially sent, set it as the
+ // current message ...
+ queued_message =
+ new TAO_Queued_Message (message_block->clone ());
+ // @@ Revisit message queue allocations
+
+ queued_message->bytes_transferred (byte_count);
+ this->current_message_ = queued_message;
+ }
+ else
+ {
+ // ... otherwise simply queue the message ...
+ queued_message =
+ new TAO_Queued_Message (message_block->clone ());
+ queued_message->push_back (this->head_, this->tail_);
+ }
+
+ // ... two choices, this is a twoway request or not, if it is
+ // then we must only return once the complete message has been
+ // sent:
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ if (twoway_flag)
+ {
+ // Release the mutex, other threads may modify the queue as we
+ // block for a long time writing out data.
+ ace_mon.release ();
+ int result = flushing_strategy->flush_message (this, queued_message);
+ queued_message->destroy ();
+ return result;
+ }
+
+ // ... this is not a twoway. We must check if the buffering
+ // constraints have been reached, if so, then we should start
+ // flushing out data....
+
+ // First let's compute the size of the queue:
+ size_t msg_count = 0;
+ size_t total_bytes = 0;
+ for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
+ {
+ msg_count++;
+ total_bytes += i->mb ()->total_length ();
+ }
+ if (this->current_message_ != 0)
+ {
+ msg_count++;
+ total_bytes += this->current_message_->mb ()->total_length ();
+ }
+
+ int set_timer;
+ ACE_Time_Value interval;
+
+ if (stub->sync_strategy ().buffering_constraints_reached (stub,
+ msg_count,
+ total_bytes,
+ set_timer,
+ interval))
+ {
+ ace_mon.release ();
+ // @@ memory management of the queued messages is getting tricky.
+ int result = flushing_strategy->flush_message (this,
+ this->tail_);
+ return result;
+ }
+ else
+ {
+ // ... it is not time to flush yet, but maybe we need to set a
+ // timer ...
+ if (set_timer)
+ {
+ // @@ We need to schedule the timer. We should also be
+ // careful not to schedule one if there is one scheduled
+ // already.
+ }
+ }
+ return 0;
+}
int
TAO_Transport::idle_after_send (void)
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 2b5484a5abd..1505925b4e9 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -32,9 +32,7 @@ class TAO_Operation_Details;
class TAO_Transport_Mux_Strategy;
class TAO_Wait_Strategy;
-#include "ace/Message_Queue.h"
-
-typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue;
+class TAO_Queued_Message;
/**
* @class TAO_Transport
@@ -89,7 +87,7 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue;
* Also some applications may choose, for performance reasons or to
* avoid complex concurrency scenarios due to nested upcalls, to
* using blocking I/O
- * block the
+ * block the
*
* <H4>Timeouts:</H4> Some or all messages could have a timeout period
* attached to them. The timeout source could either be some
@@ -111,7 +109,7 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue;
*
* The Transport object provides a single method to send messages
* (send_message ()).
- *
+ *
* <H3>The incoming data path:</H3>
*
* @todo Document the incoming data path design forces.
@@ -120,15 +118,10 @@ typedef ACE_Message_Queue<ACE_NULL_SYNCH> TAO_Transport_Buffering_Queue;
* <B>See Also:</B>
*
* http://ace.cs.wustl.edu/cvsweb/ace-latest.cgi/ACE_wrappers/TAO/docs/pluggable_protocols/index.html
- *
+ *
*/
class TAO_Export TAO_Transport
{
-
- friend class TAO_Transport_Sync_Strategy;
- friend class TAO_Eager_Buffering_Sync_Strategy;
- friend class TAO_Delayed_Buffering_Sync_Strategy;
-
public:
/// default creator, requres the tag value be supplied.
TAO_Transport (CORBA::ULong tag,
@@ -171,6 +164,17 @@ public:
*/
TAO_Wait_Strategy *wait_strategy (void) const;
+ /// Callback method to reactively drain the outgoing data queue
+ int handle_output (void);
+
+ /**
+ * Return the TSS leader follower condition variable used in the
+ * Wait Strategy. Muxed Leader Follower implementation returns a
+ * valid condition variable, others return 0.
+ */
+ virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void);
+
+#if 0
/// Send a request or queue it for later.
/**
* If the right policies are set queue the request for later.
@@ -188,13 +192,6 @@ public:
const ACE_Message_Block *mblk,
const ACE_Time_Value *s = 0);
- /**
- * Return the TSS leader follower condition variable used in the
- * Wait Strategy. Muxed Leader Follower implementation returns a
- * valid condition variable, others return 0.
- */
- virtual TAO_SYNCH_CONDITION *leader_follower_condition_variable (void);
-
/// Queue for buffering transport messages.
virtual TAO_Transport_Buffering_Queue &buffering_queue (void);
@@ -208,14 +205,7 @@ public:
/// Send any messages that have been buffered.
ssize_t send_buffered_messages (const ACE_Time_Value *max_wait_time = 0);
-
- /**
- * Initialising the messaging object. This would be used by the
- * connector side. On the acceptor side the connection handler
- * would take care of the messaging objects.
- */
- virtual int messaging_init (CORBA::Octet major,
- CORBA::Octet minor) = 0;
+#endif /* 0 */
/// Get/Set the bidirectional flag
virtual int bidirectional_flag (void) const;
@@ -255,7 +245,7 @@ public:
*
* The Transport class uses the Template Method Pattern to implement
* the protocol specific functionality.
- * Implementors of a pluggable protocol should override the
+ * Implementors of a pluggable protocol should override the
* following methods with the semantics documented below.
*/
//@{
@@ -294,7 +284,7 @@ public:
* framework and the TAO pluggable protocol framework.
* In all the protocols implemented so far this role is fullfilled
* by an instance of ACE_Svc_Handler.
- *
+ *
* @todo Since we only use a limited functionality of
* ACE_Svc_Handler we could probably implement a generic
* adapter class (TAO_Transport_Event_Handler or something), this
@@ -340,7 +330,7 @@ public:
// Read len bytes from into buf.
/**
- * @param buffer ORB allocated buffer where the data should be
+ * @param buffer ORB allocated buffer where the data should be
* @@ The ACE_Time_Value *s is just a place holder for now. It is
* not clear this this is the best place to specify this. The actual
* timeout values will be kept in the Policies.
@@ -455,7 +445,7 @@ public:
virtual int read_process_message (ACE_Time_Value *max_wait_time = 0,
int block = 0) = 0;
- /// Register the handler with the reactor.
+ /// Register the handler with the reactor.
/**
* This method is used by the Wait_On_Reactor strategy. The
* transport must register its event handler with the ORB's Reactor.
@@ -467,9 +457,17 @@ public:
*/
virtual int register_handler (void) = 0;
+ /**
+ * Initialising the messaging object. This would be used by the
+ * connector side. On the acceptor side the connection handler
+ * would take care of the messaging objects.
+ */
+ virtual int messaging_init (CORBA::Octet major,
+ CORBA::Octet minor) = 0;
//@}
protected:
+#if 0
/// Remove the first message from the outgoing queue.
void dequeue_head (void);
@@ -487,7 +485,46 @@ protected:
void reset_message (ACE_Message_Block *message_block,
size_t bytes_delivered,
int queued_message);
+#endif /* 0 */
+
+protected:
+ /// Sent the contents of <message_block>, blocking if required by
+ /// the twoway flag or by the current policies in the stub.
+ int send_message_i (TAO_Stub *stub,
+ int twoway_flag,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
+
private:
+
+ /// Try to send the current message.
+ /**
+ * As the outgoing data is drained this method is invoked to send as
+ * much of the current message as possible.
+ *
+ * Returns 0 if there is more data to send, -1 if there was an error
+ * and 1 if the message was completely sent.
+ */
+ int send_current_message (void);
+
+ /// Dequeue the next message, if any, and continue sending data
+ /**
+ * Once a message is completely sent, a new message is dequeued and
+ * setup as the current message.
+ *
+ * Returns 0 if there is more data to send, -1 if there was an error
+ * and 1 if the message was completely sent.
+ */
+ int dequeue_next_message (void);
+
+ /// There is data queued or pending data in the current
+ /// message. Enable the reactive calls through the reactor
+ int schedule_output (void);
+
+ /// There is no more data to send, cancel any reactive calls through
+ /// the reactor
+ int cancel_output (void);
+
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
@@ -506,6 +543,7 @@ protected:
/// Strategy for waiting for the reply after sending the request.
TAO_Wait_Strategy *ws_;
+#if 0
/// Queue for buffering transport messages.
TAO_Transport_Buffering_Queue *buffering_queue_;
@@ -514,6 +552,7 @@ protected:
/// Buffering timeout value.
ACE_Time_Value buffering_timeout_value_;
+#endif /* 0 */
/// Use to check if bidirectional info has been synchronized with
/// the peer.
@@ -536,6 +575,17 @@ protected:
* if the server receives the info.
*/
int bidirectional_flag_;
+
+ /// Synchronize access to the outgoing data queue
+ TAO_SYNCH_MUTEX queue_mutex_;
+
+ /// Implement the outgoing data queue
+ TAO_Queued_Message *head_;
+ TAO_Queued_Message *tail_;
+
+ /// Once part of a message has been sent it is kept here until it is
+ /// completely sent
+ TAO_Queued_Message *current_message_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl
index 6bc1316cb35..f9c281342c9 100644
--- a/TAO/tao/Transport.inl
+++ b/TAO/tao/Transport.inl
@@ -1,5 +1,11 @@
// $Id$
+ACE_INLINE CORBA::ULong
+TAO_Transport::tag (void) const
+{
+ return this->tag_;
+}
+
ACE_INLINE TAO_ORB_Core *
TAO_Transport::orb_core (void) const
{
@@ -19,12 +25,7 @@ TAO_Transport::wait_strategy (void) const
return this->ws_;
}
-ACE_INLINE CORBA::ULong
-TAO_Transport::tag (void) const
-{
- return this->tag_;
-}
-
+#if 0
ACE_INLINE long
TAO_Transport::buffering_timer_id (void) const
{
@@ -77,6 +78,19 @@ TAO_Transport::dequeue_head (void)
message_block->release ();
}
+ACE_INLINE void
+TAO_Transport::dequeue_all (void)
+{
+ // Flush all queued messages.
+ if (this->buffering_queue_)
+ {
+ while (!this->buffering_queue_->is_empty ())
+ this->dequeue_head ();
+ }
+}
+
+#endif /* 0 */
+
ACE_INLINE int
TAO_Transport::bidirectional_flag (void) const
{
@@ -88,14 +102,3 @@ TAO_Transport::bidirectional_flag (int flag)
{
this->bidirectional_flag_ = flag;
}
-
-ACE_INLINE void
-TAO_Transport::dequeue_all (void)
-{
- // Flush all queued messages.
- if (this->buffering_queue_)
- {
- while (!this->buffering_queue_->is_empty ())
- this->dequeue_head ();
- }
-}
diff --git a/TAO/tao/default_resource.cpp b/TAO/tao/default_resource.cpp
index eb39187d3ba..d7920738bf8 100644
--- a/TAO/tao/default_resource.cpp
+++ b/TAO/tao/default_resource.cpp
@@ -11,6 +11,9 @@
#include "tao/Single_Reactor.h"
#include "tao/Priority_Mapping.h"
+#include "tao/Reactive_Flushing_Strategy.h"
+#include "tao/Block_Flushing_Strategy.h"
+
#include "ace/TP_Reactor.h"
#include "ace/Dynamic_Service.h"
#include "ace/Arg_Shifter.h"
@@ -33,6 +36,7 @@ TAO_Default_Resource_Factory::TAO_Default_Resource_Factory (void)
reactor_mask_signals_ (1),
dynamically_allocated_reactor_ (0),
cached_connection_lock_type_ (TAO_THREAD_LOCK)
+ , flushing_strategy_type_ (TAO_REACTIVE_FLUSHING)
{
}
@@ -193,6 +197,7 @@ TAO_Default_Resource_Factory::init (int argc, char **argv)
this->add_to_ior_parser_names (argv[curarg]);
}
}
+
else if (ACE_OS::strcasecmp (argv[curarg],
"-ORBConnectionLock") == 0)
{
@@ -210,6 +215,23 @@ TAO_Default_Resource_Factory::init (int argc, char **argv)
}
}
+ else if (ACE_OS::strcasecmp (argv[curarg],
+ "-ORBFlushingStrategy") == 0)
+ {
+ curarg++;
+ if (curarg < argc)
+ {
+ char *name = argv[curarg];
+
+ if (ACE_OS::strcasecmp (name,
+ "reactive") == 0)
+ this->flushing_strategy_type_ = TAO_REACTIVE_FLUSHING;
+ else if (ACE_OS::strcasecmp (name,
+ "blocking") == 0)
+ this->flushing_strategy_type_ = TAO_BLOCKING_FLUSHING;
+ }
+ }
+
return 0;
}
@@ -609,7 +631,7 @@ TAO_Default_Resource_Factory::input_cdr_buffer_allocator (void)
ACE_NEW_RETURN (allocator,
LOCKED_ALLOCATOR,
0);
-
+
return allocator;
}
@@ -658,6 +680,21 @@ TAO_Default_Resource_Factory::create_cached_connection_lock (void)
return the_lock;
}
+TAO_Flushing_Strategy *
+TAO_Default_Resource_Factory::create_flushing_strategy (void)
+{
+ TAO_Flushing_Strategy *strategy = 0;
+ if (this->flushing_strategy_type_ == TAO_REACTIVE_FLUSHING)
+ ACE_NEW_RETURN (strategy,
+ TAO_Reactive_Flushing_Strategy,
+ 0);
+ else
+ ACE_NEW_RETURN (strategy,
+ TAO_Block_Flushing_Strategy,
+ 0);
+ return strategy;
+}
+
TAO_Priority_Mapping *
TAO_Default_Resource_Factory::get_priority_mapping (void)
{
diff --git a/TAO/tao/default_resource.h b/TAO/tao/default_resource.h
index a9b2c85e2ee..674f676148b 100644
--- a/TAO/tao/default_resource.h
+++ b/TAO/tao/default_resource.h
@@ -97,6 +97,7 @@ public:
virtual double purge_percentage (void) const;
virtual TAO_Priority_Mapping *get_priority_mapping (void);
virtual ACE_Lock *create_cached_connection_lock (void);
+ virtual TAO_Flushing_Strategy *create_flushing_strategy (void);
protected:
/// Obtain the reactor implementation
@@ -167,6 +168,15 @@ private:
/// Type of lock used by the cached connector.
Lock_Type cached_connection_lock_type_;
+
+ enum Flushing_Strategy_Type
+ {
+ TAO_REACTIVE_FLUSHING,
+ TAO_BLOCKING_FLUSHING
+ };
+
+ /// Type of flushing strategy configured
+ int flushing_strategy_type_;
};
#if defined (__ACE_INLINE__)