diff options
author | Alan Conway <aconway@apache.org> | 2013-02-07 19:26:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-02-07 19:26:12 +0000 |
commit | 88613594a4bfa64c5f244ce2d183d605449e2496 (patch) | |
tree | 4e475fbdd91e5eff2672dcc045709f3626fb2550 /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | 2f8518477686ce2382110d159349fe732db27a1a (diff) | |
download | qpid-python-88613594a4bfa64c5f244ce2d183d605449e2496.tar.gz |
QPID-4555: HA Primary sets explicit qpid.replicate in Queue and Exchange arguments.
Previously both Primary and Backup would calculate the qpid.replicate value
independently, assuming the result would be the same. In the case of exclusive
queues, the exclusivity can change over time so its possible that primary and
backup won't agree.
Now only Primary does the calculation with exclusive, auto-delete etc. and puts
an explicity qpid.replicate in the queue or event arguments. Backup uses the
value set by primary.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1443678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 67108fa5f9..93dbbbea85 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -83,7 +83,8 @@ Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), - logPrefix("Primary: "), active(false) + logPrefix("Primary: "), active(false), + replicationTest(hb.getSettings().replicateDefault.get()) { hb.getMembership().setStatus(RECOVERING); assert(instance == 0); @@ -97,8 +98,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // the QueueGuards are created. QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getReplicationTest(), 0)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards @@ -196,19 +196,25 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { // NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { - if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) { + // Set replication argument. + ReplicateLevel level = replicationTest.useLevel(*q); + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); + q->addArgument(QPID_REPLICATE, printable(level).str()); + if (level) { // Give each queue a unique id to avoid confusion of same-named queues. q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); - } - Mutex::ScopedLock l(lock); - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { - i->second->queueCreate(q); - checkReady(i, l); + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } } } // NOTE: Called with queue registry lock held. void Primary::queueDestroy(const QueuePtr& q) { + QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); @@ -217,16 +223,21 @@ void Primary::queueDestroy(const QueuePtr& q) { // NOTE: Called with exchange registry lock held. void Primary::exchangeCreate(const ExchangePtr& ex) { - if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) { + ReplicateLevel level = replicationTest.useLevel(*ex); + QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() + << " replication: " << printable(level)); + FieldTable args = ex->getArgs(); + args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. + if (level) { // Give each exchange a unique id to avoid confusion of same-named exchanges. - FieldTable args = ex->getArgs(); args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0]))); - ex->setArgs(args); } + ex->setArgs(args); } // NOTE: Called with exchange registry lock held. -void Primary::exchangeDestroy(const ExchangePtr&) { +void Primary::exchangeDestroy(const ExchangePtr& ex) { + QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); // Do nothing } @@ -237,8 +248,7 @@ void Primary::opened(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { QPID_LOG(info, logPrefix << "New backup connected: " << info); - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(info, haBroker.getReplicationTest(), &connection)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); { // Avoid deadlock with queue registry lock. Mutex::ScopedUnlock u(lock); |