diff options
author | Gordon Sim <gsim@apache.org> | 2008-02-14 11:49:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-02-14 11:49:17 +0000 |
commit | 02507ea2ad2f9f826f99b68c3daccaa88ad0748c (patch) | |
tree | c28134fa81bf79bc048c79439e15ffbfb6721306 | |
parent | 299cb36acd8d12611f5b0daa28f03ffd004f0500 (diff) | |
download | qpid-python-02507ea2ad2f9f826f99b68c3daccaa88ad0748c.tar.gz |
Fixed bug in browsing that failed to deal correctly with 'gaps' in message sequence.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@627718 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 21 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 43 |
2 files changed, 57 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c7dd656a4e..d34ca06364 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -170,12 +170,17 @@ void Queue::requeue(const QueuedMessage& msg){ bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if (i->position == msg.position) { messages.erase(i); + QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); return true; + } else { + QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); } } + QPID_LOG(debug, "Acquire failed for " << msg.position); return false; } @@ -255,8 +260,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) m = msg; return true; } else { - //consumer hasn't got enough credit for the message - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + //browser hasn't got enough credit for the message + QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); return false; } } else { @@ -304,11 +309,13 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { msg = messages.front(); return true; } else { - uint index = (c.position - messages.front().position) + 1; - if (index < messages.size()) { - msg = messages[index]; - return true; - } + //TODO: can improve performance of this search, for now just searching linearly from end + Messages::reverse_iterator pos; + for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) { + pos = i; + } + msg = *pos; + return true; } } addListener(c); diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index c251e6aca0..65aab2870f 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -720,6 +720,49 @@ class MessageTests(TestBase): #check all 'browsed' messages are still on the queue self.assertEqual(5, channel.queue_query(queue="q").message_count) + def test_subscribe_not_acquired_3(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + + #browse through messages + queue = self.client.queue("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + if (i % 2): + #try to acquire every second message + channel.message_acquire([msg.command_id, msg.command_id]) + #check that acquire succeeds + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + msg.complete() + self.assertEmpty(queue) + + #create a second not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 1, destination = "b") + #check it gets those not consumed + queue = self.client.queue("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + def test_no_size(self): self.queue_declare(queue = "q", exclusive=True, auto_delete=True) |