summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-11-03 23:23:22 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-11-03 23:23:22 +0000
commitb74c3ce9b719f1b325370d067ad44fa2404a23c3 (patch)
tree7eac2188160da955c2947dc464de9176c7d4b282
parent8c9a3e8856cd212cce975d7c016b1e121ff015f8 (diff)
downloadATCD-b74c3ce9b719f1b325370d067ad44fa2404a23c3.tar.gz
ChangeLogTag: Mon Nov 3 17:02:42 UTC 2003 Don Hinton <dhinton@dresystems.com>
-rw-r--r--ace/TMCast/Export.hpp54
-rw-r--r--ace/TMCast/FaultDetector.hpp41
-rw-r--r--ace/TMCast/Group.cpp508
-rw-r--r--ace/TMCast/Group.hpp51
-rw-r--r--ace/TMCast/GroupFwd.hpp15
-rw-r--r--ace/TMCast/LinkListener.hpp166
-rw-r--r--ace/TMCast/MTQueue.hpp177
-rw-r--r--ace/TMCast/Makefile221
-rw-r--r--ace/TMCast/Messaging.hpp54
-rw-r--r--ace/TMCast/Protocol.cpp31
-rw-r--r--ace/TMCast/Protocol.hpp107
-rw-r--r--ace/TMCast/README58
-rw-r--r--ace/TMCast/TransactionController.hpp384
-rw-r--r--examples/TMCast/Makefile21
-rw-r--r--examples/TMCast/Member/Makefile124
-rw-r--r--examples/TMCast/Member/README36
-rw-r--r--examples/TMCast/Member/member.cpp80
-rw-r--r--protocols/ace/TMCast/Export.hpp54
-rw-r--r--protocols/ace/TMCast/FaultDetector.hpp41
-rw-r--r--protocols/ace/TMCast/Group.cpp508
-rw-r--r--protocols/ace/TMCast/Group.hpp51
-rw-r--r--protocols/ace/TMCast/GroupFwd.hpp15
-rw-r--r--protocols/ace/TMCast/LinkListener.hpp166
-rw-r--r--protocols/ace/TMCast/MTQueue.hpp177
-rw-r--r--protocols/ace/TMCast/Makefile221
-rw-r--r--protocols/ace/TMCast/Messaging.hpp54
-rw-r--r--protocols/ace/TMCast/Protocol.cpp31
-rw-r--r--protocols/ace/TMCast/Protocol.hpp107
-rw-r--r--protocols/ace/TMCast/README58
-rw-r--r--protocols/ace/TMCast/TransactionController.hpp384
30 files changed, 3995 insertions, 0 deletions
diff --git a/ace/TMCast/Export.hpp b/ace/TMCast/Export.hpp
new file mode 100644
index 00000000000..149a83cb785
--- /dev/null
+++ b/ace/TMCast/Export.hpp
@@ -0,0 +1,54 @@
+
+// -*- C++ -*-
+// $Id$
+// Definition for Win32 Export directives.
+// This file is generated automatically by generate_export_file.pl TMCast
+// ------------------------------
+#ifndef TMCAST_EXPORT_H
+#define TMCAST_EXPORT_H
+
+#include "ace/config-all.h"
+
+#if !defined (TMCAST_HAS_DLL)
+# define TMCAST_HAS_DLL 1
+#endif /* ! TMCAST_HAS_DLL */
+
+#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1)
+# if defined (TMCAST_BUILD_DLL)
+# define TMCast_Export ACE_Proper_Export_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# else /* TMCAST_BUILD_DLL */
+# define TMCast_Export ACE_Proper_Import_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* TMCAST_BUILD_DLL */
+#else /* TMCAST_HAS_DLL == 1 */
+# define TMCast_Export
+# define TMCAST_SINGLETON_DECLARATION(T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+#endif /* TMCAST_HAS_DLL == 1 */
+
+// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if
+// tracing is turned off for ACE.
+#if !defined (TMCAST_NTRACE)
+# if (ACE_NTRACE == 1)
+# define TMCAST_NTRACE 1
+# else /* (ACE_NTRACE == 1) */
+# define TMCAST_NTRACE 0
+# endif /* (ACE_NTRACE == 1) */
+#endif /* !TMCAST_NTRACE */
+
+#if (TMCAST_NTRACE == 1)
+# define TMCAST_TRACE(X)
+#else /* (TMCAST_NTRACE == 1) */
+# if !defined (ACE_HAS_TRACE)
+# define ACE_HAS_TRACE
+# endif /* ACE_HAS_TRACE */
+# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X)
+# include "ace/Trace.h"
+#endif /* (TMCAST_NTRACE == 1) */
+
+#endif /* TMCAST_EXPORT_H */
+
+// End of auto generated file.
diff --git a/ace/TMCast/FaultDetector.hpp b/ace/TMCast/FaultDetector.hpp
new file mode 100644
index 00000000000..ba476cbd367
--- /dev/null
+++ b/ace/TMCast/FaultDetector.hpp
@@ -0,0 +1,41 @@
+// file : TMCast/FaultDetector.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace TMCast
+{
+ class FaultDetector
+ {
+ public:
+ FaultDetector ()
+ : silence_period_ (-1)
+ {
+ }
+
+ public:
+ class Failed {};
+
+
+ void
+ insync ()
+ {
+ silence_period_ = 0;
+ }
+
+ void
+ outsync ()
+ {
+ if (++silence_period_ >= Protocol::FATAL_SILENCE_FRAME)
+ {
+ // cerr << "Silence period has been passed." << endl;
+ // cerr << "Decalring the node failed." << endl;
+ throw Failed ();
+ }
+ }
+
+ private:
+ short silence_period_;
+ };
+}
diff --git a/ace/TMCast/Group.cpp b/ace/TMCast/Group.cpp
new file mode 100644
index 00000000000..f6858d96644
--- /dev/null
+++ b/ace/TMCast/Group.cpp
@@ -0,0 +1,508 @@
+// file : TMCast/Group.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Group.hpp"
+
+#include <typeinfo>
+
+// OS primitives
+#include <ace/OS.h>
+#include <ace/Synch.h>
+#include <ace/Time_Value.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+
+#include "Messaging.hpp"
+
+#include "Protocol.hpp"
+
+// Components
+
+#include "LinkListener.hpp"
+#include "FaultDetector.hpp"
+#include "TransactionController.hpp"
+
+namespace TMCast
+{
+ bool
+ operator== (std::type_info const* pa, std::type_info const& b)
+ {
+ return *pa == b;
+ }
+
+ //
+ //
+ //
+ class Terminate : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Failure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Scheduler
+ {
+ public:
+ Scheduler (ACE_INET_Addr const& addr,
+ char const* id,
+ MessageQueue& out_send_data,
+ MessageQueue& out_recv_data,
+ MessageQueue& out_control)
+
+ : cond_ (mutex_),
+
+ addr_ (addr),
+ sock_ (),
+
+ out_control_ (out_control),
+
+ in_data_ (mutex_),
+ in_link_data_(mutex_),
+ in_control_ (mutex_),
+
+ sync_schedule (ACE_OS::gettimeofday ()),
+
+ transaction_controller_ (in_data_, out_send_data, out_recv_data)
+ {
+ ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
+ id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';
+
+ sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded
+
+ in_data_.subscribe (cond_);
+ in_link_data_.subscribe (cond_);
+ in_control_.subscribe (cond_);
+
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &thread_) != 0) ::abort ();
+ }
+
+ ~Scheduler ()
+ {
+ {
+ MessageQueueAutoLock lock (in_control_);
+
+ in_control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ::abort ();
+
+ // cerr << "Scheduler is down." << endl;
+ }
+
+ public:
+ MessageQueue&
+ in_data ()
+ {
+ return in_data_;
+ }
+
+ private:
+ static void*
+ thread_thunk (void* arg)
+ {
+ Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
+
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ try
+ {
+ sock_.join (addr_);
+
+ auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));
+
+ {
+ AutoLock lock (mutex_);
+
+ // Loop
+ //
+ //
+
+ while (true)
+ {
+ cond_.wait (&sync_schedule);
+
+ // "Loop of Fairness"
+
+ bool done = false;
+
+ do
+ {
+ // control message
+ //
+ //
+ if (!in_control_.empty ())
+ {
+ done = true;
+ break;
+ }
+
+
+ // outsync
+ //
+ //
+ if (sync_schedule < ACE_OS::gettimeofday ())
+ {
+ // OUTSYNC
+
+ outsync ();
+
+ // schedule next outsync
+
+ sync_schedule =
+ ACE_OS::gettimeofday () +
+ ACE_Time_Value (0, Protocol::SYNC_PERIOD);
+ }
+
+ // link message
+ //
+ //
+ if (!in_link_data_.empty ())
+ {
+ MessagePtr m (in_link_data_.front ());
+ in_link_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (LinkFailure))
+ {
+ // cerr << "link failure" << endl;
+ throw false;
+ }
+ else if (exp == typeid (LinkData))
+ {
+
+ LinkData* data (dynamic_cast<LinkData*> (m.get ()));
+
+ // INSYNC, TL, CT
+
+ // Filter out loopback.
+ //
+ if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
+ {
+ insync ();
+ transaction_list ();
+ current_transaction (data->header().current,
+ data->payload (),
+ data->size ());
+ }
+ }
+ else
+ {
+ // cerr << "unknown message type from link listener: "
+ // << typeid (*m).name () << endl;
+ abort ();
+ }
+ }
+
+ // api message
+ //
+ //
+ if (!in_data_.empty ())
+ {
+ // API
+
+ api ();
+ }
+
+ } while (!in_link_data_.empty() ||
+ sync_schedule < ACE_OS::gettimeofday ());
+
+ if (done) break;
+ }
+ }
+ }
+ catch (...)
+ {
+ // cerr << "Exception in scheduler loop." << endl;
+
+ MessageQueueAutoLock lock (out_control_);
+ out_control_.push (MessagePtr (new Failure));
+ }
+ }
+
+ // Events
+ //
+ // Order:
+ //
+ // INSYNC, TSL, VOTE, BEGIN
+ // API
+ // OUTSYNC
+ //
+
+ void
+ insync ()
+ {
+ fault_detector_.insync ();
+ }
+
+ void
+ outsync ()
+ {
+ char buf[Protocol::MAX_MESSAGE_SIZE];
+
+ Protocol::MessageHeader* hdr =
+ reinterpret_cast<Protocol::MessageHeader*> (buf);
+
+ void* data = buf + sizeof (Protocol::MessageHeader);
+
+ hdr->length = sizeof (Protocol::MessageHeader);
+ hdr->check_sum = 0;
+
+ ACE_OS::strcpy (hdr->member_id.id, id_);
+
+ size_t size (0);
+
+ transaction_controller_.outsync (hdr->current, data, size);
+
+ hdr->length += size;
+
+ fault_detector_.outsync ();
+
+ // sock_.send (buf, hdr->length, addr_);
+
+ sock_.send (buf, hdr->length);
+ }
+
+ void
+ transaction_list ()
+ {
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ transaction_controller_.current_transaction (t, payload, size);
+ }
+
+ void
+ api ()
+ {
+ transaction_controller_.api ();
+ }
+
+ private:
+ ACE_thread_t thread_;
+
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ char id_[Protocol::MEMBER_ID_LENGTH];
+
+ ACE_INET_Addr addr_;
+ ACE_SOCK_Dgram_Mcast sock_;
+
+ MessageQueue& out_control_;
+
+ MessageQueue in_data_;
+ MessageQueue in_link_data_;
+ MessageQueue in_control_;
+
+ // Protocol state
+ //
+ //
+
+ ACE_Time_Value sync_schedule;
+
+ FaultDetector fault_detector_;
+ TransactionController transaction_controller_;
+ };
+
+
+ //
+ //
+ //
+ class Group::GroupImpl
+ {
+ public:
+ ~GroupImpl ()
+ {
+ }
+
+ GroupImpl (ACE_INET_Addr const& addr, char const* id)
+ throw (Group::Failed)
+ : send_cond_ (mutex_),
+ recv_cond_ (mutex_),
+ failed_ (false),
+ in_send_data_ (mutex_),
+ in_recv_data_ (mutex_),
+ in_control_ (mutex_),
+ scheduler_ (new Scheduler (addr,
+ id,
+ in_send_data_,
+ in_recv_data_,
+ in_control_)),
+ out_data_ (scheduler_->in_data ())
+ {
+ in_send_data_.subscribe (send_cond_);
+ in_recv_data_.subscribe (recv_cond_);
+
+ in_control_.subscribe (send_cond_);
+ in_control_.subscribe (recv_cond_);
+ }
+
+ void
+ send (void const* msg, size_t size)
+ throw (Group::InvalidArg, Group::Failed, Group::Aborted)
+ {
+ if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
+
+ // Note the potential deadlock if I lock mutex_ and out_data_ in
+ // reverse order.
+
+ MessageQueueAutoLock l1 (out_data_);
+ AutoLock l2 (mutex_);
+
+ throw_if_failed ();
+
+ out_data_.push (MessagePtr (new Send (msg, size)));
+
+ l1.unlock (); // no need to keep it locked
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_send_data_.empty ())
+ {
+ MessagePtr m (in_send_data_.front ());
+ in_send_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (TMCast::Aborted))
+ {
+ throw Group::Aborted ();
+ }
+ else if (exp == typeid (Commited))
+ {
+ return;
+ }
+ else
+ {
+ // cerr << "send: group-scheduler messaging protocol violation; "
+ // << "unexpected message " << typeid (*m).name ()
+ // << " " << typeid (Aborted).name () << endl;
+
+ abort ();
+ }
+ }
+
+ // cerr << "send: waiting on condition" << endl;
+ send_cond_.wait ();
+ // cerr << "send: wokeup on condition" << endl;
+ }
+ }
+
+
+
+ size_t
+ recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
+ {
+ AutoLock lock (mutex_);
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_recv_data_.empty ())
+ {
+ MessagePtr m (in_recv_data_.front ());
+ in_recv_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (Recv))
+ {
+ Recv* data (dynamic_cast<Recv*> (m.get ()));
+
+ if (size < data->size ()) throw Group::InsufficienSpace ();
+
+ memcpy (msg, data->payload (), data->size ());
+
+ return data->size ();
+ }
+ else
+ {
+ // cerr << "recv: group-scheduler messaging protocol violation. "
+ // << "unexpected message " << typeid (*m).name () << endl;
+
+ abort ();
+ }
+ }
+
+ recv_cond_.wait ();
+ }
+ }
+
+ private:
+ void
+ throw_if_failed ()
+ {
+ if (!failed_ && !in_control_.empty ()) failed_ = true;
+
+ if (failed_) throw Group::Failed ();
+ }
+
+ private:
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> send_cond_;
+ ACE_Condition<ACE_Thread_Mutex> recv_cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool failed_;
+
+ MessageQueue in_send_data_;
+ MessageQueue in_recv_data_;
+ MessageQueue in_control_;
+
+ auto_ptr<Scheduler> scheduler_;
+
+ MessageQueue& out_data_;
+ };
+
+
+ // Group
+ //
+ //
+ Group::
+ Group (ACE_INET_Addr const& addr, char const* id)
+ throw (Failed)
+ : pimpl_ (new GroupImpl (addr, id))
+ {
+ }
+
+ Group::
+ ~Group ()
+ {
+ }
+
+ void Group::
+ send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted)
+ {
+ pimpl_->send (msg, size);
+ }
+
+ size_t Group::
+ recv (void* msg, size_t size) throw (Failed, InsufficienSpace)
+ {
+ return pimpl_->recv (msg, size);
+ }
+}
diff --git a/ace/TMCast/Group.hpp b/ace/TMCast/Group.hpp
new file mode 100644
index 00000000000..416cea0a17d
--- /dev/null
+++ b/ace/TMCast/Group.hpp
@@ -0,0 +1,51 @@
+// file : TMCast/Group.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_HPP
+#define TMCAST_GROUP_HPP
+
+#include <ace/Auto_Ptr.h>
+#include <ace/INET_Addr.h>
+
+#include "Export.hpp"
+
+namespace TMCast
+{
+ class TMCast_Export Group
+ {
+ public:
+ class Aborted {};
+ class Failed {};
+ class InvalidArg {};
+ class InsufficienSpace {};
+
+ public:
+ ~Group ();
+
+ Group (ACE_INET_Addr const& addr, char const* id) throw (Failed);
+
+ public:
+ void
+ send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted);
+
+ size_t
+ recv (void* msg, size_t size) throw (Failed, InsufficienSpace);
+
+ private:
+ bool
+ failed ();
+
+ private:
+ class GroupImpl;
+ auto_ptr<GroupImpl> pimpl_;
+
+ private:
+ Group (Group const&);
+
+ Group&
+ operator= (Group const&);
+ };
+}
+
+#endif // TMCAST_GROUP_HPP
diff --git a/ace/TMCast/GroupFwd.hpp b/ace/TMCast/GroupFwd.hpp
new file mode 100644
index 00000000000..beba06df79d
--- /dev/null
+++ b/ace/TMCast/GroupFwd.hpp
@@ -0,0 +1,15 @@
+// file : TMCast/GroupFwd.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_FWD_HPP
+#define TMCAST_GROUP_FWD_HPP
+
+#include "Export.hpp"
+
+namespace TMCast
+{
+ class TMCast_Export Group;
+}
+
+#endif // TMCAST_GROUP_FWD_HPP
diff --git a/ace/TMCast/LinkListener.hpp b/ace/TMCast/LinkListener.hpp
new file mode 100644
index 00000000000..990f9e8f803
--- /dev/null
+++ b/ace/TMCast/LinkListener.hpp
@@ -0,0 +1,166 @@
+// file : TMCast/LinkListener.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+// OS primitives
+#include <ace/Synch.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+
+#include "Messaging.hpp"
+
+namespace TMCast
+{
+ //
+ //
+ //
+ class LinkFailure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class LinkData : public virtual Message
+ {
+ public:
+ LinkData (Protocol::MessageHeader const* header,
+ void* payload,
+ size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader));
+ ACE_OS::memcpy (payload_, payload, size_);
+ }
+
+ Protocol::MessageHeader const&
+ header () const
+ {
+ return header_;
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ Protocol::MessageHeader header_;
+ char payload_[Protocol::MAX_MESSAGE_SIZE];
+ size_t size_;
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex>
+ LinkDataPtr;
+
+ //
+ //
+ //
+ class LinkListener
+ {
+ private:
+ class Terminate : public virtual Message {};
+
+ public:
+ LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out)
+ : sock_(sock), out_ (out)
+ {
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &thread_) != 0) ::abort ();
+ }
+
+ ~LinkListener ()
+ {
+ {
+ MessageQueueAutoLock lock (control_);
+
+ control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ::abort ();
+
+ // cerr << "Link listener is down." << endl;
+ }
+
+
+ static void*
+ thread_thunk (void* arg)
+ {
+ LinkListener* obj = reinterpret_cast<LinkListener*> (arg);
+
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ char msg[Protocol::MAX_MESSAGE_SIZE];
+
+ ssize_t header_size = sizeof (Protocol::MessageHeader);
+
+ // OS::Time timeout (1000000); // one millisecond
+
+ ACE_Time_Value timeout (0, 1000); // one millisecond
+
+ try
+ {
+ while (true)
+ {
+ // Check control message queue
+
+ {
+ MessageQueueAutoLock lock (control_);
+
+ if (!control_.empty ()) break;
+ }
+
+ ACE_Addr junk;
+ ssize_t n = sock_.recv (msg,
+ Protocol::MAX_MESSAGE_SIZE,
+ junk,
+ 0,
+ &timeout);
+
+ if (n != -1)
+ {
+ if (n < header_size) throw false;
+
+ Protocol::MessageHeader* header =
+ reinterpret_cast<Protocol::MessageHeader*> (msg);
+
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkData (header,
+ msg + header_size,
+ n - header_size)));
+ }
+ }
+ }
+ catch (...)
+ {
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkFailure));
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ ACE_thread_t thread_;
+ ACE_SOCK_Dgram_Mcast& sock_;
+ MessageQueue& out_;
+ MessageQueue control_;
+ };
+}
diff --git a/ace/TMCast/MTQueue.hpp b/ace/TMCast/MTQueue.hpp
new file mode 100644
index 00000000000..d593c034723
--- /dev/null
+++ b/ace/TMCast/MTQueue.hpp
@@ -0,0 +1,177 @@
+// file : TMCast/MTQueue.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MT_QUEUE_HPP
+#define TMCAST_MT_QUEUE_HPP
+
+#include "ace/Auto_Ptr.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Unbounded_Queue.h"
+
+namespace TMCast
+{
+ template <typename T,
+ typename M,
+ typename C,
+ typename Q = ACE_Unbounded_Queue<T> >
+ class MTQueue
+ {
+ public:
+ typedef T ElementType;
+ typedef M MutexType;
+ typedef C ConditionalType;
+ typedef Q QueueType;
+
+ public:
+
+ MTQueue (std::size_t hint = 0)
+ : mutexp_ (new MutexType),
+ mutex_ (*mutexp_),
+ // queue_ (hint),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ MTQueue (MutexType& mutex, std::size_t hint = 0)
+ : mutexp_ (),
+ mutex_ (mutex),
+ // queue_ (hint),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ public:
+ bool
+ empty () const
+ {
+ return queue_.is_empty ();
+ }
+
+ std::size_t
+ size () const
+ {
+ return queue_.size ();
+ }
+
+ // typedef typename QueueType::Empty Empty;
+
+ class Empty {};
+
+ T&
+ front ()
+ {
+ ACE_Unbounded_Queue_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+
+ T const&
+ front () const
+ {
+ ACE_Unbounded_Queue_Const_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+ /*
+ T&
+ back ()
+ {
+ return queue_.back ();
+ }
+
+
+ T const&
+ back () const
+ {
+ return queue_.back ();
+ }
+ */
+
+ void
+ push (T const& t)
+ {
+ signal_ = empty ();
+ queue_.enqueue_tail (t);
+ }
+
+ void
+ pop ()
+ {
+ T junk;
+ queue_.dequeue_head (junk);
+ }
+
+ public:
+ void
+ lock () const
+ {
+ mutex_.acquire ();
+ }
+
+ void
+ unlock () const
+ {
+ if (signal_)
+ {
+ signal_ = false;
+
+ for (ConditionalSetConstIterator_ i (cond_set_);
+ !i.done ();
+ i.advance ())
+ {
+ ConditionalType** c;
+
+ i.next (c);
+
+ (*c)->signal ();
+ }
+ }
+
+ mutex_.release ();
+ }
+
+ void
+ subscribe (ConditionalType& c)
+ {
+ //@@ should check for duplicates
+ //
+ cond_set_.insert (&c);
+ }
+
+ void
+ unsubscribe (ConditionalType& c)
+ {
+ //@@ should check for absence
+ //
+ cond_set_.remove (&c);
+ }
+
+ private:
+ auto_ptr<MutexType> mutexp_;
+ mutable MutexType& mutex_;
+ QueueType queue_;
+
+ typedef
+ ACE_Unbounded_Set<ConditionalType*>
+ ConditionalSet_;
+
+ typedef
+ ACE_Unbounded_Set_Const_Iterator<ConditionalType*>
+ ConditionalSetConstIterator_;
+
+ ConditionalSet_ cond_set_;
+
+ mutable bool signal_;
+ };
+}
+
+#endif // TMCAST_MT_QUEUE_HPP
diff --git a/ace/TMCast/Makefile b/ace/TMCast/Makefile
new file mode 100644
index 00000000000..87f6d1964ba
--- /dev/null
+++ b/ace/TMCast/Makefile
@@ -0,0 +1,221 @@
+#----------------------------------------------------------------------------
+#
+# $Id$
+#
+#----------------------------------------------------------------------------
+
+MAKEFILE = Makefile
+LIB = libTMCast.a
+SHLIB = libTMCast.$(SOEXT)
+
+FILES= Group Protocol
+
+LIBS=$(ACELIB)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+
+LSRC = $(addsuffix .cpp,$(FILES))
+
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+ifeq ($(shared_libs),1)
+ifneq ($(SHLIB),)
+CPPFLAGS += -DTMCAST_BUILD_DLL
+endif
+endif
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+
+.obj/Group.o .obj/Group.so .shobj/Group.o .shobj/Group.so: Group.cpp Group.hpp \
+ $(ACE_ROOT)/ace/Auto_Ptr.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/ace_wchar.inl \
+ $(ACE_ROOT)/ace/Auto_Ptr.i \
+ $(ACE_ROOT)/ace/Global_Macros.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/Auto_Ptr.cpp \
+ $(ACE_ROOT)/ace/INET_Addr.h \
+ $(ACE_ROOT)/ace/Sock_Connect.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/os_include/os_limits.h \
+ $(ACE_ROOT)/ace/os_include/os_unistd.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_types.h \
+ $(ACE_ROOT)/ace/os_include/os_stddef.h \
+ $(ACE_ROOT)/ace/os_include/os_inttypes.h \
+ $(ACE_ROOT)/ace/os_include/os_stdint.h \
+ $(ACE_ROOT)/ace/os_include/os_stdio.h \
+ $(ACE_ROOT)/ace/os_include/os_stdarg.h \
+ $(ACE_ROOT)/ace/os_include/os_float.h \
+ $(ACE_ROOT)/ace/os_include/os_stdlib.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_wait.h \
+ $(ACE_ROOT)/ace/os_include/os_signal.h \
+ $(ACE_ROOT)/ace/os_include/os_time.h \
+ $(ACE_ROOT)/ace/os_include/os_ucontext.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_resource.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_time.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_select.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/os_include/netinet/os_in.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_socket.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_uio.h \
+ $(ACE_ROOT)/ace/Sock_Connect.i \
+ $(ACE_ROOT)/ace/Addr.h \
+ $(ACE_ROOT)/ace/Addr.i \
+ $(ACE_ROOT)/ace/INET_Addr.i \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Errno.h \
+ $(ACE_ROOT)/ace/os_include/os_errno.h \
+ $(ACE_ROOT)/ace/OS_Errno.inl \
+ $(ACE_ROOT)/ace/os_include/os_dirent.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/os_include/os_string.h \
+ $(ACE_ROOT)/ace/os_include/os_strings.h \
+ $(ACE_ROOT)/ace/os_include/os_ctype.h \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/os_include/os_dlfcn.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_mman.h \
+ $(ACE_ROOT)/ace/os_include/os_netdb.h \
+ $(ACE_ROOT)/ace/os_include/net/os_if.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_sem.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_ipc.h \
+ $(ACE_ROOT)/ace/Time_Value.h \
+ $(ACE_ROOT)/ace/Time_Value.inl \
+ $(ACE_ROOT)/ace/Default_Constants.h \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/os_include/os_pthread.h \
+ $(ACE_ROOT)/ace/os_include/os_assert.h \
+ $(ACE_ROOT)/ace/os_include/os_fcntl.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_stat.h \
+ $(ACE_ROOT)/ace/iosfwd.h \
+ $(ACE_ROOT)/ace/os_include/arpa/os_inet.h \
+ $(ACE_ROOT)/ace/os_include/netinet/os_tcp.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_shm.h \
+ $(ACE_ROOT)/ace/os_include/os_pwd.h \
+ $(ACE_ROOT)/ace/os_include/os_stropts.h \
+ $(ACE_ROOT)/ace/os_include/os_termios.h \
+ $(ACE_ROOT)/ace/os_include/os_aio.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_un.h \
+ $(ACE_ROOT)/ace/os_include/os_poll.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_msg.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_utsname.h \
+ $(ACE_ROOT)/ace/os_include/os_syslog.h \
+ $(ACE_ROOT)/ace/OS.i Export.hpp \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/Auto_Event.h \
+ $(ACE_ROOT)/ace/Event.h \
+ $(ACE_ROOT)/ace/Event.inl \
+ $(ACE_ROOT)/ace/Auto_Event.inl \
+ $(ACE_ROOT)/ace/Barrier.h \
+ $(ACE_ROOT)/ace/Condition_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Condition_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Barrier.inl \
+ $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Recursive_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Recursive_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Lock.h \
+ $(ACE_ROOT)/ace/Lock.inl \
+ $(ACE_ROOT)/ace/Manual_Event.h \
+ $(ACE_ROOT)/ace/Manual_Event.inl \
+ $(ACE_ROOT)/ace/Mutex.h \
+ $(ACE_ROOT)/ace/Mutex.inl \
+ $(ACE_ROOT)/ace/Null_Barrier.h \
+ $(ACE_ROOT)/ace/Null_Condition.h \
+ $(ACE_ROOT)/ace/Null_Mutex.h \
+ $(ACE_ROOT)/ace/Null_Semaphore.h \
+ $(ACE_ROOT)/ace/RW_Mutex.h \
+ $(ACE_ROOT)/ace/RW_Mutex.inl \
+ $(ACE_ROOT)/ace/RW_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/RW_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Semaphore.h \
+ $(ACE_ROOT)/ace/Semaphore.inl \
+ $(ACE_ROOT)/ace/Thread_Semaphore.h \
+ $(ACE_ROOT)/ace/Thread_Semaphore.inl \
+ $(ACE_ROOT)/ace/TSS_Adapter.h \
+ $(ACE_ROOT)/ace/TSS_Adapter.inl \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.h \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.inl \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.cpp \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.h \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.inl \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.cpp \
+ $(ACE_ROOT)/ace/Guard_T.h \
+ $(ACE_ROOT)/ace/Guard_T.inl \
+ $(ACE_ROOT)/ace/Guard_T.cpp \
+ $(ACE_ROOT)/ace/TSS_T.h \
+ $(ACE_ROOT)/ace/TSS_T.inl \
+ $(ACE_ROOT)/ace/TSS_T.cpp \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.h \
+ $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.inl \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Condition_T.h \
+ $(ACE_ROOT)/ace/Condition_T.inl \
+ $(ACE_ROOT)/ace/Condition_T.cpp \
+ $(ACE_ROOT)/ace/Synch_Traits.h \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.h \
+ $(ACE_ROOT)/ace/SOCK_Dgram.h \
+ $(ACE_ROOT)/ace/SOCK.h \
+ $(ACE_ROOT)/ace/IPC_SAP.h \
+ $(ACE_ROOT)/ace/Flag_Manip.h \
+ $(ACE_ROOT)/ace/Flag_Manip.i \
+ $(ACE_ROOT)/ace/IPC_SAP.i \
+ $(ACE_ROOT)/ace/SOCK.i \
+ $(ACE_ROOT)/ace/SOCK_Dgram.i \
+ $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.i \
+ Messaging.hpp \
+ $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.h \
+ $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.i \
+ MTQueue.hpp $(ACE_ROOT)/ace/Unbounded_Set.h \
+ $(ACE_ROOT)/ace/Node.h \
+ $(ACE_ROOT)/ace/Node.cpp \
+ $(ACE_ROOT)/ace/Unbounded_Set.inl \
+ $(ACE_ROOT)/ace/Unbounded_Set.cpp \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Unbounded_Queue.h \
+ $(ACE_ROOT)/ace/Unbounded_Queue.inl \
+ $(ACE_ROOT)/ace/Unbounded_Queue.cpp \
+ Protocol.hpp LinkListener.hpp FaultDetector.hpp \
+ TransactionController.hpp
+
+.obj/Protocol.o .obj/Protocol.so .shobj/Protocol.o .shobj/Protocol.so: Protocol.cpp Protocol.hpp
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/ace/TMCast/Messaging.hpp b/ace/TMCast/Messaging.hpp
new file mode 100644
index 00000000000..6a1000c3265
--- /dev/null
+++ b/ace/TMCast/Messaging.hpp
@@ -0,0 +1,54 @@
+// file : TMCast/Messaging.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MESSAGING_HPP
+#define TMCAST_MESSAGING_HPP
+
+#include <ace/Synch.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include "MTQueue.hpp"
+
+namespace TMCast
+{
+ class Message
+ {
+ public:
+ virtual
+ ~Message () {}
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex>
+ MessagePtr;
+
+ typedef
+ MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> >
+ MessageQueue;
+
+ struct MessageQueueAutoLock
+ {
+ MessageQueueAutoLock (MessageQueue& q)
+ : q_ (q)
+ {
+ q_.lock ();
+ }
+
+ void
+ unlock ()
+ {
+ q_.unlock ();
+ }
+
+ ~MessageQueueAutoLock ()
+ {
+ q_.unlock ();
+ }
+
+ private:
+ MessageQueue& q_;
+ };
+}
+
+#endif // TMCAST_MESSAGING_HPP
diff --git a/ace/TMCast/Protocol.cpp b/ace/TMCast/Protocol.cpp
new file mode 100644
index 00000000000..78563281694
--- /dev/null
+++ b/ace/TMCast/Protocol.cpp
@@ -0,0 +1,31 @@
+// file : TMCast/Protocol.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace TMCast
+{
+ namespace Protocol
+ {
+ namespace
+ {
+ char const* labels[] = {
+ "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"};
+ }
+
+ /*
+ std::string
+ tslabel (Protocol::TransactionStatus s)
+ {
+ return labels[s];
+ }
+
+ std::ostream&
+ operator << (std::ostream& o, Transaction const& t)
+ {
+ return o << "{" << t.id << "; " << tslabel (t.status) << "}";
+ }
+ */
+ }
+}
diff --git a/ace/TMCast/Protocol.hpp b/ace/TMCast/Protocol.hpp
new file mode 100644
index 00000000000..d5ae6a50cd6
--- /dev/null
+++ b/ace/TMCast/Protocol.hpp
@@ -0,0 +1,107 @@
+// file : TMCast/Protocol.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_PROTOCOL_HPP
+#define TMCAST_PROTOCOL_HPP
+
+namespace TMCast
+{
+ namespace Protocol
+ {
+ //
+ //
+ //
+ unsigned long const MEMBER_ID_LENGTH = 38;
+
+ struct MemberId
+ {
+ char id[MEMBER_ID_LENGTH];
+ /*
+ unsigned long ip;
+ unsigned short port;
+ */
+ };
+
+ //
+ //
+ //
+ typedef unsigned short TransactionId;
+
+
+
+ typedef unsigned char TransactionStatus;
+
+ TransactionStatus const TS_BEGIN = 1;
+ TransactionStatus const TS_COMMIT = 2;
+ TransactionStatus const TS_ABORT = 3;
+ TransactionStatus const TS_COMMITED = 4;
+ TransactionStatus const TS_ABORTED = 5;
+
+ struct Transaction
+ {
+ TransactionId id;
+ TransactionStatus status;
+ };
+
+ // Transaction List (TL)
+
+ // unsigned long const TL_LENGTH = 1;
+
+ // typedef Transaction TransactionList[TL_LENGTH];
+
+
+ //
+ //
+ //
+ struct MessageHeader
+ {
+ unsigned long length;
+
+ unsigned long check_sum;
+
+ MemberId member_id;
+
+ Transaction current;
+
+ //TransactionList transaction_list;
+ };
+
+
+ //
+ //
+ //
+
+ unsigned long const MAX_MESSAGE_SIZE = 768;
+
+ unsigned long const
+ MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader);
+
+ // Protocol timing
+ //
+ //
+
+ unsigned long const SYNC_PERIOD = 30000; // in mks
+
+ unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's
+ unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's
+
+ // FATAL_SILENCE_FRAME in SYNC_PERIOD's
+ // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME
+ //
+
+ short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2;
+
+ // short const FATAL_SILENCE_FRAME = 10000;
+
+ // Helpers
+
+ // std::string
+ // tslabel (Protocol::TransactionStatus s);
+
+ // std::ostream&
+ // operator << (std::ostream& o, Transaction const& t);
+ }
+}
+
+#endif // TMCAST_PROTOCOL_HPP
diff --git a/ace/TMCast/README b/ace/TMCast/README
new file mode 100644
index 00000000000..d061c7b2cba
--- /dev/null
+++ b/ace/TMCast/README
@@ -0,0 +1,58 @@
+
+
+Architecture
+
+TMCast (stands for Transaction MultiCast) is an implementation of a
+transactional multicast protocol. In essence, the idea is to represent
+message delivery to members of a multicast group as a transaction -
+atomic, consistent and isolated action. Multicast transaction can
+be viewed as an atomic transition of group members to a new state.
+If we define Mo as a set of operational (non-faulty) members of the
+group, Mf as a set of faulty members of the group, Ma as a set of
+members that view transition Tn as aborted and Mc as a set of members
+that view transition Tn as committed, then this atomic transition Tn
+can be described as one of the following:
+
+Mo(Tn-1) = Ma(T) U Mf(T)
+Mo(Tn-1) = Mc(T) U Mf(T)
+
+Or, in other words, after transaction T has been committed (aborted),
+all operational (before transaction T) members are either in
+committed (aborted) or failed state.
+
+Thus, for each member of the group, outcome of the transaction can
+be commit, abort or member failure. It is important for a member
+to exhibit a failfast (error latency is less than transaction cycle)
+behavior. Or, in other words, if the member transitioned into a wrong
+state, it is guaranteed to fail instead of delivering wrong result.
+
+In order to achieve such error detection in decentralized environment,
+certain limitations should be imposed. One of the most user-visible
+limitation is the fact that the lifetime of the group with only
+one member is very short. This is because there is not way for a
+member to distinguish "no members yet" case from "my link to the
+group is down". In such situation, the member assumes the latter case.
+There is also a military saying that puts it quite nicely: two is one,
+one is nothing.
+
+
+State of Implementation
+
+Current implementation is in prototypical stage. The following parts
+are not implemented or still under development:
+
+* Handling of network partitioning (TODO)
+
+* Redundant network support (TODO)
+
+* Member failure detection (partial implementation)
+
+
+Examples
+
+There is a simple example available in examples/TMCast/Member with
+corresponding README.
+
+
+--
+Boris Kolpackov <boris@dre.vanderbilt.edu> \ No newline at end of file
diff --git a/ace/TMCast/TransactionController.hpp b/ace/TMCast/TransactionController.hpp
new file mode 100644
index 00000000000..66c924faaf1
--- /dev/null
+++ b/ace/TMCast/TransactionController.hpp
@@ -0,0 +1,384 @@
+// file : TMCast/TransactionController.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include <ace/Synch.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include "Protocol.hpp"
+#include "Messaging.hpp"
+
+namespace TMCast
+{
+
+ // Messages
+ //
+ //
+ class Send : public virtual Message
+ {
+ public:
+ Send (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex>
+ SendPtr;
+
+
+ class Recv : public virtual Message
+ {
+ public:
+ Recv (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex>
+ RecvPtr;
+
+ class Aborted : public virtual Message {};
+
+ class Commited : public virtual Message {};
+
+
+ //
+ //
+ //
+ class TransactionController
+ {
+ public:
+ TransactionController (MessageQueue& in,
+ MessageQueue& send_out,
+ MessageQueue& recv_out)
+ : trace_ (false),
+ voting_duration_ (0),
+ separation_duration_ (0),
+ in_ (in),
+ send_out_ (send_out),
+ recv_out_ (recv_out)
+ {
+ current_.id = 0;
+ current_.status = Protocol::TS_COMMITED;
+ }
+
+ public:
+ class Failure {};
+
+
+ void
+ outsync (Protocol::Transaction& c, void* payload, size_t& size)
+ {
+ if (current_.status == Protocol::TS_COMMIT ||
+ current_.status == Protocol::TS_ABORT)
+ {
+ if (++voting_duration_ >= Protocol::VOTING_FRAME)
+ {
+ // end of voting frame
+
+ if (current_.status == Protocol::TS_COMMIT)
+ {
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Commited));
+ }
+ else // joined transaction
+ {
+ MessageQueueAutoLock lock (recv_out_);
+ recv_out_.push (MessagePtr (recv_.release ()));
+ recv_ = RecvPtr ();
+ }
+ }
+
+ current_.status = Protocol::TS_COMMITED;
+
+ // if (trace_) cerr << "commited transaction with id "
+ // << current_.id << endl;
+ }
+ else // TS_ABORT
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Aborted));
+ }
+ else
+ {
+ // free revc_ buffer if necessary
+ //
+ if (recv_.get ()) recv_ = RecvPtr ();
+ }
+
+
+ current_.status = Protocol::TS_ABORTED;
+
+ // if (trace_) cerr << "aborted transaction with id "
+ // << current_.id << endl;
+ }
+
+ // start transaction separation frame (counts down)
+ // +1 because it will be decremented on this iteration
+ separation_duration_ = Protocol::SEPARATION_FRAME + 1;
+ }
+ }
+
+ // Set current outsync info
+
+ c.id = current_.id;
+ c.status = current_.status;
+
+
+ // Do some post-processing
+
+ switch (current_.status)
+ {
+ case Protocol::TS_COMMITED:
+ case Protocol::TS_ABORTED:
+ {
+ if (separation_duration_ > 0) --separation_duration_;
+ break;
+ }
+ case Protocol::TS_BEGIN:
+ {
+ // transfer payload
+
+ size = send_->size ();
+ memcpy (payload, send_->payload (), size);
+
+ send_ = SendPtr ();
+
+ // get redy to vote for 'commit'
+
+ current_.status = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+ }
+ }
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ Protocol::TransactionId& id = current_.id;
+ Protocol::TransactionStatus& s = current_.status;
+
+ if (id == 0 && t.id != 0) // catch up
+ {
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ case Protocol::TS_COMMIT:
+ case Protocol::TS_ABORT:
+ {
+ id = t.id - 1;
+ s = Protocol::TS_COMMITED;
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ id = t.id;
+ s = t.status;
+ break;
+ }
+ }
+
+ // if (trace_) cerr << "caught up with id " << id << endl;
+ }
+
+ bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED);
+
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ {
+ if (!stable || t.id != id + 1)
+ {
+ // Transaction is in progress or hole in transaction id's
+
+ // cerr << "unexpected request to join " << t
+ // << " while on " << current_ << endl;
+
+ // if (!stable) cerr << "voting progress is " << voting_duration_
+ // << "/" << Protocol::VOTING_FRAME << endl;
+
+ if (t.id == id) // collision
+ {
+ if (!stable && s != Protocol::TS_ABORT)
+ {
+ // abort both
+ // cerr << "aborting both transactions" << endl;
+
+ s = Protocol::TS_ABORT;
+ voting_duration_ = 0; //@@ reset voting frame
+ }
+ }
+ else
+ {
+ // @@ delicate case. need to think more
+
+ // cerr << "Declaring node failed." << endl;
+ throw Failure ();
+ }
+ }
+ else
+ {
+ // join the transaction
+
+ initiated_ = false;
+
+ recv_ = RecvPtr (new Recv (payload, size));
+
+ id = t.id;
+ s = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-commit transaction with id "
+ // << id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_COMMIT:
+ {
+ if (stable && id == t.id - 1)
+ {
+ // not begin and and we haven't joined
+
+ // join for abort
+
+ initiated_ = false;
+
+ current_.id = t.id;
+ current_.status = Protocol::TS_ABORT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-abort transaction with id "
+ // << current_.id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_ABORT:
+ {
+ if ((!stable && id == t.id && s == Protocol::TS_COMMIT) ||
+ (stable && id == t.id - 1)) // abort current || new transaction
+ {
+ // if (trace_) cerr << "voting-for-abort on transaction with id "
+ // << current_.id << endl;
+
+ id = t.id;
+ s = Protocol::TS_ABORT;
+
+ voting_duration_ = 0; //@@ reseting voting_duration_
+ }
+ else
+ {
+ }
+
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ // nothing for now
+ break;
+ }
+ }
+ }
+
+ void
+ api ()
+ {
+ if ((current_.status == Protocol::TS_COMMITED ||
+ current_.status == Protocol::TS_ABORTED) &&
+ separation_duration_ == 0) // no transaction in progress
+ {
+ // start new transaction
+
+ // Note that in_ is already locked by Scheduler
+
+ MessagePtr m (in_.front ());
+ in_.pop ();
+
+ if (typeid (*m) == typeid (Send))
+ {
+ send_ = SendPtr (dynamic_cast<Send*> (m.release ()));
+ }
+ else
+ {
+ // cerr << "Expecting Send but received " << typeid (*m).name ()
+ // << endl;
+
+ ::abort ();
+ }
+
+ current_.id++;
+ current_.status = Protocol::TS_BEGIN;
+
+ initiated_ = true;
+
+ // if (trace_) cerr << "starting transaction with id " << current_.id
+ // << endl;
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool trace_;
+
+ Protocol::Transaction current_;
+
+ bool initiated_;
+
+ unsigned short voting_duration_;
+ unsigned short separation_duration_;
+
+ MessageQueue& in_;
+ MessageQueue& send_out_;
+ MessageQueue& recv_out_;
+
+ SendPtr send_;
+ RecvPtr recv_;
+ };
+}
diff --git a/examples/TMCast/Makefile b/examples/TMCast/Makefile
new file mode 100644
index 00000000000..5a39f26db35
--- /dev/null
+++ b/examples/TMCast/Makefile
@@ -0,0 +1,21 @@
+#----------------------------------------------------------------------------
+#
+# $Id$
+#
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+DIRS = Member
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nolocal.GNU
diff --git a/examples/TMCast/Member/Makefile b/examples/TMCast/Member/Makefile
new file mode 100644
index 00000000000..babfc7793b9
--- /dev/null
+++ b/examples/TMCast/Member/Makefile
@@ -0,0 +1,124 @@
+#----------------------------------------------------------------------------
+#
+# $Id$
+#
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = member
+
+PSRC=$(addsuffix .cpp,$(BIN))
+LDLIBS = -lTMCast
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+
+.obj/member.o .obj/member.so .shobj/member.o .shobj/member.so: member.cpp \
+ $(ACE_ROOT)/ace/TMCast/Group.hpp \
+ $(ACE_ROOT)/ace/Auto_Ptr.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/ace_wchar.inl \
+ $(ACE_ROOT)/ace/Auto_Ptr.i \
+ $(ACE_ROOT)/ace/Global_Macros.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/Auto_Ptr.cpp \
+ $(ACE_ROOT)/ace/INET_Addr.h \
+ $(ACE_ROOT)/ace/Sock_Connect.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/os_include/os_limits.h \
+ $(ACE_ROOT)/ace/os_include/os_unistd.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_types.h \
+ $(ACE_ROOT)/ace/os_include/os_stddef.h \
+ $(ACE_ROOT)/ace/os_include/os_inttypes.h \
+ $(ACE_ROOT)/ace/os_include/os_stdint.h \
+ $(ACE_ROOT)/ace/os_include/os_stdio.h \
+ $(ACE_ROOT)/ace/os_include/os_stdarg.h \
+ $(ACE_ROOT)/ace/os_include/os_float.h \
+ $(ACE_ROOT)/ace/os_include/os_stdlib.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_wait.h \
+ $(ACE_ROOT)/ace/os_include/os_signal.h \
+ $(ACE_ROOT)/ace/os_include/os_time.h \
+ $(ACE_ROOT)/ace/os_include/os_ucontext.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_resource.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_time.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_select.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/os_include/netinet/os_in.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_socket.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_uio.h \
+ $(ACE_ROOT)/ace/Sock_Connect.i \
+ $(ACE_ROOT)/ace/Addr.h \
+ $(ACE_ROOT)/ace/Addr.i \
+ $(ACE_ROOT)/ace/INET_Addr.i \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Errno.h \
+ $(ACE_ROOT)/ace/os_include/os_errno.h \
+ $(ACE_ROOT)/ace/OS_Errno.inl \
+ $(ACE_ROOT)/ace/os_include/os_dirent.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/os_include/os_string.h \
+ $(ACE_ROOT)/ace/os_include/os_strings.h \
+ $(ACE_ROOT)/ace/os_include/os_ctype.h \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/os_include/os_dlfcn.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_mman.h \
+ $(ACE_ROOT)/ace/os_include/os_netdb.h \
+ $(ACE_ROOT)/ace/os_include/net/os_if.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_sem.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_ipc.h \
+ $(ACE_ROOT)/ace/Time_Value.h \
+ $(ACE_ROOT)/ace/Time_Value.inl \
+ $(ACE_ROOT)/ace/Default_Constants.h \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/os_include/os_pthread.h \
+ $(ACE_ROOT)/ace/os_include/os_assert.h \
+ $(ACE_ROOT)/ace/os_include/os_fcntl.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_stat.h \
+ $(ACE_ROOT)/ace/iosfwd.h \
+ $(ACE_ROOT)/ace/os_include/arpa/os_inet.h \
+ $(ACE_ROOT)/ace/os_include/netinet/os_tcp.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_shm.h \
+ $(ACE_ROOT)/ace/os_include/os_pwd.h \
+ $(ACE_ROOT)/ace/os_include/os_stropts.h \
+ $(ACE_ROOT)/ace/os_include/os_termios.h \
+ $(ACE_ROOT)/ace/os_include/os_aio.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_un.h \
+ $(ACE_ROOT)/ace/os_include/os_poll.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_msg.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_utsname.h \
+ $(ACE_ROOT)/ace/os_include/os_syslog.h \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/TMCast/Export.hpp
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/TMCast/Member/README b/examples/TMCast/Member/README
new file mode 100644
index 00000000000..6a62a107a77
--- /dev/null
+++ b/examples/TMCast/Member/README
@@ -0,0 +1,36 @@
+Member example shows how you can build a simple multicast group
+using transactional multicast (TMCast). Each member can be either
+a sender or a receiver.
+
+The sender sends small messages to the multicast group with a random
+wait period in [0, 1] second range. The receiver is simply receiving
+those messages and prints them out.
+
+To start the sender you can execute something like this:
+
+$ ./member s sender-1 239.255.0.1:10000
+
+Here the first argument ('s') indicates that new member will be
+a sender. The second argument ('sender-1') is an id of the new
+member (each member of the group should have a unique id). And
+the third argument ('239.255.0.1:10000') specifies IPv4 multicast
+address and port (you can choose you own).
+
+To start the receiver you can execute similar command:
+
+$ ./member r receiver-1 239.255.0.1:10000
+
+After you have started both the receiver and the sender you
+should see a sequence of messages printed by the receiver.
+
+Note, since the group can exist with only one member for a
+very short period of time you should start first two members
+virtually at the same time. See TMCast documentation for more
+information about why it behaves this way.
+
+You may want to add more than one sender to the group if you
+want to see how TMCast operates in a totally-ordered mode.
+
+
+--
+Boris Kolpackov <boris@dre.vanderbilt.edu>
diff --git a/examples/TMCast/Member/member.cpp b/examples/TMCast/Member/member.cpp
new file mode 100644
index 00000000000..1c07c6c8532
--- /dev/null
+++ b/examples/TMCast/Member/member.cpp
@@ -0,0 +1,80 @@
+// file : TMCast/Member/member.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include <ace/Log_Msg.h>
+
+#include "ace/TMCast/Group.hpp"
+
+class Args {};
+
+int
+main (int argc, char* argv[])
+{
+ try
+ {
+ if (argc < 4) throw Args ();
+
+ bool receiver (true);
+
+ if (argv[1][0] == 'r') receiver = true;
+ else if (argv[1][0] == 's') receiver = false;
+ else throw Args ();
+
+ if (!receiver) ACE_OS::srand (ACE_OS::time ());
+
+ ACE_INET_Addr address (argv[3]);
+
+ TMCast::Group group (address, argv[2]);
+
+ if (receiver)
+ {
+ for (char buffer[256];;)
+ {
+ group.recv (buffer, sizeof (buffer));
+
+ ACE_DEBUG ((LM_DEBUG, "%s\n", buffer));
+ }
+ }
+ else
+ {
+ char buffer[256];
+
+ for (unsigned long i = 0; i < 1000; i++)
+ {
+ // Sleep some random time around 1 sec.
+
+ unsigned long t = (1000000ULL * ACE_OS::rand ()) / RAND_MAX;
+
+ // ACE_DEBUG ((LM_DEBUG, "sleeping for %u\n", t));
+
+ ACE_OS::sleep (ACE_Time_Value (0, t));
+
+ ACE_OS::sprintf (buffer, "message # %lu", i);
+
+ try
+ {
+ group.send (buffer, ACE_OS::strlen (buffer) + 1);
+ }
+ catch (TMCast::Group::Aborted const&)
+ {
+ ACE_ERROR ((LM_ERROR, "%s has been aborted\n", buffer));
+ }
+ }
+ }
+ }
+ catch (Args const&)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Usage: member {r|s} <id> <IPv4 mcast address>:<port>\n"));
+ }
+ catch (TMCast::Group::Failed const&)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Group failure. Perhaps I am alone in the group.\n"));
+ }
+ catch (TMCast::Group::InsufficienSpace const&)
+ {
+ ACE_ERROR ((LM_ERROR, "Insufficient space in receive buffer.\n"));
+ }
+}
diff --git a/protocols/ace/TMCast/Export.hpp b/protocols/ace/TMCast/Export.hpp
new file mode 100644
index 00000000000..149a83cb785
--- /dev/null
+++ b/protocols/ace/TMCast/Export.hpp
@@ -0,0 +1,54 @@
+
+// -*- C++ -*-
+// $Id$
+// Definition for Win32 Export directives.
+// This file is generated automatically by generate_export_file.pl TMCast
+// ------------------------------
+#ifndef TMCAST_EXPORT_H
+#define TMCAST_EXPORT_H
+
+#include "ace/config-all.h"
+
+#if !defined (TMCAST_HAS_DLL)
+# define TMCAST_HAS_DLL 1
+#endif /* ! TMCAST_HAS_DLL */
+
+#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1)
+# if defined (TMCAST_BUILD_DLL)
+# define TMCast_Export ACE_Proper_Export_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# else /* TMCAST_BUILD_DLL */
+# define TMCast_Export ACE_Proper_Import_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* TMCAST_BUILD_DLL */
+#else /* TMCAST_HAS_DLL == 1 */
+# define TMCast_Export
+# define TMCAST_SINGLETON_DECLARATION(T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+#endif /* TMCAST_HAS_DLL == 1 */
+
+// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if
+// tracing is turned off for ACE.
+#if !defined (TMCAST_NTRACE)
+# if (ACE_NTRACE == 1)
+# define TMCAST_NTRACE 1
+# else /* (ACE_NTRACE == 1) */
+# define TMCAST_NTRACE 0
+# endif /* (ACE_NTRACE == 1) */
+#endif /* !TMCAST_NTRACE */
+
+#if (TMCAST_NTRACE == 1)
+# define TMCAST_TRACE(X)
+#else /* (TMCAST_NTRACE == 1) */
+# if !defined (ACE_HAS_TRACE)
+# define ACE_HAS_TRACE
+# endif /* ACE_HAS_TRACE */
+# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X)
+# include "ace/Trace.h"
+#endif /* (TMCAST_NTRACE == 1) */
+
+#endif /* TMCAST_EXPORT_H */
+
+// End of auto generated file.
diff --git a/protocols/ace/TMCast/FaultDetector.hpp b/protocols/ace/TMCast/FaultDetector.hpp
new file mode 100644
index 00000000000..ba476cbd367
--- /dev/null
+++ b/protocols/ace/TMCast/FaultDetector.hpp
@@ -0,0 +1,41 @@
+// file : TMCast/FaultDetector.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace TMCast
+{
+ class FaultDetector
+ {
+ public:
+ FaultDetector ()
+ : silence_period_ (-1)
+ {
+ }
+
+ public:
+ class Failed {};
+
+
+ void
+ insync ()
+ {
+ silence_period_ = 0;
+ }
+
+ void
+ outsync ()
+ {
+ if (++silence_period_ >= Protocol::FATAL_SILENCE_FRAME)
+ {
+ // cerr << "Silence period has been passed." << endl;
+ // cerr << "Decalring the node failed." << endl;
+ throw Failed ();
+ }
+ }
+
+ private:
+ short silence_period_;
+ };
+}
diff --git a/protocols/ace/TMCast/Group.cpp b/protocols/ace/TMCast/Group.cpp
new file mode 100644
index 00000000000..f6858d96644
--- /dev/null
+++ b/protocols/ace/TMCast/Group.cpp
@@ -0,0 +1,508 @@
+// file : TMCast/Group.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Group.hpp"
+
+#include <typeinfo>
+
+// OS primitives
+#include <ace/OS.h>
+#include <ace/Synch.h>
+#include <ace/Time_Value.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+
+#include "Messaging.hpp"
+
+#include "Protocol.hpp"
+
+// Components
+
+#include "LinkListener.hpp"
+#include "FaultDetector.hpp"
+#include "TransactionController.hpp"
+
+namespace TMCast
+{
+ bool
+ operator== (std::type_info const* pa, std::type_info const& b)
+ {
+ return *pa == b;
+ }
+
+ //
+ //
+ //
+ class Terminate : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Failure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Scheduler
+ {
+ public:
+ Scheduler (ACE_INET_Addr const& addr,
+ char const* id,
+ MessageQueue& out_send_data,
+ MessageQueue& out_recv_data,
+ MessageQueue& out_control)
+
+ : cond_ (mutex_),
+
+ addr_ (addr),
+ sock_ (),
+
+ out_control_ (out_control),
+
+ in_data_ (mutex_),
+ in_link_data_(mutex_),
+ in_control_ (mutex_),
+
+ sync_schedule (ACE_OS::gettimeofday ()),
+
+ transaction_controller_ (in_data_, out_send_data, out_recv_data)
+ {
+ ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
+ id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';
+
+ sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded
+
+ in_data_.subscribe (cond_);
+ in_link_data_.subscribe (cond_);
+ in_control_.subscribe (cond_);
+
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &thread_) != 0) ::abort ();
+ }
+
+ ~Scheduler ()
+ {
+ {
+ MessageQueueAutoLock lock (in_control_);
+
+ in_control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ::abort ();
+
+ // cerr << "Scheduler is down." << endl;
+ }
+
+ public:
+ MessageQueue&
+ in_data ()
+ {
+ return in_data_;
+ }
+
+ private:
+ static void*
+ thread_thunk (void* arg)
+ {
+ Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
+
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ try
+ {
+ sock_.join (addr_);
+
+ auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));
+
+ {
+ AutoLock lock (mutex_);
+
+ // Loop
+ //
+ //
+
+ while (true)
+ {
+ cond_.wait (&sync_schedule);
+
+ // "Loop of Fairness"
+
+ bool done = false;
+
+ do
+ {
+ // control message
+ //
+ //
+ if (!in_control_.empty ())
+ {
+ done = true;
+ break;
+ }
+
+
+ // outsync
+ //
+ //
+ if (sync_schedule < ACE_OS::gettimeofday ())
+ {
+ // OUTSYNC
+
+ outsync ();
+
+ // schedule next outsync
+
+ sync_schedule =
+ ACE_OS::gettimeofday () +
+ ACE_Time_Value (0, Protocol::SYNC_PERIOD);
+ }
+
+ // link message
+ //
+ //
+ if (!in_link_data_.empty ())
+ {
+ MessagePtr m (in_link_data_.front ());
+ in_link_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (LinkFailure))
+ {
+ // cerr << "link failure" << endl;
+ throw false;
+ }
+ else if (exp == typeid (LinkData))
+ {
+
+ LinkData* data (dynamic_cast<LinkData*> (m.get ()));
+
+ // INSYNC, TL, CT
+
+ // Filter out loopback.
+ //
+ if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
+ {
+ insync ();
+ transaction_list ();
+ current_transaction (data->header().current,
+ data->payload (),
+ data->size ());
+ }
+ }
+ else
+ {
+ // cerr << "unknown message type from link listener: "
+ // << typeid (*m).name () << endl;
+ abort ();
+ }
+ }
+
+ // api message
+ //
+ //
+ if (!in_data_.empty ())
+ {
+ // API
+
+ api ();
+ }
+
+ } while (!in_link_data_.empty() ||
+ sync_schedule < ACE_OS::gettimeofday ());
+
+ if (done) break;
+ }
+ }
+ }
+ catch (...)
+ {
+ // cerr << "Exception in scheduler loop." << endl;
+
+ MessageQueueAutoLock lock (out_control_);
+ out_control_.push (MessagePtr (new Failure));
+ }
+ }
+
+ // Events
+ //
+ // Order:
+ //
+ // INSYNC, TSL, VOTE, BEGIN
+ // API
+ // OUTSYNC
+ //
+
+ void
+ insync ()
+ {
+ fault_detector_.insync ();
+ }
+
+ void
+ outsync ()
+ {
+ char buf[Protocol::MAX_MESSAGE_SIZE];
+
+ Protocol::MessageHeader* hdr =
+ reinterpret_cast<Protocol::MessageHeader*> (buf);
+
+ void* data = buf + sizeof (Protocol::MessageHeader);
+
+ hdr->length = sizeof (Protocol::MessageHeader);
+ hdr->check_sum = 0;
+
+ ACE_OS::strcpy (hdr->member_id.id, id_);
+
+ size_t size (0);
+
+ transaction_controller_.outsync (hdr->current, data, size);
+
+ hdr->length += size;
+
+ fault_detector_.outsync ();
+
+ // sock_.send (buf, hdr->length, addr_);
+
+ sock_.send (buf, hdr->length);
+ }
+
+ void
+ transaction_list ()
+ {
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ transaction_controller_.current_transaction (t, payload, size);
+ }
+
+ void
+ api ()
+ {
+ transaction_controller_.api ();
+ }
+
+ private:
+ ACE_thread_t thread_;
+
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ char id_[Protocol::MEMBER_ID_LENGTH];
+
+ ACE_INET_Addr addr_;
+ ACE_SOCK_Dgram_Mcast sock_;
+
+ MessageQueue& out_control_;
+
+ MessageQueue in_data_;
+ MessageQueue in_link_data_;
+ MessageQueue in_control_;
+
+ // Protocol state
+ //
+ //
+
+ ACE_Time_Value sync_schedule;
+
+ FaultDetector fault_detector_;
+ TransactionController transaction_controller_;
+ };
+
+
+ //
+ //
+ //
+ class Group::GroupImpl
+ {
+ public:
+ ~GroupImpl ()
+ {
+ }
+
+ GroupImpl (ACE_INET_Addr const& addr, char const* id)
+ throw (Group::Failed)
+ : send_cond_ (mutex_),
+ recv_cond_ (mutex_),
+ failed_ (false),
+ in_send_data_ (mutex_),
+ in_recv_data_ (mutex_),
+ in_control_ (mutex_),
+ scheduler_ (new Scheduler (addr,
+ id,
+ in_send_data_,
+ in_recv_data_,
+ in_control_)),
+ out_data_ (scheduler_->in_data ())
+ {
+ in_send_data_.subscribe (send_cond_);
+ in_recv_data_.subscribe (recv_cond_);
+
+ in_control_.subscribe (send_cond_);
+ in_control_.subscribe (recv_cond_);
+ }
+
+ void
+ send (void const* msg, size_t size)
+ throw (Group::InvalidArg, Group::Failed, Group::Aborted)
+ {
+ if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
+
+ // Note the potential deadlock if I lock mutex_ and out_data_ in
+ // reverse order.
+
+ MessageQueueAutoLock l1 (out_data_);
+ AutoLock l2 (mutex_);
+
+ throw_if_failed ();
+
+ out_data_.push (MessagePtr (new Send (msg, size)));
+
+ l1.unlock (); // no need to keep it locked
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_send_data_.empty ())
+ {
+ MessagePtr m (in_send_data_.front ());
+ in_send_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (TMCast::Aborted))
+ {
+ throw Group::Aborted ();
+ }
+ else if (exp == typeid (Commited))
+ {
+ return;
+ }
+ else
+ {
+ // cerr << "send: group-scheduler messaging protocol violation; "
+ // << "unexpected message " << typeid (*m).name ()
+ // << " " << typeid (Aborted).name () << endl;
+
+ abort ();
+ }
+ }
+
+ // cerr << "send: waiting on condition" << endl;
+ send_cond_.wait ();
+ // cerr << "send: wokeup on condition" << endl;
+ }
+ }
+
+
+
+ size_t
+ recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
+ {
+ AutoLock lock (mutex_);
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_recv_data_.empty ())
+ {
+ MessagePtr m (in_recv_data_.front ());
+ in_recv_data_.pop ();
+
+ std::type_info const* exp (&typeid (*m));
+
+ if (exp == typeid (Recv))
+ {
+ Recv* data (dynamic_cast<Recv*> (m.get ()));
+
+ if (size < data->size ()) throw Group::InsufficienSpace ();
+
+ memcpy (msg, data->payload (), data->size ());
+
+ return data->size ();
+ }
+ else
+ {
+ // cerr << "recv: group-scheduler messaging protocol violation. "
+ // << "unexpected message " << typeid (*m).name () << endl;
+
+ abort ();
+ }
+ }
+
+ recv_cond_.wait ();
+ }
+ }
+
+ private:
+ void
+ throw_if_failed ()
+ {
+ if (!failed_ && !in_control_.empty ()) failed_ = true;
+
+ if (failed_) throw Group::Failed ();
+ }
+
+ private:
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> send_cond_;
+ ACE_Condition<ACE_Thread_Mutex> recv_cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool failed_;
+
+ MessageQueue in_send_data_;
+ MessageQueue in_recv_data_;
+ MessageQueue in_control_;
+
+ auto_ptr<Scheduler> scheduler_;
+
+ MessageQueue& out_data_;
+ };
+
+
+ // Group
+ //
+ //
+ Group::
+ Group (ACE_INET_Addr const& addr, char const* id)
+ throw (Failed)
+ : pimpl_ (new GroupImpl (addr, id))
+ {
+ }
+
+ Group::
+ ~Group ()
+ {
+ }
+
+ void Group::
+ send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted)
+ {
+ pimpl_->send (msg, size);
+ }
+
+ size_t Group::
+ recv (void* msg, size_t size) throw (Failed, InsufficienSpace)
+ {
+ return pimpl_->recv (msg, size);
+ }
+}
diff --git a/protocols/ace/TMCast/Group.hpp b/protocols/ace/TMCast/Group.hpp
new file mode 100644
index 00000000000..416cea0a17d
--- /dev/null
+++ b/protocols/ace/TMCast/Group.hpp
@@ -0,0 +1,51 @@
+// file : TMCast/Group.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_HPP
+#define TMCAST_GROUP_HPP
+
+#include <ace/Auto_Ptr.h>
+#include <ace/INET_Addr.h>
+
+#include "Export.hpp"
+
+namespace TMCast
+{
+ class TMCast_Export Group
+ {
+ public:
+ class Aborted {};
+ class Failed {};
+ class InvalidArg {};
+ class InsufficienSpace {};
+
+ public:
+ ~Group ();
+
+ Group (ACE_INET_Addr const& addr, char const* id) throw (Failed);
+
+ public:
+ void
+ send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted);
+
+ size_t
+ recv (void* msg, size_t size) throw (Failed, InsufficienSpace);
+
+ private:
+ bool
+ failed ();
+
+ private:
+ class GroupImpl;
+ auto_ptr<GroupImpl> pimpl_;
+
+ private:
+ Group (Group const&);
+
+ Group&
+ operator= (Group const&);
+ };
+}
+
+#endif // TMCAST_GROUP_HPP
diff --git a/protocols/ace/TMCast/GroupFwd.hpp b/protocols/ace/TMCast/GroupFwd.hpp
new file mode 100644
index 00000000000..beba06df79d
--- /dev/null
+++ b/protocols/ace/TMCast/GroupFwd.hpp
@@ -0,0 +1,15 @@
+// file : TMCast/GroupFwd.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_FWD_HPP
+#define TMCAST_GROUP_FWD_HPP
+
+#include "Export.hpp"
+
+namespace TMCast
+{
+ class TMCast_Export Group;
+}
+
+#endif // TMCAST_GROUP_FWD_HPP
diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp
new file mode 100644
index 00000000000..990f9e8f803
--- /dev/null
+++ b/protocols/ace/TMCast/LinkListener.hpp
@@ -0,0 +1,166 @@
+// file : TMCast/LinkListener.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+// OS primitives
+#include <ace/Synch.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+
+#include "Messaging.hpp"
+
+namespace TMCast
+{
+ //
+ //
+ //
+ class LinkFailure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class LinkData : public virtual Message
+ {
+ public:
+ LinkData (Protocol::MessageHeader const* header,
+ void* payload,
+ size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader));
+ ACE_OS::memcpy (payload_, payload, size_);
+ }
+
+ Protocol::MessageHeader const&
+ header () const
+ {
+ return header_;
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ Protocol::MessageHeader header_;
+ char payload_[Protocol::MAX_MESSAGE_SIZE];
+ size_t size_;
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex>
+ LinkDataPtr;
+
+ //
+ //
+ //
+ class LinkListener
+ {
+ private:
+ class Terminate : public virtual Message {};
+
+ public:
+ LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out)
+ : sock_(sock), out_ (out)
+ {
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &thread_) != 0) ::abort ();
+ }
+
+ ~LinkListener ()
+ {
+ {
+ MessageQueueAutoLock lock (control_);
+
+ control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ::abort ();
+
+ // cerr << "Link listener is down." << endl;
+ }
+
+
+ static void*
+ thread_thunk (void* arg)
+ {
+ LinkListener* obj = reinterpret_cast<LinkListener*> (arg);
+
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ char msg[Protocol::MAX_MESSAGE_SIZE];
+
+ ssize_t header_size = sizeof (Protocol::MessageHeader);
+
+ // OS::Time timeout (1000000); // one millisecond
+
+ ACE_Time_Value timeout (0, 1000); // one millisecond
+
+ try
+ {
+ while (true)
+ {
+ // Check control message queue
+
+ {
+ MessageQueueAutoLock lock (control_);
+
+ if (!control_.empty ()) break;
+ }
+
+ ACE_Addr junk;
+ ssize_t n = sock_.recv (msg,
+ Protocol::MAX_MESSAGE_SIZE,
+ junk,
+ 0,
+ &timeout);
+
+ if (n != -1)
+ {
+ if (n < header_size) throw false;
+
+ Protocol::MessageHeader* header =
+ reinterpret_cast<Protocol::MessageHeader*> (msg);
+
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkData (header,
+ msg + header_size,
+ n - header_size)));
+ }
+ }
+ }
+ catch (...)
+ {
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkFailure));
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ ACE_thread_t thread_;
+ ACE_SOCK_Dgram_Mcast& sock_;
+ MessageQueue& out_;
+ MessageQueue control_;
+ };
+}
diff --git a/protocols/ace/TMCast/MTQueue.hpp b/protocols/ace/TMCast/MTQueue.hpp
new file mode 100644
index 00000000000..d593c034723
--- /dev/null
+++ b/protocols/ace/TMCast/MTQueue.hpp
@@ -0,0 +1,177 @@
+// file : TMCast/MTQueue.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MT_QUEUE_HPP
+#define TMCAST_MT_QUEUE_HPP
+
+#include "ace/Auto_Ptr.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Unbounded_Queue.h"
+
+namespace TMCast
+{
+ template <typename T,
+ typename M,
+ typename C,
+ typename Q = ACE_Unbounded_Queue<T> >
+ class MTQueue
+ {
+ public:
+ typedef T ElementType;
+ typedef M MutexType;
+ typedef C ConditionalType;
+ typedef Q QueueType;
+
+ public:
+
+ MTQueue (std::size_t hint = 0)
+ : mutexp_ (new MutexType),
+ mutex_ (*mutexp_),
+ // queue_ (hint),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ MTQueue (MutexType& mutex, std::size_t hint = 0)
+ : mutexp_ (),
+ mutex_ (mutex),
+ // queue_ (hint),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ public:
+ bool
+ empty () const
+ {
+ return queue_.is_empty ();
+ }
+
+ std::size_t
+ size () const
+ {
+ return queue_.size ();
+ }
+
+ // typedef typename QueueType::Empty Empty;
+
+ class Empty {};
+
+ T&
+ front ()
+ {
+ ACE_Unbounded_Queue_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+
+ T const&
+ front () const
+ {
+ ACE_Unbounded_Queue_Const_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+ /*
+ T&
+ back ()
+ {
+ return queue_.back ();
+ }
+
+
+ T const&
+ back () const
+ {
+ return queue_.back ();
+ }
+ */
+
+ void
+ push (T const& t)
+ {
+ signal_ = empty ();
+ queue_.enqueue_tail (t);
+ }
+
+ void
+ pop ()
+ {
+ T junk;
+ queue_.dequeue_head (junk);
+ }
+
+ public:
+ void
+ lock () const
+ {
+ mutex_.acquire ();
+ }
+
+ void
+ unlock () const
+ {
+ if (signal_)
+ {
+ signal_ = false;
+
+ for (ConditionalSetConstIterator_ i (cond_set_);
+ !i.done ();
+ i.advance ())
+ {
+ ConditionalType** c;
+
+ i.next (c);
+
+ (*c)->signal ();
+ }
+ }
+
+ mutex_.release ();
+ }
+
+ void
+ subscribe (ConditionalType& c)
+ {
+ //@@ should check for duplicates
+ //
+ cond_set_.insert (&c);
+ }
+
+ void
+ unsubscribe (ConditionalType& c)
+ {
+ //@@ should check for absence
+ //
+ cond_set_.remove (&c);
+ }
+
+ private:
+ auto_ptr<MutexType> mutexp_;
+ mutable MutexType& mutex_;
+ QueueType queue_;
+
+ typedef
+ ACE_Unbounded_Set<ConditionalType*>
+ ConditionalSet_;
+
+ typedef
+ ACE_Unbounded_Set_Const_Iterator<ConditionalType*>
+ ConditionalSetConstIterator_;
+
+ ConditionalSet_ cond_set_;
+
+ mutable bool signal_;
+ };
+}
+
+#endif // TMCAST_MT_QUEUE_HPP
diff --git a/protocols/ace/TMCast/Makefile b/protocols/ace/TMCast/Makefile
new file mode 100644
index 00000000000..87f6d1964ba
--- /dev/null
+++ b/protocols/ace/TMCast/Makefile
@@ -0,0 +1,221 @@
+#----------------------------------------------------------------------------
+#
+# $Id$
+#
+#----------------------------------------------------------------------------
+
+MAKEFILE = Makefile
+LIB = libTMCast.a
+SHLIB = libTMCast.$(SOEXT)
+
+FILES= Group Protocol
+
+LIBS=$(ACELIB)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+
+LSRC = $(addsuffix .cpp,$(FILES))
+
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+ifeq ($(shared_libs),1)
+ifneq ($(SHLIB),)
+CPPFLAGS += -DTMCAST_BUILD_DLL
+endif
+endif
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+
+.obj/Group.o .obj/Group.so .shobj/Group.o .shobj/Group.so: Group.cpp Group.hpp \
+ $(ACE_ROOT)/ace/Auto_Ptr.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/ace_wchar.inl \
+ $(ACE_ROOT)/ace/Auto_Ptr.i \
+ $(ACE_ROOT)/ace/Global_Macros.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/Auto_Ptr.cpp \
+ $(ACE_ROOT)/ace/INET_Addr.h \
+ $(ACE_ROOT)/ace/Sock_Connect.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/os_include/os_limits.h \
+ $(ACE_ROOT)/ace/os_include/os_unistd.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_types.h \
+ $(ACE_ROOT)/ace/os_include/os_stddef.h \
+ $(ACE_ROOT)/ace/os_include/os_inttypes.h \
+ $(ACE_ROOT)/ace/os_include/os_stdint.h \
+ $(ACE_ROOT)/ace/os_include/os_stdio.h \
+ $(ACE_ROOT)/ace/os_include/os_stdarg.h \
+ $(ACE_ROOT)/ace/os_include/os_float.h \
+ $(ACE_ROOT)/ace/os_include/os_stdlib.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_wait.h \
+ $(ACE_ROOT)/ace/os_include/os_signal.h \
+ $(ACE_ROOT)/ace/os_include/os_time.h \
+ $(ACE_ROOT)/ace/os_include/os_ucontext.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_resource.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_time.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_select.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/os_include/netinet/os_in.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_socket.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_uio.h \
+ $(ACE_ROOT)/ace/Sock_Connect.i \
+ $(ACE_ROOT)/ace/Addr.h \
+ $(ACE_ROOT)/ace/Addr.i \
+ $(ACE_ROOT)/ace/INET_Addr.i \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Errno.h \
+ $(ACE_ROOT)/ace/os_include/os_errno.h \
+ $(ACE_ROOT)/ace/OS_Errno.inl \
+ $(ACE_ROOT)/ace/os_include/os_dirent.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/os_include/os_string.h \
+ $(ACE_ROOT)/ace/os_include/os_strings.h \
+ $(ACE_ROOT)/ace/os_include/os_ctype.h \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/os_include/os_dlfcn.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_mman.h \
+ $(ACE_ROOT)/ace/os_include/os_netdb.h \
+ $(ACE_ROOT)/ace/os_include/net/os_if.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_sem.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_ipc.h \
+ $(ACE_ROOT)/ace/Time_Value.h \
+ $(ACE_ROOT)/ace/Time_Value.inl \
+ $(ACE_ROOT)/ace/Default_Constants.h \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/os_include/os_pthread.h \
+ $(ACE_ROOT)/ace/os_include/os_assert.h \
+ $(ACE_ROOT)/ace/os_include/os_fcntl.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_stat.h \
+ $(ACE_ROOT)/ace/iosfwd.h \
+ $(ACE_ROOT)/ace/os_include/arpa/os_inet.h \
+ $(ACE_ROOT)/ace/os_include/netinet/os_tcp.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_shm.h \
+ $(ACE_ROOT)/ace/os_include/os_pwd.h \
+ $(ACE_ROOT)/ace/os_include/os_stropts.h \
+ $(ACE_ROOT)/ace/os_include/os_termios.h \
+ $(ACE_ROOT)/ace/os_include/os_aio.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_un.h \
+ $(ACE_ROOT)/ace/os_include/os_poll.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_msg.h \
+ $(ACE_ROOT)/ace/os_include/sys/os_utsname.h \
+ $(ACE_ROOT)/ace/os_include/os_syslog.h \
+ $(ACE_ROOT)/ace/OS.i Export.hpp \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/Auto_Event.h \
+ $(ACE_ROOT)/ace/Event.h \
+ $(ACE_ROOT)/ace/Event.inl \
+ $(ACE_ROOT)/ace/Auto_Event.inl \
+ $(ACE_ROOT)/ace/Barrier.h \
+ $(ACE_ROOT)/ace/Condition_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Condition_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Barrier.inl \
+ $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Recursive_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/Recursive_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Lock.h \
+ $(ACE_ROOT)/ace/Lock.inl \
+ $(ACE_ROOT)/ace/Manual_Event.h \
+ $(ACE_ROOT)/ace/Manual_Event.inl \
+ $(ACE_ROOT)/ace/Mutex.h \
+ $(ACE_ROOT)/ace/Mutex.inl \
+ $(ACE_ROOT)/ace/Null_Barrier.h \
+ $(ACE_ROOT)/ace/Null_Condition.h \
+ $(ACE_ROOT)/ace/Null_Mutex.h \
+ $(ACE_ROOT)/ace/Null_Semaphore.h \
+ $(ACE_ROOT)/ace/RW_Mutex.h \
+ $(ACE_ROOT)/ace/RW_Mutex.inl \
+ $(ACE_ROOT)/ace/RW_Thread_Mutex.h \
+ $(ACE_ROOT)/ace/RW_Thread_Mutex.inl \
+ $(ACE_ROOT)/ace/Semaphore.h \
+ $(ACE_ROOT)/ace/Semaphore.inl \
+ $(ACE_ROOT)/ace/Thread_Semaphore.h \
+ $(ACE_ROOT)/ace/Thread_Semaphore.inl \
+ $(ACE_ROOT)/ace/TSS_Adapter.h \
+ $(ACE_ROOT)/ace/TSS_Adapter.inl \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.h \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.inl \
+ $(ACE_ROOT)/ace/Lock_Adapter_T.cpp \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.h \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.inl \
+ $(ACE_ROOT)/ace/Reverse_Lock_T.cpp \
+ $(ACE_ROOT)/ace/Guard_T.h \
+ $(ACE_ROOT)/ace/Guard_T.inl \
+ $(ACE_ROOT)/ace/Guard_T.cpp \
+ $(ACE_ROOT)/ace/TSS_T.h \
+ $(ACE_ROOT)/ace/TSS_T.inl \
+ $(ACE_ROOT)/ace/TSS_T.cpp \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.h \
+ $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.inl \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Condition_T.h \
+ $(ACE_ROOT)/ace/Condition_T.inl \
+ $(ACE_ROOT)/ace/Condition_T.cpp \
+ $(ACE_ROOT)/ace/Synch_Traits.h \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.h \
+ $(ACE_ROOT)/ace/SOCK_Dgram.h \
+ $(ACE_ROOT)/ace/SOCK.h \
+ $(ACE_ROOT)/ace/IPC_SAP.h \
+ $(ACE_ROOT)/ace/Flag_Manip.h \
+ $(ACE_ROOT)/ace/Flag_Manip.i \
+ $(ACE_ROOT)/ace/IPC_SAP.i \
+ $(ACE_ROOT)/ace/SOCK.i \
+ $(ACE_ROOT)/ace/SOCK_Dgram.i \
+ $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.i \
+ Messaging.hpp \
+ $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.h \
+ $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.i \
+ MTQueue.hpp $(ACE_ROOT)/ace/Unbounded_Set.h \
+ $(ACE_ROOT)/ace/Node.h \
+ $(ACE_ROOT)/ace/Node.cpp \
+ $(ACE_ROOT)/ace/Unbounded_Set.inl \
+ $(ACE_ROOT)/ace/Unbounded_Set.cpp \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Unbounded_Queue.h \
+ $(ACE_ROOT)/ace/Unbounded_Queue.inl \
+ $(ACE_ROOT)/ace/Unbounded_Queue.cpp \
+ Protocol.hpp LinkListener.hpp FaultDetector.hpp \
+ TransactionController.hpp
+
+.obj/Protocol.o .obj/Protocol.so .shobj/Protocol.o .shobj/Protocol.so: Protocol.cpp Protocol.hpp
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/protocols/ace/TMCast/Messaging.hpp b/protocols/ace/TMCast/Messaging.hpp
new file mode 100644
index 00000000000..6a1000c3265
--- /dev/null
+++ b/protocols/ace/TMCast/Messaging.hpp
@@ -0,0 +1,54 @@
+// file : TMCast/Messaging.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MESSAGING_HPP
+#define TMCAST_MESSAGING_HPP
+
+#include <ace/Synch.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include "MTQueue.hpp"
+
+namespace TMCast
+{
+ class Message
+ {
+ public:
+ virtual
+ ~Message () {}
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex>
+ MessagePtr;
+
+ typedef
+ MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> >
+ MessageQueue;
+
+ struct MessageQueueAutoLock
+ {
+ MessageQueueAutoLock (MessageQueue& q)
+ : q_ (q)
+ {
+ q_.lock ();
+ }
+
+ void
+ unlock ()
+ {
+ q_.unlock ();
+ }
+
+ ~MessageQueueAutoLock ()
+ {
+ q_.unlock ();
+ }
+
+ private:
+ MessageQueue& q_;
+ };
+}
+
+#endif // TMCAST_MESSAGING_HPP
diff --git a/protocols/ace/TMCast/Protocol.cpp b/protocols/ace/TMCast/Protocol.cpp
new file mode 100644
index 00000000000..78563281694
--- /dev/null
+++ b/protocols/ace/TMCast/Protocol.cpp
@@ -0,0 +1,31 @@
+// file : TMCast/Protocol.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace TMCast
+{
+ namespace Protocol
+ {
+ namespace
+ {
+ char const* labels[] = {
+ "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"};
+ }
+
+ /*
+ std::string
+ tslabel (Protocol::TransactionStatus s)
+ {
+ return labels[s];
+ }
+
+ std::ostream&
+ operator << (std::ostream& o, Transaction const& t)
+ {
+ return o << "{" << t.id << "; " << tslabel (t.status) << "}";
+ }
+ */
+ }
+}
diff --git a/protocols/ace/TMCast/Protocol.hpp b/protocols/ace/TMCast/Protocol.hpp
new file mode 100644
index 00000000000..d5ae6a50cd6
--- /dev/null
+++ b/protocols/ace/TMCast/Protocol.hpp
@@ -0,0 +1,107 @@
+// file : TMCast/Protocol.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_PROTOCOL_HPP
+#define TMCAST_PROTOCOL_HPP
+
+namespace TMCast
+{
+ namespace Protocol
+ {
+ //
+ //
+ //
+ unsigned long const MEMBER_ID_LENGTH = 38;
+
+ struct MemberId
+ {
+ char id[MEMBER_ID_LENGTH];
+ /*
+ unsigned long ip;
+ unsigned short port;
+ */
+ };
+
+ //
+ //
+ //
+ typedef unsigned short TransactionId;
+
+
+
+ typedef unsigned char TransactionStatus;
+
+ TransactionStatus const TS_BEGIN = 1;
+ TransactionStatus const TS_COMMIT = 2;
+ TransactionStatus const TS_ABORT = 3;
+ TransactionStatus const TS_COMMITED = 4;
+ TransactionStatus const TS_ABORTED = 5;
+
+ struct Transaction
+ {
+ TransactionId id;
+ TransactionStatus status;
+ };
+
+ // Transaction List (TL)
+
+ // unsigned long const TL_LENGTH = 1;
+
+ // typedef Transaction TransactionList[TL_LENGTH];
+
+
+ //
+ //
+ //
+ struct MessageHeader
+ {
+ unsigned long length;
+
+ unsigned long check_sum;
+
+ MemberId member_id;
+
+ Transaction current;
+
+ //TransactionList transaction_list;
+ };
+
+
+ //
+ //
+ //
+
+ unsigned long const MAX_MESSAGE_SIZE = 768;
+
+ unsigned long const
+ MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader);
+
+ // Protocol timing
+ //
+ //
+
+ unsigned long const SYNC_PERIOD = 30000; // in mks
+
+ unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's
+ unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's
+
+ // FATAL_SILENCE_FRAME in SYNC_PERIOD's
+ // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME
+ //
+
+ short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2;
+
+ // short const FATAL_SILENCE_FRAME = 10000;
+
+ // Helpers
+
+ // std::string
+ // tslabel (Protocol::TransactionStatus s);
+
+ // std::ostream&
+ // operator << (std::ostream& o, Transaction const& t);
+ }
+}
+
+#endif // TMCAST_PROTOCOL_HPP
diff --git a/protocols/ace/TMCast/README b/protocols/ace/TMCast/README
new file mode 100644
index 00000000000..d061c7b2cba
--- /dev/null
+++ b/protocols/ace/TMCast/README
@@ -0,0 +1,58 @@
+
+
+Architecture
+
+TMCast (stands for Transaction MultiCast) is an implementation of a
+transactional multicast protocol. In essence, the idea is to represent
+message delivery to members of a multicast group as a transaction -
+atomic, consistent and isolated action. Multicast transaction can
+be viewed as an atomic transition of group members to a new state.
+If we define Mo as a set of operational (non-faulty) members of the
+group, Mf as a set of faulty members of the group, Ma as a set of
+members that view transition Tn as aborted and Mc as a set of members
+that view transition Tn as committed, then this atomic transition Tn
+can be described as one of the following:
+
+Mo(Tn-1) = Ma(T) U Mf(T)
+Mo(Tn-1) = Mc(T) U Mf(T)
+
+Or, in other words, after transaction T has been committed (aborted),
+all operational (before transaction T) members are either in
+committed (aborted) or failed state.
+
+Thus, for each member of the group, outcome of the transaction can
+be commit, abort or member failure. It is important for a member
+to exhibit a failfast (error latency is less than transaction cycle)
+behavior. Or, in other words, if the member transitioned into a wrong
+state, it is guaranteed to fail instead of delivering wrong result.
+
+In order to achieve such error detection in decentralized environment,
+certain limitations should be imposed. One of the most user-visible
+limitation is the fact that the lifetime of the group with only
+one member is very short. This is because there is not way for a
+member to distinguish "no members yet" case from "my link to the
+group is down". In such situation, the member assumes the latter case.
+There is also a military saying that puts it quite nicely: two is one,
+one is nothing.
+
+
+State of Implementation
+
+Current implementation is in prototypical stage. The following parts
+are not implemented or still under development:
+
+* Handling of network partitioning (TODO)
+
+* Redundant network support (TODO)
+
+* Member failure detection (partial implementation)
+
+
+Examples
+
+There is a simple example available in examples/TMCast/Member with
+corresponding README.
+
+
+--
+Boris Kolpackov <boris@dre.vanderbilt.edu> \ No newline at end of file
diff --git a/protocols/ace/TMCast/TransactionController.hpp b/protocols/ace/TMCast/TransactionController.hpp
new file mode 100644
index 00000000000..66c924faaf1
--- /dev/null
+++ b/protocols/ace/TMCast/TransactionController.hpp
@@ -0,0 +1,384 @@
+// file : TMCast/TransactionController.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include <ace/Synch.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include "Protocol.hpp"
+#include "Messaging.hpp"
+
+namespace TMCast
+{
+
+ // Messages
+ //
+ //
+ class Send : public virtual Message
+ {
+ public:
+ Send (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex>
+ SendPtr;
+
+
+ class Recv : public virtual Message
+ {
+ public:
+ Recv (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex>
+ RecvPtr;
+
+ class Aborted : public virtual Message {};
+
+ class Commited : public virtual Message {};
+
+
+ //
+ //
+ //
+ class TransactionController
+ {
+ public:
+ TransactionController (MessageQueue& in,
+ MessageQueue& send_out,
+ MessageQueue& recv_out)
+ : trace_ (false),
+ voting_duration_ (0),
+ separation_duration_ (0),
+ in_ (in),
+ send_out_ (send_out),
+ recv_out_ (recv_out)
+ {
+ current_.id = 0;
+ current_.status = Protocol::TS_COMMITED;
+ }
+
+ public:
+ class Failure {};
+
+
+ void
+ outsync (Protocol::Transaction& c, void* payload, size_t& size)
+ {
+ if (current_.status == Protocol::TS_COMMIT ||
+ current_.status == Protocol::TS_ABORT)
+ {
+ if (++voting_duration_ >= Protocol::VOTING_FRAME)
+ {
+ // end of voting frame
+
+ if (current_.status == Protocol::TS_COMMIT)
+ {
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Commited));
+ }
+ else // joined transaction
+ {
+ MessageQueueAutoLock lock (recv_out_);
+ recv_out_.push (MessagePtr (recv_.release ()));
+ recv_ = RecvPtr ();
+ }
+ }
+
+ current_.status = Protocol::TS_COMMITED;
+
+ // if (trace_) cerr << "commited transaction with id "
+ // << current_.id << endl;
+ }
+ else // TS_ABORT
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Aborted));
+ }
+ else
+ {
+ // free revc_ buffer if necessary
+ //
+ if (recv_.get ()) recv_ = RecvPtr ();
+ }
+
+
+ current_.status = Protocol::TS_ABORTED;
+
+ // if (trace_) cerr << "aborted transaction with id "
+ // << current_.id << endl;
+ }
+
+ // start transaction separation frame (counts down)
+ // +1 because it will be decremented on this iteration
+ separation_duration_ = Protocol::SEPARATION_FRAME + 1;
+ }
+ }
+
+ // Set current outsync info
+
+ c.id = current_.id;
+ c.status = current_.status;
+
+
+ // Do some post-processing
+
+ switch (current_.status)
+ {
+ case Protocol::TS_COMMITED:
+ case Protocol::TS_ABORTED:
+ {
+ if (separation_duration_ > 0) --separation_duration_;
+ break;
+ }
+ case Protocol::TS_BEGIN:
+ {
+ // transfer payload
+
+ size = send_->size ();
+ memcpy (payload, send_->payload (), size);
+
+ send_ = SendPtr ();
+
+ // get redy to vote for 'commit'
+
+ current_.status = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+ }
+ }
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ Protocol::TransactionId& id = current_.id;
+ Protocol::TransactionStatus& s = current_.status;
+
+ if (id == 0 && t.id != 0) // catch up
+ {
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ case Protocol::TS_COMMIT:
+ case Protocol::TS_ABORT:
+ {
+ id = t.id - 1;
+ s = Protocol::TS_COMMITED;
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ id = t.id;
+ s = t.status;
+ break;
+ }
+ }
+
+ // if (trace_) cerr << "caught up with id " << id << endl;
+ }
+
+ bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED);
+
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ {
+ if (!stable || t.id != id + 1)
+ {
+ // Transaction is in progress or hole in transaction id's
+
+ // cerr << "unexpected request to join " << t
+ // << " while on " << current_ << endl;
+
+ // if (!stable) cerr << "voting progress is " << voting_duration_
+ // << "/" << Protocol::VOTING_FRAME << endl;
+
+ if (t.id == id) // collision
+ {
+ if (!stable && s != Protocol::TS_ABORT)
+ {
+ // abort both
+ // cerr << "aborting both transactions" << endl;
+
+ s = Protocol::TS_ABORT;
+ voting_duration_ = 0; //@@ reset voting frame
+ }
+ }
+ else
+ {
+ // @@ delicate case. need to think more
+
+ // cerr << "Declaring node failed." << endl;
+ throw Failure ();
+ }
+ }
+ else
+ {
+ // join the transaction
+
+ initiated_ = false;
+
+ recv_ = RecvPtr (new Recv (payload, size));
+
+ id = t.id;
+ s = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-commit transaction with id "
+ // << id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_COMMIT:
+ {
+ if (stable && id == t.id - 1)
+ {
+ // not begin and and we haven't joined
+
+ // join for abort
+
+ initiated_ = false;
+
+ current_.id = t.id;
+ current_.status = Protocol::TS_ABORT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-abort transaction with id "
+ // << current_.id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_ABORT:
+ {
+ if ((!stable && id == t.id && s == Protocol::TS_COMMIT) ||
+ (stable && id == t.id - 1)) // abort current || new transaction
+ {
+ // if (trace_) cerr << "voting-for-abort on transaction with id "
+ // << current_.id << endl;
+
+ id = t.id;
+ s = Protocol::TS_ABORT;
+
+ voting_duration_ = 0; //@@ reseting voting_duration_
+ }
+ else
+ {
+ }
+
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ // nothing for now
+ break;
+ }
+ }
+ }
+
+ void
+ api ()
+ {
+ if ((current_.status == Protocol::TS_COMMITED ||
+ current_.status == Protocol::TS_ABORTED) &&
+ separation_duration_ == 0) // no transaction in progress
+ {
+ // start new transaction
+
+ // Note that in_ is already locked by Scheduler
+
+ MessagePtr m (in_.front ());
+ in_.pop ();
+
+ if (typeid (*m) == typeid (Send))
+ {
+ send_ = SendPtr (dynamic_cast<Send*> (m.release ()));
+ }
+ else
+ {
+ // cerr << "Expecting Send but received " << typeid (*m).name ()
+ // << endl;
+
+ ::abort ();
+ }
+
+ current_.id++;
+ current_.status = Protocol::TS_BEGIN;
+
+ initiated_ = true;
+
+ // if (trace_) cerr << "starting transaction with id " << current_.id
+ // << endl;
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool trace_;
+
+ Protocol::Transaction current_;
+
+ bool initiated_;
+
+ unsigned short voting_duration_;
+ unsigned short separation_duration_;
+
+ MessageQueue& in_;
+ MessageQueue& send_out_;
+ MessageQueue& recv_out_;
+
+ SendPtr send_;
+ RecvPtr recv_;
+ };
+}