summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp23
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--cpp/src/qpid/ha/Primary.cpp9
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp6
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp24
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h7
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp1
8 files changed, 49 insertions, 25 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 503de3e351..93ad5ec381 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -100,8 +100,8 @@ Role* Backup::recover(Mutex::ScopedLock&) {
// Reset membership before allowing backups to connect.
backups = membership.otherBackups();
membership.clear();
- return new Primary(haBroker, backups);
}
+ return new Primary(haBroker, backups);
}
Role* Backup::promote() {
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 5e8da17a1b..1587b5b33f 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -865,27 +865,14 @@ bool BrokerReplicator::hasBindings() { return false; }
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator
if (qr) {
qr->disconnect();
if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
// Transactions are aborted on failover so clean up tx-queues
deleteQueue(qr->getQueue()->getName());
}
- else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
- if (qr->getQueue()->getSettings().autoDeleteDelay) {
- // Start the auto-delete timer
- qr->getQueue()->releaseFromUse();
- qr->getQueue()->scheduleAutoDelete();
- }
- else {
- // Delete immediately. Don't purge, the primary is gone so we need
- // to reroute the deleted messages.
- deleteQueue(qr->getQueue()->getName(), false);
- }
- }
}
}
@@ -893,9 +880,9 @@ typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
// Callback function for accumulating exchange candidates
namespace {
- void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
- ev.push_back(i);
- }
+void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
+ ev.push_back(i);
+}
}
// Called by ConnectionObserver::disconnected, disconnected from the network side.
@@ -907,7 +894,7 @@ void BrokerReplicator::disconnected() {
ExchangeVector exs;
exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
for_each(exs.begin(), exs.end(),
- boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
+ boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index e319ab1219..b3e3fe3223 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -148,7 +148,7 @@ class BrokerReplicator : public broker::Exchange,
void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
- void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
+ void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>);
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 0c1858ceb1..0c0fe983bb 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -94,7 +94,16 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get())
{
+ // Note that at this point, we are still rejecting client connections.
+ // So we are safe from client interference while we set up the primary.
+
hb.getMembership().setStatus(RECOVERING);
+
+ // Process all QueueReplicators, handles auto-delete queues.
+ QueueReplicator::Vector qrs;
+ QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
+ std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
+
broker::QueueRegistry& queues = hb.getBroker().getQueues();
queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
if (expect.empty()) {
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index a32334bcf9..eeb3312aec 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -26,7 +26,6 @@
#include "QueueGuard.h"
#include "RemoteBackup.h"
#include "ReplicatingSubscription.h"
-#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -121,7 +120,7 @@ void PrimaryTxObserver::initialize() {
throw InvalidArgumentException(
QPID_MSG(logPrefix << "TX replication queue already exists."));
txQueue = result.first;
- txQueue->markInUse(true); // Prevent auto-delete till we are done.
+ txQueue->markInUse(); // Prevent auto-delete till we are done.
txQueue->deliver(TxBackupsEvent(backups).message());
}
@@ -228,7 +227,8 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) {
// If there are no outstanding completions, break pointer cycle here.
// Otherwise break it in cancel() when the remaining completions are done.
if (incomplete.empty()) txBuffer = 0;
- txQueue->releaseFromUse(true); // txQueue will auto-delete
+ txQueue->releaseFromUse(); // txQueue will auto-delete
+ txQueue->scheduleAutoDelete();
txQueue.reset();
try {
broker.getExchanges().destroy(getExchangeName());
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index cc6c8a3f30..50f2ececdb 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -50,6 +50,7 @@ using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
+using boost::shared_ptr;
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
+namespace {
+void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
+ shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) v.push_back(qr);
+}
+}
+
+void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
+ registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
+}
+
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
@@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
+ // Don't allow backup queues to auto-delete, primary decides when to delete.
if (q->isAutoDelete()) q->markInUse();
dispatch[DequeueEvent::KEY] =
@@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const
bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
+void QueueReplicator::promoted() {
+ // Promoted to primary, deal with auto-delete now.
+ if (queue && queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it's still running it's destroyed logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
+}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 6fd140fde3..8938285fe3 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -38,6 +38,7 @@ class Queue;
class QueueRegistry;
class SessionHandler;
class Deliverable;
+class ExchangeRegistry;
}
namespace ha {
@@ -59,9 +60,12 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string QPID_SYNC_FREQUENCY;
static const std::string REPLICATOR_PREFIX;
+ typedef std::vector<boost::shared_ptr<QueueReplicator> > Vector;
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
+ /*** Copy QueueReplicators from the registry */
+ static void copy(broker::ExchangeRegistry&, Vector& result);
QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
@@ -78,7 +82,6 @@ class QueueReplicator : public broker::Exchange,
// Set if the queue has ever been subscribed to, used for auto-delete cleanup.
void setSubscribed() { subscribed = true; }
- bool isSubscribed() { return subscribed; }
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
@@ -90,6 +93,8 @@ class QueueReplicator : public broker::Exchange,
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
bool hasBindings();
+ void promoted();
+
protected:
typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index d0b93da85f..95215e1e59 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -22,7 +22,6 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"