diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 54 |
1 files changed, 39 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8435e75cab..59ae41e768 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -59,7 +59,7 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class Exchange; +class MessageDistributor; /** * The brokers representation of an amqp queue. Messages are @@ -129,23 +129,32 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + boost::shared_ptr<MessageDistributor> allocator; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); void notifyListener(); void removeListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); - void enqueued(const QueuedMessage& msg); - void dequeued(const QueuedMessage& msg); - void pop(); - void popAndDequeue(); + /** update queue observers, stats, policy, etc when the messages' state changes. Lock + * must be held by caller */ + void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + + /** modify the Queue's message container - assumes messageLock held */ + void pop(const sys::Mutex::ScopedLock& held); // acquire front msg + void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg + // acquire message @ position, return true and set msg if acquire succeeds + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const sys::Mutex::ScopedLock& held); void forcePersistent(QueuedMessage& msg); int getEventMode(); @@ -191,8 +200,15 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); + /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); + /** allow the Consumer to acquire a message that it has browsed. + * @param msg - message to be acquired. + * @return false if message is no longer available for acquire. + */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer); + /** * Used to configure a new queue and create a persistent record * for it in store if required. @@ -216,7 +232,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + /** Acquire the message at the given position if it is available for acquire. Not to + * be used by clients, but used by the broker for queue management. + * @param message - set to the acquired message if true returned. + * @return true if the message has been acquired. + */ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -245,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), + const ::qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -302,12 +325,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isEnqueued(const QueuedMessage& msg); /** - * Gets the next available message + * Acquires the next available (oldest) message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + /** Get the message at position pos, returns true if found and sets msg */ + QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; const QueuePolicy* getPolicy(); @@ -336,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { |