diff options
author | Gordon Sim <gsim@apache.org> | 2010-03-19 17:04:18 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-03-19 17:04:18 +0000 |
commit | 45b5d1cc1f48ed8f6caef8ee9652f788d69747a5 (patch) | |
tree | 285eebffee4dd1252abde2dd39872531565987d3 /cpp | |
parent | d6de561675087e8b1a6978d82569467c4aeff398 (diff) | |
download | qpid-python-45b5d1cc1f48ed8f6caef8ee9652f788d69747a5.tar.gz |
QPID-664: Prevent dangling pointers when receiver/sender handles stay in scope after connection/session handles goes out of scope. This change require connections to be closed explicitly to avoid leaking memory.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@925332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
21 files changed, 67 insertions, 42 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp index 4d68d7c575..3f7afb5e3e 100644 --- a/cpp/examples/messaging/client.cpp +++ b/cpp/examples/messaging/client.cpp @@ -39,8 +39,8 @@ using std::string; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); @@ -70,6 +70,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp index bd18fd3884..38f6bdbb98 100644 --- a/cpp/examples/messaging/drain.cpp +++ b/cpp/examples/messaging/drain.cpp @@ -93,8 +93,8 @@ int main(int argc, char** argv) { Options options(argv[0]); if (options.parse(argc, argv)) { + Connection connection(options.connectionOptions); try { - Connection connection(options.connectionOptions); connection.open(options.url); Session session = connection.newSession(); Receiver receiver = session.createReceiver(options.address); @@ -116,6 +116,7 @@ int main(int argc, char** argv) return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/examples/messaging/map_receiver.cpp b/cpp/examples/messaging/map_receiver.cpp index 05be4090d2..cdbae6007e 100644 --- a/cpp/examples/messaging/map_receiver.cpp +++ b/cpp/examples/messaging/map_receiver.cpp @@ -38,8 +38,8 @@ using std::string; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("message_queue"); @@ -52,6 +52,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/map_sender.cpp b/cpp/examples/messaging/map_sender.cpp index b6e0621844..037bb55201 100644 --- a/cpp/examples/messaging/map_sender.cpp +++ b/cpp/examples/messaging/map_sender.cpp @@ -37,9 +37,8 @@ using std::string; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; - + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("message_queue"); @@ -64,6 +63,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/queue_receiver.cpp b/cpp/examples/messaging/queue_receiver.cpp index 192b90088d..95756a9a3d 100644 --- a/cpp/examples/messaging/queue_receiver.cpp +++ b/cpp/examples/messaging/queue_receiver.cpp @@ -31,8 +31,8 @@ using namespace qpid::messaging; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("message_queue"); @@ -51,6 +51,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/queue_sender.cpp b/cpp/examples/messaging/queue_sender.cpp index b2535d90bf..439e1dffaf 100644 --- a/cpp/examples/messaging/queue_sender.cpp +++ b/cpp/examples/messaging/queue_sender.cpp @@ -34,8 +34,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; int count = argc>2 ? atoi(argv[2]) : 10; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("message_queue"); @@ -50,10 +50,10 @@ int main(int argc, char** argv) { // And send a final message to indicate termination. sender.send(Message("That's all, folks!")); session.sync(); - connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/server.cpp b/cpp/examples/messaging/server.cpp index 0a80a5fb02..046a209e2f 100644 --- a/cpp/examples/messaging/server.cpp +++ b/cpp/examples/messaging/server.cpp @@ -41,8 +41,8 @@ using std::string; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("service_queue; {create: always}"); @@ -70,6 +70,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp index cbb6b52b34..4819c6bc00 100644 --- a/cpp/examples/messaging/spout.cpp +++ b/cpp/examples/messaging/spout.cpp @@ -156,8 +156,8 @@ int main(int argc, char** argv) { Options options(argv[0]); if (options.parse(argc, argv)) { + Connection connection(options.connectionOptions); try { - Connection connection(options.connectionOptions); connection.open(options.url); Session session = connection.newSession(); Sender sender = session.createSender(options.address); @@ -183,6 +183,7 @@ int main(int argc, char** argv) return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp index 13f881e574..9e0264a4c3 100644 --- a/cpp/examples/messaging/topic_receiver.cpp +++ b/cpp/examples/messaging/topic_receiver.cpp @@ -34,8 +34,8 @@ int main(int argc, char** argv) { const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; const std::string pattern = argc>2 ? argv[2] : "#.#"; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Receiver receiver = session.createReceiver("news_service; {filter:[control, " + pattern + "]}"); @@ -53,6 +53,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/examples/messaging/topic_sender.cpp b/cpp/examples/messaging/topic_sender.cpp index d1ada45864..a37d4b5371 100644 --- a/cpp/examples/messaging/topic_sender.cpp +++ b/cpp/examples/messaging/topic_sender.cpp @@ -51,8 +51,8 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; int count = argc>2 ? atoi(argv[2]) : 10; + Connection connection; try { - Connection connection; connection.open(url); Session session = connection.newSession(); Sender sender = session.createSender("news_service"); @@ -72,6 +72,7 @@ int main(int argc, char** argv) { return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } return 1; } diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index 7f324cea1e..b5eeeb2980 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -80,6 +80,11 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl> QPID_CLIENT_EXTERN Connection& operator=(const Connection&); QPID_CLIENT_EXTERN void setOption(const std::string& name, const Variant& value); QPID_CLIENT_EXTERN void open(const std::string& url); + /** + * Closes a connection and all sessions associated with it. An + * opened connection must be closed before the last handle is + * allowed to go out of scope. + */ QPID_CLIENT_EXTERN void close(); QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string()); QPID_CLIENT_EXTERN Session newSession(const std::string& name = std::string()); diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 5375b4d346..032b678c5c 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -58,6 +58,12 @@ class Session : public qpid::messaging::Handle<SessionImpl> QPID_CLIENT_EXTERN ~Session(); QPID_CLIENT_EXTERN Session& operator=(const Session&); + /** + * Closes a session and all associated senders and receivers. An + * opened session should be closed before the last handle to it + * goes out of scope. All a connections sessions can be closed by + * a call to Connection::close(). + */ QPID_CLIENT_EXTERN void close(); QPID_CLIENT_EXTERN void commit(); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index e24f2ba5b4..2f52efbceb 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -57,14 +57,14 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Get f(*this, message, timeout); - while (!parent.execute(f)) {} + while (!parent->execute(f)) {} return f.result; } bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Fetch f(*this, message, timeout); - while (!parent.execute(f)) {} + while (!parent->execute(f)) {} return f.result; } @@ -112,7 +112,7 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve } if (state == CANCELLED) { source->cancel(session, destination); - parent.receiverCancelled(destination); + parent->receiverCancelled(destination); } else { source->subscribe(session, destination); start(); @@ -129,23 +129,23 @@ uint32_t ReceiverImpl::getCapacity() uint32_t ReceiverImpl::available() { - return parent.available(destination); + return parent->available(destination); } uint32_t ReceiverImpl::pendingAck() { - return parent.pendingAck(destination); + return parent->pendingAck(destination); } ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a) : - parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { - return parent.get(*this, message, timeout); + return parent->get(*this, message, timeout); } bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -172,7 +172,7 @@ void ReceiverImpl::closeImpl() if (state != CANCELLED) { state = CANCELLED; source->cancel(session, destination); - parent.receiverCancelled(destination); + parent->receiverCancelled(destination); } } @@ -188,7 +188,7 @@ void ReceiverImpl::setCapacityImpl(uint32_t c) } qpid::messaging::Session ReceiverImpl::getSession() const { - return qpid::messaging::Session(&parent); + return qpid::messaging::Session(parent.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 38aa125ec6..689a7f6f25 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include "qpid/messaging/Duration.h" +#include <boost/intrusive_ptr.hpp> #include <memory> namespace qpid { @@ -65,7 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; private: - SessionImpl& parent; + boost::intrusive_ptr<SessionImpl> parent; const std::string destination; const qpid::messaging::Address address; const uint32_t byteCredit; @@ -133,13 +134,13 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl template <class F> void execute() { F f(*this); - parent.execute(f); + parent->execute(f); } template <class F, class P> void execute1(P p) { F f(*this, p); - parent.execute(f); + parent->execute(f); } }; diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 8782e6e813..9bb785e13f 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -31,17 +31,17 @@ namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address) : - parent(_parent), name(_name), address(_address), state(UNRESOLVED), + parent(&_parent), name(_name), address(_address), state(UNRESOLVED), capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} void SenderImpl::send(const qpid::messaging::Message& message) { if (unreliable) { UnreliableSend f(*this, &message); - parent.execute(f); + parent->execute(f); } else { Send f(*this, &message); - while (f.repeat) parent.execute(f); + while (f.repeat) parent->execute(f); } } @@ -60,7 +60,7 @@ uint32_t SenderImpl::getCapacity() { return capacity; } uint32_t SenderImpl::pending() { CheckPendingSends f(*this, false); - parent.execute(f); + parent->execute(f); return f.pending; } @@ -73,7 +73,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } if (state == CANCELLED) { sink->cancel(session, name); - parent.senderCancelled(name); + parent->senderCancelled(name); } else { sink->declare(session, name); replay(); @@ -140,7 +140,7 @@ void SenderImpl::closeImpl() { state = CANCELLED; sink->cancel(session, name); - parent.senderCancelled(name); + parent->senderCancelled(name); } const std::string& SenderImpl::getName() const @@ -150,7 +150,7 @@ const std::string& SenderImpl::getName() const qpid::messaging::Session SenderImpl::getSession() const { - return qpid::messaging::Session(&parent); + return qpid::messaging::Session(parent.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index b65f8cf8cc..9e4181f42f 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include <memory> +#include <boost/intrusive_ptr.hpp> #include <boost/ptr_container/ptr_deque.hpp> namespace qpid { @@ -58,7 +59,7 @@ class SenderImpl : public qpid::messaging::SenderImpl qpid::messaging::Session getSession() const; private: - SessionImpl& parent; + boost::intrusive_ptr<SessionImpl> parent; const std::string name; const qpid::messaging::Address address; State state; @@ -143,13 +144,13 @@ class SenderImpl : public qpid::messaging::SenderImpl template <class F> void execute() { F f(*this); - parent.execute(f); + parent->execute(f); } template <class F, class P> bool execute1(P p) { F f(*this, p); - return parent.execute(f); + return parent->execute(f); } }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index f12d383206..65308dd0be 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -49,7 +49,7 @@ namespace qpid { namespace client { namespace amqp0_10 { -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} void SessionImpl::sync() @@ -108,7 +108,7 @@ void SessionImpl::close() for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close(); - connection.closed(*this); + connection->closed(*this); session.close(); } @@ -431,12 +431,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection.connect(); + connection->connect(); } qpid::messaging::Connection SessionImpl::getConnection() const { - return qpid::messaging::Connection(&connection); + return qpid::messaging::Connection(connection.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 285c8f031b..a7eaae3cdd 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -29,6 +29,7 @@ #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" +#include <boost/intrusive_ptr.hpp> namespace qpid { @@ -106,7 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl typedef std::map<std::string, qpid::messaging::Sender> Senders; mutable qpid::sys::Mutex lock; - ConnectionImpl& connection; + boost::intrusive_ptr<ConnectionImpl> connection; qpid::client::Session session; AddressResolution resolver; IncomingMessages incoming; diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index e4cc6a7ac8..10738578ed 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -148,8 +148,8 @@ int main(int argc, char ** argv) { Options opts; if (opts.parse(argc, argv)) { + Connection connection(opts.connectionOptions); try { - Connection connection(opts.connectionOptions); connection.open(opts.url); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); @@ -207,6 +207,7 @@ int main(int argc, char ** argv) return 0; } catch(const std::exception& error) { std::cerr << "Failure: " << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 50e6c4371a..a8b0241a1d 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -181,8 +181,8 @@ int main(int argc, char ** argv) { Options opts; if (opts.parse(argc, argv)) { + Connection connection(opts.connectionOptions); try { - Connection connection(opts.connectionOptions); connection.open(opts.url); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); @@ -230,6 +230,7 @@ int main(int argc, char ** argv) return 0; } catch(const std::exception& error) { std::cout << "Failed: " << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ef0aea52e4..5ed7f84492 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -87,8 +87,8 @@ struct Client : qpid::sys::Runnable void run() { + Connection connection; try { - Connection connection; connection.open(opts.url); Session session = connection.newSession(); doWork(session); @@ -96,6 +96,7 @@ struct Client : qpid::sys::Runnable connection.close(); } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } } |