summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-11-18 19:40:53 +0000
committerAlan Conway <aconway@apache.org>2010-11-18 19:40:53 +0000
commitfbe188d7f35bb0c015940cc9ba944744255df99a (patch)
tree04d5de50a185301c9a501acad41da0fd6537e933 /qpid
parent877b20db5a124a14109f3f0445cb5f4635a1adbc (diff)
downloadqpid-python-fbe188d7f35bb0c015940cc9ba944744255df99a.tar.gz
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp
- Added missing lock to ExpiryPolicy - 1-N mapping for expiry ID to mapping when receiving an update. - Regression test. A fan-out message (sent to multiple queues e.g. by fanout or topic exchange) is a single message on multiple queues with a single expiry ID. During an update however each instance is sent as a separate message so we need to allow 1-N mapping of expiry ID to message during update. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1036589 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp52
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h10
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py26
-rw-r--r--qpid/python/qpid/brokertest.py14
4 files changed, 89 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index e1ba420f2a..d91437c5ba 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTask {
const uint64_t expiryId;
};
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+ sys::Mutex::ScopedLock l(lock);
+ return expiryId;
+}
+
+// Called in enqueuing connection thread
void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id = expiryId++;
- assert(unexpiredById.find(id) == unexpiredById.end());
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- unexpiredById[id] = &m;
- unexpiredByMessage[&m] = id;
+ uint64_t id;
+ {
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ //
+ // TODO: fix update to avoid duplicating messages.
+ sys::Mutex::ScopedLock l(lock);
+ id = expiryId++; // if this is an update, this expiryId may already exist
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+// Called in dequeueing connection thread
void ExpiryPolicy::forget(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
assert(i != unexpiredByMessage.end());
unexpiredById.erase(i->second);
unexpiredByMessage.erase(i);
}
+// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
+// Called in timer thread
void ExpiryPolicy::sendExpire(uint64_t id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // Don't multicast an expiry notice if message is already forgotten.
+ if (unexpiredById.find(id) == unexpiredById.end()) return;
+ }
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
+// Called in CPG deliver thread.
void ExpiryPolicy::deliverExpire(uint64_t id) {
- IdMessageMap::iterator i = unexpiredById.find(id);
- if (i != unexpiredById.end()) {
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+ IdMessageMap::iterator i = expired.first;
+ while (i != expired.second) {
i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i);
+ unexpiredById.erase(i++);
}
}
+// Called in update thread on the updater.
boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index bdbe3a61dc..77a656aa68 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::ExpiryPolicy
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
- void setId(uint64_t id) { expiryId = id; }
- uint64_t getId() const { return expiryId; }
+ void setId(uint64_t id);
+ uint64_t getId() const;
boost::optional<uint64_t> getId(broker::Message&);
private:
typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- typedef std::map<uint64_t, broker::Message*> IdMessageMap;
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
+ mutable sys::Mutex lock;
MessageIdMap unexpiredByMessage;
IdMessageMap unexpiredById;
uint64_t expiryId;
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 7272675971..1df72bf1d2 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message
+from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
@@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests")
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
-
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
@@ -223,6 +222,29 @@ acl allow all all
out = qs.communicate()[0]
assert out.find("amq.failover") > 0
+ def evaluate_address(self, session, address):
+ """Create a receiver just to evaluate an address for its side effects"""
+ r = session.receiver(address)
+ r.close()
+
+ def test_expire_fanout(self):
+ """Regression test for QPID-2874: Clustered broker crashes in assertion in
+ cluster/ExpiryPolicy.cpp.
+ Caused by a fan-out message being updated as separate messages"""
+ cluster = self.cluster(1)
+ session0 = cluster[0].connect().session()
+ # Create 2 queues bound to fanout exchange.
+ self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
+ self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
+ queues = ["q1", "q2"]
+ # Send a fanout message with a long timeout
+ s = session0.sender("amq.fanout")
+ s.send(Message("foo", ttl=100), sync=False)
+ # Start a new member, check the messages
+ cluster.start()
+ session1 = cluster[1].connect().session()
+ for q in queues: self.assert_browse(session1, "q1", ["foo"])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 14be7c43c9..208bfd6000 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -523,6 +523,20 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ actual_contents = []
+ try:
+ for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
+ while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
+ except messaging.Empty: pass
+ r.close()
+ self.assertEqual(expect_contents, actual_contents)
+
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
def __init__(self, msg=""):