summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-send.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid-send.cpp')
-rw-r--r--cpp/src/tests/qpid-send.cpp104
1 files changed, 8 insertions, 96 deletions
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index b1213a484f..6a7e7838ce 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -28,7 +28,6 @@
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Monitor.h>
-#include <qpid/sys/SystemInfo.h>
#include "TestOptions.h"
#include "Statistics.h"
@@ -77,11 +76,6 @@ struct Options : public qpid::Options
uint flowControl;
bool sequence;
bool timestamp;
- std::string groupKey;
- std::string groupPrefix;
- uint groupSize;
- bool groupRandSize;
- uint groupInterleave;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -106,23 +100,19 @@ struct Options : public qpid::Options
sendRate(0),
flowControl(0),
sequence(true),
- timestamp(true),
- groupPrefix("GROUP-"),
- groupSize(10),
- groupRandSize(false),
- groupInterleave(1)
+ timestamp(true)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
- ("address,a", qpid::optValue(address, "ADDRESS"), "address to send to")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
- ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
- ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
+ ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -141,11 +131,6 @@ struct Options : public qpid::Options
("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
- ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
- ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
- ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
- ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
- ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -267,68 +252,6 @@ class MapContentGenerator : public ContentGenerator {
const Options& opts;
};
-// tag each generated message with a group identifer
-//
-class GroupGenerator {
-public:
- GroupGenerator(const std::string& key,
- const std::string& prefix,
- const uint size,
- const bool randomize,
- const uint interleave)
- : groupKey(key), groupPrefix(prefix), groupSize(size),
- randomizeSize(randomize), groupSuffix(0)
- {
- if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
-
- for (uint i = 0; i < 1 || i < interleave; ++i) {
- newGroup();
- }
- current = groups.begin();
- }
-
- void setGroupInfo(Message &msg)
- {
- if (current == groups.end())
- current = groups.begin();
- msg.getProperties()[groupKey] = current->id;
- // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl;
- if (++(current->count) == current->size) {
- newGroup();
- groups.erase(current++);
- } else
- ++current;
- }
-
- private:
- const std::string& groupKey;
- const std::string& groupPrefix;
- const uint groupSize;
- const bool randomizeSize;
-
- uint groupSuffix;
-
- struct GroupState {
- std::string id;
- const uint size;
- uint count;
- GroupState( const std::string& i, const uint s )
- : id(i), size(s), count(0) {}
- };
- typedef std::list<GroupState> GroupList;
- GroupList groups;
- GroupList::iterator current;
-
- void newGroup() {
- std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
- groupId << groupSuffix++;
- uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize;
- // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl;
- GroupState group( groupId.str(), size );
- groups.push_back( group );
- }
-};
-
int main(int argc, char ** argv)
{
Connection connection;
@@ -373,14 +296,6 @@ int main(int argc, char ** argv)
else
contentGen.reset(new FixedContentGenerator(opts.contentString));
- std::auto_ptr<GroupGenerator> groupGen;
- if (!opts.groupKey.empty())
- groupGen.reset(new GroupGenerator(opts.groupKey,
- opts.groupPrefix,
- opts.groupSize,
- opts.groupRandSize,
- opts.groupInterleave));
-
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -397,6 +312,9 @@ int main(int argc, char ** argv)
++sent;
if (opts.sequence)
msg.getProperties()[SN] = sent;
+ if (opts.timestamp)
+ msg.getProperties()[TS] = int64_t(
+ qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
if (opts.flowControl) {
if ((sent % opts.flowControl) == 0) {
msg.setReplyTo(flowControlAddress);
@@ -405,12 +323,6 @@ int main(int argc, char ** argv)
else
msg.setReplyTo(Address()); // Clear the reply address.
}
- if (groupGen.get())
- groupGen->setGroupInfo(msg);
-
- if (opts.timestamp)
- msg.getProperties()[TS] = int64_t(
- qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
sender.send(msg);
reporter.message(msg);
@@ -456,7 +368,7 @@ int main(int argc, char ** argv)
return 0;
}
} catch(const std::exception& error) {
- std::cerr << "qpid-send: " << error.what() << std::endl;
+ std::cout << "Failed: " << error.what() << std::endl;
connection.close();
return 1;
}