author    Erik Olof Gunnar Andersson <eandersson@blizzard.com>  2023-01-21 18:13:31 -0800
committer Erik Olof Gunnar Andersson <eandersson@blizzard.com>  2023-04-04 03:48:34 +0000
commit    fb9c2da977a36afc695b1e75a81aa85d8e56ef0a (patch)
tree      affaab8de5db9e4d7c5003769e8caa2c66fe18fe
parent    0f6a837a23f218158e487f99219c4a63d80696cb (diff)
Move to a batch model for incrementing serial
This patch moves the responsibility of incrementing the serial on a
zone from central to the producer. This also means that NOTIFY is
triggered by the producer after the serial has been incremented.

The advantage of this approach is that we can now batch requests, which
means less work for the DNS servers, and it removes the risk of race
conditions when updating the serial. Finally, the producer is sharded
and easy to scale, which means that this approach should scale well
with many zones.

The disadvantage is that it may take up to 5 seconds longer for the DNS
record to be updated on the DNS server. This can be lowered by
increasing the frequency of the task responsible for incrementing the
serial.

Depends-On: https://review.opendev.org/#/c/871266/
Change-Id: I5e9733abaaa40c874e1d80d7b57e563df0f12cee
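A condensed sketch of the new flow, pieced together from the
PeriodicIncrementSerialTask and storage changes in the diff below; the
shard filter is simplified to a literal and error handling is omitted:

    # Zones touched by record changes are only flagged (increment_serial);
    # this periodic producer task later bumps the serial once per zone and
    # emits a single NOTIFY, batching what used to be per-change work.
    zones = central_api.find_zones(
        ctxt,
        {'shard': 'BETWEEN 0,4095', 'increment_serial': True},
        limit=batch_size, sort_key='updated_at', sort_dir='asc',
    )
    for zone in zones:
        if zone.action == 'DELETE':
            continue  # a zone being deleted needs no serial bump
        # A single storage UPDATE sets serial = utcnow_ts() and clears the flag.
        central_api.increment_zone_serial(ctxt, zone)
        if not zone.delayed_notify:
            worker_api.update_zone(ctxt, zone)  # NOTIFY the DNS servers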
-rw-r--r--  designate/central/rpcapi.py | 8
-rw-r--r--  designate/central/service.py | 199
-rw-r--r--  designate/conf/producer.py | 17
-rw-r--r--  designate/objects/zone.py | 1
-rw-r--r--  designate/producer/tasks.py | 62
-rw-r--r--  designate/sqlalchemy/base.py | 4
-rw-r--r--  designate/storage/base.py | 9
-rw-r--r--  designate/storage/impl_sqlalchemy/__init__.py | 14
-rw-r--r--  designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py | 38
-rw-r--r--  designate/storage/impl_sqlalchemy/tables.py | 1
-rw-r--r--  designate/tests/test_api/test_v2/test_recordsets.py | 6
-rw-r--r--  designate/tests/test_central/test_service.py | 160
-rw-r--r--  designate/tests/test_producer/test_tasks.py | 51
-rw-r--r--  designate/tests/test_storage/test_sqlalchemy.py | 1
-rw-r--r--  designate/tests/unit/producer/test_tasks.py | 81
-rw-r--r--  designate/tests/unit/test_central/test_basic.py | 15
-rw-r--r--  designate/tests/unit/test_utils.py | 17
-rw-r--r--  designate/utils.py | 11
-rw-r--r--  designate/worker/tasks/zone.py | 4
-rw-r--r--  releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml | 6
-rw-r--r--  setup.cfg | 1
21 files changed, 517 insertions, 189 deletions
diff --git a/designate/central/rpcapi.py b/designate/central/rpcapi.py
index 3932690d..d761cc91 100644
--- a/designate/central/rpcapi.py
+++ b/designate/central/rpcapi.py
@@ -68,8 +68,9 @@ class CentralAPI(object):
6.4 - Removed unused record and diagnostic methods
6.5 - Removed additional unused methods
6.6 - Add methods for shared zones
+ 6.7 - Add increment_zone_serial
"""
- RPC_API_VERSION = '6.6'
+ RPC_API_VERSION = '6.7'
# This allows us to mark some methods as not logged.
# This can be for a few reasons - some methods may not actually call over
@@ -82,7 +83,7 @@ class CentralAPI(object):
target = messaging.Target(topic=self.topic,
version=self.RPC_API_VERSION)
- self.client = rpc.get_client(target, version_cap='6.6')
+ self.client = rpc.get_client(target, version_cap='6.7')
@classmethod
def get_instance(cls):
@@ -141,6 +142,9 @@ class CentralAPI(object):
return self.client.call(context, 'get_tenant', tenant_id=tenant_id)
# Zone Methods
+ def increment_zone_serial(self, context, zone):
+ return self.client.call(context, 'increment_zone_serial', zone=zone)
+
def create_zone(self, context, zone):
return self.client.call(context, 'create_zone', zone=zone)
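For orientation, a minimal sketch of how a caller on the producer side
invokes the new method (names taken from this patch):

    central = CentralAPI.get_instance()
    # Returns the zone's new serial; the server-side implementation is
    # increment_zone_serial in designate/central/service.py below.
    new_serial = central.increment_zone_serial(ctxt, zone)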
diff --git a/designate/central/service.py b/designate/central/service.py
index fa7be5f3..f9e220e4 100644
--- a/designate/central/service.py
+++ b/designate/central/service.py
@@ -27,6 +27,7 @@ from dns import zone as dnszone
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
+from oslo_utils import timeutils
from designate.common import constants
from designate.common.decorators import lock
@@ -51,7 +52,7 @@ LOG = logging.getLogger(__name__)
class Service(service.RPCService):
- RPC_API_VERSION = '6.6'
+ RPC_API_VERSION = '6.7'
target = messaging.Target(version=RPC_API_VERSION)
@@ -349,48 +350,35 @@ class Service(service.RPCService):
"A project ID must be specified when not using a project "
"scoped token.")
- def _increment_zone_serial(self, context, zone, set_delayed_notify=False):
- """Update the zone serial and the SOA record
- Optionally set delayed_notify to have PM issue delayed notify
- """
-
- # Increment the serial number
- zone.serial = utils.increment_serial(zone.serial)
- if set_delayed_notify:
- zone.delayed_notify = True
-
- zone = self.storage.update_zone(context, zone)
-
- # Update SOA record
- self._update_soa(context, zone)
-
- return zone
-
# SOA Recordset Methods
- def _build_soa_record(self, zone, ns_records):
- return "%s %s. %d %d %d %d %d" % (ns_records[0]['hostname'],
- zone['email'].replace("@", "."),
- zone['serial'],
- zone['refresh'],
- zone['retry'],
- zone['expire'],
- zone['minimum'])
+ @staticmethod
+ def _build_soa_record(zone, ns_records):
+ return '%s %s. %d %d %d %d %d' % (
+ ns_records[0]['hostname'],
+ zone['email'].replace('@', '.'),
+ zone['serial'],
+ zone['refresh'],
+ zone['retry'],
+ zone['expire'],
+ zone['minimum']
+ )
def _create_soa(self, context, zone):
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
-
- soa_values = [self._build_soa_record(zone, pool_ns_records)]
- recordlist = objects.RecordList(objects=[
- objects.Record(data=r, managed=True) for r in soa_values])
- values = {
- 'name': zone['name'],
- 'type': "SOA",
- 'records': recordlist
- }
- soa, zone = self._create_recordset_in_storage(
- context, zone, objects.RecordSet(**values),
- increment_serial=False)
- return soa
+ records = objects.RecordList(objects=[
+ objects.Record(
+ data=self._build_soa_record(zone, pool_ns_records),
+ managed=True
+ )
+ ])
+ return self._create_recordset_in_storage(
+ context, zone,
+ objects.RecordSet(
+ name=zone['name'],
+ type='SOA',
+ records=records
+ ), increment_serial=False
+ )[0]
def _update_soa(self, context, zone):
# NOTE: We should not be updating SOA records when a zone is SECONDARY.
@@ -400,14 +388,18 @@ class Service(service.RPCService):
# Get the pool for its list of ns_records
pool_ns_records = self._get_pool_ns_records(context, zone.pool_id)
- soa = self.find_recordset(context,
- criterion={'zone_id': zone['id'],
- 'type': "SOA"})
+ soa = self.find_recordset(
+ context, criterion={
+ 'zone_id': zone['id'],
+ 'type': 'SOA'
+ }
+ )
soa.records[0].data = self._build_soa_record(zone, pool_ns_records)
- self._update_recordset_in_storage(context, zone, soa,
- increment_serial=False)
+ self._update_recordset_in_storage(
+ context, zone, soa, increment_serial=False
+ )
# NS Recordset Methods
def _create_ns(self, context, zone, ns_records):
@@ -731,6 +723,14 @@ class Service(service.RPCService):
return pool.ns_records
@rpc.expected_exceptions()
+ @transaction
+ @lock.synchronized_zone()
+ def increment_zone_serial(self, context, zone):
+ zone.serial = self.storage.increment_serial(context, zone.id)
+ self._update_soa(context, zone)
+ return zone.serial
+
+ @rpc.expected_exceptions()
@notification.notify_type('dns.domain.create')
@notification.notify_type('dns.zone.create')
@lock.synchronized_zone(new_zone=True)
@@ -853,7 +853,8 @@ class Service(service.RPCService):
# can be very long-lived.
time.sleep(0)
self._create_recordset_in_storage(
- context, zone, rrset, increment_serial=False)
+ context, zone, rrset, increment_serial=False
+ )
return zone
@@ -992,29 +993,28 @@ class Service(service.RPCService):
"""Update zone
"""
zone = self._update_zone_in_storage(
- context, zone, increment_serial=increment_serial)
+ context, zone, increment_serial=increment_serial
+ )
# Fire off a XFR
if 'masters' in changes:
self.worker_api.perform_zone_xfr(context, zone)
- self.worker_api.update_zone(context, zone)
-
return zone
@transaction
def _update_zone_in_storage(self, context, zone,
- increment_serial=True, set_delayed_notify=False):
-
+ increment_serial=True,
+ set_delayed_notify=False):
zone.action = 'UPDATE'
zone.status = 'PENDING'
if increment_serial:
- # _increment_zone_serial increments and updates the zone
- zone = self._increment_zone_serial(
- context, zone, set_delayed_notify=set_delayed_notify)
- else:
- zone = self.storage.update_zone(context, zone)
+ zone.increment_serial = True
+ if set_delayed_notify:
+ zone.delayed_notify = True
+
+ zone = self.storage.update_zone(context, zone)
return zone
@@ -1329,7 +1329,6 @@ class Service(service.RPCService):
# RecordSet Methods
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.create')
- @lock.synchronized_zone()
def create_recordset(self, context, zone_id, recordset,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id,
@@ -1372,9 +1371,8 @@ class Service(service.RPCService):
context = context.elevated(all_tenants=True)
recordset, zone = self._create_recordset_in_storage(
- context, zone, recordset, increment_serial=increment_serial)
-
- self.worker_api.update_zone(context, zone)
+ context, zone, recordset, increment_serial=increment_serial
+ )
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@@ -1412,33 +1410,33 @@ class Service(service.RPCService):
@transaction_shallow_copy
def _create_recordset_in_storage(self, context, zone, recordset,
increment_serial=True):
-
# Ensure the tenant has enough quota to continue
self._enforce_recordset_quota(context, zone)
-
self._validate_recordset(context, zone, recordset)
- if recordset.obj_attr_is_set('records') and len(recordset.records) > 0:
-
+ if recordset.obj_attr_is_set('records') and recordset.records:
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, zone, recordset)
- if increment_serial:
- # update the zone's status and increment the serial
- zone = self._update_zone_in_storage(
- context, zone, increment_serial)
-
for record in recordset.records:
record.action = 'CREATE'
record.status = 'PENDING'
- record.serial = zone.serial
+ if not increment_serial:
+ record.serial = zone.serial
+ else:
+ record.serial = timeutils.utcnow_ts()
- recordset = self.storage.create_recordset(context, zone.id,
- recordset)
+ new_recordset = self.storage.create_recordset(context, zone.id,
+ recordset)
+ if recordset.records and increment_serial:
+ # update the zone's status and increment the serial
+ zone = self._update_zone_in_storage(
+ context, zone, increment_serial
+ )
# Return the zone too in case it was updated
- return (recordset, zone)
+ return new_recordset, zone
@rpc.expected_exceptions()
def get_recordset(self, context, zone_id, recordset_id):
@@ -1553,7 +1551,6 @@ class Service(service.RPCService):
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.update')
- @lock.synchronized_zone()
def update_recordset(self, context, recordset, increment_serial=True):
zone_id = recordset.obj_get_original_value('zone_id')
changes = recordset.obj_get_changes()
@@ -1622,41 +1619,44 @@ class Service(service.RPCService):
recordset, zone = self._update_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
- self.worker_api.update_zone(context, zone)
-
return recordset
@transaction
def _update_recordset_in_storage(self, context, zone, recordset,
- increment_serial=True, set_delayed_notify=False):
+ increment_serial=True,
+ set_delayed_notify=False):
self._validate_recordset(context, zone, recordset)
- if increment_serial:
- # update the zone's status and increment the serial
- zone = self._update_zone_in_storage(
- context, zone, increment_serial,
- set_delayed_notify=set_delayed_notify)
-
if recordset.records:
for record in recordset.records:
- if record.action != 'DELETE':
- record.action = 'UPDATE'
- record.status = 'PENDING'
+ if record.action == 'DELETE':
+ continue
+ record.action = 'UPDATE'
+ record.status = 'PENDING'
+ if not increment_serial:
record.serial = zone.serial
+ else:
+ record.serial = timeutils.utcnow_ts()
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, zone, recordset)
# Update the recordset
- recordset = self.storage.update_recordset(context, recordset)
+ new_recordset = self.storage.update_recordset(context, recordset)
- return recordset, zone
+ if increment_serial:
+ # update the zone's status and increment the serial
+ zone = self._update_zone_in_storage(
+ context, zone,
+ increment_serial=increment_serial,
+ set_delayed_notify=set_delayed_notify)
+
+ return new_recordset, zone
@rpc.expected_exceptions()
@notification.notify_type('dns.recordset.delete')
- @lock.synchronized_zone()
def delete_recordset(self, context, zone_id, recordset_id,
increment_serial=True):
# apply_tenant_criteria=False here as we will gate this delete
@@ -1708,8 +1708,6 @@ class Service(service.RPCService):
recordset, zone = self._delete_recordset_in_storage(
context, zone, recordset, increment_serial=increment_serial)
- self.worker_api.update_zone(context, zone)
-
recordset.zone_name = zone.name
recordset.obj_reset_changes(['zone_name'])
@@ -1718,23 +1716,26 @@ class Service(service.RPCService):
@transaction
def _delete_recordset_in_storage(self, context, zone, recordset,
increment_serial=True):
-
- if increment_serial:
- # update the zone's status and increment the serial
- zone = self._update_zone_in_storage(
- context, zone, increment_serial)
-
if recordset.records:
for record in recordset.records:
record.action = 'DELETE'
record.status = 'PENDING'
- record.serial = zone.serial
+ if not increment_serial:
+ record.serial = zone.serial
+ else:
+ record.serial = timeutils.utcnow_ts()
# Update the recordset's action/status and then delete it
self.storage.update_recordset(context, recordset)
- recordset = self.storage.delete_recordset(context, recordset.id)
- return (recordset, zone)
+ if increment_serial:
+ # update the zone's status and increment the serial
+ zone = self._update_zone_in_storage(
+ context, zone, increment_serial)
+
+ new_recordset = self.storage.delete_recordset(context, recordset.id)
+
+ return new_recordset, zone
@rpc.expected_exceptions()
def count_recordsets(self, context, criterion=None):
diff --git a/designate/conf/producer.py b/designate/conf/producer.py
index 44f5ec7d..4aa58731 100644
--- a/designate/conf/producer.py
+++ b/designate/conf/producer.py
@@ -20,6 +20,11 @@ PRODUCER_GROUP = cfg.OptGroup(
title='Configuration for Producer Service'
)
+PRODUCER_TASK_INCREMENT_SERIAL_GROUP = cfg.OptGroup(
+ name='producer_task:increment_serial',
+ title='Configuration for Producer Task: Increment Serial'
+)
+
PRODUCER_TASK_DELAYED_NOTIFY_GROUP = cfg.OptGroup(
name='producer_task:delayed_notify',
title='Configuration for Producer Task: Delayed Notify'
@@ -62,6 +67,15 @@ PRODUCER_OPTS = [
help='RPC topic name for producer'),
]
+PRODUCER_TASK_INCREMENT_SERIAL_OPTS = [
+ cfg.IntOpt('interval', default=5,
+ help='Run interval in seconds'),
+ cfg.IntOpt('per_page', default=100,
+ help='Default amount of results returned per page'),
+ cfg.IntOpt('batch_size', default=100,
+ help='How many zones to increment serial for on each run'),
+]
+
PRODUCER_TASK_DELAYED_NOTIFY_OPTS = [
cfg.IntOpt('interval', default=5,
help='Run interval in seconds'),
@@ -111,6 +125,9 @@ def register_opts(conf):
conf.register_group(PRODUCER_TASK_DELAYED_NOTIFY_GROUP)
conf.register_opts(PRODUCER_TASK_DELAYED_NOTIFY_OPTS,
group=PRODUCER_TASK_DELAYED_NOTIFY_GROUP)
+ conf.register_group(PRODUCER_TASK_INCREMENT_SERIAL_GROUP)
+ conf.register_opts(PRODUCER_TASK_INCREMENT_SERIAL_OPTS,
+ group=PRODUCER_TASK_INCREMENT_SERIAL_GROUP)
conf.register_group(PRODUCER_TASK_PERIODIC_EXISTS_GROUP)
conf.register_opts(PRODUCER_TASK_PERIODIC_EXISTS_OPTS,
group=PRODUCER_TASK_PERIODIC_EXISTS_GROUP)
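Per the commit message, the extra propagation delay is bounded by this
task's run interval, so operators can trade scheduler churn for
latency. A minimal sketch of tuning the new options (the INI group in
the comment matches the group registered above):

    # designate.conf:
    #
    #   [producer_task:increment_serial]
    #   interval = 1       # run every second instead of the default 5
    #   batch_size = 100   # zones whose serial is bumped per run
    from oslo_config import cfg
    delay_bound = cfg.CONF['producer_task:increment_serial'].interval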
diff --git a/designate/objects/zone.py b/designate/objects/zone.py
index fae67bc5..dc27d271 100644
--- a/designate/objects/zone.py
+++ b/designate/objects/zone.py
@@ -66,6 +66,7 @@ class Zone(base.DesignateObject, base.DictObjectMixin,
),
'transferred_at': fields.DateTimeField(nullable=True, read_only=False),
'delayed_notify': fields.BooleanField(nullable=True),
+ 'increment_serial': fields.BooleanField(nullable=True),
}
STRING_KEYS = [
diff --git a/designate/producer/tasks.py b/designate/producer/tasks.py
index 521d378e..41f3a78d 100644
--- a/designate/producer/tasks.py
+++ b/designate/producer/tasks.py
@@ -227,8 +227,6 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
Call Worker to emit NOTIFY transactions,
Reset the flag.
"""
- pstart, pend = self._my_range()
-
ctxt = context.DesignateContext.get_admin_context()
ctxt.all_tenants = True
@@ -237,6 +235,7 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
# There's an index on delayed_notify.
criterion = self._filter_between('shard')
criterion['delayed_notify'] = True
+ criterion['increment_serial'] = False
zones = self.central_api.find_zones(
ctxt,
criterion,
@@ -246,6 +245,17 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
)
for zone in zones:
+ if zone.action == 'NONE':
+ zone.action = 'UPDATE'
+ zone.status = 'PENDING'
+ elif zone.action == 'DELETE':
+ LOG.debug(
+ 'Skipping delayed NOTIFY for %(id)s being DELETED',
+ {
+ 'id': zone.id,
+ }
+ )
+ continue
self.worker_api.update_zone(ctxt, zone)
zone.delayed_notify = False
self.central_api.update_zone(ctxt, zone)
@@ -257,6 +267,54 @@ class PeriodicGenerateDelayedNotifyTask(PeriodicTask):
)
+class PeriodicIncrementSerialTask(PeriodicTask):
+ __plugin_name__ = 'increment_serial'
+
+ def __init__(self):
+ super(PeriodicIncrementSerialTask, self).__init__()
+
+ def __call__(self):
+ ctxt = context.DesignateContext.get_admin_context()
+ ctxt.all_tenants = True
+
+ # Select zones where "increment_serial" is set and starting from the
+ # oldest "updated_at".
+ # There's an index on increment_serial.
+ criterion = self._filter_between('shard')
+ criterion['increment_serial'] = True
+ zones = self.central_api.find_zones(
+ ctxt,
+ criterion,
+ limit=CONF[self.name].batch_size,
+ sort_key='updated_at',
+ sort_dir='asc',
+ )
+ for zone in zones:
+ if zone.action == 'DELETE':
+ LOG.debug(
+ 'Skipping increment serial for %(id)s being DELETED',
+ {
+ 'id': zone.id,
+ }
+ )
+ continue
+
+ serial = self.central_api.increment_zone_serial(ctxt, zone)
+ LOG.debug(
+ 'Incremented serial for %(id)s to %(serial)d',
+ {
+ 'id': zone.id,
+ 'serial': serial,
+ }
+ )
+ if not zone.delayed_notify:
+ # Notify the backend.
+ if zone.action == 'NONE':
+ zone.action = 'UPDATE'
+ zone.status = 'PENDING'
+ self.worker_api.update_zone(ctxt, zone)
+
+
class WorkerPeriodicRecovery(PeriodicTask):
__plugin_name__ = 'worker_periodic_recovery'
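As with the other producer tasks, __plugin_name__ doubles as the
entry-point name (see the setup.cfg hunk at the end) and as the config
group suffix. A sketch of that mapping, assuming the 'producer_task:'
naming scheme implied by the CONF[self.name] lookup above:

    task = tasks.PeriodicIncrementSerialTask()
    # task.name is assumed to resolve to 'producer_task:increment_serial',
    # which is how CONF[self.name].batch_size picks up the options
    # registered in designate/conf/producer.py.
    batch_size = cfg.CONF[task.name].batch_size  # default: 100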
diff --git a/designate/sqlalchemy/base.py b/designate/sqlalchemy/base.py
index 8393e1d6..5512a7be 100644
--- a/designate/sqlalchemy/base.py
+++ b/designate/sqlalchemy/base.py
@@ -152,7 +152,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta):
# Ensure the Object is valid
# obj.validate()
- values = obj.obj_get_changes()
+ values = dict(obj)
if skip_values is not None:
for skip_value in skip_values:
@@ -166,7 +166,7 @@ class SQLAlchemy(object, metaclass=abc.ABCMeta):
with sql.get_write_session() as session:
try:
- resultproxy = session.execute(query, [dict(values)])
+ resultproxy = session.execute(query, [values])
except oslo_db_exception.DBDuplicateEntry:
raise exc_dup("Duplicate %s" % obj.obj_name())
diff --git a/designate/storage/base.py b/designate/storage/base.py
index 347f9778..5b9cd221 100644
--- a/designate/storage/base.py
+++ b/designate/storage/base.py
@@ -745,6 +745,15 @@ class Storage(DriverPlugin, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
+ def increment_serial(self, context, zone_id):
+ """
+ Increment serial of a Zone
+
+ :param context: RPC Context.
+ :param zone_id: ID of the Zone.
+ """
+
+ @abc.abstractmethod
def delete_zone_import(self, context, zone_import_id):
"""
Delete a Zone Import via ID.
diff --git a/designate/storage/impl_sqlalchemy/__init__.py b/designate/storage/impl_sqlalchemy/__init__.py
index f9a4c009..1a89709d 100644
--- a/designate/storage/impl_sqlalchemy/__init__.py
+++ b/designate/storage/impl_sqlalchemy/__init__.py
@@ -15,6 +15,7 @@
# under the License.
from oslo_log import log as logging
from oslo_utils.secretutils import md5
+from oslo_utils import timeutils
from sqlalchemy import case, select, distinct, func
from sqlalchemy.sql.expression import or_, literal_column
@@ -435,6 +436,19 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
return updated_zone
+ def increment_serial(self, context, zone_id):
+ """Increment the zone's serial number.
+ """
+ new_serial = timeutils.utcnow_ts()
+ query = tables.zones.update().where(
+ tables.zones.c.id == zone_id).values(
+ {'serial': new_serial, 'increment_serial': False}
+ )
+ with sql.get_write_session() as session:
+ session.execute(query)
+ LOG.debug('Incremented zone serial for %s to %d', zone_id, new_serial)
+ return new_serial
+
def delete_zone(self, context, zone_id):
"""
"""
diff --git a/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py b/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py
new file mode 100644
index 00000000..e7a84dfc
--- /dev/null
+++ b/designate/storage/impl_sqlalchemy/alembic/versions/a005af3aa38e_add_increment_serial.py
@@ -0,0 +1,38 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Add increment serial
+
+Revision ID: a005af3aa38e
+Revises: b20189fd288e
+Create Date: 2023-01-21 17:39:00.822775
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'a005af3aa38e'
+down_revision = 'b20189fd288e'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.add_column(
+ 'zones',
+ sa.Column('increment_serial', sa.Boolean, default=False)
+ )
+ op.create_index(
+ 'increment_serial', 'zones', ['increment_serial']
+ )
diff --git a/designate/storage/impl_sqlalchemy/tables.py b/designate/storage/impl_sqlalchemy/tables.py
index eacaf8d3..aa815ed9 100644
--- a/designate/storage/impl_sqlalchemy/tables.py
+++ b/designate/storage/impl_sqlalchemy/tables.py
@@ -139,6 +139,7 @@ zones = Table('zones', metadata,
Column('pool_id', UUID, default=None, nullable=True),
Column('reverse_name', String(255), nullable=False),
Column('delayed_notify', Boolean, default=False),
+ Column('increment_serial', Boolean, default=False),
UniqueConstraint('name', 'deleted', 'pool_id', name='unique_zone_name'),
ForeignKeyConstraint(['parent_zone_id'],
diff --git a/designate/tests/test_api/test_v2/test_recordsets.py b/designate/tests/test_api/test_v2/test_recordsets.py
index fa03e934..58523226 100644
--- a/designate/tests/test_api/test_v2/test_recordsets.py
+++ b/designate/tests/test_api/test_v2/test_recordsets.py
@@ -17,6 +17,7 @@ from unittest.mock import patch
from oslo_log import log as logging
import oslo_messaging as messaging
+from oslo_utils import timeutils
from designate.central import service as central_service
from designate import exceptions
@@ -438,10 +439,9 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
self.client.delete(url, status=202, headers={'X-Test-Role': 'member'})
# Simulate the zone having been deleted on the backend
- zone_serial = self.central_service.get_zone(
- self.admin_context, zone['id']).serial
self.central_service.update_status(
- self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE'
+ self.admin_context, zone['id'], 'SUCCESS', timeutils.utcnow_ts(),
+ 'DELETE'
)
# Try to get the record and ensure that we get a
diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py
index 846684fa..c488729f 100644
--- a/designate/tests/test_central/test_service.py
+++ b/designate/tests/test_central/test_service.py
@@ -19,6 +19,7 @@ from collections import namedtuple
from concurrent import futures
import copy
import datetime
+
import futurist
import random
import unittest
@@ -33,7 +34,6 @@ from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_utils import timeutils
from oslo_versionedobjects import exception as ovo_exc
import testtools
-from testtools.matchers import GreaterThan
from designate import exceptions
from designate import objects
@@ -890,7 +890,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone(self, mock_notifier):
# Create a zone
zone = self.create_zone(email='info@example.org')
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Update the object
zone.email = 'info@example.net'
@@ -906,7 +906,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, zone.id)
# Ensure the zone was updated correctly
- self.assertGreater(zone.serial, original_serial)
+ self.assertTrue(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
self.assertEqual(2, mock_notifier.call_count)
@@ -931,7 +931,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone_without_incrementing_serial(self):
# Create a zone
zone = self.create_zone(email='info@example.org')
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Update the object
zone.email = 'info@example.net'
@@ -944,7 +944,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.central_service.get_zone(self.admin_context, zone.id)
# Ensure the zone was updated correctly
- self.assertEqual(original_serial, zone.serial)
+ self.assertFalse(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
def test_update_zone_name_fail(self):
@@ -965,7 +965,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_zone_deadlock_retry(self):
# Create a zone
zone = self.create_zone(name='example.org.')
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Update the Object
zone.email = 'info@example.net'
@@ -992,7 +992,7 @@ class CentralServiceTest(CentralTestCase):
self.assertTrue(i[0])
# Ensure the zone was updated correctly
- self.assertGreater(zone.serial, original_serial)
+ self.assertTrue(zone.increment_serial)
self.assertEqual('info@example.net', zone.email)
@mock.patch.object(notifier.Notifier, "info")
@@ -1457,7 +1457,7 @@ class CentralServiceTest(CentralTestCase):
# RecordSet Tests
def test_create_recordset(self):
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Create the Object
recordset = objects.RecordSet(name='www.%s' % zone.name, type='A')
@@ -1469,7 +1469,6 @@ class CentralServiceTest(CentralTestCase):
# Get the zone again to check if serial increased
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
# Ensure all values have been set correctly
self.assertIsNotNone(recordset.id)
@@ -1479,7 +1478,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(recordset.records)
# The serial number does not get updated if there are no records
# in the recordset
- self.assertEqual(original_serial, new_serial)
+ self.assertFalse(updated_zone.increment_serial)
def test_create_recordset_shared_zone(self):
zone = self.create_zone()
@@ -1546,15 +1545,15 @@ class CentralServiceTest(CentralTestCase):
def test_create_recordset_with_records(self):
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Create the Object
recordset = objects.RecordSet(
name='www.%s' % zone.name,
type='A',
records=objects.RecordList(objects=[
- objects.Record(data='192.3.3.15'),
- objects.Record(data='192.3.3.16'),
+ objects.Record(data='192.0.2.15'),
+ objects.Record(data='192.0.2.16'),
])
)
@@ -1565,14 +1564,13 @@ class CentralServiceTest(CentralTestCase):
# Get updated serial number
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
# Ensure all values have been set correctly
self.assertIsNotNone(recordset.records)
self.assertEqual(2, len(recordset.records))
self.assertIsNotNone(recordset.records[0].id)
self.assertIsNotNone(recordset.records[1].id)
- self.assertThat(new_serial, GreaterThan(original_serial))
+ self.assertTrue(updated_zone.increment_serial)
def test_create_recordset_over_quota(self):
# SOA, NS recordsets exist by default.
@@ -1851,7 +1849,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset(self):
# Create a zone
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Create a recordset
recordset = self.create_recordset(zone)
@@ -1865,7 +1863,7 @@ class CentralServiceTest(CentralTestCase):
# Get zone again to verify that serial number was updated
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
+ self.assertTrue(updated_zone.increment_serial)
# Fetch the resource again
recordset = self.central_service.get_recordset(
@@ -1873,7 +1871,6 @@ class CentralServiceTest(CentralTestCase):
# Ensure the new value took
self.assertEqual(1800, recordset.ttl)
- self.assertThat(new_serial, GreaterThan(original_serial))
@unittest.expectedFailure # FIXME
def test_update_recordset_deadlock_retry(self):
@@ -1936,7 +1933,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset_with_record_delete(self):
# Create a zone
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Create a recordset and two records
records = [
@@ -1960,12 +1957,11 @@ class CentralServiceTest(CentralTestCase):
# Fetch the Zone again
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
# Ensure two Records are attached to the RecordSet correctly
self.assertEqual(1, len(recordset.records))
self.assertIsNotNone(recordset.records[0].id)
- self.assertThat(new_serial, GreaterThan(original_serial))
+ self.assertTrue(updated_zone.increment_serial)
def test_update_recordset_with_record_update(self):
# Create a zone
@@ -2066,7 +2062,7 @@ class CentralServiceTest(CentralTestCase):
def test_update_recordset_shared_zone(self):
# Create a zone
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
context = self.get_context(project_id='1', roles=['member', 'reader'])
self.share_zone(context=self.admin_context, zone_id=zone.id,
@@ -2084,7 +2080,9 @@ class CentralServiceTest(CentralTestCase):
# Get zone again to verify that serial number was updated
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
+
+ # Ensure that we are incrementing the zone serial
+ self.assertTrue(updated_zone.increment_serial)
# Fetch the resource again
recordset = self.central_service.get_recordset(
@@ -2092,11 +2090,10 @@ class CentralServiceTest(CentralTestCase):
# Ensure the new value took
self.assertEqual(1800, recordset.ttl)
- self.assertThat(new_serial, GreaterThan(original_serial))
def test_delete_recordset(self):
zone = self.create_zone()
- original_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
# Create a recordset
recordset = self.create_recordset(zone)
@@ -2116,8 +2113,7 @@ class CentralServiceTest(CentralTestCase):
# Fetch the zone again to verify serial number increased
updated_zone = self.central_service.get_zone(self.admin_context,
zone.id)
- new_serial = updated_zone.serial
- self.assertThat(new_serial, GreaterThan(original_serial))
+ self.assertTrue(updated_zone.increment_serial)
def test_delete_recordset_without_incrementing_serial(self):
zone = self.create_zone()
@@ -2219,8 +2215,8 @@ class CentralServiceTest(CentralTestCase):
name='www.%s' % zone.name,
type='A',
records=objects.RecordList(objects=[
- objects.Record(data='192.3.3.15'),
- objects.Record(data='192.3.3.16'),
+ objects.Record(data='203.0.113.15'),
+ objects.Record(data='203.0.113.16'),
])
)
@@ -2401,7 +2397,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure that the record is still in DB (No invalidation)
self.central_service.find_records(elevated_a, criterion)
- # Now give the fip id to tenant 'b' and see that it get's deleted
+ # Now give the fip id to tenant 'b' and see that it gets deleted
self.network_api.fake.allocate_floatingip(
context_b.project_id, fip['id'])
@@ -2411,8 +2407,10 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNone(fip_ptr['ptrdname'])
# Simulate the invalidation on the backend
- zone_serial = self.central_service.get_zone(
- elevated_a, zone_id).serial
+ zone = self.central_service.get_zone(
+ elevated_a, zone_id)
+ zone_serial = self.central_service.increment_zone_serial(
+ elevated_a, zone)
self.central_service.update_status(
elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
@@ -2482,10 +2480,8 @@ class CentralServiceTest(CentralTestCase):
elevated_a, criterion)[0].zone_id
# Simulate the update on the backend
- zone_serial = self.central_service.get_zone(
- elevated_a, zone_id).serial
self.central_service.update_status(
- elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
+ elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE')
self.network_api.fake.deallocate_floatingip(fip['id'])
@@ -2495,7 +2491,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure that the record is still in DB (No invalidation)
self.central_service.find_records(elevated_a, criterion)
- # Now give the fip id to tenant 'b' and see that it get's deleted
+ # Now give the fip id to tenant 'b' and see that it gets deleted
self.network_api.fake.allocate_floatingip(
context_b.project_id, fip['id'])
@@ -2505,10 +2501,8 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNone(fips[0]['ptrdname'])
# Simulate the invalidation on the backend
- zone_serial = self.central_service.get_zone(
- elevated_a, zone_id).serial
self.central_service.update_status(
- elevated_a, zone_id, 'SUCCESS', zone_serial, 'UPDATE')
+ elevated_a, zone_id, 'SUCCESS', timeutils.utcnow_ts(), 'UPDATE')
record = self.central_service.find_records(elevated_a, criterion)[0]
self.assertEqual('NONE', record.action)
@@ -3959,3 +3953,91 @@ class CentralServiceTest(CentralTestCase):
retrived_shared_zone.target_project_id)
self.assertEqual(shared_zone.project_id,
retrived_shared_zone.project_id)
+
+ def test_batch_increment_serial(self):
+ zone = self.create_zone()
+ zone_serial = zone.serial
+ self.assertFalse(zone.increment_serial)
+
+ for index in range(10):
+ recordset = objects.RecordSet(
+ name='www.%d.%s' % (index, zone.name),
+ type='A',
+ records=objects.RecordList(objects=[
+ objects.Record(data='192.0.2.%d' % index),
+ objects.Record(data='198.51.100.%d' % index),
+ ])
+ )
+ self.central_service.create_recordset(
+ self.admin_context, zone.id, recordset=recordset
+ )
+
+ updated_zone = self.central_service.get_zone(
+ self.admin_context, zone.id
+ )
+ recordsets = self.central_service.find_recordsets(
+ self.admin_context,
+ criterion={'zone_id': zone.id, 'type': 'A'}
+ )
+
+ # Increment serial hasn't been triggered yet.
+ self.assertEqual(zone_serial, updated_zone.serial)
+ self.assertTrue(updated_zone.increment_serial)
+
+ self.assertEqual('PENDING', updated_zone.status)
+ self.assertEqual(10, len(recordsets))
+ for recordset in recordsets:
+ self.assertEqual('PENDING', recordset.status)
+ self.assertEqual(2, len(recordset.records))
+ for record in recordset.records:
+ self.assertEqual('PENDING', record.status)
+
+ # Increment serial (Producer -> Central) for zone.
+ with mock.patch.object(timeutils, 'utcnow_ts',
+ return_value=zone_serial + 5):
+ self.central_service.increment_zone_serial(
+ self.admin_context, zone
+ )
+
+ updated_zone = self.central_service.get_zone(
+ self.admin_context, zone.id
+ )
+ recordsets = self.central_service.find_recordsets(
+ self.admin_context,
+ criterion={'zone_id': zone.id, 'type': 'A'}
+ )
+
+ # Ensure that serial is now correct.
+ self.assertEqual(zone_serial + 5, updated_zone.serial)
+ self.assertFalse(updated_zone.increment_serial)
+
+ # But the zone is still in pending status as we haven't notified
+ # the upstream dns servers yet.
+ self.assertEqual('PENDING', updated_zone.status)
+ for recordset in recordsets:
+ self.assertEqual('PENDING', recordset.status)
+ for record in recordset.records:
+ self.assertEqual('PENDING', record.status)
+
+ # Trigger update_status (Producer -> Worker -> Central).
+ # This happens after the upstream DNS servers have been notified
+ # and updated.
+ self.central_service.update_status(
+ self.admin_context, zone.id, 'SUCCESS', updated_zone.serial
+ )
+
+ updated_zone = self.central_service.get_zone(
+ self.admin_context, zone.id
+ )
+ recordsets = self.central_service.find_recordsets(
+ self.admin_context,
+ criterion={'zone_id': zone.id, 'type': 'A'}
+ )
+
+ # Validate that the status is now ACTIVE.
+ self.assertEqual('ACTIVE', updated_zone.status)
+ self.assertEqual(zone_serial + 5, updated_zone.serial)
+ for recordset in recordsets:
+ self.assertEqual('ACTIVE', recordset.status)
+ for record in recordset.records:
+ self.assertEqual('ACTIVE', record.status)
diff --git a/designate/tests/test_producer/test_tasks.py b/designate/tests/test_producer/test_tasks.py
index 99f8f6f4..0aaebb3c 100644
--- a/designate/tests/test_producer/test_tasks.py
+++ b/designate/tests/test_producer/test_tasks.py
@@ -15,6 +15,7 @@
# under the License.
import datetime
+from unittest import mock
from oslo_log import log as logging
from oslo_utils import timeutils
@@ -24,6 +25,7 @@ from designate.storage.impl_sqlalchemy import tables
from designate.storage import sql
from designate.tests import fixtures
from designate.tests import TestCase
+from designate.worker import rpcapi as worker_api
LOG = logging.getLogger(__name__)
@@ -39,7 +41,7 @@ class DeletedZonePurgeTest(TestCase):
self.config(
time_threshold=self.time_threshold,
batch_size=self.batch_size,
- group="producer_task:zone_purge"
+ group='producer_task:zone_purge'
)
self.purge_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(tasks.DeletedZonePurgeTask)
@@ -77,7 +79,7 @@ class DeletedZonePurgeTest(TestCase):
age = index * (self.time_threshold // self.number_of_zones * 2) - 1
delta = datetime.timedelta(seconds=age)
deletion_time = now - delta
- name = "example%d.org." % index
+ name = 'example%d.org.' % index
self._create_deleted_zone(name, deletion_time)
def test_purge_zones(self):
@@ -101,9 +103,8 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
super(PeriodicGenerateDelayedNotifyTaskTest, self).setUp()
self.config(quota_zones=self.number_of_zones)
self.config(
- interval=1,
batch_size=self.batch_size,
- group="producer_task:delayed_notify"
+ group='producer_task:delayed_notify'
)
self.generate_delayed_notify_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(
@@ -123,7 +124,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
def _create_zones(self):
# Create a number of zones; half of them with delayed_notify set.
for index in range(self.number_of_zones):
- name = "example%d.org." % index
+ name = 'example%d.org.' % index
delayed_notify = (index % 2 == 0)
self.create_zone(
name=name,
@@ -149,3 +150,43 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
remaining, len(zones),
message='Remaining zones: %s' % zones
)
+
+
+class PeriodicIncrementSerialTaskTest(TestCase):
+ number_of_zones = 20
+ batch_size = 20
+
+ def setUp(self):
+ super(PeriodicIncrementSerialTaskTest, self).setUp()
+ self.worker_api = mock.Mock()
+ mock.patch.object(worker_api.WorkerAPI, 'get_instance',
+ return_value=self.worker_api).start()
+ self.config(quota_zones=self.number_of_zones)
+ self.config(
+ batch_size=self.batch_size,
+ group='producer_task:increment_serial'
+ )
+ self.increment_serial_task_fixture = self.useFixture(
+ fixtures.ZoneManagerTaskFixture(
+ tasks.PeriodicIncrementSerialTask
+ )
+ )
+
+ def _create_zones(self):
+ for index in range(self.number_of_zones):
+ name = 'example%d.org.' % index
+ increment_serial = (index % 2 == 0)
+ delayed_notify = (index % 4 == 0)
+ self.create_zone(
+ name=name,
+ increment_serial=increment_serial,
+ delayed_notify=delayed_notify,
+ )
+
+ def test_increment_serial(self):
+ self._create_zones()
+
+ self.increment_serial_task_fixture.task()
+
+ self.worker_api.update_zone.assert_called()
+ self.assertEqual(5, self.worker_api.update_zone.call_count)
diff --git a/designate/tests/test_storage/test_sqlalchemy.py b/designate/tests/test_storage/test_sqlalchemy.py
index 95122396..9cd05b04 100644
--- a/designate/tests/test_storage/test_sqlalchemy.py
+++ b/designate/tests/test_storage/test_sqlalchemy.py
@@ -113,6 +113,7 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase):
},
"zones": {
"delayed_notify": "CREATE INDEX delayed_notify ON zones (delayed_notify)", # noqa
+ "increment_serial": "CREATE INDEX increment_serial ON zones (increment_serial)", # noqa
"reverse_name_deleted": "CREATE INDEX reverse_name_deleted ON zones (reverse_name, deleted)", # noqa
"zone_created_at": "CREATE INDEX zone_created_at ON zones (created_at)", # noqa
"zone_deleted": "CREATE INDEX zone_deleted ON zones (deleted)",
diff --git a/designate/tests/unit/producer/test_tasks.py b/designate/tests/unit/producer/test_tasks.py
index a4461ded..f7b69f14 100644
--- a/designate/tests/unit/producer/test_tasks.py
+++ b/designate/tests/unit/producer/test_tasks.py
@@ -31,7 +31,9 @@ from designate import context
from designate.producer import tasks
from designate import rpc
from designate.tests.unit import RoObject
+from designate.tests.unit import RwObject
from designate.utils import generate_uuid
+from designate.worker import rpcapi as worker_api
DUMMY_TASK_GROUP = cfg.OptGroup(
name='producer_task:dummy',
@@ -244,3 +246,82 @@ class PeriodicSecondaryRefreshTest(oslotest.base.BaseTestCase):
self.task()
self.assertFalse(self.central.xfr_zone.called)
+
+
+class PeriodicIncrementSerialTest(oslotest.base.BaseTestCase):
+ def setUp(self):
+ super(PeriodicIncrementSerialTest, self).setUp()
+ self.useFixture(cfg_fixture.Config(CONF))
+
+ self.central_api = mock.Mock()
+ self.context = mock.Mock()
+ self.worker_api = mock.Mock()
+ mock.patch.object(worker_api.WorkerAPI, 'get_instance',
+ return_value=self.worker_api).start()
+ mock.patch.object(central_api.CentralAPI, 'get_instance',
+ return_value=self.central_api).start()
+ mock.patch.object(context.DesignateContext, 'get_admin_context',
+ return_value=self.context).start()
+ self.central_api.increment_zone_serial.return_value = 123
+ self.task = tasks.PeriodicIncrementSerialTask()
+ self.task.my_partitions = 0, 9
+
+ def test_increment_zone(self):
+ zone = RoObject(
+ id=generate_uuid(),
+ action='CREATE',
+ increment_serial=True,
+ delayed_notify=False,
+ )
+ self.central_api.find_zones.return_value = [zone]
+
+ self.task()
+
+ self.central_api.increment_zone_serial.assert_called()
+ self.worker_api.update_zone.assert_called()
+
+ def test_increment_zone_with_action_none(self):
+ zone = RwObject(
+ id=generate_uuid(),
+ action='NONE',
+ status='ACTIVE',
+ increment_serial=True,
+ delayed_notify=False,
+ )
+ self.central_api.find_zones.return_value = [zone]
+
+ self.task()
+
+ self.central_api.increment_zone_serial.assert_called()
+ self.worker_api.update_zone.assert_called()
+
+ self.assertEqual('UPDATE', zone.action)
+ self.assertEqual('PENDING', zone.status)
+
+ def test_increment_zone_with_delayed_notify(self):
+ zone = RoObject(
+ id=generate_uuid(),
+ action='CREATE',
+ increment_serial=True,
+ delayed_notify=True,
+ )
+ self.central_api.find_zones.return_value = [zone]
+
+ self.task()
+
+ self.central_api.increment_zone_serial.assert_called()
+ self.worker_api.update_zone.assert_not_called()
+
+ def test_increment_zone_skip_deleted(self):
+ zone = RoObject(
+ id=generate_uuid(),
+ action='DELETE',
+ increment_serial=True,
+ delayed_notify=False,
+ )
+ self.central_api.find_zones.return_value = [zone]
+
+ self.task()
+
+ self.central_api.increment_zone_serial.assert_not_called()
+ self.worker_api.update_zone.assert_not_called()
diff --git a/designate/tests/unit/test_central/test_basic.py b/designate/tests/unit/test_central/test_basic.py
index 8a8068e5..3726406d 100644
--- a/designate/tests/unit/test_central/test_basic.py
+++ b/designate/tests/unit/test_central/test_basic.py
@@ -179,6 +179,7 @@ class MockRecordSet(object):
ttl = 1
type = "PRIMARY"
serial = 123
+ records = []
def obj_attr_is_set(self, n):
if n == 'records':
@@ -418,8 +419,9 @@ class CentralServiceTestCase(CentralBasic):
central_service._is_valid_ttl = mock.Mock()
central_service.storage.create_recordset = mock.Mock(return_value='rs')
- central_service._update_zone_in_storage = mock.Mock()
-
+ central_service._update_zone_in_storage = mock.Mock(
+ return_value=Mockzone()
+ )
recordset = mock.Mock(spec=objects.RecordSet)
recordset.obj_attr_is_set.return_value = True
recordset.records = [MockRecord()]
@@ -440,7 +442,9 @@ class CentralServiceTestCase(CentralBasic):
self.service._is_valid_ttl = mock.Mock()
self.service.storage.create_recordset = mock.Mock(return_value='rs')
- self.service._update_zone_in_storage = mock.Mock()
+ self.service._update_zone_in_storage = mock.Mock(
+ return_value=Mockzone()
+ )
# NOTE(thirose): Since this is a race condition we assume that
# we will hit it if we try to do the operations in a loop 100 times.
@@ -1506,8 +1510,6 @@ class CentralZoneTestCase(CentralBasic):
self.service.delete_recordset(self.context,
CentralZoneTestCase.zone__id_2,
CentralZoneTestCase.recordset__id)
- self.assertTrue(
- self.service.worker_api.update_zone.called)
self.assertTrue(
self.service._delete_recordset_in_storage.called)
@@ -1524,6 +1526,7 @@ class CentralZoneTestCase(CentralBasic):
action='',
status='',
serial=0,
+ increment_serial=False,
)
])
)
@@ -1533,7 +1536,7 @@ class CentralZoneTestCase(CentralBasic):
self.assertEqual(1, len(rs.records))
self.assertEqual('DELETE', rs.records[0].action)
self.assertEqual('PENDING', rs.records[0].status)
- self.assertEqual(1, rs.records[0].serial)
+ self.assertTrue(rs.records[0].serial, 1)
def test_delete_recordset_in_storage_no_increment_serial(self):
self.service._update_zone_in_storage = mock.Mock()
diff --git a/designate/tests/unit/test_utils.py b/designate/tests/unit/test_utils.py
index 4734d004..42dfb298 100644
--- a/designate/tests/unit/test_utils.py
+++ b/designate/tests/unit/test_utils.py
@@ -16,7 +16,6 @@ import jinja2
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
-from oslo_utils import timeutils
import oslotest.base
from designate import exceptions
@@ -213,22 +212,6 @@ class TestUtils(oslotest.base.BaseTestCase):
self.assertEqual('Hello World', result)
- @mock.patch.object(timeutils, 'utcnow_ts')
- def test_increment_serial_lower_than_ts(self, mock_utcnow_ts):
- mock_utcnow_ts.return_value = 1561698354
-
- ret_serial = utils.increment_serial(serial=1)
-
- self.assertEqual(1561698354, ret_serial)
-
- @mock.patch.object(timeutils, 'utcnow_ts')
- def test_increment_serial_higher_than_ts(self, mock_utcnow_ts):
- mock_utcnow_ts.return_value = 1561698354
-
- ret_serial = utils.increment_serial(serial=1561698354 * 2)
-
- self.assertEqual(1561698354 * 2 + 1, ret_serial)
-
def test_is_uuid_like(self):
self.assertTrue(
utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64')
diff --git a/designate/utils.py b/designate/utils.py
index a14a6741..dd0198c1 100644
--- a/designate/utils.py
+++ b/designate/utils.py
@@ -26,7 +26,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils.netutils import is_valid_ipv6
-from oslo_utils import timeutils
from oslo_utils import uuidutils
import pkg_resources
@@ -119,16 +118,6 @@ def execute(*cmd, **kw):
root_helper=root_helper, **kw)
-def increment_serial(serial=0):
- # This provides for *roughly* unix timestamp based serial numbers
- new_serial = timeutils.utcnow_ts()
-
- if new_serial <= serial:
- new_serial = serial + 1
-
- return new_serial
-
-
def deep_dict_merge(a, b):
if not isinstance(b, dict):
return b
diff --git a/designate/worker/tasks/zone.py b/designate/worker/tasks/zone.py
index 0ee0b3a3..a7aeede6 100644
--- a/designate/worker/tasks/zone.py
+++ b/designate/worker/tasks/zone.py
@@ -26,7 +26,6 @@ from oslo_utils import timeutils
from designate import dnsutils
from designate import exceptions
from designate import objects
-from designate import utils
from designate.worker.tasks import base
LOG = logging.getLogger(__name__)
@@ -795,11 +794,10 @@ class RecoverShard(base.Task):
# Include things that have been hanging out in PENDING
# status for longer than they should
# Generate the current serial, will provide a UTC Unix TS.
- current = utils.increment_serial()
stale_criterion = {
'shard': "BETWEEN %s,%s" % (self.begin_shard, self.end_shard),
'status': 'PENDING',
- 'serial': "<%s" % (current - self.max_prop_time)
+ 'serial': "<%s" % (timeutils.utcnow_ts() - self.max_prop_time)
}
stale_zones = self.storage.find_zones(self.context, stale_criterion)
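With utils.increment_serial gone, RecoverShard compares the
timestamp-based serial directly against the clock. A small sketch of
the staleness test this enables:

    # A PENDING zone is stale once its serial (a UTC Unix timestamp under
    # this model) is older than the allowed propagation window.
    now = timeutils.utcnow_ts()
    is_stale = zone.status == 'PENDING' and zone.serial < now - max_prop_time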
diff --git a/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml b/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml
new file mode 100644
index 00000000..04293940
--- /dev/null
+++ b/releasenotes/notes/batch-increment-serial-07485eb3bbbac6c3.yaml
@@ -0,0 +1,6 @@
+---
+features:
+ - |
+ Moved zone serial updates to a `designate-producer` task called
+ `increment_serial` to fix race conditions and to reduce the number of
+ updates to the upstream DNS servers when performing multiple DNS updates.
diff --git a/setup.cfg b/setup.cfg
index 92f789f1..65df4666 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -118,6 +118,7 @@ designate.producer_tasks =
periodic_exists = designate.producer.tasks:PeriodicExistsTask
periodic_secondary_refresh = designate.producer.tasks:PeriodicSecondaryRefreshTask
delayed_notify = designate.producer.tasks:PeriodicGenerateDelayedNotifyTask
+ increment_serial = designate.producer.tasks:PeriodicIncrementSerialTask
worker_periodic_recovery = designate.producer.tasks:WorkerPeriodicRecovery
designate.heartbeat_emitter =