summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/TxBuffer.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-10-29 15:23:49 +0000
committerAlan Conway <aconway@apache.org>2013-10-29 15:23:49 +0000
commit1731c3ba99577fa515985609a675afd89e5c91e4 (patch)
tree8432209a8e11f28fca72d8d7972016572da0f3c6 /qpid/cpp/src/qpid/broker/TxBuffer.cpp
parent7033bf67ab672fafc57d374f8a727cc8e4b7c54e (diff)
downloadqpid-python-1731c3ba99577fa515985609a675afd89e5c91e4.tar.gz
QPID-5139: HA transactions block a thread, can deadlock the broker
PrimaryTxObserver::prepare used to block pending responses from each backup. With concurrent transactions this can deadlock the broker: once all worker threads are blocked in prepare, responses from backups cannot be received. This commit generalizes the async completion mechanism for messages to allow async completion of arbitrary commands. It leaves the special-case code for messages undisturbed but adds a second path (starting from SessionState::handleCommand) for async completion of other commands. In particular it implements tx.commit to allow async completion. TxBuffer is now an AsyncCompletion and commitLocal() is split into - startCommit() called by SemanticState::commit() - endCommit() called when the commit command completes TxAccept no longer holds pre-computed ranges, compute fresh each time. - Avoid range iterators going out of date during a delayed commit. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1536754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/TxBuffer.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp63
1 files changed, 41 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index a8df4fb214..6dc2f5c2f4 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -21,22 +21,28 @@
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/mem_fn.hpp>
#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker{
+
using boost::mem_fn;
-using namespace qpid::broker;
+using framing::InternalErrorException;
TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
+ // The observer may call startCompleter to delay completion.
if (!observer->prepare()) return false;
for(op_iterator i = ops.begin(); i != ops.end(); i++){
- if(!(*i)->prepare(ctxt)){
- return false;
- }
+ if(!(*i)->prepare(ctxt)) return false;
}
+ // At this point prepare has succeeded locally but if completion is delayed,
+ // then completing threads may call setError to indicate an error.
return true;
}
@@ -60,24 +66,37 @@ void TxBuffer::enlist(TxOp::shared_ptr op)
ops.push_back(op);
}
-bool TxBuffer::commitLocal(TransactionalStore* const store)
+void TxBuffer::startCommit(TransactionalStore* const store)
{
- if (!store) return false;
- try {
- std::auto_ptr<TransactionContext> ctxt = store->begin();
- if (prepare(ctxt.get())) {
- store->commit(*ctxt);
- commit();
- return true;
- } else {
- store->abort(*ctxt);
- rollback();
- return false;
- }
- } catch (std::exception& e) {
- QPID_LOG(error, "Commit failed with exception: " << e.what());
- } catch (...) {
- QPID_LOG(error, "Commit failed with unknown exception");
+ if (!store) throw Exception("Can't commit transaction, no store.");
+ txContext.reset(store->begin().release());
+ if (!prepare(txContext.get()))
+ setError("Transaction prepare failed.");
+}
+
+// Called when async completion is complete.
+std::string TxBuffer::endCommit(TransactionalStore* const store) {
+ std::string e;
+ {
+ sys::Mutex::ScopedLock l(errorLock);
+ e = error;
+ }
+ if (!e.empty()) {
+ store->abort(*txContext);
+ rollback();
+ throw InternalErrorException(e);
}
- return false;
+ else {
+ store->commit(*txContext);
+ commit();
+ }
+ return std::string(); // There is no result from tx.commit
+}
+
+void TxBuffer::setError(const std::string& e) {
+ QPID_LOG(error, "Asynchronous transaction error: " << e);
+ sys::Mutex::ScopedLock l(errorLock);
+ error = e;
}
+
+}} // namespace qpid::broker