summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-04-30 03:38:06 +0000
committerwolff1 <wolff1@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2009-04-30 03:38:06 +0000
commit1b4a0fbe93c04fa29e814d75798b21aa19fa0985 (patch)
treea44bc63e43b433ddd5d51f8b98f1cd13b7be27db
parent1d824867665bad352226ebc6461008e45e96b735 (diff)
downloadATCD-1b4a0fbe93c04fa29e814d75798b21aa19fa0985.tar.gz
added Packing_Scheduler
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.cpp375
-rw-r--r--TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.h61
2 files changed, 436 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.cpp b/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.cpp
new file mode 100644
index 00000000000..233dfb08996
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.cpp
@@ -0,0 +1,375 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Packing_Scheduler.cpp
+ *
+ * $Id$
+ *
+ * @author Friedhelm Wolf (fwolf@dre.vanderbilt.edu)
+ */
+//=============================================================================
+
+#include <numeric>
+#include "Packing_Scheduler.h"
+#include "CTT_Basic.h"
+#include "CTT_Enhanced.h"
+
+Packing_Scheduler::Packing_Scheduler (const PROCESSOR_LIST & processors,
+ unsigned int max_failures)
+ : Scheduler (processors, max_failures)
+{
+}
+
+class TaskListBackupAccumulator : public std::binary_function <bool,
+ TASK_LIST,
+ bool>
+{
+public:
+ TaskListBackupAccumulator (const Task & task,
+ const Processor & primary,
+ REPLICA_GROUPS & rep_groups)
+ : task_ (task),
+ primary_ (primary),
+ rep_groups_ (rep_groups) {}
+
+ bool operator () (bool found, TASK_LIST & tasks)
+ {
+ bool result = found;
+
+ if (!found)
+ {
+ Processor p = rep_groups_[primary_name (
+ *tasks.begin ())].begin ()->first;
+
+ if (p.compare (primary_) == 0)
+ {
+ result = true;
+ tasks.push_back (task_);
+ }
+ }
+
+ return result;
+ }
+private:
+ Task task_;
+ Processor primary_;
+ REPLICA_GROUPS & rep_groups_;
+};
+
+class RankAccumulator : public std::binary_function <unsigned int,
+ TASK_LIST,
+ unsigned int>
+{
+public:
+ RankAccumulator (REPLICA_GROUPS & rep_groups)
+ : rep_groups_ (rep_groups)
+ {
+ }
+
+ unsigned int operator () (unsigned int value, const TASK_LIST & tasks)
+ {
+ bool found = false;
+
+ // get first task
+ Task t = *tasks.begin ();
+
+ // take rank of the first entry
+ const TASK_POSITIONS & replicas =
+ rep_groups_[primary_name (t)];
+
+ // find position of task
+ unsigned int rank = 0;
+ for (; rank < replicas.size (); ++rank)
+ if (t.name.compare (replicas[rank].second.name) == 0)
+ {
+ found = true;
+ ++rank;
+ break;
+ }
+
+ if (!found)
+ rank = 0;
+
+ return value + rank;
+ }
+private:
+ REPLICA_GROUPS & rep_groups_;
+};
+
+void
+Packing_Scheduler::update_schedule (const Task & task,
+ const Processor & processor)
+{
+ this->Scheduler::update_schedule (task, processor);
+
+ this->update_task_groups (task, processor);
+}
+
+double
+Packing_Scheduler::schedule_task (const Task & task,
+ const Processor & processor)
+{
+ CTT_Basic ctt;
+
+ if (task.rank == 0)
+ {
+ // do ordinary wcrt for primary
+ TASK_LIST local_tasks = schedule_[processor];
+ local_tasks.push_back (task);
+ return ctt (local_tasks);
+ }
+ else
+ {
+ // check whether this processor already contains a replica of
+ // this application
+ TASK_POSITIONS replica_group =
+ replica_groups_[primary_name (task)];
+
+ if (std::accumulate (replica_group.begin (),
+ replica_group.end (),
+ false,
+ ProcessorNameComparison (
+ processor)))
+ return .0;
+
+ // check if the processors contains primaries
+ if (schedule_[processor].begin ()->rank == 0)
+ return .0;
+
+ // determine groups of tasks that reside on the same processors
+ TASK_LISTS local_groups = task_groups_[processor];
+
+ // add task to group
+ this->add_backup (task, local_groups);
+
+ // if this is the first entry
+ if (local_groups.size () == 1)
+ {
+ // schedule using the basic algorithm
+ TASK_LIST local_tasks = schedule_[processor];
+ local_tasks.push_back (task);
+ return ctt (local_tasks);
+ }
+ else
+ {
+ if (!(this->rank_check (local_groups)))
+ return .0;
+
+ TRACE ("found: " << local_groups);
+
+ // check if tasks are schedulable in any processor failure
+ // case
+ if (this->merge_check (local_groups))
+ {
+ CTT_Enhanced ctt_enh;
+ return ctt_enh (this->merge_lists (local_groups));
+ }
+ }
+ }
+
+ return .0;
+}
+
+class TaskListPrimaryAccumulator : public std::binary_function <bool,
+ TASK_LIST,
+ bool>
+{
+public:
+ TaskListPrimaryAccumulator (const Task & task)
+ : task_ (task) {}
+
+ bool operator () (bool found, TASK_LIST & tasks)
+ {
+ bool result = found;
+
+ if (!found)
+ {
+ if (tasks.begin ()->rank == 0)
+ {
+ result = true;
+ tasks.push_back (task_);
+ }
+ }
+
+ return result;
+ }
+
+private:
+ Task task_;
+};
+
+void
+Packing_Scheduler::update_task_groups (const Task & task,
+ const Processor & processor)
+{
+ // add task to existing task groups
+ TASK_LISTS & local_groups = task_groups_[processor];
+
+ if (task.rank == 0)
+ {
+ // if this is a primary check wether there are other
+ // primaries on this processors
+ if (!std::accumulate (local_groups.begin (),
+ local_groups.end (),
+ false,
+ TaskListPrimaryAccumulator (task)))
+ {
+ // create a new group if there is no primary yet
+ TASK_LIST new_list;
+ new_list.push_back (task);
+ local_groups.push_back (new_list);
+ }
+ }
+ else // if the task is a backup task
+ {
+ this->add_backup (task, local_groups);
+ }
+
+ TRACE (task_groups_);
+}
+
+void
+Packing_Scheduler::add_backup (const Task & task,
+ TASK_LISTS & tl)
+{
+ // first find the primary processor
+ Processor pp =
+ replica_groups_[primary_name (task)].begin ()->first;
+
+ // check wether there is a task list with the same
+ // primary processor and add it if this is the case
+ if (!std::accumulate (tl.begin (),
+ tl.end (),
+ false,
+ TaskListBackupAccumulator (task,
+ pp,
+ replica_groups_)))
+ {
+ // create a new group if there is no group yet
+ TASK_LIST new_list;
+ new_list.push_back (task);
+ tl.push_back (new_list);
+ }
+}
+
+bool
+Packing_Scheduler::rank_check (const TASK_LISTS & list)
+{
+ for (TASK_LISTS::const_iterator it1 = list.begin ();
+ it1 != list.end ();
+ ++it1)
+ {
+ for (TASK_LISTS::const_iterator it2 = it1 + 1;
+ it2 != list.end ();
+ ++it2)
+ {
+ if (this->paired_rank_check (*it1, *it2) <= max_failures_)
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+unsigned int
+Packing_Scheduler::paired_rank_check (const TASK_LIST & list1,
+ const TASK_LIST & list2)
+{
+ TASK_LISTS group;
+ group.push_back (list1);
+ group.push_back (list2);
+
+ // check whether k = consistency_level numbers of failures would
+ // affect more than one task group to become active.
+ return std::accumulate (group.begin (),
+ group.end (),
+ (unsigned int) 0,
+ RankAccumulator (replica_groups_));
+}
+
+bool
+Packing_Scheduler::merge_check (const TASK_LISTS & lists)
+{
+ bool schedulable = true;
+ CTT_Enhanced ctt;
+
+ for (size_t i = 0;
+ i < lists.size ();
+ ++i)
+ {
+ TASK_LISTS tl = lists;
+ TASK_LIST active_tasks;
+
+ std::transform (tl[i].begin (),
+ tl[i].end (),
+ std::inserter (active_tasks,
+ active_tasks.begin ()),
+ PrimaryConversion ());
+
+ tl[i] = active_tasks;
+
+ if (.0 >= ctt (this->merge_lists (tl)))
+ {
+ schedulable = false;
+ break;
+ }
+ }
+
+ return schedulable;
+}
+
+TASK_LIST
+Packing_Scheduler::merge_lists (const TASK_LISTS & lists)
+{
+ TASK_LIST result;
+
+ // merge all lists
+ for (TASK_LISTS::const_iterator it = lists.begin ();
+ it != lists.end ();
+ ++it)
+ {
+ std::copy (it->begin (),
+ it->end (),
+ std::inserter (result,
+ result.begin ()));
+ }
+
+ // sort list by period
+ std::sort (result.begin (),
+ result.end (),
+ PeriodComparison<Task> ());
+
+ return result;
+}
+
+std::ostream & operator<< (std::ostream & ostr,
+ const TASK_LISTS & tl)
+{
+ ostr << "<";
+ for (TASK_LISTS::const_iterator it = tl.begin ();
+ it != tl.end ();
+ ++it)
+ {
+ ostr << *it << "|";
+ }
+ ostr << ">";
+
+ return ostr;
+}
+
+std::ostream & operator<< (std::ostream & ostr,
+ const TASK_GROUPS & tg)
+{
+ ostr << "TASK_GROUPS:";
+ for (TASK_GROUPS::const_iterator it = tg.begin ();
+ it != tg.end ();
+ ++it)
+ {
+ ostr << std::endl << it->first << ": " << it->second;
+ }
+
+ return ostr;
+}
diff --git a/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.h b/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.h
new file mode 100644
index 00000000000..c700b126b11
--- /dev/null
+++ b/TAO/orbsvcs/examples/FaultTolerance/FLARe/DeCoRAM/src/Packing_Scheduler.h
@@ -0,0 +1,61 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Packing_Scheduler.h
+ *
+ * $Id$
+ *
+ * @author Friedhelm Wolf (fwolf@dre.vanderbilt.edu)
+ */
+//=============================================================================
+
+#ifndef PACKING_SCHEDULER_H_
+#define PACKING_SCHEDULER_H_
+
+#include "Scheduler.h"
+
+typedef std::vector<TASK_LIST> TASK_LISTS;
+typedef std::map <Processor, TASK_LISTS> TASK_GROUPS;
+
+std::ostream & operator<< (std::ostream & ostr,
+ const TASK_LISTS & tl);
+
+std::ostream & operator<< (std::ostream & ostr,
+ const TASK_GROUPS & tg);
+
+struct Packing_Scheduler : public Scheduler
+{
+public:
+ /// default ctor
+ Packing_Scheduler (const PROCESSOR_LIST & processors,
+ unsigned int max_failures);
+
+ virtual double schedule_task (const Task & task,
+ const Processor & processor);
+
+protected:
+ virtual void update_schedule (const Task & task,
+ const Processor & processor);
+
+private:
+ void update_task_groups (const Task & task,
+ const Processor & processor);
+
+ void add_backup (const Task & task,
+ TASK_LISTS & tg);
+
+ bool rank_check (const TASK_LISTS & list);
+
+ unsigned int paired_rank_check (const TASK_LIST & list1,
+ const TASK_LIST & list2);
+
+ bool merge_check (const TASK_LISTS & lists);
+
+ TASK_LIST merge_lists (const TASK_LISTS & lists);
+
+private:
+ TASK_GROUPS task_groups_;
+};
+
+#endif /* PACKING_SCHEDULER_H_ */