diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6221b0054c..7285b85991 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -163,7 +163,6 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { void Cluster::erase(const ConnectionId& id) { // Called only by Connection::deliverClose in deliver thread, no need to lock. connections.erase(id); - decoder.erase(id); } std::vector<string> Cluster::getIds() const { @@ -231,9 +230,14 @@ void Cluster::deliveredEvent(const Event& e) { Buffer buf(const_cast<char*>(e.getData()), e.getSize()); if (e.getType() == CONTROL) { AMQFrame frame; - while (frame.decode(buf)) + while (frame.decode(buf)) { + // Check for deliver close here so we can erase the + // connection decoder safely in this thread. + if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) + decoder.erase(e.getConnectionId()); deliverFrameQueue.push(EventFrame(e, frame)); } + } else if (e.getType() == DATA) decoder.decode(e, e.getData()); } |