summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-12-07 12:44:08 +0000
committerGordon Sim <gsim@apache.org>2006-12-07 12:44:08 +0000
commitea25bbb6602aa35f7e862e810b99282580cd3684 (patch)
tree2625393031406ba8709bbfa515468b5f523e979e
parent6837709d617692649a22b689f7be6741dd569646 (diff)
downloadqpid-python-ea25bbb6602aa35f7e862e810b99282580cd3684.tar.gz
Added unit test and slightly refactored code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@483437 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp9
-rw-r--r--cpp/lib/broker/QueuePolicy.cpp24
-rw-r--r--cpp/lib/broker/QueuePolicy.h9
-rw-r--r--cpp/tests/Makefile.am1
-rw-r--r--cpp/tests/QueuePolicyTest.cpp89
5 files changed, 110 insertions, 22 deletions
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index a8c5343ca3..bfea1918a4 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -161,14 +161,19 @@ u_int32_t Queue::purge(){
}
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front(), store);
+ if (policy.get()) policy->dequeued(messages.front()->contentSize());
messages.pop();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
messages.push(msg);
- if (policy.get()) policy->enqueued(messages.front(), store);
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ msg->releaseContent(store);
+ }
+ }
}
u_int32_t Queue::getMessageCount() const{
diff --git a/cpp/lib/broker/QueuePolicy.cpp b/cpp/lib/broker/QueuePolicy.cpp
index 055d415226..e13fd62fc6 100644
--- a/cpp/lib/broker/QueuePolicy.cpp
+++ b/cpp/lib/broker/QueuePolicy.cpp
@@ -24,33 +24,27 @@ using namespace qpid::broker;
using namespace qpid::framing;
QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize) {}
+ maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
QueuePolicy::QueuePolicy(const FieldTable& settings) :
maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, 0)) {}
+ maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {}
-void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
+void QueuePolicy::enqueued(u_int64_t _size)
{
- if (checkCount(msg) || checkSize(msg)) {
- msg->releaseContent(store);
- }
+ if (maxCount) count++;
+ if (maxSize) size += _size;
}
-void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/)
+void QueuePolicy::dequeued(u_int64_t _size)
{
if (maxCount) count--;
- if (maxSize) size -= msg->contentSize();
-}
-
-bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/)
-{
- return maxCount && ++count > maxCount;
+ if (maxSize) size -= _size;
}
-bool QueuePolicy::checkSize(Message::shared_ptr& msg)
+bool QueuePolicy::limitExceeded()
{
- return maxSize && (size += msg->contentSize()) > maxSize;
+ return (maxSize && size > maxSize) || (maxCount && count > maxCount);
}
void QueuePolicy::update(FieldTable& settings)
diff --git a/cpp/lib/broker/QueuePolicy.h b/cpp/lib/broker/QueuePolicy.h
index c31e9ec968..597cfe7ce8 100644
--- a/cpp/lib/broker/QueuePolicy.h
+++ b/cpp/lib/broker/QueuePolicy.h
@@ -21,7 +21,6 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
-#include <BrokerMessage.h>
#include <FieldTable.h>
namespace qpid {
@@ -37,14 +36,14 @@ namespace qpid {
u_int64_t size;
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
- bool checkCount(Message::shared_ptr& msg);
- bool checkSize(Message::shared_ptr& msg);
+
public:
QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(Message::shared_ptr& msg, MessageStore* store);
- void dequeued(Message::shared_ptr& msg, MessageStore* store);
+ void enqueued(u_int64_t size);
+ void dequeued(u_int64_t size);
void update(qpid::framing::FieldTable& settings);
+ bool limitExceeded();
u_int32_t getMaxCount() const { return maxCount; }
u_int64_t getMaxSize() const { return maxSize; }
};
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index d5cd60a831..e52a3512f0 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -36,6 +36,7 @@ broker_tests = \
MessageTest \
QueueRegistryTest \
QueueTest \
+ QueuePolicyTest \
TopicExchangeTest \
TxAckTest \
TxBufferTest \
diff --git a/cpp/tests/QueuePolicyTest.cpp b/cpp/tests/QueuePolicyTest.cpp
new file mode 100644
index 0000000000..0ae0d2f7bc
--- /dev/null
+++ b/cpp/tests/QueuePolicyTest.cpp
@@ -0,0 +1,89 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+class QueuePolicyTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(QueuePolicyTest);
+ CPPUNIT_TEST(testCount);
+ CPPUNIT_TEST(testSize);
+ CPPUNIT_TEST(testBoth);
+ CPPUNIT_TEST(testSettings);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ void testCount(){
+ QueuePolicy policy(5, 0);
+ CPPUNIT_ASSERT(!policy.limitExceeded());
+ for (int i = 0; i < 5; i++) policy.enqueued(10);
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 0, policy.getMaxSize());
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 5, policy.getMaxCount());
+ CPPUNIT_ASSERT(!policy.limitExceeded());
+ policy.enqueued(10);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ policy.dequeued(10);
+ CPPUNIT_ASSERT(!policy.limitExceeded());
+ policy.enqueued(10);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ }
+
+ void testSize(){
+ QueuePolicy policy(0, 50);
+ for (int i = 0; i < 5; i++) policy.enqueued(10);
+ CPPUNIT_ASSERT(!policy.limitExceeded());
+ policy.enqueued(10);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ policy.dequeued(10);
+ CPPUNIT_ASSERT(!policy.limitExceeded());
+ policy.enqueued(10);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ }
+
+ void testBoth(){
+ QueuePolicy policy(5, 50);
+ for (int i = 0; i < 5; i++) policy.enqueued(11);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ policy.dequeued(20);
+ CPPUNIT_ASSERT(!policy.limitExceeded());//fails
+ policy.enqueued(5);
+ policy.enqueued(10);
+ CPPUNIT_ASSERT(policy.limitExceeded());
+ }
+
+ void testSettings(){
+ //test reading and writing the policy from/to field table
+ FieldTable settings;
+ QueuePolicy a(101, 303);
+ a.update(settings);
+ QueuePolicy b(settings);
+ CPPUNIT_ASSERT_EQUAL(a.getMaxCount(), b.getMaxCount());
+ CPPUNIT_ASSERT_EQUAL(a.getMaxSize(), b.getMaxSize());
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueuePolicyTest);
+