diff options
Diffstat (limited to 'cpp/src/tests/qpid-send.cpp')
-rw-r--r-- | cpp/src/tests/qpid-send.cpp | 104 |
1 files changed, 8 insertions, 96 deletions
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp index b1213a484f..6a7e7838ce 100644 --- a/cpp/src/tests/qpid-send.cpp +++ b/cpp/src/tests/qpid-send.cpp @@ -28,7 +28,6 @@ #include <qpid/messaging/FailoverUpdates.h> #include <qpid/sys/Time.h> #include <qpid/sys/Monitor.h> -#include <qpid/sys/SystemInfo.h> #include "TestOptions.h" #include "Statistics.h" @@ -77,11 +76,6 @@ struct Options : public qpid::Options uint flowControl; bool sequence; bool timestamp; - std::string groupKey; - std::string groupPrefix; - uint groupSize; - bool groupRandSize; - uint groupInterleave; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -106,23 +100,19 @@ struct Options : public qpid::Options sendRate(0), flowControl(0), sequence(true), - timestamp(true), - groupPrefix("GROUP-"), - groupSize(10), - groupRandSize(false), - groupInterleave(1) + timestamp(true) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") - ("address,a", qpid::optValue(address, "ADDRESS"), "address to send to") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit") ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") - ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") - ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") + ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -141,11 +131,6 @@ struct Options : public qpid::Options ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") - ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier") - ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)") - ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)") - ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)") - ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -267,68 +252,6 @@ class MapContentGenerator : public ContentGenerator { const Options& opts; }; -// tag each generated message with a group identifer -// -class GroupGenerator { -public: - GroupGenerator(const std::string& key, - const std::string& prefix, - const uint size, - const bool randomize, - const uint interleave) - : groupKey(key), groupPrefix(prefix), groupSize(size), - randomizeSize(randomize), groupSuffix(0) - { - if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); - - for (uint i = 0; i < 1 || i < interleave; ++i) { - newGroup(); - } - current = groups.begin(); - } - - void setGroupInfo(Message &msg) - { - if (current == groups.end()) - current = groups.begin(); - msg.getProperties()[groupKey] = current->id; - // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl; - if (++(current->count) == current->size) { - newGroup(); - groups.erase(current++); - } else - ++current; - } - - private: - const std::string& groupKey; - const std::string& groupPrefix; - const uint groupSize; - const bool randomizeSize; - - uint groupSuffix; - - struct GroupState { - std::string id; - const uint size; - uint count; - GroupState( const std::string& i, const uint s ) - : id(i), size(s), count(0) {} - }; - typedef std::list<GroupState> GroupList; - GroupList groups; - GroupList::iterator current; - - void newGroup() { - std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); - groupId << groupSuffix++; - uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize; - // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl; - GroupState group( groupId.str(), size ); - groups.push_back( group ); - } -}; - int main(int argc, char ** argv) { Connection connection; @@ -373,14 +296,6 @@ int main(int argc, char ** argv) else contentGen.reset(new FixedContentGenerator(opts.contentString)); - std::auto_ptr<GroupGenerator> groupGen; - if (!opts.groupKey.empty()) - groupGen.reset(new GroupGenerator(opts.groupKey, - opts.groupPrefix, - opts.groupSize, - opts.groupRandSize, - opts.groupInterleave)); - qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; @@ -397,6 +312,9 @@ int main(int argc, char ** argv) ++sent; if (opts.sequence) msg.getProperties()[SN] = sent; + if (opts.timestamp) + msg.getProperties()[TS] = int64_t( + qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); if (opts.flowControl) { if ((sent % opts.flowControl) == 0) { msg.setReplyTo(flowControlAddress); @@ -405,12 +323,6 @@ int main(int argc, char ** argv) else msg.setReplyTo(Address()); // Clear the reply address. } - if (groupGen.get()) - groupGen->setGroupInfo(msg); - - if (opts.timestamp) - msg.getProperties()[TS] = int64_t( - qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); sender.send(msg); reporter.message(msg); @@ -456,7 +368,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cerr << "qpid-send: " << error.what() << std::endl; + std::cout << "Failed: " << error.what() << std::endl; connection.close(); return 1; } |