summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit075450d6d19fe5cb2d18e1e006312af9ded24e06 (patch)
tree2a9efab54a88c2a13bac3c1ba9774966de24b9af
parentd7ba7d6fdf756080b2862a48a892526ef40e163f (diff)
downloadqpid-python-075450d6d19fe5cb2d18e1e006312af9ded24e06.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742774 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/cluster.mk2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h13
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp36
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.h44
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h9
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp80
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h76
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h8
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h4
-rw-r--r--qpid/cpp/src/qpid/framing/FrameSet.h4
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp3
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp125
-rw-r--r--qpid/cpp/xml/cluster.xml5
23 files changed, 421 insertions, 91 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index e07267da37..f46c204f79 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -358,6 +358,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
+ qpid/broker/ExpiryPolicy.h \
+ qpid/broker/ExpiryPolicy.cpp \
qpid/broker/Queue.cpp \
qpid/broker/QueueCleaner.cpp \
qpid/broker/QueueListeners.cpp \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index a588e79c96..8880493bf5 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -63,6 +63,8 @@ cluster_la_SOURCES = \
qpid/cluster/Event.h \
qpid/cluster/EventFrame.h \
qpid/cluster/EventFrame.cpp \
+ qpid/cluster/ExpiryPolicy.h \
+ qpid/cluster/ExpiryPolicy.cpp \
qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
qpid/cluster/Multicaster.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 091f67ec58..95f55bb596 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -30,6 +30,7 @@
#include "SecureConnectionFactory.h"
#include "TopicExchange.h"
#include "Link.h"
+#include "ExpiryPolicy.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
queueEvents(poller),
recovery(true),
+ expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 71b69b51aa..a52a0f67e0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -36,6 +36,7 @@
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
+#include "ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementBroker.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -65,6 +66,8 @@ struct Url;
namespace broker {
+class ExpiryPolicy;
+
static const uint16_t DEFAULT_PORT=5672;
struct NoSuchTransportException : qpid::Exception
@@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
+ void declareStandardExchange(const std::string& name, const std::string& type);
+
boost::shared_ptr<sys::Poller> poller;
Options config;
management::ManagementAgent::Singleton managementAgentSingleton;
@@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target,
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
QueueEvents queueEvents;
-
- void declareStandardExchange(const std::string& name, const std::string& type);
-
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
-
bool recovery;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
@@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
QueueEvents& getQueueEvents() { return queueEvents; }
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
+ boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..907f1e56e1
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ExpiryPolicy.h"
+#include "Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+ExpiryPolicy::~ExpiryPolicy() {}
+
+void ExpiryPolicy::willExpire(Message&) {}
+
+bool ExpiryPolicy::hasExpired(Message& m) {
+ return m.getExpiration() < sys::AbsTime::now();
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
new file mode 100644
index 0000000000..1b7316f6f9
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -0,0 +1,44 @@
+#ifndef QPID_BROKER_EXPIRYPOLICY_H
+#define QPID_BROKER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Default expiry policy.
+ */
+class ExpiryPolicy : public RefCounted
+{
+ public:
+ virtual ~ExpiryPolicy();
+ virtual void willExpire(Message&);
+ virtual bool hasExpired(Message&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index e5a0c3e9e1..ce0477b08c 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -21,6 +21,7 @@
#include "Message.h"
#include "ExchangeRegistry.h"
+#include "ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp()
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
DeliveryProperties* props = getProperties<DeliveryProperties>();
- //Spec states that timestamp should be set, evaluate the
- //performance impact before re-enabling this:
- //time_t now = ::time(0);
- //props->setTimestamp(now);
if (props->getTtl()) {
- //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
+ // Use higher resolution time for the internal expiry calculation.
expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ setExpiryPolicy(e);
}
}
-bool Message::hasExpired() const
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
{
- return expiration < FAR_FUTURE && expiration < AbsTime::now();
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index de716e9441..96fcf61dfc 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -45,6 +45,7 @@ class Exchange;
class ExchangeRegistry;
class MessageStore;
class Queue;
+class ExpiryPolicy;
class Message : public PersistableMessage {
public:
@@ -73,8 +74,11 @@ public:
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
bool requiresAccept();
- void setTimestamp();
- bool hasExpired() const;
+
+ void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ bool hasExpired();
+ sys::AbsTime getExpiration() const { return expiration; }
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
@@ -171,6 +175,7 @@ public:
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
static TransferAdapter TRANSFER;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index f9f75679e5..13a8c649d2 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
- // Do not replace the delivery-properties.exchange if it is is already set.
- // This is used internally (by the cluster) to force the exchange name on a message.
- // The client library ensures this is always empty for messages from normal clients.
if (msg->isA<MessageTransferBody>()) {
- if (!msg->hasProperties<DeliveryProperties>() ||
- msg->getProperties<DeliveryProperties>()->getExchange().empty())
+ // Do not replace the delivery-properties.exchange if it is is already set.
+ // This is used internally (by the cluster) to force the exchange name on a message.
+ // The client library ensures this is always empty for messages from normal clients.
+ if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- msg->setTimestamp();
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 8e6ece11cc..be04eebc57 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ frameId(0),
initialized(false),
state(INIT),
lastSize(0),
@@ -134,6 +137,7 @@ void Cluster::initialize() {
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
@@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) {
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
if (e.isCluster()) { // Cluster control frame
@@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
- ClusterMap::Set members = map.getAlive();
- members.erase(myId);
- myElders = members;
+ elders = map.getAlive();
+ elders.erase(myId);
broker.getLinks().setPassive(true);
}
}
else if (state >= READY && memberChange) {
memberUpdate(l);
- myElders = ClusterMap::intersection(myElders, map.getAlive());
- if (myElders.empty()) {
+ elders = ClusterMap::intersection(elders, map.getAlive());
+ if (elders.empty()) {
//assume we are oldest, reactive links if necessary
broker.getLinks().setPassive(false);
}
}
}
+bool Cluster::isLeader() const { return elders.empty(); }
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, connections.values(),
- boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
Lock l(lock);
updatedMap = m;
+ frameId = fid;
checkUpdateIn(l);
}
@@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) {
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index f7955aa743..4d994943f7 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -31,6 +31,7 @@
#include "Quorum.h"
#include "Decoder.h"
#include "PollableQueue.h"
+#include "ExpiryPolicy.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&);
+ void updateInDone(const ClusterMap&, uint64_t frameId);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
+
+ bool isLeader() const; // Called in deliver thread.
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
- ClusterMap::Set myElders;
qpid::management::ManagementAgent* mAgent;
// Thread safe members
@@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Called only from event delivery thread
+ // Used only in delivery thread
Decoder decoder;
+ ClusterMap::Set elders;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ uint64_t frameId;
// Used only during initialization
bool initialized;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 2f7d12dcfe..d54d8389e0 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -54,7 +54,7 @@ struct ClusterValues {
bool quorum;
size_t readMax, writeEstimate;
- ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 9ea79fa2b6..295705e967 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
- else if (body.type() == HEADER_BODY) {
- const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>();
- if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
- }
if (!message.empty())
connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
@@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members), frameId);
self.second = 0; // Mark this as completed update connection.
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 160855dc2d..a6e9aa65f9 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -119,7 +119,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..690acfc3ad
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExpiryPolicy.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/sys/Time.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
+ : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
+
+namespace {
+uint64_t clusterId(const broker::Message& m) {
+ assert(m.getFrames().begin() != m.getFrames().end());
+ return m.getFrames().begin()->getClusterId();
+}
+
+struct ExpiryTask : public broker::TimerTask {
+ ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
+ : TimerTask(when), expiryPolicy(policy), messageId(id) {}
+ void fire() { expiryPolicy->sendExpire(messageId); }
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ const uint64_t messageId;
+};
+}
+
+void ExpiryPolicy::willExpire(broker::Message& m) {
+ timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+}
+
+bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ IdSet::iterator i = expired.find(clusterId(m));
+ if (i != expired.end()) {
+ expired.erase(i);
+ const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ return true;
+ }
+ return false;
+}
+
+void ExpiryPolicy::sendExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ if (isLeader())
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+}
+
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expired.insert(id);
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
new file mode 100644
index 0000000000..7fb63c731e
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLUSTER_EXPIRYPOLICY_H
+#define QPID_CLUSTER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <set>
+
+namespace qpid {
+
+namespace broker { class Timer; }
+
+namespace cluster {
+class Multicaster;
+
+/**
+ * Cluster expiry policy
+ */
+class ExpiryPolicy : public broker::ExpiryPolicy
+{
+ public:
+ ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+
+ void willExpire(broker::Message&);
+
+ bool hasExpired(broker::Message&);
+
+ // Send expiration notice to cluster.
+ void sendExpire(uint64_t);
+
+ // Cluster delivers expiry notice.
+ void deliverExpire(uint64_t);
+
+ private:
+ sys::Mutex lock;
+ typedef std::set<uint64_t> IdSet;
+
+ struct Expired : public broker::ExpiryPolicy {
+ bool hasExpired(broker::Message&);
+ void willExpire(broker::Message&);
+ };
+
+ IdSet expired;
+ boost::intrusive_ptr<Expired> expiredPolicy;
+ boost::function<bool()> isLeader;
+ Multicaster& mcast;
+ MemberId memberId;
+ broker::Timer& timer;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 91d4c6d3ce..e50c936b50 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
- const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
- : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons),
+ broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ const Cluster::Connections& cons,
+ const boost::function<void()>& ok,
+ const boost::function<void(const std::exception&)>& fail)
+ : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
+ frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
@@ -120,6 +122,7 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
+ membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 93dca9f0c6..0819eb4cdb 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable {
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
- const boost::function<void()>& done,
- const boost::function<void(const std::exception&)>& fail);
+ broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
+ const std::vector<boost::intrusive_ptr<Connection> >& ,
+ const boost::function<void()>& done,
+ const boost::function<void(const std::exception&)>& fail);
~UpdateClient();
void update();
@@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable {
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
+ uint64_t frameId;
std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 02a1ea4622..028d0c1d8a 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -92,6 +92,9 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
+ uint64_t getClusterId() const { return clusterId; }
+ void setClusterId(uint64_t id) { clusterId = id; }
+
private:
void init();
@@ -103,6 +106,7 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
bool bos : 1;
bool eos : 1;
mutable uint32_t encodedSizeCache;
+ uint64_t clusterId; // Used to identify frames in a clustered broekr.
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h
index 82987910a7..b13ca16e97 100644
--- a/qpid/cpp/src/qpid/framing/FrameSet.h
+++ b/qpid/cpp/src/qpid/framing/FrameSet.h
@@ -49,6 +49,7 @@ public:
bool isComplete() const;
uint64_t getContentSize() const;
+
void getContent(std::string&) const;
std::string getContent() const;
@@ -73,6 +74,9 @@ public:
return header ? header->get<T>() : 0;
}
+ Frames::const_iterator begin() const { return parts.begin(); }
+ Frames::const_iterator end() const { return parts.end(); }
+
const SequenceNumber& getId() const { return id; }
template <class P> void remove(P predicate) {
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 079b9b0ba6..5a93533755 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include <iostream>
@@ -491,7 +492,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
} else {
if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
}
- m->setTimestamp();
+ m->setTimestamp(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 15a96aeba9..14b7659b65 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -37,6 +37,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
#include <string>
#include <iostream>
@@ -51,22 +52,23 @@ template <class T>
ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
}
-
-QPID_AUTO_TEST_SUITE(cluster)
+QPID_AUTO_TEST_SUITE(cluster_test)
using namespace std;
using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
-using qpid::sys::TIME_SEC;
-using qpid::broker::Broker;
+using namespace boost::assign;
+using broker::Broker;
using boost::shared_ptr;
-using qpid::cluster::Cluster;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << qpid::cluster::Cpg::str(*n);
+ return o << cluster::Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -94,7 +96,7 @@ template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
- ::usleep(1000*100); // 0.1 secs
+ sys::usleep(1000*100); // 0.1 secs
urls = source.getKnownBrokers();
}
}
@@ -127,6 +129,45 @@ int64_t getMsgSequence(const Message& m) {
return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
}
+Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) {
+ Message m(data, key);
+ m.getDeliveryProperties().setTtl(ttl);
+ return m;
+}
+
+vector<std::string> browse(Client& c, const std::string& q, int n) {
+ SubscriptionSettings browseSettings(
+ FlowControl::unlimited(),
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ c.subs.subscribe(lq, q, browseSettings);
+ vector<std::string> result;
+ for (int i = 0; i < n; ++i) {
+ result.push_back(lq.get(TIMEOUT).getData());
+ }
+ c.subs.getSubscription(q).cancel();
+ return result;
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+ // Note: this doesn't actually test for cluster race conditions around TTL,
+ // it just verifies that basic TTL functionality works.
+ //
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
+ c0.session.messageTransfer(arg::content=Message("b", "q"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b"));
+ sys::usleep(300*1000);
+ BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b"));
+}
+
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
ClusterFixture cluster(1);
@@ -138,13 +179,13 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) {
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
- BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
- BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
- BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
@@ -160,14 +201,14 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
commitSession.txSelect();
commitSession.messageTransfer(arg::content=Message("a", "q"));
commitSession.messageTransfer(arg::content=Message("b", "q"));
- BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
rollbackSession.messageTransfer(arg::content=Message("1", "q"));
- Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+ Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
@@ -191,10 +232,10 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// Verify queue status: just the comitted messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
}
QPID_AUTO_TEST_CASE(testUnacked) {
@@ -210,7 +251,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("11","q1"));
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
- BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
// Create unacked message: not acquired, accepted or completeed.
@@ -220,12 +261,12 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("22","q2"));
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
- m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
+ m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(m.getData(), "21");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
c0.subs.getSubscription("q2").acquire(m); // Acquire manually
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
- BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
// Create empty credit record: acquire and accept but don't complete.
@@ -235,7 +276,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("32", "q3"));
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
- Message m31=q3.get(TIME_SEC);
+ Message m31=q3.get(TIMEOUT);
BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
@@ -251,7 +292,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Complete the empty credit message, should unblock the message behind it.
BOOST_CHECK_THROW(q3.get(0), Exception);
c0.session.markCompleted(SequenceSet(m31.getId()), true);
- BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
+ BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
@@ -260,9 +301,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
- BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11");
- BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21");
- BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
+ BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
}
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
@@ -276,7 +317,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
c0.session.messageTransfer(arg::content=Message("1","q"));
c0.session.messageTransfer(arg::content=Message("2","q"));
Message m;
- BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
// New member, TX not comitted, c1 should see nothing.
@@ -287,7 +328,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// After commit c1 shoudl see results of tx.
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
@@ -295,7 +336,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "3");
}
@@ -318,7 +359,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// No reliable way to ensure the partial message has arrived
// before we start the new broker, so we sleep.
- ::usleep(2500);
+ sys::usleep(2500);
cluster.add();
// Send final 2 frames of message.
@@ -328,7 +369,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Verify message is enqued correctly on second member.
Message m;
Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "abcd");
BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
}
@@ -391,20 +432,20 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
// Check second subscription's flow control: gets first message, not second.
- BOOST_CHECK(lp.get(m, TIME_SEC));
+ BOOST_CHECK(lp.get(m, TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bbb");
BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
- BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "ccc");
// Kill the subscribing member, ensure further messages are not removed.
@@ -412,7 +453,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
c1.session.messageTransfer(arg::content=Message("xxx", "q"));
- BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
@@ -426,7 +467,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
c0.session.messageTransfer(arg::content=Message("foo","q"));
c0.session.messageTransfer(arg::content=Message("bar","q"));
while (c0.session.queueQuery("q").getMessageCount() != 2)
- ::usleep(1000); // Wait for message to show up on broker 0.
+ sys::usleep(1000); // Wait for message to show up on broker 0.
// Add a new broker, it should catch up.
cluster.add();
@@ -444,18 +485,18 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "foo");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Add another broker, don't wait for join - should be stalled till ready.
cluster.add();
Client c2(cluster[2], "c2");
- BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "pfoo");
- BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "pbar");
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
}
@@ -488,9 +529,9 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
c0.session.close();
Client c1(cluster[1]);
Message msg;
- BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
BOOST_CHECK_EQUAL(string("foo"), msg.getData());
- BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
@@ -535,9 +576,9 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
// Check they arrived
Message m;
- BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL("foo", m.getData());
- BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL("bar", m.getData());
// Queue should be empty on all cluster members.
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index c114ef0151..6dc0c722dd 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -43,6 +43,10 @@
<control name="config-change" code="0x11" label="Raw cluster membership.">
<field name="current" type="vbin16"/> <!-- packed member-id array -->
</control>
+
+ <control name="message-expired" code="0x12">
+ <field name="id" type="uint64"/>
+ </control>
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
@@ -126,6 +130,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
+ <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
</control>
<!-- Set the position of a replicated queue. -->