diff options
Diffstat (limited to 'protocols/ace/TMCast')
-rw-r--r-- | protocols/ace/TMCast/ACE_TMCast.pc.in | 11 | ||||
-rw-r--r-- | protocols/ace/TMCast/Export.hpp | 58 | ||||
-rw-r--r-- | protocols/ace/TMCast/FaultDetector.hpp | 45 | ||||
-rw-r--r-- | protocols/ace/TMCast/Group.cpp | 511 | ||||
-rw-r--r-- | protocols/ace/TMCast/Group.hpp | 51 | ||||
-rw-r--r-- | protocols/ace/TMCast/GroupFwd.hpp | 15 | ||||
-rw-r--r-- | protocols/ace/TMCast/LinkListener.hpp | 171 | ||||
-rw-r--r-- | protocols/ace/TMCast/MTQueue.cpp | 24 | ||||
-rw-r--r-- | protocols/ace/TMCast/MTQueue.hpp | 176 | ||||
-rw-r--r-- | protocols/ace/TMCast/Makefile.am | 66 | ||||
-rw-r--r-- | protocols/ace/TMCast/Messaging.hpp | 54 | ||||
-rw-r--r-- | protocols/ace/TMCast/Protocol.cpp | 31 | ||||
-rw-r--r-- | protocols/ace/TMCast/Protocol.hpp | 107 | ||||
-rw-r--r-- | protocols/ace/TMCast/README | 240 | ||||
-rw-r--r-- | protocols/ace/TMCast/TMCast.mpc | 12 | ||||
-rw-r--r-- | protocols/ace/TMCast/Template_Instantiations.cpp | 31 | ||||
-rw-r--r-- | protocols/ace/TMCast/TransactionController.hpp | 388 |
17 files changed, 0 insertions, 1991 deletions
diff --git a/protocols/ace/TMCast/ACE_TMCast.pc.in b/protocols/ace/TMCast/ACE_TMCast.pc.in deleted file mode 100644 index a56956f81b2..00000000000 --- a/protocols/ace/TMCast/ACE_TMCast.pc.in +++ /dev/null @@ -1,11 +0,0 @@ -prefix=@prefix@ -exec_prefix=@exec_prefix@ -libdir=@libdir@ -includedir=@includedir@ - -Name: ACE_TMCast -Description: ACE Transaction Multicast Library -Requires: ACE -Version: @VERSION@ -Libs: -L${libdir} -lACE_TMCast -Cflags: -I${includedir} diff --git a/protocols/ace/TMCast/Export.hpp b/protocols/ace/TMCast/Export.hpp deleted file mode 100644 index bf04f7ee114..00000000000 --- a/protocols/ace/TMCast/Export.hpp +++ /dev/null @@ -1,58 +0,0 @@ - -// -*- C++ -*- -// $Id$ -// Definition for Win32 Export directives. -// This file is generated automatically by generate_export_file.pl ACE_TMCast -// ------------------------------ -#ifndef TMCAST_EXPORT_H -#define TMCAST_EXPORT_H - -#include "ace/config-all.h" - -#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL) -# define TMCAST_HAS_DLL 0 -#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */ - -#if !defined (TMCAST_HAS_DLL) -#define TMCAST_HAS_DLL 1 -#endif /* ! TMCAST_HAS_DLL */ - -#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1) -# if defined (TMCAST_BUILD_DLL) -# define ACE_TMCast_Export ACE_Proper_Export_Flag -# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# else /* TMCAST_BUILD_DLL */ -# define ACE_TMCast_Export ACE_Proper_Import_Flag -# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# endif /* TMCAST_BUILD_DLL */ -#else /* TMCAST_HAS_DLL == 1 */ -# define ACE_TMCast_Export -# define TMCAST_SINGLETON_DECLARATION(T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -#endif /* TMCAST_HAS_DLL == 1 */ - -// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if -// tracing is turned off for ACE. -#if !defined (TMCAST_NTRACE) -# if (ACE_NTRACE == 1) -# define TMCAST_NTRACE 1 -# else /* (ACE_NTRACE == 1) */ -# define TMCAST_NTRACE 0 -# endif /* (ACE_NTRACE == 1) */ -#endif /* !TMCAST_NTRACE */ - -#if (TMCAST_NTRACE == 1) -# define TMCAST_TRACE(X) -#else /* (TMCAST_NTRACE == 1) */ -# if !defined (ACE_HAS_TRACE) -# define ACE_HAS_TRACE -# endif /* ACE_HAS_TRACE */ -# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X) -# include "ace/Trace.h" -#endif /* (TMCAST_NTRACE == 1) */ - -#endif /* TMCAST_EXPORT_H */ - -// End of auto generated file. diff --git a/protocols/ace/TMCast/FaultDetector.hpp b/protocols/ace/TMCast/FaultDetector.hpp deleted file mode 100644 index 49ffcdd174c..00000000000 --- a/protocols/ace/TMCast/FaultDetector.hpp +++ /dev/null @@ -1,45 +0,0 @@ -// file : ACE_TMCast/FaultDetector.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#include "Protocol.hpp" - -namespace ACE_TMCast -{ - class FaultDetector - { - public: - FaultDetector () - : alone_ (true), silence_period_ (-1) - { - } - - public: - class Failed {}; - - - void - insync () - { - if (alone_) - alone_ = false; - - silence_period_ = 0; - } - - void - outsync () - { - if (!alone_ && ++silence_period_ >= Protocol::FATAL_SILENCE_FRAME) - { - // cerr << "Silence period has been passed." << endl; - // cerr << "Decalring the node failed." << endl; - throw Failed (); - } - } - - private: - bool alone_; // true if we haven't heard from any members yet. - short silence_period_; - }; -} diff --git a/protocols/ace/TMCast/Group.cpp b/protocols/ace/TMCast/Group.cpp deleted file mode 100644 index 29187838a85..00000000000 --- a/protocols/ace/TMCast/Group.cpp +++ /dev/null @@ -1,511 +0,0 @@ -// file : ACE_TMCast/Group.cpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#include "Group.hpp" - -#include <typeinfo> - -// OS primitives -#include <ace/OS.h> -#include <ace/OS_NS_stdlib.h> -#include <ace/Synch.h> -#include <ace/Time_Value.h> -#include <ace/SOCK_Dgram_Mcast.h> - -#include "Messaging.hpp" - -#include "Protocol.hpp" - -// Components - -#include "LinkListener.hpp" -#include "FaultDetector.hpp" -#include "TransactionController.hpp" - -namespace ACE_TMCast -{ - bool - operator== (std::type_info const* pa, std::type_info const& b) - { - return *pa == b; - } - - // - // - // - class Terminate : public virtual Message {}; - - - // - // - // - class Failure : public virtual Message {}; - - - // - // - // - class Scheduler - { - public: - Scheduler (ACE_INET_Addr const& addr, - char const* id, - MessageQueue& out_send_data, - MessageQueue& out_recv_data, - MessageQueue& out_control) - - : cond_ (mutex_), - - addr_ (addr), - sock_ (), - - out_control_ (out_control), - - in_data_ (mutex_), - in_link_data_(mutex_), - in_control_ (mutex_), - - sync_schedule (ACE_OS::gettimeofday ()), - - transaction_controller_ (in_data_, out_send_data, out_recv_data) - { - ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH); - id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0'; - - sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded - - in_data_.subscribe (cond_); - in_link_data_.subscribe (cond_); - in_control_.subscribe (cond_); - - ACE_thread_t unused; - if (ACE_OS::thr_create (&thread_thunk, - this, - THR_JOINABLE, - &unused, - &thread_) != 0) ACE_OS::abort (); - } - - virtual ~Scheduler () - { - { - MessageQueueAutoLock lock (in_control_); - - in_control_.push (MessagePtr (new Terminate)); - } - - if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); - - // cerr << "Scheduler is down." << endl; - } - - public: - MessageQueue& - in_data () - { - return in_data_; - } - - private: - static ACE_THR_FUNC_RETURN - thread_thunk (void* arg) - { - Scheduler* obj = reinterpret_cast<Scheduler*> (arg); - obj->execute (); - return 0; - } - - void - execute () - { - try - { - sock_.join (addr_); - auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_)); - - { - AutoLock lock (mutex_); - - // Loop - // - // - - while (true) - { - cond_.wait (&sync_schedule); - - // "Loop of Fairness" - - bool done = false; - - do - { - // control message - // - // - if (!in_control_.empty ()) - { - done = true; - break; - } - - // outsync - // - // - if (sync_schedule < ACE_OS::gettimeofday ()) - { - // OUTSYNC - - outsync (); - - // schedule next outsync - sync_schedule = - ACE_OS::gettimeofday () + - ACE_Time_Value (0, Protocol::SYNC_PERIOD); - } - - // link message - // - // - if (!in_link_data_.empty ()) - { - MessagePtr m (in_link_data_.front ()); - in_link_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (LinkFailure)) - { - // cerr << "link failure" << endl; - throw false; - } - else if (exp == typeid (LinkData)) - { - - LinkData* data = dynamic_cast<LinkData*> (m.get ()); - - // INSYNC, TL, CT - - // Filter out loopback. - // - if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0) - { - insync (); - transaction_list (); - current_transaction (data->header().current, - data->payload (), - data->size ()); - } - } - else - { - // cerr << "unknown message type from link listener: " - // << typeid (*m).name () << endl; - ACE_OS::abort (); - } - } - - // api message - // - // - if (!in_data_.empty ()) - { - // API - - api (); - } - - } while (!in_link_data_.empty() || - sync_schedule < ACE_OS::gettimeofday ()); - - if (done) break; - } - } - } - catch (...) - { - // cerr << "Exception in scheduler loop." << endl; - MessageQueueAutoLock lock (out_control_); - out_control_.push (MessagePtr (new Failure)); - } - } - - // Events - // - // Order: - // - // INSYNC, TSL, VOTE, BEGIN - // API - // OUTSYNC - // - - void - insync () - { - fault_detector_.insync (); - } - - void - outsync () - { - char buf[Protocol::MAX_MESSAGE_SIZE]; - - Protocol::MessageHeader* hdr = - reinterpret_cast<Protocol::MessageHeader*> (buf); - - void* data = buf + sizeof (Protocol::MessageHeader); - - hdr->length = sizeof (Protocol::MessageHeader); - hdr->check_sum = 0; - - ACE_OS::strcpy (hdr->member_id.id, id_); - - size_t size (0); - - transaction_controller_.outsync (hdr->current, data, size); - - hdr->length += size; - - fault_detector_.outsync (); - - // sock_.send (buf, hdr->length, addr_); - sock_.send (buf, hdr->length); - } - - void - transaction_list () - { - } - - void - current_transaction (Protocol::Transaction const& t, - void const* payload, - size_t size) - { - transaction_controller_.current_transaction (t, payload, size); - } - - void - api () - { - transaction_controller_.api (); - } - - private: - ACE_hthread_t thread_; - - ACE_Thread_Mutex mutex_; - ACE_Condition<ACE_Thread_Mutex> cond_; - - typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; - - char id_[Protocol::MEMBER_ID_LENGTH]; - - ACE_INET_Addr addr_; - ACE_SOCK_Dgram_Mcast sock_; - - MessageQueue& out_control_; - - MessageQueue in_data_; - MessageQueue in_link_data_; - MessageQueue in_control_; - - // Protocol state - // - // - - ACE_Time_Value sync_schedule; - - FaultDetector fault_detector_; - TransactionController transaction_controller_; - }; - - - // - // - // - class Group::GroupImpl - { - public: - virtual ~GroupImpl () - { - } - - GroupImpl (ACE_INET_Addr const& addr, char const* id) - throw (Group::Failed) - : send_cond_ (mutex_), - recv_cond_ (mutex_), - failed_ (false), - in_send_data_ (mutex_), - in_recv_data_ (mutex_), - in_control_ (mutex_), - scheduler_ (new Scheduler (addr, - id, - in_send_data_, - in_recv_data_, - in_control_)), - out_data_ (scheduler_->in_data ()) - { - in_send_data_.subscribe (send_cond_); - in_recv_data_.subscribe (recv_cond_); - - in_control_.subscribe (send_cond_); - in_control_.subscribe (recv_cond_); - } - - void - send (void const* msg, size_t size) - throw (Group::InvalidArg, Group::Failed, Group::Aborted) - { - if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg (); - - // Note the potential deadlock if I lock mutex_ and out_data_ in - // reverse order. - - MessageQueueAutoLock l1 (out_data_); - AutoLock l2 (mutex_); - - throw_if_failed (); - - out_data_.push (MessagePtr (new Send (msg, size))); - - l1.unlock (); // no need to keep it locked - - while (true) - { - throw_if_failed (); - - if (!in_send_data_.empty ()) - { - MessagePtr m (in_send_data_.front ()); - in_send_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (ACE_TMCast::Aborted)) - { - throw Group::Aborted (); - } - else if (exp == typeid (Commited)) - { - return; - } - else - { - // cerr << "send: group-scheduler messaging protocol violation; " - // << "unexpected message " << typeid (*m).name () - // << " " << typeid (Aborted).name () << endl; - - ACE_OS::abort (); - } - } - - // cerr << "send: waiting on condition" << endl; - send_cond_.wait (); - // cerr << "send: wokeup on condition" << endl; - } - } - - - - size_t - recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) - { - AutoLock lock (mutex_); - - while (true) - { - throw_if_failed (); - - if (!in_recv_data_.empty ()) - { - MessagePtr m (in_recv_data_.front ()); - in_recv_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (Recv)) - { - Recv* data = dynamic_cast<Recv*> (m.get ()); - - if (size < data->size ()) throw Group::InsufficienSpace (); - - memcpy (msg, data->payload (), data->size ()); - - return data->size (); - } - else - { - // cerr << "recv: group-scheduler messaging protocol violation. " - // << "unexpected message " << typeid (*m).name () << endl; - - ACE_OS::abort (); - } - } - - recv_cond_.wait (); - } - } - - private: - void - throw_if_failed () - { - if (!failed_ && !in_control_.empty ()) failed_ = true; - - if (failed_) throw Group::Failed (); - } - - private: - ACE_Thread_Mutex mutex_; - ACE_Condition<ACE_Thread_Mutex> send_cond_; - ACE_Condition<ACE_Thread_Mutex> recv_cond_; - - typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; - - bool failed_; - - MessageQueue in_send_data_; - MessageQueue in_recv_data_; - MessageQueue in_control_; - - auto_ptr<Scheduler> scheduler_; - - MessageQueue& out_data_; - }; - - - // Group - // - // - Group:: - Group (ACE_INET_Addr const& addr, char const* id) - throw (Group::Failed) - : pimpl_ (new GroupImpl (addr, id)) - { - } - - Group:: - ~Group () - { - } - - void - Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted) - { - pimpl_->send (msg, size); - } - - size_t - Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) - { - return pimpl_->recv (msg, size); - } -} - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Condition<ACE_Thread_Mutex>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Condition<ACE_Thread_Mutex> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/TMCast/Group.hpp b/protocols/ace/TMCast/Group.hpp deleted file mode 100644 index 13c49f210bb..00000000000 --- a/protocols/ace/TMCast/Group.hpp +++ /dev/null @@ -1,51 +0,0 @@ -// file : ACE_TMCast/Group.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#ifndef TMCAST_GROUP_HPP -#define TMCAST_GROUP_HPP - -#include <ace/Auto_Ptr.h> -#include <ace/INET_Addr.h> - -#include "Export.hpp" - -namespace ACE_TMCast -{ - class ACE_TMCast_Export Group - { - public: - class Aborted {}; - class Failed {}; - class InvalidArg {}; - class InsufficienSpace {}; - - public: - ~Group (); - - Group (ACE_INET_Addr const& addr, char const* id) throw (Failed); - - public: - void - send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted); - - size_t - recv (void* msg, size_t size) throw (Failed, InsufficienSpace); - - private: - bool - failed (); - - private: - class GroupImpl; - auto_ptr<GroupImpl> pimpl_; - - private: - Group (Group const&); - - Group& - operator= (Group const&); - }; -} - -#endif // TMCAST_GROUP_HPP diff --git a/protocols/ace/TMCast/GroupFwd.hpp b/protocols/ace/TMCast/GroupFwd.hpp deleted file mode 100644 index b4ed7304ff7..00000000000 --- a/protocols/ace/TMCast/GroupFwd.hpp +++ /dev/null @@ -1,15 +0,0 @@ -// file : ACE_TMCast/GroupFwd.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#ifndef TMCAST_GROUP_FWD_HPP -#define TMCAST_GROUP_FWD_HPP - -#include "Export.hpp" - -namespace ACE_TMCast -{ - class ACE_TMCast_Export Group; -} - -#endif // TMCAST_GROUP_FWD_HPP diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp deleted file mode 100644 index 983f7828f3e..00000000000 --- a/protocols/ace/TMCast/LinkListener.hpp +++ /dev/null @@ -1,171 +0,0 @@ -// file : ACE_TMCast/LinkListener.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -// OS primitives -#include <ace/OS_NS_string.h> -#include <ace/OS_NS_stdlib.h> -#include <ace/Synch.h> -#include <ace/SOCK_Dgram_Mcast.h> -#include <ace/Refcounted_Auto_Ptr.h> - - -#include "Messaging.hpp" -#include "Protocol.hpp" - -namespace ACE_TMCast -{ - // - // - // - class LinkFailure : public virtual Message {}; - - - // - // - // - class LinkData : public virtual Message - { - public: - LinkData (Protocol::MessageHeader const* header, - void* payload, - size_t size) - : size_ (size) - { - ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); - ACE_OS::memcpy (payload_, payload, size_); - } - - Protocol::MessageHeader const& - header () const - { - return header_; - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - Protocol::MessageHeader header_; - char payload_[Protocol::MAX_MESSAGE_SIZE]; - size_t size_; - }; - - typedef - ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex> - LinkDataPtr; - - // - // - // - class LinkListener - { - private: - class Terminate : public virtual Message {}; - - public: - LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) - : sock_(sock), out_ (out) - { - ACE_thread_t unused; - if (ACE_OS::thr_create (&thread_thunk, - this, - THR_JOINABLE, - &unused, - &thread_) != 0) ACE_OS::abort (); - } - - ~LinkListener () - { - { - MessageQueueAutoLock lock (control_); - - control_.push (MessagePtr (new Terminate)); - } - - if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); - - // cerr << "Link listener is down." << endl; - } - - - static ACE_THR_FUNC_RETURN - thread_thunk (void* arg) - { - LinkListener* obj = reinterpret_cast<LinkListener*> (arg); - - obj->execute (); - return 0; - } - - void - execute () - { - char msg[Protocol::MAX_MESSAGE_SIZE]; - - ssize_t header_size = sizeof (Protocol::MessageHeader); - - // OS::Time timeout (1000000); // one millisecond - - ACE_Time_Value timeout (0, 1000); // one millisecond - - try - { - while (true) - { - // Check control message queue - - { - MessageQueueAutoLock lock (control_); - - if (!control_.empty ()) break; - } - - ACE_INET_Addr junk; - ssize_t n = sock_.recv (msg, - Protocol::MAX_MESSAGE_SIZE, - junk, - 0, - &timeout); - - if (n != -1) - { - if (n < header_size) throw false; - - Protocol::MessageHeader* header = - reinterpret_cast<Protocol::MessageHeader*> (msg); - - MessageQueueAutoLock lock (out_); - - out_.push (MessagePtr (new LinkData (header, - msg + header_size, - n - header_size))); - } - } - } - catch (...) - { - MessageQueueAutoLock lock (out_); - - out_.push (MessagePtr (new LinkFailure)); - } - } - - private: - typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; - - ACE_hthread_t thread_; - ACE_SOCK_Dgram_Mcast& sock_; - MessageQueue& out_; - MessageQueue control_; - }; -} diff --git a/protocols/ace/TMCast/MTQueue.cpp b/protocols/ace/TMCast/MTQueue.cpp deleted file mode 100644 index 32d10eb23b7..00000000000 --- a/protocols/ace/TMCast/MTQueue.cpp +++ /dev/null @@ -1,24 +0,0 @@ -// file : ACE_TMCast/MTQueue.cpp -// author : Steve Huston <shuston@riverace.com> -// cvs-id : $Id$ - -#include "LinkListener.hpp" -#include "MTQueue.hpp" - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Node<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> >; -template class ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> >; -template class ACE_Unbounded_Queue_Iterator<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> >; -template class ACE_Node<ACE_Condition<ACE_Thread_Mutex>* >; -template class ACE_Unbounded_Set<ACE_Condition<ACE_Thread_Mutex>* >; -template class ACE_Unbounded_Set_Iterator<ACE_Condition<ACE_Thread_Mutex> *>; -template class ACE_Unbounded_Set_Const_Iterator<ACE_Condition<ACE_Thread_Mutex> *>; -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Node<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> > -#pragma instantiate ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> > -#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message,ACE_Null_Mutex> > -#pragma instantiate ACE_Node<ACE_Condition<ACE_Thread_Mutex> *> -#pragma instantiate ACE_Unbounded_Set<ACE_Condition<ACE_Thread_Mutex> *> -#pragma instantiate ACE_Unbounded_Set_Iterator<ACE_Condition<ACE_Thread_Mutex> *> -#pragma instantiate ACE_Unbounded_Set_Const_Iterator<ACE_Condition<ACE_Thread_Mutex> *> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/TMCast/MTQueue.hpp b/protocols/ace/TMCast/MTQueue.hpp deleted file mode 100644 index 2eb128823fe..00000000000 --- a/protocols/ace/TMCast/MTQueue.hpp +++ /dev/null @@ -1,176 +0,0 @@ -// file : ACE_TMCast/MTQueue.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#ifndef TMCAST_MT_QUEUE_HPP -#define TMCAST_MT_QUEUE_HPP - -#include "ace/Auto_Ptr.h" -#include "ace/Unbounded_Set.h" -#include "ace/Unbounded_Queue.h" -#include "ace/os_include/sys/os_types.h" - -namespace ACE_TMCast -{ - template <typename T, - typename M, - typename C, - typename Q = ACE_Unbounded_Queue<T> > - class MTQueue - { - public: - typedef T ElementType; - typedef M MutexType; - typedef C ConditionalType; - typedef Q QueueType; - - public: - - MTQueue () - : mutexp_ (new MutexType), - mutex_ (*mutexp_), - queue_ (), - signal_ (false) - { - } - - MTQueue (MutexType& mutex) - : mutexp_ (), - mutex_ (mutex), - queue_ (), - signal_ (false) - { - } - - public: - bool - empty () const - { - return queue_.is_empty (); - } - - size_t - size () const - { - return queue_.size (); - } - - // typedef typename QueueType::Empty Empty; - - class Empty {}; - - T& - front () - { - ACE_Unbounded_Queue_Iterator<T> f (queue_); - T* tmp; - if (!f.next (tmp)) throw Empty (); - - return *tmp; - } - - - T const& - front () const - { - ACE_Unbounded_Queue_Const_Iterator<T> f (queue_); - T* tmp; - if (!f.next (tmp)) throw Empty (); - - return *tmp; - } - - /* - T& - back () - { - return queue_.back (); - } - - - T const& - back () const - { - return queue_.back (); - } - */ - - void - push (T const& t) - { - signal_ = empty (); - queue_.enqueue_tail (t); - } - - void - pop () - { - T junk; - queue_.dequeue_head (junk); - } - - public: - void - lock () const - { - mutex_.acquire (); - } - - void - unlock () const - { - if (signal_) - { - signal_ = false; - - for (ConditionalSetConstIterator_ i (cond_set_); - !i.done (); - i.advance ()) - { - ConditionalType** c = 0; - - i.next (c); - - (*c)->signal (); - } - } - - mutex_.release (); - } - - void - subscribe (ConditionalType& c) - { - //@@ should check for duplicates - // - cond_set_.insert (&c); - } - - void - unsubscribe (ConditionalType& c) - { - //@@ should check for absence - // - cond_set_.remove (&c); - } - - private: - auto_ptr<MutexType> mutexp_; - MutexType& mutex_; - QueueType queue_; - - typedef - ACE_Unbounded_Set<ConditionalType*> - ConditionalSet_; - - typedef - ACE_Unbounded_Set_Const_Iterator<ConditionalType*> - ConditionalSetConstIterator_; - - ConditionalSet_ cond_set_; - - mutable bool signal_; - }; -} - -#endif // TMCAST_MT_QUEUE_HPP diff --git a/protocols/ace/TMCast/Makefile.am b/protocols/ace/TMCast/Makefile.am deleted file mode 100644 index 3338e4a8f64..00000000000 --- a/protocols/ace/TMCast/Makefile.am +++ /dev/null @@ -1,66 +0,0 @@ -## Process this file with automake to create Makefile.in -## -## $Id$ -## - -includedir = @includedir@/ace/TMCast -pkgconfigdir = @libdir@/pkgconfig - -ACE_BUILDDIR = $(top_builddir) -ACE_ROOT = $(top_srcdir) - - -## Makefile.TMCast.am - -if BUILD_EXCEPTIONS -if BUILD_THREADS - -lib_LTLIBRARIES = libACE_TMCast.la - -libACE_TMCast_la_CPPFLAGS = \ - -I$(ACE_ROOT) \ - -I$(ACE_BUILDDIR) \ - -DTMCAST_BUILD_DLL - -libACE_TMCast_la_SOURCES = \ - Group.cpp \ - MTQueue.cpp \ - Protocol.cpp \ - Template_Instantiations.cpp - -libACE_TMCast_la_LDFLAGS = \ - -version-number @ACE_MAJOR@:@ACE_MINOR@:@ACE_BETA@ - -libACE_TMCast_la_LIBADD = \ - $(ACE_BUILDDIR)/ace/libACE.la - -nobase_include_HEADERS = \ - Export.hpp \ - FaultDetector.hpp \ - Group.hpp \ - GroupFwd.hpp \ - LinkListener.hpp \ - MTQueue.hpp \ - Messaging.hpp \ - Protocol.hpp \ - TransactionController.hpp - -pkgconfig_DATA = ACE_TMCast.pc - -ACE_TMCast.pc: ${top_builddir}/config.status ${srcdir}/ACE_TMCast.pc.in - ${top_builddir}/config.status --file $@:${srcdir}/ACE_TMCast.pc.in - -endif BUILD_THREADS -endif BUILD_EXCEPTIONS - -EXTRA_DIST = \ - ACE_TMCast.pc.in - - -## Clean up template repositories, etc. -clean-local: - -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* - -rm -f gcctemp.c gcctemp so_locations *.ics - -rm -rf cxx_repository ptrepository ti_files - -rm -rf templateregistry ir.out - -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/protocols/ace/TMCast/Messaging.hpp b/protocols/ace/TMCast/Messaging.hpp deleted file mode 100644 index 886745d1120..00000000000 --- a/protocols/ace/TMCast/Messaging.hpp +++ /dev/null @@ -1,54 +0,0 @@ -// file : ACE_TMCast/Messaging.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#ifndef TMCAST_MESSAGING_HPP -#define TMCAST_MESSAGING_HPP - -#include <ace/Synch.h> -#include <ace/Refcounted_Auto_Ptr.h> - -#include "MTQueue.hpp" - -namespace ACE_TMCast -{ - class Message - { - public: - virtual - ~Message () {} - }; - - typedef - ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex> - MessagePtr; - - typedef - MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> > - MessageQueue; - - struct MessageQueueAutoLock - { - MessageQueueAutoLock (MessageQueue& q) - : q_ (q) - { - q_.lock (); - } - - void - unlock () - { - q_.unlock (); - } - - ~MessageQueueAutoLock () - { - q_.unlock (); - } - - private: - MessageQueue& q_; - }; -} - -#endif // TMCAST_MESSAGING_HPP diff --git a/protocols/ace/TMCast/Protocol.cpp b/protocols/ace/TMCast/Protocol.cpp deleted file mode 100644 index ea4c6b39020..00000000000 --- a/protocols/ace/TMCast/Protocol.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// file : ACE_TMCast/Protocol.cpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#include "Protocol.hpp" - -namespace ACE_TMCast -{ - namespace Protocol - { - /* - namespace - { - char const* labels[] = { - "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"}; - } - - std::string - tslabel (Protocol::TransactionStatus s) - { - return labels[s]; - } - - std::ostream& - operator << (std::ostream& o, Transaction const& t) - { - return o << "{" << t.id << "; " << tslabel (t.status) << "}"; - } - */ - } -} diff --git a/protocols/ace/TMCast/Protocol.hpp b/protocols/ace/TMCast/Protocol.hpp deleted file mode 100644 index 6cdf374f4f9..00000000000 --- a/protocols/ace/TMCast/Protocol.hpp +++ /dev/null @@ -1,107 +0,0 @@ -// file : ACE_TMCast/Protocol.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#ifndef TMCAST_PROTOCOL_HPP -#define TMCAST_PROTOCOL_HPP - -namespace ACE_TMCast -{ - namespace Protocol - { - // - // - // - unsigned long const MEMBER_ID_LENGTH = 38; - - struct MemberId - { - char id[MEMBER_ID_LENGTH]; - /* - unsigned long ip; - unsigned short port; - */ - }; - - // - // - // - typedef unsigned short TransactionId; - - - - typedef unsigned char TransactionStatus; - - TransactionStatus const TS_BEGIN = 1; - TransactionStatus const TS_COMMIT = 2; - TransactionStatus const TS_ABORT = 3; - TransactionStatus const TS_COMMITED = 4; - TransactionStatus const TS_ABORTED = 5; - - struct Transaction - { - TransactionId id; - TransactionStatus status; - }; - - // Transaction List (TL) - - // unsigned long const TL_LENGTH = 1; - - // typedef Transaction TransactionList[TL_LENGTH]; - - - // - // - // - struct MessageHeader - { - unsigned long length; - - unsigned long check_sum; - - MemberId member_id; - - Transaction current; - - //TransactionList transaction_list; - }; - - - // - // - // - - unsigned long const MAX_MESSAGE_SIZE = 768; - - unsigned long const - MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader); - - // Protocol timing - // - // - - unsigned long const SYNC_PERIOD = 30000; // in mks - - unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's - unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's - - // FATAL_SILENCE_FRAME in SYNC_PERIOD's - // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME - // - - short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2; - - // short const FATAL_SILENCE_FRAME = 10000; - - // Helpers - - // std::string - // tslabel (Protocol::TransactionStatus s); - - // std::ostream& - // operator << (std::ostream& o, Transaction const& t); - } -} - -#endif // TMCAST_PROTOCOL_HPP diff --git a/protocols/ace/TMCast/README b/protocols/ace/TMCast/README deleted file mode 100644 index 7104be46e30..00000000000 --- a/protocols/ace/TMCast/README +++ /dev/null @@ -1,240 +0,0 @@ - - -Introduction ------------- - -TMCast (stands for Transaction MultiCast) is an implementation of a -transactional multicast protocol. In essence, the idea is to represent -each message delivery to members of a multicast group as a transaction -- an atomic, consistent and isolated action. A multicast transaction -can be viewed as an atomic transition of the group members to a new -state. If we define [Mo] as a set of operational (non-faulty) members -of the group, [Mf] as a set of faulty members of the group, [Ma] as a -set of members that view transition [Tn] as aborted and [Mc] as a set -of members that view transition [Tn] as committed, then this atomic -transition [Tn] should satisfy one of the following equations: - -Mo(Tn-1) = Ma(T) U Mf(T) -Mo(Tn-1) = Mc(T) U Mf(T) - -Or, in other words, after transaction T has been committed (aborted), -all operational (before transaction T) members are either in the -committed (aborted) or failed state. - -Thus, for each member of the group, outcome of the transaction can be -commit, abort or a member failure. It is important for a member to -exhibit a failfast (error latency is less than transaction cycle) -behavior. Or, in other words, if a member transitioned into a wrong -state, it is guaranteed to fail instead of delivering a wrong result. - -In order to achieve such an error detection in a decentralized -environment, certain limitations were imposed. One of the most -user-visible limitation is the fact that the lifetime of the group -with only one member is very short. This is because there is not way -for a member to distinguish "no members yet" case from "my link to the -group is down". In such a situation, the member assumes the latter -case. There is also a military saying that puts it quite nicely: two -is one, one is nothing. - - - -State of Implementation ------------------------ - -The current implementation is in a prototypical stage. The following -parts are not implemented or still under development: - -* Handling of network partitioning (TODO) - -* Redundant network support (TODO) - -* Member failure detection (partial implementation) - - -Examples --------- - -There is a simple example available in examples/TMCast/Member with -the corresponding README. - - -Architecture ------------- - -Primary goals of the protocol are to (1) mask transient failures of the -underlying multicast protocol (or, more precisely, allow to recover -from transient failures) and (2) exhibit failfast behavior in cases of -permanent failures. - -The distinction between transient and permanent failures is based on -timeouts thus what can be a transient failure in one configuration of -the protocol could be a permanent failure in the other. - -[Maybe talk more about a transient/permanent threshold and its effect -on performance/resource utilization/etc.] - -[Maybe add a terminology section.] - -Each member of a multicast group has its unique (group-wise) id: - -struct MemberId -{ - char id[MEMBER_ID_LENGTH]; -}; - -Each payload delivery is part of a transaction. Each transaction is -identified by TransactionId: - -typedef unsigned short TransactionId; - - -Each transaction has a status code which identifies its state, as viewed by -a group member: - - -typedef unsigned char TransactionStatus; - -TransactionStatus const TS_BEGIN = 1; -TransactionStatus const TS_COMMIT = 2; -TransactionStatus const TS_ABORT = 3; -TransactionStatus const TS_COMMITTED = 4; -TransactionStatus const TS_ABORTED = 5; - -Thus each transaction is described by its id and status: - -struct Transaction -{ - TransactionId id; - TransactionStatus status; -}; - -The outcome of some predefined number of recent transactions is stored -in TransactionList: - -typedef Transaction TransactionList[TL_LENGTH]; - - -Each message sent to a multicast group has the following header: - -struct MessageHeader -{ - unsigned long length; - unsigned long check_sum; - MemberId member_id; - Transaction current; - TransactionList transaction_list; -}; - -[Maybe describe each field here.] - -A new member joins the group with transaction id 0 and status -TS_COMMITTED. - -Each member sends a periodic 'pulse' messages with some predefined interval -advertising its current view of the group. This includes the state of the -current transaction and the history of the recent transactions. - - -If a member of the group needs a payload delivery it starts a new -transaction by sending a message with current transaction set to - -{++current_id, TS_BEGIN} - -and payload appended after the header. - - -Each member joins a transaction in one of the following ways: - -* A member that began the transaction joins it 'to commit' (TS_COMMIT) - -* A member that received TS_BEGIN joins current transaction 'to commit' - (TS_COMMIT). - -* A member that received TS_COMMIT or TS_ABORT but did not receive TS_BEGIN - joins current transaction 'to abort' (TS_ABORT). - - -After a member has joined the transaction it starts participating in the -transaction's voting phase. On this phase members of the group decide the -fate of the transaction. Each member sends a predefined number of messages -where it announces its vote. In between those messages the member is receiving -and processing votes from other members and can be influenced by their -'opinion'. - -In their decision-making members follow the principle of the majority. As -the voting progresses (and comes close to an end) members become more and -more reluctant to deviate from the decision of the majority. - -[Maybe add an equation that measures member's willingness to change -its mind.] - -At the end of the voting phase each member declares the current transaction -either committed (TS_COMMITTED) or aborted (TS_ABORTED). If this decision does -not agree with the majority the member declares itself failed. - -In addition, each member builds a 'majority view' of the transaction history -(based on transaction_list). If it deviates from the member's own history the -member declares itself failed. - -Here are some example scenarios of how the protocol behaves in different -situations. Let's say we have three members of the group S, R1, R2. S -initiates a transaction. R1 and R2 join it. - -Scenario 1. (two-step voting) - -1. S initiates a transaction (TS_BEGIN) -2a. R1 receives TS_BEGIN, joins for commit -2b. R2 receives TS_BEGIN, joins for commit -3a. S announces TS_COMMIT (first vote) -3b. R1 announces TS_COMMIT (first vote) -3c. R2 announces TS_COMMIT (first vote) -4a. S announces TS_COMMIT (second vote) -4b. R1 announces TS_COMMIT (second vote) -4c. R2 announces TS_COMMIT (second vote) -5a. S announces TS_COMMITTED (end of vote) -5b. R1 announces TS_COMMITTED (end of vote) -5c. R2 announces TS_COMMITTED (end of vote) - - -Scenario 2. (two-step voting) - -1. S initiates a transaction (TS_BEGIN) -2a. R1 receives TS_BEGIN, joins for commit -2b. R2 didn't receive TS_BEGIN -3a. S announces TS_COMMIT (first vote) -3b. R1 announces TS_COMMIT (first vote) -3c. R2 received R1's TS_COMMIT announces TS_ABORT (first vote) -4a. S received R2's TS_ABORT announces TS_ABORT (second vote) -4b. R1 received R2's TS_ABORT announces TS_ABORT (second vote) -4c. R2 announces TS_ABORT (second vote) -5a. S announces TS_ABORTED (end of vote) -5b. R1 announces TS_ABORTED (end of vote) -5c. R2 announces TS_ABORTED (end of vote) - - -Scenario 3. (three-step voting) - -1. S initiates a transaction (TS_BEGIN) -2a. R1 receives TS_BEGIN, joins for commit -2b. R2 didn't receive TS_BEGIN -3a. S announces TS_COMMIT (first vote) -3b. R1 announces TS_COMMIT (first vote) -3c. R2 still didn't receive anything -4a. S announces TS_COMMIT (second vote) -4b. R1 announces TS_COMMIT (second vote) -4c. R2 received R1's TS_COMMIT, announces TS_ABORT (first vote) - -5a. S received R2's TS_ABORT but it is the end of the voting phase and - majority (S and R1) vote for commit, announces TS_COMMIT (third vote) -5b. R1 received R2's TS_ABORT but it is the end of the voting phase and - majority (S and R1) vote for commit, announces TS_COMMIT (third vote) -5c. R2 announces TS_ABORT (second vote) - -6a. S announces TS_COMMITTED (end of vote) -6b. R1 announces TS_COMMITTED (end of vote) -6c. R2 discovers that the the majority has declared current transaction - committed and thus declares itself failed. - - --- -Boris Kolpackov <boris@dre.vanderbilt.edu> diff --git a/protocols/ace/TMCast/TMCast.mpc b/protocols/ace/TMCast/TMCast.mpc deleted file mode 100644 index 1ff937a0a1a..00000000000 --- a/protocols/ace/TMCast/TMCast.mpc +++ /dev/null @@ -1,12 +0,0 @@ -// -*- MPC -*- -// $Id$ - -project : acelib, core, exceptions, threads { - avoids = ace_for_tao - sharedname = ACE_TMCast - dynamicflags += TMCAST_BUILD_DLL - - Pkgconfig_Files { - ACE_TMCast.pc.in - } -} diff --git a/protocols/ace/TMCast/Template_Instantiations.cpp b/protocols/ace/TMCast/Template_Instantiations.cpp deleted file mode 100644 index 62e9b24a8bc..00000000000 --- a/protocols/ace/TMCast/Template_Instantiations.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// $Id$ - -// Note: this file is here only until support for explicit template -// instantiations is removed from ACE, after ACE 5.5 is released. - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) || \ - defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) - -# include "ace/Null_Mutex.h" -# include "ace/Refcounted_Auto_Ptr.h" - -# include "TransactionController.hpp" - -#endif - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message, ACE_Null_Mutex>; -template class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Recv, ACE_Null_Mutex>; -template class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Send, ACE_Null_Mutex>; - -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) - -# pragma instantiate class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Message, ACE_Null_Mutex> -# pragma instantiate class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Recv, ACE_Null_Mutex> -# pragma instantiate class ACE_Refcounted_Auto_Ptr<ACE_TMCast::Send, ACE_Null_Mutex> - -#elif defined (__HP_aCC) -// Make aC++ stop complaining about an empty translation unit -static int shut_up_aCC = 0; -#endif diff --git a/protocols/ace/TMCast/TransactionController.hpp b/protocols/ace/TMCast/TransactionController.hpp deleted file mode 100644 index 6b0d4281655..00000000000 --- a/protocols/ace/TMCast/TransactionController.hpp +++ /dev/null @@ -1,388 +0,0 @@ -// file : ACE_TMCast/TransactionController.hpp -// author : Boris Kolpackov <boris@dre.vanderbilt.edu> -// cvs-id : $Id$ - -#include "ace/OS_NS_string.h" -#include "ace/OS_NS_stdlib.h" -#include "ace/Synch.h" -#include "ace/Refcounted_Auto_Ptr.h" - -#include "Protocol.hpp" -#include "Messaging.hpp" - -#include <typeinfo> - -namespace ACE_TMCast -{ - - // Messages - // - // - class Send : public virtual Message - { - public: - Send (void const* msg, size_t size) - : size_ (size) - { - ACE_OS::memcpy (payload_, msg, size_); - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - size_t size_; - char payload_[Protocol::MAX_PAYLOAD_SIZE]; - }; - - typedef - ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex> - SendPtr; - - - class Recv : public virtual Message - { - public: - Recv (void const* msg, size_t size) - : size_ (size) - { - ACE_OS::memcpy (payload_, msg, size_); - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - size_t size_; - char payload_[Protocol::MAX_PAYLOAD_SIZE]; - }; - - typedef - ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex> - RecvPtr; - - class Aborted : public virtual Message {}; - - class Commited : public virtual Message {}; - - - // - // - // - class TransactionController - { - public: - TransactionController (MessageQueue& in, - MessageQueue& send_out, - MessageQueue& recv_out) - : trace_ (false), - voting_duration_ (0), - separation_duration_ (0), - in_ (in), - send_out_ (send_out), - recv_out_ (recv_out) - { - current_.id = 0; - current_.status = Protocol::TS_COMMITED; - } - - public: - class Failure {}; - - - void - outsync (Protocol::Transaction& c, void* payload, size_t& size) - { - if (current_.status == Protocol::TS_COMMIT || - current_.status == Protocol::TS_ABORT) - { - if (++voting_duration_ >= Protocol::VOTING_FRAME) - { - // end of voting frame - - if (current_.status == Protocol::TS_COMMIT) - { - { - if (initiated_) - { - MessageQueueAutoLock lock (send_out_); - send_out_.push (MessagePtr (new Commited)); - } - else // joined transaction - { - MessageQueueAutoLock lock (recv_out_); - recv_out_.push (MessagePtr (recv_.release ())); - recv_ = RecvPtr (); - } - } - - current_.status = Protocol::TS_COMMITED; - - // if (trace_) cerr << "commited transaction with id " - // << current_.id << endl; - } - else // TS_ABORT - { - if (initiated_) - { - MessageQueueAutoLock lock (send_out_); - send_out_.push (MessagePtr (new Aborted)); - } - else - { - // free revc_ buffer if necessary - // - if (recv_.get ()) recv_ = RecvPtr (); - } - - - current_.status = Protocol::TS_ABORTED; - - // if (trace_) cerr << "aborted transaction with id " - // << current_.id << endl; - } - - // start transaction separation frame (counts down) - // +1 because it will be decremented on this iteration - separation_duration_ = Protocol::SEPARATION_FRAME + 1; - } - } - - // Set current outsync info - - c.id = current_.id; - c.status = current_.status; - - - // Do some post-processing - - switch (current_.status) - { - case Protocol::TS_COMMITED: - case Protocol::TS_ABORTED: - { - if (separation_duration_ > 0) --separation_duration_; - break; - } - case Protocol::TS_BEGIN: - { - // transfer payload - - size = send_->size (); - memcpy (payload, send_->payload (), size); - - send_ = SendPtr (); - - // get redy to vote for 'commit' - - current_.status = Protocol::TS_COMMIT; - voting_duration_ = 0; - } - } - } - - void - current_transaction (Protocol::Transaction const& t, - void const* payload, - size_t size) - { - Protocol::TransactionId& id = current_.id; - Protocol::TransactionStatus& s = current_.status; - - if (id == 0 && t.id != 0) // catch up - { - switch (t.status) - { - case Protocol::TS_BEGIN: - case Protocol::TS_COMMIT: - case Protocol::TS_ABORT: - { - id = t.id - 1; - s = Protocol::TS_COMMITED; - break; - } - case Protocol::TS_ABORTED: - case Protocol::TS_COMMITED: - { - id = t.id; - s = t.status; - break; - } - } - - // if (trace_) cerr << "caught up with id " << id << endl; - } - - bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED); - - switch (t.status) - { - case Protocol::TS_BEGIN: - { - if (!stable || t.id != id + 1) - { - // Transaction is in progress or hole in transaction id's - - // cerr << "unexpected request to join " << t - // << " while on " << current_ << endl; - - // if (!stable) cerr << "voting progress is " << voting_duration_ - // << "/" << Protocol::VOTING_FRAME << endl; - - if (t.id == id) // collision - { - if (!stable && s != Protocol::TS_ABORT) - { - // abort both - // cerr << "aborting both transactions" << endl; - - s = Protocol::TS_ABORT; - voting_duration_ = 0; //@@ reset voting frame - } - } - else - { - // @@ delicate case. need to think more - - // cerr << "Declaring node failed." << endl; - throw Failure (); - } - } - else - { - // join the transaction - - initiated_ = false; - - recv_ = RecvPtr (new Recv (payload, size)); - - id = t.id; - s = Protocol::TS_COMMIT; - voting_duration_ = 0; - - // if (trace_) cerr << "joining-for-commit transaction with id " - // << id << endl; - } - break; - } - case Protocol::TS_COMMIT: - { - if (stable && id == t.id - 1) - { - // not begin and and we haven't joined - - // join for abort - - initiated_ = false; - - current_.id = t.id; - current_.status = Protocol::TS_ABORT; - voting_duration_ = 0; - - // if (trace_) cerr << "joining-for-abort transaction with id " - // << current_.id << endl; - } - break; - } - case Protocol::TS_ABORT: - { - if ((!stable && id == t.id && s == Protocol::TS_COMMIT) || - (stable && id == t.id - 1)) // abort current || new transaction - { - // if (trace_) cerr << "voting-for-abort on transaction with id " - // << current_.id << endl; - - id = t.id; - s = Protocol::TS_ABORT; - - voting_duration_ = 0; //@@ reseting voting_duration_ - } - else - { - } - - break; - } - case Protocol::TS_ABORTED: - case Protocol::TS_COMMITED: - { - // nothing for now - break; - } - } - } - - void - api () - { - if ((current_.status == Protocol::TS_COMMITED || - current_.status == Protocol::TS_ABORTED) && - separation_duration_ == 0) // no transaction in progress - { - // start new transaction - - // Note that in_ is already locked by Scheduler - - MessagePtr m (in_.front ()); - in_.pop (); - - if (typeid (*m) == typeid (Send)) - { - send_ = SendPtr (dynamic_cast<Send*> (m.release ())); - } - else - { - // cerr << "Expecting Send but received " << typeid (*m).name () - // << endl; - - ACE_OS::abort (); - } - - current_.id++; - current_.status = Protocol::TS_BEGIN; - - initiated_ = true; - - // if (trace_) cerr << "starting transaction with id " << current_.id - // << endl; - } - } - - private: - typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; - - bool trace_; - - Protocol::Transaction current_; - - bool initiated_; - - unsigned short voting_duration_; - unsigned short separation_duration_; - - MessageQueue& in_; - MessageQueue& send_out_; - MessageQueue& recv_out_; - - SendPtr send_; - RecvPtr recv_; - }; -} |