summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-11-21 16:27:53 +0000
committerAlan Conway <aconway@apache.org>2013-11-21 16:27:53 +0000
commitca56df8136bb82f074fae29f877e856a834120fc (patch)
tree7852ac435ffeeb820e0e030f70a7c3494c0d25af
parentbe7491cfc7b25c78687198dd85ad9dc9451a2f2d (diff)
downloadqpid-python-ca56df8136bb82f074fae29f877e856a834120fc.tar.gz
QPID-5366: qpid segfaults in qpid::ha::BrokerReplicator::disconnected
Fix for a race condition: previously, BrokerReplicator created a separate ConnectionObserver object to forward connection events to it. However the Observers locking is such that it is possible for an event to arrive *after* calling Observers::remove (Observers copies the pointers and delivers events outside its lock.) This meant that it was possible for a call to BrokerReplicator::disconnect to be made after the BrokerReplicator was deleted. The fix is to combine BrokerReplicator and BrokerReplicator::ConnectionObserver into a single object with one lifetime that will last until it is removed from both the ExchangeRegistry and the ConnectionObservers. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544246 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp35
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h18
2 files changed, 19 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index d27d5e84b3..5e8da17a1b 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -24,7 +24,6 @@
#include "TxReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/amqp_0_10/Connection.h"
-#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
@@ -203,22 +202,6 @@ class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
BrokerReplicator& brokerReplicator;
};
-class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
-{
- public:
- ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
- virtual void connection(Connection&) {}
- virtual void opened(Connection&) {}
-
- virtual void closed(Connection& c) {
- if (brokerReplicator.link && &c == brokerReplicator.connection)
- brokerReplicator.disconnected();
- }
- virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
- private:
- BrokerReplicator& brokerReplicator;
-};
-
/** Keep track of queues or exchanges during the update process to solve 2
* problems.
*
@@ -300,10 +283,8 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
- connection(0),
- connectionObserver(new ConnectionObserver(*this))
+ connect(0)
{
- broker.getConnectionObservers().add(connectionObserver);
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
@@ -343,9 +324,10 @@ void BrokerReplicator::initialize() {
assert(result.second);
result.first->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
+ broker.getConnectionObservers().add(shared_from_this());
}
-BrokerReplicator::~BrokerReplicator() { shutdown(); }
+BrokerReplicator::~BrokerReplicator() {}
namespace {
void collectQueueReplicators(
@@ -363,10 +345,7 @@ void BrokerReplicator::shutdown() {
// it only calls thread safe functions objects belonging to the Broker.
// Unregister with broker objects:
- if (connectionObserver) {
- broker.getConnectionObservers().remove(connectionObserver);
- connectionObserver.reset();
- }
+ broker.getConnectionObservers().remove(shared_from_this());
broker.getExchanges().destroy(getName());
}
@@ -376,8 +355,8 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
//
- connection = link->getConnection();
- assert(connection);
+ connect = link->getConnection();
+ assert(connect);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getMgmtId();
@@ -922,7 +901,7 @@ namespace {
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
- connection = 0;
+ connect = 0;
// Make copy of exchanges so we can work outside the registry lock.
ExchangeVector exs;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 07b992df6a..e319ab1219 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -27,6 +27,7 @@
#include "AlternateExchangeSetter.h"
#include "qpid/Address.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/types/Variant.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/sys/unordered_map.h"
@@ -68,7 +69,8 @@ class QueueReplicator;
*
*/
class BrokerReplicator : public broker::Exchange,
- public boost::enable_shared_from_this<BrokerReplicator>
+ public boost::enable_shared_from_this<BrokerReplicator>,
+ public broker::ConnectionObserver
{
public:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
@@ -76,7 +78,8 @@ class BrokerReplicator : public broker::Exchange,
BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
- void initialize();
+ void initialize(); // Must be called immediately after constructor.
+ void shutdown();
// Exchange methods
std::string getType() const;
@@ -85,7 +88,12 @@ class BrokerReplicator : public broker::Exchange,
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
bool hasBindings();
- void shutdown();
+
+ // ConnectionObserver methods
+ void connection(broker::Connection&) {}
+ void opened(broker::Connection&) {}
+ void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); }
+ void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); }
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
@@ -100,7 +108,6 @@ class BrokerReplicator : public broker::Exchange,
class UpdateTracker;
class ErrorListener;
- class ConnectionObserver;
void connected(broker::Bridge&, broker::SessionHandler&);
void existingQueue(const boost::shared_ptr<broker::Queue>&);
@@ -157,11 +164,10 @@ class BrokerReplicator : public broker::Exchange,
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
- broker::Connection* connection;
+ broker::Connection* connect;
EventDispatchMap dispatch;
std::auto_ptr<UpdateTracker> queueTracker;
std::auto_ptr<UpdateTracker> exchangeTracker;
- boost::shared_ptr<ConnectionObserver> connectionObserver;
};
}} // namespace qpid::broker