diff options
author | Gordon Sim <gsim@apache.org> | 2007-06-05 15:54:22 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-06-05 15:54:22 +0000 |
commit | dd86a8562275d411ba6af54b6651154b6abc08ef (patch) | |
tree | e7bd5e5ed838f041857e2f77461ad3f5759448e7 /cpp/src | |
parent | abf98a7ed4bd2d08d88b7f4f5d753b2e6d6dceb2 (diff) | |
download | qpid-python-dd86a8562275d411ba6af54b6651154b6abc08ef.tar.gz |
Some tests and fixes for dtx preview.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@544522 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 82 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/DtxWorkRecordTest.cpp | 6 | ||||
-rwxr-xr-x | cpp/src/tests/python_tests | 2 |
13 files changed, 267 insertions, 83 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 096478faad..0c06350c02 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -61,6 +61,7 @@ Channel::Channel( prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), + dtxSelected(false), accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), @@ -103,6 +104,9 @@ void Channel::cancel(const string& tag){ void Channel::close(){ opened = false; consumers.clear(); + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } recover(true); } @@ -123,22 +127,41 @@ void Channel::rollback(){ accumulatedAck.clear(); } -void Channel::startDtx(const std::string& xid, DtxManager& mgr){ +void Channel::selectDtx(){ + dtxSelected = true; +} + +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ + if (!dtxSelected) { + throw ConnectionException(503, "Channel has not been selected for use with dtx"); + } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); - mgr.start(xid, dtxBuffer); + if (join) { + mgr.join(xid, dtxBuffer); + } else { + mgr.start(xid, dtxBuffer); + } } -void Channel::endDtx(const std::string& xid){ +void Channel::endDtx(const std::string& xid, bool fail){ + if (!dtxBuffer) { + throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); + } if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") % dtxBuffer->getXid() % xid); } - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - dtxBuffer->markEnded(); + if (fail) { + accumulatedAck.clear(); + dtxBuffer->fail(); + } else { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + dtxBuffer->markEnded(); + } dtxBuffer.reset(); txBuffer.reset(); @@ -250,7 +273,7 @@ void Channel::complete(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); assert(exchange.get()); - if (txBuffer) { + if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); exchange->route(*deliverable, msg->getRoutingKey(), @@ -276,7 +299,7 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){ } void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if (txBuffer) { + if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 1d0093cf82..a2f17f85f4 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -92,6 +92,7 @@ class Channel : public framing::ChannelAdapter, sys::Mutex deliveryLock; TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; + bool dtxSelected; AccumulatedAck accumulatedAck; MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message @@ -137,8 +138,9 @@ class Channel : public framing::ChannelAdapter, void startTx(); void commit(); void rollback(); - void startDtx(const std::string& xid, DtxManager& mgr); - void endDtx(const std::string& xid); + void selectDtx(); + void startDtx(const std::string& xid, DtxManager& mgr, bool join); + void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void ack(); diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp index 2ffe744293..7f816ebcf4 100644 --- a/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,7 +23,7 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {} DtxBuffer::~DtxBuffer() {} @@ -49,6 +49,20 @@ bool DtxBuffer::isSuspended() return suspended; } +void DtxBuffer::fail() +{ + Mutex::ScopedLock locker(lock); + rollback(); + failed = true; + ended = true; +} + +bool DtxBuffer::isRollbackOnly() +{ + Mutex::ScopedLock locker(lock); + return failed; +} + const std::string& DtxBuffer::getXid() { return xid; diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h index 41be9309e8..0d4e6ccf31 100644 --- a/cpp/src/qpid/broker/DtxBuffer.h +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -31,6 +31,8 @@ namespace qpid { const std::string xid; bool ended; bool suspended; + bool failed; + public: typedef boost::shared_ptr<DtxBuffer> shared_ptr; @@ -40,6 +42,8 @@ namespace qpid { bool isEnded(); void setSuspended(bool suspended); bool isSuspended(); + void fail(); + bool isRollbackOnly(); const std::string& getXid(); }; } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 933d787a8a..1d7c2df5f4 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -23,6 +23,7 @@ using namespace qpid::broker; using qpid::framing::AMQP_ClientProxy; +using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::framing::MethodContext; using std::string; @@ -35,12 +36,22 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : { } +const int XA_RBROLLBACK(1); +const int XA_RBTIMEOUT(2); +const int XA_HEURHAZ(3); +const int XA_HEURCOM(4); +const int XA_HEURRB(5); +const int XA_HEURMIX(6); +const int XA_RDONLY(7); +const int XA_OK(8); + // DtxDemarcationHandler: void DtxHandlerImpl::select(const MethodContext& context ) { + channel.selectDtx(); dClient.selectOk(context.getRequestId()); } @@ -50,52 +61,58 @@ void DtxHandlerImpl::end(const MethodContext& context, bool fail, bool suspend) { - if (fail && suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); - } - //TODO: handle fail - if (suspend) { - channel.suspendDtx(xid); + if (fail) { + channel.endDtx(xid, true); + if (suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } else { + dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + } } else { - channel.endDtx(xid); + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid, false); + } + dClient.endOk(XA_OK, context.getRequestId()); } - dClient.endOk(0/*TODO - set flags*/, context.getRequestId()); } void DtxHandlerImpl::start(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*join*/, + bool join, bool resume) { - //TODO: handle join + if (join && resume) { + throw ConnectionException(503, "Join and resume cannot both be set."); + } if (resume) { channel.resumeDtx(xid); } else { - channel.startDtx(xid, broker.getDtxManager()); + channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(0/*TODO - set flags*/, context.getRequestId()); + dClient.startOk(XA_OK, context.getRequestId()); } // DtxCoordinationHandler: void DtxHandlerImpl::prepare(const MethodContext& context, u_int16_t /*ticket*/, - const string& xid ) + const string& xid) { - broker.getDtxManager().prepare(xid); - cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().prepare(xid); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } void DtxHandlerImpl::commit(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*onePhase*/ ) + bool onePhase) { - //TODO use onePhase flag to validate correct sequence - broker.getDtxManager().commit(xid); - cClient.commitOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().commit(xid, onePhase); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } @@ -104,22 +121,54 @@ void DtxHandlerImpl::rollback(const MethodContext& context, const string& xid ) { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId()); + cClient.rollbackOk(XA_OK, context.getRequestId()); } -void DtxHandlerImpl::recover(const MethodContext& /*context*/, +void DtxHandlerImpl::recover(const MethodContext& context, u_int16_t /*ticket*/, bool /*startscan*/, u_int32_t /*endscan*/ ) { //TODO + + //TODO: what do startscan and endscan actually mean? + + // response should hold on key value pair with key = 'xids' and + // value = sequence of xids + + // until sequences are supported (0-10 encoding), an alternate + // scheme is used for testing: + // + // key = 'xids' and value = a longstr containing shortstrs for each xid + // + // note that this restricts the length of the xids more than is + // strictly 'legal', but that is ok for testing + std::set<std::string> xids; + broker.getStore().collectPreparedXids(xids); + uint size(0); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + size += i->size() + 1/*shortstr size*/; + } + Buffer buffer(size + 4/*longstr size*/); + buffer.putLong(size); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + buffer.putShortString(*i); + } + buffer.flip(); + string data; + buffer.getLongString(data); + + FieldTable response; + response.setString("xids", data); + cClient.recoverOk(response, context.getRequestId()); } void DtxHandlerImpl::forget(const MethodContext& /*context*/, u_int16_t /*ticket*/, - const string& /*xid*/ ) + const string& xid) { - //TODO + //Currently no heuristic completion is supported, so this should never be used. + throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 0b0262902b..b05f7b9784 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -21,6 +21,7 @@ #include "DtxManager.h" #include <boost/format.hpp> #include <iostream> +using qpid::sys::Mutex; using namespace qpid::broker; @@ -30,31 +31,40 @@ DtxManager::~DtxManager() {} void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) { - getOrCreateWork(xid)->add(ops); + createWork(xid)->add(ops); +} + +void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops) +{ + getWork(xid)->add(ops); } void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) { - getOrCreateWork(xid)->recover(txn, ops); + createWork(xid)->recover(txn, ops); } -void DtxManager::prepare(const std::string& xid) +bool DtxManager::prepare(const std::string& xid) { - getWork(xid)->prepare(); + return getWork(xid)->prepare(); } -void DtxManager::commit(const std::string& xid) +bool DtxManager::commit(const std::string& xid, bool onePhase) { - getWork(xid)->commit(); + bool result = getWork(xid)->commit(onePhase); + remove(xid); + return result; } void DtxManager::rollback(const std::string& xid) { getWork(xid)->rollback(); + remove(xid); } DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) { + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); @@ -62,11 +72,24 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) return i; } -DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid) +void DtxManager::remove(const std::string& xid) { + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - i = work.insert(xid, new DtxWorkRecord(xid, store)).first; + throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + } else { + work.erase(i); + } +} + +DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid) +{ + Mutex::ScopedLock locker(lock); + WorkMap::iterator i = work.find(xid); + if (i != work.end()) { + throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid); + } else { + return work.insert(xid, new DtxWorkRecord(xid, store)).first; } - return i; } diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index e908faecac..ce33f77a0f 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -26,6 +26,7 @@ #include "DtxWorkRecord.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" namespace qpid { namespace broker { @@ -35,17 +36,20 @@ class DtxManager{ WorkMap work; TransactionalStore* const store; + qpid::sys::Mutex lock; + void remove(const std::string& xid); WorkMap::iterator getWork(const std::string& xid); - WorkMap::iterator getOrCreateWork(std::string& xid); + WorkMap::iterator createWork(std::string& xid); public: DtxManager(TransactionalStore* const store); ~DtxManager(); void start(std::string xid, DtxBuffer::shared_ptr work); + void join(std::string xid, DtxBuffer::shared_ptr work); void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); - void prepare(const std::string& xid); - void commit(const std::string& xid); + bool prepare(const std::string& xid); + bool commit(const std::string& xid, bool onePhase); void rollback(const std::string& xid); }; diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 0cfffb41a1..1eb4903672 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -22,24 +22,32 @@ #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; +using qpid::sys::Mutex; using namespace qpid::broker; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {} +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : + xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {} DtxWorkRecord::~DtxWorkRecord() {} bool DtxWorkRecord::prepare() { - checkCompletion(); - txn = store->begin(xid); - if (prepare(txn.get())) { - store->prepare(*txn); - return true; + Mutex::ScopedLock locker(lock); + if (check()) { + txn = store->begin(xid); + if (prepare(txn.get())) { + store->prepare(*txn); + prepared = true; + } else { + abort(); + //TODO: this should probably be flagged as internal error + } } else { + //some part of the work has been marked rollback only abort(); - return false; } + return prepared; } bool DtxWorkRecord::prepare(TransactionContext* _txn) @@ -51,50 +59,77 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn) return succeeded; } -void DtxWorkRecord::commit() +bool DtxWorkRecord::commit(bool onePhase) { - checkCompletion(); - if (txn.get()) { - //already prepared - store->commit(*txn); - txn.reset(); + Mutex::ScopedLock locker(lock); + if (check()) { + if (prepared) { + //already prepared i.e. 2pc + if (onePhase) { + throw ConnectionException(503, + boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid); + } - for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); - } else { - //1pc commit optimisation, don't need a 2pc transaction context: - std::auto_ptr<TransactionContext> localtxn = store->begin(); - if (prepare(localtxn.get())) { - store->commit(*localtxn); + store->commit(*txn); + txn.reset(); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); + return true; } else { - store->abort(*localtxn); - abort(); + //1pc commit optimisation, don't need a 2pc transaction context: + if (!onePhase) { + throw ConnectionException(503, + boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid); + } + std::auto_ptr<TransactionContext> localtxn = store->begin(); + if (prepare(localtxn.get())) { + store->commit(*localtxn); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); + return true; + } else { + store->abort(*localtxn); + abort(); + //TODO: this should probably be flagged as internal error + return false; + } } + } else { + //some part of the work has been marked rollback only + abort(); + return false; } } void DtxWorkRecord::rollback() { - checkCompletion(); + Mutex::ScopedLock locker(lock); + check(); abort(); } void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { + Mutex::ScopedLock locker(lock); + if (completed) { + throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); + } work.push_back(ops); } -void DtxWorkRecord::checkCompletion() +bool DtxWorkRecord::check() { if (!completed) { //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + } else if ((*i)->isRollbackOnly()) { + rolledback = true; } } completed = true; } + return !rolledback; } void DtxWorkRecord::abort() @@ -112,4 +147,5 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer txn = _txn; ops->markEnded(); completed = true; + prepared = true; } diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 0453ea1644..0c6e0ba6bc 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -27,6 +27,7 @@ #include "DtxBuffer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" namespace qpid { namespace broker { @@ -43,17 +44,20 @@ class DtxWorkRecord const std::string xid; TransactionalStore* const store; bool completed; + bool rolledback; + bool prepared; Work work; std::auto_ptr<TPCTransactionContext> txn; + qpid::sys::Mutex lock; - void checkCompletion(); + bool check(); void abort(); bool prepare(TransactionContext* txn); public: DtxWorkRecord(const std::string& xid, TransactionalStore* const store); ~DtxWorkRecord(); bool prepare(); - void commit(); + bool commit(bool onePhase); void rollback(); void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 49feb163bf..7742902cc9 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -25,6 +25,26 @@ #include <iostream> +namespace qpid{ +namespace broker{ + +const std::string nullxid = ""; + +class DummyCtxt : public TPCTransactionContext +{ + const std::string xid; +public: + DummyCtxt(const std::string& _xid) : xid(_xid) {} + static std::string getXid(TransactionContext& ctxt) + { + DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); + return c ? c->xid : nullxid; + } +}; + +} +} + using namespace qpid::broker; NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} @@ -92,24 +112,27 @@ std::auto_ptr<TransactionContext> NullMessageStore::begin() return std::auto_ptr<TransactionContext>(); } -std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&) +std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string& xid) { - return std::auto_ptr<TPCTransactionContext>(); + return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } -void NullMessageStore::prepare(TPCTransactionContext&) +void NullMessageStore::prepare(TPCTransactionContext& ctxt) { + prepared.insert(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::commit(TransactionContext&) +void NullMessageStore::commit(TransactionContext& ctxt) { + prepared.erase(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::abort(TransactionContext&) +void NullMessageStore::abort(TransactionContext& ctxt) { + prepared.erase(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::collectPreparedXids(std::set<string>&) +void NullMessageStore::collectPreparedXids(std::set<string>& out) { - + out.insert(prepared.begin(), prepared.end()); } diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 2835961048..e6188b43ce 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -21,6 +21,7 @@ #ifndef _NullMessageStore_ #define _NullMessageStore_ +#include <set> #include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" @@ -33,6 +34,7 @@ namespace broker { */ class NullMessageStore : public MessageStore { + std::set<std::string> prepared; const bool warn; public: NullMessageStore(bool warn = false); diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp index fc1e536ce3..d7d151f8d6 100644 --- a/cpp/src/tests/DtxWorkRecordTest.cpp +++ b/cpp/src/tests/DtxWorkRecordTest.cpp @@ -59,7 +59,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase work.add(bufferA); work.add(bufferB); - work.commit(); + work.commit(true); store.check(); CPPUNIT_ASSERT(store.isCommitted()); @@ -93,7 +93,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase work.add(bufferB); work.add(bufferC); - work.commit(); + work.commit(true); CPPUNIT_ASSERT(store.isAborted()); store.check(); @@ -125,7 +125,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase CPPUNIT_ASSERT(work.prepare()); CPPUNIT_ASSERT(store.isPrepared()); - work.commit(); + work.commit(false); store.check(); CPPUNIT_ASSERT(store.isCommitted()); opA->check(); diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index dbf56a512a..2c0b6b2071 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt $PYTHON_TESTS else echo Warning: python tests not found. fi |