diff options
author | Gordon Sim <gsim@apache.org> | 2014-10-27 17:00:41 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-10-27 17:00:41 +0000 |
commit | 02fb5003ae7c0d9a517f5a4ee530e2b9401c6c59 (patch) | |
tree | 3b14601527b2b73f67eeb76f7a4a3908d9948088 | |
parent | bf0436b096c63baf69555a5528f7afd4f5717dbb (diff) | |
download | qpid-python-02fb5003ae7c0d9a517f5a4ee530e2b9401c6c59.tar.gz |
QPID-4710: Initial support for broker side AMQP 1.0 local transactions (single txn per session, single session per txn to begin with)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634598 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedSession.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 176 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.h | 16 |
8 files changed, 236 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index 634ae79be1..ce4c73dead 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -107,7 +107,7 @@ namespace { } DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name) - : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {} + : Incoming(link, broker, parent, source, target, name), sessionPtr(parent.shared_from_this()) {} DecodingIncoming::~DecodingIncoming() {} void DecodingIncoming::readable(pn_delivery_t* delivery) @@ -135,16 +135,20 @@ void DecodingIncoming::readable(pn_delivery_t* delivery) received->scan(); pn_link_advance(link); - received->setPublisher(&session->getParent()); + received->setPublisher(&session.getParent()); received->computeExpiration(); - - qpid::broker::Message message(received, received); - userid.verify(message.getUserId()); - handle(message); --window; - received->begin(); - Transfer t(delivery, session); - received->end(t); + deliver(received, delivery); } } + +void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> received, pn_delivery_t* delivery) +{ + qpid::broker::Message message(received, received); + userid.verify(message.getUserId()); + handle(message, session.getTransaction(delivery)); + received->begin(); + Transfer t(delivery, sessionPtr); + received->end(t); +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 38b9b3a919..1a7064337d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -31,6 +31,7 @@ namespace qpid { namespace broker { class Broker; class Message; +class TxBuffer; namespace amqp { class Session; @@ -74,9 +75,10 @@ class DecodingIncoming : public Incoming DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); virtual ~DecodingIncoming(); void readable(pn_delivery_t* delivery); - virtual void handle(qpid::broker::Message&) = 0; + virtual void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> received, pn_delivery_t* delivery); + virtual void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) = 0; private: - boost::shared_ptr<Session> session; + boost::shared_ptr<Session> sessionPtr; boost::intrusive_ptr<Message> partial; }; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp index 19fa5a6a4d..3bf9cf90db 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -90,6 +90,29 @@ void ManagedSession::incomingMessageRejected() { } +void ManagedSession::txStarted() +{ + if (session) { + session->inc_TxnStarts(); + } +} + +void ManagedSession::txCommitted() +{ + if (session) { + session->inc_TxnCommits(); + session->inc_TxnCount(); + } +} + +void ManagedSession::txAborted() +{ + if (session) { + session->inc_TxnRejects(); + session->inc_TxnCount(); + } +} + ManagedConnection& ManagedSession::getParent() { return parent; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h index 0da048654c..c35e304cfb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -47,6 +47,9 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke void outgoingMessageSent(); void outgoingMessageAccepted(); void outgoingMessageRejected(); + void txStarted(); + void txCommitted(); + void txAborted(); ManagedConnection& getParent(); qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index ba073c8a36..54993d071e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -112,6 +112,12 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (pn_delivery_updated(delivery)) { assert(r.delivery == delivery); r.disposition = pn_delivery_remote_state(delivery); + + std::pair<TxBuffer*,uint64_t> txn = session.getTransactionalState(delivery); + if (txn.first) { + r.disposition = txn.second; + } + if (!r.disposition && pn_delivery_settled(delivery)) { //if peer has settled without setting state, assume accepted r.disposition = PN_ACCEPTED; @@ -119,7 +125,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (r.disposition) { switch (r.disposition) { case PN_ACCEPTED: - if (preAcquires()) queue->dequeue(0, r.cursor); + if (preAcquires()) queue->dequeue(r.cursor, txn.first); outgoingMessageAccepted(); break; case PN_REJECTED: diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 8039044aea..27d8205fc8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -77,7 +77,7 @@ class Outgoing : public ManagedOutgoingLink virtual void handle(pn_delivery_t* delivery) = 0; void wakeup(); virtual ~Outgoing() {} - private: + protected: Session& session; }; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 041c0b6117..68a735709b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -23,12 +23,14 @@ #include "Outgoing.h" #include "Message.h" #include "Connection.h" +#include "DataReader.h" #include "Domain.h" #include "Exception.h" #include "Interconnects.h" #include "NodePolicy.h" #include "Relay.h" #include "Topic.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/amqp/descriptors.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" @@ -176,7 +178,7 @@ class IncomingToQueue : public DecodingIncoming queue->markInUse(isControllingLink); } ~IncomingToQueue() { queue->releaseFromUse(isControllingLink); } - void handle(qpid::broker::Message& m); + void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*); private: boost::shared_ptr<qpid::broker::Queue> queue; bool isControllingLink; @@ -194,17 +196,28 @@ class IncomingToExchange : public DecodingIncoming { exchange->decOtherUsers(isControllingLink); } - void handle(qpid::broker::Message& m); + void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*); private: boost::shared_ptr<qpid::broker::Exchange> exchange; Authorise& authorise; bool isControllingLink; }; +class IncomingToCoordinator : public DecodingIncoming +{ + public: + IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent) + : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {} + ~IncomingToCoordinator() { session.abort(); } + void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*); + void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {} + private: +}; + Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), - detachRequested() {} + detachRequested(), txnId((boost::format("%1%") % s).str()) {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -386,6 +399,11 @@ void Session::attach(pn_link_t* link) std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); + } else if (pn_terminus_get_type(target) == PN_COORDINATOR) { + QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this); + boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this)); + incoming[link] = i; + return; } else if (pn_terminus_is_dynamic(target)) { name = generateName(link); QPID_LOG(debug, "Received attach request for incoming link to " << name); @@ -623,6 +641,9 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); + if (commitPending.boolCompareAndSwap(true, false)) { + committed(true); + } for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { try { if (s->second->doWork()) output = true; @@ -706,7 +727,117 @@ void Session::detachedByManagement() wakeup(); } -void IncomingToQueue::handle(qpid::broker::Message& message) +TxBuffer* Session::getTransaction(const std::string& id) +{ + return (txn.get() && id == txnId) ? txn.get() : 0; +} + +TxBuffer* Session::getTransaction(pn_delivery_t* delivery) +{ + return getTransactionalState(delivery).first; +} + +std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery) +{ + std::pair<TxBuffer*,uint64_t> result(0, 0); + if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) { + pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery)); + if (data && pn_data_next(data)) { + size_t count = pn_data_get_list(data); + if (count > 0) { + pn_data_enter(data); + pn_data_next(data); + std::string id = convert(pn_data_get_binary(data)); + result.first = getTransaction(id); + if (!result.first) { + QPID_LOG(error, "Transaction not found for id: " << id); + } + if (count > 1 && pn_data_next(data)) { + result.second = pn_data_get_ulong(data); + } + pn_data_exit(data); + } + } else { + QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); + } + } + return result; +} + +std::string Session::declare() +{ + if (txn.get()) { + //not sure what the error code should be; none in spec really fit well. + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time"); + } + txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); + connection.getBroker().getBrokerObservers().startTx(txn); + txStarted(); + return txnId; +} + +namespace { + class AsyncCommit : public qpid::broker::AsyncCompletion::Callback + { + public: + AsyncCommit(boost::shared_ptr<Session> s) : session(s) {} + void completed(bool sync) { session->committed(sync); } + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone() + { + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session)); + return copy; + } + private: + boost::shared_ptr<Session> session; + }; +} + +void Session::discharge(const std::string& id, bool failed) +{ + if (!txn.get() || id != txnId) { + throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id"); + } + if (failed) { + abort(); + } else { + txn->begin(); + txn->startCommit(&connection.getBroker().getStore()); + AsyncCommit callback(shared_from_this()); + txn->end(callback); + } +} + +void Session::abort() +{ + if (txn) { + txn->rollback(); + txAborted(); + } +} + +void Session::committed(bool sync) +{ + if (sync) { + //this is on IO thread + if (txn.get()) { + txn->endCommit(&connection.getBroker().getStore()); + txCommitted(); + txn = boost::intrusive_ptr<TxBuffer>(); + } else { + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit"); + } + } else { + //this is not on IO thread, need to delay processing until on IO thread + if (commitPending.boolCompareAndSwap(false, true)) { + qpid::sys::Mutex::ScopedLock l(lock); + if (!deleted) { + out.activateOutput(); + } + } + } +} + +void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction) { if (queue->isDeleted()) { std::stringstream msg; @@ -714,18 +845,18 @@ void IncomingToQueue::handle(qpid::broker::Message& message) throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str()); } try { - queue->deliver(message); + queue->deliver(message, transaction); } catch (const qpid::SessionException& e) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what()); } } -void IncomingToExchange::handle(qpid::broker::Message& message) +void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction) { if (exchange->isDestroyed()) throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted.")); authorise.route(exchange, message); - DeliverableMessage deliverable(message, 0); + DeliverableMessage deliverable(message, transaction); exchange->route(deliverable); if (!deliverable.delivered) { if (exchange->getAlternate()) { @@ -734,4 +865,35 @@ void IncomingToExchange::handle(qpid::broker::Message& message) } } +void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery) +{ + if (message && message->isTypedBody()) { + QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody()); + if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) { + std::string id = session.declare(); + //encode the txn id in a 'declared' list on the disposition + pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery)); + pn_data_put_list(data); + pn_data_enter(data); + pn_data_put_binary(data, convert(id)); + pn_data_exit(data); + pn_data_exit(data); + pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE); + pn_delivery_settle(delivery); + session.incomingMessageAccepted(); + } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) { + if (message->getTypedBody().getType() == qpid::types::VAR_LIST) { + qpid::types::Variant::List args = message->getTypedBody().asList(); + qpid::types::Variant::List::const_iterator i = args.begin(); + if (i != args.end()) { + std::string id = *i; + bool failed = ++i != args.end() ? i->asBool() : false; + session.discharge(id, failed); + DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of + } + } + } + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 997ad5d87b..591af1175f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/OutputControl.h" #include "qpid/broker/amqp/Authorise.h" @@ -29,6 +30,7 @@ #include <deque> #include <map> #include <set> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> @@ -43,6 +45,7 @@ namespace broker { class Broker; class Exchange; class Queue; +class TxBuffer; namespace amqp { @@ -76,10 +79,20 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses //called when a transfer is completly processed (e.g.including stored on disk) void accepted(pn_delivery_t*, bool sync); + //called when async transaction completes + void committed(bool sync); void wakeup(); Authorise& getAuthorise(); + + TxBuffer* getTransaction(const std::string&); + TxBuffer* getTransaction(pn_delivery_t*); + std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*); + //transaction coordination: + std::string declare(); + void discharge(const std::string& id, bool failed); + void abort(); protected: void detachedByManagement(); private: @@ -96,6 +109,9 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::set< boost::shared_ptr<Queue> > exclusiveQueues; Authorise authorise; bool detachRequested; + boost::intrusive_ptr<TxBuffer> txn; + std::string txnId; + qpid::sys::AtomicValue<bool> commitPending; struct ResolvedNode { |