summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp')
-rw-r--r--TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp193
1 files changed, 193 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp
new file mode 100644
index 00000000000..c0ff8bef3a1
--- /dev/null
+++ b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp
@@ -0,0 +1,193 @@
+// $Id$
+
+#include "Consumer.h"
+
+#include <sstream> //for ostringstream
+
+#include "ace/Thread.h"
+#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory
+#include "orbsvcs/RtecSchedulerC.h"
+
+ACE_RCSID(EC_Examples, Consumer, "$Id$")
+
+Consumer::Consumer (void)
+ : _consumer(this)
+ , _consumer_id(-1)
+{
+}
+
+Consumer::~Consumer(void)
+{
+}
+
+void
+Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->connect_impl(false,
+ scheduler,
+ entry_prefix,
+ consumer_id,
+ event_type,
+ 0, //period; ignored
+ RtecScheduler::VERY_LOW_IMPORTANCE, //ignored
+ RtecScheduler::VERY_LOW_CRITICALITY, //ignored
+ ec
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->connect_impl(true,
+ scheduler,
+ entry_prefix,
+ consumer_id,
+ event_type,
+ period,
+ importance,
+ criticality,
+ ec
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+Consumer::connect_impl (bool set_rtinfo, //true if should set RT_Info
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->_consumer_id = consumer_id;
+
+ //create consumer RT_Info
+ std::ostringstream cons_entry_pt;
+ cons_entry_pt << entry_prefix; //unique RT_Info entry point
+ ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str()));
+ RtecScheduler::handle_t rt_info = scheduler->create (cons_entry_pt.str().c_str()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ if (set_rtinfo)
+ {
+ ACE_Time_Value tv (0,0);
+ TimeBase::TimeT tmp;
+ ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv);
+ scheduler->set (rt_info,
+ criticality,
+ tmp,tmp,tmp,
+ period,
+ importance,
+ tmp,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_DEBUG((LM_DEBUG,"Set Consumer %d RT_Info\n",this->_consumer_id));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"NOT Set Consumer %d RT_Info\n",this->_consumer_id));
+ }
+
+ // Register as consumer of appropriate event type
+ ACE_ConsumerQOS_Factory consQoS;
+ consQoS.insert_type(event_type,
+ rt_info);
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy =
+ consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventComm::PushConsumer_var consumerv =
+ this->_consumer._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy->connect_push_consumer (consumerv.in (),
+ consQoS.get_ConsumerQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"Consumer %d connected\n",this->_consumer_id));
+ ACE_DEBUG((LM_DEBUG,"\tEvent type: %d\n",event_type));
+}
+
+void
+Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
+{
+ //disconnect consumer
+
+ if (! CORBA::is_nil (this->_supplier_proxy.in()))
+ {
+ this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil();
+
+ //Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_DEBUG((LM_DEBUG,"Consumer %d disconnected\n",this->_consumer_id));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"Cannot disconnect; Consumer %d not connected!\n",this->_consumer_id));
+ }
+}
+
+void
+Consumer::push (const RtecEventComm::EventSet& events
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (events.length () == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Consumer (%P|%t) no events\n"));
+ return;
+ }
+
+ int prio = -1;
+ ACE_hthread_t handle;
+ ACE_Thread::self(handle);
+ ACE_Thread::getprio(handle,prio);
+ //ACE_thread_t tid = ACE_Thread::self();
+ ACE_DEBUG ((LM_DEBUG, "Consumer #%d @%d (%P|%t) we received event type %d\n",
+ this->_consumer_id,prio,events[0].header.type));
+}
+
+void
+Consumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+// ****************************************************************
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */