summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-03-19 17:04:18 +0000
committerGordon Sim <gsim@apache.org>2010-03-19 17:04:18 +0000
commit45b5d1cc1f48ed8f6caef8ee9652f788d69747a5 (patch)
tree285eebffee4dd1252abde2dd39872531565987d3 /cpp
parentd6de561675087e8b1a6978d82569467c4aeff398 (diff)
downloadqpid-python-45b5d1cc1f48ed8f6caef8ee9652f788d69747a5.tar.gz
QPID-664: Prevent dangling pointers when receiver/sender handles stay in scope after connection/session handles goes out of scope. This change require connections to be closed explicitly to avoid leaking memory.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@925332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/examples/messaging/client.cpp3
-rw-r--r--cpp/examples/messaging/drain.cpp3
-rw-r--r--cpp/examples/messaging/map_receiver.cpp3
-rw-r--r--cpp/examples/messaging/map_sender.cpp4
-rw-r--r--cpp/examples/messaging/queue_receiver.cpp3
-rw-r--r--cpp/examples/messaging/queue_sender.cpp4
-rw-r--r--cpp/examples/messaging/server.cpp3
-rw-r--r--cpp/examples/messaging/spout.cpp3
-rw-r--r--cpp/examples/messaging/topic_receiver.cpp3
-rw-r--r--cpp/examples/messaging/topic_sender.cpp3
-rw-r--r--cpp/include/qpid/messaging/Connection.h5
-rw-r--r--cpp/include/qpid/messaging/Session.h6
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp18
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h7
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp14
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h7
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h3
-rw-r--r--cpp/src/tests/qpid_recv.cpp3
-rw-r--r--cpp/src/tests/qpid_send.cpp3
-rw-r--r--cpp/src/tests/qpid_stream.cpp3
21 files changed, 67 insertions, 42 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp
index 4d68d7c575..3f7afb5e3e 100644
--- a/cpp/examples/messaging/client.cpp
+++ b/cpp/examples/messaging/client.cpp
@@ -39,8 +39,8 @@ using std::string;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
@@ -70,6 +70,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp
index bd18fd3884..38f6bdbb98 100644
--- a/cpp/examples/messaging/drain.cpp
+++ b/cpp/examples/messaging/drain.cpp
@@ -93,8 +93,8 @@ int main(int argc, char** argv)
{
Options options(argv[0]);
if (options.parse(argc, argv)) {
+ Connection connection(options.connectionOptions);
try {
- Connection connection(options.connectionOptions);
connection.open(options.url);
Session session = connection.newSession();
Receiver receiver = session.createReceiver(options.address);
@@ -116,6 +116,7 @@ int main(int argc, char** argv)
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
}
return 1;
diff --git a/cpp/examples/messaging/map_receiver.cpp b/cpp/examples/messaging/map_receiver.cpp
index 05be4090d2..cdbae6007e 100644
--- a/cpp/examples/messaging/map_receiver.cpp
+++ b/cpp/examples/messaging/map_receiver.cpp
@@ -38,8 +38,8 @@ using std::string;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Receiver receiver = session.createReceiver("message_queue");
@@ -52,6 +52,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/map_sender.cpp b/cpp/examples/messaging/map_sender.cpp
index b6e0621844..037bb55201 100644
--- a/cpp/examples/messaging/map_sender.cpp
+++ b/cpp/examples/messaging/map_sender.cpp
@@ -37,9 +37,8 @@ using std::string;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
-
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Sender sender = session.createSender("message_queue");
@@ -64,6 +63,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/queue_receiver.cpp b/cpp/examples/messaging/queue_receiver.cpp
index 192b90088d..95756a9a3d 100644
--- a/cpp/examples/messaging/queue_receiver.cpp
+++ b/cpp/examples/messaging/queue_receiver.cpp
@@ -31,8 +31,8 @@ using namespace qpid::messaging;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Receiver receiver = session.createReceiver("message_queue");
@@ -51,6 +51,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/queue_sender.cpp b/cpp/examples/messaging/queue_sender.cpp
index b2535d90bf..439e1dffaf 100644
--- a/cpp/examples/messaging/queue_sender.cpp
+++ b/cpp/examples/messaging/queue_sender.cpp
@@ -34,8 +34,8 @@ int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
int count = argc>2 ? atoi(argv[2]) : 10;
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Sender sender = session.createSender("message_queue");
@@ -50,10 +50,10 @@ int main(int argc, char** argv) {
// And send a final message to indicate termination.
sender.send(Message("That's all, folks!"));
session.sync();
- connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/server.cpp b/cpp/examples/messaging/server.cpp
index 0a80a5fb02..046a209e2f 100644
--- a/cpp/examples/messaging/server.cpp
+++ b/cpp/examples/messaging/server.cpp
@@ -41,8 +41,8 @@ using std::string;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Receiver receiver = session.createReceiver("service_queue; {create: always}");
@@ -70,6 +70,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp
index cbb6b52b34..4819c6bc00 100644
--- a/cpp/examples/messaging/spout.cpp
+++ b/cpp/examples/messaging/spout.cpp
@@ -156,8 +156,8 @@ int main(int argc, char** argv)
{
Options options(argv[0]);
if (options.parse(argc, argv)) {
+ Connection connection(options.connectionOptions);
try {
- Connection connection(options.connectionOptions);
connection.open(options.url);
Session session = connection.newSession();
Sender sender = session.createSender(options.address);
@@ -183,6 +183,7 @@ int main(int argc, char** argv)
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
}
return 1;
diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp
index 13f881e574..9e0264a4c3 100644
--- a/cpp/examples/messaging/topic_receiver.cpp
+++ b/cpp/examples/messaging/topic_receiver.cpp
@@ -34,8 +34,8 @@ int main(int argc, char** argv) {
const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
const std::string pattern = argc>2 ? argv[2] : "#.#";
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Receiver receiver = session.createReceiver("news_service; {filter:[control, " + pattern + "]}");
@@ -53,6 +53,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/examples/messaging/topic_sender.cpp b/cpp/examples/messaging/topic_sender.cpp
index d1ada45864..a37d4b5371 100644
--- a/cpp/examples/messaging/topic_sender.cpp
+++ b/cpp/examples/messaging/topic_sender.cpp
@@ -51,8 +51,8 @@ int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
int count = argc>2 ? atoi(argv[2]) : 10;
+ Connection connection;
try {
- Connection connection;
connection.open(url);
Session session = connection.newSession();
Sender sender = session.createSender("news_service");
@@ -72,6 +72,7 @@ int main(int argc, char** argv) {
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
return 1;
}
diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h
index 7f324cea1e..b5eeeb2980 100644
--- a/cpp/include/qpid/messaging/Connection.h
+++ b/cpp/include/qpid/messaging/Connection.h
@@ -80,6 +80,11 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl>
QPID_CLIENT_EXTERN Connection& operator=(const Connection&);
QPID_CLIENT_EXTERN void setOption(const std::string& name, const Variant& value);
QPID_CLIENT_EXTERN void open(const std::string& url);
+ /**
+ * Closes a connection and all sessions associated with it. An
+ * opened connection must be closed before the last handle is
+ * allowed to go out of scope.
+ */
QPID_CLIENT_EXTERN void close();
QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string());
QPID_CLIENT_EXTERN Session newSession(const std::string& name = std::string());
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h
index 5375b4d346..032b678c5c 100644
--- a/cpp/include/qpid/messaging/Session.h
+++ b/cpp/include/qpid/messaging/Session.h
@@ -58,6 +58,12 @@ class Session : public qpid::messaging::Handle<SessionImpl>
QPID_CLIENT_EXTERN ~Session();
QPID_CLIENT_EXTERN Session& operator=(const Session&);
+ /**
+ * Closes a session and all associated senders and receivers. An
+ * opened session should be closed before the last handle to it
+ * goes out of scope. All a connections sessions can be closed by
+ * a call to Connection::close().
+ */
QPID_CLIENT_EXTERN void close();
QPID_CLIENT_EXTERN void commit();
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index e24f2ba5b4..2f52efbceb 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -57,14 +57,14 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Get f(*this, message, timeout);
- while (!parent.execute(f)) {}
+ while (!parent->execute(f)) {}
return f.result;
}
bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
Fetch f(*this, message, timeout);
- while (!parent.execute(f)) {}
+ while (!parent->execute(f)) {}
return f.result;
}
@@ -112,7 +112,7 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
}
if (state == CANCELLED) {
source->cancel(session, destination);
- parent.receiverCancelled(destination);
+ parent->receiverCancelled(destination);
} else {
source->subscribe(session, destination);
start();
@@ -129,23 +129,23 @@ uint32_t ReceiverImpl::getCapacity()
uint32_t ReceiverImpl::available()
{
- return parent.available(destination);
+ return parent->available(destination);
}
uint32_t ReceiverImpl::pendingAck()
{
- return parent.pendingAck(destination);
+ return parent->pendingAck(destination);
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a) :
- parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
- return parent.get(*this, message, timeout);
+ return parent->get(*this, message, timeout);
}
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -172,7 +172,7 @@ void ReceiverImpl::closeImpl()
if (state != CANCELLED) {
state = CANCELLED;
source->cancel(session, destination);
- parent.receiverCancelled(destination);
+ parent->receiverCancelled(destination);
}
}
@@ -188,7 +188,7 @@ void ReceiverImpl::setCapacityImpl(uint32_t c)
}
qpid::messaging::Session ReceiverImpl::getSession() const
{
- return qpid::messaging::Session(&parent);
+ return qpid::messaging::Session(parent.get());
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 38aa125ec6..689a7f6f25 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -28,6 +28,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/messaging/Duration.h"
+#include <boost/intrusive_ptr.hpp>
#include <memory>
namespace qpid {
@@ -65,7 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
private:
- SessionImpl& parent;
+ boost::intrusive_ptr<SessionImpl> parent;
const std::string destination;
const qpid::messaging::Address address;
const uint32_t byteCredit;
@@ -133,13 +134,13 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
template <class F> void execute()
{
F f(*this);
- parent.execute(f);
+ parent->execute(f);
}
template <class F, class P> void execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ parent->execute(f);
}
};
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 8782e6e813..9bb785e13f 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -31,17 +31,17 @@ namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address) :
- parent(_parent), name(_name), address(_address), state(UNRESOLVED),
+ parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
void SenderImpl::send(const qpid::messaging::Message& message)
{
if (unreliable) {
UnreliableSend f(*this, &message);
- parent.execute(f);
+ parent->execute(f);
} else {
Send f(*this, &message);
- while (f.repeat) parent.execute(f);
+ while (f.repeat) parent->execute(f);
}
}
@@ -60,7 +60,7 @@ uint32_t SenderImpl::getCapacity() { return capacity; }
uint32_t SenderImpl::pending()
{
CheckPendingSends f(*this, false);
- parent.execute(f);
+ parent->execute(f);
return f.pending;
}
@@ -73,7 +73,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
}
if (state == CANCELLED) {
sink->cancel(session, name);
- parent.senderCancelled(name);
+ parent->senderCancelled(name);
} else {
sink->declare(session, name);
replay();
@@ -140,7 +140,7 @@ void SenderImpl::closeImpl()
{
state = CANCELLED;
sink->cancel(session, name);
- parent.senderCancelled(name);
+ parent->senderCancelled(name);
}
const std::string& SenderImpl::getName() const
@@ -150,7 +150,7 @@ const std::string& SenderImpl::getName() const
qpid::messaging::Session SenderImpl::getSession() const
{
- return qpid::messaging::Session(&parent);
+ return qpid::messaging::Session(parent.get());
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index b65f8cf8cc..9e4181f42f 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -28,6 +28,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include <memory>
+#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
namespace qpid {
@@ -58,7 +59,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
qpid::messaging::Session getSession() const;
private:
- SessionImpl& parent;
+ boost::intrusive_ptr<SessionImpl> parent;
const std::string name;
const qpid::messaging::Address address;
State state;
@@ -143,13 +144,13 @@ class SenderImpl : public qpid::messaging::SenderImpl
template <class F> void execute()
{
F f(*this);
- parent.execute(f);
+ parent->execute(f);
}
template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- return parent.execute(f);
+ return parent->execute(f);
}
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index f12d383206..65308dd0be 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -49,7 +49,7 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
void SessionImpl::sync()
@@ -108,7 +108,7 @@ void SessionImpl::close()
for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
- connection.closed(*this);
+ connection->closed(*this);
session.close();
}
@@ -431,12 +431,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection.connect();
+ connection->connect();
}
qpid::messaging::Connection SessionImpl::getConnection() const
{
- return qpid::messaging::Connection(&connection);
+ return qpid::messaging::Connection(connection.get());
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 285c8f031b..a7eaae3cdd 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -29,6 +29,7 @@
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -106,7 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
typedef std::map<std::string, qpid::messaging::Sender> Senders;
mutable qpid::sys::Mutex lock;
- ConnectionImpl& connection;
+ boost::intrusive_ptr<ConnectionImpl> connection;
qpid::client::Session session;
AddressResolution resolver;
IncomingMessages incoming;
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index e4cc6a7ac8..10738578ed 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -148,8 +148,8 @@ int main(int argc, char ** argv)
{
Options opts;
if (opts.parse(argc, argv)) {
+ Connection connection(opts.connectionOptions);
try {
- Connection connection(opts.connectionOptions);
connection.open(opts.url);
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
@@ -207,6 +207,7 @@ int main(int argc, char ** argv)
return 0;
} catch(const std::exception& error) {
std::cerr << "Failure: " << error.what() << std::endl;
+ connection.close();
}
}
return 1;
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index 50e6c4371a..a8b0241a1d 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -181,8 +181,8 @@ int main(int argc, char ** argv)
{
Options opts;
if (opts.parse(argc, argv)) {
+ Connection connection(opts.connectionOptions);
try {
- Connection connection(opts.connectionOptions);
connection.open(opts.url);
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
@@ -230,6 +230,7 @@ int main(int argc, char ** argv)
return 0;
} catch(const std::exception& error) {
std::cout << "Failed: " << error.what() << std::endl;
+ connection.close();
}
}
return 1;
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index ef0aea52e4..5ed7f84492 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -87,8 +87,8 @@ struct Client : qpid::sys::Runnable
void run()
{
+ Connection connection;
try {
- Connection connection;
connection.open(opts.url);
Session session = connection.newSession();
doWork(session);
@@ -96,6 +96,7 @@ struct Client : qpid::sys::Runnable
connection.close();
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
+ connection.close();
}
}