diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-11-04 17:39:49 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-11-04 17:39:49 +0000 |
commit | f1f1b41c39982ab393b73a099a8e479ee6251bd2 (patch) | |
tree | 866aa5e22c150958698dd9b95728569028fd0b38 | |
parent | e2d694d60d2d653fef29a0d715dda83600425f2f (diff) | |
download | qpid-python-f1f1b41c39982ab393b73a099a8e479ee6251bd2.tar.gz |
QPID-3564: enhance message group generator to allow queue fill/drain tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1197686 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/msg_group_test.cpp | 86 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests_soak | 2 |
3 files changed, 59 insertions, 31 deletions
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp index 6b9d09b89a..4f54e3ee53 100644 --- a/qpid/cpp/src/tests/msg_group_test.cpp +++ b/qpid/cpp/src/tests/msg_group_test.cpp @@ -126,6 +126,8 @@ struct Options : public qpid::Options try { qpid::Options::parse(argc, argv); if (address.empty()) throw qpid::Exception("Address must be specified!"); + if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?"); + if (messages == 0) throw qpid::Exception("The message count cannot be zero."); qpid::log::Logger::instance().configure(log); if (help) { std::ostringstream msg; @@ -152,7 +154,9 @@ class GroupChecker { qpid::sys::Mutex lock; - const uint totalMsgs; + uint consumerCt; + uint producerCt; + uint totalMsgs; uint totalMsgsConsumed; uint totalMsgsPublished; bool allowDuplicates; @@ -169,9 +173,18 @@ class GroupChecker public: - GroupChecker( uint t, bool d ) : - totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), - duplicateMsgs(0) {} + GroupChecker( uint messages, uint consumers, uint producers, bool d) : + consumerCt(consumers), producerCt(producers), + totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), + duplicateMsgs(0) + { + // if consumering only - we a draining a queue of 'messages' queued messages. + if (producerCt != 0) { + totalMsgs = producers * messages; + } else { + totalMsgs = messages; + } + } bool checkSequence( const std::string& groupId, uint sequence, const std::string& client ) @@ -227,12 +240,22 @@ public: return sequenceMap[groupId]; } - bool allMsgsConsumed() // true when done processing msgs + bool allMsgsPublished() // true when done publishing msgs { qpid::sys::Mutex::ScopedLock l(lock); - return (totalMsgsPublished >= totalMsgs) && - (totalMsgsConsumed >= totalMsgsPublished) && - sequenceMap.size() == 0; + return (producerCt == 0 || totalMsgsPublished >= totalMsgs); + } + + bool allMsgsConsumed() // true when done consuming msgs + { + qpid::sys::Mutex::ScopedLock l(lock); + return (consumerCt == 0 || + (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0)); + } + + uint getTotalMessages() + { + return totalMsgs; } uint getConsumedTotal() @@ -533,7 +556,9 @@ int main(int argc, char ** argv) Options opts; if (opts.parse(argc, argv)) { - GroupChecker state( opts.senders * opts.messages, + GroupChecker state( opts.messages, + opts.receivers, + opts.senders, opts.allowDuplicates); std::vector<Client::shared_ptr> clients; @@ -555,48 +580,47 @@ int main(int argc, char ** argv) // wait for all pubs/subs to finish.... or for consumers to fail or stall. uint stalledTime = 0; - bool done; bool clientFailed = false; - do { - uint lastCount = state.getConsumedTotal(); + while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) { + uint lastCount; + + lastCount = state.getConsumedTotal(); qpid::sys::usleep( 1000000 ); - // check each client for status - done = true; + // check each client for failures for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); i != clients.end(); ++i) { QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); if ((*i)->getState() == Client::FAILURE) { QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); clientFailed = true; - done = true; break; // exit test. - } else if ((*i)->getState() != Client::DONE) { - done = false; } } - if (!done) { - // check that consumers are still receiving messages - if (lastCount == state.getConsumedTotal()) - stalledTime++; - else { - lastCount = state.getConsumedTotal(); + // check for stalled consumers + if (!clientFailed && !state.allMsgsConsumed()) { + if (lastCount == state.getConsumedTotal()) { + if (++stalledTime >= opts.timeout) { + clientFailed = true; + break; // exit test + } + } else { stalledTime = 0; } } - QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << " Published to date = " << state.getPublishedTotal() << - " total=" << opts.senders * opts.messages ); - - } while (!done && stalledTime < opts.timeout); + " total=" << state.getTotalMessages()); + } if (clientFailed) { - status = 1; - } else if (stalledTime >= opts.timeout) { - QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); - status = 2; + if (stalledTime >= opts.timeout) { + QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); + status = 2; + } else { + status = 1; + } } // Wait for started threads. diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests index e2fec2d170..5a6da546f3 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -54,6 +54,8 @@ tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_ "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --group-size 1 --receivers 0 --senders 1" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --receivers 5 --senders 0" "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") while [ -n "${tests[i]}" ]; do diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak index d9d4cc227f..44995423cc 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests_soak +++ b/qpid/cpp/src/tests/run_msg_group_tests_soak @@ -48,6 +48,8 @@ tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 40000 --receivers 0 --senders 5 --group-size 13 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 200000 --receivers 3 --senders 0 --capacity 23 --ack-frequency 7" "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") while [ -n "${tests[i]}" ]; do |