diff options
author | Alan Conway <aconway@apache.org> | 2010-11-18 19:40:53 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-11-18 19:40:53 +0000 |
commit | 4869e431e27144d96c81448d7db0d20c68f2b6a7 (patch) | |
tree | 8e7d07af618e2a560a585545c6b7dd5056b53e04 /cpp/src/tests/cluster_tests.py | |
parent | 9891b7d43f702ea4374c62381eba24db21b1bfbf (diff) | |
download | qpid-python-4869e431e27144d96c81448d7db0d20c68f2b6a7.tar.gz |
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp
- Added missing lock to ExpiryPolicy
- 1-N mapping for expiry ID to mapping when receiving an update.
- Regression test.
A fan-out message (sent to multiple queues e.g. by fanout or topic
exchange) is a single message on multiple queues with a single expiry
ID. During an update however each instance is sent as a separate
message so we need to allow 1-N mapping of expiry ID to message during
update.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1036589 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 7272675971..1df72bf1d2 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message +from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger from itertools import chain @@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests") # Import scripts as modules qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) - def readfile(filename): """Returns te content of file named filename as a string""" f = file(filename) @@ -223,6 +222,29 @@ acl allow all all out = qs.communicate()[0] assert out.find("amq.failover") > 0 + def evaluate_address(self, session, address): + """Create a receiver just to evaluate an address for its side effects""" + r = session.receiver(address) + r.close() + + def test_expire_fanout(self): + """Regression test for QPID-2874: Clustered broker crashes in assertion in + cluster/ExpiryPolicy.cpp. + Caused by a fan-out message being updated as separate messages""" + cluster = self.cluster(1) + session0 = cluster[0].connect().session() + # Create 2 queues bound to fanout exchange. + self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") + self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") + queues = ["q1", "q2"] + # Send a fanout message with a long timeout + s = session0.sender("amq.fanout") + s.send(Message("foo", ttl=100), sync=False) + # Start a new member, check the messages + cluster.start() + session1 = cluster[1].connect().session() + for q in queues: self.assert_browse(session1, "q1", ["foo"]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |