/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include #include #include #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/SystemInfo.h" #include #include #include using namespace qpid::messaging; using namespace qpid::types; using namespace std; namespace qpid { namespace tests { struct Options : public qpid::Options { bool help; std::string url; std::string address; std::string connectionOptions; uint messages; uint capacity; uint ackFrequency; bool failoverUpdates; qpid::log::Options log; uint senders; uint receivers; uint groupSize; bool printReport; std::string groupKey; bool durable; bool allowDuplicates; bool randomizeSize; bool stickyConsumer; uint timeout; uint interleave; std::string prefix; uint sendRate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), messages(10000), capacity(1000), ackFrequency(100), failoverUpdates(false), log(argv0), senders(2), receivers(2), groupSize(10), printReport(false), groupKey("qpid.no_group"), durable(false), allowDuplicates(false), randomizeSize(false), stickyConsumer(false), timeout(10), interleave(1), sendRate(0) { addOptions() ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from") ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages") ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.") ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.") ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.") ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.") ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.") ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.") ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.") ("help", qpid::optValue(help), "print this usage statement"); add(log); //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") } bool parse(int argc, char** argv) { try { qpid::Options::parse(argc, argv); if (address.empty()) throw qpid::Exception("Address must be specified!"); if (senders == 0 && receivers == 0) throw qpid::Exception("No senders and No receivers?"); if (messages == 0) throw qpid::Exception("The message count cannot be zero."); qpid::log::Logger::instance().configure(log); if (help) { std::ostringstream msg; std::cout << msg << *this << std::endl << std::endl << "Verifies the behavior of grouped messages." << std::endl; return false; } else { return true; } } catch (const std::exception& e) { std::cerr << *this << std::endl << std::endl << e.what() << std::endl; return false; } } }; const string EOS("eos"); const string SN("sn"); // class that monitors group state across all publishers and consumers. tracks the next // expected sequence for each group, and total messages consumed. class GroupChecker { qpid::sys::Mutex lock; uint consumerCt; uint producerCt; uint totalMsgs; uint totalMsgsConsumed; uint totalMsgsPublished; bool allowDuplicates; uint duplicateMsgs; typedef std::map SequenceMap; SequenceMap sequenceMap; // Statistics - for each group, store the names of all clients that consumed messages // from that group, and the number of messages consumed per client. typedef std::map ClientCounter; typedef std::map GroupStatistics; GroupStatistics statistics; public: GroupChecker( uint messages, uint consumers, uint producers, bool d) : consumerCt(consumers), producerCt(producers), totalMsgs(0), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), duplicateMsgs(0) { // if consumering only - we a draining a queue of 'messages' queued messages. if (producerCt != 0) { totalMsgs = producers * messages; } else { totalMsgs = messages; } } bool checkSequence( const std::string& groupId, uint sequence, const std::string& client ) { qpid::sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence); GroupStatistics::iterator gs = statistics.find(groupId); if (gs == statistics.end()) { statistics[groupId][client] = 1; } else { gs->second[client]++; } // now verify SequenceMap::iterator s = sequenceMap.find(groupId); if (s == sequenceMap.end()) { QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence); // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect. // in this case, set the sequence from the first msg. sequenceMap[groupId] = (allowDuplicates) ? sequence : 0; s = sequenceMap.find(groupId); } else if (sequence < s->second) { duplicateMsgs++; QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence); return allowDuplicates; } totalMsgsConsumed++; return sequence == s->second++; } void sendingSequence( const std::string& groupId, uint sequence, bool eos, const std::string& client ) { qpid::sys::Mutex::ScopedLock l(lock); ++totalMsgsPublished; QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence << ((eos) ? " (last)" : "")); } bool eraseGroup( const std::string& groupId, const std::string& name ) { qpid::sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")"); return sequenceMap.erase( groupId ) == 1; } uint getNextExpectedSequence( const std::string& groupId ) { qpid::sys::Mutex::ScopedLock l(lock); return sequenceMap[groupId]; } bool allMsgsPublished() // true when done publishing msgs { qpid::sys::Mutex::ScopedLock l(lock); return (producerCt == 0 || totalMsgsPublished >= totalMsgs); } bool allMsgsConsumed() // true when done consuming msgs { qpid::sys::Mutex::ScopedLock l(lock); return (consumerCt == 0 || (totalMsgsConsumed >= totalMsgs && sequenceMap.size() == 0)); } uint getTotalMessages() { return totalMsgs; } uint getConsumedTotal() { qpid::sys::Mutex::ScopedLock l(lock); return totalMsgsConsumed; } uint getPublishedTotal() { qpid::sys::Mutex::ScopedLock l(lock); return totalMsgsPublished; } ostream& print(ostream& out) { qpid::sys::Mutex::ScopedLock l(lock); out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed << ", Duplicates detected: " << duplicateMsgs << std::endl; out << "Total Groups: " << statistics.size() << std::endl; unsigned long consumers = 0; for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) { out << " GroupId: " << gs->first; consumers += gs->second.size(); // # of consumers that processed this group if (gs->second.size() == 1) out << " completely consumed by a single client." << std::endl; else out << " consumed by " << gs->second.size() << " different clients." << std::endl; for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) { out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl; } } out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl; return out; } }; namespace { // rand() is not thread safe. Create a singleton obj to hold a lock while calling // rand() so it can be called safely by multiple concurrent clients. class Randomizer { qpid::sys::Mutex lock; public: uint operator()(uint max) { qpid::sys::Mutex::ScopedLock l(lock); return (rand() % max) + 1; } }; static Randomizer randomizer; } // tag each generated message with a group identifer // class GroupGenerator { const std::string groupPrefix; const uint groupSize; const bool randomizeSize; const uint interleave; uint groupSuffix; uint total; struct GroupState { std::string id; const uint size; uint count; GroupState( const std::string& i, const uint s ) : id(i), size(s), count(0) {} }; typedef std::list GroupList; GroupList groups; GroupList::iterator current; // add a new group identifier to the list void newGroup() { std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); groupId << std::string(":") << groupSuffix++; uint size = (randomizeSize) ? randomizer(groupSize) : groupSize; QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this); GroupState group( groupId.str(), size ); groups.push_back( group ); } public: GroupGenerator( const std::string& prefix, const uint t, const uint size, const bool randomize, const uint i) : groupPrefix(prefix), groupSize(size), randomizeSize(randomize), interleave(i), groupSuffix(0), total(t) { QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this); for (uint i = 0; i < 1 || i < interleave; ++i) { newGroup(); } current = groups.begin(); } bool genGroup(std::string& groupId, uint& seq, bool& eos) { if (!total) return false; --total; if (current == groups.end()) current = groups.begin(); groupId = current->id; seq = current->count++; if (current->count == current->size) { QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this); eos = true; if (total >= interleave) { // need a new group to replace this one newGroup(); groups.erase(current++); } else ++current; } else { ++current; eos = total < interleave; // mark eos on the last message of each group } QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this); return true; } }; class Client : public qpid::sys::Runnable { public: typedef boost::shared_ptr shared_ptr; enum State {ACTIVE, DONE, FAILURE}; Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {} virtual ~Client() {} State getState() { return state; } void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; } void clientDone() { if (state == ACTIVE) state = DONE; } qpid::sys::Thread& getThread() { return thread; } const std::string getErrorMsg() { return error.str(); } void stop() {stopped = true;} const std::string& getName() { return name; } protected: const std::string name; const Options& opts; qpid::sys::Thread thread; ostringstream error; State state; bool stopped; }; class Consumer : public Client { GroupChecker& checker; public: Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {}; virtual ~Consumer() {}; void run() { Connection connection; try { connection = Connection(opts.url, opts.connectionOptions); connection.open(); std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.createSession(); Receiver receiver = session.createReceiver(opts.address); receiver.setCapacity(opts.capacity); Message msg; uint count = 0; while (!stopped) { if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved qpid::types::Variant::Map& properties = msg.getProperties(); std::string groupId = properties[opts.groupKey]; uint groupSeq = properties[SN]; bool eof = properties[EOS]; QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name); qpid::sys::usleep(10); if (!checker.checkSequence( groupId, groupSeq, name )) { ostringstream msg; msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId ); testFailed( msg.str() ); break; } else if (eof) { if (!checker.eraseGroup( groupId, name )) { ostringstream msg; msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq; testFailed( msg.str() ); break; } } ++count; if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } // Clear out message properties & content for next iteration. msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } else if (checker.allMsgsConsumed()) // timed out, nothing else to do? break; } session.acknowledge(); session.close(); connection.close(); } catch(const std::exception& error) { ostringstream msg; msg << "consumer error: " << error.what(); testFailed( msg.str() ); connection.close(); } clientDone(); QPID_LOG(trace, "Consuming client " << name << " completed."); } }; class Producer : public Client { GroupChecker& checker; GroupGenerator generator; public: Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n, o), checker(c), generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave ) {}; virtual ~Producer() {}; void run() { Connection connection; try { connection = Connection(opts.url, opts.connectionOptions); connection.open(); std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.createSession(); Sender sender = session.createSender(opts.address); if (opts.capacity) sender.setCapacity(opts.capacity); Message msg; msg.setDurable(opts.durable); std::string groupId; uint seq; bool eos; uint sent = 0; qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; while (!stopped && generator.genGroup(groupId, seq, eos)) { msg.getProperties()[opts.groupKey] = groupId; msg.getProperties()[SN] = seq; msg.getProperties()[EOS] = eos; checker.sendingSequence( groupId, seq, eos, name ); sender.send(msg); ++sent; if (opts.sendRate) { qpid::sys::AbsTime waitTill(start, sent*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } } session.sync(); session.close(); connection.close(); } catch(const std::exception& error) { ostringstream msg; msg << "producer '" << name << "' error: " << error.what(); testFailed(msg.str()); connection.close(); } clientDone(); QPID_LOG(trace, "Producing client " << name << " completed."); } }; }} // namespace qpid::tests using namespace qpid::tests; int main(int argc, char ** argv) { int status = 0; try { Options opts; if (opts.parse(argc, argv)) { GroupChecker state( opts.messages, opts.receivers, opts.senders, opts.allowDuplicates); std::vector clients; if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); // fire off the producers && consumers for (size_t j = 0; j < opts.senders; ++j) { ostringstream name; name << opts.prefix << "P_" << j; clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state ))); clients.back()->getThread() = qpid::sys::Thread(*clients.back()); } for (size_t j = 0; j < opts.receivers; ++j) { ostringstream name; name << opts.prefix << "C_" << j; clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state ))); clients.back()->getThread() = qpid::sys::Thread(*clients.back()); } // wait for all pubs/subs to finish.... or for consumers to fail or stall. uint stalledTime = 0; bool clientFailed = false; while (!clientFailed && (!state.allMsgsPublished() || !state.allMsgsConsumed())) { uint lastCount; lastCount = state.getConsumedTotal(); qpid::sys::usleep( 1000000 ); // check each client for failures for (std::vector::iterator i = clients.begin(); i != clients.end(); ++i) { QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); if ((*i)->getState() == Client::FAILURE) { QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); clientFailed = true; break; // exit test. } } // check for stalled consumers if (!clientFailed && !state.allMsgsConsumed()) { if (lastCount == state.getConsumedTotal()) { if (++stalledTime >= opts.timeout) { clientFailed = true; break; // exit test } } else { stalledTime = 0; } } QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << " Published to date = " << state.getPublishedTotal() << " total=" << state.getTotalMessages()); } if (clientFailed) { if (stalledTime >= opts.timeout) { QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); status = 2; } else { status = 1; } } // Wait for started threads. for (std::vector::iterator i = clients.begin(); i != clients.end(); ++i) { (*i)->stop(); (*i)->getThread().join(); } if (opts.printReport && !status) state.print(std::cout); } else status = 4; } catch(const std::exception& error) { QPID_LOG(error, argv[0] << ": " << error.what()); status = 3; } QPID_LOG(trace, "TEST DONE [" << status << "]"); return status; }