// -*- C++ -*- // $Id$ /** * Code generated by the The ACE ORB (TAO) IDL Compiler v1.8.3 * TAO and the TAO IDL Compiler have been developed by: * Center for Distributed Object Computing * Washington University * St. Louis, MO * USA * http://www.cs.wustl.edu/~schmidt/doc-center.html * and * Distributed Object Computing Laboratory * University of California at Irvine * Irvine, CA * USA * and * Institute for Software Integrated Systems * Vanderbilt University * Nashville, TN * USA * http://www.isis.vanderbilt.edu/ * * Information about TAO is available at: * http://www.cs.wustl.edu/~schmidt/TAO.html **/ #include "LatencyTT_Test_Sender_exec.h" #include "tao/ORB_Core.h" #include "ace/Reactor.h" #include "ace/Timer_Queue.h" #include "ace/High_Res_Timer.h" #include "dds4ccm/impl/dds4ccm_conf.h" namespace CIAO_LatencyTT_Test_Sender_Impl { static int compare_two_longs (const void *long1, const void *long2) { return (int)(*(ACE_UINT64*)long1 - *(ACE_UINT64*)long2); } /** * WriteTicker */ WriteTicker::WriteTicker (Sender_exec_i &callback) : callback_ (callback) { } int WriteTicker::handle_timeout (const ACE_Time_Value &, const void *) { // Notify the subscribers this->callback_.write_one (); return 0; } /** * Facet Executor Implementation Class: ping_listen_data_listener_exec_i */ ping_listen_data_listener_exec_i::ping_listen_data_listener_exec_i ( ::LatencyTT_Test::CCM_Sender_Context_ptr ctx, Sender_exec_i &callback) : ciao_context_ ( ::LatencyTT_Test::CCM_Sender_Context::_duplicate (ctx)) , callback_ (callback) { } ping_listen_data_listener_exec_i::~ping_listen_data_listener_exec_i (void) { } // Operations from ::LatencyTT_Test::LatencyTTTestConn::Listener void ping_listen_data_listener_exec_i::on_one_data (const ::LatencyTTTest & datum, const ::CCM_DDS::ReadInfo & /* info */) { ACE_UINT64 receive_time = 0; ACE_High_Res_Timer::gettimeofday_hr ().to_usec (receive_time); this->callback_.read(const_cast (datum), receive_time); } void ping_listen_data_listener_exec_i::on_many_data (const ::LatencyTTTestSeq & /* data */, const ::CCM_DDS::ReadInfoSeq & /* infos */) { /* Your code here. */ } /** * Facet Executor Implementation Class: connector_status_exec_i */ connector_status_exec_i::connector_status_exec_i ( ::LatencyTT_Test::CCM_Sender_Context_ptr ctx, Sender_exec_i &callback, Atomic_Boolean &matched, int number_of_subscribers, Atomic_Long &unexpected_count) : ciao_context_ ( ::LatencyTT_Test::CCM_Sender_Context::_duplicate (ctx)) , callback_ (callback) , matched_ (matched) , number_of_subscribers_ (number_of_subscribers) , unexpected_count_(unexpected_count) { } connector_status_exec_i::~connector_status_exec_i (void) { } // Operations from ::CCM_DDS::ConnectorStatusListener void connector_status_exec_i::on_inconsistent_topic (::DDS::Topic_ptr /* the_topic */, const ::DDS::InconsistentTopicStatus & /* status */) { /* Your code here. */ } void connector_status_exec_i::on_requested_incompatible_qos (::DDS::DataReader_ptr /* the_reader */, const ::DDS::RequestedIncompatibleQosStatus & /* status */) { /* Your code here. */ } void connector_status_exec_i::on_sample_rejected (::DDS::DataReader_ptr /* the_reader */, const ::DDS::SampleRejectedStatus & /* status */) { /* Your code here. */ } void connector_status_exec_i::on_offered_deadline_missed (::DDS::DataWriter_ptr /* the_writer */, const ::DDS::OfferedDeadlineMissedStatus & /* status */) { /* Your code here. */ } void connector_status_exec_i::on_offered_incompatible_qos (::DDS::DataWriter_ptr /* the_writer */, const ::DDS::OfferedIncompatibleQosStatus & /* status */) { /* Your code here. */ } void connector_status_exec_i::on_unexpected_status (::DDS::Entity_ptr the_entity, ::DDS::StatusKind status_kind) { ++this->unexpected_count_; if (! ::CORBA::is_nil (the_entity) && status_kind == DDS::PUBLICATION_MATCHED_STATUS) { ::DDS::PublicationMatchedStatus_var stat; ::DDS::DataWriter_var wr = ::DDS::DataWriter::_narrow(the_entity); if(::CORBA::is_nil(wr)) { throw ::CORBA::INTERNAL (); } ::DDS::ReturnCode_t retval = wr->get_publication_matched_status (stat.out ()); if (retval == DDS::RETCODE_OK) { if (stat.in ().current_count >= this->number_of_subscribers_ && !this->matched_.value()) { this->matched_ = true; this->callback_.start(); } } } } /** * Component Executor Implementation Class: Sender_exec_i */ Sender_exec_i::Sender_exec_i (void) : iterations_ (1000) , sleep_ (10) , number_of_sub_ (1) , datalen_ (100) , datalen_idx_ (0) , nr_of_runs_ (10) , matched_ (false) , tv_total_ (0L) , tv_max_ (0L) , tv_min_ (0L) , count_ (0) , number_of_msg_ (0) // Number of sent messages , timer_ (false) , received_ (false) , seq_num_ (0) , sigma_duration_squared_ (0) , start_time_ (0) , start_time_test_ (0) , end_time_test_ (0) , duration_times_ (0) , datalen_range_ (0) , _clock_overhead_ (0) , unexpected_count_ (0) { ACE_NEW_THROW_EX (this->ticker_, WriteTicker (*this), ::CORBA::NO_MEMORY ()); ACE_NEW_THROW_EX (this->datalen_range_, ::CORBA::Short[this->nr_of_runs_], ::CORBA::NO_MEMORY ()); } Sender_exec_i::~Sender_exec_i (void) { delete this->ticker_; delete [] this->duration_times_; delete [] datalen_range_; } // Supported operations and attributes. ACE_Reactor* Sender_exec_i::reactor (void) { ACE_Reactor* reactor = 0; ::CORBA::Object_var ccm_object = this->ciao_context_->get_CCM_object(); if (! ::CORBA::is_nil (ccm_object.in ())) { ::CORBA::ORB_var orb = ccm_object->_get_orb (); if (! ::CORBA::is_nil (orb.in ())) { reactor = orb->orb_core ()->reactor (); } } if (reactor == 0) { throw ::CORBA::INTERNAL (); } return reactor; } void Sender_exec_i::write_one (void) { if (this->number_of_msg_ == 0 && this->datalen_idx_ == 0) { ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_test_); } // First message sent always, next messages only as previous sent message // is received back. if (this->number_of_msg_ == 0 || this->received_.value ()) { // All messages send, stop timer. if (this->iterations_ != 0 && this->number_of_msg_ >= this->iterations_) { if( this->datalen_idx_ >= (this->nr_of_runs_ - 1)) { this->stop (); this->timer_ = false; this->calc_results (); ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->end_time_test_); } else { this->calc_results (); this->reset_results (); ++this->datalen_idx_; this->datalen_ = this->datalen_range_[this->datalen_idx_]; this->test_topic_.data.length (this->datalen_); } } else { try { this->test_topic_.seq_num = this->number_of_msg_; // Keep last sent seq_num, to control if message is sent back. this->seq_num_ = this->number_of_msg_; this->received_ = false; ACE_High_Res_Timer::gettimeofday_hr ().to_usec (this->start_time_); this->writer_->write_one (this->test_topic_, ::DDS::HANDLE_NIL); } catch (const CCM_DDS::InternalError& ) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ") ACE_TEXT ("while wrinting sample with sequence_number <%u>.\n"), this->test_topic_.seq_num)); } ++this->number_of_msg_; } } } void Sender_exec_i::read (LatencyTTTest & an_instance, ACE_UINT64 receive_time) { if (an_instance.seq_num == this->seq_num_) { this->record_time (receive_time); this->received_ = true; } } void Sender_exec_i::reset_results() { delete [] this->duration_times_; this->count_ = 0; ACE_NEW_THROW_EX (this->duration_times_, ACE_UINT64[this->iterations_], ::CORBA::NO_MEMORY ()); this->tv_total_ = 0L; this->tv_max_ = 0L; this->tv_min_ = 0L; this->number_of_msg_ = 0; this->received_ = false; this->seq_num_ = 0; this->sigma_duration_squared_ = 0; } void Sender_exec_i::calc_results (void) { // Sort all duration times. qsort(this->duration_times_, this->count_, sizeof(ACE_UINT64), compare_two_longs); // Show latency_50_percentile, latency_90_percentile, // latency_99_percentile and latency_99.99_percentile. // For example duration_times[per50] is the median i.e. 50% of the // samples have a latency time <= duration_times[per50] int const per50 = this->count_ / 2; int const per90 = (int)(this->count_ * 0.90); int const per99 = (int)(this->count_ * 0.990); int const per9999 = (int)(this->count_ * 0.9999); double const avg = (double)(this->tv_total_ / this->count_); // Calculate standard deviation. double const roundtrip_time_std = sqrt( (this->sigma_duration_squared_ / (double)this->count_) - (avg * avg)); // Show values as float, in order to be comparable with RTI performance test. if (this->count_ > 0) { if (this->datalen_idx_ == 0) { #if (CIAO_DDS4CCM_CONTEXT_SWITCH==1) ACE_DEBUG ((LM_DEBUG, "\n\nYES, we're using a threadswitch between " "DDS and CCM\n\n")); #else ACE_DEBUG ((LM_DEBUG, "\n\nNO, we're not using a threadswitch between " "DDS and CCM\n\n")); #endif ACE_DEBUG ((LM_DEBUG, "Collecting statistics on %d samples per message size.\n" "This is the roundtrip time, *not* the one-way-latency\n" "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%," " max us\n" "------,-------,-------,-------,-------,-------,-------,-------," "-------\n" "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n", this->count_, this->datalen_, roundtrip_time_std, avg, (double)this->tv_min_, (double)this->duration_times_[per50-1], (double)this->duration_times_[per90-1], (double)this->duration_times_[per99-1], (double)this->duration_times_[per9999-1], (double)this->tv_max_)); } else { ACE_DEBUG ((LM_DEBUG, "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n", this->datalen_, roundtrip_time_std, avg, (double)this->tv_min_, (double)this->duration_times_[per50-1], (double)this->duration_times_[per90-1], (double)this->duration_times_[per99-1], (double)this->duration_times_[per9999-1], (double)this->tv_max_)); } } else { ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n " "No samples received back.\n")); } } void Sender_exec_i::start (void) { // This->sleep_ is in ms unsigned int sec = this->sleep_ / 1000; unsigned int usec = (this->sleep_ % 1000) * 1000; ACE_DEBUG ((LM_DEBUG, "Sender_exec_i::start - " "Start test with interval <%u.%u>\n", sec, usec)); (void) ACE_High_Res_Timer::global_scale_factor (); this->reactor ()->timer_queue ()->gettimeofday (&ACE_High_Res_Timer::gettimeofday_hr); if (this->reactor ()->schedule_timer( this->ticker_, 0, ACE_Time_Value (5, 0), ACE_Time_Value (sec, usec)) == -1) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("Sender_exec_i::start : ") ACE_TEXT ("Error scheduling timer"))); } this->timer_ = true; } void Sender_exec_i::stop (void) { if (this->timer_.value ()) { this->reactor ()->cancel_timer (this->ticker_); } } void Sender_exec_i::record_time (ACE_UINT64 receive_time) { ACE_UINT64 interval = receive_time - this->start_time_; ACE_UINT64 duration = interval - this->_clock_overhead_; int i = ++this->count_; this->duration_times_[i-1] = duration; this->sigma_duration_squared_ += (double)duration * (double)duration; this->tv_total_ += duration; if (duration > this->tv_max_ || this->tv_max_ == 0L) { this->tv_max_ = duration; } if (duration < this->tv_min_ || this->tv_min_ == 0L) { this->tv_min_ = duration; } } void Sender_exec_i::calculate_clock_overhead (void) { int num_of_loops_clock = 320; ACE_UINT64 begin_time = 0; ACE_UINT64 clock_roundtrip_time = 0; ACE_High_Res_Timer::gettimeofday_hr ().to_usec (begin_time); for (int i = 0; i < num_of_loops_clock; ++i) { ACE_High_Res_Timer::gettimeofday_hr ().to_usec (clock_roundtrip_time); } ACE_UINT64 total_time = clock_roundtrip_time - begin_time; this->_clock_overhead_ = (long)(total_time / num_of_loops_clock); } void Sender_exec_i::init_values (void) { delete [] this->duration_times_; ACE_NEW_THROW_EX (this->duration_times_, ACE_UINT64[this->iterations_], ::CORBA::NO_MEMORY ()); int start = 16; for (int i = 0; i < this->nr_of_runs_; i++) { this->datalen_range_[i] = start; start = 2 * start; } this->datalen_ = this->datalen_range_[0]; // make instances of Topic this->test_topic_.seq_num = 0; this->test_topic_.data.length (this->datalen_); calculate_clock_overhead (); } // Component attributes and port operations. ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener_ptr Sender_exec_i::get_ping_listen_data_listener (void) { if ( ::CORBA::is_nil (this->ciao_ping_listen_data_listener_.in ())) { ping_listen_data_listener_exec_i *tmp = 0; ACE_NEW_RETURN ( tmp, ping_listen_data_listener_exec_i ( this->ciao_context_.in (), *this), ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener::_nil ()); this->ciao_ping_listen_data_listener_ = tmp; } return ::LatencyTT_Test::LatencyTTTestConn::CCM_Listener::_duplicate ( this->ciao_ping_listen_data_listener_.in ()); } ::CCM_DDS::CCM_PortStatusListener_ptr Sender_exec_i::get_ping_listen_status (void) { return ::CCM_DDS::CCM_PortStatusListener::_nil (); } ::CCM_DDS::CCM_ConnectorStatusListener_ptr Sender_exec_i::get_connector_status (void) { if ( ::CORBA::is_nil (this->ciao_connector_status_.in ())) { connector_status_exec_i *tmp = 0; ACE_NEW_RETURN ( tmp, connector_status_exec_i ( this->ciao_context_.in (), *this, this->matched_, this->number_of_sub_, this->unexpected_count_), ::CCM_DDS::CCM_ConnectorStatusListener::_nil ()); this->ciao_connector_status_ = tmp; } return ::CCM_DDS::CCM_ConnectorStatusListener::_duplicate ( this->ciao_connector_status_.in ()); } ::CORBA::ULong Sender_exec_i::iterations (void) { return this->iterations_; } void Sender_exec_i::iterations ( const ::CORBA::ULong iterations) { this->iterations_ = iterations; } ::CORBA::UShort Sender_exec_i::sleep (void) { return this->sleep_; } void Sender_exec_i::sleep ( const ::CORBA::UShort sleep) { this->sleep_ = sleep; } ::CORBA::UShort Sender_exec_i::number_of_sub (void) { return this->number_of_sub_; } void Sender_exec_i::number_of_sub ( const ::CORBA::UShort number_of_sub) { if (number_of_sub > 0) { this->number_of_sub_ = number_of_sub; } else { this->number_of_sub_ = 1; } } // Operations from Components::SessionComponent. void Sender_exec_i::set_session_context ( ::Components::SessionContext_ptr ctx) { this->ciao_context_ = ::LatencyTT_Test::CCM_Sender_Context::_narrow (ctx); if ( ::CORBA::is_nil (this->ciao_context_.in ())) { throw ::CORBA::INTERNAL (); } } void Sender_exec_i::configuration_complete (void) { /* Your code here. */ } void Sender_exec_i::ccm_activate (void) { try { this->writer_ = this->ciao_context_->get_connection_info_write_data (); ::CCM_DDS::DataListenerControl_var dlc = this->ciao_context_->get_connection_ping_listen_data_control (); dlc->mode (::CCM_DDS::ONE_BY_ONE); } catch (const CORBA::Exception& ex) { ex._tao_print_exception ("Exception caught:"); ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Exception caught\n"))); } catch (...) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Sender_exec_i::ccm_activate: Unknown exception caught\n"))); } this->init_values(); } void Sender_exec_i::ccm_passivate (void) { this->stop (); } void Sender_exec_i::ccm_remove (void) { if (this->nr_of_runs_ -1 != this->datalen_idx_) { if (this->datalen_idx_ == 0) { ACE_ERROR ((LM_ERROR, "ERROR SENDER: No run has taken place.\n")); } else { ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER : %u of %u runs completed.\n" " Number of messages sent of last run (%u): %u\n", this->datalen_idx_, this->nr_of_runs_, this->datalen_idx_ + 1, this->number_of_msg_)); } } else { ACE_UINT64 test_time_usec = this->end_time_test_ - this->start_time_test_; double sec = (double)test_time_usec / (1000 * 1000); ACE_DEBUG ((LM_DEBUG, "TEST successful, number of runs (%u) of " "%u messages in %3.3f seconds.\n", this->nr_of_runs_, this->number_of_msg_, sec)); } ACE_DEBUG ((LM_DEBUG, "\tNumber of unexpected events : %u\n", this->unexpected_count_.value ())); } extern "C" SENDER_EXEC_Export ::Components::EnterpriseComponent_ptr create_LatencyTT_Test_Sender_Impl (void) { ::Components::EnterpriseComponent_ptr retval = ::Components::EnterpriseComponent::_nil (); ACE_NEW_NORETURN ( retval, Sender_exec_i); return retval; } }