diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/APRBaseTest.cpp | 47 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 77 | ||||
-rw-r--r-- | cpp/src/tests/ClientChannelTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 26 | ||||
-rw-r--r-- | cpp/src/tests/InProcessBroker.h | 245 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/tests/SocketProxy.h | 158 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 53 | ||||
-rwxr-xr-x | cpp/src/tests/quick_perftest | 2 |
9 files changed, 196 insertions, 415 deletions
diff --git a/cpp/src/tests/APRBaseTest.cpp b/cpp/src/tests/APRBaseTest.cpp deleted file mode 100644 index 3ec18d658e..0000000000 --- a/cpp/src/tests/APRBaseTest.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/sys/apr/APRBase.h" -#include "qpid_test_plugin.h" -#include <iostream> - -using namespace qpid::sys; - -class APRBaseTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(APRBaseTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMe() - { - APRBase::increment(); - APRBase::increment(); - APRBase::decrement(); - APRBase::decrement(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); - diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index c7222c03a2..76cf2e8761 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -22,6 +22,7 @@ * */ +#include "SocketProxy.h" #include "qpid/sys/Thread.h" #include "qpid/broker/Broker.h" #include "qpid/client/Connection.h" @@ -30,52 +31,74 @@ #include "qpid/client/SubscriptionManager.h" /** - * A fixture to create an in-process broker and connect to it for tests. + * A fixture with an in-process broker. */ -struct BrokerFixture { +struct BrokerFixture { typedef qpid::broker::Broker Broker; typedef boost::shared_ptr<Broker> BrokerPtr; - struct OpenConnection : public qpid::client::Connection { - OpenConnection(int port) { open("localhost", port); } - }; - BrokerPtr broker; qpid::sys::Thread brokerThread; - OpenConnection connection; - qpid::client::Session_0_10 session; - qpid::client::SubscriptionManager subs; - qpid::client::LocalQueue lq; - - BrokerPtr newBroker() { + + BrokerFixture() { Broker::Options opts; opts.port=0; opts.workerThreads=1; - BrokerPtr b=Broker::create(opts); - // TODO aconway 2007-12-05: Without the following line - // the test can hang in the connection ctor. This is - // a race condition that should be fixed. - b->getPort(); - return b; + broker = Broker::create(opts); + // TODO aconway 2007-12-05: At one point BrokerFixture + // tests could hang in Connection ctor if the following + // line is removed. This may not be an issue anymore. + broker->getPort(); + brokerThread = qpid::sys::Thread(*broker); }; - BrokerFixture() : broker(newBroker()), - brokerThread(*broker), - connection(broker->getPort()), - session(connection.newSession()), - subs(session) - {} - ~BrokerFixture() { - connection.close(); broker->shutdown(); brokerThread.join(); } - /** Open a connection to the local broker */ + /** Open a connection to the broker. */ void open(qpid::client::Connection& c) { c.open("localhost", broker->getPort()); } }; +struct LocalConnection : public qpid::client::Connection { + LocalConnection(uint16_t port) { open("localhost", port); } +}; + +/** A local client connection via a socket proxy. */ +struct ProxyConnection : public qpid::client::Connection { + SocketProxy proxy; + ProxyConnection(int brokerPort) : proxy(brokerPort) { + open("localhost", proxy.getPort()); + } + ~ProxyConnection() { close(); } +}; + +/** + * A BrokerFixture with open Connection, Session and + * SubscriptionManager and LocalQueue for convenience. + */ +template <class ConnectionType> +struct SessionFixtureT : BrokerFixture { + ConnectionType connection; + qpid::client::Session_0_10 session; + qpid::client::SubscriptionManager subs; + qpid::client::LocalQueue lq; + + SessionFixtureT() : connection(broker->getPort()), + session(connection.newSession()), + subs(session) + {} + + ~SessionFixtureT() { + connection.close(); + } +}; + +typedef SessionFixtureT<LocalConnection> SessionFixture; +typedef SessionFixtureT<ProxyConnection> ProxySessionFixture; + + #endif /*!TESTS_BROKERFIXTURE_H*/ diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index bd8f5af6be..605d5e4885 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256; * The test base defines the tests methods, derived classes * instantiate the channel in Basic or Message mode. */ -class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture +class ChannelTestBase : public CppUnit::TestCase, public SessionFixture { struct Listener: public qpid::client::MessageListener { vector<Message> messages; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 5f45e1f938..82db7b9545 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -20,7 +20,6 @@ */ #include "qpid_test_plugin.h" #include "BrokerFixture.h" -#include "SocketProxy.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session_0_10.h" #include "qpid/framing/TransferContent.h" @@ -62,7 +61,7 @@ struct DummyListener : public MessageListener } }; -class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture +class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture { CPPUNIT_TEST_SUITE(ClientSessionTest); CPPUNIT_TEST(testQueueQuery); @@ -71,7 +70,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture CPPUNIT_TEST(testResumeExpiredError); CPPUNIT_TEST(testUseSuspendedError); CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST(testDisconnectResume); CPPUNIT_TEST_SUITE_END(); public: @@ -85,11 +83,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } - bool queueExists(const std::string& q) { - TypedResult<QueueQueryResult> result = session.queueQuery(q); - return result.get().getQueue() == q; - } - void testQueueQuery() { session =connection.newSession(); @@ -166,26 +159,11 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture declareSubscribe(); session.suspend(); // Make sure we are still subscribed after resume. - connection.resume(session); + connection.resume(session); session.messageTransfer(content=TransferContent("my-message", "my-queue")); FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); } - - void testDisconnectResume() { - // FIXME aconway 2007-12-11: Test hanging. -// ProxyConnection c(broker->getPort()); -// Session_0_10 s = c.session; -// s.queueDeclare(queue="before"); -// CPPUNIT_ASSERT(queueExists("before")); -// s.queueDeclare(queue=string("after")); -// c.proxy.client.close(); // Disconnect the client. -// Connection c2; -// open(c2); -// c2.resume(s); -// CPPUNIT_ASSERT(queueExists("after")); -// c2.close(); - } }; // Make this test suite a plugin. diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h deleted file mode 100644 index f014941743..0000000000 --- a/cpp/src/tests/InProcessBroker.h +++ /dev/null @@ -1,245 +0,0 @@ -#ifndef _tests_InProcessBroker_h -#define _tests_InProcessBroker_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" -#include "qpid/client/Connector.h" -#include "qpid/client/Connection.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/BlockingQueue.h" -#include "qpid/shared_ptr.h" - -#include <vector> -#include <iostream> -#include <algorithm> - - -namespace qpid { - -using qpid::sys::ConnectionInputHandler; - -/** - * A client::Connector that connects directly to an in-process broker. - * Also allows you to "snoop" on frames exchanged between client & broker. - * - * see FramingTest::testRequestResponseRoundtrip() for example of use. - */ -class InProcessConnector : - public client::Connector -{ - public: - typedef sys::Mutex Mutex; - typedef Mutex::ScopedLock Lock; - typedef framing::FrameHandler FrameHandler; - typedef framing::AMQFrame AMQFrame; - - enum Sender {CLIENT,BROKER}; - - struct Task { - AMQFrame frame; - bool doOutput; - - Task() : doOutput(true) {} - Task(AMQFrame& f) : frame(f), doOutput(false) {} - }; - - /** Simulate the network thread of a peer with a queue and a thread. - * With setInputHandler(0) drops frames simulating network packet loss. - */ - class NetworkQueue : public sys::Runnable - { - public: - NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) { - thread=sys::Thread(this); - } - - ~NetworkQueue() { - queue.close(); - thread.join(); - } - - void push(AMQFrame& f) { queue.push(f); } - void activateOutput() { queue.push(Task()); } - - void run() { - try { - while(true) { - Task t = queue.pop(); - if (t.doOutput) { - if (connectionHandler) { - while (connectionHandler->doOutput()); - } - } else { - if (inputHandler) { - QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame)); - inputHandler->handle(t.frame); - } - else - QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame)); - } - } - } - catch (const std::exception& e) { - QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what())); - return; - } - } - - void setConnectionInputHandler(ConnectionInputHandler* h) { - Lock l(lock); - inputHandler = h; - connectionHandler = h; - } - - void setInputHandler(FrameHandler* h) { - Lock l(lock); - inputHandler = h; - connectionHandler = 0; - } - - private: - sys::Mutex lock; - sys::BlockingQueue<Task> queue; - sys::Thread thread; - FrameHandler* inputHandler; - ConnectionInputHandler* connectionHandler; - const char* const receiver; - }; - - struct InProcessHandler : public sys::ConnectionOutputHandler { - Sender from; - NetworkQueue queue; - const char* const sender; - NetworkQueue* reverseQueue; - - InProcessHandler(Sender s) - : from(s), - queue(from==CLIENT? "BROKER" : "CLIENT"), - sender(from==BROKER? "BROKER" : "CLIENT"), - reverseQueue(0) - {} - - ~InProcessHandler() { } - - void send(AMQFrame& f) { - QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f)); - queue.push(f); - } - - void close() { - // Do not shut down the queue here, we may be in - // the queue's dispatch thread. - } - - void activateOutput() { - if (reverseQueue) reverseQueue->activateOutput(); - } - }; - - - InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(), - framing::ProtocolVersion v=framing::ProtocolVersion()) : - Connector(v), - protocolInit(v), - broker(b), - brokerOut(BROKER), - brokerConnection(&brokerOut, *broker), - clientOut(CLIENT), - isClosed(false) - { - clientOut.queue.setConnectionInputHandler(&brokerConnection); - brokerOut.reverseQueue = &clientOut.queue; - clientOut.reverseQueue = &brokerOut.queue; - } - - ~InProcessConnector() { - close(); - - } - - void connect(const std::string& /*host*/, int /*port*/) {} - - void init() { brokerConnection.initiated(protocolInit); } - - void close() { - if (!isClosed) { - isClosed = true; - brokerOut.close(); - clientOut.close(); - brokerConnection.closed(); - } - } - - /** Client's input handler. */ - void setInputHandler(framing::InputHandler* handler) { - brokerOut.queue.setInputHandler(handler); - } - - /** Called by client to send a frame */ - void send(framing::AMQFrame& frame) { - clientOut.handle(frame); - } - - /** Sliently discard frames sent by either party, lost network traffic. */ - void discard() { - brokerOut.queue.setInputHandler(0); - clientOut.queue.setConnectionInputHandler(0); - } - - shared_ptr<broker::Broker> getBroker() { return broker; } - - private: - sys::Mutex lock; - framing::ProtocolInitiation protocolInit; - shared_ptr<broker::Broker> broker; - InProcessHandler brokerOut; - broker::Connection brokerConnection; - InProcessHandler clientOut; - bool isClosed; -}; - -struct InProcessConnection : public client::Connection { - /** Connect to an existing broker */ - InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create()) - : client::Connection( - shared_ptr<client::Connector>(new InProcessConnector(b))) - { open(""); } - - InProcessConnector& getConnector() { - return static_cast<InProcessConnector&>(*impl->getConnector()); - } - - /** Simulate disconnected network connection. */ - void disconnect() { getConnector().close(); } - - /** Discard frames, simulates lost network traffic. */ - void discard() { getConnector().discard(); } - - shared_ptr<broker::Broker> getBroker() { - return getConnector().getBroker(); - } -}; - -} // namespace qpid - -#endif // _tests_InProcessBroker_h diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 0dc4b47a84..a25c46b5b0 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -122,7 +122,6 @@ EXTRA_DIST += \ topictest \ .valgrind.supp \ .valgrindrc \ - InProcessBroker.h \ MessageUtils.h \ MockChannel.h \ MockConnectionInputHandler.h \ diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index b985ded175..a37c1f2c3e 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -24,59 +24,141 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" + +#include <algorithm> /** - * A simple socket proxy that forwards to another socket. Used between - * client & broker to simulate network failures. + * A simple socket proxy that forwards to another socket. + * Used between client & local broker to simulate network failures. */ -struct SocketProxy : public qpid::sys::Runnable +class SocketProxy : private qpid::sys::Runnable { - int port; // Port bound to server socket. - qpid::sys::Socket client, server; // Client & server sockets. + public: + /** Connect to connectPort on host, start a forwarding thread. + * Listen for connection on getPort(). + */ + SocketProxy(int connectPort, const std::string host="localhost") + : closed(false), port(listener.listen()) + { + int r=::pipe(closePipe); + if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); + client.connect(host, connectPort); + thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); + } + + ~SocketProxy() { close(); } - SocketProxy(const std::string& host, int port) { init(host,port); } - SocketProxy(int port) { init("localhost",port); } + /** Simulate a network disconnect. */ + void close() { + { + qpid::sys::Mutex::ScopedLock l(lock); + if (closed) return; + closed=true; + } + write(closePipe[1], this, 1); // Random byte to closePipe + thread.join(); + client.close(); + ::close(closePipe[0]); + ::close(closePipe[1]); + } - ~SocketProxy() { client.close(); server.close(); thread.join(); } + bool isClosed() const { + qpid::sys::Mutex::ScopedLock l(lock); + return closed; + } + + uint16_t getPort() const { return port; } private: - - void init(const std::string& host, int connectPort) { - client.connect(host, connectPort); - port = server.listen(); - thread=qpid::sys::Thread(this); + static void throwErrno(const std::string& msg) { + throw qpid::Exception(msg+":"+qpid::strError(errno)); } - - void run() { - try { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); - } catch(...) {} + static void throwIf(bool condition, const std::string& msg) { + if (condition) throw qpid::Exception(msg); } + + struct FdSet : fd_set { + FdSet() : maxFd(0) { clear(); } + void clear() { FD_ZERO(this); } + void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } + bool isSet(int fd) const { return FD_ISSET(fd, this); } + bool operator[](int fd) const { return isSet(fd); } - qpid::sys::Thread thread; - char buffer[64*1024]; -}; + int maxFd; + }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - qpid::client::Session_0_10 session; + enum { RD=1, WR=2, ER=4 }; - ProxyConnection(const std::string& host, int port) : proxy(port) { - open(host, proxy.port); - session=newSession(); - } + struct Selector { + FdSet rd, wr, er; + + void set(int fd, int sets) { + if (sets & RD) rd.set(fd); + if (sets & WR) wr.set(fd); + if (sets & ER) er.set(fd); + } + + int select() { + for (;;) { + int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); + int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); + if (r == -1 && errno == EINTR) continue; + if (r < 0) throwErrno(QPID_MSG("select returned " <<r)); + return r; + } + } + }; - ProxyConnection(int port) : proxy(port) { - open("localhost", proxy.port); - session=newSession(); + void run() { + std::auto_ptr<qpid::sys::Socket> server; + try { + // Accept incoming connections, watch closePipe. + Selector accept; + accept.set(listener.toFd(), RD|ER); + accept.set(closePipe[0], RD|ER); + accept.select(); + throwIf(accept.rd[closePipe[0]], "Closed by close()"); + throwIf(!accept.rd[listener.toFd()],"Accept failed"); + server.reset(listener.accept(0, 0)); + + // Pump data between client & server sockets, watch closePipe. + char buffer[1024]; + for (;;) { + Selector select; + select.set(server->toFd(), RD|ER); + select.set(client.toFd(), RD|ER); + select.set(closePipe[0], RD|ER); + select.select(); + throwIf(select.rd[closePipe[0]], "Closed by close()"); + // Read even if fd is in error to throw a useful exception. + bool gotData=false; + if (select.rd[server->toFd()] || select.er[server->toFd()]) { + client.write(buffer, server->read(buffer, sizeof(buffer))); + gotData=true; + } + if (select.rd[client.toFd()] || select.er[client.toFd()]) { + server->write(buffer, client.read(buffer, sizeof(buffer))); + gotData=true; + } + throwIf(!gotData, "No data from select()"); + } + } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + } + if (server.get()) server->close(); + close(); } + + mutable qpid::sys::Mutex lock; + bool closed; + qpid::sys::Socket client, listener; + uint16_t port; + int closePipe[2]; + qpid::sys::Thread thread; }; #endif diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index d19307a5c0..700aeef47c 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -21,7 +21,6 @@ #include "unit_test.h" #include "BrokerFixture.h" -#include "SocketProxy.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -52,7 +51,7 @@ struct Catcher : public Runnable { try { f(); } catch(const Ex& e) { caught=true; - BOOST_MESSAGE(e.what()); + BOOST_MESSAGE(string("Caught expected exception: ")+e.what()); } catch(const std::exception& e) { BOOST_ERROR(string("Bad exception: ")+e.what()); @@ -71,37 +70,29 @@ struct Catcher : public Runnable { } }; -// FIXME aconway 2007-12-11: Disabled hanging tests. -// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) { -// ProxyConnection c(broker->getPort()); -// Catcher<ClosedException> get(bind(&Session_0_10::get, c.session)); -// c.proxy.client.close(); // Close the client side. -// BOOST_CHECK(get.join()); -// } - -// BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) { -// ProxyConnection c(broker->getPort()); -// c.session.queueDeclare(arg::queue="q"); -// subs.subscribe(lq, "q"); -// Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq))); -// c.proxy.client.close(); -// BOOST_CHECK(pop.join()); -// } +BOOST_FIXTURE_TEST_CASE(DisconnectedPop, ProxySessionFixture) { + ProxyConnection c(broker->getPort()); + session.queueDeclare(arg::queue="q"); + subs.subscribe(lq, "q"); + Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq))); + connection.proxy.close(); + BOOST_CHECK(pop.join()); +} -// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) { -// struct NullListener : public MessageListener { -// void received(Message&) { BOOST_FAIL("Unexpected message"); } -// } l; -// ProxyConnection c(broker->getPort()); -// c.session.queueDeclare(arg::queue="q"); -// subs.subscribe(l, "q"); -// Thread t(subs); -// c.proxy.client.close(); -// t.join(); -// BOOST_CHECK_THROW(c.session.close(), InternalErrorException); -// } +BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { + struct NullListener : public MessageListener { + void received(Message&) { BOOST_FAIL("Unexpected message"); } + } l; + ProxyConnection c(broker->getPort()); + session.queueDeclare(arg::queue="q"); + subs.subscribe(l, "q"); + Thread t(subs); + connection.proxy.close(); + t.join(); + BOOST_CHECK_THROW(session.close(), InternalErrorException); +} -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } diff --git a/cpp/src/tests/quick_perftest b/cpp/src/tests/quick_perftest index 5522b7fee4..676436fdc7 100755 --- a/cpp/src/tests/quick_perftest +++ b/cpp/src/tests/quick_perftest @@ -1,2 +1,2 @@ #!/bin/sh -exec `dirname $0`/run_test ./perftest --summary --count 1000 +exec `dirname $0`/run_test ./perftest --summary --count 100 |