summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-01-27 20:50:21 +0000
committernobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2004-01-27 20:50:21 +0000
commitc22583a269120a3d7b43286b7c996453b3fb31ac (patch)
tree34f0b8ecda22a5b4c601e97937b309a9f413ef46
parent4f7fe5fbb13938de8744643d898db5c3671009fe (diff)
downloadATCD-c22583a269120a3d7b43286b7c996453b3fb31ac.tar.gz
This commit was manufactured by cvs2svn to create tagEC_DT_CS520_2
'EC_DT_CS520_2'.
-rw-r--r--protocols/ace/RMCast/Makefile.am108
-rw-r--r--protocols/ace/RMCast/README57
-rw-r--r--protocols/ace/RMCast/RMCast_Export.h44
-rw-r--r--protocols/ace/TMCast/Export.hpp58
-rw-r--r--protocols/ace/TMCast/FaultDetector.hpp41
-rw-r--r--protocols/ace/TMCast/Group.cpp508
-rw-r--r--protocols/ace/TMCast/Group.hpp51
-rw-r--r--protocols/ace/TMCast/GroupFwd.hpp15
-rw-r--r--protocols/ace/TMCast/LinkListener.hpp166
-rw-r--r--protocols/ace/TMCast/MTQueue.hpp178
-rw-r--r--protocols/ace/TMCast/Messaging.hpp54
-rw-r--r--protocols/ace/TMCast/Protocol.cpp31
-rw-r--r--protocols/ace/TMCast/Protocol.hpp107
-rw-r--r--protocols/ace/TMCast/README58
-rw-r--r--protocols/ace/TMCast/TMCast.mpc8
-rw-r--r--protocols/ace/TMCast/TransactionController.hpp384
16 files changed, 0 insertions, 1868 deletions
diff --git a/protocols/ace/RMCast/Makefile.am b/protocols/ace/RMCast/Makefile.am
deleted file mode 100644
index f874ca9a380..00000000000
--- a/protocols/ace/RMCast/Makefile.am
+++ /dev/null
@@ -1,108 +0,0 @@
-##----------------------------------------------------------------------------
-## $Id$
-##
-## Makefile.am for ACE_RMCast library
-##----------------------------------------------------------------------------
-
-##
-## Process this file with automake to create Makefile.in
-##
-
-AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)
-
-lib_LTLIBRARIES = libACE_RMCast.la
-
-## ACE_RMCast library version is same as ACE's version.
-libACE_RMCast_la_LDFLAGS = -version-number @ACE_MAJOR@:@ACE_MINOR@:@ACE_BETA@
-
-libACE_RMCast_la_SOURCES = \
- RMCast.cpp \
- RMCast_Ack_Worker.cpp \
- RMCast_Fork.cpp \
- RMCast_Fragment.cpp \
- RMCast_IO_UDP.cpp \
- RMCast_Membership.cpp \
- RMCast_Module.cpp \
- RMCast_Module_Factory.cpp \
- RMCast_Partial_Message.cpp \
- RMCast_Proxy.cpp \
- RMCast_Reassembly.cpp \
- RMCast_Receiver_Module.cpp \
- RMCast_Reliable_Factory.cpp \
- RMCast_Reordering.cpp \
- RMCast_Resend_Handler.cpp \
- RMCast_Resend_Worker.cpp \
- RMCast_Retransmission.cpp \
- RMCast_Sequencer.cpp \
- RMCast_Singleton_Factory.cpp \
- RMCast_UDP_Event_Handler.cpp \
- RMCast_UDP_Proxy.cpp \
- RMCast_UDP_Reliable_Receiver.cpp \
- RMCast_UDP_Reliable_Sender.cpp
-
-libACE_RMCast_la_LIBADD = $(top_builddir)/ace/libACE.la
-
-## These are template source files.
-TEMPLATE_FILES = \
- RMCast_Copy_On_Write.cpp \
- RMCast_Worker.cpp
-
-HEADER_FILES = \
- RMCast.h \
- RMCast_Ack_Worker.h \
- RMCast_Copy_On_Write.h \
- RMCast_Export.h \
- RMCast_Fork.h \
- RMCast_Fragment.h \
- RMCast_IO_UDP.h \
- RMCast_Membership.h \
- RMCast_Module.h \
- RMCast_Module_Factory.h \
- RMCast_Partial_Message.h \
- RMCast_Proxy.h \
- RMCast_Reassembly.h \
- RMCast_Receiver_Module.h \
- RMCast_Reliable_Factory.h \
- RMCast_Reordering.h \
- RMCast_Resend_Handler.h \
- RMCast_Resend_Worker.h \
- RMCast_Retransmission.h \
- RMCast_Sequencer.h \
- RMCast_Singleton_Factory.h \
- RMCast_UDP_Event_Handler.h \
- RMCast_UDP_Proxy.h \
- RMCast_UDP_Reliable_Receiver.h \
- RMCast_UDP_Reliable_Sender.h \
- RMCast_Worker.h
-
-INLINE_FILES = \
- RMCast.i \
- RMCast_Ack_Worker.i \
- RMCast_Copy_On_Write.i \
- RMCast_Fork.i \
- RMCast_Fragment.i \
- RMCast_IO_UDP.i \
- RMCast_Membership.i \
- RMCast_Module.i \
- RMCast_Module_Factory.i \
- RMCast_Partial_Message.i \
- RMCast_Proxy.i \
- RMCast_Reassembly.i \
- RMCast_Receiver_Module.i \
- RMCast_Reliable_Factory.i \
- RMCast_Reordering.i \
- RMCast_Resend_Handler.i \
- RMCast_Resend_Worker.i \
- RMCast_Retransmission.i \
- RMCast_Sequencer.i \
- RMCast_Singleton_Factory.i \
- RMCast_UDP_Event_Handler.i \
- RMCast_UDP_Proxy.i \
- RMCast_UDP_Reliable_Receiver.i \
- RMCast_UDP_Reliable_Sender.i \
- RMCast_Worker.i
-
-pkginclude_HEADERS = \
- $(HEADER_FILES) \
- $(INLINE_FILES) \
- $(TEMPLATE_FILES)
diff --git a/protocols/ace/RMCast/README b/protocols/ace/RMCast/README
deleted file mode 100644
index 2dd0c5d9cfc..00000000000
--- a/protocols/ace/RMCast/README
+++ /dev/null
@@ -1,57 +0,0 @@
-# $Id$
-
- This directory will contain a simple, small-scale reliable
-multicast framework for ACE. The framework is based on the ASX
-components of the ACE library: the protocol is implemented as a stack
-of interchangeable "modules", each one in charge of a very small task.
-For example, one module implements fragmentation and reassembly, other
-modules implement retransmission, send ACK and NAK messages, and
-maintain receiver membership.
-
- The modules are replaced to achieve different levels of
-reliability. For example, the retransmission module can be either the
-"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the
-first case no retransmissions are performed, but lost messages are
-detected and reported to the receiver. The "Semi_Reliable" case
-messages are held for a pre-specified amount of time, and
-re-transmited if requested, but it is possible to loose some messages
-if multiple re-transmissions fail. As in the "Best_Effort" case the
-lost messages are detected and flagged to the application. Finally
-in the "Reliable" mode the senders are flowed controlled until enough
-messages are successfully transmitted.
-
- In general the stack looks like this:
-
-
-SENDER:
-
-----------------------------------------------------------------
-Buffering : Save lost messages
-Retransmission : Retransmit
-----------------------------------------------------------------
-Fragmentation : Fragment messages in smaller chunks
-Reassembly : and ensure that the IOVMAX limit is not
- : reached
-----------------------------------------------------------------
-Tranport : Encapsulate the specific transport media
- : such as TCP/IP, ATM, or shared memory
- : Demuxes incoming data to the right chain
- : Change control messages and data messages
- : to the right dynamic types.
-----------------------------------------------------------------
-
-RECEIVER:
-
-----------------------------------------------------------------
-Lost detection : Detect lost messages and send control
- : messages back
-----------------------------------------------------------------
-Reassembly : Reassemble messages, fragment control
-Fragmentation : data
-----------------------------------------------------------------
-Transport : Group membership, ACT reception,
- : handle keep-alive messages...
-----------------------------------------------------------------
-
-
-@@ TODO: Piggybacking...
diff --git a/protocols/ace/RMCast/RMCast_Export.h b/protocols/ace/RMCast/RMCast_Export.h
deleted file mode 100644
index 51257c4d682..00000000000
--- a/protocols/ace/RMCast/RMCast_Export.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// -*- C++ -*-
-// $Id$
-// Definition for Win32 Export directives.
-// This file is generated automatically by
-// generate_export_file.pl
-// ------------------------------
-#if !defined (ACE_RMCAST_EXPORT_H)
-#define ACE_RMCAST_EXPORT_H
-
-#include "ace/config-all.h"
-
-#if defined (ACE_AS_STATIC_LIBS) && !defined (ACE_RMCAST_HAS_DLL)
-# define ACE_RMCAST_HAS_DLL 0
-#endif /* ACE_AS_STATIC_LIBS && ACE_RMCAST_HAS_DLL */
-
-#if !defined (ACE_RMCAST_HAS_DLL)
-#define ACE_RMCAST_HAS_DLL 1
-#endif /* ! ACE_RMCAST_HAS_DLL */
-
-#if defined (ACE_RMCAST_HAS_DLL)
-# if (ACE_RMCAST_HAS_DLL == 1)
-# if defined (ACE_RMCAST_BUILD_DLL)
-# define ACE_RMCast_Export ACE_Proper_Export_Flag
-# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
-# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-# else
-# define ACE_RMCast_Export ACE_Proper_Import_Flag
-# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
-# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-# endif /* ACE_RMCAST_BUILD_DLL */
-# else
-# define ACE_RMCast_Export
-# define ACE_RMCAST_SINGLETON_DECLARATION(T)
-# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-# endif /* ! ACE_RMCAST_HAS_DLL == 1 */
-#else
-# define ACE_RMCast_Export
-# define ACE_RMCAST_SINGLETON_DECLARATION(T)
-# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-#endif /* ACE_RMCAST_HAS_DLL */
-
-#endif /* ACE_RMCAST_EXPORT_H */
-
-// End of auto generated file.
diff --git a/protocols/ace/TMCast/Export.hpp b/protocols/ace/TMCast/Export.hpp
deleted file mode 100644
index f13a69ecefd..00000000000
--- a/protocols/ace/TMCast/Export.hpp
+++ /dev/null
@@ -1,58 +0,0 @@
-
-// -*- C++ -*-
-// $Id$
-// Definition for Win32 Export directives.
-// This file is generated automatically by generate_export_file.pl TMCast
-// ------------------------------
-#ifndef TMCAST_EXPORT_H
-#define TMCAST_EXPORT_H
-
-#include "ace/config-all.h"
-
-#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL)
-# define TMCAST_HAS_DLL 0
-#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */
-
-#if !defined (TMCAST_HAS_DLL)
-#define TMCAST_HAS_DLL 1
-#endif /* ! TMCAST_HAS_DLL */
-
-#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1)
-# if defined (TMCAST_BUILD_DLL)
-# define TMCast_Export ACE_Proper_Export_Flag
-# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
-# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-# else /* TMCAST_BUILD_DLL */
-# define TMCast_Export ACE_Proper_Import_Flag
-# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
-# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-# endif /* TMCAST_BUILD_DLL */
-#else /* TMCAST_HAS_DLL == 1 */
-# define TMCast_Export
-# define TMCAST_SINGLETON_DECLARATION(T)
-# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
-#endif /* TMCAST_HAS_DLL == 1 */
-
-// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if
-// tracing is turned off for ACE.
-#if !defined (TMCAST_NTRACE)
-# if (ACE_NTRACE == 1)
-# define TMCAST_NTRACE 1
-# else /* (ACE_NTRACE == 1) */
-# define TMCAST_NTRACE 0
-# endif /* (ACE_NTRACE == 1) */
-#endif /* !TMCAST_NTRACE */
-
-#if (TMCAST_NTRACE == 1)
-# define TMCAST_TRACE(X)
-#else /* (TMCAST_NTRACE == 1) */
-# if !defined (ACE_HAS_TRACE)
-# define ACE_HAS_TRACE
-# endif /* ACE_HAS_TRACE */
-# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X)
-# include "ace/Trace.h"
-#endif /* (TMCAST_NTRACE == 1) */
-
-#endif /* TMCAST_EXPORT_H */
-
-// End of auto generated file.
diff --git a/protocols/ace/TMCast/FaultDetector.hpp b/protocols/ace/TMCast/FaultDetector.hpp
deleted file mode 100644
index ba476cbd367..00000000000
--- a/protocols/ace/TMCast/FaultDetector.hpp
+++ /dev/null
@@ -1,41 +0,0 @@
-// file : TMCast/FaultDetector.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#include "Protocol.hpp"
-
-namespace TMCast
-{
- class FaultDetector
- {
- public:
- FaultDetector ()
- : silence_period_ (-1)
- {
- }
-
- public:
- class Failed {};
-
-
- void
- insync ()
- {
- silence_period_ = 0;
- }
-
- void
- outsync ()
- {
- if (++silence_period_ >= Protocol::FATAL_SILENCE_FRAME)
- {
- // cerr << "Silence period has been passed." << endl;
- // cerr << "Decalring the node failed." << endl;
- throw Failed ();
- }
- }
-
- private:
- short silence_period_;
- };
-}
diff --git a/protocols/ace/TMCast/Group.cpp b/protocols/ace/TMCast/Group.cpp
deleted file mode 100644
index 901aa7ba650..00000000000
--- a/protocols/ace/TMCast/Group.cpp
+++ /dev/null
@@ -1,508 +0,0 @@
-// file : TMCast/Group.cpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#include "Group.hpp"
-
-#include <typeinfo>
-
-// OS primitives
-#include <ace/OS.h>
-#include <ace/Synch.h>
-#include <ace/Time_Value.h>
-#include <ace/SOCK_Dgram_Mcast.h>
-
-#include "Messaging.hpp"
-
-#include "Protocol.hpp"
-
-// Components
-
-#include "LinkListener.hpp"
-#include "FaultDetector.hpp"
-#include "TransactionController.hpp"
-
-namespace TMCast
-{
- bool
- operator== (std::type_info const* pa, std::type_info const& b)
- {
- return *pa == b;
- }
-
- //
- //
- //
- class Terminate : public virtual Message {};
-
-
- //
- //
- //
- class Failure : public virtual Message {};
-
-
- //
- //
- //
- class Scheduler
- {
- public:
- Scheduler (ACE_INET_Addr const& addr,
- char const* id,
- MessageQueue& out_send_data,
- MessageQueue& out_recv_data,
- MessageQueue& out_control)
-
- : cond_ (mutex_),
-
- addr_ (addr),
- sock_ (),
-
- out_control_ (out_control),
-
- in_data_ (mutex_),
- in_link_data_(mutex_),
- in_control_ (mutex_),
-
- sync_schedule (ACE_OS::gettimeofday ()),
-
- transaction_controller_ (in_data_, out_send_data, out_recv_data)
- {
- ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
- id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';
-
- sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded
-
- in_data_.subscribe (cond_);
- in_link_data_.subscribe (cond_);
- in_control_.subscribe (cond_);
-
- if (ACE_OS::thr_create (&thread_thunk,
- this,
- THR_JOINABLE,
- &thread_) != 0) ::abort ();
- }
-
- ~Scheduler ()
- {
- {
- MessageQueueAutoLock lock (in_control_);
-
- in_control_.push (MessagePtr (new Terminate));
- }
-
- if (ACE_OS::thr_join (thread_, &thread_, 0) != 0) ::abort ();
-
- // cerr << "Scheduler is down." << endl;
- }
-
- public:
- MessageQueue&
- in_data ()
- {
- return in_data_;
- }
-
- private:
- static ACE_THR_FUNC_RETURN
- thread_thunk (void* arg)
- {
- Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
-
- obj->execute ();
- return 0;
- }
-
- void
- execute ()
- {
- try
- {
- sock_.join (addr_);
-
- auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));
-
- {
- AutoLock lock (mutex_);
-
- // Loop
- //
- //
-
- while (true)
- {
- cond_.wait (&sync_schedule);
-
- // "Loop of Fairness"
-
- bool done = false;
-
- do
- {
- // control message
- //
- //
- if (!in_control_.empty ())
- {
- done = true;
- break;
- }
-
-
- // outsync
- //
- //
- if (sync_schedule < ACE_OS::gettimeofday ())
- {
- // OUTSYNC
-
- outsync ();
-
- // schedule next outsync
-
- sync_schedule =
- ACE_OS::gettimeofday () +
- ACE_Time_Value (0, Protocol::SYNC_PERIOD);
- }
-
- // link message
- //
- //
- if (!in_link_data_.empty ())
- {
- MessagePtr m (in_link_data_.front ());
- in_link_data_.pop ();
-
- std::type_info const* exp = &typeid (*m);
-
- if (exp == typeid (LinkFailure))
- {
- // cerr << "link failure" << endl;
- throw false;
- }
- else if (exp == typeid (LinkData))
- {
-
- LinkData* data = dynamic_cast<LinkData*> (m.get ());
-
- // INSYNC, TL, CT
-
- // Filter out loopback.
- //
- if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
- {
- insync ();
- transaction_list ();
- current_transaction (data->header().current,
- data->payload (),
- data->size ());
- }
- }
- else
- {
- // cerr << "unknown message type from link listener: "
- // << typeid (*m).name () << endl;
- abort ();
- }
- }
-
- // api message
- //
- //
- if (!in_data_.empty ())
- {
- // API
-
- api ();
- }
-
- } while (!in_link_data_.empty() ||
- sync_schedule < ACE_OS::gettimeofday ());
-
- if (done) break;
- }
- }
- }
- catch (...)
- {
- // cerr << "Exception in scheduler loop." << endl;
-
- MessageQueueAutoLock lock (out_control_);
- out_control_.push (MessagePtr (new Failure));
- }
- }
-
- // Events
- //
- // Order:
- //
- // INSYNC, TSL, VOTE, BEGIN
- // API
- // OUTSYNC
- //
-
- void
- insync ()
- {
- fault_detector_.insync ();
- }
-
- void
- outsync ()
- {
- char buf[Protocol::MAX_MESSAGE_SIZE];
-
- Protocol::MessageHeader* hdr =
- reinterpret_cast<Protocol::MessageHeader*> (buf);
-
- void* data = buf + sizeof (Protocol::MessageHeader);
-
- hdr->length = sizeof (Protocol::MessageHeader);
- hdr->check_sum = 0;
-
- ACE_OS::strcpy (hdr->member_id.id, id_);
-
- size_t size (0);
-
- transaction_controller_.outsync (hdr->current, data, size);
-
- hdr->length += size;
-
- fault_detector_.outsync ();
-
- // sock_.send (buf, hdr->length, addr_);
-
- sock_.send (buf, hdr->length);
- }
-
- void
- transaction_list ()
- {
- }
-
- void
- current_transaction (Protocol::Transaction const& t,
- void const* payload,
- size_t size)
- {
- transaction_controller_.current_transaction (t, payload, size);
- }
-
- void
- api ()
- {
- transaction_controller_.api ();
- }
-
- private:
- ACE_thread_t thread_;
-
- ACE_Thread_Mutex mutex_;
- ACE_Condition<ACE_Thread_Mutex> cond_;
-
- typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
-
- char id_[Protocol::MEMBER_ID_LENGTH];
-
- ACE_INET_Addr addr_;
- ACE_SOCK_Dgram_Mcast sock_;
-
- MessageQueue& out_control_;
-
- MessageQueue in_data_;
- MessageQueue in_link_data_;
- MessageQueue in_control_;
-
- // Protocol state
- //
- //
-
- ACE_Time_Value sync_schedule;
-
- FaultDetector fault_detector_;
- TransactionController transaction_controller_;
- };
-
-
- //
- //
- //
- class Group::GroupImpl
- {
- public:
- ~GroupImpl ()
- {
- }
-
- GroupImpl (ACE_INET_Addr const& addr, char const* id)
- throw (Group::Failed)
- : send_cond_ (mutex_),
- recv_cond_ (mutex_),
- failed_ (false),
- in_send_data_ (mutex_),
- in_recv_data_ (mutex_),
- in_control_ (mutex_),
- scheduler_ (new Scheduler (addr,
- id,
- in_send_data_,
- in_recv_data_,
- in_control_)),
- out_data_ (scheduler_->in_data ())
- {
- in_send_data_.subscribe (send_cond_);
- in_recv_data_.subscribe (recv_cond_);
-
- in_control_.subscribe (send_cond_);
- in_control_.subscribe (recv_cond_);
- }
-
- void
- send (void const* msg, size_t size)
- throw (Group::InvalidArg, Group::Failed, Group::Aborted)
- {
- if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
-
- // Note the potential deadlock if I lock mutex_ and out_data_ in
- // reverse order.
-
- MessageQueueAutoLock l1 (out_data_);
- AutoLock l2 (mutex_);
-
- throw_if_failed ();
-
- out_data_.push (MessagePtr (new Send (msg, size)));
-
- l1.unlock (); // no need to keep it locked
-
- while (true)
- {
- throw_if_failed ();
-
- if (!in_send_data_.empty ())
- {
- MessagePtr m (in_send_data_.front ());
- in_send_data_.pop ();
-
- std::type_info const* exp = &typeid (*m);
-
- if (exp == typeid (TMCast::Aborted))
- {
- throw Group::Aborted ();
- }
- else if (exp == typeid (Commited))
- {
- return;
- }
- else
- {
- // cerr << "send: group-scheduler messaging protocol violation; "
- // << "unexpected message " << typeid (*m).name ()
- // << " " << typeid (Aborted).name () << endl;
-
- abort ();
- }
- }
-
- // cerr << "send: waiting on condition" << endl;
- send_cond_.wait ();
- // cerr << "send: wokeup on condition" << endl;
- }
- }
-
-
-
- size_t
- recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
- {
- AutoLock lock (mutex_);
-
- while (true)
- {
- throw_if_failed ();
-
- if (!in_recv_data_.empty ())
- {
- MessagePtr m (in_recv_data_.front ());
- in_recv_data_.pop ();
-
- std::type_info const* exp = &typeid (*m);
-
- if (exp == typeid (Recv))
- {
- Recv* data = dynamic_cast<Recv*> (m.get ());
-
- if (size < data->size ()) throw Group::InsufficienSpace ();
-
- memcpy (msg, data->payload (), data->size ());
-
- return data->size ();
- }
- else
- {
- // cerr << "recv: group-scheduler messaging protocol violation. "
- // << "unexpected message " << typeid (*m).name () << endl;
-
- abort ();
- }
- }
-
- recv_cond_.wait ();
- }
- }
-
- private:
- void
- throw_if_failed ()
- {
- if (!failed_ && !in_control_.empty ()) failed_ = true;
-
- if (failed_) throw Group::Failed ();
- }
-
- private:
- ACE_Thread_Mutex mutex_;
- ACE_Condition<ACE_Thread_Mutex> send_cond_;
- ACE_Condition<ACE_Thread_Mutex> recv_cond_;
-
- typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
-
- bool failed_;
-
- MessageQueue in_send_data_;
- MessageQueue in_recv_data_;
- MessageQueue in_control_;
-
- auto_ptr<Scheduler> scheduler_;
-
- MessageQueue& out_data_;
- };
-
-
- // Group
- //
- //
- Group::
- Group (ACE_INET_Addr const& addr, char const* id)
- throw (Group::Failed)
- : pimpl_ (new GroupImpl (addr, id))
- {
- }
-
- Group::
- ~Group ()
- {
- }
-
- void
- Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted)
- {
- pimpl_->send (msg, size);
- }
-
- size_t
- Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
- {
- return pimpl_->recv (msg, size);
- }
-}
diff --git a/protocols/ace/TMCast/Group.hpp b/protocols/ace/TMCast/Group.hpp
deleted file mode 100644
index 416cea0a17d..00000000000
--- a/protocols/ace/TMCast/Group.hpp
+++ /dev/null
@@ -1,51 +0,0 @@
-// file : TMCast/Group.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#ifndef TMCAST_GROUP_HPP
-#define TMCAST_GROUP_HPP
-
-#include <ace/Auto_Ptr.h>
-#include <ace/INET_Addr.h>
-
-#include "Export.hpp"
-
-namespace TMCast
-{
- class TMCast_Export Group
- {
- public:
- class Aborted {};
- class Failed {};
- class InvalidArg {};
- class InsufficienSpace {};
-
- public:
- ~Group ();
-
- Group (ACE_INET_Addr const& addr, char const* id) throw (Failed);
-
- public:
- void
- send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted);
-
- size_t
- recv (void* msg, size_t size) throw (Failed, InsufficienSpace);
-
- private:
- bool
- failed ();
-
- private:
- class GroupImpl;
- auto_ptr<GroupImpl> pimpl_;
-
- private:
- Group (Group const&);
-
- Group&
- operator= (Group const&);
- };
-}
-
-#endif // TMCAST_GROUP_HPP
diff --git a/protocols/ace/TMCast/GroupFwd.hpp b/protocols/ace/TMCast/GroupFwd.hpp
deleted file mode 100644
index beba06df79d..00000000000
--- a/protocols/ace/TMCast/GroupFwd.hpp
+++ /dev/null
@@ -1,15 +0,0 @@
-// file : TMCast/GroupFwd.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#ifndef TMCAST_GROUP_FWD_HPP
-#define TMCAST_GROUP_FWD_HPP
-
-#include "Export.hpp"
-
-namespace TMCast
-{
- class TMCast_Export Group;
-}
-
-#endif // TMCAST_GROUP_FWD_HPP
diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp
deleted file mode 100644
index aee1263aa0a..00000000000
--- a/protocols/ace/TMCast/LinkListener.hpp
+++ /dev/null
@@ -1,166 +0,0 @@
-// file : TMCast/LinkListener.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-// OS primitives
-#include <ace/Synch.h>
-#include <ace/SOCK_Dgram_Mcast.h>
-#include <ace/Refcounted_Auto_Ptr.h>
-
-
-#include "Messaging.hpp"
-
-namespace TMCast
-{
- //
- //
- //
- class LinkFailure : public virtual Message {};
-
-
- //
- //
- //
- class LinkData : public virtual Message
- {
- public:
- LinkData (Protocol::MessageHeader const* header,
- void* payload,
- size_t size)
- : size_ (size)
- {
- ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader));
- ACE_OS::memcpy (payload_, payload, size_);
- }
-
- Protocol::MessageHeader const&
- header () const
- {
- return header_;
- }
-
- void const*
- payload () const
- {
- return payload_;
- }
-
- size_t
- size () const
- {
- return size_;
- }
-
- private:
- Protocol::MessageHeader header_;
- char payload_[Protocol::MAX_MESSAGE_SIZE];
- size_t size_;
- };
-
- typedef
- ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex>
- LinkDataPtr;
-
- //
- //
- //
- class LinkListener
- {
- private:
- class Terminate : public virtual Message {};
-
- public:
- LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out)
- : sock_(sock), out_ (out)
- {
- if (ACE_OS::thr_create (&thread_thunk,
- this,
- THR_JOINABLE,
- &thread_) != 0) ::abort ();
- }
-
- ~LinkListener ()
- {
- {
- MessageQueueAutoLock lock (control_);
-
- control_.push (MessagePtr (new Terminate));
- }
-
- if (ACE_OS::thr_join (thread_, &thread_, 0) != 0) ::abort ();
-
- // cerr << "Link listener is down." << endl;
- }
-
-
- static ACE_THR_FUNC_RETURN
- thread_thunk (void* arg)
- {
- LinkListener* obj = reinterpret_cast<LinkListener*> (arg);
-
- obj->execute ();
- return 0;
- }
-
- void
- execute ()
- {
- char msg[Protocol::MAX_MESSAGE_SIZE];
-
- ssize_t header_size = sizeof (Protocol::MessageHeader);
-
- // OS::Time timeout (1000000); // one millisecond
-
- ACE_Time_Value timeout (0, 1000); // one millisecond
-
- try
- {
- while (true)
- {
- // Check control message queue
-
- {
- MessageQueueAutoLock lock (control_);
-
- if (!control_.empty ()) break;
- }
-
- ACE_Addr junk;
- ssize_t n = sock_.recv (msg,
- Protocol::MAX_MESSAGE_SIZE,
- junk,
- 0,
- &timeout);
-
- if (n != -1)
- {
- if (n < header_size) throw false;
-
- Protocol::MessageHeader* header =
- reinterpret_cast<Protocol::MessageHeader*> (msg);
-
- MessageQueueAutoLock lock (out_);
-
- out_.push (MessagePtr (new LinkData (header,
- msg + header_size,
- n - header_size)));
- }
- }
- }
- catch (...)
- {
- MessageQueueAutoLock lock (out_);
-
- out_.push (MessagePtr (new LinkFailure));
- }
- }
-
- private:
- typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
-
- ACE_thread_t thread_;
- ACE_SOCK_Dgram_Mcast& sock_;
- MessageQueue& out_;
- MessageQueue control_;
- };
-}
diff --git a/protocols/ace/TMCast/MTQueue.hpp b/protocols/ace/TMCast/MTQueue.hpp
deleted file mode 100644
index 23116cac7d2..00000000000
--- a/protocols/ace/TMCast/MTQueue.hpp
+++ /dev/null
@@ -1,178 +0,0 @@
-// file : TMCast/MTQueue.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#ifndef TMCAST_MT_QUEUE_HPP
-#define TMCAST_MT_QUEUE_HPP
-
-#include "ace/Auto_Ptr.h"
-#include "ace/Unbounded_Set.h"
-#include "ace/Unbounded_Queue.h"
-#include "ace/os_include/sys/os_types.h"
-
-namespace TMCast
-{
- template <typename T,
- typename M,
- typename C,
- typename Q = ACE_Unbounded_Queue<T> >
- class MTQueue
- {
- public:
- typedef T ElementType;
- typedef M MutexType;
- typedef C ConditionalType;
- typedef Q QueueType;
-
- public:
-
- MTQueue (size_t hint = 0)
- : mutexp_ (new MutexType),
- mutex_ (*mutexp_),
- // queue_ (hint),
- queue_ (),
- signal_ (false)
- {
- }
-
- MTQueue (MutexType& mutex, size_t hint = 0)
- : mutexp_ (),
- mutex_ (mutex),
- // queue_ (hint),
- queue_ (),
- signal_ (false)
- {
- }
-
- public:
- bool
- empty () const
- {
- return queue_.is_empty ();
- }
-
- size_t
- size () const
- {
- return queue_.size ();
- }
-
- // typedef typename QueueType::Empty Empty;
-
- class Empty {};
-
- T&
- front ()
- {
- ACE_Unbounded_Queue_Iterator<T> f (queue_);
- T* tmp;
- if (!f.next (tmp)) throw Empty ();
-
- return *tmp;
- }
-
-
- T const&
- front () const
- {
- ACE_Unbounded_Queue_Const_Iterator<T> f (queue_);
- T* tmp;
- if (!f.next (tmp)) throw Empty ();
-
- return *tmp;
- }
-
- /*
- T&
- back ()
- {
- return queue_.back ();
- }
-
-
- T const&
- back () const
- {
- return queue_.back ();
- }
- */
-
- void
- push (T const& t)
- {
- signal_ = empty ();
- queue_.enqueue_tail (t);
- }
-
- void
- pop ()
- {
- T junk;
- queue_.dequeue_head (junk);
- }
-
- public:
- void
- lock () const
- {
- mutex_.acquire ();
- }
-
- void
- unlock () const
- {
- if (signal_)
- {
- signal_ = false;
-
- for (ConditionalSetConstIterator_ i (cond_set_);
- !i.done ();
- i.advance ())
- {
- ConditionalType** c;
-
- i.next (c);
-
- (*c)->signal ();
- }
- }
-
- mutex_.release ();
- }
-
- void
- subscribe (ConditionalType& c)
- {
- //@@ should check for duplicates
- //
- cond_set_.insert (&c);
- }
-
- void
- unsubscribe (ConditionalType& c)
- {
- //@@ should check for absence
- //
- cond_set_.remove (&c);
- }
-
- private:
- auto_ptr<MutexType> mutexp_;
- mutable MutexType& mutex_;
- QueueType queue_;
-
- typedef
- ACE_Unbounded_Set<ConditionalType*>
- ConditionalSet_;
-
- typedef
- ACE_Unbounded_Set_Const_Iterator<ConditionalType*>
- ConditionalSetConstIterator_;
-
- ConditionalSet_ cond_set_;
-
- mutable bool signal_;
- };
-}
-
-#endif // TMCAST_MT_QUEUE_HPP
diff --git a/protocols/ace/TMCast/Messaging.hpp b/protocols/ace/TMCast/Messaging.hpp
deleted file mode 100644
index 6a1000c3265..00000000000
--- a/protocols/ace/TMCast/Messaging.hpp
+++ /dev/null
@@ -1,54 +0,0 @@
-// file : TMCast/Messaging.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#ifndef TMCAST_MESSAGING_HPP
-#define TMCAST_MESSAGING_HPP
-
-#include <ace/Synch.h>
-#include <ace/Refcounted_Auto_Ptr.h>
-
-#include "MTQueue.hpp"
-
-namespace TMCast
-{
- class Message
- {
- public:
- virtual
- ~Message () {}
- };
-
- typedef
- ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex>
- MessagePtr;
-
- typedef
- MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> >
- MessageQueue;
-
- struct MessageQueueAutoLock
- {
- MessageQueueAutoLock (MessageQueue& q)
- : q_ (q)
- {
- q_.lock ();
- }
-
- void
- unlock ()
- {
- q_.unlock ();
- }
-
- ~MessageQueueAutoLock ()
- {
- q_.unlock ();
- }
-
- private:
- MessageQueue& q_;
- };
-}
-
-#endif // TMCAST_MESSAGING_HPP
diff --git a/protocols/ace/TMCast/Protocol.cpp b/protocols/ace/TMCast/Protocol.cpp
deleted file mode 100644
index 78563281694..00000000000
--- a/protocols/ace/TMCast/Protocol.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-// file : TMCast/Protocol.cpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#include "Protocol.hpp"
-
-namespace TMCast
-{
- namespace Protocol
- {
- namespace
- {
- char const* labels[] = {
- "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"};
- }
-
- /*
- std::string
- tslabel (Protocol::TransactionStatus s)
- {
- return labels[s];
- }
-
- std::ostream&
- operator << (std::ostream& o, Transaction const& t)
- {
- return o << "{" << t.id << "; " << tslabel (t.status) << "}";
- }
- */
- }
-}
diff --git a/protocols/ace/TMCast/Protocol.hpp b/protocols/ace/TMCast/Protocol.hpp
deleted file mode 100644
index d5ae6a50cd6..00000000000
--- a/protocols/ace/TMCast/Protocol.hpp
+++ /dev/null
@@ -1,107 +0,0 @@
-// file : TMCast/Protocol.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#ifndef TMCAST_PROTOCOL_HPP
-#define TMCAST_PROTOCOL_HPP
-
-namespace TMCast
-{
- namespace Protocol
- {
- //
- //
- //
- unsigned long const MEMBER_ID_LENGTH = 38;
-
- struct MemberId
- {
- char id[MEMBER_ID_LENGTH];
- /*
- unsigned long ip;
- unsigned short port;
- */
- };
-
- //
- //
- //
- typedef unsigned short TransactionId;
-
-
-
- typedef unsigned char TransactionStatus;
-
- TransactionStatus const TS_BEGIN = 1;
- TransactionStatus const TS_COMMIT = 2;
- TransactionStatus const TS_ABORT = 3;
- TransactionStatus const TS_COMMITED = 4;
- TransactionStatus const TS_ABORTED = 5;
-
- struct Transaction
- {
- TransactionId id;
- TransactionStatus status;
- };
-
- // Transaction List (TL)
-
- // unsigned long const TL_LENGTH = 1;
-
- // typedef Transaction TransactionList[TL_LENGTH];
-
-
- //
- //
- //
- struct MessageHeader
- {
- unsigned long length;
-
- unsigned long check_sum;
-
- MemberId member_id;
-
- Transaction current;
-
- //TransactionList transaction_list;
- };
-
-
- //
- //
- //
-
- unsigned long const MAX_MESSAGE_SIZE = 768;
-
- unsigned long const
- MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader);
-
- // Protocol timing
- //
- //
-
- unsigned long const SYNC_PERIOD = 30000; // in mks
-
- unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's
- unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's
-
- // FATAL_SILENCE_FRAME in SYNC_PERIOD's
- // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME
- //
-
- short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2;
-
- // short const FATAL_SILENCE_FRAME = 10000;
-
- // Helpers
-
- // std::string
- // tslabel (Protocol::TransactionStatus s);
-
- // std::ostream&
- // operator << (std::ostream& o, Transaction const& t);
- }
-}
-
-#endif // TMCAST_PROTOCOL_HPP
diff --git a/protocols/ace/TMCast/README b/protocols/ace/TMCast/README
deleted file mode 100644
index d061c7b2cba..00000000000
--- a/protocols/ace/TMCast/README
+++ /dev/null
@@ -1,58 +0,0 @@
-
-
-Architecture
-
-TMCast (stands for Transaction MultiCast) is an implementation of a
-transactional multicast protocol. In essence, the idea is to represent
-message delivery to members of a multicast group as a transaction -
-atomic, consistent and isolated action. Multicast transaction can
-be viewed as an atomic transition of group members to a new state.
-If we define Mo as a set of operational (non-faulty) members of the
-group, Mf as a set of faulty members of the group, Ma as a set of
-members that view transition Tn as aborted and Mc as a set of members
-that view transition Tn as committed, then this atomic transition Tn
-can be described as one of the following:
-
-Mo(Tn-1) = Ma(T) U Mf(T)
-Mo(Tn-1) = Mc(T) U Mf(T)
-
-Or, in other words, after transaction T has been committed (aborted),
-all operational (before transaction T) members are either in
-committed (aborted) or failed state.
-
-Thus, for each member of the group, outcome of the transaction can
-be commit, abort or member failure. It is important for a member
-to exhibit a failfast (error latency is less than transaction cycle)
-behavior. Or, in other words, if the member transitioned into a wrong
-state, it is guaranteed to fail instead of delivering wrong result.
-
-In order to achieve such error detection in decentralized environment,
-certain limitations should be imposed. One of the most user-visible
-limitation is the fact that the lifetime of the group with only
-one member is very short. This is because there is not way for a
-member to distinguish "no members yet" case from "my link to the
-group is down". In such situation, the member assumes the latter case.
-There is also a military saying that puts it quite nicely: two is one,
-one is nothing.
-
-
-State of Implementation
-
-Current implementation is in prototypical stage. The following parts
-are not implemented or still under development:
-
-* Handling of network partitioning (TODO)
-
-* Redundant network support (TODO)
-
-* Member failure detection (partial implementation)
-
-
-Examples
-
-There is a simple example available in examples/TMCast/Member with
-corresponding README.
-
-
---
-Boris Kolpackov <boris@dre.vanderbilt.edu> \ No newline at end of file
diff --git a/protocols/ace/TMCast/TMCast.mpc b/protocols/ace/TMCast/TMCast.mpc
deleted file mode 100644
index 0899982dd09..00000000000
--- a/protocols/ace/TMCast/TMCast.mpc
+++ /dev/null
@@ -1,8 +0,0 @@
-// -*- MPC -*-
-// $Id$
-
-project : acelib, core {
- requires += exceptions
- sharedname = TMCast
- dynamicflags += TMCAST_BUILD_DLL
-}
diff --git a/protocols/ace/TMCast/TransactionController.hpp b/protocols/ace/TMCast/TransactionController.hpp
deleted file mode 100644
index 66c924faaf1..00000000000
--- a/protocols/ace/TMCast/TransactionController.hpp
+++ /dev/null
@@ -1,384 +0,0 @@
-// file : TMCast/TransactionController.hpp
-// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
-// cvs-id : $Id$
-
-#include <ace/Synch.h>
-#include <ace/Refcounted_Auto_Ptr.h>
-
-#include "Protocol.hpp"
-#include "Messaging.hpp"
-
-namespace TMCast
-{
-
- // Messages
- //
- //
- class Send : public virtual Message
- {
- public:
- Send (void const* msg, size_t size)
- : size_ (size)
- {
- ACE_OS::memcpy (payload_, msg, size_);
- }
-
- void const*
- payload () const
- {
- return payload_;
- }
-
- size_t
- size () const
- {
- return size_;
- }
-
- private:
- size_t size_;
- char payload_[Protocol::MAX_PAYLOAD_SIZE];
- };
-
- typedef
- ACE_Refcounted_Auto_Ptr<Send, ACE_Null_Mutex>
- SendPtr;
-
-
- class Recv : public virtual Message
- {
- public:
- Recv (void const* msg, size_t size)
- : size_ (size)
- {
- ACE_OS::memcpy (payload_, msg, size_);
- }
-
- void const*
- payload () const
- {
- return payload_;
- }
-
- size_t
- size () const
- {
- return size_;
- }
-
- private:
- size_t size_;
- char payload_[Protocol::MAX_PAYLOAD_SIZE];
- };
-
- typedef
- ACE_Refcounted_Auto_Ptr<Recv, ACE_Null_Mutex>
- RecvPtr;
-
- class Aborted : public virtual Message {};
-
- class Commited : public virtual Message {};
-
-
- //
- //
- //
- class TransactionController
- {
- public:
- TransactionController (MessageQueue& in,
- MessageQueue& send_out,
- MessageQueue& recv_out)
- : trace_ (false),
- voting_duration_ (0),
- separation_duration_ (0),
- in_ (in),
- send_out_ (send_out),
- recv_out_ (recv_out)
- {
- current_.id = 0;
- current_.status = Protocol::TS_COMMITED;
- }
-
- public:
- class Failure {};
-
-
- void
- outsync (Protocol::Transaction& c, void* payload, size_t& size)
- {
- if (current_.status == Protocol::TS_COMMIT ||
- current_.status == Protocol::TS_ABORT)
- {
- if (++voting_duration_ >= Protocol::VOTING_FRAME)
- {
- // end of voting frame
-
- if (current_.status == Protocol::TS_COMMIT)
- {
- {
- if (initiated_)
- {
- MessageQueueAutoLock lock (send_out_);
- send_out_.push (MessagePtr (new Commited));
- }
- else // joined transaction
- {
- MessageQueueAutoLock lock (recv_out_);
- recv_out_.push (MessagePtr (recv_.release ()));
- recv_ = RecvPtr ();
- }
- }
-
- current_.status = Protocol::TS_COMMITED;
-
- // if (trace_) cerr << "commited transaction with id "
- // << current_.id << endl;
- }
- else // TS_ABORT
- {
- if (initiated_)
- {
- MessageQueueAutoLock lock (send_out_);
- send_out_.push (MessagePtr (new Aborted));
- }
- else
- {
- // free revc_ buffer if necessary
- //
- if (recv_.get ()) recv_ = RecvPtr ();
- }
-
-
- current_.status = Protocol::TS_ABORTED;
-
- // if (trace_) cerr << "aborted transaction with id "
- // << current_.id << endl;
- }
-
- // start transaction separation frame (counts down)
- // +1 because it will be decremented on this iteration
- separation_duration_ = Protocol::SEPARATION_FRAME + 1;
- }
- }
-
- // Set current outsync info
-
- c.id = current_.id;
- c.status = current_.status;
-
-
- // Do some post-processing
-
- switch (current_.status)
- {
- case Protocol::TS_COMMITED:
- case Protocol::TS_ABORTED:
- {
- if (separation_duration_ > 0) --separation_duration_;
- break;
- }
- case Protocol::TS_BEGIN:
- {
- // transfer payload
-
- size = send_->size ();
- memcpy (payload, send_->payload (), size);
-
- send_ = SendPtr ();
-
- // get redy to vote for 'commit'
-
- current_.status = Protocol::TS_COMMIT;
- voting_duration_ = 0;
- }
- }
- }
-
- void
- current_transaction (Protocol::Transaction const& t,
- void const* payload,
- size_t size)
- {
- Protocol::TransactionId& id = current_.id;
- Protocol::TransactionStatus& s = current_.status;
-
- if (id == 0 && t.id != 0) // catch up
- {
- switch (t.status)
- {
- case Protocol::TS_BEGIN:
- case Protocol::TS_COMMIT:
- case Protocol::TS_ABORT:
- {
- id = t.id - 1;
- s = Protocol::TS_COMMITED;
- break;
- }
- case Protocol::TS_ABORTED:
- case Protocol::TS_COMMITED:
- {
- id = t.id;
- s = t.status;
- break;
- }
- }
-
- // if (trace_) cerr << "caught up with id " << id << endl;
- }
-
- bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED);
-
- switch (t.status)
- {
- case Protocol::TS_BEGIN:
- {
- if (!stable || t.id != id + 1)
- {
- // Transaction is in progress or hole in transaction id's
-
- // cerr << "unexpected request to join " << t
- // << " while on " << current_ << endl;
-
- // if (!stable) cerr << "voting progress is " << voting_duration_
- // << "/" << Protocol::VOTING_FRAME << endl;
-
- if (t.id == id) // collision
- {
- if (!stable && s != Protocol::TS_ABORT)
- {
- // abort both
- // cerr << "aborting both transactions" << endl;
-
- s = Protocol::TS_ABORT;
- voting_duration_ = 0; //@@ reset voting frame
- }
- }
- else
- {
- // @@ delicate case. need to think more
-
- // cerr << "Declaring node failed." << endl;
- throw Failure ();
- }
- }
- else
- {
- // join the transaction
-
- initiated_ = false;
-
- recv_ = RecvPtr (new Recv (payload, size));
-
- id = t.id;
- s = Protocol::TS_COMMIT;
- voting_duration_ = 0;
-
- // if (trace_) cerr << "joining-for-commit transaction with id "
- // << id << endl;
- }
- break;
- }
- case Protocol::TS_COMMIT:
- {
- if (stable && id == t.id - 1)
- {
- // not begin and and we haven't joined
-
- // join for abort
-
- initiated_ = false;
-
- current_.id = t.id;
- current_.status = Protocol::TS_ABORT;
- voting_duration_ = 0;
-
- // if (trace_) cerr << "joining-for-abort transaction with id "
- // << current_.id << endl;
- }
- break;
- }
- case Protocol::TS_ABORT:
- {
- if ((!stable && id == t.id && s == Protocol::TS_COMMIT) ||
- (stable && id == t.id - 1)) // abort current || new transaction
- {
- // if (trace_) cerr << "voting-for-abort on transaction with id "
- // << current_.id << endl;
-
- id = t.id;
- s = Protocol::TS_ABORT;
-
- voting_duration_ = 0; //@@ reseting voting_duration_
- }
- else
- {
- }
-
- break;
- }
- case Protocol::TS_ABORTED:
- case Protocol::TS_COMMITED:
- {
- // nothing for now
- break;
- }
- }
- }
-
- void
- api ()
- {
- if ((current_.status == Protocol::TS_COMMITED ||
- current_.status == Protocol::TS_ABORTED) &&
- separation_duration_ == 0) // no transaction in progress
- {
- // start new transaction
-
- // Note that in_ is already locked by Scheduler
-
- MessagePtr m (in_.front ());
- in_.pop ();
-
- if (typeid (*m) == typeid (Send))
- {
- send_ = SendPtr (dynamic_cast<Send*> (m.release ()));
- }
- else
- {
- // cerr << "Expecting Send but received " << typeid (*m).name ()
- // << endl;
-
- ::abort ();
- }
-
- current_.id++;
- current_.status = Protocol::TS_BEGIN;
-
- initiated_ = true;
-
- // if (trace_) cerr << "starting transaction with id " << current_.id
- // << endl;
- }
- }
-
- private:
- typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
-
- bool trace_;
-
- Protocol::Transaction current_;
-
- bool initiated_;
-
- unsigned short voting_duration_;
- unsigned short separation_duration_;
-
- MessageQueue& in_;
- MessageQueue& send_out_;
- MessageQueue& recv_out_;
-
- SendPtr send_;
- RecvPtr recv_;
- };
-}