From 1dddc51e6ff3e372eabb661bda47956d7a1a0c5e Mon Sep 17 00:00:00 2001 From: coryan Date: Thu, 5 Apr 2001 18:45:14 +0000 Subject: ChangeLogTag:Thu Apr 05 10:36:57 2001 Carlos O'Ryan --- TAO/ChangeLogs/ChangeLog-02a | 27 +++++++++++ TAO/tao/Block_Flushing_Strategy.cpp | 2 +- TAO/tao/Queued_Message.h | 3 -- TAO/tao/Reactive_Flushing_Strategy.cpp | 3 +- TAO/tao/Transport.cpp | 82 +++++++++++++++------------------- TAO/tests/Big_Oneways/Session.cpp | 29 +++++++++++- TAO/tests/Big_Oneways/Session.h | 6 +++ TAO/tests/Big_Oneways/Test.idl | 3 ++ TAO/tests/Big_Oneways/server.cpp | 2 +- 9 files changed, 104 insertions(+), 53 deletions(-) diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 7dffcaeeff3..8a00270af23 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,30 @@ +Thu Apr 05 10:36:57 2001 Carlos O'Ryan + + * tao/Queued_Message.h: + Remove the declaration of the done() method, the semantics were + not clear and it was removed a couple of iterations ago. + + * tao/Block_Flushing_Strategy.cpp: + * tao/Reactive_Flushing_Strategy.cpp: + Wait until all_data_sent() returns. + + * tao/Transport.cpp: + Cleanup handle_output() no need to loop, the drain_queue() + method does that. + After trying to send a message and blocking the send_message_i() + method was not updating the new Asynch_Queued_Message with the + number of bytes sent. + drain_queue() loop was too complicated for its own sake. + + * tests/Big_Oneways/Session.h: + * tests/Big_Oneways/Test.idl: + * tests/Big_Oneways/Session.cpp: + Add methods to prime the connections among multiple clients. + + * tests/Big_Oneways/server.cpp: + Increase timeout for initial session registration. Important + for manual executions. + Wed Apr 4 10:53:27 2001 Carlos O'Ryan * tao/Transport.h: diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index bd49b02ad86..4703a5e6455 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -23,7 +23,7 @@ int TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, TAO_Queued_Message *msg) { - while (!msg->done ()) + while (!msg->all_data_sent ()) { int result = transport->handle_output (); if (result == -1) diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 4763a1efa83..b050d1a83c8 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -75,9 +75,6 @@ public: /// Destructor virtual ~TAO_Queued_Message (void); - /// Return 0 if the message has not been completely sent - int done (void) const; - /// The underlying connection has been closed, release resources and /// signal waiting threads. void connection_closed (void); diff --git a/TAO/tao/Reactive_Flushing_Strategy.cpp b/TAO/tao/Reactive_Flushing_Strategy.cpp index d76308de211..8add2b98b36 100644 --- a/TAO/tao/Reactive_Flushing_Strategy.cpp +++ b/TAO/tao/Reactive_Flushing_Strategy.cpp @@ -32,7 +32,8 @@ TAO_Reactive_Flushing_Strategy::flush_message (TAO_Transport *transport, ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { - while (!msg->done () && result > 0) + while (!msg->all_data_sent () + && result > 0) { result = orb_core->run (0, 1, ACE_TRY_ENV); ACE_TRY_CHECK; diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 7305809713c..814bd170f97 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -113,36 +113,30 @@ TAO_Transport::handle_output () this->id ())); } - int retval; - do + // The flushing strategy (potentially via the Reactor) wants to send + // more data, first check if there is a current message that needs + // more sending... + int retval = this->drain_queue (); + + if (TAO_debug_level > 4) { - // The reactor is asking us to send more data, first check if - // there is a current message that needs more sending: - retval = this->drain_queue (); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_output, " + "drain_queue returns %d/%d\n", + this->id (), + retval, errno)); + } - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_output, " - "drain_queue returns %d/%d\n", - this->id (), - retval, errno)); - } + if (retval == 1) + { + // ... there is no current message or it was completely + // sent, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); - if (retval == 1) - { - // ... there is no current message or it was completely - // sent, time to check the queue.... - // ... no more messages in the queue, cancel output... - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - - flushing_strategy->cancel_output (this); - return 0; - } - // ... on Win32 we must continue until we get EWOULDBLOCK ... + flushing_strategy->cancel_output (this); + return 0; } - while (retval > 0); // Any errors are returned directly to the Reactor return retval; @@ -403,6 +397,8 @@ TAO_Transport::send_message_i (TAO_Stub *stub, ace_mon.acquire (); } + ACE_ASSERT (synch_message.next () == 0); + ACE_ASSERT (synch_message.prev () == 0); synch_message.destroy (); return result; } @@ -424,6 +420,7 @@ TAO_Transport::send_message_i (TAO_Stub *stub, // ... insert at the head of the queue, we can use push_back() // because the queue is empty ... + queued_message->bytes_transferred (byte_count); queued_message->push_back (this->head_, this->tail_); // ... this is not a twoway. We must check if the buffering @@ -730,24 +727,15 @@ TAO_Transport::drain_queue (void) // We loop over all the elements in the queue ... TAO_Queued_Message *i = this->head_; - do + while (i != 0) { // ... each element fills the iovector ... i->fill_iov (IOV_MAX, iovcnt, iov); - // ... if the vector is not full we tack another message into - // the vector ... - if (iovcnt != IOV_MAX) - { - // Go for the next element in the list - i = i->next (); - continue; - } - - // ... time to send data because the vector is full. We need to - // loop because a single message can span multiple IOV_MAX - // elements ... - while (iovcnt == IOV_MAX) + // ... the vector is full, no choice but to send some data out. + // We need to loop because a single message can span multiple + // IOV_MAX elements ... + if (iovcnt == IOV_MAX) { size_t byte_count; @@ -772,14 +760,16 @@ TAO_Transport::drain_queue (void) return -1; } - if (this->head_ == 0) - break; - - /// Message may have been only partially sent... - i->fill_iov (IOV_MAX, iovcnt, iov); + // ... start over, how do we guarantee progress? Because if + // no bytes are sent send() can only return 0 or -1 + ACE_ASSERT (byte_count != 0); + i = this->head_; + continue; } + // ... notice that this line is only reached if there is still + // room in the iovector ... + i = i->next (); } - while (this->head_ != 0); size_t byte_count; ssize_t retval = diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp index d923ad779f8..409a01595b0 100644 --- a/TAO/tests/Big_Oneways/Session.cpp +++ b/TAO/tests/Big_Oneways/Session.cpp @@ -51,6 +51,9 @@ Session::svc (void) CORBA::ULong session_count = this->other_sessions_.length (); + this->validate_connections (ACE_TRY_ENV); + ACE_TRY_CHECK; + for (CORBA::ULong i = 0; i != this->message_count_; ++i) { #if 0 @@ -95,6 +98,25 @@ Session::svc (void) return 0; } +void +Session::validate_connections (CORBA::Environment &ACE_TRY_ENV) +{ + CORBA::ULong session_count = + this->other_sessions_.length (); + for (CORBA::ULong i = 0; i != 100; ++i) + { + for (CORBA::ULong j = 0; j != session_count; ++j) + { + ACE_TRY + { + this->other_sessions_[j]->ping (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + } + } +} + void Session::start (const Test::Session_List &other_sessions, CORBA::Environment &ACE_TRY_ENV) @@ -153,6 +175,11 @@ Session::start (const Test::Session_List &other_sessions, this->terminate (0, ACE_TRY_ENV); } +void +Session::ping (CORBA::Environment &) ACE_THROW_SPEC (()) +{ +} + void Session::receive_payload (const Test::Payload &the_payload, CORBA::Environment &ACE_TRY_ENV) @@ -176,7 +203,7 @@ Session::receive_payload (const Test::Payload &the_payload, if (this->expected_messages_ < 500) verbose = (this->expected_messages_ % 100 == 0); if (this->expected_messages_ < 100) - verbose = (this->expected_messages_ % 10 == 0); + verbose = (this->expected_messages_ % 10 == 0); #endif /* 0 */ if (this->expected_messages_ < 5) verbose = 1; diff --git a/TAO/tests/Big_Oneways/Session.h b/TAO/tests/Big_Oneways/Session.h index 2ed4501a940..71a05cbe5d1 100644 --- a/TAO/tests/Big_Oneways/Session.h +++ b/TAO/tests/Big_Oneways/Session.h @@ -42,6 +42,8 @@ public: Test::Already_Running, Test::No_Peers)); + virtual void ping (CORBA::Environment &) ACE_THROW_SPEC (()); + virtual void receive_payload (const Test::Payload &the_payload, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)); @@ -59,6 +61,10 @@ private: /// Return 1 if all the work in this session has been completed int more_work (void) const; + /// Make sure that all threads have connections avaiable to the + /// other sessions. + void validate_connections (CORBA::Environment &ACE_TRY_ENV); + private: /// Synchronize the internal state ACE_SYNCH_MUTEX mutex_; diff --git a/TAO/tests/Big_Oneways/Test.idl b/TAO/tests/Big_Oneways/Test.idl index e9825b3241d..60065cf85fc 100644 --- a/TAO/tests/Big_Oneways/Test.idl +++ b/TAO/tests/Big_Oneways/Test.idl @@ -60,6 +60,9 @@ module Test void start (in Session_List other_sessions) raises (Already_Running, No_Peers); + /// Ping the session, used to validate all connections + void ping (); + /// Receive the payload oneway void receive_payload (in Payload the_payload); diff --git a/TAO/tests/Big_Oneways/server.cpp b/TAO/tests/Big_Oneways/server.cpp index becf7ab3861..b09e4283d0c 100644 --- a/TAO/tests/Big_Oneways/server.cpp +++ b/TAO/tests/Big_Oneways/server.cpp @@ -115,7 +115,7 @@ main (int argc, char *argv[]) ACE_DEBUG ((LM_DEBUG, "Waiting for peers . . . ")); for (int i = 0; - i != 30 && !coordinator_impl->has_all_peers (); + i != 60 && !coordinator_impl->has_all_peers (); ++i) { ACE_Time_Value tv (1, 0); -- cgit v1.2.1