summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/TMCast
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/protocols/ace/TMCast')
-rw-r--r--ACE/protocols/ace/TMCast/ACE_TMCast.pc.in11
-rw-r--r--ACE/protocols/ace/TMCast/Export.hpp58
-rw-r--r--ACE/protocols/ace/TMCast/FaultDetector.hpp45
-rw-r--r--ACE/protocols/ace/TMCast/Group.cpp506
-rw-r--r--ACE/protocols/ace/TMCast/Group.hpp51
-rw-r--r--ACE/protocols/ace/TMCast/GroupFwd.hpp15
-rw-r--r--ACE/protocols/ace/TMCast/LinkListener.hpp171
-rw-r--r--ACE/protocols/ace/TMCast/MTQueue.cpp7
-rw-r--r--ACE/protocols/ace/TMCast/MTQueue.hpp176
-rw-r--r--ACE/protocols/ace/TMCast/Makefile.am76
-rw-r--r--ACE/protocols/ace/TMCast/Messaging.hpp54
-rw-r--r--ACE/protocols/ace/TMCast/Protocol.cpp31
-rw-r--r--ACE/protocols/ace/TMCast/Protocol.hpp107
-rw-r--r--ACE/protocols/ace/TMCast/README240
-rw-r--r--ACE/protocols/ace/TMCast/TMCast.mpc12
-rw-r--r--ACE/protocols/ace/TMCast/TransactionController.hpp388
16 files changed, 1948 insertions, 0 deletions
diff --git a/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in
new file mode 100644
index 00000000000..a56956f81b2
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in
@@ -0,0 +1,11 @@
+prefix=@prefix@
+exec_prefix=@exec_prefix@
+libdir=@libdir@
+includedir=@includedir@
+
+Name: ACE_TMCast
+Description: ACE Transaction Multicast Library
+Requires: ACE
+Version: @VERSION@
+Libs: -L${libdir} -lACE_TMCast
+Cflags: -I${includedir}
diff --git a/ACE/protocols/ace/TMCast/Export.hpp b/ACE/protocols/ace/TMCast/Export.hpp
new file mode 100644
index 00000000000..bf04f7ee114
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Export.hpp
@@ -0,0 +1,58 @@
+
+// -*- C++ -*-
+// $Id$
+// Definition for Win32 Export directives.
+// This file is generated automatically by generate_export_file.pl ACE_TMCast
+// ------------------------------
+#ifndef TMCAST_EXPORT_H
+#define TMCAST_EXPORT_H
+
+#include "ace/config-all.h"
+
+#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL)
+# define TMCAST_HAS_DLL 0
+#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */
+
+#if !defined (TMCAST_HAS_DLL)
+#define TMCAST_HAS_DLL 1
+#endif /* ! TMCAST_HAS_DLL */
+
+#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1)
+# if defined (TMCAST_BUILD_DLL)
+# define ACE_TMCast_Export ACE_Proper_Export_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# else /* TMCAST_BUILD_DLL */
+# define ACE_TMCast_Export ACE_Proper_Import_Flag
+# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* TMCAST_BUILD_DLL */
+#else /* TMCAST_HAS_DLL == 1 */
+# define ACE_TMCast_Export
+# define TMCAST_SINGLETON_DECLARATION(T)
+# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+#endif /* TMCAST_HAS_DLL == 1 */
+
+// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if
+// tracing is turned off for ACE.
+#if !defined (TMCAST_NTRACE)
+# if (ACE_NTRACE == 1)
+# define TMCAST_NTRACE 1
+# else /* (ACE_NTRACE == 1) */
+# define TMCAST_NTRACE 0
+# endif /* (ACE_NTRACE == 1) */
+#endif /* !TMCAST_NTRACE */
+
+#if (TMCAST_NTRACE == 1)
+# define TMCAST_TRACE(X)
+#else /* (TMCAST_NTRACE == 1) */
+# if !defined (ACE_HAS_TRACE)
+# define ACE_HAS_TRACE
+# endif /* ACE_HAS_TRACE */
+# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X)
+# include "ace/Trace.h"
+#endif /* (TMCAST_NTRACE == 1) */
+
+#endif /* TMCAST_EXPORT_H */
+
+// End of auto generated file.
diff --git a/ACE/protocols/ace/TMCast/FaultDetector.hpp b/ACE/protocols/ace/TMCast/FaultDetector.hpp
new file mode 100644
index 00000000000..49ffcdd174c
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/FaultDetector.hpp
@@ -0,0 +1,45 @@
+// file : ACE_TMCast/FaultDetector.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace ACE_TMCast
+{
+ class FaultDetector
+ {
+ public:
+ FaultDetector ()
+ : alone_ (true), silence_period_ (-1)
+ {
+ }
+
+ public:
+ class Failed {};
+
+
+ void
+ insync ()
+ {
+ if (alone_)
+ alone_ = false;
+
+ silence_period_ = 0;
+ }
+
+ void
+ outsync ()
+ {
+ if (!alone_ && ++silence_period_ >= Protocol::FATAL_SILENCE_FRAME)
+ {
+ // cerr << "Silence period has been passed." << endl;
+ // cerr << "Decalring the node failed." << endl;
+ throw Failed ();
+ }
+ }
+
+ private:
+ bool alone_; // true if we haven't heard from any members yet.
+ short silence_period_;
+ };
+}
diff --git a/ACE/protocols/ace/TMCast/Group.cpp b/ACE/protocols/ace/TMCast/Group.cpp
new file mode 100644
index 00000000000..1f2b2a60dfd
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Group.cpp
@@ -0,0 +1,506 @@
+// file : ACE_TMCast/Group.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Group.hpp"
+
+#include <typeinfo>
+
+// OS primitives
+#include <ace/OS.h>
+#include <ace/OS_NS_stdlib.h>
+#include <ace/Synch.h>
+#include <ace/Time_Value.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+
+#include "Messaging.hpp"
+
+#include "Protocol.hpp"
+
+// Components
+
+#include "LinkListener.hpp"
+#include "FaultDetector.hpp"
+#include "TransactionController.hpp"
+
+namespace ACE_TMCast
+{
+ bool
+ operator== (std::type_info const* pa, std::type_info const& b)
+ {
+ return *pa == b;
+ }
+
+ //
+ //
+ //
+ class Terminate : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Failure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class Scheduler
+ {
+ public:
+ Scheduler (ACE_INET_Addr const& addr,
+ char const* id,
+ MessageQueue& out_send_data,
+ MessageQueue& out_recv_data,
+ MessageQueue& out_control)
+
+ : cond_ (mutex_),
+
+ addr_ (addr),
+ sock_ (),
+
+ out_control_ (out_control),
+
+ in_data_ (mutex_),
+ in_link_data_(mutex_),
+ in_control_ (mutex_),
+
+ sync_schedule (ACE_OS::gettimeofday ()),
+
+ transaction_controller_ (in_data_, out_send_data, out_recv_data)
+ {
+ ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
+ id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';
+
+ sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded
+
+ in_data_.subscribe (cond_);
+ in_link_data_.subscribe (cond_);
+ in_control_.subscribe (cond_);
+
+ ACE_thread_t unused;
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &unused,
+ &thread_) != 0) ACE_OS::abort ();
+ }
+
+ virtual ~Scheduler ()
+ {
+ {
+ MessageQueueAutoLock lock (in_control_);
+
+ in_control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort ();
+
+ // cerr << "Scheduler is down." << endl;
+ }
+
+ public:
+ MessageQueue&
+ in_data ()
+ {
+ return in_data_;
+ }
+
+ private:
+ static ACE_THR_FUNC_RETURN
+ thread_thunk (void* arg)
+ {
+ Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ try
+ {
+ sock_.join (addr_);
+ auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));
+
+ {
+ AutoLock lock (mutex_);
+
+ // Loop
+ //
+ //
+
+ while (true)
+ {
+ cond_.wait (&sync_schedule);
+
+ // "Loop of Fairness"
+
+ bool done = false;
+
+ do
+ {
+ // control message
+ //
+ //
+ if (!in_control_.empty ())
+ {
+ done = true;
+ break;
+ }
+
+ // outsync
+ //
+ //
+ if (sync_schedule < ACE_OS::gettimeofday ())
+ {
+ // OUTSYNC
+
+ outsync ();
+
+ // schedule next outsync
+ sync_schedule =
+ ACE_OS::gettimeofday () +
+ ACE_Time_Value (0, Protocol::SYNC_PERIOD);
+ }
+
+ // link message
+ //
+ //
+ if (!in_link_data_.empty ())
+ {
+ MessagePtr m (in_link_data_.front ());
+ in_link_data_.pop ();
+
+ std::type_info const* exp = &typeid (*m);
+
+ if (exp == typeid (LinkFailure))
+ {
+ // cerr << "link failure" << endl;
+ throw false;
+ }
+ else if (exp == typeid (LinkData))
+ {
+
+ LinkData* data = dynamic_cast<LinkData*> (m.get ());
+
+ // INSYNC, TL, CT
+
+ // Filter out loopback.
+ //
+ if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
+ {
+ insync ();
+ transaction_list ();
+ current_transaction (data->header().current,
+ data->payload (),
+ data->size ());
+ }
+ }
+ else
+ {
+ // cerr << "unknown message type from link listener: "
+ // << typeid (*m).name () << endl;
+ ACE_OS::abort ();
+ }
+ }
+
+ // api message
+ //
+ //
+ if (!in_data_.empty ())
+ {
+ // API
+
+ api ();
+ }
+
+ } while (!in_link_data_.empty() ||
+ sync_schedule < ACE_OS::gettimeofday ());
+
+ if (done) break;
+ }
+ }
+ }
+ catch (...)
+ {
+ // cerr << "Exception in scheduler loop." << endl;
+ MessageQueueAutoLock lock (out_control_);
+ out_control_.push (MessagePtr (new Failure));
+ }
+ }
+
+ // Events
+ //
+ // Order:
+ //
+ // INSYNC, TSL, VOTE, BEGIN
+ // API
+ // OUTSYNC
+ //
+
+ void
+ insync ()
+ {
+ fault_detector_.insync ();
+ }
+
+ void
+ outsync ()
+ {
+ char buf[Protocol::MAX_MESSAGE_SIZE];
+
+ Protocol::MessageHeader* hdr =
+ reinterpret_cast<Protocol::MessageHeader*> (buf);
+
+ void* data = buf + sizeof (Protocol::MessageHeader);
+
+ hdr->length = sizeof (Protocol::MessageHeader);
+ hdr->check_sum = 0;
+
+ ACE_OS::strcpy (hdr->member_id.id, id_);
+
+ size_t size (0);
+
+ transaction_controller_.outsync (hdr->current, data, size);
+
+ hdr->length += size;
+
+ fault_detector_.outsync ();
+
+ // sock_.send (buf, hdr->length, addr_);
+ sock_.send (buf, hdr->length);
+ }
+
+ void
+ transaction_list ()
+ {
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ transaction_controller_.current_transaction (t, payload, size);
+ }
+
+ void
+ api ()
+ {
+ transaction_controller_.api ();
+ }
+
+ private:
+ ACE_hthread_t thread_;
+
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ char id_[Protocol::MEMBER_ID_LENGTH];
+
+ ACE_INET_Addr addr_;
+ ACE_SOCK_Dgram_Mcast sock_;
+
+ MessageQueue& out_control_;
+
+ MessageQueue in_data_;
+ MessageQueue in_link_data_;
+ MessageQueue in_control_;
+
+ // Protocol state
+ //
+ //
+
+ ACE_Time_Value sync_schedule;
+
+ FaultDetector fault_detector_;
+ TransactionController transaction_controller_;
+ };
+
+
+ //
+ //
+ //
+ class Group::GroupImpl
+ {
+ public:
+ virtual ~GroupImpl ()
+ {
+ }
+
+ GroupImpl (ACE_INET_Addr const& addr, char const* id)
+ throw (Group::Failed)
+ : send_cond_ (mutex_),
+ recv_cond_ (mutex_),
+ failed_ (false),
+ in_send_data_ (mutex_),
+ in_recv_data_ (mutex_),
+ in_control_ (mutex_),
+ scheduler_ (new Scheduler (addr,
+ id,
+ in_send_data_,
+ in_recv_data_,
+ in_control_)),
+ out_data_ (scheduler_->in_data ())
+ {
+ in_send_data_.subscribe (send_cond_);
+ in_recv_data_.subscribe (recv_cond_);
+
+ in_control_.subscribe (send_cond_);
+ in_control_.subscribe (recv_cond_);
+ }
+
+ void
+ send (void const* msg, size_t size)
+ throw (Group::InvalidArg, Group::Failed, Group::Aborted)
+ {
+ if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
+
+ // Note the potential deadlock if I lock mutex_ and out_data_ in
+ // reverse order.
+
+ MessageQueueAutoLock l1 (out_data_);
+ AutoLock l2 (mutex_);
+
+ throw_if_failed ();
+
+ out_data_.push (MessagePtr (new Send (msg, size)));
+
+ l1.unlock (); // no need to keep it locked
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_send_data_.empty ())
+ {
+ MessagePtr m (in_send_data_.front ());
+ in_send_data_.pop ();
+
+ std::type_info const* exp = &typeid (*m);
+
+ if (exp == typeid (ACE_TMCast::Aborted))
+ {
+ throw Group::Aborted ();
+ }
+ else if (exp == typeid (Commited))
+ {
+ return;
+ }
+ else
+ {
+ // cerr << "send: group-scheduler messaging protocol violation; "
+ // << "unexpected message " << typeid (*m).name ()
+ // << " " << typeid (Aborted).name () << endl;
+
+ ACE_OS::abort ();
+ }
+ }
+
+ // cerr << "send: waiting on condition" << endl;
+ send_cond_.wait ();
+ // cerr << "send: wokeup on condition" << endl;
+ }
+ }
+
+
+
+ size_t
+ recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
+ {
+ AutoLock lock (mutex_);
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_recv_data_.empty ())
+ {
+ MessagePtr m (in_recv_data_.front ());
+ in_recv_data_.pop ();
+
+ std::type_info const* exp = &typeid (*m);
+
+ if (exp == typeid (Recv))
+ {
+ Recv* data = dynamic_cast<Recv*> (m.get ());
+
+ if (size < data->size ()) throw Group::InsufficienSpace ();
+
+ memcpy (msg, data->payload (), data->size ());
+
+ return data->size ();
+ }
+ else
+ {
+ // cerr << "recv: group-scheduler messaging protocol violation. "
+ // << "unexpected message " << typeid (*m).name () << endl;
+
+ ACE_OS::abort ();
+ }
+ }
+
+ recv_cond_.wait ();
+ }
+ }
+
+ private:
+ void
+ throw_if_failed ()
+ {
+ if (!failed_ && !in_control_.empty ()) failed_ = true;
+
+ if (failed_) throw Group::Failed ();
+ }
+
+ private:
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> send_cond_;
+ ACE_Condition<ACE_Thread_Mutex> recv_cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool failed_;
+
+ MessageQueue in_send_data_;
+ MessageQueue in_recv_data_;
+ MessageQueue in_control_;
+
+ auto_ptr<Scheduler> scheduler_;
+
+ MessageQueue& out_data_;
+ };
+
+
+ // Group
+ //
+ //
+ Group::
+ Group (ACE_INET_Addr const& addr, char const* id)
+ throw (Group::Failed)
+ : pimpl_ (new GroupImpl (addr, id))
+ {
+ }
+
+ Group::
+ ~Group ()
+ {
+ }
+
+ void
+ Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted)
+ {
+ pimpl_->send (msg, size);
+ }
+
+ size_t
+ Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
+ {
+ return pimpl_->recv (msg, size);
+ }
+}
+
diff --git a/ACE/protocols/ace/TMCast/Group.hpp b/ACE/protocols/ace/TMCast/Group.hpp
new file mode 100644
index 00000000000..13c49f210bb
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Group.hpp
@@ -0,0 +1,51 @@
+// file : ACE_TMCast/Group.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_HPP
+#define TMCAST_GROUP_HPP
+
+#include <ace/Auto_Ptr.h>
+#include <ace/INET_Addr.h>
+
+#include "Export.hpp"
+
+namespace ACE_TMCast
+{
+ class ACE_TMCast_Export Group
+ {
+ public:
+ class Aborted {};
+ class Failed {};
+ class InvalidArg {};
+ class InsufficienSpace {};
+
+ public:
+ ~Group ();
+
+ Group (ACE_INET_Addr const& addr, char const* id) throw (Failed);
+
+ public:
+ void
+ send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted);
+
+ size_t
+ recv (void* msg, size_t size) throw (Failed, InsufficienSpace);
+
+ private:
+ bool
+ failed ();
+
+ private:
+ class GroupImpl;
+ auto_ptr<GroupImpl> pimpl_;
+
+ private:
+ Group (Group const&);
+
+ Group&
+ operator= (Group const&);
+ };
+}
+
+#endif // TMCAST_GROUP_HPP
diff --git a/ACE/protocols/ace/TMCast/GroupFwd.hpp b/ACE/protocols/ace/TMCast/GroupFwd.hpp
new file mode 100644
index 00000000000..b4ed7304ff7
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/GroupFwd.hpp
@@ -0,0 +1,15 @@
+// file : ACE_TMCast/GroupFwd.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_GROUP_FWD_HPP
+#define TMCAST_GROUP_FWD_HPP
+
+#include "Export.hpp"
+
+namespace ACE_TMCast
+{
+ class ACE_TMCast_Export Group;
+}
+
+#endif // TMCAST_GROUP_FWD_HPP
diff --git a/ACE/protocols/ace/TMCast/LinkListener.hpp b/ACE/protocols/ace/TMCast/LinkListener.hpp
new file mode 100644
index 00000000000..983f7828f3e
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/LinkListener.hpp
@@ -0,0 +1,171 @@
+// file : ACE_TMCast/LinkListener.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+// OS primitives
+#include <ace/OS_NS_string.h>
+#include <ace/OS_NS_stdlib.h>
+#include <ace/Synch.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+
+#include "Messaging.hpp"
+#include "Protocol.hpp"
+
+namespace ACE_TMCast
+{
+ //
+ //
+ //
+ class LinkFailure : public virtual Message {};
+
+
+ //
+ //
+ //
+ class LinkData : public virtual Message
+ {
+ public:
+ LinkData (Protocol::MessageHeader const* header,
+ void* payload,
+ size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader));
+ ACE_OS::memcpy (payload_, payload, size_);
+ }
+
+ Protocol::MessageHeader const&
+ header () const
+ {
+ return header_;
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ Protocol::MessageHeader header_;
+ char payload_[Protocol::MAX_MESSAGE_SIZE];
+ size_t size_;
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex>
+ LinkDataPtr;
+
+ //
+ //
+ //
+ class LinkListener
+ {
+ private:
+ class Terminate : public virtual Message {};
+
+ public:
+ LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out)
+ : sock_(sock), out_ (out)
+ {
+ ACE_thread_t unused;
+ if (ACE_OS::thr_create (&thread_thunk,
+ this,
+ THR_JOINABLE,
+ &unused,
+ &thread_) != 0) ACE_OS::abort ();
+ }
+
+ ~LinkListener ()
+ {
+ {
+ MessageQueueAutoLock lock (control_);
+
+ control_.push (MessagePtr (new Terminate));
+ }
+
+ if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort ();
+
+ // cerr << "Link listener is down." << endl;
+ }
+
+
+ static ACE_THR_FUNC_RETURN
+ thread_thunk (void* arg)
+ {
+ LinkListener* obj = reinterpret_cast<LinkListener*> (arg);
+
+ obj->execute ();
+ return 0;
+ }
+
+ void
+ execute ()
+ {
+ char msg[Protocol::MAX_MESSAGE_SIZE];
+
+ ssize_t header_size = sizeof (Protocol::MessageHeader);
+
+ // OS::Time timeout (1000000); // one millisecond
+
+ ACE_Time_Value timeout (0, 1000); // one millisecond
+
+ try
+ {
+ while (true)
+ {
+ // Check control message queue
+
+ {
+ MessageQueueAutoLock lock (control_);
+
+ if (!control_.empty ()) break;
+ }
+
+ ACE_INET_Addr junk;
+ ssize_t n = sock_.recv (msg,
+ Protocol::MAX_MESSAGE_SIZE,
+ junk,
+ 0,
+ &timeout);
+
+ if (n != -1)
+ {
+ if (n < header_size) throw false;
+
+ Protocol::MessageHeader* header =
+ reinterpret_cast<Protocol::MessageHeader*> (msg);
+
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkData (header,
+ msg + header_size,
+ n - header_size)));
+ }
+ }
+ }
+ catch (...)
+ {
+ MessageQueueAutoLock lock (out_);
+
+ out_.push (MessagePtr (new LinkFailure));
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ ACE_hthread_t thread_;
+ ACE_SOCK_Dgram_Mcast& sock_;
+ MessageQueue& out_;
+ MessageQueue control_;
+ };
+}
diff --git a/ACE/protocols/ace/TMCast/MTQueue.cpp b/ACE/protocols/ace/TMCast/MTQueue.cpp
new file mode 100644
index 00000000000..9f08db5843d
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/MTQueue.cpp
@@ -0,0 +1,7 @@
+// file : ACE_TMCast/MTQueue.cpp
+// author : Steve Huston <shuston@riverace.com>
+// cvs-id : $Id$
+
+#include "LinkListener.hpp"
+#include "MTQueue.hpp"
+
diff --git a/ACE/protocols/ace/TMCast/MTQueue.hpp b/ACE/protocols/ace/TMCast/MTQueue.hpp
new file mode 100644
index 00000000000..2eb128823fe
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/MTQueue.hpp
@@ -0,0 +1,176 @@
+// file : ACE_TMCast/MTQueue.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MT_QUEUE_HPP
+#define TMCAST_MT_QUEUE_HPP
+
+#include "ace/Auto_Ptr.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Unbounded_Queue.h"
+#include "ace/os_include/sys/os_types.h"
+
+namespace ACE_TMCast
+{
+ template <typename T,
+ typename M,
+ typename C,
+ typename Q = ACE_Unbounded_Queue<T> >
+ class MTQueue
+ {
+ public:
+ typedef T ElementType;
+ typedef M MutexType;
+ typedef C ConditionalType;
+ typedef Q QueueType;
+
+ public:
+
+ MTQueue ()
+ : mutexp_ (new MutexType),
+ mutex_ (*mutexp_),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ MTQueue (MutexType& mutex)
+ : mutexp_ (),
+ mutex_ (mutex),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ public:
+ bool
+ empty () const
+ {
+ return queue_.is_empty ();
+ }
+
+ size_t
+ size () const
+ {
+ return queue_.size ();
+ }
+
+ // typedef typename QueueType::Empty Empty;
+
+ class Empty {};
+
+ T&
+ front ()
+ {
+ ACE_Unbounded_Queue_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+
+ T const&
+ front () const
+ {
+ ACE_Unbounded_Queue_Const_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+ /*
+ T&
+ back ()
+ {
+ return queue_.back ();
+ }
+
+
+ T const&
+ back () const
+ {
+ return queue_.back ();
+ }
+ */
+
+ void
+ push (T const& t)
+ {
+ signal_ = empty ();
+ queue_.enqueue_tail (t);
+ }
+
+ void
+ pop ()
+ {
+ T junk;
+ queue_.dequeue_head (junk);
+ }
+
+ public:
+ void
+ lock () const
+ {
+ mutex_.acquire ();
+ }
+
+ void
+ unlock () const
+ {
+ if (signal_)
+ {
+ signal_ = false;
+
+ for (ConditionalSetConstIterator_ i (cond_set_);
+ !i.done ();
+ i.advance ())
+ {
+ ConditionalType** c = 0;
+
+ i.next (c);
+
+ (*c)->signal ();
+ }
+ }
+
+ mutex_.release ();
+ }
+
+ void
+ subscribe (ConditionalType& c)
+ {
+ //@@ should check for duplicates
+ //
+ cond_set_.insert (&c);
+ }
+
+ void
+ unsubscribe (ConditionalType& c)
+ {
+ //@@ should check for absence
+ //
+ cond_set_.remove (&c);
+ }
+
+ private:
+ auto_ptr<MutexType> mutexp_;
+ MutexType& mutex_;
+ QueueType queue_;
+
+ typedef
+ ACE_Unbounded_Set<ConditionalType*>
+ ConditionalSet_;
+
+ typedef
+ ACE_Unbounded_Set_Const_Iterator<ConditionalType*>
+ ConditionalSetConstIterator_;
+
+ ConditionalSet_ cond_set_;
+
+ mutable bool signal_;
+ };
+}
+
+#endif // TMCAST_MT_QUEUE_HPP
diff --git a/ACE/protocols/ace/TMCast/Makefile.am b/ACE/protocols/ace/TMCast/Makefile.am
new file mode 100644
index 00000000000..13f369142f4
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Makefile.am
@@ -0,0 +1,76 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+includedir = @includedir@/protocols/ace/TMCast
+pkgconfigdir = @libdir@/pkgconfig
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+
+## Makefile.TMCast.am
+
+if BUILD_EXCEPTIONS
+if BUILD_THREADS
+if !BUILD_ACE_FOR_TAO
+
+lib_LTLIBRARIES = libACE_TMCast.la
+
+libACE_TMCast_la_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR) \
+ -DTMCAST_BUILD_DLL
+
+libACE_TMCast_la_SOURCES = \
+ Group.cpp \
+ MTQueue.cpp \
+ Protocol.cpp
+
+libACE_TMCast_la_LDFLAGS = \
+ -release @ACE_VERSION_NAME@
+
+libACE_TMCast_la_LIBADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+nobase_include_HEADERS = \
+ Export.hpp \
+ FaultDetector.hpp \
+ Group.hpp \
+ GroupFwd.hpp \
+ LinkListener.hpp \
+ MTQueue.hpp \
+ Messaging.hpp \
+ Protocol.hpp \
+ TransactionController.hpp
+
+pkgconfig_DATA = \
+ ACE_TMCast.pc
+
+CLEANFILES = \
+ ACE_TMCast.pc
+
+ACE_TMCast.pc: ${top_builddir}/config.status ${srcdir}/ACE_TMCast.pc.in
+ ${top_builddir}/config.status --file "$@":${srcdir}/ACE_TMCast.pc.in
+
+endif !BUILD_ACE_FOR_TAO
+endif BUILD_THREADS
+endif BUILD_EXCEPTIONS
+
+EXTRA_DIST = \
+ ACE_TMCast.pc.in
+
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/protocols/ace/TMCast/Messaging.hpp b/ACE/protocols/ace/TMCast/Messaging.hpp
new file mode 100644
index 00000000000..886745d1120
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Messaging.hpp
@@ -0,0 +1,54 @@
+// file : ACE_TMCast/Messaging.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_MESSAGING_HPP
+#define TMCAST_MESSAGING_HPP
+
+#include <ace/Synch.h>
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include "MTQueue.hpp"
+
+namespace ACE_TMCast
+{
+ class Message
+ {
+ public:
+ virtual
+ ~Message () {}
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex>
+ MessagePtr;
+
+ typedef
+ MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> >
+ MessageQueue;
+
+ struct MessageQueueAutoLock
+ {
+ MessageQueueAutoLock (MessageQueue& q)
+ : q_ (q)
+ {
+ q_.lock ();
+ }
+
+ void
+ unlock ()
+ {
+ q_.unlock ();
+ }
+
+ ~MessageQueueAutoLock ()
+ {
+ q_.unlock ();
+ }
+
+ private:
+ MessageQueue& q_;
+ };
+}
+
+#endif // TMCAST_MESSAGING_HPP
diff --git a/ACE/protocols/ace/TMCast/Protocol.cpp b/ACE/protocols/ace/TMCast/Protocol.cpp
new file mode 100644
index 00000000000..ea4c6b39020
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Protocol.cpp
@@ -0,0 +1,31 @@
+// file : ACE_TMCast/Protocol.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Protocol.hpp"
+
+namespace ACE_TMCast
+{
+ namespace Protocol
+ {
+ /*
+ namespace
+ {
+ char const* labels[] = {
+ "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"};
+ }
+
+ std::string
+ tslabel (Protocol::TransactionStatus s)
+ {
+ return labels[s];
+ }
+
+ std::ostream&
+ operator << (std::ostream& o, Transaction const& t)
+ {
+ return o << "{" << t.id << "; " << tslabel (t.status) << "}";
+ }
+ */
+ }
+}
diff --git a/ACE/protocols/ace/TMCast/Protocol.hpp b/ACE/protocols/ace/TMCast/Protocol.hpp
new file mode 100644
index 00000000000..6cdf374f4f9
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Protocol.hpp
@@ -0,0 +1,107 @@
+// file : ACE_TMCast/Protocol.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#ifndef TMCAST_PROTOCOL_HPP
+#define TMCAST_PROTOCOL_HPP
+
+namespace ACE_TMCast
+{
+ namespace Protocol
+ {
+ //
+ //
+ //
+ unsigned long const MEMBER_ID_LENGTH = 38;
+
+ struct MemberId
+ {
+ char id[MEMBER_ID_LENGTH];
+ /*
+ unsigned long ip;
+ unsigned short port;
+ */
+ };
+
+ //
+ //
+ //
+ typedef unsigned short TransactionId;
+
+
+
+ typedef unsigned char TransactionStatus;
+
+ TransactionStatus const TS_BEGIN = 1;
+ TransactionStatus const TS_COMMIT = 2;
+ TransactionStatus const TS_ABORT = 3;
+ TransactionStatus const TS_COMMITED = 4;
+ TransactionStatus const TS_ABORTED = 5;
+
+ struct Transaction
+ {
+ TransactionId id;
+ TransactionStatus status;
+ };
+
+ // Transaction List (TL)
+
+ // unsigned long const TL_LENGTH = 1;
+
+ // typedef Transaction TransactionList[TL_LENGTH];
+
+
+ //
+ //
+ //
+ struct MessageHeader
+ {
+ unsigned long length;
+
+ unsigned long check_sum;
+
+ MemberId member_id;
+
+ Transaction current;
+
+ //TransactionList transaction_list;
+ };
+
+
+ //
+ //
+ //
+
+ unsigned long const MAX_MESSAGE_SIZE = 768;
+
+ unsigned long const
+ MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader);
+
+ // Protocol timing
+ //
+ //
+
+ unsigned long const SYNC_PERIOD = 30000; // in mks
+
+ unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's
+ unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's
+
+ // FATAL_SILENCE_FRAME in SYNC_PERIOD's
+ // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME
+ //
+
+ short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2;
+
+ // short const FATAL_SILENCE_FRAME = 10000;
+
+ // Helpers
+
+ // std::string
+ // tslabel (Protocol::TransactionStatus s);
+
+ // std::ostream&
+ // operator << (std::ostream& o, Transaction const& t);
+ }
+}
+
+#endif // TMCAST_PROTOCOL_HPP
diff --git a/ACE/protocols/ace/TMCast/README b/ACE/protocols/ace/TMCast/README
new file mode 100644
index 00000000000..7104be46e30
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/README
@@ -0,0 +1,240 @@
+
+
+Introduction
+------------
+
+TMCast (stands for Transaction MultiCast) is an implementation of a
+transactional multicast protocol. In essence, the idea is to represent
+each message delivery to members of a multicast group as a transaction
+- an atomic, consistent and isolated action. A multicast transaction
+can be viewed as an atomic transition of the group members to a new
+state. If we define [Mo] as a set of operational (non-faulty) members
+of the group, [Mf] as a set of faulty members of the group, [Ma] as a
+set of members that view transition [Tn] as aborted and [Mc] as a set
+of members that view transition [Tn] as committed, then this atomic
+transition [Tn] should satisfy one of the following equations:
+
+Mo(Tn-1) = Ma(T) U Mf(T)
+Mo(Tn-1) = Mc(T) U Mf(T)
+
+Or, in other words, after transaction T has been committed (aborted),
+all operational (before transaction T) members are either in the
+committed (aborted) or failed state.
+
+Thus, for each member of the group, outcome of the transaction can be
+commit, abort or a member failure. It is important for a member to
+exhibit a failfast (error latency is less than transaction cycle)
+behavior. Or, in other words, if a member transitioned into a wrong
+state, it is guaranteed to fail instead of delivering a wrong result.
+
+In order to achieve such an error detection in a decentralized
+environment, certain limitations were imposed. One of the most
+user-visible limitation is the fact that the lifetime of the group
+with only one member is very short. This is because there is not way
+for a member to distinguish "no members yet" case from "my link to the
+group is down". In such a situation, the member assumes the latter
+case. There is also a military saying that puts it quite nicely: two
+is one, one is nothing.
+
+
+
+State of Implementation
+-----------------------
+
+The current implementation is in a prototypical stage. The following
+parts are not implemented or still under development:
+
+* Handling of network partitioning (TODO)
+
+* Redundant network support (TODO)
+
+* Member failure detection (partial implementation)
+
+
+Examples
+--------
+
+There is a simple example available in examples/TMCast/Member with
+the corresponding README.
+
+
+Architecture
+------------
+
+Primary goals of the protocol are to (1) mask transient failures of the
+underlying multicast protocol (or, more precisely, allow to recover
+from transient failures) and (2) exhibit failfast behavior in cases of
+permanent failures.
+
+The distinction between transient and permanent failures is based on
+timeouts thus what can be a transient failure in one configuration of
+the protocol could be a permanent failure in the other.
+
+[Maybe talk more about a transient/permanent threshold and its effect
+on performance/resource utilization/etc.]
+
+[Maybe add a terminology section.]
+
+Each member of a multicast group has its unique (group-wise) id:
+
+struct MemberId
+{
+ char id[MEMBER_ID_LENGTH];
+};
+
+Each payload delivery is part of a transaction. Each transaction is
+identified by TransactionId:
+
+typedef unsigned short TransactionId;
+
+
+Each transaction has a status code which identifies its state, as viewed by
+a group member:
+
+
+typedef unsigned char TransactionStatus;
+
+TransactionStatus const TS_BEGIN = 1;
+TransactionStatus const TS_COMMIT = 2;
+TransactionStatus const TS_ABORT = 3;
+TransactionStatus const TS_COMMITTED = 4;
+TransactionStatus const TS_ABORTED = 5;
+
+Thus each transaction is described by its id and status:
+
+struct Transaction
+{
+ TransactionId id;
+ TransactionStatus status;
+};
+
+The outcome of some predefined number of recent transactions is stored
+in TransactionList:
+
+typedef Transaction TransactionList[TL_LENGTH];
+
+
+Each message sent to a multicast group has the following header:
+
+struct MessageHeader
+{
+ unsigned long length;
+ unsigned long check_sum;
+ MemberId member_id;
+ Transaction current;
+ TransactionList transaction_list;
+};
+
+[Maybe describe each field here.]
+
+A new member joins the group with transaction id 0 and status
+TS_COMMITTED.
+
+Each member sends a periodic 'pulse' messages with some predefined interval
+advertising its current view of the group. This includes the state of the
+current transaction and the history of the recent transactions.
+
+
+If a member of the group needs a payload delivery it starts a new
+transaction by sending a message with current transaction set to
+
+{++current_id, TS_BEGIN}
+
+and payload appended after the header.
+
+
+Each member joins a transaction in one of the following ways:
+
+* A member that began the transaction joins it 'to commit' (TS_COMMIT)
+
+* A member that received TS_BEGIN joins current transaction 'to commit'
+ (TS_COMMIT).
+
+* A member that received TS_COMMIT or TS_ABORT but did not receive TS_BEGIN
+ joins current transaction 'to abort' (TS_ABORT).
+
+
+After a member has joined the transaction it starts participating in the
+transaction's voting phase. On this phase members of the group decide the
+fate of the transaction. Each member sends a predefined number of messages
+where it announces its vote. In between those messages the member is receiving
+and processing votes from other members and can be influenced by their
+'opinion'.
+
+In their decision-making members follow the principle of the majority. As
+the voting progresses (and comes close to an end) members become more and
+more reluctant to deviate from the decision of the majority.
+
+[Maybe add an equation that measures member's willingness to change
+its mind.]
+
+At the end of the voting phase each member declares the current transaction
+either committed (TS_COMMITTED) or aborted (TS_ABORTED). If this decision does
+not agree with the majority the member declares itself failed.
+
+In addition, each member builds a 'majority view' of the transaction history
+(based on transaction_list). If it deviates from the member's own history the
+member declares itself failed.
+
+Here are some example scenarios of how the protocol behaves in different
+situations. Let's say we have three members of the group S, R1, R2. S
+initiates a transaction. R1 and R2 join it.
+
+Scenario 1. (two-step voting)
+
+1. S initiates a transaction (TS_BEGIN)
+2a. R1 receives TS_BEGIN, joins for commit
+2b. R2 receives TS_BEGIN, joins for commit
+3a. S announces TS_COMMIT (first vote)
+3b. R1 announces TS_COMMIT (first vote)
+3c. R2 announces TS_COMMIT (first vote)
+4a. S announces TS_COMMIT (second vote)
+4b. R1 announces TS_COMMIT (second vote)
+4c. R2 announces TS_COMMIT (second vote)
+5a. S announces TS_COMMITTED (end of vote)
+5b. R1 announces TS_COMMITTED (end of vote)
+5c. R2 announces TS_COMMITTED (end of vote)
+
+
+Scenario 2. (two-step voting)
+
+1. S initiates a transaction (TS_BEGIN)
+2a. R1 receives TS_BEGIN, joins for commit
+2b. R2 didn't receive TS_BEGIN
+3a. S announces TS_COMMIT (first vote)
+3b. R1 announces TS_COMMIT (first vote)
+3c. R2 received R1's TS_COMMIT announces TS_ABORT (first vote)
+4a. S received R2's TS_ABORT announces TS_ABORT (second vote)
+4b. R1 received R2's TS_ABORT announces TS_ABORT (second vote)
+4c. R2 announces TS_ABORT (second vote)
+5a. S announces TS_ABORTED (end of vote)
+5b. R1 announces TS_ABORTED (end of vote)
+5c. R2 announces TS_ABORTED (end of vote)
+
+
+Scenario 3. (three-step voting)
+
+1. S initiates a transaction (TS_BEGIN)
+2a. R1 receives TS_BEGIN, joins for commit
+2b. R2 didn't receive TS_BEGIN
+3a. S announces TS_COMMIT (first vote)
+3b. R1 announces TS_COMMIT (first vote)
+3c. R2 still didn't receive anything
+4a. S announces TS_COMMIT (second vote)
+4b. R1 announces TS_COMMIT (second vote)
+4c. R2 received R1's TS_COMMIT, announces TS_ABORT (first vote)
+
+5a. S received R2's TS_ABORT but it is the end of the voting phase and
+ majority (S and R1) vote for commit, announces TS_COMMIT (third vote)
+5b. R1 received R2's TS_ABORT but it is the end of the voting phase and
+ majority (S and R1) vote for commit, announces TS_COMMIT (third vote)
+5c. R2 announces TS_ABORT (second vote)
+
+6a. S announces TS_COMMITTED (end of vote)
+6b. R1 announces TS_COMMITTED (end of vote)
+6c. R2 discovers that the the majority has declared current transaction
+ committed and thus declares itself failed.
+
+
+--
+Boris Kolpackov <boris@dre.vanderbilt.edu>
diff --git a/ACE/protocols/ace/TMCast/TMCast.mpc b/ACE/protocols/ace/TMCast/TMCast.mpc
new file mode 100644
index 00000000000..1ff937a0a1a
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/TMCast.mpc
@@ -0,0 +1,12 @@
+// -*- MPC -*-
+// $Id$
+
+project : acelib, core, exceptions, threads {
+ avoids = ace_for_tao
+ sharedname = ACE_TMCast
+ dynamicflags += TMCAST_BUILD_DLL
+
+ Pkgconfig_Files {
+ ACE_TMCast.pc.in
+ }
+}
diff --git a/ACE/protocols/ace/TMCast/TransactionController.hpp b/ACE/protocols/ace/TMCast/TransactionController.hpp
new file mode 100644
index 00000000000..6b0d4281655
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/TransactionController.hpp
@@ -0,0 +1,388 @@
+// file : ACE_TMCast/TransactionController.hpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_stdlib.h"
+#include "ace/Synch.h"
+#include "ace/Refcounted_Auto_Ptr.h"
+
+#include "Protocol.hpp"
+#include "Messaging.hpp"
+
+#include <typeinfo>
+
+namespace ACE_TMCast
+{
+
+ // Messages
+ //
+ //
+ class Send : public virtual Message
+ {
+ public:
+ Send (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex>
+ SendPtr;
+
+
+ class Recv : public virtual Message
+ {
+ public:
+ Recv (void const* msg, size_t size)
+ : size_ (size)
+ {
+ ACE_OS::memcpy (payload_, msg, size_);
+ }
+
+ void const*
+ payload () const
+ {
+ return payload_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ private:
+ size_t size_;
+ char payload_[Protocol::MAX_PAYLOAD_SIZE];
+ };
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex>
+ RecvPtr;
+
+ class Aborted : public virtual Message {};
+
+ class Commited : public virtual Message {};
+
+
+ //
+ //
+ //
+ class TransactionController
+ {
+ public:
+ TransactionController (MessageQueue& in,
+ MessageQueue& send_out,
+ MessageQueue& recv_out)
+ : trace_ (false),
+ voting_duration_ (0),
+ separation_duration_ (0),
+ in_ (in),
+ send_out_ (send_out),
+ recv_out_ (recv_out)
+ {
+ current_.id = 0;
+ current_.status = Protocol::TS_COMMITED;
+ }
+
+ public:
+ class Failure {};
+
+
+ void
+ outsync (Protocol::Transaction& c, void* payload, size_t& size)
+ {
+ if (current_.status == Protocol::TS_COMMIT ||
+ current_.status == Protocol::TS_ABORT)
+ {
+ if (++voting_duration_ >= Protocol::VOTING_FRAME)
+ {
+ // end of voting frame
+
+ if (current_.status == Protocol::TS_COMMIT)
+ {
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Commited));
+ }
+ else // joined transaction
+ {
+ MessageQueueAutoLock lock (recv_out_);
+ recv_out_.push (MessagePtr (recv_.release ()));
+ recv_ = RecvPtr ();
+ }
+ }
+
+ current_.status = Protocol::TS_COMMITED;
+
+ // if (trace_) cerr << "commited transaction with id "
+ // << current_.id << endl;
+ }
+ else // TS_ABORT
+ {
+ if (initiated_)
+ {
+ MessageQueueAutoLock lock (send_out_);
+ send_out_.push (MessagePtr (new Aborted));
+ }
+ else
+ {
+ // free revc_ buffer if necessary
+ //
+ if (recv_.get ()) recv_ = RecvPtr ();
+ }
+
+
+ current_.status = Protocol::TS_ABORTED;
+
+ // if (trace_) cerr << "aborted transaction with id "
+ // << current_.id << endl;
+ }
+
+ // start transaction separation frame (counts down)
+ // +1 because it will be decremented on this iteration
+ separation_duration_ = Protocol::SEPARATION_FRAME + 1;
+ }
+ }
+
+ // Set current outsync info
+
+ c.id = current_.id;
+ c.status = current_.status;
+
+
+ // Do some post-processing
+
+ switch (current_.status)
+ {
+ case Protocol::TS_COMMITED:
+ case Protocol::TS_ABORTED:
+ {
+ if (separation_duration_ > 0) --separation_duration_;
+ break;
+ }
+ case Protocol::TS_BEGIN:
+ {
+ // transfer payload
+
+ size = send_->size ();
+ memcpy (payload, send_->payload (), size);
+
+ send_ = SendPtr ();
+
+ // get redy to vote for 'commit'
+
+ current_.status = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+ }
+ }
+ }
+
+ void
+ current_transaction (Protocol::Transaction const& t,
+ void const* payload,
+ size_t size)
+ {
+ Protocol::TransactionId& id = current_.id;
+ Protocol::TransactionStatus& s = current_.status;
+
+ if (id == 0 && t.id != 0) // catch up
+ {
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ case Protocol::TS_COMMIT:
+ case Protocol::TS_ABORT:
+ {
+ id = t.id - 1;
+ s = Protocol::TS_COMMITED;
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ id = t.id;
+ s = t.status;
+ break;
+ }
+ }
+
+ // if (trace_) cerr << "caught up with id " << id << endl;
+ }
+
+ bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED);
+
+ switch (t.status)
+ {
+ case Protocol::TS_BEGIN:
+ {
+ if (!stable || t.id != id + 1)
+ {
+ // Transaction is in progress or hole in transaction id's
+
+ // cerr << "unexpected request to join " << t
+ // << " while on " << current_ << endl;
+
+ // if (!stable) cerr << "voting progress is " << voting_duration_
+ // << "/" << Protocol::VOTING_FRAME << endl;
+
+ if (t.id == id) // collision
+ {
+ if (!stable && s != Protocol::TS_ABORT)
+ {
+ // abort both
+ // cerr << "aborting both transactions" << endl;
+
+ s = Protocol::TS_ABORT;
+ voting_duration_ = 0; //@@ reset voting frame
+ }
+ }
+ else
+ {
+ // @@ delicate case. need to think more
+
+ // cerr << "Declaring node failed." << endl;
+ throw Failure ();
+ }
+ }
+ else
+ {
+ // join the transaction
+
+ initiated_ = false;
+
+ recv_ = RecvPtr (new Recv (payload, size));
+
+ id = t.id;
+ s = Protocol::TS_COMMIT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-commit transaction with id "
+ // << id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_COMMIT:
+ {
+ if (stable && id == t.id - 1)
+ {
+ // not begin and and we haven't joined
+
+ // join for abort
+
+ initiated_ = false;
+
+ current_.id = t.id;
+ current_.status = Protocol::TS_ABORT;
+ voting_duration_ = 0;
+
+ // if (trace_) cerr << "joining-for-abort transaction with id "
+ // << current_.id << endl;
+ }
+ break;
+ }
+ case Protocol::TS_ABORT:
+ {
+ if ((!stable && id == t.id && s == Protocol::TS_COMMIT) ||
+ (stable && id == t.id - 1)) // abort current || new transaction
+ {
+ // if (trace_) cerr << "voting-for-abort on transaction with id "
+ // << current_.id << endl;
+
+ id = t.id;
+ s = Protocol::TS_ABORT;
+
+ voting_duration_ = 0; //@@ reseting voting_duration_
+ }
+ else
+ {
+ }
+
+ break;
+ }
+ case Protocol::TS_ABORTED:
+ case Protocol::TS_COMMITED:
+ {
+ // nothing for now
+ break;
+ }
+ }
+ }
+
+ void
+ api ()
+ {
+ if ((current_.status == Protocol::TS_COMMITED ||
+ current_.status == Protocol::TS_ABORTED) &&
+ separation_duration_ == 0) // no transaction in progress
+ {
+ // start new transaction
+
+ // Note that in_ is already locked by Scheduler
+
+ MessagePtr m (in_.front ());
+ in_.pop ();
+
+ if (typeid (*m) == typeid (Send))
+ {
+ send_ = SendPtr (dynamic_cast<Send*> (m.release ()));
+ }
+ else
+ {
+ // cerr << "Expecting Send but received " << typeid (*m).name ()
+ // << endl;
+
+ ACE_OS::abort ();
+ }
+
+ current_.id++;
+ current_.status = Protocol::TS_BEGIN;
+
+ initiated_ = true;
+
+ // if (trace_) cerr << "starting transaction with id " << current_.id
+ // << endl;
+ }
+ }
+
+ private:
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool trace_;
+
+ Protocol::Transaction current_;
+
+ bool initiated_;
+
+ unsigned short voting_duration_;
+ unsigned short separation_duration_;
+
+ MessageQueue& in_;
+ MessageQueue& send_out_;
+ MessageQueue& recv_out_;
+
+ SendPtr send_;
+ RecvPtr recv_;
+ };
+}