summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp291
1 files changed, 127 insertions, 164 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c43ab8c231..4dba60cd0d 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,7 +31,8 @@
#include <iostream>
#include <boost/bind.hpp>
#include "QueueRegistry.h"
-
+#include <algorithm>
+#include <functional>
using namespace qpid::broker;
using namespace qpid::sys;
@@ -40,6 +41,8 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using std::for_each;
+using std::mem_fun;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -50,10 +53,9 @@ Queue::Queue(const string& _name, bool _autodelete,
autodelete(_autodelete),
store(_store),
owner(_owner),
- next(0),
- persistenceId(0),
- serializer(false),
- dispatchCallback(*this)
+ consumerCount(0),
+ exclusive(false),
+ persistenceId(0)
{
if (parent != 0)
{
@@ -73,9 +75,8 @@ Queue::~Queue()
void Queue::notifyDurableIOComplete()
{
- // signal SemanticHander to ack completed dequeues
- // then dispatch to ack...
- serializer.execute(dispatchCallback);
+ Mutex::ScopedLock locker(messageLock);
+ notify();
}
@@ -110,7 +111,6 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
- serializer.execute(dispatchCallback);
}
}
@@ -148,17 +148,13 @@ void Queue::process(intrusive_ptr<Message>& msg){
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
}
- serializer.execute(dispatchCallback);
-
}
void Queue::requeue(const QueuedMessage& msg){
- {
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
- }
- serializer.execute(dispatchCallback);
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
+ notify();
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -172,186 +168,170 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer::ptr c){
- if (!c || c->preAcquires()) {
- serializer.execute(dispatchCallback);
- } else {
- DispatchFunctor f(*this, c);
- serializer.execute(f);
- }
-}
-
-void Queue::flush(DispatchCompletion& completion)
-{
- DispatchFunctor f(*this, &completion);
- serializer.execute(f);
-}
-
/**
* Return true if the message can be excluded. This is currently the
- * case if the queue has an exclusive consumer that will never want
- * the message, or if the queue is exclusive to a single connection
- * and has a single consumer (covers the JMS topic case).
+ * case if the queue is exclusive and has an exclusive consumer that
+ * doesn't want the message or has a single consumer that doesn't want
+ * the message (covers the JMS topic case).
*/
-bool Queue::exclude(intrusive_ptr<Message> msg)
+bool Queue::canExcludeUnwanted()
+{
+ Mutex::ScopedLock locker(consumerLock);
+ return hasExclusiveOwner() && (exclusive || consumerCount == 1);
+}
+
+
+bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
{
- RWlock::ScopedWlock locker(consumerLock);
- if (exclusive) {
- return !exclusive->filter(msg);
- } else if (hasExclusiveOwner() && acquirers.size() == 1) {
- return !acquirers[0]->filter(msg);
+ if (c.preAcquires()) {
+ return consumeNextMessage(m, c);
} else {
- return false;
+ return browseNextMessage(m, c);
}
}
-Consumer::ptr Queue::allocate()
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
{
- RWlock::ScopedWlock locker(consumerLock);
-
- if (acquirers.empty()) {
- return Consumer::ptr();
- } else if (exclusive){
- return exclusive;
- } else {
- next = next % acquirers.size();
- return acquirers[next++];
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ addListener(c);
+ return false;
+ } else {
+ QueuedMessage msg = messages.front();
+ if (!msg.payload->isEnqueueComplete()) {
+ QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'");
+ addListener(c);
+ return false;
+ }
+
+ if (c.filter(msg.payload)) {
+ if (c.accept(msg.payload)) {
+ m = msg;
+ pop();
+ return true;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return false;
+ }
+ } else {
+ //consumer will never want this message
+ if (canExcludeUnwanted()) {
+ //hack for no-local on JMS topics; get rid of this message
+ QPID_LOG(debug, "Excluding message from '" << name << "'");
+ pop();
+ } else {
+ //leave it for another consumer
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
+ }
+ }
+ }
}
}
-bool Queue::dispatch(QueuedMessage& msg)
+
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
{
- QPID_LOG(info, "Dispatch message " << msg.position << " from queue " << name);
- //additions to the acquirers will result in a separate dispatch
- //request, so won't result in anyone being missed
- uint counter = getAcquirerCount();
- Consumer::ptr c = allocate();
- while (c && counter--){
- if (c->deliver(msg)) {
- return true;
+ QueuedMessage msg(this);
+ while (seek(msg, c)) {
+ if (c.filter(msg.payload)) {
+ if (c.accept(msg.payload)) {
+ //consumer wants the message
+ c.position = msg.position;
+ m = msg;
+ return true;
+ } else {
+ //consumer hasn't got enough credit for the message
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return false;
+ }
} else {
- c = allocate();
+ //consumer will never want this message, continue seeking
+ c.position = msg.position;
+ QPID_LOG(debug, "Browser skipping message from '" << name << "'");
}
}
return false;
}
-bool Queue::getNextMessage(QueuedMessage& msg)
+void Queue::notify()
{
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- return false;
- } else {
- msg = messages.front();
- return true;
- }
+ //notify listeners that there may be messages to process
+ for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+ listeners.clear();
}
-void Queue::dispatch()
+void Queue::removeListener(Consumer& c)
{
- QueuedMessage msg(this);
- while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
- if (dispatch(msg)) {
- pop();
- } else if (exclude(msg.payload)) {
- pop();
- dequeue(0, msg.payload);
- QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
- } else {
- break;
- }
- }
- serviceAllBrowsers();
-}
-
-void Queue::serviceAllBrowsers()
+ Mutex::ScopedLock locker(messageLock);
+ listeners.erase(&c);
+}
+
+void Queue::addListener(Consumer& c)
{
- Consumers copy;
- {
- RWlock::ScopedRlock locker(consumerLock);
- if (browsers.empty()) return;//shortcut
- copy = browsers;
- }
- for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
- serviceBrowser(*i);
- }
-}
-
-void Queue::serviceBrowser(Consumer::ptr browser)
+ listeners.insert(&c);
+}
+
+bool Queue::dispatch(Consumer& c)
{
QueuedMessage msg(this);
- while (seek(msg, browser->position) && browser->deliver(msg)) {
- browser->position = msg.position;
+ if (getNextMessage(msg, c)) {
+ c.deliver(msg);
+ return true;
+ } else {
+ return false;
}
}
-bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
+bool Queue::seek(QueuedMessage& msg, Consumer& c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > position) {
- if (position < messages.front().position) {
+ if (!messages.empty() && messages.back().position > c.position) {
+ if (c.position < messages.front().position) {
msg = messages.front();
return true;
} else {
- uint index = (position - messages.front().position) + 1;
+ uint index = (c.position - messages.front().position) + 1;
if (index < messages.size()) {
msg = messages[index];
return true;
}
}
}
+ addListener(c);
return false;
}
-void Queue::consume(Consumer::ptr c, bool requestExclusive){
- RWlock::ScopedWlock locker(consumerLock);
+void Queue::consume(Consumer&, bool requestExclusive){
+ Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- }
- if(requestExclusive) {
- if(acquirers.empty() && browsers.empty()) {
- exclusive = c;
- } else {
+ } else if(requestExclusive) {
+ if(consumerCount) {
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- }
- }
- if (c->preAcquires()) {
- acquirers.push_back(c);
- } else {
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- c->position = SequenceNumber(sequence.getValue() - 1);
} else {
- c->position = SequenceNumber(messages.front().position.getValue() - 1);
+ exclusive = true;
}
- browsers.push_back(c);
}
+ consumerCount++;
if (mgmtObject != 0){
mgmtObject->inc_consumers ();
}
}
-void Queue::cancel(Consumer::ptr c){
- RWlock::ScopedWlock locker(consumerLock);
- if (c->preAcquires()) {
- cancel(c, acquirers);
- } else {
- cancel(c, browsers);
- }
+void Queue::cancel(Consumer& c){
+ removeListener(c);
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = false;
if (mgmtObject != 0){
mgmtObject->dec_consumers ();
}
- if(exclusive == c) exclusive.reset();
-}
-
-void Queue::cancel(Consumer::ptr c, Consumers& consumers)
-{
- Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
- if (i != consumers.end())
- consumers.erase(i);
}
QueuedMessage Queue::dequeue(){
@@ -382,14 +362,16 @@ uint32_t Queue::purge(){
return count;
}
+/**
+ * Assumes messageLock is held
+ */
void Queue::pop(){
- Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
messages.pop_front();
}
void Queue::push(intrusive_ptr<Message>& msg){
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
if (policy.get()) {
policy->enqueued(msg->contentSize());
@@ -397,6 +379,7 @@ void Queue::push(intrusive_ptr<Message>& msg){
msg->releaseContent(store);
}
}
+ notify();
}
/** function only provided for unit tests, or code not in critical message path */
@@ -412,18 +395,13 @@ uint32_t Queue::getMessageCount() const{
}
uint32_t Queue::getConsumerCount() const{
- RWlock::ScopedRlock locker(consumerLock);
- return acquirers.size() + browsers.size();
-}
-
-uint32_t Queue::getAcquirerCount() const{
- RWlock::ScopedRlock locker(consumerLock);
- return acquirers.size();
+ Mutex::ScopedLock locker(consumerLock);
+ return consumerCount;
}
bool Queue::canAutoDelete() const{
- RWlock::ScopedRlock locker(consumerLock);
- return autodelete && acquirers.empty() && browsers.empty();
+ Mutex::ScopedLock locker(consumerLock);
+ return autodelete && !consumerCount;
}
// return true if store exists,
@@ -601,21 +579,6 @@ bool Queue::hasExclusiveConsumer() const
return exclusive;
}
-void Queue::DispatchFunctor::operator()()
-{
- try {
- if (consumer && !consumer->preAcquires()) {
- queue.serviceBrowser(consumer);
- }else{
- queue.dispatch();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception on dispatch: " << e.what());
- }
-
- if (sync) sync->completed();
-}
-
ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);