diff options
author | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
commit | fda6dadde945a9c73c97b73dc79e93368b743348 (patch) | |
tree | d7755539ae485efdfbc46298cd1ef6632515159e /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 79cd6c772da003ddc917eff362f9adaa99e28b49 (diff) | |
download | qpid-python-fda6dadde945a9c73c97b73dc79e93368b743348.tar.gz |
* Summary:
- Improved plugin framework and HandlerUpdater interface.
- Cluster handlers for traffic to/from cluster.
- Cluster HandlerUpdater configures channel chains for cluster.
- Cluster PluginProvider registers cluster objects with broker.
* src/qpid/framing/AMQFrame.h: Made data members public. Handlers
need to be able to modify frame data, getters/setters are just a
nuisance here.
* src/tests/Cluster.cpp: Updated for cluster changes, using
handlers instead of friendship to hook test into Cluster code.
* src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and
CHANNEL_HIGH_BIT constants.
* src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer,
broke dependency on broker channel types.
* src/qpid/framing/Handler.h: Added constructors and nextHandler()
* src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel()
* src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster
plugins.
* src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out
plugin details to ClusterPluginProvider.
* src/qpid/cluster/ChannelManager.h: Insert cluster handlers
into channel chains, route frames between cluster and channels.
* src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX
constant.
* src/qpid/broker/Broker.cpp:
- Refactored for new plugin framework.
- Added getUrl().
* src/qpid/Url.h: Added constructor from Address.
* src/qpid/Plugin.h: Generalized plugin framework, broke
dependency on Broker interfaces. We may want to use plug-ins for
clients also at some point.
* src/tests/run_test: Fix bug when VALGRIND is not set.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 121 |
1 files changed, 75 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 30073c4551..8d898eefa3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,13 +17,13 @@ */ #include "Cluster.h" -#include "Cpg.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> #include <iterator> +#include <map> namespace qpid { namespace cluster { @@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "(" << cluster.self << ")"; } +namespace { +Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; +struct StatusMapInit { + StatusMapInit() { + statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; + statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; + statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; + statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; + statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; + } +} instance; +} + +Cluster::Member::Member(const cpg_address& addr) + : status(statusMap[addr.reason]) {} + void Cluster::notify() { + ProtocolVersion version; // TODO aconway 2007-06-25: Use proxy here. AMQFrame frame(version, 0, make_shared_ptr(new ClusterNotifyBody(version, url))); handle(frame); } -Cluster::Cluster( - const std::string& name_, const std::string& url_, FrameHandler& next_, - ProtocolVersion ver) - : name(name_), url(url_), version(ver), - cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - next(next_) -{ - self=Id(cpg->getLocalNoideId(), getpid()); +Cluster::Cluster(const std::string& name_, const std::string& url_) : + name(name_), + url(url_), + cpg(new Cpg( + boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + self(cpg->getLocalNoideId(), getpid()) +{} + +void Cluster::join(FrameHandler::Chain next) { QPID_LOG(trace, *this << " Joining cluster."); + next = next; + dispatcher=Thread(*this); cpg->join(name); notify(); - dispatcher=Thread(*this); } Cluster::~Cluster() { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); + if (cpg) { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } } } void Cluster::handle(AMQFrame& frame) { + assert(cpg); QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -104,52 +126,59 @@ void Cluster::cpgDeliver( frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame); // TODO aconway 2007-06-20: use visitor pattern. - ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + ClusterNotifyBody* notifyIn= + dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { - Mutex::ScopedLock l(lock); - members[from].reset(new Member(notifyIn->getUrl())); - lock.notifyAll(); + { + Mutex::ScopedLock l(lock); + assert(members[from]); + members[from]->url = notifyIn->getUrl(); + members[from]->status = Member::BROKER; + } + if (callback) + callback(); } else - next.handle(frame); + next->handle(frame); } void Cluster::cpgConfigChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *ccMembers, int nMembers, + struct cpg_address *current, int nCurrent, struct cpg_address *left, int nLeft, struct cpg_address *joined, int nJoined ) { - QPID_LOG( - trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(ccMembers, nMembers)); - + QPID_LOG(trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(current, nCurrent)); + + bool needNotify=false; + MemberList updated; { Mutex::ScopedLock l(lock); - // Erase members that left. - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - lock.notifyAll(); - } - - // If there are new members (other than myself) then notify. - for (int i=0; i< nJoined; ++i) { - if (Id(joined[i]) != self) { - notify(); - break; + for (int i = 0; i < nJoined; ++i) { + Id id(current[i]); + members[id].reset(new Member(current[i])); + if (id != self) + needNotify = true; // Notify new members other than myself. } - } - - // Note: New members are be added to my map when cpgDeliver - // gets a cluster.notify frame. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(current[i])); + } // End of locked scope. + if (needNotify) + notify(); + if (callback) + callback(); } +void Cluster::setCallback(boost::function<void()> f) { callback=f; } + void Cluster::run() { + assert(cpg); cpg->dispatchBlocking(); } |