diff options
Diffstat (limited to 'designate/tests/test_notification_handler/test_nova.py')
-rw-r--r-- | designate/tests/test_notification_handler/test_nova.py | 117 |
1 files changed, 99 insertions, 18 deletions
diff --git a/designate/tests/test_notification_handler/test_nova.py b/designate/tests/test_notification_handler/test_nova.py index 141d7378..4d973987 100644 --- a/designate/tests/test_notification_handler/test_nova.py +++ b/designate/tests/test_notification_handler/test_nova.py @@ -47,22 +47,25 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin): self.assertIn(event_type, self.plugin.get_event_types()) - criterion = {'zone_id': self.zone_id} + criterion = { + 'zone_id': self.zone_id, + 'managed': True, + 'managed_resource_type': 'instance', + } - # Ensure we start with 2 records records = self.central_service.find_records(self.admin_context, criterion) - # Should only be SOA and NS records - self.assertEqual(2, len(records)) + # Ensure we start with zero managed records + self.assertFalse(records) self.plugin.process_notification( self.admin_context.to_dict(), event_type, fixture['payload']) - # Ensure we now have exactly 1 more record + # Ensure we now have exactly 2 records. records = self.central_service.find_records(self.admin_context, criterion) - self.assertEqual(4, len(records)) + self.assertEqual(2, len(records)) def test_instance_create_end_utf8(self): self.config(formatv4=['%(display_name)s.%(zone)s'], @@ -77,13 +80,14 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin): self.assertIn(event_type, self.plugin.get_event_types()) - criterion = {'zone_id': self.zone_id} + criterion = { + 'zone_id': self.zone_id, + } - # Ensure we start with 2 records recordsets = self.central_service.find_recordsets( self.admin_context, criterion) - # Should only be SOA and NS recordsets + # Ensure that we only have SOA and NS recordsets. self.assertEqual(2, len(recordsets)) self.plugin.process_notification( @@ -118,28 +122,105 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin): self.assertIn(event_type, self.plugin.get_event_types()) - criterion = {'zone_id': self.zone_id} + criterion = { + 'zone_id': self.zone_id, + 'managed': True, + 'managed_resource_type': 'instance', + } + + # Ensure we start with exactly 2 records. + records = self.central_service.find_records(self.admin_context, + criterion) + + self.assertEqual(2, len(records)) + + self.plugin.process_notification( + self.admin_context.to_dict(), event_type, fixture['payload']) - # Ensure we start with at least 1 record, plus NS and SOA + # Ensure we now have exactly zero active records records = self.central_service.find_records(self.admin_context, criterion) + self.assertEqual(2, len(records), records) + + # The two deleted records should now be in action state DELETE. + for record in records: + self.assertEqual('DELETE', record.action) + self.assertEqual('172.16.0.14', record.data) + + def test_instance_delete_one_with_multiple_records_with_same_name(self): + # Prepare for the test + for start_event_type in ['compute.instance.create.end', + 'compute.instance.create.end-2']: + start_fixture = self.get_notification_fixture( + 'nova', start_event_type + ) + self.plugin.process_notification( + self.admin_context.to_dict(), + start_fixture['event_type'], + start_fixture['payload'] + ) + + # Now - Onto the real test + event_type = 'compute.instance.delete.start' + fixture = self.get_notification_fixture('nova', event_type) + + self.assertIn(event_type, self.plugin.get_event_types()) + + criterion = { + 'zone_id': self.zone_id, + 'managed': True, + 'managed_resource_type': 'instance', + } + + records = self.central_service.find_records(self.admin_context, + criterion) + + # Ensure we start with exactly 4 records. self.assertEqual(4, len(records)) self.plugin.process_notification( self.admin_context.to_dict(), event_type, fixture['payload']) - # Simulate the record having been deleted on the backend - zone_serial = self.central_service.get_zone( - self.admin_context, self.zone_id).serial - self.central_service.update_status( - self.admin_context, self.zone_id, "SUCCESS", zone_serial) + # Ensure we now have exactly 2 records + records = self.central_service.find_records(self.admin_context, + criterion) + + self.assertEqual(2, len(records), records) + + # The two remaining records should be in waiting UPDATE. + for record in records: + self.assertEqual('UPDATE', record.action) + self.assertEqual('172.16.0.15', record.data) + + def test_instance_delete_with_no_record(self): + event_type = 'compute.instance.delete.start' + fixture = self.get_notification_fixture('nova', event_type) + + self.assertIn(event_type, self.plugin.get_event_types()) + + criterion = { + 'zone_id': self.zone_id, + 'managed': True, + 'managed_resource_type': 'instance', + } - # Ensure we now have exactly 0 records, plus NS and SOA records = self.central_service.find_records(self.admin_context, criterion) - self.assertEqual(2, len(records)) + # Ensure we start with zero records + self.assertFalse(records) + + # Make sure we don't fail here, even though there is nothing to + # do, since the record we are trying to delete does not actually exist. + self.plugin.process_notification( + self.admin_context.to_dict(), event_type, fixture['payload']) + + # Ensure we still have zero records + records = self.central_service.find_records(self.admin_context, + criterion) + + self.assertFalse(records) def test_label_in_format_v4_v6(self): event_type = 'compute.instance.create.end' |