summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
committerAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
commita78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch)
treef82688402704057000bfa13ce775fb8ad2f036ff /cpp/src
parent2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff)
downloadqpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a fail-over. In that case the backup truncates it's queues to the same position as the primary so it can continue replicating. (Note the assertions added to verify setPosition showed up a minor bug in the old cluster code, which was leaving messages on the cluster update queue after an update. This patch fixes the issue.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp20
-rw-r--r--cpp/src/qpid/broker/MessageDeque.h2
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp11
-rw-r--r--cpp/src/qpid/broker/MessageMap.h3
-rw-r--r--cpp/src/qpid/broker/Messages.h9
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp85
-rw-r--r--cpp/src/qpid/broker/Queue.h18
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp1
-rw-r--r--cpp/src/tests/Makefile.am2
-rw-r--r--cpp/src/tests/QueueTest.cpp246
12 files changed, 310 insertions, 92 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index f70c996975..474e4139bd 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -173,6 +173,26 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired)
}
}
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+ size_t i = index(n+1);
+ if (i >= messages.size()) return; // Nothing to do.
+
+ // Assertion to verify the precondition: no messaages after n.
+ assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+ messages.end());
+ messages.erase(messages.begin()+i, messages.end());
+ if (head >= messages.size()) head = messages.size() - 1;
+ // Re-count the available messages
+ available = 0;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) ++available;
+ }
+}
+
void MessageDeque::clean()
{
while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h
index 9b53716d4e..c5670b2a72 100644
--- a/cpp/src/qpid/broker/MessageDeque.h
+++ b/cpp/src/qpid/broker/MessageDeque.h
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
-
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
index 9b164d4e5c..d6702a9336 100644
--- a/cpp/src/qpid/broker/MessageMap.cpp
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
QueuedMessage& a = messages[added.position];
a = added;
a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message at " << a.position);
+ QPID_LOG(debug, "Added message " << a);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
+ QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
return true;
}
}
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+ // Nothing to do, just assert that the precondition is respected and there
+ // are no undeleted messages after seq.
+ assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h
index a668450250..1f0481cb6b 100644
--- a/cpp/src/qpid/broker/MessageMap.h
+++ b/cpp/src/qpid/broker/MessageMap.h
@@ -6,7 +6,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+o * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
virtual void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
index 61e9fa110a..45f5e6cd81 100644
--- a/cpp/src/qpid/broker/Messages.h
+++ b/cpp/src/qpid/broker/Messages.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
namespace qpid {
@@ -101,14 +102,22 @@ class Messages
virtual void updateAcquired(const QueuedMessage&) { }
/**
+ * Set the position of the back of the queue. Next message enqueued will be n+1.
+ *@pre Any messages with seq > n must already be dequeued.
+ */
+ virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+ /**
* Apply, the functor to each message held
*/
+
virtual void foreach(Functor) = 0;
/**
* Remove every message held that for which the specified
* predicate returns true
*/
virtual void removeIf(Predicate) = 0;
+
private:
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index ab5ec7235a..9a0fead744 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
fifo.updateAcquired(acquired);
}
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+ fifo.setPosition(n);
+}
+
void PriorityQueue::foreach(Functor f)
{
fifo.foreach(f);
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
index 8628745db1..301367358b 100644
--- a/cpp/src/qpid/broker/PriorityQueue.h
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 3d90490186..9df0ebf313 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -588,21 +588,51 @@ QueuedMessage Queue::get(){
return msg;
}
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+ std::deque<QueuedMessage>& collection)
{
- if (message.payload->hasExpired()) {
- expired.push_back(message);
+ if (predicate(qm)) {
+ collection.push_back(qm);
return true;
} else {
return false;
}
}
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+ std::deque<QueuedMessage>& dequeued)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
+ }
+ if (!dequeued.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(dequeued.size());
+ }
+ for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+ i != dequeued.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
+ }
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
@@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> expired;
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- if (!expired.empty()) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(boost::bind(&isExpired, _1), dequeued);
+ if (dequeued.size()) {
if (mgmtObject) {
- mgmtObject->inc_acquires(expired.size());
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(expired.size());
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ mgmtObject->inc_discardsTtl(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(dequeued.size());
}
}
}
}
-
namespace {
// for use with purge/move below - collect messages that match a given filter
//
@@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const
if (allocator) allocator->query(results);
}
+namespace {
+struct After {
+ framing::SequenceNumber seq;
+ After(framing::SequenceNumber s) : seq(s) {}
+ bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
+ if (n < sequence) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(After(n), dequeued);
+ messages->setPosition(n);
+ }
sequence = n;
QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9869a698c1..ed1f63504b 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void configureImpl(const qpid::framing::FieldTable& settings);
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
+ void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
public:
@@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>,
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}
- /** Set the position sequence number for the next message on the queue.
- * Must be >= the current sequence number.
- * Used by cluster to replicate queues.
+ /**
+ * Set the sequence number for the back of the queue, the
+ * next message enqueued will be pos+1.
+ * If pos > getPosition() this creates a gap in the sequence numbers.
+ * if pos < getPosition() the back of the queue is reset to pos,
+ *
+ * The _caller_ must ensure that any messages after pos have been dequeued.
+ *
+ * Used by HA/cluster code for queue replication.
*/
QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
- /** return current position sequence number for the next message on the queue.
+
+ /**
+ *@return sequence number for the back of the queue. The next message pushed
+ * will be at getPosition+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3754e791f9..081d54ab49 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -524,6 +524,7 @@ broker::QueuedMessage Connection::getUpdateMessage() {
boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
assert(!updateq->isDurable());
broker::QueuedMessage m = updateq->get();
+ updateq->dequeue(0, m);
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 3e705b074e..cdc7429f3b 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -67,7 +67,7 @@ tmodule_LTLIBRARIES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
-unit_test_LDADD=-lboost_unit_test_framework \
+unit_test_LDADD=-lboost_unit_test_framework -lpthread \
$(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2)
unit_test_SOURCES= unit_test.cpp unit_test.h \
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index db84614a9b..a481f703ce 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,10 +42,11 @@
#include "qpid/broker/QueueFlowLimit.h"
#include <iostream>
-#include "boost/format.hpp"
-
-using std::string;
+#include <vector>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+using namespace std;
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
@@ -85,7 +88,7 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
+intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
@@ -96,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK
return msg;
}
+intrusive_ptr<Message> contentMessage(string content) {
+ intrusive_ptr<Message> m(MessageUtils::createMessage());
+ MessageUtils::addContent(m, content);
+ return m;
+}
+
+string getContent(intrusive_ptr<Message> m) {
+ return m->getFrames().getContent();
+}
+
QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testAsyncMessage) {
@@ -107,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -122,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
queue->process(msg1);
@@ -147,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
@@ -193,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){
QPID_AUTO_TEST_CASE(testDequeue){
Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
intrusive_ptr<Message> received;
queue->deliver(msg1);
@@ -267,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -293,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -319,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -433,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
intrusive_ptr<Message> received;
//set deliever match for LVQ a,b,c,a
@@ -468,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg3.get(), received.get());
- intrusive_ptr<Message> msg5 = create_message("e", "A");
- intrusive_ptr<Message> msg6 = create_message("e", "B");
- intrusive_ptr<Message> msg7 = create_message("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "A");
+ intrusive_ptr<Message> msg6 = createMessage("e", "B");
+ intrusive_ptr<Message> msg7 = createMessage("e", "C");
msg5->insertCustomProperty(key,"a");
msg6->insertCustomProperty(key,"b");
msg7->insertCustomProperty(key,"c");
@@ -500,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
string key;
args.getLVQKey(key);
@@ -526,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
- intrusive_ptr<Message> msg5 = create_message("e", "F");
- intrusive_ptr<Message> msg6 = create_message("e", "G");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
+ intrusive_ptr<Message> msg5 = createMessage("e", "F");
+ intrusive_ptr<Message> msg6 = createMessage("e", "G");
//set deliever match for LVQ a,b,c,a
@@ -603,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
queue1->configure(args);
queue2->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
string key;
args.getLVQKey(key);
@@ -647,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
intrusive_ptr<Message> received;
queue1->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
// 2
string key;
args.getLVQKey(key);
@@ -675,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
@@ -738,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", groups[i]);
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -885,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Queue = a-2,
// Owners= ^C3,
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "a");
msg->insertCustomProperty("MY-ID", 9);
queue->deliver(msg);
@@ -896,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- msg = create_message("e", "A");
+ msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "b");
msg->insertCustomProperty("MY-ID", 10);
queue->deliver(msg);
@@ -927,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
queue->configure(args);
for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
// no "GROUP-ID" header
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -990,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
queue2->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -1006,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
queue1->deliver(msg2);
queue2->deliver(msg2);
@@ -1021,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue1->clearLastNodeFailure();
queue2->clearLastNodeFailure();
- intrusive_ptr<Message> msg3 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "B");
queue1->deliver(msg3);
queue2->deliver(msg3);
BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
@@ -1035,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
* internal details not part of the queue abstraction.
// check requeue 1
- intrusive_ptr<Message> msg4 = create_message("e", "C");
- intrusive_ptr<Message> msg5 = create_message("e", "D");
+ intrusive_ptr<Message> msg4 = createMessage("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "D");
framing::SequenceNumber sequence(1);
QueuedMessage qmsg1(queue1.get(), msg4, sequence);
@@ -1083,8 +1096,8 @@ not requeued to the store.
queue1->create(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
- intrusive_ptr<Message> msg2 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
+ intrusive_ptr<Message> msg2 = createMessage("e", "D");
queue1->recover(msg1);
@@ -1116,7 +1129,7 @@ simulate store exception going into last node standing
queue1->configure(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
queue1->deliver(msg1);
testStore.createError();
@@ -1403,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testSetPositionFifo) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
+ for (int i = 0; i < 10; ++i)
+ q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(10, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ BOOST_CHECK_EQUAL(10, q->getMessageCount());
+
+ // Using setPosition to introduce a gap in sequence numbers.
+ q->setPosition(15);
+ BOOST_CHECK_EQUAL(10, q->getMessageCount());
+ BOOST_CHECK_EQUAL(15, q->getPosition());
+ BOOST_CHECK(q->find(10, qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ q->deliver(contentMessage("16"));
+ c->setPosition(9);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(10, c->last.position);
+ BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(16, c->last.position);
+ BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+
+ // Using setPosition to trunkcate the queue
+ q->setPosition(5);
+ BOOST_CHECK_EQUAL(5, q->getMessageCount());
+ q->deliver(contentMessage("6a"));
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5, c->last.position);
+ BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(6, c->last.position);
+ BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+ BOOST_CHECK(!q->dispatch(c)); // No more messages.
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionLvq) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ string key="key";
+ framing::FieldTable args;
+ args.setString("qpid.last_value_queue_key", "key");
+ q->configure(args);
+
+ const char* values[] = { "a", "b", "c", "a", "b", "c" };
+ for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->insertCustomProperty(key, values[i]);
+ q->deliver(m);
+ }
+ BOOST_CHECK_EQUAL(3, q->getMessageCount());
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(6, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("6", getContent(qm.payload));
+
+ q->setPosition(5);
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5, c->last.position); // Numbered from 1
+ BOOST_CHECK(!q->dispatch(c));
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionPriority) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ framing::FieldTable args;
+ args.setInt("qpid.priorities", 10);
+ q->configure(args);
+
+ const int priorities[] = { 1, 2, 3, 2, 1, 3 };
+ for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(priorities[i]);
+ q->deliver(m);
+ }
+
+ // Truncation removes messages in fifo order, not priority order.
+ q->setPosition(3);
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3, c->last.position);
+ BOOST_CHECK(!q->dispatch(c));
+
+ intrusive_ptr<Message> m = contentMessage("4a");
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(4);
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+
+ // But consumers see priority order
+ c.reset(new TestConsumer("test", true));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3, c->last.position);
+ BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2, c->last.position);
+ BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1, c->last.position);
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+}
QPID_AUTO_TEST_SUITE_END()