summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-20 22:34:09 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-04-20 22:34:09 +0000
commitedfdfd2cef09f2527596ea2ebbd7e39df8229194 (patch)
tree2226f2ebf127bf87dfd11a79927fa06e71a8ff21
parent9ba5857bc5cef79818a0d1f1e183a30286a292ce (diff)
downloadATCD-edfdfd2cef09f2527596ea2ebbd7e39df8229194.tar.gz
ChangeLogTag:Fri Apr 20 15:29:13 2001 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a20
-rw-r--r--TAO/tao/Makefile3
-rw-r--r--TAO/tao/Makefile.bor1
-rw-r--r--TAO/tao/Queued_Message.h5
-rw-r--r--TAO/tao/Transport.cpp74
-rw-r--r--TAO/tao/Transport.h36
-rw-r--r--TAO/tao/Transport.inl14
-rw-r--r--TAO/tao/Transport_Timer.cpp19
-rw-r--r--TAO/tao/Transport_Timer.h54
-rw-r--r--TAO/tests/Oneway_Buffering/client.cpp136
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_test.pl2
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_timeout_reactive.pl65
12 files changed, 413 insertions, 16 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 978de868de0..777033f1620 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,23 @@
+Fri Apr 20 15:29:13 2001 Carlos O'Ryan <coryan@uci.edu>
+
+ * tao/Makefile:
+ * tao/Makefile.bor:
+ * tao/Queued_Message.h:
+ * tao/Transport.h:
+ * tao/Transport.inl:
+ * tao/Transport.cpp:
+ * tao/Transport_Timer.h:
+ * tao/Transport_Timer.cpp:
+ Add support for timeout notifications from the Reactor. This is
+ used to implement the TIMEOUT buffering constraints when the
+ application does not send events continously.
+
+ * tests/Oneway_Buffering/client.cpp:
+ * tests/Oneway_Buffering/run_test.pl:
+ * tests/Oneway_Buffering/run_timeout_reactive.pl:
+ Add new regression test to verify that the TIMEOUT buffering
+ constraints work.
+
Thu Apr 19 14:17:38 2001 Carlos O'Ryan <coryan@uci.edu>
* tao/Transport.h:
diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile
index dd2c77e6e88..7c489d410ba 100644
--- a/TAO/tao/Makefile
+++ b/TAO/tao/Makefile
@@ -244,7 +244,8 @@ ORB_CORE_FILES = \
Reactive_Flushing_Strategy \
Queued_Message \
Synch_Queued_Message \
- Asynch_Queued_Message
+ Asynch_Queued_Message \
+ Transport_Timer
DYNAMIC_ANY_FILES =
diff --git a/TAO/tao/Makefile.bor b/TAO/tao/Makefile.bor
index d01942fd890..a00a2f9d06c 100644
--- a/TAO/tao/Makefile.bor
+++ b/TAO/tao/Makefile.bor
@@ -132,6 +132,7 @@ OBJFILES = \
$(OBJDIR)\Parser_Registry.obj \
$(OBJDIR)\Pluggable.obj \
$(OBJDIR)\Transport.obj \
+ $(OBJDIR)\Transport_Timer.obj \
$(OBJDIR)\Pluggable_Messaging.obj \
$(OBJDIR)\Pluggable_Messaging_Utils.obj \
$(OBJDIR)\Policy_Manager.obj \
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 2ae2d67a25a..11eb9861bfc 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -64,11 +64,6 @@ class TAO_Export TAO_Queued_Message
{
public:
/// Constructor
- /**
- * @param callback A callback interface to signal any waiting
- * threads about the status of the message. It is null if there are
- * no waiting threads.
- */
TAO_Queued_Message (void);
/// Destructor
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 08772832e6f..7741107b286 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -65,6 +65,8 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, head_ (0)
, tail_ (0)
, current_deadline_ (ACE_Time_Value::zero)
+ , flush_timer_id_ (-1)
+ , transport_timer_ (this)
, id_ ((long) this)
{
TAO_Client_Strategy_Factory *cf =
@@ -138,7 +140,25 @@ TAO_Transport::handle_output ()
flushing_strategy->cancel_output (this);
- this->current_deadline_ = ACE_Time_Value::zero;
+ if (this->flush_timer_id_ != -1)
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->handler_lock_,
+ -1));
+
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh != 0)
+ {
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ }
+ this->current_deadline_ = ACE_Time_Value::zero;
+ this->flush_timer_id_ = -1;
+ }
return 0;
}
@@ -849,6 +869,27 @@ TAO_Transport::cancel_output (void)
}
int
+TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
+ const void *act)
+{
+ /// This is the only legal ACT in the current configuration....
+ if (act != &this->current_deadline_)
+ return -1;
+
+ if (this->flush_timer_pending ())
+ {
+ // The timer is always a oneshot timer, so mark is as not
+ // pending.
+ this->reset_flush_timer ();
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ (void) flushing_strategy->schedule_output (this);
+ }
+ return 0;
+}
+
+int
TAO_Transport::drain_queue (void)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->queue_mutex_, -1);
@@ -1043,13 +1084,34 @@ TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
set_timer,
new_deadline);
- // ... check if we need to set a timer ...
+ // ... set the new timer, also cancel any previous timers ...
if (set_timer)
{
- this->current_deadline_ = new_deadline;
- // @@ We need to schedule the timer. We should also be
- // careful not to schedule one if there is one scheduled
- // already.
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->handler_lock_,
+ -1));
+
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh != 0)
+ {
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
+
+ if (this->flush_timer_pending ())
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
+ }
+ }
}
return constraints_reached;
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 51b6c010b43..1ed82676483 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -22,6 +22,7 @@
#include "Exception.h"
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
+#include "Transport_Timer.h"
#include "ace/Strategies.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
@@ -616,6 +617,26 @@ public:
/// Cancel handle_output() callbacks
int cancel_output (void);
+ /// The timeout callback, invoked when any of the timers related to
+ /// this transport expire.
+ /**
+ * @param current_time The current time as reported from the Reactor
+ * @param act The Asynchronous Completion Token. Currently it is
+ * interpreted as follows:
+ * - If the ACT is the address of this->current_deadline_ the
+ * queueing timeout has expired and the queue should start
+ * flushing.
+ *
+ * @return Returns 0 if there are no problems, -1 if there is an
+ * error
+ *
+ * @todo In the future this function could be used to expire
+ * messages (oneways) that have been sitting for too long on
+ * the queue.
+ */
+ int handle_timeout (const ACE_Time_Value &current_time,
+ const void* act);
+
private:
/// Send some of the data in the queue.
/**
@@ -641,7 +662,7 @@ private:
/// This class needs special access to drain_queue_i() and
/// queue_is_empty_i()
- friend class TAO_Block_Flushing_Strategy;
+ friend class TAO_Block_Flushing_Strategy;
/// A helper routine used in drain_queue_i()
int drain_queue_helper (int &iovcnt, iovec iov[]);
@@ -666,6 +687,13 @@ private:
int send_synchronous_message_i (const ACE_Message_Block *message_block,
ACE_Time_Value *max_wait_time);
+ /// Check if the flush timer is still pending
+ int flush_timer_pending (void) const;
+
+ /// The flush timer expired or was explicitly cancelled, mark it as
+ /// not pending
+ void reset_flush_timer (void);
+
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
@@ -720,6 +748,12 @@ protected:
/// *if* the deadline is
ACE_Time_Value current_deadline_;
+ /// The timer ID
+ long flush_timer_id_;
+
+ /// The adapter used to receive timeout callbacks from the Reactor
+ TAO_Transport_Timer transport_timer_;
+
/// Lock that insures that activities that *might* use handler-related
/// resources (such as a connection handler) get serialized.
/**
diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl
index ccace134399..e35d8836b3b 100644
--- a/TAO/tao/Transport.inl
+++ b/TAO/tao/Transport.inl
@@ -50,3 +50,17 @@ TAO_Transport::cache_map_entry (
{
this->cache_map_entry_ = entry;
}
+
+ACE_INLINE int
+TAO_Transport::flush_timer_pending (void) const
+{
+ return this->flush_timer_id_ != -1;
+}
+
+ACE_INLINE void
+TAO_Transport::reset_flush_timer (void)
+{
+ this->flush_timer_id_ = -1;
+ this->current_deadline_ = ACE_Time_Value::zero;
+}
+
diff --git a/TAO/tao/Transport_Timer.cpp b/TAO/tao/Transport_Timer.cpp
new file mode 100644
index 00000000000..709a98414c8
--- /dev/null
+++ b/TAO/tao/Transport_Timer.cpp
@@ -0,0 +1,19 @@
+// -*- C++ -*-
+// $Id$
+
+#include "Transport_Timer.h"
+#include "Transport.h"
+
+ACE_RCSID(tao, Transport_Timer, "$Id$")
+
+TAO_Transport_Timer::TAO_Transport_Timer (TAO_Transport *transport)
+ : transport_ (transport)
+{
+}
+
+int
+TAO_Transport_Timer::handle_timeout (const ACE_Time_Value &current_time,
+ const void *act)
+{
+ return this->transport_->handle_timeout (current_time, act);
+}
diff --git a/TAO/tao/Transport_Timer.h b/TAO/tao/Transport_Timer.h
new file mode 100644
index 00000000000..e1e2d7b070c
--- /dev/null
+++ b/TAO/tao/Transport_Timer.h
@@ -0,0 +1,54 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Transport_Timer.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_TRANSPORT_TIMER_H
+#define TAO_TRANSPORT_TIMER_H
+#include "ace/pre.h"
+
+#include "TAO_Export.h"
+#include "ace/Event_Handler.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class TAO_Transport;
+
+/**
+ * @class TAO_Transport_Timer
+ *
+ * @brief Allows TAO_Transport instances to receive timeout
+ * notifications from the Reactor. In other words, implements
+ * the Adapter Role, of the Adapter Pattern, where the Adaptee
+ * is a TAO_Transport and the client is the Reactor.
+ *
+ */
+class TAO_Export TAO_Transport_Timer : public ACE_Event_Handler
+{
+public:
+ /// Constructor
+ /**
+ * @param transport The adaptee
+ */
+ TAO_Transport_Timer (TAO_Transport *transport);
+
+ /// Receive timeout events from the Reactor and forward them to the
+ /// TAO_Transport
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *act);
+private:
+ /// The Adaptee
+ TAO_Transport *transport_;
+};
+
+#include "ace/post.h"
+#endif /* TAO_TRANSPORT_TIMER_H */
diff --git a/TAO/tests/Oneway_Buffering/client.cpp b/TAO/tests/Oneway_Buffering/client.cpp
index c578cee3681..38b154f4ad7 100644
--- a/TAO/tests/Oneway_Buffering/client.cpp
+++ b/TAO/tests/Oneway_Buffering/client.cpp
@@ -13,6 +13,7 @@ int iterations = 200;
int run_message_count_test = 0;
int run_timeout_test = 0;
+int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;
const int PAYLOAD_LENGTH = 1024;
@@ -29,7 +30,7 @@ const double GIOP_OVERHEAD = 0.9;
int
parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctb");
+ ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr");
int c;
while ((c = get_opts ()) != -1)
@@ -59,6 +60,10 @@ parse_args (int argc, char *argv[])
run_buffer_size_test = 1;
break;
+ case 'r':
+ run_timeout_reactive_test = 1;
+ break;
+
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -66,7 +71,7 @@ parse_args (int argc, char *argv[])
"-k <server_ior> "
"-a <admin_ior> "
"-i <iterations> "
- "<-c|-t|-b> "
+ "<-c|-t|-b|-r> "
"\n",
argv [0]),
-1);
@@ -87,6 +92,12 @@ run_timeout (CORBA::ORB_ptr orb,
CORBA::Environment &ACE_TRY_ENV);
int
+run_timeout_reactive (CORBA::ORB_ptr orb,
+ Test::Oneway_Buffering_ptr oneway_buffering,
+ Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin,
+ CORBA::Environment &ACE_TRY_ENV);
+
+int
run_buffer_size (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin,
@@ -159,6 +170,17 @@ main (int argc, char *argv[])
ACE_TRY_ENV);
ACE_TRY_CHECK;
}
+ else if (run_timeout_reactive_test)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Running timeout (reactive) flushing test\n"));
+ test_failed =
+ run_timeout_reactive (orb.in (),
+ oneway_buffering.in (),
+ oneway_buffering_admin.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
else if (run_buffer_size_test)
{
ACE_DEBUG ((LM_DEBUG,
@@ -535,6 +557,116 @@ run_timeout (CORBA::ORB_ptr orb,
}
int
+run_timeout_reactive (CORBA::ORB_ptr orb,
+ Test::Oneway_Buffering_ptr oneway_buffering,
+ Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ TAO::BufferingConstraint buffering_constraint;
+ buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
+ buffering_constraint.message_count = 0;
+ buffering_constraint.message_bytes = 0;
+ buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;
+
+ Test::Oneway_Buffering_var flusher;
+ int test_failed =
+ configure_policies (orb, buffering_constraint,
+ oneway_buffering, flusher.out (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (test_failed != 0)
+ return test_failed;
+
+ Test::Payload payload (PAYLOAD_LENGTH);
+ payload.length (PAYLOAD_LENGTH);
+ for (int j = 0; j != PAYLOAD_LENGTH; ++j)
+ payload[j] = CORBA::Octet(j % 256);
+
+ CORBA::ULong send_count = 0;
+ for (int i = 0; i != iterations; ++i)
+ {
+ // Get back in sync with the server...
+ flusher->flush (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ flusher->sync (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ CORBA::ULong initial_receive_count =
+ oneway_buffering_admin->request_count (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (initial_receive_count != send_count)
+ {
+ test_failed = 1;
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: Iteration %d message lost (%u != %u)\n",
+ i, initial_receive_count, send_count));
+ }
+
+ ACE_Time_Value start = ACE_OS::gettimeofday ();
+ for (int j = 0; j != 20; ++j)
+ {
+ oneway_buffering->receive_data (payload, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+ send_count++;
+ }
+ while (1)
+ {
+ CORBA::ULong receive_count =
+ oneway_buffering_admin->request_count (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_Time_Value sleep (0, 10000);
+ orb->run (sleep, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
+ if (receive_count != initial_receive_count)
+ {
+ if (elapsed.msec () < TIMEOUT_MILLISECONDS)
+ {
+ test_failed = 1;
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: Iteration %d flush before "
+ "timeout expired. "
+ "Elapsed = %d, Timeout = %d msecs\n",
+ i,
+ elapsed.msec (), TIMEOUT_MILLISECONDS));
+ }
+ // terminate the while loop.
+ break;
+ }
+
+ if (elapsed.msec () > 2 * TIMEOUT_MILLISECONDS)
+ {
+ test_failed = 1;
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: Iteration %d no flush past "
+ "timeout threshold. "
+ "Elapsed = %d, Timeout = %d msecs\n",
+ i,
+ elapsed.msec (), TIMEOUT_MILLISECONDS));
+ break;
+ }
+ }
+ }
+
+ int progress_test_failed =
+ run_progress_test (oneway_buffering,
+ flusher.in (),
+ oneway_buffering_admin,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ if (progress_test_failed)
+ test_failed = 1;
+
+
+ return test_failed;
+}
+
+int
run_buffer_size (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin,
diff --git a/TAO/tests/Oneway_Buffering/run_test.pl b/TAO/tests/Oneway_Buffering/run_test.pl
index 30593090375..99a64dd9dd5 100755
--- a/TAO/tests/Oneway_Buffering/run_test.pl
+++ b/TAO/tests/Oneway_Buffering/run_test.pl
@@ -11,7 +11,7 @@ use PerlACE::Run_Test;
$admin_iorfile = PerlACE::LocalFile ("admin.ior");
$server_iorfile = PerlACE::LocalFile ("server.ior");
-foreach $test_type ("-c", "-t", "-b") {
+foreach $test_type ("-c", "-t", "-b", "-r") {
unlink $admin_iorfile;
unlink $server_iorfile;
diff --git a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl
new file mode 100755
index 00000000000..31f09190b88
--- /dev/null
+++ b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl
@@ -0,0 +1,65 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../bin';
+use PerlACE::Run_Test;
+
+$admin_iorfile = PerlACE::LocalFile ("admin.ior");
+$server_iorfile = PerlACE::LocalFile ("server.ior");
+
+unlink $admin_iorfile;
+unlink $server_iorfile;
+
+my $AD = new PerlACE::Process ("admin", "-o $admin_iorfile");
+my $SV = new PerlACE::Process ("server", "-o $server_iorfile");
+my $CL = new PerlACE::Process ("client",
+ " -k file://$server_iorfile "
+ ."-a file://$admin_iorfile "
+ ."-r ");
+
+$AD->Spawn ();
+
+if (PerlACE::waitforfile_timed ($admin_iorfile, 10) == -1) {
+ print STDERR "ERROR: cannot find file <$admin_iorfile>\n";
+ $AD->Kill (); $AD->TimedWait (1);
+ exit 1;
+}
+
+$SV->Spawn ();
+
+if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
+ print STDERR "ERROR: cannot find file <$server_iorfile>\n";
+ $AD->Kill (); $AD->TimedWait (1);
+ $SV->Kill (); $SV->TimedWait (1);
+ exit 1;
+}
+
+$client = $CL->SpawnWaitKill (300);
+
+if ($client != 0) {
+ print STDERR "ERROR: client returned $client\n";
+ $status = 1;
+}
+
+$server = $SV->WaitKill (10);
+
+if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ $status = 1;
+}
+
+$admin = $AD->WaitKill (10);
+
+if ($admin != 0) {
+ print STDERR "ERROR: admin returned $admin\n";
+ $status = 1;
+}
+
+unlink $server_iorfile;
+unlink $admin_iorfile;
+
+exit $status;