summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-17 17:34:55 +0000
committerGordon Sim <gsim@apache.org>2009-11-17 17:34:55 +0000
commit99c896bf60506c66f339d2ce51d4dca2725968af (patch)
tree5658e1d6c3ad6adfb2823dbbabefa5b66c951274 /cpp/src
parent054f31c5dd081bf0476ac8cc4db4ece722a465a5 (diff)
downloadqpid-python-99c896bf60506c66f339d2ce51d4dca2725968af.tar.gz
QPID-664: Allow application to set session name and retrieve session using that name; close all sessions when connection is closed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@881394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp34
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h9
-rw-r--r--cpp/src/qpid/messaging/Connection.cpp3
-rw-r--r--cpp/src/qpid/messaging/ConnectionImpl.h3
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp14
5 files changed, 51 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 1698f96caf..ebc5859d32 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -22,14 +22,17 @@
#include "SessionImpl.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/PrivateImplRef.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/intrusive_ptr.hpp>
+#include <vector>
namespace qpid {
namespace client {
namespace amqp0_10 {
using qpid::messaging::Variant;
+using qpid::framing::Uuid;
using namespace qpid::sys;
template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
@@ -73,6 +76,15 @@ ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options
void ConnectionImpl::close()
{
+ std::vector<std::string> names;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first);
+ }
+ for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
+ getSession(*i).close();
+ }
+
qpid::sys::Mutex::ScopedLock l(lock);
connection.close();
}
@@ -88,22 +100,34 @@ void ConnectionImpl::closed(SessionImpl& s)
{
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- if (getImplPtr(*i).get() == &s) {
+ if (getImplPtr(i->second).get() == &s) {
sessions.erase(i);
break;
}
}
}
-qpid::messaging::Session ConnectionImpl::newSession()
+qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ Sessions::const_iterator i = sessions.find(name);
+ if (i == sessions.end()) {
+ throw qpid::messaging::KeyError("No such session: " + name);
+ } else {
+ return i->second;
+ }
+}
+
+qpid::messaging::Session ConnectionImpl::newSession(const std::string& n)
{
+ std::string name = n.empty() ? Uuid(true).str() : n;
qpid::messaging::Session impl(new SessionImpl(*this));
{
qpid::sys::Mutex::ScopedLock l(lock);
- sessions.push_back(impl);
+ sessions[name] = impl;
}
try {
- getImplPtr(impl)->setSession(connection.newSession());
+ getImplPtr(impl)->setSession(connection.newSession(name));
} catch (const TransportFailure&) {
reconnect();
}
@@ -172,7 +196,7 @@ bool ConnectionImpl::resetSessions()
try {
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- getImplPtr(*i)->setSession(connection.newSession());
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
}
return true;
} catch (const TransportFailure&) {
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index a8754778f0..5272121f92 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -29,7 +29,7 @@
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Semaphore.h"
-#include <vector>
+#include <map>
namespace qpid {
namespace client {
@@ -42,13 +42,14 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
public:
ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
void close();
- qpid::messaging::Session newSession();
+ qpid::messaging::Session newSession(const std::string& name);
+ qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
void reconnect();
private:
- typedef std::vector<qpid::messaging::Session> Sessions;
+ typedef std::map<std::string, qpid::messaging::Session> Sessions;
- qpid::sys::Mutex lock;//used to protect data structures
+ mutable qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
Sessions sessions;
qpid::client::Connection connection;
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp
index feb6566008..8342fc546a 100644
--- a/cpp/src/qpid/messaging/Connection.cpp
+++ b/cpp/src/qpid/messaging/Connection.cpp
@@ -50,7 +50,8 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this
Connection::~Connection() { PI::dtor(*this); }
void Connection::close() { impl->close(); }
-Session Connection::newSession() { return impl->newSession(); }
+Session Connection::newSession(const std::string& name) { return impl->newSession(name); }
+Session Connection::getSession(const std::string& name) const { return impl->getSession(name); }
InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h
index aa9e5b5fbe..ce60f6f9fc 100644
--- a/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -37,7 +37,8 @@ class ConnectionImpl : public virtual qpid::RefCounted
public:
virtual ~ConnectionImpl() {}
virtual void close() = 0;
- virtual Session newSession() = 0;
+ virtual Session newSession(const std::string& name) = 0;
+ virtual Session getSession(const std::string& name) const = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 5b030f0f31..082c639636 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -704,7 +704,6 @@ QPID_AUTO_TEST_CASE(testGetSender)
BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"), qpid::messaging::KeyError);
}
-
QPID_AUTO_TEST_CASE(testGetReceiver)
{
QueueFixture fix;
@@ -719,6 +718,19 @@ QPID_AUTO_TEST_CASE(testGetReceiver)
BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"), qpid::messaging::KeyError);
}
+QPID_AUTO_TEST_CASE(testGetSession)
+{
+ QueueFixture fix;
+ fix.connection.newSession("my-session");
+ Session session = fix.connection.getSession("my-session");
+ Message out(Uuid(true).str());
+ session.createSender(fix.queue).send(out);
+ Message in;
+ BOOST_CHECK(session.createReceiver(fix.queue).fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"), qpid::messaging::KeyError);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests