summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp356
1 files changed, 249 insertions, 107 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp
index 283b7428fef..d708293dfb2 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp
@@ -15,6 +15,23 @@
ACE_RCSID(Event, EC_Kokyu_Filter_Builder, "$Id$")
+const char* designator (long dsgn)
+{
+ switch(dsgn)
+ {
+ case ACE_ES_GLOBAL_DESIGNATOR: return "GLOBAL";
+ case ACE_ES_CONJUNCTION_DESIGNATOR: return "CONJ";
+ case ACE_ES_DISJUNCTION_DESIGNATOR: return "DISJ";
+ case ACE_ES_NEGATION_DESIGNATOR: return "NEG";
+ case ACE_ES_LOGICAL_AND_DESIGNATOR: return "LOG_AND";
+ case ACE_ES_BITMASK_DESIGNATOR: return "BITMASK";
+ case ACE_ES_MASKED_TYPE_DESIGNATOR: return "MASKED_TYPE";
+ case ACE_ES_NULL_DESIGNATOR: return "NULL";
+ }
+
+ return "---";
+}
+
TAO_EC_Kokyu_Filter_Builder::~TAO_EC_Kokyu_Filter_Builder (void)
{
}
@@ -25,7 +42,11 @@ TAO_EC_Kokyu_Filter_Builder::build (
RtecEventChannelAdmin::ConsumerQOS& qos
ACE_ENV_ARG_DECL) const
{
+ CORBA::ULong i=0,found=0;
CORBA::ULong pos = 0;
+ CORBA::Long npos = -1;
+ int establish_final_consumer_dependency=0;
+
CORBA::Object_var tmp =
this->event_channel_->scheduler ();
@@ -33,17 +54,127 @@ TAO_EC_Kokyu_Filter_Builder::build (
RtecScheduler::Scheduler::_narrow (tmp.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
-#if 0 //by VS
- // @@ How do we figure out which parent???
- RtecScheduler::handle_t parent_info =
- scheduler->lookup ("Dispatching_Task-250000.us" ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
+#ifdef EC_KOKYU_LOGGING
+ for (i=0; i<qos.dependencies.length (); ++i)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "consumerqos[%d] event.header.type = %s,"
+ "rt_info = %d\n",
+ i,
+ designator (qos.dependencies[i].event.header.type),
+ qos.dependencies[i].rt_info));
+ }
+#endif
+
+ //find the first entry which is not a designator. We are going to
+ //assume that this entry will have the rt_info of the connecting
+ //consumer (ProxyPushSupplier), which is passed into this function.
+ for (i=0; !found && i<qos.dependencies.length (); ++i)
+ {
+ switch (qos.dependencies[i].event.header.type)
+ {
+ case ACE_ES_CONJUNCTION_DESIGNATOR:
+ case ACE_ES_DISJUNCTION_DESIGNATOR:
+ case ACE_ES_NEGATION_DESIGNATOR:
+ case ACE_ES_LOGICAL_AND_DESIGNATOR:
+ case ACE_ES_BITMASK_DESIGNATOR:
+ case ACE_ES_MASKED_TYPE_DESIGNATOR:
+ case ACE_ES_NULL_DESIGNATOR:
+ establish_final_consumer_dependency = 1;
+ continue;
+
+ case ACE_ES_GLOBAL_DESIGNATOR:
+ case ACE_ES_EVENT_TIMEOUT:
+ case ACE_ES_EVENT_INTERVAL_TIMEOUT:
+ case ACE_ES_EVENT_DEADLINE_TIMEOUT:
+ continue;
+
+ default:
+ npos = i;
+ found = 1;
+ break;
+ }
+ }
+
+ ACE_CString final_consumer_rep_name;
+ RtecScheduler::handle_t h_final_consumer_rt_info = 0;
+ RtecScheduler::handle_t h_final_consumer_rep_rt_info = 0;
+
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "consumer rt_info found in consumerqos[%d] \n", npos));
+#endif
+
+ if (npos >= 0 && establish_final_consumer_dependency == 1)
+ {
+ //Hopefully this will have the final consumer's rt_info
+ h_final_consumer_rt_info = qos.dependencies[npos].rt_info;
+
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "about to get rt_info = %d\n",
+ h_final_consumer_rep_rt_info));
+#endif
+
+ RtecScheduler::RT_Info_var final_consumer_rt_info =
+ scheduler->get ( h_final_consumer_rt_info
+ ACE_ENV_ARG_PARAMETER);
+
+ final_consumer_rep_name = final_consumer_rt_info->entry_point.in ();
+ final_consumer_rep_name += "#rep";
+
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "about to create consumer rep %s\n",
+ final_consumer_rep_name.c_str ()));
#endif
- return this->recursive_build (supplier, qos, pos,
- scheduler.in (),
- 0 //parent_info
+ //create an rt_info corresponding to this rep.
+ h_final_consumer_rep_rt_info =
+ scheduler->create (final_consumer_rep_name.c_str ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "consumer rep created\n"));
+#endif
+
+ }
+
+ //We are passing the final consumer as the parent. The final
+ //consumer is the one which is connecting to the ProxyPushSupplier
+ //passed in to this function.
+
+ TAO_EC_Filter* filter =
+ this->recursive_build (supplier, qos, pos,
+ scheduler.in (),
+ h_final_consumer_rep_rt_info //parent_info
+ ACE_ENV_ARG_PARAMETER);
+
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG,
+ "Filter_Builder::Verifying whether root filter"
+ " dependency can be established\n"));
+#endif
+
+ if (npos >= 0 && establish_final_consumer_dependency == 1)
+ {
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG,
+ "Filter_Builder::root filter dependency "
+ "can be established\n"));
+#endif
+ TAO_EC_Kokyu_Filter* kokyu_filter =
+ ACE_dynamic_cast(TAO_EC_Kokyu_Filter*, filter);
+
+ //add the dependency between the root in the filter hierarchy and
+ //the final consumer
+ TAO_EC_QOS_Info qos_info;
+ kokyu_filter->get_qos_info (qos_info);
+
+ scheduler->add_dependency (h_final_consumer_rt_info,
+ qos_info.rt_info,
+ 1,
+ RtecBase::ONE_WAY_CALL
ACE_ENV_ARG_PARAMETER);
+ }
+ return filter;
}
TAO_EC_Filter*
@@ -57,8 +188,15 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
{
const RtecEventComm::Event& e = qos.dependencies[pos].event;
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "Filter_Builder::In recursive build\n"));
+#endif
+
if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR)
{
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Conjuction designator\n"));
+#endif
CORBA::ULong npos = pos;
ACE_CString name;
this->recursive_name (qos, npos,
@@ -66,33 +204,32 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
- RtecScheduler::handle_t rt_info =
- scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
pos++; // Consume the designator
+
CORBA::ULong n = this->count_children (qos, pos);
+ RtecBase::handle_t conj_rt_info = parent_info;
+
TAO_EC_Filter** children;
ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0);
for (CORBA::ULong i = 0; i != n; ++i)
{
children[i] = this->recursive_build (supplier, qos, pos,
scheduler,
- rt_info
- ACE_ENV_ARG_PARAMETER);
+ conj_rt_info
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
}
TAO_EC_Kokyu_Filter *filter;
ACE_NEW_RETURN (filter,
TAO_EC_Kokyu_Filter (name.c_str (),
- rt_info,
+ conj_rt_info,
scheduler,
new TAO_EC_Conjunction_Filter(children,
n),
- rt_info,
- parent_info,
+ conj_rt_info,
+ conj_rt_info,
RtecScheduler::CONJUNCTION),
0);
TAO_EC_QOS_Info qos_info;
@@ -104,6 +241,9 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
else if (e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR)
{
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Disjunction designator\n"));
+#endif
CORBA::ULong npos = pos;
ACE_CString name;
this->recursive_name (qos, npos,
@@ -111,11 +251,10 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
- RtecScheduler::handle_t rt_info =
- scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
pos++; // Consume the designator
+
+ RtecBase::handle_t disj_rt_info = parent_info;
+
CORBA::ULong n = this->count_children (qos, pos);
TAO_EC_Filter** children;
@@ -124,19 +263,19 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
{
children[i] = this->recursive_build (supplier, qos, pos,
scheduler,
- rt_info
+ disj_rt_info
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
}
TAO_EC_Kokyu_Filter *filter;
ACE_NEW_RETURN (filter,
TAO_EC_Kokyu_Filter (name.c_str (),
- rt_info,
+ disj_rt_info,
scheduler,
new TAO_EC_Disjunction_Filter (children,
n),
- rt_info,
- parent_info,
+ disj_rt_info,
+ disj_rt_info,
RtecScheduler::DISJUNCTION),
0);
@@ -146,107 +285,110 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build (
ACE_CHECK_RETURN (0);
return filter;
}
+ else if (e.header.type == ACE_ES_EVENT_TIMEOUT
+ || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT
+ || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT)
+ {
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Timeout designator\n"));
+#endif
+ // @@ We need a unique name for each timeout, assigned by the
+ // application?
+ char buf[64];
+
+ //get the rt_info for the timer consumer
+ RtecBase::handle_t h_consumer_rt_info = qos.dependencies[pos].rt_info;
+
+ //build a unique name using the cosumer_rt_info
+ ACE_OS::sprintf (buf, "TIMEOUT:%umsec:%d",
+ ACE_static_cast (u_int,
+ (e.header.creation_time / 10000)),
+ h_consumer_rt_info);
+ ACE_CString name = buf;
+
+ TAO_EC_QOS_Info qos_info;
+ qos_info.rt_info =
+ scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER);
+
+ // Convert the time to the proper units....
+ RtecScheduler::Period_t period =
+ ACE_static_cast (RtecScheduler::Period_t,
+ e.header.creation_time);
+
+#if 1 //by VS original code replaced with this
+ RtecScheduler::RT_Info* consumer_rt_info_ptr;
+
+ consumer_rt_info_ptr = scheduler->get (h_consumer_rt_info);
+ scheduler->set (qos_info.rt_info,
+ consumer_rt_info_ptr->criticality,
+ 0, // worst_cast_execution_time
+ 0, // typical_cast_execution_time
+ 0, // cached_cast_execution_time
+ period,
+ consumer_rt_info_ptr->importance,
+ 0, // quantum
+ 1, // threads
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+
+ scheduler->add_dependency (qos_info.rt_info,
+ h_consumer_rt_info,
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+#endif //by VS
+
+ pos++;
+ return new TAO_EC_Timeout_Filter (this->event_channel_,
+ supplier,
+ qos_info,
+ e.header.type,
+ e.header.creation_time);
+ }
+
#if 1 //added by VS
else if (e.header.type == ACE_ES_GLOBAL_DESIGNATOR)
{
- CORBA::ULong npos = pos+1;
- const RtecEventComm::Event& e = qos.dependencies[npos].event;
-
- RtecScheduler::handle_t body_info = qos.dependencies[npos].rt_info;
-
- RtecScheduler::RT_Info_var info =
- scheduler->get (body_info ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
- ACE_CString name = info->entry_point.in ();
-
- TAO_EC_Kokyu_Filter *filter;
- ACE_NEW_RETURN (filter,
- TAO_EC_Kokyu_Filter (name.c_str (),
- body_info,
- scheduler,
- new TAO_EC_Type_Filter (e.header),
- body_info,
- 0,
- RtecScheduler::OPERATION),
- 0);
-
- return filter;
+ pos++;
+ return this->recursive_build (supplier, qos, pos,
+ scheduler,
+ parent_info
+ ACE_ENV_ARG_PARAMETER);
+ }
+ else
+ {
+#ifdef EC_KOKYU_LOGGING
+ ACE_DEBUG ((LM_DEBUG,
+ "Kokyu_Filter_Builder::No designator for this entry. "
+ "Must be a body\n"));
+#endif
}
#endif
- else if (e.header.type == ACE_ES_EVENT_TIMEOUT
- || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT
- || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT)
+ //probably because of a global designator, the parent_info could be 0.
+ if (parent_info == 0)
{
- pos++;
-
- // @@ We need a unique name for each timeout, assigned by the
- // application?
- char buf[64];
- ACE_OS::sprintf (buf, "TIMEOUT:%u",
- ACE_static_cast (u_int,
- (e.header.creation_time / 10000)));
- ACE_CString name = buf;
-
- TAO_EC_QOS_Info qos_info;
- qos_info.rt_info =
- scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
- // Convert the time to the proper units....
- RtecScheduler::Period_t period =
- ACE_static_cast (RtecScheduler::Period_t,
- e.header.creation_time / 10);
-
- scheduler->set (qos_info.rt_info,
- RtecScheduler::VERY_LOW_CRITICALITY,
- 0, // worst_cast_execution_time
- 0, // typical_cast_execution_time
- 0, // cached_cast_execution_time
- period,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- 0, // quantum
- 1, // threads
- RtecScheduler::OPERATION
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
- scheduler->add_dependency (qos_info.rt_info,
- parent_info,
- 1,
- RtecBase::TWO_WAY_CALL
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
-
- return new TAO_EC_Timeout_Filter (this->event_channel_,
- supplier,
- qos_info,
- e.header.type,
- e.header.creation_time);
+ //In this case, the parent_info is the same as the one supplied
+ //in the consumer qos.
+ parent_info = qos.dependencies[pos].rt_info;
}
- RtecScheduler::handle_t body_info = qos.dependencies[pos].rt_info;
-
RtecScheduler::RT_Info_var info =
- scheduler->get (body_info ACE_ENV_ARG_PARAMETER);
+ scheduler->get (parent_info ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (0);
ACE_CString name = info->entry_point.in ();
- name += "#rep";
-
- RtecScheduler::handle_t rt_info =
- scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (0);
pos++;
TAO_EC_Kokyu_Filter *filter;
ACE_NEW_RETURN (filter,
TAO_EC_Kokyu_Filter (name.c_str (),
- rt_info,
+ parent_info,
scheduler,
new TAO_EC_Type_Filter (e.header),
- body_info,
+ parent_info,
parent_info,
RtecScheduler::OPERATION),
0);
@@ -324,7 +466,7 @@ TAO_EC_Kokyu_Filter_Builder:: recursive_name (
pos++;
char buf[64];
- ACE_OS::sprintf (buf, "TIMEOUT:%u",
+ ACE_OS::sprintf (buf, "TIMEOUT:%umsec",
ACE_static_cast (u_int,
(e.header.creation_time / 10000)));
name = buf;