summaryrefslogtreecommitdiff
path: root/designate/central/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'designate/central/service.py')
-rw-r--r--designate/central/service.py223
1 files changed, 46 insertions, 177 deletions
diff --git a/designate/central/service.py b/designate/central/service.py
index 05173539..34e39338 100644
--- a/designate/central/service.py
+++ b/designate/central/service.py
@@ -14,16 +14,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import copy
-import functools
-import itertools
import random
from random import SystemRandom
import re
import signal
import string
-import threading
import time
from dns import exception as dnsexception
@@ -33,16 +29,16 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from designate.common import constants
-from designate import context as dcontext
+from designate.common.decorators import lock
+from designate.common.decorators import notification
+from designate.common.decorators import rpc
from designate import coordination
from designate import dnsutils
from designate import exceptions
from designate import network_api
-from designate import notifications
from designate import objects
from designate import policy
from designate import quota
-from designate import rpc
from designate import scheduler
from designate import service
from designate import storage
@@ -51,135 +47,7 @@ from designate.storage import transaction_shallow_copy
from designate import utils
from designate.worker import rpcapi as worker_rpcapi
-
LOG = logging.getLogger(__name__)
-ZONE_LOCKS = threading.local()
-NOTIFICATION_BUFFER = threading.local()
-
-
-def synchronized_zone(zone_arg=1, new_zone=False):
- """Ensures only a single operation is in progress for each zone
-
- A Decorator which ensures only a single operation can be happening
- on a single zone at once, within the current designate-central instance
- """
- def outer(f):
- @functools.wraps(f)
- def sync_wrapper(self, *args, **kwargs):
- if not hasattr(ZONE_LOCKS, 'held'):
- # Create the held set if necessary
- ZONE_LOCKS.held = set()
-
- zone_id = None
-
- if 'zone_id' in kwargs:
- zone_id = kwargs['zone_id']
- elif 'zone' in kwargs:
- zone_id = kwargs['zone'].id
- elif 'recordset' in kwargs:
- zone_id = kwargs['recordset'].zone_id
- elif 'record' in kwargs:
- zone_id = kwargs['record'].zone_id
-
- # The various objects won't always have an ID set, we should
- # attempt to locate an Object containing the ID.
- if zone_id is None:
- for arg in itertools.chain(kwargs.values(), args):
- if isinstance(arg, objects.Zone):
- zone_id = arg.id
- if zone_id:
- break
- elif (isinstance(arg, objects.RecordSet) or
- isinstance(arg, objects.Record) or
- isinstance(arg, objects.ZoneTransferRequest) or
- isinstance(arg, objects.ZoneTransferAccept)):
- zone_id = arg.zone_id
- if zone_id:
- break
-
- # If we still don't have an ID, find the Nth argument as
- # defined by the zone_arg decorator option.
- if not zone_id and len(args) > zone_arg:
- zone_id = args[zone_arg]
- if isinstance(zone_id, objects.Zone):
- # If the value is a Zone object, extract it's ID.
- zone_id = zone_id.id
-
- if new_zone and not zone_id:
- lock_name = 'create-new-zone'
- elif not new_zone and zone_id:
- lock_name = 'zone-%s' % zone_id
- else:
- raise Exception('Failed to determine zone id for '
- 'synchronized operation')
-
- if zone_id in ZONE_LOCKS.held:
- return f(self, *args, **kwargs)
-
- with self.coordination.get_lock(lock_name):
- try:
- ZONE_LOCKS.held.add(zone_id)
- return f(self, *args, **kwargs)
- finally:
- ZONE_LOCKS.held.remove(zone_id)
-
- sync_wrapper.__wrapped_function = f
- sync_wrapper.__wrapper_name = 'synchronized_zone'
- return sync_wrapper
-
- return outer
-
-
-def notification(notification_type):
- def outer(f):
- @functools.wraps(f)
- def notification_wrapper(self, *args, **kwargs):
- if not hasattr(NOTIFICATION_BUFFER, 'queue'):
- # Create the notifications queue if necessary
- NOTIFICATION_BUFFER.stack = 0
- NOTIFICATION_BUFFER.queue = collections.deque()
-
- NOTIFICATION_BUFFER.stack += 1
-
- try:
- # Find the context argument
- context = dcontext.DesignateContext.\
- get_context_from_function_and_args(f, args, kwargs)
-
- # Call the wrapped function
- result = f(self, *args, **kwargs)
-
- # Feed the args/result to a notification plugin
- # to determine what is emitted
- payloads = notifications.get_plugin().emit(
- notification_type, context, result, args, kwargs)
-
- # Enqueue the notification
- for payload in payloads:
- LOG.debug('Queueing notification for %(type)s ',
- {'type': notification_type})
- NOTIFICATION_BUFFER.queue.appendleft(
- (context, notification_type, payload,))
-
- return result
-
- finally:
- NOTIFICATION_BUFFER.stack -= 1
-
- if NOTIFICATION_BUFFER.stack == 0:
- LOG.debug('Emitting %(count)d notifications',
- {'count': len(NOTIFICATION_BUFFER.queue)})
- # Send the queued notifications, in order.
- for value in NOTIFICATION_BUFFER.queue:
- LOG.debug('Emitting %(type)s notification',
- {'type': value[1]})
- self.notifier.info(value[0], value[1], value[2])
-
- # Reset the queue
- NOTIFICATION_BUFFER.queue.clear()
-
- return notification_wrapper
- return outer
class Service(service.RPCService):
@@ -188,6 +56,9 @@ class Service(service.RPCService):
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self):
+ self.zone_lock_local = lock.ZoneLockLocal()
+ self.notification_thread_local = notification.NotificationThreadLocal()
+
self._scheduler = None
self._storage = None
self._quota = None
@@ -196,11 +67,9 @@ class Service(service.RPCService):
self.service_name, cfg.CONF['service:central'].topic,
threads=cfg.CONF['service:central'].threads,
)
-
self.coordination = coordination.Coordination(
self.service_name, self.tg, grouping_enabled=False
)
-
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
@property
@@ -713,7 +582,7 @@ class Service(service.RPCService):
# TLD Methods
@rpc.expected_exceptions()
- @notification('dns.tld.create')
+ @notification.notify_type('dns.tld.create')
@transaction
def create_tld(self, context, tld):
policy.check('create_tld', context)
@@ -738,7 +607,7 @@ class Service(service.RPCService):
return self.storage.get_tld(context, tld_id)
@rpc.expected_exceptions()
- @notification('dns.tld.update')
+ @notification.notify_type('dns.tld.update')
@transaction
def update_tld(self, context, tld):
target = {
@@ -751,7 +620,7 @@ class Service(service.RPCService):
return tld
@rpc.expected_exceptions()
- @notification('dns.tld.delete')
+ @notification.notify_type('dns.tld.delete')
@transaction
def delete_tld(self, context, tld_id):
policy.check('delete_tld', context, {'tld_id': tld_id})
@@ -762,7 +631,7 @@ class Service(service.RPCService):
# TSIG Key Methods
@rpc.expected_exceptions()
- @notification('dns.tsigkey.create')
+ @notification.notify_type('dns.tsigkey.create')
@transaction
def create_tsigkey(self, context, tsigkey):
policy.check('create_tsigkey', context)
@@ -788,7 +657,7 @@ class Service(service.RPCService):
return self.storage.get_tsigkey(context, tsigkey_id)
@rpc.expected_exceptions()
- @notification('dns.tsigkey.update')
+ @notification.notify_type('dns.tsigkey.update')
@transaction
def update_tsigkey(self, context, tsigkey):
target = {
@@ -803,7 +672,7 @@ class Service(service.RPCService):
return tsigkey
@rpc.expected_exceptions()
- @notification('dns.tsigkey.delete')
+ @notification.notify_type('dns.tsigkey.delete')
@transaction
def delete_tsigkey(self, context, tsigkey_id):
policy.check('delete_tsigkey', context, {'tsigkey_id': tsigkey_id})
@@ -862,9 +731,9 @@ class Service(service.RPCService):
return pool.ns_records
@rpc.expected_exceptions()
- @notification('dns.domain.create')
- @notification('dns.zone.create')
- @synchronized_zone(new_zone=True)
+ @notification.notify_type('dns.domain.create')
+ @notification.notify_type('dns.zone.create')
+ @lock.synchronized_zone(new_zone=True)
def create_zone(self, context, zone):
"""Create zone: perform checks and then call _create_zone()
"""
@@ -1060,9 +929,9 @@ class Service(service.RPCService):
sort_key, sort_dir)
@rpc.expected_exceptions()
- @notification('dns.domain.update')
- @notification('dns.zone.update')
- @synchronized_zone()
+ @notification.notify_type('dns.domain.update')
+ @notification.notify_type('dns.zone.update')
+ @lock.synchronized_zone()
def update_zone(self, context, zone, increment_serial=True):
"""Update zone. Perform checks and then call _update_zone()
@@ -1134,9 +1003,9 @@ class Service(service.RPCService):
return zone
@rpc.expected_exceptions()
- @notification('dns.domain.delete')
- @notification('dns.zone.delete')
- @synchronized_zone()
+ @notification.notify_type('dns.domain.delete')
+ @notification.notify_type('dns.zone.delete')
+ @lock.synchronized_zone()
def delete_zone(self, context, zone_id):
"""Delete or abandon a zone
On abandon, delete the zone from the DB immediately.
@@ -1294,8 +1163,8 @@ class Service(service.RPCService):
# RecordSet Methods
@rpc.expected_exceptions()
- @notification('dns.recordset.create')
- @synchronized_zone()
+ @notification.notify_type('dns.recordset.create')
+ @lock.synchronized_zone()
def create_recordset(self, context, zone_id, recordset,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id)
@@ -1467,8 +1336,8 @@ class Service(service.RPCService):
recordsets=recordsets)
@rpc.expected_exceptions()
- @notification('dns.recordset.update')
- @synchronized_zone()
+ @notification.notify_type('dns.recordset.update')
+ @lock.synchronized_zone()
def update_recordset(self, context, recordset, increment_serial=True):
zone_id = recordset.obj_get_original_value('zone_id')
zone = self.storage.get_zone(context, zone_id)
@@ -1550,8 +1419,8 @@ class Service(service.RPCService):
return recordset, zone
@rpc.expected_exceptions()
- @notification('dns.recordset.delete')
- @synchronized_zone()
+ @notification.notify_type('dns.recordset.delete')
+ @lock.synchronized_zone()
def delete_recordset(self, context, zone_id, recordset_id,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id)
@@ -2049,7 +1918,7 @@ class Service(service.RPCService):
# Blacklisted zones
@rpc.expected_exceptions()
- @notification('dns.blacklist.create')
+ @notification.notify_type('dns.blacklist.create')
@transaction
def create_blacklist(self, context, blacklist):
policy.check('create_blacklist', context)
@@ -2078,7 +1947,7 @@ class Service(service.RPCService):
return blacklists
@rpc.expected_exceptions()
- @notification('dns.blacklist.update')
+ @notification.notify_type('dns.blacklist.update')
@transaction
def update_blacklist(self, context, blacklist):
target = {
@@ -2091,7 +1960,7 @@ class Service(service.RPCService):
return blacklist
@rpc.expected_exceptions()
- @notification('dns.blacklist.delete')
+ @notification.notify_type('dns.blacklist.delete')
@transaction
def delete_blacklist(self, context, blacklist_id):
policy.check('delete_blacklist', context)
@@ -2102,7 +1971,7 @@ class Service(service.RPCService):
# Server Pools
@rpc.expected_exceptions()
- @notification('dns.pool.create')
+ @notification.notify_type('dns.pool.create')
@transaction
def create_pool(self, context, pool):
# Verify that there is a tenant_id
@@ -2141,7 +2010,7 @@ class Service(service.RPCService):
return self.storage.get_pool(context, pool_id)
@rpc.expected_exceptions()
- @notification('dns.pool.update')
+ @notification.notify_type('dns.pool.update')
@transaction
def update_pool(self, context, pool):
policy.check('update_pool', context)
@@ -2202,7 +2071,7 @@ class Service(service.RPCService):
return updated_pool
@rpc.expected_exceptions()
- @notification('dns.pool.delete')
+ @notification.notify_type('dns.pool.delete')
@transaction
def delete_pool(self, context, pool_id):
@@ -2225,10 +2094,10 @@ class Service(service.RPCService):
# Pool Manager Integration
@rpc.expected_exceptions()
- @notification('dns.domain.update')
- @notification('dns.zone.update')
+ @notification.notify_type('dns.domain.update')
+ @notification.notify_type('dns.zone.update')
@transaction
- @synchronized_zone()
+ @lock.synchronized_zone()
def update_status(self, context, zone_id, status, serial, action=None):
"""
:param context: Security context information.
@@ -2356,7 +2225,7 @@ class Service(service.RPCService):
return ''.join(sysrand.choice(chars) for _ in range(size))
@rpc.expected_exceptions()
- @notification('dns.zone_transfer_request.create')
+ @notification.notify_type('dns.zone_transfer_request.create')
@transaction
def create_zone_transfer_request(self, context, zone_transfer_request):
@@ -2427,7 +2296,7 @@ class Service(service.RPCService):
return requests
@rpc.expected_exceptions()
- @notification('dns.zone_transfer_request.update')
+ @notification.notify_type('dns.zone_transfer_request.update')
@transaction
def update_zone_transfer_request(self, context, zone_transfer_request):
@@ -2449,7 +2318,7 @@ class Service(service.RPCService):
return request
@rpc.expected_exceptions()
- @notification('dns.zone_transfer_request.delete')
+ @notification.notify_type('dns.zone_transfer_request.delete')
@transaction
def delete_zone_transfer_request(self, context, zone_transfer_request_id):
# Get zone transfer request
@@ -2469,7 +2338,7 @@ class Service(service.RPCService):
zone_transfer_request_id)
@rpc.expected_exceptions()
- @notification('dns.zone_transfer_accept.create')
+ @notification.notify_type('dns.zone_transfer_accept.create')
@transaction
def create_zone_transfer_accept(self, context, zone_transfer_accept):
elevated_context = context.elevated(all_tenants=True)
@@ -2571,7 +2440,7 @@ class Service(service.RPCService):
# Zone Import Methods
@rpc.expected_exceptions()
- @notification('dns.zone_import.create')
+ @notification.notify_type('dns.zone_import.create')
def create_zone_import(self, context, request_body):
if policy.enforce_new_defaults():
target = {constants.RBAC_PROJECT_ID: context.project_id}
@@ -2667,7 +2536,7 @@ class Service(service.RPCService):
self.update_zone_import(context, zone_import)
@rpc.expected_exceptions()
- @notification('dns.zone_import.update')
+ @notification.notify_type('dns.zone_import.update')
def update_zone_import(self, context, zone_import):
if policy.enforce_new_defaults():
target = {constants.RBAC_PROJECT_ID: zone_import.tenant_id}
@@ -2710,7 +2579,7 @@ class Service(service.RPCService):
return self.storage.get_zone_import(context, zone_import_id)
@rpc.expected_exceptions()
- @notification('dns.zone_import.delete')
+ @notification.notify_type('dns.zone_import.delete')
@transaction
def delete_zone_import(self, context, zone_import_id):
@@ -2733,7 +2602,7 @@ class Service(service.RPCService):
# Zone Export Methods
@rpc.expected_exceptions()
- @notification('dns.zone_export.create')
+ @notification.notify_type('dns.zone_export.create')
def create_zone_export(self, context, zone_id):
# Try getting the zone to ensure it exists
zone = self.storage.get_zone(context, zone_id)
@@ -2797,7 +2666,7 @@ class Service(service.RPCService):
return self.storage.get_zone_export(context, zone_export_id)
@rpc.expected_exceptions()
- @notification('dns.zone_export.update')
+ @notification.notify_type('dns.zone_export.update')
def update_zone_export(self, context, zone_export):
if policy.enforce_new_defaults():
@@ -2810,7 +2679,7 @@ class Service(service.RPCService):
return self.storage.update_zone_export(context, zone_export)
@rpc.expected_exceptions()
- @notification('dns.zone_export.delete')
+ @notification.notify_type('dns.zone_export.delete')
@transaction
def delete_zone_export(self, context, zone_export_id):