diff options
-rw-r--r-- | cpp/include/qpid/messaging/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 27 |
8 files changed, 44 insertions, 7 deletions
diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index 531f046bec..f7c5ecfae9 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -48,7 +48,10 @@ class Connection : public qpid::client::Handle<ConnectionImpl> QPID_CLIENT_EXTERN ~Connection(); QPID_CLIENT_EXTERN Connection& operator=(const Connection&); QPID_CLIENT_EXTERN void close(); + QPID_CLIENT_EXTERN Session newSession(bool transactional, const std::string& name = std::string()); QPID_CLIENT_EXTERN Session newSession(const std::string& name = std::string()); + QPID_CLIENT_EXTERN Session newSession(const char* name); + QPID_CLIENT_EXTERN Session getSession(const std::string& name) const; private: friend class qpid::client::PrivateImplRef<Connection>; diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index ebc5859d32..cd5c0214e3 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -118,10 +118,10 @@ qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) con } } -qpid::messaging::Session ConnectionImpl::newSession(const std::string& n) +qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const std::string& n) { std::string name = n.empty() ? Uuid(true).str() : n; - qpid::messaging::Session impl(new SessionImpl(*this)); + qpid::messaging::Session impl(new SessionImpl(*this, transactional)); { qpid::sys::Mutex::ScopedLock l(lock); sessions[name] = impl; diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 5272121f92..979cc6c82a 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -42,7 +42,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl public: ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); void close(); - qpid::messaging::Session newSession(const std::string& name); + qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); void reconnect(); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 0c09f26039..bb47288e88 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -50,7 +50,7 @@ namespace qpid { namespace client { namespace amqp0_10 { -SessionImpl::SessionImpl(ConnectionImpl& c) : connection(c) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(c), transactional(t) {} void SessionImpl::sync() @@ -134,6 +134,7 @@ void SessionImpl::setSession(qpid::client::Session s) qpid::sys::Mutex::ScopedLock l(lock); session = s; incoming.setSession(session); + if (transactional) session.txSelect(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 30391fc0c5..96c7ca93a3 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -54,7 +54,7 @@ class SenderImpl; class SessionImpl : public qpid::messaging::SessionImpl { public: - SessionImpl(ConnectionImpl&); + SessionImpl(ConnectionImpl&, bool transactional); void commit(); void rollback(); void acknowledge(); @@ -111,6 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl IncomingMessages incoming; Receivers receivers; Senders senders; + const bool transactional; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 8342fc546a..64ca962317 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -50,7 +50,12 @@ Connection& Connection::operator=(const Connection& c) { return PI::assign(*this Connection::~Connection() { PI::dtor(*this); } void Connection::close() { impl->close(); } -Session Connection::newSession(const std::string& name) { return impl->newSession(name); } +Session Connection::newSession(const char* name) { return impl->newSession(false, name); } +Session Connection::newSession(const std::string& name) { return impl->newSession(false, name); } +Session Connection::newSession(bool transactional, const std::string& name) +{ + return impl->newSession(transactional, name); +} Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h index ce60f6f9fc..4eff68ff9d 100644 --- a/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/cpp/src/qpid/messaging/ConnectionImpl.h @@ -37,7 +37,7 @@ class ConnectionImpl : public virtual qpid::RefCounted public: virtual ~ConnectionImpl() {} virtual void close() = 0; - virtual Session newSession(const std::string& name) = 0; + virtual Session newSession(bool transactional, const std::string& name) = 0; virtual Session getSession(const std::string& name) const = 0; private: }; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 07112f52dc..4125c51698 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -745,6 +745,33 @@ QPID_AUTO_TEST_CASE(testGetConnectionFromSession) BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); } +QPID_AUTO_TEST_CASE(testTx) +{ + QueueFixture fix; + Session ssn1 = fix.connection.newSession(true); + Session ssn2 = fix.connection.newSession(true); + Sender sender1 = ssn1.createSender(fix.queue); + Sender sender2 = ssn2.createSender(fix.queue); + Receiver receiver1 = ssn1.createReceiver(fix.queue); + Receiver receiver2 = ssn2.createReceiver(fix.queue); + Message in; + + send(sender1, 5, 1, "A"); + send(sender2, 5, 1, "B"); + ssn2.commit(); + receive(receiver1, 5, 1, "B");//(only those from sender2 should be received) + BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages + ssn1.rollback(); + receive(receiver2, 5, 1, "B"); + BOOST_CHECK(!receiver2.fetch(in, 0));//check there are no more messages + ssn2.rollback(); + receive(receiver1, 5, 1, "B"); + BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages + ssn1.commit(); + //check neither receiver gets any more messages: + BOOST_CHECK(!receiver1.fetch(in, 0)); + BOOST_CHECK(!receiver2.fetch(in, 0)); +} QPID_AUTO_TEST_SUITE_END() |