diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 79 |
3 files changed, 86 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index c59736969f..43d914fc42 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -187,11 +187,17 @@ bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} +bool before(const QueuedMessage& a, const QueuedMessage& b) +{ + return a.position < b.position; +} + void RingQueuePolicy::enqueued(const QueuedMessage& m) { QueuePolicy::enqueued(m); qpid::sys::Mutex::ScopedLock l(lock); - queue.push_back(m); + //need to insert in correct location based on position + queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 77bfbe0c6d..f2904c6734 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -332,6 +332,7 @@ void Connection::deliveryRecord(const string& qname, if (!ended) { // Has a message if (acquired) { // Message is on the update queue m = getUpdateMessage(); + m.queue = queue.get(); queue->enqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 611911d126..0c249bdf15 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -704,7 +704,7 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) */ ClusterFixture::Args args; prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0]); c0.session.queueDeclare("durable_queue", arg::durable=true); c0.session.queueDeclare("non_durable_queue", arg::durable=false); @@ -712,6 +712,8 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) Client c1(cluster[1]); QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); + BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); + BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); @@ -854,4 +856,79 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } +QPID_AUTO_TEST_CASE(testRingQueueUpdate) { + ScopedSuppressLogging allQuiet; + //tests that ring queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + + //add new node + cluster.add(); + + //send one more message + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag)); + + c1.session.close(); + c1.connection.close(); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); +} + +QPID_AUTO_TEST_CASE(testRelease) { + ScopedSuppressLogging allQuiet; + //tests that releasing a messages that was unacked when one node + //joined works correctly + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); +} + QPID_AUTO_TEST_SUITE_END() |