summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:08:18 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:08:18 +0000
commit6fd3663ff0c697584bbba5bcba5d61c3fefa9115 (patch)
tree5cb20b9e1ed6c084fa6688366884bd6fadd65a20
parente12d6247b844db1b307674f6c9ab10fdcd4beb0a (diff)
downloadqpid-python-6fd3663ff0c697584bbba5bcba5d61c3fefa9115.tar.gz
QPID-3603: Check for gaps in sequence numbers in qpid-receive.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233680 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp37
1 files changed, 22 insertions, 15 deletions
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 9c713e872a..7e94e8cf39 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -53,6 +53,7 @@ struct Options : public qpid::Options
bool forever;
uint messages;
bool ignoreDuplicates;
+ bool verifySequence;
bool checkRedelivered;
uint capacity;
uint ackFrequency;
@@ -76,6 +77,7 @@ struct Options : public qpid::Options
forever(false),
messages(0),
ignoreDuplicates(false),
+ verifySequence(false),
checkRedelivered(false),
capacity(1000),
ackFrequency(100),
@@ -98,6 +100,7 @@ struct Options : public qpid::Options
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+ ("verify-sequence", qpid::optValue(verifySequence), "Verify there are no gaps in the message sequence (by checking 'sn' header)")
("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
@@ -145,22 +148,29 @@ struct Options : public qpid::Options
const string EOS("eos");
const string SN("sn");
+/** Check for duplicate or dropped messages by sequence number */
class SequenceTracker
{
- uint lastSn;
public:
- SequenceTracker() : lastSn(0) {}
+ SequenceTracker(const Options& o) : opts(o), lastSn(0) {}
- bool isDuplicate(Message& message)
- {
+ /** Return true if the message should be procesed, false if it should be ignored. */
+ bool track(Message& message) {
uint sn = message.getProperties()[SN];
- if (lastSn < sn) {
- lastSn = sn;
- return false;
- } else {
- return true;
- }
+ bool duplicate = (sn <= lastSn);
+ bool dropped = (sn > lastSn+1);
+ if (opts.verifySequence && dropped)
+ throw Exception(QPID_MSG("Gap in sequence numbers " << lastSn << "-" << sn));
+ bool ignore = duplicate && opts.ignoreDuplicates;
+ if (ignore && opts.checkRedelivered && !message.getRedelivered())
+ throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
+ if (!duplicate) lastSn = sn;
+ return !ignore;
}
+
+ private:
+ const Options& opts;
+ uint lastSn;
};
}} // namespace qpid::tests
@@ -182,13 +192,12 @@ int main(int argc, char ** argv)
Message msg;
uint count = 0;
uint txCount = 0;
- SequenceTracker sequenceTracker;
+ SequenceTracker sequenceTracker(opts);
Duration timeout = opts.getTimeout();
bool done = false;
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty())
session.createSender(opts.readyAddress).send(msg);
-
// For receive rate calculation
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
@@ -198,7 +207,7 @@ int main(int argc, char ** argv)
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
- if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
+ if (sequenceTracker.track(msg)) {
if (msg.getContent() == EOS) {
done = true;
} else {
@@ -219,8 +228,6 @@ int main(int argc, char ** argv)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) done = true;
}
- } else if (opts.checkRedelivered && !msg.getRedelivered()) {
- throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
}
if (opts.tx && (count % opts.tx == 0)) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {