diff options
author | Gordon Sim <gsim@apache.org> | 2007-06-05 15:54:22 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-06-05 15:54:22 +0000 |
commit | d276a19bff4afbe03d458e82e5ef35c64efe892b (patch) | |
tree | b3fe8805a90fc358a047bab01f8fc159f1b226df | |
parent | b3b97381ead5356ec7ca24483ade51594dfd4995 (diff) | |
download | qpid-python-d276a19bff4afbe03d458e82e5ef35c64efe892b.tar.gz |
Some tests and fixes for dtx preview.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@544522 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxBuffer.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxBuffer.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp | 95 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 41 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 37 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/DtxWorkRecordTest.cpp | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/python_tests | 2 | ||||
-rwxr-xr-x | qpid/python/amqp-doc | 6 | ||||
-rw-r--r-- | qpid/python/qpid/spec.py | 18 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 6 | ||||
-rw-r--r-- | qpid/python/tests_0-9/dtx.py | 540 |
17 files changed, 824 insertions, 96 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index 096478faad..0c06350c02 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -61,6 +61,7 @@ Channel::Channel( prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), + dtxSelected(false), accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), @@ -103,6 +104,9 @@ void Channel::cancel(const string& tag){ void Channel::close(){ opened = false; consumers.clear(); + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } recover(true); } @@ -123,22 +127,41 @@ void Channel::rollback(){ accumulatedAck.clear(); } -void Channel::startDtx(const std::string& xid, DtxManager& mgr){ +void Channel::selectDtx(){ + dtxSelected = true; +} + +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ + if (!dtxSelected) { + throw ConnectionException(503, "Channel has not been selected for use with dtx"); + } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); - mgr.start(xid, dtxBuffer); + if (join) { + mgr.join(xid, dtxBuffer); + } else { + mgr.start(xid, dtxBuffer); + } } -void Channel::endDtx(const std::string& xid){ +void Channel::endDtx(const std::string& xid, bool fail){ + if (!dtxBuffer) { + throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); + } if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") % dtxBuffer->getXid() % xid); } - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - dtxBuffer->markEnded(); + if (fail) { + accumulatedAck.clear(); + dtxBuffer->fail(); + } else { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + dtxBuffer->markEnded(); + } dtxBuffer.reset(); txBuffer.reset(); @@ -250,7 +273,7 @@ void Channel::complete(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); assert(exchange.get()); - if (txBuffer) { + if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); exchange->route(*deliverable, msg->getRoutingKey(), @@ -276,7 +299,7 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){ } void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if (txBuffer) { + if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h index 1d0093cf82..a2f17f85f4 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.h +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h @@ -92,6 +92,7 @@ class Channel : public framing::ChannelAdapter, sys::Mutex deliveryLock; TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; + bool dtxSelected; AccumulatedAck accumulatedAck; MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message @@ -137,8 +138,9 @@ class Channel : public framing::ChannelAdapter, void startTx(); void commit(); void rollback(); - void startDtx(const std::string& xid, DtxManager& mgr); - void endDtx(const std::string& xid); + void selectDtx(); + void startDtx(const std::string& xid, DtxManager& mgr, bool join); + void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void ack(); diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp index 2ffe744293..7f816ebcf4 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,7 +23,7 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {} DtxBuffer::~DtxBuffer() {} @@ -49,6 +49,20 @@ bool DtxBuffer::isSuspended() return suspended; } +void DtxBuffer::fail() +{ + Mutex::ScopedLock locker(lock); + rollback(); + failed = true; + ended = true; +} + +bool DtxBuffer::isRollbackOnly() +{ + Mutex::ScopedLock locker(lock); + return failed; +} + const std::string& DtxBuffer::getXid() { return xid; diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.h b/qpid/cpp/src/qpid/broker/DtxBuffer.h index 41be9309e8..0d4e6ccf31 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.h +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.h @@ -31,6 +31,8 @@ namespace qpid { const std::string xid; bool ended; bool suspended; + bool failed; + public: typedef boost::shared_ptr<DtxBuffer> shared_ptr; @@ -40,6 +42,8 @@ namespace qpid { bool isEnded(); void setSuspended(bool suspended); bool isSuspended(); + void fail(); + bool isRollbackOnly(); const std::string& getXid(); }; } diff --git a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 933d787a8a..1d7c2df5f4 100644 --- a/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -23,6 +23,7 @@ using namespace qpid::broker; using qpid::framing::AMQP_ClientProxy; +using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::framing::MethodContext; using std::string; @@ -35,12 +36,22 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : { } +const int XA_RBROLLBACK(1); +const int XA_RBTIMEOUT(2); +const int XA_HEURHAZ(3); +const int XA_HEURCOM(4); +const int XA_HEURRB(5); +const int XA_HEURMIX(6); +const int XA_RDONLY(7); +const int XA_OK(8); + // DtxDemarcationHandler: void DtxHandlerImpl::select(const MethodContext& context ) { + channel.selectDtx(); dClient.selectOk(context.getRequestId()); } @@ -50,52 +61,58 @@ void DtxHandlerImpl::end(const MethodContext& context, bool fail, bool suspend) { - if (fail && suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); - } - //TODO: handle fail - if (suspend) { - channel.suspendDtx(xid); + if (fail) { + channel.endDtx(xid, true); + if (suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } else { + dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + } } else { - channel.endDtx(xid); + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid, false); + } + dClient.endOk(XA_OK, context.getRequestId()); } - dClient.endOk(0/*TODO - set flags*/, context.getRequestId()); } void DtxHandlerImpl::start(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*join*/, + bool join, bool resume) { - //TODO: handle join + if (join && resume) { + throw ConnectionException(503, "Join and resume cannot both be set."); + } if (resume) { channel.resumeDtx(xid); } else { - channel.startDtx(xid, broker.getDtxManager()); + channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(0/*TODO - set flags*/, context.getRequestId()); + dClient.startOk(XA_OK, context.getRequestId()); } // DtxCoordinationHandler: void DtxHandlerImpl::prepare(const MethodContext& context, u_int16_t /*ticket*/, - const string& xid ) + const string& xid) { - broker.getDtxManager().prepare(xid); - cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().prepare(xid); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } void DtxHandlerImpl::commit(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*onePhase*/ ) + bool onePhase) { - //TODO use onePhase flag to validate correct sequence - broker.getDtxManager().commit(xid); - cClient.commitOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().commit(xid, onePhase); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } @@ -104,22 +121,54 @@ void DtxHandlerImpl::rollback(const MethodContext& context, const string& xid ) { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId()); + cClient.rollbackOk(XA_OK, context.getRequestId()); } -void DtxHandlerImpl::recover(const MethodContext& /*context*/, +void DtxHandlerImpl::recover(const MethodContext& context, u_int16_t /*ticket*/, bool /*startscan*/, u_int32_t /*endscan*/ ) { //TODO + + //TODO: what do startscan and endscan actually mean? + + // response should hold on key value pair with key = 'xids' and + // value = sequence of xids + + // until sequences are supported (0-10 encoding), an alternate + // scheme is used for testing: + // + // key = 'xids' and value = a longstr containing shortstrs for each xid + // + // note that this restricts the length of the xids more than is + // strictly 'legal', but that is ok for testing + std::set<std::string> xids; + broker.getStore().collectPreparedXids(xids); + uint size(0); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + size += i->size() + 1/*shortstr size*/; + } + Buffer buffer(size + 4/*longstr size*/); + buffer.putLong(size); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + buffer.putShortString(*i); + } + buffer.flip(); + string data; + buffer.getLongString(data); + + FieldTable response; + response.setString("xids", data); + cClient.recoverOk(response, context.getRequestId()); } void DtxHandlerImpl::forget(const MethodContext& /*context*/, u_int16_t /*ticket*/, - const string& /*xid*/ ) + const string& xid) { - //TODO + //Currently no heuristic completion is supported, so this should never be used. + throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 0b0262902b..b05f7b9784 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -21,6 +21,7 @@ #include "DtxManager.h" #include <boost/format.hpp> #include <iostream> +using qpid::sys::Mutex; using namespace qpid::broker; @@ -30,31 +31,40 @@ DtxManager::~DtxManager() {} void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) { - getOrCreateWork(xid)->add(ops); + createWork(xid)->add(ops); +} + +void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops) +{ + getWork(xid)->add(ops); } void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) { - getOrCreateWork(xid)->recover(txn, ops); + createWork(xid)->recover(txn, ops); } -void DtxManager::prepare(const std::string& xid) +bool DtxManager::prepare(const std::string& xid) { - getWork(xid)->prepare(); + return getWork(xid)->prepare(); } -void DtxManager::commit(const std::string& xid) +bool DtxManager::commit(const std::string& xid, bool onePhase) { - getWork(xid)->commit(); + bool result = getWork(xid)->commit(onePhase); + remove(xid); + return result; } void DtxManager::rollback(const std::string& xid) { getWork(xid)->rollback(); + remove(xid); } DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) { + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); @@ -62,11 +72,24 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) return i; } -DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid) +void DtxManager::remove(const std::string& xid) { + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - i = work.insert(xid, new DtxWorkRecord(xid, store)).first; + throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + } else { + work.erase(i); + } +} + +DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid) +{ + Mutex::ScopedLock locker(lock); + WorkMap::iterator i = work.find(xid); + if (i != work.end()) { + throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid); + } else { + return work.insert(xid, new DtxWorkRecord(xid, store)).first; } - return i; } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index e908faecac..ce33f77a0f 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -26,6 +26,7 @@ #include "DtxWorkRecord.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" namespace qpid { namespace broker { @@ -35,17 +36,20 @@ class DtxManager{ WorkMap work; TransactionalStore* const store; + qpid::sys::Mutex lock; + void remove(const std::string& xid); WorkMap::iterator getWork(const std::string& xid); - WorkMap::iterator getOrCreateWork(std::string& xid); + WorkMap::iterator createWork(std::string& xid); public: DtxManager(TransactionalStore* const store); ~DtxManager(); void start(std::string xid, DtxBuffer::shared_ptr work); + void join(std::string xid, DtxBuffer::shared_ptr work); void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); - void prepare(const std::string& xid); - void commit(const std::string& xid); + bool prepare(const std::string& xid); + bool commit(const std::string& xid, bool onePhase); void rollback(const std::string& xid); }; diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp index 0cfffb41a1..1eb4903672 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -22,24 +22,32 @@ #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; +using qpid::sys::Mutex; using namespace qpid::broker; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {} +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : + xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {} DtxWorkRecord::~DtxWorkRecord() {} bool DtxWorkRecord::prepare() { - checkCompletion(); - txn = store->begin(xid); - if (prepare(txn.get())) { - store->prepare(*txn); - return true; + Mutex::ScopedLock locker(lock); + if (check()) { + txn = store->begin(xid); + if (prepare(txn.get())) { + store->prepare(*txn); + prepared = true; + } else { + abort(); + //TODO: this should probably be flagged as internal error + } } else { + //some part of the work has been marked rollback only abort(); - return false; } + return prepared; } bool DtxWorkRecord::prepare(TransactionContext* _txn) @@ -51,50 +59,77 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn) return succeeded; } -void DtxWorkRecord::commit() +bool DtxWorkRecord::commit(bool onePhase) { - checkCompletion(); - if (txn.get()) { - //already prepared - store->commit(*txn); - txn.reset(); + Mutex::ScopedLock locker(lock); + if (check()) { + if (prepared) { + //already prepared i.e. 2pc + if (onePhase) { + throw ConnectionException(503, + boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid); + } - for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); - } else { - //1pc commit optimisation, don't need a 2pc transaction context: - std::auto_ptr<TransactionContext> localtxn = store->begin(); - if (prepare(localtxn.get())) { - store->commit(*localtxn); + store->commit(*txn); + txn.reset(); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); + return true; } else { - store->abort(*localtxn); - abort(); + //1pc commit optimisation, don't need a 2pc transaction context: + if (!onePhase) { + throw ConnectionException(503, + boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid); + } + std::auto_ptr<TransactionContext> localtxn = store->begin(); + if (prepare(localtxn.get())) { + store->commit(*localtxn); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); + return true; + } else { + store->abort(*localtxn); + abort(); + //TODO: this should probably be flagged as internal error + return false; + } } + } else { + //some part of the work has been marked rollback only + abort(); + return false; } } void DtxWorkRecord::rollback() { - checkCompletion(); + Mutex::ScopedLock locker(lock); + check(); abort(); } void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { + Mutex::ScopedLock locker(lock); + if (completed) { + throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); + } work.push_back(ops); } -void DtxWorkRecord::checkCompletion() +bool DtxWorkRecord::check() { if (!completed) { //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + } else if ((*i)->isRollbackOnly()) { + rolledback = true; } } completed = true; } + return !rolledback; } void DtxWorkRecord::abort() @@ -112,4 +147,5 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer txn = _txn; ops->markEnded(); completed = true; + prepared = true; } diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h index 0453ea1644..0c6e0ba6bc 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h @@ -27,6 +27,7 @@ #include "DtxBuffer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" namespace qpid { namespace broker { @@ -43,17 +44,20 @@ class DtxWorkRecord const std::string xid; TransactionalStore* const store; bool completed; + bool rolledback; + bool prepared; Work work; std::auto_ptr<TPCTransactionContext> txn; + qpid::sys::Mutex lock; - void checkCompletion(); + bool check(); void abort(); bool prepare(TransactionContext* txn); public: DtxWorkRecord(const std::string& xid, TransactionalStore* const store); ~DtxWorkRecord(); bool prepare(); - void commit(); + bool commit(bool onePhase); void rollback(); void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index 49feb163bf..7742902cc9 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -25,6 +25,26 @@ #include <iostream> +namespace qpid{ +namespace broker{ + +const std::string nullxid = ""; + +class DummyCtxt : public TPCTransactionContext +{ + const std::string xid; +public: + DummyCtxt(const std::string& _xid) : xid(_xid) {} + static std::string getXid(TransactionContext& ctxt) + { + DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); + return c ? c->xid : nullxid; + } +}; + +} +} + using namespace qpid::broker; NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} @@ -92,24 +112,27 @@ std::auto_ptr<TransactionContext> NullMessageStore::begin() return std::auto_ptr<TransactionContext>(); } -std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&) +std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string& xid) { - return std::auto_ptr<TPCTransactionContext>(); + return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } -void NullMessageStore::prepare(TPCTransactionContext&) +void NullMessageStore::prepare(TPCTransactionContext& ctxt) { + prepared.insert(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::commit(TransactionContext&) +void NullMessageStore::commit(TransactionContext& ctxt) { + prepared.erase(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::abort(TransactionContext&) +void NullMessageStore::abort(TransactionContext& ctxt) { + prepared.erase(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::collectPreparedXids(std::set<string>&) +void NullMessageStore::collectPreparedXids(std::set<string>& out) { - + out.insert(prepared.begin(), prepared.end()); } diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index 2835961048..e6188b43ce 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -21,6 +21,7 @@ #ifndef _NullMessageStore_ #define _NullMessageStore_ +#include <set> #include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" @@ -33,6 +34,7 @@ namespace broker { */ class NullMessageStore : public MessageStore { + std::set<std::string> prepared; const bool warn; public: NullMessageStore(bool warn = false); diff --git a/qpid/cpp/src/tests/DtxWorkRecordTest.cpp b/qpid/cpp/src/tests/DtxWorkRecordTest.cpp index fc1e536ce3..d7d151f8d6 100644 --- a/qpid/cpp/src/tests/DtxWorkRecordTest.cpp +++ b/qpid/cpp/src/tests/DtxWorkRecordTest.cpp @@ -59,7 +59,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase work.add(bufferA); work.add(bufferB); - work.commit(); + work.commit(true); store.check(); CPPUNIT_ASSERT(store.isCommitted()); @@ -93,7 +93,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase work.add(bufferB); work.add(bufferC); - work.commit(); + work.commit(true); CPPUNIT_ASSERT(store.isAborted()); store.check(); @@ -125,7 +125,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase CPPUNIT_ASSERT(work.prepare()); CPPUNIT_ASSERT(store.isPrepared()); - work.commit(); + work.commit(false); store.check(); CPPUNIT_ASSERT(store.isCommitted()); opA->check(); diff --git a/qpid/cpp/src/tests/python_tests b/qpid/cpp/src/tests/python_tests index dbf56a512a..2c0b6b2071 100755 --- a/qpid/cpp/src/tests/python_tests +++ b/qpid/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt $PYTHON_TESTS else echo Warning: python tests not found. fi diff --git a/qpid/python/amqp-doc b/qpid/python/amqp-doc index 00226d63cb..1f5910f942 100755 --- a/qpid/python/amqp-doc +++ b/qpid/python/amqp-doc @@ -37,15 +37,17 @@ Options: """ % (msg, sys.argv[0])).strip() try: - opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="]) + opts, args = getopt(sys.argv[1:], "s:ea:", ["regexp", "spec=", "additional="]) except GetoptError, e: die(str(e)) regexp = False spec = "../specs/amqp.0-9.xml" +errata = [] for k, v in opts: if k == "-e" or k == "--regexp": regexp = True if k == "-s" or k == "--spec": spec = v + if k == "-a" or k == "--additional": errata.append(v) if regexp: def match(pattern, value): @@ -57,7 +59,7 @@ else: def match(pattern, value): return fnmatch(value, pattern) -spec = load(spec) +spec = load(spec, *errata) methods = {} patterns = args for pattern in patterns: diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index bb0e7eb58c..f8e37737e2 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -309,8 +309,10 @@ def load(specfile, *errata): for nd in root["constant"]: const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]), nd.get("@class"), get_docs(nd)) - spec.constants.add(const) - + try: + spec.constants.add(const) + except ValueError, e: + print "Warning:", e # domains are typedefs for nd in root["domain"]: spec.domains.add(Domain(spec, nd.index(), pythonize(nd["@name"]), @@ -320,18 +322,20 @@ def load(specfile, *errata): # classes for c_nd in root["class"]: cname = pythonize(c_nd["@name"]) - if root == spec_root: + if spec.classes.byname.has_key(cname): + klass = spec.classes.byname[cname] + else: klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], get_docs(c_nd)) spec.classes.add(klass) - else: - klass = spec.classes.byname[cname] added_methods = [] load_fields(c_nd, klass.fields, spec.domains.byname) for m_nd in c_nd["method"]: mname = pythonize(m_nd["@name"]) - if root == spec_root: + if klass.methods.byname.has_key(mname): + meth = klass.methods.byname[mname] + else: meth = Method(klass, mname, int(m_nd["@index"]), m_nd.get_bool("@content", False), @@ -341,8 +345,6 @@ def load(specfile, *errata): get_docs(m_nd)) klass.methods.add(meth) added_methods.append(meth) - else: - meth = klass.methods.byname[mname] load_fields(m_nd, meth.fields, spec.domains.byname) # resolve the responses for m in added_methods: diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 48a6755d25..fa904ff029 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -99,7 +99,7 @@ Options: self.specfile = "0-8" self.errata = [] try: - opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + opts, self.tests = getopt(args, "s:e:b:h?dvi:I:", ["help", "spec", "errata=", "server", "verbose", "ignore", "ignore-file"]) except GetoptError, e: self._die(str(e)) for opt, value in opts: @@ -278,14 +278,14 @@ class TestBase(unittest.TestCase): self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): - if not isinstance(message, Message): self.fail("expected channel_close method") + if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): - if not isinstance(message, Message): self.fail("expected connection_close method") + if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py new file mode 100644 index 0000000000..ec82c72d49 --- /dev/null +++ b/qpid/python/tests_0-9/dtx.py @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from struct import pack, unpack + +class DtxTests(TestBase): + """ + Tests for the amqp dtx related classes. + + Tests of the form test_simple_xxx test the basic transactional + behaviour. The approach here is to 'swap' a message from one queue + to another by consuming and re-publishing in the same + transaction. That transaction is then completed in different ways + and the appropriate result verified. + + The other tests enforce more specific rules and behaviour on a + per-method or per-field basis. + """ + + XA_RBROLLBACK = 1 + XA_OK = 8 + + def test_simple_commit(self): + """ + Test basic one-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "commit") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("commit", "queue-b") + + def test_simple_prepare_commit(self): + """ + Test basic two-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-commit") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("prepare-commit", "queue-b") + + + def test_simple_rollback(self): + """ + Test basic rollback behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "rollback") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("rollback", "queue-a") + + def test_simple_prepare_rollback(self): + """ + Test basic rollback behaviour after the transaction has been prepared. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-rollback") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("prepare-rollback", "queue-a") + + def test_select_required(self): + """ + check that an error is flagged if select is not issued before + start or end + """ + channel = self.channel + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx) + + #if we get here we have failed, but need to do some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Channel not selected for use with dtx, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_already_known(self): + """ + Verify that an attempt to start an association with a + transaction that is already known is not allowed (unless the + join flag is set). + """ + #create two channels on different connection & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + other = self.connect() + channel2 = other.channel(1) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other without the join set + failed = False + try: + channel2.dtx_demarcation_start(xid=tx) + except Closed, e: + failed = True + error = e + + #cleanup: + if not failed: + channel2.dtx_demarcation_end(xid=tx) + other.close() + channel1.dtx_demarcation_end(xid=tx) + channel1.dtx_coordination_rollback(xid=tx) + + #verification: + if failed: self.assertConnectionException(503, e.args[0]) + else: self.fail("Xid already known, expected exception!") + + def test_forget_xid_on_completion(self): + """ + Verify that a xid is 'forgotten' - and can therefore be used + again - once it is completed. + """ + channel = self.channel + #do some transactional work & complete the transaction + self.test_simple_commit() + + #start association for the same xid as the previously completed txn + tx = self.xid("my-xid") + channel.dtx_demarcation_start(xid=tx) + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + + def test_start_join_and_resume(self): + """ + Ensure the correct error is signalled when both the join and + resume flags are set on starting an association between a + channel and a transcation. + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + #failed, but need some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Join and resume both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_join(self): + """ + Verify 'join' behaviour, where a channel is associated with a + transaction that is already associated with another channel. + """ + #create two channels & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + channel2 = self.client.channel(2) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #setup + channel1.queue_declare(queue="one", exclusive=True) + channel1.queue_declare(queue="two", exclusive=True) + channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other with the join flag set + channel2.dtx_demarcation_start(xid=tx, join=True) + + #do work through each channel + self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + + #mark end on both channels + channel1.dtx_demarcation_end(xid=tx) + channel2.dtx_demarcation_end(xid=tx) + + #commit and check + channel1.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + + def test_suspend_resume(self): + """ + Test suspension and resumption of an association + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + channel.queue_declare(queue="one", exclusive=True) + channel.queue_declare(queue="two", exclusive=True) + channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_end_suspend_and_fail(self): + """ + Verify that the correct error is signalled if the suspend and + fail flag are both set when disassociating a transaction from + the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("suspend_and_fail") + channel.dtx_demarcation_start(xid=tx) + try: + channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + self.fail("Suspend and fail both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.dtx_coordination_rollback(xid=tx) + channel.channel_close() + other.close() + + + def test_end_unknown_xid(self): + """ + Verifies that the correct exception is thrown when an attempt + is made to end the association for a xid not previously + associated with the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("unknown-xid") + try: + channel.dtx_demarcation_end(xid=tx) + self.fail("Attempted to end association with unknown xid, expected exception!") + except Closed, e: + #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... + self.assertConnectionException(503, e.args[0]) + + def test_end(self): + """ + Verify that the association is terminated by end and subsequent + operations are non-transactional + """ + channel = self.client.channel(2) + channel.channel_open() + channel.queue_declare(queue="tx-queue", exclusive=True) + + #publish a message under a transaction + channel.dtx_demarcation_select() + tx = self.xid("dummy") + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") + channel.dtx_demarcation_end(xid=tx) + + #now that association with txn is ended, publish another message + channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") + + #check the second message is available, but not the first + self.assertMessageCount(1, "tx-queue") + channel.message_consume(queue="tx-queue", destination="results", no_ack=False) + msg = self.client.queue("results").get(timeout=1) + self.assertEqual("two", msg.message_id) + channel.message_cancel(destination="results") + #ack the message then close the channel + msg.ok() + channel.channel_close() + + channel = self.channel + #commit the transaction and check that the first message (and + #only the first message) is then delivered + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "tx-queue") + self.assertMessageId("one", "tx-queue") + + def test_invalid_commit_one_phase_true(self): + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + tester.dtx_coordination_prepare(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=True) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=True, expected exception!") + + def test_invalid_commit_one_phase_false(self): + """ + Test that a commit with one_phase = False is rejected if the + transaction in question has not yet been prepared. + """ + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=False) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=False, expected exception!") + + def test_implicit_end(self): + """ + Test that an association is implicitly ended when the channel + is closed (whether by exception or explicit client request) + and the transaction in question is marked as rollback only. + """ + channel1 = self.channel + channel2 = self.client.channel(2) + channel2.channel_open() + + #setup: + channel2.queue_declare(queue="dummy", exclusive=True) + channel2.message_transfer(routing_key="dummy", body="whatever") + tx = self.xid("dummy") + + channel2.dtx_demarcation_select() + channel2.dtx_demarcation_start(xid=tx) + channel2.message_get(queue="dummy", destination="dummy") + self.client.queue("dummy").get(timeout=1).ok() + channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.channel_close() + + self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) + channel1.dtx_coordination_rollback(xid=tx) + + def test_recover(self): + """ + Test basic recover behaviour + """ + channel = self.channel + + channel.dtx_demarcation_select() + channel.queue_declare(queue="dummy", exclusive=True) + + prepared = [] + for i in range(1, 10): + tx = self.xid("tx%s" % (i)) + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="dummy", body="message%s" % (i)) + channel.dtx_demarcation_end(xid=tx) + if i in [2, 5, 6, 8]: + channel.dtx_coordination_prepare(xid=tx) + prepared.append(tx) + else: + channel.dtx_coordination_rollback(xid=tx) + + indoubt = channel.dtx_coordination_recover().xids + #convert indoubt table to a list of xids (note: this will change for 0-10) + data = indoubt["xids"] + xids = [] + pos = 0 + while pos < len(data): + size = unpack("!B", data[pos])[0] + start = pos + 1 + end = start + size + xid = data[start:end] + xids.append(xid) + pos = end + + #rollback the prepared transactions returned by recover + for x in xids: + channel.dtx_coordination_rollback(xid=x) + + #validate against the expected list of prepared transactions + actual = set(xids) + expected = set(prepared) + intersection = actual.intersection(expected) + + if intersection != expected: + missing = expected.difference(actual) + extra = actual.difference(expected) + for x in missing: + channel.dtx_coordination_rollback(xid=x) + self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) + + def xid(self, txid, branchqual = ''): + return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual + + def txswap(self, tx, id): + channel = self.channel + #declare two queues: + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + #put message with specified id on one queue: + channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") + + #start the transaction: + channel.dtx_demarcation_select() + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags) + + #'swap' the message from one queue to the other, under that transaction: + self.swap(self.channel, "queue-a", "queue-b") + + #mark the end of the transactional work: + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags) + + def swap(self, channel, src, dest): + #consume from src: + channel.message_get(destination="temp-swap", queue=src) + msg = self.client.queue("temp-swap").get(timeout=1) + msg.ok(); + + #re-publish to dest + channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) + + def assertMessageCount(self, expected, queue): + self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) + + def assertMessageId(self, expected, queue): + self.channel.message_consume(queue=queue, destination="results", no_ack=True) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) + self.channel.message_cancel(destination="results") |