diff options
author | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
commit | 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch) | |
tree | 3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | c9a654925355a4dd128d5111af862e8be89e0a45 (diff) | |
download | qpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz |
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8e6ece11cc..be04eebc57 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + frameId(0), initialized(false), state(INIT), lastSize(0), @@ -134,6 +137,7 @@ void Cluster::initialize() { myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); @@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) { // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); if (e.isCluster()) { // Cluster control frame @@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); - ClusterMap::Set members = map.getAlive(); - members.erase(myId); - myElders = members; + elders = map.getAlive(); + elders.erase(myId); broker.getLinks().setPassive(true); } } else if (state >= READY && memberChange) { memberUpdate(l); - myElders = ClusterMap::intersection(myElders, map.getAlive()); - if (myElders.empty()) { + elders = ClusterMap::intersection(elders, map.getAlive()); + if (elders.empty()) { //assume we are oldest, reactive links if necessary broker.getLinks().setPassive(false); } } } +bool Cluster::isLeader() const { return elders.empty(); } + void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, connections.values(), - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { Lock l(lock); updatedMap = m; + frameId = fid; checkUpdateIn(l); } @@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) { QPID_LOG(debug, *this << " cluster-id = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + }} // namespace qpid::cluster |