summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
committerAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
commit4cb595e5f5a3f68b4abb1e1aca1922269fa2b559 (patch)
tree6d2f930af52b5f4aa1e035e2b5cb48ee5182da4b
parent888505626d5fd5b8eee141017617d894eb0cc16f (diff)
downloadqpid-python-4cb595e5f5a3f68b4abb1e1aca1922269fa2b559.tar.gz
QPID-3280: Performance problem with TTL messages.
When sending a large number of messages with nonzero TTLs to a cluster, overall message throughput drops by around 20-30% compared to messages with TTL 0. The previous approach to TTL in the cluster is replaced with a simpler "cluster clock". Also QueueCleaner is executed in the cluster timer, and modified to be deterministic in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1136170 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/framing/FieldTable.h2
-rw-r--r--qpid/cpp/src/CMakeLists.txt7
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/ExpiryPolicy.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp55
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h28
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp140
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h37
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/QueueCleaner.h14
-rw-r--r--qpid/cpp/src/qpid/broker/RateTracker.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/RateTracker.h57
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp63
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h14
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterSettings.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp95
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h42
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.cpp26
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp91
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h13
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateExchange.cpp27
-rw-r--r--qpid/cpp/src/qpid/framing/AMQHeaderBody.h12
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.h8
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp4
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py143
-rw-r--r--qpid/cpp/xml/cluster.xml40
35 files changed, 583 insertions, 477 deletions
diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h
index fed431129a..e8ec524863 100644
--- a/qpid/cpp/include/qpid/framing/FieldTable.h
+++ b/qpid/cpp/include/qpid/framing/FieldTable.h
@@ -65,6 +65,8 @@ class FieldTable
QPID_COMMON_EXTERN void decode(Buffer& buffer);
QPID_COMMON_EXTERN int count() const;
+ QPID_COMMON_EXTERN size_t size() const { return values.size(); }
+ QPID_COMMON_EXTERN bool empty() { return size() == 0; }
QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value);
QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const;
QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; }
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 0fe2d7e4d0..11554b1034 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -96,7 +96,7 @@ MACRO (add_msvc_version_full verProject verProjectType verProjectFileExt verFN1
inherit_value ("winver_${verProject}_InternalName" "${verProject}")
inherit_value ("winver_${verProject}_OriginalFilename" "${verProject}.${verProjectFileExt}")
inherit_value ("winver_${verProject}_ProductName" "${winver_DESCRIPTION_SUMMARY}")
-
+
# Create strings to be substituted into the template file
set ("winverFileVersionBinary" "${winver_${verProject}_FileVersionBinary}")
set ("winverProductVersionBinary" "${winver_${verProject}_ProductVersionBinary}")
@@ -126,7 +126,7 @@ ENDMACRO (add_msvc_version_full)
#
MACRO (add_msvc_version verProject verProjectType verProjectFileExt)
if (MSVC)
- add_msvc_version_full (${verProject}
+ add_msvc_version_full (${verProject}
${verProjectType}
${verProjectFileExt}
${winver_FILE_VERSION_N1}
@@ -656,7 +656,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
set (qpidd_platform_SOURCES
windows/QpiddBroker.cpp
)
-
+
set (qpidmessaging_platform_SOURCES
qpid/messaging/HandleInstantiator.cpp
)
@@ -997,7 +997,6 @@ set (qpidbroker_SOURCES
qpid/broker/QueuePolicy.cpp
qpid/broker/QueueRegistry.cpp
qpid/broker/QueueFlowLimit.cpp
- qpid/broker/RateTracker.cpp
qpid/broker/RecoveryManagerImpl.cpp
qpid/broker/RecoveredEnqueue.cpp
qpid/broker/RecoveredDequeue.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 15021cc68b..608afb0660 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -616,8 +616,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
qpid/broker/RateFlowcontrol.h \
- qpid/broker/RateTracker.cpp \
- qpid/broker/RateTracker.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
qpid/broker/RecoverableMessage.h \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index ca3be5b567..f80e0f1e61 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -188,7 +188,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, timer),
+ queueCleaner(queues, &timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
inCluster(false),
@@ -750,6 +750,7 @@ bool Broker::deferDeliveryImpl(const std::string& ,
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
+ queueCleaner.setTimer(clusterTimer.get());
}
const std::string Broker::TCP_TRANSPORT("tcp");
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
index 64a12d918a..62cb3fc116 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,12 +27,12 @@ namespace broker {
ExpiryPolicy::~ExpiryPolicy() {}
-void ExpiryPolicy::willExpire(Message&) {}
-
bool ExpiryPolicy::hasExpired(Message& m) {
return m.getExpiration() < sys::AbsTime::now();
}
-void ExpiryPolicy::forget(Message&) {}
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+ return sys::AbsTime::now();
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
index a723eb0aa8..2caf00ce00 100644
--- a/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,11 @@
#include "qpid/broker/BrokerImportExport.h"
namespace qpid {
+
+namespace sys {
+class AbsTime;
+}
+
namespace broker {
class Message;
@@ -37,9 +42,8 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
{
public:
QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
- QPID_BROKER_EXTERN virtual void willExpire(Message&);
QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
- QPID_BROKER_EXTERN virtual void forget(Message&);
+ QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7545e4941f..d694d1eafd 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,25 +49,21 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), dequeueCallback(0),
inCallback(false), requiredCredit(0), isManagementMessage(false)
{}
Message::Message(const Message& original) :
PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(original.expiration), dequeueCallback(0),
inCallback(false), requiredCredit(0)
{
setExpiryPolicy(original.expiryPolicy);
}
-Message::~Message()
-{
- if (expiryPolicy)
- expiryPolicy->forget(*this);
-}
+Message::~Message() {}
void Message::forcePersistent()
{
@@ -87,7 +83,7 @@ std::string Message::getRoutingKey() const
return getAdapter().getRoutingKey(frames);
}
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -96,7 +92,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr
{
if (!exchange) {
exchange = registry.get(getExchangeName());
- }
+ }
return exchange;
}
@@ -196,7 +192,7 @@ void Message::decodeContent(framing::Buffer& buffer)
} else {
//adjust header flags
MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
@@ -248,7 +244,7 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
intrusive_ptr<const PersistableMessage> pmsg(this);
-
+
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -273,7 +269,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
@@ -291,7 +287,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -321,7 +317,7 @@ bool Message::isContentLoaded() const
}
-namespace
+namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
@@ -358,13 +354,13 @@ void Message::addTraceId(const std::string& id)
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
- }
+ }
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -373,10 +369,14 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
}
- // Use higher resolution time for the internal expiry calculation.
- Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow
- expiration = AbsTime(AbsTime::now(), ttl);
- setExpiryPolicy(e);
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration ttl(std::min(props->getTtl() * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), ttl);
+ setExpiryPolicy(e);
+ }
}
}
@@ -386,16 +386,17 @@ void Message::adjustTtl()
if (props->getTtl()) {
sys::Mutex::ScopedLock l(lock);
if (expiration < FAR_FUTURE) {
- sys::Duration d(sys::AbsTime::now(), getExpiration());
- props->setTtl(int64_t(d) >= 1000000 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
+ sys::AbsTime current(
+ expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ sys::Duration ttl(current, getExpiration());
+ // convert from ns to ms; set to 1 if expired
+ props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
}
}
}
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
- expiryPolicy->willExpire(*this);
}
bool Message::hasExpired()
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index d85ee434db..e1d6c60a80 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,12 +34,12 @@
#include <vector>
namespace qpid {
-
+
namespace framing {
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -51,11 +51,11 @@ class ExpiryPolicy;
class Message : public PersistableMessage {
public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-
+
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
QPID_BROKER_EXTERN Message(const Message&);
QPID_BROKER_EXTERN ~Message();
-
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -83,10 +83,11 @@ public:
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
+ void setExpiration(sys::AbsTime exp) { expiration = exp; }
void adjustTtl();
- framing::FrameSet& getFrames() { return frames; }
- const framing::FrameSet& getFrames() const { return frames; }
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
template <class T> T* getProperties() {
qpid::framing::AMQHeaderBody* p = frames.getHeaders();
@@ -103,6 +104,11 @@ public:
return p->get<T>();
}
+ template <class T> void eraseProperties() {
+ qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ p->erase<T>();
+ }
+
template <class T> const T* getMethod() const {
return frames.as<T>();
}
@@ -135,7 +141,7 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
+
void QPID_BROKER_EXTERN tryReleaseContent();
void releaseContent();
void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
@@ -149,10 +155,10 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
+
void forcePersistent();
bool isForcedPersistent();
-
+
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 789ad581f5..bacc612e11 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
if (policy.get()) policy->recoverEnqueued(msg);
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
@@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
messages->reinsert(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
pop();
return CONSUMED;
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
-// Find the next message
+// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
if (messages->next(c->position, msg)) {
@@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& messa
}
}
-void Queue::purgeExpired()
+/**
+ *@param lapse: time since the last purgeExpired
+ */
+void Queue::purgeExpired(sys::Duration lapse)
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
-
- if (dequeueTracker.sampleRatePerSecond() < 1) {
+ //attempt is less than one per second.
+ int count = dequeueSincePurge.get();
+ dequeueSincePurge -= count;
+ int seconds = int64_t(lapse)/sys::TIME_SEC;
+ if (seconds == 0 || count / seconds < 1) {
std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(),
+ boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -457,7 +463,7 @@ void Queue::purgeExpired()
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
{
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
@@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages->empty()) {
@@ -508,7 +514,7 @@ void Queue::pop()
{
assertClusterSafe();
messages->pop();
- ++dequeueTracker;
+ ++dequeueSincePurge;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
QueuedMessage removed;
bool dequeueRequired = false;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
enqueued(qm);
@@ -599,7 +605,7 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
@@ -613,13 +619,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
//copy on write: take deep copy of message before modifying it
//as the frames may already be available for delivery on other
@@ -649,10 +655,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg);
}
-// return true if store exists,
+// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
@@ -661,7 +667,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
+ if (!ctxt) {
dequeued(msg);
}
}
@@ -682,7 +688,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -737,8 +743,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
+ try {
+ return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
@@ -762,7 +768,7 @@ void Queue::configureImpl(const FieldTable& _settings)
broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
}
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
(!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -800,7 +806,7 @@ void Queue::configureImpl(const FieldTable& _settings)
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
}
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
@@ -809,15 +815,15 @@ void Queue::configureImpl(const FieldTable& _settings)
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -881,9 +887,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -897,11 +903,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -954,7 +960,7 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
@@ -966,7 +972,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask
Broker& broker;
Queue::shared_ptr queue;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
void fire()
@@ -984,27 +990,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue);
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -1013,25 +1019,25 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
@@ -1197,6 +1203,10 @@ const Broker* Queue::getBroker()
return broker;
}
+void Queue::setDequeueSincePurge(uint32_t value) {
+ dequeueSincePurge = AtomicValue<uint32_t>(value);
+}
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index c4f1bcc07e..8435e75cab 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
-#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
@@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
+ sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
bool insertSeqNo;
@@ -146,7 +146,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void dequeued(const QueuedMessage& msg);
void pop();
void popAndDequeue();
- QueuedMessage getFront();
+
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
@@ -184,8 +184,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
@@ -245,11 +245,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
- QPID_BROKER_EXTERN void purgeExpired();
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -288,8 +288,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -300,9 +300,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Gets the next available message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
@@ -382,6 +382,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
const Broker* getBroker();
+
+ uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
+ void setDequeueSincePurge(uint32_t value);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
index 3499ea8a4d..838bc28be8 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
QueueCleaner::~QueueCleaner()
{
@@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner()
void QueueCleaner::start(qpid::sys::Duration p)
{
+ period = p;
task = new Task(*this, p);
- timer.add(task);
+ timer->add(task);
}
+void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
+ this->timer = timer;
+}
+
+
QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
void QueueCleaner::Task::fire()
@@ -65,9 +71,9 @@ void QueueCleaner::fired()
std::vector<Queue::shared_ptr> copy;
CollectQueues collect(&copy);
queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
+ std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
task->setupNextFire();
- timer.add(task);
+ timer->add(task);
}
diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h
index 11c2d180ac..ffebfe3e1b 100644
--- a/qpid/cpp/src/qpid/broker/QueueCleaner.h
+++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,14 +35,15 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
QPID_BROKER_EXTERN ~QueueCleaner();
- QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
+ QPID_BROKER_EXTERN void start(sys::Duration period);
+ QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
private:
class Task : public sys::TimerTask
{
public:
- Task(QueueCleaner& parent, qpid::sys::Duration duration);
+ Task(QueueCleaner& parent, sys::Duration duration);
void fire();
private:
QueueCleaner& parent;
@@ -50,7 +51,8 @@ class QueueCleaner
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- sys::Timer& timer;
+ sys::Timer* timer;
+ sys::Duration period;
void fired();
};
diff --git a/qpid/cpp/src/qpid/broker/RateTracker.cpp b/qpid/cpp/src/qpid/broker/RateTracker.cpp
deleted file mode 100644
index 048349b658..0000000000
--- a/qpid/cpp/src/qpid/broker/RateTracker.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/RateTracker.h"
-
-using qpid::sys::AbsTime;
-using qpid::sys::Duration;
-using qpid::sys::TIME_SEC;
-
-namespace qpid {
-namespace broker {
-
-RateTracker::RateTracker() : currentCount(0), lastCount(0), lastTime(AbsTime::now()) {}
-
-RateTracker& RateTracker::operator++()
-{
- ++currentCount;
- return *this;
-}
-
-double RateTracker::sampleRatePerSecond()
-{
- int32_t increment = currentCount - lastCount;
- AbsTime now = AbsTime::now();
- Duration interval(lastTime, now);
- lastCount = currentCount;
- lastTime = now;
- //if sampling at higher frequency than supported, will just return the number of increments
- if (interval < TIME_SEC) return increment;
- else if (increment == 0) return 0;
- else return increment / (interval / TIME_SEC);
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/RateTracker.h b/qpid/cpp/src/qpid/broker/RateTracker.h
deleted file mode 100644
index 0c20b37312..0000000000
--- a/qpid/cpp/src/qpid/broker/RateTracker.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_RATETRACKER_H
-#define QPID_BROKER_RATETRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * Simple rate tracker: represents some value that can be incremented,
- * then can periodcially sample the rate of increments.
- */
-class RateTracker
-{
- public:
- RateTracker();
- /**
- * Increments the count being tracked. Can be called concurrently
- * with other calls to this operator as well as with calls to
- * sampleRatePerSecond().
- */
- RateTracker& operator++();
- /**
- * Returns the rate of increments per second since last
- * called. Calls to this method should be serialised, but can be
- * called concurrently with the increment operator
- */
- double sampleRatePerSecond();
- private:
- volatile int32_t currentCount;
- int32_t lastCount;
- qpid::sys::AbsTime lastTime;
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_RATETRACKER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index ede8d96681..82ed8bf8c9 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -146,6 +146,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1097431;
+const uint32_t Cluster::CLUSTER_VERSION = 1128070;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -230,7 +231,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
- void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
@@ -240,6 +240,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
+ void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(*this)),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -668,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) {
else { // I can go ready.
discarding = false;
setReady(l);
+ // Must be called *before* memberUpdate so first update will be generated.
+ failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -720,6 +723,20 @@ void Cluster::configChange(const MemberId&,
updateMgmtMembership(l); // Update on every config change for consistency
}
+struct ClusterClockTask : public sys::TimerTask {
+ Cluster& cluster;
+ sys::Timer& timer;
+
+ ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
+ : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
+
+ void fire() {
+ cluster.sendClockUpdate();
+ setupNextFire();
+ timer.add(this);
+ }
+};
+
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -727,6 +744,8 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
+
+ clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -847,7 +866,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate when update completes
+ // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
}
}
@@ -928,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
- failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // Must be called *after* memberUpdate() to avoid sending an extra update.
+ failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
@@ -1121,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
- expiryPolicy->deliverExpire(id);
-}
-
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1162,6 +1178,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag
q->deliver(msg);
}
+sys::AbsTime Cluster::getClusterTime() {
+ Mutex::ScopedLock l(lock);
+ return clusterTime;
+}
+
+// This method is called during update on the updatee to set the initial cluster time.
+void Cluster::clock(const uint64_t time) {
+ Mutex::ScopedLock l(lock);
+ clock(time, l);
+}
+
+// called when broadcast message received
+void Cluster::clock(const uint64_t time, Lock&) {
+ clusterTime = AbsTime(EPOCH, time);
+ AbsTime now = AbsTime::now();
+
+ if (!elder) {
+ clusterTimeOffset = Duration(now, clusterTime);
+ }
+}
+
+// called by elder timer to send clock broadcast
+void Cluster::sendClockUpdate() {
+ Mutex::ScopedLock l(lock);
+ int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
+ nanosecondsSinceEpoch += clusterTimeOffset;
+ mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
+}
+
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 78d325cdf9..adb06b2783 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -63,6 +63,12 @@ class AMQBody;
struct Uuid;
}
+namespace sys {
+class Timer;
+class AbsTime;
+class Duration;
+}
+
namespace cluster {
class Connection;
@@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg);
+ sys::AbsTime getClusterTime();
+ void sendClockUpdate();
+ void clock(const uint64_t time);
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string& left,
const std::string& joined,
Lock& l);
- void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
+ void clock(const uint64_t time, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
+ sys::Timer clockTimer;
+ sys::AbsTime clusterTime;
+ sys::Duration clusterTimeOffset;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend struct ClusterDispatcher;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 2962daaa07..69ba095f16 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -72,6 +72,7 @@ struct ClusterOptions : public Options {
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
+ ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
;
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
index 8e708aa139..2f7b5be20a 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
@@ -35,8 +35,9 @@ struct ClusterSettings {
size_t readMax;
std::string username, password, mechanism;
size_t size;
+ uint16_t clockInterval;
- ClusterSettings() : quorum(false), readMax(10), size(1)
+ ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
{}
Url getUrl(uint16_t port) const {
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index f6e1c7a849..b4f7d00f38 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task)
if (i != map.end())
throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
map[task->getName()] = task;
+
// Only the elder actually activates the task with the Timer base class.
if (cluster.isElder()) {
QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const std::string& name) {
else {
intrusive_ptr<TimerTask> t = i->second;
map.erase(i);
+ // Move the nextFireTime so readyToFire() is true. This is to ensure we
+ // don't get an error if the fired task calls setupNextFire()
+ t->setFired();
Timer::fire(t);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index ab42122813..030d6e34c1 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -601,10 +601,6 @@ void Connection::queueObserverState(const std::string& qname, const std::string&
QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
}
-void Connection::expiryId(uint64_t id) {
- cluster.getExpiryPolicy().setId(id);
-}
-
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
@@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks() {
if (catchUp) getBrokerConnection()->doIoCallbacks();
}
+
+void Connection::clock(uint64_t time) {
+ QPID_LOG(debug, "Cluster connection received time update");
+ cluster.clock(time);
+}
+
+void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ queue->setDequeueSincePurge(dequeueSincePurge);
+}
+
}} // Namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a0da9efbb8..a9740f97f8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -155,7 +155,6 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
- void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
@@ -192,6 +191,10 @@ class Connection :
void doCatchupIoCallbacks();
+ void clock(uint64_t time);
+
+ void queueDequeueSincePurgeState(const std::string&, uint32_t);
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index d9a7b0122a..0ef5c2a35d 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -21,106 +21,21 @@
#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
- : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
-struct ExpiryTask : public sys::TimerTask {
- ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
- void fire() { expiryPolicy->sendExpire(expiryId); }
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t expiryId;
-};
-
-// Called while receiving an update
-void ExpiryPolicy::setId(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expiryId = id;
-}
-
-// Called while giving an update
-uint64_t ExpiryPolicy::getId() const {
- sys::Mutex::ScopedLock l(lock);
- return expiryId;
-}
-
-// Called in enqueuing connection thread
-void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id;
- {
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- //
- sys::Mutex::ScopedLock l(lock);
- id = expiryId++;
- if (!id) { // This is an update of an already-expired message.
- m.setExpiryPolicy(expiredPolicy);
- }
- else {
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- // If this is an update, the id may already exist
- unexpiredById.insert(IdMessageMap::value_type(id, &m));
- unexpiredByMessage[&m] = id;
- }
- }
- timer.add(new ExpiryTask(this, id, m.getExpiration()));
-}
-
-// Called in dequeueing connection thread
-void ExpiryPolicy::forget(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- assert(i != unexpiredByMessage.end());
- unexpiredById.erase(i->second);
- unexpiredByMessage.erase(i);
-}
-
-// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
-}
-
-// Called in timer thread
-void ExpiryPolicy::sendExpire(uint64_t id) {
- {
- sys::Mutex::ScopedLock l(lock);
- // Don't multicast an expiry notice if message is already forgotten.
- if (unexpiredById.find(id) == unexpiredById.end()) return;
- }
- mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+ return m.getExpiration() < cluster.getClusterTime();
}
-// Called in CPG deliver thread.
-void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
- IdMessageMap::iterator i = expired.first;
- while (i != expired.second) {
- i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i++);
- }
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+ return cluster.getClusterTime();
}
-// Called in update thread on the updater.
-boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
-}
-
-bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
-void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index 77a656aa68..d8ddbca8b3 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -36,12 +36,8 @@ namespace broker {
class Message;
}
-namespace sys {
-class Timer;
-}
-
namespace cluster {
-class Multicaster;
+class Cluster;
/**
* Cluster expiry policy
@@ -49,43 +45,13 @@ class Multicaster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+ ExpiryPolicy(Cluster& cluster);
- void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
- void forget(broker::Message&);
-
- // Send expiration notice to cluster.
- void sendExpire(uint64_t);
+ qpid::sys::AbsTime getCurrentTime();
- // Cluster delivers expiry notice.
- void deliverExpire(uint64_t);
-
- void setId(uint64_t id);
- uint64_t getId() const;
-
- boost::optional<uint64_t> getId(broker::Message&);
-
private:
- typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
-
- struct Expired : public broker::ExpiryPolicy {
- bool hasExpired(broker::Message&);
- void willExpire(broker::Message&);
- };
-
- mutable sys::Mutex lock;
- MessageIdMap unexpiredByMessage;
- IdMessageMap unexpiredById;
- uint64_t expiryId;
- boost::intrusive_ptr<Expired> expiredPolicy;
- Multicaster& mcast;
- MemberId memberId;
- sys::Timer& timer;
+ Cluster& cluster;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
index 84232dac1b..cfbe34a460 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
using namespace framing;
const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+ : Exchange(typeName, parent, b ), ready(false)
+{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) {
void FailoverExchange::updateUrls(const vector<Url>& u) {
Lock l(lock);
urls=u;
- if (urls.empty()) return;
- std::for_each(queues.begin(), queues.end(),
- boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ if (ready && !urls.empty()) {
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ }
}
string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
- sendUpdate(queue);
+ if (ready) sendUpdate(queue);
return queues.insert(queue).second;
}
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
// Called with lock held.
if (urls.empty()) return;
framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
- msg->getFrames().append(headerFrame);
+ msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
+void FailoverExchange::setReady() {
+ ready = true;
+}
}} // namespace cluster
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
index 2e1edfc0ae..c3e50c6929 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,6 +46,8 @@ class FailoverExchange : public broker::Exchange
void setUrls(const std::vector<Url>&);
/** Set the URLs and send an update.*/
void updateUrls(const std::vector<Url>&);
+ /** Flag the failover exchange as ready to generate updates (caught up) */
+ void setReady();
// Exchange overrides
std::string getType() const;
@@ -56,7 +58,7 @@ class FailoverExchange : public broker::Exchange
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-
+
typedef sys::Mutex::ScopedLock Lock;
typedef std::vector<Url> Urls;
typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -64,7 +66,7 @@ class FailoverExchange : public broker::Exchange
sys::Mutex lock;
Urls urls;
Queues queues;
-
+ bool ready;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index a15c14ff48..77448789db 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
@@ -83,11 +83,20 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
+// Name for header used to carry expiration information.
+const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
+// Headers used to flag headers/properties added by the UpdateClient so they can be
+// removed on the other side.
+const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
+const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
+
std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
return o << "cluster(" << c.updaterId << " UPDATER)";
}
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -121,7 +130,7 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
@@ -135,9 +144,6 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.cluster-update");
-
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
@@ -155,6 +161,13 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
+ if(b.getExpiryPolicy()) {
+ QPID_LOG(debug, *this << "Updating updatee with cluster time");
+ qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
+ int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
+ ClusterConnectionProxy(session).clock(time);
+ }
+
updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -174,7 +187,6 @@ void UpdateClient::update() {
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
- ClusterConnectionProxy(session).expiryId(expiry.getId());
updateLinks();
updateManagementAgent();
@@ -188,7 +200,7 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
-
+
// TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
@@ -280,7 +292,7 @@ class MessageUpdater {
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
-
+
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -297,7 +309,6 @@ class MessageUpdater {
}
}
-
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
@@ -306,10 +317,23 @@ class MessageUpdater {
}
lastPos = message.position;
- // Send the expiry ID if necessary.
- if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
- boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
- ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
+ // if the ttl > 0, we need to send the calculated expiration time to the updatee
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) {
+ bool hadMessageProps =
+ message.payload->hasProperties<framing::MessageProperties>();
+ framing::MessageProperties* mprops =
+ message.payload->getProperties<framing::MessageProperties>();
+ bool hadApplicationHeaders = mprops->hasApplicationHeaders();
+ FieldTable& applicationHeaders = mprops->getApplicationHeaders();
+ applicationHeaders.setInt64(
+ UpdateClient::X_QPID_EXPIRATION,
+ sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+ // If message properties or application headers didn't exist
+ // prior to us adding data, we want to remove them on the other side.
+ if (!hadMessageProps)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+ else if (!hadApplicationHeaders)
+ applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
}
// We can't send a broker::Message via the normal client API,
@@ -322,7 +346,7 @@ class MessageUpdater {
framing::MessageTransferBody transfer(
*message.payload->getFrames().as<framing::MessageTransferBody>());
transfer.setDestination(UpdateClient::UPDATE);
-
+
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
@@ -330,12 +354,21 @@ class MessageUpdater {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
+ // If the ttl > 0, we need to send the calculated expiration time to the updatee
+ // Careful not to alter the message as a side effect e.g. by adding
+ // an empty DeliveryProperties or setting TTL when it wasn't set before.
+ uint64_t ttl = 0;
+ if (message.payload->hasProperties<DeliveryProperties>()) {
+ DeliveryProperties* dprops =
+ message.payload->getProperties<DeliveryProperties>();
+ if (dprops->hasTtl()) ttl = dprops->getTtl();
+ };
}
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
@@ -361,6 +394,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
+
+ ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -393,7 +428,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+
// Send the management ID first on the main connection.
std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -430,7 +465,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating session " << ss->getId());
- // Create a client session to update session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
simpl->disableAutoDetach();
@@ -456,12 +491,12 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
+ if (inProgress)
--received;
// Sync the session to ensure all responses from broker have been processed.
shadowSession.sync();
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -511,7 +546,7 @@ void UpdateClient::updateConsumer(
QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
-
+
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
@@ -543,7 +578,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
-
+
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -563,7 +598,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
typedef std::list<Queue::shared_ptr> QueueList;
const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
@@ -573,7 +608,7 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-
+
void UpdateClient::updateTxState(broker::SemanticState& s) {
QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index b72d090d73..21bf6024e0 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -69,14 +69,19 @@ class ExpiryPolicy;
class UpdateClient : public sys::Runnable {
public:
static const std::string UPDATE; // Name for special update queue and exchange.
+ static const std::string X_QPID_EXPIRATION; // Update message expiration
+ // Flag to remove props/headers that were added by the UpdateClient
+ static const std::string X_QPID_NO_MESSAGE_PROPS;
+ static const std::string X_QPID_NO_HEADERS;
+
static client::Connection catchUpConnection();
-
+
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
- const client::ConnectionSettings&
+ const client::ConnectionSettings&
);
~UpdateClient();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
index 11937f296f..e830459aba 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,7 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -27,6 +28,8 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
+using framing::MessageProperties;
+using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent)
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-}
+ // Copy expiration from x-property if present.
+ if (msg->hasProperties<MessageProperties>()) {
+ MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ if (mprops->hasApplicationHeaders()) {
+ FieldTable& headers = mprops->getApplicationHeaders();
+ if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
+ msg->setExpiration(
+ sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
+ headers.erase(UpdateClient::X_QPID_EXPIRATION);
+ // Erase props/headers that were added by the UpdateClient
+ if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
+ msg->eraseProperties<MessageProperties>();
+ else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
+ mprops->clearApplicationHeadersFlag();
+ }
+ }
+ }
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
index a8c326969a..452154eb5c 100644
--- a/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/qpid/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,7 +58,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
}
else
return Base::decode(buffer, size, type);
- }
+ }
void print(std::ostream& out) const {
const boost::optional<T>& p=this->OptProps<T>::props;
if (p) out << *p;
@@ -77,7 +77,7 @@ class QPID_COMMON_CLASS_EXTERN AMQHeaderBody : public AMQBody
typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
-
+
public:
inline uint8_t type() const { return HEADER_BODY; }
@@ -99,6 +99,10 @@ public:
return properties.OptProps<T>::props.get_ptr();
}
+ template <class T> void erase() {
+ properties.OptProps<T>::props.reset();
+ }
+
boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp
index fdb2e8c6bb..47752e4584 100644
--- a/qpid/cpp/src/qpid/sys/Timer.cpp
+++ b/qpid/cpp/src/qpid/sys/Timer.cpp
@@ -75,6 +75,12 @@ void TimerTask::cancel() {
cancelled = true;
}
+void TimerTask::setFired() {
+ // Set nextFireTime to just before now, making readyToFire() true.
+ nextFireTime = AbsTime(sys::now(), Duration(-1));
+}
+
+
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
diff --git a/qpid/cpp/src/qpid/sys/Timer.h b/qpid/cpp/src/qpid/sys/Timer.h
index 98ba39ce38..fccb17dbc2 100644
--- a/qpid/cpp/src/qpid/sys/Timer.h
+++ b/qpid/cpp/src/qpid/sys/Timer.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,10 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
+ // Move the nextFireTime so readyToFire is true.
+ // Used by the cluster, where tasks are fired on cluster events, not on local time.
+ QPID_COMMON_EXTERN void setFired();
+
protected:
// Must be overridden with callback
virtual void fire() = 0;
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 34e4592a15..6fdc4c69ad 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -681,7 +681,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
addMessagesToQueue(10, queue);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
::usleep(300*1000);
- queue.purgeExpired();
+ queue.purgeExpired(0);
BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
}
@@ -692,7 +692,7 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, timer);
+ QueueCleaner cleaner(queues, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 9f7d1e2f6c..a0ce8fb9c3 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -54,7 +54,7 @@ def filter_log(log):
'caught up',
'active for links|Passivating links|Activating links',
'info Connection.* connected to', # UpdateClient connection
- 'warning Connection [\d+ [0-9.:]+] closed', # UpdateClient connection
+ 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
'task late',
'task overran',
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 66cfd27f7b..f5ce2a234f 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,11 +18,12 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
+ # Try message with TTL and differnet headers/properties
+ cluster[0].send_message("q", Message(durable=True, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+ cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
+
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -687,6 +694,25 @@ acl allow all all
self.assert_browse(s1, "q", ["foo"])
+ def test_ttl_consistent(self):
+ """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+ messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+ messages.append(Message("x"))
+ cluster = self.cluster(2)
+ sender = cluster[0].connect().session().sender("q;{create:always}")
+
+ def fetch(b):
+ receiver = b.connect().session().receiver("q;{create:always}")
+ while receiver.fetch().content != "x": pass
+
+ for m in messages: sender.send(m, sync=False)
+ for m in messages: sender.send(m, sync=False)
+ fetch(cluster[0])
+ fetch(cluster[1])
+ for m in messages: sender.send(m, sync=False)
+ cluster.start()
+ fetch(cluster[2])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -811,7 +837,7 @@ class LongTests(BrokerTest):
endtime = time.time() + self.duration()
# For long duration, first run is a quarter of the duration.
- runtime = max(5, self.duration() / 4.0)
+ runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -853,7 +879,7 @@ class LongTests(BrokerTest):
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
+ cluster_test_logs.verify_logs()
def test_flowlimit_failover(self):
"""Test fail-over during continuous send-receive with flow control
@@ -880,6 +906,7 @@ class LongTests(BrokerTest):
while time.time() < endtime:
for s in senders: s.sender.assert_running()
receiver.receiver.assert_running()
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
@@ -889,6 +916,114 @@ class LongTests(BrokerTest):
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
+ def test_ttl_failover(self):
+ """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+ class Client(StoppableThread):
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.connection = broker.connect(reconnect=True)
+ self.auto_fetch_reconnect_urls(self.connection)
+ self.session = self.connection.session()
+
+ def auto_fetch_reconnect_urls(self, conn):
+ """Replacment for qpid.messaging.util version which is noisy"""
+ ssn = conn.session("auto-fetch-reconnect-urls")
+ rcv = ssn.receiver("amq.failover")
+ rcv.capacity = 10
+
+ def main():
+ while True:
+ try:
+ msg = rcv.fetch()
+ qpid.messaging.util.set_reconnect_urls(conn, msg)
+ ssn.acknowledge(msg, sync=False)
+ except messaging.exceptions.LinkClosed: return
+ except messaging.exceptions.ConnectionError: return
+
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+ thread.setDaemon(True)
+ thread.start()
+
+ def stop(self):
+ StoppableThread.stop(self)
+ self.connection.detach()
+
+ class Sender(Client):
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.sent = 0 # Number of messages _reliably_ sent.
+ self.sender = self.session.sender(address, capacity=1000)
+
+ def send_counted(self, ttl):
+ self.sender.send(Message(str(self.sent), ttl=ttl))
+ self.sent += 1
+
+ def run(self):
+ while not self.stopped:
+ choice = random.randint(0,4)
+ if choice == 0: self.send_counted(None) # No ttl
+ elif choice == 1: self.send_counted(100000) # Large ttl
+ else: # Small ttl, might expire
+ self.sender.send(Message("", ttl=random.random()/10))
+ self.sender.send(Message("z"), sync=True) # Chaser.
+
+ class Receiver(Client):
+
+ def __init__(self, broker, address):
+ Client.__init__(self, broker)
+ self.received = 0 # Number of non-empty (reliable) messages received.
+ self.receiver = self.session.receiver(address, capacity=1000)
+ def run(self):
+ try:
+ while True:
+ m = self.receiver.fetch(1)
+ if m.content == "z": break
+ if m.content: # Ignore unreliable messages
+ # Ignore duplicates
+ if int(m.content) == self.received: self.received += 1
+ except Exception,e: self.error = e
+
+ # def test_ttl_failover
+
+ # Original cluster will all be killed so expect exit with failure
+ # Set small purge interval.
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+ # Python client failover produces noisy WARN logs, disable temporarily
+ logger = logging.getLogger()
+ log_level = logger.getEffectiveLevel()
+ logger.setLevel(logging.ERROR)
+ try:
+ # Start sender and receiver threads
+ receiver = Receiver(cluster[0], "q;{create:always}")
+ receiver.start()
+ sender = Sender(cluster[0], "q;{create:always}")
+ sender.start()
+
+ # Kill brokers in a cycle.
+ endtime = time.time() + self.duration()
+ runtime = min(5.0, self.duration() / 4.0)
+ i = 0
+ while time.time() < endtime:
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ b.ready()
+ time.sleep(runtime)
+ sender.stop()
+ receiver.stop()
+ for b in cluster[i:]:
+ b.ready() # Check it didn't crash
+ b.kill()
+ self.assertEqual(sender.sent, receiver.received)
+ cluster_test_logs.verify_logs()
+ finally:
+ # Detach to avoid slow reconnect attempts during shut-down if test fails.
+ sender.connection.detach()
+ receiver.connection.detach()
+ logger.setLevel(log_level)
class StoreTests(BrokerTest):
"""
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 4d83c5b5de..c33f2e4852 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
--
+-
- http://www.apache.org/licenses/LICENSE-2.0
--
+-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -78,10 +78,6 @@
<field name="left" type="vbin16"/> <!-- packed member-id array -->
</control>
- <control name="message-expired" code="0x12">
- <field name="id" type="uint64"/>
- </control>
-
<domain name="error-type" type="uint8" label="Types of error">
<enum>
<choice name="none" value="0"/>
@@ -89,7 +85,7 @@
<choice name="connection" value="2"/>
</enum>
</domain>
-
+
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
@@ -116,6 +112,11 @@
<field name="message" type="vbin32"/>
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x22">
+ <field name="time" type="uint64"/>
+ </control>
+
</class>
<!-- Controls associated with a specific connection. -->
@@ -149,7 +150,7 @@
<!-- Abort a connection that is sending invalid data. -->
<control name="abort" code="0x4"/>
-
+
<!-- Update controls. Sent to a new broker in joining mode.
A connection is updated as followed:
- send the shadow's management ID in shadow-perpare on the update connection
@@ -183,7 +184,7 @@
<field name="position" type="sequence-no"/>
<field name="tag" type="str8"/>
<field name="id" type="sequence-no"/>
- <field name="acquired" type="bit"/> <!--If not set, message follows. -->
+ <field name="acquired" type="bit"/> <!--If not set, message is on update queue. -->
<field name="accepted" type="bit"/>
<field name="cancelled" type="bit"/>
<field name="completed" type="bit"/>
@@ -192,9 +193,9 @@
<field name="enqueued" type="bit"/>
<field name="credit" type="uint32"/>
</control>
-
+
<!-- Tx transaction state. -->
- <control name="tx-start" code="0x12"/>
+ <control name="tx-start" code="0x12"/>
<control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
<control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
<control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
@@ -204,7 +205,7 @@
</control>
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
-
+
<!-- Consumers in the connection's output task -->
<control name="output-task" code="0x19">
<field name="channel" type="uint16"/>
@@ -253,9 +254,6 @@
<!-- Replicate encoded exchanges/queues. -->
<control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
- <!-- Set expiry-id for subsequent messages. -->
- <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
-
<!-- Add a listener to a queue -->
<control name="add-queue-listener" code="0x34">
<field name="queue" type="str8"/>
@@ -289,6 +287,18 @@
<field name="state" type="map"/> <!-- "name"=value -->
</control>
+ <!-- Update the cluster time -->
+ <control name="clock" code="0x40">
+ <field name="time" type="uint64"/>
+ </control>
+
+ <!-- Update a queue's dequeue rate -->
+ <control name="queue-dequeue-since-purge-state" code="0x41">
+ <field name="queue" type="str8"/>
+ <field name="dequeueSincePurge" type="uint32"/>
+ </control>
+
+
</class>
</amqp>