summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-18 01:54:27 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-03-18 01:54:27 +0000
commitfc4b5450e12e9cf8f52479f104b71ed0976adfdb (patch)
treee1401567e96fcafe90a9daafa4b9c570feace5c0
parentf594b65f3fa8afb58601b33f225413e41906be49 (diff)
downloadATCD-fc4b5450e12e9cf8f52479f104b71ed0976adfdb.tar.gz
ChangeLogTag:Sat Mar 17 17:52:27 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a14
-rw-r--r--TAO/tao/IIOP_Transport.cpp30
-rw-r--r--TAO/tao/IIOP_Transport.h3
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.cpp26
-rw-r--r--TAO/tao/Transport.cpp114
-rw-r--r--TAO/tao/Transport.h31
6 files changed, 85 insertions, 133 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 36a0e5af586..808b235e3cb 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,17 @@
+Sat Mar 17 17:52:27 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ Removed the current_message_ field, using the head of the queue
+ works just as well, at least as long as we always push events to
+ the end of the queue.
+
+ * tao/IIOP_Transport.h:
+ * tao/IIOP_Transport.cpp:
+ * tao/Reactive_Flushing_Strategy.cpp:
+ Use the Flushing Strategy to schedule output and cancel output
+ with the reactor.
+
Sat Mar 17 15:34:14 2001 Carlos O'Ryan <coryan@uci.edu>
* tests/Big_Oneways/Session.h:
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index 8cf934a8e3a..a27a6198ec9 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -330,36 +330,6 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
}
int
-TAO_IIOP_Transport::schedule_output (void)
-{
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - IIOP_Transport[%d]::schedule_output\n",
- this->handle ()));
- }
- ACE_Reactor *r =
- this->connection_handler_->reactor ();
- return r->schedule_wakeup (this->connection_handler_,
- ACE_Event_Handler::WRITE_MASK);
-}
-
-int
-TAO_IIOP_Transport::cancel_output (void)
-{
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - IIOP_Transport[%d]::cancel_output\n",
- this->handle ()));
- }
- ACE_Reactor *r =
- this->connection_handler_->reactor ();
- return r->cancel_wakeup (this->connection_handler_,
- ACE_Event_Handler::WRITE_MASK);
-}
-
-int
TAO_IIOP_Transport::process_message (void)
{
// Get the <message_type> that we have received
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 94b8efb38a0..68ebf14a558 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -125,9 +125,6 @@ public:
CORBA::Octet minor);
virtual int tear_listen_point_list (TAO_InputCDR &cdr);
-
- virtual int schedule_output (void);
- virtual int cancel_output (void);
//@}
private:
diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp
index 58df0474633..e75f8079e6e 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.cpp
+++ b/TAO/tao/Reactive_Flushing_Strategy.cpp
@@ -5,6 +5,7 @@
#include "Transport.h"
#include "ORB_Core.h"
#include "Queued_Message.h"
+#include "debug.h"
ACE_RCSID(tao, Reactive_Flushing_Strategy, "$Id$")
@@ -14,19 +15,32 @@ TAO_Reactive_Flushing_Strategy::schedule_output (TAO_Transport *transport)
ACE_Reactor *reactor =
transport->orb_core ()->reactor ();
- return reactor->register_handler (transport->event_handler (),
- ACE_Event_Handler::READ_MASK
- | ACE_Event_Handler::WRITE_MASK);
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Reactive_Flushing_Strategy[%d]::schedule_output\n",
+ transport->handle ()));
+ }
+
+ return reactor->schedule_wakeup (transport->event_handler (),
+ ACE_Event_Handler::WRITE_MASK);
}
int
TAO_Reactive_Flushing_Strategy::cancel_output (TAO_Transport *transport)
{
ACE_Reactor *reactor =
- transport->orb_core ()->reactor ();
+ transport->event_handler ()->reactor ();
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Reactive_Flushing_Strategy[%d]::cancel_output\n",
+ transport->handle ()));
+ }
- return reactor->register_handler (transport->event_handler (),
- ACE_Event_Handler::READ_MASK);
+ return reactor->cancel_wakeup (transport->event_handler (),
+ ACE_Event_Handler::WRITE_MASK);
}
int
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index c272e341914..24e2ba58788 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -30,7 +30,6 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
- , current_message_ (0)
{
TAO_Client_Strategy_Factory *cf =
this->orb_core_->client_factory ();
@@ -96,13 +95,12 @@ TAO_Transport::handle_output ()
{
// ... there is no current message or it was completely
// sent, time to check the queue....
- int dequeue = this->dequeue_next_message ();
- if (dequeue == -1)
- {
- // ... no more messages in the queue, cancel output...
- (void) this->cancel_output ();
- return 0;
- }
+ // ... no more messages in the queue, cancel output...
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ flushing_strategy->cancel_output (this);
+ return 0;
}
// ... on Win32 we must continue until we get EWOULDBLOCK ...
}
@@ -240,13 +238,13 @@ TAO_Transport::send_current_message (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
- if (this->current_message_ == 0)
+ if (this->head_ == 0)
return 1;
size_t bytes_transferred;
ssize_t retval =
- this->send_message_block_chain (this->current_message_->mb (),
+ this->send_message_block_chain (this->head_->mb (),
bytes_transferred);
if (retval == 0)
{
@@ -257,14 +255,14 @@ TAO_Transport::send_current_message (void)
// Because there can be a partial transfer we need to adjust the
// number of bytes sent.
- this->current_message_->bytes_transferred (bytes_transferred);
- if (this->current_message_->done ())
+ this->head_->bytes_transferred (bytes_transferred);
+ if (this->head_->done ())
{
// Remove the current message....
- // @@ We should be using a pool for these guys!
- this->current_message_->destroy ();
+ TAO_Queued_Message *head = this->head_;
+ head->remove_from_list (this->head_, this->tail_);
- this->current_message_ = 0;
+ head->destroy ();
}
if (retval == -1)
@@ -278,33 +276,8 @@ TAO_Transport::send_current_message (void)
return -1;
}
- if (this->current_message_ == 0)
- return 1;
- return 0;
-}
-
-int
-TAO_Transport::dequeue_next_message (void)
-{
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
if (this->head_ == 0)
- return -1;
-
- this->current_message_ = this->head_;
- this->current_message_->remove_from_list (this->head_, this->tail_);
-
- return 0;
-}
-
-int
-TAO_Transport::cancel_output (void)
-{
- return 0;
-}
-
-int
-TAO_Transport::schedule_output (void)
-{
+ return 1;
return 0;
}
@@ -314,6 +287,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time)
{
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
int queue_empty = (this->head_ == 0);
@@ -321,7 +297,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// Let's figure out if the message should be queued without trying
// to send first:
int must_queue = 0;
- if (this->current_message_ != 0)
+ if (this->head_ != 0)
must_queue = 1;
else if (!twoway_flag
&& stub->sync_strategy ().must_queue (queue_empty))
@@ -342,16 +318,9 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
this->handle ()));
}
- size_t length = message_block->total_length ();
- ACE_Message_Block *copy =
- new ACE_Message_Block (length);
- for (const ACE_Message_Block *i = message_block;
- i != 0;
- i = i->cont ())
- copy->copy (i->rd_ptr (), i->length ());
-
queued_message =
- new TAO_Queued_Message (copy, 1);
+ this->copy_message_block (message_block);
+
queued_message->push_back (this->head_, this->tail_);
}
else
@@ -400,7 +369,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// ... the message was only partially sent, schedule reactive
// output...
- this->schedule_output ();
+ flushing_strategy->schedule_output (this);
// ... and set it as the current message ...
if (twoway_flag)
@@ -414,16 +383,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
}
else
{
- size_t length = message_block->total_length ();
- ACE_Message_Block *copy =
- new ACE_Message_Block (length);
- for (const ACE_Message_Block *i = message_block;
- i != 0;
- i = i->cont ())
- copy->copy (i->rd_ptr (), i->length ());
-
queued_message =
- new TAO_Queued_Message (copy, 1);
+ this->copy_message_block (message_block);
}
// @@ Revisit message queue allocations
@@ -454,15 +415,15 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
queued_message->mb ()->total_length ()));
}
- this->current_message_ = queued_message;
+ // ... insert at the head of the queue, we can use push_back()
+ // because the queue is empty ...
+ queued_message->push_back (this->head_, this->tail_);
}
// ... two choices, this is a twoway request or not, if it is
// then we must only return once the complete message has been
// sent:
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
if (twoway_flag)
{
// Release the mutex, other threads may modify the queue as we
@@ -485,11 +446,6 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
msg_count++;
total_bytes += i->mb ()->total_length ();
}
- if (this->current_message_ != 0)
- {
- msg_count++;
- total_bytes += this->current_message_->mb ()->total_length ();
- }
int set_timer;
ACE_Time_Value interval;
@@ -549,3 +505,23 @@ TAO_Transport::reactor_signalling (void)
{
return 0;
}
+
+TAO_Queued_Message *
+TAO_Transport::copy_message_block (const ACE_Message_Block *message_block)
+{
+ size_t length = message_block->total_length ();
+
+ // @@ Use Auto_Ptr<> to cleanup the message block, should the second
+ // allocation fail
+ ACE_Message_Block *copy;
+ ACE_NEW_RETURN (copy, ACE_Message_Block (length), 0);
+ for (const ACE_Message_Block *i = message_block;
+ i != 0;
+ i = i->cont ())
+ copy->copy (i->rd_ptr (), i->length ());
+
+ TAO_Queued_Message *msg;
+ ACE_NEW_RETURN (msg, TAO_Queued_Message (copy, 1), 0);
+
+ return msg;
+}
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index f5729acdd34..d51ff70828a 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -71,10 +71,10 @@ class TAO_Queued_Message;
* transport may already be sending another message in a reactive
* fashion.
*
- * Consequently, the Transport must also keep a
- * <TT>current_message</TT>, if the current message is not null any
- * new messages must be queued. Only once the current message is
- * completely sent we can take a message out of the queue.
+ * Consequently, the Transport must also know if the head of the queue
+ * has been partially sent. In that case new messages can only follow
+ * the head. Only once the head is completely sent we can start
+ * sending new messages.
*
* <H4>Waiting threads:</H4> One or more threads can be blocked
* waiting for the connection to completely send the message.
@@ -483,14 +483,6 @@ public:
virtual int messaging_init (CORBA::Octet major,
CORBA::Octet minor) = 0;
- /// There is data queued or pending data in the current
- /// message. Enable the reactive calls through the reactor
- virtual int schedule_output (void);
-
- /// There is no more data to send, cancel any reactive calls through
- /// the reactor
- virtual int cancel_output (void);
-
//@}
/// Send a message block chain,
@@ -518,15 +510,8 @@ private:
*/
int send_current_message (void);
- /// Dequeue the next message, if any, and continue sending data
- /**
- * Once a message is completely sent, a new message is dequeued and
- * setup as the current message.
- *
- * Returns 0 if there is more data to send, -1 if there was an error
- * and 1 if the message was completely sent.
- */
- int dequeue_next_message (void);
+ /// Copy the contents of a message block into a Queued_Message
+ TAO_Queued_Message *copy_message_block (const ACE_Message_Block *mb);
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
@@ -585,10 +570,6 @@ protected:
/// Implement the outgoing data queue
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
-
- /// Once part of a message has been sent it is kept here until it is
- /// completely sent
- TAO_Queued_Message *current_message_;
};
#if defined (__ACE_INLINE__)