diff options
Diffstat (limited to 'cpp/test')
-rw-r--r-- | cpp/test/unit/qpid/broker/TxAckTest.cpp | 15 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxBufferTest.cpp | 50 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxPublishTest.cpp | 15 |
3 files changed, 64 insertions, 16 deletions
diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp index ab1e607e87..a619809b97 100644 --- a/cpp/test/unit/qpid/broker/TxAckTest.cpp +++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp @@ -26,6 +26,7 @@ using std::list; using std::vector; using namespace qpid::broker; +using namespace qpid::framing; class TxAckTest : public CppUnit::TestCase { @@ -35,7 +36,7 @@ class TxAckTest : public CppUnit::TestCase public: vector<Message::shared_ptr> dequeued; - void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/) + void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/) { dequeued.push_back(msg); } @@ -44,12 +45,12 @@ class TxAckTest : public CppUnit::TestCase void create(const Queue&){} void destroy(const Queue&){} void recover(QueueRegistry&){} - void enqueue(Message::shared_ptr&, const Queue&, const string * const){} + void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} void committed(const string * const){} void aborted(const string * const){} - void begin(){} - void commit(){} - void abort(){} + TransactionContext* begin(){ return 0; } + void commit(TransactionContext*){} + void abort(TransactionContext*){} ~TestMessageStore(){} }; @@ -73,6 +74,8 @@ public: { for(int i = 0; i < 10; i++){ Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); + msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); } @@ -86,7 +89,7 @@ public: void testPrepare() { //ensure acked messages are discarded, i.e. dequeued from store - op.prepare(); + op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 diff --git a/cpp/test/unit/qpid/broker/TxBufferTest.cpp b/cpp/test/unit/qpid/broker/TxBufferTest.cpp index 65f6327ee4..35b92671aa 100644 --- a/cpp/test/unit/qpid/broker/TxBufferTest.cpp +++ b/cpp/test/unit/qpid/broker/TxBufferTest.cpp @@ -34,6 +34,35 @@ template <class T> void assertEqualVector(std::vector<T>& expected, std::vector< class TxBufferTest : public CppUnit::TestCase { + class TestTransactionContext : public TransactionContext{ + enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3}; + int state; + public: + TestTransactionContext() : state(OPEN) {} + void commit(){ + if(state != OPEN) throw "txn already completed"; + state = COMMITTED; + } + + void abort(){ + if(state != OPEN) throw "txn already completed"; + state = ABORTED; + } + + bool isCommitted(){ + return state == COMMITTED; + } + + bool isAborted(){ + return state == ABORTED; + } + + bool isOpen(){ + return state == OPEN; + } + ~TestTransactionContext(){} + }; + class MockTxOp : public TxOp{ enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; std::vector<int> expected; @@ -43,7 +72,7 @@ class TxBufferTest : public CppUnit::TestCase MockTxOp() : failOnPrepare(false) {} MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} - bool prepare() throw(){ + bool prepare(TransactionContext*) throw(){ actual.push_back(PREPARE); return !failOnPrepare; } @@ -75,15 +104,25 @@ class TxBufferTest : public CppUnit::TestCase enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; std::vector<int> expected; std::vector<int> actual; + public: - void begin(){ + TestTransactionContext txn; + + TransactionContext* begin(){ actual.push_back(BEGIN); + return &txn; } - void commit(){ + void commit(TransactionContext* ctxt){ actual.push_back(COMMIT); + TestTransactionContext* _txn(dynamic_cast<TestTransactionContext*>(ctxt)); + CPPUNIT_ASSERT_EQUAL(_txn, &txn); + _txn->commit(); } - void abort(){ + void abort(TransactionContext* ctxt){ actual.push_back(ABORT); + TestTransactionContext* _txn(dynamic_cast<TestTransactionContext*>(ctxt)); + CPPUNIT_ASSERT_EQUAL(_txn, &txn); + _txn->abort(); } MockTransactionalStore& expectBegin(){ expected.push_back(BEGIN); @@ -131,6 +170,7 @@ class TxBufferTest : public CppUnit::TestCase CPPUNIT_ASSERT(buffer.prepare(&store)); buffer.commit(); store.check(); + CPPUNIT_ASSERT(store.txn.isCommitted()); opA.check(); opB.check(); opC.check(); @@ -153,6 +193,7 @@ class TxBufferTest : public CppUnit::TestCase CPPUNIT_ASSERT(!buffer.prepare(&store)); store.check(); + CPPUNIT_ASSERT(store.txn.isAborted()); opA.check(); opB.check(); opC.check(); @@ -181,3 +222,4 @@ class TxBufferTest : public CppUnit::TestCase // Make this test suite a plugin. CPPUNIT_PLUGIN_IMPLEMENT(); CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest); + diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp index 4fe6c7497a..b8387b0752 100644 --- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp +++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp @@ -27,6 +27,7 @@ using std::list; using std::pair; using std::vector; using namespace qpid::broker; +using namespace qpid::framing; class TxPublishTest : public CppUnit::TestCase { @@ -36,7 +37,7 @@ class TxPublishTest : public CppUnit::TestCase public: vector< pair<string, Message::shared_ptr> > enqueued; - void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/) + void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/) { enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg)); } @@ -45,12 +46,12 @@ class TxPublishTest : public CppUnit::TestCase void create(const Queue&){} void destroy(const Queue&){} void recover(QueueRegistry&){} - void dequeue(Message::shared_ptr&, const Queue&, const string * const){} + void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} void committed(const string * const){} void aborted(const string * const){} - void begin(){} - void commit(){} - void abort(){} + TransactionContext* begin(){ return 0; } + void commit(TransactionContext*){} + void abort(TransactionContext*){} ~TestMessageStore(){} }; @@ -74,6 +75,8 @@ public: msg(new Message(0, "exchange", "routing_key", false, false)), op(msg) { + msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); } @@ -81,7 +84,7 @@ public: void testPrepare() { //ensure messages are enqueued in store - op.prepare(); + op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); |