summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-12 12:08:40 +0000
committerGordon Sim <gsim@apache.org>2007-10-12 12:08:40 +0000
commit3d2ce1b5656bbba8b23b31848616b1010f46ede9 (patch)
treece36757d28f41739e4c79de58cf95142cc79a71c /cpp/src
parentcbca97b00d9fad64adcbdc860cd9f8633ca31f96 (diff)
downloadqpid-python-3d2ce1b5656bbba8b23b31848616b1010f46ede9.tar.gz
Some fixes to locking within the queue (preventing locks being held during delivery to a consumer)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp67
-rw-r--r--cpp/src/qpid/broker/Queue.h2
2 files changed, 33 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e190a82485..16e91fc1cf 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -139,32 +139,32 @@ void Queue::requestDispatch(Consumer* c, bool sync){
}
}
-bool Queue::dispatch(QueuedMessage& msg){
-
-
- RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+Consumer* Queue::allocate()
+{
+ RWlock::ScopedWlock locker(consumerLock);
if(acquirers.empty()){
- return false;
+ return 0;
}else if(exclusive){
- return exclusive->deliver(msg);
+ return exclusive;
}else{
- //deliver to next consumer
next = next % acquirers.size();
- Consumer* c = acquirers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) {
- return true;
- }
- next = next % acquirers.size();
- c = next == start ? 0 : acquirers[next];
- }
- return false;
+ return acquirers[next++];
}
}
+bool Queue::dispatch(QueuedMessage& msg)
+{
+ Consumer* c = allocate();
+ int start = next;
+ while(c){
+ if(c->deliver(msg)) {
+ return true;
+ }
+ c = next == start ? 0 : allocate();
+ }
+ return false;
+}
void Queue::dispatch(){
QueuedMessage msg;
@@ -188,27 +188,22 @@ void Queue::dispatch(){
void Queue::serviceBrowser(Consumer* browser)
{
- //This is a poorly performing implementation:
- //
- // * bad concurrency where browsers exist
- // * inefficient for largish queues
- //
- //The queue needs to be based on a current data structure that
- //does not invalidate iterators when modified. Subscribers could
- //then use an iterator to continue from where they left off
+ QueuedMessage msg;
+ while (seek(msg, browser->position) && browser->deliver(msg)) {
+ browser->position = msg.position;
+ }
+}
+bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > browser->position) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
- if (i->position > browser->position) {
- if (browser->deliver(*i)) {
- browser->position = i->position;
- } else {
- break;
- }
- }
- }
+ if (!messages.empty() && messages.back().position > position) {
+ uint index = (position - messages.front().position) + 1;
+ if (index < messages.size()) {
+ msg = messages[index];
+ return true;
+ }
}
+ return false;
}
void Queue::consume(Consumer* c, bool requestExclusive){
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 7ee9106ef0..e02444642b 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -94,6 +94,8 @@ namespace qpid {
void dispatch();
void cancel(Consumer* c, Consumers& set);
void serviceBrowser(Consumer* c);
+ Consumer* allocate();
+ bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
protected:
/**