diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-09-01 21:34:50 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-09-01 21:34:50 +0000 |
commit | 03aeada7f07e5c3492578f3278df16919cbc0b9a (patch) | |
tree | 878ddf49f3f381ee0fd2dfb932f57f88fc64a721 | |
parent | 5d6f0702236ceb40e0118a20021caa1df36e5afb (diff) | |
download | qpid-python-03aeada7f07e5c3492578f3278df16919cbc0b9a.tar.gz |
QPID-3346: added some functional tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1164284 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/msg_group_test.cpp | 77 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests | 46 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests_soak | 60 |
4 files changed, 153 insertions, 38 deletions
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index e0b0837701..dceabe36cc 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -358,7 +358,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ +LONG_TESTS+=start_broker \ + fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \ + run_msg_groups_tests_soak \ + stop_broker \ run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure @@ -370,7 +373,8 @@ EXTRA_DIST+= \ run_failover_soak \ reliable_replication_test \ federated_cluster_test_with_node_failure \ - sasl_test_setup.sh + sasl_test_setup.sh \ + run_msg_groups_tests_soak check-long: $(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND= diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp index f1636bc0d8..3a458ad569 100644 --- a/qpid/cpp/src/tests/msg_group_test.cpp +++ b/qpid/cpp/src/tests/msg_group_test.cpp @@ -29,6 +29,7 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> +#include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -65,6 +66,7 @@ struct Options : public qpid::Options bool allowDuplicates; bool randomizeSize; bool stickyConsumer; + uint timeout; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -83,7 +85,8 @@ struct Options : public qpid::Options durable(false), allowDuplicates(false), randomizeSize(false), - stickyConsumer(false) + stickyConsumer(false), + timeout(10) { addOptions() ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") @@ -101,7 +104,8 @@ struct Options : public qpid::Options ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") - ("print-report", qpid::optValue(printReport, "yes|no"), "Dump message group statistics to stdout.") + ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.") + ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.") ("help", qpid::optValue(help), "print this usage statement"); add(log); //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") @@ -140,8 +144,9 @@ class GroupChecker { qpid::sys::Mutex lock; - const uint totalMsgsPublished; + const uint totalMsgs; uint totalMsgsConsumed; + uint totalMsgsPublished; bool allowDuplicates; uint duplicateMsgs; @@ -157,7 +162,7 @@ class GroupChecker public: GroupChecker( uint t, bool d ) : - totalMsgsPublished(t), totalMsgsConsumed(0), allowDuplicates(d), + totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), duplicateMsgs(0) {} bool checkSequence( const std::string& groupId, @@ -165,6 +170,8 @@ public: { qpid::sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence); + GroupStatistics::iterator gs = statistics.find(groupId); if (gs == statistics.end()) { statistics[groupId][client] = 1; @@ -176,19 +183,33 @@ public: if (s == sequenceMap.end()) { sequenceMap[groupId] = 1; totalMsgsConsumed++; + QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence); return sequence == 0; } if (sequence < s->second) { duplicateMsgs++; + QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence); return allowDuplicates; } totalMsgsConsumed++; return sequence == s->second++; } - bool eraseGroup( const std::string& groupId ) + void sendingSequence( const std::string& groupId, + uint sequence, bool eos, + const std::string& client ) { qpid::sys::Mutex::ScopedLock l(lock); + ++totalMsgsPublished; + + QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence << + ((eos) ? " (last)" : "")); + } + + bool eraseGroup( const std::string& groupId, const std::string& name ) + { + qpid::sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")"); return sequenceMap.erase( groupId ) == 1; } @@ -201,7 +222,7 @@ public: bool allMsgsConsumed() // true when done processing msgs { qpid::sys::Mutex::ScopedLock l(lock); - return totalMsgsConsumed == totalMsgsPublished; + return totalMsgsConsumed == totalMsgs; } uint getConsumedTotal() @@ -210,6 +231,12 @@ public: return totalMsgsConsumed; } + uint getPublishedTotal() + { + qpid::sys::Mutex::ScopedLock l(lock); + return totalMsgsPublished; + } + ostream& print(ostream& out) { qpid::sys::Mutex::ScopedLock l(lock); @@ -314,7 +341,7 @@ public: testFailed( msg.str() ); break; } else if (eof) { - if (!checker.eraseGroup( groupId )) { + if (!checker.eraseGroup( groupId, name )) { ostringstream msg; msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq; testFailed( msg.str() ); @@ -347,8 +374,10 @@ public: class Producer : public Client { + GroupChecker& checker; + public: - Producer(const std::string& n, const Options& o) : Client(n, o) {}; + Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n, o), checker(c) {}; virtual ~Producer() {}; void run() @@ -367,7 +396,7 @@ public: uint groupSeq = 0; uint groupSize = opts.groupSize; ostringstream group; - group << name << sent; + group << name << ":" << sent; std::string groupId(group.str()); while (!stopped && sent < opts.messages) { @@ -375,11 +404,12 @@ public: msg.getProperties()[opts.groupKey] = groupId; msg.getProperties()[SN] = groupSeq++; msg.getProperties()[EOS] = false; + checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name ); if (groupSeq == groupSize) { msg.getProperties()[EOS] = true; // generate new group ostringstream nextGroupId; - nextGroupId << name << sent; + nextGroupId << name << ":" << sent; groupId = nextGroupId.str(); groupSeq = 0; if (opts.randomizeSize) { @@ -424,7 +454,7 @@ int main(int argc, char ** argv) for (size_t j = 0; j < opts.senders; ++j) { ostringstream name; name << "P_" << j; - clients.push_back(Client::shared_ptr(new Producer( name.str(), opts ))); + clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state ))); clients.back()->getThread() = qpid::sys::Thread(*clients.back()); } for (size_t j = 0; j < opts.receivers; ++j) { @@ -435,11 +465,11 @@ int main(int argc, char ** argv) } // wait for all pubs/subs to finish.... or for consumers to fail or stall. - uint lastCount; + uint stalledTime = 0; bool done; bool clientFailed = false; do { - lastCount = state.getConsumedTotal(); + uint lastCount = state.getConsumedTotal(); qpid::sys::usleep( 1000000 ); // check each client for status @@ -450,16 +480,31 @@ int main(int argc, char ** argv) std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl; clientFailed = true; done = true; - break; + break; // exit test. } else if ((*i)->getState() != Client::DONE) { done = false; } } - } while (!done && lastCount != state.getConsumedTotal()); + + if (!done) { + // check that consumers are still receiving messages + if (lastCount == state.getConsumedTotal()) + stalledTime++; + else { + lastCount = state.getConsumedTotal(); + stalledTime = 0; + } + } + + QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << + " Published to date = " << state.getPublishedTotal() << + " total=" << opts.senders * opts.messages ); + + } while (!done && stalledTime < opts.timeout); if (clientFailed) { status = 1; - } else if (!state.allMsgsConsumed()) { + } else if (stalledTime >= opts.timeout) { std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl; status = 2; } diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests index 6c429939bf..cae614ccff 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -18,9 +18,9 @@ # specific language governing permissions and limitations # under the License. # -#script to run a sequence of ring queue tests via make +#script to run a sequence of message group queue tests via make -#setup path to find qpid-config and sender/receiver test progs +#setup path to find qpid-config and msg_group_test progs source ./test_env.sh export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH @@ -28,33 +28,39 @@ export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH #set port to connect to via env var test -s qpidd.port && QPID_PORT=`cat qpidd.port` -trap cleanup INT TERM QUIT +#trap cleanup INT TERM QUIT QUEUE_NAME="group-queue" GROUP_KEY="My-Group-Id" -BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" +BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" -setup() { - qpid-config $BROKER_URL add queue $QUEUE_NAME --argument="qpid.group_header_key=${GROUP_KEY}" +run_test() { + $@ } -cleanup() { - qpid-config $BROKER_URL del queue $QUEUE_NAME --force -} +##set -x -run_test() { +declare -i i=0 +declare -a tests +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size" + "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size" + "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79" + "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") - msg_group_test -a $QUEUE_NAME --group-key $GROUP_KEY --capacity 3 --group-size 13 --ack-frequency 7 --messages 103 --receivers 3 --senders 5 +while [ -n "${tests[i]}" ]; do + run_test ${tests[i]} RETCODE=$? - cleanup - if test x$RETCODE != x0; then - echo "FAIL message group tests"; exit 1; + echo "FAILED message group test. Failed command: \"${tests[i]}\""; + exit 1; fi -} - -setup -run_test - - + i+=1 +done diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak new file mode 100755 index 0000000000..4d288758bd --- /dev/null +++ b/qpid/cpp/src/tests/run_msg_group_tests_soak @@ -0,0 +1,60 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#script to run a sequence of long-running message group tests via make + +#setup path to find qpid-config and msg_group_test test progs +source ./test_env.sh + +export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH + +#set port to connect to via env var +test -s qpidd.port && QPID_PORT=`cat qpidd.port` + +#trap cleanup INT TERM QUIT + +QUEUE_NAME="group-queue" +GROUP_KEY="My-Group-Id" + +BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" + +run_test() { + $@ +} + +##set -x + +declare -i i=0 +declare -a tests +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47" + "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") + +while [ -n "${tests[i]}" ]; do + run_test ${tests[i]} + RETCODE=$? + if test x$RETCODE != x0; then + echo "FAILED message group test. Failed command: \"${tests[i]}\""; + exit 1; + fi + i+=1 +done |