diff options
author | Alan Conway <aconway@apache.org> | 2014-01-24 21:54:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-01-24 21:54:59 +0000 |
commit | 7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692 (patch) | |
tree | 41f4d08b748da3f30afed78dd902a99b5b6d436b /qpid/cpp/src | |
parent | ea2a45285fb83554df1364428075cda763644749 (diff) | |
download | qpid-python-7d3e48dc2acf7ca77f044ac34f2063c5a0bf0692.tar.gz |
QPID-5513: HA backup fails if number of replicated queues exceeds number of channels.
The problem:
- create cluster of 2 brokers.
- create more than 32768 queues (exceeds number of channels on a connection)
- backup exits with critical error but
- client creating queues receives no error, primary continues with unreplicated queue.
The solution: The primary raises an error to the client if the client attempts to
create queues in excess of the channel limit. The queue is not created on the primary
or on the backup; both primary and backup continue as normal.
In addition: raised the channel limit from 32k to 64k. There was no reason for
the smaller limit. See discussion: http://qpid.2158936.n2.nabble.com/CHANNEL-MAX-and-CHANNEL-HIGH-BIT-question-tp7603121p7603138.html
New unit test reproduces the issue; it must create more than 64k queues.
Other minor improvements:
- brokertest framework no longer overrides --log options supplied in the broker arguments.
- increased default heartbeat in test framework for tests that have busy brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1561206 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/amqp_types.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h | 90 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 11 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ha_test_max_queues.cpp | 64 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 13 |
13 files changed, 195 insertions, 16 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 25f7e4a943..7b8ea49251 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -609,6 +609,7 @@ if (BUILD_HA) qpid/ha/Membership.h qpid/ha/Primary.cpp qpid/ha/Primary.h + qpid/ha/PrimaryQueueLimits.h qpid/ha/QueueGuard.cpp qpid/ha/QueueGuard.h qpid/ha/QueueReplicator.cpp diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index fb502cb40a..8c8a106c36 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -105,7 +105,6 @@ class QPID_CLIENT_CLASS_EXTERN Connection const std::string& uid = "", const std::string& pwd = "", const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); - /** * Opens a connection to a broker using a URL. * If the URL contains multiple addresses, try each in turn diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h index 2072a83904..3fe8b68dcd 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types.h +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -46,9 +46,8 @@ struct Uuid; // Useful constants -/** Maximum channel ID used by broker. Reserve high bit for internal use.*/ -const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1; -const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); +/** Maximum channel ID used by broker. 
*/ +const ChannelId CHANNEL_MAX=(ChannelId(~1)); // Forward declare class types class FramingContent; diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 93ad5ec381..d33fcdd6b4 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -30,7 +30,6 @@ #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionHandler.h" #include "qpid/broker/Link.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/AMQFrame.h" diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 496704f737..2b97d1dd9c 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -92,7 +92,8 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), logPrefix("Primary: "), active(false), - replicationTest(hb.getSettings().replicateDefault.get()) + replicationTest(hb.getSettings().replicateDefault.get()), + queueLimits(logPrefix) { // Note that at this point, we are still rejecting client connections. // So we are safe from client interference while we set up the primary. @@ -248,16 +249,17 @@ void Primary::queueCreate(const QueuePtr& q) { ReplicateLevel level = replicationTest.useLevel(*q); q->addArgument(QPID_REPLICATE, printable(level).str()); if (level) { - QPID_LOG(debug, logPrefix << "Created queue " << q->getName() - << " replication: " << printable(level)); // Give each queue a unique id. Used by backups to avoid confusion of // same-named queues. 
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); { Mutex::ScopedLock l(lock); + queueLimits.addQueue(q); // Throws if limit exceeded for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueCreate(q); } + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); checkReady(); // Outside lock } } @@ -268,6 +270,7 @@ void Primary::queueDestroy(const QueuePtr& q) { QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); { Mutex::ScopedLock l(lock); + queueLimits.removeQueue(q); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); } @@ -302,6 +305,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect( const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&) { shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); + queueLimits.addBackup(backup); backups[info.getSystemId()] = backup; return backup; } @@ -309,6 +313,7 @@ shared_ptr<RemoteBackup> Primary::backupConnect( // Remove a backup. Caller should not release the shared pointer returend till // outside the lock. 
void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) { + queueLimits.addBackup(backup); types::Uuid id = backup->getBrokerInfo().getSystemId(); backup->cancel(); expectedBackups.erase(backup); diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index e0a7065e2c..2e32515c9a 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -25,6 +25,7 @@ #include "types.h" #include "hash.h" #include "BrokerInfo.h" +#include "PrimaryQueueLimits.h" #include "ReplicationTest.h" #include "Role.h" #include "qpid/sys/Mutex.h" @@ -154,6 +155,7 @@ class Primary : public Role boost::intrusive_ptr<sys::TimerTask> timerTask; ReplicaMap replicas; TxMap txMap; + PrimaryQueueLimits queueLimits; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h new file mode 100644 index 0000000000..a2322f1545 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h @@ -0,0 +1,90 @@ +#ifndef QPID_HA_PRIMARYQUEUELIMITS_H +#define QPID_HA_PRIMARYQUEUELIMITS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <qpid/broker/Queue.h> +#include <qpid/framing/amqp_types.h> +#include <boost/shared_ptr.hpp> +#include <string> + +namespace qpid { +namespace broker { +class Queue; +} + +namespace ha { +class RemoteBackup; + +/** + * Track queue limits on the primary, ensure the primary does not attempt to + * replicate more queues than the backups can handle. + * + * THREAD UNSAFE: Protected by Primary::lock + */ +class PrimaryQueueLimits +{ + public: + // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max + PrimaryQueueLimits(const std::string& lp) : + logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {} + + /** Add a replicated queue + *@exception ResourceLimitExceededException if this would exceed the limit. + */ + void addQueue(const boost::shared_ptr<broker::Queue>& q) { + if (queues >= maxQueues) { + QPID_LOG(error, logPrefix << "Cannot create replicated queue " << q->getName() + << " exceeds limit of " << maxQueues + << " replicated queues."); + throw framing::ResourceLimitExceededException( + "Exceeded replicated queue limit."); + } + else ++queues; + } + + /** Remove a replicated queue. + * @pre Was previously added with addQueue + */ + void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; } + + // TODO aconway 2014-01-24: Currently replication links always use the + // hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0 + // on replication links) we may need to check the actual channel max on each + // link and update maxQueues to the smallest value. addBackup and removeBackup + // are placeholders for that. 
+ + /** Add a backup */ + void addBackup(const boost::shared_ptr<RemoteBackup>&) {} + + /** Remove a backup */ + void removeBackup(const boost::shared_ptr<RemoteBackup>&) {} + + private: + std::string logPrefix; + uint64_t maxQueues; + uint64_t queues; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_PRIMARYQUEUELIMITS_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index eda3f96180..1300819eb7 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -34,7 +34,6 @@ #include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" -#include "qpid/broker/SessionHandler.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include "qpid/Msg.h" diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index a4093c82a2..abcb82b97d 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -299,6 +299,10 @@ add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions}) target_link_libraries (msg_group_test qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") remember_location(msg_group_test) +add_executable (ha_test_max_queues ha_test_max_queues.cpp ${platform_test_additions}) +target_link_libraries (ha_test_max_queues qpidmessaging qpidtypes qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}") +remember_location(ha_test_max_queues) + if (BUILD_SASL) add_executable (sasl_version sasl_version.cpp ${platform_test_additions}) remember_location(sasl_version) diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 3f99d8647b..c7fcf2c3af 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -247,7 +247,7 @@ class Broker(Popen): def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], test_store=False, name=None, 
expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): + def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -276,7 +276,10 @@ class Broker(Popen): cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] - cmd += ["--log-enable=%s"%(log_level or "info+") ] + + # Add default --log-enable arguments unless args already has --log arguments. + if not next((l for l in args if l.startswith("--log")), None): + args += ["--log-enable=info+"] if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, "--test-store-events", self.store_log] @@ -474,9 +477,9 @@ class BrokerTest(TestCase): self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) + b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd) if (wait): try: b.ready() except Exception, e: diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 8dd1347144..4606fab746 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -121,7 +121,7 @@ class HaBroker(Broker): @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. 
""" - heartbeat=2 + heartbeat=5 def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", client_credentials=None, **kwargs): @@ -129,8 +129,6 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", - "--log-enable=debug+:ha::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont @@ -138,6 +136,9 @@ class HaBroker(Broker): "--link-heartbeat-interval=%s"%(HaBroker.heartbeat), "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] + # Add default --log-enable arguments unless args already has --log arguments. + if not next((l for l in args if l.startswith("--log")), None): + args += ["--log-enable=info+", "--log-enable=debug+:ha::"] if ha_replicate is not None: args += [ "--ha-replicate=%s"%ha_replicate ] if brokers_url: args += [ "--ha-brokers-url", brokers_url ] diff --git a/qpid/cpp/src/tests/ha_test_max_queues.cpp b/qpid/cpp/src/tests/ha_test_max_queues.cpp new file mode 100644 index 0000000000..fda271f854 --- /dev/null +++ b/qpid/cpp/src/tests/ha_test_max_queues.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/Url.h> +#include <qpid/framing/reply_exceptions.h> +#include <sstream> + +using namespace qpid::client; +using namespace std; + +int main(int argc, char** argv) { + assert(argc == 2); // Expecing URL of broker as argv[1] + try { + // We need to create a large number of queues quickly, so we + // use the old API for it's asynchronous commands. + // The qpid::messaging API does not allow async queue creation. + // + Connection c; + c.open(qpid::Url(argv[1])); + AsyncSession s = async(c.newSession()); + // Generate too many queues, make sure we get an exception. + for (uint64_t i = 0; i < 100000; ++i) { + ostringstream os; + os << "q" << i; + string q = os.str(); + s.queueDeclare(q, arg::sync=false); + if (i && i % 1000 == 0) { + s.sync(); // Check for exceptions. 
+ cout << "Declared " << q << endl; + } + } + cout << "Expected resource-limit-exceeded exception" << endl; + return 1; + } + catch (const qpid::framing::ResourceLimitExceededException& e) { + cout << "Resource limit exceeded: " << e.what() << endl; + return 0; + } + catch (const std::exception& e) { + cout << "Error: " << e.what() << endl; + return 1; + } +} diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index b8644ab0fa..abc62b643e 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID +from qpid.harness import Skipped from brokertest import * from ha_test import * from threading import Thread, Lock, Condition @@ -1078,6 +1079,18 @@ class LongTests(HaBrokerTest): finally: for t in threads: t.stopped = True; t.join() + def test_max_queues(self): + """Verify that we behave properly if we try to exceed the max number + of replicated queues - currently limited by the max number of channels + in the replication link""" + # This test is very slow (3 mins), skip it unless duration() > 1 minute. + if self.duration() < 60: return + # This test is written in C++ for speed, it takes a long time + # to create 64k queues in python. See ha_test_max_queues.cpp. + cluster = HaCluster(self, 2) + test = self.popen(["ha_test_max_queues", cluster[0].host_port()]) + self.assertEqual(test.wait(), 0) + class RecoveryTests(HaBrokerTest): """Tests for recovery after a failure.""" |