summaryrefslogtreecommitdiff
path: root/ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp')
-rw-r--r--ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
new file mode 100644
index 00000000000..26d765f6145
--- /dev/null
+++ b/ACE/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
@@ -0,0 +1,176 @@
+// $Id$
+
+#include "ECT_Consumer.h"
+
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Time_Utilities.h"
+
+#include "tao/Timeprobe.h"
+#include "tao/debug.h"
+
+#include "ace/Get_Opt.h"
+#include "ace/Auto_Ptr.h"
+#include "ace/Sched_Params.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID (EC_Throughput,
+ ECT_Consumer,
+ "$Id$")
+
+Test_Consumer::Test_Consumer (ECT_Driver *driver,
+ void *cookie,
+ int n_suppliers,
+ int stall_length)
+ : driver_ (driver),
+ cookie_ (cookie),
+ n_suppliers_ (n_suppliers),
+ recv_count_ (0),
+ shutdown_count_ (0),
+ stall_length_(stall_length)
+{
+}
+
+void
+Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char* name,
+ int type_start,
+ int type_count,
+ RtecEventChannelAdmin::EventChannel_ptr ec)
+{
+ RtecScheduler::handle_t rt_info =
+ scheduler->create (name);
+
+ // The worst case execution time is far less than 2
+ // milliseconds, but that is a safe estimate....
+ ACE_Time_Value tv (0, 2000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ scheduler->set (rt_info,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 0,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION);
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
+ for (int i = 0; i != type_count; ++i)
+ {
+ qos.insert_type (type_start + i, rt_info);
+ }
+
+ // = Connect as a consumer.
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers ();
+
+ this->supplier_proxy_ =
+ consumer_admin->obtain_push_supplier ();
+
+ RtecEventComm::PushConsumer_var objref = this->_this ();
+
+ this->supplier_proxy_->connect_push_consumer (objref.in (),
+ qos.get_ConsumerQOS ());
+}
+
+void
+Test_Consumer::disconnect (void)
+{
+ if (CORBA::is_nil (this->supplier_proxy_.in ()))
+ return;
+
+ this->supplier_proxy_->disconnect_push_supplier ();
+
+ this->supplier_proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+
+ // Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_default_POA ();
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this);
+ poa->deactivate_object (id.in ());
+}
+
+void
+Test_Consumer::dump_results (const ACE_TCHAR* name,
+ ACE_UINT32 gsf)
+{
+ this->throughput_.dump_results (name, gsf);
+}
+
+void
+Test_Consumer::accumulate (ACE_Throughput_Stats& stats) const
+{
+ stats.accumulate (this->throughput_);
+}
+
+void
+Test_Consumer::push (const RtecEventComm::EventSet& events)
+{
+ if (events.length () == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "ECT_Consumer (%P|%t) no events\n"));
+ return;
+ }
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ // We start the timer as soon as we receive the first event...
+ if (this->recv_count_ == 0)
+ {
+ this->first_event_ = ACE_OS::gethrtime ();
+ ACE_DEBUG ((LM_DEBUG,
+ "ECT_Consumer (%P|%t) stalling for %d seconds\n", this->stall_length_));
+ ACE_OS::sleep(this->stall_length_);
+ ACE_DEBUG ((LM_DEBUG, "ECT_Consumer (%P|%t) finished stalling\n"));
+ }
+
+
+ this->recv_count_ += events.length ();
+
+ if (TAO_debug_level > 0
+ && this->recv_count_ % 100 == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "ECT_Consumer (%P|%t): %d events received\n",
+ this->recv_count_));
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
+
+ for (u_int i = 0; i < events.length (); ++i)
+ {
+ const RtecEventComm::Event& e = events[i];
+
+ if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
+ {
+ this->shutdown_count_++;
+ if (this->shutdown_count_ >= this->n_suppliers_)
+ {
+ // We stop the timer as soon as we realize it is time to
+ // do so.
+ this->driver_->shutdown_consumer (this->cookie_);
+ }
+ }
+ else
+ {
+ ACE_hrtime_t creation;
+ ORBSVCS_Time::TimeT_to_hrtime (creation,
+ e.header.creation_time);
+
+ const ACE_hrtime_t now = ACE_OS::gethrtime ();
+ this->throughput_.sample (now - this->first_event_,
+ now - creation);
+ }
+ }
+}
+
+void
+Test_Consumer::disconnect_push_consumer (void)
+{
+}