diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-31 20:39:51 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-31 20:39:51 +0000 |
commit | 5d6f0702236ceb40e0118a20021caa1df36e5afb (patch) | |
tree | 9aaae233cae1cbf8e348a265780d9b3102ccad77 | |
parent | e0c2638285f77dc6f024ff91cfaef583c4d69be7 (diff) | |
download | qpid-python-5d6f0702236ceb40e0118a20021caa1df36e5afb.tar.gz |
QPID-3346: add functional test, bugfix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1163809 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 11 | ||||
-rw-r--r-- | qpid/cpp/src/tests/msg_group_test.cpp | 481 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests | 60 |
7 files changed, 579 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 75deec5b27..2af9b0c121 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -36,6 +36,8 @@ class Consumer { // inListeners allows QueueListeners to efficiently track if this instance is registered // for notifications without having to search its containers bool inListeners; + // the name is generated by broker and is unique within broker scope. It is not + // provided or known by the remote Consumer. const std::string name; public: typedef boost::shared_ptr<Consumer> shared_ptr; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 24d6607ac1..52aad75748 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } +namespace { + const std::string SEPARATOR("::"); +} + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). + // Create a globally unique name so the broker can identify individual consumers + std::string name = session.getSessionId().str() + SEPARATOR + tag; + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -263,6 +270,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool ack, bool _acquire, bool _exclusive, + const string& _tag, const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments @@ -278,6 +286,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, windowing(true), exclusive(_exclusive), resumeId(_resumeId), + tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), msgCredit(0), @@ -294,7 +303,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(), + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -326,7 +335,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); @@ -364,7 +373,7 @@ struct ConsumerName { }; ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " + return o << pc.consumer.getTag() << " on " << pc.consumer.getParent().getSession().getSessionId(); } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 8947e1e35f..1cff46f369 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -82,6 +82,7 @@ class SemanticState : private boost::noncopyable { bool windowing; bool exclusive; std::string resumeId; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command uint64_t resumeTtl; framing::FieldTable arguments; uint32_t msgCredit; @@ -98,10 +99,11 @@ class SemanticState : private boost::noncopyable { public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - ConsumerImpl(SemanticState* parent, + ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); @@ -135,6 +137,7 @@ class SemanticState : private boost::noncopyable { uint32_t getMsgCredit() const { return msgCredit; } uint32_t getByteCredit() const { return byteCredit; } std::string getResumeId() const { return resumeId; }; + const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index afe5b8ac3a..f4784297a7 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -410,8 +410,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); + QPID_LOG(debug, *this << " updating output task " << ci->getTag() << " channel=" << channel); } @@ -509,13 +509,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), + arg::destination = ci->getTag(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -523,18 +523,18 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), + ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getTag() << " on " << shadowSession.getId()); } diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index ed97c41bff..e0b0837701 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -291,13 +291,19 @@ qpid_stream_INCLUDES=$(PUBLIC_INCLUDES) qpid_stream_SOURCES=qpid-stream.cpp qpid_stream_LDADD=$(lib_messaging) +check_PROGRAMS+=msg_group_test +msg_group_test_INCLUDES=$(PUBLIC_INCLUDES) +msg_group_test_SOURCES=msg_group_test.cpp +msg_group_test_LDADD=$(lib_messaging) + TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ $(srcdir)/run_test -system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest +system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ + run_msg_group_tests TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ run_queue_flow_limit_tests @@ -340,7 +346,8 @@ EXTRA_DIST += \ start_broker.ps1 \ stop_broker.ps1 \ topictest.ps1 \ - run_queue_flow_limit_tests + run_queue_flow_limit_tests \ + run_msg_group_tests check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp new file mode 100644 index 0000000000..f1636bc0d8 --- /dev/null +++ b/qpid/cpp/src/tests/msg_group_test.cpp @@ -0,0 +1,481 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/FailoverUpdates.h> +#include <qpid/Options.h> +#include <qpid/log/Logger.h> +#include <qpid/log/Options.h> +#include "qpid/sys/Time.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/SystemInfo.h" + +#include <iostream> +#include <memory> +#include <stdlib.h> + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; + +namespace qpid { +namespace tests { + +struct Options : public qpid::Options +{ + bool help; + std::string url; + std::string address; + std::string connectionOptions; + uint messages; + uint capacity; + uint ackFrequency; + bool failoverUpdates; + qpid::log::Options log; + uint senders; + uint receivers; + uint groupSize; + bool printReport; + std::string groupKey; + bool durable; + bool allowDuplicates; + bool randomizeSize; + bool stickyConsumer; + + Options(const std::string& argv0=std::string()) + : qpid::Options("Options"), + help(false), + url("amqp:tcp:127.0.0.1"), + messages(10000), + capacity(1000), + ackFrequency(100), + failoverUpdates(false), + log(argv0), + senders(2), + receivers(2), + groupSize(10), + printReport(false), + groupKey("qpid.no_group"), + durable(false), + allowDuplicates(false), + randomizeSize(false), + stickyConsumer(false) + { + addOptions() + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") + ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages") + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.") + ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.") + ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.") + ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.") + ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") + ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") + ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") + ("print-report", qpid::optValue(printReport, "yes|no"), "Dump message group statistics to stdout.") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") + //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + } + + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + if (address.empty()) throw qpid::Exception("Address must be specified!"); + qpid::log::Logger::instance().configure(log); + if (help) { + std::ostringstream msg; + std::cout << msg << *this << std::endl << std::endl + << "Verifies the behavior of grouped messages." << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } +}; + +const string EOS("eos"); +const string SN("sn"); + + +// class that monitors group state across all publishers and consumers. tracks the next +// expected sequence for each group, and total messages consumed. +class GroupChecker +{ + qpid::sys::Mutex lock; + + const uint totalMsgsPublished; + uint totalMsgsConsumed; + bool allowDuplicates; + uint duplicateMsgs; + + typedef std::map<std::string, uint> SequenceMap; + SequenceMap sequenceMap; + + // Statistics - for each group, store the names of all clients that consumed messages + // from that group, and the number of messages consumed per client. + typedef std::map<std::string, uint> ClientCounter; + typedef std::map<std::string, ClientCounter> GroupStatistics; + GroupStatistics statistics; + +public: + + GroupChecker( uint t, bool d ) : + totalMsgsPublished(t), totalMsgsConsumed(0), allowDuplicates(d), + duplicateMsgs(0) {} + + bool checkSequence( const std::string& groupId, + uint sequence, const std::string& client ) + { + qpid::sys::Mutex::ScopedLock l(lock); + + GroupStatistics::iterator gs = statistics.find(groupId); + if (gs == statistics.end()) { + statistics[groupId][client] = 1; + } else { + gs->second[client]++; + } + // now verify + SequenceMap::iterator s = sequenceMap.find(groupId); + if (s == sequenceMap.end()) { + sequenceMap[groupId] = 1; + totalMsgsConsumed++; + return sequence == 0; + } + if (sequence < s->second) { + duplicateMsgs++; + return allowDuplicates; + } + totalMsgsConsumed++; + return sequence == s->second++; + } + + bool eraseGroup( const std::string& groupId ) + { + qpid::sys::Mutex::ScopedLock l(lock); + return sequenceMap.erase( groupId ) == 1; + } + + uint getNextExpectedSequence( const std::string& groupId ) + { + qpid::sys::Mutex::ScopedLock l(lock); + return sequenceMap[groupId]; + } + + bool allMsgsConsumed() // true when done processing msgs + { + qpid::sys::Mutex::ScopedLock l(lock); + return totalMsgsConsumed == totalMsgsPublished; + } + + uint getConsumedTotal() + { + qpid::sys::Mutex::ScopedLock l(lock); + return totalMsgsConsumed; + } + + ostream& print(ostream& out) + { + qpid::sys::Mutex::ScopedLock l(lock); + out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed << + ", Duplicates detected: " << duplicateMsgs << std::endl; + out << "Total Groups: " << statistics.size() << std::endl; + unsigned long consumers = 0; + for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) { + out << " GroupId: " << gs->first; + consumers += gs->second.size(); // # of consumers that processed this group + if (gs->second.size() == 1) + out << " completely consumed by a single client." << std::endl; + else + out << " consumed by " << gs->second.size() << " different clients." << std::endl; + + for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) { + out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl; + } + } + out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl; + return out; + } +}; + + +namespace { + // rand() is not thread safe. Create a singleton obj to hold a lock while calling + // rand() so it can be called safely by multiple concurrent clients. + class Randomizer { + qpid::sys::Mutex lock; + public: + uint operator()(uint max) { + qpid::sys::Mutex::ScopedLock l(lock); + return (rand() % max) + 1; + } + }; + + static Randomizer randomize; +} + + +class Client : public qpid::sys::Runnable +{ +public: + typedef boost::shared_ptr<Client> shared_ptr; + enum State {ACTIVE, DONE, FAILURE}; + Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {} + virtual ~Client() {} + State getState() { return state; } + void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; } + void clientDone() { if (state == ACTIVE) state = DONE; } + qpid::sys::Thread& getThread() { return thread; } + const std::string getErrorMsg() { return error.str(); } + void stop() {stopped = true;} + +protected: + const std::string name; + const Options& opts; + qpid::sys::Thread thread; + ostringstream error; + State state; + bool stopped; +}; + + +class Consumer : public Client +{ + GroupChecker& checker; + +public: + Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {}; + virtual ~Consumer() {}; + + void run() + { + Connection connection; + try { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = connection.createSession(); + Receiver receiver = session.createReceiver(opts.address); + receiver.setCapacity(opts.capacity); + Message msg; + uint count = 0; + + while (!stopped && !checker.allMsgsConsumed()) { + + if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved + + qpid::types::Variant::Map& properties = msg.getProperties(); + + std::string groupId = properties[opts.groupKey]; + uint groupSeq = properties[SN]; + bool eof = properties[EOS]; + + qpid::sys::usleep(10); + + if (!checker.checkSequence( groupId, groupSeq, name )) { + ostringstream msg; + msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId ); + testFailed( msg.str() ); + break; + } else if (eof) { + if (!checker.eraseGroup( groupId )) { + ostringstream msg; + msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq; + testFailed( msg.str() ); + break; + } + } + + ++count; + if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { + session.acknowledge(); + } + // Clear out message properties & content for next iteration. + msg = Message(); // TODO aconway 2010-12-01: should be done by fetch + } + } + session.acknowledge(); + session.close(); + connection.close(); + } catch(const std::exception& error) { + ostringstream msg; + msg << "consumer error: " << error.what(); + testFailed( msg.str() ); + connection.close(); + } + clientDone(); + } +}; + + + +class Producer : public Client +{ +public: + Producer(const std::string& n, const Options& o) : Client(n, o) {}; + virtual ~Producer() {}; + + void run() + { + Connection connection; + try { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = connection.createSession(); + Sender sender = session.createSender(opts.address); + if (opts.capacity) sender.setCapacity(opts.capacity); + Message msg; + msg.setDurable(opts.durable); + uint sent = 0; + uint groupSeq = 0; + uint groupSize = opts.groupSize; + ostringstream group; + group << name << sent; + std::string groupId(group.str()); + + while (!stopped && sent < opts.messages) { + ++sent; + msg.getProperties()[opts.groupKey] = groupId; + msg.getProperties()[SN] = groupSeq++; + msg.getProperties()[EOS] = false; + if (groupSeq == groupSize) { + msg.getProperties()[EOS] = true; + // generate new group + ostringstream nextGroupId; + nextGroupId << name << sent; + groupId = nextGroupId.str(); + groupSeq = 0; + if (opts.randomizeSize) { + groupSize = randomize(opts.groupSize); + } + } + sender.send(msg); + qpid::sys::usleep(10); + } + session.sync(); + session.close(); + connection.close(); + } catch(const std::exception& error) { + ostringstream msg; + msg << "producer '" << name << "' error: " << error.what(); + testFailed(msg.str()); + connection.close(); + } + clientDone(); + } +}; + + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char ** argv) +{ + int status = 0; + try { + Options opts; + if (opts.parse(argc, argv)) { + + GroupChecker state( opts.senders * opts.messages, + opts.allowDuplicates); + std::vector<Client::shared_ptr> clients; + + if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); + + // fire off the producers && consumers + for (size_t j = 0; j < opts.senders; ++j) { + ostringstream name; + name << "P_" << j; + clients.push_back(Client::shared_ptr(new Producer( name.str(), opts ))); + clients.back()->getThread() = qpid::sys::Thread(*clients.back()); + } + for (size_t j = 0; j < opts.receivers; ++j) { + ostringstream name; + name << "C_" << j; + clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state ))); + clients.back()->getThread() = qpid::sys::Thread(*clients.back()); + } + + // wait for all pubs/subs to finish.... or for consumers to fail or stall. + uint lastCount; + bool done; + bool clientFailed = false; + do { + lastCount = state.getConsumedTotal(); + qpid::sys::usleep( 1000000 ); + + // check each client for status + done = true; + for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); + i != clients.end(); ++i) { + if ((*i)->getState() == Client::FAILURE) { + std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl; + clientFailed = true; + done = true; + break; + } else if ((*i)->getState() != Client::DONE) { + done = false; + } + } + } while (!done && lastCount != state.getConsumedTotal()); + + if (clientFailed) { + status = 1; + } else if (!state.allMsgsConsumed()) { + std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl; + status = 2; + } + + // Wait for started threads. + for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); + i != clients.end(); ++i) { + (*i)->stop(); + (*i)->getThread().join(); + } + + if (opts.printReport && !status) state.print(std::cout); + } + } catch(const std::exception& error) { + std::cerr << argv[0] << ": " << error.what() << std::endl; + status = 3; + } + return status; +} diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests new file mode 100755 index 0000000000..6c429939bf --- /dev/null +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -0,0 +1,60 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#script to run a sequence of ring queue tests via make + +#setup path to find qpid-config and sender/receiver test progs +source ./test_env.sh + +export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH + +#set port to connect to via env var +test -s qpidd.port && QPID_PORT=`cat qpidd.port` + +trap cleanup INT TERM QUIT + +QUEUE_NAME="group-queue" +GROUP_KEY="My-Group-Id" + +BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" + +setup() { + qpid-config $BROKER_URL add queue $QUEUE_NAME --argument="qpid.group_header_key=${GROUP_KEY}" +} + +cleanup() { + qpid-config $BROKER_URL del queue $QUEUE_NAME --force +} + +run_test() { + + msg_group_test -a $QUEUE_NAME --group-key $GROUP_KEY --capacity 3 --group-size 13 --ack-frequency 7 --messages 103 --receivers 3 --senders 5 + RETCODE=$? + cleanup + + if test x$RETCODE != x0; then + echo "FAIL message group tests"; exit 1; + fi +} + +setup +run_test + + |