diff options
Diffstat (limited to 'designate/central/service.py')
-rw-r--r-- | designate/central/service.py | 223 |
1 files changed, 46 insertions, 177 deletions
diff --git a/designate/central/service.py b/designate/central/service.py index 05173539..34e39338 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -14,16 +14,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import collections import copy -import functools -import itertools import random from random import SystemRandom import re import signal import string -import threading import time from dns import exception as dnsexception @@ -33,16 +29,16 @@ from oslo_log import log as logging import oslo_messaging as messaging from designate.common import constants -from designate import context as dcontext +from designate.common.decorators import lock +from designate.common.decorators import notification +from designate.common.decorators import rpc from designate import coordination from designate import dnsutils from designate import exceptions from designate import network_api -from designate import notifications from designate import objects from designate import policy from designate import quota -from designate import rpc from designate import scheduler from designate import service from designate import storage @@ -51,135 +47,7 @@ from designate.storage import transaction_shallow_copy from designate import utils from designate.worker import rpcapi as worker_rpcapi - LOG = logging.getLogger(__name__) -ZONE_LOCKS = threading.local() -NOTIFICATION_BUFFER = threading.local() - - -def synchronized_zone(zone_arg=1, new_zone=False): - """Ensures only a single operation is in progress for each zone - - A Decorator which ensures only a single operation can be happening - on a single zone at once, within the current designate-central instance - """ - def outer(f): - @functools.wraps(f) - def sync_wrapper(self, *args, **kwargs): - if not hasattr(ZONE_LOCKS, 'held'): - # Create the held set if necessary - ZONE_LOCKS.held = set() - - zone_id = None - - if 'zone_id' in kwargs: - zone_id = kwargs['zone_id'] - elif 'zone' in kwargs: - zone_id = kwargs['zone'].id - elif 'recordset' in kwargs: - zone_id = kwargs['recordset'].zone_id - elif 'record' in kwargs: - zone_id = kwargs['record'].zone_id - - # The various objects won't always have an ID set, we should - # attempt to locate an Object containing the ID. - if zone_id is None: - for arg in itertools.chain(kwargs.values(), args): - if isinstance(arg, objects.Zone): - zone_id = arg.id - if zone_id: - break - elif (isinstance(arg, objects.RecordSet) or - isinstance(arg, objects.Record) or - isinstance(arg, objects.ZoneTransferRequest) or - isinstance(arg, objects.ZoneTransferAccept)): - zone_id = arg.zone_id - if zone_id: - break - - # If we still don't have an ID, find the Nth argument as - # defined by the zone_arg decorator option. - if not zone_id and len(args) > zone_arg: - zone_id = args[zone_arg] - if isinstance(zone_id, objects.Zone): - # If the value is a Zone object, extract it's ID. - zone_id = zone_id.id - - if new_zone and not zone_id: - lock_name = 'create-new-zone' - elif not new_zone and zone_id: - lock_name = 'zone-%s' % zone_id - else: - raise Exception('Failed to determine zone id for ' - 'synchronized operation') - - if zone_id in ZONE_LOCKS.held: - return f(self, *args, **kwargs) - - with self.coordination.get_lock(lock_name): - try: - ZONE_LOCKS.held.add(zone_id) - return f(self, *args, **kwargs) - finally: - ZONE_LOCKS.held.remove(zone_id) - - sync_wrapper.__wrapped_function = f - sync_wrapper.__wrapper_name = 'synchronized_zone' - return sync_wrapper - - return outer - - -def notification(notification_type): - def outer(f): - @functools.wraps(f) - def notification_wrapper(self, *args, **kwargs): - if not hasattr(NOTIFICATION_BUFFER, 'queue'): - # Create the notifications queue if necessary - NOTIFICATION_BUFFER.stack = 0 - NOTIFICATION_BUFFER.queue = collections.deque() - - NOTIFICATION_BUFFER.stack += 1 - - try: - # Find the context argument - context = dcontext.DesignateContext.\ - get_context_from_function_and_args(f, args, kwargs) - - # Call the wrapped function - result = f(self, *args, **kwargs) - - # Feed the args/result to a notification plugin - # to determine what is emitted - payloads = notifications.get_plugin().emit( - notification_type, context, result, args, kwargs) - - # Enqueue the notification - for payload in payloads: - LOG.debug('Queueing notification for %(type)s ', - {'type': notification_type}) - NOTIFICATION_BUFFER.queue.appendleft( - (context, notification_type, payload,)) - - return result - - finally: - NOTIFICATION_BUFFER.stack -= 1 - - if NOTIFICATION_BUFFER.stack == 0: - LOG.debug('Emitting %(count)d notifications', - {'count': len(NOTIFICATION_BUFFER.queue)}) - # Send the queued notifications, in order. - for value in NOTIFICATION_BUFFER.queue: - LOG.debug('Emitting %(type)s notification', - {'type': value[1]}) - self.notifier.info(value[0], value[1], value[2]) - - # Reset the queue - NOTIFICATION_BUFFER.queue.clear() - - return notification_wrapper - return outer class Service(service.RPCService): @@ -188,6 +56,9 @@ class Service(service.RPCService): target = messaging.Target(version=RPC_API_VERSION) def __init__(self): + self.zone_lock_local = lock.ZoneLockLocal() + self.notification_thread_local = notification.NotificationThreadLocal() + self._scheduler = None self._storage = None self._quota = None @@ -196,11 +67,9 @@ class Service(service.RPCService): self.service_name, cfg.CONF['service:central'].topic, threads=cfg.CONF['service:central'].threads, ) - self.coordination = coordination.Coordination( self.service_name, self.tg, grouping_enabled=False ) - self.network_api = network_api.get_network_api(cfg.CONF.network_api) @property @@ -713,7 +582,7 @@ class Service(service.RPCService): # TLD Methods @rpc.expected_exceptions() - @notification('dns.tld.create') + @notification.notify_type('dns.tld.create') @transaction def create_tld(self, context, tld): policy.check('create_tld', context) @@ -738,7 +607,7 @@ class Service(service.RPCService): return self.storage.get_tld(context, tld_id) @rpc.expected_exceptions() - @notification('dns.tld.update') + @notification.notify_type('dns.tld.update') @transaction def update_tld(self, context, tld): target = { @@ -751,7 +620,7 @@ class Service(service.RPCService): return tld @rpc.expected_exceptions() - @notification('dns.tld.delete') + @notification.notify_type('dns.tld.delete') @transaction def delete_tld(self, context, tld_id): policy.check('delete_tld', context, {'tld_id': tld_id}) @@ -762,7 +631,7 @@ class Service(service.RPCService): # TSIG Key Methods @rpc.expected_exceptions() - @notification('dns.tsigkey.create') + @notification.notify_type('dns.tsigkey.create') @transaction def create_tsigkey(self, context, tsigkey): policy.check('create_tsigkey', context) @@ -788,7 +657,7 @@ class Service(service.RPCService): return self.storage.get_tsigkey(context, tsigkey_id) @rpc.expected_exceptions() - @notification('dns.tsigkey.update') + @notification.notify_type('dns.tsigkey.update') @transaction def update_tsigkey(self, context, tsigkey): target = { @@ -803,7 +672,7 @@ class Service(service.RPCService): return tsigkey @rpc.expected_exceptions() - @notification('dns.tsigkey.delete') + @notification.notify_type('dns.tsigkey.delete') @transaction def delete_tsigkey(self, context, tsigkey_id): policy.check('delete_tsigkey', context, {'tsigkey_id': tsigkey_id}) @@ -862,9 +731,9 @@ class Service(service.RPCService): return pool.ns_records @rpc.expected_exceptions() - @notification('dns.domain.create') - @notification('dns.zone.create') - @synchronized_zone(new_zone=True) + @notification.notify_type('dns.domain.create') + @notification.notify_type('dns.zone.create') + @lock.synchronized_zone(new_zone=True) def create_zone(self, context, zone): """Create zone: perform checks and then call _create_zone() """ @@ -1060,9 +929,9 @@ class Service(service.RPCService): sort_key, sort_dir) @rpc.expected_exceptions() - @notification('dns.domain.update') - @notification('dns.zone.update') - @synchronized_zone() + @notification.notify_type('dns.domain.update') + @notification.notify_type('dns.zone.update') + @lock.synchronized_zone() def update_zone(self, context, zone, increment_serial=True): """Update zone. Perform checks and then call _update_zone() @@ -1134,9 +1003,9 @@ class Service(service.RPCService): return zone @rpc.expected_exceptions() - @notification('dns.domain.delete') - @notification('dns.zone.delete') - @synchronized_zone() + @notification.notify_type('dns.domain.delete') + @notification.notify_type('dns.zone.delete') + @lock.synchronized_zone() def delete_zone(self, context, zone_id): """Delete or abandon a zone On abandon, delete the zone from the DB immediately. @@ -1294,8 +1163,8 @@ class Service(service.RPCService): # RecordSet Methods @rpc.expected_exceptions() - @notification('dns.recordset.create') - @synchronized_zone() + @notification.notify_type('dns.recordset.create') + @lock.synchronized_zone() def create_recordset(self, context, zone_id, recordset, increment_serial=True): zone = self.storage.get_zone(context, zone_id) @@ -1467,8 +1336,8 @@ class Service(service.RPCService): recordsets=recordsets) @rpc.expected_exceptions() - @notification('dns.recordset.update') - @synchronized_zone() + @notification.notify_type('dns.recordset.update') + @lock.synchronized_zone() def update_recordset(self, context, recordset, increment_serial=True): zone_id = recordset.obj_get_original_value('zone_id') zone = self.storage.get_zone(context, zone_id) @@ -1550,8 +1419,8 @@ class Service(service.RPCService): return recordset, zone @rpc.expected_exceptions() - @notification('dns.recordset.delete') - @synchronized_zone() + @notification.notify_type('dns.recordset.delete') + @lock.synchronized_zone() def delete_recordset(self, context, zone_id, recordset_id, increment_serial=True): zone = self.storage.get_zone(context, zone_id) @@ -2049,7 +1918,7 @@ class Service(service.RPCService): # Blacklisted zones @rpc.expected_exceptions() - @notification('dns.blacklist.create') + @notification.notify_type('dns.blacklist.create') @transaction def create_blacklist(self, context, blacklist): policy.check('create_blacklist', context) @@ -2078,7 +1947,7 @@ class Service(service.RPCService): return blacklists @rpc.expected_exceptions() - @notification('dns.blacklist.update') + @notification.notify_type('dns.blacklist.update') @transaction def update_blacklist(self, context, blacklist): target = { @@ -2091,7 +1960,7 @@ class Service(service.RPCService): return blacklist @rpc.expected_exceptions() - @notification('dns.blacklist.delete') + @notification.notify_type('dns.blacklist.delete') @transaction def delete_blacklist(self, context, blacklist_id): policy.check('delete_blacklist', context) @@ -2102,7 +1971,7 @@ class Service(service.RPCService): # Server Pools @rpc.expected_exceptions() - @notification('dns.pool.create') + @notification.notify_type('dns.pool.create') @transaction def create_pool(self, context, pool): # Verify that there is a tenant_id @@ -2141,7 +2010,7 @@ class Service(service.RPCService): return self.storage.get_pool(context, pool_id) @rpc.expected_exceptions() - @notification('dns.pool.update') + @notification.notify_type('dns.pool.update') @transaction def update_pool(self, context, pool): policy.check('update_pool', context) @@ -2202,7 +2071,7 @@ class Service(service.RPCService): return updated_pool @rpc.expected_exceptions() - @notification('dns.pool.delete') + @notification.notify_type('dns.pool.delete') @transaction def delete_pool(self, context, pool_id): @@ -2225,10 +2094,10 @@ class Service(service.RPCService): # Pool Manager Integration @rpc.expected_exceptions() - @notification('dns.domain.update') - @notification('dns.zone.update') + @notification.notify_type('dns.domain.update') + @notification.notify_type('dns.zone.update') @transaction - @synchronized_zone() + @lock.synchronized_zone() def update_status(self, context, zone_id, status, serial, action=None): """ :param context: Security context information. @@ -2356,7 +2225,7 @@ class Service(service.RPCService): return ''.join(sysrand.choice(chars) for _ in range(size)) @rpc.expected_exceptions() - @notification('dns.zone_transfer_request.create') + @notification.notify_type('dns.zone_transfer_request.create') @transaction def create_zone_transfer_request(self, context, zone_transfer_request): @@ -2427,7 +2296,7 @@ class Service(service.RPCService): return requests @rpc.expected_exceptions() - @notification('dns.zone_transfer_request.update') + @notification.notify_type('dns.zone_transfer_request.update') @transaction def update_zone_transfer_request(self, context, zone_transfer_request): @@ -2449,7 +2318,7 @@ class Service(service.RPCService): return request @rpc.expected_exceptions() - @notification('dns.zone_transfer_request.delete') + @notification.notify_type('dns.zone_transfer_request.delete') @transaction def delete_zone_transfer_request(self, context, zone_transfer_request_id): # Get zone transfer request @@ -2469,7 +2338,7 @@ class Service(service.RPCService): zone_transfer_request_id) @rpc.expected_exceptions() - @notification('dns.zone_transfer_accept.create') + @notification.notify_type('dns.zone_transfer_accept.create') @transaction def create_zone_transfer_accept(self, context, zone_transfer_accept): elevated_context = context.elevated(all_tenants=True) @@ -2571,7 +2440,7 @@ class Service(service.RPCService): # Zone Import Methods @rpc.expected_exceptions() - @notification('dns.zone_import.create') + @notification.notify_type('dns.zone_import.create') def create_zone_import(self, context, request_body): if policy.enforce_new_defaults(): target = {constants.RBAC_PROJECT_ID: context.project_id} @@ -2667,7 +2536,7 @@ class Service(service.RPCService): self.update_zone_import(context, zone_import) @rpc.expected_exceptions() - @notification('dns.zone_import.update') + @notification.notify_type('dns.zone_import.update') def update_zone_import(self, context, zone_import): if policy.enforce_new_defaults(): target = {constants.RBAC_PROJECT_ID: zone_import.tenant_id} @@ -2710,7 +2579,7 @@ class Service(service.RPCService): return self.storage.get_zone_import(context, zone_import_id) @rpc.expected_exceptions() - @notification('dns.zone_import.delete') + @notification.notify_type('dns.zone_import.delete') @transaction def delete_zone_import(self, context, zone_import_id): @@ -2733,7 +2602,7 @@ class Service(service.RPCService): # Zone Export Methods @rpc.expected_exceptions() - @notification('dns.zone_export.create') + @notification.notify_type('dns.zone_export.create') def create_zone_export(self, context, zone_id): # Try getting the zone to ensure it exists zone = self.storage.get_zone(context, zone_id) @@ -2797,7 +2666,7 @@ class Service(service.RPCService): return self.storage.get_zone_export(context, zone_export_id) @rpc.expected_exceptions() - @notification('dns.zone_export.update') + @notification.notify_type('dns.zone_export.update') def update_zone_export(self, context, zone_export): if policy.enforce_new_defaults(): @@ -2810,7 +2679,7 @@ class Service(service.RPCService): return self.storage.update_zone_export(context, zone_export) @rpc.expected_exceptions() - @notification('dns.zone_export.delete') + @notification.notify_type('dns.zone_export.delete') @transaction def delete_zone_export(self, context, zone_export_id): |