diff options
Diffstat (limited to 'designate/tests/test_central/test_service.py')
-rw-r--r-- | designate/tests/test_central/test_service.py | 440 |
1 files changed, 420 insertions, 20 deletions
diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py index 537a3aa6..08527d7e 100644 --- a/designate/tests/test_central/test_service.py +++ b/designate/tests/test_central/test_service.py @@ -24,6 +24,7 @@ import random from unittest import mock from oslo_config import cfg +from oslo_config import fixture as cfg_fixture from oslo_db import exception as db_exception from oslo_log import log as logging from oslo_messaging.notify import notifier @@ -421,8 +422,8 @@ class CentralServiceTest(CentralTestCase): admin_context = self.get_admin_context() admin_context.all_tenants = True - tenant_one_context = self.get_context(project_id=1) - tenant_two_context = self.get_context(project_id=2) + tenant_one_context = self.get_context(project_id='1') + tenant_two_context = self.get_context(project_id='2') # in the beginning, there should be nothing tenants = self.central_service.count_tenants(admin_context) @@ -719,7 +720,7 @@ class CentralServiceTest(CentralTestCase): self.policy({'use_low_ttl': '!'}) self.config(min_ttl=100, group='service:central') - context = self.get_context(project_id=1) + context = self.get_context(project_id='1') values = self.get_zone_fixture(fixture=1) values['ttl'] = 5 @@ -796,8 +797,8 @@ class CentralServiceTest(CentralTestCase): admin_context = self.get_admin_context() admin_context.all_tenants = True - tenant_one_context = self.get_context(project_id=1) - tenant_two_context = self.get_context(project_id=2) + tenant_one_context = self.get_context(project_id='1') + tenant_two_context = self.get_context(project_id='2') # Ensure we have no zones to start with. zones = self.central_service.find_zones(admin_context) @@ -822,7 +823,7 @@ class CentralServiceTest(CentralTestCase): def test_get_zone(self): # Create a zone - zone_name = '%d.example.com.' % random.randint(10, 1000) + zone_name = '%d.example.com.' % random.randint(10, 10000) expected_zone = self.create_zone(name=zone_name) # Retrieve it, and ensure it's the same @@ -833,6 +834,42 @@ class CentralServiceTest(CentralTestCase): self.assertEqual(expected_zone['name'], zone['name']) self.assertEqual(expected_zone['email'], zone['email']) + def test_get_zone_not_owner_not_shared(self): + # Create a zone + zone_name = '%d.example.com.' % random.randint(10, 10000) + expected_zone = self.create_zone(name=zone_name) + + context = self.get_context(project_id='fake') + + with mock.patch.object(self.central_service.storage, + 'is_zone_shared_with_project', + return_value=False): + # Make sure random projects can't get the zone + exc = self.assertRaises(rpc_dispatcher.ExpectedException, + self.central_service.get_zone, + context, expected_zone['id'], + apply_tenant_criteria=False) + self.assertEqual(exceptions.ZoneNotFound, exc.exc_info[0]) + + def test_get_zone_not_owner_shared(self): + # Create a zone + zone_name = '%d.example.com.' % random.randint(10, 10000) + expected_zone = self.create_zone(name=zone_name) + + context = self.get_context(project_id='fake') + + with mock.patch.object(self.central_service.storage, + 'is_zone_shared_with_project', + return_value=True): + + # Retrieve it, and ensure it's the same + zone = self.central_service.get_zone(context, expected_zone['id'], + apply_tenant_criteria=False) + + self.assertEqual(expected_zone['id'], zone['id']) + self.assertEqual(expected_zone['name'], zone['name']) + self.assertEqual(expected_zone['email'], zone['email']) + def test_get_zone_servers(self): # Create a zone zone = self.create_zone() @@ -985,6 +1022,51 @@ class CentralServiceTest(CentralTestCase): self.assertIsInstance(notified_zone, objects.Zone) self.assertEqual(deleted_zone.id, notified_zone.id) + @mock.patch.object(notifier.Notifier, "info") + def test_delete_zone_shared_no_delete_shares(self, mock_notifier): + # Create a zone + zone = self.create_zone() + + # Share the zone + self.share_zone(context=self.admin_context, zone_id=zone.id) + + mock_notifier.reset_mock() + + # Delete the zone + self.assertRaises(exceptions.ZoneShared, + self.central_service.delete_zone, + self.admin_context, zone['id']) + + @mock.patch.object(notifier.Notifier, "info") + def test_delete_zone_shared_delete_shares(self, mock_notifier): + context = self.get_admin_context(delete_shares=True) + + # Create a zone + zone = self.create_zone(context=context) + + # Share the zone + self.share_zone(context=context, zone_id=zone.id) + + mock_notifier.reset_mock() + + # Delete the zone + self.central_service.delete_zone(context, zone.id) + + # Fetch the zone + deleted_zone = self.central_service.get_zone(context, zone['id']) + + # Ensure the zone is marked for deletion + self.assertEqual(zone.id, deleted_zone.id) + self.assertEqual(zone.name, deleted_zone.name) + self.assertEqual(zone.email, deleted_zone.email) + self.assertEqual('PENDING', deleted_zone.status) + self.assertEqual(zone.tenant_id, deleted_zone.tenant_id) + self.assertEqual(zone.parent_zone_id, + deleted_zone.parent_zone_id) + self.assertEqual('DELETE', deleted_zone.action) + self.assertEqual(zone.serial, deleted_zone.serial) + self.assertEqual(zone.pool_id, deleted_zone.pool_id) + def test_delete_parent_zone(self): # Create the Parent Zone using fixture 0 parent_zone = self.create_zone(fixture=0) @@ -1391,6 +1473,69 @@ class CentralServiceTest(CentralTestCase): # in the recordset self.assertEqual(original_serial, new_serial) + def test_create_recordset_shared_zone(self): + zone = self.create_zone() + original_serial = zone.serial + + # Create the Object + recordset = objects.RecordSet(name='www.%s' % zone.name, type='A') + + context = self.get_context(project_id='1') + self.share_zone(context=self.admin_context, zone_id=zone.id, + target_project_id='1') + + # Persist the Object + recordset = self.central_service.create_recordset( + context, zone.id, recordset=recordset) + + # Get the zone again to check if serial increased + updated_zone = self.central_service.get_zone(self.admin_context, + zone.id) + new_serial = updated_zone.serial + + # Ensure all values have been set correctly + self.assertIsNotNone(recordset.id) + self.assertEqual('www.%s' % zone.name, recordset.name) + self.assertEqual('A', recordset.type) + + self.assertIsNotNone(recordset.records) + # The serial number does not get updated is there are no records + # in the recordset + self.assertEqual(original_serial, new_serial) + + def test_create_recordset_shared_zone_new_policy_defaults(self): + zone = self.create_zone() + original_serial = zone.serial + + # Create the Object + recordset = objects.RecordSet(name='www.%s' % zone.name, type='A') + + self.useFixture(cfg_fixture.Config(cfg.CONF)) + cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy') + context = self.get_context(project_id='1', roles=['member', 'reader']) + + self.share_zone(context=self.admin_context, zone_id=zone.id, + target_project_id='1') + + # Persist the Object + recordset = self.central_service.create_recordset( + context, zone.id, recordset=recordset) + + # Get the zone again to check if serial increased + updated_zone = self.central_service.get_zone(self.admin_context, + zone.id) + new_serial = updated_zone.serial + + # Ensure all values have been set correctly + self.assertIsNotNone(recordset.id) + self.assertEqual('www.%s' % zone.name, recordset.name) + self.assertEqual('A', recordset.type) + + self.assertIsNotNone(recordset.records) + # The serial number does not get updated is there are no records + # in the recordset + self.assertEqual(original_serial, new_serial) + def test_create_recordset_with_records(self): zone = self.create_zone() original_serial = zone.serial @@ -1573,6 +1718,24 @@ class CentralServiceTest(CentralTestCase): self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0]) + def test_get_recordset_shared_zone(self): + zone = self.create_zone() + + context = self.get_context(project_id='1') + self.share_zone(context=self.admin_context, zone_id=zone.id, + target_project_id='1') + + # Create a recordset + expected = self.create_recordset(zone) + + # Retrieve it, and ensure it's the same + recordset = self.central_service.get_recordset( + context, zone['id'], expected['id']) + + self.assertEqual(expected['id'], recordset['id']) + self.assertEqual(expected['name'], recordset['name']) + self.assertEqual(expected['type'], recordset['type']) + def test_find_recordsets(self): zone = self.create_zone() @@ -1606,6 +1769,41 @@ class CentralServiceTest(CentralTestCase): self.assertEqual('www.%s' % zone['name'], recordsets[2]['name']) self.assertEqual('mail.%s' % zone['name'], recordsets[3]['name']) + def test_find_recordsets_shared_zone(self): + zone = self.create_zone() + + context = self.get_context(project_id='1') + self.share_zone(context=self.admin_context, zone_id=zone.id, + target_project_id='1') + + criterion = {'zone_id': zone['id']} + + # Create a single recordset (using default values) + self.create_recordset(zone, name='www.%s' % zone['name']) + + # Ensure we can retrieve the newly created recordset + recordsets = self.central_service.find_recordsets(context, criterion) + + self.assertEqual(3, len(recordsets)) + self.assertEqual('www.%s' % zone['name'], recordsets[2]['name']) + + def test_find_recordsets_not_shared_zone(self): + zone = self.create_zone() + + context = self.get_context(project_id='2') + + criterion = {'zone_id': zone['id']} + + # Create a single recordset (using default values) + self.create_recordset(zone, name='www.%s' % zone['name']) + + # Ensure we can retrieve the newly created recordset + exc = self.assertRaises(rpc_dispatcher.ExpectedException, + self.central_service.find_recordsets, + context, criterion) + + self.assertEqual(exceptions.ZoneNotFound, exc.exc_info[0]) + def test_find_recordset(self): zone = self.create_zone() @@ -1856,6 +2054,37 @@ class CentralServiceTest(CentralTestCase): self.assertRaises(ovo_exc.ReadOnlyFieldError, setattr, recordset, 'type', cname_recordset.type) + def test_update_recordset_shared_zone(self): + # Create a zone + zone = self.create_zone() + original_serial = zone.serial + + context = self.get_context(project_id='1') + self.share_zone(context=self.admin_context, zone_id=zone.id, + target_project_id='1') + + # Create a recordset + recordset = self.create_recordset(zone, context=context) + + # Update the recordset + recordset.ttl = 1800 + + # Perform the update + self.central_service.update_recordset(context, recordset) + + # Get zone again to verify that serial number was updated + updated_zone = self.central_service.get_zone(self.admin_context, + zone.id) + new_serial = updated_zone.serial + + # Fetch the resource again + recordset = self.central_service.get_recordset( + self.admin_context, recordset.zone_id, recordset.id) + + # Ensure the new value took + self.assertEqual(1800, recordset.ttl) + self.assertThat(new_serial, GreaterThan(original_serial)) + def test_delete_recordset(self): zone = self.create_zone() original_serial = zone.serial @@ -3078,14 +3307,14 @@ class CentralServiceTest(CentralTestCase): self.assertEqual(zt_request.key, retrived_zt.key) def test_get_zone_transfer_request_scoped(self): - tenant_1_context = self.get_context(project_id=1) - tenant_2_context = self.get_context(project_id=2) - tenant_3_context = self.get_context(project_id=3) + tenant_1_context = self.get_context(project_id='1') + tenant_2_context = self.get_context(project_id='2') + tenant_3_context = self.get_context(project_id='3') zone = self.create_zone(context=tenant_1_context) zt_request = self.create_zone_transfer_request( zone, context=tenant_1_context, - target_tenant_id=2) + target_tenant_id='2') self.central_service.get_zone_transfer_request( tenant_2_context, zt_request.id) @@ -3129,8 +3358,8 @@ class CentralServiceTest(CentralTestCase): exc.exc_info[0]) def test_create_zone_transfer_accept(self): - tenant_1_context = self.get_context(project_id=1) - tenant_2_context = self.get_context(project_id=2) + tenant_1_context = self.get_context(project_id='1') + tenant_2_context = self.get_context(project_id="2") admin_context = self.get_admin_context() admin_context.all_tenants = True @@ -3179,8 +3408,8 @@ class CentralServiceTest(CentralTestCase): 'COMPLETE', result['zt_request'].status) def test_create_zone_transfer_accept_scoped(self): - tenant_1_context = self.get_context(project_id=1) - tenant_2_context = self.get_context(project_id=2) + tenant_1_context = self.get_context(project_id='1') + tenant_2_context = self.get_context(project_id="2") admin_context = self.get_admin_context() admin_context.all_tenants = True @@ -3231,8 +3460,8 @@ class CentralServiceTest(CentralTestCase): 'COMPLETE', result['zt_request'].status) def test_create_zone_transfer_accept_failed_key(self): - tenant_1_context = self.get_context(project_id=1) - tenant_2_context = self.get_context(project_id=2) + tenant_1_context = self.get_context(project_id='1') + tenant_2_context = self.get_context(project_id="2") admin_context = self.get_admin_context() admin_context.all_tenants = True @@ -3241,7 +3470,7 @@ class CentralServiceTest(CentralTestCase): zone_transfer_request = self.create_zone_transfer_request( zone, context=tenant_1_context, - target_tenant_id=2) + target_tenant_id="2") zone_transfer_accept = objects.ZoneTransferAccept() zone_transfer_accept.zone_transfer_request_id =\ @@ -3258,8 +3487,8 @@ class CentralServiceTest(CentralTestCase): self.assertEqual(exceptions.IncorrectZoneTransferKey, exc.exc_info[0]) def test_create_zone_tarnsfer_accept_out_of_tenant_scope(self): - tenant_1_context = self.get_context(project_id=1) - tenant_3_context = self.get_context(project_id=3) + tenant_1_context = self.get_context(project_id='1') + tenant_3_context = self.get_context(project_id="3") admin_context = self.get_admin_context() admin_context.all_tenants = True @@ -3268,7 +3497,7 @@ class CentralServiceTest(CentralTestCase): zone_transfer_request = self.create_zone_transfer_request( zone, context=tenant_1_context, - target_tenant_id=2) + target_tenant_id="2") zone_transfer_accept = objects.ZoneTransferAccept() zone_transfer_accept.zone_transfer_request_id =\ @@ -3532,3 +3761,174 @@ class CentralServiceTest(CentralTestCase): context, zone_import['id']) self.assertEqual(exceptions.ZoneImportNotFound, exc.exc_info[0]) + + def test_share_zone(self): + # Create a Shared Zone + context = self.get_context(project_id='1') + zone = self.create_zone(context=context) + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + # Ensure all values have been set correctly + self.assertIsNotNone(shared_zone['id']) + self.assertEqual('target_project_id', shared_zone.target_project_id) + self.assertEqual(context.project_id, shared_zone.project_id) + self.assertEqual(zone.id, shared_zone.zone_id) + + def test_share_zone_new_policy_defaults(self): + # Configure designate for enforcing the new policy defaults + self.useFixture(cfg_fixture.Config(cfg.CONF)) + cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy') + context = self.get_context(project_id='1', roles=['member', 'reader']) + + # Create a Shared Zone + zone = self.create_zone(context=context) + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + # Ensure all values have been set correctly + self.assertIsNotNone(shared_zone['id']) + self.assertEqual('target_project_id', shared_zone.target_project_id) + self.assertEqual(context.project_id, shared_zone.project_id) + self.assertEqual(zone.id, shared_zone.zone_id) + + def test_unshare_zone(self): + context = self.get_context(project_id='1') + zone = self.create_zone(context=context) + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + new_shared_zone_obj = self.central_service.unshare_zone( + context, zone.id, shared_zone.id + ) + + self.assertEqual(shared_zone.id, new_shared_zone_obj.id) + self.assertEqual(shared_zone.target_project_id, + new_shared_zone_obj.target_project_id) + self.assertEqual(shared_zone.project_id, + new_shared_zone_obj.project_id) + + def test_unshare_zone_new_policy_defaults(self): + # Configure designate for enforcing the new policy defaults + self.useFixture(cfg_fixture.Config(cfg.CONF)) + cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy') + context = self.get_context(project_id='1', roles=['member', 'reader']) + + # Create a Shared Zone + zone = self.create_zone(context=context) + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + new_shared_zone_obj = self.central_service.unshare_zone( + context, zone.id, shared_zone.id + ) + + self.assertEqual(shared_zone.id, new_shared_zone_obj.id) + self.assertEqual(shared_zone.target_project_id, + new_shared_zone_obj.target_project_id) + self.assertEqual(shared_zone.project_id, + new_shared_zone_obj.project_id) + + def test_unshare_zone_with_child_objects(self): + context = self.get_context(project_id='1') + zone = self.create_zone(context=context) + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + with mock.patch.object(self.central_service.storage, + 'count_zones', return_value=1): + exc = self.assertRaises(rpc_dispatcher.ExpectedException, + self.central_service.unshare_zone, + context, zone.id, shared_zone.id) + + self.assertEqual(exceptions.SharedZoneHasSubZone, exc.exc_info[0]) + + with mock.patch.object(self.central_service.storage, + 'count_recordsets', return_value=1): + exc = self.assertRaises(rpc_dispatcher.ExpectedException, + self.central_service.unshare_zone, + context, zone.id, shared_zone.id) + + self.assertEqual( + exceptions.SharedZoneHasRecordSets, + exc.exc_info[0] + ) + + def test_find_shared_zones(self): + context = self.get_context(project_id='1') + zone = self.create_zone(context=context) + + # Ensure we have no shared zones to start with. + shared_zones = self.central_service.find_shared_zones(context, + criterion={'zone_id': zone.id}) + self.assertEqual(0, len(shared_zones)) + + # Create a first shared_zone + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + # Ensure we can retrieve the newly created shared_zone + shared_zones = self.central_service.find_shared_zones(context, + criterion={'zone_id': zone.id}) + self.assertEqual(1, len(shared_zones)) + + # Ensure we can retrieve the newly created shared_zone no criteria + shared_zones = self.central_service.find_shared_zones(context) + self.assertEqual(1, len(shared_zones)) + + # Create a second shared_zone + second_shared_zone = self.share_zone( + context=context, zone_id=zone.id, target_project_id="second_tenant" + ) + + # Ensure we can retrieve both shared_zones + shared_zones = self.central_service.find_shared_zones(context, + criterion={'zone_id': zone.id}) + + self.assertEqual(2, len(shared_zones)) + self.assertEqual(zone.id, shared_zones[0].zone_id) + self.assertEqual(shared_zone.id, shared_zones[0].id) + self.assertEqual(zone.id, shared_zones[1].zone_id) + self.assertEqual(second_shared_zone.id, shared_zones[1].id) + + def test_find_shared_zones_new_policy_defaults(self): + # Configure designate for enforcing the new policy defaults + context = self.get_context(project_id='1', roles=['member', 'reader']) + + zone = self.create_zone(context=context) + + # Create a first shared_zone + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + # Ensure we can retrieve the newly created shared_zone + shared_zones = self.central_service.find_shared_zones(context, + criterion={'zone_id': zone.id}) + self.assertEqual(1, len(shared_zones)) + + # Create a second shared_zone + second_shared_zone = self.share_zone( + context=context, zone_id=zone.id, target_project_id="second_tenant" + ) + + self.useFixture(cfg_fixture.Config(cfg.CONF)) + cfg.CONF.set_override('enforce_new_defaults', True, 'oslo_policy') + + # Ensure we can retrieve both shared_zones + shared_zones = self.central_service.find_shared_zones(context, + criterion={'zone_id': zone.id}) + + self.assertEqual(2, len(shared_zones)) + self.assertEqual(zone.id, shared_zones[0].zone_id) + self.assertEqual(shared_zone.id, shared_zones[0].id) + self.assertEqual(zone.id, shared_zones[1].zone_id) + self.assertEqual(second_shared_zone.id, shared_zones[1].id) + + def test_get_shared_zone(self): + context = self.get_context(project_id='1') + zone = self.create_zone(context=context) + + shared_zone = self.share_zone(context=context, zone_id=zone.id) + + retrived_shared_zone = self.central_service.get_shared_zone( + context, zone.id, shared_zone.id) + + self.assertEqual(zone.id, retrived_shared_zone.zone_id) + self.assertEqual(shared_zone.id, retrived_shared_zone.id) + self.assertEqual(shared_zone.target_project_id, + retrived_shared_zone.target_project_id) + self.assertEqual(shared_zone.project_id, + retrived_shared_zone.project_id) |