/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "../broker/TxBuffer.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include <vector>

using namespace qpid::broker;
using boost::static_pointer_cast;

/**
 * Asserts that two vectors contain equal elements in the same order.
 *
 * The common prefix is compared element by element first, so a differing
 * element is reported before a length mismatch. The two trailing
 * assertions then fail if either vector is longer than the other.
 *
 * @param expected the sequence the test scripted
 * @param actual   the sequence that was recorded
 */
template <class T>
void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){
    unsigned int i = 0;
    while(i < expected.size() && i < actual.size()){
        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
        i++;
    }
    CPPUNIT_ASSERT(i == expected.size());
    CPPUNIT_ASSERT(i == actual.size());
}

/**
 * CppUnit tests for TxBuffer, driven through two recording mocks: MockTxOp
 * for the enlisted operations and MockTransactionalStore for the store
 * handed to TxBuffer::prepare().
 */
class TxBufferTest : public CppUnit::TestCase
{
    /**
     * TxOp test double. Every prepare/commit/rollback call is appended to
     * 'actual'; the expectXxx() methods append to 'expected'; check()
     * asserts that both sequences match.
     */
    class MockTxOp : public TxOp{
        enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
        std::vector<int> expected;
        std::vector<int> actual;
        bool failOnPrepare;
    public:
        typedef boost::shared_ptr<MockTxOp> shared_ptr;

        MockTxOp() : failOnPrepare(false) {}
        /** @param _failOnPrepare when true, prepare() reports failure. */
        explicit MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}

