diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-31 15:38:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-31 15:38:36 +0000 |
commit | 1150be6d66a943d899e25af4cb876e7f68c657d9 (patch) | |
tree | 582b89c2b738b66255deecbacf089a91c07d3348 /cpp | |
parent | 2c78ceb1faaad9c9e57ad7c815ceea82f9ff15a1 (diff) | |
download | qpid-python-1150be6d66a943d899e25af4cb876e7f68c657d9.tar.gz |
Added doc & unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469530 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/AccumulatedAck.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 10 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp | 25 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxPublishTest.cpp | 103 |
4 files changed, 139 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp index 060e940309..84ddcee556 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.cpp +++ b/cpp/src/qpid/broker/AccumulatedAck.cpp @@ -25,7 +25,7 @@ void AccumulatedAck::update(u_int64_t tag, bool multiple){ if(multiple){ if(tag > range) range = tag; //else don't care, it is already counted - }else if(tag < range){ + }else if(tag > range){ individual.push_back(tag); } } diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 01bb573fe2..5076e0f56f 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -29,6 +29,16 @@ namespace qpid { namespace broker { + /** + * Defines the behaviour for publish operations on a + * transactional channel. Messages are routed through + * exchanges when received but are not at that stage delivered + * to the matching queues, rather the queues are held in an + * instance of this class. On prepare() the message is marked + * enqueued to the relevant queues in the MessagesStore. On + * commit() the messages will be passed to the queue for + * dispatch or to be added to the in-memory queue. + */ class TxPublish : public TxOp, public Deliverable{ class Prepare{ Message::shared_ptr& msg; diff --git a/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp index a4459cf0c2..6cbdaacc32 100644 --- a/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp +++ b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp @@ -27,6 +27,7 @@ class AccumulatedAckTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(AccumulatedAckTest); CPPUNIT_TEST(testCovers); + CPPUNIT_TEST(testUpdateAndConsolidate); CPPUNIT_TEST_SUITE_END(); public: @@ -49,6 +50,30 @@ class AccumulatedAckTest : public CppUnit::TestCase CPPUNIT_ASSERT(!ack.covers(8)); CPPUNIT_ASSERT(!ack.covers(10)); } + + void testUpdateAndConsolidate() + { + AccumulatedAck ack; + ack.clear(); + ack.update(1, false); + ack.update(3, false); + ack.update(10, false); + ack.update(8, false); + ack.update(6, false); + ack.update(3, true); + ack.update(2, true); + ack.update(5, true); + ack.consolidate(); + CPPUNIT_ASSERT_EQUAL((u_int64_t) 5, ack.range); + CPPUNIT_ASSERT_EQUAL((size_t) 3, ack.individual.size()); + list<u_int64_t>::iterator i = ack.individual.begin(); + CPPUNIT_ASSERT_EQUAL((u_int64_t) 6, *i); + i++; + CPPUNIT_ASSERT_EQUAL((u_int64_t) 8, *i); + i++; + CPPUNIT_ASSERT_EQUAL((u_int64_t) 10, *i); + + } }; // Make this test suite a plugin. diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp new file mode 100644 index 0000000000..b8d3c99cb9 --- /dev/null +++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/MessageStore.h" +#include "qpid/broker/TxPublish.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <vector> + +using std::list; +using std::pair; +using std::vector; +using namespace qpid::broker; + +class TxPublishTest : public CppUnit::TestCase +{ + + class TestMessageStore : public MessageStore + { + public: + vector< pair<string, Message::shared_ptr> > enqueued; + + void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/) + { + enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg)); + } + + //dont care about any of the other methods: + void dequeue(Message::shared_ptr&, const string&, const string * const){} + void committed(const string * const){} + void aborted(const string * const){} + void begin(){} + void commit(){} + void abort(){} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxPublishTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + TestMessageStore store; + Queue::shared_ptr queue1; + Queue::shared_ptr queue2; + Message::shared_ptr msg; + TxPublish op; + + + public: + + TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)), + queue2(new Queue("queue2", true, false, &store, 0)), + msg(new Message(0, "exchange", "routing_key", false, false)), + op(msg) + { + op.deliverTo(queue1); + op.deliverTo(queue2); + } + + void testPrepare() + { + //ensure messages are enqueued in store + op.prepare(); + CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); + CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); + CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); + CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second); + } + + void testCommit() + { + //ensure messages are delivered to queue + op.commit(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); + + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest); + |