diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-20 22:34:09 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-04-20 22:34:09 +0000 |
commit | edfdfd2cef09f2527596ea2ebbd7e39df8229194 (patch) | |
tree | 2226f2ebf127bf87dfd11a79927fa06e71a8ff21 | |
parent | 9ba5857bc5cef79818a0d1f1e183a30286a292ce (diff) | |
download | ATCD-edfdfd2cef09f2527596ea2ebbd7e39df8229194.tar.gz |
ChangeLogTag:Fri Apr 20 15:29:13 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r-- | TAO/ChangeLogs/ChangeLog-02a | 20 | ||||
-rw-r--r-- | TAO/tao/Makefile | 3 | ||||
-rw-r--r-- | TAO/tao/Makefile.bor | 1 | ||||
-rw-r--r-- | TAO/tao/Queued_Message.h | 5 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 74 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 36 | ||||
-rw-r--r-- | TAO/tao/Transport.inl | 14 | ||||
-rw-r--r-- | TAO/tao/Transport_Timer.cpp | 19 | ||||
-rw-r--r-- | TAO/tao/Transport_Timer.h | 54 | ||||
-rw-r--r-- | TAO/tests/Oneway_Buffering/client.cpp | 136 | ||||
-rwxr-xr-x | TAO/tests/Oneway_Buffering/run_test.pl | 2 | ||||
-rwxr-xr-x | TAO/tests/Oneway_Buffering/run_timeout_reactive.pl | 65 |
12 files changed, 413 insertions, 16 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 978de868de0..777033f1620 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,23 @@ +Fri Apr 20 15:29:13 2001 Carlos O'Ryan <coryan@uci.edu> + + * tao/Makefile: + * tao/Makefile.bor: + * tao/Queued_Message.h: + * tao/Transport.h: + * tao/Transport.inl: + * tao/Transport.cpp: + * tao/Transport_Timer.h: + * tao/Transport_Timer.cpp: + Add support for timeout notifications from the Reactor. This is + used to implement the TIMEOUT buffering constraints when the + application does not send events continously. + + * tests/Oneway_Buffering/client.cpp: + * tests/Oneway_Buffering/run_test.pl: + * tests/Oneway_Buffering/run_timeout_reactive.pl: + Add new regression test to verify that the TIMEOUT buffering + constraints work. + Thu Apr 19 14:17:38 2001 Carlos O'Ryan <coryan@uci.edu> * tao/Transport.h: diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index dd2c77e6e88..7c489d410ba 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -244,7 +244,8 @@ ORB_CORE_FILES = \ Reactive_Flushing_Strategy \ Queued_Message \ Synch_Queued_Message \ - Asynch_Queued_Message + Asynch_Queued_Message \ + Transport_Timer DYNAMIC_ANY_FILES = diff --git a/TAO/tao/Makefile.bor b/TAO/tao/Makefile.bor index d01942fd890..a00a2f9d06c 100644 --- a/TAO/tao/Makefile.bor +++ b/TAO/tao/Makefile.bor @@ -132,6 +132,7 @@ OBJFILES = \ $(OBJDIR)\Parser_Registry.obj \ $(OBJDIR)\Pluggable.obj \ $(OBJDIR)\Transport.obj \ + $(OBJDIR)\Transport_Timer.obj \ $(OBJDIR)\Pluggable_Messaging.obj \ $(OBJDIR)\Pluggable_Messaging_Utils.obj \ $(OBJDIR)\Policy_Manager.obj \ diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 2ae2d67a25a..11eb9861bfc 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -64,11 +64,6 @@ class TAO_Export TAO_Queued_Message { public: /// Constructor - /** - * @param callback A callback interface to signal any waiting - * threads about the status of the message. It is null if there are - * no waiting threads. - */ TAO_Queued_Message (void); /// Destructor diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 08772832e6f..7741107b286 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -65,6 +65,8 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , head_ (0) , tail_ (0) , current_deadline_ (ACE_Time_Value::zero) + , flush_timer_id_ (-1) + , transport_timer_ (this) , id_ ((long) this) { TAO_Client_Strategy_Factory *cf = @@ -138,7 +140,25 @@ TAO_Transport::handle_output () flushing_strategy->cancel_output (this); - this->current_deadline_ = ACE_Time_Value::zero; + if (this->flush_timer_id_ != -1) + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } + this->current_deadline_ = ACE_Time_Value::zero; + this->flush_timer_id_ = -1; + } return 0; } @@ -849,6 +869,27 @@ TAO_Transport::cancel_output (void) } int +TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, + const void *act) +{ + /// This is the only legal ACT in the current configuration.... + if (act != &this->current_deadline_) + return -1; + + if (this->flush_timer_pending ()) + { + // The timer is always a oneshot timer, so mark is as not + // pending. + this->reset_flush_timer (); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + } + return 0; +} + +int TAO_Transport::drain_queue (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1); @@ -1043,13 +1084,34 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, set_timer, new_deadline); - // ... check if we need to set a timer ... + // ... set the new timer, also cancel any previous timers ... if (set_timer) { - this->current_deadline_ = new_deadline; - // @@ We need to schedule the timer. We should also be - // careful not to schedule one if there is one scheduled - // already. + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); + + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } } return constraints_reached; diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 51b6c010b43..1ed82676483 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -22,6 +22,7 @@ #include "Exception.h" #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" +#include "Transport_Timer.h" #include "ace/Strategies.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) @@ -616,6 +617,26 @@ public: /// Cancel handle_output() callbacks int cancel_output (void); + /// The timeout callback, invoked when any of the timers related to + /// this transport expire. + /** + * @param current_time The current time as reported from the Reactor + * @param act The Asynchronous Completion Token. Currently it is + * interpreted as follows: + * - If the ACT is the address of this->current_deadline_ the + * queueing timeout has expired and the queue should start + * flushing. + * + * @return Returns 0 if there are no problems, -1 if there is an + * error + * + * @todo In the future this function could be used to expire + * messages (oneways) that have been sitting for too long on + * the queue. + */ + int handle_timeout (const ACE_Time_Value ¤t_time, + const void* act); + private: /// Send some of the data in the queue. /** @@ -641,7 +662,7 @@ private: /// This class needs special access to drain_queue_i() and /// queue_is_empty_i() - friend class TAO_Block_Flushing_Strategy; + friend class TAO_Block_Flushing_Strategy; /// A helper routine used in drain_queue_i() int drain_queue_helper (int &iovcnt, iovec iov[]); @@ -666,6 +687,13 @@ private: int send_synchronous_message_i (const ACE_Message_Block *message_block, ACE_Time_Value *max_wait_time); + /// Check if the flush timer is still pending + int flush_timer_pending (void) const; + + /// The flush timer expired or was explicitly cancelled, mark it as + /// not pending + void reset_flush_timer (void); + /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) @@ -720,6 +748,12 @@ protected: /// *if* the deadline is ACE_Time_Value current_deadline_; + /// The timer ID + long flush_timer_id_; + + /// The adapter used to receive timeout callbacks from the Reactor + TAO_Transport_Timer transport_timer_; + /// Lock that insures that activities that *might* use handler-related /// resources (such as a connection handler) get serialized. /** diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl index ccace134399..e35d8836b3b 100644 --- a/TAO/tao/Transport.inl +++ b/TAO/tao/Transport.inl @@ -50,3 +50,17 @@ TAO_Transport::cache_map_entry ( { this->cache_map_entry_ = entry; } + +ACE_INLINE int +TAO_Transport::flush_timer_pending (void) const +{ + return this->flush_timer_id_ != -1; +} + +ACE_INLINE void +TAO_Transport::reset_flush_timer (void) +{ + this->flush_timer_id_ = -1; + this->current_deadline_ = ACE_Time_Value::zero; +} + diff --git a/TAO/tao/Transport_Timer.cpp b/TAO/tao/Transport_Timer.cpp new file mode 100644 index 00000000000..709a98414c8 --- /dev/null +++ b/TAO/tao/Transport_Timer.cpp @@ -0,0 +1,19 @@ +// -*- C++ -*- +// $Id$ + +#include "Transport_Timer.h" +#include "Transport.h" + +ACE_RCSID(tao, Transport_Timer, "$Id$") + +TAO_Transport_Timer::TAO_Transport_Timer (TAO_Transport *transport) + : transport_ (transport) +{ +} + +int +TAO_Transport_Timer::handle_timeout (const ACE_Time_Value ¤t_time, + const void *act) +{ + return this->transport_->handle_timeout (current_time, act); +} diff --git a/TAO/tao/Transport_Timer.h b/TAO/tao/Transport_Timer.h new file mode 100644 index 00000000000..e1e2d7b070c --- /dev/null +++ b/TAO/tao/Transport_Timer.h @@ -0,0 +1,54 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Transport_Timer.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ +//============================================================================= + +#ifndef TAO_TRANSPORT_TIMER_H +#define TAO_TRANSPORT_TIMER_H +#include "ace/pre.h" + +#include "TAO_Export.h" +#include "ace/Event_Handler.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_Transport; + +/** + * @class TAO_Transport_Timer + * + * @brief Allows TAO_Transport instances to receive timeout + * notifications from the Reactor. In other words, implements + * the Adapter Role, of the Adapter Pattern, where the Adaptee + * is a TAO_Transport and the client is the Reactor. + * + */ +class TAO_Export TAO_Transport_Timer : public ACE_Event_Handler +{ +public: + /// Constructor + /** + * @param transport The adaptee + */ + TAO_Transport_Timer (TAO_Transport *transport); + + /// Receive timeout events from the Reactor and forward them to the + /// TAO_Transport + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act); +private: + /// The Adaptee + TAO_Transport *transport_; +}; + +#include "ace/post.h" +#endif /* TAO_TRANSPORT_TIMER_H */ diff --git a/TAO/tests/Oneway_Buffering/client.cpp b/TAO/tests/Oneway_Buffering/client.cpp index c578cee3681..38b154f4ad7 100644 --- a/TAO/tests/Oneway_Buffering/client.cpp +++ b/TAO/tests/Oneway_Buffering/client.cpp @@ -13,6 +13,7 @@ int iterations = 200; int run_message_count_test = 0; int run_timeout_test = 0; +int run_timeout_reactive_test = 0; int run_buffer_size_test = 0; const int PAYLOAD_LENGTH = 1024; @@ -29,7 +30,7 @@ const double GIOP_OVERHEAD = 0.9; int parse_args (int argc, char *argv[]) { - ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctb"); + ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr"); int c; while ((c = get_opts ()) != -1) @@ -59,6 +60,10 @@ parse_args (int argc, char *argv[]) run_buffer_size_test = 1; break; + case 'r': + run_timeout_reactive_test = 1; + break; + case '?': default: ACE_ERROR_RETURN ((LM_ERROR, @@ -66,7 +71,7 @@ parse_args (int argc, char *argv[]) "-k <server_ior> " "-a <admin_ior> " "-i <iterations> " - "<-c|-t|-b> " + "<-c|-t|-b|-r> " "\n", argv [0]), -1); @@ -87,6 +92,12 @@ run_timeout (CORBA::ORB_ptr orb, CORBA::Environment &ACE_TRY_ENV); int +run_timeout_reactive (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin, + CORBA::Environment &ACE_TRY_ENV); + +int run_buffer_size (CORBA::ORB_ptr orb, Test::Oneway_Buffering_ptr oneway_buffering, Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin, @@ -159,6 +170,17 @@ main (int argc, char *argv[]) ACE_TRY_ENV); ACE_TRY_CHECK; } + else if (run_timeout_reactive_test) + { + ACE_DEBUG ((LM_DEBUG, + "Running timeout (reactive) flushing test\n")); + test_failed = + run_timeout_reactive (orb.in (), + oneway_buffering.in (), + oneway_buffering_admin.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } else if (run_buffer_size_test) { ACE_DEBUG ((LM_DEBUG, @@ -535,6 +557,116 @@ run_timeout (CORBA::ORB_ptr orb, } int +run_timeout_reactive (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, + Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin, + CORBA::Environment &ACE_TRY_ENV) +{ + TAO::BufferingConstraint buffering_constraint; + buffering_constraint.mode = TAO::BUFFER_TIMEOUT; + buffering_constraint.message_count = 0; + buffering_constraint.message_bytes = 0; + buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000; + + Test::Oneway_Buffering_var flusher; + int test_failed = + configure_policies (orb, buffering_constraint, + oneway_buffering, flusher.out (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (test_failed != 0) + return test_failed; + + Test::Payload payload (PAYLOAD_LENGTH); + payload.length (PAYLOAD_LENGTH); + for (int j = 0; j != PAYLOAD_LENGTH; ++j) + payload[j] = CORBA::Octet(j % 256); + + CORBA::ULong send_count = 0; + for (int i = 0; i != iterations; ++i) + { + // Get back in sync with the server... + flusher->flush (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + flusher->sync (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + CORBA::ULong initial_receive_count = + oneway_buffering_admin->request_count (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (initial_receive_count != send_count) + { + test_failed = 1; + ACE_ERROR ((LM_ERROR, + "ERROR: Iteration %d message lost (%u != %u)\n", + i, initial_receive_count, send_count)); + } + + ACE_Time_Value start = ACE_OS::gettimeofday (); + for (int j = 0; j != 20; ++j) + { + oneway_buffering->receive_data (payload, ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + send_count++; + } + while (1) + { + CORBA::ULong receive_count = + oneway_buffering_admin->request_count (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value sleep (0, 10000); + orb->run (sleep, ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; + if (receive_count != initial_receive_count) + { + if (elapsed.msec () < TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_ERROR ((LM_ERROR, + "ERROR: Iteration %d flush before " + "timeout expired. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + } + // terminate the while loop. + break; + } + + if (elapsed.msec () > 2 * TIMEOUT_MILLISECONDS) + { + test_failed = 1; + ACE_ERROR ((LM_ERROR, + "ERROR: Iteration %d no flush past " + "timeout threshold. " + "Elapsed = %d, Timeout = %d msecs\n", + i, + elapsed.msec (), TIMEOUT_MILLISECONDS)); + break; + } + } + } + + int progress_test_failed = + run_progress_test (oneway_buffering, + flusher.in (), + oneway_buffering_admin, + ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); + + if (progress_test_failed) + test_failed = 1; + + + return test_failed; +} + +int run_buffer_size (CORBA::ORB_ptr orb, Test::Oneway_Buffering_ptr oneway_buffering, Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin, diff --git a/TAO/tests/Oneway_Buffering/run_test.pl b/TAO/tests/Oneway_Buffering/run_test.pl index 30593090375..99a64dd9dd5 100755 --- a/TAO/tests/Oneway_Buffering/run_test.pl +++ b/TAO/tests/Oneway_Buffering/run_test.pl @@ -11,7 +11,7 @@ use PerlACE::Run_Test; $admin_iorfile = PerlACE::LocalFile ("admin.ior"); $server_iorfile = PerlACE::LocalFile ("server.ior"); -foreach $test_type ("-c", "-t", "-b") { +foreach $test_type ("-c", "-t", "-b", "-r") { unlink $admin_iorfile; unlink $server_iorfile; diff --git a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl new file mode 100755 index 00000000000..31f09190b88 --- /dev/null +++ b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl @@ -0,0 +1,65 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../bin'; +use PerlACE::Run_Test; + +$admin_iorfile = PerlACE::LocalFile ("admin.ior"); +$server_iorfile = PerlACE::LocalFile ("server.ior"); + +unlink $admin_iorfile; +unlink $server_iorfile; + +my $AD = new PerlACE::Process ("admin", "-o $admin_iorfile"); +my $SV = new PerlACE::Process ("server", "-o $server_iorfile"); +my $CL = new PerlACE::Process ("client", + " -k file://$server_iorfile " + ."-a file://$admin_iorfile " + ."-r "); + +$AD->Spawn (); + +if (PerlACE::waitforfile_timed ($admin_iorfile, 10) == -1) { + print STDERR "ERROR: cannot find file <$admin_iorfile>\n"; + $AD->Kill (); $AD->TimedWait (1); + exit 1; +} + +$SV->Spawn (); + +if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { + print STDERR "ERROR: cannot find file <$server_iorfile>\n"; + $AD->Kill (); $AD->TimedWait (1); + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$client = $CL->SpawnWaitKill (300); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +$server = $SV->WaitKill (10); + +if ($server != 0) { + print STDERR "ERROR: server returned $server\n"; + $status = 1; +} + +$admin = $AD->WaitKill (10); + +if ($admin != 0) { + print STDERR "ERROR: admin returned $admin\n"; + $status = 1; +} + +unlink $server_iorfile; +unlink $admin_iorfile; + +exit $status; |