diff options
author | Gordon Sim <gsim@apache.org> | 2007-10-12 12:08:40 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-10-12 12:08:40 +0000 |
commit | 3d2ce1b5656bbba8b23b31848616b1010f46ede9 (patch) | |
tree | ce36757d28f41739e4c79de58cf95142cc79a71c /cpp/src | |
parent | cbca97b00d9fad64adcbdc860cd9f8633ca31f96 (diff) | |
download | qpid-python-3d2ce1b5656bbba8b23b31848616b1010f46ede9.tar.gz |
Some fixes to locking within the queue (preventing locks being held during delivery to a consumer)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 67 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 2 |
2 files changed, 33 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e190a82485..16e91fc1cf 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -139,32 +139,32 @@ void Queue::requestDispatch(Consumer* c, bool sync){ } } -bool Queue::dispatch(QueuedMessage& msg){ - - - RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... +Consumer* Queue::allocate() +{ + RWlock::ScopedWlock locker(consumerLock); if(acquirers.empty()){ - return false; + return 0; }else if(exclusive){ - return exclusive->deliver(msg); + return exclusive; }else{ - //deliver to next consumer next = next % acquirers.size(); - Consumer* c = acquirers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) { - return true; - } - next = next % acquirers.size(); - c = next == start ? 0 : acquirers[next]; - } - return false; + return acquirers[next++]; } } +bool Queue::dispatch(QueuedMessage& msg) +{ + Consumer* c = allocate(); + int start = next; + while(c){ + if(c->deliver(msg)) { + return true; + } + c = next == start ? 0 : allocate(); + } + return false; +} void Queue::dispatch(){ QueuedMessage msg; @@ -188,27 +188,22 @@ void Queue::dispatch(){ void Queue::serviceBrowser(Consumer* browser) { - //This is a poorly performing implementation: - // - // * bad concurrency where browsers exist - // * inefficient for largish queues - // - //The queue needs to be based on a current data structure that - //does not invalidate iterators when modified. Subscribers could - //then use an iterator to continue from where they left off + QueuedMessage msg; + while (seek(msg, browser->position) && browser->deliver(msg)) { + browser->position = msg.position; + } +} +bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > browser->position) { - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if (i->position > browser->position) { - if (browser->deliver(*i)) { - browser->position = i->position; - } else { - break; - } - } - } + if (!messages.empty() && messages.back().position > position) { + uint index = (position - messages.front().position) + 1; + if (index < messages.size()) { + msg = messages[index]; + return true; + } } + return false; } void Queue::consume(Consumer* c, bool requestExclusive){ diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 7ee9106ef0..e02444642b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -94,6 +94,8 @@ namespace qpid { void dispatch(); void cancel(Consumer* c, Consumers& set); void serviceBrowser(Consumer* c); + Consumer* allocate(); + bool seek(QueuedMessage& msg, const framing::SequenceNumber& position); protected: /** |