From 4cb595e5f5a3f68b4abb1e1aca1922269fa2b559 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 15 Jun 2011 20:15:51 +0000 Subject: QPID-3280: Performance problem with TTL messages. When sending a large number of messages with nonzero TTLs to a cluster, overall message throughput drops by around 20-30% compared to messages with TTL 0. The previous approach to TTL in the cluster is replaced with a simpler "cluster clock". Also QueueCleaner is executed in the cluster timer, and modified to be deterministic in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1136170 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/include/qpid/framing/FieldTable.h | 2 + qpid/cpp/src/CMakeLists.txt | 7 +- qpid/cpp/src/Makefile.am | 2 - qpid/cpp/src/qpid/broker/Broker.cpp | 3 +- qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp | 10 +- qpid/cpp/src/qpid/broker/ExpiryPolicy.h | 12 ++- qpid/cpp/src/qpid/broker/Message.cpp | 55 +++++----- qpid/cpp/src/qpid/broker/Message.h | 28 +++-- qpid/cpp/src/qpid/broker/Queue.cpp | 140 +++++++++++++----------- qpid/cpp/src/qpid/broker/Queue.h | 37 ++++--- qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 18 ++-- qpid/cpp/src/qpid/broker/QueueCleaner.h | 14 +-- qpid/cpp/src/qpid/broker/RateTracker.cpp | 51 --------- qpid/cpp/src/qpid/broker/RateTracker.h | 57 ---------- qpid/cpp/src/qpid/cluster/Cluster.cpp | 63 +++++++++-- qpid/cpp/src/qpid/cluster/Cluster.h | 14 ++- qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 + qpid/cpp/src/qpid/cluster/ClusterSettings.h | 3 +- qpid/cpp/src/qpid/cluster/ClusterTimer.cpp | 4 + qpid/cpp/src/qpid/cluster/Connection.cpp | 15 ++- qpid/cpp/src/qpid/cluster/Connection.h | 5 +- qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp | 95 +--------------- qpid/cpp/src/qpid/cluster/ExpiryPolicy.h | 42 +------- qpid/cpp/src/qpid/cluster/FailoverExchange.cpp | 26 +++-- qpid/cpp/src/qpid/cluster/FailoverExchange.h | 10 +- qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 91 +++++++++++----- qpid/cpp/src/qpid/cluster/UpdateClient.h | 13 ++- qpid/cpp/src/qpid/cluster/UpdateExchange.cpp | 27 ++++- qpid/cpp/src/qpid/framing/AMQHeaderBody.h | 12 ++- qpid/cpp/src/qpid/sys/Timer.cpp | 6 ++ qpid/cpp/src/qpid/sys/Timer.h | 8 +- qpid/cpp/src/tests/QueueTest.cpp | 4 +- qpid/cpp/src/tests/cluster_test_logs.py | 2 +- qpid/cpp/src/tests/cluster_tests.py | 143 ++++++++++++++++++++++++- qpid/cpp/xml/cluster.xml | 40 ++++--- 35 files changed, 583 insertions(+), 477 deletions(-) delete mode 100644 qpid/cpp/src/qpid/broker/RateTracker.cpp delete mode 100644 qpid/cpp/src/qpid/broker/RateTracker.h diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h index fed431129a..e8ec524863 100644 --- a/qpid/cpp/include/qpid/framing/FieldTable.h +++ b/qpid/cpp/include/qpid/framing/FieldTable.h @@ -65,6 +65,8 @@ class FieldTable QPID_COMMON_EXTERN void decode(Buffer& buffer); QPID_COMMON_EXTERN int count() const; + QPID_COMMON_EXTERN size_t size() const { return values.size(); } + QPID_COMMON_EXTERN bool empty() { return size() == 0; } QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value); QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const; QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; } diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 0fe2d7e4d0..11554b1034 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -96,7 +96,7 @@ MACRO (add_msvc_version_full verProject verProjectType verProjectFileExt verFN1 inherit_value ("winver_${verProject}_InternalName" "${verProject}") inherit_value ("winver_${verProject}_OriginalFilename" "${verProject}.${verProjectFileExt}") inherit_value ("winver_${verProject}_ProductName" "${winver_DESCRIPTION_SUMMARY}") - + # Create strings to be substituted into the template file set ("winverFileVersionBinary" "${winver_${verProject}_FileVersionBinary}") set ("winverProductVersionBinary" "${winver_${verProject}_ProductVersionBinary}") @@ -126,7 +126,7 @@ ENDMACRO (add_msvc_version_full) # MACRO (add_msvc_version verProject verProjectType verProjectFileExt) if (MSVC) - add_msvc_version_full (${verProject} + add_msvc_version_full (${verProject} ${verProjectType} ${verProjectFileExt} ${winver_FILE_VERSION_N1} @@ -656,7 +656,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) set (qpidd_platform_SOURCES windows/QpiddBroker.cpp ) - + set (qpidmessaging_platform_SOURCES qpid/messaging/HandleInstantiator.cpp ) @@ -997,7 +997,6 @@ set (qpidbroker_SOURCES qpid/broker/QueuePolicy.cpp qpid/broker/QueueRegistry.cpp qpid/broker/QueueFlowLimit.cpp - qpid/broker/RateTracker.cpp qpid/broker/RecoveryManagerImpl.cpp qpid/broker/RecoveredEnqueue.cpp qpid/broker/RecoveredDequeue.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 15021cc68b..608afb0660 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -616,8 +616,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ qpid/broker/RateFlowcontrol.h \ - qpid/broker/RateTracker.cpp \ - qpid/broker/RateTracker.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ qpid/broker/RecoverableMessage.h \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index ca3be5b567..f80e0f1e61 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -188,7 +188,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, timer), + queueCleaner(queues, &timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), inCluster(false), @@ -750,6 +750,7 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr t) { clusterTimer = t; + queueCleaner.setTimer(clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp index 64a12d918a..62cb3fc116 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,12 +27,12 @@ namespace broker { ExpiryPolicy::~ExpiryPolicy() {} -void ExpiryPolicy::willExpire(Message&) {} - bool ExpiryPolicy::hasExpired(Message& m) { return m.getExpiration() < sys::AbsTime::now(); } -void ExpiryPolicy::forget(Message&) {} +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return sys::AbsTime::now(); +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h index a723eb0aa8..2caf00ce00 100644 --- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,11 @@ #include "qpid/broker/BrokerImportExport.h" namespace qpid { + +namespace sys { +class AbsTime; +} + namespace broker { class Message; @@ -37,9 +42,8 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted { public: QPID_BROKER_EXTERN virtual ~ExpiryPolicy(); - QPID_BROKER_EXTERN virtual void willExpire(Message&); QPID_BROKER_EXTERN virtual bool hasExpired(Message&); - QPID_BROKER_EXTERN virtual void forget(Message&); + QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime(); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7545e4941f..d694d1eafd 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,25 +49,21 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), inCallback(false), requiredCredit(0), isManagementMessage(false) {} Message::Message(const Message& original) : PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(original.expiration), dequeueCallback(0), inCallback(false), requiredCredit(0) { setExpiryPolicy(original.expiryPolicy); } -Message::~Message() -{ - if (expiryPolicy) - expiryPolicy->forget(*this); -} +Message::~Message() {} void Message::forcePersistent() { @@ -87,7 +83,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -96,7 +92,7 @@ const boost::shared_ptr Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -196,7 +192,7 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter()); + frames.map_if(f, TypeFilter()); } //mark content loaded loaded = true; @@ -248,7 +244,7 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { intrusive_ptr pmsg(this); - + bool done = false; string& data = frame.castBody()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -273,7 +269,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -291,7 +287,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter()); + frames.map_if(f, TypeFilter()); } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -321,7 +317,7 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } @@ -358,13 +354,13 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr& e) +void Message::setTimestamp(const boost::intrusive_ptr& e) { - DeliveryProperties* props = getProperties(); + DeliveryProperties* props = getProperties(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -373,10 +369,14 @@ void Message::setTimestamp(const boost::intrusive_ptr& e) time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); } - // Use higher resolution time for the internal expiry calculation. - Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits::max()));//Prevent overflow - expiration = AbsTime(AbsTime::now(), ttl); - setExpiryPolicy(e); + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration ttl(std::min(props->getTtl() * TIME_MSEC, + (uint64_t) std::numeric_limits::max())); + expiration = AbsTime(e->getCurrentTime(), ttl); + setExpiryPolicy(e); + } } } @@ -386,16 +386,17 @@ void Message::adjustTtl() if (props->getTtl()) { sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { - sys::Duration d(sys::AbsTime::now(), getExpiration()); - props->setTtl(int64_t(d) >= 1000000 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired + sys::AbsTime current( + expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + sys::Duration ttl(current, getExpiration()); + // convert from ns to ms; set to 1 if expired + props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); } } } void Message::setExpiryPolicy(const boost::intrusive_ptr& e) { expiryPolicy = e; - if (expiryPolicy) - expiryPolicy->willExpire(*this); } bool Message::hasExpired() diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index d85ee434db..e1d6c60a80 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,12 +34,12 @@ #include namespace qpid { - + namespace framing { class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; @@ -51,11 +51,11 @@ class ExpiryPolicy; class Message : public PersistableMessage { public: typedef boost::function&)> MessageCallback; - + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); - + uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -83,10 +83,11 @@ public: void setExpiryPolicy(const boost::intrusive_ptr& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } + void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); - framing::FrameSet& getFrames() { return frames; } - const framing::FrameSet& getFrames() const { return frames; } + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } template T* getProperties() { qpid::framing::AMQHeaderBody* p = frames.getHeaders(); @@ -103,6 +104,11 @@ public: return p->get(); } + template void eraseProperties() { + qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + p->erase(); + } + template const T* getMethod() const { return frames.as(); } @@ -135,7 +141,7 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - + void QPID_BROKER_EXTERN tryReleaseContent(); void releaseContent(); void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead @@ -149,10 +155,10 @@ public: bool isExcluded(const std::vector& excludes) const; void addTraceId(const std::string& id); - + void forcePersistent(); bool isForcedPersistent(); - + /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 789ad581f5..bacc612e11 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,7 +65,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr& msg){ if (policy.get()) policy->recoverEnqueued(msg); push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { @@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr& msg){ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; messages->reinsert(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; pop(); return CONSUMED; @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message +// Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (messages->next(c->position, msg)) { @@ -426,19 +426,25 @@ bool collect_if_expired(std::deque& expired, QueuedMessage& messa } } -void Queue::purgeExpired() +/** + *@param lapse: time since the last purgeExpired + */ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + int count = dequeueSincePurge.get(); + dequeueSincePurge -= count; + int seconds = int64_t(lapse)/sys::TIME_SEC; + if (seconds == 0 || count / seconds < 1) { std::deque expired; { Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), + boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -457,7 +463,7 @@ void Queue::purgeExpired() uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr dest) { Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 std::deque rerouteQueue; uint32_t count = 0; @@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages->empty()) { @@ -508,7 +514,7 @@ void Queue::pop() { assertClusterSafe(); messages->pop(); - ++dequeueTracker; + ++dequeueSincePurge; } void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ @@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ QueuedMessage removed; bool dequeueRequired = false; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + dequeueRequired = messages->push(qm, removed); listeners.populate(copy); enqueued(qm); @@ -599,7 +605,7 @@ void Queue::setLastNodeFailure() } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg, bool suppressPolicyCheck) { ScopedUse u(barrier); @@ -613,13 +619,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { //copy on write: take deep copy of message before modifying it //as the frames may already be available for delivery on other @@ -649,10 +655,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr& msg void Queue::enqueueAborted(boost::intrusive_ptr msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + if (policy.get()) policy->enqueueAborted(msg); } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); @@ -661,7 +667,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -682,7 +688,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -737,8 +743,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri return v->get(); } else if (v->convertsTo()){ std::string s = v->get(); - try { - return boost::lexical_cast(s); + try { + return boost::lexical_cast(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); return 0; @@ -762,7 +768,7 @@ void Queue::configureImpl(const FieldTable& _settings) broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); } - if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); @@ -800,7 +806,7 @@ void Queue::configureImpl(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); } } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); @@ -809,15 +815,15 @@ void Queue::configureImpl(const FieldTable& _settings) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo()) insertSequenceNumbers(p->get()); autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); - if (autoDeleteTimeout) - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -881,9 +887,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -897,11 +903,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); @@ -954,7 +960,7 @@ boost::shared_ptr Queue::getAlternateExchange() void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->destroyed(); @@ -966,7 +972,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Broker& broker; Queue::shared_ptr queue; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} void fire() @@ -984,27 +990,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr(new AutoDeleteTask(broker, queue, time)); - broker.getClusterTimer().add(queue->autoDeleteTask); + broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { tryAutoDeleteImpl(broker, queue); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); @@ -1013,25 +1019,25 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -1197,6 +1203,10 @@ const Broker* Queue::getBroker() return broker; } +void Queue::setDequeueSincePurge(uint32_t value) { + dequeueSincePurge = AtomicValue(value); +} + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index c4f1bcc07e..8435e75cab 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,9 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" -#include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" @@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this, { Queue& parent; uint count; - + UsageBarrier(Queue&); bool acquire(); void release(); void destroy(); }; - + struct ScopedUse { UsageBarrier& barrier; @@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this, ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} ~ScopedUse() { if (acquired) barrier.release(); } }; - + typedef std::set< boost::shared_ptr > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this, boost::shared_ptr alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; + sys::AtomicValue dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; bool insertSeqNo; @@ -146,7 +146,7 @@ class Queue : public boost::enable_shared_from_this, void dequeued(const QueuedMessage& msg); void pop(); void popAndDequeue(); - QueuedMessage getFront(); + void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); @@ -184,8 +184,8 @@ class Queue : public boost::enable_shared_from_this, typedef std::vector vector; QPID_BROKER_EXTERN Queue(const std::string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); @@ -245,11 +245,11 @@ class Queue : public boost::enable_shared_from_this, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr dest=boost::shared_ptr()); //defaults to all messages - QPID_BROKER_EXTERN void purgeExpired(); + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr dest=boost::shared_ptr()); //defaults to all messages + QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -288,8 +288,8 @@ class Queue : public boost::enable_shared_from_this, * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void updateEnqueued(const QueuedMessage& msg); /** @@ -300,9 +300,9 @@ class Queue : public boost::enable_shared_from_this, * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); @@ -382,6 +382,9 @@ class Queue : public boost::enable_shared_from_this, void flush(); const Broker* getBroker(); + + uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } + void setDequeueSincePurge(uint32_t value); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index 3499ea8a4d..838bc28be8 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,7 +27,7 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {} QueueCleaner::~QueueCleaner() { @@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner() void QueueCleaner::start(qpid::sys::Duration p) { + period = p; task = new Task(*this, p); - timer.add(task); + timer->add(task); } +void QueueCleaner::setTimer(qpid::sys::Timer* timer) { + this->timer = timer; +} + + QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {} void QueueCleaner::Task::fire() @@ -65,9 +71,9 @@ void QueueCleaner::fired() std::vector copy; CollectQueues collect(©); queues.eachQueue(collect); - std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1)); + std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period)); task->setupNextFire(); - timer.add(task); + timer->add(task); } diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h index 11c2d180ac..ffebfe3e1b 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.h +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,14 +35,15 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer); QPID_BROKER_EXTERN ~QueueCleaner(); - QPID_BROKER_EXTERN void start(qpid::sys::Duration period); + QPID_BROKER_EXTERN void start(sys::Duration period); + QPID_BROKER_EXTERN void setTimer(sys::Timer* timer); private: class Task : public sys::TimerTask { public: - Task(QueueCleaner& parent, qpid::sys::Duration duration); + Task(QueueCleaner& parent, sys::Duration duration); void fire(); private: QueueCleaner& parent; @@ -50,7 +51,8 @@ class QueueCleaner boost::intrusive_ptr task; QueueRegistry& queues; - sys::Timer& timer; + sys::Timer* timer; + sys::Duration period; void fired(); }; diff --git a/qpid/cpp/src/qpid/broker/RateTracker.cpp b/qpid/cpp/src/qpid/broker/RateTracker.cpp deleted file mode 100644 index 048349b658..0000000000 --- a/qpid/cpp/src/qpid/broker/RateTracker.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/RateTracker.h" - -using qpid::sys::AbsTime; -using qpid::sys::Duration; -using qpid::sys::TIME_SEC; - -namespace qpid { -namespace broker { - -RateTracker::RateTracker() : currentCount(0), lastCount(0), lastTime(AbsTime::now()) {} - -RateTracker& RateTracker::operator++() -{ - ++currentCount; - return *this; -} - -double RateTracker::sampleRatePerSecond() -{ - int32_t increment = currentCount - lastCount; - AbsTime now = AbsTime::now(); - Duration interval(lastTime, now); - lastCount = currentCount; - lastTime = now; - //if sampling at higher frequency than supported, will just return the number of increments - if (interval < TIME_SEC) return increment; - else if (increment == 0) return 0; - else return increment / (interval / TIME_SEC); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/RateTracker.h b/qpid/cpp/src/qpid/broker/RateTracker.h deleted file mode 100644 index 0c20b37312..0000000000 --- a/qpid/cpp/src/qpid/broker/RateTracker.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QPID_BROKER_RATETRACKER_H -#define QPID_BROKER_RATETRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" - -namespace qpid { -namespace broker { - -/** - * Simple rate tracker: represents some value that can be incremented, - * then can periodcially sample the rate of increments. - */ -class RateTracker -{ - public: - RateTracker(); - /** - * Increments the count being tracked. Can be called concurrently - * with other calls to this operator as well as with calls to - * sampleRatePerSecond(). - */ - RateTracker& operator++(); - /** - * Returns the rate of increments per second since last - * called. Calls to this method should be serialised, but can be - * called concurrently with the increment operator - */ - double sampleRatePerSecond(); - private: - volatile int32_t currentCount; - int32_t lastCount; - qpid::sys::AbsTime lastTime; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_RATETRACKER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index ede8d96681..82ed8bf8c9 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -146,6 +146,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1097431; +const uint32_t Cluster::CLUSTER_VERSION = 1128070; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -230,7 +231,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -240,6 +240,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } + void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(*this)), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -668,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); + // Must be called *before* memberUpdate so first update will be generated. + failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -720,6 +723,20 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } +struct ClusterClockTask : public sys::TimerTask { + Cluster& cluster; + sys::Timer& timer; + + ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) + : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} + + void fire() { + cluster.sendClockUpdate(); + setupNextFire(); + timer.add(this); + } +}; + void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -727,6 +744,8 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); + + clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -847,7 +866,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes + // Updatee will call clusterUpdate() via checkUpdateIn() when update completes } } @@ -928,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // Must be called *after* memberUpdate() to avoid sending an extra update. + failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); @@ -1121,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1162,6 +1178,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } +sys::AbsTime Cluster::getClusterTime() { + Mutex::ScopedLock l(lock); + return clusterTime; +} + +// This method is called during update on the updatee to set the initial cluster time. +void Cluster::clock(const uint64_t time) { + Mutex::ScopedLock l(lock); + clock(time, l); +} + +// called when broadcast message received +void Cluster::clock(const uint64_t time, Lock&) { + clusterTime = AbsTime(EPOCH, time); + AbsTime now = AbsTime::now(); + + if (!elder) { + clusterTimeOffset = Duration(now, clusterTime); + } +} + +// called by elder timer to send clock broadcast +void Cluster::sendClockUpdate() { + Mutex::ScopedLock l(lock); + int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); + nanosecondsSinceEpoch += clusterTimeOffset; + mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); +} + bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr& msg) { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 78d325cdf9..adb06b2783 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -63,6 +63,12 @@ class AMQBody; struct Uuid; } +namespace sys { +class Timer; +class AbsTime; +class Duration; +} + namespace cluster { class Connection; @@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr& msg); + sys::AbsTime getClusterTime(); + void sendClockUpdate(); + void clock(const uint64_t time); private: typedef sys::Monitor::ScopedLock Lock; @@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& left, const std::string& joined, Lock& l); - void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); void deliverToQueue(const std::string& queue, const std::string& message, Lock&); + void clock(const uint64_t time, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); @@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; + sys::Timer clockTimer; + sys::AbsTime clusterTime; + sys::Duration clusterTimeOffset; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend struct ClusterDispatcher; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2962daaa07..69ba095f16 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -72,6 +72,7 @@ struct ClusterOptions : public Options { ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") + ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") ; } diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h index 8e708aa139..2f7b5be20a 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h +++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h @@ -35,8 +35,9 @@ struct ClusterSettings { size_t readMax; std::string username, password, mechanism; size_t size; + uint16_t clockInterval; - ClusterSettings() : quorum(false), readMax(10), size(1) + ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10) {} Url getUrl(uint16_t port) const { diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index f6e1c7a849..b4f7d00f38 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr task) if (i != map.end()) throw Exception(QPID_MSG("Task already exists with name " << task->getName())); map[task->getName()] = task; + // Only the elder actually activates the task with the Timer base class. if (cluster.isElder()) { QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); @@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const std::string& name) { else { intrusive_ptr t = i->second; map.erase(i); + // Move the nextFireTime so readyToFire() is true. This is to ensure we + // don't get an error if the fired task calls setupNextFire() + t->setFired(); Timer::fire(t); } } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index ab42122813..030d6e34c1 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -601,10 +601,6 @@ void Connection::queueObserverState(const std::string& qname, const std::string& QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); } -void Connection::expiryId(uint64_t id) { - cluster.getExpiryPolicy().setId(id); -} - std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; @@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks() { if (catchUp) getBrokerConnection()->doIoCallbacks(); } + +void Connection::clock(uint64_t time) { + QPID_LOG(debug, "Cluster connection received time update"); + cluster.clock(time); +} + +void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) { + boost::shared_ptr queue(findQueue(qname)); + queue->setDequeueSincePurge(dequeueSincePurge); +} + }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a0da9efbb8..a9740f97f8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -155,7 +155,6 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); - void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); @@ -192,6 +191,10 @@ class Connection : void doCatchupIoCallbacks(); + void clock(uint64_t time); + + void queueDequeueSincePurgeState(const std::string&, uint32_t); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index d9a7b0122a..0ef5c2a35d 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,106 +21,21 @@ #include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/cluster/Cluster.h" #include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} +ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {} -struct ExpiryTask : public sys::TimerTask { - ExpiryTask(const boost::intrusive_ptr& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} - void fire() { expiryPolicy->sendExpire(expiryId); } - boost::intrusive_ptr expiryPolicy; - const uint64_t expiryId; -}; - -// Called while receiving an update -void ExpiryPolicy::setId(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expiryId = id; -} - -// Called while giving an update -uint64_t ExpiryPolicy::getId() const { - sys::Mutex::ScopedLock l(lock); - return expiryId; -} - -// Called in enqueuing connection thread -void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id; - { - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - // - sys::Mutex::ScopedLock l(lock); - id = expiryId++; - if (!id) { // This is an update of an already-expired message. - m.setExpiryPolicy(expiredPolicy); - } - else { - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - // If this is an update, the id may already exist - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; - } - } - timer.add(new ExpiryTask(this, id, m.getExpiration())); -} - -// Called in dequeueing connection thread -void ExpiryPolicy::forget(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - assert(i != unexpiredByMessage.end()); - unexpiredById.erase(i->second); - unexpiredByMessage.erase(i); -} - -// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); -} - -// Called in timer thread -void ExpiryPolicy::sendExpire(uint64_t id) { - { - sys::Mutex::ScopedLock l(lock); - // Don't multicast an expiry notice if message is already forgotten. - if (unexpiredById.find(id) == unexpiredById.end()) return; - } - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + return m.getExpiration() < cluster.getClusterTime(); } -// Called in CPG deliver thread. -void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - std::pair expired = unexpiredById.equal_range(id); - IdMessageMap::iterator i = expired.first; - while (i != expired.second) { - i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; - unexpiredByMessage.erase(i->second); - unexpiredById.erase(i++); - } +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return cluster.getClusterTime(); } -// Called in update thread on the updater. -boost::optional ExpiryPolicy::getId(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - return i == unexpiredByMessage.end() ? boost::optional() : i->second; -} - -bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } -void ExpiryPolicy::Expired::willExpire(broker::Message&) { } - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index 77a656aa68..d8ddbca8b3 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -36,12 +36,8 @@ namespace broker { class Message; } -namespace sys { -class Timer; -} - namespace cluster { -class Multicaster; +class Cluster; /** * Cluster expiry policy @@ -49,43 +45,13 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); + ExpiryPolicy(Cluster& cluster); - void willExpire(broker::Message&); bool hasExpired(broker::Message&); - void forget(broker::Message&); - - // Send expiration notice to cluster. - void sendExpire(uint64_t); + qpid::sys::AbsTime getCurrentTime(); - // Cluster delivers expiry notice. - void deliverExpire(uint64_t); - - void setId(uint64_t id); - uint64_t getId() const; - - boost::optional getId(broker::Message&); - private: - typedef std::map MessageIdMap; - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - typedef std::multimap IdMessageMap; - - struct Expired : public broker::ExpiryPolicy { - bool hasExpired(broker::Message&); - void willExpire(broker::Message&); - }; - - mutable sys::Mutex lock; - MessageIdMap unexpiredByMessage; - IdMessageMap unexpiredById; - uint64_t expiryId; - boost::intrusive_ptr expiredPolicy; - Multicaster& mcast; - MemberId memberId; - sys::Timer& timer; + Cluster& cluster; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp index 84232dac1b..cfbe34a460 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,8 +39,10 @@ using namespace broker; using namespace framing; const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) + : Exchange(typeName, parent, b ), ready(false) +{ if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector& u) { void FailoverExchange::updateUrls(const vector& u) { Lock l(lock); urls=u; - if (urls.empty()) return; - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); + if (ready && !urls.empty()) { + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); + } } string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); - sendUpdate(queue); + if (ready) sendUpdate(queue); return queues.insert(queue).second; } @@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr msg(new Message); @@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { header.get(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); + msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } +void FailoverExchange::setReady() { + ready = true; +} }} // namespace cluster diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h index 2e1edfc0ae..c3e50c6929 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,6 +46,8 @@ class FailoverExchange : public broker::Exchange void setUrls(const std::vector&); /** Set the URLs and send an update.*/ void updateUrls(const std::vector&); + /** Flag the failover exchange as ready to generate updates (caught up) */ + void setReady(); // Exchange overrides std::string getType() const; @@ -56,7 +58,7 @@ class FailoverExchange : public broker::Exchange private: void sendUpdate(const boost::shared_ptr&); - + typedef sys::Mutex::ScopedLock Lock; typedef std::vector Urls; typedef std::set > Queues; @@ -64,7 +66,7 @@ class FailoverExchange : public broker::Exchange sys::Mutex lock; Urls urls; Queues queues; - + bool ready; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index a15c14ff48..77448789db 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" @@ -83,11 +83,20 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); +// Name for header used to carry expiration information. +const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration"; +// Headers used to flag headers/properties added by the UpdateClient so they can be +// removed on the other side. +const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props"; +const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers"; + std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { return o << "cluster(" << c.updaterId << " UPDATER)"; } -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr connection; @@ -121,7 +130,7 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function& ok, const boost::function& fail, @@ -135,9 +144,6 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.cluster-update"); - void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); @@ -155,6 +161,13 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; + if(b.getExpiryPolicy()) { + QPID_LOG(debug, *this << "Updating updatee with cluster time"); + qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime(); + int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime); + ClusterConnectionProxy(session).clock(time); + } + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -174,7 +187,6 @@ void UpdateClient::update() { // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); - ClusterConnectionProxy(session).expiryId(expiry.getId()); updateLinks(); updateManagementAgent(); @@ -188,7 +200,7 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the @@ -280,7 +292,7 @@ class MessageUpdater { framing::SequenceNumber lastPos; client::AsyncSession session; ExpiryPolicy& expiry; - + public: MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { @@ -297,7 +309,6 @@ class MessageUpdater { } } - void updateQueuedMessage(const broker::QueuedMessage& message) { // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { @@ -306,10 +317,23 @@ class MessageUpdater { } lastPos = message.position; - // Send the expiry ID if necessary. - if (message.payload->getProperties()->getTtl()) { - boost::optional expiryId = expiry.getId(*message.payload); - ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); + // if the ttl > 0, we need to send the calculated expiration time to the updatee + if (message.payload->getProperties()->getTtl() > 0) { + bool hadMessageProps = + message.payload->hasProperties(); + framing::MessageProperties* mprops = + message.payload->getProperties(); + bool hadApplicationHeaders = mprops->hasApplicationHeaders(); + FieldTable& applicationHeaders = mprops->getApplicationHeaders(); + applicationHeaders.setInt64( + UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); + // If message properties or application headers didn't exist + // prior to us adding data, we want to remove them on the other side. + if (!hadMessageProps) + applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + else if (!hadApplicationHeaders) + applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, @@ -322,7 +346,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( *message.payload->getFrames().as()); transfer.setDestination(UpdateClient::UPDATE); - + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ @@ -330,12 +354,21 @@ class MessageUpdater { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } + // If the ttl > 0, we need to send the calculated expiration time to the updatee + // Careful not to alter the message as a side effect e.g. by adding + // an empty DeliveryProperties or setting TTL when it wasn't set before. + uint64_t ttl = 0; + if (message.payload->hasProperties()) { + DeliveryProperties* dprops = + message.payload->getProperties(); + if (dprops->hasTtl()) ttl = dprops->getTtl(); + }; } void updateMessage(const boost::intrusive_ptr& message) { @@ -361,6 +394,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); } + + ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr& q) { @@ -393,7 +428,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr& upda QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); - + // Send the management ID first on the main connection. std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); @@ -430,7 +465,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating session " << ss->getId()); - // Create a client session to update session state. + // Create a client session to update session state. boost::shared_ptr cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); simpl->disableAutoDetach(); @@ -456,12 +491,12 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) + if (inProgress) --received; // Sync the session to ensure all responses from broker have been processed. shadowSession.sync(); - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -511,7 +546,7 @@ void UpdateClient::updateConsumer( QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } - + void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the @@ -543,7 +578,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { void operator()(const broker::DtxAck& ) { throw InternalErrorException("DTX transactions not currently supported by cluster."); } - + void operator()(const broker::RecoveredDequeue& rdeq) { updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); @@ -563,7 +598,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { typedef std::list QueueList; const QueueList& qlist = txPub.getQueues(); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); proxy.txPublish(qarray, txPub.delivered); } @@ -573,7 +608,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { client::AsyncSession session; ClusterConnectionProxy proxy; }; - + void UpdateClient::updateTxState(broker::SemanticState& s) { QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index b72d090d73..21bf6024e0 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,14 +69,19 @@ class ExpiryPolicy; class UpdateClient : public sys::Runnable { public: static const std::string UPDATE; // Name for special update queue and exchange. + static const std::string X_QPID_EXPIRATION; // Update message expiration + // Flag to remove props/headers that were added by the UpdateClient + static const std::string X_QPID_NO_MESSAGE_PROPS; + static const std::string X_QPID_NO_HEADERS; + static client::Connection catchUpConnection(); - + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, const std::vector >&, Decoder&, const boost::function& done, const boost::function& fail, - const client::ConnectionSettings& + const client::ConnectionSettings& ); ~UpdateClient(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp index 11937f296f..e830459aba 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ * */ #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" #include "UpdateExchange.h" @@ -27,6 +28,8 @@ namespace cluster { using framing::MessageTransferBody; using framing::DeliveryProperties; +using framing::MessageProperties; +using framing::FieldTable; UpdateExchange::UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), @@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent) void UpdateExchange::setProperties(const boost::intrusive_ptr& msg) { + // Copy exchange name to destination property. MessageTransferBody* transfer = msg->getMethod(); assert(transfer); const DeliveryProperties* props = msg->getProperties(); @@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr& transfer->setDestination(props->getExchange()); else transfer->clearDestinationFlag(); -} + // Copy expiration from x-property if present. + if (msg->hasProperties()) { + MessageProperties* mprops = msg->getProperties(); + if (mprops->hasApplicationHeaders()) { + FieldTable& headers = mprops->getApplicationHeaders(); + if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { + msg->setExpiration( + sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); + headers.erase(UpdateClient::X_QPID_EXPIRATION); + // Erase props/headers that were added by the UpdateClient + if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) + msg->eraseProperties(); + else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) + mprops->clearApplicationHeadersFlag(); + } + } + } +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h index a8c326969a..452154eb5c 100644 --- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,7 +58,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody } else return Base::decode(buffer, size, type); - } + } void print(std::ostream& out) const { const boost::optional& p=this->OptProps::props; if (p) out << *p; @@ -77,7 +77,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody typedef PropSet, MessageProperties> Properties; Properties properties; - + public: inline uint8_t type() const { return HEADER_BODY; } @@ -99,6 +99,10 @@ public: return properties.OptProps::props.get_ptr(); } + template void erase() { + properties.OptProps::props.reset(); + } + boost::intrusive_ptr clone() const { return BodyFactory::copy(*this); } }; diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp index fdb2e8c6bb..47752e4584 100644 --- a/qpid/cpp/src/qpid/sys/Timer.cpp +++ b/qpid/cpp/src/qpid/sys/Timer.cpp @@ -75,6 +75,12 @@ void TimerTask::cancel() { cancelled = true; } +void TimerTask::setFired() { + // Set nextFireTime to just before now, making readyToFire() true. + nextFireTime = AbsTime(sys::now(), Duration(-1)); +} + + Timer::Timer() : active(false), late(50 * TIME_MSEC), diff --git a/qpid/cpp/src/qpid/sys/Timer.h b/qpid/cpp/src/qpid/sys/Timer.h index 98ba39ce38..fccb17dbc2 100644 --- a/qpid/cpp/src/qpid/sys/Timer.h +++ b/qpid/cpp/src/qpid/sys/Timer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -64,6 +64,10 @@ class TimerTask : public RefCounted { std::string getName() const { return name; } + // Move the nextFireTime so readyToFire is true. + // Used by the cluster, where tasks are fired on cluster events, not on local time. + QPID_COMMON_EXTERN void setFired(); + protected: // Must be overridden with callback virtual void fire() = 0; diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 34e4592a15..6fdc4c69ad 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) { addMessagesToQueue(10, queue); BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); ::usleep(300*1000); - queue.purgeExpired(); + queue.purgeExpired(0); BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); } @@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - QueueCleaner cleaner(queues, timer); + QueueCleaner cleaner(queues, &timer); cleaner.start(100 * qpid::sys::TIME_MSEC); ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index 9f7d1e2f6c..a0ce8fb9c3 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -54,7 +54,7 @@ def filter_log(log): 'caught up', 'active for links|Passivating links|Activating links', 'info Connection.* connected to', # UpdateClient connection - 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection + 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection 'warning Broker closed connection: 200, OK', 'task late', 'task overran', diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 66cfd27f7b..f5ce2a234f 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,11 +18,12 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs +import os, signal, sys, time, imp, re, subprocess, glob, random, logging +import cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED +from qpid.messaging import Message, Empty, Disposition, REJECTED, util from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain @@ -96,9 +97,15 @@ class ShortTests(BrokerTest): destination="amq.direct", message=qpid.datatypes.Message(props, "content")) + # Try message with TTL and differnet headers/properties + cluster[0].send_message("q", Message(durable=True, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000)) + # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) assert readfile("direct.dump") == readfile("updatee.dump") + os.remove("direct.dump") os.remove("updatee.dump") @@ -687,6 +694,25 @@ acl allow all all self.assert_browse(s1, "q", ["foo"]) + def test_ttl_consistent(self): + """Ensure we don't get inconsistent errors with message that have TTL very close together""" + messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)] + messages.append(Message("x")) + cluster = self.cluster(2) + sender = cluster[0].connect().session().sender("q;{create:always}") + + def fetch(b): + receiver = b.connect().session().receiver("q;{create:always}") + while receiver.fetch().content != "x": pass + + for m in messages: sender.send(m, sync=False) + for m in messages: sender.send(m, sync=False) + fetch(cluster[0]) + fetch(cluster[1]) + for m in messages: sender.send(m, sync=False) + cluster.start() + fetch(cluster[2]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION= is set""" def duration(self): @@ -811,7 +837,7 @@ class LongTests(BrokerTest): endtime = time.time() + self.duration() # For long duration, first run is a quarter of the duration. - runtime = max(5, self.duration() / 4.0) + runtime = min(5.0, self.duration() / 3.0) alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) @@ -853,7 +879,7 @@ class LongTests(BrokerTest): end = time.time() + self.duration() while (time.time() < end): # Get a management interval for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() + cluster_test_logs.verify_logs() def test_flowlimit_failover(self): """Test fail-over during continuous send-receive with flow control @@ -880,6 +906,7 @@ class LongTests(BrokerTest): while time.time() < endtime: for s in senders: s.sender.assert_running() receiver.receiver.assert_running() + for b in cluster[i:]: b.ready() # Check if any broker crashed. cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) @@ -889,6 +916,114 @@ class LongTests(BrokerTest): receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() + def test_ttl_failover(self): + """Test that messages with TTL don't cause problems in a cluster with failover""" + + class Client(StoppableThread): + + def __init__(self, broker): + StoppableThread.__init__(self) + self.connection = broker.connect(reconnect=True) + self.auto_fetch_reconnect_urls(self.connection) + self.session = self.connection.session() + + def auto_fetch_reconnect_urls(self, conn): + """Replacment for qpid.messaging.util version which is noisy""" + ssn = conn.session("auto-fetch-reconnect-urls") + rcv = ssn.receiver("amq.failover") + rcv.capacity = 10 + + def main(): + while True: + try: + msg = rcv.fetch() + qpid.messaging.util.set_reconnect_urls(conn, msg) + ssn.acknowledge(msg, sync=False) + except messaging.exceptions.LinkClosed: return + except messaging.exceptions.ConnectionError: return + + thread = Thread(name="auto-fetch-reconnect-urls", target=main) + thread.setDaemon(True) + thread.start() + + def stop(self): + StoppableThread.stop(self) + self.connection.detach() + + class Sender(Client): + def __init__(self, broker, address): + Client.__init__(self, broker) + self.sent = 0 # Number of messages _reliably_ sent. + self.sender = self.session.sender(address, capacity=1000) + + def send_counted(self, ttl): + self.sender.send(Message(str(self.sent), ttl=ttl)) + self.sent += 1 + + def run(self): + while not self.stopped: + choice = random.randint(0,4) + if choice == 0: self.send_counted(None) # No ttl + elif choice == 1: self.send_counted(100000) # Large ttl + else: # Small ttl, might expire + self.sender.send(Message("", ttl=random.random()/10)) + self.sender.send(Message("z"), sync=True) # Chaser. + + class Receiver(Client): + + def __init__(self, broker, address): + Client.__init__(self, broker) + self.received = 0 # Number of non-empty (reliable) messages received. + self.receiver = self.session.receiver(address, capacity=1000) + def run(self): + try: + while True: + m = self.receiver.fetch(1) + if m.content == "z": break + if m.content: # Ignore unreliable messages + # Ignore duplicates + if int(m.content) == self.received: self.received += 1 + except Exception,e: self.error = e + + # def test_ttl_failover + + # Original cluster will all be killed so expect exit with failure + # Set small purge interval. + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) + # Python client failover produces noisy WARN logs, disable temporarily + logger = logging.getLogger() + log_level = logger.getEffectiveLevel() + logger.setLevel(logging.ERROR) + try: + # Start sender and receiver threads + receiver = Receiver(cluster[0], "q;{create:always}") + receiver.start() + sender = Sender(cluster[0], "q;{create:always}") + sender.start() + + # Kill brokers in a cycle. + endtime = time.time() + self.duration() + runtime = min(5.0, self.duration() / 4.0) + i = 0 + while time.time() < endtime: + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + b.ready() + time.sleep(runtime) + sender.stop() + receiver.stop() + for b in cluster[i:]: + b.ready() # Check it didn't crash + b.kill() + self.assertEqual(sender.sent, receiver.received) + cluster_test_logs.verify_logs() + finally: + # Detach to avoid slow reconnect attempts during shut-down if test fails. + sender.connection.detach() + receiver.connection.detach() + logger.setLevel(log_level) class StoreTests(BrokerTest): """ diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 4d83c5b5de..c33f2e4852 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -8,9 +8,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at -- +- - http://www.apache.org/licenses/LICENSE-2.0 -- +- - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -78,10 +78,6 @@ - - - - @@ -89,7 +85,7 @@ - + @@ -116,6 +112,11 @@ + + + + + @@ -149,7 +150,7 @@ - + + @@ -192,9 +193,9 @@ - + - + @@ -204,7 +205,7 @@ - + @@ -253,9 +254,6 @@ - - - @@ -289,6 +287,18 @@ + + + + + + + + + + + + -- cgit v1.2.1