summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Exception.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp120
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h17
6 files changed, 98 insertions, 52 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index b1f7d0524b..4dd6455104 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuffer* txn)
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
+ QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name
+ << " enlisted in " << txn);
} else {
if (enqueue(0, msg)) {
push(msg);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h
index c2fe470e55..a129dffe1f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Exception.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h
@@ -22,6 +22,7 @@
*
*/
#include <string>
+#include <qpid/Exception.h>
namespace qpid {
namespace broker {
@@ -29,7 +30,7 @@ namespace amqp {
/**
* Exception to signal various AMQP 1.0 defined conditions
*/
-class Exception : public std::exception
+class Exception : public qpid::Exception
{
public:
Exception(const std::string& name, const std::string& description);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index d4f73fc511..3986818846 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -100,6 +100,7 @@ namespace {
boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session));
return copy;
}
+
private:
pn_delivery_t* delivery;
boost::shared_ptr<Session> session;
@@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
{
qpid::broker::Message message(received, received);
userid.verify(message.getUserId());
- handle(message, session.getTransaction(delivery));
received->begin();
+ handle(message, session.getTransaction(delivery));
Transfer t(delivery, sessionPtr);
received->end(t);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 0136d5a0ed..f2949c5879 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
@@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork()
return true;
} else {
pn_link_drained(link);
- QPID_LOG(debug, "No message available on " << queue->getName());
+ QPID_LOG(trace, "No message available on " << queue->getName());
}
} catch (const qpid::framing::ResourceDeletedException& e) {
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what());
}
} else {
- QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
+ QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
}
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 0e44374d19..3b65e6a64d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -61,6 +61,8 @@ namespace qpid {
namespace broker {
namespace amqp {
+using namespace qpid::amqp::transaction;
+
namespace {
pn_bytes_t convert(const std::string& s)
{
@@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming
public:
IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent)
: DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {}
+
~IncomingToCoordinator() { session.abort(); }
void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*);
void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {}
@@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()),
- detachRequested(), txnId((boost::format("%1%") % s).str()) {}
+ detachRequested(),
+ tx(*this)
+{}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -636,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
if (target->second->haveWork()) out.activateOutput();
}
}
+
void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
{
OutgoingLinks::iterator sender = outgoing.find(link);
if (sender == outgoing.end()) {
- QPID_LOG(error, "Delivery returned for unknown link");
+ QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link));
} else {
sender->second->handle(delivery);
}
@@ -649,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
- if (commitPending.boolCompareAndSwap(true, false)) {
+ if (tx.commitPending.boolCompareAndSwap(true, false)) {
committed(true);
}
for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
@@ -737,7 +743,7 @@ void Session::detachedByManagement()
TxBuffer* Session::getTransaction(const std::string& id)
{
- return (txn.get() && id == txnId) ? txn.get() : 0;
+ return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0;
}
TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
@@ -748,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery)
{
std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0);
- if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) {
+ if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) {
pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery));
- if (data && pn_data_next(data)) {
- size_t count = pn_data_get_list(data);
- if (count > 0) {
+ pn_data_rewind(data);
+ size_t count = 0;
+ if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ std::string id = convert(pn_data_get_binary(data));
+ result.first = getTransaction(id);
+ if (!result.first) {
+ QPID_LOG(error, "Transaction not found for id: " << id);
+ }
+ if (count > 1 && pn_data_next(data)) {
pn_data_enter(data);
pn_data_next(data);
- std::string id = convert(pn_data_get_binary(data));
- result.first = getTransaction(id);
- if (!result.first) {
- QPID_LOG(error, "Transaction not found for id: " << id);
- }
- if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) {
- pn_data_enter(data);
- pn_data_next(data);
- result.second = pn_data_get_ulong(data);
- }
- pn_data_exit(data);
+ result.second = pn_data_get_ulong(data);
}
- } else {
- QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
+ else
+ QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
return result;
}
std::string Session::declare()
{
- if (txn.get()) {
+ if (tx.buffer.get()) {
//not sure what the error code should be; none in spec really fit well.
- throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time");
+ throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK,
+ "Session only supports one transaction active at a time");
}
- txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
- connection.getBroker().getBrokerObservers().startTx(txn);
+ tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
+ connection.getBroker().getBrokerObservers().startTx(tx.buffer);
txStarted();
- return txnId;
+ return tx.id;
}
namespace {
@@ -797,32 +802,41 @@ namespace {
boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session));
return copy;
}
+
private:
boost::shared_ptr<Session> session;
};
}
-void Session::discharge(const std::string& id, bool failed)
+void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery)
{
- if (!txn.get() || id != txnId) {
- throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id");
+ QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit")
+ << " transaction " << id);
+ if (!tx.buffer.get() || id != tx.id) {
+ throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID,
+ Msg() << "Cannot discharge transaction " << id
+ << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id :
+ Msg() << ", no current transaction"));
}
+ tx.discharge = delivery;
if (failed) {
abort();
} else {
- txn->begin();
- txn->startCommit(&connection.getBroker().getStore());
+ tx.buffer->begin();
+ tx.buffer->startCommit(&connection.getBroker().getStore());
AsyncCommit callback(shared_from_this());
- txn->end(callback);
+ tx.buffer->end(callback);
}
}
void Session::abort()
{
- if (txn) {
- txn->rollback();
+ if (tx.buffer) {
+ tx.dischargeComplete();
+ tx.buffer->rollback();
txAborted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " rolled back");
}
}
@@ -830,16 +844,18 @@ void Session::committed(bool sync)
{
if (sync) {
//this is on IO thread
- if (txn.get()) {
- txn->endCommit(&connection.getBroker().getStore());
+ tx.dischargeComplete();
+ if (tx.buffer.get()) {
+ tx.buffer->endCommit(&connection.getBroker().getStore());
txCommitted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " comitted");
} else {
throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit");
}
} else {
//this is not on IO thread, need to delay processing until on IO thread
- if (commitPending.boolCompareAndSwap(false, true)) {
+ if (tx.commitPending.boolCompareAndSwap(false, true)) {
qpid::sys::Mutex::ScopedLock l(lock);
if (!deleted) {
out.activateOutput();
@@ -880,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
{
if (message && message->isTypedBody()) {
QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody());
- if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) {
+ if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) {
std::string id = session.declare();
//encode the txn id in a 'declared' list on the disposition
pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery));
@@ -889,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
pn_data_put_binary(data, convert(id));
pn_data_exit(data);
pn_data_exit(data);
- pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE);
+ pn_delivery_update(delivery, DECLARED_CODE);
pn_delivery_settle(delivery);
session.incomingMessageAccepted();
- } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) {
+ QPID_LOG(debug, "Coordinator declared transaction " << id);
+ } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) {
if (message->getTypedBody().getType() == qpid::types::VAR_LIST) {
qpid::types::Variant::List args = message->getTypedBody().asList();
qpid::types::Variant::List::const_iterator i = args.begin();
if (i != args.end()) {
std::string id = *i;
bool failed = ++i != args.end() ? i->asBool() : false;
- session.discharge(id, failed);
- DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of
+ session.discharge(id, failed, delivery);
}
+
+ } else {
+ throw framing::IllegalArgumentException(
+ Msg() << "Coordinator unknown message: @" <<
+ message->getBodyDescriptor() << " " << message->getTypedBody());
}
}
}
}
+Session::Transaction::Transaction(Session& s) :
+ session(s), id((boost::format("%1%") % &s).str()), discharge(0) {}
+
+// Called in IO thread to signal completion of dischage by settling discharge message.
+void Session::Transaction::dischargeComplete() {
+ if (buffer.get() && discharge) {
+ session.accepted(discharge, false); // Queue up accept and activate output.
+ discharge = 0;
+ }
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 591af1175f..ea3fb82beb 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -91,7 +91,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*);
//transaction coordination:
std::string declare();
- void discharge(const std::string& id, bool failed);
+ void discharge(const std::string& id, bool failed, pn_delivery_t*);
void abort();
protected:
void detachedByManagement();
@@ -109,9 +109,18 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
Authorise authorise;
bool detachRequested;
- boost::intrusive_ptr<TxBuffer> txn;
- std::string txnId;
- qpid::sys::AtomicValue<bool> commitPending;
+
+ struct Transaction {
+ Transaction(Session&);
+ void dischargeComplete();
+
+ Session& session;
+ boost::intrusive_ptr<TxBuffer> buffer;
+ std::string id;
+ qpid::sys::AtomicValue<bool> commitPending;
+ pn_delivery_t* discharge;
+ };
+ Transaction tx;
struct ResolvedNode
{