diff options
author | Alan Conway <aconway@apache.org> | 2010-11-18 19:40:53 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-11-18 19:40:53 +0000 |
commit | fbe188d7f35bb0c015940cc9ba944744255df99a (patch) | |
tree | 04d5de50a185301c9a501acad41da0fd6537e933 | |
parent | 877b20db5a124a14109f3f0445cb5f4635a1adbc (diff) | |
download | qpid-python-fbe188d7f35bb0c015940cc9ba944744255df99a.tar.gz |
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp
- Added missing lock to ExpiryPolicy
- 1-N mapping for expiry ID to mapping when receiving an update.
- Regression test.
A fan-out message (sent to multiple queues e.g. by fanout or topic
exchange) is a single message on multiple queues with a single expiry
ID. During an update however each instance is sent as a separate
message so we need to allow 1-N mapping of expiry ID to message during
update.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1036589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ExpiryPolicy.h | 10 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 26 | ||||
-rw-r--r-- | qpid/python/qpid/brokertest.py | 14 |
4 files changed, 89 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index e1ba420f2a..d91437c5ba 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTask { const uint64_t expiryId; }; +// Called while receiving an update +void ExpiryPolicy::setId(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expiryId = id; +} + +// Called while giving an update +uint64_t ExpiryPolicy::getId() const { + sys::Mutex::ScopedLock l(lock); + return expiryId; +} + +// Called in enqueuing connection thread void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id = expiryId++; - assert(unexpiredById.find(id) == unexpiredById.end()); - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - unexpiredById[id] = &m; - unexpiredByMessage[&m] = id; + uint64_t id; + { + // When messages are fanned out to multiple queues, update sends + // them as independenty messages so we can have multiple messages + // with the same expiry ID. + // + // TODO: fix update to avoid duplicating messages. + sys::Mutex::ScopedLock l(lock); + id = expiryId++; // if this is an update, this expiryId may already exist + assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); + unexpiredById.insert(IdMessageMap::value_type(id, &m)); + unexpiredByMessage[&m] = id; + } timer.add(new ExpiryTask(this, id, m.getExpiration())); } +// Called in dequeueing connection thread void ExpiryPolicy::forget(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); MessageIdMap::iterator i = unexpiredByMessage.find(&m); assert(i != unexpiredByMessage.end()); unexpiredById.erase(i->second); unexpiredByMessage.erase(i); } +// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); } +// Called in timer thread void ExpiryPolicy::sendExpire(uint64_t id) { + { + sys::Mutex::ScopedLock l(lock); + // Don't multicast an expiry notice if message is already forgotten. + if (unexpiredById.find(id) == unexpiredById.end()) return; + } mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); } +// Called in CPG deliver thread. void ExpiryPolicy::deliverExpire(uint64_t id) { - IdMessageMap::iterator i = unexpiredById.find(id); - if (i != unexpiredById.end()) { + sys::Mutex::ScopedLock l(lock); + std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); + IdMessageMap::iterator i = expired.first; + while (i != expired.second) { i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; unexpiredByMessage.erase(i->second); - unexpiredById.erase(i); + unexpiredById.erase(i++); } } +// Called in update thread on the updater. boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); MessageIdMap::iterator i = unexpiredByMessage.find(&m); return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index bdbe3a61dc..77a656aa68 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::ExpiryPolicy // Cluster delivers expiry notice. void deliverExpire(uint64_t); - void setId(uint64_t id) { expiryId = id; } - uint64_t getId() const { return expiryId; } + void setId(uint64_t id); + uint64_t getId() const; boost::optional<uint64_t> getId(broker::Message&); private: typedef std::map<broker::Message*, uint64_t> MessageIdMap; - typedef std::map<uint64_t, broker::Message*> IdMessageMap; + // When messages are fanned out to multiple queues, update sends + // them as independenty messages so we can have multiple messages + // with the same expiry ID. + typedef std::multimap<uint64_t, broker::Message*> IdMessageMap; struct Expired : public broker::ExpiryPolicy { bool hasExpired(broker::Message&); void willExpire(broker::Message&); }; + mutable sys::Mutex lock; MessageIdMap unexpiredByMessage; IdMessageMap unexpiredById; uint64_t expiryId; diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 7272675971..1df72bf1d2 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message +from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger from itertools import chain @@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests") # Import scripts as modules qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) - def readfile(filename): """Returns te content of file named filename as a string""" f = file(filename) @@ -223,6 +222,29 @@ acl allow all all out = qs.communicate()[0] assert out.find("amq.failover") > 0 + def evaluate_address(self, session, address): + """Create a receiver just to evaluate an address for its side effects""" + r = session.receiver(address) + r.close() + + def test_expire_fanout(self): + """Regression test for QPID-2874: Clustered broker crashes in assertion in + cluster/ExpiryPolicy.cpp. + Caused by a fan-out message being updated as separate messages""" + cluster = self.cluster(1) + session0 = cluster[0].connect().session() + # Create 2 queues bound to fanout exchange. + self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") + self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") + queues = ["q1", "q2"] + # Send a fanout message with a long timeout + s = session0.sender("amq.fanout") + s.send(Message("foo", ttl=100), sync=False) + # Start a new member, check the messages + cluster.start() + session1 = cluster[1].connect().session() + for q in queues: self.assert_browse(session1, "q1", ["foo"]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py index 14be7c43c9..208bfd6000 100644 --- a/qpid/python/qpid/brokertest.py +++ b/qpid/python/qpid/brokertest.py @@ -523,6 +523,20 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster + def assert_browse(self, session, queue, expect_contents, timeout=0): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + + r = session.receiver("%s;{mode:browse}"%(queue)) + actual_contents = [] + try: + for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) + while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. + except messaging.Empty: pass + r.close() + self.assertEqual(expect_contents, actual_contents) + class RethrownException(Exception): """Captures the stack trace of the current exception to be thrown later""" def __init__(self, msg=""): |