summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-10-27 17:00:41 +0000
committerGordon Sim <gsim@apache.org>2014-10-27 17:00:41 +0000
commit02fb5003ae7c0d9a517f5a4ee530e2b9401c6c59 (patch)
tree3b14601527b2b73f67eeb76f7a4a3908d9948088
parentbf0436b096c63baf69555a5528f7afd4f5717dbb (diff)
downloadqpid-python-02fb5003ae7c0d9a517f5a4ee530e2b9401c6c59.tar.gz
QPID-4710: Initial support for broker side AMQP 1.0 local transactions (single txn per session, single session per txn to begin with)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634598 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp176
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h16
8 files changed, 236 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index 634ae79be1..ce4c73dead 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -107,7 +107,7 @@ namespace {
}
DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
- : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
+ : Incoming(link, broker, parent, source, target, name), sessionPtr(parent.shared_from_this()) {}
DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
@@ -135,16 +135,20 @@ void DecodingIncoming::readable(pn_delivery_t* delivery)
received->scan();
pn_link_advance(link);
- received->setPublisher(&session->getParent());
+ received->setPublisher(&session.getParent());
received->computeExpiration();
-
- qpid::broker::Message message(received, received);
- userid.verify(message.getUserId());
- handle(message);
--window;
- received->begin();
- Transfer t(delivery, session);
- received->end(t);
+ deliver(received, delivery);
}
}
+
+void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> received, pn_delivery_t* delivery)
+{
+ qpid::broker::Message message(received, received);
+ userid.verify(message.getUserId());
+ handle(message, session.getTransaction(delivery));
+ received->begin();
+ Transfer t(delivery, sessionPtr);
+ received->end(t);
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index 38b9b3a919..1a7064337d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -31,6 +31,7 @@ namespace qpid {
namespace broker {
class Broker;
class Message;
+class TxBuffer;
namespace amqp {
class Session;
@@ -74,9 +75,10 @@ class DecodingIncoming : public Incoming
DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name);
virtual ~DecodingIncoming();
void readable(pn_delivery_t* delivery);
- virtual void handle(qpid::broker::Message&) = 0;
+ virtual void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> received, pn_delivery_t* delivery);
+ virtual void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) = 0;
private:
- boost::shared_ptr<Session> session;
+ boost::shared_ptr<Session> sessionPtr;
boost::intrusive_ptr<Message> partial;
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
index 19fa5a6a4d..3bf9cf90db 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
@@ -90,6 +90,29 @@ void ManagedSession::incomingMessageRejected()
{
}
+void ManagedSession::txStarted()
+{
+ if (session) {
+ session->inc_TxnStarts();
+ }
+}
+
+void ManagedSession::txCommitted()
+{
+ if (session) {
+ session->inc_TxnCommits();
+ session->inc_TxnCount();
+ }
+}
+
+void ManagedSession::txAborted()
+{
+ if (session) {
+ session->inc_TxnRejects();
+ session->inc_TxnCount();
+ }
+}
+
ManagedConnection& ManagedSession::getParent()
{
return parent;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
index 0da048654c..c35e304cfb 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
@@ -47,6 +47,9 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke
void outgoingMessageSent();
void outgoingMessageAccepted();
void outgoingMessageRejected();
+ void txStarted();
+ void txCommitted();
+ void txAborted();
ManagedConnection& getParent();
qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index ba073c8a36..54993d071e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -112,6 +112,12 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
+
+ std::pair<TxBuffer*,uint64_t> txn = session.getTransactionalState(delivery);
+ if (txn.first) {
+ r.disposition = txn.second;
+ }
+
if (!r.disposition && pn_delivery_settled(delivery)) {
//if peer has settled without setting state, assume accepted
r.disposition = PN_ACCEPTED;
@@ -119,7 +125,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (r.disposition) {
switch (r.disposition) {
case PN_ACCEPTED:
- if (preAcquires()) queue->dequeue(0, r.cursor);
+ if (preAcquires()) queue->dequeue(r.cursor, txn.first);
outgoingMessageAccepted();
break;
case PN_REJECTED:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 8039044aea..27d8205fc8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -77,7 +77,7 @@ class Outgoing : public ManagedOutgoingLink
virtual void handle(pn_delivery_t* delivery) = 0;
void wakeup();
virtual ~Outgoing() {}
- private:
+ protected:
Session& session;
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 041c0b6117..68a735709b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -23,12 +23,14 @@
#include "Outgoing.h"
#include "Message.h"
#include "Connection.h"
+#include "DataReader.h"
#include "Domain.h"
#include "Exception.h"
#include "Interconnects.h"
#include "NodePolicy.h"
#include "Relay.h"
#include "Topic.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -176,7 +178,7 @@ class IncomingToQueue : public DecodingIncoming
queue->markInUse(isControllingLink);
}
~IncomingToQueue() { queue->releaseFromUse(isControllingLink); }
- void handle(qpid::broker::Message& m);
+ void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
bool isControllingLink;
@@ -194,17 +196,28 @@ class IncomingToExchange : public DecodingIncoming
{
exchange->decOtherUsers(isControllingLink);
}
- void handle(qpid::broker::Message& m);
+ void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
Authorise& authorise;
bool isControllingLink;
};
+class IncomingToCoordinator : public DecodingIncoming
+{
+ public:
+ IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent)
+ : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {}
+ ~IncomingToCoordinator() { session.abort(); }
+ void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*);
+ void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {}
+ private:
+};
+
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()),
- detachRequested() {}
+ detachRequested(), txnId((boost::format("%1%") % s).str()) {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -386,6 +399,11 @@ void Session::attach(pn_link_t* link)
std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!");
+ } else if (pn_terminus_get_type(target) == PN_COORDINATOR) {
+ QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this);
+ boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this));
+ incoming[link] = i;
+ return;
} else if (pn_terminus_is_dynamic(target)) {
name = generateName(link);
QPID_LOG(debug, "Received attach request for incoming link to " << name);
@@ -623,6 +641,9 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
+ if (commitPending.boolCompareAndSwap(true, false)) {
+ committed(true);
+ }
for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
try {
if (s->second->doWork()) output = true;
@@ -706,7 +727,117 @@ void Session::detachedByManagement()
wakeup();
}
-void IncomingToQueue::handle(qpid::broker::Message& message)
+TxBuffer* Session::getTransaction(const std::string& id)
+{
+ return (txn.get() && id == txnId) ? txn.get() : 0;
+}
+
+TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
+{
+ return getTransactionalState(delivery).first;
+}
+
+std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery)
+{
+ std::pair<TxBuffer*,uint64_t> result(0, 0);
+ if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) {
+ pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery));
+ if (data && pn_data_next(data)) {
+ size_t count = pn_data_get_list(data);
+ if (count > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ std::string id = convert(pn_data_get_binary(data));
+ result.first = getTransaction(id);
+ if (!result.first) {
+ QPID_LOG(error, "Transaction not found for id: " << id);
+ }
+ if (count > 1 && pn_data_next(data)) {
+ result.second = pn_data_get_ulong(data);
+ }
+ pn_data_exit(data);
+ }
+ } else {
+ QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
+ }
+ }
+ return result;
+}
+
+std::string Session::declare()
+{
+ if (txn.get()) {
+ //not sure what the error code should be; none in spec really fit well.
+ throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time");
+ }
+ txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
+ connection.getBroker().getBrokerObservers().startTx(txn);
+ txStarted();
+ return txnId;
+}
+
+namespace {
+ class AsyncCommit : public qpid::broker::AsyncCompletion::Callback
+ {
+ public:
+ AsyncCommit(boost::shared_ptr<Session> s) : session(s) {}
+ void completed(bool sync) { session->committed(sync); }
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone()
+ {
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session));
+ return copy;
+ }
+ private:
+ boost::shared_ptr<Session> session;
+ };
+}
+
+void Session::discharge(const std::string& id, bool failed)
+{
+ if (!txn.get() || id != txnId) {
+ throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id");
+ }
+ if (failed) {
+ abort();
+ } else {
+ txn->begin();
+ txn->startCommit(&connection.getBroker().getStore());
+ AsyncCommit callback(shared_from_this());
+ txn->end(callback);
+ }
+}
+
+void Session::abort()
+{
+ if (txn) {
+ txn->rollback();
+ txAborted();
+ }
+}
+
+void Session::committed(bool sync)
+{
+ if (sync) {
+ //this is on IO thread
+ if (txn.get()) {
+ txn->endCommit(&connection.getBroker().getStore());
+ txCommitted();
+ txn = boost::intrusive_ptr<TxBuffer>();
+ } else {
+ throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit");
+ }
+ } else {
+ //this is not on IO thread, need to delay processing until on IO thread
+ if (commitPending.boolCompareAndSwap(false, true)) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!deleted) {
+ out.activateOutput();
+ }
+ }
+ }
+}
+
+void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
{
if (queue->isDeleted()) {
std::stringstream msg;
@@ -714,18 +845,18 @@ void IncomingToQueue::handle(qpid::broker::Message& message)
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
}
try {
- queue->deliver(message);
+ queue->deliver(message, transaction);
} catch (const qpid::SessionException& e) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
}
}
-void IncomingToExchange::handle(qpid::broker::Message& message)
+void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
{
if (exchange->isDestroyed())
throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted."));
authorise.route(exchange, message);
- DeliverableMessage deliverable(message, 0);
+ DeliverableMessage deliverable(message, transaction);
exchange->route(deliverable);
if (!deliverable.delivered) {
if (exchange->getAlternate()) {
@@ -734,4 +865,35 @@ void IncomingToExchange::handle(qpid::broker::Message& message)
}
}
+void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)
+{
+ if (message && message->isTypedBody()) {
+ QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody());
+ if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) {
+ std::string id = session.declare();
+ //encode the txn id in a 'declared' list on the disposition
+ pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery));
+ pn_data_put_list(data);
+ pn_data_enter(data);
+ pn_data_put_binary(data, convert(id));
+ pn_data_exit(data);
+ pn_data_exit(data);
+ pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE);
+ pn_delivery_settle(delivery);
+ session.incomingMessageAccepted();
+ } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) {
+ if (message->getTypedBody().getType() == qpid::types::VAR_LIST) {
+ qpid::types::Variant::List args = message->getTypedBody().asList();
+ qpid::types::Variant::List::const_iterator i = args.begin();
+ if (i != args.end()) {
+ std::string id = *i;
+ bool failed = ++i != args.end() ? i->asBool() : false;
+ session.discharge(id, failed);
+ DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of
+ }
+ }
+ }
+ }
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 997ad5d87b..591af1175f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/amqp/Authorise.h"
@@ -29,6 +30,7 @@
#include <deque>
#include <map>
#include <set>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -43,6 +45,7 @@ namespace broker {
class Broker;
class Exchange;
class Queue;
+class TxBuffer;
namespace amqp {
@@ -76,10 +79,20 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
//called when a transfer is completly processed (e.g.including stored on disk)
void accepted(pn_delivery_t*, bool sync);
+ //called when async transaction completes
+ void committed(bool sync);
void wakeup();
Authorise& getAuthorise();
+
+ TxBuffer* getTransaction(const std::string&);
+ TxBuffer* getTransaction(pn_delivery_t*);
+ std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*);
+ //transaction coordination:
+ std::string declare();
+ void discharge(const std::string& id, bool failed);
+ void abort();
protected:
void detachedByManagement();
private:
@@ -96,6 +109,9 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
Authorise authorise;
bool detachRequested;
+ boost::intrusive_ptr<TxBuffer> txn;
+ std::string txnId;
+ qpid::sys::AtomicValue<bool> commitPending;
struct ResolvedNode
{