diff options
Diffstat (limited to 'cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 204 |
1 files changed, 98 insertions, 106 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 82db7b9545..320d3afd27 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -18,35 +18,42 @@ * under the License. * */ -#include "qpid_test_plugin.h" +#include "unit_test.h" #include "BrokerFixture.h" #include "qpid/client/Dispatcher.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" #include "qpid/client/Session_0_10.h" #include "qpid/framing/TransferContent.h" #include "qpid/framing/reply_exceptions.h" #include <boost/optional.hpp> +#include <boost/lexical_cast.hpp> -#include <list> +#include <vector> + +QPID_AUTO_TEST_SUITE(ClientSessionTest) using namespace qpid::client; using namespace qpid::client::arg; using namespace qpid::framing; using namespace qpid; +using std::string; +using std::cout; +using std::endl; using namespace boost; -struct DummyListener : public MessageListener -{ - std::list<Message> messages; - std::string name; + +struct DummyListener : public sys::Runnable, public MessageListener { + std::vector<Message> messages; + string name; uint expected; - uint count; Dispatcher dispatcher; - DummyListener(Session_0_10& session, const std::string& _name, uint _expected) : name(_name), expected(_expected), count(0), - dispatcher(session) {} + DummyListener(Session_0_10& session, const string& n, uint ex) : + name(n), expected(ex), dispatcher(session) {} - void listen() + void run() { dispatcher.listen(name, this); dispatcher.run(); @@ -55,117 +62,102 @@ struct DummyListener : public MessageListener void received(Message& msg) { messages.push_back(msg); - if (++count == expected) { + if (--expected == 0) dispatcher.stop(); - } } }; -class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture +struct ClientSessionFixture : public ProxySessionFixture { - CPPUNIT_TEST_SUITE(ClientSessionTest); - CPPUNIT_TEST(testQueueQuery); - CPPUNIT_TEST(testTransfer); - CPPUNIT_TEST(testDispatcher); - CPPUNIT_TEST(testResumeExpiredError); - CPPUNIT_TEST(testUseSuspendedError); - CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST_SUITE_END(); - - public: - - void declareSubscribe(const std::string& q="my-queue", - const std::string& dest="my-dest") + void declareSubscribe(const string& q="my-queue", + const string& dest="my-dest") { session.queueDeclare(queue=q); session.messageSubscribe(queue=q, destination=dest, acquireMode=1); session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);//messages session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } +}; - void testQueueQuery() - { - session =connection.newSession(); - session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); - TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue")); - CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); - CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); - CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"), - result.get().getAlternateExchange()); - } - - void testTransfer() - { - session =connection.newSession(); - declareSubscribe(); - session.messageTransfer(content=TransferContent("my-message", "my-queue")); - //get & test the message: - FrameSet::shared_ptr msg = session.get(); - CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); - CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent()); - //confirm receipt: - session.getExecution().completed(msg->getId(), true, true); - } - - void testDispatcher() - { - session =connection.newSession(); - declareSubscribe(); +BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) { + session =connection.newSession(); + session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); + TypedResult<QueueQueryResult> result = session.queueQuery(string("my-queue")); + BOOST_CHECK_EQUAL(false, result.get().getDurable()); + BOOST_CHECK_EQUAL(true, result.get().getExclusive()); + BOOST_CHECK_EQUAL(string("amq.fanout"), + result.get().getAlternateExchange()); +} + +BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture) +{ + session=connection.newSession(); + declareSubscribe(); + session.messageTransfer(content=TransferContent("my-message", "my-queue")); + //get & test the message: + FrameSet::shared_ptr msg = session.get(); + BOOST_CHECK(msg->isA<MessageTransferBody>()); + BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); + //confirm receipt: + session.getExecution().completed(msg->getId(), true, true); +} + +BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture) +{ + session =connection.newSession(); + declareSubscribe(); - TransferContent msg1("One"); - msg1.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer(content=msg1); + TransferContent msg1("One"); + msg1.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer(content=msg1); - TransferContent msg2("Two"); - msg2.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer(content=msg2); + TransferContent msg2("Two"); + msg2.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer(content=msg2); - TransferContent msg3("Three"); - msg3.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer(content=msg3); + TransferContent msg3("Three"); + msg3.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer(content=msg3); - DummyListener listener(session, "my-dest", 3); - listener.listen(); - CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size()); - CPPUNIT_ASSERT_EQUAL(std::string("One"), listener.messages.front().getData()); - listener.messages.pop_front(); - CPPUNIT_ASSERT_EQUAL(std::string("Two"), listener.messages.front().getData()); - listener.messages.pop_front(); - CPPUNIT_ASSERT_EQUAL(std::string("Three"), listener.messages.front().getData()); - listener.messages.pop_front(); - - } - - void testResumeExpiredError() { - session =connection.newSession(0); - session.suspend(); // session has 0 timeout. - try { - connection.resume(session); - CPPUNIT_FAIL("Expected InvalidArgumentException."); - } catch(const InternalErrorException&) {} - } - - void testUseSuspendedError() { - session =connection.newSession(60); - session.suspend(); - try { - session.exchangeQuery(name="amq.fanout"); - CPPUNIT_FAIL("Expected session suspended exception"); - } catch(const CommandInvalidException&) {} - } - - void testSuspendResume() { - session =connection.newSession(60); - declareSubscribe(); - session.suspend(); - // Make sure we are still subscribed after resume. + DummyListener listener(session, "my-dest", 3); + listener.run(); + BOOST_CHECK_EQUAL((size_t) 3, listener.messages.size()); + BOOST_CHECK_EQUAL(std::string("One"), listener.messages[0].getData()); + BOOST_CHECK_EQUAL(std::string("Two"), listener.messages[1].getData()); + BOOST_CHECK_EQUAL(std::string("Three"), listener.messages[2].getData()); +} + +BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture) +{ + session =connection.newSession(0); + session.suspend(); // session has 0 timeout. + try { connection.resume(session); - session.messageTransfer(content=TransferContent("my-message", "my-queue")); - FrameSet::shared_ptr msg = session.get(); - CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); - } -}; + BOOST_FAIL("Expected InvalidArgumentException."); + } catch(const InternalErrorException&) {} +} + +BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture) +{ + session =connection.newSession(60); + session.suspend(); + try { + session.exchangeQuery(name="amq.fanout"); + BOOST_FAIL("Expected session suspended exception"); + } catch(const CommandInvalidException&) {} +} + +BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture) +{ + session =connection.newSession(60); + declareSubscribe(); + session.suspend(); + // Make sure we are still subscribed after resume. + connection.resume(session); + session.messageTransfer(content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = session.get(); + BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); +} + +QPID_AUTO_TEST_SUITE_END() -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest); |