diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 102 |
1 files changed, 83 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 553f6016d2..a094c7a804 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -112,31 +112,50 @@ void Queue::requeue(const QueuedMessage& msg){ } - -void Queue::requestDispatch(){ - serializer.execute(dispatchCallback); +bool Queue::acquire(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == msg.position) { + messages.erase(i); + return true; + } + } + return false; } +void Queue::requestDispatch(Consumer* c, bool sync){ + if (!c || c->preAcquires()) { + if (sync) { + serializer.dispatch(); + } else { + serializer.execute(dispatchCallback); + } + } else { + //note: this is always done on the callers thread, regardless + // of sync; browsers of large queues should use flow control! + serviceBrowser(c); + } +} bool Queue::dispatch(QueuedMessage& msg){ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... - if(consumers.empty()){ + if(acquirers.empty()){ return false; }else if(exclusive){ return exclusive->deliver(msg); }else{ //deliver to next consumer - next = next % consumers.size(); - Consumer* c = consumers[next]; + next = next % acquirers.size(); + Consumer* c = acquirers[next]; int start = next; while(c){ next++; if(c->deliver(msg)) return true; - next = next % consumers.size(); - c = next == start ? 0 : consumers[next]; + next = next % acquirers.size(); + c = next == start ? 0 : acquirers[next]; } return false; } @@ -153,34 +172,79 @@ void Queue::dispatch(){ } if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { pop(); - } else { + } else { break; } - } + } + RWlock::ScopedRlock locker(consumerLock); + for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) { + serviceBrowser(*i); + } +} + +void Queue::serviceBrowser(Consumer* browser) +{ + //This is a poorly performing implementation: + // + // * bad concurrency where browsers exist + // * inefficient for largish queues + // + //The queue needs to be based on a current data structure that + //does not invalidate iterators when modified. Subscribers could + //then use an iterator to continue from where they left off + + Mutex::ScopedLock locker(messageLock); + if (!messages.empty() && messages.back().position > browser->position) { + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position > browser->position) { + if (browser->deliver(*i)) { + browser->position = i->position; + } else { + break; + } + } + } + } } void Queue::consume(Consumer* c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); - if(exclusive) + if(exclusive) { throw ChannelException( 403, format("Queue '%s' has an exclusive consumer." " No more consumers allowed.") % getName()); + } if(requestExclusive) { - if(!consumers.empty()) + if(acquirers.empty() && browsers.empty()) { + exclusive = c; + } else { throw ChannelException( - 403, format("Queue '%s' already has conumers." - "Exclusive access denied.") %getName()); - exclusive = c; + 403, format("Queue '%s' already has consumers." + "Exclusive access denied.") % getName()); + } + } + if (c->preAcquires()) { + acquirers.push_back(c); + } else { + browsers.push_back(c); } - consumers.push_back(c); } void Queue::cancel(Consumer* c){ RWlock::ScopedWlock locker(consumerLock); + if (c->preAcquires()) { + cancel(c, acquirers); + } else { + cancel(c, browsers); + } + if(exclusive == c) exclusive = 0; +} + +void Queue::cancel(Consumer* c, Consumers& consumers) +{ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) consumers.erase(i); - if(exclusive == c) exclusive = 0; } QueuedMessage Queue::dequeue(){ @@ -233,12 +297,12 @@ uint32_t Queue::getMessageCount() const{ uint32_t Queue::getConsumerCount() const{ RWlock::ScopedRlock locker(consumerLock); - return consumers.size(); + return acquirers.size() + browsers.size(); } bool Queue::canAutoDelete() const{ RWlock::ScopedRlock locker(consumerLock); - return autodelete && consumers.size() == 0; + return autodelete && acquirers.empty() && browsers.empty(); } // return true if store exists, |