diff options
author | Gordon Sim <gsim@apache.org> | 2007-04-20 17:11:23 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-04-20 17:11:23 +0000 |
commit | 53605c52439daacf4a0d96a6bf4e9c95a7425b76 (patch) | |
tree | 7c57c4153a463439a8488968eee14415c6d55253 /cpp | |
parent | 0a7a787a38f2761e21219bf99a8a2467dfac5eef (diff) | |
download | qpid-python-53605c52439daacf4a0d96a6bf4e9c95a7425b76.tar.gz |
Added some dtx related unit tests
Added support for suspend and resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530853 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/DtxWorkRecordTest.cpp | 202 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/tests/TxBufferTest.cpp | 133 | ||||
-rw-r--r-- | cpp/src/tests/TxMocks.h | 227 |
11 files changed, 520 insertions, 154 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index e4ff098c8e..235f320cb7 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -26,6 +26,7 @@ #include <functional> #include <boost/bind.hpp> +#include <boost/format.hpp> #include "BrokerChannel.h" #include "qpid/framing/ChannelAdapter.h" @@ -121,12 +122,17 @@ void Channel::rollback(){ } void Channel::startDtx(const std::string& xid, DtxManager& mgr){ - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer()); + dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); mgr.start(xid, dtxBuffer); } -void Channel::endDtx(){ +void Channel::endDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") + % dtxBuffer->getXid() % xid); + } + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); @@ -135,6 +141,27 @@ void Channel::endDtx(){ txBuffer.reset(); } +void Channel::suspendDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") + % dtxBuffer->getXid() % xid); + } + dtxBuffer->setSuspended(true); + txBuffer.reset(); +} + +void Channel::resumeDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") + % dtxBuffer->getXid() % xid); + } + if (!dtxBuffer->isSuspended()) { + throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + } + dtxBuffer->setSuspended(true); + txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 4749ef6b5a..1d0093cf82 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -138,7 +138,9 @@ class Channel : public framing::ChannelAdapter, void commit(); void rollback(); void startDtx(const std::string& xid, DtxManager& mgr); - void endDtx(); + void endDtx(const std::string& xid); + void suspendDtx(const std::string& xid); + void resumeDtx(const std::string& xid); void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp index bdc326593a..2ffe744293 100644 --- a/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,7 +23,7 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer() : ended(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {} DtxBuffer::~DtxBuffer() {} @@ -38,3 +38,19 @@ bool DtxBuffer::isEnded() Mutex::ScopedLock locker(lock); return ended; } + +void DtxBuffer::setSuspended(bool isSuspended) +{ + suspended = isSuspended; +} + +bool DtxBuffer::isSuspended() +{ + return suspended; +} + +const std::string& DtxBuffer::getXid() +{ + return xid; +} + diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h index 15970ccff0..41be9309e8 100644 --- a/cpp/src/qpid/broker/DtxBuffer.h +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -28,14 +28,19 @@ namespace qpid { namespace broker { class DtxBuffer : public TxBuffer{ sys::Mutex lock; + const std::string xid; bool ended; + bool suspended; public: typedef boost::shared_ptr<DtxBuffer> shared_ptr; - DtxBuffer(); + DtxBuffer(const std::string& xid = ""); ~DtxBuffer(); void markEnded(); bool isEnded(); + void setSuspended(bool suspended); + bool isSuspended(); + const std::string& getXid(); }; } } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 06b69bc20a..1c3fce9cdb 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -17,6 +17,7 @@ */ #include "DtxHandlerImpl.h" +#include <boost/format.hpp> #include "Broker.h" #include "BrokerChannel.h" @@ -30,18 +31,6 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} // DtxDemarcationHandler: -void DtxHandlerImpl::end(const MethodContext& /*context*/, - u_int16_t /*ticket*/, - const string& /*xid*/, - bool /*fail*/, - bool /*suspend*/ ) -{ - channel.endDtx(); - //send end-ok - //TODO: handle fail and suspend - //TODO: check xid is as expected? -} - void DtxHandlerImpl::select(const MethodContext& /*context*/ ) { @@ -49,16 +38,38 @@ void DtxHandlerImpl::select(const MethodContext& /*context*/ ) //send select-ok } +void DtxHandlerImpl::end(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid, + bool fail, + bool suspend) +{ + if (fail && suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } + + //TODO: handle fail + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid); + } + //send end-ok +} void DtxHandlerImpl::start(const MethodContext& /*context*/, u_int16_t /*ticket*/, const string& xid, bool /*join*/, - bool /*resume*/ ) + bool resume) { - channel.startDtx(xid, broker.getDtxManager()); + //TODO: handle join + if (resume) { + channel.resumeDtx(xid); + } else { + channel.startDtx(xid, broker.getDtxManager()); + } //send start-ok - //TODO: handle join and resume } // DtxCoordinationHandler: diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 5e31312a8e..218131f6bc 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -25,7 +25,7 @@ using boost::mem_fn; using namespace qpid::broker; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {} +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {} DtxWorkRecord::~DtxWorkRecord() {} @@ -65,6 +65,7 @@ void DtxWorkRecord::commit() std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { store->commit(*localtxn); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); } else { store->abort(*localtxn); abort(); @@ -103,5 +104,4 @@ void DtxWorkRecord::abort() txn.reset(); } for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); - } diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 8ad4596963..18b41c7808 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -31,6 +31,11 @@ namespace qpid { namespace broker { +/** + * Represents the work done under a particular distributed transaction + * across potentially multiple channels. Identified by a xid. Allows + * that work to be prepared, committed and rolled-back. + */ class DtxWorkRecord { typedef std::vector<DtxBuffer::shared_ptr> Work; diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp new file mode 100644 index 0000000000..fc1e536ce3 --- /dev/null +++ b/cpp/src/tests/DtxWorkRecordTest.cpp @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/DtxWorkRecord.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <vector> +#include "TxMocks.h" + +using namespace qpid::broker; +using boost::static_pointer_cast; + +class DtxWorkRecordTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(DtxWorkRecordTest); + CPPUNIT_TEST(testOnePhaseCommit); + CPPUNIT_TEST(testFailOnOnePhaseCommit); + CPPUNIT_TEST(testTwoPhaseCommit); + CPPUNIT_TEST(testFailOnTwoPhaseCommit); + CPPUNIT_TEST(testRollback); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testOnePhaseCommit(){ + MockTransactionalStore store; + store.expectBegin().expectCommit(); + + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectCommit(); + MockTxOp::shared_ptr opB(new MockTxOp()); + opB->expectPrepare().expectCommit(); + + DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + bufferA->enlist(static_pointer_cast<TxOp>(opA)); + bufferA->markEnded(); + DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + bufferB->enlist(static_pointer_cast<TxOp>(opB)); + bufferB->markEnded(); + + DtxWorkRecord work("my-xid", &store); + work.add(bufferA); + work.add(bufferB); + + work.commit(); + + store.check(); + CPPUNIT_ASSERT(store.isCommitted()); + opA->check(); + opB->check(); + } + + void testFailOnOnePhaseCommit(){ + MockTransactionalStore store; + store.expectBegin().expectAbort(); + + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opB(new MockTxOp(true)); + opB->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opC(new MockTxOp()); + opC->expectRollback(); + + DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + bufferA->enlist(static_pointer_cast<TxOp>(opA)); + bufferA->markEnded(); + DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + bufferB->enlist(static_pointer_cast<TxOp>(opB)); + bufferB->markEnded(); + DtxBuffer::shared_ptr bufferC(new DtxBuffer()); + bufferC->enlist(static_pointer_cast<TxOp>(opC)); + bufferC->markEnded(); + + DtxWorkRecord work("my-xid", &store); + work.add(bufferA); + work.add(bufferB); + work.add(bufferC); + + work.commit(); + + CPPUNIT_ASSERT(store.isAborted()); + store.check(); + + opA->check(); + opB->check(); + opC->check(); + } + + void testTwoPhaseCommit(){ + MockTransactionalStore store; + store.expectBegin2PC().expectPrepare().expectCommit(); + + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectCommit(); + MockTxOp::shared_ptr opB(new MockTxOp()); + opB->expectPrepare().expectCommit(); + + DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + bufferA->enlist(static_pointer_cast<TxOp>(opA)); + bufferA->markEnded(); + DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + bufferB->enlist(static_pointer_cast<TxOp>(opB)); + bufferB->markEnded(); + + DtxWorkRecord work("my-xid", &store); + work.add(bufferA); + work.add(bufferB); + + CPPUNIT_ASSERT(work.prepare()); + CPPUNIT_ASSERT(store.isPrepared()); + work.commit(); + store.check(); + CPPUNIT_ASSERT(store.isCommitted()); + opA->check(); + opB->check(); + } + + void testFailOnTwoPhaseCommit(){ + MockTransactionalStore store; + store.expectBegin2PC().expectAbort(); + + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opB(new MockTxOp(true)); + opB->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opC(new MockTxOp()); + opC->expectRollback(); + + DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + bufferA->enlist(static_pointer_cast<TxOp>(opA)); + bufferA->markEnded(); + DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + bufferB->enlist(static_pointer_cast<TxOp>(opB)); + bufferB->markEnded(); + DtxBuffer::shared_ptr bufferC(new DtxBuffer()); + bufferC->enlist(static_pointer_cast<TxOp>(opC)); + bufferC->markEnded(); + + DtxWorkRecord work("my-xid", &store); + work.add(bufferA); + work.add(bufferB); + work.add(bufferC); + + CPPUNIT_ASSERT(!work.prepare()); + CPPUNIT_ASSERT(store.isAborted()); + store.check(); + opA->check(); + opB->check(); + opC->check(); + } + + void testRollback(){ + MockTransactionalStore store; + store.expectBegin2PC().expectPrepare().expectAbort(); + + MockTxOp::shared_ptr opA(new MockTxOp()); + opA->expectPrepare().expectRollback(); + MockTxOp::shared_ptr opB(new MockTxOp()); + opB->expectPrepare().expectRollback(); + + DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + bufferA->enlist(static_pointer_cast<TxOp>(opA)); + bufferA->markEnded(); + DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + bufferB->enlist(static_pointer_cast<TxOp>(opB)); + bufferB->markEnded(); + + DtxWorkRecord work("my-xid", &store); + work.add(bufferA); + work.add(bufferB); + + CPPUNIT_ASSERT(work.prepare()); + CPPUNIT_ASSERT(store.isPrepared()); + work.rollback(); + store.check(); + CPPUNIT_ASSERT(store.isAborted()); + opA->check(); + opB->check(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(DtxWorkRecordTest); + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 6ca81f960f..935b30ba37 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -6,6 +6,7 @@ broker_unit_tests = \ AccumulatedAckTest \ BrokerChannelTest \ ConfigurationTest \ + DtxWorkRecordTest \ ExchangeTest \ HeadersExchangeTest \ InMemoryContentTest \ @@ -72,6 +73,7 @@ EXTRA_DIST = \ InProcessBroker.h \ MockChannel.h \ MockConnectionInputHandler.h \ + TxMocks.h \ qpid_test_plugin.h \ APRBaseTest.cpp diff --git a/cpp/src/tests/TxBufferTest.cpp b/cpp/src/tests/TxBufferTest.cpp index f58d6e356f..afe6d2b0fc 100644 --- a/cpp/src/tests/TxBufferTest.cpp +++ b/cpp/src/tests/TxBufferTest.cpp @@ -22,144 +22,13 @@ #include "qpid_test_plugin.h" #include <iostream> #include <vector> +#include "TxMocks.h" using namespace qpid::broker; using boost::static_pointer_cast; -template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ - unsigned int i = 0; - while(i < expected.size() && i < actual.size()){ - CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); - i++; - } - CPPUNIT_ASSERT(i == expected.size()); - CPPUNIT_ASSERT(i == actual.size()); -} - class TxBufferTest : public CppUnit::TestCase { - class MockTxOp : public TxOp{ - enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; - std::vector<int> expected; - std::vector<int> actual; - bool failOnPrepare; - public: - typedef boost::shared_ptr<MockTxOp> shared_ptr; - - MockTxOp() : failOnPrepare(false) {} - MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} - - bool prepare(TransactionContext*) throw(){ - actual.push_back(PREPARE); - return !failOnPrepare; - } - void commit() throw(){ - actual.push_back(COMMIT); - } - void rollback() throw(){ - actual.push_back(ROLLBACK); - } - MockTxOp& expectPrepare(){ - expected.push_back(PREPARE); - return *this; - } - MockTxOp& expectCommit(){ - expected.push_back(COMMIT); - return *this; - } - MockTxOp& expectRollback(){ - expected.push_back(ROLLBACK); - return *this; - } - void check(){ - assertEqualVector(expected, actual); - } - ~MockTxOp(){} - }; - - class MockTransactionalStore : public TransactionalStore{ - enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; - std::vector<int> expected; - std::vector<int> actual; - - enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3}; - int state; - - class TestTransactionContext : public TransactionContext{ - MockTransactionalStore* store; - public: - TestTransactionContext(MockTransactionalStore* _store) : store(_store) {} - void commit(){ - if(store->state != OPEN) throw "txn already completed"; - store->state = COMMITTED; - } - - void abort(){ - if(store->state != OPEN) throw "txn already completed"; - store->state = ABORTED; - } - ~TestTransactionContext(){} - }; - - - public: - MockTransactionalStore() : state(OPEN){} - - std::auto_ptr<TPCTransactionContext> begin(const std::string&){ - throw "Operation not supported"; - } - void prepare(TPCTransactionContext&){ - throw "Operation not supported"; - } - void collectPreparedXids(std::set<std::string>&) - { - throw "Operation not supported"; - } - - - std::auto_ptr<TransactionContext> begin(){ - actual.push_back(BEGIN); - std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); - return txn; - } - void commit(TransactionContext& ctxt){ - actual.push_back(COMMIT); - dynamic_cast<TestTransactionContext&>(ctxt).commit(); - } - void abort(TransactionContext& ctxt){ - actual.push_back(ABORT); - dynamic_cast<TestTransactionContext&>(ctxt).abort(); - } - MockTransactionalStore& expectBegin(){ - expected.push_back(BEGIN); - return *this; - } - MockTransactionalStore& expectCommit(){ - expected.push_back(COMMIT); - return *this; - } - MockTransactionalStore& expectAbort(){ - expected.push_back(ABORT); - return *this; - } - void check(){ - assertEqualVector(expected, actual); - } - - bool isCommitted(){ - return state == COMMITTED; - } - - bool isAborted(){ - return state == ABORTED; - } - - bool isOpen() const{ - return state == OPEN; - } - ~MockTransactionalStore(){} - }; - CPPUNIT_TEST_SUITE(TxBufferTest); CPPUNIT_TEST(testCommitLocal); CPPUNIT_TEST(testFailOnCommitLocal); diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h new file mode 100644 index 0000000000..e4e74ee535 --- /dev/null +++ b/cpp/src/tests/TxMocks.h @@ -0,0 +1,227 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _tests_TxMocks_h +#define _tests_TxMocks_h + + +#include "qpid/Exception.h" +#include "qpid/broker/TransactionalStore.h" +#include "qpid/broker/TxOp.h" +#include <boost/format.hpp> +#include <iostream> +#include <vector> + +using namespace qpid::broker; +using boost::static_pointer_cast; +using std::string; + +template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ + unsigned int i = 0; + while(i < expected.size() && i < actual.size()){ + CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); + i++; + } + if (i < expected.size()) { + throw qpid::Exception(boost::format("Missing %1%") % expected[i]); + } else if (i < actual.size()) { + throw qpid::Exception(boost::format("Extra %1%") % actual[i]); + } + CPPUNIT_ASSERT_EQUAL(expected.size(), actual.size()); +} + +class TxOpConstants{ +protected: + const string PREPARE; + const string COMMIT; + const string ROLLBACK; + + TxOpConstants() : PREPARE("PREPARE"), COMMIT("COMMIT"), ROLLBACK("ROLLBACK") {} +}; + +class MockTxOp : public TxOp, public TxOpConstants{ + std::vector<string> expected; + std::vector<string> actual; + bool failOnPrepare; + string debugName; +public: + typedef boost::shared_ptr<MockTxOp> shared_ptr; + + MockTxOp() : failOnPrepare(false) {} + MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} + + void setDebugName(string name){ + debugName = name; + } + + void printExpected(){ + std::cout << std::endl << "MockTxOp[" << debugName << "] expects: "; + for (std::vector<string>::iterator i = expected.begin(); i < expected.end(); i++) { + if(i != expected.begin()) std::cout << ", "; + std::cout << *i; + } + std::cout << std::endl; + } + + void printActual(){ + std::cout << std::endl << "MockTxOp[" << debugName << "] actual: "; + for (std::vector<string>::iterator i = actual.begin(); i < actual.end(); i++) { + if(i != actual.begin()) std::cout << ", "; + std::cout << *i; + } + std::cout << std::endl; + } + + bool prepare(TransactionContext*) throw(){ + actual.push_back(PREPARE); + return !failOnPrepare; + } + void commit() throw(){ + actual.push_back(COMMIT); + } + void rollback() throw(){ + if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl; + actual.push_back(ROLLBACK); + } + MockTxOp& expectPrepare(){ + expected.push_back(PREPARE); + return *this; + } + MockTxOp& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTxOp& expectRollback(){ + expected.push_back(ROLLBACK); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + ~MockTxOp(){} +}; + +class MockTransactionalStore : public TransactionalStore{ + const string BEGIN; + const string BEGIN2PC; + const string PREPARE; + const string COMMIT; + const string ABORT; + std::vector<string> expected; + std::vector<string> actual; + + enum states {OPEN = 1, PREPARED = 2, COMMITTED = 3, ABORTED = 4}; + int state; + + class TestTransactionContext : public TPCTransactionContext{ + MockTransactionalStore* store; + public: + TestTransactionContext(MockTransactionalStore* _store) : store(_store) {} + void prepare(){ + if(!store->isOpen()) throw "txn already completed"; + store->state = PREPARED; + } + + void commit(){ + if(!store->isOpen() && !store->isPrepared()) throw "txn already completed"; + store->state = COMMITTED; + } + + void abort(){ + if(!store->isOpen() && !store->isPrepared()) throw "txn already completed"; + store->state = ABORTED; + } + ~TestTransactionContext(){} + }; + +public: + MockTransactionalStore() : + BEGIN("BEGIN"), BEGIN2PC("BEGIN2PC"), PREPARE("PREPARE"), COMMIT("COMMIT"), ABORT("ABORT"), state(OPEN){} + + void collectPreparedXids(std::set<std::string>&) + { + throw "Operation not supported"; + } + + std::auto_ptr<TPCTransactionContext> begin(const std::string&){ + actual.push_back(BEGIN2PC); + std::auto_ptr<TPCTransactionContext> txn(new TestTransactionContext(this)); + return txn; + } + std::auto_ptr<TransactionContext> begin(){ + actual.push_back(BEGIN); + std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); + return txn; + } + void prepare(TPCTransactionContext& ctxt){ + actual.push_back(PREPARE); + dynamic_cast<TestTransactionContext&>(ctxt).prepare(); + } + void commit(TransactionContext& ctxt){ + actual.push_back(COMMIT); + dynamic_cast<TestTransactionContext&>(ctxt).commit(); + } + void abort(TransactionContext& ctxt){ + actual.push_back(ABORT); + dynamic_cast<TestTransactionContext&>(ctxt).abort(); + } + MockTransactionalStore& expectBegin(){ + expected.push_back(BEGIN); + return *this; + } + MockTransactionalStore& expectBegin2PC(){ + expected.push_back(BEGIN2PC); + return *this; + } + MockTransactionalStore& expectPrepare(){ + expected.push_back(PREPARE); + return *this; + } + MockTransactionalStore& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTransactionalStore& expectAbort(){ + expected.push_back(ABORT); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + + bool isPrepared(){ + return state == PREPARED; + } + + bool isCommitted(){ + return state == COMMITTED; + } + + bool isAborted(){ + return state == ABORTED; + } + + bool isOpen() const{ + return state == OPEN; + } + ~MockTransactionalStore(){} +}; + +#endif |