diff options
Diffstat (limited to 'TAO/orbsvcs/tests/Notify/performance-tests/Throughput/Throughput.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/Notify/performance-tests/Throughput/Throughput.cpp | 610 |
1 files changed, 610 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Notify/performance-tests/Throughput/Throughput.cpp b/TAO/orbsvcs/tests/Notify/performance-tests/Throughput/Throughput.cpp new file mode 100644 index 00000000000..f8c45095160 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/performance-tests/Throughput/Throughput.cpp @@ -0,0 +1,610 @@ +// $Id$ + +#include "Throughput.h" + +#include "ace/Arg_Shifter.h" +#include "ace/Get_Opt.h" +#include "ace/Synch.h" +#include "ace/OS.h" +#include "ace/Dynamic_Service.h" +#include "tao/Strategies/advanced_resource.h" +#include "tao/Messaging/Messaging.h" +#include "orbsvcs/Notify/Service.h" +#include "orbsvcs/Time_Utilities.h" + +ACE_RCSID (Notify_Tests, + Throughput, + "$Id$") + +/***************************************************************************/ + +Throughput_StructuredPushConsumer::Throughput_StructuredPushConsumer ( + Notify_Throughput *test_client + ) + : test_client_ (test_client), + push_count_ (0) +{ +} + +void +Throughput_StructuredPushConsumer::accumulate_into ( + ACE_Throughput_Stats &throughput + ) const +{ + throughput.accumulate (this->throughput_); +} + +void +Throughput_StructuredPushConsumer::dump_stats (const char* msg, + ACE_UINT32 gsf) +{ + this->throughput_.dump_results (msg, gsf); +} + +void +Throughput_StructuredPushConsumer::push_structured_event ( + const CosNotification::StructuredEvent & notification + ) +{ + // Extract payload. + const char* msg; + CORBA::Boolean ok = (notification.remainder_of_body >>= msg); + + if (!ok) + ACE_DEBUG ((LM_DEBUG, "(%t) Error extracting message body\n")); + + TimeBase::TimeT Throughput_base_recorded; + ACE_hrtime_t Throughput_base; + + notification.filterable_data[0].value >>= Throughput_base_recorded; + + ORBSVCS_Time::TimeT_to_hrtime (Throughput_base, + Throughput_base_recorded); + + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); + + if (this->push_count_ == 0) + { + this->throughput_start_ = ACE_OS::gethrtime (); + } + + ++this->push_count_; + + // Grab timestamp again. + ACE_hrtime_t now = ACE_OS::gethrtime (); + + // Record statistics. + this->throughput_.sample (now - this->throughput_start_, + now - Throughput_base); + + if (this->push_count_%1000 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P)(%t) event count = %d\n", + this->push_count_)); + } + + if (push_count_ == test_client_->perconsumer_count_) + { + ACE_DEBUG ((LM_DEBUG, + "(%t)expected count reached\n")); + test_client_->peer_done (); + } +} + +/***************************************************************************/ + +Throughput_StructuredPushSupplier::Throughput_StructuredPushSupplier ( + Notify_Throughput* test_client + ) + :test_client_ (test_client), + push_count_ (0) +{ +} + +Throughput_StructuredPushSupplier::~Throughput_StructuredPushSupplier () +{ +} + +void +Throughput_StructuredPushSupplier::accumulate_into ( + ACE_Throughput_Stats &throughput + ) const +{ + throughput.accumulate (this->throughput_); +} + +void +Throughput_StructuredPushSupplier::dump_stats (const char* msg, + ACE_UINT32 gsf) +{ + this->throughput_.dump_results (msg, gsf); +} + +int +Throughput_StructuredPushSupplier::svc (void) +{ + // Initialize a time value to pace the test. + ACE_Time_Value tv (0, test_client_->burst_pause_); + + // Operations. + CosNotification::StructuredEvent event; + + // EventHeader + + // FixedEventHeader + // EventType + // string + event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*"); + // string + event.header.fixed_header.event_type.type_name = CORBA::string_dup("*"); + // string + event.header.fixed_header.event_name = CORBA::string_dup("myevent"); + + // OptionalHeaderFields + // PropertySeq + // sequence<Property>: string name, any value + CosNotification::PropertySeq& qos = event.header.variable_header; + qos.length (0); // put nothing here + + // FilterableEventBody + // PropertySeq + // sequence<Property>: string name, any value + event.filterable_data.length (1); + event.filterable_data[0].name = CORBA::string_dup("Throughput_base"); + + event.remainder_of_body <<= test_client_->payload_; + + + this->throughput_start_ = ACE_OS::gethrtime (); + + for (int i = 0; i < test_client_->burst_count_; ++i) + { + for (int j = 0; j < test_client_->burst_size_; ++j) + { + // Record current time. + ACE_hrtime_t start = ACE_OS::gethrtime (); + TimeBase::TimeT Throughput_base; + ORBSVCS_Time::hrtime_to_TimeT (Throughput_base, + start); + // Any. + event.filterable_data[0].value <<= Throughput_base; + + this->proxy_->push_structured_event (event); + + ACE_hrtime_t end = ACE_OS::gethrtime (); + this->throughput_.sample (end - this->throughput_start_, + end - start); + } + + ACE_OS::sleep (tv); + } + + ACE_DEBUG ((LM_DEBUG, "(%P) (%t) Supplier done\n")); + test_client_->peer_done (); + return 0; +} + +/***************************************************************************/ +Notify_Throughput::Notify_Throughput (void) + : collocated_ec_ (0), + burst_count_ (1), + burst_pause_ (10000), + burst_size_ (1000), + payload_size_ (1024), + payload_ (0), + consumer_count_ (1), + supplier_count_ (1), + perconsumer_count_ (burst_size_*burst_count_*supplier_count_), + consumers_ (0), + suppliers_ (0), + nthreads_ (1), + peer_done_count_ (consumer_count_ + supplier_count_), + condition_ (lock_) +{ +} + +Notify_Throughput::~Notify_Throughput () +{ + this->orb_->shutdown (0); + + delete payload_; +} + +int +Notify_Throughput::init (int argc, char* argv []) +{ + // Initialize base class. + Notify_Test_Client::init_ORB (argc, + argv); + +#if (TAO_HAS_CORBA_MESSAGING == 1) + CORBA::Object_var manager_object = + orb_->resolve_initial_references ("ORBPolicyManager"); + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (manager_object.in ()); + + CORBA::Any sync_scope; + sync_scope <<= Messaging::SYNC_WITH_TARGET; + + CORBA::PolicyList policy_list (1); + policy_list.length (1); + policy_list[0] = + orb_->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + sync_scope); + policy_manager->set_policy_overrides (policy_list, + CORBA::SET_OVERRIDE); +#else + ACE_DEBUG ((LM_DEBUG, + "CORBA Messaging disabled in this configuration," + " test may not be optimally configured\n")); +#endif /* TAO_HAS_MESSAGING */ + + worker_.orb (this->orb_.in ()); + + if (worker_.activate (THR_NEW_LWP | THR_JOINABLE, + this->nthreads_) != 0) + { + ACE_ERROR ((LM_ERROR, + "Cannot activate client threads\n")); + } + + // Create all participents ... + this->create_EC (); + + CosNotifyChannelAdmin::AdminID adminid; + + supplier_admin_ = + ec_->new_for_suppliers (this->ifgop_, adminid); + + ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ())); + + consumer_admin_ = + ec_->new_for_consumers (this->ifgop_, adminid); + + ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ())); + + ACE_NEW_RETURN (consumers_, + Throughput_StructuredPushConsumer*[this->consumer_count_], + -1); + ACE_NEW_RETURN (suppliers_, + Throughput_StructuredPushSupplier*[this->supplier_count_], + -1); + + // ---- + + int i = 0; + + for (i = 0; i < this->consumer_count_; ++i) + { + ACE_NEW_RETURN (consumers_[i], + Throughput_StructuredPushConsumer (this), + -1); + consumers_[i]->init (root_poa_.in ()); + + consumers_[i]->connect (this->consumer_admin_.in ()); + } + + for (i = 0; i < this->supplier_count_; ++i) + { + ACE_NEW_RETURN (suppliers_[i], + Throughput_StructuredPushSupplier (this), + -1); + suppliers_[i]->TAO_Notify_Tests_StructuredPushSupplier::init ( + root_poa_.in () + ); + suppliers_[i]->connect (this->supplier_admin_.in ()); + } + + return 0; +} + +int +Notify_Throughput::parse_args(int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + const ACE_TCHAR* current_arg = 0; + while (arg_shifter.is_anything_left ()) + { + if (arg_shifter.cur_arg_strncasecmp ("-collocated_ec") == 0) + { + this->collocated_ec_ = 1; + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-consumers"))) + { + this->consumer_count_ = ACE_OS::atoi (current_arg); + // The number of events to send/receive. + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-suppliers"))) + { + this->supplier_count_ = ACE_OS::atoi (current_arg); + // The number of events to send/receive. + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-burst_size"))) + { + this->burst_size_ = ACE_OS::atoi (current_arg); + // The number of events to send/receive. + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-burst_count"))) + { + this->burst_count_ = ACE_OS::atoi (current_arg); + // + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-burst_pause"))) + { + this->burst_pause_ = ACE_OS::atoi (current_arg); + // + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-payload"))) + { + this->payload_size_ = ACE_OS::atoi (current_arg); + ACE_NEW_RETURN (this->payload_, + char [this->payload_size_], + -1); + // + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-EC"))) + { + this->ec_name_ = current_arg; + // + arg_shifter.consume_arg (); + } + else if ((current_arg = + arg_shifter.get_the_parameter ("-ExpectedCount"))) + { + this->perconsumer_count_ = ACE_OS::atoi (current_arg); + // + arg_shifter.consume_arg (); + } + else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0) + { + ACE_DEBUG((LM_DEBUG, + "usage: %s " + "-collocated_ec, " + "-consumers [count], " + "-suppliers [count], " + "-burst_size [size], " + "-burst_count [count], " + "-burst_pause [time(uS)], " + "-payload [size]" + "-EC [Channel Name]" + "-ExpectedCount [count]\n", + argv[0], argv[0])); + + arg_shifter.consume_arg (); + + return -1; + } + else + { + arg_shifter.ignore_arg (); + } + } + // Recalculate. + peer_done_count_ = consumer_count_ + supplier_count_; + return 0; +} + +void +Notify_Throughput::create_EC (void) +{ + if (this->collocated_ec_ == 1) + { + TAO_Notify_Service* notify_service = ACE_Dynamic_Service<TAO_Notify_Service>::instance (TAO_NOTIFICATION_SERVICE_NAME); + + if (notify_service == 0) + { + ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n")); + return; + } + + // Activate the factory + this->notify_factory_ = + notify_service->create (this->root_poa_.in ()); + + ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ())); + } + else + { + this->resolve_naming_service (); + this->resolve_Notify_factory (); + } + + // A channel name was specified, use that to resolve the service. + if (this->ec_name_.length () != 0) + { + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (ec_name_.c_str ()); + + CORBA::Object_var obj = + this->naming_context_->resolve (name); + + this->ec_ = + CosNotifyChannelAdmin::EventChannel::_narrow (obj.in ()); + } +else + { + CosNotifyChannelAdmin::ChannelID id; + + ec_ = notify_factory_->create_channel (initial_qos_, + initial_admin_, + id); + } + + ACE_ASSERT (!CORBA::is_nil (ec_.in ())); +} + +void +Notify_Throughput::run_test (void) +{ + + ACE_DEBUG ((LM_DEBUG, "collocated_ec_ %d ," + "burst_count_ %d, " + "burst_pause_ %d, " + "burst_size_ %d, " + "payload_size_ %d, " + "consumer_count_ %d, " + "supplier_count_ %d " + "expected count %d\n", + collocated_ec_, + burst_count_ , + burst_pause_ , + burst_size_ , + payload_size_, + consumer_count_ , + supplier_count_ , + perconsumer_count_)); + + for (int i = 0; i < this->supplier_count_; ++i) + { + suppliers_[i]-> + TAO_Notify_Tests_StructuredPushSupplier::init (root_poa_.in ()); + + if (suppliers_[i]->ACE_Task_Base::activate (THR_NEW_LWP | THR_JOINABLE) != 0) + { + ACE_ERROR ((LM_ERROR, + "Cannot activate client threads\n")); + } + } + + // Wait till we're signalled done. + { + ACE_DEBUG ((LM_DEBUG, "(%t)Waiting for shutdown signal in main..\n")); + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_); + + while (this->peer_done_count_ != 0) + { + condition_.wait (); + } + } + + if (this->ec_name_.length () == 0) // we are not using a global EC + { + // Destroy the ec. + this->ec_->destroy (); + } + + // Signal the workers. + this->worker_.done_ = 1; +} + +void +Notify_Throughput::peer_done (void) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, lock_); + + if (--this->peer_done_count_ == 0) + { + ACE_DEBUG ((LM_DEBUG, "calling shutdown\n")); + condition_.broadcast (); + } +} + +void +Notify_Throughput::dump_results (void) +{ + ACE_Throughput_Stats throughput; + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + char buf[BUFSIZ]; + + for (int j = 0; j < this->consumer_count_; ++j) + { + ACE_OS::sprintf (buf, "Consumer [%02d]", j); + + this->consumers_[j]->dump_stats (buf, gsf); + this->consumers_[j]->accumulate_into (throughput); + } + + ACE_DEBUG ((LM_DEBUG, "\n")); + + ACE_Throughput_Stats suppliers; + + for (int i = 0; i < this->supplier_count_; ++i) + { + ACE_OS::sprintf (buf, "Supplier [%02d]", i); + + this->suppliers_[i]->dump_stats (buf, gsf); + this->suppliers_[i]->accumulate_into (suppliers); + } + + ACE_DEBUG ((LM_DEBUG, "\nTotals:\n")); + throughput.dump_results ("Notify_Consumer/totals", gsf); + + ACE_DEBUG ((LM_DEBUG, "\n")); + suppliers.dump_results ("Notify_Supplier/totals", gsf); +} + +/***************************************************************************/ + +int +main (int argc, char* argv[]) +{ + ACE_High_Res_Timer::calibrate (); + + Notify_Throughput events; + + if (events.parse_args (argc, argv) == -1) + { + return 1; + } + + try + { + events.init (argc, argv); //Init the Client + + events.run_test (); + + ACE_DEBUG ((LM_DEBUG, "Waiting for threads to exit...\n")); + ACE_Thread_Manager::instance ()->wait (); + events.dump_results(); + + ACE_DEBUG ((LM_DEBUG, "ending main...\n")); + + } + catch (const CORBA::Exception& se) + { + se._tao_print_exception ("Error: "); + return 1; + } + + return 0; +} + + +// **************************************************************** + +Worker::Worker (void) +:done_ (0) +{ +} + +void +Worker::orb (CORBA::ORB_ptr orb) +{ + orb_ = CORBA::ORB::_duplicate (orb); +} + +int +Worker::svc (void) +{ + ACE_Time_Value tv(5); + + do + { + this->orb_->run (tv); + }while (!this->done_); + + ACE_DEBUG ((LM_DEBUG, "(%P) (%t) done\n")); + + return 0; +} |