diff options
author | Gordon Sim <gsim@apache.org> | 2007-11-29 11:54:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-11-29 11:54:17 +0000 |
commit | 6b179639ac573be8f5c7d84bfd480c71a6815265 (patch) | |
tree | 29d56665e8258c923f256fbed3942148dede48e0 /cpp/src/qpid/broker/Queue.h | |
parent | d1f32f54b73807b778eb6027bb048f9e7b0e808f (diff) | |
download | qpid-python-6b179639ac573be8f5c7d84bfd480c71a6815265.tar.gz |
Changes to threading: queues serialiser removed, io threads used to drive dispatch to consumers
Fix to PersistableMessage: use correct lock when accessing synclist, don't hold enqueue lock when notifying queues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599395 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 70 |
1 files changed, 21 insertions, 49 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 1e56f1b6e9..4018f91367 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -24,6 +24,7 @@ #include <vector> #include <memory> #include <deque> +#include <set> #include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" @@ -48,12 +49,6 @@ namespace qpid { using std::string; - struct DispatchCompletion - { - virtual ~DispatchCompletion() {} - virtual void completed() = 0; - }; - /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -61,59 +56,40 @@ namespace qpid { * or more consumers registers. */ class Queue : public PersistableQueue, public management::Manageable { - typedef std::vector<Consumer::ptr> Consumers; + typedef std::set<Consumer*> Listeners; typedef std::deque<QueuedMessage> Messages; - - struct DispatchFunctor - { - Queue& queue; - Consumer::ptr consumer; - DispatchCompletion* sync; - - DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {} - DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {} - void operator()(); - }; const string name; const bool autodelete; MessageStore* const store; const ConnectionToken* owner; - Consumers acquirers; - Consumers browsers; + uint32_t consumerCount; + bool exclusive; + Listeners listeners; Messages messages; - int next; - mutable qpid::sys::RWlock consumerLock; + mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; - Consumer::ptr exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; - qpid::sys::Serializer<DispatchFunctor> serializer; - DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; management::Queue::shared_ptr mgmtObject; void pop(); void push(intrusive_ptr<Message>& msg); - bool dispatch(QueuedMessage& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); - /** - * only called by serilizer - */ - void dispatch(); - void cancel(Consumer::ptr c, Consumers& set); - void serviceAllBrowsers(); - void serviceBrowser(Consumer::ptr c); - Consumer::ptr allocate(); - bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); - uint32_t getAcquirerCount() const; - bool getNextMessage(QueuedMessage& msg); - bool exclude(intrusive_ptr<Message> msg); - + bool seek(QueuedMessage& msg, Consumer& position); + bool getNextMessage(QueuedMessage& msg, Consumer& c); + bool consumeNextMessage(QueuedMessage& msg, Consumer& c); + bool browseNextMessage(QueuedMessage& msg, Consumer& c); + bool canExcludeUnwanted(); + + void notify(); + void removeListener(Consumer&); + void addListener(Consumer&); public: virtual void notifyDurableIOComplete(); @@ -127,6 +103,8 @@ namespace qpid { Manageable* parent = 0); ~Queue(); + bool dispatch(Consumer&); + void create(const qpid::framing::FieldTable& settings); void configure(const qpid::framing::FieldTable& settings); void destroy(); @@ -156,16 +134,10 @@ namespace qpid { * Used during recovery to add stored messages back to the queue */ void recover(intrusive_ptr<Message>& msg); - /** - * Request dispatch any queued messages providing there are - * consumers for them. Only one thread can be dispatching - * at any time, so this call schedules the despatch based on - * the serilizer policy. - */ - void requestDispatch(Consumer::ptr c = Consumer::ptr()); - void flush(DispatchCompletion& callback); - void consume(Consumer::ptr c, bool exclusive = false); - void cancel(Consumer::ptr c); + + void consume(Consumer& c, bool exclusive = false); + void cancel(Consumer& c); + uint32_t purge(); uint32_t getMessageCount() const; uint32_t getConsumerCount() const; |