summaryrefslogtreecommitdiff
path: root/designate/tests/test_central/test_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'designate/tests/test_central/test_service.py')
-rw-r--r--designate/tests/test_central/test_service.py440
1 files changed, 420 insertions, 20 deletions
diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py
index 537a3aa6..08527d7e 100644
--- a/designate/tests/test_central/test_service.py
+++ b/designate/tests/test_central/test_service.py
@@ -24,6 +24,7 @@ import random
from unittest import mock
from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
from oslo_db import exception as db_exception
from oslo_log import log as logging
from oslo_messaging.notify import notifier
@@ -421,8 +422,8 @@ class CentralServiceTest(CentralTestCase):
admin_context = self.get_admin_context()
admin_context.all_tenants = True
- tenant_one_context = self.get_context(project_id=1)
- tenant_two_context = self.get_context(project_id=2)
+ tenant_one_context = self.get_context(project_id='1')
+ tenant_two_context = self.get_context(project_id='2')
# in the beginning, there should be nothing
tenants = self.central_service.count_tenants(admin_context)
@@ -719,7 +720,7 @@ class CentralServiceTest(CentralTestCase):
self.policy({'use_low_ttl': '!'})
self.config(min_ttl=100,
group='service:central')
- context = self.get_context(project_id=1)
+ context = self.get_context(project_id='1')
values = self.get_zone_fixture(fixture=1)
values['ttl'] = 5
@@ -796,8 +797,8 @@ class CentralServiceTest(CentralTestCase):
admin_context = self.get_admin_context()
admin_context.all_tenants = True
- tenant_one_context = self.get_context(project_id=1)
- tenant_two_context = self.get_context(project_id=2)
+ tenant_one_context = self.get_context(project_id='1')
+ tenant_two_context = self.get_context(project_id='2')
# Ensure we have no zones to start with.
zones = self.central_service.find_zones(admin_context)
@@ -822,7 +823,7 @@ class CentralServiceTest(CentralTestCase):
def test_get_zone(self):
# Create a zone
- zone_name = '%d.example.com.' % random.randint(10, 1000)
+ zone_name = '%d.example.com.' % random.randint(10, 10000)
expected_zone = self.create_zone(name=zone_name)
# Retrieve it, and ensure it's the same
@@ -833,6 +834,42 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(expected_zone['name'], zone['name'])
self.assertEqual(expected_zone['email'], zone['email'])
+ def test_get_zone_not_owner_not_shared(self):
+ # Create a zone
+ zone_name = '%d.example.com.' % random.randint(10, 10000)
+ expected_zone = self.create_zone(name=zone_name)
+
+ context = self.get_context(project_id='fake')
+
+ with mock.patch.object(self.central_service.storage,
+ 'is_zone_shared_with_project',
+ return_value=False):
+ # Make sure random projects can't get the zone
+ exc = self.assertRaises(rpc_dispatcher.ExpectedException,
+ self.central_service.get_zone,
+ context, expected_zone['id'],
+ apply_tenant_criteria=False)
+ self.assertEqual(exceptions.ZoneNotFound, exc.exc_info[0])
+
+ def test_get_zone_not_owner_shared(self):
+ # Create a zone
+ zone_name = '%d.example.com.' % random.randint(10, 10000)
+ expected_zone = self.create_zone(name=zone_name)
+
+ context = self.get_context(project_id='fake')
+
+ with mock.patch.object(self.central_service.storage,
+ 'is_zone_shared_with_project',
+ return_value=True):
+
+ # Retrieve it, and ensure it's the same
+ zone = self.central_service.get_zone(context, expected_zone['id'],
+ apply_tenant_criteria=False)
+
+ self.assertEqual(expected_zone['id'], zone['id'])
+ self.assertEqual(expected_zone['name'], zone['name'])
+ self.assertEqual(expected_zone['email'], zone['email'])
+
def test_get_zone_servers(self):
# Create a zone
zone = self.create_zone()
@@ -985,6 +1022,51 @@ class CentralServiceTest(CentralTestCase):
self.assertIsInstance(notified_zone, objects.Zone)
self.assertEqual(deleted_zone.id, notified_zone.id)
+ @mock.patch.object(notifier.Notifier, "info")
+ def test_delete_zone_shared_no_delete_shares(self, mock_notifier):
+ # Create a zone
+ zone = self.create_zone()
+
+ # Share the zone
+ self.share_zone(context=self.admin_context, zone_id=zone.id)
+
+ mock_notifier.reset_mock()
+
+ # Delete the zone
+ self.assertRaises(exceptions.ZoneShared,
+ self.central_service.delete_zone,
+ self.admin_context, zone['id'])
+
+ @mock.patch.object(notifier.Notifier, "info")
+ def test_delete_zone_shared_delete_shares(self, mock_notifier):
+ context = self.get_admin_context(delete_shares=True)
+
+ # Create a zone
+ zone = self.create_zone(context=context)
+
+ # Share the zone
+ self.share_zone(context=context, zone_id=zone.id)
+
+ mock_notifier.reset_mock()
+
+ # Delete the zone
+ self.central_service.delete_zone(context, zone.id)
+
+ # Fetch the zone
+ deleted_zone = self.central_service.get_zone(context, zone['id'])
+
+ # Ensure the zone is marked for deletion
+ self.assertEqual(zone.id, deleted_zone.id)
+ self.assertEqual(zone.name, deleted_zone.name)
+ self.assertEqual(zone.email, deleted_zone.email)
+ self.assertEqual('PENDING', deleted_zone.status)
+ self.assertEqual(zone.tenant_id, deleted_zone.tenant_id)
+ self.assertEqual(zone.parent_zone_id,
+ deleted_zone.parent_zone_id)
+ self.assertEqual('DELETE', deleted_zone.action)
+ self.assertEqual(zone.serial, deleted_zone.serial)
+ self.assertEqual(zone.pool_id, deleted_zone.pool_id)
+
def test_delete_parent_zone(self):
# Create the Parent Zone using fixture 0
parent_zone = self.create_zone(fixture=0)
@@ -1391,6 +1473,69 @@ class CentralServiceTest(CentralTestCase):
# in the recordset
self.assertEqual(original_serial, new_serial)
+ def test_create_recordset_shared_zone(self):
+ zone = self.create_zone()
+ original_serial = zone.serial
+
+ # Create the Object
+ recordset = objects.RecordSet(name='www.%s' % zone.name, type='A')
+
+ context = self.get_context(project_id='1')
+ self.share_zone(context=self.admin_context, zone_id=zone.id,
+ target_project_id='1')
+
+ # Persist the Object
+ recordset = self.central_service.create_recordset(
+ context, zone.id, recordset=recordset)
+
+ # Get the zone again to check if serial increased
+ updated_zone = self.central_service.get_zone(self.admin_context,
+ zone.id)
+ new_serial = updated_zone.serial
+
+ # Ensure all values have been set correctly
+ self.assertIsNotNone(recordset.id)
+ self.assertEqual('www.%s' % zone.name, recordset.name)
+ self.assertEqual('A', recordset.type)
+
+ self.assertIsNotNone(recordset.records)
+ # The serial number does not get updated is there are no records
+ # in the recordset
+ self.assertEqual(original_serial, new_serial)
+
+ def test_create_recordset_shared_zone_new_policy_defaults(self):
+ zone = self.create_zone()
+ original_serial = zone.serial
+
+ # Create the Object
+ recordset = objects.RecordSet(name='www.%s' % zone.name, type='A')
+
+ self.useFixture(cfg_fixture.Config(cfg.CONF))
+ cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy')
+ context = self.get_context(project_id='1', roles=['member', 'reader'])
+
+ self.share_zone(context=self.admin_context, zone_id=zone.id,
+ target_project_id='1')
+
+ # Persist the Object
+ recordset = self.central_service.create_recordset(
+ context, zone.id, recordset=recordset)
+
+ # Get the zone again to check if serial increased
+ updated_zone = self.central_service.get_zone(self.admin_context,
+ zone.id)
+ new_serial = updated_zone.serial
+
+ # Ensure all values have been set correctly
+ self.assertIsNotNone(recordset.id)
+ self.assertEqual('www.%s' % zone.name, recordset.name)
+ self.assertEqual('A', recordset.type)
+
+ self.assertIsNotNone(recordset.records)
+ # The serial number does not get updated is there are no records
+ # in the recordset
+ self.assertEqual(original_serial, new_serial)
+
def test_create_recordset_with_records(self):
zone = self.create_zone()
original_serial = zone.serial
@@ -1573,6 +1718,24 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0])
+ def test_get_recordset_shared_zone(self):
+ zone = self.create_zone()
+
+ context = self.get_context(project_id='1')
+ self.share_zone(context=self.admin_context, zone_id=zone.id,
+ target_project_id='1')
+
+ # Create a recordset
+ expected = self.create_recordset(zone)
+
+ # Retrieve it, and ensure it's the same
+ recordset = self.central_service.get_recordset(
+ context, zone['id'], expected['id'])
+
+ self.assertEqual(expected['id'], recordset['id'])
+ self.assertEqual(expected['name'], recordset['name'])
+ self.assertEqual(expected['type'], recordset['type'])
+
def test_find_recordsets(self):
zone = self.create_zone()
@@ -1606,6 +1769,41 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual('www.%s' % zone['name'], recordsets[2]['name'])
self.assertEqual('mail.%s' % zone['name'], recordsets[3]['name'])
+ def test_find_recordsets_shared_zone(self):
+ zone = self.create_zone()
+
+ context = self.get_context(project_id='1')
+ self.share_zone(context=self.admin_context, zone_id=zone.id,
+ target_project_id='1')
+
+ criterion = {'zone_id': zone['id']}
+
+ # Create a single recordset (using default values)
+ self.create_recordset(zone, name='www.%s' % zone['name'])
+
+ # Ensure we can retrieve the newly created recordset
+ recordsets = self.central_service.find_recordsets(context, criterion)
+
+ self.assertEqual(3, len(recordsets))
+ self.assertEqual('www.%s' % zone['name'], recordsets[2]['name'])
+
+ def test_find_recordsets_not_shared_zone(self):
+ zone = self.create_zone()
+
+ context = self.get_context(project_id='2')
+
+ criterion = {'zone_id': zone['id']}
+
+ # Create a single recordset (using default values)
+ self.create_recordset(zone, name='www.%s' % zone['name'])
+
+ # Ensure we can retrieve the newly created recordset
+ exc = self.assertRaises(rpc_dispatcher.ExpectedException,
+ self.central_service.find_recordsets,
+ context, criterion)
+
+ self.assertEqual(exceptions.ZoneNotFound, exc.exc_info[0])
+
def test_find_recordset(self):
zone = self.create_zone()
@@ -1856,6 +2054,37 @@ class CentralServiceTest(CentralTestCase):
self.assertRaises(ovo_exc.ReadOnlyFieldError, setattr,
recordset, 'type', cname_recordset.type)
+ def test_update_recordset_shared_zone(self):
+ # Create a zone
+ zone = self.create_zone()
+ original_serial = zone.serial
+
+ context = self.get_context(project_id='1')
+ self.share_zone(context=self.admin_context, zone_id=zone.id,
+ target_project_id='1')
+
+ # Create a recordset
+ recordset = self.create_recordset(zone, context=context)
+
+ # Update the recordset
+ recordset.ttl = 1800
+
+ # Perform the update
+ self.central_service.update_recordset(context, recordset)
+
+ # Get zone again to verify that serial number was updated
+ updated_zone = self.central_service.get_zone(self.admin_context,
+ zone.id)
+ new_serial = updated_zone.serial
+
+ # Fetch the resource again
+ recordset = self.central_service.get_recordset(
+ self.admin_context, recordset.zone_id, recordset.id)
+
+ # Ensure the new value took
+ self.assertEqual(1800, recordset.ttl)
+ self.assertThat(new_serial, GreaterThan(original_serial))
+
def test_delete_recordset(self):
zone = self.create_zone()
original_serial = zone.serial
@@ -3078,14 +3307,14 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(zt_request.key, retrived_zt.key)
def test_get_zone_transfer_request_scoped(self):
- tenant_1_context = self.get_context(project_id=1)
- tenant_2_context = self.get_context(project_id=2)
- tenant_3_context = self.get_context(project_id=3)
+ tenant_1_context = self.get_context(project_id='1')
+ tenant_2_context = self.get_context(project_id='2')
+ tenant_3_context = self.get_context(project_id='3')
zone = self.create_zone(context=tenant_1_context)
zt_request = self.create_zone_transfer_request(
zone,
context=tenant_1_context,
- target_tenant_id=2)
+ target_tenant_id='2')
self.central_service.get_zone_transfer_request(
tenant_2_context, zt_request.id)
@@ -3129,8 +3358,8 @@ class CentralServiceTest(CentralTestCase):
exc.exc_info[0])
def test_create_zone_transfer_accept(self):
- tenant_1_context = self.get_context(project_id=1)
- tenant_2_context = self.get_context(project_id=2)
+ tenant_1_context = self.get_context(project_id='1')
+ tenant_2_context = self.get_context(project_id="2")
admin_context = self.get_admin_context()
admin_context.all_tenants = True
@@ -3179,8 +3408,8 @@ class CentralServiceTest(CentralTestCase):
'COMPLETE', result['zt_request'].status)
def test_create_zone_transfer_accept_scoped(self):
- tenant_1_context = self.get_context(project_id=1)
- tenant_2_context = self.get_context(project_id=2)
+ tenant_1_context = self.get_context(project_id='1')
+ tenant_2_context = self.get_context(project_id="2")
admin_context = self.get_admin_context()
admin_context.all_tenants = True
@@ -3231,8 +3460,8 @@ class CentralServiceTest(CentralTestCase):
'COMPLETE', result['zt_request'].status)
def test_create_zone_transfer_accept_failed_key(self):
- tenant_1_context = self.get_context(project_id=1)
- tenant_2_context = self.get_context(project_id=2)
+ tenant_1_context = self.get_context(project_id='1')
+ tenant_2_context = self.get_context(project_id="2")
admin_context = self.get_admin_context()
admin_context.all_tenants = True
@@ -3241,7 +3470,7 @@ class CentralServiceTest(CentralTestCase):
zone_transfer_request = self.create_zone_transfer_request(
zone,
context=tenant_1_context,
- target_tenant_id=2)
+ target_tenant_id="2")
zone_transfer_accept = objects.ZoneTransferAccept()
zone_transfer_accept.zone_transfer_request_id =\
@@ -3258,8 +3487,8 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(exceptions.IncorrectZoneTransferKey, exc.exc_info[0])
def test_create_zone_tarnsfer_accept_out_of_tenant_scope(self):
- tenant_1_context = self.get_context(project_id=1)
- tenant_3_context = self.get_context(project_id=3)
+ tenant_1_context = self.get_context(project_id='1')
+ tenant_3_context = self.get_context(project_id="3")
admin_context = self.get_admin_context()
admin_context.all_tenants = True
@@ -3268,7 +3497,7 @@ class CentralServiceTest(CentralTestCase):
zone_transfer_request = self.create_zone_transfer_request(
zone,
context=tenant_1_context,
- target_tenant_id=2)
+ target_tenant_id="2")
zone_transfer_accept = objects.ZoneTransferAccept()
zone_transfer_accept.zone_transfer_request_id =\
@@ -3532,3 +3761,174 @@ class CentralServiceTest(CentralTestCase):
context, zone_import['id'])
self.assertEqual(exceptions.ZoneImportNotFound, exc.exc_info[0])
+
+ def test_share_zone(self):
+ # Create a Shared Zone
+ context = self.get_context(project_id='1')
+ zone = self.create_zone(context=context)
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ # Ensure all values have been set correctly
+ self.assertIsNotNone(shared_zone['id'])
+ self.assertEqual('target_project_id', shared_zone.target_project_id)
+ self.assertEqual(context.project_id, shared_zone.project_id)
+ self.assertEqual(zone.id, shared_zone.zone_id)
+
+ def test_share_zone_new_policy_defaults(self):
+ # Configure designate for enforcing the new policy defaults
+ self.useFixture(cfg_fixture.Config(cfg.CONF))
+ cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy')
+ context = self.get_context(project_id='1', roles=['member', 'reader'])
+
+ # Create a Shared Zone
+ zone = self.create_zone(context=context)
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ # Ensure all values have been set correctly
+ self.assertIsNotNone(shared_zone['id'])
+ self.assertEqual('target_project_id', shared_zone.target_project_id)
+ self.assertEqual(context.project_id, shared_zone.project_id)
+ self.assertEqual(zone.id, shared_zone.zone_id)
+
+ def test_unshare_zone(self):
+ context = self.get_context(project_id='1')
+ zone = self.create_zone(context=context)
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ new_shared_zone_obj = self.central_service.unshare_zone(
+ context, zone.id, shared_zone.id
+ )
+
+ self.assertEqual(shared_zone.id, new_shared_zone_obj.id)
+ self.assertEqual(shared_zone.target_project_id,
+ new_shared_zone_obj.target_project_id)
+ self.assertEqual(shared_zone.project_id,
+ new_shared_zone_obj.project_id)
+
+ def test_unshare_zone_new_policy_defaults(self):
+ # Configure designate for enforcing the new policy defaults
+ self.useFixture(cfg_fixture.Config(cfg.CONF))
+ cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy')
+ context = self.get_context(project_id='1', roles=['member', 'reader'])
+
+ # Create a Shared Zone
+ zone = self.create_zone(context=context)
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ new_shared_zone_obj = self.central_service.unshare_zone(
+ context, zone.id, shared_zone.id
+ )
+
+ self.assertEqual(shared_zone.id, new_shared_zone_obj.id)
+ self.assertEqual(shared_zone.target_project_id,
+ new_shared_zone_obj.target_project_id)
+ self.assertEqual(shared_zone.project_id,
+ new_shared_zone_obj.project_id)
+
+ def test_unshare_zone_with_child_objects(self):
+ context = self.get_context(project_id='1')
+ zone = self.create_zone(context=context)
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ with mock.patch.object(self.central_service.storage,
+ 'count_zones', return_value=1):
+ exc = self.assertRaises(rpc_dispatcher.ExpectedException,
+ self.central_service.unshare_zone,
+ context, zone.id, shared_zone.id)
+
+ self.assertEqual(exceptions.SharedZoneHasSubZone, exc.exc_info[0])
+
+ with mock.patch.object(self.central_service.storage,
+ 'count_recordsets', return_value=1):
+ exc = self.assertRaises(rpc_dispatcher.ExpectedException,
+ self.central_service.unshare_zone,
+ context, zone.id, shared_zone.id)
+
+ self.assertEqual(
+ exceptions.SharedZoneHasRecordSets,
+ exc.exc_info[0]
+ )
+
+ def test_find_shared_zones(self):
+ context = self.get_context(project_id='1')
+ zone = self.create_zone(context=context)
+
+ # Ensure we have no shared zones to start with.
+ shared_zones = self.central_service.find_shared_zones(context,
+ criterion={'zone_id': zone.id})
+ self.assertEqual(0, len(shared_zones))
+
+ # Create a first shared_zone
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ # Ensure we can retrieve the newly created shared_zone
+ shared_zones = self.central_service.find_shared_zones(context,
+ criterion={'zone_id': zone.id})
+ self.assertEqual(1, len(shared_zones))
+
+ # Ensure we can retrieve the newly created shared_zone no criteria
+ shared_zones = self.central_service.find_shared_zones(context)
+ self.assertEqual(1, len(shared_zones))
+
+ # Create a second shared_zone
+ second_shared_zone = self.share_zone(
+ context=context, zone_id=zone.id, target_project_id="second_tenant"
+ )
+
+ # Ensure we can retrieve both shared_zones
+ shared_zones = self.central_service.find_shared_zones(context,
+ criterion={'zone_id': zone.id})
+
+ self.assertEqual(2, len(shared_zones))
+ self.assertEqual(zone.id, shared_zones[0].zone_id)
+ self.assertEqual(shared_zone.id, shared_zones[0].id)
+ self.assertEqual(zone.id, shared_zones[1].zone_id)
+ self.assertEqual(second_shared_zone.id, shared_zones[1].id)
+
+ def test_find_shared_zones_new_policy_defaults(self):
+ # Configure designate for enforcing the new policy defaults
+ context = self.get_context(project_id='1', roles=['member', 'reader'])
+
+ zone = self.create_zone(context=context)
+
+ # Create a first shared_zone
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ # Ensure we can retrieve the newly created shared_zone
+ shared_zones = self.central_service.find_shared_zones(context,
+ criterion={'zone_id': zone.id})
+ self.assertEqual(1, len(shared_zones))
+
+ # Create a second shared_zone
+ second_shared_zone = self.share_zone(
+ context=context, zone_id=zone.id, target_project_id="second_tenant"
+ )
+
+ self.useFixture(cfg_fixture.Config(cfg.CONF))
+ cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy')
+
+ # Ensure we can retrieve both shared_zones
+ shared_zones = self.central_service.find_shared_zones(context,
+ criterion={'zone_id': zone.id})
+
+ self.assertEqual(2, len(shared_zones))
+ self.assertEqual(zone.id, shared_zones[0].zone_id)
+ self.assertEqual(shared_zone.id, shared_zones[0].id)
+ self.assertEqual(zone.id, shared_zones[1].zone_id)
+ self.assertEqual(second_shared_zone.id, shared_zones[1].id)
+
+ def test_get_shared_zone(self):
+ context = self.get_context(project_id='1')
+ zone = self.create_zone(context=context)
+
+ shared_zone = self.share_zone(context=context, zone_id=zone.id)
+
+ retrived_shared_zone = self.central_service.get_shared_zone(
+ context, zone.id, shared_zone.id)
+
+ self.assertEqual(zone.id, retrived_shared_zone.zone_id)
+ self.assertEqual(shared_zone.id, retrived_shared_zone.id)
+ self.assertEqual(shared_zone.target_project_id,
+ retrived_shared_zone.target_project_id)
+ self.assertEqual(shared_zone.project_id,
+ retrived_shared_zone.project_id)