diff options
author | Gordon Sim <gsim@apache.org> | 2011-02-25 16:14:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-02-25 16:14:32 +0000 |
commit | 667818f1417c742bc4c49465ef83562a693f0e49 (patch) | |
tree | 56b390c8ff085ddef0ed936d9e0f857dab94ad75 | |
parent | 09f3f9617416e1a9bf11efff0ea4df4e746f3592 (diff) | |
download | qpid-python-667818f1417c742bc4c49465ef83562a693f0e49.tar.gz |
QPID-2999: set redelivered on replay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1074611 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 30 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 5 |
3 files changed, 37 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e1b75ec0cf..f2f0f1a9e5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -135,6 +135,7 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + i->message.setRedelivered(true); sink->send(session, name, *i); } } @@ -147,7 +148,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) { uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { - session.flush(); + session.flush(); flushed = true; } else { flushed = false; diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 3e13a3ce8a..7443e6b663 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -304,6 +304,36 @@ acl allow all all # Verify logs are consistent cluster_test_logs.verify_logs() + def test_redelivered(self): + """Verify that redelivered flag is set correctly on replayed messages""" + cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) + url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) + queue = "my-queue" + cluster[0].declare_queue(queue) + self.sender = self.popen( + ["qpid-send", + "--broker", url, + "--address", queue, + "--sequence=true", + "--send-eos=1", + "--messages=100000", + "--connection-options={reconnect:true}" + ]) + self.receiver = self.popen( + ["qpid-receive", + "--broker", url, + "--address", queue, + "--ignore-duplicates", + "--check-redelivered", + "--connection-options={reconnect:true}", + "--forever" + ]) + time.sleep(1)#give sender enough time to have some messages to replay + cluster[0].kill() + self.sender.wait() + self.receiver.wait() + cluster[1].kill() + class BlockedSend(Thread): """Send a message, send is expected to block. Verify that it does block (for a given timeout), then allow diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 3189a13c6e..5a85da4fd2 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -53,6 +53,7 @@ struct Options : public qpid::Options bool forever; uint messages; bool ignoreDuplicates; + bool checkRedelivered; uint capacity; uint ackFrequency; uint tx; @@ -75,6 +76,7 @@ struct Options : public qpid::Options forever(false), messages(0), ignoreDuplicates(false), + checkRedelivered(false), capacity(1000), ackFrequency(100), tx(0), @@ -96,6 +98,7 @@ struct Options : public qpid::Options ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") @@ -216,6 +219,8 @@ int main(int argc, char ** argv) std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) done = true; } + } else if (opts.checkRedelivered && !msg.getRedelivered()) { + throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!"); } if (opts.tx && (count % opts.tx == 0)) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { |