summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-06-05 15:54:22 +0000
committerGordon Sim <gsim@apache.org>2007-06-05 15:54:22 +0000
commitdd86a8562275d411ba6af54b6651154b6abc08ef (patch)
treee7bd5e5ed838f041857e2f77461ad3f5759448e7 /cpp/src
parentabf98a7ed4bd2d08d88b7f4f5d753b2e6d6dceb2 (diff)
downloadqpid-python-dd86a8562275d411ba6af54b6651154b6abc08ef.tar.gz
Some tests and fixes for dtx preview.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@544522 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp41
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h6
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.cpp16
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.h4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp95
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp41
-rw-r--r--cpp/src/qpid/broker/DtxManager.h10
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp82
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h8
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp37
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h2
-rw-r--r--cpp/src/tests/DtxWorkRecordTest.cpp6
-rwxr-xr-xcpp/src/tests/python_tests2
13 files changed, 267 insertions, 83 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 096478faad..0c06350c02 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -61,6 +61,7 @@ Channel::Channel(
prefetchCount(0),
framesize(_framesize),
tagGenerator("sgen"),
+ dtxSelected(false),
accumulatedAck(0),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
@@ -103,6 +104,9 @@ void Channel::cancel(const string& tag){
void Channel::close(){
opened = false;
consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
recover(true);
}
@@ -123,22 +127,41 @@ void Channel::rollback(){
accumulatedAck.clear();
}
-void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+void Channel::selectDtx(){
+ dtxSelected = true;
+}
+
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+ if (!dtxSelected) {
+ throw ConnectionException(503, "Channel has not been selected for use with dtx");
+ }
dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
- mgr.start(xid, dtxBuffer);
+ if (join) {
+ mgr.join(xid, dtxBuffer);
+ } else {
+ mgr.start(xid, dtxBuffer);
+ }
}
-void Channel::endDtx(const std::string& xid){
+void Channel::endDtx(const std::string& xid, bool fail){
+ if (!dtxBuffer) {
+ throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
+ }
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
% dtxBuffer->getXid() % xid);
}
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- dtxBuffer->markEnded();
+ if (fail) {
+ accumulatedAck.clear();
+ dtxBuffer->fail();
+ } else {
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ dtxBuffer->markEnded();
+ }
dtxBuffer.reset();
txBuffer.reset();
@@ -250,7 +273,7 @@ void Channel::complete(Message::shared_ptr msg) {
Exchange::shared_ptr exchange =
connection.broker.getExchanges().get(msg->getExchange());
assert(exchange.get());
- if (txBuffer) {
+ if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
exchange->route(*deliverable, msg->getRoutingKey(),
@@ -276,7 +299,7 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){
}
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
- if (txBuffer) {
+ if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 1d0093cf82..a2f17f85f4 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -92,6 +92,7 @@ class Channel : public framing::ChannelAdapter,
sys::Mutex deliveryLock;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
+ bool dtxSelected;
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
@@ -137,8 +138,9 @@ class Channel : public framing::ChannelAdapter,
void startTx();
void commit();
void rollback();
- void startDtx(const std::string& xid, DtxManager& mgr);
- void endDtx(const std::string& xid);
+ void selectDtx();
+ void startDtx(const std::string& xid, DtxManager& mgr, bool join);
+ void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void ack();
diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp
index 2ffe744293..7f816ebcf4 100644
--- a/cpp/src/qpid/broker/DtxBuffer.cpp
+++ b/cpp/src/qpid/broker/DtxBuffer.cpp
@@ -23,7 +23,7 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {}
DtxBuffer::~DtxBuffer() {}
@@ -49,6 +49,20 @@ bool DtxBuffer::isSuspended()
return suspended;
}
+void DtxBuffer::fail()
+{
+ Mutex::ScopedLock locker(lock);
+ rollback();
+ failed = true;
+ ended = true;
+}
+
+bool DtxBuffer::isRollbackOnly()
+{
+ Mutex::ScopedLock locker(lock);
+ return failed;
+}
+
const std::string& DtxBuffer::getXid()
{
return xid;
diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h
index 41be9309e8..0d4e6ccf31 100644
--- a/cpp/src/qpid/broker/DtxBuffer.h
+++ b/cpp/src/qpid/broker/DtxBuffer.h
@@ -31,6 +31,8 @@ namespace qpid {
const std::string xid;
bool ended;
bool suspended;
+ bool failed;
+
public:
typedef boost::shared_ptr<DtxBuffer> shared_ptr;
@@ -40,6 +42,8 @@ namespace qpid {
bool isEnded();
void setSuspended(bool suspended);
bool isSuspended();
+ void fail();
+ bool isRollbackOnly();
const std::string& getXid();
};
}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 933d787a8a..1d7c2df5f4 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -23,6 +23,7 @@
using namespace qpid::broker;
using qpid::framing::AMQP_ClientProxy;
+using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::framing::MethodContext;
using std::string;
@@ -35,12 +36,22 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
{
}
+const int XA_RBROLLBACK(1);
+const int XA_RBTIMEOUT(2);
+const int XA_HEURHAZ(3);
+const int XA_HEURCOM(4);
+const int XA_HEURRB(5);
+const int XA_HEURMIX(6);
+const int XA_RDONLY(7);
+const int XA_OK(8);
+
// DtxDemarcationHandler:
void DtxHandlerImpl::select(const MethodContext& context )
{
+ channel.selectDtx();
dClient.selectOk(context.getRequestId());
}
@@ -50,52 +61,58 @@ void DtxHandlerImpl::end(const MethodContext& context,
bool fail,
bool suspend)
{
- if (fail && suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
- }
- //TODO: handle fail
- if (suspend) {
- channel.suspendDtx(xid);
+ if (fail) {
+ channel.endDtx(xid, true);
+ if (suspend) {
+ throw ConnectionException(503, "End and suspend cannot both be set.");
+ } else {
+ dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+ }
} else {
- channel.endDtx(xid);
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid, false);
+ }
+ dClient.endOk(XA_OK, context.getRequestId());
}
- dClient.endOk(0/*TODO - set flags*/, context.getRequestId());
}
void DtxHandlerImpl::start(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
- bool /*join*/,
+ bool join,
bool resume)
{
- //TODO: handle join
+ if (join && resume) {
+ throw ConnectionException(503, "Join and resume cannot both be set.");
+ }
if (resume) {
channel.resumeDtx(xid);
} else {
- channel.startDtx(xid, broker.getDtxManager());
+ channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(0/*TODO - set flags*/, context.getRequestId());
+ dClient.startOk(XA_OK, context.getRequestId());
}
// DtxCoordinationHandler:
void DtxHandlerImpl::prepare(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& xid )
+ const string& xid)
{
- broker.getDtxManager().prepare(xid);
- cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId());
+ bool ok = broker.getDtxManager().prepare(xid);
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
}
void DtxHandlerImpl::commit(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
- bool /*onePhase*/ )
+ bool onePhase)
{
- //TODO use onePhase flag to validate correct sequence
- broker.getDtxManager().commit(xid);
- cClient.commitOk(0/*TODO - set flags*/, context.getRequestId());
+ bool ok = broker.getDtxManager().commit(xid, onePhase);
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
}
@@ -104,22 +121,54 @@ void DtxHandlerImpl::rollback(const MethodContext& context,
const string& xid )
{
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId());
+ cClient.rollbackOk(XA_OK, context.getRequestId());
}
-void DtxHandlerImpl::recover(const MethodContext& /*context*/,
+void DtxHandlerImpl::recover(const MethodContext& context,
u_int16_t /*ticket*/,
bool /*startscan*/,
u_int32_t /*endscan*/ )
{
//TODO
+
+ //TODO: what do startscan and endscan actually mean?
+
+ // response should hold on key value pair with key = 'xids' and
+ // value = sequence of xids
+
+ // until sequences are supported (0-10 encoding), an alternate
+ // scheme is used for testing:
+ //
+ // key = 'xids' and value = a longstr containing shortstrs for each xid
+ //
+ // note that this restricts the length of the xids more than is
+ // strictly 'legal', but that is ok for testing
+ std::set<std::string> xids;
+ broker.getStore().collectPreparedXids(xids);
+ uint size(0);
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ size += i->size() + 1/*shortstr size*/;
+ }
+ Buffer buffer(size + 4/*longstr size*/);
+ buffer.putLong(size);
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ buffer.putShortString(*i);
+ }
+ buffer.flip();
+ string data;
+ buffer.getLongString(data);
+
+ FieldTable response;
+ response.setString("xids", data);
+ cClient.recoverOk(response, context.getRequestId());
}
void DtxHandlerImpl::forget(const MethodContext& /*context*/,
u_int16_t /*ticket*/,
- const string& /*xid*/ )
+ const string& xid)
{
- //TODO
+ //Currently no heuristic completion is supported, so this should never be used.
+ throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 0b0262902b..b05f7b9784 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -21,6 +21,7 @@
#include "DtxManager.h"
#include <boost/format.hpp>
#include <iostream>
+using qpid::sys::Mutex;
using namespace qpid::broker;
@@ -30,31 +31,40 @@ DtxManager::~DtxManager() {}
void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
{
- getOrCreateWork(xid)->add(ops);
+ createWork(xid)->add(ops);
+}
+
+void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops)
+{
+ getWork(xid)->add(ops);
}
void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
{
- getOrCreateWork(xid)->recover(txn, ops);
+ createWork(xid)->recover(txn, ops);
}
-void DtxManager::prepare(const std::string& xid)
+bool DtxManager::prepare(const std::string& xid)
{
- getWork(xid)->prepare();
+ return getWork(xid)->prepare();
}
-void DtxManager::commit(const std::string& xid)
+bool DtxManager::commit(const std::string& xid, bool onePhase)
{
- getWork(xid)->commit();
+ bool result = getWork(xid)->commit(onePhase);
+ remove(xid);
+ return result;
}
void DtxManager::rollback(const std::string& xid)
{
getWork(xid)->rollback();
+ remove(xid);
}
DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
{
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
@@ -62,11 +72,24 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
return i;
}
-DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid)
+void DtxManager::remove(const std::string& xid)
{
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+ throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ } else {
+ work.erase(i);
+ }
+}
+
+DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
+{
+ Mutex::ScopedLock locker(lock);
+ WorkMap::iterator i = work.find(xid);
+ if (i != work.end()) {
+ throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+ } else {
+ return work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
- return i;
}
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index e908faecac..ce33f77a0f 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -26,6 +26,7 @@
#include "DtxWorkRecord.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
namespace broker {
@@ -35,17 +36,20 @@ class DtxManager{
WorkMap work;
TransactionalStore* const store;
+ qpid::sys::Mutex lock;
+ void remove(const std::string& xid);
WorkMap::iterator getWork(const std::string& xid);
- WorkMap::iterator getOrCreateWork(std::string& xid);
+ WorkMap::iterator createWork(std::string& xid);
public:
DtxManager(TransactionalStore* const store);
~DtxManager();
void start(std::string xid, DtxBuffer::shared_ptr work);
+ void join(std::string xid, DtxBuffer::shared_ptr work);
void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
- void prepare(const std::string& xid);
- void commit(const std::string& xid);
+ bool prepare(const std::string& xid);
+ bool commit(const std::string& xid, bool onePhase);
void rollback(const std::string& xid);
};
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 0cfffb41a1..1eb4903672 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -22,24 +22,32 @@
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
+using qpid::sys::Mutex;
using namespace qpid::broker;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
+ xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {}
DtxWorkRecord::~DtxWorkRecord() {}
bool DtxWorkRecord::prepare()
{
- checkCompletion();
- txn = store->begin(xid);
- if (prepare(txn.get())) {
- store->prepare(*txn);
- return true;
+ Mutex::ScopedLock locker(lock);
+ if (check()) {
+ txn = store->begin(xid);
+ if (prepare(txn.get())) {
+ store->prepare(*txn);
+ prepared = true;
+ } else {
+ abort();
+ //TODO: this should probably be flagged as internal error
+ }
} else {
+ //some part of the work has been marked rollback only
abort();
- return false;
}
+ return prepared;
}
bool DtxWorkRecord::prepare(TransactionContext* _txn)
@@ -51,50 +59,77 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn)
return succeeded;
}
-void DtxWorkRecord::commit()
+bool DtxWorkRecord::commit(bool onePhase)
{
- checkCompletion();
- if (txn.get()) {
- //already prepared
- store->commit(*txn);
- txn.reset();
+ Mutex::ScopedLock locker(lock);
+ if (check()) {
+ if (prepared) {
+ //already prepared i.e. 2pc
+ if (onePhase) {
+ throw ConnectionException(503,
+ boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);
+ }
- for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
- } else {
- //1pc commit optimisation, don't need a 2pc transaction context:
- std::auto_ptr<TransactionContext> localtxn = store->begin();
- if (prepare(localtxn.get())) {
- store->commit(*localtxn);
+ store->commit(*txn);
+ txn.reset();
+
for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+ return true;
} else {
- store->abort(*localtxn);
- abort();
+ //1pc commit optimisation, don't need a 2pc transaction context:
+ if (!onePhase) {
+ throw ConnectionException(503,
+ boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);
+ }
+ std::auto_ptr<TransactionContext> localtxn = store->begin();
+ if (prepare(localtxn.get())) {
+ store->commit(*localtxn);
+ for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+ return true;
+ } else {
+ store->abort(*localtxn);
+ abort();
+ //TODO: this should probably be flagged as internal error
+ return false;
+ }
}
+ } else {
+ //some part of the work has been marked rollback only
+ abort();
+ return false;
}
}
void DtxWorkRecord::rollback()
{
- checkCompletion();
+ Mutex::ScopedLock locker(lock);
+ check();
abort();
}
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
+ Mutex::ScopedLock locker(lock);
+ if (completed) {
+ throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+ }
work.push_back(ops);
}
-void DtxWorkRecord::checkCompletion()
+bool DtxWorkRecord::check()
{
if (!completed) {
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+ } else if ((*i)->isRollbackOnly()) {
+ rolledback = true;
}
}
completed = true;
}
+ return !rolledback;
}
void DtxWorkRecord::abort()
@@ -112,4 +147,5 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer
txn = _txn;
ops->markEnded();
completed = true;
+ prepared = true;
}
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index 0453ea1644..0c6e0ba6bc 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -27,6 +27,7 @@
#include "DtxBuffer.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
namespace broker {
@@ -43,17 +44,20 @@ class DtxWorkRecord
const std::string xid;
TransactionalStore* const store;
bool completed;
+ bool rolledback;
+ bool prepared;
Work work;
std::auto_ptr<TPCTransactionContext> txn;
+ qpid::sys::Mutex lock;
- void checkCompletion();
+ bool check();
void abort();
bool prepare(TransactionContext* txn);
public:
DtxWorkRecord(const std::string& xid, TransactionalStore* const store);
~DtxWorkRecord();
bool prepare();
- void commit();
+ bool commit(bool onePhase);
void rollback();
void add(DtxBuffer::shared_ptr ops);
void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 49feb163bf..7742902cc9 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -25,6 +25,26 @@
#include <iostream>
+namespace qpid{
+namespace broker{
+
+const std::string nullxid = "";
+
+class DummyCtxt : public TPCTransactionContext
+{
+ const std::string xid;
+public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+}
+}
+
using namespace qpid::broker;
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
@@ -92,24 +112,27 @@ std::auto_ptr<TransactionContext> NullMessageStore::begin()
return std::auto_ptr<TransactionContext>();
}
-std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&)
+std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string& xid)
{
- return std::auto_ptr<TPCTransactionContext>();
+ return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid));
}
-void NullMessageStore::prepare(TPCTransactionContext&)
+void NullMessageStore::prepare(TPCTransactionContext& ctxt)
{
+ prepared.insert(DummyCtxt::getXid(ctxt));
}
-void NullMessageStore::commit(TransactionContext&)
+void NullMessageStore::commit(TransactionContext& ctxt)
{
+ prepared.erase(DummyCtxt::getXid(ctxt));
}
-void NullMessageStore::abort(TransactionContext&)
+void NullMessageStore::abort(TransactionContext& ctxt)
{
+ prepared.erase(DummyCtxt::getXid(ctxt));
}
-void NullMessageStore::collectPreparedXids(std::set<string>&)
+void NullMessageStore::collectPreparedXids(std::set<string>& out)
{
-
+ out.insert(prepared.begin(), prepared.end());
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 2835961048..e6188b43ce 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -21,6 +21,7 @@
#ifndef _NullMessageStore_
#define _NullMessageStore_
+#include <set>
#include "BrokerMessage.h"
#include "MessageStore.h"
#include "BrokerQueue.h"
@@ -33,6 +34,7 @@ namespace broker {
*/
class NullMessageStore : public MessageStore
{
+ std::set<std::string> prepared;
const bool warn;
public:
NullMessageStore(bool warn = false);
diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp
index fc1e536ce3..d7d151f8d6 100644
--- a/cpp/src/tests/DtxWorkRecordTest.cpp
+++ b/cpp/src/tests/DtxWorkRecordTest.cpp
@@ -59,7 +59,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase
work.add(bufferA);
work.add(bufferB);
- work.commit();
+ work.commit(true);
store.check();
CPPUNIT_ASSERT(store.isCommitted());
@@ -93,7 +93,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase
work.add(bufferB);
work.add(bufferC);
- work.commit();
+ work.commit(true);
CPPUNIT_ASSERT(store.isAborted());
store.check();
@@ -125,7 +125,7 @@ class DtxWorkRecordTest : public CppUnit::TestCase
CPPUNIT_ASSERT(work.prepare());
CPPUNIT_ASSERT(store.isPrepared());
- work.commit();
+ work.commit(false);
store.check();
CPPUNIT_ASSERT(store.isCommitted());
opA->check();
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index dbf56a512a..2c0b6b2071 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -1,7 +1,7 @@
#!/bin/sh
# Run the python tests.
if test -d ../../../python ; then
- cd ../../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt $PYTHON_TESTS
+ cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt $PYTHON_TESTS
else
echo Warning: python tests not found.
fi