summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/APRBaseTest.cpp47
-rw-r--r--cpp/src/tests/BrokerFixture.h77
-rw-r--r--cpp/src/tests/ClientChannelTest.cpp2
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp26
-rw-r--r--cpp/src/tests/InProcessBroker.h245
-rw-r--r--cpp/src/tests/Makefile.am1
-rw-r--r--cpp/src/tests/SocketProxy.h158
-rw-r--r--cpp/src/tests/exception_test.cpp53
-rwxr-xr-xcpp/src/tests/quick_perftest2
9 files changed, 196 insertions, 415 deletions
diff --git a/cpp/src/tests/APRBaseTest.cpp b/cpp/src/tests/APRBaseTest.cpp
deleted file mode 100644
index 3ec18d658e..0000000000
--- a/cpp/src/tests/APRBaseTest.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/apr/APRBase.h"
-#include "qpid_test_plugin.h"
-#include <iostream>
-
-using namespace qpid::sys;
-
-class APRBaseTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(APRBaseTest);
- CPPUNIT_TEST(testMe);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void testMe()
- {
- APRBase::increment();
- APRBase::increment();
- APRBase::decrement();
- APRBase::decrement();
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest);
-
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index c7222c03a2..76cf2e8761 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -22,6 +22,7 @@
*
*/
+#include "SocketProxy.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/Broker.h"
#include "qpid/client/Connection.h"
@@ -30,52 +31,74 @@
#include "qpid/client/SubscriptionManager.h"
/**
- * A fixture to create an in-process broker and connect to it for tests.
+ * A fixture with an in-process broker.
*/
-struct BrokerFixture {
+struct BrokerFixture {
typedef qpid::broker::Broker Broker;
typedef boost::shared_ptr<Broker> BrokerPtr;
- struct OpenConnection : public qpid::client::Connection {
- OpenConnection(int port) { open("localhost", port); }
- };
-
BrokerPtr broker;
qpid::sys::Thread brokerThread;
- OpenConnection connection;
- qpid::client::Session_0_10 session;
- qpid::client::SubscriptionManager subs;
- qpid::client::LocalQueue lq;
-
- BrokerPtr newBroker() {
+
+ BrokerFixture() {
Broker::Options opts;
opts.port=0;
opts.workerThreads=1;
- BrokerPtr b=Broker::create(opts);
- // TODO aconway 2007-12-05: Without the following line
- // the test can hang in the connection ctor. This is
- // a race condition that should be fixed.
- b->getPort();
- return b;
+ broker = Broker::create(opts);
+ // TODO aconway 2007-12-05: At one point BrokerFixture
+ // tests could hang in Connection ctor if the following
+ // line is removed. This may not be an issue anymore.
+ broker->getPort();
+ brokerThread = qpid::sys::Thread(*broker);
};
- BrokerFixture() : broker(newBroker()),
- brokerThread(*broker),
- connection(broker->getPort()),
- session(connection.newSession()),
- subs(session)
- {}
-
~BrokerFixture() {
- connection.close();
broker->shutdown();
brokerThread.join();
}
- /** Open a connection to the local broker */
+ /** Open a connection to the broker. */
void open(qpid::client::Connection& c) {
c.open("localhost", broker->getPort());
}
};
+struct LocalConnection : public qpid::client::Connection {
+ LocalConnection(uint16_t port) { open("localhost", port); }
+};
+
+/** A local client connection via a socket proxy. */
+struct ProxyConnection : public qpid::client::Connection {
+ SocketProxy proxy;
+ ProxyConnection(int brokerPort) : proxy(brokerPort) {
+ open("localhost", proxy.getPort());
+ }
+ ~ProxyConnection() { close(); }
+};
+
+/**
+ * A BrokerFixture with open Connection, Session and
+ * SubscriptionManager and LocalQueue for convenience.
+ */
+template <class ConnectionType>
+struct SessionFixtureT : BrokerFixture {
+ ConnectionType connection;
+ qpid::client::Session_0_10 session;
+ qpid::client::SubscriptionManager subs;
+ qpid::client::LocalQueue lq;
+
+ SessionFixtureT() : connection(broker->getPort()),
+ session(connection.newSession()),
+ subs(session)
+ {}
+
+ ~SessionFixtureT() {
+ connection.close();
+ }
+};
+
+typedef SessionFixtureT<LocalConnection> SessionFixture;
+typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
+
+
#endif /*!TESTS_BROKERFIXTURE_H*/
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp
index bd8f5af6be..605d5e4885 100644
--- a/cpp/src/tests/ClientChannelTest.cpp
+++ b/cpp/src/tests/ClientChannelTest.cpp
@@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256;
* The test base defines the tests methods, derived classes
* instantiate the channel in Basic or Message mode.
*/
-class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture
+class ChannelTestBase : public CppUnit::TestCase, public SessionFixture
{
struct Listener: public qpid::client::MessageListener {
vector<Message> messages;
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 5f45e1f938..82db7b9545 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid_test_plugin.h"
#include "BrokerFixture.h"
-#include "SocketProxy.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session_0_10.h"
#include "qpid/framing/TransferContent.h"
@@ -62,7 +61,7 @@ struct DummyListener : public MessageListener
}
};
-class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
+class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
CPPUNIT_TEST(testQueueQuery);
@@ -71,7 +70,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
CPPUNIT_TEST(testResumeExpiredError);
CPPUNIT_TEST(testUseSuspendedError);
CPPUNIT_TEST(testSuspendResume);
- CPPUNIT_TEST(testDisconnectResume);
CPPUNIT_TEST_SUITE_END();
public:
@@ -85,11 +83,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
}
- bool queueExists(const std::string& q) {
- TypedResult<QueueQueryResult> result = session.queueQuery(q);
- return result.get().getQueue() == q;
- }
-
void testQueueQuery()
{
session =connection.newSession();
@@ -166,26 +159,11 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture
declareSubscribe();
session.suspend();
// Make sure we are still subscribed after resume.
- connection.resume(session);
+ connection.resume(session);
session.messageTransfer(content=TransferContent("my-message", "my-queue"));
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
}
-
- void testDisconnectResume() {
- // FIXME aconway 2007-12-11: Test hanging.
-// ProxyConnection c(broker->getPort());
-// Session_0_10 s = c.session;
-// s.queueDeclare(queue="before");
-// CPPUNIT_ASSERT(queueExists("before"));
-// s.queueDeclare(queue=string("after"));
-// c.proxy.client.close(); // Disconnect the client.
-// Connection c2;
-// open(c2);
-// c2.resume(s);
-// CPPUNIT_ASSERT(queueExists("after"));
-// c2.close();
- }
};
// Make this test suite a plugin.
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
deleted file mode 100644
index f014941743..0000000000
--- a/cpp/src/tests/InProcessBroker.h
+++ /dev/null
@@ -1,245 +0,0 @@
-#ifndef _tests_InProcessBroker_h
-#define _tests_InProcessBroker_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/client/Connector.h"
-#include "qpid/client/Connection.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/BlockingQueue.h"
-#include "qpid/shared_ptr.h"
-
-#include <vector>
-#include <iostream>
-#include <algorithm>
-
-
-namespace qpid {
-
-using qpid::sys::ConnectionInputHandler;
-
-/**
- * A client::Connector that connects directly to an in-process broker.
- * Also allows you to "snoop" on frames exchanged between client & broker.
- *
- * see FramingTest::testRequestResponseRoundtrip() for example of use.
- */
-class InProcessConnector :
- public client::Connector
-{
- public:
- typedef sys::Mutex Mutex;
- typedef Mutex::ScopedLock Lock;
- typedef framing::FrameHandler FrameHandler;
- typedef framing::AMQFrame AMQFrame;
-
- enum Sender {CLIENT,BROKER};
-
- struct Task {
- AMQFrame frame;
- bool doOutput;
-
- Task() : doOutput(true) {}
- Task(AMQFrame& f) : frame(f), doOutput(false) {}
- };
-
- /** Simulate the network thread of a peer with a queue and a thread.
- * With setInputHandler(0) drops frames simulating network packet loss.
- */
- class NetworkQueue : public sys::Runnable
- {
- public:
- NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
- thread=sys::Thread(this);
- }
-
- ~NetworkQueue() {
- queue.close();
- thread.join();
- }
-
- void push(AMQFrame& f) { queue.push(f); }
- void activateOutput() { queue.push(Task()); }
-
- void run() {
- try {
- while(true) {
- Task t = queue.pop();
- if (t.doOutput) {
- if (connectionHandler) {
- while (connectionHandler->doOutput());
- }
- } else {
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
- inputHandler->handle(t.frame);
- }
- else
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
- }
- }
- }
- catch (const std::exception& e) {
- QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what()));
- return;
- }
- }
-
- void setConnectionInputHandler(ConnectionInputHandler* h) {
- Lock l(lock);
- inputHandler = h;
- connectionHandler = h;
- }
-
- void setInputHandler(FrameHandler* h) {
- Lock l(lock);
- inputHandler = h;
- connectionHandler = 0;
- }
-
- private:
- sys::Mutex lock;
- sys::BlockingQueue<Task> queue;
- sys::Thread thread;
- FrameHandler* inputHandler;
- ConnectionInputHandler* connectionHandler;
- const char* const receiver;
- };
-
- struct InProcessHandler : public sys::ConnectionOutputHandler {
- Sender from;
- NetworkQueue queue;
- const char* const sender;
- NetworkQueue* reverseQueue;
-
- InProcessHandler(Sender s)
- : from(s),
- queue(from==CLIENT? "BROKER" : "CLIENT"),
- sender(from==BROKER? "BROKER" : "CLIENT"),
- reverseQueue(0)
- {}
-
- ~InProcessHandler() { }
-
- void send(AMQFrame& f) {
- QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f));
- queue.push(f);
- }
-
- void close() {
- // Do not shut down the queue here, we may be in
- // the queue's dispatch thread.
- }
-
- void activateOutput() {
- if (reverseQueue) reverseQueue->activateOutput();
- }
- };
-
-
- InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(),
- framing::ProtocolVersion v=framing::ProtocolVersion()) :
- Connector(v),
- protocolInit(v),
- broker(b),
- brokerOut(BROKER),
- brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT),
- isClosed(false)
- {
- clientOut.queue.setConnectionInputHandler(&brokerConnection);
- brokerOut.reverseQueue = &clientOut.queue;
- clientOut.reverseQueue = &brokerOut.queue;
- }
-
- ~InProcessConnector() {
- close();
-
- }
-
- void connect(const std::string& /*host*/, int /*port*/) {}
-
- void init() { brokerConnection.initiated(protocolInit); }
-
- void close() {
- if (!isClosed) {
- isClosed = true;
- brokerOut.close();
- clientOut.close();
- brokerConnection.closed();
- }
- }
-
- /** Client's input handler. */
- void setInputHandler(framing::InputHandler* handler) {
- brokerOut.queue.setInputHandler(handler);
- }
-
- /** Called by client to send a frame */
- void send(framing::AMQFrame& frame) {
- clientOut.handle(frame);
- }
-
- /** Sliently discard frames sent by either party, lost network traffic. */
- void discard() {
- brokerOut.queue.setInputHandler(0);
- clientOut.queue.setConnectionInputHandler(0);
- }
-
- shared_ptr<broker::Broker> getBroker() { return broker; }
-
- private:
- sys::Mutex lock;
- framing::ProtocolInitiation protocolInit;
- shared_ptr<broker::Broker> broker;
- InProcessHandler brokerOut;
- broker::Connection brokerConnection;
- InProcessHandler clientOut;
- bool isClosed;
-};
-
-struct InProcessConnection : public client::Connection {
- /** Connect to an existing broker */
- InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create())
- : client::Connection(
- shared_ptr<client::Connector>(new InProcessConnector(b)))
- { open(""); }
-
- InProcessConnector& getConnector() {
- return static_cast<InProcessConnector&>(*impl->getConnector());
- }
-
- /** Simulate disconnected network connection. */
- void disconnect() { getConnector().close(); }
-
- /** Discard frames, simulates lost network traffic. */
- void discard() { getConnector().discard(); }
-
- shared_ptr<broker::Broker> getBroker() {
- return getConnector().getBroker();
- }
-};
-
-} // namespace qpid
-
-#endif // _tests_InProcessBroker_h
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 0dc4b47a84..a25c46b5b0 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -122,7 +122,6 @@ EXTRA_DIST += \
topictest \
.valgrind.supp \
.valgrindrc \
- InProcessBroker.h \
MessageUtils.h \
MockChannel.h \
MockConnectionInputHandler.h \
diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h
index b985ded175..a37c1f2c3e 100644
--- a/cpp/src/tests/SocketProxy.h
+++ b/cpp/src/tests/SocketProxy.h
@@ -24,59 +24,141 @@
#include "qpid/sys/Socket.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/client/Connection.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
/**
- * A simple socket proxy that forwards to another socket. Used between
- * client & broker to simulate network failures.
+ * A simple socket proxy that forwards to another socket.
+ * Used between client & local broker to simulate network failures.
*/
-struct SocketProxy : public qpid::sys::Runnable
+class SocketProxy : private qpid::sys::Runnable
{
- int port; // Port bound to server socket.
- qpid::sys::Socket client, server; // Client & server sockets.
+ public:
+ /** Connect to connectPort on host, start a forwarding thread.
+ * Listen for connection on getPort().
+ */
+ SocketProxy(int connectPort, const std::string host="localhost")
+ : closed(false), port(listener.listen())
+ {
+ int r=::pipe(closePipe);
+ if (r<0) throwErrno(QPID_MSG("::pipe returned " << r));
+ client.connect(host, connectPort);
+ thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
+ }
+
+ ~SocketProxy() { close(); }
- SocketProxy(const std::string& host, int port) { init(host,port); }
- SocketProxy(int port) { init("localhost",port); }
+ /** Simulate a network disconnect. */
+ void close() {
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (closed) return;
+ closed=true;
+ }
+ write(closePipe[1], this, 1); // Random byte to closePipe
+ thread.join();
+ client.close();
+ ::close(closePipe[0]);
+ ::close(closePipe[1]);
+ }
- ~SocketProxy() { client.close(); server.close(); thread.join(); }
+ bool isClosed() const {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return closed;
+ }
+
+ uint16_t getPort() const { return port; }
private:
-
- void init(const std::string& host, int connectPort) {
- client.connect(host, connectPort);
- port = server.listen();
- thread=qpid::sys::Thread(this);
+ static void throwErrno(const std::string& msg) {
+ throw qpid::Exception(msg+":"+qpid::strError(errno));
}
-
- void run() {
- try {
- do {
- ssize_t recv = server.recv(buffer, sizeof(buffer));
- if (recv <= 0) return;
- ssize_t sent=client.send(buffer, recv);
- if (sent < 0) return;
- assert(sent == recv); // Assumes we can send as we receive.
- } while (true);
- } catch(...) {}
+ static void throwIf(bool condition, const std::string& msg) {
+ if (condition) throw qpid::Exception(msg);
}
+
+ struct FdSet : fd_set {
+ FdSet() : maxFd(0) { clear(); }
+ void clear() { FD_ZERO(this); }
+ void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); }
+ bool isSet(int fd) const { return FD_ISSET(fd, this); }
+ bool operator[](int fd) const { return isSet(fd); }
- qpid::sys::Thread thread;
- char buffer[64*1024];
-};
+ int maxFd;
+ };
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
- SocketProxy proxy;
- qpid::client::Session_0_10 session;
+ enum { RD=1, WR=2, ER=4 };
- ProxyConnection(const std::string& host, int port) : proxy(port) {
- open(host, proxy.port);
- session=newSession();
- }
+ struct Selector {
+ FdSet rd, wr, er;
+
+ void set(int fd, int sets) {
+ if (sets & RD) rd.set(fd);
+ if (sets & WR) wr.set(fd);
+ if (sets & ER) er.set(fd);
+ }
+
+ int select() {
+ for (;;) {
+ int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd));
+ int r = ::select(maxFd + 1, &rd, &wr, &er, NULL);
+ if (r == -1 && errno == EINTR) continue;
+ if (r < 0) throwErrno(QPID_MSG("select returned " <<r));
+ return r;
+ }
+ }
+ };
- ProxyConnection(int port) : proxy(port) {
- open("localhost", proxy.port);
- session=newSession();
+ void run() {
+ std::auto_ptr<qpid::sys::Socket> server;
+ try {
+ // Accept incoming connections, watch closePipe.
+ Selector accept;
+ accept.set(listener.toFd(), RD|ER);
+ accept.set(closePipe[0], RD|ER);
+ accept.select();
+ throwIf(accept.rd[closePipe[0]], "Closed by close()");
+ throwIf(!accept.rd[listener.toFd()],"Accept failed");
+ server.reset(listener.accept(0, 0));
+
+ // Pump data between client & server sockets, watch closePipe.
+ char buffer[1024];
+ for (;;) {
+ Selector select;
+ select.set(server->toFd(), RD|ER);
+ select.set(client.toFd(), RD|ER);
+ select.set(closePipe[0], RD|ER);
+ select.select();
+ throwIf(select.rd[closePipe[0]], "Closed by close()");
+ // Read even if fd is in error to throw a useful exception.
+ bool gotData=false;
+ if (select.rd[server->toFd()] || select.er[server->toFd()]) {
+ client.write(buffer, server->read(buffer, sizeof(buffer)));
+ gotData=true;
+ }
+ if (select.rd[client.toFd()] || select.er[client.toFd()]) {
+ server->write(buffer, client.read(buffer, sizeof(buffer)));
+ gotData=true;
+ }
+ throwIf(!gotData, "No data from select()");
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(debug, "SocketProxy::run exiting: " << e.what());
+ }
+ if (server.get()) server->close();
+ close();
}
+
+ mutable qpid::sys::Mutex lock;
+ bool closed;
+ qpid::sys::Socket client, listener;
+ uint16_t port;
+ int closePipe[2];
+ qpid::sys::Thread thread;
};
#endif
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index d19307a5c0..700aeef47c 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -21,7 +21,6 @@
#include "unit_test.h"
#include "BrokerFixture.h"
-#include "SocketProxy.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -52,7 +51,7 @@ struct Catcher : public Runnable {
try { f(); }
catch(const Ex& e) {
caught=true;
- BOOST_MESSAGE(e.what());
+ BOOST_MESSAGE(string("Caught expected exception: ")+e.what());
}
catch(const std::exception& e) {
BOOST_ERROR(string("Bad exception: ")+e.what());
@@ -71,37 +70,29 @@ struct Catcher : public Runnable {
}
};
-// FIXME aconway 2007-12-11: Disabled hanging tests.
-// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) {
-// ProxyConnection c(broker->getPort());
-// Catcher<ClosedException> get(bind(&Session_0_10::get, c.session));
-// c.proxy.client.close(); // Close the client side.
-// BOOST_CHECK(get.join());
-// }
-
-// BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) {
-// ProxyConnection c(broker->getPort());
-// c.session.queueDeclare(arg::queue="q");
-// subs.subscribe(lq, "q");
-// Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
-// c.proxy.client.close();
-// BOOST_CHECK(pop.join());
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedPop, ProxySessionFixture) {
+ ProxyConnection c(broker->getPort());
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(lq, "q");
+ Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq)));
+ connection.proxy.close();
+ BOOST_CHECK(pop.join());
+}
-// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) {
-// struct NullListener : public MessageListener {
-// void received(Message&) { BOOST_FAIL("Unexpected message"); }
-// } l;
-// ProxyConnection c(broker->getPort());
-// c.session.queueDeclare(arg::queue="q");
-// subs.subscribe(l, "q");
-// Thread t(subs);
-// c.proxy.client.close();
-// t.join();
-// BOOST_CHECK_THROW(c.session.close(), InternalErrorException);
-// }
+BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) {
+ struct NullListener : public MessageListener {
+ void received(Message&) { BOOST_FAIL("Unexpected message"); }
+ } l;
+ ProxyConnection c(broker->getPort());
+ session.queueDeclare(arg::queue="q");
+ subs.subscribe(l, "q");
+ Thread t(subs);
+ connection.proxy.close();
+ t.join();
+ BOOST_CHECK_THROW(session.close(), InternalErrorException);
+}
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
}
diff --git a/cpp/src/tests/quick_perftest b/cpp/src/tests/quick_perftest
index 5522b7fee4..676436fdc7 100755
--- a/cpp/src/tests/quick_perftest
+++ b/cpp/src/tests/quick_perftest
@@ -1,2 +1,2 @@
#!/bin/sh
-exec `dirname $0`/run_test ./perftest --summary --count 1000
+exec `dirname $0`/run_test ./perftest --summary --count 100