diff options
author | Alan Conway <aconway@apache.org> | 2011-06-15 20:15:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-06-15 20:15:51 +0000 |
commit | 4cdf746f5bb38db60821047c3393f89f15b26f1e (patch) | |
tree | 610a404288b464a2225668c128fa77a84020ea62 /cpp/src/qpid/broker/Queue.h | |
parent | 8034affaba71c0d991bfe1fff5de537f73d0f404 (diff) | |
download | qpid-python-4cdf746f5bb38db60821047c3393f89f15b26f1e.tar.gz |
QPID-3280: Performance problem with TTL messages.
When sending a large number of messages with nonzero TTLs to a
cluster, overall message throughput drops by around 20-30% compared to
messages with TTL 0.
The previous approach to TTL in the cluster is replaced with a simpler
"cluster clock". Also QueueCleaner is executed in the cluster timer,
and modified to be deterministic in a cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136170 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index c4f1bcc07e..8435e75cab 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,9 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" -#include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" @@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, { Queue& parent; uint count; - + UsageBarrier(Queue&); bool acquire(); void release(); void destroy(); }; - + struct ScopedUse { UsageBarrier& barrier; @@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} ~ScopedUse() { if (acquired) barrier.release(); } }; - + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; + sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; bool insertSeqNo; @@ -146,7 +146,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void dequeued(const QueuedMessage& msg); void pop(); void popAndDequeue(); - QueuedMessage getFront(); + void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); @@ -184,8 +184,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const std::string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); @@ -245,11 +245,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages - QPID_BROKER_EXTERN void purgeExpired(); + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -288,8 +288,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void updateEnqueued(const QueuedMessage& msg); /** @@ -300,9 +300,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); @@ -382,6 +382,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, void flush(); const Broker* getBroker(); + + uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } + void setDequeueSincePurge(uint32_t value); }; } } |