summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-11-22 16:15:06 +0000
committerAlan Conway <aconway@apache.org>2010-11-22 16:15:06 +0000
commit0f86c796ccb052fa6c5f807f480a1feabec643f8 (patch)
tree0d5d5613220e01ef99ff6ca79f9ff9a1898fa822 /cpp/src
parent17401b1293bcd090ee1a675665ba30fcfac76d5b (diff)
downloadqpid-python-0f86c796ccb052fa6c5f807f480a1feabec643f8.tar.gz
QPID-2956: cluster broker exits with "error deliveryRecord no update message."
The following sequence of events was causing a broker joining the cluster to shutdown: - a client acquires or browses a message with TTL set. - the message expires. - a new broker joins before the client has acknowledged the message. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1037763 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp17
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp6
-rwxr-xr-xcpp/src/tests/cluster_tests.py23
3 files changed, 35 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index d91437c5ba..d9a7b0122a 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -31,7 +31,7 @@ namespace qpid {
namespace cluster {
ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
- : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
struct ExpiryTask : public sys::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
@@ -61,12 +61,17 @@ void ExpiryPolicy::willExpire(broker::Message& m) {
// them as independenty messages so we can have multiple messages
// with the same expiry ID.
//
- // TODO: fix update to avoid duplicating messages.
sys::Mutex::ScopedLock l(lock);
- id = expiryId++; // if this is an update, this expiryId may already exist
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- unexpiredById.insert(IdMessageMap::value_type(id, &m));
- unexpiredByMessage[&m] = id;
+ id = expiryId++;
+ if (!id) { // This is an update of an already-expired message.
+ m.setExpiryPolicy(expiredPolicy);
+ }
+ else {
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ // If this is an update, the id may already exist
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
}
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 54c5fa0bcc..bc1b812a94 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -272,8 +272,7 @@ class MessageUpdater {
// Send the expiry ID if necessary.
if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
- if (!expiryId) return; // Message already expired, don't replicate.
- ClusterConnectionProxy(session).expiryId(*expiryId);
+ ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
}
// We can't send a broker::Message via the normal client API,
@@ -408,7 +407,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
- std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1));
+ std::for_each(drs.begin(), drs.end(),
+ boost::bind(&UpdateClient::updateUnacked, this, _1));
updateTxState(ss->getSemanticState()); // Tx transaction state.
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 1df72bf1d2..9f70121b74 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -245,6 +245,25 @@ acl allow all all
session1 = cluster[1].connect().session()
for q in queues: self.assert_browse(session1, "q1", ["foo"])
+ def test_dr_no_message(self):
+ """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
+ Joining broker crashes with 'error deliveryRecord no update message'
+ """
+
+ cluster = self.cluster(1)
+ session0 = cluster[0].connect().session()
+ s = session0.sender("q1;{create:always}")
+ s.send(Message("a", ttl=0.05), sync=False)
+ s.send(Message("b", ttl=0.05), sync=False)
+ r1 = session0.receiver("q1")
+ self.assertEqual("a", r1.fetch(timeout=0).content)
+ r2 = session0.receiver("q1;{mode:browse}")
+ self.assertEqual("b", r2.fetch(timeout=0).content)
+ # Leave messages un-acknowledged, let the expire, then start new broker.
+ time.sleep(.1)
+ cluster.start()
+ self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -274,7 +293,7 @@ class LongTests(BrokerTest):
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
ErrorGenerator(b)
- time.sleep(min(5,self.duration()/2))
+ time.sleep(5)
sender.stop()
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
@@ -363,7 +382,7 @@ class LongTests(BrokerTest):
start_mclients(cluster[alive])
while time.time() < endtime:
- time.sleep(max(5,self.duration()/4))
+ time.sleep(5)
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]