summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-31 15:38:36 +0000
committerGordon Sim <gsim@apache.org>2006-10-31 15:38:36 +0000
commit1150be6d66a943d899e25af4cb876e7f68c657d9 (patch)
tree582b89c2b738b66255deecbacf089a91c07d3348 /cpp
parent2c78ceb1faaad9c9e57ad7c815ceea82f9ff15a1 (diff)
downloadqpid-python-1150be6d66a943d899e25af4cb876e7f68c657d9.tar.gz
Added doc & unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469530 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.cpp2
-rw-r--r--cpp/src/qpid/broker/TxPublish.h10
-rw-r--r--cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp25
-rw-r--r--cpp/test/unit/qpid/broker/TxPublishTest.cpp103
4 files changed, 139 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp
index 060e940309..84ddcee556 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.cpp
+++ b/cpp/src/qpid/broker/AccumulatedAck.cpp
@@ -25,7 +25,7 @@ void AccumulatedAck::update(u_int64_t tag, bool multiple){
if(multiple){
if(tag > range) range = tag;
//else don't care, it is already counted
- }else if(tag < range){
+ }else if(tag > range){
individual.push_back(tag);
}
}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 01bb573fe2..5076e0f56f 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -29,6 +29,16 @@
namespace qpid {
namespace broker {
+ /**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
class TxPublish : public TxOp, public Deliverable{
class Prepare{
Message::shared_ptr& msg;
diff --git a/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
index a4459cf0c2..6cbdaacc32 100644
--- a/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
+++ b/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
@@ -27,6 +27,7 @@ class AccumulatedAckTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(AccumulatedAckTest);
CPPUNIT_TEST(testCovers);
+ CPPUNIT_TEST(testUpdateAndConsolidate);
CPPUNIT_TEST_SUITE_END();
public:
@@ -49,6 +50,30 @@ class AccumulatedAckTest : public CppUnit::TestCase
CPPUNIT_ASSERT(!ack.covers(8));
CPPUNIT_ASSERT(!ack.covers(10));
}
+
+ void testUpdateAndConsolidate()
+ {
+ AccumulatedAck ack;
+ ack.clear();
+ ack.update(1, false);
+ ack.update(3, false);
+ ack.update(10, false);
+ ack.update(8, false);
+ ack.update(6, false);
+ ack.update(3, true);
+ ack.update(2, true);
+ ack.update(5, true);
+ ack.consolidate();
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 5, ack.range);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, ack.individual.size());
+ list<u_int64_t>::iterator i = ack.individual.begin();
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 6, *i);
+ i++;
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 8, *i);
+ i++;
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 10, *i);
+
+ }
};
// Make this test suite a plugin.
diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
new file mode 100644
index 0000000000..b8d3c99cb9
--- /dev/null
+++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/TxPublish.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::pair;
+using std::vector;
+using namespace qpid::broker;
+
+class TxPublishTest : public CppUnit::TestCase
+{
+
+ class TestMessageStore : public MessageStore
+ {
+ public:
+ vector< pair<string, Message::shared_ptr> > enqueued;
+
+ void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/)
+ {
+ enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg));
+ }
+
+ //dont care about any of the other methods:
+ void dequeue(Message::shared_ptr&, const string&, const string * const){}
+ void committed(const string * const){}
+ void aborted(const string * const){}
+ void begin(){}
+ void commit(){}
+ void abort(){}
+ ~TestMessageStore(){}
+ };
+
+ CPPUNIT_TEST_SUITE(TxPublishTest);
+ CPPUNIT_TEST(testPrepare);
+ CPPUNIT_TEST(testCommit);
+ CPPUNIT_TEST_SUITE_END();
+
+
+ TestMessageStore store;
+ Queue::shared_ptr queue1;
+ Queue::shared_ptr queue2;
+ Message::shared_ptr msg;
+ TxPublish op;
+
+
+ public:
+
+ TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)),
+ queue2(new Queue("queue2", true, false, &store, 0)),
+ msg(new Message(0, "exchange", "routing_key", false, false)),
+ op(msg)
+ {
+ op.deliverTo(queue1);
+ op.deliverTo(queue2);
+ }
+
+ void testPrepare()
+ {
+ //ensure messages are enqueued in store
+ op.prepare();
+ CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
+ CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+ CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+ }
+
+ void testCommit()
+ {
+ //ensure messages are delivered to queue
+ op.commit();
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest);
+