summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-05 18:37:58 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-05-05 18:37:58 +0000
commitdc808c6099259d882029ab008ce6694a80a61b63 (patch)
tree9933786ef1e5991e1265097d02a5099e542f49c9
parenteab1c08cc06b2f108948d83c058ba834c7d671be (diff)
downloadATCD-dc808c6099259d882029ab008ce6694a80a61b63.tar.gz
Got everything working.
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.cpp6
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.h14
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp2
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp29
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp157
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h2
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp32
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h15
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp17
9 files changed, 197 insertions, 77 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.cpp
index 3f0a4b73318..04f3171ad26 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.cpp
@@ -196,13 +196,13 @@ Consumer::setWorkTime(ACE_Time_Value& worktime)
}
void
-Consumer::rt_info(RtecScheduler::handle_t consumer_rt_info)
+Consumer::rt_info(RT_Info_Vector& consumer_rt_info)
{
rt_info_ = consumer_rt_info;
}
-RtecScheduler::handle_t
-Consumer::rt_info(void) const
+Consumer::RT_Info_Vector&
+Consumer::rt_info(void)
{
return rt_info_;
}
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.h
index 65b43b204a3..ae1efc9ba7f 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer.h
@@ -19,9 +19,12 @@
#include "orbsvcs/RtecEventCommS.h"
#include "orbsvcs/RtecSchedulerC.h"
+
+#include "ace/Time_Value.h"
+#include "ace/Vector_T.h"
+
#include "Supplier.h"
#include "Service_Handler.h"
-#include "ace/Time_Value.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -65,8 +68,10 @@ public:
void setWorkTime(ACE_Time_Value& worktime);
- void rt_info(RtecScheduler::handle_t consumer_rt_info);
- RtecScheduler::handle_t rt_info(void) const;
+ typedef ACE_Vector<RtecScheduler::handle_t> RT_Info_Vector;
+
+ void rt_info(RT_Info_Vector& consumer_rt_info);
+ RT_Info_Vector& rt_info(void);
void handler(Service_Handler * handler);
@@ -77,7 +82,8 @@ private:
ACE_Time_Value worktime_;
Supplier *fwddest_;
- RtecScheduler::handle_t rt_info_;
+ //RtecScheduler::handle_t rt_info_;
+ RT_Info_Vector rt_info_;
Service_Handler * handler_;
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
index 102b719a553..2f862cf87bf 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp
@@ -185,6 +185,8 @@ public:
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"Consumer_EC set_up_supp_and_cons() DONE\n"));
} //set_up_supp_and_cons()
private:
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
index b24a56d67f4..95a43f86a83 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp
@@ -57,14 +57,14 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
switch (this->mode_) {
case NORMAL:
{
- eventA[0].header.type = ACE_ES_EVENT_UNDEFINED + this->norm_type_ - 1;
- eventB[0].header.type = ACE_ES_EVENT_UNDEFINED + this->norm_type2_ - 1;
+ eventA[0].header.type = this->norm_type_;
+ eventB[0].header.type = this->norm_type2_;
break;
}
case FAULT_TOLERANT:
{
- eventA[0].header.type = ACE_ES_EVENT_UNDEFINED + this->ft_type_ - 1;
- eventB[0].header.type = ACE_ES_EVENT_UNDEFINED + this->ft_type2_ - 1;
+ eventA[0].header.type = this->ft_type_;
+ eventB[0].header.type = this->ft_type2_;
break;
}
}
@@ -86,12 +86,29 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t ONE_WAY_CALL_START at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
oid.type = eventA[0].header.type;
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
- consumer_proxy_->push (eventA ACE_ENV_ARG_PARAMETER);
+ //TODO: BUG! This code pushes eventA/B to ALL consumers!
+ for(PushConsumer_Vector::Iterator iter(this->consumer_proxy_);
+ !iter.done(); iter.advance())
+ {
+ PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
+ iter.next(proxy);
+
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventA\n",this->id_));
+ (*proxy)->push (eventA ACE_ENV_ARG_PARAMETER);
+ }
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
oid.type = eventB[0].header.type;
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
- consumer_proxy_->push (eventB ACE_ENV_ARG_PARAMETER);
+ for(PushConsumer_Vector::Iterator iter(this->consumer_proxy_);
+ !iter.done(); iter.advance())
+ {
+ PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
+ iter.next(proxy);
+
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventB\n",this->id_));
+ (*proxy)->push (eventB ACE_ENV_ARG_PARAMETER);
+ }
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
//DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL);
ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
index e3192b3612c..ac366dae18a 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp
@@ -12,6 +12,8 @@
#include <ace/OS_NS_strings.h> //for ACE_OS::strcasecmp
#include <ace/OS_NS_sys_time.h> // for ACE_OS::gettimeofday
+#include <sstream>
+
namespace {
typedef TAO_Reconfig_Scheduler<TAO_RMS_FAIR_Reconfig_Sched_Strategy, TAO_SYNCH_MUTEX> RECONFIG_RMS_SCHED_TYPE;
@@ -116,7 +118,7 @@ Kokyu_EC::register_consumer (
{
RtecScheduler::handle_t consumer1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
scheduler_impl_->set (consumer1_rt_info,
info.criticality,
@@ -129,7 +131,7 @@ Kokyu_EC::register_consumer (
info.threads,
info.info_type
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ACE_ConsumerQOS_Factory consumer_qos1;
@@ -138,7 +140,7 @@ Kokyu_EC::register_consumer (
{
EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
iter.next(type);
- ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() registering event type %d\n",*type));
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() registering event type %d\n",*type));
if (*type != ACE_ES_EVENT_INTERVAL_TIMEOUT)
{
consumer_qos1.insert_type (*type, consumer1_rt_info);
@@ -149,16 +151,19 @@ Kokyu_EC::register_consumer (
info.period, //in 100s of nanosec
consumer1_rt_info);
}
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() registered event type\n"));
}
proxy_supplier =
consumer_admin_->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
proxy_supplier->connect_push_consumer (consumer,
consumer_qos1.get_ConsumerQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
+
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC register_consumer() connected PushConsumer\n"));
return consumer1_rt_info;
}
@@ -180,7 +185,7 @@ Kokyu_EC::register_supplier (
{
RtecScheduler::handle_t supplier1_rt_info =
scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ACE_SupplierQOS_Factory supplier_qos1;
for(EventType_Vector::Iterator iter(types);
@@ -196,12 +201,12 @@ Kokyu_EC::register_supplier (
proxy_consumer =
supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
proxy_consumer->connect_push_supplier (supplier,
supplier_qos1.get_SupplierQOS ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
return supplier1_rt_info;
}
@@ -254,9 +259,9 @@ Kokyu_EC::start (ACE_ENV_SINGLE_ARG_DECL)
configs.out (),
anomalies.out ()
ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
ec_impl_->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_CHECK;
//@BT: EC activated is roughly equivalent to having the DT scheduler ready to run
//DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_STARTED, 1, 0, NULL);
@@ -346,12 +351,22 @@ Kokyu_EC::add_timeout_consumer(
ACE_CHECK;
//don't need to save supplier_timeout_consumer_rt_info because only used to set dependency here:
- this->add_dependency (supplier_timeout_consumer_rt_info,
- supplier_impl->rt_info(),
- 1,
- RtecBase::TWO_WAY_CALL
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ for(Supplier::RT_Info_Vector::Iterator iter(supplier_impl->rt_info());
+ !iter.done(); iter.advance())
+ {
+ Supplier::RT_Info_Vector::TYPE *info; //would rather const to ensure we don't change it, but not supported!
+ iter.next(info);
+
+ ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_supplier_with_timeout() adding deps for supplier RT_Info %d and "
+ "timeout_consumer RT_Info %d\n",*info,supplier_timeout_consumer_rt_info));
+
+ this->add_dependency (supplier_timeout_consumer_rt_info,
+ *info,
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
this->timeout_consumers_.push_back(timeout_consumer_impl);
} //add_supplier_with_timeout()
@@ -379,17 +394,37 @@ Kokyu_EC::add_supplier(
supplier = supplier_impl->_this(ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
- RtecScheduler::handle_t supplier_rt_info =
- this->register_supplier(entry_point,
- supplier_id,
- types,
- supplier.in(),
- consumer_proxy.out()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ EventType_Vector one_type(1);
+ Supplier::RT_Info_Vector rt_infos(types.size());
+ Supplier::PushConsumer_Vector proxies(types.size());
+ for(EventType_Vector::Iterator iter(types);
+ !iter.done(); iter.advance())
+ {
+ EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
+ iter.next(type);
+ one_type.clear();
+ one_type.push_back(*type);
+
+ std::stringstream entry;
+ entry << entry_point << ":" << *type;
+
+ ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_supplier() registering supplier for type %d\n",*type));
+
+ RtecScheduler::handle_t supplier_rt_info =
+ this->register_supplier(entry.str().c_str(),
+ supplier_id,
+ one_type,
+ supplier.in(),
+ consumer_proxy.out()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ rt_infos.push_back(supplier_rt_info);
+ proxies.push_back(consumer_proxy); //proxies has a _var ref to Consumer_Proxy so won't be deleted on return
+ }
- supplier_impl->set_consumer_proxy(consumer_proxy.in());
- supplier_impl->rt_info(supplier_rt_info);
+ supplier_impl->set_consumer_proxy(proxies);
+ supplier_impl->rt_info(rt_infos);
this->suppliers_.push_back(supplier_impl);
} //add_supplier()
@@ -421,12 +456,22 @@ Kokyu_EC::add_consumer_with_supplier(
add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- this->add_dependency (consumer_impl->rt_info(),
- supplier_impl->rt_info(),
- 1,
- RtecBase::TWO_WAY_CALL
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ //only add dependencies from one consumer RT_Info; else get dependency loops (presumable because of TWO_WAY_CALL)
+ for(Supplier::RT_Info_Vector::Iterator iter(supplier_impl->rt_info());
+ !iter.done(); iter.advance())
+ {
+ Supplier::RT_Info_Vector::TYPE *supp_info;
+ iter.next(supp_info);
+
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC (%P|%t) add_consumer_with_supplier() adding dependency %d\n",*cons_info));
+
+ this->add_dependency (consumer_impl->rt_info()[0],//*cons_info,
+ *supp_info,
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
} //add_consumer_with_supplier()
///Takes ownership of Consumer
@@ -447,6 +492,9 @@ Kokyu_EC::add_consumer(
, RtecScheduler::SYNCHRONIZATION_FAILURE
))
{
+ RtecEventComm::PushConsumer_var consumer = consumer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
//Specifying criticality is crucial since it propagates from
@@ -458,20 +506,41 @@ Kokyu_EC::add_consumer(
info.threads = 0;
info.info_type = RtecScheduler::OPERATION;
- RtecEventComm::PushConsumer_var consumer =
- consumer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ EventType_Vector one_type(1);
+ Consumer::RT_Info_Vector rt_infos(types.size());
+ for(EventType_Vector::Iterator iter(types);
+ !iter.done(); iter.advance())
+ {
+ EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported!
+ iter.next(type);
+ one_type.clear();
+ one_type.push_back(*type);
- RtecScheduler::handle_t consumer_rt_info =
- this->register_consumer(entry_point,
- info,
- types,
- consumer.in(),
- proxy_supplier.out()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ std::stringstream entry;
+ entry << entry_point << ":" << *type;
+
+ ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() registering consumer for type %d\n",*type));
- consumer_impl->rt_info(consumer_rt_info);
+ RtecScheduler::handle_t consumer_rt_info =
+ this->register_consumer(entry.str().c_str(),
+ info,
+ one_type,
+ consumer.in(),
+ proxy_supplier.out()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() registered consumer\n"));
+
+ rt_infos.push_back(consumer_rt_info);
+ }
+
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() setting RT_Infos and storing consumer_impl\n"));
+
+ consumer_impl->rt_info(rt_infos);
this->consumers_.push_back(consumer_impl);
+
+ //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() DONE\n"));
+
} //add_consumer()
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
index 81a477447e0..26a5da8f897 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h
@@ -29,7 +29,7 @@ public:
int init(const char* schedule_discipline, PortableServer::POA_ptr poa);
- typedef ACE_Vector<RtecEventComm::EventType> EventType_Vector;
+ typedef ACE_Vector<RtecEventComm::EventType> EventType_Vector;
//To specify a consumer of one type of event, set both types to the same value
virtual RtEventChannelAdmin::handle_t register_consumer (
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
index 61d77cdad91..d182e1e9062 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp
@@ -34,21 +34,30 @@ Supplier::~Supplier (void)
}
void
-Supplier::set_consumer_proxy(const RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy)
+Supplier::set_consumer_proxy(PushConsumer_Vector consumer_proxies)
{
- consumer_proxy_ = RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(consumer_proxy);
+ this->consumer_proxy_.clear();
+
+ for(PushConsumer_Vector::Iterator iter(consumer_proxies);
+ !iter.done(); iter.advance())
+ {
+ PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
+ iter.next(proxy);
+
+ this->consumer_proxy_.push_back(*proxy);
+ }
}
void
-Supplier::rt_info(const RtecScheduler::handle_t supplier_rt_info)
+Supplier::rt_info(RT_Info_Vector& supplier_rt_info)
{
- rt_info_ = supplier_rt_info;
+ this->rt_info_ = supplier_rt_info;
}
-RtecScheduler::handle_t
-Supplier::rt_info(void) const
+Supplier::RT_Info_Vector&
+Supplier::rt_info(void)
{
- return rt_info_;
+ return this->rt_info_;
}
void
@@ -94,7 +103,14 @@ Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL)
ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_START at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid);
- consumer_proxy_->push (event ACE_ENV_ARG_PARAMETER);
+ for(PushConsumer_Vector::Iterator iter(this->consumer_proxy_);
+ !iter.done(); iter.advance())
+ {
+ PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported!
+ iter.next(proxy);
+
+ (*proxy)->push (event ACE_ENV_ARG_PARAMETER);
+ }
//DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL);
ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec()));
DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid);
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
index 27d059c1d3f..44adcaa76b5 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h
@@ -18,6 +18,7 @@
#define SUPPLIER_H
#include "ace/Event_Handler.h"
+#include "ace/Vector_T.h"
#include "orbsvcs/RtecEventCommS.h"
#include "orbsvcs/RtecSchedulerC.h"
@@ -50,6 +51,8 @@ public:
FAULT_TOLERANT
};
+ typedef ACE_Vector<RtecEventChannelAdmin::ProxyPushConsumer_var> PushConsumer_Vector;
+
Supplier (RtecEventComm::EventSourceID id, RtecEventComm::EventType norm_type, RtecEventComm::EventType ft_type,
Service_Handler * handler = 0);
// Constructor
@@ -64,10 +67,12 @@ public:
virtual void timeout_occured (ACE_ENV_SINGLE_ARG_DECL);
- void set_consumer_proxy(const RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy);
+ void set_consumer_proxy(PushConsumer_Vector consumer_proxies);
+
+ typedef ACE_Vector<RtecScheduler::handle_t> RT_Info_Vector;
- void rt_info(RtecScheduler::handle_t supplier_rt_info);
- RtecScheduler::handle_t rt_info(void) const;
+ void rt_info(RT_Info_Vector& supplier_rt_info);
+ RT_Info_Vector& rt_info(void);
RtecEventComm::EventSourceID get_id(void) const;
@@ -81,11 +86,11 @@ protected:
RtecEventComm::EventType norm_type_;
RtecEventComm::EventType ft_type_;
- RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer_proxy_;
+ PushConsumer_Vector consumer_proxy_;
mode_t mode_;
- RtecScheduler::handle_t rt_info_;
+ RT_Info_Vector rt_info_;
Service_Handler *handler_;
}; //class Supplier
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
index 318183b1859..cb98db97ef5 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp
@@ -223,7 +223,7 @@ public:
Mode_Handler *mode_handler;
ACE_NEW(mode_handler,
- Mode_Handler(1)); //mode switch after first event
+ Mode_Handler(100)); //mode switch after first event
Supplier *supplier_impl1_1;
Timeout_Consumer *timeout_consumer_impl1_1;
ACE_NEW(supplier_impl1_1,
@@ -232,7 +232,7 @@ public:
this->handler_ = mode_handler;
ACE_NEW(timeout_consumer_impl1_1,
Timeout_Consumer(supplier_impl1_1));
- ACE_Time_Value tv(2,200000); //period
+ ACE_Time_Value tv(0,200000); //period
add_supplier_with_timeout(supplier_impl1_1,
"supplier1_1",
supp1_1_types,
@@ -245,10 +245,10 @@ public:
);
ACE_CHECK;
- RtecEventComm::EventType supp1_2_normal_type1 = ACE_ES_EVENT_UNDEFINED+1;
- RtecEventComm::EventType supp1_2_normal_type2 = ACE_ES_EVENT_UNDEFINED+2;
- RtecEventComm::EventType supp1_2_ft_type1 = ACE_ES_EVENT_UNDEFINED+3;
- RtecEventComm::EventType supp1_2_ft_type2 = ACE_ES_EVENT_UNDEFINED+4;
+ RtecEventComm::EventType supp1_2_normal_type1 = ACE_ES_EVENT_UNDEFINED+2;
+ RtecEventComm::EventType supp1_2_normal_type2 = ACE_ES_EVENT_UNDEFINED+3;
+ RtecEventComm::EventType supp1_2_ft_type1 = ACE_ES_EVENT_UNDEFINED+4;
+ RtecEventComm::EventType supp1_2_ft_type2 = ACE_ES_EVENT_UNDEFINED+5;
Kokyu_EC::EventType_Vector supp1_2_types(4);
supp1_2_types.push_back(supp1_2_normal_type1);
supp1_2_types.push_back(supp1_2_normal_type2);
@@ -256,10 +256,15 @@ public:
supp1_2_types.push_back(supp1_2_ft_type2);
Supplier *supplier_impl1_2;
+ /*
+ ACE_NEW(supplier_impl1_2,
+ Supplier(2,supp1_2_normal_type1,supp1_2_ft_type1));
+ */
ACE_NEW(supplier_impl1_2,
Dynamic_Supplier(2,
supp1_2_normal_type1,supp1_2_normal_type2,
supp1_2_ft_type1,supp1_2_ft_type2));
+
Consumer * consumer_impl1_1;
ACE_NEW(consumer_impl1_1,
Consumer(supp1_1_normal_type,supp1_1_ft_type,supplier_impl1_2));