summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-02-25 16:14:32 +0000
committerGordon Sim <gsim@apache.org>2011-02-25 16:14:32 +0000
commit667818f1417c742bc4c49465ef83562a693f0e49 (patch)
tree56b390c8ff085ddef0ed936d9e0f857dab94ad75
parent09f3f9617416e1a9bf11efff0ea4df4e746f3592 (diff)
downloadqpid-python-667818f1417c742bc4c49465ef83562a693f0e49.tar.gz
QPID-2999: set redelivered on replay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1074611 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp3
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py30
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
3 files changed, 37 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index e1b75ec0cf..f2f0f1a9e5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -135,6 +135,7 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ i->message.setRedelivered(true);
sink->send(session, name, *i);
}
}
@@ -147,7 +148,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) {
uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
{
if (flush) {
- session.flush();
+ session.flush();
flushed = true;
} else {
flushed = false;
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 3e13a3ce8a..7443e6b663 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -304,6 +304,36 @@ acl allow all all
# Verify logs are consistent
cluster_test_logs.verify_logs()
+ def test_redelivered(self):
+ """Verify that redelivered flag is set correctly on replayed messages"""
+ cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+ url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
+ queue = "my-queue"
+ cluster[0].declare_queue(queue)
+ self.sender = self.popen(
+ ["qpid-send",
+ "--broker", url,
+ "--address", queue,
+ "--sequence=true",
+ "--send-eos=1",
+ "--messages=100000",
+ "--connection-options={reconnect:true}"
+ ])
+ self.receiver = self.popen(
+ ["qpid-receive",
+ "--broker", url,
+ "--address", queue,
+ "--ignore-duplicates",
+ "--check-redelivered",
+ "--connection-options={reconnect:true}",
+ "--forever"
+ ])
+ time.sleep(1)#give sender enough time to have some messages to replay
+ cluster[0].kill()
+ self.sender.wait()
+ self.receiver.wait()
+ cluster[1].kill()
+
class BlockedSend(Thread):
"""Send a message, send is expected to block.
Verify that it does block (for a given timeout), then allow
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 3189a13c6e..5a85da4fd2 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -53,6 +53,7 @@ struct Options : public qpid::Options
bool forever;
uint messages;
bool ignoreDuplicates;
+ bool checkRedelivered;
uint capacity;
uint ackFrequency;
uint tx;
@@ -75,6 +76,7 @@ struct Options : public qpid::Options
forever(false),
messages(0),
ignoreDuplicates(false),
+ checkRedelivered(false),
capacity(1000),
ackFrequency(100),
tx(0),
@@ -96,6 +98,7 @@ struct Options : public qpid::Options
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+ ("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
@@ -216,6 +219,8 @@ int main(int argc, char ** argv)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) done = true;
}
+ } else if (opts.checkRedelivered && !msg.getRedelivered()) {
+ throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
}
if (opts.tx && (count % opts.tx == 0)) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {