summaryrefslogtreecommitdiff
path: root/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp')
-rw-r--r--trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp358
1 files changed, 358 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp b/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp
new file mode 100644
index 00000000000..b2aea30f498
--- /dev/null
+++ b/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Supplier.cpp
@@ -0,0 +1,358 @@
+// $Id$
+
+#include "Periodic_Supplier.h"
+
+#include "ace/Arg_Shifter.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Barrier.h"
+#include "ace/OS_NS_unistd.h"
+#include "tao/debug.h"
+#include "tao/ORB_Core.h"
+#include "orbsvcs/Time_Utilities.h"
+#include "StructuredEvent.h"
+#include "Task_Stats.h"
+#include "Task_Callback.h"
+#include "LookupManager.h"
+#include "Priority_Mapping.h"
+
+ACE_RCSID (RT_Notify,
+ TAO_Notify_Tests_Periodic_Supplier,
+ "$Id$")
+
+TAO_Notify_Tests_Periodic_Supplier::TAO_Notify_Tests_Periodic_Supplier (void)
+ : barrier_ (0),
+ priority_ (0),
+ period_ (0),
+ total_deadlines_missed_ (0),
+ run_time_ (0),
+ exec_time_ (0),
+ phase_ (0),
+ iter_ (0),
+ load_ (0),
+ client_ (0)
+{
+}
+
+TAO_Notify_Tests_Periodic_Supplier::~TAO_Notify_Tests_Periodic_Supplier ()
+{
+}
+
+
+void
+TAO_Notify_Tests_Periodic_Supplier::task_callback(TAO_Notify_Tests_Task_Callback* client)
+{
+ this->client_ = client;
+}
+
+int
+TAO_Notify_Tests_Periodic_Supplier::init_state (ACE_Arg_Shifter& arg_shifter)
+{
+ // First, let the base class look for options.
+ if (TAO_Notify_Tests_StructuredPushSupplier::init_state (arg_shifter) == -1)
+ return -1;
+
+ const ACE_TCHAR *current_arg = 0;
+
+ while (arg_shifter.is_anything_left ())
+ {
+ if ((current_arg = arg_shifter.get_the_parameter ("-EventType")))
+ {
+ this->event_.type ("*", current_arg) ;
+ zeroth_event.type ("*", current_arg) ;
+ arg_shifter.consume_arg ();
+ }
+ else if (arg_shifter.cur_arg_strncasecmp ("-FilterLongData") == 0) // -FilterLongData name value
+ {
+ arg_shifter.consume_arg ();
+
+ ACE_CString name = arg_shifter.get_current ();
+
+ arg_shifter.consume_arg ();
+
+ CORBA::Long value = (CORBA::Long)ACE_OS::atoi (arg_shifter.get_current ());
+
+ arg_shifter.consume_arg ();
+
+ CORBA::Any buffer;
+ buffer <<= (CORBA::Long) value;
+
+ this->event_.filter (name.c_str (), buffer);
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Priority")))
+ {
+ priority_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+
+ CORBA::Any buffer;
+ buffer <<= (CORBA::Short) this->priority_;
+ this->event_.qos (CosNotification::Priority, buffer);
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Period")))
+ {
+ period_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-ExecTime")))
+ {
+ exec_time_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Phase")))
+ {
+ phase_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Iter")))
+ {
+ iter_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+
+ if (stats_.init (iter_) == -1)
+ return -1;
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-Load")))
+ {
+ load_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-RunTime"))) // in seconds
+ {
+ run_time_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "parse Task unknown option %s\n",
+ arg_shifter.get_current ()));
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "event type %s, priority %d, period %duS, exec_time %duS, phase %duS, iter %d, load %d\n",
+ event_.type(), priority_, period_, exec_time_, phase_, iter_, load_));
+ break;
+ }
+ }
+ return 0;
+}
+
+int
+TAO_Notify_Tests_Periodic_Supplier::activate_task (ACE_Barrier* barrier)
+{
+ barrier_ = barrier;
+
+ long flags = THR_NEW_LWP | THR_JOINABLE;
+
+ // Resolve the ORB
+ CORBA::ORB_var orb;
+ LOOKUP_MANAGER->resolve (orb);
+
+ flags |=
+ orb->orb_core ()->orb_params ()->thread_creation_flags ();
+
+ TAO_Notify_Tests_Priority_Mapping* priority_mapping;
+ LOOKUP_MANAGER->resolve (priority_mapping);
+
+ CORBA::Short native_prio;
+
+ priority_mapping->to_native (this->priority_, native_prio);
+
+ // Become an active object.
+ if (this->ACE_Task <ACE_SYNCH>::activate (flags,
+ 1,
+ 0,
+ native_prio) == -1)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
+ -1);
+ else
+ ACE_DEBUG ((LM_ERROR,
+ ACE_TEXT ("(%t) Task activation at priority %d failed, ")
+ ACE_TEXT ("exiting!\n%a"),
+ this->priority_,
+ -1));
+ }
+
+ ACE_DEBUG ((LM_ERROR, "Activated Periodic Supplier Thread at priority %d\n", this->priority_));
+
+ return 0;
+}
+
+void
+TAO_Notify_Tests_Periodic_Supplier::send_warmup_events (ACE_ENV_SINGLE_ARG_DECL)
+{
+ int WARMUP_COUNT = 10;
+
+ for (int i = 0; i < WARMUP_COUNT ; ++i)
+ {
+ this->send_event (this->event_.event () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+void
+TAO_Notify_Tests_Periodic_Supplier::send_prologue (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // populate event.
+ // send the base time and max count.
+ TimeBase::TimeT base_time;
+ ORBSVCS_Time::hrtime_to_TimeT (base_time,
+ BASE_TIME::instance ()->base_time_);
+
+ CORBA::Any buffer;
+ buffer <<= base_time;
+ zeroth_event.opt_header ("BaseTime", buffer);
+
+ buffer <<= this->iter_;
+ zeroth_event.opt_header ("MaxCount", buffer);
+
+ buffer <<= this->load_;
+ zeroth_event.opt_header ("Load", buffer);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P, %t) Supplier (%s) sending event 0th event\n"));
+
+ this->send_event (zeroth_event.event () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+void
+TAO_Notify_Tests_Periodic_Supplier::handle_svc (ACE_ENV_SINGLE_ARG_DECL)
+{
+ this->send_prologue (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_hrtime_t before, after;
+ TimeBase::TimeT time_t;
+
+ CORBA::Any buffer;
+
+ ACE_hrtime_t base_time = BASE_TIME::instance ()->base_time_;
+
+ for (int i = 0; i < iter_ ; ++i)
+ {
+ before = ACE_OS::gethrtime ();
+
+ ORBSVCS_Time::hrtime_to_TimeT (time_t,
+ before);
+
+ buffer <<= time_t;
+
+ this->event_.payload (buffer);
+
+ if (this->run_time_ != 0 &&
+ Task_Stats::diff_sec (base_time, before) > this->run_time_)
+ {
+ // Time up, send a "Stop" event.
+ buffer <<= (CORBA::Long) 1;
+ this->event_.opt_header ("Stop", buffer);
+
+ i = iter_; // Load the iter so that the loop exits.
+ }
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P, %t) Supplier (%s) sending event #%d\n",
+ this->name_.c_str (), i));
+
+ this->send_event (this->event_.event () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ after = ACE_OS::gethrtime ();
+
+ stats_.sample (before, after);
+
+ if (period_ != 0) // blast mode, no sleep.
+ {
+ ACE_UINT32 elapsed_microseconds =
+ Task_Stats::diff_usec (before, after);
+
+ // did we miss any deadlines?
+ int missed =
+ (int)elapsed_microseconds > period_ ? elapsed_microseconds/period_ : 0;
+ this->total_deadlines_missed_ += missed;
+
+ /* Start -- "Immediate run if last call missed deadline" */
+ if (missed > 0) // if we missed
+ continue;
+
+ long sleep_time = period_ - elapsed_microseconds;
+ /* End -- "Immediate run if last call missed deadline" */
+
+ /*
+ * This logic sleeps till the next period.
+ * So, if we missed a deadline we wait.
+ *
+ long sleep_time = (missed + 1)*period_ ;
+ sleep_time -= elapsed_microseconds;
+ */
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "(%t) sleep time = %d uSec, missed %d deadlines\n", sleep_time, missed));
+
+ ACE_Time_Value t_sleep (0, sleep_time);
+ ACE_OS::sleep (t_sleep);
+ } /* period != 0 */
+
+ } /* for */
+
+ stats_.end_time (ACE_OS::gethrtime ());
+
+ if (this->client_)
+ this->client_->done (this);
+}
+
+int
+TAO_Notify_Tests_Periodic_Supplier::svc (void)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Thread_Task (%t) - wait\n"));
+
+ ACE_TRY_NEW_ENV
+ {
+ // First, send warmup events.
+ this->send_warmup_events (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Next, wait for other threads.
+ this->barrier_->wait ();
+
+ // first thread here inits the Base_Time.
+ stats_.base_time (BASE_TIME::instance ()->base_time_);
+
+ // now wait till the phase_ period expires.
+ ACE_OS::sleep (ACE_Time_Value (0, phase_));
+
+ this->handle_svc (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::UserException, ue)
+ {
+ ACE_PRINT_EXCEPTION (ue,
+ "Error: Periodic supplier: error sending event. ");
+ }
+ ACE_CATCH (CORBA::SystemException, se)
+ {
+ ACE_PRINT_EXCEPTION (se,
+ "Error: Periodic supplier: error sending event. ");
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+void
+TAO_Notify_Tests_Periodic_Supplier::dump_stats (ACE_TCHAR* msg, int dump_samples)
+{
+ char buf[BUFSIZ];
+ ACE_OS::sprintf (buf, "%s.dat", this->name_.c_str ());
+
+ ACE_CString fname (buf);
+
+ ACE_OS::sprintf (buf,"%s# : Supplier Name = %s, Proxy ID = %d, Event Type = %s, priority %d, period %ld, exec_time %ld, phase %ld, iter_ %d , load_ %d, deadlines missed = %d\n",
+ msg, this->name_.c_str (), this->proxy_id_, this->event_.type (), priority_, period_, exec_time_, phase_, iter_, load_, this->total_deadlines_missed_);
+
+ stats_.dump_samples (fname.c_str (), buf, dump_samples);
+}
+
+#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
+template ACE_Singleton<Base_Time, ACE_Thread_Mutex> *ACE_Singleton<Base_Time, ACE_Thread_Mutex>::singleton_;
+#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */