From c22583a269120a3d7b43286b7c996453b3fb31ac Mon Sep 17 00:00:00 2001 From: nobody Date: Tue, 27 Jan 2004 20:50:21 +0000 Subject: This commit was manufactured by cvs2svn to create tag 'EC_DT_CS520_2'. --- protocols/ace/RMCast/Makefile.am | 108 ------ protocols/ace/RMCast/README | 57 --- protocols/ace/RMCast/RMCast_Export.h | 44 --- protocols/ace/TMCast/Export.hpp | 58 --- protocols/ace/TMCast/FaultDetector.hpp | 41 -- protocols/ace/TMCast/Group.cpp | 508 ------------------------- protocols/ace/TMCast/Group.hpp | 51 --- protocols/ace/TMCast/GroupFwd.hpp | 15 - protocols/ace/TMCast/LinkListener.hpp | 166 -------- protocols/ace/TMCast/MTQueue.hpp | 178 --------- protocols/ace/TMCast/Messaging.hpp | 54 --- protocols/ace/TMCast/Protocol.cpp | 31 -- protocols/ace/TMCast/Protocol.hpp | 107 ------ protocols/ace/TMCast/README | 58 --- protocols/ace/TMCast/TMCast.mpc | 8 - protocols/ace/TMCast/TransactionController.hpp | 384 ------------------- 16 files changed, 1868 deletions(-) delete mode 100644 protocols/ace/RMCast/Makefile.am delete mode 100644 protocols/ace/RMCast/README delete mode 100644 protocols/ace/RMCast/RMCast_Export.h delete mode 100644 protocols/ace/TMCast/Export.hpp delete mode 100644 protocols/ace/TMCast/FaultDetector.hpp delete mode 100644 protocols/ace/TMCast/Group.cpp delete mode 100644 protocols/ace/TMCast/Group.hpp delete mode 100644 protocols/ace/TMCast/GroupFwd.hpp delete mode 100644 protocols/ace/TMCast/LinkListener.hpp delete mode 100644 protocols/ace/TMCast/MTQueue.hpp delete mode 100644 protocols/ace/TMCast/Messaging.hpp delete mode 100644 protocols/ace/TMCast/Protocol.cpp delete mode 100644 protocols/ace/TMCast/Protocol.hpp delete mode 100644 protocols/ace/TMCast/README delete mode 100644 protocols/ace/TMCast/TMCast.mpc delete mode 100644 protocols/ace/TMCast/TransactionController.hpp diff --git a/protocols/ace/RMCast/Makefile.am b/protocols/ace/RMCast/Makefile.am deleted file mode 100644 index f874ca9a380..00000000000 --- a/protocols/ace/RMCast/Makefile.am +++ /dev/null @@ -1,108 +0,0 @@ -##---------------------------------------------------------------------------- -## $Id$ -## -## Makefile.am for ACE_RMCast library -##---------------------------------------------------------------------------- - -## -## Process this file with automake to create Makefile.in -## - -AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir) - -lib_LTLIBRARIES = libACE_RMCast.la - -## ACE_RMCast library version is same as ACE's version. -libACE_RMCast_la_LDFLAGS = -version-number @ACE_MAJOR@:@ACE_MINOR@:@ACE_BETA@ - -libACE_RMCast_la_SOURCES = \ - RMCast.cpp \ - RMCast_Ack_Worker.cpp \ - RMCast_Fork.cpp \ - RMCast_Fragment.cpp \ - RMCast_IO_UDP.cpp \ - RMCast_Membership.cpp \ - RMCast_Module.cpp \ - RMCast_Module_Factory.cpp \ - RMCast_Partial_Message.cpp \ - RMCast_Proxy.cpp \ - RMCast_Reassembly.cpp \ - RMCast_Receiver_Module.cpp \ - RMCast_Reliable_Factory.cpp \ - RMCast_Reordering.cpp \ - RMCast_Resend_Handler.cpp \ - RMCast_Resend_Worker.cpp \ - RMCast_Retransmission.cpp \ - RMCast_Sequencer.cpp \ - RMCast_Singleton_Factory.cpp \ - RMCast_UDP_Event_Handler.cpp \ - RMCast_UDP_Proxy.cpp \ - RMCast_UDP_Reliable_Receiver.cpp \ - RMCast_UDP_Reliable_Sender.cpp - -libACE_RMCast_la_LIBADD = $(top_builddir)/ace/libACE.la - -## These are template source files. -TEMPLATE_FILES = \ - RMCast_Copy_On_Write.cpp \ - RMCast_Worker.cpp - -HEADER_FILES = \ - RMCast.h \ - RMCast_Ack_Worker.h \ - RMCast_Copy_On_Write.h \ - RMCast_Export.h \ - RMCast_Fork.h \ - RMCast_Fragment.h \ - RMCast_IO_UDP.h \ - RMCast_Membership.h \ - RMCast_Module.h \ - RMCast_Module_Factory.h \ - RMCast_Partial_Message.h \ - RMCast_Proxy.h \ - RMCast_Reassembly.h \ - RMCast_Receiver_Module.h \ - RMCast_Reliable_Factory.h \ - RMCast_Reordering.h \ - RMCast_Resend_Handler.h \ - RMCast_Resend_Worker.h \ - RMCast_Retransmission.h \ - RMCast_Sequencer.h \ - RMCast_Singleton_Factory.h \ - RMCast_UDP_Event_Handler.h \ - RMCast_UDP_Proxy.h \ - RMCast_UDP_Reliable_Receiver.h \ - RMCast_UDP_Reliable_Sender.h \ - RMCast_Worker.h - -INLINE_FILES = \ - RMCast.i \ - RMCast_Ack_Worker.i \ - RMCast_Copy_On_Write.i \ - RMCast_Fork.i \ - RMCast_Fragment.i \ - RMCast_IO_UDP.i \ - RMCast_Membership.i \ - RMCast_Module.i \ - RMCast_Module_Factory.i \ - RMCast_Partial_Message.i \ - RMCast_Proxy.i \ - RMCast_Reassembly.i \ - RMCast_Receiver_Module.i \ - RMCast_Reliable_Factory.i \ - RMCast_Reordering.i \ - RMCast_Resend_Handler.i \ - RMCast_Resend_Worker.i \ - RMCast_Retransmission.i \ - RMCast_Sequencer.i \ - RMCast_Singleton_Factory.i \ - RMCast_UDP_Event_Handler.i \ - RMCast_UDP_Proxy.i \ - RMCast_UDP_Reliable_Receiver.i \ - RMCast_UDP_Reliable_Sender.i \ - RMCast_Worker.i - -pkginclude_HEADERS = \ - $(HEADER_FILES) \ - $(INLINE_FILES) \ - $(TEMPLATE_FILES) diff --git a/protocols/ace/RMCast/README b/protocols/ace/RMCast/README deleted file mode 100644 index 2dd0c5d9cfc..00000000000 --- a/protocols/ace/RMCast/README +++ /dev/null @@ -1,57 +0,0 @@ -# $Id$ - - This directory will contain a simple, small-scale reliable -multicast framework for ACE. The framework is based on the ASX -components of the ACE library: the protocol is implemented as a stack -of interchangeable "modules", each one in charge of a very small task. -For example, one module implements fragmentation and reassembly, other -modules implement retransmission, send ACK and NAK messages, and -maintain receiver membership. - - The modules are replaced to achieve different levels of -reliability. For example, the retransmission module can be either the -"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the -first case no retransmissions are performed, but lost messages are -detected and reported to the receiver. The "Semi_Reliable" case -messages are held for a pre-specified amount of time, and -re-transmited if requested, but it is possible to loose some messages -if multiple re-transmissions fail. As in the "Best_Effort" case the -lost messages are detected and flagged to the application. Finally -in the "Reliable" mode the senders are flowed controlled until enough -messages are successfully transmitted. - - In general the stack looks like this: - - -SENDER: - ----------------------------------------------------------------- -Buffering : Save lost messages -Retransmission : Retransmit ----------------------------------------------------------------- -Fragmentation : Fragment messages in smaller chunks -Reassembly : and ensure that the IOVMAX limit is not - : reached ----------------------------------------------------------------- -Tranport : Encapsulate the specific transport media - : such as TCP/IP, ATM, or shared memory - : Demuxes incoming data to the right chain - : Change control messages and data messages - : to the right dynamic types. ----------------------------------------------------------------- - -RECEIVER: - ----------------------------------------------------------------- -Lost detection : Detect lost messages and send control - : messages back ----------------------------------------------------------------- -Reassembly : Reassemble messages, fragment control -Fragmentation : data ----------------------------------------------------------------- -Transport : Group membership, ACT reception, - : handle keep-alive messages... ----------------------------------------------------------------- - - -@@ TODO: Piggybacking... diff --git a/protocols/ace/RMCast/RMCast_Export.h b/protocols/ace/RMCast/RMCast_Export.h deleted file mode 100644 index 51257c4d682..00000000000 --- a/protocols/ace/RMCast/RMCast_Export.h +++ /dev/null @@ -1,44 +0,0 @@ -// -*- C++ -*- -// $Id$ -// Definition for Win32 Export directives. -// This file is generated automatically by -// generate_export_file.pl -// ------------------------------ -#if !defined (ACE_RMCAST_EXPORT_H) -#define ACE_RMCAST_EXPORT_H - -#include "ace/config-all.h" - -#if defined (ACE_AS_STATIC_LIBS) && !defined (ACE_RMCAST_HAS_DLL) -# define ACE_RMCAST_HAS_DLL 0 -#endif /* ACE_AS_STATIC_LIBS && ACE_RMCAST_HAS_DLL */ - -#if !defined (ACE_RMCAST_HAS_DLL) -#define ACE_RMCAST_HAS_DLL 1 -#endif /* ! ACE_RMCAST_HAS_DLL */ - -#if defined (ACE_RMCAST_HAS_DLL) -# if (ACE_RMCAST_HAS_DLL == 1) -# if defined (ACE_RMCAST_BUILD_DLL) -# define ACE_RMCast_Export ACE_Proper_Export_Flag -# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) -# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# else -# define ACE_RMCast_Export ACE_Proper_Import_Flag -# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) -# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# endif /* ACE_RMCAST_BUILD_DLL */ -# else -# define ACE_RMCast_Export -# define ACE_RMCAST_SINGLETON_DECLARATION(T) -# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# endif /* ! ACE_RMCAST_HAS_DLL == 1 */ -#else -# define ACE_RMCast_Export -# define ACE_RMCAST_SINGLETON_DECLARATION(T) -# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -#endif /* ACE_RMCAST_HAS_DLL */ - -#endif /* ACE_RMCAST_EXPORT_H */ - -// End of auto generated file. diff --git a/protocols/ace/TMCast/Export.hpp b/protocols/ace/TMCast/Export.hpp deleted file mode 100644 index f13a69ecefd..00000000000 --- a/protocols/ace/TMCast/Export.hpp +++ /dev/null @@ -1,58 +0,0 @@ - -// -*- C++ -*- -// $Id$ -// Definition for Win32 Export directives. -// This file is generated automatically by generate_export_file.pl TMCast -// ------------------------------ -#ifndef TMCAST_EXPORT_H -#define TMCAST_EXPORT_H - -#include "ace/config-all.h" - -#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL) -# define TMCAST_HAS_DLL 0 -#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */ - -#if !defined (TMCAST_HAS_DLL) -#define TMCAST_HAS_DLL 1 -#endif /* ! TMCAST_HAS_DLL */ - -#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1) -# if defined (TMCAST_BUILD_DLL) -# define TMCast_Export ACE_Proper_Export_Flag -# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# else /* TMCAST_BUILD_DLL */ -# define TMCast_Export ACE_Proper_Import_Flag -# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -# endif /* TMCAST_BUILD_DLL */ -#else /* TMCAST_HAS_DLL == 1 */ -# define TMCast_Export -# define TMCAST_SINGLETON_DECLARATION(T) -# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) -#endif /* TMCAST_HAS_DLL == 1 */ - -// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if -// tracing is turned off for ACE. -#if !defined (TMCAST_NTRACE) -# if (ACE_NTRACE == 1) -# define TMCAST_NTRACE 1 -# else /* (ACE_NTRACE == 1) */ -# define TMCAST_NTRACE 0 -# endif /* (ACE_NTRACE == 1) */ -#endif /* !TMCAST_NTRACE */ - -#if (TMCAST_NTRACE == 1) -# define TMCAST_TRACE(X) -#else /* (TMCAST_NTRACE == 1) */ -# if !defined (ACE_HAS_TRACE) -# define ACE_HAS_TRACE -# endif /* ACE_HAS_TRACE */ -# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X) -# include "ace/Trace.h" -#endif /* (TMCAST_NTRACE == 1) */ - -#endif /* TMCAST_EXPORT_H */ - -// End of auto generated file. diff --git a/protocols/ace/TMCast/FaultDetector.hpp b/protocols/ace/TMCast/FaultDetector.hpp deleted file mode 100644 index ba476cbd367..00000000000 --- a/protocols/ace/TMCast/FaultDetector.hpp +++ /dev/null @@ -1,41 +0,0 @@ -// file : TMCast/FaultDetector.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#include "Protocol.hpp" - -namespace TMCast -{ - class FaultDetector - { - public: - FaultDetector () - : silence_period_ (-1) - { - } - - public: - class Failed {}; - - - void - insync () - { - silence_period_ = 0; - } - - void - outsync () - { - if (++silence_period_ >= Protocol::FATAL_SILENCE_FRAME) - { - // cerr << "Silence period has been passed." << endl; - // cerr << "Decalring the node failed." << endl; - throw Failed (); - } - } - - private: - short silence_period_; - }; -} diff --git a/protocols/ace/TMCast/Group.cpp b/protocols/ace/TMCast/Group.cpp deleted file mode 100644 index 901aa7ba650..00000000000 --- a/protocols/ace/TMCast/Group.cpp +++ /dev/null @@ -1,508 +0,0 @@ -// file : TMCast/Group.cpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#include "Group.hpp" - -#include - -// OS primitives -#include -#include -#include -#include - -#include "Messaging.hpp" - -#include "Protocol.hpp" - -// Components - -#include "LinkListener.hpp" -#include "FaultDetector.hpp" -#include "TransactionController.hpp" - -namespace TMCast -{ - bool - operator== (std::type_info const* pa, std::type_info const& b) - { - return *pa == b; - } - - // - // - // - class Terminate : public virtual Message {}; - - - // - // - // - class Failure : public virtual Message {}; - - - // - // - // - class Scheduler - { - public: - Scheduler (ACE_INET_Addr const& addr, - char const* id, - MessageQueue& out_send_data, - MessageQueue& out_recv_data, - MessageQueue& out_control) - - : cond_ (mutex_), - - addr_ (addr), - sock_ (), - - out_control_ (out_control), - - in_data_ (mutex_), - in_link_data_(mutex_), - in_control_ (mutex_), - - sync_schedule (ACE_OS::gettimeofday ()), - - transaction_controller_ (in_data_, out_send_data, out_recv_data) - { - ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH); - id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0'; - - sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded - - in_data_.subscribe (cond_); - in_link_data_.subscribe (cond_); - in_control_.subscribe (cond_); - - if (ACE_OS::thr_create (&thread_thunk, - this, - THR_JOINABLE, - &thread_) != 0) ::abort (); - } - - ~Scheduler () - { - { - MessageQueueAutoLock lock (in_control_); - - in_control_.push (MessagePtr (new Terminate)); - } - - if (ACE_OS::thr_join (thread_, &thread_, 0) != 0) ::abort (); - - // cerr << "Scheduler is down." << endl; - } - - public: - MessageQueue& - in_data () - { - return in_data_; - } - - private: - static ACE_THR_FUNC_RETURN - thread_thunk (void* arg) - { - Scheduler* obj = reinterpret_cast (arg); - - obj->execute (); - return 0; - } - - void - execute () - { - try - { - sock_.join (addr_); - - auto_ptr ll (new LinkListener (sock_, in_link_data_)); - - { - AutoLock lock (mutex_); - - // Loop - // - // - - while (true) - { - cond_.wait (&sync_schedule); - - // "Loop of Fairness" - - bool done = false; - - do - { - // control message - // - // - if (!in_control_.empty ()) - { - done = true; - break; - } - - - // outsync - // - // - if (sync_schedule < ACE_OS::gettimeofday ()) - { - // OUTSYNC - - outsync (); - - // schedule next outsync - - sync_schedule = - ACE_OS::gettimeofday () + - ACE_Time_Value (0, Protocol::SYNC_PERIOD); - } - - // link message - // - // - if (!in_link_data_.empty ()) - { - MessagePtr m (in_link_data_.front ()); - in_link_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (LinkFailure)) - { - // cerr << "link failure" << endl; - throw false; - } - else if (exp == typeid (LinkData)) - { - - LinkData* data = dynamic_cast (m.get ()); - - // INSYNC, TL, CT - - // Filter out loopback. - // - if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0) - { - insync (); - transaction_list (); - current_transaction (data->header().current, - data->payload (), - data->size ()); - } - } - else - { - // cerr << "unknown message type from link listener: " - // << typeid (*m).name () << endl; - abort (); - } - } - - // api message - // - // - if (!in_data_.empty ()) - { - // API - - api (); - } - - } while (!in_link_data_.empty() || - sync_schedule < ACE_OS::gettimeofday ()); - - if (done) break; - } - } - } - catch (...) - { - // cerr << "Exception in scheduler loop." << endl; - - MessageQueueAutoLock lock (out_control_); - out_control_.push (MessagePtr (new Failure)); - } - } - - // Events - // - // Order: - // - // INSYNC, TSL, VOTE, BEGIN - // API - // OUTSYNC - // - - void - insync () - { - fault_detector_.insync (); - } - - void - outsync () - { - char buf[Protocol::MAX_MESSAGE_SIZE]; - - Protocol::MessageHeader* hdr = - reinterpret_cast (buf); - - void* data = buf + sizeof (Protocol::MessageHeader); - - hdr->length = sizeof (Protocol::MessageHeader); - hdr->check_sum = 0; - - ACE_OS::strcpy (hdr->member_id.id, id_); - - size_t size (0); - - transaction_controller_.outsync (hdr->current, data, size); - - hdr->length += size; - - fault_detector_.outsync (); - - // sock_.send (buf, hdr->length, addr_); - - sock_.send (buf, hdr->length); - } - - void - transaction_list () - { - } - - void - current_transaction (Protocol::Transaction const& t, - void const* payload, - size_t size) - { - transaction_controller_.current_transaction (t, payload, size); - } - - void - api () - { - transaction_controller_.api (); - } - - private: - ACE_thread_t thread_; - - ACE_Thread_Mutex mutex_; - ACE_Condition cond_; - - typedef ACE_Guard AutoLock; - - char id_[Protocol::MEMBER_ID_LENGTH]; - - ACE_INET_Addr addr_; - ACE_SOCK_Dgram_Mcast sock_; - - MessageQueue& out_control_; - - MessageQueue in_data_; - MessageQueue in_link_data_; - MessageQueue in_control_; - - // Protocol state - // - // - - ACE_Time_Value sync_schedule; - - FaultDetector fault_detector_; - TransactionController transaction_controller_; - }; - - - // - // - // - class Group::GroupImpl - { - public: - ~GroupImpl () - { - } - - GroupImpl (ACE_INET_Addr const& addr, char const* id) - throw (Group::Failed) - : send_cond_ (mutex_), - recv_cond_ (mutex_), - failed_ (false), - in_send_data_ (mutex_), - in_recv_data_ (mutex_), - in_control_ (mutex_), - scheduler_ (new Scheduler (addr, - id, - in_send_data_, - in_recv_data_, - in_control_)), - out_data_ (scheduler_->in_data ()) - { - in_send_data_.subscribe (send_cond_); - in_recv_data_.subscribe (recv_cond_); - - in_control_.subscribe (send_cond_); - in_control_.subscribe (recv_cond_); - } - - void - send (void const* msg, size_t size) - throw (Group::InvalidArg, Group::Failed, Group::Aborted) - { - if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg (); - - // Note the potential deadlock if I lock mutex_ and out_data_ in - // reverse order. - - MessageQueueAutoLock l1 (out_data_); - AutoLock l2 (mutex_); - - throw_if_failed (); - - out_data_.push (MessagePtr (new Send (msg, size))); - - l1.unlock (); // no need to keep it locked - - while (true) - { - throw_if_failed (); - - if (!in_send_data_.empty ()) - { - MessagePtr m (in_send_data_.front ()); - in_send_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (TMCast::Aborted)) - { - throw Group::Aborted (); - } - else if (exp == typeid (Commited)) - { - return; - } - else - { - // cerr << "send: group-scheduler messaging protocol violation; " - // << "unexpected message " << typeid (*m).name () - // << " " << typeid (Aborted).name () << endl; - - abort (); - } - } - - // cerr << "send: waiting on condition" << endl; - send_cond_.wait (); - // cerr << "send: wokeup on condition" << endl; - } - } - - - - size_t - recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) - { - AutoLock lock (mutex_); - - while (true) - { - throw_if_failed (); - - if (!in_recv_data_.empty ()) - { - MessagePtr m (in_recv_data_.front ()); - in_recv_data_.pop (); - - std::type_info const* exp = &typeid (*m); - - if (exp == typeid (Recv)) - { - Recv* data = dynamic_cast (m.get ()); - - if (size < data->size ()) throw Group::InsufficienSpace (); - - memcpy (msg, data->payload (), data->size ()); - - return data->size (); - } - else - { - // cerr << "recv: group-scheduler messaging protocol violation. " - // << "unexpected message " << typeid (*m).name () << endl; - - abort (); - } - } - - recv_cond_.wait (); - } - } - - private: - void - throw_if_failed () - { - if (!failed_ && !in_control_.empty ()) failed_ = true; - - if (failed_) throw Group::Failed (); - } - - private: - ACE_Thread_Mutex mutex_; - ACE_Condition send_cond_; - ACE_Condition recv_cond_; - - typedef ACE_Guard AutoLock; - - bool failed_; - - MessageQueue in_send_data_; - MessageQueue in_recv_data_; - MessageQueue in_control_; - - auto_ptr scheduler_; - - MessageQueue& out_data_; - }; - - - // Group - // - // - Group:: - Group (ACE_INET_Addr const& addr, char const* id) - throw (Group::Failed) - : pimpl_ (new GroupImpl (addr, id)) - { - } - - Group:: - ~Group () - { - } - - void - Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted) - { - pimpl_->send (msg, size); - } - - size_t - Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace) - { - return pimpl_->recv (msg, size); - } -} diff --git a/protocols/ace/TMCast/Group.hpp b/protocols/ace/TMCast/Group.hpp deleted file mode 100644 index 416cea0a17d..00000000000 --- a/protocols/ace/TMCast/Group.hpp +++ /dev/null @@ -1,51 +0,0 @@ -// file : TMCast/Group.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#ifndef TMCAST_GROUP_HPP -#define TMCAST_GROUP_HPP - -#include -#include - -#include "Export.hpp" - -namespace TMCast -{ - class TMCast_Export Group - { - public: - class Aborted {}; - class Failed {}; - class InvalidArg {}; - class InsufficienSpace {}; - - public: - ~Group (); - - Group (ACE_INET_Addr const& addr, char const* id) throw (Failed); - - public: - void - send (void const* msg, size_t size) throw (InvalidArg, Failed, Aborted); - - size_t - recv (void* msg, size_t size) throw (Failed, InsufficienSpace); - - private: - bool - failed (); - - private: - class GroupImpl; - auto_ptr pimpl_; - - private: - Group (Group const&); - - Group& - operator= (Group const&); - }; -} - -#endif // TMCAST_GROUP_HPP diff --git a/protocols/ace/TMCast/GroupFwd.hpp b/protocols/ace/TMCast/GroupFwd.hpp deleted file mode 100644 index beba06df79d..00000000000 --- a/protocols/ace/TMCast/GroupFwd.hpp +++ /dev/null @@ -1,15 +0,0 @@ -// file : TMCast/GroupFwd.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#ifndef TMCAST_GROUP_FWD_HPP -#define TMCAST_GROUP_FWD_HPP - -#include "Export.hpp" - -namespace TMCast -{ - class TMCast_Export Group; -} - -#endif // TMCAST_GROUP_FWD_HPP diff --git a/protocols/ace/TMCast/LinkListener.hpp b/protocols/ace/TMCast/LinkListener.hpp deleted file mode 100644 index aee1263aa0a..00000000000 --- a/protocols/ace/TMCast/LinkListener.hpp +++ /dev/null @@ -1,166 +0,0 @@ -// file : TMCast/LinkListener.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -// OS primitives -#include -#include -#include - - -#include "Messaging.hpp" - -namespace TMCast -{ - // - // - // - class LinkFailure : public virtual Message {}; - - - // - // - // - class LinkData : public virtual Message - { - public: - LinkData (Protocol::MessageHeader const* header, - void* payload, - size_t size) - : size_ (size) - { - ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); - ACE_OS::memcpy (payload_, payload, size_); - } - - Protocol::MessageHeader const& - header () const - { - return header_; - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - Protocol::MessageHeader header_; - char payload_[Protocol::MAX_MESSAGE_SIZE]; - size_t size_; - }; - - typedef - ACE_Refcounted_Auto_Ptr - LinkDataPtr; - - // - // - // - class LinkListener - { - private: - class Terminate : public virtual Message {}; - - public: - LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) - : sock_(sock), out_ (out) - { - if (ACE_OS::thr_create (&thread_thunk, - this, - THR_JOINABLE, - &thread_) != 0) ::abort (); - } - - ~LinkListener () - { - { - MessageQueueAutoLock lock (control_); - - control_.push (MessagePtr (new Terminate)); - } - - if (ACE_OS::thr_join (thread_, &thread_, 0) != 0) ::abort (); - - // cerr << "Link listener is down." << endl; - } - - - static ACE_THR_FUNC_RETURN - thread_thunk (void* arg) - { - LinkListener* obj = reinterpret_cast (arg); - - obj->execute (); - return 0; - } - - void - execute () - { - char msg[Protocol::MAX_MESSAGE_SIZE]; - - ssize_t header_size = sizeof (Protocol::MessageHeader); - - // OS::Time timeout (1000000); // one millisecond - - ACE_Time_Value timeout (0, 1000); // one millisecond - - try - { - while (true) - { - // Check control message queue - - { - MessageQueueAutoLock lock (control_); - - if (!control_.empty ()) break; - } - - ACE_Addr junk; - ssize_t n = sock_.recv (msg, - Protocol::MAX_MESSAGE_SIZE, - junk, - 0, - &timeout); - - if (n != -1) - { - if (n < header_size) throw false; - - Protocol::MessageHeader* header = - reinterpret_cast (msg); - - MessageQueueAutoLock lock (out_); - - out_.push (MessagePtr (new LinkData (header, - msg + header_size, - n - header_size))); - } - } - } - catch (...) - { - MessageQueueAutoLock lock (out_); - - out_.push (MessagePtr (new LinkFailure)); - } - } - - private: - typedef ACE_Guard AutoLock; - - ACE_thread_t thread_; - ACE_SOCK_Dgram_Mcast& sock_; - MessageQueue& out_; - MessageQueue control_; - }; -} diff --git a/protocols/ace/TMCast/MTQueue.hpp b/protocols/ace/TMCast/MTQueue.hpp deleted file mode 100644 index 23116cac7d2..00000000000 --- a/protocols/ace/TMCast/MTQueue.hpp +++ /dev/null @@ -1,178 +0,0 @@ -// file : TMCast/MTQueue.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#ifndef TMCAST_MT_QUEUE_HPP -#define TMCAST_MT_QUEUE_HPP - -#include "ace/Auto_Ptr.h" -#include "ace/Unbounded_Set.h" -#include "ace/Unbounded_Queue.h" -#include "ace/os_include/sys/os_types.h" - -namespace TMCast -{ - template > - class MTQueue - { - public: - typedef T ElementType; - typedef M MutexType; - typedef C ConditionalType; - typedef Q QueueType; - - public: - - MTQueue (size_t hint = 0) - : mutexp_ (new MutexType), - mutex_ (*mutexp_), - // queue_ (hint), - queue_ (), - signal_ (false) - { - } - - MTQueue (MutexType& mutex, size_t hint = 0) - : mutexp_ (), - mutex_ (mutex), - // queue_ (hint), - queue_ (), - signal_ (false) - { - } - - public: - bool - empty () const - { - return queue_.is_empty (); - } - - size_t - size () const - { - return queue_.size (); - } - - // typedef typename QueueType::Empty Empty; - - class Empty {}; - - T& - front () - { - ACE_Unbounded_Queue_Iterator f (queue_); - T* tmp; - if (!f.next (tmp)) throw Empty (); - - return *tmp; - } - - - T const& - front () const - { - ACE_Unbounded_Queue_Const_Iterator f (queue_); - T* tmp; - if (!f.next (tmp)) throw Empty (); - - return *tmp; - } - - /* - T& - back () - { - return queue_.back (); - } - - - T const& - back () const - { - return queue_.back (); - } - */ - - void - push (T const& t) - { - signal_ = empty (); - queue_.enqueue_tail (t); - } - - void - pop () - { - T junk; - queue_.dequeue_head (junk); - } - - public: - void - lock () const - { - mutex_.acquire (); - } - - void - unlock () const - { - if (signal_) - { - signal_ = false; - - for (ConditionalSetConstIterator_ i (cond_set_); - !i.done (); - i.advance ()) - { - ConditionalType** c; - - i.next (c); - - (*c)->signal (); - } - } - - mutex_.release (); - } - - void - subscribe (ConditionalType& c) - { - //@@ should check for duplicates - // - cond_set_.insert (&c); - } - - void - unsubscribe (ConditionalType& c) - { - //@@ should check for absence - // - cond_set_.remove (&c); - } - - private: - auto_ptr mutexp_; - mutable MutexType& mutex_; - QueueType queue_; - - typedef - ACE_Unbounded_Set - ConditionalSet_; - - typedef - ACE_Unbounded_Set_Const_Iterator - ConditionalSetConstIterator_; - - ConditionalSet_ cond_set_; - - mutable bool signal_; - }; -} - -#endif // TMCAST_MT_QUEUE_HPP diff --git a/protocols/ace/TMCast/Messaging.hpp b/protocols/ace/TMCast/Messaging.hpp deleted file mode 100644 index 6a1000c3265..00000000000 --- a/protocols/ace/TMCast/Messaging.hpp +++ /dev/null @@ -1,54 +0,0 @@ -// file : TMCast/Messaging.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#ifndef TMCAST_MESSAGING_HPP -#define TMCAST_MESSAGING_HPP - -#include -#include - -#include "MTQueue.hpp" - -namespace TMCast -{ - class Message - { - public: - virtual - ~Message () {} - }; - - typedef - ACE_Refcounted_Auto_Ptr - MessagePtr; - - typedef - MTQueue > - MessageQueue; - - struct MessageQueueAutoLock - { - MessageQueueAutoLock (MessageQueue& q) - : q_ (q) - { - q_.lock (); - } - - void - unlock () - { - q_.unlock (); - } - - ~MessageQueueAutoLock () - { - q_.unlock (); - } - - private: - MessageQueue& q_; - }; -} - -#endif // TMCAST_MESSAGING_HPP diff --git a/protocols/ace/TMCast/Protocol.cpp b/protocols/ace/TMCast/Protocol.cpp deleted file mode 100644 index 78563281694..00000000000 --- a/protocols/ace/TMCast/Protocol.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// file : TMCast/Protocol.cpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#include "Protocol.hpp" - -namespace TMCast -{ - namespace Protocol - { - namespace - { - char const* labels[] = { - "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"}; - } - - /* - std::string - tslabel (Protocol::TransactionStatus s) - { - return labels[s]; - } - - std::ostream& - operator << (std::ostream& o, Transaction const& t) - { - return o << "{" << t.id << "; " << tslabel (t.status) << "}"; - } - */ - } -} diff --git a/protocols/ace/TMCast/Protocol.hpp b/protocols/ace/TMCast/Protocol.hpp deleted file mode 100644 index d5ae6a50cd6..00000000000 --- a/protocols/ace/TMCast/Protocol.hpp +++ /dev/null @@ -1,107 +0,0 @@ -// file : TMCast/Protocol.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#ifndef TMCAST_PROTOCOL_HPP -#define TMCAST_PROTOCOL_HPP - -namespace TMCast -{ - namespace Protocol - { - // - // - // - unsigned long const MEMBER_ID_LENGTH = 38; - - struct MemberId - { - char id[MEMBER_ID_LENGTH]; - /* - unsigned long ip; - unsigned short port; - */ - }; - - // - // - // - typedef unsigned short TransactionId; - - - - typedef unsigned char TransactionStatus; - - TransactionStatus const TS_BEGIN = 1; - TransactionStatus const TS_COMMIT = 2; - TransactionStatus const TS_ABORT = 3; - TransactionStatus const TS_COMMITED = 4; - TransactionStatus const TS_ABORTED = 5; - - struct Transaction - { - TransactionId id; - TransactionStatus status; - }; - - // Transaction List (TL) - - // unsigned long const TL_LENGTH = 1; - - // typedef Transaction TransactionList[TL_LENGTH]; - - - // - // - // - struct MessageHeader - { - unsigned long length; - - unsigned long check_sum; - - MemberId member_id; - - Transaction current; - - //TransactionList transaction_list; - }; - - - // - // - // - - unsigned long const MAX_MESSAGE_SIZE = 768; - - unsigned long const - MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader); - - // Protocol timing - // - // - - unsigned long const SYNC_PERIOD = 30000; // in mks - - unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's - unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's - - // FATAL_SILENCE_FRAME in SYNC_PERIOD's - // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME - // - - short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2; - - // short const FATAL_SILENCE_FRAME = 10000; - - // Helpers - - // std::string - // tslabel (Protocol::TransactionStatus s); - - // std::ostream& - // operator << (std::ostream& o, Transaction const& t); - } -} - -#endif // TMCAST_PROTOCOL_HPP diff --git a/protocols/ace/TMCast/README b/protocols/ace/TMCast/README deleted file mode 100644 index d061c7b2cba..00000000000 --- a/protocols/ace/TMCast/README +++ /dev/null @@ -1,58 +0,0 @@ - - -Architecture - -TMCast (stands for Transaction MultiCast) is an implementation of a -transactional multicast protocol. In essence, the idea is to represent -message delivery to members of a multicast group as a transaction - -atomic, consistent and isolated action. Multicast transaction can -be viewed as an atomic transition of group members to a new state. -If we define Mo as a set of operational (non-faulty) members of the -group, Mf as a set of faulty members of the group, Ma as a set of -members that view transition Tn as aborted and Mc as a set of members -that view transition Tn as committed, then this atomic transition Tn -can be described as one of the following: - -Mo(Tn-1) = Ma(T) U Mf(T) -Mo(Tn-1) = Mc(T) U Mf(T) - -Or, in other words, after transaction T has been committed (aborted), -all operational (before transaction T) members are either in -committed (aborted) or failed state. - -Thus, for each member of the group, outcome of the transaction can -be commit, abort or member failure. It is important for a member -to exhibit a failfast (error latency is less than transaction cycle) -behavior. Or, in other words, if the member transitioned into a wrong -state, it is guaranteed to fail instead of delivering wrong result. - -In order to achieve such error detection in decentralized environment, -certain limitations should be imposed. One of the most user-visible -limitation is the fact that the lifetime of the group with only -one member is very short. This is because there is not way for a -member to distinguish "no members yet" case from "my link to the -group is down". In such situation, the member assumes the latter case. -There is also a military saying that puts it quite nicely: two is one, -one is nothing. - - -State of Implementation - -Current implementation is in prototypical stage. The following parts -are not implemented or still under development: - -* Handling of network partitioning (TODO) - -* Redundant network support (TODO) - -* Member failure detection (partial implementation) - - -Examples - -There is a simple example available in examples/TMCast/Member with -corresponding README. - - --- -Boris Kolpackov \ No newline at end of file diff --git a/protocols/ace/TMCast/TMCast.mpc b/protocols/ace/TMCast/TMCast.mpc deleted file mode 100644 index 0899982dd09..00000000000 --- a/protocols/ace/TMCast/TMCast.mpc +++ /dev/null @@ -1,8 +0,0 @@ -// -*- MPC -*- -// $Id$ - -project : acelib, core { - requires += exceptions - sharedname = TMCast - dynamicflags += TMCAST_BUILD_DLL -} diff --git a/protocols/ace/TMCast/TransactionController.hpp b/protocols/ace/TMCast/TransactionController.hpp deleted file mode 100644 index 66c924faaf1..00000000000 --- a/protocols/ace/TMCast/TransactionController.hpp +++ /dev/null @@ -1,384 +0,0 @@ -// file : TMCast/TransactionController.hpp -// author : Boris Kolpackov -// cvs-id : $Id$ - -#include -#include - -#include "Protocol.hpp" -#include "Messaging.hpp" - -namespace TMCast -{ - - // Messages - // - // - class Send : public virtual Message - { - public: - Send (void const* msg, size_t size) - : size_ (size) - { - ACE_OS::memcpy (payload_, msg, size_); - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - size_t size_; - char payload_[Protocol::MAX_PAYLOAD_SIZE]; - }; - - typedef - ACE_Refcounted_Auto_Ptr - SendPtr; - - - class Recv : public virtual Message - { - public: - Recv (void const* msg, size_t size) - : size_ (size) - { - ACE_OS::memcpy (payload_, msg, size_); - } - - void const* - payload () const - { - return payload_; - } - - size_t - size () const - { - return size_; - } - - private: - size_t size_; - char payload_[Protocol::MAX_PAYLOAD_SIZE]; - }; - - typedef - ACE_Refcounted_Auto_Ptr - RecvPtr; - - class Aborted : public virtual Message {}; - - class Commited : public virtual Message {}; - - - // - // - // - class TransactionController - { - public: - TransactionController (MessageQueue& in, - MessageQueue& send_out, - MessageQueue& recv_out) - : trace_ (false), - voting_duration_ (0), - separation_duration_ (0), - in_ (in), - send_out_ (send_out), - recv_out_ (recv_out) - { - current_.id = 0; - current_.status = Protocol::TS_COMMITED; - } - - public: - class Failure {}; - - - void - outsync (Protocol::Transaction& c, void* payload, size_t& size) - { - if (current_.status == Protocol::TS_COMMIT || - current_.status == Protocol::TS_ABORT) - { - if (++voting_duration_ >= Protocol::VOTING_FRAME) - { - // end of voting frame - - if (current_.status == Protocol::TS_COMMIT) - { - { - if (initiated_) - { - MessageQueueAutoLock lock (send_out_); - send_out_.push (MessagePtr (new Commited)); - } - else // joined transaction - { - MessageQueueAutoLock lock (recv_out_); - recv_out_.push (MessagePtr (recv_.release ())); - recv_ = RecvPtr (); - } - } - - current_.status = Protocol::TS_COMMITED; - - // if (trace_) cerr << "commited transaction with id " - // << current_.id << endl; - } - else // TS_ABORT - { - if (initiated_) - { - MessageQueueAutoLock lock (send_out_); - send_out_.push (MessagePtr (new Aborted)); - } - else - { - // free revc_ buffer if necessary - // - if (recv_.get ()) recv_ = RecvPtr (); - } - - - current_.status = Protocol::TS_ABORTED; - - // if (trace_) cerr << "aborted transaction with id " - // << current_.id << endl; - } - - // start transaction separation frame (counts down) - // +1 because it will be decremented on this iteration - separation_duration_ = Protocol::SEPARATION_FRAME + 1; - } - } - - // Set current outsync info - - c.id = current_.id; - c.status = current_.status; - - - // Do some post-processing - - switch (current_.status) - { - case Protocol::TS_COMMITED: - case Protocol::TS_ABORTED: - { - if (separation_duration_ > 0) --separation_duration_; - break; - } - case Protocol::TS_BEGIN: - { - // transfer payload - - size = send_->size (); - memcpy (payload, send_->payload (), size); - - send_ = SendPtr (); - - // get redy to vote for 'commit' - - current_.status = Protocol::TS_COMMIT; - voting_duration_ = 0; - } - } - } - - void - current_transaction (Protocol::Transaction const& t, - void const* payload, - size_t size) - { - Protocol::TransactionId& id = current_.id; - Protocol::TransactionStatus& s = current_.status; - - if (id == 0 && t.id != 0) // catch up - { - switch (t.status) - { - case Protocol::TS_BEGIN: - case Protocol::TS_COMMIT: - case Protocol::TS_ABORT: - { - id = t.id - 1; - s = Protocol::TS_COMMITED; - break; - } - case Protocol::TS_ABORTED: - case Protocol::TS_COMMITED: - { - id = t.id; - s = t.status; - break; - } - } - - // if (trace_) cerr << "caught up with id " << id << endl; - } - - bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED); - - switch (t.status) - { - case Protocol::TS_BEGIN: - { - if (!stable || t.id != id + 1) - { - // Transaction is in progress or hole in transaction id's - - // cerr << "unexpected request to join " << t - // << " while on " << current_ << endl; - - // if (!stable) cerr << "voting progress is " << voting_duration_ - // << "/" << Protocol::VOTING_FRAME << endl; - - if (t.id == id) // collision - { - if (!stable && s != Protocol::TS_ABORT) - { - // abort both - // cerr << "aborting both transactions" << endl; - - s = Protocol::TS_ABORT; - voting_duration_ = 0; //@@ reset voting frame - } - } - else - { - // @@ delicate case. need to think more - - // cerr << "Declaring node failed." << endl; - throw Failure (); - } - } - else - { - // join the transaction - - initiated_ = false; - - recv_ = RecvPtr (new Recv (payload, size)); - - id = t.id; - s = Protocol::TS_COMMIT; - voting_duration_ = 0; - - // if (trace_) cerr << "joining-for-commit transaction with id " - // << id << endl; - } - break; - } - case Protocol::TS_COMMIT: - { - if (stable && id == t.id - 1) - { - // not begin and and we haven't joined - - // join for abort - - initiated_ = false; - - current_.id = t.id; - current_.status = Protocol::TS_ABORT; - voting_duration_ = 0; - - // if (trace_) cerr << "joining-for-abort transaction with id " - // << current_.id << endl; - } - break; - } - case Protocol::TS_ABORT: - { - if ((!stable && id == t.id && s == Protocol::TS_COMMIT) || - (stable && id == t.id - 1)) // abort current || new transaction - { - // if (trace_) cerr << "voting-for-abort on transaction with id " - // << current_.id << endl; - - id = t.id; - s = Protocol::TS_ABORT; - - voting_duration_ = 0; //@@ reseting voting_duration_ - } - else - { - } - - break; - } - case Protocol::TS_ABORTED: - case Protocol::TS_COMMITED: - { - // nothing for now - break; - } - } - } - - void - api () - { - if ((current_.status == Protocol::TS_COMMITED || - current_.status == Protocol::TS_ABORTED) && - separation_duration_ == 0) // no transaction in progress - { - // start new transaction - - // Note that in_ is already locked by Scheduler - - MessagePtr m (in_.front ()); - in_.pop (); - - if (typeid (*m) == typeid (Send)) - { - send_ = SendPtr (dynamic_cast (m.release ())); - } - else - { - // cerr << "Expecting Send but received " << typeid (*m).name () - // << endl; - - ::abort (); - } - - current_.id++; - current_.status = Protocol::TS_BEGIN; - - initiated_ = true; - - // if (trace_) cerr << "starting transaction with id " << current_.id - // << endl; - } - } - - private: - typedef ACE_Guard AutoLock; - - bool trace_; - - Protocol::Transaction current_; - - bool initiated_; - - unsigned short voting_duration_; - unsigned short separation_duration_; - - MessageQueue& in_; - MessageQueue& send_out_; - MessageQueue& recv_out_; - - SendPtr send_; - RecvPtr recv_; - }; -} -- cgit v1.2.1