summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-09-01 21:34:50 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-09-01 21:34:50 +0000
commit03aeada7f07e5c3492578f3278df16919cbc0b9a (patch)
tree878ddf49f3f381ee0fd2dfb932f57f88fc64a721
parent5d6f0702236ceb40e0118a20021caa1df36e5afb (diff)
downloadqpid-python-03aeada7f07e5c3492578f3278df16919cbc0b9a.tar.gz
QPID-3346: added some functional tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1164284 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/Makefile.am8
-rw-r--r--qpid/cpp/src/tests/msg_group_test.cpp77
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests46
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests_soak60
4 files changed, 153 insertions, 38 deletions
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index e0b0837701..dceabe36cc 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -358,7 +358,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_groups_tests_soak \
+ stop_broker \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
@@ -370,7 +373,8 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
- sasl_test_setup.sh
+ sasl_test_setup.sh \
+ run_msg_groups_tests_soak
check-long:
$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp
index f1636bc0d8..3a458ad569 100644
--- a/qpid/cpp/src/tests/msg_group_test.cpp
+++ b/qpid/cpp/src/tests/msg_group_test.cpp
@@ -29,6 +29,7 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
@@ -65,6 +66,7 @@ struct Options : public qpid::Options
bool allowDuplicates;
bool randomizeSize;
bool stickyConsumer;
+ uint timeout;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -83,7 +85,8 @@ struct Options : public qpid::Options
durable(false),
allowDuplicates(false),
randomizeSize(false),
- stickyConsumer(false)
+ stickyConsumer(false),
+ timeout(10)
{
addOptions()
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
@@ -101,7 +104,8 @@ struct Options : public qpid::Options
("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
- ("print-report", qpid::optValue(printReport, "yes|no"), "Dump message group statistics to stdout.")
+ ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
+ ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
//("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
@@ -140,8 +144,9 @@ class GroupChecker
{
qpid::sys::Mutex lock;
- const uint totalMsgsPublished;
+ const uint totalMsgs;
uint totalMsgsConsumed;
+ uint totalMsgsPublished;
bool allowDuplicates;
uint duplicateMsgs;
@@ -157,7 +162,7 @@ class GroupChecker
public:
GroupChecker( uint t, bool d ) :
- totalMsgsPublished(t), totalMsgsConsumed(0), allowDuplicates(d),
+ totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
duplicateMsgs(0) {}
bool checkSequence( const std::string& groupId,
@@ -165,6 +170,8 @@ public:
{
qpid::sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence);
+
GroupStatistics::iterator gs = statistics.find(groupId);
if (gs == statistics.end()) {
statistics[groupId][client] = 1;
@@ -176,19 +183,33 @@ public:
if (s == sequenceMap.end()) {
sequenceMap[groupId] = 1;
totalMsgsConsumed++;
+ QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
return sequence == 0;
}
if (sequence < s->second) {
duplicateMsgs++;
+ QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
return allowDuplicates;
}
totalMsgsConsumed++;
return sequence == s->second++;
}
- bool eraseGroup( const std::string& groupId )
+ void sendingSequence( const std::string& groupId,
+ uint sequence, bool eos,
+ const std::string& client )
{
qpid::sys::Mutex::ScopedLock l(lock);
+ ++totalMsgsPublished;
+
+ QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence <<
+ ((eos) ? " (last)" : ""));
+ }
+
+ bool eraseGroup( const std::string& groupId, const std::string& name )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")");
return sequenceMap.erase( groupId ) == 1;
}
@@ -201,7 +222,7 @@ public:
bool allMsgsConsumed() // true when done processing msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
- return totalMsgsConsumed == totalMsgsPublished;
+ return totalMsgsConsumed == totalMsgs;
}
uint getConsumedTotal()
@@ -210,6 +231,12 @@ public:
return totalMsgsConsumed;
}
+ uint getPublishedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsPublished;
+ }
+
ostream& print(ostream& out)
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -314,7 +341,7 @@ public:
testFailed( msg.str() );
break;
} else if (eof) {
- if (!checker.eraseGroup( groupId )) {
+ if (!checker.eraseGroup( groupId, name )) {
ostringstream msg;
msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
testFailed( msg.str() );
@@ -347,8 +374,10 @@ public:
class Producer : public Client
{
+ GroupChecker& checker;
+
public:
- Producer(const std::string& n, const Options& o) : Client(n, o) {};
+ Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n, o), checker(c) {};
virtual ~Producer() {};
void run()
@@ -367,7 +396,7 @@ public:
uint groupSeq = 0;
uint groupSize = opts.groupSize;
ostringstream group;
- group << name << sent;
+ group << name << ":" << sent;
std::string groupId(group.str());
while (!stopped && sent < opts.messages) {
@@ -375,11 +404,12 @@ public:
msg.getProperties()[opts.groupKey] = groupId;
msg.getProperties()[SN] = groupSeq++;
msg.getProperties()[EOS] = false;
+ checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name );
if (groupSeq == groupSize) {
msg.getProperties()[EOS] = true;
// generate new group
ostringstream nextGroupId;
- nextGroupId << name << sent;
+ nextGroupId << name << ":" << sent;
groupId = nextGroupId.str();
groupSeq = 0;
if (opts.randomizeSize) {
@@ -424,7 +454,7 @@ int main(int argc, char ** argv)
for (size_t j = 0; j < opts.senders; ++j) {
ostringstream name;
name << "P_" << j;
- clients.push_back(Client::shared_ptr(new Producer( name.str(), opts )));
+ clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
clients.back()->getThread() = qpid::sys::Thread(*clients.back());
}
for (size_t j = 0; j < opts.receivers; ++j) {
@@ -435,11 +465,11 @@ int main(int argc, char ** argv)
}
// wait for all pubs/subs to finish.... or for consumers to fail or stall.
- uint lastCount;
+ uint stalledTime = 0;
bool done;
bool clientFailed = false;
do {
- lastCount = state.getConsumedTotal();
+ uint lastCount = state.getConsumedTotal();
qpid::sys::usleep( 1000000 );
// check each client for status
@@ -450,16 +480,31 @@ int main(int argc, char ** argv)
std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl;
clientFailed = true;
done = true;
- break;
+ break; // exit test.
} else if ((*i)->getState() != Client::DONE) {
done = false;
}
}
- } while (!done && lastCount != state.getConsumedTotal());
+
+ if (!done) {
+ // check that consumers are still receiving messages
+ if (lastCount == state.getConsumedTotal())
+ stalledTime++;
+ else {
+ lastCount = state.getConsumedTotal();
+ stalledTime = 0;
+ }
+ }
+
+ QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
+ " Published to date = " << state.getPublishedTotal() <<
+ " total=" << opts.senders * opts.messages );
+
+ } while (!done && stalledTime < opts.timeout);
if (clientFailed) {
status = 1;
- } else if (!state.allMsgsConsumed()) {
+ } else if (stalledTime >= opts.timeout) {
std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl;
status = 2;
}
diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests
index 6c429939bf..cae614ccff 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests
+++ b/qpid/cpp/src/tests/run_msg_group_tests
@@ -18,9 +18,9 @@
# specific language governing permissions and limitations
# under the License.
#
-#script to run a sequence of ring queue tests via make
+#script to run a sequence of message group queue tests via make
-#setup path to find qpid-config and sender/receiver test progs
+#setup path to find qpid-config and msg_group_test progs
source ./test_env.sh
export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
@@ -28,33 +28,39 @@ export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
#set port to connect to via env var
test -s qpidd.port && QPID_PORT=`cat qpidd.port`
-trap cleanup INT TERM QUIT
+#trap cleanup INT TERM QUIT
QUEUE_NAME="group-queue"
GROUP_KEY="My-Group-Id"
-BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
-setup() {
- qpid-config $BROKER_URL add queue $QUEUE_NAME --argument="qpid.group_header_key=${GROUP_KEY}"
+run_test() {
+ $@
}
-cleanup() {
- qpid-config $BROKER_URL del queue $QUEUE_NAME --force
-}
+##set -x
-run_test() {
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
+ "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
+ "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
- msg_group_test -a $QUEUE_NAME --group-key $GROUP_KEY --capacity 3 --group-size 13 --ack-frequency 7 --messages 103 --receivers 3 --senders 5
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
RETCODE=$?
- cleanup
-
if test x$RETCODE != x0; then
- echo "FAIL message group tests"; exit 1;
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
fi
-}
-
-setup
-run_test
-
-
+ i+=1
+done
diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak
new file mode 100755
index 0000000000..4d288758bd
--- /dev/null
+++ b/qpid/cpp/src/tests/run_msg_group_tests_soak
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of long-running message group tests via make
+
+#setup path to find qpid-config and msg_group_test test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+ $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
+ RETCODE=$?
+ if test x$RETCODE != x0; then
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
+ fi
+ i+=1
+done