summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-08-10 12:04:27 +0000
committerGordon Sim <gsim@apache.org>2012-08-10 12:04:27 +0000
commitdf36b35eb7ca20c3b354d6895004fb201346482b (patch)
tree357d90752f44304284639014f3b9db0cae1f2b2b /qpid/cpp/src/qpid/broker/Queue.h
parent798cebf0e4f41953eb542d6358e5f0eea33d85a7 (diff)
downloadqpid-python-df36b35eb7ca20c3b354d6895004fb201346482b.tar.gz
QPID-4178: broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1371676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.h')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h170
1 files changed, 80 insertions, 90 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index a31e0002ea..671a24d53e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -28,12 +28,14 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/TxOp.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
@@ -56,10 +58,14 @@
namespace qpid {
namespace broker {
class Broker;
+class Exchange;
class MessageStore;
+class QueueDepth;
class QueueEvents;
class QueueRegistry;
+class QueueFactory;
class TransactionContext;
+class TxBuffer;
class MessageDistributor;
/**
@@ -70,7 +76,9 @@ class MessageDistributor;
*/
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
-
+ public:
+ typedef boost::function1<bool, const Message&> MessagePredicate;
+ protected:
struct UsageBarrier
{
Queue& parent;
@@ -90,31 +98,40 @@ class Queue : public boost::enable_shared_from_this<Queue>,
~ScopedUse() { if (acquired) barrier.release(); }
};
+ class TxPublish : public TxOp
+ {
+ Message message;
+ boost::shared_ptr<Queue> queue;
+ bool prepared;
+ public:
+ TxPublish(const Message&,boost::shared_ptr<Queue>);
+ bool prepare(TransactionContext* ctxt) throw();
+ void commit() throw();
+ void rollback() throw();
+ };
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+ typedef boost::function1<void, Message&> MessageFunctor;
const std::string name;
- const bool autodelete;
MessageStore* store;
const OwnershipToken* owner;
uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
uint32_t browserCount; // Count of non-acquiring subscriptions.
OwnershipToken* exclusive;
- bool noLocal;
bool persistLastNode;
bool inLastNodeFailure;
- std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
std::auto_ptr<Messages> messages;
- std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
+ std::vector<Message> pendingDequeues;
/** messageLock is used to keep the Queue's state consistent while processing message
* events, such as message dispatch, enqueue, acquire, and dequeue. It must be held
* while updating certain members in order to keep these members consistent with
* each other:
* o messages
* o sequence
- * o policy
* o listeners
* o allocator
* o observeXXX() methods
@@ -127,9 +144,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
- framing::FieldTable settings;
- std::auto_ptr<QueuePolicy> policy;
- bool policyExceeded;
+ const QueueSettings settings;
+ qpid::framing::FieldTable encodableSettings;
+ QueueDepth current;
QueueBindings bindings;
std::string alternateExchangeName;
boost::shared_ptr<Exchange> alternateExchange;
@@ -139,43 +156,42 @@ class Queue : public boost::enable_shared_from_this<Queue>,
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
- bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
bool deleted;
UsageBarrier barrier;
- int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
- void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
- void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
- void notifyListener();
+ virtual void push(Message& msg, bool isRecovery=false);
+ void process(Message& msg);
+ bool enqueue(TransactionContext* ctxt, Message& msg);
+ bool getNextMessage(Message& msg, Consumer::shared_ptr& c);
void removeListener(Consumer::shared_ptr);
- bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ bool isExcluded(const Message& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes.
- * messageLock is held by caller */
- void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+ * must be held by caller */
+ void observeEnqueue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeAcquire(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeRequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
- bool popAndDequeue(QueuedMessage&);
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
- void forcePersistent(QueuedMessage& msg);
+ bool acquire(const qpid::framing::SequenceNumber& position, Message& msg,
+ const qpid::sys::Mutex::ScopedLock& locker);
+
+ void forcePersistent(const Message& msg);
int getEventMode();
- void configureImpl(const qpid::framing::FieldTable& settings);
- void checkNotDeleted(const Consumer::shared_ptr& c);
+ void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>);
+ void abandoned(const Message& message);
+ void checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
- void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
+ uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
+ virtual bool checkDepth(const QueueDepth& increment, const Message&);
public:
@@ -184,12 +200,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
+ const QueueSettings& settings = QueueSettings(),
MessageStore* const store = 0,
- const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
- QPID_BROKER_EXTERN ~Queue();
+ QPID_BROKER_EXTERN virtual ~Queue();
/** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
@@ -198,19 +213,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* @param msg - message to be acquired.
* @return false if message is no longer available for acquire.
*/
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+ QPID_BROKER_EXTERN bool acquire(const QueueCursor& msg, const std::string& consumer);
/**
- * Used to configure a new queue and create a persistent record
- * for it in store if required.
+ * Used to create a persistent record for the queue in store if required.
*/
- QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings);
+ QPID_BROKER_EXTERN void create();
- /**
- * Used to reconfigure a recovered queue (does not create
- * persistent record in store).
- */
- QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings);
void destroyed();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
@@ -224,34 +233,36 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
- /** Acquire the message at the given position if it is available for acquire. Not to
- * be used by clients, but used by the broker for queue management.
- * @param message - set to the acquired message if true returned.
- * @return true if the message has been acquired.
+ /**
+ * Removes (and dequeues) a message by its sequence number (used
+ * for some broker features, e.g. queue replication)
+ *
+ * @param position the sequence number of the message to be dequeued.
+ * @return true if the message is dequeued.
*/
- QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
+ QPID_BROKER_EXTERN bool dequeueMessageAt(const qpid::framing::SequenceNumber& position);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
*/
- QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
- /**
- * Dispatches the messages immediately to a consumer if
- * one is available or stores it for later if not.
- */
- QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void deliver(Message, TxBuffer* = 0);
/**
* Returns a message to the in-memory queue (due to lack
* of acknowledegement from a receiver). If a consumer is
* available it will be dispatched immediately, else it
* will be returned to the front of the queue.
*/
- QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN void release(const QueueCursor& msg, bool markRedelivered=true);
+ QPID_BROKER_EXTERN void reject(const QueueCursor& msg);
+
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate);
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, MessagePredicate, qpid::framing::SequenceNumber start);
+ QPID_BROKER_EXTERN bool seek(QueueCursor&, qpid::framing::SequenceNumber start);
/**
* Used during recovery to add stored messages back to the queue
*/
- QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void recover(Message& msg);
QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
bool exclusive = false);
@@ -268,7 +279,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
- QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
@@ -277,8 +287,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
- inline const framing::FieldTable& getSettings() const { return settings; }
- inline bool isAutoDelete() const { return autodelete; }
+ inline const QueueSettings& getSettings() const { return settings; }
+ inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
+ inline bool isAutoDelete() const { return settings.autodelete; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
const QueueBindings& getBindings() const { return bindings; }
@@ -288,48 +299,22 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void setLastNodeFailure();
QPID_BROKER_EXTERN void clearLastNodeFailure();
- QPID_BROKER_EXTERN bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
- QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
- QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+ QPID_BROKER_EXTERN void dequeue(TransactionContext* ctxt, const QueueCursor&);
/**
* Inform the queue that a previous transactional dequeue
* committed.
*/
- QPID_BROKER_EXTERN void dequeueCommitted(const QueuedMessage& msg);
-
- /**
- * Inform queue of messages that were enqueued, have since
- * been acquired but not yet accepted or released (and
- * thus are still logically on the queue) - used in
- * clustered broker.
- */
- QPID_BROKER_EXTERN void updateEnqueued(const QueuedMessage& msg);
-
- /**
- * Test whether the specified message (identified by its
- * sequence/position), is still enqueued (note this
- * doesn't mean it is available for delivery as it may
- * have been delievered to a subscriber who has not yet
- * accepted it).
- */
- QPID_BROKER_EXTERN bool isEnqueued(const QueuedMessage& msg);
-
- /**
- * Acquires the next available (oldest) message
- */
- QPID_BROKER_EXTERN QueuedMessage get();
+ void dequeueCommitted(const QueueCursor& msg);
/** Get the message at position pos, returns true if found and sets msg */
- QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
-
- QPID_BROKER_EXTERN const QueuePolicy* getPolicy();
+ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
- QPID_BROKER_EXTERN bool isLocal(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN bool isLocal(const Message& msg);
//PersistableQueue support:
QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
@@ -410,7 +395,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* Reserve space in policy for an enqueued message that
* has been recovered in the prepared state (dtx only)
*/
- QPID_BROKER_EXTERN void recoverPrepared(boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN void recoverPrepared(const Message& msg);
+ void enqueueAborted(const Message& msg);
+ void enqueueCommited(Message& msg);
+ void dequeueAborted(Message& msg);
+ void dequeueCommited(const Message& msg);
QPID_BROKER_EXTERN void flush();
@@ -418,6 +407,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+ friend class QueueFactory;
};
}
}