diff options
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r-- | qpid/cpp/src/tests/ConnectionOptions.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxAckTest.cpp | 95 | ||||
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 29 |
4 files changed, 20 insertions, 106 deletions
diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h index b4b562fbb9..4e0af7352f 100644 --- a/qpid/cpp/src/tests/ConnectionOptions.h +++ b/qpid/cpp/src/tests/ConnectionOptions.h @@ -38,7 +38,6 @@ struct ConnectionOptions : public qpid::Options, ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") - ("clientname,n", optValue(clientid, "ID"), "unique client identifier") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 0e3cba30bb..735d85b419 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -59,7 +59,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ SequenceNumberTest.cpp \ TimerTest.cpp \ TopicExchangeTest.cpp \ - TxAckTest.cpp \ TxBufferTest.cpp \ TxPublishTest.cpp \ MessageBuilderTest.cpp \ diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp deleted file mode 100644 index d330942ced..0000000000 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageUtils.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/RecoveryManager.h" -#include "qpid/broker/TxAck.h" -#include "TestMessageStore.h" -#include "unit_test.h" -#include <iostream> -#include <list> -#include <vector> - -using std::list; -using std::vector; -using boost::intrusive_ptr; -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::framing; - -QPID_AUTO_TEST_SUITE(TxAckTestSuite) - -struct TxAckTest -{ - AccumulatedAck acked; - TestMessageStore store; - Queue::shared_ptr queue; - vector<intrusive_ptr<Message> > messages; - list<DeliveryRecord> deliveries; - TxAck op; - - TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) - { - for(int i = 0; i < 10; i++){ - intrusive_ptr<Message> msg(MessageUtils::createMessage("exchange", "routing_key")); - msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); - messages.push_back(msg); - QueuedMessage qm(queue.get()); - qm.payload = msg; - deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true)); - } - - //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) - acked.mark = 5; - acked.update(7, 7); - acked.update(9, 9); - } - -}; - -QPID_AUTO_TEST_CASE(testPrepare) -{ - TxAckTest t; - - //ensure acked messages are discarded, i.e. dequeued from store - t.op.prepare(0); - BOOST_CHECK_EQUAL((size_t) 7, t.store.dequeued.size()); - BOOST_CHECK_EQUAL((size_t) 10, t.deliveries.size()); - int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; - for (int i = 0; i < 7; i++) { - BOOST_CHECK_EQUAL(static_pointer_cast<PersistableMessage>(t.messages[dequeued[i]]), t.store.dequeued[i]); - } -} - -QPID_AUTO_TEST_CASE(testCommit) -{ - TxAckTest t; - - //ensure acked messages are removed from list - t.op.commit(); - BOOST_CHECK_EQUAL((size_t) 3, t.deliveries.size()); - list<DeliveryRecord>::iterator i = t.deliveries.begin(); - BOOST_CHECK(i->matches(6));//msg 6 - BOOST_CHECK((++i)->matches(8));//msg 8 - BOOST_CHECK((++i)->matches(10));//msg 10 -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index e0e947eb74..23e9e565e0 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -95,8 +95,9 @@ struct Opts : public TestOptions { size_t iterations; Mode mode; bool summary; - uint32_t intervalSub; - uint32_t intervalPub; + uint32_t intervalSub; + uint32_t intervalPub; + size_t tx; static const std::string helpText; @@ -106,7 +107,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0) + intervalSub(0), intervalPub(0), tx(0) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -140,7 +141,9 @@ struct Opts : public TestOptions { ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") - ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish"); + ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") + + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size"); } // Computed values @@ -450,6 +453,7 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); LocalQueue lq; subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); @@ -474,6 +478,7 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } + if (opts.tx && (j % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } if (opts.confirm) session.sync(); @@ -483,6 +488,7 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } @@ -523,16 +529,17 @@ struct SubscribeThread : public Client { } void run() { // Subscribe - try { + try { + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setAcceptMode(opts.ack > 0 ? 0 : 1); + LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack)); + subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - + if (opts.tx) sync(session).txCommit(); for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -544,6 +551,7 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); + if (opts.tx && (i % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -560,14 +568,17 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.ack !=0) + if (opts.ack) subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + if (opts.tx) + sync(session).txCommit(); AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } |