summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-01-24 21:54:59 +0000
committerAlan Conway <aconway@apache.org>2014-01-24 21:54:59 +0000
commit7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692 (patch)
tree41f4d08b748da3f30afed78dd902a99b5b6d436b /qpid/cpp/src/qpid/ha/Primary.cpp
parentea2a45285fb83554df1364428075cda763644749 (diff)
downloadqpid-python-7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692.tar.gz
QPID-5513: HA backup fails if number of replicated queues exceeds number of channels.
The problem: - create cluster of 2 brokers. - create more than 32768 queues (exceeds number of channels on a connection) - backup exits with critical error but - client creating queues receives no error, primary continues with unreplicated queue. The solution: Primary raises an error to the client if it attempts to create queues in excess of the channel limit. The queue is not created on primary or backup, primary and backup continue as normal. In addition: raised the channel limit from 32k to 64k. There was no reason for the smaller limit. See discussion: http://qpid.2158936.n2.nabble.com/CHANNEL-MAX-and-CHANNEL-HIGH-BIT-question-tp7603121p7603138.html New unit test to reproduce the issue, must create > 64k queues. Other minor improvements: - brokertest framework doesn't override --log options in the arguments. - increased default heartbeat in test framework for tests that have busy brokers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1561206 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp11
1 files changed, 8 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 496704f737..2b97d1dd9c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -92,7 +92,8 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
- replicationTest(hb.getSettings().replicateDefault.get())
+ replicationTest(hb.getSettings().replicateDefault.get()),
+ queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
// So we are safe from client interference while we set up the primary.
@@ -248,16 +249,17 @@ void Primary::queueCreate(const QueuePtr& q) {
ReplicateLevel level = replicationTest.useLevel(*q);
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
{
Mutex::ScopedLock l(lock);
+ queueLimits.addQueue(q); // Throws if limit exceeded
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueCreate(q);
}
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
checkReady(); // Outside lock
}
}
@@ -268,6 +270,7 @@ void Primary::queueDestroy(const QueuePtr& q) {
QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
{
Mutex::ScopedLock l(lock);
+ queueLimits.removeQueue(q);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
}
@@ -302,6 +305,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect(
const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
{
shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+ queueLimits.addBackup(backup);
backups[info.getSystemId()] = backup;
return backup;
}
@@ -309,6 +313,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect(
// Remove a backup. Caller should not release the shared pointer returend till
// outside the lock.
void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
+ queueLimits.addBackup(backup);
types::Uuid id = backup->getBrokerInfo().getSystemId();
backup->cancel();
expectedBackups.erase(backup);