summaryrefslogtreecommitdiff
path: root/cpp/test
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test')
-rw-r--r--cpp/test/unit/qpid/broker/TxAckTest.cpp15
-rw-r--r--cpp/test/unit/qpid/broker/TxBufferTest.cpp50
-rw-r--r--cpp/test/unit/qpid/broker/TxPublishTest.cpp15
3 files changed, 64 insertions, 16 deletions
diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp
index ab1e607e87..a619809b97 100644
--- a/cpp/test/unit/qpid/broker/TxAckTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp
@@ -26,6 +26,7 @@
using std::list;
using std::vector;
using namespace qpid::broker;
+using namespace qpid::framing;
class TxAckTest : public CppUnit::TestCase
{
@@ -35,7 +36,7 @@ class TxAckTest : public CppUnit::TestCase
public:
vector<Message::shared_ptr> dequeued;
- void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/)
+ void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/)
{
dequeued.push_back(msg);
}
@@ -44,12 +45,12 @@ class TxAckTest : public CppUnit::TestCase
void create(const Queue&){}
void destroy(const Queue&){}
void recover(QueueRegistry&){}
- void enqueue(Message::shared_ptr&, const Queue&, const string * const){}
+ void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){}
void committed(const string * const){}
void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
+ TransactionContext* begin(){ return 0; }
+ void commit(TransactionContext*){}
+ void abort(TransactionContext*){}
~TestMessageStore(){}
};
@@ -73,6 +74,8 @@ public:
{
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+ msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
}
@@ -86,7 +89,7 @@ public:
void testPrepare()
{
//ensure acked messages are discarded, i.e. dequeued from store
- op.prepare();
+ op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
diff --git a/cpp/test/unit/qpid/broker/TxBufferTest.cpp b/cpp/test/unit/qpid/broker/TxBufferTest.cpp
index 65f6327ee4..35b92671aa 100644
--- a/cpp/test/unit/qpid/broker/TxBufferTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxBufferTest.cpp
@@ -34,6 +34,35 @@ template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<
class TxBufferTest : public CppUnit::TestCase
{
+ class TestTransactionContext : public TransactionContext{
+ enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
+ int state;
+ public:
+ TestTransactionContext() : state(OPEN) {}
+ void commit(){
+ if(state != OPEN) throw "txn already completed";
+ state = COMMITTED;
+ }
+
+ void abort(){
+ if(state != OPEN) throw "txn already completed";
+ state = ABORTED;
+ }
+
+ bool isCommitted(){
+ return state == COMMITTED;
+ }
+
+ bool isAborted(){
+ return state == ABORTED;
+ }
+
+ bool isOpen(){
+ return state == OPEN;
+ }
+ ~TestTransactionContext(){}
+ };
+
class MockTxOp : public TxOp{
enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
std::vector<int> expected;
@@ -43,7 +72,7 @@ class TxBufferTest : public CppUnit::TestCase
MockTxOp() : failOnPrepare(false) {}
MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
- bool prepare() throw(){
+ bool prepare(TransactionContext*) throw(){
actual.push_back(PREPARE);
return !failOnPrepare;
}
@@ -75,15 +104,25 @@ class TxBufferTest : public CppUnit::TestCase
enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
std::vector<int> expected;
std::vector<int> actual;
+
public:
- void begin(){
+ TestTransactionContext txn;
+
+ TransactionContext* begin(){
actual.push_back(BEGIN);
+ return &txn;
}
- void commit(){
+ void commit(TransactionContext* ctxt){
actual.push_back(COMMIT);
+ TestTransactionContext* _txn(dynamic_cast<TestTransactionContext*>(ctxt));
+ CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+ _txn->commit();
}
- void abort(){
+ void abort(TransactionContext* ctxt){
actual.push_back(ABORT);
+ TestTransactionContext* _txn(dynamic_cast<TestTransactionContext*>(ctxt));
+ CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+ _txn->abort();
}
MockTransactionalStore& expectBegin(){
expected.push_back(BEGIN);
@@ -131,6 +170,7 @@ class TxBufferTest : public CppUnit::TestCase
CPPUNIT_ASSERT(buffer.prepare(&store));
buffer.commit();
store.check();
+ CPPUNIT_ASSERT(store.txn.isCommitted());
opA.check();
opB.check();
opC.check();
@@ -153,6 +193,7 @@ class TxBufferTest : public CppUnit::TestCase
CPPUNIT_ASSERT(!buffer.prepare(&store));
store.check();
+ CPPUNIT_ASSERT(store.txn.isAborted());
opA.check();
opB.check();
opC.check();
@@ -181,3 +222,4 @@ class TxBufferTest : public CppUnit::TestCase
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);
+
diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
index 4fe6c7497a..b8387b0752 100644
--- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp
+++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
@@ -27,6 +27,7 @@ using std::list;
using std::pair;
using std::vector;
using namespace qpid::broker;
+using namespace qpid::framing;
class TxPublishTest : public CppUnit::TestCase
{
@@ -36,7 +37,7 @@ class TxPublishTest : public CppUnit::TestCase
public:
vector< pair<string, Message::shared_ptr> > enqueued;
- void enqueue(Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/)
+ void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/)
{
enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg));
}
@@ -45,12 +46,12 @@ class TxPublishTest : public CppUnit::TestCase
void create(const Queue&){}
void destroy(const Queue&){}
void recover(QueueRegistry&){}
- void dequeue(Message::shared_ptr&, const Queue&, const string * const){}
+ void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){}
void committed(const string * const){}
void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
+ TransactionContext* begin(){ return 0; }
+ void commit(TransactionContext*){}
+ void abort(TransactionContext*){}
~TestMessageStore(){}
};
@@ -74,6 +75,8 @@ public:
msg(new Message(0, "exchange", "routing_key", false, false)),
op(msg)
{
+ msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}
@@ -81,7 +84,7 @@ public:
void testPrepare()
{
//ensure messages are enqueued in store
- op.prepare();
+ op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);