summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-11-04 17:39:49 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-11-04 17:39:49 +0000
commitf1f1b41c39982ab393b73a099a8e479ee6251bd2 (patch)
tree866aa5e22c150958698dd9b95728569028fd0b38
parente2d694d60d2d653fef29a0d715dda83600425f2f (diff)
downloadqpid-python-f1f1b41c39982ab393b73a099a8e479ee6251bd2.tar.gz
QPID-3564: enhance message group generator to allow queue fill/drain tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1197686 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/msg_group_test.cpp86
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests2
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests_soak2
3 files changed, 59 insertions, 31 deletions
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp
index 6b9d09b89a..4f54e3ee53 100644
--- a/qpid/cpp/src/tests/msg_group_test.cpp
+++ b/qpid/cpp/src/tests/msg_group_test.cpp
@@ -126,6 +126,8 @@ struct Options : public qpid::Options
try {
qpid::Options::parse(argc, argv);
if (address.empty()) throw qpid::Exception("Address must be specified!");
+ if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?");
+ if (messages == 0) throw qpid::Exception("The message count cannot be zero.");
qpid::log::Logger::instance().configure(log);
if (help) {
std::ostringstream msg;
@@ -152,7 +154,9 @@ class GroupChecker
{
qpid::sys::Mutex lock;
- const uint totalMsgs;
+ uint consumerCt;
+ uint producerCt;
+ uint totalMsgs;
uint totalMsgsConsumed;
uint totalMsgsPublished;
bool allowDuplicates;
@@ -169,9 +173,18 @@ class GroupChecker
public:
- GroupChecker( uint t, bool d ) :
- totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
- duplicateMsgs(0) {}
+ GroupChecker( uint messages, uint consumers, uint producers, bool d) :
+ consumerCt(consumers), producerCt(producers),
+ totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
+ duplicateMsgs(0)
+ {
+ // if consumering only - we a draining a queue of 'messages' queued messages.
+ if (producerCt != 0) {
+ totalMsgs = producers * messages;
+ } else {
+ totalMsgs = messages;
+ }
+ }
bool checkSequence( const std::string& groupId,
uint sequence, const std::string& client )
@@ -227,12 +240,22 @@ public:
return sequenceMap[groupId];
}
- bool allMsgsConsumed() // true when done processing msgs
+ bool allMsgsPublished() // true when done publishing msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
- return (totalMsgsPublished >= totalMsgs) &&
- (totalMsgsConsumed >= totalMsgsPublished) &&
- sequenceMap.size() == 0;
+ return (producerCt == 0 || totalMsgsPublished >= totalMsgs);
+ }
+
+ bool allMsgsConsumed() // true when done consuming msgs
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (consumerCt == 0 ||
+ (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0));
+ }
+
+ uint getTotalMessages()
+ {
+ return totalMsgs;
}
uint getConsumedTotal()
@@ -533,7 +556,9 @@ int main(int argc, char ** argv)
Options opts;
if (opts.parse(argc, argv)) {
- GroupChecker state( opts.senders * opts.messages,
+ GroupChecker state( opts.messages,
+ opts.receivers,
+ opts.senders,
opts.allowDuplicates);
std::vector<Client::shared_ptr> clients;
@@ -555,48 +580,47 @@ int main(int argc, char ** argv)
// wait for all pubs/subs to finish.... or for consumers to fail or stall.
uint stalledTime = 0;
- bool done;
bool clientFailed = false;
- do {
- uint lastCount = state.getConsumedTotal();
+ while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) {
+ uint lastCount;
+
+ lastCount = state.getConsumedTotal();
qpid::sys::usleep( 1000000 );
- // check each client for status
- done = true;
+ // check each client for failures
for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
i != clients.end(); ++i) {
QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
if ((*i)->getState() == Client::FAILURE) {
QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
clientFailed = true;
- done = true;
break; // exit test.
- } else if ((*i)->getState() != Client::DONE) {
- done = false;
}
}
- if (!done) {
- // check that consumers are still receiving messages
- if (lastCount == state.getConsumedTotal())
- stalledTime++;
- else {
- lastCount = state.getConsumedTotal();
+ // check for stalled consumers
+ if (!clientFailed && !state.allMsgsConsumed()) {
+ if (lastCount == state.getConsumedTotal()) {
+ if (++stalledTime >= opts.timeout) {
+ clientFailed = true;
+ break; // exit test
+ }
+ } else {
stalledTime = 0;
}
}
-
QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
" Published to date = " << state.getPublishedTotal() <<
- " total=" << opts.senders * opts.messages );
-
- } while (!done && stalledTime < opts.timeout);
+ " total=" << state.getTotalMessages());
+ }
if (clientFailed) {
- status = 1;
- } else if (stalledTime >= opts.timeout) {
- QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
- status = 2;
+ if (stalledTime >= opts.timeout) {
+ QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
+ status = 2;
+ } else {
+ status = 1;
+ }
}
// Wait for started threads.
diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests
index e2fec2d170..5a6da546f3 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests
+++ b/qpid/cpp/src/tests/run_msg_group_tests
@@ -54,6 +54,8 @@ tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_
"qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --group-size 1 --receivers 0 --senders 1"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --receivers 5 --senders 0"
"qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
while [ -n "${tests[i]}" ]; do
diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak
index d9d4cc227f..44995423cc 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests_soak
+++ b/qpid/cpp/src/tests/run_msg_group_tests_soak
@@ -48,6 +48,8 @@ tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 40000 --receivers 0 --senders 5 --group-size 13 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 200000 --receivers 3 --senders 0 --capacity 23 --ack-frequency 7"
"qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
while [ -n "${tests[i]}" ]; do