diff options
Diffstat (limited to 'designate/tests/test_notification_handler/test_nova.py')
-rw-r--r-- | designate/tests/test_notification_handler/test_nova.py | 91 |
1 files changed, 88 insertions, 3 deletions
diff --git a/designate/tests/test_notification_handler/test_nova.py b/designate/tests/test_notification_handler/test_nova.py index 4d973987..245a36da 100644 --- a/designate/tests/test_notification_handler/test_nova.py +++ b/designate/tests/test_notification_handler/test_nova.py @@ -18,10 +18,11 @@ from unittest import mock from oslo_log import log as logging -from designate.tests import TestCase +from designate import exceptions from designate.notification_handler.nova import NovaFixedHandler -from designate.tests.test_notification_handler import \ - NotificationHandlerMixin +from designate import objects +from designate.tests.test_notification_handler import NotificationHandlerMixin +from designate.tests import TestCase LOG = logging.getLogger(__name__) @@ -148,6 +149,52 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin): self.assertEqual('DELETE', record.action) self.assertEqual('172.16.0.14', record.data) + def test_instance_delete_start_record_status_changed(self): + start_event_type = 'compute.instance.create.end' + start_fixture = self.get_notification_fixture('nova', start_event_type) + + self.plugin.process_notification(self.admin_context.to_dict(), + start_event_type, + start_fixture['payload']) + + event_type = 'compute.instance.delete.start' + fixture = self.get_notification_fixture('nova', event_type) + + self.assertIn(event_type, self.plugin.get_event_types()) + + criterion = { + 'zone_id': self.zone_id, + 'managed': True, + 'managed_resource_type': 'instance', + } + + records = self.central_service.find_records(self.admin_context, + criterion) + + self.assertEqual(2, len(records)) + + org_find_recordset = self.central_service.find_recordset + + def mock_find_recordset(context, criterion): + results = org_find_recordset(context, criterion) + for r in results.records: + r.status = 'PENDING' + return results + + with mock.patch.object(self.central_service, 'find_recordset', + side_effect=mock_find_recordset): + self.plugin.process_notification( + self.admin_context.to_dict(), event_type, fixture['payload']) + + records = self.central_service.find_records(self.admin_context, + criterion) + + self.assertEqual(2, len(records), records) + + for record in records: + self.assertEqual('DELETE', record.action) + self.assertEqual('172.16.0.14', record.data) + def test_instance_delete_one_with_multiple_records_with_same_name(self): # Prepare for the test for start_event_type in ['compute.instance.create.end', @@ -222,6 +269,44 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin): self.assertFalse(records) + def test_instance_delete_with_no_recordset(self): + start_event_type = 'compute.instance.create.end' + start_fixture = self.get_notification_fixture('nova', start_event_type) + + self.plugin.process_notification(self.admin_context.to_dict(), + start_event_type, + start_fixture['payload']) + + event_type = 'compute.instance.delete.start' + fixture = self.get_notification_fixture('nova', event_type) + + # Make sure we don't fail here, even though there is nothing to + # do, since the recordset we are trying to delete does not actually + # exist. + with mock.patch.object(self.central_service, 'find_recordset', + side_effect=exceptions.RecordSetNotFound): + self.plugin.process_notification( + self.admin_context.to_dict(), event_type, fixture['payload']) + + def test_instance_delete_with_no_records_in_recordset(self): + start_event_type = 'compute.instance.create.end' + start_fixture = self.get_notification_fixture('nova', start_event_type) + + self.plugin.process_notification(self.admin_context.to_dict(), + start_event_type, + start_fixture['payload']) + + event_type = 'compute.instance.delete.start' + fixture = self.get_notification_fixture('nova', event_type) + + # Make sure we don't fail here, even though there is nothing to + # do, since the recordset we are trying to delete contains no records. + with mock.patch.object( + self.central_service, 'find_recordset', + return_value=objects.RecordSet(records=objects.RecordList())): + self.plugin.process_notification( + self.admin_context.to_dict(), event_type, fixture['payload']) + def test_label_in_format_v4_v6(self): event_type = 'compute.instance.create.end' self.config(formatv4=['%(label)s.example.com.'], |