summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp24
1 files changed, 24 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index cc6c8a3f30..50f2ececdb 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -50,6 +50,7 @@ using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
+using boost::shared_ptr;
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
+namespace {
+void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
+ shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) v.push_back(qr);
+}
+}
+
+void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
+ registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
+}
+
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
@@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
+ // Don't allow backup queues to auto-delete, primary decides when to delete.
if (q->isAutoDelete()) q->markInUse();
dispatch[DequeueEvent::KEY] =
@@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const
bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
+void QueueReplicator::promoted() {
+ // Promoted to primary, deal with auto-delete now.
+ if (queue && queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
+}
}} // namespace qpid::broker