summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/TMCast/Group.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/protocols/ace/TMCast/Group.cpp')
-rw-r--r-- ACE/protocols/ace/TMCast/Group.cpp | 506
 1 file changed, 506 insertions, 0 deletions
diff --git a/ACE/protocols/ace/TMCast/Group.cpp b/ACE/protocols/ace/TMCast/Group.cpp
new file mode 100644
index 00000000000..1f2b2a60dfd
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/Group.cpp
@@ -0,0 +1,506 @@
+// file : ACE_TMCast/Group.cpp
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+// cvs-id : $Id$
+
+#include "Group.hpp"
+
+#include <typeinfo>
+
+// OS primitives
+#include <ace/OS.h>
+#include <ace/OS_NS_stdlib.h>
+#include <ace/Synch.h>
+#include <ace/Time_Value.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+
+#include "Messaging.hpp"
+
+#include "Protocol.hpp"
+
+// Components
+
+#include "LinkListener.hpp"
+#include "FaultDetector.hpp"
+#include "TransactionController.hpp"
+
+namespace ACE_TMCast
+{
+ bool
+ operator== (std::type_info const* pa, std::type_info const& b)
+ {
+ return *pa == b;
+ }
+
  // Control message pushed onto the scheduler's in_control_ queue by
  // ~Scheduler to ask the protocol thread to shut down.
  //
  class Terminate : public virtual Message {};
+
+
  // Control message pushed onto the Group's control queue when the
  // scheduler thread dies (link failure or any other exception); see
  // the catch (...) in Scheduler::execute.
  //
  class Failure : public virtual Message {};
+
+
+ //
+ //
+ //
  // Scheduler owns the multicast socket and the protocol thread.
  //
  // The constructor spawns a thread running execute (), which
  // multiplexes three message queues (API data, link data, control)
  // under a single mutex_/cond_ pair and periodically multicasts an
  // OUTSYNC message built by the transaction controller.  All queues
  // created here share mutex_ so that one condition variable covers
  // them all.
  //
  class Scheduler
  {
  public:
    // addr           multicast group to join.
    // id             this member's identity; truncated to
    //                Protocol::MEMBER_ID_LENGTH - 1 characters.
    // out_send_data  commit/abort replies consumed by Group's send.
    // out_recv_data  received payloads consumed by Group's recv.
    // out_control    failure notifications back to the Group.
    //
    // Aborts the process if the protocol thread cannot be created.
    Scheduler (ACE_INET_Addr const& addr,
               char const* id,
               MessageQueue& out_send_data,
               MessageQueue& out_recv_data,
               MessageQueue& out_control)

        : cond_ (mutex_),

          addr_ (addr),
          sock_ (),

          out_control_ (out_control),

          in_data_ (mutex_),
          in_link_data_(mutex_),
          in_control_ (mutex_),

          sync_schedule (ACE_OS::gettimeofday ()),

          transaction_controller_ (in_data_, out_send_data, out_recv_data)
    {
      // Copy the id and force NUL-termination (strncpy does not
      // terminate when the source fills the buffer).
      ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH);
      id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0';

      sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded

      // Every queue signals cond_ so the scheduler loop wakes on any
      // incoming message.
      in_data_.subscribe (cond_);
      in_link_data_.subscribe (cond_);
      in_control_.subscribe (cond_);

      ACE_thread_t unused;
      if (ACE_OS::thr_create (&thread_thunk,
                              this,
                              THR_JOINABLE,
                              &unused,
                              &thread_) != 0) ACE_OS::abort ();
    }

    // Asks the protocol thread to stop (via a Terminate control
    // message) and joins it.  Aborts the process if the join fails.
    virtual ~Scheduler ()
    {
      {
        MessageQueueAutoLock lock (in_control_);

        in_control_.push (MessagePtr (new Terminate));
      }

      if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort ();

      // cerr << "Scheduler is down." << endl;
    }

  public:
    // Queue on which the Group deposits outgoing Send requests.
    MessageQueue&
    in_data ()
    {
      return in_data_;
    }

  private:
    // Thread entry point: forwards to execute () on the Scheduler
    // instance passed as arg.
    static ACE_THR_FUNC_RETURN
    thread_thunk (void* arg)
    {
      Scheduler* obj = reinterpret_cast<Scheduler*> (arg);
      obj->execute ();
      return 0;
    }

    // Main protocol loop.  Any exception (including the bool thrown
    // below on link failure) is converted into a Failure message on
    // out_control_.
    void
    execute ()
    {
      try
      {
        sock_.join (addr_);
        auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_));

        {
          AutoLock lock (mutex_);

          // Loop
          //
          //

          while (true)
          {
            // Timed wait: wakes on queue activity (via subscribe
            // above) or at the next scheduled OUTSYNC time.
            cond_.wait (&sync_schedule);

            // "Loop of Fairness"

            bool done = false;

            do
            {
              // control message
              //
              // Any control message (Terminate) stops the loop.
              //
              if (!in_control_.empty ())
              {
                done = true;
                break;
              }

              // outsync
              //
              //
              if (sync_schedule < ACE_OS::gettimeofday ())
              {
                // OUTSYNC

                outsync ();

                // schedule next outsync
                sync_schedule =
                  ACE_OS::gettimeofday () +
                  ACE_Time_Value (0, Protocol::SYNC_PERIOD);
              }

              // link message
              //
              // Handles at most one queued datagram per pass; the
              // do-while condition below re-runs the pass while the
              // queue is non-empty.
              //
              if (!in_link_data_.empty ())
              {
                MessagePtr m (in_link_data_.front ());
                in_link_data_.pop ();

                std::type_info const* exp = &typeid (*m);

                if (exp == typeid (LinkFailure))
                {
                  // cerr << "link failure" << endl;
                  // Unwinds into catch (...) below, which reports
                  // Failure on out_control_.
                  throw false;
                }
                else if (exp == typeid (LinkData))
                {

                  LinkData* data = dynamic_cast<LinkData*> (m.get ());

                  // INSYNC, TL, CT

                  // Filter out loopback.
                  //
                  if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0)
                  {
                    insync ();
                    transaction_list ();
                    current_transaction (data->header().current,
                                         data->payload (),
                                         data->size ());
                  }
                }
                else
                {
                  // cerr << "unknown message type from link listener: "
                  //      << typeid (*m).name () << endl;
                  ACE_OS::abort ();
                }
              }

              // api message
              //
              //
              if (!in_data_.empty ())
              {
                // API

                api ();
              }

            } while (!in_link_data_.empty() ||
                     sync_schedule < ACE_OS::gettimeofday ());

            if (done) break;
          }
        }
      }
      catch (...)
      {
        // cerr << "Exception in scheduler loop." << endl;
        MessageQueueAutoLock lock (out_control_);
        out_control_.push (MessagePtr (new Failure));
      }
    }

    // Events
    //
    // Order:
    //
    // INSYNC, TSL, VOTE, BEGIN
    // API
    // OUTSYNC
    //

    // A remote member's datagram arrived: tell the fault detector the
    // link is alive.
    void
    insync ()
    {
      fault_detector_.insync ();
    }

    // Build and multicast one protocol message: header (member id and
    // current transaction filled in by the transaction controller)
    // plus optional payload.
    void
    outsync ()
    {
      char buf[Protocol::MAX_MESSAGE_SIZE];

      Protocol::MessageHeader* hdr =
        reinterpret_cast<Protocol::MessageHeader*> (buf);

      void* data = buf + sizeof (Protocol::MessageHeader);

      hdr->length = sizeof (Protocol::MessageHeader);
      hdr->check_sum = 0;  // NOTE(review): checksum is never computed here

      ACE_OS::strcpy (hdr->member_id.id, id_);

      size_t size (0);

      transaction_controller_.outsync (hdr->current, data, size);

      hdr->length += size;

      fault_detector_.outsync ();

      // sock_.send (buf, hdr->length, addr_);
      sock_.send (buf, hdr->length);
    }

    // Placeholder: transaction-list processing is not implemented.
    void
    transaction_list ()
    {
    }

    // Forward a remote member's current transaction and payload to
    // the transaction controller.
    void
    current_transaction (Protocol::Transaction const& t,
                         void const* payload,
                         size_t size)
    {
      transaction_controller_.current_transaction (t, payload, size);
    }

    // Process pending API (send) requests queued on in_data_.
    void
    api ()
    {
      transaction_controller_.api ();
    }

  private:
    ACE_hthread_t thread_;

    // Single mutex guarding all scheduler state; cond_ is signaled by
    // every queue below and doubles as the OUTSYNC timer.
    ACE_Thread_Mutex mutex_;
    ACE_Condition<ACE_Thread_Mutex> cond_;

    typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;

    char id_[Protocol::MEMBER_ID_LENGTH];

    ACE_INET_Addr addr_;
    ACE_SOCK_Dgram_Mcast sock_;

    MessageQueue& out_control_;   // failure reports back to the Group

    MessageQueue in_data_;        // API requests from the Group
    MessageQueue in_link_data_;   // datagrams from the LinkListener
    MessageQueue in_control_;     // Terminate from ~Scheduler

    // Protocol state
    //
    //

    ACE_Time_Value sync_schedule; // absolute time of the next OUTSYNC

    FaultDetector fault_detector_;
    TransactionController transaction_controller_;
  };
