diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-31 13:10:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-31 13:10:17 +0000 |
commit | 6a793a2ad19b692e9fb5a053607be5a369ae36b3 (patch) | |
tree | 831fc829d8d28bf4226e046e4ae9e09186e4feee | |
parent | 5eb36a210ab9e2bed9d06fc0e5a92ffad09cc66a (diff) | |
download | qpid-python-6a793a2ad19b692e9fb5a053607be5a369ae36b3.tar.gz |
Some additional unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469471 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/AccumulatedAck.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAck.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAck.h | 13 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp | 57 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ConfigurationTest.cpp | 94 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxAckTest.cpp | 112 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxBufferTest.cpp | 183 |
8 files changed, 461 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp index 86e1a5e786..060e940309 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.cpp +++ b/cpp/src/qpid/broker/AccumulatedAck.cpp @@ -42,5 +42,5 @@ void AccumulatedAck::clear(){ } bool AccumulatedAck::covers(u_int64_t tag) const{ - return tag < range || find(individual.begin(), individual.end(), tag) != individual.end(); + return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end(); } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 14a89f7a66..67fb6764be 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -165,15 +165,13 @@ bool Queue::canAutoDelete() const{ } void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ - bool persistent(false);//TODO: pull this from headers - if(persistent){ + if(store){ store->enqueue(msg, name, xid); } } void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){ - bool persistent(false);//TODO: pull this from headers - if(persistent){ + if(store){ store->dequeue(msg, name, xid); } } diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp index 7e787a463e..6dba6fd79d 100644 --- a/cpp/src/qpid/broker/TxAck.cpp +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -22,7 +22,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -TxAck::TxAck(AccumulatedAck _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ +TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ } diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h index 645bf1b1b0..9046a89384 100644 --- a/cpp/src/qpid/broker/TxAck.h +++ b/cpp/src/qpid/broker/TxAck.h @@ -27,11 +27,20 @@ namespace qpid { namespace broker { + /** + * Defines the transactional behaviour for acks received by a + * transactional channel. + */ class TxAck : public TxOp{ - AccumulatedAck acked; + AccumulatedAck& acked; std::list<DeliveryRecord>& unacked; public: - TxAck(AccumulatedAck acked, std::list<DeliveryRecord>& unacked); + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare() throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp new file mode 100644 index 0000000000..a4459cf0c2 --- /dev/null +++ b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/AccumulatedAck.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> + +using std::list; +using namespace qpid::broker; + +class AccumulatedAckTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(AccumulatedAckTest); + CPPUNIT_TEST(testCovers); + CPPUNIT_TEST_SUITE_END(); + + public: + void testCovers() + { + AccumulatedAck ack; + ack.range = 5; + ack.individual.push_back(7); + ack.individual.push_back(9); + + CPPUNIT_ASSERT(ack.covers(1)); + CPPUNIT_ASSERT(ack.covers(2)); + CPPUNIT_ASSERT(ack.covers(3)); + CPPUNIT_ASSERT(ack.covers(4)); + CPPUNIT_ASSERT(ack.covers(5)); + CPPUNIT_ASSERT(ack.covers(7)); + CPPUNIT_ASSERT(ack.covers(9)); + + CPPUNIT_ASSERT(!ack.covers(6)); + CPPUNIT_ASSERT(!ack.covers(8)); + CPPUNIT_ASSERT(!ack.covers(10)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(AccumulatedAckTest); + diff --git a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp new file mode 100644 index 0000000000..1c24076ee3 --- /dev/null +++ b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/Configuration.h" +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace std; +using namespace qpid::broker; + +class ConfigurationTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ConfigurationTest); + CPPUNIT_TEST(testIsHelp); + CPPUNIT_TEST(testPortLongForm); + CPPUNIT_TEST(testPortShortForm); + CPPUNIT_TEST(testAcceptorLongForm); + CPPUNIT_TEST(testAcceptorShortForm); + CPPUNIT_TEST(testVarious); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testIsHelp() + { + Configuration conf; + char* argv[] = {"ignore", "--help"}; + conf.parse(2, argv); + CPPUNIT_ASSERT(conf.isHelp()); + } + + void testPortLongForm() + { + Configuration conf; + char* argv[] = {"ignore", "--port", "6789"}; + conf.parse(3, argv); + CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); + } + + void testPortShortForm() + { + Configuration conf; + char* argv[] = {"ignore", "-p", "6789"}; + conf.parse(3, argv); + CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); + } + + void testAcceptorLongForm() + { + Configuration conf; + char* argv[] = {"ignore", "--acceptor", "blocking"}; + conf.parse(3, argv); + CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); + } + + void testAcceptorShortForm() + { + Configuration conf; + char* argv[] = {"ignore", "-a", "blocking"}; + conf.parse(3, argv); + CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); + } + + void testVarious() + { + Configuration conf; + char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"}; + conf.parse(6, argv); + CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default + CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); + CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); + CPPUNIT_ASSERT(conf.isTrace()); + CPPUNIT_ASSERT(!conf.isHelp()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ConfigurationTest); + diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp new file mode 100644 index 0000000000..b787c5793b --- /dev/null +++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp @@ -0,0 +1,112 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/MessageStore.h" +#include "qpid/broker/TxAck.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <vector> + +using std::list; +using std::vector; +using namespace qpid::broker; + +class TxAckTest : public CppUnit::TestCase +{ + + class TestMessageStore : public MessageStore + { + public: + vector<Message::shared_ptr> dequeued; + + void dequeue(Message::shared_ptr& msg, const string& /*queue*/, const string * const /*xid*/) + { + dequeued.push_back(msg); + } + + //dont care about any of the other methods: + void enqueue(Message::shared_ptr&, const string&, const string * const){} + void committed(const string * const){} + void aborted(const string * const){} + void begin(){} + void commit(){} + void abort(){} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxAckTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + AccumulatedAck acked; + TestMessageStore store; + Queue::shared_ptr queue; + vector<Message::shared_ptr> messages; + list<DeliveryRecord> deliveries; + TxAck op; + + + public: + + TxAckTest() : queue(new Queue("my_queue", true, false, &store, 0)), op(acked, deliveries) + { + for(int i = 0; i < 10; i++){ + Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); + messages.push_back(msg); + deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); + } + + //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) + acked.range = 5; + acked.individual.push_back(7); + acked.individual.push_back(9); + } + + void testPrepare() + { + //ensure acked messages are discarded, i.e. dequeued from store + op.prepare(); + CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); + CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); + CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 + CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2 + CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3 + CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4 + CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5 + CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7 + CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9 + } + + void testCommit() + { + //emsure acked messages are removed from list + op.commit(); + CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); + list<DeliveryRecord>::iterator i = deliveries.begin(); + CPPUNIT_ASSERT(i->matches(6));//msg 6 + CPPUNIT_ASSERT((++i)->matches(8));//msg 8 + CPPUNIT_ASSERT((++i)->matches(10));//msg 10 + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest); + diff --git a/cpp/test/unit/qpid/broker/TxBufferTest.cpp b/cpp/test/unit/qpid/broker/TxBufferTest.cpp new file mode 100644 index 0000000000..65f6327ee4 --- /dev/null +++ b/cpp/test/unit/qpid/broker/TxBufferTest.cpp @@ -0,0 +1,183 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/TxBuffer.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <vector> + +using namespace qpid::broker; + +template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ + unsigned int i = 0; + while(i < expected.size() && i < actual.size()){ + CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); + i++; + } + CPPUNIT_ASSERT(i == expected.size()); + CPPUNIT_ASSERT(i == actual.size()); +} + +class TxBufferTest : public CppUnit::TestCase +{ + class MockTxOp : public TxOp{ + enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; + std::vector<int> expected; + std::vector<int> actual; + bool failOnPrepare; + public: + MockTxOp() : failOnPrepare(false) {} + MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} + + bool prepare() throw(){ + actual.push_back(PREPARE); + return !failOnPrepare; + } + void commit() throw(){ + actual.push_back(COMMIT); + } + void rollback() throw(){ + actual.push_back(ROLLBACK); + } + MockTxOp& expectPrepare(){ + expected.push_back(PREPARE); + return *this; + } + MockTxOp& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTxOp& expectRollback(){ + expected.push_back(ROLLBACK); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + ~MockTxOp(){} + }; + + class MockTransactionalStore : public TransactionalStore{ + enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; + std::vector<int> expected; + std::vector<int> actual; + public: + void begin(){ + actual.push_back(BEGIN); + } + void commit(){ + actual.push_back(COMMIT); + } + void abort(){ + actual.push_back(ABORT); + } + MockTransactionalStore& expectBegin(){ + expected.push_back(BEGIN); + return *this; + } + MockTransactionalStore& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTransactionalStore& expectAbort(){ + expected.push_back(ABORT); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + ~MockTransactionalStore(){} + }; + + CPPUNIT_TEST_SUITE(TxBufferTest); + CPPUNIT_TEST(testPrepareAndCommit); + CPPUNIT_TEST(testFailOnPrepare); + CPPUNIT_TEST(testRollback); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testPrepareAndCommit(){ + MockTransactionalStore store; + store.expectBegin().expectCommit(); + + MockTxOp opA; + opA.expectPrepare().expectCommit(); + MockTxOp opB; + opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order + MockTxOp opC; + opC.expectPrepare().expectCommit(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opB);//opB enlisted twice + buffer.enlist(&opC); + + CPPUNIT_ASSERT(buffer.prepare(&store)); + buffer.commit(); + store.check(); + opA.check(); + opB.check(); + opC.check(); + } + + void testFailOnPrepare(){ + MockTransactionalStore store; + store.expectBegin().expectAbort(); + + MockTxOp opA; + opA.expectPrepare(); + MockTxOp opB(true); + opB.expectPrepare(); + MockTxOp opC;//will never get prepare as b will fail + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opC); + + CPPUNIT_ASSERT(!buffer.prepare(&store)); + store.check(); + opA.check(); + opB.check(); + opC.check(); + } + + void testRollback(){ + MockTxOp opA; + opA.expectRollback(); + MockTxOp opB(true); + opB.expectRollback(); + MockTxOp opC; + opC.expectRollback(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opC); + + buffer.rollback(); + opA.check(); + opB.check(); + opC.check(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest); |