summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-10-08 22:08:53 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-10-08 22:08:53 +0000
commit7e83a1bb944f50b67f118923afd055d5cb5b1fa6 (patch)
tree7c4ccc1d8970e6b7594e67c7ce3b3708efb69569
parent97abc1130e52c2bfb312f467cefbb75ff45eea22 (diff)
downloadATCD-EC_Kokyu_0.tar.gz
Lots of changes. Refactored RT_Info creation into TimeoutConsumer,EC_Kokyu_0
Consumer. Spawns thread for orb->run(), shuts down correctly when all events sent from TimeoutConsumers.
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Config_Factory.h6
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Consumer.cpp151
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Consumer.h75
-rw-r--r--TAO/orbsvcs/tests/EC_Config/ECConfig.cpp506
-rw-r--r--TAO/orbsvcs/tests/EC_Config/ECConfig.h52
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Makefile8
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Test.cpp64
-rw-r--r--TAO/orbsvcs/tests/EC_Config/TestConfig.h2
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp10
-rw-r--r--TAO/orbsvcs/tests/EC_Config/Test_Handler.h7
-rw-r--r--TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp281
-rw-r--r--TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h110
-rw-r--r--TAO/orbsvcs/tests/EC_Config/test.xml6
-rw-r--r--TAO/orbsvcs/tests/EC_Config/testconfig.dtd1
14 files changed, 952 insertions, 327 deletions
diff --git a/TAO/orbsvcs/tests/EC_Config/Config_Factory.h b/TAO/orbsvcs/tests/EC_Config/Config_Factory.h
index 95af81c0620..0c38b350f16 100644
--- a/TAO/orbsvcs/tests/EC_Config/Config_Factory.h
+++ b/TAO/orbsvcs/tests/EC_Config/Config_Factory.h
@@ -37,6 +37,10 @@ namespace ConfigFactory {
class Config_Factory : ACE_Service_Object
{
public:
+ ///Constructor
+ Config_Factory (void) {}
+
+ ///Destructor
virtual ~Config_Factory (void);
/// Create and destroy the TestConfig module.
@@ -73,7 +77,7 @@ public:
/// Constructor
Default_Config_Factory (void);
- /// destructor...
+ /// Destructor...
virtual ~Default_Config_Factory (void);
/// Helper function to register the default factory into the service
diff --git a/TAO/orbsvcs/tests/EC_Config/Consumer.cpp b/TAO/orbsvcs/tests/EC_Config/Consumer.cpp
index 78d35d206de..8acacceeea6 100644
--- a/TAO/orbsvcs/tests/EC_Config/Consumer.cpp
+++ b/TAO/orbsvcs/tests/EC_Config/Consumer.cpp
@@ -1,18 +1,159 @@
// $Id$
-//#include "ace/Thread.h"
#include "Consumer.h"
+#include <sstream> //for ostringstream
+
+#include "ace/Thread.h"
+#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory
+#include "orbsvcs/RtecSchedulerC.h"
+
ACE_RCSID(EC_Examples, Consumer, "$Id$")
Consumer::Consumer (void)
- : _ordinal(-1)
+ : _consumer(this)
+ , _consumer_id(-1)
+{
+}
+
+Consumer::~Consumer(void)
+{
+}
+
+void
+Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->connect_impl(false,
+ scheduler,
+ entry_prefix,
+ consumer_id,
+ event_type,
+ 0, //period; ignored
+ RtecScheduler::VERY_LOW_IMPORTANCE, //ignored
+ RtecScheduler::VERY_LOW_CRITICALITY, //ignored
+ ec
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->connect_impl(true,
+ scheduler,
+ entry_prefix,
+ consumer_id,
+ event_type,
+ period,
+ importance,
+ criticality,
+ ec
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+Consumer::connect_impl (bool set_rtinfo, //true if should set RT_Info
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
{
+ this->_consumer_id = consumer_id;
+
+ //create consumer RT_Info
+ std::ostringstream cons_entry_pt;
+ cons_entry_pt << entry_prefix; //unique RT_Info entry point
+ ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str()));
+ RtecScheduler::handle_t rt_info = scheduler->create (cons_entry_pt.str().c_str()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ if (set_rtinfo)
+ {
+ scheduler->set (rt_info,
+ criticality,
+ period, period, period,
+ period,
+ importance,
+ period,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_DEBUG((LM_DEBUG,"Set Consumer %d RT_Info\n",this->_consumer_id));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"NOT Set Consumer %d RT_Info\n",this->_consumer_id));
+ }
+
+ // Register as consumer of appropriate event type
+ ACE_ConsumerQOS_Factory consQoS;
+ consQoS.insert_type(event_type,
+ rt_info);
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy =
+ consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventComm::PushConsumer_var consumerv =
+ this->_consumer._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy->connect_push_consumer (consumerv.in (),
+ consQoS.get_ConsumerQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"Consumer %d connected\n",this->_consumer_id));
+ ACE_DEBUG((LM_DEBUG,"\tEvent type: %d\n",event_type));
}
-Consumer::Consumer(int ord)
- : _ordinal(ord)
+void
+Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
+ //disconnect consumer
+
+ if (! CORBA::is_nil (this->_supplier_proxy.in()))
+ {
+ this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil();
+
+ //Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_DEBUG((LM_DEBUG,"Consumer %d disconnected\n",this->_consumer_id));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"Cannot disconnect; Consumer %d not connected!\n",this->_consumer_id));
+ }
}
void
@@ -33,7 +174,7 @@ Consumer::push (const RtecEventComm::EventSet& events
ACE_Thread::getprio(handle,prio);
//ACE_thread_t tid = ACE_Thread::self();
ACE_DEBUG ((LM_DEBUG, "Consumer #%d @%d (%P|%t) we received event type %d\n",
- _ordinal,prio,events[0].header.type));
+ this->_consumer_id,prio,events[0].header.type));
}
void
diff --git a/TAO/orbsvcs/tests/EC_Config/Consumer.h b/TAO/orbsvcs/tests/EC_Config/Consumer.h
index d92e461fd20..800bc309f3b 100644
--- a/TAO/orbsvcs/tests/EC_Config/Consumer.h
+++ b/TAO/orbsvcs/tests/EC_Config/Consumer.h
@@ -10,51 +10,100 @@
// Consumer
//
// = AUTHOR
-// Carlos O'Ryan (coryan@cs.wustl.edu)
+// Bryan A. Thrall (thrall@cse.wustl.edu)
//
// ============================================================================
#ifndef CONSUMER_H
#define CONSUMER_H
-#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "orbsvcs/RtecEventCommC.h"
+#include "orbsvcs/RtecSchedulerC.h"
+#include "orbsvcs/Channel_Clients_T.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class Consumer : public POA_RtecEventComm::PushConsumer
+class Consumer
{
// = TITLE
// Simple consumer object
//
// = DESCRIPTION
- // This class is a consumer of events.
- // It simply register for two event typesone event type
- // The class is just a helper to simplify common tasks in EC
- // tests, such as subscribing for a range of events, disconnecting
- // from the EC, informing the driver of shutdown messages, etc.
+ // This class is a consumer of the events pushed by a TimeoutConsumer
+ // every timeout.
//
- // There are several ways to connect and disconnect this class,
- // and it is up to the driver program to use the right one.
+ // It simply registers for the event type specified in its connect()
+ // function.
//
public:
Consumer (void);
- // Constructor
+ // Default Constructor.
- Consumer(int ord);
+ virtual ~Consumer (void);
+
+ void connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
+ // This method connects the consumer to the EC without setting anything
+ // in the RT_Info (such as period, criticality, etc.). The consumer
+ // subscribes to events with the specified event_type.
+
+ void connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
+ // This method connects the consumer to the EC, setting RT_Info values
+ // for period, criticality, and importance. The consumer subscribes
+ // to events with the specified event_type.
+
+ void disconnect (ACE_ENV_SINGLE_ARG_DECL);
+ // Disconnect from the EC.
// = The RtecEventComm::PushConsumer methods
virtual void push (const RtecEventComm::EventSet& events
ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException));
+
virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
// The skeleton methods.
+protected:
+ void connect_impl (bool set_rtinfo, //true if should set RT_Info
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ int consumer_id, //unique identifier
+ long event_type,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
+ // This method implements the Consumer::connect() methods; if the first
+ // parameter is false, then the RT_Info values are ignored. Otherwise,
+ // they are set.
+
private:
- int _ordinal;
+ RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy;
+ // We talk to the EC (as a consumer) using this proxy.
+
+ ACE_PushConsumer_Adapter<Consumer> _consumer;
+ // We connect to the EC as a consumer so we can receive the
+ // timeout events.
+
+ int _consumer_id;
};
#endif /* CONSUMER_H */
diff --git a/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
index 470a931da06..370b0e28cba 100644
--- a/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
+++ b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
@@ -4,59 +4,100 @@
#define ECCONFIG_C
#include "ECConfig.h"
+#include "Consumer.h"
+#include "TimeoutConsumer.h"
#include <sstream> //for ostringstream
#include "ace/Array.h"
#include "ace/Bound_Ptr.h"
+#include "ace/Thread_Manager.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/Event/EC_Kokyu_Factory.h"
+#include "orbsvcs/RtecSchedulerC.h"
+#include "orbsvcs/RtecEventCommC.h"
namespace TestConfig {
template <class SCHED_STRAT>
ECConfig<SCHED_STRAT>::ECConfig (void)
- : Test_Config (),
- ec_impl(0),
- sched_impl(0),
- configured (0) //false
+ : Test_Config ()
+ , ec_impl(0)
+ , sched_impl(0)
+ , periods(0)
+ , importances(0)
+ , crits(0)
+ , test_done(new ACE_RW_Mutex())
+ , configured (0) //false
{
}
template <class SCHED_STRAT>
ECConfig<SCHED_STRAT>::~ECConfig (void)
{
- this->reset();
+ this->reset(ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ delete this->test_done;
}
template <class SCHED_STRAT> void
-ECConfig<SCHED_STRAT>::reset (void)
+ECConfig<SCHED_STRAT>::reset (ACE_ENV_SINGLE_ARG_DECL)
{
// We should do a lot of cleanup (disconnect from the EC,
// deactivate all the objects with the POA, etc.).
- delete this->ec_impl;
+ this->disconnect_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); //should release all read locks on this->test_done
- delete this->sched_impl;
+ this->disconnect_consumers(ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ {
+ // Deactivate the EC
+ PortableServer::POA_var poa =
+ this->ec_impl->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this->ec_impl ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
- for(size_t i=0; i<consumers.size(); ++i) {
- delete this->consumers[i];
+ ACE_DEBUG ((LM_DEBUG, "EC deactivated\n"));
}
- for(size_t i=0; i<suppliers.size(); ++i) {
- delete this->suppliers[i];
+ {
+ // Deactivate the Scheduler
+ PortableServer::POA_var poa =
+ this->sched_impl->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this->sched_impl ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "scheduler deactivated\n"));
}
+ delete this->ec_impl;
+ this->ec_impl = 0;
+
+ delete this->sched_impl;
+ this->sched_impl = 0;
+
+ //TODO clear config_infos?
+
+ //TODO clear RT_Infos from scheduler?
+
configured = 0; //false
}
template <class SCHED_STRAT> int
ECConfig<SCHED_STRAT>::configure (TCFG_SET_WPTR testconfigs)
{
- if (configured) {
+ if (this->configured) {
ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resetting EC\n")));
this->reset(); //delete memory used by previous configuration
}
@@ -70,119 +111,67 @@ ECConfig<SCHED_STRAT>::configure (TCFG_SET_WPTR testconfigs)
////////////////// EC ready; do config ////////////////////
size_t tsize = testconfigs->size();
- supplier_cfgs.size(tsize);
- consumer_cfgs.size(tsize);
- testcfgs.size(tsize);
- consumers.size(tsize);
- suppliers.size(tsize);
+ this->testcfgs.size(tsize);
+ this->periods.size(tsize);
+ this->importances.size(tsize);
+ this->crits.size(tsize);
for (size_t i=0; i<tsize; ++i)
{
//ACE_Weak_Bound_Ptr doesn't have operator*()! ??
//test_config_t *curcfg = (*testconfigs)[i];
test_config_t *curcfg = (*testconfigs.unsafe_get())[i];
- testcfgs[i] = curcfg;
+ this->testcfgs[i] = curcfg;
- RtecScheduler::Criticality_t criticality;
switch (curcfg->criticality) {
case VERY_LOW_CRITICALITY :
- criticality = RtecScheduler::VERY_LOW_CRITICALITY;
+ this->crits[i] = RtecScheduler::VERY_LOW_CRITICALITY;
break;
case LOW_CRITICALITY :
- criticality = RtecScheduler::LOW_CRITICALITY;
+ this->crits[i] = RtecScheduler::LOW_CRITICALITY;
break;
case MEDIUM_CRITICALITY :
- criticality = RtecScheduler::MEDIUM_CRITICALITY;
+ this->crits[i] = RtecScheduler::MEDIUM_CRITICALITY;
break;
case HIGH_CRITICALITY :
- criticality = RtecScheduler::HIGH_CRITICALITY;
+ this->crits[i] = RtecScheduler::HIGH_CRITICALITY;
break;
case VERY_HIGH_CRITICALITY :
- criticality = RtecScheduler::VERY_HIGH_CRITICALITY;
+ this->crits[i] = RtecScheduler::VERY_HIGH_CRITICALITY;
break;
}
- RtecScheduler::Importance_t importance;
switch (curcfg->importance) {
case VERY_LOW_IMPORTANCE :
- importance = RtecScheduler::VERY_LOW_IMPORTANCE;
+ this->importances[i] = RtecScheduler::VERY_LOW_IMPORTANCE;
break;
case LOW_IMPORTANCE :
- importance = RtecScheduler::LOW_IMPORTANCE;
+ this->importances[i] = RtecScheduler::LOW_IMPORTANCE;
break;
case MEDIUM_IMPORTANCE :
- importance = RtecScheduler::MEDIUM_IMPORTANCE;
+ this->importances[i] = RtecScheduler::MEDIUM_IMPORTANCE;
break;
case HIGH_IMPORTANCE :
- importance = RtecScheduler::HIGH_IMPORTANCE;
+ this->importances[i] = RtecScheduler::HIGH_IMPORTANCE;
break;
case VERY_HIGH_IMPORTANCE :
- importance = RtecScheduler::VERY_HIGH_IMPORTANCE;
+ this->importances[i] = RtecScheduler::VERY_HIGH_IMPORTANCE;
break;
}
- //create supplier RT_Info
- std::ostringstream supp_entry_pt;
- supp_entry_pt << "Supplier Event Class " << i; //unique RT_Info entry point
- RtecScheduler::handle_t rt_info =
- this->scheduler->create (supp_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
- ACE_Time_Value tv (0, curcfg->period);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- this->scheduler->set (rt_info,
- criticality,
- time, time, time,
- time,
- importance,
- time,
- 0,
- RtecScheduler::OPERATION
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- if (this->supplier_cfgs.set(rt_info,i) != 0) {
- ACE_DEBUG((LM_DEBUG, "Could not set supplier RT_Info into config: %d of %d\n",
- i,consumer_cfgs.max_size()));
- return 1;
- }
-
- //create consumer RT_Info
- std::ostringstream cons_entry_pt;
- cons_entry_pt << "Consumer Event Class " << i; //unique RT_Info entry point
- rt_info =
- this->scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
- tv.set (0, curcfg->period);
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- this->scheduler->set (rt_info,
- criticality,
- time, time, time,
- time,
- importance,
- time,
- 0,
- RtecScheduler::OPERATION
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- if (this->consumer_cfgs.set(rt_info,i) != 0) {
- ACE_DEBUG((LM_DEBUG, "Could not set consumer RT_Info into config: %d of %d\n",
- i,consumer_cfgs.max_size()));
- return 1;
- }
+ ACE_Time_Value tv;
+ tv.msec(curcfg->period);
+ ORBSVCS_Time::Time_Value_to_TimeT (this->periods[i], tv);
}
- this->connectConsumers();
- this->connectSuppliers();
-
- ACE_DEBUG ((LM_DEBUG, "Consumer RT_Infos:\n"));
- print_RT_Infos (this->consumer_cfgs);
- ACE_DEBUG ((LM_DEBUG, "\nSupplier RT_Infos:\n"));
- print_RT_Infos (this->supplier_cfgs);
+ this->connect_consumers(ACE_ENV_SINGLE_ARG_PARAMETER);
+ this->connect_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER);
////////////////// Configured; compute schedule ///////////
ACE_DEBUG ((LM_DEBUG, "Computing schedule\n"));
RtecScheduler::RT_Info_Set_var infos;
RtecScheduler::Config_Info_Set_var configs;
RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
+ RtecScheduler::Dependency_Set_var deps;
// Obtain the range of valid priorities in the current
// platform, the scheduler hard-code this values in the
@@ -198,6 +187,7 @@ ECConfig<SCHED_STRAT>::configure (TCFG_SET_WPTR testconfigs)
this->scheduler->compute_scheduling (min_os_priority,
max_os_priority,
infos.out (),
+ deps.out (),
configs.out (),
anomalies.out ()
ACE_ENV_ARG_PARAMETER);
@@ -205,6 +195,7 @@ ECConfig<SCHED_STRAT>::configure (TCFG_SET_WPTR testconfigs)
// Dump the schedule to a file..
ACE_Scheduler_Factory::dump_schedule (infos.in (),
+ deps.in(),
configs.in (),
anomalies.in (),
"ecconfig.out");
@@ -237,51 +228,45 @@ ECConfig<SCHED_STRAT>::run (void)
ACE_TRY
{
- RtecEventComm::EventSet event (1);
- event.length (1);
+ ACE_Thread_Manager *inst = ACE_Thread_Manager::instance();
- ACE_Array<int> evt_counts(this->testcfgs.size());
- for(size_t i=0; i<this->testcfgs.size(); ++i)
+ // Spawn orb thread (which calls orb.run(), then terminates on return)
+ ACE_DEBUG((LM_DEBUG,"SPAWNING ORB thread\n"));
+ int ret = inst->spawn(ECConfig<SCHED_STRAT>::run_orb,&(this->orb));
+ //no need for getting tid?
+ if (ret == -1)
{
- //copy over total number of events per test_config_t to send
- evt_counts[i] = this->testcfgs[i]->num_entities;
+ ACE_DEBUG ((LM_DEBUG, "ERROR: Couldn't spawn ORB->run() thread: %s\n",
+ ACE_OS::strerror(errno)));
+ return 1;
}
- size_t num_done = 0; //total number of testcfgs which have no more events to push
- while (num_done<this->testcfgs.size())
+ // Block waiting for consumers to finish
+ //when can acquire write lock, all TimeoutConsumers are finished
+ ret = this->test_done->acquire_write();
+ if (ret == -1)
{
- //for each consumer, push an event
- for(size_t i=0; i<this->testcfgs.size() && i<this->consumer_proxys.size(); ++i)
- {
- if (evt_counts[i]<=0)
- {
- if (evt_counts[i]==0)
- {
- //just finished
- ++num_done;
- evt_counts[i]--; //indicate accounted for in num_done
- } //else already incr num_done for this one
- continue; //no more events of this to push
- } //else...
- test_config_t *tcfg = this->testcfgs[i];
- ProxyList::TYPE curproxy = this->consumer_proxys[i];
-
- event[0].header.type = ACE_ES_EVENT_UNDEFINED+tcfg->type;
- event[0].header.source = supplier_ids[i];
- event[0].header.ttl = 1;
-
- curproxy->push (event ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- //event pushed, so decr corresponding evt_count
- evt_counts[i]--;
-
- // BT TODO sleep until next period expires
- ACE_Time_Value rate (0, 10000);
- ACE_OS::sleep (rate);
+ ACE_DEBUG((LM_DEBUG, "ERROR: could not acquire write lock for ECConfig: %s\n",
+ ACE_OS::strerror(errno)));
+ return 1;
+ }
- }
+ //all TimeoutConsumers done, so stop EC and ORB
+ //Shutdown EC
+ this->reset();
+
+ // Shutdown ORB
+ this->orb->shutdown(1); //argument is TRUE
+
+ if (inst->wait() == -1) //wait for ORB thread to terminate
+ {
+ ACE_ERROR((LM_ERROR, "ERROR: Thread_Manager wait failed: %s\n",
+ ACE_OS::strerror(errno)));
+ return 1;
}
+
+ ACE_DEBUG ((LM_DEBUG, "suppliers finished\n"));
+
}
ACE_CATCHANY
{
@@ -293,170 +278,145 @@ ECConfig<SCHED_STRAT>::run (void)
return 0; //successful run
}
-template <class SCHED_STRAT> int
-ECConfig<SCHED_STRAT>::initEC()
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::initEC(ACE_ENV_SINGLE_ARG_DECL)
{
TAO_EC_Kokyu_Factory::init_svcs ();
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Initializing event channel\n")));
- ACE_TRY
- {
- // ORB initialization boiler plate...
- int argc = 0;
- char** argv = 0;
- this->orb =
- CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_DEBUG ((LM_DEBUG,ACE_TEXT("Initializing event channel\n")));
- ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resolving initial references\n")));
+ // ORB initialization boiler plate...
+ int argc = 0;
+ char** argv = 0;
+ this->orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
- CORBA::Object_var object =
- orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- this->poa =
- PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- this->poa_manager =
- poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- // DO these need to remain in scope beyond this function?
+ ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resolving initial references\n")));
- ACE_DEBUG((LM_DEBUG,ACE_TEXT("Creating sched service\n")));
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->poa =
+ PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ this->poa_manager =
+ poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ // DO these need to remain in scope beyond this function?
- // Create a scheduling service
- ACE_NEW_RETURN (this->sched_impl,SCHED_STRAT,1);
+ ACE_DEBUG((LM_DEBUG,ACE_TEXT("Creating sched service\n")));
- this->scheduler = sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ // Create a scheduling service
+ ACE_NEW (this->sched_impl,SCHED_STRAT);
- // Create an event channel implementation...
- TAO_EC_Event_Channel_Attributes attributes (poa.in (),
- poa.in ());
- attributes.scheduler = scheduler.in (); // no need to dup
+ this->scheduler = sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
- ACE_NEW_RETURN (this->ec_impl,TAO_EC_Event_Channel (attributes),1);
+ // Create an event channel implementation...
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (),
+ poa.in ());
+ attributes.scheduler = scheduler.in (); // no need to dup
- ACE_DEBUG((LM_DEBUG,ACE_TEXT("Created ec_impl\n")));
+ ACE_NEW (this->ec_impl,TAO_EC_Event_Channel (attributes));
- this->event_channel =
- this->ec_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
- return 1;
- }
- ACE_ENDTRY;
+ ACE_DEBUG((LM_DEBUG,ACE_TEXT("Created ec_impl\n")));
- return 0;
+ this->event_channel =
+ this->ec_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
}
-template <class SCHED_STRAT> int
-ECConfig<SCHED_STRAT>::connectConsumers()
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::connect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
- ACE_TRY
- {
- // The canonical protocol to connect to the EC
- RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- this->event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ RtecEventComm::EventSet event(1);
+ event.length(1);
- ACE_DEBUG ((LM_DEBUG, "connecting consumers\n"));
- for (size_t i=0; i<this->consumer_cfgs.size() && i<this->testcfgs.size(); ++i)
- {
- ACE_NEW_RETURN(this->consumers[i],Consumer(i),1);
-
- RtecScheduler::handle_t hndl = this->consumer_cfgs[i];
- test_config_t *tcfg = this->testcfgs[i];
-
- ACE_ConsumerQOS_Factory consumer_qos;
- //consumer_qos.start_disjunction_group ();
- // The types in the range [0,ACE_ES_EVENT_UNDEFINED) are
- // reserved for the EC...
- consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED+tcfg->type,
- hndl);
-
- RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy =
- consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- RtecEventComm::PushConsumer_var consumerv =
- consumers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- supplier_proxy->connect_push_consumer (consumerv.in (),
- consumer_qos.get_ConsumerQOS ()
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- }
- ACE_CATCHANY
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumers to connect: %d\n",this->testcfgs.size()));
+ this->suppliers.size(this->testcfgs.size());
+ for (size_t i=0; i<this->testcfgs.size(); ++i)
{
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
- return 1;
+ ACE_NEW (this->suppliers[i], TimeoutConsumer ());
+
+ event[0].header.type = ACE_ES_EVENT_UNDEFINED+this->testcfgs[i]->type;
+ event[0].header.source = i; //supplier_id
+ event[0].header.ttl = 1;
+
+ std::ostringstream entry_prefix;
+ entry_prefix << "TimeoutConsumer " << i;
+
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer.connect() for %s\n",entry_prefix.str().c_str()));
+ this->suppliers[i]->connect (this->test_done,
+ this->scheduler.in(),
+ entry_prefix.str().c_str(),
+ this->periods[i], //period
+ this->importances[i], //importance
+ this->crits[i], //criticality
+ i, //supplier_id
+ this->testcfgs[i]->num_entities, //to_send
+ event, //event set
+ this->event_channel.in()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
}
- ACE_ENDTRY;
-
- return 0; //successful run
}
-template <class SCHED_STRAT> int
-ECConfig<SCHED_STRAT>::connectSuppliers()
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::connect_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
- ACE_TRY
+ ACE_DEBUG((LM_DEBUG,"Consumers to connect: %d\n",this->testcfgs.size()));
+ this->consumers.size(this->testcfgs.size());
+ for (size_t i=0; i<this->testcfgs.size(); ++i)
{
- // The canonical protocol to connect to the EC
- RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- ACE_DEBUG ((LM_DEBUG, "connecting suppliers\n"));
- supplier_ids.size(supplier_cfgs.size());
- consumer_proxys.size(supplier_cfgs.size());
- for (size_t i=0; i<supplier_cfgs.size() && i<this->testcfgs.size(); ++i)
- {
- ACE_NEW_RETURN(this->suppliers[i],Supplier(),1);
-
- RtecScheduler::handle_t hndl = this->supplier_cfgs[i];
- test_config_t *tcfg = this->testcfgs[i];
-
- RtecEventComm::EventSourceID supplier_id = i;
- this->supplier_ids[i] = supplier_id;
-
- ACE_SupplierQOS_Factory supplier_qos;
- supplier_qos.insert (supplier_id,
- ACE_ES_EVENT_UNDEFINED+tcfg->type,
- hndl,
- 1); // number of calls, but what does that mean?
-
- consumer_proxys[i] =
- supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- RtecEventComm::PushSupplier_var supplier =
- suppliers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_NEW (this->consumers[i], Consumer ());
+
+ std::ostringstream entry_prefix;
+ entry_prefix << "Consumer " << i;
+
+ ACE_DEBUG((LM_DEBUG,"Consumer.connect() for %s\n",entry_prefix.str().c_str()));
+ //don't set the RT_Info values
+ this->consumers[i]->connect (this->scheduler.in(),
+ entry_prefix.str().c_str(),
+ i, //consumer id
+ ACE_ES_EVENT_UNDEFINED+this->testcfgs[i]->type, //type
+ /*
+ this->periods[i],
+ this->importances[i],
+ this->crits[i],
+ */
+ this->event_channel.in()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
- //PROBLEM: Occasional segfault here:
- consumer_proxys[i]->connect_push_supplier (supplier.in (),
- supplier_qos.get_SupplierQOS ()
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL)
+{
+ for (size_t i = 0; i < this->suppliers.size(); ++i)
+ {
+ this->suppliers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
- ACE_DEBUG ((LM_DEBUG, "suppliers connected\n"));
+ delete this->suppliers[i];
+ this->suppliers[i] = 0;
}
- ACE_CATCHANY
+}
+
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL)
+{
+ for (size_t i = 0; i < this->consumers.size(); ++i)
{
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
- return 1;
- }
- ACE_ENDTRY;
+ this->consumers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
- return 0;
+ delete this->consumers[i];
+ this->consumers[i] = 0;
+ }
}
template <class SCHED_STRAT> void
@@ -507,6 +467,32 @@ ECConfig<SCHED_STRAT>::print_RT_Infos (ACE_Array<RtecScheduler::handle_t> cfg_se
}
+template <class SCHED_STRAT> ACE_THR_FUNC_RETURN
+ECConfig<SCHED_STRAT>::run_orb(void *data)
+{
+ //ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ ACE_DEBUG((LM_DEBUG, "ORB thread: Casting %x\n",data));
+
+ CORBA::ORB_var orb = *(ACE_reinterpret_cast(CORBA::ORB_var*,data));
+
+ ACE_DEBUG((LM_DEBUG, "ORB thread: Running orb\n"));
+
+ orb->run();
+ //this method returns when orb->shutdown() is called; then thread exits
+
+ ACE_DEBUG((LM_DEBUG, "ORB thread: Shutdown\n"));
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "ECConfig ORB thread");
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
} /* namespace TestConfig */
#endif /* ECCONFIG_C */
diff --git a/TAO/orbsvcs/tests/EC_Config/ECConfig.h b/TAO/orbsvcs/tests/EC_Config/ECConfig.h
index cef06505ecc..e0b3b1c2384 100644
--- a/TAO/orbsvcs/tests/EC_Config/ECConfig.h
+++ b/TAO/orbsvcs/tests/EC_Config/ECConfig.h
@@ -15,7 +15,8 @@
#define ECCONFIG_H
#include "ace/Array.h"
-#include "ace/Synch.h"
+//#include "ace/Synch.h"
+#include "ace/RW_Mutex.h"
#include "orbsvcs/RtecSchedulerS.h" //for POA_RtecScheduler
#include "orbsvcs/RtecSchedulerC.h"
#include "orbsvcs/RtecEventChannelAdminC.h"
@@ -23,7 +24,7 @@
#include "TestConfig.h"
#include "Consumer.h"
-#include "Supplier.h"
+#include "TimeoutConsumer.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -34,9 +35,12 @@ namespace TestConfig {
typedef ACE_Array<RtecEventChannelAdmin::ProxyPushConsumer_var> ProxyList;
typedef ACE_Array<RtecScheduler::handle_t> ConfigList;
typedef ACE_Array<RtecEventComm::EventSourceID> SupplierIDList;
+typedef ACE_Array<TimeBase::TimeT> PeriodList;
+typedef ACE_Array<RtecScheduler::Importance_t> ImportanceList;
+typedef ACE_Array<RtecScheduler::Criticality_t> CritList;
typedef ACE_Array<Consumer*> ConsumerList;
-typedef ACE_Array<Supplier*> SupplierList;
+typedef ACE_Array<TimeoutConsumer*> SupplierList;
template <class SCHED_STRAT>
class ECConfig : public Test_Config {
@@ -60,19 +64,26 @@ public:
//events.
protected:
- virtual int initEC (void);
+ // TODO USE DEFAULTS FOR ANY OF THESE?
- virtual int connectConsumers (void);
+ virtual void initEC (ACE_ENV_SINGLE_ARG_DECL);
- virtual int connectSuppliers (void);
+ void connect_suppliers (ACE_ENV_SINGLE_ARG_DECL);
- virtual void reset (void);
- //
+ void disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL);
+
+ void connect_consumers (ACE_ENV_SINGLE_ARG_DECL);
+
+ void disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL);
+
+ virtual void reset (ACE_ENV_SINGLE_ARG_DECL);
private:
void print_RT_Infos (ACE_Array<RtecScheduler::handle_t> cfg_set);
+ static ACE_THR_FUNC_RETURN run_orb(void *data); //thread fcn for running ORB
+
Test_Config_Set testcfgs;
//copy of the currently configured Test_Config_Set
//using the same test_config_t objects
@@ -86,30 +97,25 @@ private:
RtecEventChannelAdmin::EventChannel_var event_channel;
RtecScheduler::Scheduler_var scheduler;
- /*
- ACE_Strong_Bound_Ptr<TAO_EC_Event_Channel,ACE_Null_Mutex> ec_impl;
- ACE_Strong_Bound_Ptr<POA_RtecScheduler::Scheduler,ACE_Null_Mutex> sched_impl;
- */
TAO_EC_Event_Channel *ec_impl;
- POA_RtecScheduler::Scheduler *sched_impl;
+ SCHED_STRAT *sched_impl;
- ProxyList consumer_proxys;
- //proxy objects for pushing events to consumers
+ ConsumerList consumers;
- ConfigList supplier_cfgs;
- //RT_Infos generated by configure() for suppliers.
+ SupplierList suppliers;
- ConfigList consumer_cfgs;
- //RT_Infos generated by configure() for consumers.
+ PeriodList periods;
- SupplierIDList supplier_ids;
- //IDs of the suppliers
+ ImportanceList importances;
- ConsumerList consumers;
+ CritList crits;
- SupplierList suppliers;
+ ACE_RW_Mutex* test_done;
+ //TimeoutConsumers acquire read locks; when the ECConfig can acquire
+ //a write lock, all TimeoutConsumers must've finished, so the test
+ //is finished.
int configured; //boolean
};
diff --git a/TAO/orbsvcs/tests/EC_Config/Makefile b/TAO/orbsvcs/tests/EC_Config/Makefile
index 511b965f130..fb585fceabb 100644
--- a/TAO/orbsvcs/tests/EC_Config/Makefile
+++ b/TAO/orbsvcs/tests/EC_Config/Makefile
@@ -23,7 +23,7 @@ ifeq (Event,$(findstring Event,$(TAO_ORBSVCS)))
endif # Sched
endif # Event
-PSRC= Test.cpp Supplier.cpp Consumer.cpp ECConfig.cpp Service.cpp Config_Factory.cpp Test_Handler.cpp TestConfig.cpp
+PSRC= Test.cpp Supplier.cpp Consumer.cpp ECConfig.cpp Service.cpp Config_Factory.cpp Test_Handler.cpp TestConfig.cpp TimeoutConsumer.cpp
LDLIBS = -lTAO_RTOLDEvent -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_RTKokyuEvent -lACEXML_Parser
# The complete path to orbsvcs/orbsvcs/Sched is required for DU/CXX
@@ -32,13 +32,17 @@ CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \
-I$(TAO_ROOT)/orbsvcs/orbsvcs/Sched \
$(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc))
-Test_OBJS=$(addsuffix .o, Test Supplier Consumer ECConfig Config_Factory Test_Handler TestConfig)
+Test_OBJS=$(addsuffix .o, Test Consumer ECConfig Config_Factory Test_Handler TestConfig TimeoutConsumer)
Service_OBJS=$(addsuffix .o, Service Supplier Consumer ECConfig TestConfig)
#----------------------------------------------------------------------------
# Include macros and targets
#----------------------------------------------------------------------------
+# To allow debug, disable optimization:
+debug=1
+optimize=0
+
include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
include $(ACE_ROOT)/include/makeinclude/macros.GNU
include $(TAO_ROOT)/rules.tao.GNU
diff --git a/TAO/orbsvcs/tests/EC_Config/Test.cpp b/TAO/orbsvcs/tests/EC_Config/Test.cpp
index 2941938a70b..705725f9e52 100644
--- a/TAO/orbsvcs/tests/EC_Config/Test.cpp
+++ b/TAO/orbsvcs/tests/EC_Config/Test.cpp
@@ -3,6 +3,8 @@
#include "ace/Array.h"
#include "ace/Bound_Ptr.h"
#include "ace/Synch.h"
+#include "ace/Get_Opt.h"
+#include "ace/String_Base.h"
#include "ACEXML/parser/parser/Parser.h"
#include "ACEXML/common/InputSource.h"
#include "ACEXML/common/FileCharStream.h"
@@ -14,7 +16,12 @@
using namespace TestConfig;
-//NOTE: Read from a formatted file rather than hardcode the configuration. Check ACE_XML.
+struct Arguments
+{
+ ACE_CString filename_;
+};
+
+int parse_args (int argc, char *argv[],Arguments &args);
int
main (int argc, char *argv[])
@@ -24,19 +31,24 @@ main (int argc, char *argv[])
ACEXML_TRY_NEW_ENV
{
ACEXML_Parser parser;
-
- // TODO parse args for config filename
-
- ACEXML_Char *filename = ACE_LIB_TEXT("test.xml");
- ACEXML_FileCharStream fcs;
- if ((retval = fcs.open(filename)) != 0) {
- ACE_DEBUG ((LM_DEBUG, "Could not open file %s\n",filename));
+ Arguments args;
+ args.filename_.set(ACE_TEXT("test.xml"));
+
+ // parse args for config filename
+ if (parse_args(argc,argv,args) == -1)
+ {
+ return 1;
+ }
+
+ ACEXML_FileCharStream *fcs = new ACEXML_FileCharStream();
+ if ((retval = fcs->open(args.filename_.c_str())) != 0) {
+ ACE_DEBUG ((LM_DEBUG, "Could not open file %s\n",args.filename_.c_str()));
return retval;
}
- ACEXML_InputSource is (&fcs);
+ ACEXML_InputSource is (fcs); //takes responsibility of fcs
- Test_Handler handler (filename);
+ Test_Handler handler (args.filename_.c_str());
ACEXML_DefaultHandler dflt;
parser.setContentHandler (&handler);
@@ -47,9 +59,14 @@ main (int argc, char *argv[])
parser.parse(&is);
ACEXML_TRY_CHECK;
+ if ((retval = fcs->close()) != 0) {
+ ACE_DEBUG ((LM_DEBUG, "Could not close file %s\n",args.filename_.c_str()));
+ return retval;
+ }
+
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Finished parsing\n")));
- // TODO configure according to parsed XML
+ // configure according to parsed XML
ConfigFactory::Default_Config_Factory fact;
fact.init(argc,argv);
@@ -89,3 +106,28 @@ main (int argc, char *argv[])
return retval;
}
+
+int parse_args (int argc, char *argv[], Arguments &args)
+{
+ ACE_Get_Opt get_opts (argc, argv, "f:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'f':
+ args.filename_.set(get_opts.opt_arg());
+ ACE_DEBUG((LM_DEBUG,ACE_TEXT("Filename argument: %s\n"),args.filename_.c_str()));
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "[-f <filename>] "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/EC_Config/TestConfig.h b/TAO/orbsvcs/tests/EC_Config/TestConfig.h
index 536569f4b3b..1ab044cd160 100644
--- a/TAO/orbsvcs/tests/EC_Config/TestConfig.h
+++ b/TAO/orbsvcs/tests/EC_Config/TestConfig.h
@@ -35,7 +35,7 @@ typedef ACE_Weak_Bound_Ptr<Test_Config_Set,ACE_Null_Mutex> TCFG_SET_WPTR;
// distinct values for each.
typedef unsigned long Entity_Type_t;
-typedef long Period_t;
+typedef long Period_t; //in milliseconds
enum Criticality_t {
// Defines the criticality of the entity.
diff --git a/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp b/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp
index 6e11fa54b2c..5e5be2a5140 100644
--- a/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp
+++ b/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp
@@ -9,9 +9,9 @@
#include <stdlib.h> //for atol
#include <sstream> //for istringstream
-Test_Handler::Test_Handler (ACEXML_Char* fileName)
+Test_Handler::Test_Handler (const char *filename)
: configs_(new TestConfig::Test_Config_Set(0)),
- fileName_(ACE::strnew (fileName)),
+ fileName_(filename),
didtype_(0), //false
didperiod_(0),
didcrit_(0),
@@ -23,8 +23,6 @@ Test_Handler::Test_Handler (ACEXML_Char* fileName)
Test_Handler::~Test_Handler (void)
{
- delete[] this->fileName_;
-
const TestConfig::Test_Config_Set &cfgs = *this->configs_;
for(size_t i=0; i<cfgs.size(); ++i) {
@@ -408,7 +406,7 @@ Test_Handler::error (ACEXML_SAXParseException & ex ACEXML_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((ACEXML_SAXException))
{
- ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_,
+ ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_.c_str(),
this->locator_->getLineNumber(),
this->locator_->getColumnNumber()));
ex.print();
@@ -419,7 +417,7 @@ Test_Handler::fatalError (ACEXML_SAXParseException& ex ACEXML_ENV_ARG_DECL_NOT_U
ACE_THROW_SPEC ((ACEXML_SAXException))
{
- ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_,
+ ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_.c_str(),
this->locator_->getLineNumber(),
this->locator_->getColumnNumber()));
ex.print();
diff --git a/TAO/orbsvcs/tests/EC_Config/Test_Handler.h b/TAO/orbsvcs/tests/EC_Config/Test_Handler.h
index d783386fecd..85320731c81 100644
--- a/TAO/orbsvcs/tests/EC_Config/Test_Handler.h
+++ b/TAO/orbsvcs/tests/EC_Config/Test_Handler.h
@@ -16,6 +16,7 @@
#include "TestConfig.h"
+#include "ace/String_Base.h"
#include "ACEXML/common/ContentHandler.h"
#include "ACEXML/common/ErrorHandler.h"
@@ -23,6 +24,8 @@
enum element { TESTCONFIG,TEST_CONFIG_T,TYPE,PERIOD,CRITICALITY,IMPORTANCE,NUM_ENTITIES };
+// TODO Change XML format to list supplier and client RT_Infos separate, plus dependencies
+
/**
* @class Test_Handler
*
@@ -40,7 +43,7 @@ public:
/*
* Default constructor.
*/
- Test_Handler (ACEXML_Char* fileName);
+ Test_Handler (const char *filename);
/*
* Default destructor.
@@ -167,7 +170,7 @@ public:
private:
TestConfig::TCFG_SET_SPTR configs_;
- ACEXML_Char* fileName_;
+ ACE_CString fileName_;
ACEXML_Locator* locator_;
STACK scope_;
diff --git a/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp
new file mode 100644
index 00000000000..7546364855c
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp
@@ -0,0 +1,281 @@
+// $Id$
+
+#include "TimeoutConsumer.h"
+
+#include <sstream> //for ostringstream
+
+#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/RtecSchedulerC.h"
+#include "orbsvcs/RtecEventCommC.h"
+#include "ace/RW_Mutex.h"
+
+ACE_RCSID(EC_Examples, Consumer, "$Id$")
+
+TimeoutConsumer::TimeoutConsumer (void)
+ : _to_send(0)
+ , _num_sent(0)
+ , _hold_mtx(0)
+ , _done(0)
+ , _supplier(this)
+ , _consumer(this)
+ , _events(0)
+{
+}
+
+TimeoutConsumer::~TimeoutConsumer (void)
+{
+ // TODO this->disconnect() ???
+
+ if (this->_hold_mtx && this->_done!=0)
+ {
+ this->_done->release();
+ this->_hold_mtx = 0;
+ }
+}
+
+void
+TimeoutConsumer::connect (ACE_RW_Mutex* done,
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventComm::EventSourceID supplier_id,
+ size_t to_send,
+ const RtecEventComm::EventSet& events,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->_supplier_id = supplier_id;
+ this->_to_send = to_send;
+ this->_num_sent = 0;
+ this->_hold_mtx = 0;
+ this->_done = done;
+ if (this->_done!= 0 && this->_num_sent<this->_to_send)
+ {
+ int ret = done->acquire_read();
+ if (ret == -1)
+ {
+ ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for TimeoutConsumer: %s\n",
+ ACE_OS::strerror(errno)));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for TimeoutConsumer %d\n",this->_supplier_id));
+ this->_hold_mtx = 1;
+ }
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for TimeoutConsumer\n"));
+ }
+
+ this->_events.length(events.length());
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ this->_events[i] = events[i]; //copy event to local set
+ this->_events[i].header.source = this->_supplier_id; //make sure event source is this
+ }
+
+ //create supplier RT_Info
+ std::ostringstream supp_entry_pt;
+ supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point
+ ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str()));
+ RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ scheduler->set (rt_info,
+ criticality,
+ period, period, period,
+ period,
+ importance,
+ period,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Register as supplier of events
+ ACE_SupplierQOS_Factory supplierQOS;
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ //insert type for each event
+ supplierQOS.insert (this->_supplier_id,
+ events[i].header.type,
+ rt_info,
+ 1);
+ }
+
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy =
+ supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventComm::PushSupplier_var supplierv =
+ this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy->connect_push_supplier (supplierv.in (),
+ supplierQOS.get_SupplierQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as event supplier\n"));
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ ACE_DEBUG((LM_DEBUG,"\tEvent Type: %d\n",events[i].header.type));
+ }
+
+ //create consumer RT_Info
+ std::ostringstream cons_entry_pt;
+ cons_entry_pt << entry_prefix << " Consumer"; //unique RT_Info entry point
+ ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str()));
+ rt_info = scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Register as consumer of timeout events
+ ACE_ConsumerQOS_Factory timeoutQOS;
+ timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT /*??*/,
+ period, //TimeBase::TimeT
+ rt_info);
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy =
+ consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventComm::PushConsumer_var consumerv =
+ this->_consumer._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy->connect_push_consumer (consumerv.in (),
+ timeoutQOS.get_ConsumerQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as timeout consumer\n"));
+ std::ostringstream prd;
+ prd << period;
+ ACE_DEBUG((LM_DEBUG,"\tTimeout period: %s\n",prd.str().c_str()));
+}
+
+void
+TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (! CORBA::is_nil (this->_consumer_proxy.in ()))
+ {
+ this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+
+ // Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+ //disconnect consumer ???
+ if (! CORBA::is_nil (this->_supplier_proxy.in()))
+ {
+ this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil();
+
+ //Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+ if (this->_hold_mtx && this->_done!=0)
+ {
+ this->_done->release();
+ this->_hold_mtx = 0;
+ }
+}
+
+void
+TimeoutConsumer::push (const RtecEventComm::EventSet& events
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (events.length () == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer (%P|%t) push but no events\n"));
+ return;
+ }
+
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received %d events:\n",this->_supplier_id,
+ events.length()));
+ if (this->_num_sent < this->_to_send)
+ {
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ if (ACE_ES_EVENT_INTERVAL_TIMEOUT == events[i].header.type)
+ {
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received timeout event\n",this->_supplier_id));
+
+ //TODO send this->_events
+ ++this->_num_sent;
+ ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send));
+ if (this->_num_sent >= this->_to_send)
+ {
+ //done
+ ACE_DEBUG((LM_DEBUG,"RELEASE read lock from TimeoutConsumer %d\n",
+ this->_supplier_id));
+ this->_done->release();
+ this->_hold_mtx = 0;
+ }
+ }
+ else
+ {
+ int prio = -1;
+ ACE_hthread_t handle;
+ ACE_Thread::self(handle);
+ ACE_Thread::getprio(handle,prio);
+ //ACE_thread_t tid = ACE_Thread::self();
+ ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer @%d (%P|%t) we received event type %d\n",
+ prio,events[0].header.type));
+ }
+ }
+ } else
+ {
+ //do nothing
+ }
+}
+
+void
+TimeoutConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void
+TimeoutConsumer::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+// ****************************************************************
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h
new file mode 100644
index 00000000000..db55b9970c3
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h
@@ -0,0 +1,110 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel examples
+//
+// = FILENAME
+// TimeoutConsumer
+//
+// = AUTHOR
+// Bryan Thrall (thrall@cse.wustl.edu)
+//
+// ============================================================================
+
+#ifndef TIMEOUTCONSUMER_H
+#define TIMEOUTCONSUMER_H
+
+#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "orbsvcs/RtecEventCommC.h"
+#include "orbsvcs/RtecSchedulerC.h"
+#include "orbsvcs/Channel_Clients_T.h"
+//#include "ace/Task.h"
+//#include "ace/Synch.h"
+#include "ace/RW_Mutex.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class TimeoutConsumer
+{
+ // = TITLE
+ // Simple consumer object which responds to timeout events.
+ //
+ // = DESCRIPTION
+ // This class is a consumer of timeout events.
+ // For each timeout event it consumes, it pushes a specified EventSet into the EC.
+ //
+ // There are several ways to connect and disconnect this class,
+ // and it is up to the driver program to use the right one.
+ //
+public:
+ TimeoutConsumer (void);
+ // Default Constructor.
+
+ virtual ~TimeoutConsumer (void);
+
+ void connect (ACE_RW_Mutex* done,
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventComm::EventSourceID supplier_id,
+ size_t to_send,
+ const RtecEventComm::EventSet& events,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
+ // This method connects the supplier to the EC.
+
+ void disconnect (ACE_ENV_SINGLE_ARG_DECL);
+ // Disconnect from the EC.
+
+ // = The RtecEventComm::PushConsumer methods
+
+ virtual void push (const RtecEventComm::EventSet& events
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ // pushes _events as a supplier until _to_send pushes, then calls resume() on this task.
+
+ virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ // The skeleton methods.
+
+ // = The RtecEventComm::PushSupplier methods
+
+ virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ // The skeleton methods.
+
+private:
+ size_t _to_send; //number of times to push on timeout
+ size_t _num_sent; //number of pushes so far
+ int _hold_mtx; //1 when hold _done mutex; 0 else
+ ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send
+
+ RtecEventComm::EventSourceID _supplier_id;
+ // We generate an id based on the name....
+
+ RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy;
+ // We talk to the EC (as a supplier) using this proxy.
+
+ ACE_PushSupplier_Adapter<TimeoutConsumer> _supplier;
+ // We connect to the EC as a supplier so we can push events
+ // every time we receive a timeout event.
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy;
+ // We talk to the EC (as a consumer) using this proxy.
+
+ ACE_PushConsumer_Adapter<TimeoutConsumer> _consumer;
+ // We connect to the EC as a consumer so we can receive the
+ // timeout events.
+
+ RtecEventComm::EventSet _events;
+ // set of events to push when a timeout event is received.
+};
+
+#endif /* CONSUMER_H */
diff --git a/TAO/orbsvcs/tests/EC_Config/test.xml b/TAO/orbsvcs/tests/EC_Config/test.xml
index 5a37434eb0f..dd90d444e5b 100644
--- a/TAO/orbsvcs/tests/EC_Config/test.xml
+++ b/TAO/orbsvcs/tests/EC_Config/test.xml
@@ -3,16 +3,16 @@
<testconfig>
<test_config_t>
<type>0</type>
- <period>1000</period>
+ <period>700</period>
<criticality value="HIGH" />
<importance value="VERY_LOW" />
<num_entities>10</num_entities>
</test_config_t>
<test_config_t>
<type>1</type>
- <period>2000</period>
+ <period>2100</period>
<criticality value="LOW"/>
<importance value="VERY_LOW"/>
<num_entities>10</num_entities>
</test_config_t>
-</testconfig> \ No newline at end of file
+</testconfig>
diff --git a/TAO/orbsvcs/tests/EC_Config/testconfig.dtd b/TAO/orbsvcs/tests/EC_Config/testconfig.dtd
index 6330b60e817..c658b70003c 100644
--- a/TAO/orbsvcs/tests/EC_Config/testconfig.dtd
+++ b/TAO/orbsvcs/tests/EC_Config/testconfig.dtd
@@ -1,6 +1,7 @@
<!ELEMENT testconfig (test_config_t*) >
<!ELEMENT test_config_t (type,period,criticality,importance,num_entities) >
<!ELEMENT type (#PCDATA) >
+<!-- period is in milliseconds -->
<!ELEMENT period (#PCDATA) >
<!ELEMENT criticality EMPTY >
<!ELEMENT importance EMPTY >