diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 5187cb08e7..24db07b32b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,19 +19,19 @@ * */ -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/Event.h" -#include "qpid/sys/PollableQueue.h" -#include "qpid/cluster/NoOpConnectionOutputHandler.h" +#include "Cpg.h" +#include "Event.h" +#include "NoOpConnectionOutputHandler.h" +#include "ClusterMap.h" #include "qpid/broker/Broker.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> -#include <map> #include <vector> namespace qpid { @@ -68,33 +68,38 @@ class Cluster : private Cpg::Handler bool empty() const { return size() == 0; } /** Send to the cluster */ - void mcastFrame(const framing::AMQFrame&, const ConnectionId&); + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr); void mcastBuffer(const char*, size_t, const ConnectionId&); void mcastEvent(const Event& e); /** Leave the cluster */ void leave(); - void urlNotice(const MemberId&, const std::string& url); - void ready(const MemberId&); + void dumpRequest(const MemberId&, const std::string& url); + void dumpError(const MemberId& dumper, const MemberId& dumpee); + void ready(const MemberId&, const std::string& url); + void mapInit(const framing::FieldTable& members, + const framing::FieldTable& dumpees, + const framing::FieldTable& dumps); MemberId getSelf() const { return self; } void stall(); - void unStall(); + void ready(); void shutdown(); broker::Broker& getBroker(); private: - typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; enum State { - DISCARD, // Initially discard connection events up to my own join message. - READY, // Normal processing. - STALL // Stalled while a new member joins. + DISCARD, // Discard updates up to catchup point. + HAVE_DUMP, // Received state dump, waiting for catchup point. + CATCHUP, // Stalled at catchup point, waiting for dump. + DUMPING, // Stalled while sending a state dump. + READY // Normal processing. }; void connectionEvent(const Event&); @@ -126,23 +131,22 @@ class Cluster : private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); + void dumpTo(const Url&); + void dumpError(const MemberId&, const Url&, const char* msg); + mutable sys::Monitor lock; // Protect access to members. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; Url url; - UrlMap urls; + ClusterMap map; MemberId self; ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; EventQueue connectionEventQueue; State state; - - friend std::ostream& operator <<(std::ostream&, const Cluster&); - friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); - friend std::ostream& operator <<(std::ostream&, const UrlMap&); }; }} // namespace qpid::cluster |