diff options
author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/tests/ClientSessionTest.cpp | |
parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 151 |
1 files changed, 105 insertions, 46 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 2495a06fa4..db2cd62b0a 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -18,15 +18,21 @@ * under the License. * */ -#include <list> #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session.h" #include "qpid/framing/TransferContent.h" +#include "qpid/framing/reply_exceptions.h" + +#include <boost/optional.hpp> + +#include <list> using namespace qpid::client; using namespace qpid::framing; +using namespace qpid; +using namespace boost; struct DummyListener : public MessageListener { @@ -60,58 +66,77 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_TEST(testQueueQuery); CPPUNIT_TEST(testTransfer); CPPUNIT_TEST(testDispatcher); + CPPUNIT_TEST(testResumeExpiredError); + CPPUNIT_TEST(testUseSuspendedError); CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST(testSuspendResumeErrors); + CPPUNIT_TEST(testDisconnectResume); + CPPUNIT_TEST(testAutoDelete); CPPUNIT_TEST_SUITE_END(); - boost::shared_ptr<Connector> broker; - Connection connection; + shared_ptr<broker::Broker> broker; Session session; + // Defer construction & thread creation to setUp + boost::optional<InProcessConnection> c; + boost::optional<InProcessConnection> c2; public: - ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) + void setUp() { + broker = broker::Broker::create(); + c=boost::in_place<InProcessConnection>(broker); + c2=boost::in_place<InProcessConnection>(broker); + } + + void tearDown() { + c2.reset(); + c.reset(); + broker.reset(); + } + + void declareSubscribe(const std::string& q="my-queue", + const std::string& dest="my-dest") { - connection.open(""); - session = connection.newSession(); + // FIXME aconway 2007-10-18: autoDelete queues are destroyed on channel close, not session. + // Fix & make all test queues exclusive, autoDelete + session.queueDeclare_(queue=q); // FIXME aconway 2007-10-01: exclusive=true, autoDelete=true); + session.messageSubscribe_(queue=q, destination=dest, acquireMode=1); + session.messageFlow_(destination=dest, unit=0, value=0xFFFFFFFF);//messages + session.messageFlow_(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } + bool queueExists(const std::string& q) { + TypedResult<QueueQueryResult> result = session.queueQuery_(q); + return result.get().getQueue() == q; + } + void testQueueQuery() { - std::string name("my-queue"); - std::string alternate("amq.fanout"); - session.queueDeclare((queue=name, alternateExchange=alternate, exclusive=true, autoDelete=true)); - TypedResult<QueueQueryResult> result = session.queueQuery(name); + session = c->newSession(); + session.queueDeclare_(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); + TypedResult<QueueQueryResult> result = session.queueQuery_(std::string("my-queue")); CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); - CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); + CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"), + result.get().getAlternateExchange()); } void testTransfer() { - std::string queueName("my-queue"); - std::string dest("my-dest"); - std::string data("my message"); - session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true); - //subcribe to the queue with confirm_mode = 1: - session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1); - session.messageFlow((destination=dest, unit=0, value=1));//messages - session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes - //publish a message: - TransferContent _content(data); - _content.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer_(content=_content); + session = c->newSession(); + declareSubscribe(); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); //get & test the message: FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); - CPPUNIT_ASSERT_EQUAL(data, msg->getContent()); + CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent()); //confirm receipt: session.execution().completed(msg->getId(), true, true); } void testDispatcher() { - session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + session = c->newSession(); + declareSubscribe(); TransferContent msg1("One"); msg1.getDeliveryProperties().setRoutingKey("my-queue"); @@ -125,9 +150,6 @@ public: msg3.getDeliveryProperties().setRoutingKey("my-queue"); session.messageTransfer_(content=msg3); - session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1); - session.messageFlow((destination="my-dest", unit=0, value=1));//messages - session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes DummyListener listener(session, "my-dest", 3); listener.listen(); CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size()); @@ -140,29 +162,66 @@ public: } - void testSuspendResume() { - session = connection.newSession(60); + void testResumeExpiredError() { + session = c->newSession(0); + session.suspend(); // session has 0 timeout. + try { + c->resume(session); + CPPUNIT_FAIL("Expected InvalidArgumentException."); + } catch(const InvalidArgumentException&) {} + } + + void testUseSuspendedError() { + session = c->newSession(60); session.suspend(); try { session.exchangeQuery_(name="amq.fanout"); CPPUNIT_FAIL("Expected session suspended exception"); - } catch(...) {} - connection.resume(session); - session.exchangeQuery_(name="amq.fanout"); - // FIXME aconway 2007-09-25: build up session state and confirm - //it survives the resume + } catch(const CommandInvalidException&) {} } - void testSuspendResumeErrors() { - session.suspend(); // session has 0 timeout. - try { - session.exchangeQuery_(name="amq.fanout"); - CPPUNIT_FAIL("Expected suspended session exception"); - } catch(...) {} - try { - connection.resume(session); - CPPUNIT_FAIL("Expected no such session exception."); - } catch(...) {} + void testSuspendResume() { + session = c->newSession(60); + declareSubscribe(); + session.suspend(); + // Make sure we are still subscribed after resume. + c->resume(session); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = session.get(); + CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); + } + + void testDisconnectResume() { + session = c->newSession(60); + session.queueDeclare_(queue="before"); + CPPUNIT_ASSERT(queueExists("before")); + // Simulate lost frames. + c->discard(); + session.queueDeclare_(queue=string("after")); + c->disconnect(); // Simulate disconnect, resume on a new connection. + c2->resume(session); + CPPUNIT_ASSERT(queueExists("after")); + } + + void testAutoDelete() { + // Verify that autoDelete queues survive suspend/resume. + session = c->newSession(60); + session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + CPPUNIT_ASSERT(queueExists("my-queue")); + session.suspend(); + c->resume(session); + CPPUNIT_ASSERT(queueExists("my-queue")); + + // Verify they survive disconnect/resume on new Connection + c->disconnect(); + c2->resume(session); + + try { + // FIXME aconway 2007-10-23: Negative test, need to + // fix auto-delete queues to clean up with session, not channel. + CPPUNIT_ASSERT(queueExists("my-queue")); + CPPUNIT_FAIL("Negative test passed unexpectedly"); + } catch(const ChannelException&) {} } }; |