diff options
Diffstat (limited to 'trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Consumer.cpp')
-rw-r--r-- | trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Consumer.cpp | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Consumer.cpp b/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Consumer.cpp new file mode 100644 index 00000000000..a718deb73bf --- /dev/null +++ b/trunk/TAO/orbsvcs/tests/Notify/lib/Periodic_Consumer.cpp @@ -0,0 +1,273 @@ +// $Id$ + +#include "Periodic_Consumer.h" + +#include "ace/Arg_Shifter.h" +#include "ace/High_Res_Timer.h" +#include "ace/Task.h" +#include "tao/debug.h" +#include "orbsvcs/Time_Utilities.h" +#include "StructuredEvent.h" +#include "Task_Stats.h" +#include "Task_Callback.h" +#include "LookupManager.h" +#include "Priority_Mapping.h" + +ACE_RCSID (RT_Notify, + TAO_Notify_Tests_Periodic_Consumer, + "$Id$") + +int WARMUP_COUNT = 10; + +TAO_Notify_Tests_Periodic_Consumer::TAO_Notify_Tests_Periodic_Consumer (void) + : count_ (-2) + , warmup_countdown_ (WARMUP_COUNT) + , max_count_ (-1) + , load_ (0) + , client_ (0) + , check_priority_ (0) + , stop_received_ (0) +{ +} + +TAO_Notify_Tests_Periodic_Consumer::~TAO_Notify_Tests_Periodic_Consumer () +{ +} + +void +TAO_Notify_Tests_Periodic_Consumer::task_callback (TAO_Notify_Tests_Task_Callback* client) +{ + this->client_ = client; +} + +int +TAO_Notify_Tests_Periodic_Consumer::init_state (ACE_Arg_Shifter& arg_shifter) +{ + // First, let the base class look for options. + if (TAO_Notify_Tests_StructuredPushConsumer::init_state (arg_shifter) == -1) + return -1; + + const ACE_TCHAR *current_arg = 0; + + while (arg_shifter.is_anything_left ()) + { + if ((current_arg = arg_shifter.get_the_parameter ("-MaxCount"))) + { + this->max_count_ = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + + if (max_count_ == 0) + { + if (this->client_) + this->client_->done (this); + } + } + else if (arg_shifter.cur_arg_strncasecmp ("-Check_Priority") == 0) + { + this->check_priority_ = 1; + + arg_shifter.consume_arg (); + } + else + { + break; + } + } /* while */ + + return 0; +} + +void +TAO_Notify_Tests_Periodic_Consumer::handle_start_event (const CosNotification::PropertySeq& prop_seq) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received inital (-1)th event \n", this->name_.c_str ())); + + for (CORBA::ULong i = 0; i < prop_seq.length (); ++i) + { + if (ACE_OS::strcmp (prop_seq[i].name.in (), "BaseTime") == 0) + { + TimeBase::TimeT base_time; + ACE_hrtime_t base_time_hrtime; + + prop_seq[i].value >>= base_time; + + ORBSVCS_Time::TimeT_to_hrtime (base_time_hrtime, base_time); + stats_.base_time (base_time_hrtime); + } + // if max_count has not been already specified, get it from the supplier. + else if (this->max_count_ == -1 && + ACE_OS::strcmp (prop_seq[i].name.in (), "MaxCount") == 0) + { + prop_seq[i].value >>= this->max_count_; + } + else if (ACE_OS::strcmp (prop_seq[i].name.in (), "Load") == 0) + { + prop_seq[i].value >>= this->load_; + } + } + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, "(%P, %t) Maxcount = %d, Load = %d\n", + this->max_count_, this->load_)); + } +} + +void +TAO_Notify_Tests_Periodic_Consumer::check_priority (const CosNotification::PropertySeq& prop_seq) +{ + // Check if the event carries a Priority. + int event_has_priority_set = 0; + CORBA::Short event_priority = 0; + + for (CORBA::ULong i = 0; i < prop_seq.length (); ++i) + { + if (ACE_OS::strcmp (prop_seq[i].name.in (), CosNotification::Priority) == 0) + { + prop_seq[i].value >>= event_priority; + + event_has_priority_set = 1; + break; + } + } + + if (event_has_priority_set == 1) + { + // Confirm that the current thread is at the priority set in the event + ACE_hthread_t current; + ACE_Thread::self (current); + + int priority; + if (ACE_Thread::getprio (current, priority) == -1) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - ") + ACE_TEXT (" ACE_Thread::get_prio\n"))); + + return ; + } + + CORBA::Short native_priority = CORBA::Short (priority); + + TAO_Notify_Tests_Priority_Mapping* priority_mapping; + LOOKUP_MANAGER->resolve (priority_mapping); + + CORBA::Short corba_priority; + + priority_mapping->to_CORBA (native_priority, corba_priority); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Periodic Consumer expected priority = %d, received priority = %d/%d (native/corba)\n", + event_priority, native_priority, corba_priority)); + + if (corba_priority != event_priority) + ACE_DEBUG ((LM_DEBUG, + "Error: Periodic Consumer expected priority = %d, received priority = %d\n", + event_priority, corba_priority)); + } +} + +void +TAO_Notify_Tests_Periodic_Consumer::push_structured_event (const CosNotification::StructuredEvent & notification ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException, + CosEventComm::Disconnected + )) +{ + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; + + const CosNotification::PropertySeq& prop_seq = + notification.header.variable_header; + + if (this->count_ == -2) + { + if (--warmup_countdown_ == 0) + this->count_ = -1; + + return; + } + else if (this->count_ == -1) + { + this->handle_start_event (prop_seq); + + if (this->max_count_ > 0) + this->stats_.init (this->max_count_); + + this->count_ = 0; + return; + } + + if (this->check_priority_) + { + this->check_priority (prop_seq); + } + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received %d event type (%s,%s) \n", this->name_.c_str (), this->count_, + notification.header.fixed_header.event_type.domain_name.in(), + notification.header.fixed_header.event_type.type_name.in())); + } + + for (CORBA::ULong i = 0; i < prop_seq.length (); ++i) + { + if (ACE_OS::strcmp (prop_seq[i].name.in (), "Stop") == 0) + { + this->stop_received_ = 1; + } + } + + TimeBase::TimeT send_time, now; + ACE_hrtime_t send_time_hrtime; + + notification.remainder_of_body >>= send_time; + + ORBSVCS_Time::TimeT_to_hrtime (send_time_hrtime, send_time); + + now = ACE_OS::gethrtime (); + + stats_.sample (send_time_hrtime, now); + + // Eat CPU + static CORBA::ULong prime_number = 9619; + + (void)ACE::gcd (prime_number, prime_number/2 -1); + + for (CORBA::ULong load = this->load_; load != 0; --load) + ACE::is_prime (prime_number, + 2, + prime_number / 2); + + // --- + + if (++this->count_ >= this->max_count_ || this->stop_received_ == 1) + { + stats_.end_time (ACE_OS::gethrtime ()); + + if (this->client_) + this->client_->done (this); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s done \n", this->name_.c_str ())); + } +} + +void +TAO_Notify_Tests_Periodic_Consumer::dump_stats (ACE_TCHAR* msg, int dump_samples) +{ + char buf[BUFSIZ]; + ACE_OS::sprintf (buf, "%s.dat", this->name_.c_str ()); + + ACE_CString fname (buf); + + ACE_OS::sprintf (buf, + "%s# Consumer Name = %s, Proxy ID = %d Load = %u\n", + msg, + this->name_.c_str (), this->proxy_id_, this->load_); + + stats_.dump_samples (fname.c_str (), buf, dump_samples); +} |