diff options
author | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-11-03 23:23:22 +0000 |
---|---|---|
committer | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-11-03 23:23:22 +0000 |
commit | b74c3ce9b719f1b325370d067ad44fa2404a23c3 (patch) | |
tree | 7eac2188160da955c2947dc464de9176c7d4b282 /protocols | |
parent | 8c9a3e8856cd212cce975d7c016b1e121ff015f8 (diff) | |
download | ATCD-b74c3ce9b719f1b325370d067ad44fa2404a23c3.tar.gz |
ChangeLogTag: Mon Nov 3 17:02:42 UTC 2003 Don Hinton <dhinton@dresystems.com>
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/TMCast/Export.hpp | 54 | ||||
-rw-r--r-- | protocols/ace/TMCast/FaultDetector.hpp | 41 | ||||
-rw-r--r-- | protocols/ace/TMCast/Group.cpp | 508 | ||||
-rw-r--r-- | protocols/ace/TMCast/Group.hpp | 51 | ||||
-rw-r--r-- | protocols/ace/TMCast/GroupFwd.hpp | 15 | ||||
-rw-r--r-- | protocols/ace/TMCast/LinkListener.hpp | 166 | ||||
-rw-r--r-- | protocols/ace/TMCast/MTQueue.hpp | 177 | ||||
-rw-r--r-- | protocols/ace/TMCast/Makefile | 221 | ||||
-rw-r--r-- | protocols/ace/TMCast/Messaging.hpp | 54 | ||||
-rw-r--r-- | protocols/ace/TMCast/Protocol.cpp | 31 | ||||
-rw-r--r-- | protocols/ace/TMCast/Protocol.hpp | 107 | ||||
-rw-r--r-- | protocols/ace/TMCast/README | 58 | ||||
-rw-r--r-- | protocols/ace/TMCast/TransactionController.hpp | 384 |
13 files changed, 1867 insertions, 0 deletions
diff --git a/protocols/ace/TMCast/Export.hpp b/protocols/ace/TMCast/Export.hpp new file mode 100644 index 00000000000..149a83cb785 --- /dev/null +++ b/protocols/ace/TMCast/Export.hpp @@ -0,0 +1,54 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl TMCast +// ------------------------------ +#ifndef TMCAST_EXPORT_H +#define TMCAST_EXPORT_H + +#include "ace/config-all.h" + +#if !defined (TMCAST_HAS_DLL) +# define TMCAST_HAS_DLL 1 +#endif /* ! TMCAST_HAS_DLL */ + +#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1) +# if defined (TMCAST_BUILD_DLL) +# define TMCast_Export ACE_Proper_Export_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* TMCAST_BUILD_DLL */ +# define TMCast_Export ACE_Proper_Import_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* TMCAST_BUILD_DLL */ +#else /* TMCAST_HAS_DLL == 1 */ +# define TMCast_Export +# define TMCAST_SINGLETON_DECLARATION(T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* TMCAST_HAS_DLL == 1 */ + +// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (TMCAST_NTRACE) +# if (ACE_NTRACE == 1) +# define TMCAST_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define TMCAST_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !TMCAST_NTRACE */ + +#if (TMCAST_NTRACE == 1) +# define TMCAST_TRACE(X) +#else /* (TMCAST_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (TMCAST_NTRACE == 1) */ + +#endif /* TMCAST_EXPORT_H */ + +// End of auto generated file. diff --git a/protocols/ace/TMCast/FaultDetector.hpp b/protocols/ace/TMCast/FaultDetector.hpp new file mode 100644 index 00000000000..ba476cbd367 --- /dev/null +++ b/protocols/ace/TMCast/FaultDetector.hpp @@ -0,0 +1,41 @@ +// file : TMCast/FaultDetector.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace TMCast +{ + class FaultDetector + { + public: + FaultDetector () + : silence_period_ (-1) + { + } + + public: + class Failed {}; + + + void + insync () + { + silence_period_ = 0; + } + + void + outsync () + { + if (++silence_period_ >= Protocol::FATAL_SILENCE_FRAME) + { + // cerr << "Silence period has been passed." << endl; + // cerr << "Decalring the node failed." << endl; + throw Failed (); + } + } + + private: + short silence_period_; + }; +} diff --git a/protocols/ace/TMCast/Group.cpp b/protocols/ace/TMCast/Group.cpp new file mode 100644 index 00000000000..f6858d96644 --- /dev/null +++ b/protocols/ace/TMCast/Group.cpp @@ -0,0 +1,508 @@ +// file : TMCast/Group.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Group.hpp" + +#include <typeinfo> + +// OS primitives +#include <ace/OS.h> +#include <ace/Synch.h> +#include <ace/Time_Value.h> +#include <ace/SOCK_Dgram_Mcast.h> + +#include "Messaging.hpp" + +#include "Protocol.hpp" + +// Components + +#include "LinkListener.hpp" +#include "FaultDetector.hpp" +#include "TransactionController.hpp" + +namespace TMCast +{ + bool + operator== (std::type_info const* pa, std::type_info const& b) + { + return *pa == b; + } + + // + // + // + class Terminate : public virtual Message {}; + + + // + // + // + class Failure : public virtual Message {}; + + + // + // + // + class Scheduler + { + public: + Scheduler (ACE_INET_Addr const& addr, + char const* id, + MessageQueue& out_send_data, + MessageQueue& out_recv_data, + MessageQueue& out_control) + + : cond_ (mutex_), + + addr_ (addr), + sock_ (), + + out_control_ (out_control), + + in_data_ (mutex_), + in_link_data_(mutex_), + in_control_ (mutex_), + + sync_schedule (ACE_OS::gettimeofday ()), + + transaction_controller_ (in_data_, out_send_data, out_recv_data) + { + ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH); + id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0'; + + sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded + + in_data_.subscribe (cond_); + in_link_data_.subscribe (cond_); + in_control_.subscribe (cond_); + + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &thread_) != 0) ::abort (); + } + + ~Scheduler () + { + { + MessageQueueAutoLock lock (in_control_); + + in_control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ::abort (); + + // cerr << "Scheduler is down." << endl; + } + + public: + MessageQueue& + in_data () + { + return in_data_; + } + + private: + static void* + thread_thunk (void* arg) + { + Scheduler* obj = reinterpret_cast<Scheduler*> (arg); + + obj->execute (); + return 0; + } + + void + execute () + { + try + { + sock_.join (addr_); + + auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_)); + + { + AutoLock lock (mutex_); + + // Loop + // + // + + while (true) + { + cond_.wait (&sync_schedule); + + // "Loop of Fairness" + + bool done = false; + + do + { + // control message + // + // + if (!in_control_.empty ()) + { + done = true; + break; + } + + + // outsync + // + // + if (sync_schedule < ACE_OS::gettimeofday ()) + { + // OUTSYNC + + outsync (); + + // schedule next outsync + + sync_schedule = + ACE_OS::gettimeofday () + + ACE_Time_Value (0, Protocol::SYNC_PERIOD); + } + + // link message + // + // + if (!in_link_data_.empty ()) + { + MessagePtr m (in_link_data_.front ()); + in_link_data_.pop (); + + std::type_info const* exp (&typeid (*m)); + + if (exp == typeid (LinkFailure)) + { + // cerr << "link failure" << endl; + throw false; + } + else if (exp == typeid (LinkData)) + { + + LinkData* data (dynamic_cast<LinkData*> (m.get ())); + + // INSYNC, TL, CT + + // Filter out loopback. + // + if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0) + { + insync (); + transaction_list (); + current_transaction (data->header().current, + data->payload (), + data->size ()); + } + } + else + { + // cerr << "unknown message type from link listener: " + // << typeid (*m).name () << endl; + abort (); + } + } + + // api message + // + // + if (!in_data_.empty ()) + { + // API + + api (); + } + + } while (!in_link_data_.empty() || + sync_schedule < ACE_OS::gettimeofday ()); + + if (done) break; + } + } + } + catch (...) + { + // cerr << "Exception in scheduler loop." << endl; + + MessageQueueAutoLock lock (out_control_); + out_control_.push (MessagePtr (new Failure)); + } + } + + // Events + // + // Order: + // + // INSYNC, TSL, VOTE, BEGIN + // API + // OUTSYNC + // + + void + insync () + { + fault_detector_.insync (); + } + + void + outsync () + { + char buf[Protocol::MAX_MESSAGE_SIZE]; + + Protocol::MessageHeader* hdr = + reinterpret_cast<Protocol::MessageHeader*> (buf); + + void* data = buf + sizeof (Protocol::MessageHeader); + + hdr->length = sizeof (Protocol::MessageHeader); + hdr->check_sum = 0; + + ACE_OS::strcpy (hdr->member_id.id, id_); + + size_t size (0); + + transaction_controller_.outsync (hdr->current, data, size); + + hdr->length += size; + + fault_detector_.outsync (); + + // sock_.send (buf, hdr->length, addr_); + + sock_.send (buf, hdr->length); + } + + void + transaction_list () + { + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + transaction_controller_.current_transaction (t, payload, size); + } + + void + api () + { + transaction_controller_.api (); + } + + private: + ACE_thread_t thread_; + + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + char id_[Protocol::MEMBER_ID_LENGTH]; + + ACE_INET_Addr addr_; + ACE_SOCK_Dgram_Mcast sock_; + + MessageQueue& out_control_; + + MessageQueue in_data_; + MessageQueue in_link_data_; + MessageQueue in_control_; + + // Protocol state + // + // + + ACE_Time_Value sync_schedule; + + FaultDetector fault_detector_; + TransactionController transaction_controller_; + }; + + + // + // + // + class Group::GroupImpl + { + public: + ~GroupImpl () + { + } + + GroupImpl (ACE_INET_Addr const& addr, char const* id) + throw (Group::Failed) + : send_cond_ (mutex_), + recv_cond_ (mutex_), + failed_ (false), + in_send_data_ (mutex_), + in_recv_data_ (mutex_), + in_control_ (mutex_), + scheduler_ (new Scheduler (addr, + id, + in_send_data_, + in_recv_data_, + in_control_)), + out_data_ (scheduler_->in_data ()) + { + in_send_data_.subscribe (send_cond_); + in_recv_data_.subscribe (recv_cond_); + + in_control_.subscribe (send_cond_); + in_control_.subscribe (recv_cond_); + } + + void + send (void const* msg, size_t size) + throw (Group::InvalidArg, Group::Failed, Group::Aborted) + { + if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg (); + + // Note the potential deadlock if I lock mutex_ and out_data_ in + // reverse order. + + MessageQueueAutoLock l1 (out_data_); + AutoLock l2 (mutex_); + + throw_if_failed (); + + out_data_.push (MessagePtr (new Send (msg, size))); + + l1.unlock (); // no need to keep it locked + + while (true) + { + throw_if_failed (); + + if (!in_send_data_.empty ()) + { + MessagePtr m (in_send_data_.front ()); + in_send_data_.pop (); + + std::type_info const* exp (&typeid (*m)); + + if (exp == typeid (TMCast::Aborted)) + { + throw Group::Aborted (); + } + else if (exp == typeid (Commited)) + { + return; + } + else + { + // cerr << "send: group-scheduler messaging protocol violation; " + // << "unexpected message " << typeid (*m).name () + // << " " << typeid (Aborted).name () << endl; + + abort (); + } + } + + // cerr << "send: waiting on condition" << endl; + send_cond_.wait (); + // cerr << "send: wokeup on condition" << endl; + } + } + + + + size_t + recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) + { + AutoLock lock (mutex_); + + while (true) + { + throw_if_failed (); + + if (!in_recv_data_.empty ()) + { + MessagePtr m (in_recv_data_.front ()); + in_recv_data_.pop (); + + std::type_info const* exp (&typeid (*m)); + + if (exp == typeid (Recv)) + { + Recv* data (dynamic_cast<Recv*> (m.get ())); + + if (size < data->size ()) throw Group::InsufficienSpace (); + + memcpy (msg, data->payload (), data->size ()); + + return data->size (); + } + else + { + // cerr << "recv: group-scheduler messaging protocol violation. " + // << "unexpected message " << typeid (*m).name () << endl; + + abort (); + } + } + + recv_cond_.wait (); + } + } + + private: + void + throw_if_failed () + { + if (!failed_ && !in_control_.empty ()) failed_ = true; + + if (failed_) throw Group::Failed (); + } + + private: + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> send_cond_; + ACE_Condition<ACE_Thread_Mutex> recv_cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool failed_; + + MessageQueue in_send_data_; + MessageQueue in_recv_data_; + MessageQueue in_control_; + + auto_ptr<Scheduler> scheduler_; + + MessageQueue& out_data_; + }; + + + // Group + // + // + Group:: + Group (ACE_INET_Addr const& addr, char const* id) + throw (Failed) + : pimpl_ (new GroupImpl (addr, id)) + { + } + + Group:: + ~Group () + { + } + + void Group:: + send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted) + { + pimpl_->send (msg, size); + } + + size_t Group:: + recv (void* msg, size_t size) throw (Failed, InsufficienSpace) + { + return pimpl_->recv (msg, size); + } +} diff --git a/protocols/ace/TMCast/Group.hpp b/protocols/ace/TMCast/Group.hpp new file mode 100644 index 00000000000..416cea0a17d --- /dev/null +++ b/protocols/ace/TMCast/Group.hpp @@ -0,0 +1,51 @@ +// file : TMCast/Group.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_HPP +#define TMCAST_GROUP_HPP + +#include <ace/Auto_Ptr.h> +#include <ace/INET_Addr.h> + +#include "Export.hpp" + +namespace TMCast +{ + class TMCast_Export Group + { + public: + class Aborted {}; + class Failed {}; + class InvalidArg {}; + class InsufficienSpace {}; + + public: + ~Group (); + + Group (ACE_INET_Addr const& addr, char const* id) throw (Failed); + + public: + void + send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted); + + size_t + recv (void* msg, size_t size) throw (Failed, InsufficienSpace); + + private: + bool + failed (); + + private: + class GroupImpl; + auto_ptr<GroupImpl> pimpl_; + + private: + Group (Group const&); + + Group& + operator= (Group const&); + }; +} + +#endif // TMCAST_GROUP_HPP diff --git a/protocols/ace/TMCast/GroupFwd.hpp b/protocols/ace/TMCast/GroupFwd.hpp new file mode 100644 index 00000000000..beba06df79d --- /dev/null +++ b/protocols/ace/TMCast/GroupFwd.hpp @@ -0,0 +1,15 @@ +// file : TMCast/GroupFwd.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_FWD_HPP +#define TMCAST_GROUP_FWD_HPP + +#include "Export.hpp" + +namespace TMCast +{ + class TMCast_Export Group; +} + +#endif // TMCAST_GROUP_FWD_HPP diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp new file mode 100644 index 00000000000..990f9e8f803 --- /dev/null +++ b/protocols/ace/TMCast/LinkListener.hpp @@ -0,0 +1,166 @@ +// file : TMCast/LinkListener.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +// OS primitives +#include <ace/Synch.h> +#include <ace/SOCK_Dgram_Mcast.h> +#include <ace/Refcounted_Auto_Ptr.h> + + +#include "Messaging.hpp" + +namespace TMCast +{ + // + // + // + class LinkFailure : public virtual Message {}; + + + // + // + // + class LinkData : public virtual Message + { + public: + LinkData (Protocol::MessageHeader const* header, + void* payload, + size_t size) + : size_ (size) + { + ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); + ACE_OS::memcpy (payload_, payload, size_); + } + + Protocol::MessageHeader const& + header () const + { + return header_; + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + Protocol::MessageHeader header_; + char payload_[Protocol::MAX_MESSAGE_SIZE]; + size_t size_; + }; + + typedef + ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex> + LinkDataPtr; + + // + // + // + class LinkListener + { + private: + class Terminate : public virtual Message {}; + + public: + LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) + : sock_(sock), out_ (out) + { + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &thread_) != 0) ::abort (); + } + + ~LinkListener () + { + { + MessageQueueAutoLock lock (control_); + + control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ::abort (); + + // cerr << "Link listener is down." << endl; + } + + + static void* + thread_thunk (void* arg) + { + LinkListener* obj = reinterpret_cast<LinkListener*> (arg); + + obj->execute (); + return 0; + } + + void + execute () + { + char msg[Protocol::MAX_MESSAGE_SIZE]; + + ssize_t header_size = sizeof (Protocol::MessageHeader); + + // OS::Time timeout (1000000); // one millisecond + + ACE_Time_Value timeout (0, 1000); // one millisecond + + try + { + while (true) + { + // Check control message queue + + { + MessageQueueAutoLock lock (control_); + + if (!control_.empty ()) break; + } + + ACE_Addr junk; + ssize_t n = sock_.recv (msg, + Protocol::MAX_MESSAGE_SIZE, + junk, + 0, + &timeout); + + if (n != -1) + { + if (n < header_size) throw false; + + Protocol::MessageHeader* header = + reinterpret_cast<Protocol::MessageHeader*> (msg); + + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkData (header, + msg + header_size, + n - header_size))); + } + } + } + catch (...) + { + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkFailure)); + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + ACE_thread_t thread_; + ACE_SOCK_Dgram_Mcast& sock_; + MessageQueue& out_; + MessageQueue control_; + }; +} diff --git a/protocols/ace/TMCast/MTQueue.hpp b/protocols/ace/TMCast/MTQueue.hpp new file mode 100644 index 00000000000..d593c034723 --- /dev/null +++ b/protocols/ace/TMCast/MTQueue.hpp @@ -0,0 +1,177 @@ +// file : TMCast/MTQueue.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MT_QUEUE_HPP +#define TMCAST_MT_QUEUE_HPP + +#include "ace/Auto_Ptr.h" +#include "ace/Unbounded_Set.h" +#include "ace/Unbounded_Queue.h" + +namespace TMCast +{ + template <typename T, + typename M, + typename C, + typename Q = ACE_Unbounded_Queue<T> > + class MTQueue + { + public: + typedef T ElementType; + typedef M MutexType; + typedef C ConditionalType; + typedef Q QueueType; + + public: + + MTQueue (std::size_t hint = 0) + : mutexp_ (new MutexType), + mutex_ (*mutexp_), + // queue_ (hint), + queue_ (), + signal_ (false) + { + } + + MTQueue (MutexType& mutex, std::size_t hint = 0) + : mutexp_ (), + mutex_ (mutex), + // queue_ (hint), + queue_ (), + signal_ (false) + { + } + + public: + bool + empty () const + { + return queue_.is_empty (); + } + + std::size_t + size () const + { + return queue_.size (); + } + + // typedef typename QueueType::Empty Empty; + + class Empty {}; + + T& + front () + { + ACE_Unbounded_Queue_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + + T const& + front () const + { + ACE_Unbounded_Queue_Const_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + /* + T& + back () + { + return queue_.back (); + } + + + T const& + back () const + { + return queue_.back (); + } + */ + + void + push (T const& t) + { + signal_ = empty (); + queue_.enqueue_tail (t); + } + + void + pop () + { + T junk; + queue_.dequeue_head (junk); + } + + public: + void + lock () const + { + mutex_.acquire (); + } + + void + unlock () const + { + if (signal_) + { + signal_ = false; + + for (ConditionalSetConstIterator_ i (cond_set_); + !i.done (); + i.advance ()) + { + ConditionalType** c; + + i.next (c); + + (*c)->signal (); + } + } + + mutex_.release (); + } + + void + subscribe (ConditionalType& c) + { + //@@ should check for duplicates + // + cond_set_.insert (&c); + } + + void + unsubscribe (ConditionalType& c) + { + //@@ should check for absence + // + cond_set_.remove (&c); + } + + private: + auto_ptr<MutexType> mutexp_; + mutable MutexType& mutex_; + QueueType queue_; + + typedef + ACE_Unbounded_Set<ConditionalType*> + ConditionalSet_; + + typedef + ACE_Unbounded_Set_Const_Iterator<ConditionalType*> + ConditionalSetConstIterator_; + + ConditionalSet_ cond_set_; + + mutable bool signal_; + }; +} + +#endif // TMCAST_MT_QUEUE_HPP diff --git a/protocols/ace/TMCast/Makefile b/protocols/ace/TMCast/Makefile new file mode 100644 index 00000000000..87f6d1964ba --- /dev/null +++ b/protocols/ace/TMCast/Makefile @@ -0,0 +1,221 @@ +#---------------------------------------------------------------------------- +# +# $Id$ +# +#---------------------------------------------------------------------------- + +MAKEFILE = Makefile +LIB = libTMCast.a +SHLIB = libTMCast.$(SOEXT) + +FILES= Group Protocol + +LIBS=$(ACELIB) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU + +LSRC = $(addsuffix .cpp,$(FILES)) + +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +ifeq ($(shared_libs),1) +ifneq ($(SHLIB),) +CPPFLAGS += -DTMCAST_BUILD_DLL +endif +endif + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + + +.obj/Group.o .obj/Group.so .shobj/Group.o .shobj/Group.so: Group.cpp Group.hpp \ + $(ACE_ROOT)/ace/Auto_Ptr.h \ + $(ACE_ROOT)/ace/pre.h \ + $(ACE_ROOT)/ace/post.h \ + $(ACE_ROOT)/ace/ace_wchar.h \ + $(ACE_ROOT)/ace/ace_wchar.inl \ + $(ACE_ROOT)/ace/Auto_Ptr.i \ + $(ACE_ROOT)/ace/Global_Macros.h \ + $(ACE_ROOT)/ace/OS_Export.h \ + $(ACE_ROOT)/ace/Auto_Ptr.cpp \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/Sock_Connect.h \ + $(ACE_ROOT)/ace/ACE_export.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/os_include/os_limits.h \ + $(ACE_ROOT)/ace/os_include/os_unistd.h \ + $(ACE_ROOT)/ace/os_include/sys/os_types.h \ + $(ACE_ROOT)/ace/os_include/os_stddef.h \ + $(ACE_ROOT)/ace/os_include/os_inttypes.h \ + $(ACE_ROOT)/ace/os_include/os_stdint.h \ + $(ACE_ROOT)/ace/os_include/os_stdio.h \ + $(ACE_ROOT)/ace/os_include/os_stdarg.h \ + $(ACE_ROOT)/ace/os_include/os_float.h \ + $(ACE_ROOT)/ace/os_include/os_stdlib.h \ + $(ACE_ROOT)/ace/os_include/sys/os_wait.h \ + $(ACE_ROOT)/ace/os_include/os_signal.h \ + $(ACE_ROOT)/ace/os_include/os_time.h \ + $(ACE_ROOT)/ace/os_include/os_ucontext.h \ + $(ACE_ROOT)/ace/os_include/sys/os_resource.h \ + $(ACE_ROOT)/ace/os_include/sys/os_time.h \ + $(ACE_ROOT)/ace/os_include/sys/os_select.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/os_include/netinet/os_in.h \ + $(ACE_ROOT)/ace/os_include/sys/os_socket.h \ + $(ACE_ROOT)/ace/os_include/sys/os_uio.h \ + $(ACE_ROOT)/ace/Sock_Connect.i \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/OS_Dirent.h \ + $(ACE_ROOT)/ace/OS_Errno.h \ + $(ACE_ROOT)/ace/os_include/os_errno.h \ + $(ACE_ROOT)/ace/OS_Errno.inl \ + $(ACE_ROOT)/ace/os_include/os_dirent.h \ + $(ACE_ROOT)/ace/OS_Dirent.inl \ + $(ACE_ROOT)/ace/OS_String.h \ + $(ACE_ROOT)/ace/OS_String.inl \ + $(ACE_ROOT)/ace/os_include/os_string.h \ + $(ACE_ROOT)/ace/os_include/os_strings.h \ + $(ACE_ROOT)/ace/os_include/os_ctype.h \ + $(ACE_ROOT)/ace/OS_Memory.h \ + $(ACE_ROOT)/ace/OS_Memory.inl \ + $(ACE_ROOT)/ace/OS_TLI.h \ + $(ACE_ROOT)/ace/OS_TLI.inl \ + $(ACE_ROOT)/ace/os_include/os_dlfcn.h \ + $(ACE_ROOT)/ace/os_include/sys/os_mman.h \ + $(ACE_ROOT)/ace/os_include/os_netdb.h \ + $(ACE_ROOT)/ace/os_include/net/os_if.h \ + $(ACE_ROOT)/ace/os_include/sys/os_sem.h \ + $(ACE_ROOT)/ace/os_include/sys/os_ipc.h \ + $(ACE_ROOT)/ace/Time_Value.h \ + $(ACE_ROOT)/ace/Time_Value.inl \ + $(ACE_ROOT)/ace/Default_Constants.h \ + $(ACE_ROOT)/ace/Min_Max.h \ + $(ACE_ROOT)/ace/os_include/os_pthread.h \ + $(ACE_ROOT)/ace/os_include/os_assert.h \ + $(ACE_ROOT)/ace/os_include/os_fcntl.h \ + $(ACE_ROOT)/ace/os_include/sys/os_stat.h \ + $(ACE_ROOT)/ace/iosfwd.h \ + $(ACE_ROOT)/ace/os_include/arpa/os_inet.h \ + $(ACE_ROOT)/ace/os_include/netinet/os_tcp.h \ + $(ACE_ROOT)/ace/os_include/sys/os_shm.h \ + $(ACE_ROOT)/ace/os_include/os_pwd.h \ + $(ACE_ROOT)/ace/os_include/os_stropts.h \ + $(ACE_ROOT)/ace/os_include/os_termios.h \ + $(ACE_ROOT)/ace/os_include/os_aio.h \ + $(ACE_ROOT)/ace/os_include/sys/os_un.h \ + $(ACE_ROOT)/ace/os_include/os_poll.h \ + $(ACE_ROOT)/ace/os_include/sys/os_msg.h \ + $(ACE_ROOT)/ace/os_include/sys/os_utsname.h \ + $(ACE_ROOT)/ace/os_include/os_syslog.h \ + $(ACE_ROOT)/ace/OS.i Export.hpp \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/Auto_Event.h \ + $(ACE_ROOT)/ace/Event.h \ + $(ACE_ROOT)/ace/Event.inl \ + $(ACE_ROOT)/ace/Auto_Event.inl \ + $(ACE_ROOT)/ace/Barrier.h \ + $(ACE_ROOT)/ace/Condition_Thread_Mutex.h \ + $(ACE_ROOT)/ace/Thread_Mutex.h \ + $(ACE_ROOT)/ace/Thread_Mutex.inl \ + $(ACE_ROOT)/ace/Condition_Thread_Mutex.inl \ + $(ACE_ROOT)/ace/Barrier.inl \ + $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.h \ + $(ACE_ROOT)/ace/Recursive_Thread_Mutex.h \ + $(ACE_ROOT)/ace/Recursive_Thread_Mutex.inl \ + $(ACE_ROOT)/ace/Condition_Recursive_Thread_Mutex.inl \ + $(ACE_ROOT)/ace/Lock.h \ + $(ACE_ROOT)/ace/Lock.inl \ + $(ACE_ROOT)/ace/Manual_Event.h \ + $(ACE_ROOT)/ace/Manual_Event.inl \ + $(ACE_ROOT)/ace/Mutex.h \ + $(ACE_ROOT)/ace/Mutex.inl \ + $(ACE_ROOT)/ace/Null_Barrier.h \ + $(ACE_ROOT)/ace/Null_Condition.h \ + $(ACE_ROOT)/ace/Null_Mutex.h \ + $(ACE_ROOT)/ace/Null_Semaphore.h \ + $(ACE_ROOT)/ace/RW_Mutex.h \ + $(ACE_ROOT)/ace/RW_Mutex.inl \ + $(ACE_ROOT)/ace/RW_Thread_Mutex.h \ + $(ACE_ROOT)/ace/RW_Thread_Mutex.inl \ + $(ACE_ROOT)/ace/Semaphore.h \ + $(ACE_ROOT)/ace/Semaphore.inl \ + $(ACE_ROOT)/ace/Thread_Semaphore.h \ + $(ACE_ROOT)/ace/Thread_Semaphore.inl \ + $(ACE_ROOT)/ace/TSS_Adapter.h \ + $(ACE_ROOT)/ace/TSS_Adapter.inl \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Lock_Adapter_T.h \ + $(ACE_ROOT)/ace/Lock_Adapter_T.inl \ + $(ACE_ROOT)/ace/Lock_Adapter_T.cpp \ + $(ACE_ROOT)/ace/Reverse_Lock_T.h \ + $(ACE_ROOT)/ace/Reverse_Lock_T.inl \ + $(ACE_ROOT)/ace/Reverse_Lock_T.cpp \ + $(ACE_ROOT)/ace/Guard_T.h \ + $(ACE_ROOT)/ace/Guard_T.inl \ + $(ACE_ROOT)/ace/Guard_T.cpp \ + $(ACE_ROOT)/ace/TSS_T.h \ + $(ACE_ROOT)/ace/TSS_T.inl \ + $(ACE_ROOT)/ace/TSS_T.cpp \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.h \ + $(ACE_ROOT)/ace/OS_Log_Msg_Attributes.inl \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Condition_T.h \ + $(ACE_ROOT)/ace/Condition_T.inl \ + $(ACE_ROOT)/ace/Condition_T.cpp \ + $(ACE_ROOT)/ace/Synch_Traits.h \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.h \ + $(ACE_ROOT)/ace/SOCK_Dgram.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/Flag_Manip.h \ + $(ACE_ROOT)/ace/Flag_Manip.i \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_Dgram.i \ + $(ACE_ROOT)/ace/SOCK_Dgram_Mcast.i \ + Messaging.hpp \ + $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.h \ + $(ACE_ROOT)/ace/Refcounted_Auto_Ptr.i \ + MTQueue.hpp $(ACE_ROOT)/ace/Unbounded_Set.h \ + $(ACE_ROOT)/ace/Node.h \ + $(ACE_ROOT)/ace/Node.cpp \ + $(ACE_ROOT)/ace/Unbounded_Set.inl \ + $(ACE_ROOT)/ace/Unbounded_Set.cpp \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Unbounded_Queue.h \ + $(ACE_ROOT)/ace/Unbounded_Queue.inl \ + $(ACE_ROOT)/ace/Unbounded_Queue.cpp \ + Protocol.hpp LinkListener.hpp FaultDetector.hpp \ + TransactionController.hpp + +.obj/Protocol.o .obj/Protocol.so .shobj/Protocol.o .shobj/Protocol.so: Protocol.cpp Protocol.hpp + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/protocols/ace/TMCast/Messaging.hpp b/protocols/ace/TMCast/Messaging.hpp new file mode 100644 index 00000000000..6a1000c3265 --- /dev/null +++ b/protocols/ace/TMCast/Messaging.hpp @@ -0,0 +1,54 @@ +// file : TMCast/Messaging.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MESSAGING_HPP +#define TMCAST_MESSAGING_HPP + +#include <ace/Synch.h> +#include <ace/Refcounted_Auto_Ptr.h> + +#include "MTQueue.hpp" + +namespace TMCast +{ + class Message + { + public: + virtual + ~Message () {} + }; + + typedef + ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex> + MessagePtr; + + typedef + MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> > + MessageQueue; + + struct MessageQueueAutoLock + { + MessageQueueAutoLock (MessageQueue& q) + : q_ (q) + { + q_.lock (); + } + + void + unlock () + { + q_.unlock (); + } + + ~MessageQueueAutoLock () + { + q_.unlock (); + } + + private: + MessageQueue& q_; + }; +} + +#endif // TMCAST_MESSAGING_HPP diff --git a/protocols/ace/TMCast/Protocol.cpp b/protocols/ace/TMCast/Protocol.cpp new file mode 100644 index 00000000000..78563281694 --- /dev/null +++ b/protocols/ace/TMCast/Protocol.cpp @@ -0,0 +1,31 @@ +// file : TMCast/Protocol.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace TMCast +{ + namespace Protocol + { + namespace + { + char const* labels[] = { + "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"}; + } + + /* + std::string + tslabel (Protocol::TransactionStatus s) + { + return labels[s]; + } + + std::ostream& + operator << (std::ostream& o, Transaction const& t) + { + return o << "{" << t.id << "; " << tslabel (t.status) << "}"; + } + */ + } +} diff --git a/protocols/ace/TMCast/Protocol.hpp b/protocols/ace/TMCast/Protocol.hpp new file mode 100644 index 00000000000..d5ae6a50cd6 --- /dev/null +++ b/protocols/ace/TMCast/Protocol.hpp @@ -0,0 +1,107 @@ +// file : TMCast/Protocol.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_PROTOCOL_HPP +#define TMCAST_PROTOCOL_HPP + +namespace TMCast +{ + namespace Protocol + { + // + // + // + unsigned long const MEMBER_ID_LENGTH = 38; + + struct MemberId + { + char id[MEMBER_ID_LENGTH]; + /* + unsigned long ip; + unsigned short port; + */ + }; + + // + // + // + typedef unsigned short TransactionId; + + + + typedef unsigned char TransactionStatus; + + TransactionStatus const TS_BEGIN = 1; + TransactionStatus const TS_COMMIT = 2; + TransactionStatus const TS_ABORT = 3; + TransactionStatus const TS_COMMITED = 4; + TransactionStatus const TS_ABORTED = 5; + + struct Transaction + { + TransactionId id; + TransactionStatus status; + }; + + // Transaction List (TL) + + // unsigned long const TL_LENGTH = 1; + + // typedef Transaction TransactionList[TL_LENGTH]; + + + // + // + // + struct MessageHeader + { + unsigned long length; + + unsigned long check_sum; + + MemberId member_id; + + Transaction current; + + //TransactionList transaction_list; + }; + + + // + // + // + + unsigned long const MAX_MESSAGE_SIZE = 768; + + unsigned long const + MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader); + + // Protocol timing + // + // + + unsigned long const SYNC_PERIOD = 30000; // in mks + + unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's + unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's + + // FATAL_SILENCE_FRAME in SYNC_PERIOD's + // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME + // + + short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2; + + // short const FATAL_SILENCE_FRAME = 10000; + + // Helpers + + // std::string + // tslabel (Protocol::TransactionStatus s); + + // std::ostream& + // operator << (std::ostream& o, Transaction const& t); + } +} + +#endif // TMCAST_PROTOCOL_HPP diff --git a/protocols/ace/TMCast/README b/protocols/ace/TMCast/README new file mode 100644 index 00000000000..d061c7b2cba --- /dev/null +++ b/protocols/ace/TMCast/README @@ -0,0 +1,58 @@ + + +Architecture + +TMCast (stands for Transaction MultiCast) is an implementation of a +transactional multicast protocol. In essence, the idea is to represent +message delivery to members of a multicast group as a transaction - +atomic, consistent and isolated action. Multicast transaction can +be viewed as an atomic transition of group members to a new state. +If we define Mo as a set of operational (non-faulty) members of the +group, Mf as a set of faulty members of the group, Ma as a set of +members that view transition Tn as aborted and Mc as a set of members +that view transition Tn as committed, then this atomic transition Tn +can be described as one of the following: + +Mo(Tn-1) = Ma(T) U Mf(T) +Mo(Tn-1) = Mc(T) U Mf(T) + +Or, in other words, after transaction T has been committed (aborted), +all operational (before transaction T) members are either in +committed (aborted) or failed state. + +Thus, for each member of the group, outcome of the transaction can +be commit, abort or member failure. It is important for a member +to exhibit a failfast (error latency is less than transaction cycle) +behavior. Or, in other words, if the member transitioned into a wrong +state, it is guaranteed to fail instead of delivering wrong result. + +In order to achieve such error detection in decentralized environment, +certain limitations should be imposed. One of the most user-visible +limitation is the fact that the lifetime of the group with only +one member is very short. This is because there is not way for a +member to distinguish "no members yet" case from "my link to the +group is down". In such situation, the member assumes the latter case. +There is also a military saying that puts it quite nicely: two is one, +one is nothing. + + +State of Implementation + +Current implementation is in prototypical stage. The following parts +are not implemented or still under development: + +* Handling of network partitioning (TODO) + +* Redundant network support (TODO) + +* Member failure detection (partial implementation) + + +Examples + +There is a simple example available in examples/TMCast/Member with +corresponding README. + + +-- +Boris Kolpackov <boris@dre.vanderbilt.edu>
\ No newline at end of file diff --git a/protocols/ace/TMCast/TransactionController.hpp b/protocols/ace/TMCast/TransactionController.hpp new file mode 100644 index 00000000000..66c924faaf1 --- /dev/null +++ b/protocols/ace/TMCast/TransactionController.hpp @@ -0,0 +1,384 @@ +// file : TMCast/TransactionController.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include <ace/Synch.h> +#include <ace/Refcounted_Auto_Ptr.h> + +#include "Protocol.hpp" +#include "Messaging.hpp" + +namespace TMCast +{ + + // Messages + // + // + class Send : public virtual Message + { + public: + Send (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex> + SendPtr; + + + class Recv : public virtual Message + { + public: + Recv (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex> + RecvPtr; + + class Aborted : public virtual Message {}; + + class Commited : public virtual Message {}; + + + // + // + // + class TransactionController + { + public: + TransactionController (MessageQueue& in, + MessageQueue& send_out, + MessageQueue& recv_out) + : trace_ (false), + voting_duration_ (0), + separation_duration_ (0), + in_ (in), + send_out_ (send_out), + recv_out_ (recv_out) + { + current_.id = 0; + current_.status = Protocol::TS_COMMITED; + } + + public: + class Failure {}; + + + void + outsync (Protocol::Transaction& c, void* payload, size_t& size) + { + if (current_.status == Protocol::TS_COMMIT || + current_.status == Protocol::TS_ABORT) + { + if (++voting_duration_ >= Protocol::VOTING_FRAME) + { + // end of voting frame + + if (current_.status == Protocol::TS_COMMIT) + { + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Commited)); + } + else // joined transaction + { + MessageQueueAutoLock lock (recv_out_); + recv_out_.push (MessagePtr (recv_.release ())); + recv_ = RecvPtr (); + } + } + + current_.status = Protocol::TS_COMMITED; + + // if (trace_) cerr << "commited transaction with id " + // << current_.id << endl; + } + else // TS_ABORT + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Aborted)); + } + else + { + // free revc_ buffer if necessary + // + if (recv_.get ()) recv_ = RecvPtr (); + } + + + current_.status = Protocol::TS_ABORTED; + + // if (trace_) cerr << "aborted transaction with id " + // << current_.id << endl; + } + + // start transaction separation frame (counts down) + // +1 because it will be decremented on this iteration + separation_duration_ = Protocol::SEPARATION_FRAME + 1; + } + } + + // Set current outsync info + + c.id = current_.id; + c.status = current_.status; + + + // Do some post-processing + + switch (current_.status) + { + case Protocol::TS_COMMITED: + case Protocol::TS_ABORTED: + { + if (separation_duration_ > 0) --separation_duration_; + break; + } + case Protocol::TS_BEGIN: + { + // transfer payload + + size = send_->size (); + memcpy (payload, send_->payload (), size); + + send_ = SendPtr (); + + // get redy to vote for 'commit' + + current_.status = Protocol::TS_COMMIT; + voting_duration_ = 0; + } + } + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + Protocol::TransactionId& id = current_.id; + Protocol::TransactionStatus& s = current_.status; + + if (id == 0 && t.id != 0) // catch up + { + switch (t.status) + { + case Protocol::TS_BEGIN: + case Protocol::TS_COMMIT: + case Protocol::TS_ABORT: + { + id = t.id - 1; + s = Protocol::TS_COMMITED; + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + id = t.id; + s = t.status; + break; + } + } + + // if (trace_) cerr << "caught up with id " << id << endl; + } + + bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED); + + switch (t.status) + { + case Protocol::TS_BEGIN: + { + if (!stable || t.id != id + 1) + { + // Transaction is in progress or hole in transaction id's + + // cerr << "unexpected request to join " << t + // << " while on " << current_ << endl; + + // if (!stable) cerr << "voting progress is " << voting_duration_ + // << "/" << Protocol::VOTING_FRAME << endl; + + if (t.id == id) // collision + { + if (!stable && s != Protocol::TS_ABORT) + { + // abort both + // cerr << "aborting both transactions" << endl; + + s = Protocol::TS_ABORT; + voting_duration_ = 0; //@@ reset voting frame + } + } + else + { + // @@ delicate case. need to think more + + // cerr << "Declaring node failed." << endl; + throw Failure (); + } + } + else + { + // join the transaction + + initiated_ = false; + + recv_ = RecvPtr (new Recv (payload, size)); + + id = t.id; + s = Protocol::TS_COMMIT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-commit transaction with id " + // << id << endl; + } + break; + } + case Protocol::TS_COMMIT: + { + if (stable && id == t.id - 1) + { + // not begin and and we haven't joined + + // join for abort + + initiated_ = false; + + current_.id = t.id; + current_.status = Protocol::TS_ABORT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-abort transaction with id " + // << current_.id << endl; + } + break; + } + case Protocol::TS_ABORT: + { + if ((!stable && id == t.id && s == Protocol::TS_COMMIT) || + (stable && id == t.id - 1)) // abort current || new transaction + { + // if (trace_) cerr << "voting-for-abort on transaction with id " + // << current_.id << endl; + + id = t.id; + s = Protocol::TS_ABORT; + + voting_duration_ = 0; //@@ reseting voting_duration_ + } + else + { + } + + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + // nothing for now + break; + } + } + } + + void + api () + { + if ((current_.status == Protocol::TS_COMMITED || + current_.status == Protocol::TS_ABORTED) && + separation_duration_ == 0) // no transaction in progress + { + // start new transaction + + // Note that in_ is already locked by Scheduler + + MessagePtr m (in_.front ()); + in_.pop (); + + if (typeid (*m) == typeid (Send)) + { + send_ = SendPtr (dynamic_cast<Send*> (m.release ())); + } + else + { + // cerr << "Expecting Send but received " << typeid (*m).name () + // << endl; + + ::abort (); + } + + current_.id++; + current_.status = Protocol::TS_BEGIN; + + initiated_ = true; + + // if (trace_) cerr << "starting transaction with id " << current_.id + // << endl; + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool trace_; + + Protocol::Transaction current_; + + bool initiated_; + + unsigned short voting_duration_; + unsigned short separation_duration_; + + MessageQueue& in_; + MessageQueue& send_out_; + MessageQueue& recv_out_; + + SendPtr send_; + RecvPtr recv_; + }; +} |