From 3ff288653b2a0bde0849a0a0438a2bc6b7ad00b2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 13 Apr 2011 14:06:48 +0000 Subject: QPID-3202: Clustered brokers shut down with "unknown connection" error. This error is an assertion recently introduced in r1091097, the bug has probably been there for a while. Two fixes: - Connections that close before they are announced no longer send a deliver-close. - Fixed race in OutputInterceptor that sent a deliver-do-output after deliver-close. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1091790 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/cluster/Connection.cpp | 12 +++++++----- qpid/cpp/src/qpid/cluster/Connection.h | 1 + qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 22 ++++++++++++++-------- qpid/cpp/src/qpid/cluster/OutputInterceptor.h | 10 +++++----- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index c1304c2b75..f2ea466a9b 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -78,7 +78,7 @@ const std::string shadowPrefix("[shadow]"); Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) - : cluster(c), self(id), catchUp(false), output(*this, out), + : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), @@ -90,7 +90,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId member, bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), +) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, @@ -255,7 +255,7 @@ void Connection::deliveredFrame(const EventFrame& f) { } } -// A local connection is closed by the network layer. +// A local connection is closed by the network layer. Called in the connection thread. void Connection::closed() { try { if (isUpdated()) { @@ -272,8 +272,9 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl( - ClusterConnectionDeliverCloseBody(), self); + if (announced) + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(), self); } } catch (const std::exception& e) { @@ -384,6 +385,7 @@ void Connection::processInitialFrames(const char*& ptr, size_t size) { connection->getUserId(), initialFrames), getId()); + announced = true; initialFrames.clear(); } } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 8a9891deb6..a4436e84a8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -250,6 +250,7 @@ class Connection : Cluster& cluster; ConnectionId self; bool catchUp; + bool announced; OutputInterceptor output; framing::FrameDecoder localDecoder; ConnectionCtor connectionCtor; diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 13e95d1ec2..4bf03eefa2 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -45,12 +45,11 @@ void OutputInterceptor::send(framing::AMQFrame& f) { } void OutputInterceptor::activateOutput() { - if (parent.isCatchUp()) { - sys::Mutex::ScopedLock l(lock); + sys::Mutex::ScopedLock l(lock); + if (parent.isCatchUp()) next->activateOutput(); - } else - sendDoOutput(sendMax); + sendDoOutput(sendMax, l); } void OutputInterceptor::abort() { @@ -75,23 +74,29 @@ bool OutputInterceptor::doOutput() { // Send output up to limit, calculate new limit. void OutputInterceptor::deliverDoOutput(uint32_t limit) { + sys::Mutex::ScopedLock l(lock); sentDoOutput = false; sendMax = limit; size_t newLimit = limit; if (parent.isLocal()) { - size_t buffered = getBuffered(); + size_t buffered = next->getBuffered(); if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. newLimit = sendMax*2; else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. newLimit = (sendMax + sent) / 2; } sent = 0; - while (sent < limit && parent.getBrokerConnection()->doOutput()) + while (sent < limit) { + { + sys::Mutex::ScopedUnlock u(lock); + if (!parent.getBrokerConnection()->doOutput()) break; + } ++sent; - if (sent == limit) sendDoOutput(newLimit); + } + if (sent == limit) sendDoOutput(newLimit, l); } -void OutputInterceptor::sendDoOutput(size_t newLimit) { +void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { if (parent.isLocal() && !sentDoOutput && !closing) { sentDoOutput = true; parent.getCluster().getMulticast().mcastControl( @@ -100,6 +105,7 @@ void OutputInterceptor::sendDoOutput(size_t newLimit) { } } +// Called in connection thread when local connection closes. void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index 65bd82a4fc..3abf5273a0 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,13 +58,13 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { uint32_t getSendMax() const { return sendMax; } void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } - + cluster::Connection& parent; - + private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(size_t newLimit); + void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&); mutable sys::Mutex lock; bool closing; -- cgit v1.2.1