summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-25 21:38:52 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-25 21:38:52 +0000
commitb2c36806de01b12099d3d546385935adc9d5800d (patch)
treeb78d76e6e45582a072441c7f4f7dd0bdedb1ddb3
parent7f6aee383581cdce6b1cc8064454db7cfcf14623 (diff)
downloadATCD-b2c36806de01b12099d3d546385935adc9d5800d.tar.gz
Fixed problem of Client sending to only one Middle; next, set up multiple consumer subscriptions, then join semantics in the Server
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp46
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp48
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp46
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h10
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp114
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h10
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp35
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h29
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp54
9 files changed, 242 insertions, 150 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
index 1fe20dd823d..1998c7319c7 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
@@ -13,6 +13,7 @@
#include "orbsvcs/Event/EC_Kokyu_Factory.h"
#include "orbsvcs/Time_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Scheduler_Factory.h"
#include "tao/ORB_Core.h"
#include "Kokyu_EC.h"
@@ -30,6 +31,7 @@ namespace
{
int config_run = 0;
ACE_CString sched_type ="rms";
+ ACE_CString ior_output_filename;
FILE * ior_output_file;
}
/*
@@ -158,6 +160,44 @@ public:
);
ACE_CHECK;
+ //DEBUG: print out schedule
+ RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER);
+ //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL);
+
+ RtecScheduler::RT_Info_Set_var infos;
+ RtecScheduler::Config_Info_Set_var configs;
+ RtecScheduler::Dependency_Set_var dependencies;
+ RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies;
+ RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
+
+ int min_os_priority =
+ ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+ int max_os_priority =
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+
+ scheduler->compute_scheduling (min_os_priority,
+ max_os_priority,
+ infos.out (),
+ dependencies.out (),
+ configs.out (),
+ anomalies.out ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ std::stringstream sched_out;
+ ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior'
+ sched_out << "schedule_" << ior_prefix.c_str() << ".out";
+
+ ACE_DEBUG((LM_DEBUG,"Consumer_EC writing schedule to %s\n",sched_out.str().c_str()));
+
+ ACE_Scheduler_Factory::dump_schedule (infos.in (),
+ dependencies.in (),
+ configs.in (),
+ anomalies.in (),
+ sched_out.str().c_str());
+ ////END DEBUG
ACE_DEBUG((LM_DEBUG,"Consumer_EC set_up_supp_and_cons() DONE\n"));
} //set_up_supp_and_cons()
@@ -303,7 +343,8 @@ while ((c = get_opts ()) != -1)
switch (c)
{
case 'o':
- ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w");
+ ior_output_filename = get_opts.opt_arg();
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
if (ior_output_file == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Unable to open %s for writing: %p\n",
@@ -324,7 +365,8 @@ while ((c = get_opts ()) != -1)
// Indicates sucessful parsing of the command line
if (ior_output_file == 0)
{
- ior_output_file = ACE_OS::fopen ("consumer_ec.ior", "w");
+ ior_output_filename = "consumer_ec.ior";
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
}
return 0;
}
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
index efca4690cfa..df8f14809db 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
@@ -12,6 +12,7 @@
#include "orbsvcs/Event/EC_Kokyu_Factory.h"
#include "orbsvcs/Time_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Scheduler_Factory.h"
#include "tao/ORB_Core.h"
#include "Kokyu_EC.h"
@@ -31,6 +32,7 @@ namespace
int supp_id = 3;
ACE_CString sched_type = "rms";
ACE_CString ior_input_file = "file://consumer_ec.ior";
+ ACE_CString ior_output_filename;
FILE * ior_output_file;
}
@@ -177,7 +179,7 @@ public:
RtecScheduler::VERY_LOW_IMPORTANCE,
supplier_impl1_3,
"supplier1_3",
- supp1_3_types[0]
+ supp1_3_types
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
@@ -192,6 +194,44 @@ public:
add_dummy_supplier(types);
ACE_CHECK;
+ //DEBUG: print out schedule
+ RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER);
+ //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL);
+
+ RtecScheduler::RT_Info_Set_var infos;
+ RtecScheduler::Config_Info_Set_var configs;
+ RtecScheduler::Dependency_Set_var dependencies;
+ RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies;
+ RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
+
+ int min_os_priority =
+ ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+ int max_os_priority =
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+
+ scheduler->compute_scheduling (min_os_priority,
+ max_os_priority,
+ infos.out (),
+ dependencies.out (),
+ configs.out (),
+ anomalies.out ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ std::stringstream sched_out;
+ ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior'
+ sched_out << "schedule_" << ior_prefix.c_str() << ".out";
+
+ ACE_DEBUG((LM_DEBUG,"Supplier_EC writing schedule to %s\n",sched_out.str().c_str()));
+
+ ACE_Scheduler_Factory::dump_schedule (infos.in (),
+ dependencies.in (),
+ configs.in (),
+ anomalies.in (),
+ sched_out.str().c_str());
+ ////END DEBUG
//Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER);
//ACE_CHECK;
}
@@ -335,7 +375,8 @@ int parse_args (int argc, char *argv[])
{
case 'o':
{
- ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w");
+ ior_output_filename = get_opts.opt_arg();
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
if (ior_output_file == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"Unable to open %s for writing: %p\n",
@@ -376,7 +417,8 @@ int parse_args (int argc, char *argv[])
}
if (ior_output_file == 0)
{
- ior_output_file = ACE_OS::fopen ("consumer_supplier_ec.ior", "w");
+ ior_output_filename = "consumer_supplier_ec.ior";
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
}
// Indicates sucessful parsing of the command line
return 0;
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
index 30e8ab245cf..eb6fdcfb5c6 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
@@ -1,12 +1,7 @@
// $Id$
#include "Dynamic_Supplier.h"
-//#include "ace/Time_Value.h"
-//#include "ace/Thread.h" //for ACE_Thread::self()
-//#include "ace/Counter.h"
-//#include "ace/OS_NS_sys_time.h"
#include "orbsvcs/Event_Service_Constants.h"
-//#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/RtecEventCommC.h"
#if defined (ACE_HAS_DSUI)
@@ -17,11 +12,11 @@
ACE_RCSID(EC_Examples, Dynamic_Supplier, "$Id$")
-Dynamic_Supplier::Dynamic_Supplier(RtecEventComm::EventSourceID id,
- RtecEventComm::EventType normal_type1,
- RtecEventComm::EventType normal_type2,
- RtecEventComm::EventType ft_type1,
- RtecEventComm::EventType ft_type2,
+Dynamic_Supplier::Dynamic_Supplier(SourceID id,
+ EventType normal_type1,
+ EventType normal_type2,
+ EventType ft_type1,
+ EventType ft_type2,
Service_Handler *handler)
: Supplier(id,normal_type1,ft_type1,handler)
, norm_type2_(normal_type2)
@@ -44,17 +39,15 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (%P|%t) handle_service_start() DONE\n"));
}
- PushConsumer_Vector *proxies = &(this->consumer_proxy_);
-
RtecEventComm::EventSet eventA (1);
eventA.length (1);
eventA[0].header.source = this->id_;
eventA[0].header.ttl = 1;
RtecEventComm::EventSet eventB (1);
- eventA.length (1);
- eventA[0].header.source = this->id_;
- eventA[0].header.ttl = 1;
+ eventB.length (1);
+ eventB[0].header.source = this->id_;
+ eventB[0].header.ttl = 1;
switch (this->mode_) {
case FAULT_TOLERANT:
@@ -93,31 +86,18 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
oid.type = eventA[0].header.type;
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
//TODO: BUG? This code pushes eventA/B to ALL consumers!
- for(PushConsumer_Vector::Iterator iter(*proxies);
- !iter.done(); iter.advance())
- {
- PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
- iter.next(proxy);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventA %d\n",this->id_,eventA[0].header.type));
- (*proxy)->push (eventA ACE_ENV_ARG_PARAMETER);
- }
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventA %d\n",this->id_,eventA[0].header.type));
+ this->consumer_proxy_->push (eventA ACE_ENV_ARG_PARAMETER);
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
oid.type = eventB[0].header.type;
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
- for(PushConsumer_Vector::Iterator iter(*proxies);
- !iter.done(); iter.advance())
- {
- PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
- iter.next(proxy);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventB %d\n",this->id_,eventB[0].header.type));
- (*proxy)->push (eventB ACE_ENV_ARG_PARAMETER);
- }
- DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
- //DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL);
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventB %d\n",this->id_,eventB[0].header.type));
+ this->consumer_proxy_->push (eventB ACE_ENV_ARG_PARAMETER);
ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
+ DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
if (this->handler_ != 0)
{
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h
index 2fc9cdc853b..82e5eb1990b 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h
@@ -35,11 +35,11 @@ class Dynamic_Supplier : public Supplier
public:
///Calls up to Service_Handler every time a timeout
///occurs. normal_channel_id1 is used as id returned by get_id().
- Dynamic_Supplier(RtecEventComm::EventSourceID id,
- RtecEventComm::EventType normal_type1,
- RtecEventComm::EventType normal_type2,
- RtecEventComm::EventType ft_type1,
- RtecEventComm::EventType ft_type2,
+ Dynamic_Supplier(SourceID id,
+ EventType normal_type1,
+ EventType normal_type2,
+ EventType ft_type1,
+ EventType ft_type2,
Service_Handler *handler = 0);
virtual ~Dynamic_Supplier(void);
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
index a9cc4004d0c..ba890822a96 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
@@ -96,9 +96,9 @@ Kokyu_EC::init(const char* schedule_discipline, PortableServer::POA_ptr poa)
ec_impl_ = ec;
consumer_admin_ = ec_impl_->for_consumers(ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
supplier_admin_ = ec_impl_->for_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
}
ACE_CATCHALL {
return -1;
@@ -126,7 +126,7 @@ Kokyu_EC::register_consumer (
{
RtecScheduler::handle_t consumer1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
scheduler_impl_->set (consumer1_rt_info,
info.criticality,
@@ -139,26 +139,30 @@ Kokyu_EC::register_consumer (
info.threads,
info.info_type
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ACE_ConsumerQOS_Factory consumer_qos1;
if (type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
- consumer_qos1.insert_type (type, consumer1_rt_info);
+ {
+ ACE_DEBUG((LM_DEBUG,"Kokyu_EC::register_consumer() inserting type %d into RT_Info %d\n",type,consumer1_rt_info));
+ consumer_qos1.insert_type (type, consumer1_rt_info);
+ }
else
- consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
- info.period, //in 100s of nanosec
- consumer1_rt_info);
-
+ {
+ consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
+ info.period, //in 100s of nanosec
+ consumer1_rt_info);
+ }
proxy_supplier =
consumer_admin_->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
proxy_supplier->connect_push_consumer (consumer,
consumer_qos1.get_ConsumerQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
return consumer1_rt_info;
}
@@ -166,7 +170,7 @@ RtEventChannelAdmin::handle_t
Kokyu_EC::register_supplier (
const char * entry_point,
RtecEventComm::EventSourceID source,
- RtecEventComm::EventType type,
+ EventType_Vector& supp_types, //RtecEventComm::EventType type,
RtecEventComm::PushSupplier_ptr supplier,
RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer
ACE_ENV_ARG_DECL
@@ -180,24 +184,32 @@ Kokyu_EC::register_supplier (
{
RtecScheduler::handle_t supplier1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ACE_SupplierQOS_Factory supplier_qos1;
- supplier_qos1.insert (source,
- type,
- supplier1_rt_info,
- 1 /* number of calls, but what does that mean? */);
+ for(EventType_Vector::Iterator iter(supp_types);
+ !iter.done(); iter.advance())
+ {
+ EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
+ iter.next(type);
+
+ ACE_DEBUG((LM_DEBUG,"Kokyu_EC::register_supplier() inserting type %d into RT_Info %d\n",*type,supplier1_rt_info));
+ supplier_qos1.insert (source,
+ *type,
+ supplier1_rt_info,
+ 1 /* number of calls, but what does that mean? */);
+ }
proxy_consumer =
supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
proxy_consumer->connect_push_supplier (supplier,
supplier_qos1.get_SupplierQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
return supplier1_rt_info;
-}
+} //register_suppliers()
void
Kokyu_EC::add_dependency (
@@ -248,9 +260,9 @@ Kokyu_EC::start (ACE_ENV_SINGLE_ARG_DECL)
configs.out (),
anomalies.out ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ec_impl_->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
//@BT: EC activated is roughly equivalent to having the DT scheduler ready to run
//DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_STARTED, 1, 0, NULL);
@@ -276,7 +288,7 @@ void
Kokyu_EC::add_supplier_with_timeout(
Supplier * supplier_impl,
const char * supp_entry_point,
- RtecEventComm::EventType supp_type,
+ EventType_Vector& supp_types, //RtecEventComm::EventType supp_type,
Timeout_Consumer * timeout_consumer_impl,
const char * timeout_entry_point,
ACE_Time_Value period,
@@ -291,7 +303,7 @@ Kokyu_EC::add_supplier_with_timeout(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
- add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER);
+ add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
add_timeout_consumer(supplier_impl,timeout_consumer_impl,timeout_entry_point,period,crit,imp ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
@@ -338,10 +350,8 @@ Kokyu_EC::add_timeout_consumer(
ACE_CHECK;
//don't need to save supplier_timeout_consumer_rt_info because only used to set dependency here:
- Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info();
this->add_dependency (supplier_timeout_consumer_rt_info,
- supp_infos[0],
- //supplier_impl->rt_info(),
+ supplier_impl->rt_info(),
1,
RtecBase::TWO_WAY_CALL
ACE_ENV_ARG_PARAMETER);
@@ -355,7 +365,7 @@ void
Kokyu_EC::add_supplier(
Supplier * supplier_impl,
const char * entry_point,
- RtecEventComm::EventType type
+ EventType_Vector& supp_types //RtecEventComm::EventType type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -365,7 +375,7 @@ Kokyu_EC::add_supplier(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
- RtecEventComm::EventSourceID supplier_id = supplier_impl->get_id();
+ Supplier::SourceID supplier_id = supplier_impl->get_id();
RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy;
RtecEventComm::PushSupplier_var supplier;
@@ -376,18 +386,14 @@ Kokyu_EC::add_supplier(
RtecScheduler::handle_t supplier_rt_info =
this->register_supplier(entry_point,
supplier_id,
- type,
+ supp_types, //type,
supplier.in(),
consumer_proxy.out()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- Supplier::PushConsumer_Vector cons_proxies;
- cons_proxies.push_back(consumer_proxy);
- supplier_impl->set_consumer_proxy(cons_proxies);
- Supplier::RT_Info_Vector supp_infos;
- supp_infos.push_back(supplier_rt_info);
- supplier_impl->rt_info(supp_infos);
+ supplier_impl->set_consumer_proxy(consumer_proxy);
+ supplier_impl->rt_info(supplier_rt_info);
this->suppliers_.push_back(supplier_impl);
} //add_supplier()
@@ -403,7 +409,7 @@ Kokyu_EC::add_consumer_with_supplier(
RtecScheduler::Importance_t cons_imp,
Supplier * supplier_impl,
const char * supp_entry_point,
- RtecEventComm::EventType supp_type
+ EventType_Vector& supp_types //RtecEventComm::EventType supp_type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -415,14 +421,12 @@ Kokyu_EC::add_consumer_with_supplier(
{
add_consumer(consumer_impl,cons_entry_point,cons_period,cons_type,cons_crit,cons_imp ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER);
+ add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info();
Consumer::RT_Info_Vector cons_infos = consumer_impl->rt_info();
this->add_dependency (cons_infos[0],//consumer_impl->rt_info(),
- supp_infos[0],
- //supplier_impl->rt_info(),
+ supplier_impl->rt_info(),
1,
RtecBase::TWO_WAY_CALL
ACE_ENV_ARG_PARAMETER);
@@ -479,7 +483,7 @@ Kokyu_EC::add_consumer(
} //add_consumer()
void
-Kokyu_EC::add_dummy_supplier(EventType_Vector& types
+Kokyu_EC::add_dummy_supplier(EventType_Vector& supp_types
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -496,7 +500,7 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types
//known event types.
RtecScheduler::handle_t general_rt_info =
this->scheduler_->create ("Generalized Supplier (DUMMY)" ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ACE_Time_Value exec(0,500); //some arbitrary execution time
RtecScheduler::Time exec_time;
@@ -515,12 +519,12 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types
0, //threads
RtecScheduler::OPERATION
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
- RtecEventComm::EventSourceID general_id = 3000; //we assume 3000 is big enough to not overlap w/ actual suppliers
+ Supplier::SourceID general_id = 3000; //we assume 3000 is big enough to not overlap w/ actual suppliers
ACE_SupplierQOS_Factory general_qos;
//NOTE that this is kind of hard-cody since it assumes types between UNDEF+6 and UNDEF+9
- for(EventType_Vector::Iterator iter(types);
+ for(EventType_Vector::Iterator iter(supp_types);
!iter.done(); iter.advance())
{
EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
@@ -534,7 +538,7 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types
}
//we need to actually connect to the EC so the scheduler will set up the dependencies right
- RtecEventComm::EventType supp_type = general_id; //some arbitrary type, never pushed
+ Supplier::EventType supp_type = general_id; //some arbitrary type, never pushed
Supplier * general_impl;
ACE_NEW(general_impl,
Supplier(general_id,supp_type,supp_type));
@@ -542,23 +546,19 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types
RtecEventComm::PushSupplier_var general;
general = general_impl->_this(ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer;
proxy_consumer = this->supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
proxy_consumer->connect_push_supplier (general.in(),
general_qos.get_SupplierQOS()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- Supplier::PushConsumer_Vector proxies(1);
- proxies.push_back(proxy_consumer);
- general_impl->set_consumer_proxy(proxies);
- Supplier::RT_Info_Vector infos(1);
- infos.push_back(general_rt_info);
- general_impl->rt_info(infos);
+ ACE_CHECK;
+
+ general_impl->set_consumer_proxy(proxy_consumer);
+ general_impl->rt_info(general_rt_info);
this->suppliers_.push_back(general_impl);
} //add_dummy_supplier
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
index caac54bd51c..56bccc56135 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
@@ -49,7 +49,7 @@ public:
virtual RtEventChannelAdmin::handle_t register_supplier (
const char * entry_point,
RtecEventComm::EventSourceID source,
- RtecEventComm::EventType type,
+ EventType_Vector& supp_types, //RtecEventComm::EventType type,
RtecEventComm::PushSupplier_ptr supplier,
RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer
ACE_ENV_ARG_DECL
@@ -93,7 +93,7 @@ public:
void add_supplier_with_timeout(
Supplier * supplier_impl,
const char * supp_entry_point,
- RtecEventComm::EventType supp_type,
+ EventType_Vector& supp_types, //RtecEventComm::EventType supp_type,
Timeout_Consumer * timeout_consumer_impl,
const char * timeout_entry_point,
ACE_Time_Value period,
@@ -129,7 +129,7 @@ public:
void add_supplier(
Supplier * supplier_impl,
const char * entry_point,
- RtecEventComm::EventType type
+ EventType_Vector& supp_types //RtecEventComm::EventType type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -149,7 +149,7 @@ public:
RtecScheduler::Importance_t cons_imp,
Supplier * supplier_impl,
const char * supp_entry_point,
- RtecEventComm::EventType supp_type
+ EventType_Vector& supp_types //RtecEventComm::EventType supp_type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -176,7 +176,7 @@ public:
, RtecScheduler::SYNCHRONIZATION_FAILURE
));
- void add_dummy_supplier(EventType_Vector& types
+ void add_dummy_supplier(EventType_Vector& supp_types
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
index d182e1e9062..fe688bc7ed6 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
@@ -17,9 +17,9 @@
ACE_RCSID(EC_Examples, Supplier, "$Id$")
-Supplier::Supplier (RtecEventComm::EventSourceID id,
- RtecEventComm::EventType norm_type,
- RtecEventComm::EventType ft_type,
+Supplier::Supplier (SourceID id,
+ EventType norm_type,
+ EventType ft_type,
Service_Handler *handler)
: id_ (id)
, norm_type_(norm_type)
@@ -34,27 +34,18 @@ Supplier::~Supplier (void)
}
void
-Supplier::set_consumer_proxy(PushConsumer_Vector consumer_proxies)
+Supplier::set_consumer_proxy(ConsumerProxy proxy)
{
- this->consumer_proxy_.clear();
-
- for(PushConsumer_Vector::Iterator iter(consumer_proxies);
- !iter.done(); iter.advance())
- {
- PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
- iter.next(proxy);
-
- this->consumer_proxy_.push_back(*proxy);
- }
+ this->consumer_proxy_ = proxy;
}
void
-Supplier::rt_info(RT_Info_Vector& supplier_rt_info)
+Supplier::rt_info(InfoHandle supplier_rt_info)
{
this->rt_info_ = supplier_rt_info;
}
-Supplier::RT_Info_Vector&
+Supplier::InfoHandle
Supplier::rt_info(void)
{
return this->rt_info_;
@@ -103,14 +94,8 @@ Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_START at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
- for(PushConsumer_Vector::Iterator iter(this->consumer_proxy_);
- !iter.done(); iter.advance())
- {
- PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
- iter.next(proxy);
+ this->consumer_proxy_->push (event ACE_ENV_ARG_PARAMETER);
- (*proxy)->push (event ACE_ENV_ARG_PARAMETER);
- }
//DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL);
ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
@@ -129,14 +114,14 @@ Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
}
-RtecEventComm::EventSourceID
+Supplier::SourceID
Supplier::get_id(void) const
{
return this->id_;
}
void
-Supplier::mode(Supplier::mode_t mode)
+Supplier::mode(mode_t mode)
{
ACE_DEBUG((LM_DEBUG,"Supplier (%P|%t) changing mode from %d to %d\n",this->mode_,mode));
this->mode_ = mode;
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
index 44adcaa76b5..6186a4eb9b6 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
@@ -37,7 +37,7 @@ class Supplier : public POA_RtecEventComm::PushSupplier
//
// = DESCRIPTION
// This class is a supplier of events.
- // It simply register for two event typesone event type
+ // It simply registers for two event types.
// The class is just a helper to simplify common tasks in EC
// tests, such as subscribing for a range of events, disconnecting
// from the EC, informing the driver of shutdown messages, etc.
@@ -51,9 +51,12 @@ public:
FAULT_TOLERANT
};
- typedef ACE_Vector<RtecEventChannelAdmin::ProxyPushConsumer_var> PushConsumer_Vector;
+ typedef RtecEventChannelAdmin::ProxyPushConsumer_var ConsumerProxy;
+ typedef RtecEventComm::EventSourceID SourceID;
+ typedef RtecEventComm::EventType EventType;
+ typedef RtecScheduler::handle_t InfoHandle;
- Supplier (RtecEventComm::EventSourceID id, RtecEventComm::EventType norm_type, RtecEventComm::EventType ft_type,
+ Supplier (SourceID id, EventType norm_type, EventType ft_type,
Service_Handler * handler = 0);
// Constructor
@@ -67,14 +70,12 @@ public:
virtual void timeout_occured (ACE_ENV_SINGLE_ARG_DECL);
- void set_consumer_proxy(PushConsumer_Vector consumer_proxies);
+ void set_consumer_proxy(ConsumerProxy consumer_proxies);
- typedef ACE_Vector<RtecScheduler::handle_t> RT_Info_Vector;
+ void rt_info(InfoHandle supplier_rt_info);
+ InfoHandle rt_info(void);
- void rt_info(RT_Info_Vector& supplier_rt_info);
- RT_Info_Vector& rt_info(void);
-
- RtecEventComm::EventSourceID get_id(void) const;
+ SourceID get_id(void) const;
void mode(mode_t mode);
mode_t mode(void) const;
@@ -82,15 +83,15 @@ public:
Service_Handler * handler(void) const;
protected:
- RtecEventComm::EventSourceID id_;
- RtecEventComm::EventType norm_type_;
- RtecEventComm::EventType ft_type_;
+ SourceID id_;
+ EventType norm_type_;
+ EventType ft_type_;
- PushConsumer_Vector consumer_proxy_;
+ ConsumerProxy consumer_proxy_;
mode_t mode_;
- RT_Info_Vector rt_info_;
+ InfoHandle rt_info_;
Service_Handler *handler_;
}; //class Supplier
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
index c8d40714f21..8dfee69b2a3 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
@@ -14,6 +14,7 @@
#include "orbsvcs/Event/EC_Kokyu_Factory.h"
#include "orbsvcs/Time_Utilities.h"
#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Scheduler_Factory.h"
#include "tao/ORB_Core.h"
#include "Kokyu_EC.h"
@@ -31,7 +32,8 @@
namespace
{
int config_run = 0;
- ACE_CString sched_type ="rms";
+ ACE_CString sched_type = "rms";
+ ACE_CString ior_output_filename;
FILE * ior_output_file;
typedef ACE_Vector<const char*> Filename_Array;
@@ -223,7 +225,7 @@ public:
Mode_Handler *mode_handler;
ACE_NEW(mode_handler,
- Mode_Handler(100)); //mode switch after first event
+ Mode_Handler(5)); //mode switch after first event
Supplier *supplier_impl1_1;
Timeout_Consumer *timeout_consumer_impl1_1;
ACE_NEW(supplier_impl1_1,
@@ -235,7 +237,7 @@ public:
ACE_Time_Value tv(1,200000); //period DEBUG: set to much longer period
add_supplier_with_timeout(supplier_impl1_1,
"supplier1_1",
- supp1_1_types[0],
+ supp1_1_types,
timeout_consumer_impl1_1,
"supplier1_1_timeout_consumer",
tv,
@@ -282,7 +284,7 @@ public:
RtecScheduler::VERY_LOW_IMPORTANCE,
supplier_impl1_2,
"supplier1_2",
- supp1_2_types[0]
+ supp1_2_types
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
@@ -297,6 +299,44 @@ public:
add_dummy_supplier(types);
ACE_CHECK;
+ //DEBUG: print out schedule
+ RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER);
+ //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL);
+
+ RtecScheduler::RT_Info_Set_var infos;
+ RtecScheduler::Config_Info_Set_var configs;
+ RtecScheduler::Dependency_Set_var dependencies;
+ RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies;
+ RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
+
+ int min_os_priority =
+ ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+ int max_os_priority =
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+
+ scheduler->compute_scheduling (min_os_priority,
+ max_os_priority,
+ infos.out (),
+ dependencies.out (),
+ configs.out (),
+ anomalies.out ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ std::stringstream sched_out;
+ ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior'
+ sched_out << "schedule_" << ior_prefix.c_str() << ".out";
+
+ ACE_DEBUG((LM_DEBUG,"Supplier_EC writing schedule to %s\n",sched_out.str().c_str()));
+
+ ACE_Scheduler_Factory::dump_schedule (infos.in (),
+ dependencies.in (),
+ configs.in (),
+ anomalies.in (),
+ sched_out.str().c_str());
+ ////END DEBUG
Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
} //start()
@@ -444,7 +484,8 @@ int parse_args (int argc, char *argv[])
switch (c)
{
case 'o':
- ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w");
+ ior_output_filename = get_opts.opt_arg();
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
if (ior_output_file == 0)
{
ACE_ERROR_RETURN ((LM_ERROR,
@@ -477,7 +518,8 @@ int parse_args (int argc, char *argv[])
}
if (ior_output_file == 0)
{
- ior_output_file = ACE_OS::fopen ("supplier_ec.ior", "w");
+ ior_output_filename = "supplier_ec.ior";
+ ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w");
}
if (ior_input_files.size() == 0)
{