diff options
-rw-r--r-- | etc/ironic/ironic.conf.sample | 10 | ||||
-rw-r--r-- | ironic/common/hash_ring.py | 79 | ||||
-rw-r--r-- | ironic/conductor/rpcapi.py | 2 | ||||
-rw-r--r-- | ironic/tests/test_hash_ring.py | 127 |
4 files changed, 167 insertions, 51 deletions
diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample index 87ecd0e17..9aaae0e67 100644 --- a/etc/ironic/ironic.conf.sample +++ b/etc/ironic/ironic.conf.sample @@ -236,8 +236,14 @@ # distributing load across conductors. Larger values will # result in more even distribution of load and less load when # rebalancing the ring, but more memory usage. Number of -# partitions is (2^hash_partition_exponent). (integer value) -#hash_partition_exponent=16 +# partitions per conductor is (2^hash_partition_exponent). +# This determines the granularity of rebalancing: given 10 +# hosts, and an exponent of the 2, there are 40 partitions in +# the ring.A few thousand partitions should make rebalancing +# smooth in most cases. The default is suitable for up to a +# few hundred conductors. Too many partitions has a CPU +# impact. (integer value) +#hash_partition_exponent=5 # [Experimental Feature] Number of hosts to map onto each hash # partition. Setting this to more than one will cause diff --git a/ironic/common/hash_ring.py b/ironic/common/hash_ring.py index b13f795d1..6c8de3f3b 100644 --- a/ironic/common/hash_ring.py +++ b/ironic/common/hash_ring.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -import array +import bisect import hashlib import struct import threading @@ -26,12 +26,19 @@ from ironic.db import api as dbapi hash_opts = [ cfg.IntOpt('hash_partition_exponent', - default=16, + default=5, help='Exponent to determine number of hash partitions to use ' 'when distributing load across conductors. Larger values ' 'will result in more even distribution of load and less ' 'load when rebalancing the ring, but more memory usage. ' - 'Number of partitions is (2^hash_partition_exponent).'), + 'Number of partitions per conductor is ' + '(2^hash_partition_exponent). This determines the ' + 'granularity of rebalancing: given 10 hosts, and an ' + 'exponent of the 2, there are 40 partitions in the ring.' + 'A few thousand partitions should make rebalancing ' + 'smooth in most cases. The default is suitable for up to ' + 'a few hundred conductors. Too many partitions has a CPU ' + 'impact.'), cfg.IntOpt('hash_distribution_replicas', default=1, help='[Experimental Feature] ' @@ -47,6 +54,16 @@ CONF.register_opts(hash_opts) class HashRing(object): + """A stable hash ring. + + We map item N to a host Y based on the closest lower hash + - hash(item) -> partition + - hash(host) -> divider + - closest lower divider is the host to use + - we hash each host many times to spread load more finely + as otherwise adding a host gets (on average) 50% of the load of + just one other host assigned to it. + """ def __init__(self, hosts, replicas=None): """Create a new hash ring across the specified hosts. @@ -61,21 +78,30 @@ class HashRing(object): replicas = CONF.hash_distribution_replicas try: - self.hosts = list(hosts) + self.hosts = set(hosts) self.replicas = replicas if replicas <= len(hosts) else len(hosts) except TypeError: raise exception.Invalid( _("Invalid hosts supplied when building HashRing.")) - self.partition_shift = 32 - CONF.hash_partition_exponent - self.part2host = array.array('H') - for p in range(2 ** CONF.hash_partition_exponent): - self.part2host.append(p % len(hosts)) + self._host_hashes = {} + for host in hosts: + key = str(host).encode('utf8') + key_hash = hashlib.md5(key) + for p in range(2 ** CONF.hash_partition_exponent): + key_hash.update(key) + hashed_key = struct.unpack_from('>I', key_hash.digest())[0] + self._host_hashes[hashed_key] = host + # Gather the (possibly colliding) resulting hashes into a bisectable + # list. + self._partitions = sorted(self._host_hashes.keys()) def _get_partition(self, data): try: - return (struct.unpack_from('>I', hashlib.md5(data).digest())[0] - >> self.partition_shift) + hashed_key = struct.unpack_from( + '>I', hashlib.md5(data).digest())[0] + position = bisect.bisect(self._partitions, hashed_key) + return position if position < len(self._partitions) else 0 except TypeError: raise exception.Invalid( _("Invalid data supplied to HashRing.get_hosts.")) @@ -93,24 +119,35 @@ class HashRing(object): this `HashRing` was created with. It may be less than this if ignore_hosts is not None. """ - host_ids = [] + hosts = [] if ignore_hosts is None: - ignore_host_ids = [] + ignore_hosts = set() else: - ignore_host_ids = [self.hosts.index(h) - for h in ignore_hosts if h in self.hosts] - + ignore_hosts = set(ignore_hosts) + ignore_hosts.intersection_update(self.hosts) partition = self._get_partition(data) for replica in range(0, self.replicas): - if len(host_ids + ignore_host_ids) == len(self.hosts): - # prevent infinite loop + if len(hosts) + len(ignore_hosts) == len(self.hosts): + # prevent infinite loop - cannot allocate more fallbacks. break - while self.part2host[partition] in host_ids + ignore_host_ids: + # Linear probing: partition N, then N+1 etc. + host = self._get_host(partition) + while host in hosts or host in ignore_hosts: partition += 1 - if partition >= len(self.part2host): + if partition >= len(self._partitions): partition = 0 - host_ids.append(self.part2host[partition]) - return [self.hosts[h] for h in host_ids] + host = self._get_host(partition) + hosts.append(host) + return hosts + + def _get_host(self, partition): + """Find what host is serving a partition. + + :param partition: The index of the partition in the partition map. + e.g. 0 is the first partition, 1 is the second. + :return: The host object the ring was constructed with. + """ + return self._host_hashes[self._partitions[partition]] class HashRingManager(object): diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py index c1f4226f2..4fa96f1aa 100644 --- a/ironic/conductor/rpcapi.py +++ b/ironic/conductor/rpcapi.py @@ -107,7 +107,7 @@ class ConductorAPI(object): """ hash_ring = self.ring_manager[driver_name] - host = random.choice(hash_ring.hosts) + host = random.choice(list(hash_ring.hosts)) return self.topic + "." + host def update_node(self, context, node_obj, topic=None): diff --git a/ironic/tests/test_hash_ring.py b/ironic/tests/test_hash_ring.py index 53da4a9e4..63b5884f4 100644 --- a/ironic/tests/test_hash_ring.py +++ b/ironic/tests/test_hash_ring.py @@ -14,6 +14,7 @@ # under the License. from oslo.config import cfg +from testtools import matchers from ironic.common import exception from ironic.common import hash_ring @@ -37,69 +38,141 @@ class HashRingTestCase(base.TestCase): hosts = ['foo', 'bar'] replicas = 2 ring = hash_ring.HashRing(hosts, replicas=replicas) - self.assertEqual(hosts, ring.hosts) + self.assertEqual(set(hosts), ring.hosts) self.assertEqual(replicas, ring.replicas) def test_create_with_different_partition_counts(self): hosts = ['foo', 'bar'] CONF.set_override('hash_partition_exponent', 2) ring = hash_ring.HashRing(hosts) - self.assertEqual(2 ** 2, len(ring.part2host)) + self.assertEqual(2 ** 2 * 2, len(ring._partitions)) CONF.set_override('hash_partition_exponent', 8) ring = hash_ring.HashRing(hosts) - self.assertEqual(2 ** 8, len(ring.part2host)) + self.assertEqual(2 ** 8 * 2, len(ring._partitions)) CONF.set_override('hash_partition_exponent', 16) ring = hash_ring.HashRing(hosts) - self.assertEqual(2 ** 16, len(ring.part2host)) + self.assertEqual(2 ** 16 * 2, len(ring._partitions)) def test_distribution_one_replica(self): hosts = ['foo', 'bar', 'baz'] ring = hash_ring.HashRing(hosts, replicas=1) - self.assertEqual(['foo'], ring.get_hosts('fake')) - self.assertEqual(['bar'], ring.get_hosts('fake-again')) + fake_1_hosts = ring.get_hosts('fake') + fake_2_hosts = ring.get_hosts('fake-again') + # We should have one hosts for each thing + self.assertThat(fake_1_hosts, matchers.HasLength(1)) + self.assertThat(fake_2_hosts, matchers.HasLength(1)) + # And they must not be the same answers even on this simple data. + self.assertNotEqual(fake_1_hosts, fake_2_hosts) def test_distribution_two_replicas(self): hosts = ['foo', 'bar', 'baz'] ring = hash_ring.HashRing(hosts, replicas=2) - self.assertEqual(['foo', 'bar'], ring.get_hosts('fake')) - self.assertEqual(['bar', 'baz'], ring.get_hosts('fake-again')) + fake_1_hosts = ring.get_hosts('fake') + fake_2_hosts = ring.get_hosts('fake-again') + # We should have two hosts for each thing + self.assertThat(fake_1_hosts, matchers.HasLength(2)) + self.assertThat(fake_2_hosts, matchers.HasLength(2)) + # And they must not be the same answers even on this simple data + # because if they were we'd be making the active replica a hot spot. + self.assertNotEqual(fake_1_hosts, fake_2_hosts) def test_distribution_three_replicas(self): hosts = ['foo', 'bar', 'baz'] ring = hash_ring.HashRing(hosts, replicas=3) - self.assertEqual(['foo', 'bar', 'baz'], ring.get_hosts('fake')) - self.assertEqual(['bar', 'baz', 'foo'], ring.get_hosts('fake-again')) + fake_1_hosts = ring.get_hosts('fake') + fake_2_hosts = ring.get_hosts('fake-again') + # We should have two hosts for each thing + self.assertThat(fake_1_hosts, matchers.HasLength(3)) + self.assertThat(fake_2_hosts, matchers.HasLength(3)) + # And they must not be the same answers even on this simple data + # because if they were we'd be making the active replica a hot spot. + self.assertNotEqual(fake_1_hosts, fake_2_hosts) + self.assertNotEqual(fake_1_hosts[0], fake_2_hosts[0]) def test_ignore_hosts(self): hosts = ['foo', 'bar', 'baz'] ring = hash_ring.HashRing(hosts, replicas=1) - self.assertEqual(['bar'], ring.get_hosts('fake', - ignore_hosts=['foo'])) - self.assertEqual(['baz'], ring.get_hosts('fake', - ignore_hosts=['foo', 'bar'])) - self.assertEqual([], ring.get_hosts('fake', - ignore_hosts=hosts)) + equals_bar_or_baz = matchers.MatchesAny( + matchers.Equals(['bar']), + matchers.Equals(['baz'])) + self.assertThat( + ring.get_hosts('fake', ignore_hosts=['foo']), + equals_bar_or_baz) + self.assertThat( + ring.get_hosts('fake', ignore_hosts=['foo', 'bar']), + equals_bar_or_baz) + self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts)) def test_ignore_hosts_with_replicas(self): hosts = ['foo', 'bar', 'baz'] ring = hash_ring.HashRing(hosts, replicas=2) - self.assertEqual(['bar', 'baz'], ring.get_hosts('fake', - ignore_hosts=['foo'])) - self.assertEqual(['baz'], ring.get_hosts('fake', - ignore_hosts=['foo', 'bar'])) - self.assertEqual(['baz', 'foo'], ring.get_hosts('fake-again', - ignore_hosts=['bar'])) - self.assertEqual(['foo'], ring.get_hosts('fake-again', - ignore_hosts=['bar', 'baz'])) - self.assertEqual([], ring.get_hosts('fake', - ignore_hosts=hosts)) + self.assertEqual( + set(['bar', 'baz']), + set(ring.get_hosts('fake', ignore_hosts=['foo']))) + self.assertEqual(set(['baz']), + set(ring.get_hosts('fake', ignore_hosts=['foo', 'bar']))) + self.assertEqual( + set(['baz', 'foo']), + set(ring.get_hosts('fake-again', ignore_hosts=['bar']))) + self.assertEqual( + set(['foo']), + set(ring.get_hosts('fake-again', ignore_hosts=['bar', 'baz']))) + self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts)) + + def _compare_rings(self, nodes, conductors, ring, + new_conductors, new_ring): + delta = {} + mapping = dict((node, ring.get_hosts(node)[0]) for node in nodes) + new_mapping = dict( + (node, new_ring.get_hosts(node)[0]) for node in nodes) + + for key, old in mapping.items(): + new = new_mapping.get(key, None) + if new != old: + delta[key] = (old, new) + return delta + + def test_rebalance_stability_join(self): + num_conductors = 10 + num_nodes = 10000 + # Adding 1 conductor to a set of N should move 1/(N+1) of all nodes + # Eg, for a cluster of 10 nodes, adding one should move 1/11, or 9% + # We allow for 1/N to allow for rounding in tests. + redistribution_factor = 1.0 / num_conductors + + nodes = [str(x) for x in range(num_nodes)] + conductors = [str(x) for x in range(num_conductors)] + new_conductors = conductors + ['new'] + delta = self._compare_rings(nodes, + conductors, hash_ring.HashRing(conductors), + new_conductors, hash_ring.HashRing(new_conductors)) + + self.assertTrue(len(delta) < num_nodes * redistribution_factor) + + def test_rebalance_stability_leave(self): + num_conductors = 10 + num_nodes = 10000 + # Removing 1 conductor from a set of N should move 1/(N) of all nodes + # Eg, for a cluster of 10 nodes, removing one should move 1/10, or 10% + # We allow for 1/(N-1) to allow for rounding in tests. + redistribution_factor = 1.0 / (num_conductors - 1) + + nodes = [str(x) for x in range(num_nodes)] + conductors = [str(x) for x in range(num_conductors)] + new_conductors = conductors[:] + new_conductors.pop() + delta = self._compare_rings(nodes, + conductors, hash_ring.HashRing(conductors), + new_conductors, hash_ring.HashRing(new_conductors)) + + self.assertTrue(len(delta) < num_nodes * redistribution_factor) def test_more_replicas_than_hosts(self): hosts = ['foo', 'bar'] ring = hash_ring.HashRing(hosts, replicas=10) - self.assertEqual(hosts, ring.get_hosts('fake')) + self.assertEqual(set(hosts), set(ring.get_hosts('fake'))) def test_ignore_non_existent_host(self): hosts = ['foo', 'bar'] |