diff options
author | Alan Conway <aconway@apache.org> | 2008-10-25 01:55:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-25 01:55:06 +0000 |
commit | 57bd5193208b228c1088586917d7f43f13e0dd9a (patch) | |
tree | 564d1aa0d13da985bd2159bbdd8d4b92be4016fb /cpp/src/tests | |
parent | d1239516d2cd33ceb90be7a74bd5ea73825c577e (diff) | |
download | qpid-python-57bd5193208b228c1088586917d7f43f13e0dd9a.tar.gz |
Client API change: Centralize access to subscription status, better control of acquire/accept.
client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings
client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe.
client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire
client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools.
Issues addressed by the change:
- old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing.
- old AckPolicy was broken - not possible to access the instance associated with an active subscription
- old AckPolicy did not provide a way to do manual acquire, only accept.
- setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually.
- a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707808 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 56 | ||||
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/XmlClientSessionTest.cpp | 26 | ||||
-rw-r--r-- | cpp/src/tests/consume.cpp | 12 | ||||
-rw-r--r-- | cpp/src/tests/echotest.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 18 |
9 files changed, 57 insertions, 103 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 440605a2e4..abe317aad8 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -20,8 +20,7 @@ */ #include "unit_test.h" #include "BrokerFixture.h" -#include "qpid/client/AckPolicy.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" @@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener { std::vector<Message> messages; string name; uint expected; - Dispatcher dispatcher; + SubscriptionManager submgr; DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), dispatcher(session) {} + name(n), expected(ex), submgr(session) {} void run() { - dispatcher.listen(name, this); - dispatcher.run(); + submgr.subscribe(*this, name); + submgr.run(); } void received(Message& msg) { messages.push_back(msg); if (--expected == 0) { - dispatcher.stop(); + submgr.stop(); } } }; @@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener struct ClientSessionFixture : public ProxySessionFixture { - ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {} - - void declareSubscribe(const string& q="my-queue", - const string& dest="my-dest") - { - session.queueDeclare(arg::queue=q); - session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1); - session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages - session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) { + session.queueDeclare(arg::queue="my-queue"); } }; QPID_AUTO_TEST_CASE(testQueueQuery) { ClientSessionFixture fix; fix.session = fix.connection.newSession(); - fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true); - QueueQueryResult result = fix.session.queueQuery(string("my-queue")); + fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", + arg::exclusive=true, arg::autoDelete=true); + QueueQueryResult result = fix.session.queueQuery("q"); BOOST_CHECK_EQUAL(false, result.getDurable()); BOOST_CHECK_EQUAL(true, result.getExclusive()); - BOOST_CHECK_EQUAL(string("amq.fanout"), - result.getAlternateExchange()); -} - -QPID_AUTO_TEST_CASE(testTransfer) -{ - ClientSessionFixture fix; - fix.session=fix.connection.newSession(); - fix.declareSubscribe(); - fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue")); - //get & test the message: - FrameSet::shared_ptr msg = fix.session.get(); - BOOST_CHECK(msg->isA<MessageTransferBody>()); - BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); - //confirm receipt: - AckPolicy autoAck; - autoAck.ack(Message(*msg), fix.session); + BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); } QPID_AUTO_TEST_CASE(testDispatcher) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue")); - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); listener.run(); BOOST_CHECK_EQUAL(count, listener.messages.size()); for (size_t i = 0; i < count; ++i) @@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 10; - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); sys::Thread t(listener); for (size_t i = 0; i < count; ++i) { fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue")); @@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) { ClientSessionFixture fix; fix.session.timeout(60); - fix.declareSubscribe(); fix.session.suspend(); // Make sure we are still subscribed after resume. fix.connection.resume(fix.session); @@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) { BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); BOOST_CHECK(lq.empty()); // Credit exhausted. - fix.subs.setFlowControl("lq", FlowControl::unlimited()); + fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); } diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index f7fe81a709..28f555cf6a 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -168,8 +168,10 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy) ProxySessionFixture f; std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - LocalQueue incoming(AckPolicy(0));//no automatic acknowledgements - f.subs.subscribe(incoming, q); + LocalQueue incoming; + SubscriptionSettings settings(FlowControl::unlimited()); + settings.autoAck = 0; // no auto ack. + Subscription sub = f.subs.subscribe(incoming, q, settings); for (int i = 0; i < 5; i++) { f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); } diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp index 534ecf70f2..98558f0a76 100644 --- a/cpp/src/tests/XmlClientSessionTest.cpp +++ b/cpp/src/tests/XmlClientSessionTest.cpp @@ -29,7 +29,7 @@ #include "qpid/framing/TransferContent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/client/Connection.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/LocalQueue.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" @@ -54,30 +54,6 @@ using std::endl; Shlib shlib("../.libs/xml.so"); -struct DummyListener : public sys::Runnable, public MessageListener { - std::vector<Message> messages; - string name; - uint expected; - Dispatcher dispatcher; - - DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), dispatcher(session) {} - - void run() - { - dispatcher.listen(name, this); - dispatcher.run(); - } - - void received(Message& msg) - { - messages.push_back(msg); - if (--expected == 0) - dispatcher.stop(); - } -}; - - class SubscribedLocalQueue : public LocalQueue { private: SubscriptionManager& subscriptions; diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp index 29c61ada1b..4d74b8ae57 100644 --- a/cpp/src/tests/consume.cpp +++ b/cpp/src/tests/consume.cpp @@ -75,11 +75,11 @@ struct Client if (opts.declare) session.queueDeclare(opts.queue); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setAcceptMode(opts.ack > 0 ? 0 : 1); - subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED, - false); - subs.subscribe(lq, opts.queue); + LocalQueue lq; + SubscriptionSettings settings; + settings.acceptMode = opts.ack > 0 ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; + settings.flowControl = FlowControl(opts.count, SubscriptionManager::UNLIMITED,false); + Subscription sub = subs.subscribe(lq, opts.queue, settings); Message msg; AbsTime begin=now(); for (size_t i = 0; i < opts.count; ++i) { @@ -87,7 +87,7 @@ struct Client QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId()); } if (opts.ack != 0) - subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch. AbsTime end=now(); double secs(double(Duration(begin,end))/TIME_SEC); if (opts.summary) cout << opts.count/secs << endl; diff --git a/cpp/src/tests/echotest.cpp b/cpp/src/tests/echotest.cpp index a57e2de5ad..7cbf3e7df4 100644 --- a/cpp/src/tests/echotest.cpp +++ b/cpp/src/tests/echotest.cpp @@ -92,9 +92,7 @@ void Listener::start(uint size) { session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true); request.getDeliveryProperties().setRoutingKey(queue); - subscriptions.setAcceptMode(1/*not-required*/); - subscriptions.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subscriptions.subscribe(*this, queue); + subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE)); request.getDeliveryProperties().setTimestamp(current_time()); if (size) request.setData(std::string(size, 'X')); diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 524870a0e8..a980a43322 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -204,14 +204,15 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; session.queuePurge(arg::queue=queue); } + SubscriptionSettings settings; if (opts.prefetch) { - mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); - mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); + settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); + settings.flowControl = FlowControl::messageWindow(opts.prefetch); } else { - mgr.setAcceptMode(1/*not-required*/); - mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); } - mgr.subscribe(*this, queue); + mgr.subscribe(*this, queue, settings); } void Receiver::test() diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 2e15489525..eb7235a09b 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -560,11 +560,12 @@ struct SubscribeThread : public Client { try { if (opts.txSub) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack)); - subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1); - subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, - false); - subs.subscribe(lq, queue); + SubscriptionSettings settings; + settings.autoAck = opts.txSub ? opts.txSub : opts.ack; + settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT); + settings.flowControl = FlowControl::messageCredit(opts.subQuota); + LocalQueue lq; + Subscription subscription = subs.subscribe(lq, queue, settings); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); if (opts.txSub) { @@ -603,7 +604,7 @@ struct SubscribeThread : public Client { } } if (opts.txSub || opts.ack) - lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + subscription.accept(subscription.getUnaccepted()); if (opts.txSub) { if (opts.commitAsync) session.txCommit(); else sync(session).txCommit(); diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 0da26fa2c4..7bdc2c32de 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -66,6 +66,7 @@ class Listener : public MessageListener{ public: Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); + Subscription subscription; }; /** @@ -118,14 +119,15 @@ int main(int argc, char** argv){ //set up listener SubscriptionManager mgr(session); Listener listener(session, mgr, "response", args.transactional); + SubscriptionSettings settings; if (args.prefetch) { - mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); - mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); + settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2)); + settings.flowControl = FlowControl::messageCredit(args.prefetch); } else { - mgr.setAcceptMode(1/*-not-required*/); - mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); } - mgr.subscribe(listener, control); + listener.subscription = mgr.subscribe(listener, control, settings); session.sync(); if( args.statusqueue.length() > 0 ) { @@ -170,7 +172,7 @@ void Listener::received(Message& message){ if(string("TERMINATION_REQUEST") == type){ shutdown(); }else if(string("REPORT_REQUEST") == type){ - mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point + subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point cout <<"Batch ended, sending report." << endl; //send a report: report(); diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index c285ff9fcc..9d253ddb7f 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -139,9 +139,10 @@ struct Transfer : public Client, public Runnable else session.txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(0));//manual acking - subs.setFlowControl(opts.msgsPerTx, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, src); + LocalQueue lq; + SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx)); + settings.autoAck = 0; // Disabled + Subscription sub = subs.subscribe(lq, src, settings); for (uint t = 0; t < opts.txCount; t++) { Message in; @@ -157,7 +158,7 @@ struct Transfer : public Client, public Runnable out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode()); session.messageTransfer(arg::content=out, arg::acceptMode=1); } - lq.getAckPolicy().ackOutstanding(session); + sub.accept(sub.getUnaccepted()); if (opts.dtx) { session.dtxEnd(arg::xid=xid); session.dtxPrepare(arg::xid=xid); @@ -230,8 +231,6 @@ struct Controller : public Client int check() { SubscriptionManager subs(session); - subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); - subs.setAcceptMode(1/*not-required*/); // Recover DTX transactions (if any) if (opts.dtx) { @@ -262,9 +261,10 @@ struct Controller : public Client StringSet drained; //drain each queue and verify the correct set of messages are available for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { - //subscribe, allocate credit and flush - LocalQueue lq(AckPolicy(0));//manual acking - subs.subscribe(lq, *i, *i); + //subscribe, allocate credit and flushn + LocalQueue lq; + SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE); + subs.subscribe(lq, *i, settings); session.messageFlush(arg::destination=*i); session.sync(); |