summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-11-29 11:54:17 +0000
committerGordon Sim <gsim@apache.org>2007-11-29 11:54:17 +0000
commit6b179639ac573be8f5c7d84bfd480c71a6815265 (patch)
tree29d56665e8258c923f256fbed3942148dede48e0 /cpp/src/qpid/broker/Queue.h
parentd1f32f54b73807b778eb6027bb048f9e7b0e808f (diff)
downloadqpid-python-6b179639ac573be8f5c7d84bfd480c71a6815265.tar.gz
Changes to threading: queues serialiser removed, io threads used to drive dispatch to consumers
Fix to PersistableMessage: use correct lock when accessing synclist, don't hold enqueue lock when notifying queues git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599395 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r--cpp/src/qpid/broker/Queue.h70
1 files changed, 21 insertions, 49 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 1e56f1b6e9..4018f91367 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -24,6 +24,7 @@
#include <vector>
#include <memory>
#include <deque>
+#include <set>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
@@ -48,12 +49,6 @@ namespace qpid {
using std::string;
- struct DispatchCompletion
- {
- virtual ~DispatchCompletion() {}
- virtual void completed() = 0;
- };
-
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -61,59 +56,40 @@ namespace qpid {
* or more consumers registers.
*/
class Queue : public PersistableQueue, public management::Manageable {
- typedef std::vector<Consumer::ptr> Consumers;
+ typedef std::set<Consumer*> Listeners;
typedef std::deque<QueuedMessage> Messages;
-
- struct DispatchFunctor
- {
- Queue& queue;
- Consumer::ptr consumer;
- DispatchCompletion* sync;
-
- DispatchFunctor(Queue& q, DispatchCompletion* s = 0) : queue(q), sync(s) {}
- DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {}
- void operator()();
- };
const string name;
const bool autodelete;
MessageStore* const store;
const ConnectionToken* owner;
- Consumers acquirers;
- Consumers browsers;
+ uint32_t consumerCount;
+ bool exclusive;
+ Listeners listeners;
Messages messages;
- int next;
- mutable qpid::sys::RWlock consumerLock;
+ mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
- Consumer::ptr exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
- qpid::sys::Serializer<DispatchFunctor> serializer;
- DispatchFunctor dispatchCallback;
framing::SequenceNumber sequence;
management::Queue::shared_ptr mgmtObject;
void pop();
void push(intrusive_ptr<Message>& msg);
- bool dispatch(QueuedMessage& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- /**
- * only called by serilizer
- */
- void dispatch();
- void cancel(Consumer::ptr c, Consumers& set);
- void serviceAllBrowsers();
- void serviceBrowser(Consumer::ptr c);
- Consumer::ptr allocate();
- bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
- uint32_t getAcquirerCount() const;
- bool getNextMessage(QueuedMessage& msg);
- bool exclude(intrusive_ptr<Message> msg);
-
+ bool seek(QueuedMessage& msg, Consumer& position);
+ bool getNextMessage(QueuedMessage& msg, Consumer& c);
+ bool consumeNextMessage(QueuedMessage& msg, Consumer& c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer& c);
+ bool canExcludeUnwanted();
+
+ void notify();
+ void removeListener(Consumer&);
+ void addListener(Consumer&);
public:
virtual void notifyDurableIOComplete();
@@ -127,6 +103,8 @@ namespace qpid {
Manageable* parent = 0);
~Queue();
+ bool dispatch(Consumer&);
+
void create(const qpid::framing::FieldTable& settings);
void configure(const qpid::framing::FieldTable& settings);
void destroy();
@@ -156,16 +134,10 @@ namespace qpid {
* Used during recovery to add stored messages back to the queue
*/
void recover(intrusive_ptr<Message>& msg);
- /**
- * Request dispatch any queued messages providing there are
- * consumers for them. Only one thread can be dispatching
- * at any time, so this call schedules the despatch based on
- * the serilizer policy.
- */
- void requestDispatch(Consumer::ptr c = Consumer::ptr());
- void flush(DispatchCompletion& callback);
- void consume(Consumer::ptr c, bool exclusive = false);
- void cancel(Consumer::ptr c);
+
+ void consume(Consumer& c, bool exclusive = false);
+ void cancel(Consumer& c);
+
uint32_t purge();
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;