diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 105 |
1 files changed, 55 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index aff213b6c9..199a93a7c5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -21,89 +21,80 @@ #include "qpid/cluster/Cpg.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Thread.h" +#include "qpid/shared_ptr.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" -#include "qpid/shared_ptr.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Logger.h" + #include <boost/function.hpp> #include <boost/scoped_ptr.hpp> + #include <map> #include <vector> -namespace qpid { - -namespace broker { -class HandlerUpdater; -} - -namespace cluster { - -class ChannelManager; +namespace qpid { namespace cluster { /** * Represents a cluster, provides access to data about members. - * - * Implements a FrameHandler that multicasts frames to the cluster. - * - * Requires a handler for frames arriving from the cluster, - * normally a ChannelManager but other handlers could be interposed - * for testing, logging etc. + * Implements HandlerUpdater to manage handlers that route frames to + * and from the cluster. */ -class Cluster : public framing::FrameHandler, private sys::Runnable { +class Cluster : private sys::Runnable, private Cpg::Handler +{ public: /** Details of a cluster member */ struct Member { typedef shared_ptr<const Member> Ptr; - /** Status of a cluster member. */ - enum Status { - JOIN, ///< Process joined the group. - LEAVE, ///< Process left the group cleanly. - NODEDOWN, ///< Process's node went down. - NODEUP, ///< Process's node joined the cluster. - PROCDOWN, ///< Process died without leaving. - BROKER ///< Broker details are available. - }; - - Member(const cpg_address&); - std::string url; - Status status; + Member(const std::string& url_) : url(url_) {} + std::string url; ///< Broker address. }; typedef std::vector<Member::Ptr> MemberList; - + /** - * Create a cluster object but do not joing. + * Join a cluster. * @param name of the cluster. * @param url of this broker, sent to the cluster. */ Cluster(const std::string& name, const std::string& url); - ~Cluster(); - - /** Join the cluster. - *@handler is the handler for frames arriving from the cluster. - */ - void join(framing::FrameHandler::Chain handler); - - /** Multicast a frame to the cluster. */ - void handle(framing::AMQFrame&); + virtual ~Cluster(); /** Get the current cluster membership. */ MemberList getMembers() const; - /** Called when membership changes. */ - void setCallback(boost::function<void()>); - /** Number of members in the cluster. */ size_t size() const; + bool empty() const { return size() == 0; } + + /** Get handler chains to send frames to the cluster */ + framing::FrameHandler::Chains getToChains() { + return toChains; + } + + /** Set handler chains for frames received from the cluster */ + void setFromChains(const framing::FrameHandler::Chains& chains); + + /** Wait for predicate(*this) to be true, up to timeout. + *@return True if predicate became true, false if timed out. + *Note the predicate may not be true after wait returns, + *all the caller can say is it was true at some earlier point. + */ + bool wait(boost::function<bool(const Cluster&)> predicate, + sys::Duration timeout=sys::TIME_INFINITE) const; + private: typedef Cpg::Id Id; typedef std::map<Id, shared_ptr<Member> > MemberMap; + typedef std::map< + framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; + + void mcast(framing::AMQFrame&); ///< send frame by multicast. + void notify(); ///< Notify cluster of my details. - void run(); - void notify(); - void cpgDeliver( + void deliver( cpg_handle_t /*handle*/, struct cpg_name *group, uint32_t /*nodeid*/, @@ -111,7 +102,7 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { void* /*msg*/, int /*msg_len*/); - void cpgConfigChange( + void configChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, struct cpg_address */*members*/, int /*nMembers*/, @@ -119,16 +110,30 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { struct cpg_address */*joined*/, int /*nJoined*/ ); + void run(); + bool handleClusterFrame(Id from, framing::AMQFrame&); + mutable sys::Monitor lock; + boost::scoped_ptr<Cpg> cpg; Cpg::Name name; std::string url; - boost::scoped_ptr<Cpg> cpg; Id self; MemberMap members; + ChannelMap channels; sys::Thread dispatcher; boost::function<void()> callback; + framing::FrameHandler::Chains toChains; + framing::FrameHandler::Chains fromChains; + + struct IncomingHandler; + struct OutgoingHandler; + + friend struct IncomingHandler; + friend struct OutgoingHandler; friend std::ostream& operator <<(std::ostream&, const Cluster&); + friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); + friend std::ostream& operator <<(std::ostream&, const MemberMap&); }; }} // namespace qpid::cluster |