diff options
author | Erik Olof Gunnar Andersson <eandersson@blizzard.com> | 2023-01-21 18:13:31 -0800 |
---|---|---|
committer | Erik Olof Gunnar Andersson <eandersson@blizzard.com> | 2023-04-04 03:48:34 +0000 |
commit | fb9c2da977a36afc695b1e75a81aa85d8e56ef0a (patch) | |
tree | affaab8de5db9e4d7c5003769e8caa2c66fe18fe | |
parent | 0f6a837a23f218158e487f99219c4a63d80696cb (diff) | |
download | designate-fb9c2da977a36afc695b1e75a81aa85d8e56ef0a.tar.gz |
Move to a batch model for incrementing serial
This patch moves the responsibility of incrementing the
serial on a zone from central to the producer. This also
means that NOTIFY is triggered by the producer after the
serial has been incremented. The advantage of this approach
is that we can now batch requests which means less work
for the DNS servers, and it removes the risk of
race-conditions when updating the serial. Finally, the
producer is sharded and is easy to scale which means that
this approach should scale well with many zones.
The disadvantage is that it may take up to 5 seconds longer
for the DNS record to be updated on the DNS server. This
can be lowered by increasing the frequency of the task
that is responsible for incrementing the serial.
Depends-On: https://review.opendev.org/#/c/871266/
Change-Id: I5e9733abaaa40c874e1d80d7b57e563df0f12cee
21 files changed, 517 insertions, 189 deletions
diff --git a/designate/central/rpcapi.py b/designate/central/rpcapi.py index 3932690d..d761cc91 100644 --- a/designate/central/rpcapi.py +++ b/designate/central/rpcapi.py @@ -68,8 +68,9 @@ class CentralAPI(object): 6.4 - Removed unused record and diagnostic methods 6.5 - Removed additional unused methods 6.6 - Add methods for shared zones + 6.7 - Add increment_zone_serial """ - RPC_API_VERSION = '6.6' + RPC_API_VERSION = '6.7' # This allows us to mark some methods as not logged. # This can be for a few reasons - some methods my not actually call over @@ -82,7 +83,7 @@ class CentralAPI(object): target = messaging.Target(topic=self.topic, version=self.RPC_API_VERSION) - self.client = rpc.get_client(target, version_cap='6.6') + self.client = rpc.get_client(target, version_cap='6.7') @classmethod def get_instance(cls): @@ -141,6 +142,9 @@ class CentralAPI(object): return self.client.call(context, 'get_tenant', tenant_id=tenant_id) # Zone Methods + def increment_zone_serial(self, context, zone): + return self.client.call(context, 'increment_zone_serial', zone=zone) + def create_zone(self, context, zone): return self.client.call(context, 'create_zone', zone=zone) diff --git a/designate/central/service.py b/designate/central/service.py index fa7be5f3..f9e220e4 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -27,6 +27,7 @@ from dns import zone as dnszone from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging +from oslo_utils import timeutils from designate.common import constants from designate.common.decorators import lock @@ -51,7 +52,7 @@ LOG = logging.getLogger(__name__) class Service(service.RPCService): - RPC_API_VERSION = '6.6' + RPC_API_VERSION = '6.7' target = messaging.Target(version=RPC_API_VERSION) @@ -349,48 +350,35 @@ class Service(service.RPCService): "A project ID must be specified when not using a project " "scoped token.") - def _increment_zone_serial(self, context, zone, set_delayed_notify=False): - """Update the zone serial and the SOA record - Optionally set delayed_notify to have PM issue delayed notify - """ - - # Increment the serial number - zone.serial = utils.increment_serial(zone.serial) - if set_delayed_notify: - zone.delayed_notify = True - - zone = self.storage.update_zone(context, zone) - - # Update SOA record - self._update_soa(context, zone) - - return zone - # SOA Recordset Methods - def _build_soa_record(self, zone, ns_records): - return "%s %s. %d %d %d %d %d" % (ns_records[0]['hostname'], - zone['email'].replace("@", "."), - zone['serial'], - zone['refresh'], - zone['retry'], - zone['expire'], - zone['minimum']) + @staticmethod + def _build_soa_record(zone, ns_records): + return '%s %s. %d %d %d %d %d' % ( + ns_records[0]['hostname'], + zone['email'].replace('@', '.'), + zone['serial'], + zone['refresh'], + zone['retry'], + zone['expire'], + zone['minimum'] + ) def _create_soa(self, context, zone): pool_ns_records = self._get_pool_ns_records(context, zone.pool_id) - - soa_values = [self._build_soa_record(zone, pool_ns_records)] - recordlist = objects.RecordList(objects=[ - objects.Record(data=r, managed=True) for r in soa_values]) - values = { - 'name': zone['name'], - 'type': "SOA", - 'records': recordlist - } - soa, zone = self._create_recordset_in_storage( - context, zone, objects.RecordSet(**values), - increment_serial=False) - return soa + records = objects.RecordList(objects=[ + objects.Record( + data=self._build_soa_record(zone, pool_ns_records), + managed=True + ) + ]) + return self._create_recordset_in_storage( + context, zone, + objects.RecordSet( + name=zone['name'], + type='SOA', + records=records + ), increment_serial=False + )[0] def _update_soa(self, context, zone): # NOTE: We should not be updating SOA records when a zone is SECONDARY. @@ -400,14 +388,18 @@ class Service(service.RPCService): # Get the pool for it's list of ns_records pool_ns_records = self._get_pool_ns_records(context, zone.pool_id) - soa = self.find_recordset(context, - criterion={'zone_id': zone['id'], - 'type': "SOA"}) + soa = self.find_recordset( + context, criterion={ + 'zone_id': zone['id'], + 'type': 'SOA' + } + ) soa.records[0].data = self._build_soa_record(zone, pool_ns_records) - self._update_recordset_in_storage(context, zone, soa, - increment_serial=False) + self._update_recordset_in_storage( + context, zone, soa, increment_serial=False + ) # NS Recordset Methods def _create_ns(self, context, zone, ns_records): @@ -731,6 +723,14 @@ class Service(service.RPCService): return pool.ns_records @rpc.expected_exceptions() + @transaction + @lock.synchronized_zone() + def increment_zone_serial(self, context, zone): + zone.serial = self.storage.increment_serial(context, zone.id) + self._update_soa(context, zone) + return zone.serial + + @rpc.expected_exceptions() @notification.notify_type('dns.domain.create') @notification.notify_type('dns.zone.create') @lock.synchronized_zone(new_zone=True) @@ -853,7 +853,8 @@ class Service(service.RPCService): # can be very long-lived. time.sleep(0) self._create_recordset_in_storage( - context, zone, rrset, increment_serial=False) + context, zone, rrset, increment_serial=False + ) return zone @@ -992,29 +993,28 @@ class Service(service.RPCService): """Update zone """ zone = self._update_zone_in_storage( - context, zone, increment_serial=increment_serial) + context, zone, increment_serial=increment_serial + ) # Fire off a XFR if 'masters' in changes: self.worker_api.perform_zone_xfr(context, zone) - self.worker_api.update_zone(context, zone) - return zone @transaction def _update_zone_in_storage(self, context, zone, - increment_serial=True, set_delayed_notify=False): - + increment_serial=True, + set_delayed_notify=False): zone.action = 'UPDATE' zone.status = 'PENDING' if increment_serial: - # _increment_zone_serial increments and updates the zone - zone = self._increment_zone_serial( - context, zone, set_delayed_notify=set_delayed_notify) - else: - zone = self.storage.update_zone(context, zone) + zone.increment_serial = True + if set_delayed_notify: + zone.delayed_notify = True + + zone = self.storage.update_zone(context, zone) return zone @@ -1329,7 +1329,6 @@ class Service(service.RPCService): # RecordSet Methods @rpc.expected_exceptions() @notification.notify_type('dns.recordset.create') - @lock.synchronized_zone() def create_recordset(self, context, zone_id, recordset, increment_serial=True): zone = self.storage.get_zone(context, zone_id, @@ -1372,9 +1371,8 @@ class Service(service.RPCService): context = context.elevated(all_tenants=True) recordset, zone = self._create_recordset_in_storage( - context, zone, recordset, increment_serial=increment_serial) - - self.worker_api.update_zone(context, zone) + context, zone, recordset, increment_serial=increment_serial + ) recordset.zone_name = zone.name recordset.obj_reset_changes(['zone_name']) @@ -1412,33 +1410,33 @@ class Service(service.RPCService): @transaction_shallow_copy def _create_recordset_in_storage(self, context, zone, recordset, increment_serial=True): - # Ensure the tenant has enough quota to continue self._enforce_recordset_quota(context, zone) - self._validate_recordset(context, zone, recordset) - if recordset.obj_attr_is_set('records') and len(recordset.records) > 0: - + if recordset.obj_attr_is_set('records') and recordset.records: # Ensure the tenant has enough zone record quotas to # create new records self._enforce_record_quota(context, zone, recordset) - if increment_serial: - # update the zone's status and increment the serial - zone = self._update_zone_in_storage( - context, zone, increment_serial) - for record in recordset.records: record.action = 'CREATE' record.status = 'PENDING' - record.serial = zone.serial + if not increment_serial: + record.serial = zone.serial + else: + record.serial = timeutils.utcnow_ts() - recordset = self.storage.create_recordset(context, zone.id, - recordset) + new_recordset = self.storage.create_recordset(context, zone.id, + recordset) + if recordset.records and increment_serial: + # update the zone's status and increment the serial + zone = self._update_zone_in_storage( + context, zone, increment_serial + ) # Return the zone too in case it was updated - return (recordset, zone) + return new_recordset, zone @rpc.expected_exceptions() def get_recordset(self, context, zone_id, recordset_id): @@ -1553,7 +1551,6 @@ class Service(service.RPCService): @rpc.expected_exceptions() @notification.notify_type('dns.recordset.update') - @lock.synchronized_zone() def update_recordset(self, context, recordset, increment_serial=True): zone_id = recordset.obj_get_original_value('zone_id') changes = recordset.obj_get_changes() @@ -1622,41 +1619,44 @@ class Service(service.RPCService): recordset, zone = self._update_recordset_in_storage( context, zone, recordset, increment_serial=increment_serial) - self.worker_api.update_zone(context, zone) - return recordset @transaction def _update_recordset_in_storage(self, context, zone, recordset, - increment_serial=True, set_delayed_notify=False): + increment_serial=True, + set_delayed_notify=False): self._validate_recordset(context, zone, recordset) - if increment_serial: - # update the zone's status and increment the serial - zone = self._update_zone_in_storage( - context, zone, increment_serial, - set_delayed_notify=set_delayed_notify) - if recordset.records: for record in recordset.records: - if record.action != 'DELETE': - record.action = 'UPDATE' - record.status = 'PENDING' + if record.action == 'DELETE': + continue + record.action = 'UPDATE' + record.status = 'PENDING' + if not increment_serial: record.serial = zone.serial + else: + record.serial = timeutils.utcnow_ts() # Ensure the tenant has enough zone record quotas to # create new records self._enforce_record_quota(context, zone, recordset) # Update the recordset - recordset = self.storage.update_recordset(context, recordset) + new_recordset = self.storage.update_recordset(context, recordset) - return recordset, zone + if increment_serial: + # update the zone's status and increment the serial + zone = self._update_zone_in_storage( + context, zone, + increment_serial=increment_serial, + set_delayed_notify=set_delayed_notify) + + return new_recordset, zone @rpc.expected_exceptions() @notification.notify_type('dns.recordset.delete') - @lock.synchronized_zone() def delete_recordset(self, context, zone_id, recordset_id, increment_serial=True): # apply_tenant_criteria=False here as we will gate this delete @@ -1708,8 +1708,6 @@ class Service(service.RPCService): recordset, zone = self._delete_recordset_in_storage( context, zone, recordset, increment_serial=increment_serial) - self.worker_api.update_zone(context, zone) - recordset.zone_name = zone.name recordset.obj_reset_changes(['zone_name']) @@ -1718,23 +1716,26 @@ class Service(service.RPCService): @transaction def _delete_recordset_in_storage(self, context, zone, recordset, increment_serial=True): - - if increment_serial: - # update the zone's status and increment the serial - zone = self._update_zone_in_storage( - context, zone, increment_serial) - if recordset.records: for record in recordset.records: record.action = 'DELETE' record.status = 'PENDING' - record.serial = zone.serial + if not increment_serial: + record.serial = zone.serial + else: + record.serial = timeutils.utcnow_ts() # Update the recordset's action/status and then delete it self.storage.update_recordset(context, recordset) - recordset = self.storage.delete_recordset(context, recordset.id) - return (recordset, zone) + if increment_serial: + # update the zone's status and increment the serial + zone = self._update_zone_in_storage( + context, zone, increment_serial) + + new_recordset = self.storage.delete_recordset(context, recordset.id) + + return new_recordset, zone @rpc.expected_exceptions() def count_recordsets(self, context, criterion=None): diff --git a/designate/conf/producer.py b/designate/conf/producer.py index 44f5ec7d..4aa58731 100644 --- a/designate/conf/producer.py +++ b/designate/conf/producer.py @@ -20,6 +20,11 @@ PRODUCER_GROUP = cfg.OptGroup( title='Configuration for Producer Service' ) +PRODUCER_TASK_INCREMENT_SERIAL_GROUP = cfg.OptGroup( + name='producer_task:increment_serial', + title='Configuration for Producer Task: Increment Serial' +) + PRODUCER_TASK_DELAYED_NOTIFY_GROUP = cfg.OptGroup( name='producer_task:delayed_notify', title='Configuration for Producer Task: Delayed Notify' @@ -62,6 +67,15 @@ PRODUCER_OPTS = [ help='RPC topic name for producer'), ] +PRODUCER_TASK_INCREMENT_SERIAL_OPTS = [ + cfg.IntOpt('interval', default=5, + help='Run interval in seconds'), + cfg.IntOpt('per_page', default=100, + help='Default amount of results returned per page'), + cfg.IntOpt('batch_size', default=100, + help='How many zones to increment serial for on each run'), +] + PRODUCER_TASK_DELAYED_NOTIFY_OPTS = [ cfg.IntOpt('interval', default=5, help='Run interval in seconds'), @@ -111,6 +125,9 @@ def register_opts(conf): conf.register_group(PRODUCER_TASK_DELAYED_NOTIFY_GROUP) conf.register_opts(PRODUCER_TASK_DELAYED_NOTIFY_OPTS, group=PRODUCER_TASK_DELAYED_NOTIFY_GROUP) + conf.register_group(PRODUCER_TASK_INCREMENT_SERIAL_GROUP) + conf.register_opts(PRODUCER_TASK_INCREMENT_SERIAL_OPTS, + group=PRODUCER_TASK_INCREMENT_SERIAL_GROUP) conf.register_group(PRODUCER_TASK_PERIODIC_EXISTS_GROUP) conf.register_opts(PRODUCER_TASK_PERIODIC_EXISTS_OPTS, group=PRODUCER_TASK_PERIODIC_EXISTS_GROUP) diff --git a/designate/objects/zone.py b/designate/objects/zone.py index fae67bc5..dc27d271 100644 --- a/designate/objects/zone.py +++ b/designate/objects/zone.py @@ -66,6 +66,7 @@ class Zone(base.DesignateObject, base.DictObjectMixin, ), 'transferred_at': fields.DateTimeField(nullable=True, read_only=False), 'delayed_notify': fields.BooleanField(nullable=True), + 'increment_serial': fields.BooleanField(nullable=True), } STRING_KEYS = [ diff --git a/designate/producer/tasks.py b/designate/producer/tasks.py index 521d378e..41f3a78d 100644 --- a/designate/producer/tasks.py +++ b/designate/producer/tasks.py @@ -227,8 +227,6 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): Call Worker to emit NOTIFY transactions, Reset the flag. """ - pstart, pend = self._my_range() - ctxt = context.DesignateContext.get_admin_context() ctxt.all_tenants = True @@ -237,6 +235,7 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): # There's an index on delayed_notify. criterion = self._filter_between('shard') criterion['delayed_notify'] = True + criterion['increment_serial'] = False zones = self.central_api.find_zones( ctxt, criterion, @@ -246,6 +245,17 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): ) for zone in zones: + if zone.action == 'NONE': + zone.action = 'UPDATE' + zone.status = 'PENDING' + elif zone.action == 'DELETE': + LOG.debug( + 'Skipping delayed NOTIFY for %(id)s being DELETED', + { + 'id': zone.id, + } + ) + continue self.worker_api.update_zone(ctxt, zone) zone.delayed_notify = False self.central_api.update_zone(ctxt, zone) @@ -257,6 +267,54 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask): ) +class PeriodicIncrementSerialTask(PeriodicTask): + __plugin_name__ = 'increment_serial' + + def __init__(self): + super(PeriodicIncrementSerialTask, self).__init__() + + def __call__(self): + ctxt = context.DesignateContext.get_admin_context() + ctxt.all_tenants = True + + # Select zones where "increment_serial" is set and starting from the + # oldest "updated_at". + # There's an index on increment_serial. + criterion = self._filter_between('shard') + criterion['increment_serial'] = True + zones = self.central_api.find_zones( + ctxt, + criterion, + limit=CONF[self.name].batch_size, + sort_key='updated_at', + sort_dir='asc', + ) + for zone in zones: + if zone.action == 'DELETE': + LOG.debug( + 'Skipping increment serial for %(id)s being DELETED', + { + 'id': zone.id, + } + ) + continue + + serial = self.central_api.increment_zone_serial(ctxt, zone) + LOG.debug( + 'Incremented serial for %(id)s to %(serial)d', + { + 'id': zone.id, + 'serial': serial, + } + ) + if not zone.delayed_notify: + # Notify the backend. + if zone.action == 'NONE': + zone.action = 'UPDATE' + zone.status = 'PENDING' + self.worker_api.update_zone(ctxt, zone) + + class WorkerPeriodicRecovery(PeriodicTask): __plugin_name__ = 'worker_periodic_recovery' diff --git a/designate/sqlalchemy/base.py b/designate/sqlalchemy/base.py index 8393e1d6..5512a7be 100644 --- a/designate/sqlalchemy/base.py +++ b/designate/sqlalchemy/base.py @@ -152,7 +152,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta): # Ensure the Object is valid # obj.validate() - values = obj.obj_get_changes() + values = dict(obj) if skip_values is not None: for skip_value in skip_values: @@ -166,7 +166,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta): with sql.get_write_session() as session: try: - resultproxy = session.execute(query, [dict(values)]) + resultproxy = session.execute(query, [values]) except oslo_db_exception.DBDuplicateEntry: raise exc_dup("Duplicate %s" % obj.obj_name()) diff --git a/designate/storage/base.py b/designate/storage/base.py index 347f9778..5b9cd221 100644 --- a/designate/storage/base.py +++ b/designate/storage/base.py @@ -745,6 +745,15 @@ class Storage(DriverPlugin, metaclass=abc.ABCMeta): """ @abc.abstractmethod + def increment_serial(self, context, zone_id): + """ + Increment serial of a Zone + + :param context: RPC Context. + :param zone_id: ID of the Zone. + """ + + @abc.abstractmethod def delete_zone_import(self, context, zone_import_id): """ Delete a Zone Import via ID. diff --git a/designate/storage/impl_sqlalchemy/__init__.py b/designate/storage/impl_sqlalchemy/__init__.py index f9a4c009..1a89709d 100644 --- a/designate/storage/impl_sqlalchemy/__init__.py +++ b/designate/storage/impl_sqlalchemy/__init__.py @@ -15,6 +15,7 @@ # under the License. from oslo_log import log as logging from oslo_utils.secretutils import md5 +from oslo_utils import timeutils from sqlalchemy import case, select, distinct, func from sqlalchemy.sql.expression import or_, literal_column @@ -435,6 +436,19 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage): return updated_zone + def increment_serial(self, context, zone_id): + """Increment the zone's serial number. + """ + new_serial = timeutils.utcnow_ts() + query = tables.zones.update().where( + tables.zones.c.id == zone_id).values( + {'serial': new_serial, 'increment_serial': False} + ) + with sql.get_write_session() as session: + session.execute(query) + LOG.debug('Incremented zone serial for %s to %d', zone_id, new_serial) + return new_serial + def delete_zone(self, context, zone_id): """ """ diff --git a/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py b/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py new file mode 100644 index 00000000..e7a84dfc --- /dev/null +++ b/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Add increment serial + +Revision ID: a005af3aa38e +Revises: b20189fd288e +Create Date: 2023-01-21 17:39:00.822775 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'a005af3aa38e' +down_revision = 'b20189fd288e' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'zones', + sa.Column('increment_serial', sa.Boolean, default=False) + ) + op.create_index( + 'increment_serial', 'zones', ['increment_serial'] + ) diff --git a/designate/storage/impl_sqlalchemy/tables.py b/designate/storage/impl_sqlalchemy/tables.py index eacaf8d3..aa815ed9 100644 --- a/designate/storage/impl_sqlalchemy/tables.py +++ b/designate/storage/impl_sqlalchemy/tables.py @@ -139,6 +139,7 @@ zones = Table('zones', metadata, Column('pool_id', UUID, default=None, nullable=True), Column('reverse_name', String(255), nullable=False), Column('delayed_notify', Boolean, default=False), + Column('increment_serial', Boolean, default=False), UniqueConstraint('name', 'deleted', 'pool_id', name='unique_zone_name'), ForeignKeyConstraint(['parent_zone_id'], diff --git a/designate/tests/test_api/test_v2/test_recordsets.py b/designate/tests/test_api/test_v2/test_recordsets.py index fa03e934..58523226 100644 --- a/designate/tests/test_api/test_v2/test_recordsets.py +++ b/designate/tests/test_api/test_v2/test_recordsets.py @@ -17,6 +17,7 @@ from unittest.mock import patch from oslo_log import log as logging import oslo_messaging as messaging +from oslo_utils import timeutils from designate.central import service as central_service from designate import exceptions @@ -438,10 +439,9 @@ class ApiV2RecordSetsTest(ApiV2TestCase): self.client.delete(url, status=202, headers={'X-Test-Role': 'member'}) # Simulate the zone having been deleted on the backend - zone_serial = self.central_service.get_zone( - self.admin_context, zone['id']).serial self.central_service.update_status( - self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE' + self.admin_context, zone['id'], 'SUCCESS', timeutils.utcnow_ts(), + 'DELETE' ) # Try to get the record and ensure that we get a diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py index 846684fa..c488729f 100644 --- a/designate/tests/test_central/test_service.py +++ b/designate/tests/test_central/test_service.py @@ -19,6 +19,7 @@ from collections import namedtuple from concurrent import futures import copy import datetime + import futurist import random import unittest @@ -33,7 +34,6 @@ from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_utils import timeutils from oslo_versionedobjects import exception as ovo_exc import testtools -from testtools.matchers import GreaterThan from designate import exceptions from designate import objects @@ -890,7 +890,7 @@ class CentralServiceTest(CentralTestCase): def test_update_zone(self, mock_notifier): # Create a zone zone = self.create_zone(email='info@example.org') - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Update the object zone.email = 'info@example.net' @@ -906,7 +906,7 @@ class CentralServiceTest(CentralTestCase): self.admin_context, zone.id) # Ensure the zone was updated correctly - self.assertGreater(zone.serial, original_serial) + self.assertTrue(zone.increment_serial) self.assertEqual('info@example.net', zone.email) self.assertEqual(2, mock_notifier.call_count) @@ -931,7 +931,7 @@ class CentralServiceTest(CentralTestCase): def test_update_zone_without_incrementing_serial(self): # Create a zone zone = self.create_zone(email='info@example.org') - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Update the object zone.email = 'info@example.net' @@ -944,7 +944,7 @@ class CentralServiceTest(CentralTestCase): zone = self.central_service.get_zone(self.admin_context, zone.id) # Ensure the zone was updated correctly - self.assertEqual(original_serial, zone.serial) + self.assertFalse(zone.increment_serial) self.assertEqual('info@example.net', zone.email) def test_update_zone_name_fail(self): @@ -965,7 +965,7 @@ class CentralServiceTest(CentralTestCase): def test_update_zone_deadlock_retry(self): # Create a zone zone = self.create_zone(name='example.org.') - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Update the Object zone.email = 'info@example.net' @@ -992,7 +992,7 @@ class CentralServiceTest(CentralTestCase): self.assertTrue(i[0]) # Ensure the zone was updated correctly - self.assertGreater(zone.serial, original_serial) + self.assertTrue(zone.increment_serial) self.assertEqual('info@example.net', zone.email) @mock.patch.object(notifier.Notifier, "info") @@ -1457,7 +1457,7 @@ class CentralServiceTest(CentralTestCase): # RecordSet Tests def test_create_recordset(self): zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Create the Object recordset = objects.RecordSet(name='www.%s' % zone.name, type='A') @@ -1469,7 +1469,6 @@ class CentralServiceTest(CentralTestCase): # Get the zone again to check if serial increased updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial # Ensure all values have been set correctly self.assertIsNotNone(recordset.id) @@ -1479,7 +1478,7 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(recordset.records) # The serial number does not get updated is there are no records # in the recordset - self.assertEqual(original_serial, new_serial) + self.assertFalse(updated_zone.increment_serial) def test_create_recordset_shared_zone(self): zone = self.create_zone() @@ -1546,15 +1545,15 @@ class CentralServiceTest(CentralTestCase): def test_create_recordset_with_records(self): zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Create the Object recordset = objects.RecordSet( name='www.%s' % zone.name, type='A', records=objects.RecordList(objects=[ - objects.Record(data='192.3.3.15'), - objects.Record(data='192.3.3.16'), + objects.Record(data='192.0.2.15'), + objects.Record(data='192.0.2.16'), ]) ) @@ -1565,14 +1564,13 @@ class CentralServiceTest(CentralTestCase): # Get updated serial number updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial # Ensure all values have been set correctly self.assertIsNotNone(recordset.records) self.assertEqual(2, len(recordset.records)) self.assertIsNotNone(recordset.records[0].id) self.assertIsNotNone(recordset.records[1].id) - self.assertThat(new_serial, GreaterThan(original_serial)) + self.assertTrue(updated_zone.increment_serial) def test_create_recordset_over_quota(self): # SOA, NS recordsets exist by default. @@ -1851,7 +1849,7 @@ class CentralServiceTest(CentralTestCase): def test_update_recordset(self): # Create a zone zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Create a recordset recordset = self.create_recordset(zone) @@ -1865,7 +1863,7 @@ class CentralServiceTest(CentralTestCase): # Get zone again to verify that serial number was updated updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial + self.assertTrue(updated_zone.increment_serial) # Fetch the resource again recordset = self.central_service.get_recordset( @@ -1873,7 +1871,6 @@ class CentralServiceTest(CentralTestCase): # Ensure the new value took self.assertEqual(1800, recordset.ttl) - self.assertThat(new_serial, GreaterThan(original_serial)) @unittest.expectedFailure # FIXME def test_update_recordset_deadlock_retry(self): @@ -1936,7 +1933,7 @@ class CentralServiceTest(CentralTestCase): def test_update_recordset_with_record_delete(self): # Create a zone zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Create a recordset and two records records = [ @@ -1960,12 +1957,11 @@ class CentralServiceTest(CentralTestCase): # Fetch the Zone again updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial # Ensure two Records are attached to the RecordSet correctly self.assertEqual(1, len(recordset.records)) self.assertIsNotNone(recordset.records[0].id) - self.assertThat(new_serial, GreaterThan(original_serial)) + self.assertTrue(updated_zone.increment_serial) def test_update_recordset_with_record_update(self): # Create a zone @@ -2066,7 +2062,7 @@ class CentralServiceTest(CentralTestCase): def test_update_recordset_shared_zone(self): # Create a zone zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) context = self.get_context(project_id='1', roles=['member', 'reader']) self.share_zone(context=self.admin_context, zone_id=zone.id, @@ -2084,7 +2080,9 @@ class CentralServiceTest(CentralTestCase): # Get zone again to verify that serial number was updated updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial + + # Ensure that we are incrementing the zone serial + self.assertTrue(updated_zone.increment_serial) # Fetch the resource again recordset = self.central_service.get_recordset( @@ -2092,11 +2090,10 @@ class CentralServiceTest(CentralTestCase): # Ensure the new value took self.assertEqual(1800, recordset.ttl) - self.assertThat(new_serial, GreaterThan(original_serial)) def test_delete_recordset(self): zone = self.create_zone() - original_serial = zone.serial + self.assertFalse(zone.increment_serial) # Create a recordset recordset = self.create_recordset(zone) @@ -2116,8 +2113,7 @@ class CentralServiceTest(CentralTestCase): # Fetch the zone again to verify serial number increased updated_zone = self.central_service.get_zone(self.admin_context, zone.id) - new_serial = updated_zone.serial - self.assertThat(new_serial, GreaterThan(original_serial)) + self.assertTrue(updated_zone.increment_serial) def test_delete_recordset_without_incrementing_serial(self): zone = self.create_zone() @@ -2219,8 +2215,8 @@ class CentralServiceTest(CentralTestCase): name='www.%s' % zone.name, type='A', records=objects.RecordList(objects=[ - objects.Record(data='192.3.3.15'), - objects.Record(data='192.3.3.16'), + objects.Record(data='203.0.113.15'), + objects.Record(data='203.0.113.16'), ]) ) @@ -2401,7 +2397,7 @@ class CentralServiceTest(CentralTestCase): # Ensure that the record is still in DB (No invalidation) self.central_service.find_records(elevated_a, criterion) - # Now give the fip id to tenant 'b' and see that it get's deleted + # Now give the fip id to tenant 'b' and see that it gets deleted self.network_api.fake.allocate_floatingip( context_b.project_id, fip['id']) @@ -2411,8 +2407,10 @@ class CentralServiceTest(CentralTestCase): self.assertIsNone(fip_ptr['ptrdname']) # Simulate the invalidation on the backend - zone_serial = self.central_service.get_zone( - elevated_a, zone_id).serial + zone = self.central_service.get_zone( + elevated_a, zone_id) + zone_serial = self.central_service.increment_zone_serial( + elevated_a, zone) self.central_service.update_status( elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') @@ -2482,10 +2480,8 @@ class CentralServiceTest(CentralTestCase): elevated_a, criterion)[0].zone_id # Simulate the update on the backend - zone_serial = self.central_service.get_zone( - elevated_a, zone_id).serial self.central_service.update_status( - elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') + elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE') self.network_api.fake.deallocate_floatingip(fip['id']) @@ -2495,7 +2491,7 @@ class CentralServiceTest(CentralTestCase): # Ensure that the record is still in DB (No invalidation) self.central_service.find_records(elevated_a, criterion) - # Now give the fip id to tenant 'b' and see that it get's deleted + # Now give the fip id to tenant 'b' and see that it gets deleted self.network_api.fake.allocate_floatingip( context_b.project_id, fip['id']) @@ -2505,10 +2501,8 @@ class CentralServiceTest(CentralTestCase): self.assertIsNone(fips[0]['ptrdname']) # Simulate the invalidation on the backend - zone_serial = self.central_service.get_zone( - elevated_a, zone_id).serial self.central_service.update_status( - elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE') + elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE') record = self.central_service.find_records(elevated_a, criterion)[0] self.assertEqual('NONE', record.action) @@ -3959,3 +3953,91 @@ class CentralServiceTest(CentralTestCase): retrived_shared_zone.target_project_id) self.assertEqual(shared_zone.project_id, retrived_shared_zone.project_id) + + def test_batch_increment_serial(self): + zone = self.create_zone() + zone_serial = zone.serial + self.assertFalse(zone.increment_serial) + + for index in range(10): + recordset = objects.RecordSet( + name='www.%d.%s' % (index, zone.name), + type='A', + records=objects.RecordList(objects=[ + objects.Record(data='192.0.2.%d' % index), + objects.Record(data='198.51.100.%d' % index), + ]) + ) + self.central_service.create_recordset( + self.admin_context, zone.id, recordset=recordset + ) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone.id + ) + recordsets = self.central_service.find_recordsets( + self.admin_context, + criterion={'zone_id': zone.id, 'type': 'A'} + ) + + # Increment serial hasn't been triggered yet. + self.assertEqual(zone_serial, updated_zone.serial) + self.assertTrue(updated_zone.increment_serial) + + self.assertEqual('PENDING', updated_zone.status) + self.assertEqual(10, len(recordsets)) + for recordset in recordsets: + self.assertEqual('PENDING', recordset.status) + self.assertEqual(2, len(recordset.records)) + for record in recordset.records: + self.assertEqual('PENDING', record.status) + + # Increment serial (Producer -> Central) for zone. + with mock.patch.object(timeutils, 'utcnow_ts', + return_value=zone_serial + 5): + self.central_service.increment_zone_serial( + self.admin_context, zone + ) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone.id + ) + recordsets = self.central_service.find_recordsets( + self.admin_context, + criterion={'zone_id': zone.id, 'type': 'A'} + ) + + # Ensure that serial is now correct. + self.assertEqual(zone_serial + 5, updated_zone.serial) + self.assertFalse(updated_zone.increment_serial) + + # But the zone is still in pending status as we haven't notified + # the upstream dns servers yet. + self.assertEqual('PENDING', updated_zone.status) + for recordset in recordsets: + self.assertEqual('PENDING', recordset.status) + for record in recordset.records: + self.assertEqual('PENDING', record.status) + + # Trigger update_status (Producer -> Worker -> Central). + # This happens after the upstream DNS servers have been notified + # and updated. + self.central_service.update_status( + self.admin_context, zone.id, 'SUCCESS', updated_zone.serial + ) + + updated_zone = self.central_service.get_zone( + self.admin_context, zone.id + ) + recordsets = self.central_service.find_recordsets( + self.admin_context, + criterion={'zone_id': zone.id, 'type': 'A'} + ) + + # Validate that the status is now ACTIVE. + self.assertEqual('ACTIVE', updated_zone.status) + self.assertEqual(zone_serial + 5, updated_zone.serial) + for recordset in recordsets: + self.assertEqual('ACTIVE', recordset.status) + for record in recordset.records: + self.assertEqual('ACTIVE', record.status) diff --git a/designate/tests/test_producer/test_tasks.py b/designate/tests/test_producer/test_tasks.py index 99f8f6f4..0aaebb3c 100644 --- a/designate/tests/test_producer/test_tasks.py +++ b/designate/tests/test_producer/test_tasks.py @@ -15,6 +15,7 @@ # under the License. import datetime +from unittest import mock from oslo_log import log as logging from oslo_utils import timeutils @@ -24,6 +25,7 @@ from designate.storage.impl_sqlalchemy import tables from designate.storage import sql from designate.tests import fixtures from designate.tests import TestCase +from designate.worker import rpcapi as worker_api LOG = logging.getLogger(__name__) @@ -39,7 +41,7 @@ class DeletedZonePurgeTest(TestCase): self.config( time_threshold=self.time_threshold, batch_size=self.batch_size, - group="producer_task:zone_purge" + group='producer_task:zone_purge' ) self.purge_task_fixture = self.useFixture( fixtures.ZoneManagerTaskFixture(tasks.DeletedZonePurgeTask) @@ -77,7 +79,7 @@ class DeletedZonePurgeTest(TestCase): age = index * (self.time_threshold // self.number_of_zones * 2) - 1 delta = datetime.timedelta(seconds=age) deletion_time = now - delta - name = "example%d.org." % index + name = 'example%d.org.' % index self._create_deleted_zone(name, deletion_time) def test_purge_zones(self): @@ -101,9 +103,8 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase): super(PeriodicGenerateDelayedNotifyTaskTest, self).setUp() self.config(quota_zones=self.number_of_zones) self.config( - interval=1, batch_size=self.batch_size, - group="producer_task:delayed_notify" + group='producer_task:delayed_notify' ) self.generate_delayed_notify_task_fixture = self.useFixture( fixtures.ZoneManagerTaskFixture( @@ -123,7 +124,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase): def _create_zones(self): # Create a number of zones; half of them with delayed_notify set. for index in range(self.number_of_zones): - name = "example%d.org." % index + name = 'example%d.org.' % index delayed_notify = (index % 2 == 0) self.create_zone( name=name, @@ -149,3 +150,43 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase): remaining, len(zones), message='Remaining zones: %s' % zones ) + + +class PeriodicIncrementSerialTaskTest(TestCase): + number_of_zones = 20 + batch_size = 20 + + def setUp(self): + super(PeriodicIncrementSerialTaskTest, self).setUp() + self.worker_api = mock.Mock() + mock.patch.object(worker_api.WorkerAPI, 'get_instance', + return_value=self.worker_api).start() + self.config(quota_zones=self.number_of_zones) + self.config( + batch_size=self.batch_size, + group='producer_task:increment_serial' + ) + self.increment_serial_task_fixture = self.useFixture( + fixtures.ZoneManagerTaskFixture( + tasks.PeriodicIncrementSerialTask + ) + ) + + def _create_zones(self): + for index in range(self.number_of_zones): + name = 'example%d.org.' % index + increment_serial = (index % 2 == 0) + delayed_notify = (index % 4 == 0) + self.create_zone( + name=name, + increment_serial=increment_serial, + delayed_notify=delayed_notify, + ) + + def test_increment_serial(self): + self._create_zones() + + self.increment_serial_task_fixture.task() + + self.worker_api.update_zone.assert_called() + self.assertEqual(5, self.worker_api.update_zone.call_count) diff --git a/designate/tests/test_storage/test_sqlalchemy.py b/designate/tests/test_storage/test_sqlalchemy.py index 95122396..9cd05b04 100644 --- a/designate/tests/test_storage/test_sqlalchemy.py +++ b/designate/tests/test_storage/test_sqlalchemy.py @@ -113,6 +113,7 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase): }, "zones": { "delayed_notify": "CREATE INDEX delayed_notify ON zones (delayed_notify)", # noqa + "increment_serial": "CREATE INDEX increment_serial ON zones (increment_serial)", # noqa "reverse_name_deleted": "CREATE INDEX reverse_name_deleted ON zones (reverse_name, deleted)", # noqa "zone_created_at": "CREATE INDEX zone_created_at ON zones (created_at)", # noqa "zone_deleted": "CREATE INDEX zone_deleted ON zones (deleted)", diff --git a/designate/tests/unit/producer/test_tasks.py b/designate/tests/unit/producer/test_tasks.py index a4461ded..f7b69f14 100644 --- a/designate/tests/unit/producer/test_tasks.py +++ b/designate/tests/unit/producer/test_tasks.py @@ -31,7 +31,9 @@ from designate import context from designate.producer import tasks from designate import rpc from designate.tests.unit import RoObject +from designate.tests.unit import RwObject from designate.utils import generate_uuid +from designate.worker import rpcapi as worker_api DUMMY_TASK_GROUP = cfg.OptGroup( name='producer_task:dummy', @@ -244,3 +246,82 @@ class PeriodicSecondaryRefreshTest(oslotest.base.BaseTestCase): self.task() self.assertFalse(self.central.xfr_zone.called) + + +class PeriodicIncrementSerialTest(oslotest.base.BaseTestCase): + def setUp(self): + super(PeriodicIncrementSerialTest, self).setUp() + self.useFixture(cfg_fixture.Config(CONF)) + + self.central_api = mock.Mock() + self.context = mock.Mock() + self.worker_api = mock.Mock() + mock.patch.object(worker_api.WorkerAPI, 'get_instance', + return_value=self.worker_api).start() + mock.patch.object(central_api.CentralAPI, 'get_instance', + return_value=self.central_api).start() + mock.patch.object(context.DesignateContext, 'get_admin_context', + return_value=self.context).start() + self.central_api.increment_zone_serial.return_value = 123 + self.task = tasks.PeriodicIncrementSerialTask() + self.task.my_partitions = 0, 9 + + def test_increment_zone(self): + zone = RoObject( + id=generate_uuid(), + action='CREATE', + increment_serial=True, + delayed_notify=False, + ) + self.central_api.find_zones.return_value = [zone] + + self.task() + + self.central_api.increment_zone_serial.assert_called() + self.worker_api.update_zone.assert_called() + + def test_increment_zone_with_action_none(self): + zone = RwObject( + id=generate_uuid(), + action='NONE', + status='ACTIVE', + increment_serial=True, + delayed_notify=False, + ) + self.central_api.find_zones.return_value = [zone] + + self.task() + + self.central_api.increment_zone_serial.assert_called() + self.worker_api.update_zone.assert_called() + + self.assertEqual('UPDATE', zone.action) + self.assertEqual('PENDING', zone.status) + + def test_increment_zone_with_delayed_notify(self): + zone = RoObject( + id=generate_uuid(), + action='CREATE', + increment_serial=True, + delayed_notify=True, + ) + self.central_api.find_zones.return_value = [zone] + + self.task() + + self.central_api.increment_zone_serial.assert_called() + self.worker_api.update_zone.assert_not_called() + + def test_increment_zone_skip_deleted(self): + zone = RoObject( + id=generate_uuid(), + action='DELETE', + increment_serial=True, + delayed_notify=False, + ) + self.central_api.find_zones.return_value = [zone] + + self.task() + + self.central_api.increment_zone_serial.assert_not_called() + self.worker_api.update_zone.assert_not_called() diff --git a/designate/tests/unit/test_central/test_basic.py b/designate/tests/unit/test_central/test_basic.py index 8a8068e5..3726406d 100644 --- a/designate/tests/unit/test_central/test_basic.py +++ b/designate/tests/unit/test_central/test_basic.py @@ -179,6 +179,7 @@ class MockRecordSet(object): ttl = 1 type = "PRIMARY" serial = 123 + records = [] def obj_attr_is_set(self, n): if n == 'records': @@ -418,8 +419,9 @@ class CentralServiceTestCase(CentralBasic): central_service._is_valid_ttl = mock.Mock() central_service.storage.create_recordset = mock.Mock(return_value='rs') - central_service._update_zone_in_storage = mock.Mock() - + central_service._update_zone_in_storage = mock.Mock( + return_value=Mockzone() + ) recordset = mock.Mock(spec=objects.RecordSet) recordset.obj_attr_is_set.return_value = True recordset.records = [MockRecord()] @@ -440,7 +442,9 @@ class CentralServiceTestCase(CentralBasic): self.service._is_valid_ttl = mock.Mock() self.service.storage.create_recordset = mock.Mock(return_value='rs') - self.service._update_zone_in_storage = mock.Mock() + self.service._update_zone_in_storage = mock.Mock( + return_value=Mockzone() + ) # NOTE(thirose): Since this is a race condition we assume that # we will hit it if we try to do the operations in a loop 100 times. @@ -1506,8 +1510,6 @@ class CentralZoneTestCase(CentralBasic): self.service.delete_recordset(self.context, CentralZoneTestCase.zone__id_2, CentralZoneTestCase.recordset__id) - self.assertTrue( - self.service.worker_api.update_zone.called) self.assertTrue( self.service._delete_recordset_in_storage.called) @@ -1524,6 +1526,7 @@ class CentralZoneTestCase(CentralBasic): action='', status='', serial=0, + increment_serial=False, ) ]) ) @@ -1533,7 +1536,7 @@ class CentralZoneTestCase(CentralBasic): self.assertEqual(1, len(rs.records)) self.assertEqual('DELETE', rs.records[0].action) self.assertEqual('PENDING', rs.records[0].status) - self.assertEqual(1, rs.records[0].serial) + self.assertTrue(rs.records[0].serial, 1) def test_delete_recordset_in_storage_no_increment_serial(self): self.service._update_zone_in_storage = mock.Mock() diff --git a/designate/tests/unit/test_utils.py b/designate/tests/unit/test_utils.py index 4734d004..42dfb298 100644 --- a/designate/tests/unit/test_utils.py +++ b/designate/tests/unit/test_utils.py @@ -16,7 +16,6 @@ import jinja2 from oslo_concurrency import processutils from oslo_config import cfg from oslo_config import fixture as cfg_fixture -from oslo_utils import timeutils import oslotest.base from designate import exceptions @@ -213,22 +212,6 @@ class TestUtils(oslotest.base.BaseTestCase): self.assertEqual('Hello World', result) - @mock.patch.object(timeutils, 'utcnow_ts') - def test_increment_serial_lower_than_ts(self, mock_utcnow_ts): - mock_utcnow_ts.return_value = 1561698354 - - ret_serial = utils.increment_serial(serial=1) - - self.assertEqual(1561698354, ret_serial) - - @mock.patch.object(timeutils, 'utcnow_ts') - def test_increment_serial_higher_than_ts(self, mock_utcnow_ts): - mock_utcnow_ts.return_value = 1561698354 - - ret_serial = utils.increment_serial(serial=1561698354 * 2) - - self.assertEqual(1561698354 * 2 + 1, ret_serial) - def test_is_uuid_like(self): self.assertTrue( utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64') diff --git a/designate/utils.py b/designate/utils.py index a14a6741..dd0198c1 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -26,7 +26,6 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils.netutils import is_valid_ipv6 -from oslo_utils import timeutils from oslo_utils import uuidutils import pkg_resources @@ -119,16 +118,6 @@ def execute(*cmd, **kw): root_helper=root_helper, **kw) -def increment_serial(serial=0): - # This provides for *roughly* unix timestamp based serial numbers - new_serial = timeutils.utcnow_ts() - - if new_serial <= serial: - new_serial = serial + 1 - - return new_serial - - def deep_dict_merge(a, b): if not isinstance(b, dict): return b diff --git a/designate/worker/tasks/zone.py b/designate/worker/tasks/zone.py index 0ee0b3a3..a7aeede6 100644 --- a/designate/worker/tasks/zone.py +++ b/designate/worker/tasks/zone.py @@ -26,7 +26,6 @@ from oslo_utils import timeutils from designate import dnsutils from designate import exceptions from designate import objects -from designate import utils from designate.worker.tasks import base LOG = logging.getLogger(__name__) @@ -795,11 +794,10 @@ class RecoverShard(base.Task): # Include things that have been hanging out in PENDING # status for longer than they should # Generate the current serial, will provide a UTC Unix TS. - current = utils.increment_serial() stale_criterion = { 'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard), 'status': 'PENDING', - 'serial': "<%s" % (current - self.max_prop_time) + 'serial': "<%s" % (timeutils.utcnow_ts() - self.max_prop_time) } stale_zones = self.storage.find_zones(self.context, stale_criterion) diff --git a/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml b/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml new file mode 100644 index 00000000..04293940 --- /dev/null +++ b/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Moved zone serial updates to a `designate-producer` task called + `increment_serial` to fix race conditions and to reduce the number of + updates to the upstream DNS servers when performing multiple DNS updates. @@ -118,6 +118,7 @@ designate.producer_tasks = periodic_exists = designate.producer.tasks:PeriodicExistsTask periodic_secondary_refresh = designate.producer.tasks:PeriodicSecondaryRefreshTask delayed_notify = designate.producer.tasks:PeriodicGenerateDelayedNotifyTask + increment_serial = designate.producer.tasks:PeriodicIncrementSerialTask worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery designate.heartbeat_emitter = |