        /** Records the call; returns false only if built with failOnPrepare. */
        bool prepare(TransactionContext*) throw(){
            actual.push_back(PREPARE);
            return !failOnPrepare;
        }
        /** Records the call. */
        void commit() throw(){
            actual.push_back(COMMIT);
        }
        /** Records the call. */
        void rollback() throw(){
            actual.push_back(ROLLBACK);
        }
        // The expectXxx() methods return *this so expectations can be chained.
        MockTxOp& expectPrepare(){
            expected.push_back(PREPARE);
            return *this;
        }
        MockTxOp& expectCommit(){
            expected.push_back(COMMIT);
            return *this;
        }
        MockTxOp& expectRollback(){
            expected.push_back(ROLLBACK);
            return *this;
        }
        /** Asserts that the recorded calls equal the scripted expectations. */
        void check(){
            assertEqualVector(expected, actual);
        }
        ~MockTxOp(){}
    };

    /**
     * TransactionalStore test double. begin()/commit()/abort() calls are
     * recorded in 'actual' and compared with 'expected' by check().
     *
     * The OPEN/COMMITTED/ABORTED state lives on the store itself and begin()
     * does not reset it, so completing a transaction twice on the same store
     * instance throws. The xid-based operations are not supported and throw.
     */
    class MockTransactionalStore : public TransactionalStore{
        enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
        std::vector<int> expected;
        std::vector<int> actual;

        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
        int state;

        /**
         * Context returned by begin(). Completing it moves the owning
         * store from OPEN to COMMITTED or ABORTED.
         */
        class TestTransactionContext : public TransactionContext{
            MockTransactionalStore* store;
        public:
            TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
            /** Throws a string literal if the store is no longer OPEN. */
            void commit(){
                if(store->state != OPEN) throw "txn already completed";
                store->state = COMMITTED;
            }

            /** Throws a string literal if the store is no longer OPEN. */
            void abort(){
                if(store->state != OPEN) throw "txn already completed";
                store->state = ABORTED;
            }
            ~TestTransactionContext(){}
        };

    public:
        MockTransactionalStore() : state(OPEN){}

        std::auto_ptr<TPCTransactionContext> begin(const std::string&){
            throw "Operation not supported";
        }
        void prepare(TPCTransactionContext&){
            throw "Operation not supported";
        }
        void collectPreparedXids(std::set<std::string>&)
        {
            throw "Operation not supported";
        }

        /** Records the call and returns a fresh context bound to this store. */
        std::auto_ptr<TransactionContext> begin(){
            actual.push_back(BEGIN);
            std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
            return txn;
        }
        /** Records the call, then commits; ctxt must be a TestTransactionContext. */
        void commit(TransactionContext& ctxt){
            actual.push_back(COMMIT);
            dynamic_cast<TestTransactionContext&>(ctxt).commit();
        }
        /** Records the call, then aborts; ctxt must be a TestTransactionContext. */
        void abort(TransactionContext& ctxt){
            actual.push_back(ABORT);
            dynamic_cast<TestTransactionContext&>(ctxt).abort();
        }
        // The expectXxx() methods return *this so expectations can be chained.
        MockTransactionalStore& expectBegin(){
            expected.push_back(BEGIN);
            return *this;
        }
        MockTransactionalStore& expectCommit(){
            expected.push_back(COMMIT);
            return *this;
        }
        MockTransactionalStore& expectAbort(){
            expected.push_back(ABORT);
            return *this;
        }
        /** Asserts that the recorded calls equal the scripted expectations. */
        void check(){
            assertEqualVector(expected, actual);
        }

        bool isCommitted() const{
            return state == COMMITTED;
        }

        bool isAborted() const{
            return state == ABORTED;
        }

        bool isOpen() const{
            return state == OPEN;
        }
        ~MockTransactionalStore(){}
    };

    CPPUNIT_TEST_SUITE(TxBufferTest);
    CPPUNIT_TEST(testPrepareAndCommit);
    CPPUNIT_TEST(testFailOnPrepare);
    CPPUNIT_TEST(testRollback);
    CPPUNIT_TEST(testBufferIsClearedAfterRollback);
    CPPUNIT_TEST(testBufferIsClearedAfterCommit);
    CPPUNIT_TEST_SUITE_END();

  public:

    /**
     * A successful prepare followed by commit: the store must see begin then
     * commit and end up committed, and every op must see its prepare(s)
     * before its commit(s).
     */
    void testPrepareAndCommit(){
        MockTransactionalStore store;
        store.expectBegin().expectCommit();

        MockTxOp::shared_ptr opA(new MockTxOp());
        opA->expectPrepare().expectCommit();
        MockTxOp::shared_ptr opB(new MockTxOp());
        opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order
        MockTxOp::shared_ptr opC(new MockTxOp());
        opC->expectPrepare().expectCommit();

        TxBuffer buffer;
        buffer.enlist(static_pointer_cast<TxOp>(opA));
        buffer.enlist(static_pointer_cast<TxOp>(opB));
        buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
        buffer.enlist(static_pointer_cast<TxOp>(opC));

        CPPUNIT_ASSERT(buffer.prepare(&store));
        buffer.commit();
        store.check();
        CPPUNIT_ASSERT(store.isCommitted());
        opA->check();
        opB->check();
        opC->check();
    }

    /**
     * An op that fails prepare: prepare() must return false, the store must
     * see begin then abort and end up aborted, and ops enlisted after the
     * failing one must receive no calls at all.
     */
    void testFailOnPrepare(){
        MockTransactionalStore store;
        store.expectBegin().expectAbort();

        MockTxOp::shared_ptr opA(new MockTxOp());
        opA->expectPrepare();
        MockTxOp::shared_ptr opB(new MockTxOp(true));
        opB->expectPrepare();
        MockTxOp::shared_ptr opC(new MockTxOp());//will never get prepare as b will fail

        TxBuffer buffer;
        buffer.enlist(static_pointer_cast<TxOp>(opA));
        buffer.enlist(static_pointer_cast<TxOp>(opB));
        buffer.enlist(static_pointer_cast<TxOp>(opC));

        CPPUNIT_ASSERT(!buffer.prepare(&store));
        store.check();
        CPPUNIT_ASSERT(store.isAborted());
        opA->check();
        opB->check();
        opC->check();
    }

    /**
     * rollback() must reach every enlisted op exactly once. No prepare is
     * expected here, so opB's failOnPrepare flag plays no part.
     */
    void testRollback(){
        MockTxOp::shared_ptr opA(new MockTxOp());
        opA->expectRollback();
        MockTxOp::shared_ptr opB(new MockTxOp(true));
        opB->expectRollback();
        MockTxOp::shared_ptr opC(new MockTxOp());
        opC->expectRollback();

        TxBuffer buffer;
        buffer.enlist(static_pointer_cast<TxOp>(opA));
        buffer.enlist(static_pointer_cast<TxOp>(opB));
        buffer.enlist(static_pointer_cast<TxOp>(opC));

        buffer.rollback();
        opA->check();
        opB->check();
        opC->check();
    }

    /**
     * After rollback() a following commit() must not reach the ops: each op
     * expects a single rollback and nothing else.
     */
    void testBufferIsClearedAfterRollback(){
        MockTxOp::shared_ptr opA(new MockTxOp());
        opA->expectRollback();
        MockTxOp::shared_ptr opB(new MockTxOp());
        opB->expectRollback();

        TxBuffer buffer;
        buffer.enlist(static_pointer_cast<TxOp>(opA));
        buffer.enlist(static_pointer_cast<TxOp>(opB));

        buffer.rollback();
        buffer.commit();//second call should not reach ops
        opA->check();
        opB->check();
    }

    /**
     * After commit() a following rollback() must not reach the ops: each op
     * expects a single commit and nothing else.
     */
    void testBufferIsClearedAfterCommit(){
        MockTxOp::shared_ptr opA(new MockTxOp());
        opA->expectCommit();
        MockTxOp::shared_ptr opB(new MockTxOp());
        opB->expectCommit();

        TxBuffer buffer;
        buffer.enlist(static_pointer_cast<TxOp>(opA));
        buffer.enlist(static_pointer_cast<TxOp>(opB));

        buffer.commit();
        buffer.rollback();//second call should not reach ops
        opA->check();
        opB->check();
    }
};

// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);