summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ClientSessionTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp56
1 files changed, 15 insertions, 41 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 440605a2e4..abe317aad8 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -20,8 +20,7 @@
*/
#include "unit_test.h"
#include "BrokerFixture.h"
-#include "qpid/client/AckPolicy.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
@@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener {
std::vector<Message> messages;
string name;
uint expected;
- Dispatcher dispatcher;
+ SubscriptionManager submgr;
DummyListener(Session& session, const string& n, uint ex) :
- name(n), expected(ex), dispatcher(session) {}
+ name(n), expected(ex), submgr(session) {}
void run()
{
- dispatcher.listen(name, this);
- dispatcher.run();
+ submgr.subscribe(*this, name);
+ submgr.run();
}
void received(Message& msg)
{
messages.push_back(msg);
if (--expected == 0) {
- dispatcher.stop();
+ submgr.stop();
}
}
};
@@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener
struct ClientSessionFixture : public ProxySessionFixture
{
- ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
-
- void declareSubscribe(const string& q="my-queue",
- const string& dest="my-dest")
- {
- session.queueDeclare(arg::queue=q);
- session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1);
- session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages
- session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+ session.queueDeclare(arg::queue="my-queue");
}
};
QPID_AUTO_TEST_CASE(testQueueQuery) {
ClientSessionFixture fix;
fix.session = fix.connection.newSession();
- fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true);
- QueueQueryResult result = fix.session.queueQuery(string("my-queue"));
+ fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout",
+ arg::exclusive=true, arg::autoDelete=true);
+ QueueQueryResult result = fix.session.queueQuery("q");
BOOST_CHECK_EQUAL(false, result.getDurable());
BOOST_CHECK_EQUAL(true, result.getExclusive());
- BOOST_CHECK_EQUAL(string("amq.fanout"),
- result.getAlternateExchange());
-}
-
-QPID_AUTO_TEST_CASE(testTransfer)
-{
- ClientSessionFixture fix;
- fix.session=fix.connection.newSession();
- fix.declareSubscribe();
- fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue"));
- //get & test the message:
- FrameSet::shared_ptr msg = fix.session.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
- //confirm receipt:
- AckPolicy autoAck;
- autoAck.ack(Message(*msg), fix.session);
+ BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange());
}
QPID_AUTO_TEST_CASE(testDispatcher)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 100;
for (size_t i = 0; i < count; ++i)
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
listener.run();
BOOST_CHECK_EQUAL(count, listener.messages.size());
for (size_t i = 0; i < count; ++i)
@@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
{
ClientSessionFixture fix;
fix.session =fix.connection.newSession();
- fix.declareSubscribe();
size_t count = 10;
- DummyListener listener(fix.session, "my-dest", count);
+ DummyListener listener(fix.session, "my-queue", count);
sys::Thread t(listener);
for (size_t i = 0; i < count; ++i) {
fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast<string>(i), "my-queue"));
@@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
{
ClientSessionFixture fix;
fix.session.timeout(60);
- fix.declareSubscribe();
fix.session.suspend();
// Make sure we are still subscribed after resume.
fix.connection.resume(fix.session);
@@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) {
BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
BOOST_CHECK(lq.empty()); // Credit exhausted.
- fix.subs.setFlowControl("lq", FlowControl::unlimited());
+ fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited());
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}