+
+
+ //
+ //
+ //
+ class Group::GroupImpl
+ {
+ public:
+ virtual ~GroupImpl ()
+ {
+ }
+
+ GroupImpl (ACE_INET_Addr const& addr, char const* id)
+ throw (Group::Failed)
+ : send_cond_ (mutex_),
+ recv_cond_ (mutex_),
+ failed_ (false),
+ in_send_data_ (mutex_),
+ in_recv_data_ (mutex_),
+ in_control_ (mutex_),
+ scheduler_ (new Scheduler (addr,
+ id,
+ in_send_data_,
+ in_recv_data_,
+ in_control_)),
+ out_data_ (scheduler_->in_data ())
+ {
+ in_send_data_.subscribe (send_cond_);
+ in_recv_data_.subscribe (recv_cond_);
+
+ in_control_.subscribe (send_cond_);
+ in_control_.subscribe (recv_cond_);
+ }
+
+ void
+ send (void const* msg, size_t size)
+ throw (Group::InvalidArg, Group::Failed, Group::Aborted)
+ {
+ if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg ();
+
+ // Note the potential deadlock if I lock mutex_ and out_data_ in
+ // reverse order.
+
+ MessageQueueAutoLock l1 (out_data_);
+ AutoLock l2 (mutex_);
+
+ throw_if_failed ();
+
+ out_data_.push (MessagePtr (new Send (msg, size)));
+
+ l1.unlock (); // no need to keep it locked
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_send_data_.empty ())
+ {
+ MessagePtr m (in_send_data_.front ());
+ in_send_data_.pop ();
+
+ std::type_info const* exp = &typeid (*m);
+
+ if (exp == typeid (ACE_TMCast::Aborted))
+ {
+ throw Group::Aborted ();
+ }
+ else if (exp == typeid (Commited))
+ {
+ return;
+ }
+ else
+ {
+ // cerr << "send: group-scheduler messaging protocol violation; "
+ // << "unexpected message " << typeid (*m).name ()
+ // << " " << typeid (Aborted).name () << endl;
+
+ ACE_OS::abort ();
+ }
+ }
+
+ // cerr << "send: waiting on condition" << endl;
+ send_cond_.wait ();
+ // cerr << "send: wokeup on condition" << endl;
+ }
+ }
+
+
+
+ size_t
+ recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
+ {
+ AutoLock lock (mutex_);
+
+ while (true)
+ {
+ throw_if_failed ();
+
+ if (!in_recv_data_.empty ())
+ {
+ MessagePtr m (in_recv_data_.front ());
+ in_recv_data_.pop ();
+
+ std::type_info const* exp = &typeid (*m);
+
+ if (exp == typeid (Recv))
+ {
+ Recv* data = dynamic_cast<Recv*> (m.get ());
+
+ if (size < data->size ()) throw Group::InsufficienSpace ();
+
+ memcpy (msg, data->payload (), data->size ());
+
+ return data->size ();
+ }
+ else
+ {
+ // cerr << "recv: group-scheduler messaging protocol violation. "
+ // << "unexpected message " << typeid (*m).name () << endl;
+
+ ACE_OS::abort ();
+ }
+ }
+
+ recv_cond_.wait ();
+ }
+ }
+
+ private:
+ void
+ throw_if_failed ()
+ {
+ if (!failed_ && !in_control_.empty ()) failed_ = true;
+
+ if (failed_) throw Group::Failed ();
+ }
+
+ private:
+ ACE_Thread_Mutex mutex_;
+ ACE_Condition<ACE_Thread_Mutex> send_cond_;
+ ACE_Condition<ACE_Thread_Mutex> recv_cond_;
+
+ typedef ACE_Guard<ACE_Thread_Mutex> AutoLock;
+
+ bool failed_;
+
+ MessageQueue in_send_data_;
+ MessageQueue in_recv_data_;
+ MessageQueue in_control_;
+
+ auto_ptr<Scheduler> scheduler_;
+
+ MessageQueue& out_data_;
+ };
+
+
+ // Group
+ //
+ //
  // Construct the group: all work is delegated to GroupImpl, which
  // starts the scheduler (protocol) thread.
  Group::
  Group (ACE_INET_Addr const& addr, char const* id)
      throw (Group::Failed)
      : pimpl_ (new GroupImpl (addr, id))
  {
  }
+
  // Empty body; NOTE(review): assumes pimpl_ is an owning smart
  // pointer declared in Group.hpp so the implementation (and its
  // thread) is destroyed here -- confirm against the header.
  Group::
  ~Group ()
  {
  }
+
  // Forward to GroupImpl::send: blocks until the message is committed
  // (returns) or aborted/failed (throws).
  void
  Group::send (void const* msg, size_t size) throw (Group::InvalidArg, Group::Failed, Group::Aborted)
  {
    pimpl_->send (msg, size);
  }
+
  // Forward to GroupImpl::recv: blocks until a payload arrives and
  // returns its size.
  size_t
  Group::recv (void* msg, size_t size) throw (Group::Failed, Group::InsufficienSpace)
  {
    return pimpl_->recv (msg, size);
  }
+}
+