summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch)
tree3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/cluster/Cluster.cpp
parentc9a654925355a4dd128d5111af862e8be89e0a45 (diff)
downloadqpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
1 files changed, 21 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8e6ece11cc..be04eebc57 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ frameId(0),
initialized(false),
state(INIT),
lastSize(0),
@@ -134,6 +137,7 @@ void Cluster::initialize() {
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
@@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) {
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
if (e.isCluster()) { // Cluster control frame
@@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
- ClusterMap::Set members = map.getAlive();
- members.erase(myId);
- myElders = members;
+ elders = map.getAlive();
+ elders.erase(myId);
broker.getLinks().setPassive(true);
}
}
else if (state >= READY && memberChange) {
memberUpdate(l);
- myElders = ClusterMap::intersection(myElders, map.getAlive());
- if (myElders.empty()) {
+ elders = ClusterMap::intersection(elders, map.getAlive());
+ if (elders.empty()) {
//assume we are oldest, reactive links if necessary
broker.getLinks().setPassive(false);
}
}
}
+bool Cluster::isLeader() const { return elders.empty(); }
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, connections.values(),
- boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
Lock l(lock);
updatedMap = m;
+ frameId = fid;
checkUpdateIn(l);
}
@@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) {
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
}} // namespace qpid::cluster