diff options
author | Gordon Sim <gsim@apache.org> | 2009-11-17 17:34:55 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-11-17 17:34:55 +0000 |
commit | 99c896bf60506c66f339d2ce51d4dca2725968af (patch) | |
tree | 5658e1d6c3ad6adfb2823dbbabefa5b66c951274 /cpp/src | |
parent | 054f31c5dd081bf0476ac8cc4db4ece722a465a5 (diff) | |
download | qpid-python-99c896bf60506c66f339d2ce51d4dca2725968af.tar.gz |
QPID-664: Allow application to set session name and retrieve session using that name; close all sessions when connection is closed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@881394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 34 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Connection.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ConnectionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 14 |
5 files changed, 51 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 1698f96caf..ebc5859d32 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -22,14 +22,17 @@ #include "SessionImpl.h" #include "qpid/messaging/Session.h" #include "qpid/client/PrivateImplRef.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include <boost/intrusive_ptr.hpp> +#include <vector> namespace qpid { namespace client { namespace amqp0_10 { using qpid::messaging::Variant; +using qpid::framing::Uuid; using namespace qpid::sys; template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) @@ -73,6 +76,15 @@ ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options void ConnectionImpl::close() { + std::vector<std::string> names; + { + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first); + } + for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) { + getSession(*i).close(); + } + qpid::sys::Mutex::ScopedLock l(lock); connection.close(); } @@ -88,22 +100,34 @@ void ConnectionImpl::closed(SessionImpl& s) { qpid::sys::Mutex::ScopedLock l(lock); for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { - if (getImplPtr(*i).get() == &s) { + if (getImplPtr(i->second).get() == &s) { sessions.erase(i); break; } } } -qpid::messaging::Session ConnectionImpl::newSession() +qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + Sessions::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError("No such session: " + name); + } else { + return i->second; + } +} + +qpid::messaging::Session ConnectionImpl::newSession(const std::string& n) { + std::string name = n.empty() ? Uuid(true).str() : n; qpid::messaging::Session impl(new SessionImpl(*this)); { qpid::sys::Mutex::ScopedLock l(lock); - sessions.push_back(impl); + sessions[name] = impl; } try { - getImplPtr(impl)->setSession(connection.newSession()); + getImplPtr(impl)->setSession(connection.newSession(name)); } catch (const TransportFailure&) { reconnect(); } @@ -172,7 +196,7 @@ bool ConnectionImpl::resetSessions() try { qpid::sys::Mutex::ScopedLock l(lock); for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { - getImplPtr(*i)->setSession(connection.newSession()); + getImplPtr(i->second)->setSession(connection.newSession(i->first)); } return true; } catch (const TransportFailure&) { diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index a8754778f0..5272121f92 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -29,7 +29,7 @@ #include "qpid/client/ConnectionSettings.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Semaphore.h" -#include <vector> +#include <map> namespace qpid { namespace client { @@ -42,13 +42,14 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); void close(); - qpid::messaging::Session newSession(); + qpid::messaging::Session newSession(const std::string& name); + qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); void reconnect(); private: - typedef std::vector<qpid::messaging::Session> Sessions; + typedef std::map<std::string, qpid::messaging::Session> Sessions; - qpid::sys::Mutex lock;//used to protect data structures + mutable qpid::sys::Mutex lock;//used to protect data structures qpid::sys::Semaphore semaphore;//used to coordinate reconnection Sessions sessions; qpid::client::Connection connection; diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index feb6566008..8342fc546a 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -50,7 +50,8 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this Connection::~Connection() { PI::dtor(*this); } void Connection::close() { impl->close(); } -Session Connection::newSession() { return impl->newSession(); } +Session Connection::newSession(const std::string& name) { return impl->newSession(name); } +Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h index aa9e5b5fbe..ce60f6f9fc 100644 --- a/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/cpp/src/qpid/messaging/ConnectionImpl.h @@ -37,7 +37,8 @@ class ConnectionImpl : public virtual qpid::RefCounted public: virtual ~ConnectionImpl() {} virtual void close() = 0; - virtual Session newSession() = 0; + virtual Session newSession(const std::string& name) = 0; + virtual Session getSession(const std::string& name) const = 0; private: }; }} // namespace qpid::messaging diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 5b030f0f31..082c639636 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -704,7 +704,6 @@ QPID_AUTO_TEST_CASE(testGetSender) BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"), qpid::messaging::KeyError); } - QPID_AUTO_TEST_CASE(testGetReceiver) { QueueFixture fix; @@ -719,6 +718,19 @@ QPID_AUTO_TEST_CASE(testGetReceiver) BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"), qpid::messaging::KeyError); } +QPID_AUTO_TEST_CASE(testGetSession) +{ + QueueFixture fix; + fix.connection.newSession("my-session"); + Session session = fix.connection.getSession("my-session"); + Message out(Uuid(true).str()); + session.createSender(fix.queue).send(out); + Message in; + BOOST_CHECK(session.createReceiver(fix.queue).fetch(in)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"), qpid::messaging::KeyError); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |