summaryrefslogtreecommitdiff
path: root/cpp/src/tests/msg_group_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/msg_group_test.cpp')
-rw-r--r--cpp/src/tests/msg_group_test.cpp86
1 files changed, 55 insertions, 31 deletions
diff --git a/cpp/src/tests/msg_group_test.cpp b/cpp/src/tests/msg_group_test.cpp
index 6b9d09b89a..4f54e3ee53 100644
--- a/cpp/src/tests/msg_group_test.cpp
+++ b/cpp/src/tests/msg_group_test.cpp
@@ -126,6 +126,8 @@ struct Options : public qpid::Options
try {
qpid::Options::parse(argc, argv);
if (address.empty()) throw qpid::Exception("Address must be specified!");
+ if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?");
+ if (messages == 0) throw qpid::Exception("The message count cannot be zero.");
qpid::log::Logger::instance().configure(log);
if (help) {
std::ostringstream msg;
@@ -152,7 +154,9 @@ class GroupChecker
{
qpid::sys::Mutex lock;
- const uint totalMsgs;
+ uint consumerCt;
+ uint producerCt;
+ uint totalMsgs;
uint totalMsgsConsumed;
uint totalMsgsPublished;
bool allowDuplicates;
@@ -169,9 +173,18 @@ class GroupChecker
public:
- GroupChecker( uint t, bool d ) :
- totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
- duplicateMsgs(0) {}
+ GroupChecker( uint messages, uint consumers, uint producers, bool d) :
+ consumerCt(consumers), producerCt(producers),
+ totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
+ duplicateMsgs(0)
+ {
+ // if consumering only - we a draining a queue of 'messages' queued messages.
+ if (producerCt != 0) {
+ totalMsgs = producers * messages;
+ } else {
+ totalMsgs = messages;
+ }
+ }
bool checkSequence( const std::string& groupId,
uint sequence, const std::string& client )
@@ -227,12 +240,22 @@ public:
return sequenceMap[groupId];
}
- bool allMsgsConsumed() // true when done processing msgs
+ bool allMsgsPublished() // true when done publishing msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
- return (totalMsgsPublished >= totalMsgs) &&
- (totalMsgsConsumed >= totalMsgsPublished) &&
- sequenceMap.size() == 0;
+ return (producerCt == 0 || totalMsgsPublished >= totalMsgs);
+ }
+
+ bool allMsgsConsumed() // true when done consuming msgs
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (consumerCt == 0 ||
+ (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0));
+ }
+
+ uint getTotalMessages()
+ {
+ return totalMsgs;
}
uint getConsumedTotal()
@@ -533,7 +556,9 @@ int main(int argc, char ** argv)
Options opts;
if (opts.parse(argc, argv)) {
- GroupChecker state( opts.senders * opts.messages,
+ GroupChecker state( opts.messages,
+ opts.receivers,
+ opts.senders,
opts.allowDuplicates);
std::vector<Client::shared_ptr> clients;
@@ -555,48 +580,47 @@ int main(int argc, char ** argv)
// wait for all pubs/subs to finish.... or for consumers to fail or stall.
uint stalledTime = 0;
- bool done;
bool clientFailed = false;
- do {
- uint lastCount = state.getConsumedTotal();
+ while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) {
+ uint lastCount;
+
+ lastCount = state.getConsumedTotal();
qpid::sys::usleep( 1000000 );
- // check each client for status
- done = true;
+ // check each client for failures
for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
i != clients.end(); ++i) {
QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
if ((*i)->getState() == Client::FAILURE) {
QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
clientFailed = true;
- done = true;
break; // exit test.
- } else if ((*i)->getState() != Client::DONE) {
- done = false;
}
}
- if (!done) {
- // check that consumers are still receiving messages
- if (lastCount == state.getConsumedTotal())
- stalledTime++;
- else {
- lastCount = state.getConsumedTotal();
+ // check for stalled consumers
+ if (!clientFailed && !state.allMsgsConsumed()) {
+ if (lastCount == state.getConsumedTotal()) {
+ if (++stalledTime >= opts.timeout) {
+ clientFailed = true;
+ break; // exit test
+ }
+ } else {
stalledTime = 0;
}
}
-
QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
" Published to date = " << state.getPublishedTotal() <<
- " total=" << opts.senders * opts.messages );
-
- } while (!done && stalledTime < opts.timeout);
+ " total=" << state.getTotalMessages());
+ }
if (clientFailed) {
- status = 1;
- } else if (stalledTime >= opts.timeout) {
- QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
- status = 2;
+ if (stalledTime >= opts.timeout) {
+ QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
+ status = 2;
+ } else {
+ status = 1;
+ }
}
// Wait for started threads.