diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 67 |
1 files changed, 31 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e190a82485..16e91fc1cf 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -139,32 +139,32 @@ void Queue::requestDispatch(Consumer* c, bool sync){ } } -bool Queue::dispatch(QueuedMessage& msg){ - - - RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... +Consumer* Queue::allocate() +{ + RWlock::ScopedWlock locker(consumerLock); if(acquirers.empty()){ - return false; + return 0; }else if(exclusive){ - return exclusive->deliver(msg); + return exclusive; }else{ - //deliver to next consumer next = next % acquirers.size(); - Consumer* c = acquirers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) { - return true; - } - next = next % acquirers.size(); - c = next == start ? 0 : acquirers[next]; - } - return false; + return acquirers[next++]; } } +bool Queue::dispatch(QueuedMessage& msg) +{ + Consumer* c = allocate(); + int start = next; + while(c){ + if(c->deliver(msg)) { + return true; + } + c = next == start ? 0 : allocate(); + } + return false; +} void Queue::dispatch(){ QueuedMessage msg; @@ -188,27 +188,22 @@ void Queue::dispatch(){ void Queue::serviceBrowser(Consumer* browser) { - //This is a poorly performing implementation: - // - // * bad concurrency where browsers exist - // * inefficient for largish queues - // - //The queue needs to be based on a current data structure that - //does not invalidate iterators when modified. Subscribers could - //then use an iterator to continue from where they left off + QueuedMessage msg; + while (seek(msg, browser->position) && browser->deliver(msg)) { + browser->position = msg.position; + } +} +bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > browser->position) { - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if (i->position > browser->position) { - if (browser->deliver(*i)) { - browser->position = i->position; - } else { - break; - } - } - } + if (!messages.empty() && messages.back().position > position) { + uint index = (position - messages.front().position) + 1; + if (index < messages.size()) { + msg = messages[index]; + return true; + } } + return false; } void Queue::consume(Consumer* c, bool requestExclusive){ |