summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.h')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h105
1 files changed, 55 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index aff213b6c9..199a93a7c5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -21,89 +21,80 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Thread.h"
+#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/shared_ptr.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Logger.h"
+
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
+
#include <map>
#include <vector>
-namespace qpid {
-
-namespace broker {
-class HandlerUpdater;
-}
-
-namespace cluster {
-
-class ChannelManager;
+namespace qpid { namespace cluster {
/**
* Represents a cluster, provides access to data about members.
- *
- * Implements a FrameHandler that multicasts frames to the cluster.
- *
- * Requires a handler for frames arriving from the cluster,
- * normally a ChannelManager but other handlers could be interposed
- * for testing, logging etc.
+ * Implements HandlerUpdater to manage handlers that route frames to
+ * and from the cluster.
*/
-class Cluster : public framing::FrameHandler, private sys::Runnable {
+class Cluster : private sys::Runnable, private Cpg::Handler
+{
public:
/** Details of a cluster member */
struct Member {
typedef shared_ptr<const Member> Ptr;
- /** Status of a cluster member. */
- enum Status {
- JOIN, ///< Process joined the group.
- LEAVE, ///< Process left the group cleanly.
- NODEDOWN, ///< Process's node went down.
- NODEUP, ///< Process's node joined the cluster.
- PROCDOWN, ///< Process died without leaving.
- BROKER ///< Broker details are available.
- };
-
- Member(const cpg_address&);
- std::string url;
- Status status;
+ Member(const std::string& url_) : url(url_) {}
+ std::string url; ///< Broker address.
};
typedef std::vector<Member::Ptr> MemberList;
-
+
/**
- * Create a cluster object but do not joing.
+ * Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
Cluster(const std::string& name, const std::string& url);
- ~Cluster();
-
- /** Join the cluster.
- *@handler is the handler for frames arriving from the cluster.
- */
- void join(framing::FrameHandler::Chain handler);
-
- /** Multicast a frame to the cluster. */
- void handle(framing::AMQFrame&);
+ virtual ~Cluster();
/** Get the current cluster membership. */
MemberList getMembers() const;
- /** Called when membership changes. */
- void setCallback(boost::function<void()>);
-
/** Number of members in the cluster. */
size_t size() const;
+ bool empty() const { return size() == 0; }
+
+ /** Get handler chains to send frames to the cluster */
+ framing::FrameHandler::Chains getToChains() {
+ return toChains;
+ }
+
+ /** Set handler chains for frames received from the cluster */
+ void setFromChains(const framing::FrameHandler::Chains& chains);
+
+ /** Wait for predicate(*this) to be true, up to timeout.
+ *@return True if predicate became true, false if timed out.
+ *Note the predicate may not be true after wait returns,
+ *all the caller can say is it was true at some earlier point.
+ */
+ bool wait(boost::function<bool(const Cluster&)> predicate,
+ sys::Duration timeout=sys::TIME_INFINITE) const;
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
+ typedef std::map<
+ framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
+
+ void mcast(framing::AMQFrame&); ///< send frame by multicast.
+ void notify(); ///< Notify cluster of my details.
- void run();
- void notify();
- void cpgDeliver(
+ void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -111,7 +102,7 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
void* /*msg*/,
int /*msg_len*/);
- void cpgConfigChange(
+ void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
@@ -119,16 +110,30 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
struct cpg_address */*joined*/, int /*nJoined*/
);
+ void run();
+ bool handleClusterFrame(Id from, framing::AMQFrame&);
+
mutable sys::Monitor lock;
+ boost::scoped_ptr<Cpg> cpg;
Cpg::Name name;
std::string url;
- boost::scoped_ptr<Cpg> cpg;
Id self;
MemberMap members;
+ ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
+ framing::FrameHandler::Chains toChains;
+ framing::FrameHandler::Chains fromChains;
+
+ struct IncomingHandler;
+ struct OutgoingHandler;
+
+ friend struct IncomingHandler;
+ friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap&);
};
}} // namespace qpid::cluster