summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-05 18:45:14 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-05 18:45:14 +0000
commit1dddc51e6ff3e372eabb661bda47956d7a1a0c5e (patch)
tree7ed89d721e556cd5a4173ee192e25dae5d3d968d
parent41b3ffd124988015d833ef559802ffd7545dd774 (diff)
downloadATCD-1dddc51e6ff3e372eabb661bda47956d7a1a0c5e.tar.gz
ChangeLogTag:Thu Apr 05 10:36:57 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a27
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp2
-rw-r--r--TAO/tao/Queued_Message.h3
-rw-r--r--TAO/tao/Reactive_Flushing_Strategy.cpp3
-rw-r--r--TAO/tao/Transport.cpp82
-rw-r--r--TAO/tests/Big_Oneways/Session.cpp29
-rw-r--r--TAO/tests/Big_Oneways/Session.h6
-rw-r--r--TAO/tests/Big_Oneways/Test.idl3
-rw-r--r--TAO/tests/Big_Oneways/server.cpp2
9 files changed, 104 insertions, 53 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 7dffcaeeff3..8a00270af23 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,30 @@
+Thu Apr 05 10:36:57 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Queued_Message.h:
+ Remove the declaration of the done() method, the semantics were
+ not clear and it was removed a couple of iterations ago.
+
+ * tao/Block_Flushing_Strategy.cpp:
+ * tao/Reactive_Flushing_Strategy.cpp:
+ Wait until all_data_sent() returns.
+
+ * tao/Transport.cpp:
+ Cleanup handle_output() no need to loop, the drain_queue()
+ method does that.
+ After trying to send a message and blocking the send_message_i()
+ method was not updating the new Asynch_Queued_Message with the
+ number of bytes sent.
+ drain_queue() loop was too complicated for its own sake.
+
+ * tests/Big_Oneways/Session.h:
+ * tests/Big_Oneways/Test.idl:
+ * tests/Big_Oneways/Session.cpp:
+ Add methods to prime the connections among multiple clients.
+
+ * tests/Big_Oneways/server.cpp:
+ Increase timeout for initial session registration. Important
+ for manual executions.
+
Wed Apr 4 10:53:27 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Transport.h:
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
index bd49b02ad86..4703a5e6455 100644
--- a/TAO/tao/Block_Flushing_Strategy.cpp
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -23,7 +23,7 @@ int
TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport,
TAO_Queued_Message *msg)
{
- while (!msg->done ())
+ while (!msg->all_data_sent ())
{
int result = transport->handle_output ();
if (result == -1)
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 4763a1efa83..b050d1a83c8 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -75,9 +75,6 @@ public:
/// Destructor
virtual ~TAO_Queued_Message (void);
- /// Return 0 if the message has not been completely sent
- int done (void) const;
-
/// The underlying connection has been closed, release resources and
/// signal waiting threads.
void connection_closed (void);
diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp
index d76308de211..8add2b98b36 100644
--- a/TAO/tao/Reactive_Flushing_Strategy.cpp
+++ b/TAO/tao/Reactive_Flushing_Strategy.cpp
@@ -32,7 +32,8 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport,
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
- while (!msg->done () && result > 0)
+ while (!msg->all_data_sent ()
+ && result > 0)
{
result = orb_core->run (0, 1, ACE_TRY_ENV);
ACE_TRY_CHECK;
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 7305809713c..814bd170f97 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -113,36 +113,30 @@ TAO_Transport::handle_output ()
this->id ()));
}
- int retval;
- do
+ // The flushing strategy (potentially via the Reactor) wants to send
+ // more data, first check if there is a current message that needs
+ // more sending...
+ int retval = this->drain_queue ();
+
+ if (TAO_debug_level > 4)
{
- // The reactor is asking us to send more data, first check if
- // there is a current message that needs more sending:
- retval = this->drain_queue ();
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_output, "
+ "drain_queue returns %d/%d\n",
+ this->id (),
+ retval, errno));
+ }
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_output, "
- "drain_queue returns %d/%d\n",
- this->id (),
- retval, errno));
- }
+ if (retval == 1)
+ {
+ // ... there is no current message or it was completely
+ // sent, cancel output...
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
- if (retval == 1)
- {
- // ... there is no current message or it was completely
- // sent, time to check the queue....
- // ... no more messages in the queue, cancel output...
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
- flushing_strategy->cancel_output (this);
- return 0;
- }
- // ... on Win32 we must continue until we get EWOULDBLOCK ...
+ flushing_strategy->cancel_output (this);
+ return 0;
}
- while (retval > 0);
// Any errors are returned directly to the Reactor
return retval;
@@ -403,6 +397,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
ace_mon.acquire ();
}
+ ACE_ASSERT (synch_message.next () == 0);
+ ACE_ASSERT (synch_message.prev () == 0);
synch_message.destroy ();
return result;
}
@@ -424,6 +420,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub,
// ... insert at the head of the queue, we can use push_back()
// because the queue is empty ...
+ queued_message->bytes_transferred (byte_count);
queued_message->push_back (this->head_, this->tail_);
// ... this is not a twoway. We must check if the buffering
@@ -730,24 +727,15 @@ TAO_Transport::drain_queue (void)
// We loop over all the elements in the queue ...
TAO_Queued_Message *i = this->head_;
- do
+ while (i != 0)
{
// ... each element fills the iovector ...
i->fill_iov (IOV_MAX, iovcnt, iov);
- // ... if the vector is not full we tack another message into
- // the vector ...
- if (iovcnt != IOV_MAX)
- {
- // Go for the next element in the list
- i = i->next ();
- continue;
- }
-
- // ... time to send data because the vector is full. We need to
- // loop because a single message can span multiple IOV_MAX
- // elements ...
- while (iovcnt == IOV_MAX)
+ // ... the vector is full, no choice but to send some data out.
+ // We need to loop because a single message can span multiple
+ // IOV_MAX elements ...
+ if (iovcnt == IOV_MAX)
{
size_t byte_count;
@@ -772,14 +760,16 @@ TAO_Transport::drain_queue (void)
return -1;
}
- if (this->head_ == 0)
- break;
-
- /// Message <i> may have been only partially sent...
- i->fill_iov (IOV_MAX, iovcnt, iov);
+ // ... start over, how do we guarantee progress? Because if
+ // no bytes are sent send() can only return 0 or -1
+ ACE_ASSERT (byte_count != 0);
+ i = this->head_;
+ continue;
}
+ // ... notice that this line is only reached if there is still
+ // room in the iovector ...
+ i = i->next ();
}
- while (this->head_ != 0);
size_t byte_count;
ssize_t retval =
diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp
index d923ad779f8..409a01595b0 100644
--- a/TAO/tests/Big_Oneways/Session.cpp
+++ b/TAO/tests/Big_Oneways/Session.cpp
@@ -51,6 +51,9 @@ Session::svc (void)
CORBA::ULong session_count =
this->other_sessions_.length ();
+ this->validate_connections (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
for (CORBA::ULong i = 0; i != this->message_count_; ++i)
{
#if 0
@@ -96,6 +99,25 @@ Session::svc (void)
}
void
+Session::validate_connections (CORBA::Environment &ACE_TRY_ENV)
+{
+ CORBA::ULong session_count =
+ this->other_sessions_.length ();
+ for (CORBA::ULong i = 0; i != 100; ++i)
+ {
+ for (CORBA::ULong j = 0; j != session_count; ++j)
+ {
+ ACE_TRY
+ {
+ this->other_sessions_[j]->ping (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+ }
+ }
+}
+
+void
Session::start (const Test::Session_List &other_sessions,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException,
@@ -154,6 +176,11 @@ Session::start (const Test::Session_List &other_sessions,
}
void
+Session::ping (CORBA::Environment &) ACE_THROW_SPEC (())
+{
+}
+
+void
Session::receive_payload (const Test::Payload &the_payload,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
@@ -176,7 +203,7 @@ Session::receive_payload (const Test::Payload &the_payload,
if (this->expected_messages_ < 500)
verbose = (this->expected_messages_ % 100 == 0);
if (this->expected_messages_ < 100)
- verbose = (this->expected_messages_ % 10 == 0);
+ verbose = (this->expected_messages_ % 10 == 0);
#endif /* 0 */
if (this->expected_messages_ < 5)
verbose = 1;
diff --git a/TAO/tests/Big_Oneways/Session.h b/TAO/tests/Big_Oneways/Session.h
index 2ed4501a940..71a05cbe5d1 100644
--- a/TAO/tests/Big_Oneways/Session.h
+++ b/TAO/tests/Big_Oneways/Session.h
@@ -42,6 +42,8 @@ public:
Test::Already_Running,
Test::No_Peers));
+ virtual void ping (CORBA::Environment &) ACE_THROW_SPEC (());
+
virtual void receive_payload (const Test::Payload &the_payload,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException));
@@ -59,6 +61,10 @@ private:
/// Return 1 if all the work in this session has been completed
int more_work (void) const;
+ /// Make sure that all threads have connections avaiable to the
+ /// other sessions.
+ void validate_connections (CORBA::Environment &ACE_TRY_ENV);
+
private:
/// Synchronize the internal state
ACE_SYNCH_MUTEX mutex_;
diff --git a/TAO/tests/Big_Oneways/Test.idl b/TAO/tests/Big_Oneways/Test.idl
index e9825b3241d..60065cf85fc 100644
--- a/TAO/tests/Big_Oneways/Test.idl
+++ b/TAO/tests/Big_Oneways/Test.idl
@@ -60,6 +60,9 @@ module Test
void start (in Session_List other_sessions)
raises (Already_Running, No_Peers);
+ /// Ping the session, used to validate all connections
+ void ping ();
+
/// Receive the payload
oneway void receive_payload (in Payload the_payload);
diff --git a/TAO/tests/Big_Oneways/server.cpp b/TAO/tests/Big_Oneways/server.cpp
index becf7ab3861..b09e4283d0c 100644
--- a/TAO/tests/Big_Oneways/server.cpp
+++ b/TAO/tests/Big_Oneways/server.cpp
@@ -115,7 +115,7 @@ main (int argc, char *argv[])
ACE_DEBUG ((LM_DEBUG, "Waiting for peers . . . "));
for (int i = 0;
- i != 30 && !coordinator_impl->has_all_peers ();
+ i != 60 && !coordinator_impl->has_all_peers ();
++i)
{
ACE_Time_Value tv (1, 0);