From 6fd3663ff0c697584bbba5bcba5d61c3fefa9115 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2012 23:08:18 +0000 Subject: QPID-3603: Check for gaps in sequence numbers in qpid-receive. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233680 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-receive.cpp | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 9c713e872a..7e94e8cf39 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -53,6 +53,7 @@ struct Options : public qpid::Options bool forever; uint messages; bool ignoreDuplicates; + bool verifySequence; bool checkRedelivered; uint capacity; uint ackFrequency; @@ -76,6 +77,7 @@ struct Options : public qpid::Options forever(false), messages(0), ignoreDuplicates(false), + verifySequence(false), checkRedelivered(false), capacity(1000), ackFrequency(100), @@ -98,6 +100,7 @@ struct Options : public qpid::Options ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("verify-sequence", qpid::optValue(verifySequence), "Verify there are no gaps in the message sequence (by checking 'sn' header)") ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") @@ -145,22 +148,29 @@ struct Options : public qpid::Options const string EOS("eos"); const string SN("sn"); +/** Check for duplicate or dropped messages by sequence number */ class SequenceTracker { - uint lastSn; public: - SequenceTracker() : lastSn(0) {} + SequenceTracker(const Options& o) : opts(o), lastSn(0) {} - bool isDuplicate(Message& message) - { + /** Return true if the message should be procesed, false if it should be ignored. */ + bool track(Message& message) { uint sn = message.getProperties()[SN]; - if (lastSn < sn) { - lastSn = sn; - return false; - } else { - return true; - } + bool duplicate = (sn <= lastSn); + bool dropped = (sn > lastSn+1); + if (opts.verifySequence && dropped) + throw Exception(QPID_MSG("Gap in sequence numbers " << lastSn << "-" << sn)); + bool ignore = duplicate && opts.ignoreDuplicates; + if (ignore && opts.checkRedelivered && !message.getRedelivered()) + throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!"); + if (!duplicate) lastSn = sn; + return !ignore; } + + private: + const Options& opts; + uint lastSn; }; }} // namespace qpid::tests @@ -182,13 +192,12 @@ int main(int argc, char ** argv) Message msg; uint count = 0; uint txCount = 0; - SequenceTracker sequenceTracker; + SequenceTracker sequenceTracker(opts); Duration timeout = opts.getTimeout(); bool done = false; Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) session.createSender(opts.readyAddress).send(msg); - // For receive rate calculation qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; @@ -198,7 +207,7 @@ int main(int argc, char ** argv) while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); - if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { + if (sequenceTracker.track(msg)) { if (msg.getContent() == EOS) { done = true; } else { @@ -219,8 +228,6 @@ int main(int argc, char ** argv) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) done = true; } - } else if (opts.checkRedelivered && !msg.getRedelivered()) { - throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!"); } if (opts.tx && (count % opts.tx == 0)) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { -- cgit v1.2.1