diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2009-04-25 11:34:50 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2009-04-25 11:34:50 +0000 |
commit | e6e16136e56c0277b7c25ad22ad2f4f1e155c4aa (patch) | |
tree | 7b985a05db795192f577ef9980356f7fe2f530e5 | |
parent | 3df1c42feb0bd3296c31d80e84d3419e7f34b5d7 (diff) | |
download | ATCD-e6e16136e56c0277b7c25ad22ad2f4f1e155c4aa.tar.gz |
Sat Apr 25 11:31:50 UTC 2009 Carlos O'Ryan <coryan@atdesk.com>
-rw-r--r-- | TAO/ChangeLog | 35 | ||||
-rw-r--r-- | TAO/tao/Block_Flushing_Strategy.cpp | 12 | ||||
-rw-r--r-- | TAO/tao/Connection_Handler.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/GIOP_Message_Base.cpp | 19 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 83 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 115 | ||||
-rw-r--r-- | TAO/tests/Bug_3647_Regression/Backend_Impl.cpp | 1 | ||||
-rw-r--r-- | TAO/tests/Bug_3647_Regression/Middle_Impl.cpp | 2 | ||||
-rw-r--r-- | TAO/tests/Bug_3647_Regression/client.cpp | 2 | ||||
-rwxr-xr-x | TAO/tests/Bug_3647_Regression/run_test.pl | 28 |
10 files changed, 235 insertions, 66 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index bfc3d778156..568e377d302 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,38 @@ +Sat Apr 25 11:31:50 UTC 2009 Carlos O'Ryan <coryan@atdesk.com> + + * tao/Transport.h: + * tao/Transport.cpp: + * tao/GIOP_Message_Base.cpp: + * tao/Block_Flushing_Strategy.cpp: + * tao/Connection_Handler.cpp: + Seemingly completed the fixes for 3647. + Fundamentally, the calls to sendv() need to use a timeout + parameter when called with the blocking flushing strategy or + with the read-write waiting strategy *and* when there is a + timeout. + Unfortunately, the point(s) where we call sendv() does not have + enough context to determine if the parameter is needed. + I changed the Transport class to pass a little struct with both + the timeout value and flag to indicate if using blocking I/O + calls was desired. + The caller makes the determination and passes the parameter into + the Transport object, for example, the Block_Flushing_Strategy + certainly wants to use blocking I/O calls. + Several interface in TAO_Transport changed, and so did its + callers. + + * tests/Bug_3647_Regression/client.cpp: + * tests/Bug_3647_Regression/Middle_Impl.cpp: + * tests/Bug_3647_Regression/Backend_Impl.cpp: + * tests/Bug_3647_Regression/run_test.pl: + Fine-tune the test so it would pass all the time. The default + parameters showed the problem before the changes, but then + failed due to a timeout during shutdown. + Also expanded run_test.pl to test with SYNC_NONE vs. other + policies. It was important to me to verify that the test + continues to fail with SYNC_WITH_SERVER, so my "fine tuning" did + not hide real errors. + Fri Apr 24 13:57:18 UTC 2009 Carlos O'Ryan <coryan@atdesk.com> * tao/Transport.h: diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index f6ef164b948..397e4717006 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -27,19 +27,23 @@ TAO_Block_Flushing_Strategy::flush_message (TAO_Transport *transport, { while (!msg->all_data_sent ()) { - if (transport->handle_output (max_wait_time) == -1) + TAO::Transport::Drain_Constraints dc( + max_wait_time, true); + if (transport->handle_output (dc) == -1) return -1; } return 0; } int -TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport - , ACE_Time_Value *max_wait_time) +TAO_Block_Flushing_Strategy::flush_transport (TAO_Transport *transport, + ACE_Time_Value *max_wait_time) { while (!transport->queue_is_empty ()) { - if (transport->handle_output (max_wait_time) == -1) + TAO::Transport::Drain_Constraints dc( + max_wait_time, true); + if (transport->handle_output (dc) == -1) return -1; } return 0; diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index 852e8557e5b..269ffd38024 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -206,7 +206,9 @@ TAO_Connection_Handler::handle_output_eh ( return return_value; } - return_value = this->transport ()->handle_output (0); + // The default constraints are to never block. + TAO::Transport::Drain_Constraints dc; + return_value = this->transport ()->handle_output (dc); this->pos_io_hook (return_value); diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 6e3d3fbf3bc..56ff8f1c3a2 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -1437,25 +1437,6 @@ TAO_GIOP_Message_Base:: TAO_GIOP_MESSAGE_HEADER_LEN); } -#if 0 - // @@CJC I don't think we need this check b/c the transport's send() - // will simply return -1. However, I guess we could create something - // like TAO_Tranport::is_closed() that returns whether the connection - // is already closed. The problem with that, however, is that it's - // entirely possible that is_closed() could return TRUE, and then the - // transport could get closed down btw. the time it gets called and the - // time that the send actually occurs. - ACE_HANDLE which = transport->handle (); - if (which == ACE_INVALID_HANDLE) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -") - ACE_TEXT (" connection already closed\n"))); - return; - } -#endif - ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN, ACE_Message_Block::MB_DATA, close_message, diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index ab79d3ef08d..2d1716e32b6 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -401,7 +401,7 @@ TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */, iovec * iov, int iovcnt, size_t &bytes_transferred, - ACE_Time_Value const * timeout) + TAO::Transport::Drain_Constraints const & dc) { // Concrete pluggable transport doesn't implement sendfile(). // Fallback on TAO_Transport::send(). @@ -411,7 +411,7 @@ TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */, // specific configuration out of this base class method. // -Ossama return this->send (iov, iovcnt, bytes_transferred, - this->io_timeout (timeout)); + this->io_timeout (dc)); } #endif /* TAO_HAS_SENDFILE==1 */ @@ -522,19 +522,23 @@ TAO_Transport::update_transport (void) * */ int -TAO_Transport::handle_output (ACE_Time_Value *max_wait_time) +TAO_Transport::handle_output (TAO::Transport::Drain_Constraints const & dc) { if (TAO_debug_level > 3) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output\n"), - this->id ())); + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output" + " - block_on_io=%d, timeout=%d.%06d\n"), + this->id (), + dc.block_on_io(), + dc.timeout() ? dc.timeout()->sec() : -1, + dc.timeout() ? dc.timeout()->usec() : -1 )); } // The flushing strategy (potentially via the Reactor) wants to send // more data, first check if there is a current message that needs // more sending... - int const retval = this->drain_queue (max_wait_time); + int const retval = this->drain_queue (dc); if (TAO_debug_level > 3) { @@ -567,15 +571,18 @@ TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + TAO::Transport::Drain_Constraints dc( + max_wait_time, true); + return this->send_message_block_chain_i (mb, bytes_transferred, - max_wait_time); + dc); } int TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, size_t &bytes_transferred, - ACE_Time_Value *max_wait_time) + TAO::Transport::Drain_Constraints const & dc) { size_t const total_length = mb->total_length (); @@ -585,7 +592,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = this->drain_queue_i (max_wait_time); + int const n = this->drain_queue_i (dc); if (n == -1) { @@ -770,7 +777,10 @@ int TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message, ACE_Time_Value * max_wait_time) { - int const n = this->drain_queue_i (max_wait_time); + TAO::Transport::Drain_Constraints dc( + max_wait_time, this->using_blocking_io_for_synch_messages()); + + int const n = this->drain_queue_i (dc); if (n == -1) { @@ -891,10 +901,10 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, } int -TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time) +TAO_Transport::drain_queue (TAO::Transport::Drain_Constraints const & dc) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - int const retval = this->drain_queue_i (max_wait_time); + int const retval = this->drain_queue_i (dc); if (retval == 1) { @@ -912,10 +922,10 @@ TAO_Transport::drain_queue (ACE_Time_Value *max_wait_time) } int -TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time) +TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], + TAO::Transport::Drain_Constraints const & dc) { size_t byte_count = 0; - ACE_Countdown_Time countdown (max_wait_time); // ... send the message ... ssize_t retval = -1; @@ -925,11 +935,12 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max retval = this->sendfile (this->mmap_allocator_, iov, iovcnt, - byte_count); + byte_count, + dc); else #endif /* TAO_HAS_SENDFILE==1 */ retval = this->send (iov, iovcnt, byte_count, - this->io_timeout (max_wait_time)); + this->io_timeout (dc)); if (TAO_debug_level == 5) { @@ -990,7 +1001,7 @@ TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max } int -TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time) +TAO_Transport::drain_queue_i (TAO::Transport::Drain_Constraints const & dc) { // This is the vector used to send data, it must be declared outside // the loop because after the loop there may still be data to be @@ -1041,8 +1052,7 @@ TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time) // IOV_MAX elements ... if (iovcnt == ACE_IOV_MAX) { - int const retval = this->drain_queue_helper (iovcnt, iov, - max_wait_time); + int const retval = this->drain_queue_helper (iovcnt, iov, dc); now = ACE_High_Res_Timer::gettimeofday_hr (); @@ -1069,7 +1079,7 @@ TAO_Transport::drain_queue_i (ACE_Time_Value *max_wait_time) if (iovcnt != 0) { - int const retval = this->drain_queue_helper (iovcnt, iov, max_wait_time); + int const retval = this->drain_queue_helper (iovcnt, iov, dc); if (TAO_debug_level > 4) { @@ -1347,6 +1357,9 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, bool partially_sent = false; bool timeout_encountered = false; + TAO::Transport::Drain_Constraints dc( + max_wait_time, this->using_blocking_io_for_asynch_messages()); + if (try_sending_first) { ssize_t n = 0; @@ -1369,7 +1382,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, // code I will re-visit this decision n = this->send_message_block_chain_i (message_block, byte_count, - max_wait_time); + dc); if (n == -1) { @@ -1854,7 +1867,7 @@ TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh, // Read the message into the existing message block on heap ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(), recv_size, - this->io_timeout (max_wait_time)); + max_wait_time); if (n <= 0) { @@ -2070,7 +2083,7 @@ TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh, // the stack. ssize_t const n = this->recv (message_block.wr_ptr (), recv_size, - this->io_timeout (max_wait_time)); + max_wait_time); // If there is an error return to the reactor.. if (n <= 0) @@ -2759,13 +2772,33 @@ TAO_Transport::set_bidir_context_info (TAO_Operation_Details &) ACE_Time_Value const * TAO_Transport::io_timeout( - ACE_Time_Value const * operation_timeout) const + TAO::Transport::Drain_Constraints const & dc) const { + if (dc.block_on_io()) + { + return dc.timeout(); + } if (this->wait_strategy()->can_process_upcalls()) { return 0; } - return operation_timeout; + return dc.timeout(); +} + +bool +TAO_Transport::using_blocking_io_for_synch_messages() const +{ + if (this->wait_strategy()->can_process_upcalls()) + { + return false; + } + return true; +} + +bool +TAO_Transport::using_blocking_io_for_asynch_messages() const +{ + return false; } /* diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index eaf07814cec..689b52ef7ee 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -69,6 +69,80 @@ namespace TAO /// Transport-level statistics. Initially introduced to support /// the "Transport Current" functionality. class Stats; + + /** + * @struct Drain_Constraints + * + * @brief Encapsulate the flushing control parameters. + * + * At several points, the ORB needs to flush data from a transport to the + * underlying I/O mechanisms. How this data is flushed depends on the + * context where the request is made, the ORB configuration and the + * application level policies in effect. + * + * Some examples: + * + * # When idle, the ORB will want to send data on any socket that has + * space available. In this case, the queue must be drained on + * a best-effort basis, without any blocking. + * # If the ORB is configured to handle nested upcalls, any two-way + * request should block and push data to the underlying socket as fast + * as possible. + * # In the same use-case, but now with a timeout policy in + * effect, the ORB will need to send the data use I/O operations with + * timeouts (as implemented by ACE::sendv() + * # When the ORB is configured to support nested upcalls, any two-way, + * reliable oneway or similar should wait using the reactor or + * Leader-Follower implementation. While still respecting the timeout + * policies. + * + * Instead of sprinkling if() statements throughput the critical path + * trying to determine how the I/O operations should be performed, we + * pass the information encapsulated in this class. The caller into the + * Transport object determines the right parameters to use, and the + * Transport object simply obeys those instructions. + */ + class Drain_Constraints : private ACE_Copy_Disabled + { + public: + /// Default constructor + Drain_Constraints() + : timeout_(0) + , block_on_io_(false) + { + } + + /// Constructor + Drain_Constraints( + ACE_Time_Value const * timeout, + bool block_on_io) + : timeout_(timeout) + , block_on_io_(block_on_io) + { + } + + /** + * If true, then the ORB should block on I/O operations instead of + * using non-blocking I/O. + */ + bool block_on_io() const + { + return block_on_io_; + } + + /** + * The maximum time to block on I/O operations (or nested loops) based + * on the current timeout policies. + */ + ACE_Time_Value const * timeout() const + { + return timeout_; + } + + private: + ACE_Time_Value const * timeout_; + bool block_on_io_; + }; } } @@ -291,7 +365,7 @@ public: TAO_Wait_Strategy *wait_strategy (void) const; /// Callback method to reactively drain the outgoing data queue - int handle_output (ACE_Time_Value *max_wait_time); + int handle_output (TAO::Transport::Drain_Constraints const & c); /// Get the bidirectional flag int bidirectional_flag (void) const; @@ -412,7 +486,7 @@ public: virtual ssize_t send (iovec *iov, int iovcnt, size_t &bytes_transferred, - const ACE_Time_Value *timeout = 0) = 0; + ACE_Time_Value const * timeout) = 0; #if TAO_HAS_SENDFILE == 1 /// Send data through zero-copy write mechanism, if available. @@ -429,7 +503,7 @@ public: iovec * iov, int iovcnt, size_t &bytes_transferred, - ACE_Time_Value const * timeout = 0); + TAO::Transport::Drain_Constraints const & dc); #endif /* TAO_HAS_SENDFILE==1 */ @@ -709,7 +783,14 @@ public: ACE_Time_Value *max_wait_time, TAO_Stub* stub); - /// Send a message block chain, + /** + * This is a very specialized interface to send a simple chain of + * messages through the Transport. The only place we use this interface + * is in GIOP_Message_Base.cpp, to send error messages (i.e., an + * indication that we received a malformed GIOP message,) and to close + * the connection. + * + */ int send_message_block_chain (const ACE_Message_Block *message_block, size_t &bytes_transferred, ACE_Time_Value *max_wait_time = 0); @@ -717,7 +798,8 @@ public: /// Send a message block chain, assuming the lock is held int send_message_block_chain_i (const ACE_Message_Block *message_block, size_t &bytes_transferred, - ACE_Time_Value *max_wait_time); + TAO::Transport::Drain_Constraints const & dc); + /// Cache management int purge_entry (void); @@ -802,10 +884,10 @@ private: * Returns 0 if there is more data to send, -1 if there was an error * and 1 if the message was completely sent. */ - int drain_queue (ACE_Time_Value *max_wait_time); + int drain_queue (TAO::Transport::Drain_Constraints const & dc); /// Implement drain_queue() assuming the lock is held - int drain_queue_i (ACE_Time_Value *max_wait_time); + int drain_queue_i (TAO::Transport::Drain_Constraints const & dc); /// Check if there are messages pending in the queue /** @@ -817,7 +899,8 @@ private: bool queue_is_empty_i (void) const; /// A helper routine used in drain_queue_i() - int drain_queue_helper (int &iovcnt, iovec iov[], ACE_Time_Value *max_wait_time); + int drain_queue_helper (int &iovcnt, iovec iov[], + TAO::Transport::Drain_Constraints const & dc); /// These classes need privileged access to: /// - schedule_output_i() @@ -942,8 +1025,22 @@ private: * This function was introduced as part of the fixes for bug 3647. */ ACE_Time_Value const *io_timeout( - ACE_Time_Value const * operation_timeout) const; + TAO::Transport::Drain_Constraints const & dc) const; + /** + * Return true if blocking I/O should be used for sending synchronous + * (two-way, reliable oneways, etc.) messages. This is determined based + * on the current flushing and waiting strategies. + */ + bool using_blocking_io_for_synch_messages() const; + + /** + * Return true if blocking I/O should be used for sending asynchronous + * (AMI calls, non-blocking oneways, responses to operations, etc.) + * messages. This is determined based on the current flushing strategy. + */ + bool using_blocking_io_for_asynch_messages() const; + /* * Specialization hook to add concrete private methods from * TAO's protocol implementation onto the base Transport class diff --git a/TAO/tests/Bug_3647_Regression/Backend_Impl.cpp b/TAO/tests/Bug_3647_Regression/Backend_Impl.cpp index 2b8981d822c..435f8a505ee 100644 --- a/TAO/tests/Bug_3647_Regression/Backend_Impl.cpp +++ b/TAO/tests/Bug_3647_Regression/Backend_Impl.cpp @@ -54,6 +54,7 @@ freeze(CORBA::ULong seconds) "Backend_Impl::sleep(%P|%t) - finished after %d seconds\n", seconds)); } + shutdown(); } void Bug_3647_Regression::Backend_Impl:: diff --git a/TAO/tests/Bug_3647_Regression/Middle_Impl.cpp b/TAO/tests/Bug_3647_Regression/Middle_Impl.cpp index e90d156a9ba..dcd08d9c068 100644 --- a/TAO/tests/Bug_3647_Regression/Middle_Impl.cpp +++ b/TAO/tests/Bug_3647_Regression/Middle_Impl.cpp @@ -35,7 +35,7 @@ startup_test() "Middle_Impl::startup_test(%P|%t) - backend " "startup call sucessful\n")); } - backend_->freeze(5*timeout_); + backend_->freeze(10); if (verbose_) { ACE_DEBUG ((LM_INFO, diff --git a/TAO/tests/Bug_3647_Regression/client.cpp b/TAO/tests/Bug_3647_Regression/client.cpp index ec8f78fc69c..830ef6d3ab3 100644 --- a/TAO/tests/Bug_3647_Regression/client.cpp +++ b/TAO/tests/Bug_3647_Regression/client.cpp @@ -117,7 +117,7 @@ ACE_TMAIN(int argc, ACE_TCHAR *argv[]) ACE_DEBUG ((LM_DEBUG, "client(%P|%t) - test started up\n")); int const iterations = 10000; - int const interval = 10; + int const interval = iterations / 20; ACE_DEBUG ((LM_DEBUG, "client(%P|%t) - running pings")); for (int i = 0; i != iterations; ++i) { diff --git a/TAO/tests/Bug_3647_Regression/run_test.pl b/TAO/tests/Bug_3647_Regression/run_test.pl index 14d839d32a8..acac010b251 100755 --- a/TAO/tests/Bug_3647_Regression/run_test.pl +++ b/TAO/tests/Bug_3647_Regression/run_test.pl @@ -10,11 +10,26 @@ use PerlACE::TestTarget; use strict; my $verbose = ''; +my $mode = 'DELAYED'; foreach my $i (@ARGV) { - if ($i eq '-verbose') { - $verbose = ' -v'; - } + if ($i eq '-verbose') { + $verbose = ' -v'; + } elsif ($i eq '-none') { + $mode = 'NONE'; + } elsif ($i eq '-delayed') { + $mode = 'DELAYED'; + } elsif ($i eq '-transport') { + # In this mode, the test is *expected* to fail. We only run it + # like this to verify that the test is a good test (i.e. it + # detects failures.) Same comment applies for SERVER and TARGET + # modes. + $mode = 'TRANSPORT'; + } elsif ($i eq '-server') { + $mode = 'SERVER'; + } elsif ($i eq '-target') { + $mode = 'TARGET'; + } } my $backend = PerlACE::TestTarget::create_target(1) @@ -42,14 +57,15 @@ my $BE = " -o $backend_iorfile" . $verbose); my $MD = - $middle->CreateProcess ("/usr/bin/strace"," -o md.strace.txt ./middle_server". - " -s DELAYED -t 300 " + $middle->CreateProcess (#"/usr/bin/strace"," -o md.strace.txt ./middle_server". + "middle_server", + " -s $mode -t 5 " ." -o $middle_out_iorfile" . $verbose . " -k file://$middle_in_iorfile"); my $CL = $client->CreateProcess ("client", " -k file://$client_in_iorfile" - ." -t 600 " + ." -t 1 " .$verbose); my $be_status = $BE->Spawn (); if ($be_status != 0) { |