diff options
Diffstat (limited to 'cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 151 |
1 files changed, 105 insertions, 46 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 2495a06fa4..db2cd62b0a 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -18,15 +18,21 @@ * under the License. * */ -#include <list> #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session.h" #include "qpid/framing/TransferContent.h" +#include "qpid/framing/reply_exceptions.h" + +#include <boost/optional.hpp> + +#include <list> using namespace qpid::client; using namespace qpid::framing; +using namespace qpid; +using namespace boost; struct DummyListener : public MessageListener { @@ -60,58 +66,77 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_TEST(testQueueQuery); CPPUNIT_TEST(testTransfer); CPPUNIT_TEST(testDispatcher); + CPPUNIT_TEST(testResumeExpiredError); + CPPUNIT_TEST(testUseSuspendedError); CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST(testSuspendResumeErrors); + CPPUNIT_TEST(testDisconnectResume); + CPPUNIT_TEST(testAutoDelete); CPPUNIT_TEST_SUITE_END(); - boost::shared_ptr<Connector> broker; - Connection connection; + shared_ptr<broker::Broker> broker; Session session; + // Defer construction & thread creation to setUp + boost::optional<InProcessConnection> c; + boost::optional<InProcessConnection> c2; public: - ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) + void setUp() { + broker = broker::Broker::create(); + c=boost::in_place<InProcessConnection>(broker); + c2=boost::in_place<InProcessConnection>(broker); + } + + void tearDown() { + c2.reset(); + c.reset(); + broker.reset(); + } + + void declareSubscribe(const std::string& q="my-queue", + const std::string& dest="my-dest") { - connection.open(""); - session = connection.newSession(); + // FIXME aconway 2007-10-18: autoDelete queues are destroyed on channel close, not session. + // Fix & make all test queues exclusive, autoDelete + session.queueDeclare_(queue=q); // FIXME aconway 2007-10-01: exclusive=true, autoDelete=true); + session.messageSubscribe_(queue=q, destination=dest, acquireMode=1); + session.messageFlow_(destination=dest, unit=0, value=0xFFFFFFFF);//messages + session.messageFlow_(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } + bool queueExists(const std::string& q) { + TypedResult<QueueQueryResult> result = session.queueQuery_(q); + return result.get().getQueue() == q; + } + void testQueueQuery() { - std::string name("my-queue"); - std::string alternate("amq.fanout"); - session.queueDeclare((queue=name, alternateExchange=alternate, exclusive=true, autoDelete=true)); - TypedResult<QueueQueryResult> result = session.queueQuery(name); + session = c->newSession(); + session.queueDeclare_(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); + TypedResult<QueueQueryResult> result = session.queueQuery_(std::string("my-queue")); CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); - CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); + CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"), + result.get().getAlternateExchange()); } void testTransfer() { - std::string queueName("my-queue"); - std::string dest("my-dest"); - std::string data("my message"); - session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true); - //subcribe to the queue with confirm_mode = 1: - session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1); - session.messageFlow((destination=dest, unit=0, value=1));//messages - session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes - //publish a message: - TransferContent _content(data); - _content.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer_(content=_content); + session = c->newSession(); + declareSubscribe(); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); //get & test the message: FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); - CPPUNIT_ASSERT_EQUAL(data, msg->getContent()); + CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent()); //confirm receipt: session.execution().completed(msg->getId(), true, true); } void testDispatcher() { - session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + session = c->newSession(); + declareSubscribe(); TransferContent msg1("One"); msg1.getDeliveryProperties().setRoutingKey("my-queue"); @@ -125,9 +150,6 @@ public: msg3.getDeliveryProperties().setRoutingKey("my-queue"); session.messageTransfer_(content=msg3); - session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1); - session.messageFlow((destination="my-dest", unit=0, value=1));//messages - session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes DummyListener listener(session, "my-dest", 3); listener.listen(); CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size()); @@ -140,29 +162,66 @@ public: } - void testSuspendResume() { - session = connection.newSession(60); + void testResumeExpiredError() { + session = c->newSession(0); + session.suspend(); // session has 0 timeout. + try { + c->resume(session); + CPPUNIT_FAIL("Expected InvalidArgumentException."); + } catch(const InvalidArgumentException&) {} + } + + void testUseSuspendedError() { + session = c->newSession(60); session.suspend(); try { session.exchangeQuery_(name="amq.fanout"); CPPUNIT_FAIL("Expected session suspended exception"); - } catch(...) {} - connection.resume(session); - session.exchangeQuery_(name="amq.fanout"); - // FIXME aconway 2007-09-25: build up session state and confirm - //it survives the resume + } catch(const CommandInvalidException&) {} } - void testSuspendResumeErrors() { - session.suspend(); // session has 0 timeout. - try { - session.exchangeQuery_(name="amq.fanout"); - CPPUNIT_FAIL("Expected suspended session exception"); - } catch(...) {} - try { - connection.resume(session); - CPPUNIT_FAIL("Expected no such session exception."); - } catch(...) {} + void testSuspendResume() { + session = c->newSession(60); + declareSubscribe(); + session.suspend(); + // Make sure we are still subscribed after resume. + c->resume(session); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = session.get(); + CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); + } + + void testDisconnectResume() { + session = c->newSession(60); + session.queueDeclare_(queue="before"); + CPPUNIT_ASSERT(queueExists("before")); + // Simulate lost frames. + c->discard(); + session.queueDeclare_(queue=string("after")); + c->disconnect(); // Simulate disconnect, resume on a new connection. + c2->resume(session); + CPPUNIT_ASSERT(queueExists("after")); + } + + void testAutoDelete() { + // Verify that autoDelete queues survive suspend/resume. + session = c->newSession(60); + session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + CPPUNIT_ASSERT(queueExists("my-queue")); + session.suspend(); + c->resume(session); + CPPUNIT_ASSERT(queueExists("my-queue")); + + // Verify they survive disconnect/resume on new Connection + c->disconnect(); + c2->resume(session); + + try { + // FIXME aconway 2007-10-23: Negative test, need to + // fix auto-delete queues to clean up with session, not channel. + CPPUNIT_ASSERT(queueExists("my-queue")); + CPPUNIT_FAIL("Negative test passed unexpectedly"); + } catch(const ChannelException&) {} } }; |