diff options
author:    Alan Conway <aconway@apache.org>  2014-01-24 21:54:59 +0000
committer: Alan Conway <aconway@apache.org>  2014-01-24 21:54:59 +0000
commit:    7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692 (patch)
tree:      41f4d08b748da3f30afed78dd902a99b5b6d436b /qpid/cpp/src/qpid/ha/Primary.cpp
parent:    ea2a45285fb83554df1364428075cda763644749 (diff)
download:  qpid-python-7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692.tar.gz
QPID-5513: HA backup fails if number of replicated queues exceeds number of channels.
The problem:
- create cluster of 2 brokers.
- create more than 32768 queues (exceeds number of channels on a connection)
- backup exits with critical error but
- client creating queues receives no error, primary continues with unreplicated queue.
The solution: Primary raises an error to the client if it attempts to create
queues in excess of the channel limit. The queue is not created on primary
or backup, primary and backup continue as normal.
In addition: raised the channel limit from 32k to 64k. There was no reason for
the smaller limit. See discussion: http://qpid.2158936.n2.nabble.com/CHANNEL-MAX-and-CHANNEL-HIGH-BIT-question-tp7603121p7603138.html
New unit test to reproduce the issue, must create > 64k queues.
Other minor improvements:
- brokertest framework doesn't override --log options in the arguments.
- increased default heartbeat in test framework for tests that have busy brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1561206 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--  qpid/cpp/src/qpid/ha/Primary.cpp | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 496704f737..2b97d1dd9c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -92,7 +92,8 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
     haBroker(hb), membership(hb.getMembership()),
     logPrefix("Primary: "), active(false),
-    replicationTest(hb.getSettings().replicateDefault.get())
+    replicationTest(hb.getSettings().replicateDefault.get()),
+    queueLimits(logPrefix)
 {
     // Note that at this point, we are still rejecting client connections.
     // So we are safe from client interference while we set up the primary.
@@ -248,16 +249,17 @@ void Primary::queueCreate(const QueuePtr& q) {
     ReplicateLevel level = replicationTest.useLevel(*q);
     q->addArgument(QPID_REPLICATE, printable(level).str());
     if (level) {
-        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
-                 << " replication: " << printable(level));
         // Give each queue a unique id. Used by backups to avoid confusion of
         // same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
         {
             Mutex::ScopedLock l(lock);
+            queueLimits.addQueue(q); // Throws if limit exceeded
             for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
                 i->second->queueCreate(q);
         }
+        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+                 << " replication: " << printable(level));
         checkReady();           // Outside lock
     }
 }
@@ -268,6 +270,7 @@ void Primary::queueDestroy(const QueuePtr& q) {
     QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
     {
         Mutex::ScopedLock l(lock);
+        queueLimits.removeQueue(q);
         for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
             i->second->queueDestroy(q);
     }
@@ -302,6 +305,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect(
     const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
 {
     shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+    queueLimits.addBackup(backup);
     backups[info.getSystemId()] = backup;
     return backup;
 }
@@ -309,6 +313,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect(
 // Remove a backup. Caller should not release the shared pointer returend till
 // outside the lock.
 void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
+    queueLimits.addBackup(backup);
     types::Uuid id = backup->getBrokerInfo().getSystemId();
     backup->cancel();
     expectedBackups.erase(backup);