summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch)
tree3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid
parentc9a654925355a4dd128d5111af862e8be89e0a45 (diff)
downloadqpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h13
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.cpp36
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.h44
-rw-r--r--cpp/src/qpid/broker/Message.cpp22
-rw-r--r--cpp/src/qpid/broker/Message.h9
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp11
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--cpp/src/qpid/cluster/Cluster.h12
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp80
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h76
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp11
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h8
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h4
-rw-r--r--cpp/src/qpid/framing/FrameSet.h4
18 files changed, 327 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 091f67ec58..95f55bb596 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -30,6 +30,7 @@
#include "SecureConnectionFactory.h"
#include "TopicExchange.h"
#include "Link.h"
+#include "ExpiryPolicy.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
queueEvents(poller),
recovery(true),
+ expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 71b69b51aa..a52a0f67e0 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -36,6 +36,7 @@
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
+#include "ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementBroker.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -65,6 +66,8 @@ struct Url;
namespace broker {
+class ExpiryPolicy;
+
static const uint16_t DEFAULT_PORT=5672;
struct NoSuchTransportException : qpid::Exception
@@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
+ void declareStandardExchange(const std::string& name, const std::string& type);
+
boost::shared_ptr<sys::Poller> poller;
Options config;
management::ManagementAgent::Singleton managementAgentSingleton;
@@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target,
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
QueueEvents queueEvents;
-
- void declareStandardExchange(const std::string& name, const std::string& type);
-
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
-
bool recovery;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
@@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
QueueEvents& getQueueEvents() { return queueEvents; }
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
+ boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..907f1e56e1
--- /dev/null
+++ b/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ExpiryPolicy.h"
+#include "Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+ExpiryPolicy::~ExpiryPolicy() {}
+
+void ExpiryPolicy::willExpire(Message&) {}
+
+bool ExpiryPolicy::hasExpired(Message& m) {
+ return m.getExpiration() < sys::AbsTime::now();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h
new file mode 100644
index 0000000000..1b7316f6f9
--- /dev/null
+++ b/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -0,0 +1,44 @@
+#ifndef QPID_BROKER_EXPIRYPOLICY_H
+#define QPID_BROKER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Default expiry policy.
+ */
+class ExpiryPolicy : public RefCounted
+{
+ public:
+ virtual ~ExpiryPolicy();
+ virtual void willExpire(Message&);
+ virtual bool hasExpired(Message&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e5a0c3e9e1..ce0477b08c 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -21,6 +21,7 @@
#include "Message.h"
#include "ExchangeRegistry.h"
+#include "ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp()
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
DeliveryProperties* props = getProperties<DeliveryProperties>();
- //Spec states that timestamp should be set, evaluate the
- //performance impact before re-enabling this:
- //time_t now = ::time(0);
- //props->setTimestamp(now);
if (props->getTtl()) {
- //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
+ // Use higher resolution time for the internal expiry calculation.
expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ setExpiryPolicy(e);
}
}
-bool Message::hasExpired() const
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
{
- return expiration < FAR_FUTURE && expiration < AbsTime::now();
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index de716e9441..96fcf61dfc 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -45,6 +45,7 @@ class Exchange;
class ExchangeRegistry;
class MessageStore;
class Queue;
+class ExpiryPolicy;
class Message : public PersistableMessage {
public:
@@ -73,8 +74,11 @@ public:
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
bool requiresAccept();
- void setTimestamp();
- bool hasExpired() const;
+
+ void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ bool hasExpired();
+ sys::AbsTime getExpiration() const { return expiration; }
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
@@ -171,6 +175,7 @@ public:
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
static TransferAdapter TRANSFER;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index f9f75679e5..13a8c649d2 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
- // Do not replace the delivery-properties.exchange if it is is already set.
- // This is used internally (by the cluster) to force the exchange name on a message.
- // The client library ensures this is always empty for messages from normal clients.
if (msg->isA<MessageTransferBody>()) {
- if (!msg->hasProperties<DeliveryProperties>() ||
- msg->getProperties<DeliveryProperties>()->getExchange().empty())
+ // Do not replace the delivery-properties.exchange if it is is already set.
+ // This is used internally (by the cluster) to force the exchange name on a message.
+ // The client library ensures this is always empty for messages from normal clients.
+ if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- msg->setTimestamp();
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8e6ece11cc..be04eebc57 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ frameId(0),
initialized(false),
state(INIT),
lastSize(0),
@@ -134,6 +137,7 @@ void Cluster::initialize() {
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
@@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) {
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
if (e.isCluster()) { // Cluster control frame
@@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
- ClusterMap::Set members = map.getAlive();
- members.erase(myId);
- myElders = members;
+ elders = map.getAlive();
+ elders.erase(myId);
broker.getLinks().setPassive(true);
}
}
else if (state >= READY && memberChange) {
memberUpdate(l);
- myElders = ClusterMap::intersection(myElders, map.getAlive());
- if (myElders.empty()) {
+ elders = ClusterMap::intersection(elders, map.getAlive());
+ if (elders.empty()) {
//assume we are oldest, reactive links if necessary
broker.getLinks().setPassive(false);
}
}
}
+bool Cluster::isLeader() const { return elders.empty(); }
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, connections.values(),
- boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
Lock l(lock);
updatedMap = m;
+ frameId = fid;
checkUpdateIn(l);
}
@@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) {
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index f7955aa743..4d994943f7 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -31,6 +31,7 @@
#include "Quorum.h"
#include "Decoder.h"
#include "PollableQueue.h"
+#include "ExpiryPolicy.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&);
+ void updateInDone(const ClusterMap&, uint64_t frameId);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
+
+ bool isLeader() const; // Called in deliver thread.
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
- ClusterMap::Set myElders;
qpid::management::ManagementAgent* mAgent;
// Thread safe members
@@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Called only from event delivery thread
+ // Used only in delivery thread
Decoder decoder;
+ ClusterMap::Set elders;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ uint64_t frameId;
// Used only during initialization
bool initialized;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 2f7d12dcfe..d54d8389e0 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -54,7 +54,7 @@ struct ClusterValues {
bool quorum;
size_t readMax, writeEstimate;
- ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9ea79fa2b6..295705e967 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
- else if (body.type() == HEADER_BODY) {
- const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>();
- if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
- }
if (!message.empty())
connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
@@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members), frameId);
self.second = 0; // Mark this as completed update connection.
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 160855dc2d..a6e9aa65f9 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -119,7 +119,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..690acfc3ad
--- /dev/null
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExpiryPolicy.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/sys/Time.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
+ : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
+
+namespace {
+uint64_t clusterId(const broker::Message& m) {
+ assert(m.getFrames().begin() != m.getFrames().end());
+ return m.getFrames().begin()->getClusterId();
+}
+
+struct ExpiryTask : public broker::TimerTask {
+ ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
+ : TimerTask(when), expiryPolicy(policy), messageId(id) {}
+ void fire() { expiryPolicy->sendExpire(messageId); }
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ const uint64_t messageId;
+};
+}
+
+void ExpiryPolicy::willExpire(broker::Message& m) {
+ timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+}
+
+bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ IdSet::iterator i = expired.find(clusterId(m));
+ if (i != expired.end()) {
+ expired.erase(i);
+ const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ return true;
+ }
+ return false;
+}
+
+void ExpiryPolicy::sendExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ if (isLeader())
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+}
+
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expired.insert(id);
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
new file mode 100644
index 0000000000..7fb63c731e
--- /dev/null
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLUSTER_EXPIRYPOLICY_H
+#define QPID_CLUSTER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <set>
+
+namespace qpid {
+
+namespace broker { class Timer; }
+
+namespace cluster {
+class Multicaster;
+
+/**
+ * Cluster expiry policy
+ */
+class ExpiryPolicy : public broker::ExpiryPolicy
+{
+ public:
+ ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+
+ void willExpire(broker::Message&);
+
+ bool hasExpired(broker::Message&);
+
+ // Send expiration notice to cluster.
+ void sendExpire(uint64_t);
+
+ // Cluster delivers expiry notice.
+ void deliverExpire(uint64_t);
+
+ private:
+ sys::Mutex lock;
+ typedef std::set<uint64_t> IdSet;
+
+ struct Expired : public broker::ExpiryPolicy {
+ bool hasExpired(broker::Message&);
+ void willExpire(broker::Message&);
+ };
+
+ IdSet expired;
+ boost::intrusive_ptr<Expired> expiredPolicy;
+ boost::function<bool()> isLeader;
+ Multicaster& mcast;
+ MemberId memberId;
+ broker::Timer& timer;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 91d4c6d3ce..e50c936b50 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
- const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
- : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons),
+ broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ const Cluster::Connections& cons,
+ const boost::function<void()>& ok,
+ const boost::function<void(const std::exception&)>& fail)
+ : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
+ frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
@@ -120,6 +122,7 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
+ membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 93dca9f0c6..0819eb4cdb 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable {
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
- const boost::function<void()>& done,
- const boost::function<void(const std::exception&)>& fail);
+ broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
+ const std::vector<boost::intrusive_ptr<Connection> >& ,
+ const boost::function<void()>& done,
+ const boost::function<void(const std::exception&)>& fail);
~UpdateClient();
void update();
@@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable {
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
+ uint64_t frameId;
std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 02a1ea4622..028d0c1d8a 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -92,6 +92,9 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
+ uint64_t getClusterId() const { return clusterId; }
+ void setClusterId(uint64_t id) { clusterId = id; }
+
private:
void init();
@@ -103,6 +106,7 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
bool bos : 1;
bool eos : 1;
mutable uint32_t encodedSizeCache;
+ uint64_t clusterId; // Used to identify frames in a clustered broekr.
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index 82987910a7..b13ca16e97 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -49,6 +49,7 @@ public:
bool isComplete() const;
uint64_t getContentSize() const;
+
void getContent(std::string&) const;
std::string getContent() const;
@@ -73,6 +74,9 @@ public:
return header ? header->get<T>() : 0;
}
+ Frames::const_iterator begin() const { return parts.begin(); }
+ Frames::const_iterator end() const { return parts.end(); }
+
const SequenceNumber& getId() const { return id; }
template <class P> void remove(P predicate) {