diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Exception.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 120 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.h | 17 |
6 files changed, 98 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b1f7d0524b..4dd6455104 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuffer* txn) if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); + QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name + << " enlisted in " << txn); } else { if (enqueue(0, msg)) { push(msg); diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h index c2fe470e55..a129dffe1f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Exception.h +++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h @@ -22,6 +22,7 @@ * */ #include <string> +#include <qpid/Exception.h> namespace qpid { namespace broker { @@ -29,7 +30,7 @@ namespace amqp { /** * Exception to signal various AMQP 1.0 defined conditions */ -class Exception : public std::exception +class Exception : public qpid::Exception { public: Exception(const std::string& name, const std::string& description); diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index d4f73fc511..3986818846 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -100,6 +100,7 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); return copy; } + private: pn_delivery_t* delivery; boost::shared_ptr<Session> session; @@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> { qpid::broker::Message message(received, received); userid.verify(message.getUserId()); - handle(message, session.getTransaction(delivery)); received->begin(); + handle(message, session.getTransaction(delivery)); Transfer t(delivery, sessionPtr); received->end(t); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 0136d5a0ed..f2949c5879 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" @@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork() return true; } else { pn_link_drained(link); - QPID_LOG(debug, "No message available on " << queue->getName()); + QPID_LOG(trace, "No message available on " << queue->getName()); } } catch (const qpid::framing::ResourceDeletedException& e) { throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what()); } } else { - QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); + QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); } return false; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 0e44374d19..3b65e6a64d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -61,6 +61,8 @@ namespace qpid { namespace broker { namespace amqp { +using namespace qpid::amqp::transaction; + namespace { pn_bytes_t convert(const std::string& s) { @@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming public: IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent) : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {} + ~IncomingToCoordinator() { session.abort(); } void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*); void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {} @@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), - detachRequested(), txnId((boost::format("%1%") % s).str()) {} + detachRequested(), + tx(*this) +{} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -636,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery) if (target->second->haveWork()) out.activateOutput(); } } + void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { OutgoingLinks::iterator sender = outgoing.find(link); if (sender == outgoing.end()) { - QPID_LOG(error, "Delivery returned for unknown link"); + QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link)); } else { sender->second->handle(delivery); } @@ -649,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); - if (commitPending.boolCompareAndSwap(true, false)) { + if (tx.commitPending.boolCompareAndSwap(true, false)) { committed(true); } for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { @@ -737,7 +743,7 @@ void Session::detachedByManagement() TxBuffer* Session::getTransaction(const std::string& id) { - return (txn.get() && id == txnId) ? txn.get() : 0; + return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0; } TxBuffer* Session::getTransaction(pn_delivery_t* delivery) @@ -748,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery) std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery) { std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0); - if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) { + if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) { pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery)); - if (data && pn_data_next(data)) { - size_t count = pn_data_get_list(data); - if (count > 0) { + pn_data_rewind(data); + size_t count = 0; + if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) { + pn_data_enter(data); + pn_data_next(data); + std::string id = convert(pn_data_get_binary(data)); + result.first = getTransaction(id); + if (!result.first) { + QPID_LOG(error, "Transaction not found for id: " << id); + } + if (count > 1 && pn_data_next(data)) { pn_data_enter(data); pn_data_next(data); - std::string id = convert(pn_data_get_binary(data)); - result.first = getTransaction(id); - if (!result.first) { - QPID_LOG(error, "Transaction not found for id: " << id); - } - if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) { - pn_data_enter(data); - pn_data_next(data); - result.second = pn_data_get_ulong(data); - } - pn_data_exit(data); + result.second = pn_data_get_ulong(data); } - } else { - QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } + else + QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } return result; } std::string Session::declare() { - if (txn.get()) { + if (tx.buffer.get()) { //not sure what the error code should be; none in spec really fit well. - throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time"); + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, + "Session only supports one transaction active at a time"); } - txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); - connection.getBroker().getBrokerObservers().startTx(txn); + tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); + connection.getBroker().getBrokerObservers().startTx(tx.buffer); txStarted(); - return txnId; + return tx.id; } namespace { @@ -797,32 +802,41 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session)); return copy; } + private: boost::shared_ptr<Session> session; }; } -void Session::discharge(const std::string& id, bool failed) +void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery) { - if (!txn.get() || id != txnId) { - throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id"); + QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit") + << " transaction " << id); + if (!tx.buffer.get() || id != tx.id) { + throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, + Msg() << "Cannot discharge transaction " << id + << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id : + Msg() << ", no current transaction")); } + tx.discharge = delivery; if (failed) { abort(); } else { - txn->begin(); - txn->startCommit(&connection.getBroker().getStore()); + tx.buffer->begin(); + tx.buffer->startCommit(&connection.getBroker().getStore()); AsyncCommit callback(shared_from_this()); - txn->end(callback); + tx.buffer->end(callback); } } void Session::abort() { - if (txn) { - txn->rollback(); + if (tx.buffer) { + tx.dischargeComplete(); + tx.buffer->rollback(); txAborted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " rolled back"); } } @@ -830,16 +844,18 @@ void Session::committed(bool sync) { if (sync) { //this is on IO thread - if (txn.get()) { - txn->endCommit(&connection.getBroker().getStore()); + tx.dischargeComplete(); + if (tx.buffer.get()) { + tx.buffer->endCommit(&connection.getBroker().getStore()); txCommitted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " comitted"); } else { throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit"); } } else { //this is not on IO thread, need to delay processing until on IO thread - if (commitPending.boolCompareAndSwap(false, true)) { + if (tx.commitPending.boolCompareAndSwap(false, true)) { qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { out.activateOutput(); @@ -880,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes { if (message && message->isTypedBody()) { QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody()); - if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) { + if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) { std::string id = session.declare(); //encode the txn id in a 'declared' list on the disposition pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery)); @@ -889,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes pn_data_put_binary(data, convert(id)); pn_data_exit(data); pn_data_exit(data); - pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE); + pn_delivery_update(delivery, DECLARED_CODE); pn_delivery_settle(delivery); session.incomingMessageAccepted(); - } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) { + QPID_LOG(debug, "Coordinator declared transaction " << id); + } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) { if (message->getTypedBody().getType() == qpid::types::VAR_LIST) { qpid::types::Variant::List args = message->getTypedBody().asList(); qpid::types::Variant::List::const_iterator i = args.begin(); if (i != args.end()) { std::string id = *i; bool failed = ++i != args.end() ? i->asBool() : false; - session.discharge(id, failed); - DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of + session.discharge(id, failed, delivery); } + + } else { + throw framing::IllegalArgumentException( + Msg() << "Coordinator unknown message: @" << + message->getBodyDescriptor() << " " << message->getTypedBody()); } } } } +Session::Transaction::Transaction(Session& s) : + session(s), id((boost::format("%1%") % &s).str()), discharge(0) {} + +// Called in IO thread to signal completion of dischage by settling discharge message. +void Session::Transaction::dischargeComplete() { + if (buffer.get() && discharge) { + session.accepted(discharge, false); // Queue up accept and activate output. + discharge = 0; + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 591af1175f..ea3fb82beb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -91,7 +91,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*); //transaction coordination: std::string declare(); - void discharge(const std::string& id, bool failed); + void discharge(const std::string& id, bool failed, pn_delivery_t*); void abort(); protected: void detachedByManagement(); @@ -109,9 +109,18 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::set< boost::shared_ptr<Queue> > exclusiveQueues; Authorise authorise; bool detachRequested; - boost::intrusive_ptr<TxBuffer> txn; - std::string txnId; - qpid::sys::AtomicValue<bool> commitPending; + + struct Transaction { + Transaction(Session&); + void dischargeComplete(); + + Session& session; + boost::intrusive_ptr<TxBuffer> buffer; + std::string id; + qpid::sys::AtomicValue<bool> commitPending; + pn_delivery_t* discharge; + }; + Transaction tx; struct ResolvedNode { |