summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp')
-rw-r--r--TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp164
1 files changed, 78 insertions, 86 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp
index e497b6e7b5c..1450b37fc75 100644
--- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp
+++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Consumer.cpp
@@ -31,8 +31,7 @@ Dynamic_Consumer::Dynamic_Consumer (SourceID normal_type,
: Consumer(normal_type,ft_type,worktime,fwddest,handler)
, norm_type2_(normal_type2)
, ft_type2_(ft_type2)
- , received_ft_type1_(false)
- , received_ft_type2_(false)
+ , window_(1)
{
}
@@ -45,8 +44,7 @@ Dynamic_Consumer::Dynamic_Consumer (SourceID normal_type,
: Consumer(normal_type,ft_type,fwddest,handler)
, norm_type2_(normal_type2)
, ft_type2_(ft_type2)
- , received_ft_type1_(false)
- , received_ft_type2_(false)
+ , window_(1)
{
}
@@ -68,77 +66,57 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events
//WARNING: We are assuming only one event is pushed at a time!
- ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) we received event type %d\n",
+ ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) we received event type %i\n",
events[0].header.type));
+ if (this->handler_ != 0)
+ {
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) calling handle_service_start()\n"));
+ this->handler_->handle_service_start(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) handle_service_start() DONE\n"));
+ }
+
+ //@BT INSTRUMENT with event ID: EVENT_WORK_START Measure time
+ //when work triggered by event starts.
+ //DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, guid, 0, NULL);
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) START_SERVICE at %u\n",ACE_OS::gettimeofday().msec()));
+
+ Object_ID oid;
+ oid.id = events[0].header.eid.id;
+ oid.tid = events[0].header.eid.tid;
+ oid.pid = events[0].header.eid.pid;
+ oid.queue_id = events[0].header.eid.queue_id;
+ oid.type = events[0].header.type;
+
+ ACE_TIMEPROBE("START_SERVICE");
+ DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, 0, sizeof(Object_ID), (char*)&oid);
+
Supplier::mode_t mode;
if (events[0].header.type == this->ft_type_
|| events[0].header.type == this->ft_type2_)
{
mode = Supplier::FAULT_TOLERANT;
- if (events[0].header.type == this->ft_type_)
- {
- if (this->received_ft_type1_)
- {
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received second event (type %d) before receiving type %d!\n",this->ft_type_,this->ft_type2_));
- }
- this->received_ft_type1_ = true;
- }
- else {
- //must be ft_type2_
- if (this->received_ft_type2_)
- {
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received second event (type %d) before receiving type %d!\n",this->ft_type2_,this->ft_type_));
- }
- this->received_ft_type2_ = true;
- }
}
else if (events[0].header.type == this->norm_type_
|| events[0].header.type == this->norm_type2_)
{
mode = Supplier::NORMAL;
- if (this->received_ft_type1_ || this->received_ft_type2_) {
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) mode is NORMAL, but both FAULT_TOLERANT events not received!\n"));
- }
}
else
{
//default
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received unknown type %d; ignoring\n",events[0].header.type));
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) received unknown type %i; ignoring\n",events[0].header.type));
return;
}
- if (this->handler_ != 0)
- {
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) calling handle_service_start()\n"));
- this->handler_->handle_service_start(ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) handle_service_start() DONE\n"));
- }
-
- //NOTE: We don't trigger START/STOP_SERVICE events or handle fwddest if
- //not ready because if we aren't ready, it's just overhead.
- //We are ready only if we're in NORMAL mode (we should receive only one
- //event) or if we're in FAULT_TOLERANT mode and have received both events.
+ //We are ready only if we're in NORMAL mode (we should receive only
+ //one event) or if we're in FAULT_TOLERANT mode and have NOT
+ //received an event from this iteration before.
if (Supplier::NORMAL == mode ||
(Supplier::FAULT_TOLERANT == mode &&
- this->received_ft_type1_ && this->received_ft_type2_))
+ this->window_ == events[0].header.eid.id))
{
- //@BT INSTRUMENT with event ID: EVENT_WORK_START Measure time
- //when work triggered by event starts.
- //DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, guid, 0, NULL);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) START_SERVICE at %u\n",ACE_OS::gettimeofday().msec()));
-
- Object_ID oid;
- oid.id = events[0].header.eid.id;
- oid.tid = events[0].header.eid.tid;
- oid.pid = events[0].header.eid.pid;
- oid.queue_id = events[0].header.eid.queue_id;
- oid.type = events[0].header.type;
-
- ACE_TIMEPROBE("START_SERVICE");
- DSTRM_EVENT (TEST_ONE_FAM, START_SERVICE, 0, sizeof(Object_ID), (char*)&oid);
-
ACE_High_Res_Timer timer;
ACE_Time_Value elapsed_time;
@@ -172,7 +150,8 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events
timer.elapsed_time (elapsed_time); //total elapsed time
TimeBase::TimeT now;
- ORBSVCS_Time::Time_Value_to_TimeT (now, ACE_OS::gettimeofday ());
+ ACE_Time_Value now_tv(ACE_OS::gettimeofday());
+ ORBSVCS_Time::Time_Value_to_TimeT (now, now_tv);
if(now > events[0].header.deadline )
{
this->deadline_missed_++;
@@ -180,51 +159,64 @@ Dynamic_Consumer::push (const RtecEventComm::EventSet& events
//@BT INSTRUMENT with event ID: EVENT_WORK_DEADLINE_MISSED Measure time when
//work triggered by event finishes and deadline missed.
//DSTRM_EVENT (TEST_ONE_FAM, DEADLINE_MISSED, guid, strlen(extra_info), extra_info);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE (DEADLINE_MISSED) at %u\n",ACE_OS::gettimeofday().msec()));
+ ACE_Time_Value dead_tv;
+ ORBSVCS_Time::TimeT_to_Time_Value (dead_tv, events[0].header.deadline);
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE (DEADLINE_MISSED) at %isec %iusec with deadline %isec %iusec\n",now_tv.sec(),now_tv.usec(),dead_tv.sec(),dead_tv.usec()));
DSTRM_EVENT (TEST_ONE_FAM, DEADLINE_MISSED, 0, sizeof(Object_ID), (char*)&oid);
}
- //@BT INSTRUMENT with event ID: EVENT_WORK_END Measure time when
- //work triggered by event finishes.
- //DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, guid,0,NULL);
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE at %u\n",ACE_OS::gettimeofday().msec()));
- DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, 0, sizeof(Object_ID), (char*)&oid);
-
- ACE_TIMEPROBE("STOP_SERVICE");
-
- //now print timeprobe values
- ACE_TIMEPROBE_PRINT;
- ACE_TIMEPROBE_RESET;
-
- ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) request processing for %d done, "
- "elapsed time = %isec %iusec, deadline_missed_=%d\n",
+ ACE_DEBUG ((LM_DEBUG, "Dynamic_Consumer (%P|%t) request processing for %i done, "
+ "elapsed time = %isec %iusec, deadline_missed_=%i\n",
events[0].header.type,elapsed_time.sec(),elapsed_time.usec(),
this->deadline_missed_));
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) processing took %d iterations\n",j));
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) processing took %i iterations\n",j));
ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) event had deadline %i\n",
events[0].header.deadline));
- // ACE_DEBUG ((LM_DEBUG,
- // "Request processing in thread %t done, "
- // "prio = %d, load = %d, elapsed time = %umsec, deadline_missed = %d\n",
- // prio, exec_duration, elapsed_time.msec (),Deadline_missed ));
-
- //now, trigger the next subtask if any
- if (this->fwddest_ != 0)
- {
- //trigger next subtask; we assume we are the only ones who set the Supplier's mode!
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) triggering next subtask\n"));
- this->fwddest_->mode(mode);
- this->fwddest_->timeout_occured(ACE_ENV_SINGLE_ARG_PARAMETER);
- }
-
//get ready for next event
- this->received_ft_type1_ = false;
- this->received_ft_type2_ = false;
+ this->window_++;
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) INCREMENT window to %i\n",this->window_));
} //else not ready to run
+ else if (Supplier::FAULT_TOLERANT == mode &&
+ this->window_ > events[0].header.eid.id)
+ {
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) Ignoring REDUNDANT event from iteration %i\n",events[0].header.eid.id));
+ }
+ else if (Supplier::FAULT_TOLERANT == mode &&
+ this->window_ < events[0].header.eid.id)
+ {
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) Ignoring FUTURE event from iteration %i; window is %i\n",events[0].header.eid.id,this->window_));
+ DSTRM_EVENT (TEST_ONE_FAM, FUTURE_EVENT, 0, sizeof(Object_ID), (char*)&oid);
+ }
else
{
- ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) NOT READY to run\n"));
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) UNEXPECTED STATE: mode=%i\twindow=%ievent id=%i\n",mode,this->window_,events[0].header.eid.id));
+ }
+
+ //@BT INSTRUMENT with event ID: EVENT_WORK_END Measure time when
+ //work triggered by event finishes.
+ //DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, guid,0,NULL);
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer in thread %t STOP_SERVICE at %u\n",ACE_OS::gettimeofday().msec()));
+ DSTRM_EVENT (TEST_ONE_FAM, STOP_SERVICE, 0, sizeof(Object_ID), (char*)&oid);
+
+ ACE_TIMEPROBE("STOP_SERVICE");
+
+ //now print timeprobe values
+ ACE_TIMEPROBE_PRINT;
+ ACE_TIMEPROBE_RESET;
+
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Request processing in thread %t done, "
+ // "prio = %i, load = %i, elapsed time = %umsec, deadline_missed = %i\n",
+ // prio, exec_duration, elapsed_time.msec (),Deadline_missed ));
+
+ //now, trigger the next subtask if any
+ if (this->fwddest_ != 0)
+ {
+ //trigger next subtask; we assume we are the only ones who set the Supplier's mode!
+ ACE_DEBUG((LM_DEBUG,"Dynamic_Consumer (%P|%t) triggering next subtask\n"));
+ this->fwddest_->mode(mode);
+ this->fwddest_->timeout_occured(oid ACE_ENV_ARG_PARAMETER);
}
if (this->handler_ != 0)