diff options
Diffstat (limited to 'ACE/protocols/ace/TMCast')
-rw-r--r-- | ACE/protocols/ace/TMCast/ACE_TMCast.pc.in | 11 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Export.hpp | 58 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/FaultDetector.hpp | 45 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Group.cpp | 506 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Group.hpp | 51 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/GroupFwd.hpp | 15 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/LinkListener.hpp | 171 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/MTQueue.cpp | 7 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/MTQueue.hpp | 176 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Makefile.am | 76 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Messaging.hpp | 54 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Protocol.cpp | 31 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/Protocol.hpp | 107 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/README | 240 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/TMCast.mpc | 12 | ||||
-rw-r--r-- | ACE/protocols/ace/TMCast/TransactionController.hpp | 388 |
16 files changed, 1948 insertions, 0 deletions
diff --git a/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in new file mode 100644 index 00000000000..a56956f81b2 --- /dev/null +++ b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: ACE_TMCast +Description: ACE Transaction Multicast Library +Requires: ACE +Version: @VERSION@ +Libs: -L${libdir} -lACE_TMCast +Cflags: -I${includedir} diff --git a/ACE/protocols/ace/TMCast/Export.hpp b/ACE/protocols/ace/TMCast/Export.hpp new file mode 100644 index 00000000000..bf04f7ee114 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Export.hpp @@ -0,0 +1,58 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl ACE_TMCast +// ------------------------------ +#ifndef TMCAST_EXPORT_H +#define TMCAST_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL) +# define TMCAST_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */ + +#if !defined (TMCAST_HAS_DLL) +#define TMCAST_HAS_DLL 1 +#endif /* ! TMCAST_HAS_DLL */ + +#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1) +# if defined (TMCAST_BUILD_DLL) +# define ACE_TMCast_Export ACE_Proper_Export_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* TMCAST_BUILD_DLL */ +# define ACE_TMCast_Export ACE_Proper_Import_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* TMCAST_BUILD_DLL */ +#else /* TMCAST_HAS_DLL == 1 */ +# define ACE_TMCast_Export +# define TMCAST_SINGLETON_DECLARATION(T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* TMCAST_HAS_DLL == 1 */ + +// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (TMCAST_NTRACE) +# if (ACE_NTRACE == 1) +# define TMCAST_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define TMCAST_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !TMCAST_NTRACE */ + +#if (TMCAST_NTRACE == 1) +# define TMCAST_TRACE(X) +#else /* (TMCAST_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (TMCAST_NTRACE == 1) */ + +#endif /* TMCAST_EXPORT_H */ + +// End of auto generated file. diff --git a/ACE/protocols/ace/TMCast/FaultDetector.hpp b/ACE/protocols/ace/TMCast/FaultDetector.hpp new file mode 100644 index 00000000000..49ffcdd174c --- /dev/null +++ b/ACE/protocols/ace/TMCast/FaultDetector.hpp @@ -0,0 +1,45 @@ +// file : ACE_TMCast/FaultDetector.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + class FaultDetector + { + public: + FaultDetector () + : alone_ (true), silence_period_ (-1) + { + } + + public: + class Failed {}; + + + void + insync () + { + if (alone_) + alone_ = false; + + silence_period_ = 0; + } + + void + outsync () + { + if (!alone_ && ++silence_period_ >= Protocol::FATAL_SILENCE_FRAME) + { + // cerr << "Silence period has been passed." << endl; + // cerr << "Decalring the node failed." << endl; + throw Failed (); + } + } + + private: + bool alone_; // true if we haven't heard from any members yet. + short silence_period_; + }; +} diff --git a/ACE/protocols/ace/TMCast/Group.cpp b/ACE/protocols/ace/TMCast/Group.cpp new file mode 100644 index 00000000000..1f2b2a60dfd --- /dev/null +++ b/ACE/protocols/ace/TMCast/Group.cpp @@ -0,0 +1,506 @@ +// file : ACE_TMCast/Group.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Group.hpp" + +#include <typeinfo> + +// OS primitives +#include <ace/OS.h> +#include <ace/OS_NS_stdlib.h> +#include <ace/Synch.h> +#include <ace/Time_Value.h> +#include <ace/SOCK_Dgram_Mcast.h> + +#include "Messaging.hpp" + +#include "Protocol.hpp" + +// Components + +#include "LinkListener.hpp" +#include "FaultDetector.hpp" +#include "TransactionController.hpp" + +namespace ACE_TMCast +{ + bool + operator== (std::type_info const* pa, std::type_info const& b) + { + return *pa == b; + } + + // + // + // + class Terminate : public virtual Message {}; + + + // + // + // + class Failure : public virtual Message {}; + + + // + // + // + class Scheduler + { + public: + Scheduler (ACE_INET_Addr const& addr, + char const* id, + MessageQueue& out_send_data, + MessageQueue& out_recv_data, + MessageQueue& out_control) + + : cond_ (mutex_), + + addr_ (addr), + sock_ (), + + out_control_ (out_control), + + in_data_ (mutex_), + in_link_data_(mutex_), + in_control_ (mutex_), + + sync_schedule (ACE_OS::gettimeofday ()), + + transaction_controller_ (in_data_, out_send_data, out_recv_data) + { + ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH); + id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0'; + + sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded + + in_data_.subscribe (cond_); + in_link_data_.subscribe (cond_); + in_control_.subscribe (cond_); + + ACE_thread_t unused; + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &unused, + &thread_) != 0) ACE_OS::abort (); + } + + virtual ~Scheduler () + { + { + MessageQueueAutoLock lock (in_control_); + + in_control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); + + // cerr << "Scheduler is down." << endl; + } + + public: + MessageQueue& + in_data () + { + return in_data_; + } + + private: + static ACE_THR_FUNC_RETURN + thread_thunk (void* arg) + { + Scheduler* obj = reinterpret_cast<Scheduler*> (arg); + obj->execute (); + return 0; + } + + void + execute () + { + try + { + sock_.join (addr_); + auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_)); + + { + AutoLock lock (mutex_); + + // Loop + // + // + + while (true) + { + cond_.wait (&sync_schedule); + + // "Loop of Fairness" + + bool done = false; + + do + { + // control message + // + // + if (!in_control_.empty ()) + { + done = true; + break; + } + + // outsync + // + // + if (sync_schedule < ACE_OS::gettimeofday ()) + { + // OUTSYNC + + outsync (); + + // schedule next outsync + sync_schedule = + ACE_OS::gettimeofday () + + ACE_Time_Value (0, Protocol::SYNC_PERIOD); + } + + // link message + // + // + if (!in_link_data_.empty ()) + { + MessagePtr m (in_link_data_.front ()); + in_link_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (LinkFailure)) + { + // cerr << "link failure" << endl; + throw false; + } + else if (exp == typeid (LinkData)) + { + + LinkData* data = dynamic_cast<LinkData*> (m.get ()); + + // INSYNC, TL, CT + + // Filter out loopback. + // + if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0) + { + insync (); + transaction_list (); + current_transaction (data->header().current, + data->payload (), + data->size ()); + } + } + else + { + // cerr << "unknown message type from link listener: " + // << typeid (*m).name () << endl; + ACE_OS::abort (); + } + } + + // api message + // + // + if (!in_data_.empty ()) + { + // API + + api (); + } + + } while (!in_link_data_.empty() || + sync_schedule < ACE_OS::gettimeofday ()); + + if (done) break; + } + } + } + catch (...) + { + // cerr << "Exception in scheduler loop." << endl; + MessageQueueAutoLock lock (out_control_); + out_control_.push (MessagePtr (new Failure)); + } + } + + // Events + // + // Order: + // + // INSYNC, TSL, VOTE, BEGIN + // API + // OUTSYNC + // + + void + insync () + { + fault_detector_.insync (); + } + + void + outsync () + { + char buf[Protocol::MAX_MESSAGE_SIZE]; + + Protocol::MessageHeader* hdr = + reinterpret_cast<Protocol::MessageHeader*> (buf); + + void* data = buf + sizeof (Protocol::MessageHeader); + + hdr->length = sizeof (Protocol::MessageHeader); + hdr->check_sum = 0; + + ACE_OS::strcpy (hdr->member_id.id, id_); + + size_t size (0); + + transaction_controller_.outsync (hdr->current, data, size); + + hdr->length += size; + + fault_detector_.outsync (); + + // sock_.send (buf, hdr->length, addr_); + sock_.send (buf, hdr->length); + } + + void + transaction_list () + { + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + transaction_controller_.current_transaction (t, payload, size); + } + + void + api () + { + transaction_controller_.api (); + } + + private: + ACE_hthread_t thread_; + + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + char id_[Protocol::MEMBER_ID_LENGTH]; + + ACE_INET_Addr addr_; + ACE_SOCK_Dgram_Mcast sock_; + + MessageQueue& out_control_; + + MessageQueue in_data_; + MessageQueue in_link_data_; + MessageQueue in_control_; + + // Protocol state + // + // + + ACE_Time_Value sync_schedule; + + FaultDetector fault_detector_; + TransactionController transaction_controller_; + }; + + + // + // + // + class Group::GroupImpl + { + public: + virtual ~GroupImpl () + { + } + + GroupImpl (ACE_INET_Addr const& addr, char const* id) + throw (Group::Failed) + : send_cond_ (mutex_), + recv_cond_ (mutex_), + failed_ (false), + in_send_data_ (mutex_), + in_recv_data_ (mutex_), + in_control_ (mutex_), + scheduler_ (new Scheduler (addr, + id, + in_send_data_, + in_recv_data_, + in_control_)), + out_data_ (scheduler_->in_data ()) + { + in_send_data_.subscribe (send_cond_); + in_recv_data_.subscribe (recv_cond_); + + in_control_.subscribe (send_cond_); + in_control_.subscribe (recv_cond_); + } + + void + send (void const* msg, size_t size) + throw (Group::InvalidArg, Group::Failed, Group::Aborted) + { + if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg (); + + // Note the potential deadlock if I lock mutex_ and out_data_ in + // reverse order. + + MessageQueueAutoLock l1 (out_data_); + AutoLock l2 (mutex_); + + throw_if_failed (); + + out_data_.push (MessagePtr (new Send (msg, size))); + + l1.unlock (); // no need to keep it locked + + while (true) + { + throw_if_failed (); + + if (!in_send_data_.empty ()) + { + MessagePtr m (in_send_data_.front ()); + in_send_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (ACE_TMCast::Aborted)) + { + throw Group::Aborted (); + } + else if (exp == typeid (Commited)) + { + return; + } + else + { + // cerr << "send: group-scheduler messaging protocol violation; " + // << "unexpected message " << typeid (*m).name () + // << " " << typeid (Aborted).name () << endl; + + ACE_OS::abort (); + } + } + + // cerr << "send: waiting on condition" << endl; + send_cond_.wait (); + // cerr << "send: wokeup on condition" << endl; + } + } + + + + size_t + recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) + { + AutoLock lock (mutex_); + + while (true) + { + throw_if_failed (); + + if (!in_recv_data_.empty ()) + { + MessagePtr m (in_recv_data_.front ()); + in_recv_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (Recv)) + { + Recv* data = dynamic_cast<Recv*> (m.get ()); + + if (size < data->size ()) throw Group::InsufficienSpace (); + + memcpy (msg, data->payload (), data->size ()); + + return data->size (); + } + else + { + // cerr << "recv: group-scheduler messaging protocol violation. " + // << "unexpected message " << typeid (*m).name () << endl; + + ACE_OS::abort (); + } + } + + recv_cond_.wait (); + } + } + + private: + void + throw_if_failed () + { + if (!failed_ && !in_control_.empty ()) failed_ = true; + + if (failed_) throw Group::Failed (); + } + + private: + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> send_cond_; + ACE_Condition<ACE_Thread_Mutex> recv_cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool failed_; + + MessageQueue in_send_data_; + MessageQueue in_recv_data_; + MessageQueue in_control_; + + auto_ptr<Scheduler> scheduler_; + + MessageQueue& out_data_; + }; + + + // Group + // + // + Group:: + Group (ACE_INET_Addr const& addr, char const* id) + throw (Group::Failed) + : pimpl_ (new GroupImpl (addr, id)) + { + } + + Group:: + ~Group () + { + } + + void + Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted) + { + pimpl_->send (msg, size); + } + + size_t + Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) + { + return pimpl_->recv (msg, size); + } +} + diff --git a/ACE/protocols/ace/TMCast/Group.hpp b/ACE/protocols/ace/TMCast/Group.hpp new file mode 100644 index 00000000000..13c49f210bb --- /dev/null +++ b/ACE/protocols/ace/TMCast/Group.hpp @@ -0,0 +1,51 @@ +// file : ACE_TMCast/Group.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_HPP +#define TMCAST_GROUP_HPP + +#include <ace/Auto_Ptr.h> +#include <ace/INET_Addr.h> + +#include "Export.hpp" + +namespace ACE_TMCast +{ + class ACE_TMCast_Export Group + { + public: + class Aborted {}; + class Failed {}; + class InvalidArg {}; + class InsufficienSpace {}; + + public: + ~Group (); + + Group (ACE_INET_Addr const& addr, char const* id) throw (Failed); + + public: + void + send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted); + + size_t + recv (void* msg, size_t size) throw (Failed, InsufficienSpace); + + private: + bool + failed (); + + private: + class GroupImpl; + auto_ptr<GroupImpl> pimpl_; + + private: + Group (Group const&); + + Group& + operator= (Group const&); + }; +} + +#endif // TMCAST_GROUP_HPP diff --git a/ACE/protocols/ace/TMCast/GroupFwd.hpp b/ACE/protocols/ace/TMCast/GroupFwd.hpp new file mode 100644 index 00000000000..b4ed7304ff7 --- /dev/null +++ b/ACE/protocols/ace/TMCast/GroupFwd.hpp @@ -0,0 +1,15 @@ +// file : ACE_TMCast/GroupFwd.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_FWD_HPP +#define TMCAST_GROUP_FWD_HPP + +#include "Export.hpp" + +namespace ACE_TMCast +{ + class ACE_TMCast_Export Group; +} + +#endif // TMCAST_GROUP_FWD_HPP diff --git a/ACE/protocols/ace/TMCast/LinkListener.hpp b/ACE/protocols/ace/TMCast/LinkListener.hpp new file mode 100644 index 00000000000..983f7828f3e --- /dev/null +++ b/ACE/protocols/ace/TMCast/LinkListener.hpp @@ -0,0 +1,171 @@ +// file : ACE_TMCast/LinkListener.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +// OS primitives +#include <ace/OS_NS_string.h> +#include <ace/OS_NS_stdlib.h> +#include <ace/Synch.h> +#include <ace/SOCK_Dgram_Mcast.h> +#include <ace/Refcounted_Auto_Ptr.h> + + +#include "Messaging.hpp" +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + // + // + // + class LinkFailure : public virtual Message {}; + + + // + // + // + class LinkData : public virtual Message + { + public: + LinkData (Protocol::MessageHeader const* header, + void* payload, + size_t size) + : size_ (size) + { + ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); + ACE_OS::memcpy (payload_, payload, size_); + } + + Protocol::MessageHeader const& + header () const + { + return header_; + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + Protocol::MessageHeader header_; + char payload_[Protocol::MAX_MESSAGE_SIZE]; + size_t size_; + }; + + typedef + ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex> + LinkDataPtr; + + // + // + // + class LinkListener + { + private: + class Terminate : public virtual Message {}; + + public: + LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) + : sock_(sock), out_ (out) + { + ACE_thread_t unused; + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &unused, + &thread_) != 0) ACE_OS::abort (); + } + + ~LinkListener () + { + { + MessageQueueAutoLock lock (control_); + + control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); + + // cerr << "Link listener is down." << endl; + } + + + static ACE_THR_FUNC_RETURN + thread_thunk (void* arg) + { + LinkListener* obj = reinterpret_cast<LinkListener*> (arg); + + obj->execute (); + return 0; + } + + void + execute () + { + char msg[Protocol::MAX_MESSAGE_SIZE]; + + ssize_t header_size = sizeof (Protocol::MessageHeader); + + // OS::Time timeout (1000000); // one millisecond + + ACE_Time_Value timeout (0, 1000); // one millisecond + + try + { + while (true) + { + // Check control message queue + + { + MessageQueueAutoLock lock (control_); + + if (!control_.empty ()) break; + } + + ACE_INET_Addr junk; + ssize_t n = sock_.recv (msg, + Protocol::MAX_MESSAGE_SIZE, + junk, + 0, + &timeout); + + if (n != -1) + { + if (n < header_size) throw false; + + Protocol::MessageHeader* header = + reinterpret_cast<Protocol::MessageHeader*> (msg); + + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkData (header, + msg + header_size, + n - header_size))); + } + } + } + catch (...) + { + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkFailure)); + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + ACE_hthread_t thread_; + ACE_SOCK_Dgram_Mcast& sock_; + MessageQueue& out_; + MessageQueue control_; + }; +} diff --git a/ACE/protocols/ace/TMCast/MTQueue.cpp b/ACE/protocols/ace/TMCast/MTQueue.cpp new file mode 100644 index 00000000000..9f08db5843d --- /dev/null +++ b/ACE/protocols/ace/TMCast/MTQueue.cpp @@ -0,0 +1,7 @@ +// file : ACE_TMCast/MTQueue.cpp +// author : Steve Huston <shuston@riverace.com> +// cvs-id : $Id$ + +#include "LinkListener.hpp" +#include "MTQueue.hpp" + diff --git a/ACE/protocols/ace/TMCast/MTQueue.hpp b/ACE/protocols/ace/TMCast/MTQueue.hpp new file mode 100644 index 00000000000..2eb128823fe --- /dev/null +++ b/ACE/protocols/ace/TMCast/MTQueue.hpp @@ -0,0 +1,176 @@ +// file : ACE_TMCast/MTQueue.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MT_QUEUE_HPP +#define TMCAST_MT_QUEUE_HPP + +#include "ace/Auto_Ptr.h" +#include "ace/Unbounded_Set.h" +#include "ace/Unbounded_Queue.h" +#include "ace/os_include/sys/os_types.h" + +namespace ACE_TMCast +{ + template <typename T, + typename M, + typename C, + typename Q = ACE_Unbounded_Queue<T> > + class MTQueue + { + public: + typedef T ElementType; + typedef M MutexType; + typedef C ConditionalType; + typedef Q QueueType; + + public: + + MTQueue () + : mutexp_ (new MutexType), + mutex_ (*mutexp_), + queue_ (), + signal_ (false) + { + } + + MTQueue (MutexType& mutex) + : mutexp_ (), + mutex_ (mutex), + queue_ (), + signal_ (false) + { + } + + public: + bool + empty () const + { + return queue_.is_empty (); + } + + size_t + size () const + { + return queue_.size (); + } + + // typedef typename QueueType::Empty Empty; + + class Empty {}; + + T& + front () + { + ACE_Unbounded_Queue_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + + T const& + front () const + { + ACE_Unbounded_Queue_Const_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + /* + T& + back () + { + return queue_.back (); + } + + + T const& + back () const + { + return queue_.back (); + } + */ + + void + push (T const& t) + { + signal_ = empty (); + queue_.enqueue_tail (t); + } + + void + pop () + { + T junk; + queue_.dequeue_head (junk); + } + + public: + void + lock () const + { + mutex_.acquire (); + } + + void + unlock () const + { + if (signal_) + { + signal_ = false; + + for (ConditionalSetConstIterator_ i (cond_set_); + !i.done (); + i.advance ()) + { + ConditionalType** c = 0; + + i.next (c); + + (*c)->signal (); + } + } + + mutex_.release (); + } + + void + subscribe (ConditionalType& c) + { + //@@ should check for duplicates + // + cond_set_.insert (&c); + } + + void + unsubscribe (ConditionalType& c) + { + //@@ should check for absence + // + cond_set_.remove (&c); + } + + private: + auto_ptr<MutexType> mutexp_; + MutexType& mutex_; + QueueType queue_; + + typedef + ACE_Unbounded_Set<ConditionalType*> + ConditionalSet_; + + typedef + ACE_Unbounded_Set_Const_Iterator<ConditionalType*> + ConditionalSetConstIterator_; + + ConditionalSet_ cond_set_; + + mutable bool signal_; + }; +} + +#endif // TMCAST_MT_QUEUE_HPP diff --git a/ACE/protocols/ace/TMCast/Makefile.am b/ACE/protocols/ace/TMCast/Makefile.am new file mode 100644 index 00000000000..13f369142f4 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Makefile.am @@ -0,0 +1,76 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +includedir = @includedir@/protocols/ace/TMCast +pkgconfigdir = @libdir@/pkgconfig + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.TMCast.am + +if BUILD_EXCEPTIONS +if BUILD_THREADS +if !BUILD_ACE_FOR_TAO + +lib_LTLIBRARIES = libACE_TMCast.la + +libACE_TMCast_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DTMCAST_BUILD_DLL + +libACE_TMCast_la_SOURCES = \ + Group.cpp \ + MTQueue.cpp \ + Protocol.cpp + +libACE_TMCast_la_LDFLAGS = \ + -release @ACE_VERSION_NAME@ + +libACE_TMCast_la_LIBADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +nobase_include_HEADERS = \ + Export.hpp \ + FaultDetector.hpp \ + Group.hpp \ + GroupFwd.hpp \ + LinkListener.hpp \ + MTQueue.hpp \ + Messaging.hpp \ + Protocol.hpp \ + TransactionController.hpp + +pkgconfig_DATA = \ + ACE_TMCast.pc + +CLEANFILES = \ + ACE_TMCast.pc + +ACE_TMCast.pc: ${top_builddir}/config.status ${srcdir}/ACE_TMCast.pc.in + ${top_builddir}/config.status --file "$@":${srcdir}/ACE_TMCast.pc.in + +endif !BUILD_ACE_FOR_TAO +endif BUILD_THREADS +endif BUILD_EXCEPTIONS + +EXTRA_DIST = \ + ACE_TMCast.pc.in + + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/protocols/ace/TMCast/Messaging.hpp b/ACE/protocols/ace/TMCast/Messaging.hpp new file mode 100644 index 00000000000..886745d1120 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Messaging.hpp @@ -0,0 +1,54 @@ +// file : ACE_TMCast/Messaging.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MESSAGING_HPP +#define TMCAST_MESSAGING_HPP + +#include <ace/Synch.h> +#include <ace/Refcounted_Auto_Ptr.h> + +#include "MTQueue.hpp" + +namespace ACE_TMCast +{ + class Message + { + public: + virtual + ~Message () {} + }; + + typedef + ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex> + MessagePtr; + + typedef + MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> > + MessageQueue; + + struct MessageQueueAutoLock + { + MessageQueueAutoLock (MessageQueue& q) + : q_ (q) + { + q_.lock (); + } + + void + unlock () + { + q_.unlock (); + } + + ~MessageQueueAutoLock () + { + q_.unlock (); + } + + private: + MessageQueue& q_; + }; +} + +#endif // TMCAST_MESSAGING_HPP diff --git a/ACE/protocols/ace/TMCast/Protocol.cpp b/ACE/protocols/ace/TMCast/Protocol.cpp new file mode 100644 index 00000000000..ea4c6b39020 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Protocol.cpp @@ -0,0 +1,31 @@ +// file : ACE_TMCast/Protocol.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + namespace Protocol + { + /* + namespace + { + char const* labels[] = { + "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"}; + } + + std::string + tslabel (Protocol::TransactionStatus s) + { + return labels[s]; + } + + std::ostream& + operator << (std::ostream& o, Transaction const& t) + { + return o << "{" << t.id << "; " << tslabel (t.status) << "}"; + } + */ + } +} diff --git a/ACE/protocols/ace/TMCast/Protocol.hpp b/ACE/protocols/ace/TMCast/Protocol.hpp new file mode 100644 index 00000000000..6cdf374f4f9 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Protocol.hpp @@ -0,0 +1,107 @@ +// file : ACE_TMCast/Protocol.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_PROTOCOL_HPP +#define TMCAST_PROTOCOL_HPP + +namespace ACE_TMCast +{ + namespace Protocol + { + // + // + // + unsigned long const MEMBER_ID_LENGTH = 38; + + struct MemberId + { + char id[MEMBER_ID_LENGTH]; + /* + unsigned long ip; + unsigned short port; + */ + }; + + // + // + // + typedef unsigned short TransactionId; + + + + typedef unsigned char TransactionStatus; + + TransactionStatus const TS_BEGIN = 1; + TransactionStatus const TS_COMMIT = 2; + TransactionStatus const TS_ABORT = 3; + TransactionStatus const TS_COMMITED = 4; + TransactionStatus const TS_ABORTED = 5; + + struct Transaction + { + TransactionId id; + TransactionStatus status; + }; + + // Transaction List (TL) + + // unsigned long const TL_LENGTH = 1; + + // typedef Transaction TransactionList[TL_LENGTH]; + + + // + // + // + struct MessageHeader + { + unsigned long length; + + unsigned long check_sum; + + MemberId member_id; + + Transaction current; + + //TransactionList transaction_list; + }; + + + // + // + // + + unsigned long const MAX_MESSAGE_SIZE = 768; + + unsigned long const + MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader); + + // Protocol timing + // + // + + unsigned long const SYNC_PERIOD = 30000; // in mks + + unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's + unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's + + // FATAL_SILENCE_FRAME in SYNC_PERIOD's + // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME + // + + short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2; + + // short const FATAL_SILENCE_FRAME = 10000; + + // Helpers + + // std::string + // tslabel (Protocol::TransactionStatus s); + + // std::ostream& + // operator << (std::ostream& o, Transaction const& t); + } +} + +#endif // TMCAST_PROTOCOL_HPP diff --git a/ACE/protocols/ace/TMCast/README b/ACE/protocols/ace/TMCast/README new file mode 100644 index 00000000000..7104be46e30 --- /dev/null +++ b/ACE/protocols/ace/TMCast/README @@ -0,0 +1,240 @@ + + +Introduction +------------ + +TMCast (stands for Transaction MultiCast) is an implementation of a +transactional multicast protocol. In essence, the idea is to represent +each message delivery to members of a multicast group as a transaction +- an atomic, consistent and isolated action. A multicast transaction +can be viewed as an atomic transition of the group members to a new +state. If we define [Mo] as a set of operational (non-faulty) members +of the group, [Mf] as a set of faulty members of the group, [Ma] as a +set of members that view transition [Tn] as aborted and [Mc] as a set +of members that view transition [Tn] as committed, then this atomic +transition [Tn] should satisfy one of the following equations: + +Mo(Tn-1) = Ma(T) U Mf(T) +Mo(Tn-1) = Mc(T) U Mf(T) + +Or, in other words, after transaction T has been committed (aborted), +all operational (before transaction T) members are either in the +committed (aborted) or failed state. + +Thus, for each member of the group, outcome of the transaction can be +commit, abort or a member failure. It is important for a member to +exhibit a failfast (error latency is less than transaction cycle) +behavior. Or, in other words, if a member transitioned into a wrong +state, it is guaranteed to fail instead of delivering a wrong result. + +In order to achieve such an error detection in a decentralized +environment, certain limitations were imposed. One of the most +user-visible limitation is the fact that the lifetime of the group +with only one member is very short. This is because there is not way +for a member to distinguish "no members yet" case from "my link to the +group is down". In such a situation, the member assumes the latter +case. There is also a military saying that puts it quite nicely: two +is one, one is nothing. + + + +State of Implementation +----------------------- + +The current implementation is in a prototypical stage. The following +parts are not implemented or still under development: + +* Handling of network partitioning (TODO) + +* Redundant network support (TODO) + +* Member failure detection (partial implementation) + + +Examples +-------- + +There is a simple example available in examples/TMCast/Member with +the corresponding README. + + +Architecture +------------ + +Primary goals of the protocol are to (1) mask transient failures of the +underlying multicast protocol (or, more precisely, allow to recover +from transient failures) and (2) exhibit failfast behavior in cases of +permanent failures. + +The distinction between transient and permanent failures is based on +timeouts thus what can be a transient failure in one configuration of +the protocol could be a permanent failure in the other. + +[Maybe talk more about a transient/permanent threshold and its effect +on performance/resource utilization/etc.] + +[Maybe add a terminology section.] + +Each member of a multicast group has its unique (group-wise) id: + +struct MemberId +{ + char id[MEMBER_ID_LENGTH]; +}; + +Each payload delivery is part of a transaction. Each transaction is +identified by TransactionId: + +typedef unsigned short TransactionId; + + +Each transaction has a status code which identifies its state, as viewed by +a group member: + + +typedef unsigned char TransactionStatus; + +TransactionStatus const TS_BEGIN = 1; +TransactionStatus const TS_COMMIT = 2; +TransactionStatus const TS_ABORT = 3; +TransactionStatus const TS_COMMITTED = 4; +TransactionStatus const TS_ABORTED = 5; + +Thus each transaction is described by its id and status: + +struct Transaction +{ + TransactionId id; + TransactionStatus status; +}; + +The outcome of some predefined number of recent transactions is stored +in TransactionList: + +typedef Transaction TransactionList[TL_LENGTH]; + + +Each message sent to a multicast group has the following header: + +struct MessageHeader +{ + unsigned long length; + unsigned long check_sum; + MemberId member_id; + Transaction current; + TransactionList transaction_list; +}; + +[Maybe describe each field here.] + +A new member joins the group with transaction id 0 and status +TS_COMMITTED. + +Each member sends a periodic 'pulse' messages with some predefined interval +advertising its current view of the group. This includes the state of the +current transaction and the history of the recent transactions. + + +If a member of the group needs a payload delivery it starts a new +transaction by sending a message with current transaction set to + +{++current_id, TS_BEGIN} + +and payload appended after the header. + + +Each member joins a transaction in one of the following ways: + +* A member that began the transaction joins it 'to commit' (TS_COMMIT) + +* A member that received TS_BEGIN joins current transaction 'to commit' + (TS_COMMIT). + +* A member that received TS_COMMIT or TS_ABORT but did not receive TS_BEGIN + joins current transaction 'to abort' (TS_ABORT). + + +After a member has joined the transaction it starts participating in the +transaction's voting phase. On this phase members of the group decide the +fate of the transaction. Each member sends a predefined number of messages +where it announces its vote. In between those messages the member is receiving +and processing votes from other members and can be influenced by their +'opinion'. + +In their decision-making members follow the principle of the majority. As +the voting progresses (and comes close to an end) members become more and +more reluctant to deviate from the decision of the majority. + +[Maybe add an equation that measures member's willingness to change +its mind.] + +At the end of the voting phase each member declares the current transaction +either committed (TS_COMMITTED) or aborted (TS_ABORTED). If this decision does +not agree with the majority the member declares itself failed. + +In addition, each member builds a 'majority view' of the transaction history +(based on transaction_list). If it deviates from the member's own history the +member declares itself failed. + +Here are some example scenarios of how the protocol behaves in different +situations. Let's say we have three members of the group S, R1, R2. S +initiates a transaction. R1 and R2 join it. + +Scenario 1. (two-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 receives TS_BEGIN, joins for commit +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 announces TS_COMMIT (first vote) +4a. S announces TS_COMMIT (second vote) +4b. R1 announces TS_COMMIT (second vote) +4c. R2 announces TS_COMMIT (second vote) +5a. S announces TS_COMMITTED (end of vote) +5b. R1 announces TS_COMMITTED (end of vote) +5c. R2 announces TS_COMMITTED (end of vote) + + +Scenario 2. (two-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 didn't receive TS_BEGIN +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 received R1's TS_COMMIT announces TS_ABORT (first vote) +4a. S received R2's TS_ABORT announces TS_ABORT (second vote) +4b. R1 received R2's TS_ABORT announces TS_ABORT (second vote) +4c. R2 announces TS_ABORT (second vote) +5a. S announces TS_ABORTED (end of vote) +5b. R1 announces TS_ABORTED (end of vote) +5c. R2 announces TS_ABORTED (end of vote) + + +Scenario 3. (three-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 didn't receive TS_BEGIN +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 still didn't receive anything +4a. S announces TS_COMMIT (second vote) +4b. R1 announces TS_COMMIT (second vote) +4c. R2 received R1's TS_COMMIT, announces TS_ABORT (first vote) + +5a. S received R2's TS_ABORT but it is the end of the voting phase and + majority (S and R1) vote for commit, announces TS_COMMIT (third vote) +5b. R1 received R2's TS_ABORT but it is the end of the voting phase and + majority (S and R1) vote for commit, announces TS_COMMIT (third vote) +5c. R2 announces TS_ABORT (second vote) + +6a. S announces TS_COMMITTED (end of vote) +6b. R1 announces TS_COMMITTED (end of vote) +6c. R2 discovers that the the majority has declared current transaction + committed and thus declares itself failed. + + +-- +Boris Kolpackov <boris@dre.vanderbilt.edu> diff --git a/ACE/protocols/ace/TMCast/TMCast.mpc b/ACE/protocols/ace/TMCast/TMCast.mpc new file mode 100644 index 00000000000..1ff937a0a1a --- /dev/null +++ b/ACE/protocols/ace/TMCast/TMCast.mpc @@ -0,0 +1,12 @@ +// -*- MPC -*- +// $Id$ + +project : acelib, core, exceptions, threads { + avoids = ace_for_tao + sharedname = ACE_TMCast + dynamicflags += TMCAST_BUILD_DLL + + Pkgconfig_Files { + ACE_TMCast.pc.in + } +} diff --git a/ACE/protocols/ace/TMCast/TransactionController.hpp b/ACE/protocols/ace/TMCast/TransactionController.hpp new file mode 100644 index 00000000000..6b0d4281655 --- /dev/null +++ b/ACE/protocols/ace/TMCast/TransactionController.hpp @@ -0,0 +1,388 @@ +// file : ACE_TMCast/TransactionController.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/Synch.h" +#include "ace/Refcounted_Auto_Ptr.h" + +#include "Protocol.hpp" +#include "Messaging.hpp" + +#include <typeinfo> + +namespace ACE_TMCast +{ + + // Messages + // + // + class Send : public virtual Message + { + public: + Send (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex> + SendPtr; + + + class Recv : public virtual Message + { + public: + Recv (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex> + RecvPtr; + + class Aborted : public virtual Message {}; + + class Commited : public virtual Message {}; + + + // + // + // + class TransactionController + { + public: + TransactionController (MessageQueue& in, + MessageQueue& send_out, + MessageQueue& recv_out) + : trace_ (false), + voting_duration_ (0), + separation_duration_ (0), + in_ (in), + send_out_ (send_out), + recv_out_ (recv_out) + { + current_.id = 0; + current_.status = Protocol::TS_COMMITED; + } + + public: + class Failure {}; + + + void + outsync (Protocol::Transaction& c, void* payload, size_t& size) + { + if (current_.status == Protocol::TS_COMMIT || + current_.status == Protocol::TS_ABORT) + { + if (++voting_duration_ >= Protocol::VOTING_FRAME) + { + // end of voting frame + + if (current_.status == Protocol::TS_COMMIT) + { + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Commited)); + } + else // joined transaction + { + MessageQueueAutoLock lock (recv_out_); + recv_out_.push (MessagePtr (recv_.release ())); + recv_ = RecvPtr (); + } + } + + current_.status = Protocol::TS_COMMITED; + + // if (trace_) cerr << "commited transaction with id " + // << current_.id << endl; + } + else // TS_ABORT + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Aborted)); + } + else + { + // free revc_ buffer if necessary + // + if (recv_.get ()) recv_ = RecvPtr (); + } + + + current_.status = Protocol::TS_ABORTED; + + // if (trace_) cerr << "aborted transaction with id " + // << current_.id << endl; + } + + // start transaction separation frame (counts down) + // +1 because it will be decremented on this iteration + separation_duration_ = Protocol::SEPARATION_FRAME + 1; + } + } + + // Set current outsync info + + c.id = current_.id; + c.status = current_.status; + + + // Do some post-processing + + switch (current_.status) + { + case Protocol::TS_COMMITED: + case Protocol::TS_ABORTED: + { + if (separation_duration_ > 0) --separation_duration_; + break; + } + case Protocol::TS_BEGIN: + { + // transfer payload + + size = send_->size (); + memcpy (payload, send_->payload (), size); + + send_ = SendPtr (); + + // get redy to vote for 'commit' + + current_.status = Protocol::TS_COMMIT; + voting_duration_ = 0; + } + } + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + Protocol::TransactionId& id = current_.id; + Protocol::TransactionStatus& s = current_.status; + + if (id == 0 && t.id != 0) // catch up + { + switch (t.status) + { + case Protocol::TS_BEGIN: + case Protocol::TS_COMMIT: + case Protocol::TS_ABORT: + { + id = t.id - 1; + s = Protocol::TS_COMMITED; + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + id = t.id; + s = t.status; + break; + } + } + + // if (trace_) cerr << "caught up with id " << id << endl; + } + + bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED); + + switch (t.status) + { + case Protocol::TS_BEGIN: + { + if (!stable || t.id != id + 1) + { + // Transaction is in progress or hole in transaction id's + + // cerr << "unexpected request to join " << t + // << " while on " << current_ << endl; + + // if (!stable) cerr << "voting progress is " << voting_duration_ + // << "/" << Protocol::VOTING_FRAME << endl; + + if (t.id == id) // collision + { + if (!stable && s != Protocol::TS_ABORT) + { + // abort both + // cerr << "aborting both transactions" << endl; + + s = Protocol::TS_ABORT; + voting_duration_ = 0; //@@ reset voting frame + } + } + else + { + // @@ delicate case. need to think more + + // cerr << "Declaring node failed." << endl; + throw Failure (); + } + } + else + { + // join the transaction + + initiated_ = false; + + recv_ = RecvPtr (new Recv (payload, size)); + + id = t.id; + s = Protocol::TS_COMMIT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-commit transaction with id " + // << id << endl; + } + break; + } + case Protocol::TS_COMMIT: + { + if (stable && id == t.id - 1) + { + // not begin and and we haven't joined + + // join for abort + + initiated_ = false; + + current_.id = t.id; + current_.status = Protocol::TS_ABORT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-abort transaction with id " + // << current_.id << endl; + } + break; + } + case Protocol::TS_ABORT: + { + if ((!stable && id == t.id && s == Protocol::TS_COMMIT) || + (stable && id == t.id - 1)) // abort current || new transaction + { + // if (trace_) cerr << "voting-for-abort on transaction with id " + // << current_.id << endl; + + id = t.id; + s = Protocol::TS_ABORT; + + voting_duration_ = 0; //@@ reseting voting_duration_ + } + else + { + } + + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + // nothing for now + break; + } + } + } + + void + api () + { + if ((current_.status == Protocol::TS_COMMITED || + current_.status == Protocol::TS_ABORTED) && + separation_duration_ == 0) // no transaction in progress + { + // start new transaction + + // Note that in_ is already locked by Scheduler + + MessagePtr m (in_.front ()); + in_.pop (); + + if (typeid (*m) == typeid (Send)) + { + send_ = SendPtr (dynamic_cast<Send*> (m.release ())); + } + else + { + // cerr << "Expecting Send but received " << typeid (*m).name () + // << endl; + + ACE_OS::abort (); + } + + current_.id++; + current_.status = Protocol::TS_BEGIN; + + initiated_ = true; + + // if (trace_) cerr << "starting transaction with id " << current_.id + // << endl; + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool trace_; + + Protocol::Transaction current_; + + bool initiated_; + + unsigned short voting_duration_; + unsigned short separation_duration_; + + MessageQueue& in_; + MessageQueue& send_out_; + MessageQueue& recv_out_; + + SendPtr send_; + RecvPtr recv_; + }; +} |