summaryrefslogtreecommitdiff
path: root/designate/tests/test_notification_handler/test_nova.py
diff options
context:
space:
mode:
Diffstat (limited to 'designate/tests/test_notification_handler/test_nova.py')
-rw-r--r--designate/tests/test_notification_handler/test_nova.py117
1 files changed, 99 insertions, 18 deletions
diff --git a/designate/tests/test_notification_handler/test_nova.py b/designate/tests/test_notification_handler/test_nova.py
index 141d7378..4d973987 100644
--- a/designate/tests/test_notification_handler/test_nova.py
+++ b/designate/tests/test_notification_handler/test_nova.py
@@ -47,22 +47,25 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
self.assertIn(event_type, self.plugin.get_event_types())
- criterion = {'zone_id': self.zone_id}
+ criterion = {
+ 'zone_id': self.zone_id,
+ 'managed': True,
+ 'managed_resource_type': 'instance',
+ }
- # Ensure we start with 2 records
records = self.central_service.find_records(self.admin_context,
criterion)
- # Should only be SOA and NS records
- self.assertEqual(2, len(records))
+ # Ensure we start with zero managed records
+ self.assertFalse(records)
self.plugin.process_notification(
self.admin_context.to_dict(), event_type, fixture['payload'])
- # Ensure we now have exactly 1 more record
+ # Ensure we now have exactly 2 records.
records = self.central_service.find_records(self.admin_context,
criterion)
- self.assertEqual(4, len(records))
+ self.assertEqual(2, len(records))
def test_instance_create_end_utf8(self):
self.config(formatv4=['%(display_name)s.%(zone)s'],
@@ -77,13 +80,14 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
self.assertIn(event_type, self.plugin.get_event_types())
- criterion = {'zone_id': self.zone_id}
+ criterion = {
+ 'zone_id': self.zone_id,
+ }
- # Ensure we start with 2 records
recordsets = self.central_service.find_recordsets(
self.admin_context, criterion)
- # Should only be SOA and NS recordsets
+ # Ensure that we only have SOA and NS recordsets.
self.assertEqual(2, len(recordsets))
self.plugin.process_notification(
@@ -118,28 +122,105 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
self.assertIn(event_type, self.plugin.get_event_types())
- criterion = {'zone_id': self.zone_id}
+ criterion = {
+ 'zone_id': self.zone_id,
+ 'managed': True,
+ 'managed_resource_type': 'instance',
+ }
+
+ # Ensure we start with exactly 2 records.
+ records = self.central_service.find_records(self.admin_context,
+ criterion)
+
+ self.assertEqual(2, len(records))
+
+ self.plugin.process_notification(
+ self.admin_context.to_dict(), event_type, fixture['payload'])
- # Ensure we start with at least 1 record, plus NS and SOA
+ # Ensure we now have exactly zero active records
records = self.central_service.find_records(self.admin_context,
criterion)
+ self.assertEqual(2, len(records), records)
+
+ # The two deleted records should now be in action state DELETE.
+ for record in records:
+ self.assertEqual('DELETE', record.action)
+ self.assertEqual('172.16.0.14', record.data)
+
+ def test_instance_delete_one_with_multiple_records_with_same_name(self):
+ # Prepare for the test
+ for start_event_type in ['compute.instance.create.end',
+ 'compute.instance.create.end-2']:
+ start_fixture = self.get_notification_fixture(
+ 'nova', start_event_type
+ )
+ self.plugin.process_notification(
+ self.admin_context.to_dict(),
+ start_fixture['event_type'],
+ start_fixture['payload']
+ )
+
+ # Now - Onto the real test
+ event_type = 'compute.instance.delete.start'
+ fixture = self.get_notification_fixture('nova', event_type)
+
+ self.assertIn(event_type, self.plugin.get_event_types())
+
+ criterion = {
+ 'zone_id': self.zone_id,
+ 'managed': True,
+ 'managed_resource_type': 'instance',
+ }
+
+ records = self.central_service.find_records(self.admin_context,
+ criterion)
+
+ # Ensure we start with exactly 4 records.
self.assertEqual(4, len(records))
self.plugin.process_notification(
self.admin_context.to_dict(), event_type, fixture['payload'])
- # Simulate the record having been deleted on the backend
- zone_serial = self.central_service.get_zone(
- self.admin_context, self.zone_id).serial
- self.central_service.update_status(
- self.admin_context, self.zone_id, "SUCCESS", zone_serial)
+ # Ensure we now have exactly 2 records
+ records = self.central_service.find_records(self.admin_context,
+ criterion)
+
+ self.assertEqual(2, len(records), records)
+
+ # The two remaining records should be in waiting UPDATE.
+ for record in records:
+ self.assertEqual('UPDATE', record.action)
+ self.assertEqual('172.16.0.15', record.data)
+
+ def test_instance_delete_with_no_record(self):
+ event_type = 'compute.instance.delete.start'
+ fixture = self.get_notification_fixture('nova', event_type)
+
+ self.assertIn(event_type, self.plugin.get_event_types())
+
+ criterion = {
+ 'zone_id': self.zone_id,
+ 'managed': True,
+ 'managed_resource_type': 'instance',
+ }
- # Ensure we now have exactly 0 records, plus NS and SOA
records = self.central_service.find_records(self.admin_context,
criterion)
- self.assertEqual(2, len(records))
+ # Ensure we start with zero records
+ self.assertFalse(records)
+
+ # Make sure we don't fail here, even though there is nothing to
+ # do, since the record we are trying to delete does not actually exist.
+ self.plugin.process_notification(
+ self.admin_context.to_dict(), event_type, fixture['payload'])
+
+ # Ensure we still have zero records
+ records = self.central_service.find_records(self.admin_context,
+ criterion)
+
+ self.assertFalse(records)
def test_label_in_format_v4_v6(self):
event_type = 'compute.instance.create.end'