diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp | 356 |
1 files changed, 249 insertions, 107 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp index 283b7428fef..d708293dfb2 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Kokyu_Filter_Builder.cpp @@ -15,6 +15,23 @@ ACE_RCSID(Event, EC_Kokyu_Filter_Builder, "$Id$") +const char* designator (long dsgn) +{ + switch(dsgn) + { + case ACE_ES_GLOBAL_DESIGNATOR: return "GLOBAL"; + case ACE_ES_CONJUNCTION_DESIGNATOR: return "CONJ"; + case ACE_ES_DISJUNCTION_DESIGNATOR: return "DISJ"; + case ACE_ES_NEGATION_DESIGNATOR: return "NEG"; + case ACE_ES_LOGICAL_AND_DESIGNATOR: return "LOG_AND"; + case ACE_ES_BITMASK_DESIGNATOR: return "BITMASK"; + case ACE_ES_MASKED_TYPE_DESIGNATOR: return "MASKED_TYPE"; + case ACE_ES_NULL_DESIGNATOR: return "NULL"; + } + + return "---"; +} + TAO_EC_Kokyu_Filter_Builder::~TAO_EC_Kokyu_Filter_Builder (void) { } @@ -25,7 +42,11 @@ TAO_EC_Kokyu_Filter_Builder::build ( RtecEventChannelAdmin::ConsumerQOS& qos ACE_ENV_ARG_DECL) const { + CORBA::ULong i=0,found=0; CORBA::ULong pos = 0; + CORBA::Long npos = -1; + int establish_final_consumer_dependency=0; + CORBA::Object_var tmp = this->event_channel_->scheduler (); @@ -33,17 +54,127 @@ TAO_EC_Kokyu_Filter_Builder::build ( RtecScheduler::Scheduler::_narrow (tmp.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); -#if 0 //by VS - // @@ How do we figure out which parent??? - RtecScheduler::handle_t parent_info = - scheduler->lookup ("Dispatching_Task-250000.us" ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); +#ifdef EC_KOKYU_LOGGING + for (i=0; i<qos.dependencies.length (); ++i) + { + ACE_DEBUG ((LM_DEBUG, + "consumerqos[%d] event.header.type = %s," + "rt_info = %d\n", + i, + designator (qos.dependencies[i].event.header.type), + qos.dependencies[i].rt_info)); + } +#endif + + //find the first entry which is not a designator. We are going to + //assume that this entry will have the rt_info of the connecting + //consumer (ProxyPushSupplier), which is passed into this function. + for (i=0; !found && i<qos.dependencies.length (); ++i) + { + switch (qos.dependencies[i].event.header.type) + { + case ACE_ES_CONJUNCTION_DESIGNATOR: + case ACE_ES_DISJUNCTION_DESIGNATOR: + case ACE_ES_NEGATION_DESIGNATOR: + case ACE_ES_LOGICAL_AND_DESIGNATOR: + case ACE_ES_BITMASK_DESIGNATOR: + case ACE_ES_MASKED_TYPE_DESIGNATOR: + case ACE_ES_NULL_DESIGNATOR: + establish_final_consumer_dependency = 1; + continue; + + case ACE_ES_GLOBAL_DESIGNATOR: + case ACE_ES_EVENT_TIMEOUT: + case ACE_ES_EVENT_INTERVAL_TIMEOUT: + case ACE_ES_EVENT_DEADLINE_TIMEOUT: + continue; + + default: + npos = i; + found = 1; + break; + } + } + + ACE_CString final_consumer_rep_name; + RtecScheduler::handle_t h_final_consumer_rt_info = 0; + RtecScheduler::handle_t h_final_consumer_rep_rt_info = 0; + +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "consumer rt_info found in consumerqos[%d] \n", npos)); +#endif + + if (npos >= 0 && establish_final_consumer_dependency == 1) + { + //Hopefully this will have the final consumer's rt_info + h_final_consumer_rt_info = qos.dependencies[npos].rt_info; + +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "about to get rt_info = %d\n", + h_final_consumer_rep_rt_info)); +#endif + + RtecScheduler::RT_Info_var final_consumer_rt_info = + scheduler->get ( h_final_consumer_rt_info + ACE_ENV_ARG_PARAMETER); + + final_consumer_rep_name = final_consumer_rt_info->entry_point.in (); + final_consumer_rep_name += "#rep"; + +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "about to create consumer rep %s\n", + final_consumer_rep_name.c_str ())); #endif - return this->recursive_build (supplier, qos, pos, - scheduler.in (), - 0 //parent_info + //create an rt_info corresponding to this rep. + h_final_consumer_rep_rt_info = + scheduler->create (final_consumer_rep_name.c_str () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (0); +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "consumer rep created\n")); +#endif + + } + + //We are passing the final consumer as the parent. The final + //consumer is the one which is connecting to the ProxyPushSupplier + //passed in to this function. + + TAO_EC_Filter* filter = + this->recursive_build (supplier, qos, pos, + scheduler.in (), + h_final_consumer_rep_rt_info //parent_info + ACE_ENV_ARG_PARAMETER); + +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, + "Filter_Builder::Verifying whether root filter" + " dependency can be established\n")); +#endif + + if (npos >= 0 && establish_final_consumer_dependency == 1) + { +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, + "Filter_Builder::root filter dependency " + "can be established\n")); +#endif + TAO_EC_Kokyu_Filter* kokyu_filter = + ACE_dynamic_cast(TAO_EC_Kokyu_Filter*, filter); + + //add the dependency between the root in the filter hierarchy and + //the final consumer + TAO_EC_QOS_Info qos_info; + kokyu_filter->get_qos_info (qos_info); + + scheduler->add_dependency (h_final_consumer_rt_info, + qos_info.rt_info, + 1, + RtecBase::ONE_WAY_CALL ACE_ENV_ARG_PARAMETER); + } + return filter; } TAO_EC_Filter* @@ -57,8 +188,15 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( { const RtecEventComm::Event& e = qos.dependencies[pos].event; +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "Filter_Builder::In recursive build\n")); +#endif + if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR) { +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Conjuction designator\n")); +#endif CORBA::ULong npos = pos; ACE_CString name; this->recursive_name (qos, npos, @@ -66,33 +204,32 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); - RtecScheduler::handle_t rt_info = - scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - pos++; // Consume the designator + CORBA::ULong n = this->count_children (qos, pos); + RtecBase::handle_t conj_rt_info = parent_info; + TAO_EC_Filter** children; ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0); for (CORBA::ULong i = 0; i != n; ++i) { children[i] = this->recursive_build (supplier, qos, pos, scheduler, - rt_info - ACE_ENV_ARG_PARAMETER); + conj_rt_info + ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); } TAO_EC_Kokyu_Filter *filter; ACE_NEW_RETURN (filter, TAO_EC_Kokyu_Filter (name.c_str (), - rt_info, + conj_rt_info, scheduler, new TAO_EC_Conjunction_Filter(children, n), - rt_info, - parent_info, + conj_rt_info, + conj_rt_info, RtecScheduler::CONJUNCTION), 0); TAO_EC_QOS_Info qos_info; @@ -104,6 +241,9 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( else if (e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) { +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Disjunction designator\n")); +#endif CORBA::ULong npos = pos; ACE_CString name; this->recursive_name (qos, npos, @@ -111,11 +251,10 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); - RtecScheduler::handle_t rt_info = - scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - pos++; // Consume the designator + + RtecBase::handle_t disj_rt_info = parent_info; + CORBA::ULong n = this->count_children (qos, pos); TAO_EC_Filter** children; @@ -124,19 +263,19 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( { children[i] = this->recursive_build (supplier, qos, pos, scheduler, - rt_info + disj_rt_info ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); } TAO_EC_Kokyu_Filter *filter; ACE_NEW_RETURN (filter, TAO_EC_Kokyu_Filter (name.c_str (), - rt_info, + disj_rt_info, scheduler, new TAO_EC_Disjunction_Filter (children, n), - rt_info, - parent_info, + disj_rt_info, + disj_rt_info, RtecScheduler::DISJUNCTION), 0); @@ -146,107 +285,110 @@ TAO_EC_Kokyu_Filter_Builder::recursive_build ( ACE_CHECK_RETURN (0); return filter; } + else if (e.header.type == ACE_ES_EVENT_TIMEOUT + || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT + || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT) + { +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, "Filter_Builder::Timeout designator\n")); +#endif + // @@ We need a unique name for each timeout, assigned by the + // application? + char buf[64]; + + //get the rt_info for the timer consumer + RtecBase::handle_t h_consumer_rt_info = qos.dependencies[pos].rt_info; + + //build a unique name using the cosumer_rt_info + ACE_OS::sprintf (buf, "TIMEOUT:%umsec:%d", + ACE_static_cast (u_int, + (e.header.creation_time / 10000)), + h_consumer_rt_info); + ACE_CString name = buf; + + TAO_EC_QOS_Info qos_info; + qos_info.rt_info = + scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER); + + // Convert the time to the proper units.... + RtecScheduler::Period_t period = + ACE_static_cast (RtecScheduler::Period_t, + e.header.creation_time); + +#if 1 //by VS original code replaced with this + RtecScheduler::RT_Info* consumer_rt_info_ptr; + + consumer_rt_info_ptr = scheduler->get (h_consumer_rt_info); + scheduler->set (qos_info.rt_info, + consumer_rt_info_ptr->criticality, + 0, // worst_cast_execution_time + 0, // typical_cast_execution_time + 0, // cached_cast_execution_time + period, + consumer_rt_info_ptr->importance, + 0, // quantum + 1, // threads + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (0); + + scheduler->add_dependency (qos_info.rt_info, + h_consumer_rt_info, + 1, + RtecBase::TWO_WAY_CALL + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (0); +#endif //by VS + + pos++; + return new TAO_EC_Timeout_Filter (this->event_channel_, + supplier, + qos_info, + e.header.type, + e.header.creation_time); + } + #if 1 //added by VS else if (e.header.type == ACE_ES_GLOBAL_DESIGNATOR) { - CORBA::ULong npos = pos+1; - const RtecEventComm::Event& e = qos.dependencies[npos].event; - - RtecScheduler::handle_t body_info = qos.dependencies[npos].rt_info; - - RtecScheduler::RT_Info_var info = - scheduler->get (body_info ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - - ACE_CString name = info->entry_point.in (); - - TAO_EC_Kokyu_Filter *filter; - ACE_NEW_RETURN (filter, - TAO_EC_Kokyu_Filter (name.c_str (), - body_info, - scheduler, - new TAO_EC_Type_Filter (e.header), - body_info, - 0, - RtecScheduler::OPERATION), - 0); - - return filter; + pos++; + return this->recursive_build (supplier, qos, pos, + scheduler, + parent_info + ACE_ENV_ARG_PARAMETER); + } + else + { +#ifdef EC_KOKYU_LOGGING + ACE_DEBUG ((LM_DEBUG, + "Kokyu_Filter_Builder::No designator for this entry. " + "Must be a body\n")); +#endif } #endif - else if (e.header.type == ACE_ES_EVENT_TIMEOUT - || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT - || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT) + //probably because of a global designator, the parent_info could be 0. + if (parent_info == 0) { - pos++; - - // @@ We need a unique name for each timeout, assigned by the - // application? - char buf[64]; - ACE_OS::sprintf (buf, "TIMEOUT:%u", - ACE_static_cast (u_int, - (e.header.creation_time / 10000))); - ACE_CString name = buf; - - TAO_EC_QOS_Info qos_info; - qos_info.rt_info = - scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - - // Convert the time to the proper units.... - RtecScheduler::Period_t period = - ACE_static_cast (RtecScheduler::Period_t, - e.header.creation_time / 10); - - scheduler->set (qos_info.rt_info, - RtecScheduler::VERY_LOW_CRITICALITY, - 0, // worst_cast_execution_time - 0, // typical_cast_execution_time - 0, // cached_cast_execution_time - period, - RtecScheduler::VERY_LOW_IMPORTANCE, - 0, // quantum - 1, // threads - RtecScheduler::OPERATION - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - - scheduler->add_dependency (qos_info.rt_info, - parent_info, - 1, - RtecBase::TWO_WAY_CALL - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); - - return new TAO_EC_Timeout_Filter (this->event_channel_, - supplier, - qos_info, - e.header.type, - e.header.creation_time); + //In this case, the parent_info is the same as the one supplied + //in the consumer qos. + parent_info = qos.dependencies[pos].rt_info; } - RtecScheduler::handle_t body_info = qos.dependencies[pos].rt_info; - RtecScheduler::RT_Info_var info = - scheduler->get (body_info ACE_ENV_ARG_PARAMETER); + scheduler->get (parent_info ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (0); ACE_CString name = info->entry_point.in (); - name += "#rep"; - - RtecScheduler::handle_t rt_info = - scheduler->create (name.c_str () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (0); pos++; TAO_EC_Kokyu_Filter *filter; ACE_NEW_RETURN (filter, TAO_EC_Kokyu_Filter (name.c_str (), - rt_info, + parent_info, scheduler, new TAO_EC_Type_Filter (e.header), - body_info, + parent_info, parent_info, RtecScheduler::OPERATION), 0); @@ -324,7 +466,7 @@ TAO_EC_Kokyu_Filter_Builder:: recursive_name ( pos++; char buf[64]; - ACE_OS::sprintf (buf, "TIMEOUT:%u", + ACE_OS::sprintf (buf, "TIMEOUT:%umsec", ACE_static_cast (u_int, (e.header.creation_time / 10000))); name = buf; |