diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-20 17:24:55 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-20 17:24:55 +0000 |
commit | 682790a3536b8c12dd7d237f7f834c87980b741f (patch) | |
tree | 595db62d2ae4c8672067d646545d94e3a20cf2ac | |
parent | 7ad3988e599da508eb782ca84d97033b33692ac4 (diff) | |
download | qpid-python-682790a3536b8c12dd7d237f7f834c87980b741f.tar.gz |
Added some extra test options.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719298 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/receiver.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/tests/sender.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/txjob.cpp | 2 |
3 files changed, 24 insertions, 15 deletions
diff --git a/qpid/cpp/src/tests/receiver.cpp b/qpid/cpp/src/tests/receiver.cpp index 3a4ac3649d..1b0b6b2548 100644 --- a/qpid/cpp/src/tests/receiver.cpp +++ b/qpid/cpp/src/tests/receiver.cpp @@ -41,13 +41,17 @@ struct Args : public qpid::TestOptions string queue; uint messages; bool ignoreDuplicates; + uint creditWindow; + uint ackFrequency; - Args() : queue("test-queue"), messages(0), ignoreDuplicates(false) + Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1) { addOptions() ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages") ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") - ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)"); + ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -56,13 +60,14 @@ const string EOS("eos"); class Receiver : public MessageListener, public FailoverManager::Command { public: - Receiver(const string& queue, uint messages, bool ignoreDuplicates); + Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency); void received(Message& message); void execute(AsyncSession& session, bool isRetry); private: const string queue; const uint count; const bool skipDups; + SubscriptionSettings settings; Subscription subscription; uint processed; uint lastSn; @@ -70,8 +75,12 @@ class Receiver : public MessageListener, public FailoverManager::Command bool isDuplicate(Message& message); }; -Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) : - queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {} +Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) : + queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) +{ + if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow); + settings.autoAck = ackFrequency; +} void Receiver::received(Message & message) { @@ -96,7 +105,7 @@ bool Receiver::isDuplicate(Message& message) void Receiver::execute(AsyncSession& session, bool /*isRetry*/) { SubscriptionManager subs(session); - subscription = subs.subscribe(*this, queue); + subscription = subs.subscribe(*this, queue, settings); subs.run(); } @@ -106,7 +115,7 @@ int main(int argc, char ** argv) try { opts.parse(argc, argv); FailoverManager connection(opts.con); - Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates); + Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency); connection.execute(receiver); connection.close(); return 0; diff --git a/qpid/cpp/src/tests/sender.cpp b/qpid/cpp/src/tests/sender.cpp index 2da1990041..a02b713d86 100644 --- a/qpid/cpp/src/tests/sender.cpp +++ b/qpid/cpp/src/tests/sender.cpp @@ -39,14 +39,14 @@ struct Args : public qpid::TestOptions { string destination; string key; - bool sendEos; + uint sendEos; - Args() : key("test-queue"), sendEos(false) + Args() : key("test-queue"), sendEos(0) { addOptions() ("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to") ("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages") - ("send-eos", qpid::optValue(sendEos), "Send EOS message to mark end of input"); + ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input"); } }; @@ -55,16 +55,16 @@ const string EOS("eos"); class Sender : public FailoverManager::Command { public: - Sender(const std::string& destination, const std::string& key, bool sendEos); + Sender(const std::string& destination, const std::string& key, uint sendEos); void execute(AsyncSession& session, bool isRetry); private: MessageReplayTracker sender; Message message; - const bool sendEos; + const uint sendEos; uint sent; }; -Sender::Sender(const std::string& destination, const std::string& key, bool eos) : +Sender::Sender(const std::string& destination, const std::string& key, uint eos) : sender(10), message(destination, key), sendEos(eos), sent(0) {} void Sender::execute(AsyncSession& session, bool isRetry) @@ -77,7 +77,7 @@ void Sender::execute(AsyncSession& session, bool isRetry) message.getHeaders().setInt("sn", ++sent); sender.send(message); } - if (sendEos) { + for (uint i = sendEos; i > 0; --i) { message.setData(EOS); sender.send(message); } diff --git a/qpid/cpp/src/tests/txjob.cpp b/qpid/cpp/src/tests/txjob.cpp index 336f77014d..94db96a666 100644 --- a/qpid/cpp/src/tests/txjob.cpp +++ b/qpid/cpp/src/tests/txjob.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) } if (opts.quit) { - async(session).messageTransfer(arg::content=Message("quit", opts.workQueue)); + async(session).messageTransfer(arg::content=Message("quit", opts.workQueue)); } session.sync(); |