summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-24 21:31:16 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-24 21:31:16 +0000
commit7f6aee383581cdce6b1cc8064454db7cfcf14623 (patch)
treec1fba3022cc066eb3dba288d4370393870d613c8
parentb7c272b2b1c74a45e0f9e1060c4db6c71ce17cbb (diff)
downloadATCD-7f6aee383581cdce6b1cc8064454db7cfcf14623.tar.gz
Stripped multiple subscriptions to get down to working path. Added dummy supplier for Consumer_Supplier_EC and Consumer_EC. Fixed ACE_Reactor in Reactor_Task. Added scripts for running the processes
-rwxr-xr-xTAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Client3
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp4
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp54
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp334
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h30
-rwxr-xr-xTAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle13
-rwxr-xr-xTAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle23
-rwxr-xr-xTAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Server3
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp20
9 files changed, 251 insertions, 203 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Client b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Client
new file mode 100755
index 00000000000..25befde4485
--- /dev/null
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Client
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+./Supplier_EC -s edf -o client.ior -i middle1.ior -i middle2.ior
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
index d0182f04b00..1fe20dd823d 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
@@ -151,7 +151,7 @@ public:
add_consumer(consumer_impl1_3,
"consumer1_3",
tv,
- cons1_3_types,
+ cons1_3_types[0],
RtecScheduler::VERY_HIGH_CRITICALITY,
RtecScheduler::VERY_HIGH_IMPORTANCE
ACE_ENV_ARG_PARAMETER
@@ -296,7 +296,7 @@ main (int argc, char* argv[])
int parse_args (int argc, char *argv[])
{
-ACE_Get_Opt get_opts (argc, argv, "cs:o:");
+ACE_Get_Opt get_opts (argc, argv, "s:o:");
int c;
while ((c = get_opts ()) != -1)
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
index bd3e89bdc49..efca4690cfa 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp
@@ -29,7 +29,8 @@
namespace
{
int supp_id = 3;
- ACE_CString sched_type ="rms";
+ ACE_CString sched_type = "rms";
+ ACE_CString ior_input_file = "file://consumer_ec.ior";
FILE * ior_output_file;
}
@@ -130,39 +131,32 @@ public:
//Supplier types
RtecEventComm::EventType supp_normal_type;
RtecEventComm::EventType supp_ft_type;
- //We need to register these since the scheduler is local; it doesn't know about the other Consumer_Supplier_EC
- RtecEventComm::EventType supp_normal_type_hack;
- RtecEventComm::EventType supp_ft_type_hack;
//Consumer types
RtecEventComm::EventType cons_normal_type;
RtecEventComm::EventType cons_ft_type;
if (supp_id == 3)
{
+ ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC has id 3\n"));
supp_normal_type = ACE_ES_EVENT_UNDEFINED+6;
supp_ft_type = ACE_ES_EVENT_UNDEFINED+8;
- supp_normal_type_hack = ACE_ES_EVENT_UNDEFINED+7;
- supp_ft_type_hack = ACE_ES_EVENT_UNDEFINED+9;
cons_normal_type = ACE_ES_EVENT_UNDEFINED+2;
cons_ft_type = ACE_ES_EVENT_UNDEFINED+4;
}
else //supp_id == 4
{
+ ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC has id 4\n"));
supp_normal_type = ACE_ES_EVENT_UNDEFINED+7;
supp_ft_type = ACE_ES_EVENT_UNDEFINED+9;
- supp_normal_type_hack = ACE_ES_EVENT_UNDEFINED+6;
- supp_ft_type_hack = ACE_ES_EVENT_UNDEFINED+8;
cons_normal_type = ACE_ES_EVENT_UNDEFINED+3;
cons_ft_type = ACE_ES_EVENT_UNDEFINED+5;
}
Kokyu_EC::EventType_Vector supp1_3_types;
supp1_3_types.push_back(supp_normal_type);
supp1_3_types.push_back(supp_ft_type);
- supp1_3_types.push_back(supp_normal_type_hack);
- supp1_3_types.push_back(supp_ft_type_hack);
Kokyu_EC::EventType_Vector cons1_2_types(2);
- cons1_2_types.push_back(cons_ft_type);
cons1_2_types.push_back(cons_normal_type);
+ cons1_2_types.push_back(cons_ft_type);
Supplier *supplier_impl1_3;
ACE_NEW(supplier_impl1_3,
@@ -178,16 +172,26 @@ public:
add_consumer_with_supplier(consumer_impl1_2, //deleted in consumer
"consumer1_2",
tv,
- cons1_2_types,
+ cons1_2_types[0],
RtecScheduler::VERY_LOW_CRITICALITY,
RtecScheduler::VERY_LOW_IMPORTANCE,
supplier_impl1_3,
"supplier1_3",
- supp1_3_types
+ supp1_3_types[0]
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
+ EventType_Vector types(4);
+ for(RtecEventComm::EventType type = ACE_ES_EVENT_UNDEFINED+6;
+ type <= ACE_ES_EVENT_UNDEFINED+9; type++)
+ {
+ types.push_back(type);
+ }
+
+ add_dummy_supplier(types);
+ ACE_CHECK;
+
//Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER);
//ACE_CHECK;
}
@@ -245,7 +249,7 @@ main (int argc, char* argv[])
cons_supp_ec.init_gateway(orb.in(),
poa.in(),
- "file://consumer_ec.ior" ACE_ENV_ARG_PARAMETER);
+ ior_input_file.c_str() ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// ****************************************************************
RtEventChannelAdmin::RtSchedEventChannel_var cons_supp_ec_ior =
@@ -319,8 +323,12 @@ main (int argc, char* argv[])
int parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "cs:o:d:");
+ ACE_Get_Opt get_opts (argc, argv, "s:o:d:i:");
int c;
+ //these used for handline '-i':
+ const char* input_file;
+ size_t len;
+ char *filename;
while ((c = get_opts ()) != -1)
switch (c)
@@ -344,12 +352,24 @@ int parse_args (int argc, char *argv[])
supp_id = atol(get_opts.opt_arg ());
break;
}
+ case 'i':
+ input_file = get_opts.opt_arg();
+ len = ACE_OS::strlen("file://")+ACE_OS::strlen(input_file)+1;
+ filename = new char[len];
+ sprintf(filename,"file://%s",input_file);
+ ACE_DEBUG((LM_DEBUG,"Adding consumer IOR %s\n",filename));
+ ior_input_file = filename;
+ break;
case '?':
default:
{
ACE_ERROR_RETURN ((LM_ERROR,
- "Usage: %s -s <rms|muf|edf> -d <supplier id>"
- "\n",
+ "Usage: %s -s <rms|muf|edf>"
+ " -d <supplier id>"
+ " [-o iorfile]"
+ " [-i server_EC_ior_file]"
+ "\n"
+ "For multiple consumers, specify -i multiple times\n",
argv [0]),
-1);
}
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
index 3d75827452c..a9cc4004d0c 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
@@ -10,7 +10,6 @@
#include "orbsvcs/Event/EC_Kokyu_Factory.h"
#include "ace/OS_NS_strings.h" //for ACE_OS::strcasecmp
#include "ace/OS_NS_sys_time.h" // for ACE_OS::gettimeofday
-#include <sstream>
#include "Kokyu/Dispatch_Deferrer.h"
@@ -61,15 +60,22 @@ Kokyu_EC::Kokyu_EC(void)
Kokyu_EC::~Kokyu_EC(void)
{
+ //Need ACE_ENV_ARG_PARAMETER for these?
for(size_t i=0; i<suppliers_.size(); ++i) {
+ suppliers_[i]->disconnect_push_supplier();
delete suppliers_[i];
}
for(size_t i=0; i<timeout_consumers_.size(); ++i) {
+ timeout_consumers_[i]->disconnect_push_consumer();
delete timeout_consumers_[i];
}
for(size_t i=0; i<consumers_.size(); ++i) {
+ consumers_[i]->disconnect_push_consumer();
delete consumers_[i];
}
+
+ this->ec_impl_->destroy();
+
}
int
@@ -106,7 +112,7 @@ RtEventChannelAdmin::handle_t
Kokyu_EC::register_consumer (
const char * entry_point,
const RtEventChannelAdmin::SchedInfo & info,
- EventType_Vector& types,
+ RtecEventComm::EventType type,
RtecEventComm::PushConsumer_ptr consumer,
RtecEventChannelAdmin::ProxyPushSupplier_out proxy_supplier
ACE_ENV_ARG_DECL
@@ -120,7 +126,7 @@ Kokyu_EC::register_consumer (
{
RtecScheduler::handle_t consumer1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
scheduler_impl_->set (consumer1_rt_info,
info.criticality,
@@ -133,40 +139,26 @@ Kokyu_EC::register_consumer (
info.threads,
info.info_type
ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
ACE_ConsumerQOS_Factory consumer_qos1;
- consumer_qos1.start_disjunction_group(types.size());
- for(EventType_Vector::Iterator iter(types);
- !iter.done(); iter.advance())
- {
- EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
- iter.next(type);
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() registering event type %d\n",*type));
- if (*type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
- {
- consumer_qos1.insert_type (*type, consumer1_rt_info);
- }
- else
- {
- consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
- info.period, //in 100s of nanosec
- consumer1_rt_info);
- }
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() registered event type\n"));
- }
+ if (type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
+ consumer_qos1.insert_type (type, consumer1_rt_info);
+ else
+ consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
+ info.period, //in 100s of nanosec
+ consumer1_rt_info);
+
proxy_supplier =
consumer_admin_->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
proxy_supplier->connect_push_consumer (consumer,
consumer_qos1.get_ConsumerQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() connected PushConsumer\n"));
+ ACE_TRY_CHECK;
return consumer1_rt_info;
}
@@ -174,7 +166,7 @@ RtEventChannelAdmin::handle_t
Kokyu_EC::register_supplier (
const char * entry_point,
RtecEventComm::EventSourceID source,
- EventType_Vector& types,
+ RtecEventComm::EventType type,
RtecEventComm::PushSupplier_ptr supplier,
RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer
ACE_ENV_ARG_DECL
@@ -188,28 +180,22 @@ Kokyu_EC::register_supplier (
{
RtecScheduler::handle_t supplier1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
ACE_SupplierQOS_Factory supplier_qos1;
- for(EventType_Vector::Iterator iter(types);
- !iter.done(); iter.advance())
- {
- EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
- iter.next(type);
- supplier_qos1.insert (source,
- *type,
- supplier1_rt_info,
- 1 /* number of calls, but what does that mean? */);
- }
+ supplier_qos1.insert (source,
+ type,
+ supplier1_rt_info,
+ 1 /* number of calls, but what does that mean? */);
proxy_consumer =
supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
proxy_consumer->connect_push_supplier (supplier,
supplier_qos1.get_SupplierQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
return supplier1_rt_info;
}
@@ -262,15 +248,9 @@ Kokyu_EC::start (ACE_ENV_SINGLE_ARG_DECL)
configs.out (),
anomalies.out ()
ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ ACE_TRY_CHECK;
ec_impl_->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- ACE_Scheduler_Factory::dump_schedule (infos.in (),
- dependencies.in (),
- configs.in (),
- anomalies.in (),
- "schedule.out");
+ ACE_TRY_CHECK;
//@BT: EC activated is roughly equivalent to having the DT scheduler ready to run
//DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_STARTED, 1, 0, NULL);
@@ -296,7 +276,7 @@ void
Kokyu_EC::add_supplier_with_timeout(
Supplier * supplier_impl,
const char * supp_entry_point,
- EventType_Vector& types,
+ RtecEventComm::EventType supp_type,
Timeout_Consumer * timeout_consumer_impl,
const char * timeout_entry_point,
ACE_Time_Value period,
@@ -311,7 +291,7 @@ Kokyu_EC::add_supplier_with_timeout(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
- add_supplier(supplier_impl,supp_entry_point,types ACE_ENV_ARG_PARAMETER);
+ add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
add_timeout_consumer(supplier_impl,timeout_consumer_impl,timeout_entry_point,period,crit,imp ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
@@ -348,34 +328,24 @@ Kokyu_EC::add_timeout_consumer(
info.threads = 0;
info.info_type = RtecScheduler::OPERATION;
- EventType_Vector types(1);
- types.push_back(ACE_ES_EVENT_INTERVAL_TIMEOUT);
RtecScheduler::handle_t supplier_timeout_consumer_rt_info =
this->register_consumer(timeout_entry_point,
info,
- types,
+ ACE_ES_EVENT_INTERVAL_TIMEOUT,
safe_timeout_consumer.in(),
timeout_supplier_proxy.out()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
//don't need to save supplier_timeout_consumer_rt_info because only used to set dependency here:
- for(Supplier::RT_Info_Vector::Iterator iter(supplier_impl->rt_info());
- !iter.done(); iter.advance())
- {
- Supplier::RT_Info_Vector::TYPE *info; //would rather const to ensure we don't change it, but not supported!
- iter.next(info);
-
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_supplier_with_timeout() adding deps for supplier RT_Info %d and "
- "timeout_consumer RT_Info %d\n",*info,supplier_timeout_consumer_rt_info));
-
- this->add_dependency (supplier_timeout_consumer_rt_info,
- *info,
- 1,
- RtecBase::TWO_WAY_CALL
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
+ Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info();
+ this->add_dependency (supplier_timeout_consumer_rt_info,
+ supp_infos[0],
+ //supplier_impl->rt_info(),
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
this->timeout_consumers_.push_back(timeout_consumer_impl);
} //add_supplier_with_timeout()
@@ -385,7 +355,7 @@ void
Kokyu_EC::add_supplier(
Supplier * supplier_impl,
const char * entry_point,
- EventType_Vector& types
+ RtecEventComm::EventType type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -403,42 +373,21 @@ Kokyu_EC::add_supplier(
supplier = supplier_impl->_this(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
- Supplier::RT_Info_Vector rt_infos(types.size());
- Supplier::PushConsumer_Vector proxies(types.size());
- /*
- EventType_Vector one_type(1);
- for(EventType_Vector::Iterator iter(types);
- !iter.done(); iter.advance())
- {
- EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
- iter.next(type);
- one_type.clear();
- one_type.push_back(*type);
-
- std::stringstream entry;
- entry << entry_point << ":" << *type;
-
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_supplier() registering supplier for type %d\n",*type));
- */
- RtecScheduler::handle_t supplier_rt_info =
- this->register_supplier(entry_point,//entry.str().c_str(),
- supplier_id,
- types,//one_type,
- supplier.in(),
- consumer_proxy.out()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_supplier() consumer_proxy: %@ with RT_Info %s [%d]\n",consumer_proxy.in(),entry.str().c_str(),supplier_rt_info));
-
- rt_infos.push_back(supplier_rt_info);
- proxies.push_back(consumer_proxy); //proxies has a _var ref to Consumer_Proxy so won't be deleted on return
-
- //supplier_impl->set_consumer_proxies(*type,rt_infos,proxies);
- supplier_impl->set_consumer_proxy(proxies);
- //}
+ RtecScheduler::handle_t supplier_rt_info =
+ this->register_supplier(entry_point,
+ supplier_id,
+ type,
+ supplier.in(),
+ consumer_proxy.out()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
- supplier_impl->rt_info(rt_infos);
+ Supplier::PushConsumer_Vector cons_proxies;
+ cons_proxies.push_back(consumer_proxy);
+ supplier_impl->set_consumer_proxy(cons_proxies);
+ Supplier::RT_Info_Vector supp_infos;
+ supp_infos.push_back(supplier_rt_info);
+ supplier_impl->rt_info(supp_infos);
this->suppliers_.push_back(supplier_impl);
} //add_supplier()
@@ -449,12 +398,12 @@ Kokyu_EC::add_consumer_with_supplier(
Consumer * consumer_impl,
const char * cons_entry_point,
ACE_Time_Value cons_period,
- EventType_Vector& cons_types,
+ RtecEventComm::EventType cons_type,
RtecScheduler::Criticality_t cons_crit,
RtecScheduler::Importance_t cons_imp,
Supplier * supplier_impl,
const char * supp_entry_point,
- EventType_Vector& supp_types
+ RtecEventComm::EventType supp_type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -464,33 +413,20 @@ Kokyu_EC::add_consumer_with_supplier(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
- add_consumer(consumer_impl,cons_entry_point,cons_period,cons_types,
- cons_crit,cons_imp ACE_ENV_ARG_PARAMETER);
+ add_consumer(consumer_impl,cons_entry_point,cons_period,cons_type,cons_crit,cons_imp ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER);
+ add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- for(Consumer::RT_Info_Vector::Iterator citer(consumer_impl->rt_info());
- !citer.done(); citer.advance())
- {
- Consumer::RT_Info_Vector::TYPE *cons_info;
- citer.next(cons_info);
-
- for(Supplier::RT_Info_Vector::Iterator iter(supplier_impl->rt_info());
- !iter.done(); iter.advance())
- {
- Supplier::RT_Info_Vector::TYPE *supp_info;
- iter.next(supp_info);
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC (%P|%t) add_consumer_with_supplier() adding dependency %d\n",*cons_info));
- this->add_dependency (*supp_info,
- *cons_info,
- 1,
- RtecBase::ONE_WAY_CALL
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
- }
+ Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info();
+ Consumer::RT_Info_Vector cons_infos = consumer_impl->rt_info();
+ this->add_dependency (cons_infos[0],//consumer_impl->rt_info(),
+ supp_infos[0],
+ //supplier_impl->rt_info(),
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
} //add_consumer_with_supplier()
///Takes ownership of Consumer
@@ -499,7 +435,7 @@ Kokyu_EC::add_consumer(
Consumer * consumer_impl,
const char * entry_point,
ACE_Time_Value period,
- EventType_Vector& types,
+ RtecEventComm::EventType cons_type,
RtecScheduler::Criticality_t crit,
RtecScheduler::Importance_t imp
ACE_ENV_ARG_DECL
@@ -511,9 +447,6 @@ Kokyu_EC::add_consumer(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
- RtecEventComm::PushConsumer_var consumer = consumer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
//Specifying criticality is crucial since it propagates from
@@ -525,45 +458,110 @@ Kokyu_EC::add_consumer(
info.threads = 0;
info.info_type = RtecScheduler::OPERATION;
- Consumer::RT_Info_Vector rt_infos(types.size());
- /*
- EventType_Vector one_type(1);
+ RtecEventComm::PushConsumer_var consumer =
+ consumer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecScheduler::handle_t consumer_rt_info =
+ this->register_consumer(entry_point,
+ info,
+ cons_type,
+ consumer.in(),
+ proxy_supplier.out()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ Consumer::RT_Info_Vector cons_infos;
+ cons_infos.push_back(consumer_rt_info);
+ consumer_impl->rt_info(cons_infos);
+
+ this->consumers_.push_back(consumer_impl);
+} //add_consumer()
+
+void
+Kokyu_EC::add_dummy_supplier(EventType_Vector& types
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , RtecScheduler::UNKNOWN_TASK
+ , RtecScheduler::INTERNAL
+ , RtecScheduler::SYNCHRONIZATION_FAILURE
+ ))
+{
+ //Gateways register as consumers of ALL the consumers in the
+ //remote EC, so we need a generalized RT_Info to cover all the
+ //dependencies not actually provided by this node. That is, we
+ //need an RT_Info for a fictional supplier which publishes all
+ //known event types.
+ RtecScheduler::handle_t general_rt_info =
+ this->scheduler_->create ("Generalized Supplier (DUMMY)" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_Time_Value exec(0,500); //some arbitrary execution time
+ RtecScheduler::Time exec_time;
+ ORBSVCS_Time::Time_Value_to_TimeT(exec_time,exec);
+ ACE_Time_Value period(0,500); //some arbitrary period
+ RtecScheduler::Time period_time;
+ ORBSVCS_Time::Time_Value_to_TimeT(period_time,period);
+ this->scheduler_->set (general_rt_info,
+ RtecScheduler::LOW_CRITICALITY, //doesn't matter for EDF anyway
+ exec_time,
+ exec_time,
+ exec_time,
+ period_time,
+ RtecScheduler::LOW_IMPORTANCE, //doesn't matter for EDF anyway
+ 0, //quantum
+ 0, //threads
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ RtecEventComm::EventSourceID general_id = 3000; //we assume 3000 is big enough to not overlap w/ actual suppliers
+ ACE_SupplierQOS_Factory general_qos;
+ //NOTE that this is kind of hard-cody since it assumes types between UNDEF+6 and UNDEF+9
for(EventType_Vector::Iterator iter(types);
!iter.done(); iter.advance())
{
EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
iter.next(type);
- one_type.clear();
- one_type.push_back(*type);
-
- std::stringstream entry;
- entry << entry_point << ":" << *type;
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() registering consumer for type %d\n",*type));
- */
- RtecScheduler::handle_t consumer_rt_info =
- this->register_consumer(entry_point,//entry.str().c_str(),
- info,
- types,//one_type,
- consumer.in(),
- proxy_supplier.out()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() registered consumer with RT_Info %s [%d]\n",entry.str().c_str(),consumer_rt_info));
-
- rt_infos.push_back(consumer_rt_info);
- //}
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() setting RT_Infos and storing consumer_impl\n"));
-
- consumer_impl->rt_info(rt_infos);
-
- this->consumers_.push_back(consumer_impl);
-
- //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() DONE\n"));
+ general_qos.insert (general_id,
+ *type,
+ general_rt_info,
+ 1 //number of calls
+ );
+ }
-} //add_consumer()
+ //we need to actually connect to the EC so the scheduler will set up the dependencies right
+ RtecEventComm::EventType supp_type = general_id; //some arbitrary type, never pushed
+ Supplier * general_impl;
+ ACE_NEW(general_impl,
+ Supplier(general_id,supp_type,supp_type));
+ //dummy supplier since nothing will trigger it to push!
+
+ RtecEventComm::PushSupplier_var general;
+ general = general_impl->_this(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer;
+ proxy_consumer = this->supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ proxy_consumer->connect_push_supplier (general.in(),
+ general_qos.get_SupplierQOS()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ Supplier::PushConsumer_Vector proxies(1);
+ proxies.push_back(proxy_consumer);
+ general_impl->set_consumer_proxy(proxies);
+ Supplier::RT_Info_Vector infos(1);
+ infos.push_back(general_rt_info);
+ general_impl->rt_info(infos);
+
+ this->suppliers_.push_back(general_impl);
+} //add_dummy_supplier
//*************************************************************
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
index a41ab090542..caac54bd51c 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
@@ -17,6 +17,8 @@
class Kokyu_EC : public POA_RtEventChannelAdmin::RtSchedEventChannel
{
public:
+ typedef ACE_Vector<RtecEventComm::EventType> EventType_Vector;
+
Kokyu_EC(void);
virtual ~Kokyu_EC(void);
@@ -29,13 +31,10 @@ public:
int init(const char* schedule_discipline, PortableServer::POA_ptr poa);
- typedef ACE_Vector<RtecEventComm::EventType> EventType_Vector;
-
- //To specify a consumer of one type of event, set both types to the same value
virtual RtEventChannelAdmin::handle_t register_consumer (
const char * entry_point,
const RtEventChannelAdmin::SchedInfo & info,
- EventType_Vector& types,
+ RtecEventComm::EventType type,
RtecEventComm::PushConsumer_ptr consumer,
RtecEventChannelAdmin::ProxyPushSupplier_out proxy_supplier
ACE_ENV_ARG_DECL
@@ -47,11 +46,10 @@ public:
, RtecScheduler::SYNCHRONIZATION_FAILURE
));
- //To specify a supplier of one type of event, set both types to the same value
virtual RtEventChannelAdmin::handle_t register_supplier (
const char * entry_point,
RtecEventComm::EventSourceID source,
- EventType_Vector& types,
+ RtecEventComm::EventType type,
RtecEventComm::PushSupplier_ptr supplier,
RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer
ACE_ENV_ARG_DECL
@@ -95,7 +93,7 @@ public:
void add_supplier_with_timeout(
Supplier * supplier_impl,
const char * supp_entry_point,
- EventType_Vector& supp_types,
+ RtecEventComm::EventType supp_type,
Timeout_Consumer * timeout_consumer_impl,
const char * timeout_entry_point,
ACE_Time_Value period,
@@ -131,7 +129,7 @@ public:
void add_supplier(
Supplier * supplier_impl,
const char * entry_point,
- EventType_Vector& types
+ RtecEventComm::EventType type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -146,12 +144,12 @@ public:
Consumer * consumer_impl,
const char * cons_entry_point,
ACE_Time_Value cons_period,
- EventType_Vector& cons_types,
+ RtecEventComm::EventType cons_type,
RtecScheduler::Criticality_t cons_crit,
RtecScheduler::Importance_t cons_imp,
Supplier * supplier_impl,
const char * supp_entry_point,
- EventType_Vector& supp_types
+ RtecEventComm::EventType supp_type
ACE_ENV_ARG_DECL
)
ACE_THROW_SPEC ((
@@ -166,7 +164,7 @@ public:
Consumer * consumer_impl,
const char * entry_point,
ACE_Time_Value period,
- EventType_Vector& types,
+ RtecEventComm::EventType cons_type,
RtecScheduler::Criticality_t crit,
RtecScheduler::Importance_t imp
ACE_ENV_ARG_DECL
@@ -178,6 +176,16 @@ public:
, RtecScheduler::SYNCHRONIZATION_FAILURE
));
+ void add_dummy_supplier(EventType_Vector& types
+ ACE_ENV_ARG_DECL
+ )
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ , RtecScheduler::UNKNOWN_TASK
+ , RtecScheduler::INTERNAL
+ , RtecScheduler::SYNCHRONIZATION_FAILURE
+ ));
+
private:
TAO::Utils::Servant_Var<POA_RtecScheduler::Scheduler> scheduler_impl_;
TAO::Utils::Servant_Var<TAO_EC_Event_Channel> ec_impl_;
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle1 b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle1
new file mode 100755
index 00000000000..5955c97fd54
--- /dev/null
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle1
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+./Consumer_Supplier_EC -s edf -i server.ior -o middle1.ior -d 3
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle2 b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle2
new file mode 100755
index 00000000000..d3426f2ad8f
--- /dev/null
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Middle2
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+./Consumer_Supplier_EC -s edf -i server.ior -o middle2.ior -d 4
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Server b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Server
new file mode 100755
index 00000000000..e0c787e4087
--- /dev/null
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Server
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+./Consumer_EC -s edf -o server.ior
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
index b3c89ce15bb..c8d40714f21 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
@@ -235,7 +235,7 @@ public:
ACE_Time_Value tv(1,200000); //period DEBUG: set to much longer period
add_supplier_with_timeout(supplier_impl1_1,
"supplier1_1",
- supp1_1_types,
+ supp1_1_types[0],
timeout_consumer_impl1_1,
"supplier1_1_timeout_consumer",
tv,
@@ -277,19 +277,29 @@ public:
add_consumer_with_supplier(consumer_impl1_1, //deleted in consumer
"consumer1_1",
tv,
- cons1_1_types,
+ cons1_1_types[0],
RtecScheduler::VERY_LOW_CRITICALITY,
RtecScheduler::VERY_LOW_IMPORTANCE,
supplier_impl1_2,
"supplier1_2",
- supp1_2_types
+ supp1_2_types[0]
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
+ EventType_Vector types(4);
+ for(RtecEventComm::EventType type = ACE_ES_EVENT_UNDEFINED+2;
+ type <= ACE_ES_EVENT_UNDEFINED+9; type++)
+ {
+ types.push_back(type);
+ }
+
+ add_dummy_supplier(types);
+ ACE_CHECK;
+
Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
- }
+ } //start()
};
int parse_args (int argc, char *argv[]);
@@ -423,7 +433,7 @@ main (int argc, char* argv[])
int parse_args (int argc, char *argv[])
{
- ACE_Get_Opt get_opts (argc, argv, "cs:o:i:");
+ ACE_Get_Opt get_opts (argc, argv, "s:o:i:");
int c;
//these used for handline '-i':
const char* input_file;