summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.h')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h40
1 files changed, 22 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 5187cb08e7..24db07b32b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -19,19 +19,19 @@
*
*/
-#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Event.h"
-#include "qpid/sys/PollableQueue.h"
-#include "qpid/cluster/NoOpConnectionOutputHandler.h"
+#include "Cpg.h"
+#include "Event.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "ClusterMap.h"
#include "qpid/broker/Broker.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
-#include <map>
#include <vector>
namespace qpid {
@@ -68,33 +68,38 @@ class Cluster : private Cpg::Handler
bool empty() const { return size() == 0; }
/** Send to the cluster */
- void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcastEvent(const Event& e);
/** Leave the cluster */
void leave();
- void urlNotice(const MemberId&, const std::string& url);
- void ready(const MemberId&);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void dumpError(const MemberId& dumper, const MemberId& dumpee);
+ void ready(const MemberId&, const std::string& url);
+ void mapInit(const framing::FieldTable& members,
+ const framing::FieldTable& dumpees,
+ const framing::FieldTable& dumps);
MemberId getSelf() const { return self; }
void stall();
- void unStall();
+ void ready();
void shutdown();
broker::Broker& getBroker();
private:
- typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
enum State {
- DISCARD, // Initially discard connection events up to my own join message.
- READY, // Normal processing.
- STALL // Stalled while a new member joins.
+ DISCARD, // Discard updates up to catchup point.
+ HAVE_DUMP, // Received state dump, waiting for catchup point.
+ CATCHUP, // Stalled at catchup point, waiting for dump.
+ DUMPING, // Stalled while sending a state dump.
+ READY // Normal processing.
};
void connectionEvent(const Event&);
@@ -126,23 +131,22 @@ class Cluster : private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
+ void dumpTo(const Url&);
+ void dumpError(const MemberId&, const Url&, const char* msg);
+
mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
Url url;
- UrlMap urls;
+ ClusterMap map;
MemberId self;
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
EventQueue connectionEventQueue;
State state;
-
- friend std::ostream& operator <<(std::ostream&, const Cluster&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap&);
};
}} // namespace qpid::cluster