summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp24
1 files changed, 18 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 37c2a2d6b4..9ab6628e17 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -287,8 +287,8 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
alternates(hb.getBroker().getExchanges()),
connection(0)
{
- broker.getConnectionObservers().add(
- boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this)));
+ connectionObserver.reset(new ConnectionObserver(*this));
+ broker.getConnectionObservers().add(connectionObserver);
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
@@ -320,10 +320,11 @@ void BrokerReplicator::initialize() {
"", // excludes
false, // dynamic
0, // sync?
- // shared_ptr keeps this in memory until outstanding initializeBridge
+ // shared_ptr keeps this in memory until outstanding connected
// calls are run.
- boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
+ boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
+ assert(result.second);
result.first->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
@@ -339,10 +340,21 @@ void collectQueueReplicators(
}
} // namespace
-void BrokerReplicator::shutdown() {}
+void BrokerReplicator::shutdown() {
+ // NOTE: this is called in a QMF dispatch thread, not the Link's connection
+ // thread. It's OK to be unlocked because it doesn't use any mutable state,
+ // it only calls thread safe functions objects belonging to the Broker.
+
+ // Unregister with broker objects:
+ if (connectionObserver) {
+ broker.getConnectionObservers().remove(connectionObserver);
+ connectionObserver.reset();
+ }
+ broker.getExchanges().destroy(getName());
+}
// This is called in the connection IO thread when the bridge is started.
-void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
// Use the credentials of the outgoing Link connection for creating queues,
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.