summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-25 01:55:06 +0000
commit57bd5193208b228c1088586917d7f43f13e0dd9a (patch)
tree564d1aa0d13da985bd2159bbdd8d4b92be4016fb /cpp/src/tests
parentd1239516d2cd33ceb90be7a74bd5ea73825c577e (diff)
downloadqpid-python-57bd5193208b228c1088586917d7f43f13e0dd9a.tar.gz
Client API change: Centralize access to subscription status, better control of acquire/accept.
client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe. client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools. Issues addressed by the change: - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing. - old AckPolicy was broken - not possible to access the instance associated with an active subscription - old AckPolicy did not provide a way to do manual acquire, only accept. - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually. - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707808 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp56
-rw-r--r--cpp/src/tests/QueuePolicyTest.cpp6
-rw-r--r--cpp/src/tests/XmlClientSessionTest.cpp26
-rw-r--r--cpp/src/tests/consume.cpp12
-rw-r--r--cpp/src/tests/echotest.cpp4
-rw-r--r--cpp/src/tests/latencytest.cpp11
-rw-r--r--cpp/src/tests/perftest.cpp13
-rw-r--r--cpp/src/tests/topic_listener.cpp14
-rw-r--r--cpp/src/tests/txtest.cpp18
9 files changed, 57 insertions, 103 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 440605a2e4..abe317aad8 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -20,8 +20,7 @@
*/
#include "unit_test.h"
#include "BrokerFixture.h"
-#include "qpid/client/AckPolicy.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener {
std::vector<Message> messages;
string name;
uint expected;
- Dispatcher dispatcher;
+ SubscriptionManager submgr;
DummyListener(Session& session, const string& n, uint ex) :
- name(n), expected(ex), dispatcher(session) {}
+ name(n), expected(ex), submgr(session) {}
void run()
{
- dispatcher.listen(name, this);
- dispatcher.run();
+ submgr.subscribe(*this, name);
+ submgr.run();
}
void received(Message& msg)
{
messages.push_back(msg);
if (--expected == 0) {
- dispatcher.stop();
+ submgr.stop();
}
}
};
@@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener
struct ClientSessionFixture : public ProxySessionFixture
{
- ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
-
- void declareSubscribe(const string& q="my-queue",
- const string& dest="my-dest")
- {
- session.queueDeclare(arg::queue=q);
- session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1);
- session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages
- session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+ session.queueDeclare(arg::queue="my-queue");
}
};
QPID_AUTO_TEST_CASE(testQueueQuery) {
ClientSessionFixture fix;
fix.session = fix.connection.newSession();
- fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true);
- QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+ fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout",
+ arg::exclusive=true, arg::autoDelete=true);
+ QueueQueryResult result = fix.session.queueQuery("q");
BOOST_CHECK_EQUAL(false, result.getDurable());
BOOST_CHECK_EQUAL(true, result.getExclusive());
- BOOST_CHECK_EQUAL(string("amq.fanout"),
- result.getAlternateExchange());
-}
-
-QPID_AUTO_TEST_CASE(testTransfer)
-{
- ClientSessionFixture fix;
- fix.session=fix.connection.newSession();
- fix.declareSubscribe();
- fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue"));
- //get & test the message:
- FrameSet::shared_ptr msg = fix.session.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
- //confirm receipt:
- AckPolicy autoAck;
- autoAck.ack(Message(*msg), fix.session);
+ BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange());
}
QPID_AUTO_TEST_CASE(testDispatcher)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
listener.run();
BOOST_CHECK_EQUAL(count, listener.messages.size());
for (size_t i = 0; i < count; ++i)
@@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 10;
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
sys::Thread t(listener);
for (size_t i = 0; i < count; ++i) {
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
@@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
{
ClientSessionFixture fix;
fix.session.timeout(60);
- fix.declareSubscribe();
fix.session.suspend();
// Make sure we are still subscribed after resume.
fix.connection.resume(fix.session);
@@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) {
BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
BOOST_CHECK(lq.empty()); // Credit exhausted.
- fix.subs.setFlowControl("lq", FlowControl::unlimited());
+ fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited());
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}
diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp
index f7fe81a709..28f555cf6a 100644
--- a/cpp/src/tests/QueuePolicyTest.cpp
+++ b/cpp/src/tests/QueuePolicyTest.cpp
@@ -168,8 +168,10 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy)
ProxySessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- LocalQueue incoming(AckPolicy(0));//no automatic acknowledgements
- f.subs.subscribe(incoming, q);
+ LocalQueue incoming;
+ SubscriptionSettings settings(FlowControl::unlimited());
+ settings.autoAck = 0; // no auto ack.
+ Subscription sub = f.subs.subscribe(incoming, q, settings);
for (int i = 0; i < 5; i++) {
f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
}
diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp
index 534ecf70f2..98558f0a76 100644
--- a/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/cpp/src/tests/XmlClientSessionTest.cpp
@@ -29,7 +29,7 @@
#include "qpid/framing/TransferContent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/LocalQueue.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
@@ -54,30 +54,6 @@ using std::endl;
Shlib shlib("../.libs/xml.so");
-struct DummyListener : public sys::Runnable, public MessageListener {
- std::vector<Message> messages;
- string name;
- uint expected;
- Dispatcher dispatcher;
-
- DummyListener(Session& session, const string& n, uint ex) :
- name(n), expected(ex), dispatcher(session) {}
-
- void run()
- {
- dispatcher.listen(name, this);
- dispatcher.run();
- }
-
- void received(Message& msg)
- {
- messages.push_back(msg);
- if (--expected == 0)
- dispatcher.stop();
- }
-};
-
-
class SubscribedLocalQueue : public LocalQueue {
private:
SubscriptionManager& subscriptions;
diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp
index 29c61ada1b..4d74b8ae57 100644
--- a/cpp/src/tests/consume.cpp
+++ b/cpp/src/tests/consume.cpp
@@ -75,11 +75,11 @@ struct Client
if (opts.declare)
session.queueDeclare(opts.queue);
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
- subs.setFlowControl(opts.count, SubscriptionManager::UNLIMITED,
- false);
- subs.subscribe(lq, opts.queue);
+ LocalQueue lq;
+ SubscriptionSettings settings;
+ settings.acceptMode = opts.ack > 0 ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl(opts.count, SubscriptionManager::UNLIMITED,false);
+ Subscription sub = subs.subscribe(lq, opts.queue, settings);
Message msg;
AbsTime begin=now();
for (size_t i = 0; i < opts.count; ++i) {
@@ -87,7 +87,7 @@ struct Client
QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
}
if (opts.ack != 0)
- subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch.
AbsTime end=now();
double secs(double(Duration(begin,end))/TIME_SEC);
if (opts.summary) cout << opts.count/secs << endl;
diff --git a/cpp/src/tests/echotest.cpp b/cpp/src/tests/echotest.cpp
index a57e2de5ad..7cbf3e7df4 100644
--- a/cpp/src/tests/echotest.cpp
+++ b/cpp/src/tests/echotest.cpp
@@ -92,9 +92,7 @@ void Listener::start(uint size)
{
session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true);
request.getDeliveryProperties().setRoutingKey(queue);
- subscriptions.setAcceptMode(1/*not-required*/);
- subscriptions.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subscriptions.subscribe(*this, queue);
+ subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
request.getDeliveryProperties().setTimestamp(current_time());
if (size) request.setData(std::string(size, 'X'));
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 524870a0e8..a980a43322 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -204,14 +204,15 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
session.queuePurge(arg::queue=queue);
}
+ SubscriptionSettings settings;
if (opts.prefetch) {
- mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
- mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
+ settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2));
+ settings.flowControl = FlowControl::messageWindow(opts.prefetch);
} else {
- mgr.setAcceptMode(1/*not-required*/);
- mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(*this, queue);
+ mgr.subscribe(*this, queue, settings);
}
void Receiver::test()
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 2e15489525..eb7235a09b 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -560,11 +560,12 @@ struct SubscribeThread : public Client {
try {
if (opts.txSub) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
- subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
- subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
- false);
- subs.subscribe(lq, queue);
+ SubscriptionSettings settings;
+ settings.autoAck = opts.txSub ? opts.txSub : opts.ack;
+ settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT);
+ settings.flowControl = FlowControl::messageCredit(opts.subQuota);
+ LocalQueue lq;
+ Subscription subscription = subs.subscribe(lq, queue, settings);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
if (opts.txSub) {
@@ -603,7 +604,7 @@ struct SubscribeThread : public Client {
}
}
if (opts.txSub || opts.ack)
- lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ subscription.accept(subscription.getUnaccepted());
if (opts.txSub) {
if (opts.commitAsync) session.txCommit();
else sync(session).txCommit();
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 0da26fa2c4..7bdc2c32de 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -66,6 +66,7 @@ class Listener : public MessageListener{
public:
Listener(const Session& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx);
virtual void received(Message& msg);
+ Subscription subscription;
};
/**
@@ -118,14 +119,15 @@ int main(int argc, char** argv){
//set up listener
SubscriptionManager mgr(session);
Listener listener(session, mgr, "response", args.transactional);
+ SubscriptionSettings settings;
if (args.prefetch) {
- mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2)));
- mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true);
+ settings.autoAck = (args.ack ? args.ack : (args.prefetch / 2));
+ settings.flowControl = FlowControl::messageCredit(args.prefetch);
} else {
- mgr.setAcceptMode(1/*-not-required*/);
- mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(listener, control);
+ listener.subscription = mgr.subscribe(listener, control, settings);
session.sync();
if( args.statusqueue.length() > 0 ) {
@@ -170,7 +172,7 @@ void Listener::received(Message& message){
if(string("TERMINATION_REQUEST") == type){
shutdown();
}else if(string("REPORT_REQUEST") == type){
- mgr.getAckPolicy().ackOutstanding(session);//acknowledge everything upto this point
+ subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
cout <<"Batch ended, sending report." << endl;
//send a report:
report();
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index c285ff9fcc..9d253ddb7f 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -139,9 +139,10 @@ struct Transfer : public Client, public Runnable
else session.txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(0));//manual acking
- subs.setFlowControl(opts.msgsPerTx, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, src);
+ LocalQueue lq;
+ SubscriptionSettings settings(FlowControl::messageWindow(opts.msgsPerTx));
+ settings.autoAck = 0; // Disabled
+ Subscription sub = subs.subscribe(lq, src, settings);
for (uint t = 0; t < opts.txCount; t++) {
Message in;
@@ -157,7 +158,7 @@ struct Transfer : public Client, public Runnable
out.getDeliveryProperties().setDeliveryMode(in.getDeliveryProperties().getDeliveryMode());
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
- lq.getAckPolicy().ackOutstanding(session);
+ sub.accept(sub.getUnaccepted());
if (opts.dtx) {
session.dtxEnd(arg::xid=xid);
session.dtxPrepare(arg::xid=xid);
@@ -230,8 +231,6 @@ struct Controller : public Client
int check()
{
SubscriptionManager subs(session);
- subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
- subs.setAcceptMode(1/*not-required*/);
// Recover DTX transactions (if any)
if (opts.dtx) {
@@ -262,9 +261,10 @@ struct Controller : public Client
StringSet drained;
//drain each queue and verify the correct set of messages are available
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
- //subscribe, allocate credit and flush
- LocalQueue lq(AckPolicy(0));//manual acking
- subs.subscribe(lq, *i, *i);
+ //subscribe, allocate credit and flushn
+ LocalQueue lq;
+ SubscriptionSettings settings(FlowControl::unlimited(), ACCEPT_MODE_NONE);
+ subs.subscribe(lq, *i, settings);
session.messageFlush(arg::destination=*i);
session.sync();