summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ClientSessionTest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/tests/ClientSessionTest.cpp
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp151
1 files changed, 105 insertions, 46 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 2495a06fa4..db2cd62b0a 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -18,15 +18,21 @@
* under the License.
*
*/
-#include <list>
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session.h"
#include "qpid/framing/TransferContent.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <boost/optional.hpp>
+
+#include <list>
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid;
+using namespace boost;
struct DummyListener : public MessageListener
{
@@ -60,58 +66,77 @@ class ClientSessionTest : public CppUnit::TestCase
CPPUNIT_TEST(testQueueQuery);
CPPUNIT_TEST(testTransfer);
CPPUNIT_TEST(testDispatcher);
+ CPPUNIT_TEST(testResumeExpiredError);
+ CPPUNIT_TEST(testUseSuspendedError);
CPPUNIT_TEST(testSuspendResume);
- CPPUNIT_TEST(testSuspendResumeErrors);
+ CPPUNIT_TEST(testDisconnectResume);
+ CPPUNIT_TEST(testAutoDelete);
CPPUNIT_TEST_SUITE_END();
- boost::shared_ptr<Connector> broker;
- Connection connection;
+ shared_ptr<broker::Broker> broker;
Session session;
+ // Defer construction & thread creation to setUp
+ boost::optional<InProcessConnection> c;
+ boost::optional<InProcessConnection> c2;
public:
- ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker)
+ void setUp() {
+ broker = broker::Broker::create();
+ c=boost::in_place<InProcessConnection>(broker);
+ c2=boost::in_place<InProcessConnection>(broker);
+ }
+
+ void tearDown() {
+ c2.reset();
+ c.reset();
+ broker.reset();
+ }
+
+ void declareSubscribe(const std::string& q="my-queue",
+ const std::string& dest="my-dest")
{
- connection.open("");
- session = connection.newSession();
+ // FIXME aconway 2007-10-18: autoDelete queues are destroyed on channel close, not session.
+ // Fix & make all test queues exclusive, autoDelete
+ session.queueDeclare_(queue=q); // FIXME aconway 2007-10-01: exclusive=true, autoDelete=true);
+ session.messageSubscribe_(queue=q, destination=dest, acquireMode=1);
+ session.messageFlow_(destination=dest, unit=0, value=0xFFFFFFFF);//messages
+ session.messageFlow_(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
}
+ bool queueExists(const std::string& q) {
+ TypedResult<QueueQueryResult> result = session.queueQuery_(q);
+ return result.get().getQueue() == q;
+ }
+
void testQueueQuery()
{
- std::string name("my-queue");
- std::string alternate("amq.fanout");
- session.queueDeclare((queue=name, alternateExchange=alternate, exclusive=true, autoDelete=true));
- TypedResult<QueueQueryResult> result = session.queueQuery(name);
+ session = c->newSession();
+ session.queueDeclare_(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
+ TypedResult<QueueQueryResult> result = session.queueQuery_(std::string("my-queue"));
CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
- CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
+ CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"),
+ result.get().getAlternateExchange());
}
void testTransfer()
{
- std::string queueName("my-queue");
- std::string dest("my-dest");
- std::string data("my message");
- session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true);
- //subcribe to the queue with confirm_mode = 1:
- session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1);
- session.messageFlow((destination=dest, unit=0, value=1));//messages
- session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes
- //publish a message:
- TransferContent _content(data);
- _content.getDeliveryProperties().setRoutingKey("my-queue");
- session.messageTransfer_(content=_content);
+ session = c->newSession();
+ declareSubscribe();
+ session.messageTransfer_(content=TransferContent("my-message", "my-queue"));
//get & test the message:
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
- CPPUNIT_ASSERT_EQUAL(data, msg->getContent());
+ CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
//confirm receipt:
session.execution().completed(msg->getId(), true, true);
}
void testDispatcher()
{
- session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true);
+ session = c->newSession();
+ declareSubscribe();
TransferContent msg1("One");
msg1.getDeliveryProperties().setRoutingKey("my-queue");
@@ -125,9 +150,6 @@ public:
msg3.getDeliveryProperties().setRoutingKey("my-queue");
session.messageTransfer_(content=msg3);
- session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1);
- session.messageFlow((destination="my-dest", unit=0, value=1));//messages
- session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes
DummyListener listener(session, "my-dest", 3);
listener.listen();
CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size());
@@ -140,29 +162,66 @@ public:
}
- void testSuspendResume() {
- session = connection.newSession(60);
+ void testResumeExpiredError() {
+ session = c->newSession(0);
+ session.suspend(); // session has 0 timeout.
+ try {
+ c->resume(session);
+ CPPUNIT_FAIL("Expected InvalidArgumentException.");
+ } catch(const InvalidArgumentException&) {}
+ }
+
+ void testUseSuspendedError() {
+ session = c->newSession(60);
session.suspend();
try {
session.exchangeQuery_(name="amq.fanout");
CPPUNIT_FAIL("Expected session suspended exception");
- } catch(...) {}
- connection.resume(session);
- session.exchangeQuery_(name="amq.fanout");
- // FIXME aconway 2007-09-25: build up session state and confirm
- //it survives the resume
+ } catch(const CommandInvalidException&) {}
}
- void testSuspendResumeErrors() {
- session.suspend(); // session has 0 timeout.
- try {
- session.exchangeQuery_(name="amq.fanout");
- CPPUNIT_FAIL("Expected suspended session exception");
- } catch(...) {}
- try {
- connection.resume(session);
- CPPUNIT_FAIL("Expected no such session exception.");
- } catch(...) {}
+ void testSuspendResume() {
+ session = c->newSession(60);
+ declareSubscribe();
+ session.suspend();
+ // Make sure we are still subscribed after resume.
+ c->resume(session);
+ session.messageTransfer_(content=TransferContent("my-message", "my-queue"));
+ FrameSet::shared_ptr msg = session.get();
+ CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
+ }
+
+ void testDisconnectResume() {
+ session = c->newSession(60);
+ session.queueDeclare_(queue="before");
+ CPPUNIT_ASSERT(queueExists("before"));
+ // Simulate lost frames.
+ c->discard();
+ session.queueDeclare_(queue=string("after"));
+ c->disconnect(); // Simulate disconnect, resume on a new connection.
+ c2->resume(session);
+ CPPUNIT_ASSERT(queueExists("after"));
+ }
+
+ void testAutoDelete() {
+ // Verify that autoDelete queues survive suspend/resume.
+ session = c->newSession(60);
+ session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true);
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+ session.suspend();
+ c->resume(session);
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+
+ // Verify they survive disconnect/resume on new Connection
+ c->disconnect();
+ c2->resume(session);
+
+ try {
+ // FIXME aconway 2007-10-23: Negative test, need to
+ // fix auto-delete queues to clean up with session, not channel.
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+ CPPUNIT_FAIL("Negative test passed unexpectedly");
+ } catch(const ChannelException&) {}
}
};