summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp')
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp243
1 files changed, 243 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
new file mode 100644
index 00000000000..49efa019ecf
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
@@ -0,0 +1,243 @@
+// $Id$
+
+#include "ECT_Supplier.h"
+
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Time_Utilities.h"
+
+#include "tao/Timeprobe.h"
+#include "tao/debug.h"
+
+#include "ace/Get_Opt.h"
+#include "ace/Auto_Ptr.h"
+#include "ace/Sched_Params.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/ACE.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID (EC_Throughput,
+ ECT_Supplier,
+ "$Id$")
+
+Test_Supplier::Test_Supplier (ECT_Driver *driver)
+ : driver_ (driver),
+ supplier_ (this),
+ burst_count_ (0),
+ burst_size_ (0),
+ event_size_ (0),
+ burst_pause_ (0),
+ type_start_ (ACE_ES_EVENT_UNDEFINED),
+ type_count_ (1)
+{
+}
+
+void
+Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char* name,
+ int burst_count,
+ int burst_size,
+ int event_size,
+ int burst_pause,
+ int type_start,
+ int type_count,
+ RtecEventChannelAdmin::EventChannel_ptr ec)
+{
+ this->burst_count_ = burst_count;
+ this->burst_size_ = burst_size;
+ this->event_size_ = event_size;
+ this->burst_pause_ = burst_pause;
+ this->type_start_ = type_start;
+ this->type_count_ = type_count;
+
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (name);
+
+ ACE_Time_Value tv (0, burst_pause);
+ RtecScheduler::Period_t rate = tv.usec () * 10;
+
+ // The execution times are set to reasonable values, but
+ // actually they are changed on the real execution, i.e. we
+ // lie to the scheduler to obtain right priorities; but we
+ // don't care if the set is schedulable.
+ tv.set (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ rate,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::OPERATION);
+
+ this->supplier_id_ = ACE::crc32 (name);
+ ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
+ this->supplier_id_));
+
+ ACE_SupplierQOS_Factory qos;
+ for (int i = 0; i != type_count; ++i)
+ {
+ qos.insert (this->supplier_id_,
+ type_start + i,
+ rt_info, 1);
+ }
+ qos.insert (this->supplier_id_,
+ ACE_ES_EVENT_SHUTDOWN,
+ rt_info, 1);
+
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers ();
+
+ this->consumer_proxy_ =
+ supplier_admin->obtain_push_consumer ();
+
+ RtecEventComm::PushSupplier_var objref =
+ this->supplier_._this ();
+
+ this->consumer_proxy_->connect_push_supplier (objref.in (),
+ qos.get_SupplierQOS ());
+}
+
+void
+Test_Supplier::disconnect (void)
+{
+ if (CORBA::is_nil (this->consumer_proxy_.in ()))
+ return;
+
+ this->consumer_proxy_->disconnect_push_consumer ();
+
+ this->consumer_proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+
+ // Deactivate the servant
+ PortableServer::POA_var poa =
+ this->supplier_._default_POA ();
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->supplier_);
+ poa->deactivate_object (id.in ());
+}
+
+int
+Test_Supplier::svc ()
+{
+ try
+ {
+ // Initialize a time value to pace the test
+ ACE_Time_Value tv (0, this->burst_pause_);
+
+ // Pre-allocate a message to send
+ ACE_Message_Block mb (this->event_size_);
+ mb.wr_ptr (this->event_size_);
+
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.source = this->supplier_id ();
+ event[0].header.ttl = 1;
+
+ ACE_hrtime_t t = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
+
+ // We use replace to minimize the copies, this should result
+ // in just one memory allocation;
+#if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
+ event[0].data.payload.replace (this->event_size_,
+ &mb);
+#else
+ // If the replace method is not available, we will need
+ // to do the copy manually. First, set the octet sequence length.
+ event[0].data.payload.length (this->event_size_);
+
+ // Now copy over each byte.
+ char* base = mb.data_block ()->base ();
+ for(CORBA::ULong i = 0; i < (CORBA::ULong)this->event_size_; i++)
+ {
+ event[0].data.payload[i] = base[i];
+ }
+#endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
+
+ ACE_hrtime_t test_start = ACE_OS::gethrtime ();
+
+ for (int i = 0; i < this->burst_count_; ++i)
+ {
+ for (int j = 0; j < this->burst_size_; ++j)
+ {
+ event[0].header.type =
+ this->type_start_ + j % this->type_count_;
+
+ ACE_hrtime_t request_start = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
+ request_start);
+ // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
+ this->consumer_proxy ()->push (event);
+
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+ this->throughput_.sample (end - test_start,
+ end - request_start);
+ }
+
+ if (TAO_debug_level > 0
+ && i % 100 == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "ECT_Supplier (%P|%t): %d bursts sent\n",
+ i));
+ }
+
+ ACE_OS::sleep (tv);
+ }
+
+ // Send one event shutdown from each supplier
+ event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
+ ACE_hrtime_t request_start = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
+ request_start);
+ this->consumer_proxy ()->push(event);
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+ this->throughput_.sample (end - test_start,
+ end - request_start);
+ }
+ catch (const CORBA::SystemException& sys_ex)
+ {
+ sys_ex._tao_print_exception ("SYS_EX");
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("NON SYS EX");
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Supplier %4.4x completed\n",
+ this->supplier_id_));
+ return 0;
+}
+
+void
+Test_Supplier::disconnect_push_supplier (void)
+{
+}
+
+int Test_Supplier::supplier_id (void) const
+{
+ return this->supplier_id_;
+}
+
+RtecEventChannelAdmin::ProxyPushConsumer_ptr
+Test_Supplier::consumer_proxy (void)
+{
+ return this->consumer_proxy_.in ();
+}
+
+void
+Test_Supplier::dump_results (const char* name,
+ ACE_UINT32 gsf)
+{
+ this->throughput_.dump_results (name, gsf);
+}
+
+void
+Test_Supplier::accumulate (ACE_Throughput_Stats& stats) const
+{
+ stats.accumulate (this->throughput_);
+}