summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp1
-rw-r--r--python/tests_0-10/message.py35
4 files changed, 42 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index f260482db9..11b2682575 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -217,7 +217,11 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
+ //put message back in correct position:
+ Messages::reverse_iterator i = messages.rbegin();
+ while (i != messages.rend() && msg.position < i->position) { ++i; }
+ messages.insert(i.base(), msg);
+
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 6873827b81..841c3b610d 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -378,6 +378,7 @@ void Connection::deliveryRecord(const string& qname,
if (acquired) { // Message is on the update queue
m = getUpdateMessage();
m.queue = queue.get();
+ m.position = position;
if (enqueued) queue->enqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index a9761962e8..f263577fd3 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -252,6 +252,7 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
MessageUpdater updater(q->getName(), s, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
+ ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index d40d5e811e..e80333a1e6 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -815,6 +815,41 @@ class MessageTests(TestBase010):
#ensure there are no other messages
self.assertEmpty(queueC)
+ def test_release_order(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send messages
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #subscribe:
+ session.message_subscribe(queue="q", destination="a")
+ a = session.incoming("a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ for i in range(1, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #accept all odd messages
+ session.message_accept(RangedSet(msg.id))
+ else:
+ #release all even messages
+ session.message_release(RangedSet(msg.id))
+
+ #browse:
+ session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+ b = session.incoming("b")
+ b.start()
+ for i in [2, 4, 6, 8, 10]:
+ msg = b.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+
def test_empty_body(self):
session = self.session
session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)