diff options
Diffstat (limited to 'TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp | 164 |
1 files changed, 78 insertions, 86 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp index e497b6e7b5c..1450b37fc75 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp @@ -31,8 +31,7 @@ Dynamic_Consumer::Dynamic_Consumer (SourceID normal_type, : Consumer(normal_type,ft_type,worktime,fwddest,handler) , norm_type2_(normal_type2) , ft_type2_(ft_type2) - , received_ft_type1_(false) - , received_ft_type2_(false) + , window_(1) { } @@ -45,8 +44,7 @@ Dynamic_Consumer::Dynamic_Consumer (SourceID normal_type, : Consumer(normal_type,ft_type,fwddest,handler) , norm_type2_(normal_type2) , ft_type2_(ft_type2) - , received_ft_type1_(false) - , received_ft_type2_(false) + , window_(1) { } @@ -68,77 +66,57 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events //WARNING: We are assuming only one event is pushed at a time! - ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) we received event type %d\n", + ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) we received event type %i\n", events[0].header.type)); + if (this->handler_ != 0) + { + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) calling handle_service_start()\n")); + this->handler_->handle_service_start(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) handle_service_start() DONE\n")); + } + + //@BT INSTRUMENT with event ID: EVENT_WORK_START Measure time + //when work triggered by event starts. + //DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, guid, 0, NULL); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) START_SERVICE at %u\n",ACE_OS::gettimeofday().msec())); + + Object_ID oid; + oid.id = events[0].header.eid.id; + oid.tid = events[0].header.eid.tid; + oid.pid = events[0].header.eid.pid; + oid.queue_id = events[0].header.eid.queue_id; + oid.type = events[0].header.type; + + ACE_TIMEPROBE("START_SERVICE"); + DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, 0, sizeof(Object_ID), (char*)&oid); + Supplier::mode_t mode; if (events[0].header.type == this->ft_type_ || events[0].header.type == this->ft_type2_) { mode = Supplier::FAULT_TOLERANT; - if (events[0].header.type == this->ft_type_) - { - if (this->received_ft_type1_) - { - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received second event (type %d) before receiving type %d!\n",this->ft_type_,this->ft_type2_)); - } - this->received_ft_type1_ = true; - } - else { - //must be ft_type2_ - if (this->received_ft_type2_) - { - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received second event (type %d) before receiving type %d!\n",this->ft_type2_,this->ft_type_)); - } - this->received_ft_type2_ = true; - } } else if (events[0].header.type == this->norm_type_ || events[0].header.type == this->norm_type2_) { mode = Supplier::NORMAL; - if (this->received_ft_type1_ || this->received_ft_type2_) { - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) mode is NORMAL, but both FAULT_TOLERANT events not received!\n")); - } } else { //default - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received unknown type %d; ignoring\n",events[0].header.type)); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received unknown type %i; ignoring\n",events[0].header.type)); return; } - if (this->handler_ != 0) - { - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) calling handle_service_start()\n")); - this->handler_->handle_service_start(ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) handle_service_start() DONE\n")); - } - - //NOTE: We don't trigger START/STOP_SERVICE events or handle fwddest if - //not ready because if we aren't ready, it's just overhead. - //We are ready only if we're in NORMAL mode (we should receive only one - //event) or if we're in FAULT_TOLERANT mode and have received both events. + //We are ready only if we're in NORMAL mode (we should receive only + //one event) or if we're in FAULT_TOLERANT mode and have NOT + //received an event from this iteration before. if (Supplier::NORMAL == mode || (Supplier::FAULT_TOLERANT == mode && - this->received_ft_type1_ && this->received_ft_type2_)) + this->window_ == events[0].header.eid.id)) { - //@BT INSTRUMENT with event ID: EVENT_WORK_START Measure time - //when work triggered by event starts. - //DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, guid, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) START_SERVICE at %u\n",ACE_OS::gettimeofday().msec())); - - Object_ID oid; - oid.id = events[0].header.eid.id; - oid.tid = events[0].header.eid.tid; - oid.pid = events[0].header.eid.pid; - oid.queue_id = events[0].header.eid.queue_id; - oid.type = events[0].header.type; - - ACE_TIMEPROBE("START_SERVICE"); - DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, 0, sizeof(Object_ID), (char*)&oid); - ACE_High_Res_Timer timer; ACE_Time_Value elapsed_time; @@ -172,7 +150,8 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events timer.elapsed_time (elapsed_time); //total elapsed time TimeBase::TimeT now; - ORBSVCS_Time::Time_Value_to_TimeT (now, ACE_OS::gettimeofday ()); + ACE_Time_Value now_tv(ACE_OS::gettimeofday()); + ORBSVCS_Time::Time_Value_to_TimeT (now, now_tv); if(now > events[0].header.deadline ) { this->deadline_missed_++; @@ -180,51 +159,64 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events //@BT INSTRUMENT with event ID: EVENT_WORK_DEADLINE_MISSED Measure time when //work triggered by event finishes and deadline missed. //DSTRM_EVENT (TEST_ONE_FAM, DEADLINE_MISSED, guid, strlen(extra_info), extra_info); - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE (DEADLINE_MISSED) at %u\n",ACE_OS::gettimeofday().msec())); + ACE_Time_Value dead_tv; + ORBSVCS_Time::TimeT_to_Time_Value (dead_tv, events[0].header.deadline); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE (DEADLINE_MISSED) at %isec %iusec with deadline %isec %iusec\n",now_tv.sec(),now_tv.usec(),dead_tv.sec(),dead_tv.usec())); DSTRM_EVENT (TEST_ONE_FAM, DEADLINE_MISSED, 0, sizeof(Object_ID), (char*)&oid); } - //@BT INSTRUMENT with event ID: EVENT_WORK_END Measure time when - //work triggered by event finishes. - //DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, guid,0,NULL); - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, 0, sizeof(Object_ID), (char*)&oid); - - ACE_TIMEPROBE("STOP_SERVICE"); - - //now print timeprobe values - ACE_TIMEPROBE_PRINT; - ACE_TIMEPROBE_RESET; - - ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) request processing for %d done, " - "elapsed time = %isec %iusec, deadline_missed_=%d\n", + ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) request processing for %i done, " + "elapsed time = %isec %iusec, deadline_missed_=%i\n", events[0].header.type,elapsed_time.sec(),elapsed_time.usec(), this->deadline_missed_)); - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) processing took %d iterations\n",j)); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) processing took %i iterations\n",j)); ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) event had deadline %i\n", events[0].header.deadline)); - // ACE_DEBUG ((LM_DEBUG, - // "Request processing in thread %t done, " - // "prio = %d, load = %d, elapsed time = %umsec, deadline_missed = %d\n", - // prio, exec_duration, elapsed_time.msec (),Deadline_missed )); - - //now, trigger the next subtask if any - if (this->fwddest_ != 0) - { - //trigger next subtask; we assume we are the only ones who set the Supplier's mode! - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) triggering next subtask\n")); - this->fwddest_->mode(mode); - this->fwddest_->timeout_occured(ACE_ENV_SINGLE_ARG_PARAMETER); - } - //get ready for next event - this->received_ft_type1_ = false; - this->received_ft_type2_ = false; + this->window_++; + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) INCREMENT window to %i\n",this->window_)); } //else not ready to run + else if (Supplier::FAULT_TOLERANT == mode && + this->window_ > events[0].header.eid.id) + { + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) Ignoring REDUNDANT event from iteration %i\n",events[0].header.eid.id)); + } + else if (Supplier::FAULT_TOLERANT == mode && + this->window_ < events[0].header.eid.id) + { + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) Ignoring FUTURE event from iteration %i; window is %i\n",events[0].header.eid.id,this->window_)); + DSTRM_EVENT (TEST_ONE_FAM, FUTURE_EVENT, 0, sizeof(Object_ID), (char*)&oid); + } else { - ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) NOT READY to run\n")); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) UNEXPECTED STATE: mode=%i\twindow=%ievent id=%i\n",mode,this->window_,events[0].header.eid.id)); + } + + //@BT INSTRUMENT with event ID: EVENT_WORK_END Measure time when + //work triggered by event finishes. + //DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, guid,0,NULL); + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE at %u\n",ACE_OS::gettimeofday().msec())); + DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, 0, sizeof(Object_ID), (char*)&oid); + + ACE_TIMEPROBE("STOP_SERVICE"); + + //now print timeprobe values + ACE_TIMEPROBE_PRINT; + ACE_TIMEPROBE_RESET; + + // ACE_DEBUG ((LM_DEBUG, + // "Request processing in thread %t done, " + // "prio = %i, load = %i, elapsed time = %umsec, deadline_missed = %i\n", + // prio, exec_duration, elapsed_time.msec (),Deadline_missed )); + + //now, trigger the next subtask if any + if (this->fwddest_ != 0) + { + //trigger next subtask; we assume we are the only ones who set the Supplier's mode! + ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) triggering next subtask\n")); + this->fwddest_->mode(mode); + this->fwddest_->timeout_occured(oid ACE_ENV_ARG_PARAMETER); } if (this->handler_ != 0) |