diff options
Diffstat (limited to 'cpp/src/tests/msg_group_test.cpp')
-rw-r--r-- | cpp/src/tests/msg_group_test.cpp | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/cpp/src/tests/msg_group_test.cpp b/cpp/src/tests/msg_group_test.cpp index 6b9d09b89a..4f54e3ee53 100644 --- a/cpp/src/tests/msg_group_test.cpp +++ b/cpp/src/tests/msg_group_test.cpp @@ -126,6 +126,8 @@ struct Options : public qpid::Options try { qpid::Options::parse(argc, argv); if (address.empty()) throw qpid::Exception("Address must be specified!"); + if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?"); + if (messages == 0) throw qpid::Exception("The message count cannot be zero."); qpid::log::Logger::instance().configure(log); if (help) { std::ostringstream msg; @@ -152,7 +154,9 @@ class GroupChecker { qpid::sys::Mutex lock; - const uint totalMsgs; + uint consumerCt; + uint producerCt; + uint totalMsgs; uint totalMsgsConsumed; uint totalMsgsPublished; bool allowDuplicates; @@ -169,9 +173,18 @@ class GroupChecker public: - GroupChecker( uint t, bool d ) : - totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), - duplicateMsgs(0) {} + GroupChecker( uint messages, uint consumers, uint producers, bool d) : + consumerCt(consumers), producerCt(producers), + totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), + duplicateMsgs(0) + { + // if consumering only - we a draining a queue of 'messages' queued messages. + if (producerCt != 0) { + totalMsgs = producers * messages; + } else { + totalMsgs = messages; + } + } bool checkSequence( const std::string& groupId, uint sequence, const std::string& client ) @@ -227,12 +240,22 @@ public: return sequenceMap[groupId]; } - bool allMsgsConsumed() // true when done processing msgs + bool allMsgsPublished() // true when done publishing msgs { qpid::sys::Mutex::ScopedLock l(lock); - return (totalMsgsPublished >= totalMsgs) && - (totalMsgsConsumed >= totalMsgsPublished) && - sequenceMap.size() == 0; + return (producerCt == 0 || totalMsgsPublished >= totalMsgs); + } + + bool allMsgsConsumed() // true when done consuming msgs + { + qpid::sys::Mutex::ScopedLock l(lock); + return (consumerCt == 0 || + (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0)); + } + + uint getTotalMessages() + { + return totalMsgs; } uint getConsumedTotal() @@ -533,7 +556,9 @@ int main(int argc, char ** argv) Options opts; if (opts.parse(argc, argv)) { - GroupChecker state( opts.senders * opts.messages, + GroupChecker state( opts.messages, + opts.receivers, + opts.senders, opts.allowDuplicates); std::vector<Client::shared_ptr> clients; @@ -555,48 +580,47 @@ int main(int argc, char ** argv) // wait for all pubs/subs to finish.... or for consumers to fail or stall. uint stalledTime = 0; - bool done; bool clientFailed = false; - do { - uint lastCount = state.getConsumedTotal(); + while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) { + uint lastCount; + + lastCount = state.getConsumedTotal(); qpid::sys::usleep( 1000000 ); - // check each client for status - done = true; + // check each client for failures for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); i != clients.end(); ++i) { QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); if ((*i)->getState() == Client::FAILURE) { QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); clientFailed = true; - done = true; break; // exit test. - } else if ((*i)->getState() != Client::DONE) { - done = false; } } - if (!done) { - // check that consumers are still receiving messages - if (lastCount == state.getConsumedTotal()) - stalledTime++; - else { - lastCount = state.getConsumedTotal(); + // check for stalled consumers + if (!clientFailed && !state.allMsgsConsumed()) { + if (lastCount == state.getConsumedTotal()) { + if (++stalledTime >= opts.timeout) { + clientFailed = true; + break; // exit test + } + } else { stalledTime = 0; } } - QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << " Published to date = " << state.getPublishedTotal() << - " total=" << opts.senders * opts.messages ); - - } while (!done && stalledTime < opts.timeout); + " total=" << state.getTotalMessages()); + } if (clientFailed) { - status = 1; - } else if (stalledTime >= opts.timeout) { - QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); - status = 2; + if (stalledTime >= opts.timeout) { + QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); + status = 2; + } else { + status = 1; + } } // Wait for started threads. |