diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 3 |
9 files changed, 36 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index e24f2ba5b4..2f52efbceb 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -57,14 +57,14 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Get f(*this, message, timeout); - while (!parent.execute(f)) {} + while (!parent->execute(f)) {} return f.result; } bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Fetch f(*this, message, timeout); - while (!parent.execute(f)) {} + while (!parent->execute(f)) {} return f.result; } @@ -112,7 +112,7 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve } if (state == CANCELLED) { source->cancel(session, destination); - parent.receiverCancelled(destination); + parent->receiverCancelled(destination); } else { source->subscribe(session, destination); start(); @@ -129,23 +129,23 @@ uint32_t ReceiverImpl::getCapacity() uint32_t ReceiverImpl::available() { - return parent.available(destination); + return parent->available(destination); } uint32_t ReceiverImpl::pendingAck() { - return parent.pendingAck(destination); + return parent->pendingAck(destination); } ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a) : - parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { - return parent.get(*this, message, timeout); + return parent->get(*this, message, timeout); } bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -172,7 +172,7 @@ void ReceiverImpl::closeImpl() if (state != CANCELLED) { state = CANCELLED; source->cancel(session, destination); - parent.receiverCancelled(destination); + parent->receiverCancelled(destination); } } @@ -188,7 +188,7 @@ void ReceiverImpl::setCapacityImpl(uint32_t c) } qpid::messaging::Session ReceiverImpl::getSession() const { - return qpid::messaging::Session(&parent); + return qpid::messaging::Session(parent.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 38aa125ec6..689a7f6f25 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include "qpid/messaging/Duration.h" +#include <boost/intrusive_ptr.hpp> #include <memory> namespace qpid { @@ -65,7 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; private: - SessionImpl& parent; + boost::intrusive_ptr<SessionImpl> parent; const std::string destination; const qpid::messaging::Address address; const uint32_t byteCredit; @@ -133,13 +134,13 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl template <class F> void execute() { F f(*this); - parent.execute(f); + parent->execute(f); } template <class F, class P> void execute1(P p) { F f(*this, p); - parent.execute(f); + parent->execute(f); } }; diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 8782e6e813..9bb785e13f 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -31,17 +31,17 @@ namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address) : - parent(_parent), name(_name), address(_address), state(UNRESOLVED), + parent(&_parent), name(_name), address(_address), state(UNRESOLVED), capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} void SenderImpl::send(const qpid::messaging::Message& message) { if (unreliable) { UnreliableSend f(*this, &message); - parent.execute(f); + parent->execute(f); } else { Send f(*this, &message); - while (f.repeat) parent.execute(f); + while (f.repeat) parent->execute(f); } } @@ -60,7 +60,7 @@ uint32_t SenderImpl::getCapacity() { return capacity; } uint32_t SenderImpl::pending() { CheckPendingSends f(*this, false); - parent.execute(f); + parent->execute(f); return f.pending; } @@ -73,7 +73,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } if (state == CANCELLED) { sink->cancel(session, name); - parent.senderCancelled(name); + parent->senderCancelled(name); } else { sink->declare(session, name); replay(); @@ -140,7 +140,7 @@ void SenderImpl::closeImpl() { state = CANCELLED; sink->cancel(session, name); - parent.senderCancelled(name); + parent->senderCancelled(name); } const std::string& SenderImpl::getName() const @@ -150,7 +150,7 @@ const std::string& SenderImpl::getName() const qpid::messaging::Session SenderImpl::getSession() const { - return qpid::messaging::Session(&parent); + return qpid::messaging::Session(parent.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index b65f8cf8cc..9e4181f42f 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include <memory> +#include <boost/intrusive_ptr.hpp> #include <boost/ptr_container/ptr_deque.hpp> namespace qpid { @@ -58,7 +59,7 @@ class SenderImpl : public qpid::messaging::SenderImpl qpid::messaging::Session getSession() const; private: - SessionImpl& parent; + boost::intrusive_ptr<SessionImpl> parent; const std::string name; const qpid::messaging::Address address; State state; @@ -143,13 +144,13 @@ class SenderImpl : public qpid::messaging::SenderImpl template <class F> void execute() { F f(*this); - parent.execute(f); + parent->execute(f); } template <class F, class P> bool execute1(P p) { F f(*this, p); - return parent.execute(f); + return parent->execute(f); } }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index f12d383206..65308dd0be 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -49,7 +49,7 @@ namespace qpid { namespace client { namespace amqp0_10 { -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} void SessionImpl::sync() @@ -108,7 +108,7 @@ void SessionImpl::close() for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close(); - connection.closed(*this); + connection->closed(*this); session.close(); } @@ -431,12 +431,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection.connect(); + connection->connect(); } qpid::messaging::Connection SessionImpl::getConnection() const { - return qpid::messaging::Connection(&connection); + return qpid::messaging::Connection(connection.get()); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 285c8f031b..a7eaae3cdd 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -29,6 +29,7 @@ #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" +#include <boost/intrusive_ptr.hpp> namespace qpid { @@ -106,7 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl typedef std::map<std::string, qpid::messaging::Sender> Senders; mutable qpid::sys::Mutex lock; - ConnectionImpl& connection; + boost::intrusive_ptr<ConnectionImpl> connection; qpid::client::Session session; AddressResolution resolver; IncomingMessages incoming; diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index e4cc6a7ac8..10738578ed 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -148,8 +148,8 @@ int main(int argc, char ** argv) { Options opts; if (opts.parse(argc, argv)) { + Connection connection(opts.connectionOptions); try { - Connection connection(opts.connectionOptions); connection.open(opts.url); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); @@ -207,6 +207,7 @@ int main(int argc, char ** argv) return 0; } catch(const std::exception& error) { std::cerr << "Failure: " << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 50e6c4371a..a8b0241a1d 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -181,8 +181,8 @@ int main(int argc, char ** argv) { Options opts; if (opts.parse(argc, argv)) { + Connection connection(opts.connectionOptions); try { - Connection connection(opts.connectionOptions); connection.open(opts.url); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); @@ -230,6 +230,7 @@ int main(int argc, char ** argv) return 0; } catch(const std::exception& error) { std::cout << "Failed: " << error.what() << std::endl; + connection.close(); } } return 1; diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ef0aea52e4..5ed7f84492 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -87,8 +87,8 @@ struct Client : qpid::sys::Runnable void run() { + Connection connection; try { - Connection connection; connection.open(opts.url); Session session = connection.newSession(); doWork(session); @@ -96,6 +96,7 @@ struct Client : qpid::sys::Runnable connection.close(); } catch(const std::exception& error) { std::cout << error.what() << std::endl; + connection.close(); } } |