diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 131 |
1 files changed, 40 insertions, 91 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 59ae41e768..12a3d273be 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,9 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" +#include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" @@ -59,7 +59,7 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class MessageDistributor; +class Exchange; /** * The brokers representation of an amqp queue. Messages are @@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, { Queue& parent; uint count; - + UsageBarrier(Queue&); bool acquire(); void release(); void destroy(); }; - + struct ScopedUse { UsageBarrier& barrier; @@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} ~ScopedUse() { if (acquired) barrier.release(); } }; - + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. + RateTracker dequeueTracker; int eventMode; Observers observers; bool insertSeqNo; @@ -129,36 +129,26 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - boost::shared_ptr<MessageDistributor> allocator; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + bool seek(QueuedMessage& msg, Consumer::shared_ptr position); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); void notifyListener(); void removeListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); - /** update queue observers, stats, policy, etc when the messages' state changes. Lock - * must be held by caller */ - void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - - /** modify the Queue's message container - assumes messageLock held */ - void pop(const sys::Mutex::ScopedLock& held); // acquire front msg - void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg - // acquire message @ position, return true and set msg if acquire succeeds - bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const sys::Mutex::ScopedLock& held); - + void enqueued(const QueuedMessage& msg); + void dequeued(const QueuedMessage& msg); + void pop(); + void popAndDequeue(); + QueuedMessage getFront(); void forcePersistent(QueuedMessage& msg); int getEventMode(); - void configureImpl(const qpid::framing::FieldTable& settings); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -182,9 +172,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } } - + void checkNotDeleted(); - void notifyDeleted(); public: @@ -193,50 +182,29 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const std::string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); - /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); - /** allow the Consumer to acquire a message that it has browsed. - * @param msg - message to be acquired. - * @return false if message is no longer available for acquire. - */ - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer); + void create(const qpid::framing::FieldTable& settings); - /** - * Used to configure a new queue and create a persistent record - * for it in store if required. - */ - QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings); - - /** - * Used to reconfigure a recovered queue (does not create - * persistent record in store). - */ - QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings); - void destroyed(); + // "recovering" means we are doing a MessageStore recovery. + QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, + bool recovering = false); + void destroy(); + void notifyDeleted(); QPID_BROKER_EXTERN void bound(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); - //TODO: get unbind out of the public interface; only there for purposes of one unit test - QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges); - /** - * Bind self to specified exchange, and record that binding for unbinding on delete. - */ - bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, - const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); + QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges, + Queue::shared_ptr shared_ref); - /** Acquire the message at the given position if it is available for acquire. Not to - * be used by clients, but used by the broker for queue management. - * @param message - set to the acquired message if true returned. - * @return true if the message has been acquired. - */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -265,14 +233,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, //defaults to all messages - boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), - const ::qpid::types::Variant::Map *filter=0); - QPID_BROKER_EXTERN void purgeExpired(sys::Duration); + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + QPID_BROKER_EXTERN void purgeExpired(); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty, - const qpid::types::Variant::Map *filter=0); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -311,8 +276,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void updateEnqueued(const QueuedMessage& msg); /** @@ -323,14 +288,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Acquires the next available (oldest) message + * Gets the next available message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos, returns true if found and sets msg */ - QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; + /** Get the message at position pos */ + QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; const QueuePolicy* getPolicy(); @@ -344,13 +309,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, void encode(framing::Buffer& buffer) const; uint32_t encodedSize() const; - /** - * Restores a queue from encoded data (used in recovery) - * - * Note: restored queue will be neither auto-deleted or have an - * exclusive owner - */ - static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); + // "recovering" means we are doing a MessageStore recovery. + static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false ); static void tryAutoDelete(Broker& broker, Queue::shared_ptr); virtual void setExternalQueueStore(ExternalQueueStore* inst); @@ -359,7 +319,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { @@ -372,11 +331,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, bindings.eachBinding(f); } - /** Apply f to each Observer on the queue */ - template <class F> void eachObserver(F f) { - std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); - } - /** Set the position sequence number for the next message on the queue. * Must be >= the current sequence number. * Used by cluster to replicate queues. @@ -404,11 +358,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, void recoverPrepared(boost::intrusive_ptr<Message>& msg); void flush(); - - const Broker* getBroker(); - - uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } - void setDequeueSincePurge(uint32_t value); }; } } |