summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r--cpp/src/qpid/broker/Queue.h131
1 files changed, 40 insertions, 91 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 59ae41e768..12a3d273be 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
@@ -59,7 +59,7 @@ class MessageStore;
class QueueEvents;
class QueueRegistry;
class TransactionContext;
-class MessageDistributor;
+class Exchange;
/**
* The brokers representation of an amqp queue. Messages are
@@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
+ RateTracker dequeueTracker;
int eventMode;
Observers observers;
bool insertSeqNo;
@@ -129,36 +129,26 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- boost::shared_ptr<MessageDistributor> allocator;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
void notifyListener();
void removeListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes. Lock
- * must be held by caller */
- void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
-
- /** modify the Queue's message container - assumes messageLock held */
- void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
- void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
- // acquire message @ position, return true and set msg if acquire succeeds
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const sys::Mutex::ScopedLock& held);
-
+ void enqueued(const QueuedMessage& msg);
+ void dequeued(const QueuedMessage& msg);
+ void pop();
+ void popAndDequeue();
+ QueuedMessage getFront();
void forcePersistent(QueuedMessage& msg);
int getEventMode();
- void configureImpl(const qpid::framing::FieldTable& settings);
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -182,9 +172,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
}
-
+
void checkNotDeleted();
- void notifyDeleted();
public:
@@ -193,50 +182,29 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
- /** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
- /** allow the Consumer to acquire a message that it has browsed.
- * @param msg - message to be acquired.
- * @return false if message is no longer available for acquire.
- */
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+ void create(const qpid::framing::FieldTable& settings);
- /**
- * Used to configure a new queue and create a persistent record
- * for it in store if required.
- */
- QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
-
- /**
- * Used to reconfigure a recovered queue (does not create
- * persistent record in store).
- */
- QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
- void destroyed();
+ // "recovering" means we are doing a MessageStore recovery.
+ QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
+ bool recovering = false);
+ void destroy();
+ void notifyDeleted();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
const qpid::framing::FieldTable& args);
- //TODO: get unbind out of the public interface; only there for purposes of one unit test
- QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges);
- /**
- * Bind self to specified exchange, and record that binding for unbinding on delete.
- */
- bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
- const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
+ QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
+ Queue::shared_ptr shared_ref);
- /** Acquire the message at the given position if it is available for acquire. Not to
- * be used by clients, but used by the broker for queue management.
- * @param message - set to the acquired message if true returned.
- * @return true if the message has been acquired.
- */
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
@@ -265,14 +233,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
- boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
- const ::qpid::types::Variant::Map *filter=0);
- QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired();
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
- const qpid::types::Variant::Map *filter=0);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -311,8 +276,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -323,14 +288,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Acquires the next available (oldest) message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
- /** Get the message at position pos, returns true if found and sets msg */
- QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
+ /** Get the message at position pos */
+ QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
const QueuePolicy* getPolicy();
@@ -344,13 +309,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void encode(framing::Buffer& buffer) const;
uint32_t encodedSize() const;
- /**
- * Restores a queue from encoded data (used in recovery)
- *
- * Note: restored queue will be neither auto-deleted or have an
- * exclusive owner
- */
- static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
+ // "recovering" means we are doing a MessageStore recovery.
+ static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -359,7 +319,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
@@ -372,11 +331,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bindings.eachBinding(f);
}
- /** Apply f to each Observer on the queue */
- template <class F> void eachObserver(F f) {
- std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
- }
-
/** Set the position sequence number for the next message on the queue.
* Must be >= the current sequence number.
* Used by cluster to replicate queues.
@@ -404,11 +358,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void recoverPrepared(boost::intrusive_ptr<Message>& msg);
void flush();
-
- const Broker* getBroker();
-
- uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
- void setDequeueSincePurge(uint32_t value);
};
}
}