summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-08-31 20:39:51 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-08-31 20:39:51 +0000
commit5d6f0702236ceb40e0118a20021caa1df36e5afb (patch)
tree9aaae233cae1cbf8e348a265780d9b3102ccad77
parente0c2638285f77dc6f024ff91cfaef583c4d69be7 (diff)
downloadqpid-python-5d6f0702236ceb40e0118a20021caa1df36e5afb.tar.gz
QPID-3346: add functional test, bugfix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1163809 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp18
-rw-r--r--qpid/cpp/src/tests/Makefile.am11
-rw-r--r--qpid/cpp/src/tests/msg_group_test.cpp481
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests60
7 files changed, 579 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 75deec5b27..2af9b0c121 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -36,6 +36,8 @@ class Consumer {
// inListeners allows QueueListeners to efficiently track if this instance is registered
// for notifications without having to search its containers
bool inListeners;
+ // the name is generated by broker and is unique within broker scope. It is not
+ // provided or known by the remote Consumer.
const std::string name;
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 24d6607ac1..52aad75748 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
+namespace {
+ const std::string SEPARATOR("::");
+}
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
+ // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
+ // Create a globally unique name so the broker can identify individual consumers
+ std::string name = session.getSessionId().str() + SEPARATOR + tag;
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -263,6 +270,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool ack,
bool _acquire,
bool _exclusive,
+ const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
@@ -278,6 +286,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
windowing(true),
exclusive(_exclusive),
resumeId(_resumeId),
+ tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
msgCredit(0),
@@ -294,7 +303,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(),
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -326,7 +335,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
@@ -364,7 +373,7 @@ struct ConsumerName {
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
- return o << pc.consumer.getName() << " on "
+ return o << pc.consumer.getTag() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 8947e1e35f..1cff46f369 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -82,6 +82,7 @@ class SemanticState : private boost::noncopyable {
bool windowing;
bool exclusive;
std::string resumeId;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -98,10 +99,11 @@ class SemanticState : private boost::noncopyable {
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- ConsumerImpl(SemanticState* parent,
+ ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
@@ -135,6 +137,7 @@ class SemanticState : private boost::noncopyable {
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
+ const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index afe5b8ac3a..f4784297a7 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -410,8 +410,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, *this << " updating output task " << ci->getName()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
+ QPID_LOG(debug, *this << " updating output task " << ci->getTag()
<< " channel=" << channel);
}
@@ -509,13 +509,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getName(),
+ arg::destination = ci->getTag(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -523,18 +523,18 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getName(),
+ ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
<< " on " << shadowSession.getId());
}
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index ed97c41bff..e0b0837701 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -291,13 +291,19 @@ qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
qpid_stream_SOURCES=qpid-stream.cpp
qpid_stream_LDADD=$(lib_messaging)
+check_PROGRAMS+=msg_group_test
+msg_group_test_INCLUDES=$(PUBLIC_INCLUDES)
+msg_group_test_SOURCES=msg_group_test.cpp
+msg_group_test_LDADD=$(lib_messaging)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
$(srcdir)/run_test
-system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
+system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
+ run_msg_group_tests
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
run_queue_flow_limit_tests
@@ -340,7 +346,8 @@ EXTRA_DIST += \
start_broker.ps1 \
stop_broker.ps1 \
topictest.ps1 \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests \
+ run_msg_group_tests
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp
new file mode 100644
index 0000000000..f1636bc0d8
--- /dev/null
+++ b/qpid/cpp/src/tests/msg_group_test.cpp
@@ -0,0 +1,481 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <iostream>
+#include <memory>
+#include <stdlib.h>
+
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace std;
+
+namespace qpid {
+namespace tests {
+
+struct Options : public qpid::Options
+{
+ bool help;
+ std::string url;
+ std::string address;
+ std::string connectionOptions;
+ uint messages;
+ uint capacity;
+ uint ackFrequency;
+ bool failoverUpdates;
+ qpid::log::Options log;
+ uint senders;
+ uint receivers;
+ uint groupSize;
+ bool printReport;
+ std::string groupKey;
+ bool durable;
+ bool allowDuplicates;
+ bool randomizeSize;
+ bool stickyConsumer;
+
+ Options(const std::string& argv0=std::string())
+ : qpid::Options("Options"),
+ help(false),
+ url("amqp:tcp:127.0.0.1"),
+ messages(10000),
+ capacity(1000),
+ ackFrequency(100),
+ failoverUpdates(false),
+ log(argv0),
+ senders(2),
+ receivers(2),
+ groupSize(10),
+ printReport(false),
+ groupKey("qpid.no_group"),
+ durable(false),
+ allowDuplicates(false),
+ randomizeSize(false),
+ stickyConsumer(false)
+ {
+ addOptions()
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+ ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
+ ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
+ ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
+ ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
+ ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
+ ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
+ ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
+ ("print-report", qpid::optValue(printReport, "yes|no"), "Dump message group statistics to stdout.")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
+ //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ }
+
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ if (address.empty()) throw qpid::Exception("Address must be specified!");
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::ostringstream msg;
+ std::cout << msg << *this << std::endl << std::endl
+ << "Verifies the behavior of grouped messages." << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+};
+
+const string EOS("eos");
+const string SN("sn");
+
+
+// class that monitors group state across all publishers and consumers. tracks the next
+// expected sequence for each group, and total messages consumed.
+class GroupChecker
+{
+ qpid::sys::Mutex lock;
+
+ const uint totalMsgsPublished;
+ uint totalMsgsConsumed;
+ bool allowDuplicates;
+ uint duplicateMsgs;
+
+ typedef std::map<std::string, uint> SequenceMap;
+ SequenceMap sequenceMap;
+
+ // Statistics - for each group, store the names of all clients that consumed messages
+ // from that group, and the number of messages consumed per client.
+ typedef std::map<std::string, uint> ClientCounter;
+ typedef std::map<std::string, ClientCounter> GroupStatistics;
+ GroupStatistics statistics;
+
+public:
+
+ GroupChecker( uint t, bool d ) :
+ totalMsgsPublished(t), totalMsgsConsumed(0), allowDuplicates(d),
+ duplicateMsgs(0) {}
+
+ bool checkSequence( const std::string& groupId,
+ uint sequence, const std::string& client )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ GroupStatistics::iterator gs = statistics.find(groupId);
+ if (gs == statistics.end()) {
+ statistics[groupId][client] = 1;
+ } else {
+ gs->second[client]++;
+ }
+ // now verify
+ SequenceMap::iterator s = sequenceMap.find(groupId);
+ if (s == sequenceMap.end()) {
+ sequenceMap[groupId] = 1;
+ totalMsgsConsumed++;
+ return sequence == 0;
+ }
+ if (sequence < s->second) {
+ duplicateMsgs++;
+ return allowDuplicates;
+ }
+ totalMsgsConsumed++;
+ return sequence == s->second++;
+ }
+
+ bool eraseGroup( const std::string& groupId )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return sequenceMap.erase( groupId ) == 1;
+ }
+
+ uint getNextExpectedSequence( const std::string& groupId )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return sequenceMap[groupId];
+ }
+
+ bool allMsgsConsumed() // true when done processing msgs
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsConsumed == totalMsgsPublished;
+ }
+
+ uint getConsumedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsConsumed;
+ }
+
+ ostream& print(ostream& out)
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed <<
+ ", Duplicates detected: " << duplicateMsgs << std::endl;
+ out << "Total Groups: " << statistics.size() << std::endl;
+ unsigned long consumers = 0;
+ for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) {
+ out << " GroupId: " << gs->first;
+ consumers += gs->second.size(); // # of consumers that processed this group
+ if (gs->second.size() == 1)
+ out << " completely consumed by a single client." << std::endl;
+ else
+ out << " consumed by " << gs->second.size() << " different clients." << std::endl;
+
+ for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) {
+ out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl;
+ }
+ }
+ out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl;
+ return out;
+ }
+};
+
+
+namespace {
+ // rand() is not thread safe. Create a singleton obj to hold a lock while calling
+ // rand() so it can be called safely by multiple concurrent clients.
+ class Randomizer {
+ qpid::sys::Mutex lock;
+ public:
+ uint operator()(uint max) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (rand() % max) + 1;
+ }
+ };
+
+ static Randomizer randomize;
+}
+
+
+class Client : public qpid::sys::Runnable
+{
+public:
+ typedef boost::shared_ptr<Client> shared_ptr;
+ enum State {ACTIVE, DONE, FAILURE};
+ Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {}
+ virtual ~Client() {}
+ State getState() { return state; }
+ void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; }
+ void clientDone() { if (state == ACTIVE) state = DONE; }
+ qpid::sys::Thread& getThread() { return thread; }
+ const std::string getErrorMsg() { return error.str(); }
+ void stop() {stopped = true;}
+
+protected:
+ const std::string name;
+ const Options& opts;
+ qpid::sys::Thread thread;
+ ostringstream error;
+ State state;
+ bool stopped;
+};
+
+
+class Consumer : public Client
+{
+ GroupChecker& checker;
+
+public:
+ Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {};
+ virtual ~Consumer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Receiver receiver = session.createReceiver(opts.address);
+ receiver.setCapacity(opts.capacity);
+ Message msg;
+ uint count = 0;
+
+ while (!stopped && !checker.allMsgsConsumed()) {
+
+ if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
+
+ qpid::types::Variant::Map& properties = msg.getProperties();
+
+ std::string groupId = properties[opts.groupKey];
+ uint groupSeq = properties[SN];
+ bool eof = properties[EOS];
+
+ qpid::sys::usleep(10);
+
+ if (!checker.checkSequence( groupId, groupSeq, name )) {
+ ostringstream msg;
+ msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId );
+ testFailed( msg.str() );
+ break;
+ } else if (eof) {
+ if (!checker.eraseGroup( groupId )) {
+ ostringstream msg;
+ msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
+ testFailed( msg.str() );
+ break;
+ }
+ }
+
+ ++count;
+ if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
+ // Clear out message properties & content for next iteration.
+ msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
+ }
+ }
+ session.acknowledge();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "consumer error: " << error.what();
+ testFailed( msg.str() );
+ connection.close();
+ }
+ clientDone();
+ }
+};
+
+
+
+class Producer : public Client
+{
+public:
+ Producer(const std::string& n, const Options& o) : Client(n, o) {};
+ virtual ~Producer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Sender sender = session.createSender(opts.address);
+ if (opts.capacity) sender.setCapacity(opts.capacity);
+ Message msg;
+ msg.setDurable(opts.durable);
+ uint sent = 0;
+ uint groupSeq = 0;
+ uint groupSize = opts.groupSize;
+ ostringstream group;
+ group << name << sent;
+ std::string groupId(group.str());
+
+ while (!stopped && sent < opts.messages) {
+ ++sent;
+ msg.getProperties()[opts.groupKey] = groupId;
+ msg.getProperties()[SN] = groupSeq++;
+ msg.getProperties()[EOS] = false;
+ if (groupSeq == groupSize) {
+ msg.getProperties()[EOS] = true;
+ // generate new group
+ ostringstream nextGroupId;
+ nextGroupId << name << sent;
+ groupId = nextGroupId.str();
+ groupSeq = 0;
+ if (opts.randomizeSize) {
+ groupSize = randomize(opts.groupSize);
+ }
+ }
+ sender.send(msg);
+ qpid::sys::usleep(10);
+ }
+ session.sync();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "producer '" << name << "' error: " << error.what();
+ testFailed(msg.str());
+ connection.close();
+ }
+ clientDone();
+ }
+};
+
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
+{
+ int status = 0;
+ try {
+ Options opts;
+ if (opts.parse(argc, argv)) {
+
+ GroupChecker state( opts.senders * opts.messages,
+ opts.allowDuplicates);
+ std::vector<Client::shared_ptr> clients;
+
+ if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+ // fire off the producers && consumers
+ for (size_t j = 0; j < opts.senders; ++j) {
+ ostringstream name;
+ name << "P_" << j;
+ clients.push_back(Client::shared_ptr(new Producer( name.str(), opts )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+ for (size_t j = 0; j < opts.receivers; ++j) {
+ ostringstream name;
+ name << "C_" << j;
+ clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+
+ // wait for all pubs/subs to finish.... or for consumers to fail or stall.
+ uint lastCount;
+ bool done;
+ bool clientFailed = false;
+ do {
+ lastCount = state.getConsumedTotal();
+ qpid::sys::usleep( 1000000 );
+
+ // check each client for status
+ done = true;
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ if ((*i)->getState() == Client::FAILURE) {
+ std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl;
+ clientFailed = true;
+ done = true;
+ break;
+ } else if ((*i)->getState() != Client::DONE) {
+ done = false;
+ }
+ }
+ } while (!done && lastCount != state.getConsumedTotal());
+
+ if (clientFailed) {
+ status = 1;
+ } else if (!state.allMsgsConsumed()) {
+ std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl;
+ status = 2;
+ }
+
+ // Wait for started threads.
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ (*i)->stop();
+ (*i)->getThread().join();
+ }
+
+ if (opts.printReport && !status) state.print(std::cout);
+ }
+ } catch(const std::exception& error) {
+ std::cerr << argv[0] << ": " << error.what() << std::endl;
+ status = 3;
+ }
+ return status;
+}
diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests
new file mode 100755
index 0000000000..6c429939bf
--- /dev/null
+++ b/qpid/cpp/src/tests/run_msg_group_tests
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of ring queue tests via make
+
+#setup path to find qpid-config and sender/receiver test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+setup() {
+ qpid-config $BROKER_URL add queue $QUEUE_NAME --argument="qpid.group_header_key=${GROUP_KEY}"
+}
+
+cleanup() {
+ qpid-config $BROKER_URL del queue $QUEUE_NAME --force
+}
+
+run_test() {
+
+ msg_group_test -a $QUEUE_NAME --group-key $GROUP_KEY --capacity 3 --group-size 13 --ack-frequency 7 --messages 103 --receivers 3 --senders 5
+ RETCODE=$?
+ cleanup
+
+ if test x$RETCODE != x0; then
+ echo "FAIL message group tests"; exit 1;
+ fi
+}
+
+setup
+run_test
+
+