diff options
author | Alan Conway <aconway@apache.org> | 2010-11-22 16:15:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-11-22 16:15:06 +0000 |
commit | 0f86c796ccb052fa6c5f807f480a1feabec643f8 (patch) | |
tree | 0d5d5613220e01ef99ff6ca79f9ff9a1898fa822 /cpp/src | |
parent | 17401b1293bcd090ee1a675665ba30fcfac76d5b (diff) | |
download | qpid-python-0f86c796ccb052fa6c5f807f480a1feabec643f8.tar.gz |
QPID-2956: cluster broker exits with "error deliveryRecord no update message."
The following sequence of events was causing a broker joining the cluster to shut down:
- a client acquires or browses a message with TTL set.
- the message expires.
- a new broker joins before the client has acknowledged the message.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1037763 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 6 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 23 |
3 files changed, 35 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index d91437c5ba..d9a7b0122a 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -31,7 +31,7 @@ namespace qpid { namespace cluster { ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} + : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} struct ExpiryTask : public sys::TimerTask { ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) @@ -61,12 +61,17 @@ void ExpiryPolicy::willExpire(broker::Message& m) { // them as independenty messages so we can have multiple messages // with the same expiry ID. // - // TODO: fix update to avoid duplicating messages. sys::Mutex::ScopedLock l(lock); - id = expiryId++; // if this is an update, this expiryId may already exist - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; + id = expiryId++; + if (!id) { // This is an update of an already-expired message. + m.setExpiryPolicy(expiredPolicy); + } + else { + assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); + // If this is an update, the id may already exist + unexpiredById.insert(IdMessageMap::value_type(id, &m)); + unexpiredByMessage[&m] = id; + } } timer.add(new ExpiryTask(this, id, m.getExpiration())); } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 54c5fa0bcc..bc1b812a94 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -272,8 +272,7 @@ class MessageUpdater { // Send the expiry ID if necessary. 
if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); - if (!expiryId) return; // Message already expired, don't replicate. - ClusterConnectionProxy(session).expiryId(*expiryId); + ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); } // We can't send a broker::Message via the normal client API, @@ -408,7 +407,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); - std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); + std::for_each(drs.begin(), drs.end(), + boost::bind(&UpdateClient::updateUnacked, this, _1)); updateTxState(ss->getSemanticState()); // Tx transaction state. diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 1df72bf1d2..9f70121b74 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -245,6 +245,25 @@ acl allow all all session1 = cluster[1].connect().session() for q in queues: self.assert_browse(session1, "q1", ["foo"]) + def test_dr_no_message(self): + """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141 + Joining broker crashes with 'error deliveryRecord no update message' + """ + + cluster = self.cluster(1) + session0 = cluster[0].connect().session() + s = session0.sender("q1;{create:always}") + s.send(Message("a", ttl=0.05), sync=False) + s.send(Message("b", ttl=0.05), sync=False) + r1 = session0.receiver("q1") + self.assertEqual("a", r1.fetch(timeout=0).content) + r2 = session0.receiver("q1;{mode:browse}") + self.assertEqual("b", r2.fetch(timeout=0).content) + # Leave messages un-acknowledged, let the expire, then start new broker. 
+ time.sleep(.1) + cluster.start() + self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -274,7 +293,7 @@ class LongTests(BrokerTest): i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) ErrorGenerator(b) - time.sleep(min(5,self.duration()/2)) + time.sleep(5) sender.stop() receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() @@ -363,7 +382,7 @@ class LongTests(BrokerTest): start_mclients(cluster[alive]) while time.time() < endtime: - time.sleep(max(5,self.duration()/4)) + time.sleep(5) for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker, expect the clients to fail. b = cluster[alive] |