summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/TxBuffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/TxBuffer.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp63
1 files changed, 41 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index a8df4fb214..6dc2f5c2f4 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -21,22 +21,28 @@
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/mem_fn.hpp>
#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker{
+
using boost::mem_fn;
-using namespace qpid::broker;
+using framing::InternalErrorException;
TxBuffer::TxBuffer() : observer(new NullTransactionObserver) {}
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
+ // The observer may call startCompleter to delay completion.
if (!observer->prepare()) return false;
for(op_iterator i = ops.begin(); i != ops.end(); i++){
- if(!(*i)->prepare(ctxt)){
- return false;
- }
+ if(!(*i)->prepare(ctxt)) return false;
}
+ // At this point prepare has succeeded locally but if completion is delayed,
+ // then completing threads may call setError to indicate an error.
return true;
}
@@ -60,24 +66,37 @@ void TxBuffer::enlist(TxOp::shared_ptr op)
ops.push_back(op);
}
-bool TxBuffer::commitLocal(TransactionalStore* const store)
+void TxBuffer::startCommit(TransactionalStore* const store)
{
- if (!store) return false;
- try {
- std::auto_ptr<TransactionContext> ctxt = store->begin();
- if (prepare(ctxt.get())) {
- store->commit(*ctxt);
- commit();
- return true;
- } else {
- store->abort(*ctxt);
- rollback();
- return false;
- }
- } catch (std::exception& e) {
- QPID_LOG(error, "Commit failed with exception: " << e.what());
- } catch (...) {
- QPID_LOG(error, "Commit failed with unknown exception");
+ if (!store) throw Exception("Can't commit transaction, no store.");
+ txContext.reset(store->begin().release());
+ if (!prepare(txContext.get()))
+ setError("Transaction prepare failed.");
+}
+
+// Called when async completion is complete.
+std::string TxBuffer::endCommit(TransactionalStore* const store) {
+ std::string e;
+ {
+ sys::Mutex::ScopedLock l(errorLock);
+ e = error;
+ }
+ if (!e.empty()) {
+ store->abort(*txContext);
+ rollback();
+ throw InternalErrorException(e);
}
- return false;
+ else {
+ store->commit(*txContext);
+ commit();
+ }
+ return std::string(); // There is no result from tx.commit
+}
+
+void TxBuffer::setError(const std::string& e) {
+ QPID_LOG(error, "Asynchronous transaction error: " << e);
+ sys::Mutex::ScopedLock l(errorLock);
+ error = e;
}
+
+}} // namespace qpid::broker