From 075450d6d19fe5cb2d18e1e006312af9ded24e06 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 9 Feb 2009 22:25:26 +0000 Subject: Cluster support for message time-to-live. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742774 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 2 + qpid/cpp/src/cluster.mk | 2 + qpid/cpp/src/qpid/broker/Broker.cpp | 2 + qpid/cpp/src/qpid/broker/Broker.h | 13 ++- qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp | 36 ++++++++ qpid/cpp/src/qpid/broker/ExpiryPolicy.h | 44 ++++++++++ qpid/cpp/src/qpid/broker/Message.cpp | 22 +++-- qpid/cpp/src/qpid/broker/Message.h | 9 +- qpid/cpp/src/qpid/broker/SemanticState.cpp | 11 ++- qpid/cpp/src/qpid/cluster/Cluster.cpp | 31 ++++--- qpid/cpp/src/qpid/cluster/Cluster.h | 12 ++- qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 +- qpid/cpp/src/qpid/cluster/Connection.cpp | 8 +- qpid/cpp/src/qpid/cluster/Connection.h | 2 +- qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp | 80 ++++++++++++++++++ qpid/cpp/src/qpid/cluster/ExpiryPolicy.h | 76 +++++++++++++++++ qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 11 ++- qpid/cpp/src/qpid/cluster/UpdateClient.h | 8 +- qpid/cpp/src/qpid/framing/AMQFrame.h | 4 + qpid/cpp/src/qpid/framing/FrameSet.h | 4 + qpid/cpp/src/tests/QueueTest.cpp | 3 +- qpid/cpp/src/tests/cluster_test.cpp | 125 ++++++++++++++++++---------- qpid/cpp/xml/cluster.xml | 5 ++ 23 files changed, 421 insertions(+), 91 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp create mode 100644 qpid/cpp/src/qpid/broker/ExpiryPolicy.h create mode 100644 qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp create mode 100644 qpid/cpp/src/qpid/cluster/ExpiryPolicy.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e07267da37..f46c204f79 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -358,6 +358,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ + qpid/broker/ExpiryPolicy.h \ + qpid/broker/ExpiryPolicy.cpp \ qpid/broker/Queue.cpp \ qpid/broker/QueueCleaner.cpp \ qpid/broker/QueueListeners.cpp \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index a588e79c96..8880493bf5 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -63,6 +63,8 @@ cluster_la_SOURCES = \ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ qpid/cluster/EventFrame.cpp \ + qpid/cluster/ExpiryPolicy.h \ + qpid/cluster/ExpiryPolicy.cpp \ qpid/cluster/FailoverExchange.cpp \ qpid/cluster/FailoverExchange.h \ qpid/cluster/Multicaster.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 091f67ec58..95f55bb596 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -30,6 +30,7 @@ #include "SecureConnectionFactory.h" #include "TopicExchange.h" #include "Link.h" +#include "ExpiryPolicy.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" @@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), queueEvents(poller), recovery(true), + expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 71b69b51aa..a52a0f67e0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -36,6 +36,7 @@ #include "Vhost.h" #include "System.h" #include "Timer.h" +#include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementBroker.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -65,6 +66,8 @@ struct Url; namespace broker { +class ExpiryPolicy; + static const uint16_t DEFAULT_PORT=5672; struct NoSuchTransportException : qpid::Exception @@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target, private: typedef std::map > ProtocolFactoryMap; + void declareStandardExchange(const std::string& name, const std::string& type); + boost::shared_ptr poller; Options config; management::ManagementAgent::Singleton managementAgentSingleton; @@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target, System::shared_ptr systemObject; QueueCleaner queueCleaner; QueueEvents queueEvents; - - void declareStandardExchange(const std::string& name, const std::string& type); - std::vector knownBrokers; std::vector getKnownBrokersImpl(); std::string federationTag; - bool recovery; + boost::intrusive_ptr expiryPolicy; public: @@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } QueueEvents& getQueueEvents() { return queueEvents; } + void setExpiryPolicy(const boost::intrusive_ptr& e) { expiryPolicy = e; } + boost::intrusive_ptr getExpiryPolicy() { return expiryPolicy; } + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp new file mode 100644 index 0000000000..907f1e56e1 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ExpiryPolicy.h" +#include "Message.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace broker { + +ExpiryPolicy::~ExpiryPolicy() {} + +void ExpiryPolicy::willExpire(Message&) {} + +bool ExpiryPolicy::hasExpired(Message& m) { + return m.getExpiration() < sys::AbsTime::now(); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h new file mode 100644 index 0000000000..1b7316f6f9 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h @@ -0,0 +1,44 @@ +#ifndef QPID_BROKER_EXPIRYPOLICY_H +#define QPID_BROKER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +class Message; + +/** + * Default expiry policy. + */ +class ExpiryPolicy : public RefCounted +{ + public: + virtual ~ExpiryPolicy(); + virtual void willExpire(Message&); + virtual bool hasExpired(Message&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/ diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index e5a0c3e9e1..ce0477b08c 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr& e) { DeliveryProperties* props = getProperties(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr& Message::getReplacementMessage(const Queue* qfor) const diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index de716e9441..96fcf61dfc 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -45,6 +45,7 @@ class Exchange; class ExchangeRegistry; class MessageStore; class Queue; +class ExpiryPolicy; class Message : public PersistableMessage { public: @@ -73,8 +74,11 @@ public: const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); bool requiresAccept(); - void setTimestamp(); - bool hasExpired() const; + + void setTimestamp(const boost::intrusive_ptr& e); + void setExpiryPolicy(const boost::intrusive_ptr& e); + bool hasExpired(); + sys::AbsTime getExpiration() const { return expiration; } framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -171,6 +175,7 @@ public: ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; + boost::intrusive_ptr expiryPolicy; static TransferAdapter TRANSFER; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index f9f75679e5..13a8c649d2 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. if (msg->isA()) { - if (!msg->hasProperties() || - msg->getProperties()->getExchange().empty()) + // Do not replace the delivery-properties.exchange if it is is already set. + // This is used internally (by the cluster) to force the exchange name on a message. + // The client library ensures this is always empty for messages from normal clients. + if (!msg->hasProperties() || msg->getProperties()->getExchange().empty()) msg->getProperties()->setExchange(exchangeName); - msg->setTimestamp(); + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 8e6ece11cc..be04eebc57 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + frameId(0), initialized(false), state(INIT), lastSize(0), @@ -134,6 +137,7 @@ void Cluster::initialize() { myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); @@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) { // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + const_cast(e.frame).setClusterId(frameId++); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); if (e.isCluster()) { // Cluster control frame @@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); - ClusterMap::Set members = map.getAlive(); - members.erase(myId); - myElders = members; + elders = map.getAlive(); + elders.erase(myId); broker.getLinks().setPassive(true); } } else if (state >= READY && memberChange) { memberUpdate(l); - myElders = ClusterMap::intersection(myElders, map.getAlive()); - if (myElders.empty()) { + elders = ClusterMap::intersection(elders, map.getAlive()); + if (elders.empty()) { //assume we are oldest, reactive links if necessary broker.getLinks().setPassive(false); } } } +bool Cluster::isLeader() const { return elders.empty(); } + void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, connections.values(), - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { Lock l(lock); updatedMap = m; + frameId = fid; checkUpdateIn(l); } @@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) { QPID_LOG(debug, *this << " cluster-id = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index f7955aa743..4d994943f7 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -31,6 +31,7 @@ #include "Quorum.h" #include "Decoder.h" #include "PollableQueue.h" +#include "ExpiryPolicy.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&); + void updateInDone(const ClusterMap&, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } + + bool isLeader() const; // Called in deliver thread. private: typedef sys::Monitor::ScopedLock Lock; @@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; - ClusterMap::Set myElders; qpid::management::ManagementAgent* mAgent; // Thread safe members @@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr failoverExchange; Quorum quorum; - // Called only from event delivery thread + // Used only in delivery thread Decoder decoder; + ClusterMap::Set elders; + boost::intrusive_ptr expiryPolicy; + uint64_t frameId; // Used only during initialization bool initialized; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2f7d12dcfe..d54d8389e0 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,7 +54,7 @@ struct ClusterValues { bool quorum; size_t readMax, writeEstimate; - ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {} + ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 9ea79fa2b6..295705e967 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } - else if (body.type() == HEADER_BODY) { - const DeliveryProperties* dp = static_cast(body).get(); - if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; - } if (!message.empty()) connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); @@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members), frameId); self.second = 0; // Mark this as completed update connection. } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 160855dc2d..a6e9aa65f9 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -119,7 +119,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp new file mode 100644 index 0000000000..690acfc3ad --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ExpiryPolicy.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Timer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +ExpiryPolicy::ExpiryPolicy(const boost::function & f, Multicaster& m, const MemberId& id, broker::Timer& t) + : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} + +namespace { +uint64_t clusterId(const broker::Message& m) { + assert(m.getFrames().begin() != m.getFrames().end()); + return m.getFrames().begin()->getClusterId(); +} + +struct ExpiryTask : public broker::TimerTask { + ExpiryTask(const boost::intrusive_ptr& policy, uint64_t id, sys::AbsTime when) + : TimerTask(when), expiryPolicy(policy), messageId(id) {} + void fire() { expiryPolicy->sendExpire(messageId); } + boost::intrusive_ptr expiryPolicy; + const uint64_t messageId; +}; +} + +void ExpiryPolicy::willExpire(broker::Message& m) { + timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); +} + +bool ExpiryPolicy::hasExpired(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + IdSet::iterator i = expired.find(clusterId(m)); + if (i != expired.end()) { + expired.erase(i); + const_cast(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; + return true; + } + return false; +} + +void ExpiryPolicy::sendExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + if (isLeader()) + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); +} + +void ExpiryPolicy::deliverExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expired.insert(id); +} + +bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } +void ExpiryPolicy::Expired::willExpire(broker::Message&) { } + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h new file mode 100644 index 0000000000..7fb63c731e --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLUSTER_EXPIRYPOLICY_H +#define QPID_CLUSTER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/sys/Mutex.h" +#include +#include +#include + +namespace qpid { + +namespace broker { class Timer; } + +namespace cluster { +class Multicaster; + +/** + * Cluster expiry policy + */ +class ExpiryPolicy : public broker::ExpiryPolicy +{ + public: + ExpiryPolicy(const boost::function & isLeader, Multicaster&, const MemberId&, broker::Timer&); + + void willExpire(broker::Message&); + + bool hasExpired(broker::Message&); + + // Send expiration notice to cluster. + void sendExpire(uint64_t); + + // Cluster delivers expiry notice. + void deliverExpire(uint64_t); + + private: + sys::Mutex lock; + typedef std::set IdSet; + + struct Expired : public broker::ExpiryPolicy { + bool hasExpired(broker::Message&); + void willExpire(broker::Message&); + }; + + IdSet expired; + boost::intrusive_ptr expiredPolicy; + boost::function isLeader; + Multicaster& mcast; + MemberId memberId; + broker::Timer& timer; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/ diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 91d4c6d3ce..e50c936b50 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, - const boost::function& ok, - const boost::function& fail) - : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::Connections& cons, + const boost::function& ok, + const boost::function& fail) + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), + frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -120,6 +122,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 93dca9f0c6..0819eb4cdb 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, const std::vector >& , - const boost::function& done, - const boost::function& fail); + broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + const std::vector >& , + const boost::function& done, + const boost::function& fail); ~UpdateClient(); void update(); @@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t frameId; std::vector > connections; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 02a1ea4622..028d0c1d8a 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -92,6 +92,9 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp /** Must point to at least DECODE_SIZE_MIN bytes of data */ static uint16_t decodeSize(char* data); + uint64_t getClusterId() const { return clusterId; } + void setClusterId(uint64_t id) { clusterId = id; } + private: void init(); @@ -103,6 +106,7 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp bool bos : 1; bool eos : 1; mutable uint32_t encodedSizeCache; + uint64_t clusterId; // Used to identify frames in a clustered broekr. }; std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h index 82987910a7..b13ca16e97 100644 --- a/qpid/cpp/src/qpid/framing/FrameSet.h +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -49,6 +49,7 @@ public: bool isComplete() const; uint64_t getContentSize() const; + void getContent(std::string&) const; std::string getContent() const; @@ -73,6 +74,9 @@ public: return header ? header->get() : 0; } + Frames::const_iterator begin() const { return parts.begin(); } + Frames::const_iterator end() const { return parts.end(); } + const SequenceNumber& getId() const { return id; } template void remove(P predicate) { diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 079b9b0ba6..5a93533755 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include @@ -491,7 +492,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt } else { if (evenTtl) m->getProperties()->setTtl(evenTtl); } - m->setTimestamp(); + m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } } diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 15a96aeba9..14b7659b65 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -37,6 +37,7 @@ #include #include +#include #include #include @@ -51,22 +52,23 @@ template ostream& operator<<(ostream& o, const std::set& s) { return seqPrint(o, s); } } - -QPID_AUTO_TEST_SUITE(cluster) +QPID_AUTO_TEST_SUITE(cluster_test) using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; +using namespace boost::assign; +using broker::Broker; using boost::shared_ptr; -using qpid::cluster::Cluster; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << qpid::cluster::Cpg::str(*n); + return o << cluster::Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -94,7 +96,7 @@ template std::set knownBrokerPorts(T& source, int n=-1) { BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); // Retry up to 10 secs in .1 second intervals. for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - ::usleep(1000*100); // 0.1 secs + sys::usleep(1000*100); // 0.1 secs urls = source.getKnownBrokers(); } } @@ -127,6 +129,45 @@ int64_t getMsgSequence(const Message& m) { return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } +Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) { + Message m(data, key); + m.getDeliveryProperties().setTtl(ttl); + return m; +} + +vector browse(Client& c, const std::string& q, int n) { + SubscriptionSettings browseSettings( + FlowControl::unlimited(), + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + c.subs.subscribe(lq, q, browseSettings); + vector result; + for (int i = 0; i < n; ++i) { + result.push_back(lq.get(TIMEOUT).getData()); + } + c.subs.getSubscription(q).cancel(); + return result; +} + +QPID_AUTO_TEST_CASE(testMessageTimeToLive) { + // Note: this doesn't actually test for cluster race conditions around TTL, + // it just verifies that basic TTL functionality works. + // + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); + c0.session.messageTransfer(arg::content=Message("b", "q")); + BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of("a")("b")); + sys::usleep(300*1000); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of("b")); +} + QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. ClusterFixture cluster(1); @@ -138,13 +179,13 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC))); - BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { @@ -160,14 +201,14 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { commitSession.txSelect(); commitSession.messageTransfer(arg::content=Message("a", "q")); commitSession.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A"); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); rollbackSession.messageTransfer(arg::content=Message("1", "q")); - Message rollbackMessage = rollbackSubs.get("q", TIME_SEC); + Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); @@ -191,10 +232,10 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // Verify queue status: just the comitted messages and dequeues should remain. BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); } QPID_AUTO_TEST_CASE(testUnacked) { @@ -210,7 +251,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("11","q1")); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); - BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue // Create unacked message: not acquired, accepted or completeed. @@ -220,12 +261,12 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("22","q2")); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); - m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue + m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(m.getData(), "21"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed c0.subs.getSubscription("q2").acquire(m); // Acquire manually BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed - BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. // Create empty credit record: acquire and accept but don't complete. @@ -235,7 +276,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("32", "q3")); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); - Message m31=q3.get(TIME_SEC); + Message m31=q3.get(TIMEOUT); BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); @@ -251,7 +292,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Complete the empty credit message, should unblock the message behind it. BOOST_CHECK_THROW(q3.get(0), Exception); c0.session.markCompleted(SequenceSet(m31.getId()), true); - BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32"); + BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); @@ -260,9 +301,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); - BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); + BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); } QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { @@ -276,7 +317,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { c0.session.messageTransfer(arg::content=Message("1","q")); c0.session.messageTransfer(arg::content=Message("2","q")); Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); // New member, TX not comitted, c1 should see nothing. @@ -287,7 +328,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // After commit c1 shoudl see results of tx. c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. @@ -295,7 +336,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "3"); } @@ -318,7 +359,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // No reliable way to ensure the partial message has arrived // before we start the new broker, so we sleep. - ::usleep(2500); + sys::usleep(2500); cluster.add(); // Send final 2 frames of message. @@ -328,7 +369,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify message is enqued correctly on second member. Message m; Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); } @@ -391,20 +432,20 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); // Check second subscription's flow control: gets first message, not second. - BOOST_CHECK(lp.get(m, TIME_SEC)); + BOOST_CHECK(lp.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "ccc"); // Kill the subscribing member, ensure further messages are not removed. @@ -412,7 +453,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { c1.session.messageTransfer(arg::content=Message("xxx", "q")); - BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC)); + BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } @@ -426,7 +467,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); while (c0.session.queueQuery("q").getMessageCount() != 2) - ::usleep(1000); // Wait for message to show up on broker 0. + sys::usleep(1000); // Wait for message to show up on broker 0. // Add a new broker, it should catch up. cluster.add(); @@ -444,18 +485,18 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. cluster.add(); Client c2(cluster[2], "c2"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } @@ -488,9 +529,9 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { c0.session.close(); Client c1(cluster[1]); Message msg; - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } @@ -535,9 +576,9 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { // Check they arrived Message m; - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("foo", m.getData()); - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("bar", m.getData()); // Queue should be empty on all cluster members. diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index c114ef0151..6dc0c722dd 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -43,6 +43,10 @@ + + + + @@ -126,6 +130,7 @@ + > -- cgit v1.2.1