summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikhil Manchanda <SlickNik@gmail.com>2014-03-25 11:26:43 -0700
committerNikhil Manchanda <SlickNik@gmail.com>2014-03-31 16:25:07 -0700
commit3668abcca14df3e94c4199fed57c8464967b6d15 (patch)
tree2ce758596ff10f5973a0b6b11105d0215ae8a56d
parent45a868aa63bac9a6cc22e17cb7b7c1b4950af1b3 (diff)
downloadtrove-3668abcca14df3e94c4199fed57c8464967b6d15.tar.gz
Remove mockito, and replace with mock
There are unit tests in trove which are dependent on mockito. This fixes them to use the standard mock library which is a part of global_requirements. Co-Authored-By: Justin Hopper <justin.hopper@hp.com> Implements-bp: remove-mockito Change-Id: Ic030cc846d9e7d0c9077240b0fac90f4f66a5a42
-rw-r--r--test-requirements.txt1
-rw-r--r--trove/tests/fakes/swift.py100
-rw-r--r--trove/tests/unittests/api/common/test_limits.py41
-rw-r--r--trove/tests/unittests/backup/test_backup_models.py230
-rw-r--r--trove/tests/unittests/backup/test_backupagent.py206
-rw-r--r--trove/tests/unittests/backup/test_storage.py94
-rw-r--r--trove/tests/unittests/common/test_remote.py19
-rw-r--r--trove/tests/unittests/conductor/test_methods.py2
-rw-r--r--trove/tests/unittests/dns/test_designate_driver.py50
-rw-r--r--trove/tests/unittests/guestagent/test_api.py309
-rw-r--r--trove/tests/unittests/guestagent/test_cassandra_manager.py54
-rw-r--r--trove/tests/unittests/guestagent/test_couchbase_manager.py56
-rw-r--r--trove/tests/unittests/guestagent/test_dbaas.py451
-rw-r--r--trove/tests/unittests/guestagent/test_mongodb_manager.py40
-rw-r--r--trove/tests/unittests/guestagent/test_mysql_manager.py132
-rw-r--r--trove/tests/unittests/guestagent/test_pkg.py17
-rw-r--r--trove/tests/unittests/guestagent/test_redis_manager.py90
-rw-r--r--trove/tests/unittests/mgmt/test_models.py418
-rw-r--r--trove/tests/unittests/quota/test_quota.py87
-rw-r--r--trove/tests/unittests/secgroups/test_security_group.py6
-rw-r--r--trove/tests/unittests/taskmanager/test_models.py330
21 files changed, 1401 insertions, 1332 deletions
diff --git a/test-requirements.txt b/test-requirements.txt
index 7baf2407..d99f6d2f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -18,4 +18,3 @@ mox>=0.5.3
testtools>=0.9.32
discover
testrepository>=0.0.17
-mockito
diff --git a/trove/tests/fakes/swift.py b/trove/tests/fakes/swift.py
index b1df0071..9044ea87 100644
--- a/trove/tests/fakes/swift.py
+++ b/trove/tests/fakes/swift.py
@@ -1,10 +1,3 @@
-import uuid
-import logging
-from mockito import when, any
-import swiftclient.client as swift_client
-import swiftclient
-
-
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
@@ -24,7 +17,12 @@ import httplib
import json
import os
import socket
+import uuid
+import logging
+import swiftclient.client as swift_client
+import swiftclient
from hashlib import md5
+from mock import MagicMock
from swiftclient import client as swift
@@ -260,10 +258,6 @@ class SwiftClientStub(object):
def __init__(self):
self._connection = swift_client.Connection()
- # simulate getting an unknown container
- when(swift_client.Connection).get_container(any()).thenRaise(
- swiftclient.ClientException('Resource Not Found', http_status=404))
-
self._containers = {}
self._containers_list = []
self._objects = {}
@@ -302,7 +296,7 @@ class SwiftClientStub(object):
'content-type': 'application/json; charset=utf-8',
'x-account-object-count': '0'}, self._containers_list)
- when(swift_client.Connection).get_auth().thenReturn((
+ swift_client.Connection.get_auth = MagicMock(return_value=(
u"http://127.0.0.1:8080/v1/AUTH_c7b038976df24d96bf1980f5da17bd89",
u'MIINrwYJKoZIhvcNAQcCoIINoDCCDZwCAQExCTAHBgUrDgMCGjCCDIgGCSqGSIb3'
u'DQEHAaCCDHkEggx1eyJhY2Nlc3MiOiB7InRva2VuIjogeyJpc3N1ZWRfYXQiOiAi'
@@ -312,7 +306,8 @@ class SwiftClientStub(object):
u'ZWRkd2FyZiIsICJpZCI6ICJjN2IwMzg5NzZkZjI0ZDk2YmYxOTgwZjVkYTE3YmQ4'
u'OSJ9fSwgInNlcnZpY2VDYXRhbG9nIjogW3siZW5kcG9pbnRzIjogW3siYWRtaW5')
)
- when(swift_client.Connection).get_account().thenReturn(account_resp())
+ swift_client.Connection.get_account = MagicMock(
+ return_value=account_resp())
return self
def _create_container(self, container_name):
@@ -356,12 +351,19 @@ class SwiftClientStub(object):
self._objects[container])
# if this is called multiple times then nothing happens
- when(swift_client.Connection).put_container(container_name).thenReturn(
- None)
+ swift_client.Connection.put_container = MagicMock(return_value=None)
+
+ def side_effect_func(*args, **kwargs):
+ if args[0] in self._containers:
+ return container_resp(args[0])
+ else:
+ raise swiftclient.ClientException('Resource Not Found',
+ http_status=404)
+
self._create_container(container_name)
# return container headers
- when(swift_client.Connection).get_container(container_name).thenReturn(
- container_resp(container_name))
+ swift_client.Connection.get_container = MagicMock(
+ side_effect=side_effect_func)
return self
@@ -384,12 +386,6 @@ class SwiftClientStub(object):
"""
# first ensure container
self._ensure_container_exists(container)
- # allow one call to get container and then throw exceptions (may need
- # to be revised
- when(swift_client.Connection).delete_container(container).thenRaise(
- swiftclient.ClientException("Resource Not Found", http_status=404))
- when(swift_client.Connection).get_container(container).thenRaise(
- swiftclient.ClientException("Resource Not Found", http_status=404))
self._delete_container(container)
return self
@@ -415,24 +411,35 @@ class SwiftClientStub(object):
:param contents: the contents of the object
"""
- self._connection.get_container(container)
- when(swift_client.Connection).put_object(container, name,
- contents).thenReturn(
- uuid.uuid1())
- when(swift_client.Connection).get_object(container, name).thenReturn(
- ({'content-length': len(contents), 'accept-ranges': 'bytes',
- 'last-modified': 'Mon, 10 Mar 2013 01:06:34 GMT',
- 'etag': 'eb15a6874ce265e2c3eb1b4891567bab',
- 'x-timestamp': '1363568794.67584',
- 'x-trans-id': 'txef3aaf26c897420c8e77c9750ce6a501',
- 'date': 'Mon, 10 Mar 2013 05:35:14 GMT',
- 'content-type': 'application/octet-stream'}, contents)
- )
+ swift_client.Connection.put_object = MagicMock(
+ return_value=uuid.uuid1())
+
+ def side_effect_func(*args, **kwargs):
+ if (args[0] in self._containers and
+ args[1] in map(lambda x: x['name'],
+ self._objects[args[0]])):
+ return (
+ {'content-length': len(contents), 'accept-ranges': 'bytes',
+ 'last-modified': 'Mon, 10 Mar 2013 01:06:34 GMT',
+ 'etag': 'eb15a6874ce265e2c3eb1b4891567bab',
+ 'x-timestamp': '1363568794.67584',
+ 'x-trans-id': 'txef3aaf26c897420c8e77c9750ce6a501',
+ 'date': 'Mon, 10 Mar 2013 05:35:14 GMT',
+ 'content-type': 'application/octet-stream'},
+ [obj for obj in self._objects[args[0]]
+ if obj['name'] == args[1]][0]['contents'])
+ else:
+ raise swiftclient.ClientException('Resource Not Found',
+ http_status=404)
+
+ swift_client.Connection.get_object = MagicMock(
+ side_effect=side_effect_func)
+
self._remove_object(name, self._objects[container])
self._objects[container].append(
{'bytes': 13, 'last_modified': '2013-03-15T22:10:49.361950',
'hash': 'ccc55aefbf92aa66f42b638802c5e7f6', 'name': name,
- 'content_type': 'application/octet-stream'})
+ 'content_type': 'application/octet-stream', 'contents': contents})
return self
def without_object(self, container, name):
@@ -455,13 +462,18 @@ class SwiftClientStub(object):
"""
self._ensure_container_exists(container)
self._ensure_object_exists(container, name)
- # throw exception if someone calls get object
- when(swift_client.Connection).get_object(container, name).thenRaise(
- swiftclient.ClientException('Resource Not found', http_status=404))
- when(swift_client.Connection).delete_object(
- container, name).thenReturn(None).thenRaise(
- swiftclient.ClientException('Resource Not Found',
- http_status=404))
+
+ def side_effect_func(*args, **kwargs):
+ if not [obj for obj in self._objects[args[0]]
+ if obj['name'] == [args[1]]]:
+ raise swiftclient.ClientException('Resource Not found',
+ http_status=404)
+ else:
+ return None
+
+ swift_client.Connection.delete_object = MagicMock(
+ side_effect=side_effect_func)
+
self._remove_object(name, self._objects[container])
return self
diff --git a/trove/tests/unittests/api/common/test_limits.py b/trove/tests/unittests/api/common/test_limits.py
index 80c63072..e74e3c2f 100644
--- a/trove/tests/unittests/api/common/test_limits.py
+++ b/trove/tests/unittests/api/common/test_limits.py
@@ -23,7 +23,7 @@ from trove.quota.models import Quota
import testtools
import webob
-from mockito import when, mock, any
+from mock import Mock, MagicMock
from trove.common import limits
from trove.common.limits import Limit
from trove.limits import views
@@ -55,9 +55,10 @@ class LimitsControllerTest(BaseLimitTestSuite):
def test_limit_index_empty(self):
limit_controller = LimitsController()
- req = mock()
+ req = MagicMock()
req.environ = {}
- when(QUOTAS).get_all_quotas_by_tenant(any()).thenReturn({})
+
+ QUOTAS.get_all_quotas_by_tenant = MagicMock(return_value={})
view = limit_controller.index(req, "test_tenant_id")
expected = {'limits': [{'verb': 'ABSOLUTE'}]}
@@ -118,10 +119,11 @@ class LimitsControllerTest(BaseLimitTestSuite):
resource="volumes",
hard_limit=55)}
- req = mock()
+ req = MagicMock()
req.environ = {"trove.limits": limits}
- when(QUOTAS).get_all_quotas_by_tenant(tenant_id).thenReturn(abs_limits)
+ QUOTAS.get_all_quotas_by_tenant = MagicMock(return_value=abs_limits)
+
view = limit_controller.index(req, tenant_id)
expected = {
@@ -239,7 +241,8 @@ class LimitTest(BaseLimitTestSuite):
def test_GET_no_delay(self):
# Test a limit handles 1 GET per second.
limit = Limit("GET", "*", ".*", 1, 1)
- when(limit)._get_time().thenReturn(0.0)
+
+ limit._get_time = MagicMock(return_value=0.0)
delay = limit("GET", "/anything")
self.assertEqual(None, delay)
self.assertEqual(0, limit.next_request)
@@ -248,7 +251,7 @@ class LimitTest(BaseLimitTestSuite):
def test_GET_delay(self):
# Test two calls to 1 GET per second limit.
limit = Limit("GET", "*", ".*", 1, 1)
- when(limit)._get_time().thenReturn(0.0)
+ limit._get_time = MagicMock(return_value=0.0)
delay = limit("GET", "/anything")
self.assertEqual(None, delay)
@@ -258,7 +261,7 @@ class LimitTest(BaseLimitTestSuite):
self.assertEqual(1, limit.next_request)
self.assertEqual(0, limit.last_request)
- when(limit)._get_time().thenReturn(4.0)
+ limit._get_time = MagicMock(return_value=4.0)
delay = limit("GET", "/anything")
self.assertEqual(None, delay)
@@ -337,17 +340,16 @@ class LimiterTest(BaseLimitTestSuite):
Tests for the in-memory `limits.Limiter` class.
"""
- def update_limits(self, delay):
- for l in TEST_LIMITS:
- when(l)._get_time().thenReturn(delay)
+ def update_limits(self, delay, limit_list):
+ for ln in limit_list:
+ ln._get_time = Mock(return_value=delay)
def setUp(self):
"""Run before each test."""
super(LimiterTest, self).setUp()
userlimits = {'user:user3': ''}
- self.update_limits(0.0)
-
+ self.update_limits(0.0, TEST_LIMITS)
self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)
def _check(self, num, verb, url, username=None):
@@ -413,7 +415,7 @@ class LimiterTest(BaseLimitTestSuite):
self.assertEqual(expected, results)
# Advance time
- self.update_limits(6.0)
+ self.update_limits(6.0, self.limiter.levels[None])
expected = [None, 6.0]
results = list(self._check(2, "PUT", "/anything"))
@@ -425,7 +427,7 @@ class LimiterTest(BaseLimitTestSuite):
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
- self.update_limits(1.0)
+ self.update_limits(1.0, self.limiter.levels[None])
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything"))
@@ -438,7 +440,7 @@ class LimiterTest(BaseLimitTestSuite):
def test_multiple_users(self):
# Tests involving multiple users.
# User1
- self.update_limits(0.0)
+ self.update_limits(0.0, self.limiter.levels["user1"])
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
@@ -453,15 +455,14 @@ class LimiterTest(BaseLimitTestSuite):
results = list(self._check(20, "PUT", "/anything", "user3"))
self.assertEqual(expected, results)
- self.update_limits(1.0)
# User1 again
+ self.update_limits(1.0, self.limiter.levels["user1"])
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
- self.update_limits(2.0)
-
- # User1 again
+ # User2 again
+ self.update_limits(2.0, self.limiter.levels["user2"])
expected = [4.0] * 5
results = list(self._check(5, "PUT", "/anything", "user2"))
self.assertEqual(expected, results)
diff --git a/trove/tests/unittests/backup/test_backup_models.py b/trove/tests/unittests/backup/test_backup_models.py
index a24d4f89..24ef7783 100644
--- a/trove/tests/unittests/backup/test_backup_models.py
+++ b/trove/tests/unittests/backup/test_backup_models.py
@@ -13,7 +13,7 @@
import datetime
-from mockito import mock, when, unstub, any
+from mock import MagicMock, patch
import testtools
from trove.backup import models
@@ -48,65 +48,76 @@ class BackupCreateTest(testtools.TestCase):
def tearDown(self):
super(BackupCreateTest, self).tearDown()
- unstub()
if self.created:
models.DBBackup.find_by(
tenant_id=self.context.tenant).delete()
def test_create(self):
- instance = mock(instance_models.Instance)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(instance).validate_can_perform_action().thenReturn(None)
- when(models.Backup).validate_can_perform_action(
- any(), any()).thenReturn(None)
- when(models.Backup).verify_swift_auth_token(any()).thenReturn(
- None)
- when(api.API).create_backup(any()).thenReturn(None)
-
- bu = models.Backup.create(self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC)
- self.created = True
-
- self.assertEqual(BACKUP_NAME, bu.name)
- self.assertEqual(BACKUP_DESC, bu.description)
- self.assertEqual(self.instance_id, bu.instance_id)
- self.assertEqual(models.BackupState.NEW, bu.state)
-
- db_record = models.DBBackup.find_by(id=bu.id)
- self.assertEqual(bu.id, db_record['id'])
- self.assertEqual(BACKUP_NAME, db_record['name'])
- self.assertEqual(BACKUP_DESC, db_record['description'])
- self.assertEqual(self.instance_id, db_record['instance_id'])
- self.assertEqual(models.BackupState.NEW, db_record['state'])
+ instance = MagicMock()
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ return_value=None)
+ with patch.object(models.Backup, 'validate_can_perform_action',
+ return_value=None):
+ with patch.object(models.Backup, 'verify_swift_auth_token',
+ return_value=None):
+ api.API.create_backup = MagicMock(return_value=None)
+ bu = models.Backup.create(self.context, self.instance_id,
+ BACKUP_NAME, BACKUP_DESC)
+ self.created = True
+
+ self.assertEqual(BACKUP_NAME, bu.name)
+ self.assertEqual(BACKUP_DESC, bu.description)
+ self.assertEqual(self.instance_id, bu.instance_id)
+ self.assertEqual(models.BackupState.NEW, bu.state)
+
+ db_record = models.DBBackup.find_by(id=bu.id)
+ self.assertEqual(bu.id, db_record['id'])
+ self.assertEqual(BACKUP_NAME, db_record['name'])
+ self.assertEqual(BACKUP_DESC, db_record['description'])
+ self.assertEqual(self.instance_id,
+ db_record['instance_id'])
+ self.assertEqual(models.BackupState.NEW,
+ db_record['state'])
def test_create_incremental(self):
- instance = mock(instance_models.Instance)
- parent = mock(models.DBBackup)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(instance).validate_can_perform_action().thenReturn(None)
- when(models.Backup).validate_can_perform_action(
- any(), any()).thenReturn(None)
- when(models.Backup).verify_swift_auth_token(any()).thenReturn(
- None)
- when(api.API).create_backup(any()).thenReturn(None)
- when(models.Backup).get_by_id(any(), any()).thenReturn(
- parent)
-
- incremental = models.Backup.create(self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC,
- parent_id='parent_uuid')
-
- self.created = True
-
- db_record = models.DBBackup.find_by(id=incremental.id)
- self.assertEqual(incremental.id, db_record['id'])
- self.assertEqual(BACKUP_NAME, db_record['name'])
- self.assertEqual(BACKUP_DESC, db_record['description'])
- self.assertEqual(self.instance_id, db_record['instance_id'])
- self.assertEqual(models.BackupState.NEW, db_record['state'])
- self.assertEqual('parent_uuid', db_record['parent_id'])
+ instance = MagicMock()
+ parent = MagicMock(spec=models.DBBackup)
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ return_value=None)
+ with patch.object(models.Backup, 'validate_can_perform_action',
+ return_value=None):
+ with patch.object(models.Backup, 'verify_swift_auth_token',
+ return_value=None):
+ api.API.create_backup = MagicMock(return_value=None)
+ with patch.object(models.Backup, 'get_by_id',
+ return_value=parent):
+
+ incremental = models.Backup.create(
+ self.context,
+ self.instance_id,
+ BACKUP_NAME,
+ BACKUP_DESC,
+ parent_id='parent_uuid')
+
+ self.created = True
+
+ db_record = models.DBBackup.find_by(id=incremental.id)
+ self.assertEqual(incremental.id,
+ db_record['id'])
+ self.assertEqual(BACKUP_NAME,
+ db_record['name'])
+ self.assertEqual(BACKUP_DESC,
+ db_record['description'])
+ self.assertEqual(self.instance_id,
+ db_record['instance_id'])
+ self.assertEqual(models.BackupState.NEW,
+ db_record['state'])
+ self.assertEqual('parent_uuid',
+ db_record['parent_id'])
def test_create_instance_not_found(self):
self.assertRaises(exception.NotFound, models.Backup.create,
@@ -114,52 +125,60 @@ class BackupCreateTest(testtools.TestCase):
BACKUP_NAME, BACKUP_DESC)
def test_create_incremental_not_found(self):
- instance = mock(instance_models.Instance)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(instance).validate_can_perform_action().thenReturn(None)
- when(models.Backup).validate_can_perform_action(
- any(), any()).thenReturn(None)
- when(models.Backup).verify_swift_auth_token(any()).thenReturn(
- None)
- self.assertRaises(exception.NotFound, models.Backup.create,
- self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC, parent_id='BAD')
+ instance = MagicMock()
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ return_value=None)
+ with patch.object(models.Backup, 'validate_can_perform_action',
+ return_value=None):
+ with patch.object(models.Backup, 'verify_swift_auth_token',
+ return_value=None):
+ self.assertRaises(exception.NotFound, models.Backup.create,
+ self.context, self.instance_id,
+ BACKUP_NAME, BACKUP_DESC,
+ parent_id='BAD')
def test_create_instance_not_active(self):
- instance = mock(instance_models.Instance)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(instance).validate_can_perform_action().thenRaise(
- exception.UnprocessableEntity)
- self.assertRaises(exception.UnprocessableEntity, models.Backup.create,
- self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC)
+ instance = MagicMock()
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ side_effect=exception.UnprocessableEntity)
+ self.assertRaises(exception.UnprocessableEntity,
+ models.Backup.create,
+ self.context, self.instance_id,
+ BACKUP_NAME, BACKUP_DESC)
def test_create_backup_swift_token_invalid(self):
- instance = mock(instance_models.Instance)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(instance).validate_can_perform_action().thenReturn(None)
- when(models.Backup).validate_can_perform_action(
- any(), any()).thenReturn(None)
- when(models.Backup).verify_swift_auth_token(any()).thenRaise(
- exception.SwiftAuthError)
- self.assertRaises(exception.SwiftAuthError, models.Backup.create,
- self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC)
+ instance = MagicMock()
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ return_value=None)
+ with patch.object(models.Backup, 'validate_can_perform_action',
+ return_value=None):
+ with patch.object(models.Backup, 'verify_swift_auth_token',
+ side_effect=exception.SwiftAuthError):
+ self.assertRaises(exception.SwiftAuthError,
+ models.Backup.create,
+ self.context, self.instance_id,
+ BACKUP_NAME, BACKUP_DESC)
def test_create_backup_datastore_operation_not_supported(self):
- instance = mock(instance_models.Instance)
- when(instance_models.BuiltInstance).load(any(), any()).thenReturn(
- instance)
- when(models.Backup).validate_can_perform_action(
- any(), any()).thenRaise(
- exception.DatastoreOperationNotSupported)
- self.assertRaises(exception.DatastoreOperationNotSupported,
- models.Backup.create,
- self.context, self.instance_id,
- BACKUP_NAME, BACKUP_DESC)
+ instance = MagicMock()
+ with patch.object(instance_models.BuiltInstance, 'load',
+ return_value=instance):
+ instance.validate_can_perform_action = MagicMock(
+ return_value=None)
+ with patch.object(
+ models.Backup, 'validate_can_perform_action',
+ side_effect=exception.DatastoreOperationNotSupported
+ ):
+ self.assertRaises(exception.DatastoreOperationNotSupported,
+ models.Backup.create,
+ self.context, self.instance_id,
+ BACKUP_NAME, BACKUP_DESC)
class BackupDeleteTest(testtools.TestCase):
@@ -170,27 +189,27 @@ class BackupDeleteTest(testtools.TestCase):
def tearDown(self):
super(BackupDeleteTest, self).tearDown()
- unstub()
def test_delete_backup_not_found(self):
self.assertRaises(exception.NotFound, models.Backup.delete,
self.context, 'backup-id')
def test_delete_backup_is_running(self):
- backup = mock()
+ backup = MagicMock()
backup.is_running = True
- when(models.Backup).get_by_id(any(), any()).thenReturn(backup)
- self.assertRaises(exception.UnprocessableEntity,
- models.Backup.delete, self.context, 'backup_id')
+ with patch.object(models.Backup, 'get_by_id', return_value=backup):
+ self.assertRaises(exception.UnprocessableEntity,
+ models.Backup.delete, self.context, 'backup_id')
def test_delete_backup_swift_token_invalid(self):
- backup = mock()
+ backup = MagicMock()
backup.is_running = False
- when(models.Backup).get_by_id(any(), any()).thenReturn(backup)
- when(models.Backup).verify_swift_auth_token(any()).thenRaise(
- exception.SwiftAuthError)
- self.assertRaises(exception.SwiftAuthError, models.Backup.delete,
- self.context, 'backup_id')
+ with patch.object(models.Backup, 'get_by_id', return_value=backup):
+ with patch.object(models.Backup, 'verify_swift_auth_token',
+ side_effect=exception.SwiftAuthError):
+ self.assertRaises(exception.SwiftAuthError,
+ models.Backup.delete,
+ self.context, 'backup_id')
class BackupORMTest(testtools.TestCase):
@@ -209,7 +228,6 @@ class BackupORMTest(testtools.TestCase):
def tearDown(self):
super(BackupORMTest, self).tearDown()
- unstub()
if not self.deleted:
models.DBBackup.find_by(tenant_id=self.context.tenant).delete()
@@ -304,7 +322,6 @@ class PaginationTests(testtools.TestCase):
def tearDown(self):
super(PaginationTests, self).tearDown()
- unstub()
query = models.DBBackup.query()
query.filter_by(instance_id=self.instance_id).delete()
@@ -378,7 +395,6 @@ class OrderingTests(testtools.TestCase):
def tearDown(self):
super(OrderingTests, self).tearDown()
- unstub()
query = models.DBBackup.query()
query.filter_by(instance_id=self.instance_id).delete()
diff --git a/trove/tests/unittests/backup/test_backupagent.py b/trove/tests/unittests/backup/test_backupagent.py
index 7028696e..efd59683 100644
--- a/trove/tests/unittests/backup/test_backupagent.py
+++ b/trove/tests/unittests/backup/test_backupagent.py
@@ -12,8 +12,7 @@
#See the License for the specific language governing permissions and
#limitations under the License.
-from mock import Mock
-from mockito import when, unstub, any
+from mock import Mock, MagicMock, patch, ANY
from webob.exc import HTTPNotFound
import hashlib
@@ -160,14 +159,12 @@ class MockStats:
class BackupAgentTest(testtools.TestCase):
def setUp(self):
super(BackupAgentTest, self).setUp()
- when(mysql_impl).get_auth_password().thenReturn('123')
- when(backupagent).get_storage_strategy(any(), any()).thenReturn(
- MockSwift)
- when(os).statvfs(any()).thenReturn(MockStats)
+ mysql_impl.get_auth_password = MagicMock(return_value='123')
+ backupagent.get_storage_strategy = MagicMock(return_value=MockSwift)
+ os.statvfs = MagicMock(return_value=MockStats)
def tearDown(self):
super(BackupAgentTest, self).tearDown()
- unstub()
def test_backup_impl_MySQLDump(self):
"""This test is for
@@ -243,24 +240,24 @@ class BackupAgentTest(testtools.TestCase):
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
state=BackupState.NEW))
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
- size=any(),
+ size=ANY,
state=BackupState.BUILDING))
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
- checksum=any(),
- location=any(),
- note=any(),
+ checksum=ANY,
+ location=ANY,
+ note=ANY,
backup_type=backup_info['type'],
state=BackupState.COMPLETED))
@@ -278,47 +275,49 @@ class BackupAgentTest(testtools.TestCase):
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
state=BackupState.NEW))
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
- size=any(),
+ size=ANY,
state=BackupState.BUILDING))
self.assertTrue(
conductor_api.API.update_backup.called_once_with(
- any(),
+ ANY,
backup_id=backup_info['id'],
- checksum=any(),
- location=any(),
- note=any(),
+ checksum=ANY,
+ location=ANY,
+ note=ANY,
backup_type=backup_info['type'],
state=BackupState.FAILED))
def test_execute_lossy_backup(self):
"""This test verifies that incomplete writes to swift will fail."""
- when(MockSwift).save(any(), any()).thenReturn((False, 'Error', 'y',
- 'z'))
- agent = backupagent.BackupAgent()
+ with patch.object(MockSwift, 'save',
+ return_value=(False, 'Error', 'y', 'z')):
- backup_info = {'id': '123',
- 'location': 'fake-location',
- 'type': 'InnoBackupEx',
- 'checksum': 'fake-checksum',
- }
- self.assertRaises(backupagent.BackupError, agent.execute_backup,
- context=None, backup_info=backup_info,
- runner=MockLossyBackup)
+ agent = backupagent.BackupAgent()
- self.assertTrue(
- conductor_api.API.update_backup.called_once_with(
- any(),
- backup_id=backup_info['id'],
- state=BackupState.FAILED))
+ backup_info = {'id': '123',
+ 'location': 'fake-location',
+ 'type': 'InnoBackupEx',
+ 'checksum': 'fake-checksum',
+ }
+
+ self.assertRaises(backupagent.BackupError, agent.execute_backup,
+ context=None, backup_info=backup_info,
+ runner=MockLossyBackup)
+
+ self.assertTrue(
+ conductor_api.API.update_backup.called_once_with(
+ ANY,
+ backup_id=backup_info['id'],
+ state=BackupState.FAILED))
def test_execute_restore(self):
"""This test should ensure backup agent
@@ -327,79 +326,86 @@ class BackupAgentTest(testtools.TestCase):
transfers/downloads data and invokes the restore module
reports status
"""
- when(backupagent).get_storage_strategy(any(), any()).thenReturn(
- MockStorage)
- when(backupagent).get_restore_strategy(
- 'InnoBackupEx', any()).thenReturn(MockRestoreRunner)
-
- agent = backupagent.BackupAgent()
-
- bkup_info = {'id': '123',
- 'location': 'fake-location',
- 'type': 'InnoBackupEx',
- 'checksum': 'fake-checksum',
- }
- agent.execute_restore(TroveContext(), bkup_info, '/var/lib/mysql')
+ with patch.object(backupagent, 'get_storage_strategy',
+ return_value=MockStorage):
- def test_restore_unknown(self):
- when(backupagent).get_restore_strategy(
- 'foo', any()).thenRaise(ImportError)
+ with patch.object(backupagent, 'get_restore_strategy',
+ return_value=MockRestoreRunner):
- agent = backupagent.BackupAgent()
+ agent = backupagent.BackupAgent()
- bkup_info = {'id': '123',
- 'location': 'fake-location',
- 'type': 'foo',
- 'checksum': 'fake-checksum',
- }
- self.assertRaises(UnknownBackupType, agent.execute_restore,
- context=None, backup_info=bkup_info,
- restore_location='/var/lib/mysql')
+ bkup_info = {'id': '123',
+ 'location': 'fake-location',
+ 'type': 'InnoBackupEx',
+ 'checksum': 'fake-checksum',
+ }
+ agent.execute_restore(TroveContext(),
+ bkup_info,
+ '/var/lib/mysql')
- def test_backup_incremental_metadata(self):
- when(backupagent).get_storage_strategy(any(), any()).thenReturn(
- MockSwift)
- MockStorage.save_metadata = Mock()
- when(MockSwift).load_metadata(any(), any()).thenReturn(
- {'lsn': '54321'})
-
- meta = {
- 'lsn': '12345',
- 'parent_location': 'fake',
- 'parent_checksum': 'md5',
- }
- when(mysql_impl.InnoBackupExIncremental).metadata().thenReturn(meta)
- when(mysql_impl.InnoBackupExIncremental).run().thenReturn(True)
- when(mysql_impl.InnoBackupExIncremental).__exit__().thenReturn(True)
+ def test_restore_unknown(self):
+ with patch.object(backupagent, 'get_restore_strategy',
+ side_effect=ImportError):
- agent = backupagent.BackupAgent()
+ agent = backupagent.BackupAgent()
- bkup_info = {'id': '123',
- 'location': 'fake-location',
- 'type': 'InnoBackupEx',
- 'checksum': 'fake-checksum',
- 'parent': {'location': 'fake', 'checksum': 'md5'}
- }
+ bkup_info = {'id': '123',
+ 'location': 'fake-location',
+ 'type': 'foo',
+ 'checksum': 'fake-checksum',
+ }
- agent.execute_backup(TroveContext(), bkup_info, '/var/lib/mysql')
+ self.assertRaises(UnknownBackupType, agent.execute_restore,
+ context=None, backup_info=bkup_info,
+ restore_location='/var/lib/mysql')
- self.assertTrue(MockStorage.save_metadata.called_once_with(
- any(),
- meta))
+ def test_backup_incremental_metadata(self):
+ with patch.object(backupagent, 'get_storage_strategy',
+ return_value=MockSwift):
+ MockStorage.save_metadata = Mock()
+ with patch.object(MockSwift, 'load_metadata',
+ return_value={'lsn': '54321'}):
+ meta = {
+ 'lsn': '12345',
+ 'parent_location': 'fake',
+ 'parent_checksum': 'md5',
+ }
+
+ mysql_impl.InnoBackupExIncremental.metadata = MagicMock(
+ return_value=meta)
+ mysql_impl.InnoBackupExIncremental.check_process = MagicMock(
+ return_value=True)
+
+ agent = backupagent.BackupAgent()
+
+ bkup_info = {'id': '123',
+ 'location': 'fake-location',
+ 'type': 'InnoBackupEx',
+ 'checksum': 'fake-checksum',
+ 'parent': {'location': 'fake', 'checksum': 'md5'}
+ }
+
+ agent.execute_backup(TroveContext(),
+ bkup_info,
+ '/var/lib/mysql')
+
+ self.assertTrue(MockStorage.save_metadata.called_once_with(
+ ANY,
+ meta))
def test_backup_incremental_bad_metadata(self):
- when(backupagent).get_storage_strategy(any(), any()).thenReturn(
- MockSwift)
+ with patch.object(backupagent, 'get_storage_strategy',
+ return_value=MockSwift):
- agent = backupagent.BackupAgent()
+ agent = backupagent.BackupAgent()
- bkup_info = {'id': '123',
- 'location': 'fake-location',
- 'type': 'InnoBackupEx',
- 'checksum': 'fake-checksum',
- 'parent': {'location': 'fake', 'checksum': 'md5'}
- }
+ bkup_info = {'id': '123',
+ 'location': 'fake-location',
+ 'type': 'InnoBackupEx',
+ 'checksum': 'fake-checksum',
+ 'parent': {'location': 'fake', 'checksum': 'md5'}
+ }
- self.assertRaises(
- AttributeError,
- agent.execute_backup, TroveContext(), bkup_info, 'location')
+ self.assertRaises(
+ AttributeError,
+ agent.execute_backup, TroveContext(), bkup_info, 'location')
diff --git a/trove/tests/unittests/backup/test_storage.py b/trove/tests/unittests/backup/test_storage.py
index a300a4fe..a078e2d8 100644
--- a/trove/tests/unittests/backup/test_storage.py
+++ b/trove/tests/unittests/backup/test_storage.py
@@ -13,8 +13,7 @@
#limitations under the License.
import testtools
-from mock import Mock
-from mockito import when, unstub, any
+from mock import Mock, MagicMock, patch
import hashlib
from trove.common.context import TroveContext
@@ -36,7 +35,6 @@ class SwiftStorageSaveChecksumTests(testtools.TestCase):
def tearDown(self):
super(SwiftStorageSaveChecksumTests, self).tearDown()
- unstub()
def test_swift_checksum_save(self):
"""This tests that SwiftStorage.save returns the swift checksum"""
@@ -46,16 +44,17 @@ class SwiftStorageSaveChecksumTests(testtools.TestCase):
password = 'password'
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
- storage_strategy = SwiftStorage(context)
-
- with MockBackupRunner(filename=backup_id,
- user=user,
- password=password) as runner:
- (success,
- note,
- checksum,
- location) = storage_strategy.save(runner.manifest, runner)
+ with patch.object(swift, 'create_swift_client',
+ return_value=swift_client):
+ storage_strategy = SwiftStorage(context)
+
+ with MockBackupRunner(filename=backup_id,
+ user=user,
+ password=password) as runner:
+ (success,
+ note,
+ checksum,
+ location) = storage_strategy.save(runner.manifest, runner)
self.assertTrue(success, "The backup should have been successful.")
self.assertIsNotNone(note, "A note should have been returned.")
@@ -75,16 +74,17 @@ class SwiftStorageSaveChecksumTests(testtools.TestCase):
password = 'password'
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
- storage_strategy = SwiftStorage(context)
-
- with MockBackupRunner(filename=backup_id,
- user=user,
- password=password) as runner:
- (success,
- note,
- checksum,
- location) = storage_strategy.save(runner.manifest, runner)
+ with patch.object(swift, 'create_swift_client',
+ return_value=swift_client):
+ storage_strategy = SwiftStorage(context)
+
+ with MockBackupRunner(filename=backup_id,
+ user=user,
+ password=password) as runner:
+ (success,
+ note,
+ checksum,
+ location) = storage_strategy.save(runner.manifest, runner)
self.assertFalse(success, "The backup should have failed!")
self.assertTrue(note.startswith("Error saving data to Swift!"))
@@ -107,16 +107,17 @@ class SwiftStorageSaveChecksumTests(testtools.TestCase):
password = 'password'
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
- storage_strategy = SwiftStorage(context)
-
- with MockBackupRunner(filename=backup_id,
- user=user,
- password=password) as runner:
- (success,
- note,
- checksum,
- location) = storage_strategy.save(runner.manifest, runner)
+ with patch.object(swift, 'create_swift_client',
+ return_value=swift_client):
+ storage_strategy = SwiftStorage(context)
+
+ with MockBackupRunner(filename=backup_id,
+ user=user,
+ password=password) as runner:
+ (success,
+ note,
+ checksum,
+ location) = storage_strategy.save(runner.manifest, runner)
self.assertFalse(success, "The backup should have failed!")
self.assertTrue(note.startswith("Error saving data to Swift!"))
@@ -134,7 +135,7 @@ class SwiftStorageUtils(testtools.TestCase):
super(SwiftStorageUtils, self).setUp()
context = TroveContext()
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
+ swift.create_swift_client = MagicMock(return_value=swift_client)
self.swift = SwiftStorage(context)
def tearDown(self):
@@ -168,7 +169,6 @@ class SwiftStorageLoad(testtools.TestCase):
def tearDown(self):
super(SwiftStorageLoad, self).tearDown()
- unstub()
def test_run_verify_checksum(self):
"""This tests that swift download cmd runs if original backup checksum
@@ -180,10 +180,11 @@ class SwiftStorageLoad(testtools.TestCase):
backup_checksum = "fake-md5-sum"
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
+ with patch.object(swift, 'create_swift_client',
+ return_value=swift_client):
- storage_strategy = SwiftStorage(context)
- download_stream = storage_strategy.load(location, backup_checksum)
+ storage_strategy = SwiftStorage(context)
+ download_stream = storage_strategy.load(location, backup_checksum)
self.assertIsNotNone(download_stream)
def test_run_verify_checksum_mismatch(self):
@@ -197,9 +198,9 @@ class SwiftStorageLoad(testtools.TestCase):
backup_checksum = "checksum_different_then_fake_swift_etag"
swift_client = FakeSwiftConnection()
- when(swift).create_swift_client(context).thenReturn(swift_client)
-
- storage_strategy = SwiftStorage(context)
+ with patch.object(swift, 'create_swift_client',
+ return_value=swift_client):
+ storage_strategy = SwiftStorage(context)
self.assertRaises(SwiftDownloadIntegrityError,
storage_strategy.load,
@@ -274,13 +275,11 @@ class SwiftMetadataTests(testtools.TestCase):
super(SwiftMetadataTests, self).setUp()
self.swift_client = FakeSwiftConnection()
self.context = TroveContext()
- when(swift).create_swift_client(self.context).thenReturn(
- self.swift_client)
+ swift.create_swift_client = MagicMock(return_value=self.swift_client)
self.swift = SwiftStorage(self.context)
def tearDown(self):
super(SwiftMetadataTests, self).tearDown()
- unstub()
def test__get_attr(self):
normal_header = self.swift._get_attr('content-type')
@@ -302,10 +301,9 @@ class SwiftMetadataTests(testtools.TestCase):
'etag': '"fake-md5-sum"',
'x-object-meta-lsn': '1234567'
}
- when(self.swift_client).head_object(any(), any()).thenReturn(
- headers)
-
- metadata = self.swift.load_metadata(location, 'fake-md5-sum')
+ with patch.object(self.swift_client, 'head_object',
+ return_value=headers):
+ metadata = self.swift.load_metadata(location, 'fake-md5-sum')
self.assertEqual({'lsn': '1234567'}, metadata)
def test_save_metadata(self):
diff --git a/trove/tests/unittests/common/test_remote.py b/trove/tests/unittests/common/test_remote.py
index 3139c045..731f78b7 100644
--- a/trove/tests/unittests/common/test_remote.py
+++ b/trove/tests/unittests/common/test_remote.py
@@ -16,7 +16,7 @@
# under the License.
#
-from mockito import mock, when, unstub
+from mock import MagicMock
import testtools
from testtools import matchers
@@ -33,17 +33,16 @@ class TestRemote(testtools.TestCase):
def tearDown(self):
super(TestRemote, self).tearDown()
- unstub()
def test_creation(self):
- when(swiftclient.client.Connection).get_auth().thenReturn(None)
+ swiftclient.client.Connection.get_auth = MagicMock(return_value=None)
conn = swiftclient.client.Connection()
self.assertIsNone(conn.get_auth())
def test_create_swift_client(self):
- mock_resp = mock(dict)
- when(swiftclient.client.Connection).get_container('bob').thenReturn(
- ["text", mock_resp])
+ mock_resp = MagicMock()
+ swiftclient.client.Connection.get_container = MagicMock(
+ return_value=["text", mock_resp])
client = remote.create_swift_client(TroveContext(tenant='123'))
headers, container = client.get_container('bob')
self.assertIs(headers, "text")
@@ -132,7 +131,8 @@ class TestRemote(testtools.TestCase):
self.assertThat(obj_1, matchers.Equals(
{'bytes': 13, 'last_modified': '2013-03-15T22:10:49.361950',
'hash': 'ccc55aefbf92aa66f42b638802c5e7f6', 'name': 'test',
- 'content_type': 'application/octet-stream'}))
+ 'content_type': 'application/octet-stream',
+ 'contents': 'test_contents'}))
# test object api - not much to do here
self.assertThat(conn.get_object('bob', 'test')[1],
matchers.Is('test_contents'))
@@ -140,7 +140,6 @@ class TestRemote(testtools.TestCase):
# test remove object
swift_stub.without_object('bob', 'test')
# interact
- conn.delete_object('bob', 'test')
with testtools.ExpectedException(swiftclient.ClientException):
conn.delete_object('bob', 'test')
self.assertThat(len(conn.get_container('bob')[1]), matchers.Is(0))
@@ -169,14 +168,14 @@ class TestRemote(testtools.TestCase):
self.assertThat(cont_info[1][0], matchers.Equals(
{'bytes': 13, 'last_modified': '2013-03-15T22:10:49.361950',
'hash': 'ccc55aefbf92aa66f42b638802c5e7f6', 'name': 'test',
- 'content_type': 'application/octet-stream'}))
+ 'content_type': 'application/octet-stream',
+ 'contents': 'test_contents'}))
self.assertThat(conn.get_object('bob', 'test')[1],
matchers.Is('test_contents'))
self.assertThat(conn.get_object('bob', 'test2')[1],
matchers.Is('test_contents2'))
swift_stub.without_object('bob', 'test')
- conn.delete_object('bob', 'test')
with testtools.ExpectedException(swiftclient.ClientException):
conn.delete_object('bob', 'test')
self.assertThat(len(conn.get_container('bob')[1]), matchers.Is(1))
diff --git a/trove/tests/unittests/conductor/test_methods.py b/trove/tests/unittests/conductor/test_methods.py
index 11136338..b88475ff 100644
--- a/trove/tests/unittests/conductor/test_methods.py
+++ b/trove/tests/unittests/conductor/test_methods.py
@@ -13,7 +13,6 @@
# under the License.
import testtools
-from mockito import unstub
from trove.backup import models as bkup_models
from trove.common import exception as t_exception
from trove.common import utils
@@ -39,7 +38,6 @@ class ConductorMethodTests(testtools.TestCase):
def tearDown(self):
super(ConductorMethodTests, self).tearDown()
- unstub()
def _create_iss(self):
new_id = utils.generate_uuid()
diff --git a/trove/tests/unittests/dns/test_designate_driver.py b/trove/tests/unittests/dns/test_designate_driver.py
index 418166a1..7e0c3d43 100644
--- a/trove/tests/unittests/dns/test_designate_driver.py
+++ b/trove/tests/unittests/dns/test_designate_driver.py
@@ -15,9 +15,7 @@ import testtools
from designateclient.v1.domains import Domain
from designateclient.v1.records import Record
from trove.dns.designate import driver
-from mockito import any
-from mockito import mock
-from mockito import when
+from mock import MagicMock
import base64
import hashlib
@@ -87,9 +85,9 @@ class DesignateDriverTest(testtools.TestCase):
def test_get_entries_by_name(self):
zone = driver.DesignateDnsZone('123', 'www.example.com')
- when(driver).create_designate_client().thenReturn(None)
- when(driver.DesignateDriver)._get_records(any()).thenReturn(
- self.records)
+ driver.create_designate_client = MagicMock(return_value=None)
+ driver.DesignateDriver._get_records = MagicMock(
+ return_value=self.records)
dns_driver = driver.DesignateDriver()
entries = dns_driver.get_entries_by_name('record2', zone)
self.assertTrue(len(entries) == 1, 'More than one record found')
@@ -105,18 +103,18 @@ class DesignateDriverTest(testtools.TestCase):
def test_get_entries_by_name_not_found(self):
zone = driver.DesignateDnsZone('123', 'www.example.com')
- when(driver).create_designate_client().thenReturn(None)
- when(driver.DesignateDriver)._get_records(any()).thenReturn(
- self.records)
+ driver.create_designate_client = MagicMock(return_value=None)
+ driver.DesignateDriver._get_records = MagicMock(
+ return_value=self.records)
dns_driver = driver.DesignateDriver()
entries = dns_driver.get_entries_by_name('record_not_found', zone)
self.assertTrue(len(entries) == 0, 'Some records were returned')
def test_get_entries_by_content(self):
zone = driver.DesignateDnsZone('123', 'www.example.com')
- when(driver).create_designate_client().thenReturn(None)
- when(driver.DesignateDriver)._get_records(any()).thenReturn(
- self.records)
+ driver.create_designate_client = MagicMock(return_value=None)
+ driver.DesignateDriver._get_records = MagicMock(
+ return_value=self.records)
dns_driver = driver.DesignateDriver()
entries = dns_driver.get_entries_by_content('10.0.0.1', zone)
self.assertTrue(len(entries) == 1, 'More than one record found')
@@ -132,19 +130,17 @@ class DesignateDriverTest(testtools.TestCase):
def test_get_entries_by_content_not_found(self):
zone = driver.DesignateDnsZone('123', 'www.example.com')
- when(driver).create_designate_client().thenReturn(None)
- when(driver.DesignateDriver)._get_records(any()).thenReturn(
- self.records)
+ driver.create_designate_client = MagicMock(return_value=None)
+ driver.DesignateDriver._get_records = MagicMock(
+ return_value=self.records)
dns_driver = driver.DesignateDriver()
entries = dns_driver.get_entries_by_content('127.0.0.1', zone)
self.assertTrue(len(entries) == 0, 'Some records were returned')
def test_get_dnz_zones(self):
-
- client = mock()
- client.domains = mock()
- when(driver).create_designate_client().thenReturn(client)
- when(client.domains).list().thenReturn(self.domains)
+ client = MagicMock()
+ driver.create_designate_client = MagicMock(return_value=client)
+ client.domains.list = MagicMock(return_value=self.domains)
dns_driver = driver.DesignateDriver()
zones = dns_driver.get_dns_zones()
self.assertTrue(len(zones) == 3)
@@ -152,20 +148,18 @@ class DesignateDriverTest(testtools.TestCase):
self.assertDomainsAreEqual(self.domains[x], zones[x])
def test_get_dnz_zones_by_name(self):
- client = mock()
- client.domains = mock()
- when(driver).create_designate_client().thenReturn(client)
- when(client.domains).list().thenReturn(self.domains)
+ client = MagicMock()
+ driver.create_designate_client = MagicMock(return_value=client)
+ client.domains.list = MagicMock(return_value=self.domains)
dns_driver = driver.DesignateDriver()
zones = dns_driver.get_dns_zones('www.trove.com')
self.assertTrue(len(zones) == 1)
self.assertDomainsAreEqual(self.domains[1], zones[0])
def test_get_dnz_zones_not_found(self):
- client = mock()
- client.domains = mock()
- when(driver).create_designate_client().thenReturn(client)
- when(client.domains).list().thenReturn(self.domains)
+ client = MagicMock()
+ driver.create_designate_client = MagicMock(return_value=client)
+ client.domains.list = MagicMock(return_value=self.domains)
dns_driver = driver.DesignateDriver()
zones = dns_driver.get_dns_zones('www.notfound.com')
self.assertTrue(len(zones) == 0)
diff --git a/trove/tests/unittests/guestagent/test_api.py b/trove/tests/unittests/guestagent/test_api.py
index a2c7c993..dce0b89f 100644
--- a/trove/tests/unittests/guestagent/test_api.py
+++ b/trove/tests/unittests/guestagent/test_api.py
@@ -11,21 +11,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from mockito import when
-from mockito import any
-from mockito import verify
-from mockito import unstub
-from mockito import mock
-from mockito import verifyZeroInteractions
-from mockito import never
-import mockito.matchers
+import mock
import testtools
from testtools.matchers import KeysEqual, Is
-from trove.guestagent import models as agent_models
-import trove.db.models as db_models
+
+import trove.common.context as context
from trove.common import exception
from trove.guestagent import api
import trove.openstack.common.rpc as rpc
+import trove.common.rpc as trove_rpc
def _mock_call_pwd_change(cmd, users=None):
@@ -47,23 +41,12 @@ def _mock_call(cmd, timerout, username=None, hostname=None,
class ApiTest(testtools.TestCase):
def setUp(self):
super(ApiTest, self).setUp()
- self.guest = api.API('mock_content', 0)
+ self.context = context.TroveContext()
+ self.guest = api.API(self.context, 0)
self.guest._cast = _mock_call_pwd_change
self.guest._call = _mock_call
self.FAKE_ID = 'instance-id-x23d2d'
- self.api = api.API(mock(), self.FAKE_ID)
- when(rpc).call(any(), any(), any(), any(int)).thenRaise(
- ValueError('Unexpected Rpc Invocation'))
- when(rpc).cast(any(), any(), any()).thenRaise(
- ValueError('Unexpected Rpc Invocation'))
-
- def tearDown(self):
- super(ApiTest, self).tearDown()
- unstub()
-
- def test_delete_queue(self):
- self.skipTest("find out if this delete_queue function is needed "
- "anymore, Bug#1097482")
+ self.api = api.API(self.context, self.FAKE_ID)
def test_change_passwords(self):
self.assertIsNone(self.guest.change_passwords("dummy"))
@@ -85,285 +68,311 @@ class ApiTest(testtools.TestCase):
self.assertEqual('guestagent.' + self.FAKE_ID,
self.api._get_routing_key())
- def test_check_for_heartbeat_positive(self):
- when(db_models.DatabaseModelBase).find_by(
- instance_id=any()).thenReturn('agent')
- when(agent_models.AgentHeartBeat).is_active('agent').thenReturn(True)
+ @mock.patch('trove.guestagent.models.AgentHeartBeat')
+ def test_check_for_heartbeat_positive(self, mock_agent):
self.assertTrue(self.api._check_for_hearbeat())
- def test_check_for_heartbeat_exception(self):
+ @mock.patch('trove.guestagent.models.AgentHeartBeat')
+ def test_check_for_heartbeat_exception(self, mock_agent):
# TODO(juice): maybe it would be ok to extend the test to validate
# the is_active method on the heartbeat
- when(db_models.DatabaseModelBase).find_by(instance_id=any()).thenRaise(
- exception.ModelNotFoundError)
- when(agent_models.AgentHeartBeat).is_active(any()).thenReturn(None)
-
+ mock_agent.find_by.side_effect = exception.ModelNotFoundError("Uh Oh!")
+ # execute
self.assertRaises(exception.GuestTimeout, self.api._check_for_hearbeat)
+ # validate
+ self.assertEqual(mock_agent.is_active.call_count, 0)
- verify(agent_models.AgentHeartBeat, times=0).is_active(any())
-
- def test_check_for_heartbeat_negative(self):
+ @mock.patch('trove.guestagent.models.AgentHeartBeat')
+ def test_check_for_heartbeat_negative(self, mock_agent):
# TODO(juice): maybe it would be ok to extend the test to validate
# the is_active method on the heartbeat
- when(db_models.DatabaseModelBase).find_by(
- instance_id=any()).thenReturn('agent')
- when(agent_models.AgentHeartBeat).is_active(any()).thenReturn(False)
+ mock_agent.is_active.return_value = False
self.assertRaises(exception.GuestTimeout, self.api._check_for_hearbeat)
+ def test_delete_queue(self):
+ trove_rpc.delete_queue = mock.Mock()
+ # execute
+ self.api.delete_queue()
+ # verify
+ trove_rpc.delete_queue.assert_called_with(self.context, mock.ANY)
+
def test_create_user(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('create_user', 'users')
- self._mock_rpc_cast(exp_msg)
self.api.create_user('test_user')
- self._verify_rpc_cast(exp_msg)
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_rpc_cast_exception(self):
+ rpc.cast = mock.Mock(side_effect=IOError('host down'))
exp_msg = RpcMsgMatcher('create_user', 'users')
- when(rpc).cast(any(), any(), exp_msg).thenRaise(IOError('host down'))
-
+ # execute
with testtools.ExpectedException(exception.GuestError, '.* host down'):
self.api.create_user('test_user')
-
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_list_users(self):
+ exp_resp = ['user1', 'user2', 'user3']
+ rpc.call = mock.Mock(return_value=exp_resp)
exp_msg = RpcMsgMatcher('list_users', 'limit', 'marker',
'include_marker')
- exp_resp = ['user1', 'user2', 'user3']
- self._mock_rpc_call(exp_msg, exp_resp)
+ # execute
act_resp = self.api.list_users()
+ # verify
self.assertThat(act_resp, Is(exp_resp))
- self._verify_rpc_call(exp_msg)
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_rpc_call_exception(self):
+ rpc.call = mock.Mock(side_effect=IOError('host_down'))
exp_msg = RpcMsgMatcher('list_users', 'limit', 'marker',
'include_marker')
- when(rpc).call(any(), any(), exp_msg, any(int)).thenRaise(
- IOError('host down'))
-
+ # execute
with testtools.ExpectedException(exception.GuestError,
'An error occurred.*'):
self.api.list_users()
-
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_delete_user(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('delete_user', 'user')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.delete_user('test_user')
- self._mock_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_create_database(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('create_database', 'databases')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.create_database(['db1', 'db2', 'db3'])
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_list_databases(self):
+ exp_resp = ['db1', 'db2', 'db3']
+ rpc.call = mock.Mock(return_value=exp_resp)
exp_msg = RpcMsgMatcher('list_databases', 'limit', 'marker',
'include_marker')
- exp_resp = ['db1', 'db2', 'db3']
- self._mock_rpc_call(exp_msg, exp_resp)
+ # execute
resp = self.api.list_databases(limit=1, marker=2,
include_marker=False)
+ # verify
self.assertThat(resp, Is(exp_resp))
- self._verify_rpc_call(exp_msg)
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_delete_database(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('delete_database', 'database')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.delete_database('test_database_name')
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_enable_root(self):
+ rpc.call = mock.Mock(return_value=True)
exp_msg = RpcMsgMatcher('enable_root')
- self._mock_rpc_call(exp_msg, True)
+ # execute
self.assertThat(self.api.enable_root(), Is(True))
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_disable_root(self):
+ rpc.call = mock.Mock(return_value=True)
exp_msg = RpcMsgMatcher('disable_root')
- self._mock_rpc_call(exp_msg, True)
+ # execute
self.assertThat(self.api.disable_root(), Is(True))
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_is_root_enabled(self):
+ rpc.call = mock.Mock(return_value=False)
exp_msg = RpcMsgMatcher('is_root_enabled')
- self._mock_rpc_call(exp_msg, False)
+ # execute
self.assertThat(self.api.is_root_enabled(), Is(False))
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_get_hwinfo(self):
+ rpc.call = mock.Mock(return_value='[blah]')
exp_msg = RpcMsgMatcher('get_hwinfo')
- self._mock_rpc_call(exp_msg)
- self.api.get_hwinfo()
- self._verify_rpc_call(exp_msg)
+ # execute
+ self.assertThat(self.api.get_hwinfo(), Is('[blah]'))
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_get_diagnostics(self):
+ rpc.call = mock.Mock(spec=rpc, return_value='[all good]')
exp_msg = RpcMsgMatcher('get_diagnostics')
- self._mock_rpc_call(exp_msg)
- self.api.get_diagnostics()
- self._verify_rpc_call(exp_msg)
+ # execute
+ self.assertThat(self.api.get_diagnostics(), Is('[all good]'))
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_restart(self):
+ rpc.call = mock.Mock()
exp_msg = RpcMsgMatcher('restart')
- self._mock_rpc_call(exp_msg)
+ # execute
self.api.restart()
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_start_db_with_conf_changes(self):
+ rpc.call = mock.Mock()
exp_msg = RpcMsgMatcher('start_db_with_conf_changes',
'config_contents')
- self._mock_rpc_call(exp_msg)
+ # execute
self.api.start_db_with_conf_changes(None)
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_stop_db(self):
+ rpc.call = mock.Mock()
exp_msg = RpcMsgMatcher('stop_db', 'do_not_start_on_reboot')
- self._mock_rpc_call(exp_msg)
+ # execute
self.api.stop_db(do_not_start_on_reboot=False)
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_get_volume_info(self):
fake_resp = {'fake': 'resp'}
+ rpc.call = mock.Mock(return_value=fake_resp)
exp_msg = RpcMsgMatcher('get_filesystem_stats', 'fs_path')
- self._mock_rpc_call(exp_msg, fake_resp)
+ # execute
self.assertThat(self.api.get_volume_info(), Is(fake_resp))
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_update_guest(self):
+ rpc.call = mock.Mock()
exp_msg = RpcMsgMatcher('update_guest')
- self._mock_rpc_call(exp_msg)
+ # execute
self.api.update_guest()
- self._verify_rpc_call(exp_msg)
+ # verify
+ self._verify_rpc_call(exp_msg, rpc.call)
def test_create_backup(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('create_backup', 'backup_info')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.create_backup({'id': '123'})
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_update_overrides(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('update_overrides', 'overrides', 'remove')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.update_overrides('123')
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def test_apply_overrides(self):
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('apply_overrides', 'overrides')
- self._mock_rpc_cast(exp_msg)
+ # execute
self.api.apply_overrides('123')
- self._verify_rpc_cast(exp_msg)
+ # verify
+ self._verify_rpc_cast(exp_msg, rpc.cast)
def _verify_rpc_connection_and_cast(self, rpc, mock_conn, exp_msg):
- verify(rpc).create_connection(new=True)
- verify(mock_conn).create_consumer(self.api._get_routing_key(), None,
- fanout=False)
- verify(rpc).cast(any(), any(), exp_msg)
+ rpc.create_connection.assert_called_with(new=True)
+ mock_conn.create_consumer.assert_called_with(
+ self.api._get_routing_key(), None, fanout=False)
+ rpc.cast.assert_called_with(mock.ANY, mock.ANY, exp_msg)
def test_prepare(self):
- mock_conn = mock()
- when(rpc).create_connection(new=True).thenReturn(mock_conn)
- when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
+ mock_conn = mock.Mock()
+ rpc.create_connection = mock.Mock(return_value=mock_conn)
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages',
'databases', 'users', 'device_path',
'mount_point', 'backup_info',
'config_contents', 'root_password',
'overrides')
- when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
-
+ # execute
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
'/mnt/opt', 'bkup-1232', 'cont', '1-2-3-4',
'override')
-
+ # verify
self._verify_rpc_connection_and_cast(rpc, mock_conn, exp_msg)
def test_prepare_with_backup(self):
- mock_conn = mock()
- when(rpc).create_connection(new=True).thenReturn(mock_conn)
- when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
+ mock_conn = mock.Mock()
+ rpc.create_connection = mock.Mock(return_value=mock_conn)
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages',
'databases', 'users', 'device_path',
'mount_point', 'backup_info',
'config_contents', 'root_password',
'overrides')
- when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
bkup = {'id': 'backup_id_123'}
-
+ # execute
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
'/mnt/opt', bkup, 'cont', '1-2-3-4',
'overrides')
+ # verify
self._verify_rpc_connection_and_cast(rpc, mock_conn, exp_msg)
def test_upgrade(self):
- mock_conn = mock()
- when(rpc).create_connection(new=True).thenReturn(mock_conn)
- when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
+ mock_conn = mock.Mock()
+ rpc.create_connection = mock.Mock(return_value=mock_conn)
+ rpc.cast = mock.Mock()
exp_msg = RpcMsgMatcher('upgrade')
- when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
-
+ # execute
self.api.upgrade()
-
+ # verify
self._verify_rpc_connection_and_cast(rpc, mock_conn, exp_msg)
def test_rpc_cast_with_consumer_exception(self):
- mock_conn = mock()
- when(rpc).create_connection(new=True).thenRaise(IOError('host down'))
- exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages',
- 'databases', 'users', 'device_path',
- 'mount_point')
-
+ mock_conn = mock.Mock()
+ rpc.create_connection = mock.Mock(side_effect=IOError('host down'))
+ rpc.cast = mock.Mock()
+ # execute
with testtools.ExpectedException(exception.GuestError, '.* host down'):
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
'/mnt/opt')
+ # verify
+ rpc.create_connection.assert_called_with(new=True)
+ self.assertThat(mock_conn.call_count, Is(0))
+ self.assertThat(rpc.cast.call_count, Is(0))
- verify(rpc).create_connection(new=True)
- verifyZeroInteractions(mock_conn)
- verify(rpc, never).cast(any(), any(), exp_msg)
+ def _verify_rpc_call(self, exp_msg, mock_call=None):
+ mock_call.assert_called_with(self.context, mock.ANY, exp_msg,
+ mock.ANY)
- def _mock_rpc_call(self, exp_msg, resp=None):
- rpc.common = mock()
- when(rpc).call(any(), any(), exp_msg, any(int)).thenReturn(resp)
-
- def _verify_rpc_call(self, exp_msg):
- verify(rpc).call(any(), any(), exp_msg, any(int))
-
- def _mock_rpc_cast(self, exp_msg):
- when(rpc).cast(any(), any(), exp_msg).thenReturn(None)
-
- def _verify_rpc_cast(self, exp_msg):
- verify(rpc).cast(any(), any(), exp_msg)
+ def _verify_rpc_cast(self, exp_msg, mock_cast=None):
+ mock_cast.assert_called_with(mock.ANY,
+ mock.ANY, exp_msg)
class CastWithConsumerTest(testtools.TestCase):
def setUp(self):
super(CastWithConsumerTest, self).setUp()
- self.api = api.API(mock(), 'instance-id-x23d2d')
-
- def tearDown(self):
- super(CastWithConsumerTest, self).tearDown()
- unstub()
-
- def test__cast_with_consumer(self):
- mock_conn = mock()
- when(rpc).create_connection(new=True).thenReturn(mock_conn)
- when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None)
- when(rpc).cast(any(), any(), any()).thenReturn(None)
-
+ self.context = context.TroveContext()
+ self.api = api.API(self.context, 'instance-id-x23d2d')
+
+ def test_cast_with_consumer(self):
+ mock_conn = mock.Mock()
+ rpc.create_connection = mock.Mock(return_value=mock_conn)
+ rpc.cast = mock.Mock()
+ # execute
self.api._cast_with_consumer('fake_method_name', fake_param=1)
-
- verify(rpc).create_connection(new=True)
- verify(mock_conn).create_consumer(any(), None, fanout=False)
- verify(rpc).cast(any(), any(), any())
+ # verify
+ rpc.create_connection.assert_called_with(new=True)
+ mock_conn.create_consumer.assert_called_with(mock.ANY, None,
+ fanout=False)
+ rpc.cast.assert_called_with(self.context, mock.ANY, mock.ANY)
-class RpcMsgMatcher(mockito.matchers.Matcher):
+class RpcMsgMatcher(object):
def __init__(self, method, *args_dict):
self.wanted_method = method
self.wanted_dict = KeysEqual('version', 'method', 'args', 'namespace')
args_dict = args_dict or [{}]
self.args_dict = KeysEqual(*args_dict)
- def matches(self, arg):
+ def __eq__(self, arg):
if self.wanted_method != arg['method']:
raise Exception("Method does not match: %s != %s" %
(self.wanted_method, arg['method']))
diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py
index 62a6391a..037d726b 100644
--- a/trove/tests/unittests/guestagent/test_cassandra_manager.py
+++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py
@@ -15,15 +15,14 @@
import os
import testtools
-from mock import Mock
-from mockito import verify, when, unstub, any, mock
+from mock import MagicMock
from trove.common.context import TroveContext
from trove.common.instance import ServiceStatuses
from trove.guestagent import volume
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.cassandra import service as cass_service
from trove.guestagent.datastore.cassandra import manager as cass_manager
-from trove.guestagent import pkg
+from trove.guestagent import pkg as pkg
class GuestAgentCassandraDBManagerTest(testtools.TestCase):
@@ -38,7 +37,7 @@ class GuestAgentCassandraDBManagerTest(testtools.TestCase):
def save(self):
pass
- cass_service.CassandraAppStatus.set_status = Mock(
+ cass_service.CassandraAppStatus.set_status = MagicMock(
return_value=FakeInstanceServiceStatus())
self.context = TroveContext()
self.manager = cass_manager.Manager()
@@ -69,13 +68,12 @@ class GuestAgentCassandraDBManagerTest(testtools.TestCase):
operating_system.get_ip_address = self.original_get_ip
cass_service.CassandraApp.make_host_reachable = (
self.orig_make_host_reachable)
- unstub()
def test_update_status(self):
- mock_status = mock()
+ mock_status = MagicMock()
self.manager.appStatus = mock_status
self.manager.update_status(self.context)
- verify(mock_status).update()
+ mock_status.update.assert_any_call()
def test_prepare_pkg(self):
self._prepare_dynamic(['cassandra'])
@@ -91,7 +89,7 @@ class GuestAgentCassandraDBManagerTest(testtools.TestCase):
is_db_installed=True)
def _prepare_dynamic(self, packages,
- config_content=any(), device_path='/dev/vdb',
+ config_content='MockContent', device_path='/dev/vdb',
is_db_installed=True, backup_id=None,
is_root_enabled=False,
overrides=None):
@@ -103,25 +101,22 @@ class GuestAgentCassandraDBManagerTest(testtools.TestCase):
'checksum': 'fake-checksum',
}
- mock_status = mock()
+ mock_status = MagicMock()
+ mock_app = MagicMock()
self.manager.appStatus = mock_status
- when(mock_status).begin_install().thenReturn(None)
-
- mock_app = mock()
self.manager.app = mock_app
- when(mock_app).install_if_needed(packages).thenReturn(None)
- (when(pkg.Package).pkg_is_installed(any()).
- thenReturn(is_db_installed))
- when(mock_app).init_storage_structure(any()).thenReturn(None)
- when(mock_app).write_config(config_content).thenReturn(None)
- when(mock_app).make_host_reachable().thenReturn(None)
- when(mock_app).restart().thenReturn(None)
- when(os.path).exists(any()).thenReturn(True)
-
- when(volume.VolumeDevice).format().thenReturn(None)
- when(volume.VolumeDevice).migrate_data(any()).thenReturn(None)
- when(volume.VolumeDevice).mount().thenReturn(None)
+ mock_status.begin_install = MagicMock(return_value=None)
+ mock_app.install_if_needed = MagicMock(return_value=None)
+ pkg.Package.pkg_is_installed = MagicMock(return_value=is_db_installed)
+ mock_app.init_storage_structure = MagicMock(return_value=None)
+ mock_app.write_config = MagicMock(return_value=None)
+ mock_app.make_host_reachable = MagicMock(return_value=None)
+ mock_app.restart = MagicMock(return_value=None)
+ os.path.exists = MagicMock(return_value=True)
+ volume.VolumeDevice.format = MagicMock(return_value=None)
+ volume.VolumeDevice.migrate_data = MagicMock(return_value=None)
+ volume.VolumeDevice.mount = MagicMock(return_value=None)
# invocation
self.manager.prepare(context=self.context, packages=packages,
@@ -132,9 +127,10 @@ class GuestAgentCassandraDBManagerTest(testtools.TestCase):
mount_point="/var/lib/cassandra",
backup_info=backup_info,
overrides=None)
+
# verification/assertion
- verify(mock_status).begin_install()
- verify(mock_app).install_if_needed(packages)
- verify(mock_app).init_storage_structure(any())
- verify(mock_app).make_host_reachable()
- verify(mock_app).restart()
+ mock_status.begin_install.assert_any_call()
+ mock_app.install_if_needed.assert_any_call(packages)
+ mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra')
+ mock_app.make_host_reachable.assert_any_call()
+ mock_app.restart.assert_any_call()
diff --git a/trove/tests/unittests/guestagent/test_couchbase_manager.py b/trove/tests/unittests/guestagent/test_couchbase_manager.py
index 8416ad06..e63a77b2 100644
--- a/trove/tests/unittests/guestagent/test_couchbase_manager.py
+++ b/trove/tests/unittests/guestagent/test_couchbase_manager.py
@@ -13,8 +13,7 @@
# under the License.
import testtools
-from mock import Mock
-from mockito import verify, when, unstub, any, mock
+from mock import MagicMock
from trove.common.context import TroveContext
from trove.guestagent import volume
from trove.guestagent.common import operating_system
@@ -34,7 +33,11 @@ class GuestAgentCouchbaseManagerTest(testtools.TestCase):
self.origin_mount = volume.VolumeDevice.mount
self.origin_stop_db = couch_service.CouchbaseApp.stop_db
self.origin_start_db = couch_service.CouchbaseApp.start_db
- operating_system.get_ip_address = Mock()
+ self.origin_restart = couch_service.CouchbaseApp.restart
+ self.origin_install_if = couch_service.CouchbaseApp.install_if_needed
+ self.origin_complete_install = \
+ couch_service.CouchbaseApp.complete_install_or_restart
+ operating_system.get_ip_address = MagicMock()
def tearDown(self):
super(GuestAgentCouchbaseManagerTest, self).tearDown()
@@ -43,53 +46,60 @@ class GuestAgentCouchbaseManagerTest(testtools.TestCase):
volume.VolumeDevice.mount = self.origin_mount
couch_service.CouchbaseApp.stop_db = self.origin_stop_db
couch_service.CouchbaseApp.start_db = self.origin_start_db
- unstub()
+ couch_service.CouchbaseApp.restart = self.origin_restart
+ couch_service.CouchbaseApp.install_if_needed = self.origin_install_if
+ couch_service.CouchbaseApp.complete_install_or_restart = \
+ self.origin_complete_install
def test_update_status(self):
- mock_status = mock()
+ mock_status = MagicMock()
self.manager.appStatus = mock_status
self.manager.update_status(self.context)
- verify(mock_status).update()
+ mock_status.update.assert_any_call()
def test_prepare_device_path_true(self):
self._prepare_dynamic()
def _prepare_dynamic(self, device_path='/dev/vdb', is_db_installed=True,
backup_info=None):
- mock_status = mock()
+ mock_status = MagicMock()
self.manager.appStatus = mock_status
- when(mock_status).begin_install().thenReturn(None)
- when(volume.VolumeDevice).format().thenReturn(None)
- when(volume.VolumeDevice).mount().thenReturn(None)
- when(couch_service.CouchbaseApp).install_if_needed().thenReturn(None)
- when(couch_service.CouchbaseApp).complete_install_or_restart(
- any()).thenReturn(None)
+
+ mock_status.begin_install = MagicMock(return_value=None)
+ volume.VolumeDevice.format = MagicMock(return_value=None)
+ volume.VolumeDevice.mount = MagicMock(return_value=None)
+ couch_service.CouchbaseApp.install_if_needed = MagicMock(
+ return_value=None)
+ couch_service.CouchbaseApp.complete_install_or_restart = MagicMock(
+ return_value=None)
+
#invocation
self.manager.prepare(self.context, self.packages, None, 2048,
None, device_path=device_path,
mount_point='/var/lib/couchbase',
backup_info=backup_info)
#verification/assertion
- verify(mock_status).begin_install()
- verify(couch_service.CouchbaseApp).install_if_needed(self.packages)
- verify(couch_service.CouchbaseApp).complete_install_or_restart()
+ mock_status.begin_install.assert_any_call()
+ couch_service.CouchbaseApp.install_if_needed.assert_any_call(
+ self.packages)
+ couch_service.CouchbaseApp.complete_install_or_restart.\
+ assert_any_call()
def test_restart(self):
- mock_status = mock()
+ mock_status = MagicMock()
self.manager.appStatus = mock_status
- when(couch_service.CouchbaseApp).restart().thenReturn(None)
+ couch_service.CouchbaseApp.restart = MagicMock(return_value=None)
#invocation
self.manager.restart(self.context)
#verification/assertion
- verify(couch_service.CouchbaseApp).restart()
+ couch_service.CouchbaseApp.restart.assert_any_call()
def test_stop_db(self):
- mock_status = mock()
+ mock_status = MagicMock()
self.manager.appStatus = mock_status
- when(couch_service.CouchbaseApp).stop_db(
- do_not_start_on_reboot=False).thenReturn(None)
+ couch_service.CouchbaseApp.stop_db = MagicMock(return_value=None)
#invocation
self.manager.stop_db(self.context)
#verification/assertion
- verify(couch_service.CouchbaseApp).stop_db(
+ couch_service.CouchbaseApp.stop_db.assert_any_call(
do_not_start_on_reboot=False)
diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py
index cdc77e08..8451ced1 100644
--- a/trove/tests/unittests/guestagent/test_dbaas.py
+++ b/trove/tests/unittests/guestagent/test_dbaas.py
@@ -17,14 +17,8 @@ from uuid import uuid4
import time
from mock import Mock
from mock import MagicMock
-from mockito import mock
-from mockito import when
-from mockito import any
-from mockito import unstub
-from mockito import verify
-from mockito import contains
-from mockito import never
-from mockito import matchers
+from mock import patch
+from mock import ANY
import sqlalchemy
import testtools
from testtools.matchers import Is
@@ -115,8 +109,8 @@ class DbaasTest(testtools.TestCase):
self.assertRaises(RuntimeError, dbaas.get_auth_password)
def test_service_discovery(self):
- when(os.path).isfile(any()).thenReturn(True)
- mysql_service = dbaas.operating_system.service_discovery(["mysql"])
+ with patch.object(os.path, 'isfile', return_value=True):
+ mysql_service = dbaas.operating_system.service_discovery(["mysql"])
self.assertIsNotNone(mysql_service['cmd_start'])
self.assertIsNotNone(mysql_service['cmd_enable'])
@@ -125,10 +119,10 @@ class DbaasTest(testtools.TestCase):
output = "mysqld would've been started with the these args:\n"\
"--user=mysql --port=3306 --basedir=/usr "\
"--tmpdir=/tmp --skip-external-locking"
- when(os.path).isfile(any()).thenReturn(True)
- dbaas.utils.execute = Mock(return_value=(output, None))
- options = dbaas.load_mysqld_options()
+ with patch.object(os.path, 'isfile', return_value=True):
+ dbaas.utils.execute = Mock(return_value=(output, None))
+ options = dbaas.load_mysqld_options()
self.assertEqual(5, len(options))
self.assertEqual(options["user"], "mysql")
@@ -164,18 +158,16 @@ class MySqlAdminMockTest(testtools.TestCase):
def tearDown(self):
super(MySqlAdminMockTest, self).tearDown()
- unstub()
def test_list_databases(self):
- mock_conn = mock_admin_sql_connection()
-
- when(mock_conn).execute(
- TextClauseMatcher('schema_name as name')).thenReturn(
- ResultSetStub([('db1', 'utf8', 'utf8_bin'),
- ('db2', 'utf8', 'utf8_bin'),
- ('db3', 'utf8', 'utf8_bin')]))
+ mock_conn = mock_sql_connection()
- databases, next_marker = MySqlAdmin().list_databases(limit=10)
+ with patch.object(mock_conn, 'execute',
+ return_value=ResultSetStub(
+ [('db1', 'utf8', 'utf8_bin'),
+ ('db2', 'utf8', 'utf8_bin'),
+ ('db3', 'utf8', 'utf8_bin')])):
+ databases, next_marker = MySqlAdmin().list_databases(limit=10)
self.assertThat(next_marker, Is(None))
self.assertThat(len(databases), Is(3))
@@ -681,11 +673,13 @@ class MySqlAppInstallTest(MySqlAppTest):
super(MySqlAppInstallTest, self).setUp()
self.orig_create_engine = sqlalchemy.create_engine
self.orig_pkg_version = dbaas.packager.pkg_version
+ self.orig_utils_execute_with_timeout = utils.execute_with_timeout
def tearDown(self):
super(MySqlAppInstallTest, self).tearDown()
sqlalchemy.create_engine = self.orig_create_engine
dbaas.packager.pkg_version = self.orig_pkg_version
+ utils.execute_with_timeout = self.orig_utils_execute_with_timeout
def test_install(self):
@@ -752,113 +746,103 @@ class MySqlAppInstallTest(MySqlAppTest):
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
-class TextClauseMatcher(matchers.Matcher):
+class TextClauseMatcher(object):
def __init__(self, text):
- self.contains = contains(text)
+ self.text = text
def __repr__(self):
- return "TextClause(%s)" % self.contains.sub
+ return "TextClause(%s)" % self.text
- def matches(self, arg):
+ def __eq__(self, arg):
print("Matching %s" % arg.text)
- return self.contains.matches(arg.text)
+ return self.text in arg.text
def mock_sql_connection():
- mock_engine = mock()
- when(sqlalchemy).create_engine("mysql://root:@localhost:3306",
- echo=True).thenReturn(mock_engine)
- mock_conn = mock()
- when(dbaas.LocalSqlClient).__enter__().thenReturn(mock_conn)
- when(dbaas.LocalSqlClient).__exit__(any(), any(), any()).thenReturn(None)
- return mock_conn
-
-
-def mock_admin_sql_connection():
- when(utils).execute_with_timeout("sudo", "awk", any(), any()).thenReturn(
- ['fake_password', None])
- mock_engine = mock()
- when(sqlalchemy).create_engine("mysql://root:@localhost:3306",
- pool_recycle=any(), echo=True,
- listeners=[any()]).thenReturn(mock_engine)
- mock_conn = mock()
- when(dbaas.LocalSqlClient).__enter__().thenReturn(mock_conn)
- when(dbaas.LocalSqlClient).__exit__(any(), any(), any()).thenReturn(None)
+ utils.execute_with_timeout = MagicMock(return_value=['fake_password',
+ None])
+ mock_engine = MagicMock()
+ sqlalchemy.create_engine = MagicMock(return_value=mock_engine)
+ mock_conn = MagicMock()
+ dbaas.LocalSqlClient.__enter__ = MagicMock(return_value=mock_conn)
+ dbaas.LocalSqlClient.__exit__ = MagicMock(return_value=None)
return mock_conn
class MySqlAppMockTest(testtools.TestCase):
+ def setUp(self):
+ super(MySqlAppMockTest, self).setUp()
+ self.orig_utils_execute_with_timeout = utils.execute_with_timeout
+
def tearDown(self):
super(MySqlAppMockTest, self).tearDown()
- unstub()
+ utils.execute_with_timeout = self.orig_utils_execute_with_timeout
def test_secure_keep_root(self):
mock_conn = mock_sql_connection()
- when(mock_conn).execute(any()).thenReturn(None)
- when(utils).execute_with_timeout("sudo", any(str), "stop").thenReturn(
- None)
- # skip writing the file for now
- when(os.path).isfile(any()).thenReturn(False)
- when(utils).execute_with_timeout(
- "sudo", "chmod", any(), any()).thenReturn(None)
- mock_status = mock()
- when(mock_status).wait_for_real_status_to_change_to(
- any(), any(), any()).thenReturn(True)
- when(dbaas).clear_expired_password().thenReturn(None)
- app = MySqlApp(mock_status)
- when(app)._write_mycnf(any(), any()).thenReturn(True)
- when(app).start_mysql().thenReturn(None)
- when(app).stop_db().thenReturn(None)
- app.secure('foo', None)
- verify(mock_conn, never).execute(TextClauseMatcher('root'))
+ with patch.object(mock_conn, 'execute', return_value=None):
+ utils.execute_with_timeout = MagicMock(return_value=None)
+ # skip writing the file for now
+ with patch.object(os.path, 'isfile', return_value=False):
+ mock_status = MagicMock()
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=True)
+ dbaas.clear_expired_password = MagicMock(return_value=None)
+ app = MySqlApp(mock_status)
+ app._write_mycnf = MagicMock(return_value=True)
+ app.start_mysql = MagicMock(return_value=None)
+ app.stop_db = MagicMock(return_value=None)
+ app.secure('foo', None)
+ self.assertTrue(mock_conn.execute.called)
class MySqlRootStatusTest(testtools.TestCase):
+ def setUp(self):
+ super(MySqlRootStatusTest, self).setUp()
+ self.orig_utils_execute_with_timeout = utils.execute_with_timeout
+
def tearDown(self):
super(MySqlRootStatusTest, self).tearDown()
- unstub()
+ utils.execute_with_timeout = self.orig_utils_execute_with_timeout
def test_root_is_enabled(self):
- mock_conn = mock_admin_sql_connection()
+ mock_conn = mock_sql_connection()
- mock_rs = mock()
+ mock_rs = MagicMock()
mock_rs.rowcount = 1
- when(mock_conn).execute(
- TextClauseMatcher(
- "User = 'root' AND Host != 'localhost'")).thenReturn(mock_rs)
-
- self.assertThat(MySqlRootAccess().is_root_enabled(), Is(True))
+ with patch.object(mock_conn, 'execute', return_value=mock_rs):
+ self.assertThat(MySqlRootAccess().is_root_enabled(), Is(True))
def test_root_is_not_enabled(self):
- mock_conn = mock_admin_sql_connection()
+ mock_conn = mock_sql_connection()
- mock_rs = mock()
+ mock_rs = MagicMock()
mock_rs.rowcount = 0
- when(mock_conn).execute(
- TextClauseMatcher(
- "User = 'root' AND Host != 'localhost'")).thenReturn(mock_rs)
-
- self.assertThat(MySqlRootAccess.is_root_enabled(), Equals(False))
+ with patch.object(mock_conn, 'execute', return_value=mock_rs):
+ self.assertThat(MySqlRootAccess.is_root_enabled(), Equals(False))
def test_enable_root(self):
- mock_conn = mock_admin_sql_connection()
- when(mock_conn).execute(any()).thenReturn(None)
- # invocation
- user_ser = MySqlRootAccess.enable_root()
- # verification
- self.assertThat(user_ser, Not(Is(None)))
- verify(mock_conn).execute(TextClauseMatcher('CREATE USER'),
- user='root', host='%')
- verify(mock_conn).execute(TextClauseMatcher(
- 'GRANT ALL PRIVILEGES ON *.*'))
- verify(mock_conn).execute(TextClauseMatcher('UPDATE mysql.user'))
+ mock_conn = mock_sql_connection()
+
+ with patch.object(mock_conn, 'execute', return_value=None):
+ # invocation
+ user_ser = MySqlRootAccess.enable_root()
+ # verification
+ self.assertThat(user_ser, Not(Is(None)))
+ mock_conn.execute.assert_any_call(TextClauseMatcher('CREATE USER'),
+ user='root', host='%')
+ mock_conn.execute.assert_any_call(TextClauseMatcher(
+ 'GRANT ALL PRIVILEGES ON *.*'))
+ mock_conn.execute.assert_any_call(TextClauseMatcher(
+ 'UPDATE mysql.user'))
def test_enable_root_failed(self):
- when(models.MySQLUser)._is_valid_user_name(any()).thenReturn(False)
- self.assertRaises(ValueError, MySqlAdmin().enable_root)
+ with patch.object(models.MySQLUser, '_is_valid_user_name',
+ return_value=False):
+ self.assertRaises(ValueError, MySqlAdmin().enable_root)
class MockStats:
@@ -871,7 +855,6 @@ class InterrogatorTest(testtools.TestCase):
def tearDown(self):
super(InterrogatorTest, self).tearDown()
- unstub()
def test_to_gb(self):
result = to_gb(123456789)
@@ -882,8 +865,8 @@ class InterrogatorTest(testtools.TestCase):
self.assertEqual(result, 0.0)
def test_get_filesystem_volume_stats(self):
- when(os).statvfs(any()).thenReturn(MockStats)
- result = get_filesystem_volume_stats('/some/path/')
+ with patch.object(os, 'statvfs', return_value=MockStats):
+ result = get_filesystem_volume_stats('/some/path/')
self.assertEqual(result['block_size'], 4096)
self.assertEqual(result['total_blocks'], 1048576)
@@ -893,10 +876,10 @@ class InterrogatorTest(testtools.TestCase):
self.assertEqual(result['used'], 2.0)
def test_get_filesystem_volume_stats_error(self):
- when(os).statvfs(any()).thenRaise(OSError)
- self.assertRaises(
- RuntimeError,
- get_filesystem_volume_stats, '/nonexistent/path')
+ with patch.object(os, 'statvfs', side_effect=OSError):
+ self.assertRaises(
+ RuntimeError,
+ get_filesystem_volume_stats, '/nonexistent/path')
class ServiceRegistryTest(testtools.TestCase):
@@ -1226,175 +1209,161 @@ class TestRedisApp(testtools.TestCase):
utils.execute_with_timeout = self.orig_utils_execute_with_timeout
rservice.utils.execute_with_timeout = \
self.orig_utils_execute_with_timeout
- unstub()
def test_install_if_needed_installed(self):
- when(pkg.Package).pkg_is_installed(any()).thenReturn(True)
- when(RedisApp)._install_redis('bar').thenReturn(None)
- self.app.install_if_needed('bar')
- verify(pkg.Package).pkg_is_installed('bar')
- verify(RedisApp, times=0)._install_redis('bar')
+ with patch.object(pkg.Package, 'pkg_is_installed', return_value=True):
+ with patch.object(RedisApp, '_install_redis', return_value=None):
+ self.app.install_if_needed('bar')
+ pkg.Package.pkg_is_installed.assert_any_call('bar')
+ self.assertEqual(RedisApp._install_redis.call_count, 0)
def test_install_if_needed_not_installed(self):
- when(pkg.Package).pkg_is_installed(any()).thenReturn(False)
- when(RedisApp)._install_redis('asdf').thenReturn(None)
- self.app.install_if_needed('asdf')
- verify(pkg.Package).pkg_is_installed('asdf')
- verify(RedisApp)._install_redis('asdf')
+ with patch.object(pkg.Package, 'pkg_is_installed', return_value=False):
+ with patch.object(RedisApp, '_install_redis', return_value=None):
+ self.app.install_if_needed('asdf')
+ pkg.Package.pkg_is_installed.assert_any_call('asdf')
+ RedisApp._install_redis.assert_any_call('asdf')
def test_install_redis(self):
- when(utils).execute_with_timeout(any())
- when(pkg.Package).pkg_install('redis', {}, 1200).thenReturn(None)
- when(RedisApp).start_redis().thenReturn(None)
- self.app._install_redis('redis')
- verify(utils).execute_with_timeout(any())
- verify(pkg.Package).pkg_install('redis', {}, 1200)
- verify(RedisApp).start_redis()
+ with patch.object(utils, 'execute_with_timeout'):
+ with patch.object(pkg.Package, 'pkg_install', return_value=None):
+ with patch.object(RedisApp, 'start_redis', return_value=None):
+ self.app._install_redis('redis')
+ pkg.Package.pkg_install.assert_any_call('redis', {}, 1200)
+ RedisApp.start_redis.assert_any_call()
+ self.assertTrue(utils.execute_with_timeout.called)
def test_enable_redis_on_boot_without_upstart(self):
- when(os.path).isfile(RedisSystem.REDIS_INIT).thenReturn(False)
- when(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_ENABLE,
- shell=True).thenReturn(None)
- self.app._enable_redis_on_boot()
- verify(os.path).isfile(RedisSystem.REDIS_INIT)
- verify(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_ENABLE,
- shell=True)
+ with patch.object(os.path, 'isfile', return_value=False):
+ with patch.object(utils, 'execute_with_timeout',
+ return_value=None):
+ self.app._enable_redis_on_boot()
+ os.path.isfile.assert_any_call(RedisSystem.REDIS_INIT)
+ utils.execute_with_timeout.assert_any_call(
+ 'sudo ' + RedisSystem.REDIS_CMD_ENABLE,
+ shell=True)
def test_enable_redis_on_boot_with_upstart(self):
- when(os.path).isfile(RedisSystem.REDIS_INIT).thenReturn(True)
- when(utils).execute_with_timeout("sudo sed -i '/^manual$/d' " +
- RedisSystem.REDIS_INIT,
- shell=True).thenReturn(None)
- self.app._enable_redis_on_boot()
- verify(os.path).isfile(RedisSystem.REDIS_INIT)
- verify(utils).execute_with_timeout("sudo sed -i '/^manual$/d' " +
- RedisSystem.REDIS_INIT,
- shell=True)
+ with patch.object(os.path, 'isfile', return_value=True):
+ with patch.object(utils, 'execute_with_timeout',
+ return_value=None):
+ self.app._enable_redis_on_boot()
+ os.path.isfile.assert_any_call(RedisSystem.REDIS_INIT)
+ utils.execute_with_timeout.assert_any_call(
+ "sudo sed -i '/^manual$/d' " + RedisSystem.REDIS_INIT,
+ shell=True)
def test_disable_redis_on_boot_with_upstart(self):
- when(os.path).isfile(RedisSystem.REDIS_INIT).thenReturn(True)
- when(utils).execute_with_timeout('echo',
- "'manual'",
- '>>',
- RedisSystem.REDIS_INIT,
- run_as_root=True,
- root_helper='sudo').thenReturn(None)
- self.app._disable_redis_on_boot()
- verify(os.path).isfile(RedisSystem.REDIS_INIT)
- verify(utils).execute_with_timeout('echo',
- "'manual'",
- '>>',
- RedisSystem.REDIS_INIT,
- run_as_root=True,
- root_helper='sudo')
+ with patch.object(os.path, 'isfile', return_value=True):
+ with patch.object(utils, 'execute_with_timeout',
+ return_value=None):
+ self.app._disable_redis_on_boot()
+ os.path.isfile.assert_any_call(RedisSystem.REDIS_INIT)
+ utils.execute_with_timeout.assert_any_call(
+ 'echo',
+ "'manual'",
+ '>>',
+ RedisSystem.REDIS_INIT,
+ run_as_root=True,
+ root_helper='sudo')
def test_disable_redis_on_boot_without_upstart(self):
- when(os.path).isfile(RedisSystem.REDIS_INIT).thenReturn(False)
- when(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_DISABLE,
- shell=True).thenReturn(None)
- self.app._disable_redis_on_boot()
- verify(os.path).isfile(RedisSystem.REDIS_INIT)
- verify(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_DISABLE,
- shell=True)
+ with patch.object(os.path, 'isfile', return_value=False):
+ with patch.object(utils, 'execute_with_timeout',
+ return_value=None):
+ self.app._disable_redis_on_boot()
+ os.path.isfile.assert_any_call(RedisSystem.REDIS_INIT)
+ utils.execute_with_timeout.assert_any_call(
+ 'sudo ' + RedisSystem.REDIS_CMD_DISABLE,
+ shell=True)
def test_stop_db_without_fail(self):
- mock_status = mock()
- when(mock_status).wait_for_real_status_to_change_to(
- any(), any(), any()).thenReturn(True)
+ mock_status = MagicMock()
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=True)
app = RedisApp(mock_status, state_change_wait_time=0)
- when(RedisApp)._disable_redis_on_boot().thenReturn(None)
- when(utils).execute_with_timeout('sudo ' + RedisSystem.REDIS_CMD_STOP,
- shell=True).thenReturn(None)
- when(mock_status).wait_for_real_status_to_change_to(
- any(),
- any(),
- any()).thenReturn(True)
- app.stop_db(do_not_start_on_reboot=True)
- verify(RedisApp)._disable_redis_on_boot()
- verify(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_STOP,
- shell=True)
- verify(mock_status).wait_for_real_status_to_change_to(
- any(),
- any(),
- any())
+ RedisApp._disable_redis_on_boot = MagicMock(
+ return_value=None)
+
+ with patch.object(utils, 'execute_with_timeout', return_value=None):
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=True)
+ app.stop_db(do_not_start_on_reboot=True)
+
+ utils.execute_with_timeout.assert_any_call(
+ 'sudo ' + RedisSystem.REDIS_CMD_STOP,
+ shell=True)
+ self.assertTrue(RedisApp._disable_redis_on_boot.called)
+ self.assertTrue(
+ mock_status.wait_for_real_status_to_change_to.called)
def test_stop_db_with_failure(self):
- mock_status = mock()
- when(mock_status).wait_for_real_status_to_change_to(
- any(), any(), any()).thenReturn(True)
+ mock_status = MagicMock()
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=True)
app = RedisApp(mock_status, state_change_wait_time=0)
- when(RedisApp)._disable_redis_on_boot().thenReturn(None)
- when(utils).execute_with_timeout('sudo ' + RedisSystem.REDIS_CMD_STOP,
- shell=True).thenReturn(None)
- when(mock_status).wait_for_real_status_to_change_to(
- any(),
- any(),
- any()).thenReturn(False)
- app.stop_db(do_not_start_on_reboot=True)
- verify(RedisApp)._disable_redis_on_boot()
- verify(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_STOP,
- shell=True)
- verify(mock_status).wait_for_real_status_to_change_to(
- any(),
- any(),
- any())
- verify(mock_status).end_install_or_restart()
+ RedisApp._disable_redis_on_boot = MagicMock(
+ return_value=None)
+
+ with patch.object(utils, 'execute_with_timeout', return_value=None):
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=False)
+ app.stop_db(do_not_start_on_reboot=True)
+
+ utils.execute_with_timeout.assert_any_call(
+ 'sudo ' + RedisSystem.REDIS_CMD_STOP,
+ shell=True)
+ self.assertTrue(RedisApp._disable_redis_on_boot.called)
+ self.assertTrue(mock_status.end_install_or_restart.called)
+ self.assertTrue(
+ mock_status.wait_for_real_status_to_change_to.called)
def test_restart(self):
- mock_status = mock()
+ mock_status = MagicMock()
app = RedisApp(mock_status, state_change_wait_time=0)
- when(mock_status).begin_restart().thenReturn(None)
- when(RedisApp).stop_db().thenReturn(None)
- when(RedisApp).start_redis().thenReturn(None)
- when(mock_status).end_install_or_restart().thenReturn(None)
- app.restart()
- verify(mock_status).begin_restart()
- verify(RedisApp).stop_db()
- verify(RedisApp).start_redis()
- verify(mock_status).end_install_or_restart()
+ mock_status.begin_restart = MagicMock(return_value=None)
+ with patch.object(RedisApp, 'stop_db', return_value=None):
+ with patch.object(RedisApp, 'start_redis', return_value=None):
+ mock_status.end_install_or_restart = MagicMock(
+ return_value=None)
+ app.restart()
+ mock_status.begin_restart.assert_any_call()
+ RedisApp.stop_db.assert_any_call()
+ RedisApp.start_redis.assert_any_call()
+ mock_status.end_install_or_restart.assert_any_call()
def test_start_redis(self):
- mock_status = mock()
+ mock_status = MagicMock()
app = RedisApp(mock_status, state_change_wait_time=0)
- when(RedisApp)._enable_redis_on_boot().thenReturn(None)
- when(utils).execute_with_timeout('sudo ' + RedisSystem.REDIS_CMD_START,
- shell=True).thenReturn(None)
- when(mock_status).wait_for_real_status_to_change_to(any(),
- any(),
- any()).thenReturn(
- None)
- when(utils).execute_with_timeout('pkill', '-9',
- 'redis-server',
- run_as_root=True,
- root_helper='sudo').thenReturn(None)
- when(mock_status).end_install_or_restart().thenReturn(None)
- app.start_redis()
- verify(RedisApp)._enable_redis_on_boot()
- verify(utils).execute_with_timeout('sudo ' +
- RedisSystem.REDIS_CMD_START,
- shell=True)
- verify(mock_status).wait_for_real_status_to_change_to(any(),
- any(),
- any())
- verify(utils).execute_with_timeout('pkill', '-9',
- 'redis-server',
- run_as_root=True,
- root_helper='sudo')
- verify(mock_status).end_install_or_restart()
+ with patch.object(RedisApp, '_enable_redis_on_boot',
+ return_value=None):
+ with patch.object(utils, 'execute_with_timeout',
+ return_value=None):
+ mock_status.wait_for_real_status_to_change_to = MagicMock(
+ return_value=None)
+ mock_status.end_install_or_restart = MagicMock(
+ return_value=None)
+ app.start_redis()
+
+ utils.execute_with_timeout.assert_any_call(
+ 'sudo ' + RedisSystem.REDIS_CMD_START,
+ shell=True)
+ utils.execute_with_timeout.assert_any_call('pkill', '-9',
+ 'redis-server',
+ run_as_root=True,
+ root_helper='sudo')
+ self.assertTrue(RedisApp._enable_redis_on_boot.called)
+ self.assertTrue(mock_status.end_install_or_restart.called)
+ self.assertTrue(
+ mock_status.wait_for_real_status_to_change_to.callled)
class CassandraDBAppTest(testtools.TestCase):
def setUp(self):
super(CassandraDBAppTest, self).setUp()
- self.utils_execute_with_timeout = (cass_service .
- utils.execute_with_timeout)
+ self.utils_execute_with_timeout = (
+ cass_service.utils.execute_with_timeout)
self.sleep = time.sleep
self.pkg_version = cass_service.packager.pkg_version
self.pkg = cass_service.packager
@@ -1765,16 +1734,16 @@ class MongoDBAppTest(testtools.TestCase):
Mock())
def test_install_when_db_installed(self):
- packager_mock = mock()
- when(packager_mock).pkg_is_installed(any()).thenReturn(True)
+ packager_mock = MagicMock()
+ packager_mock.pkg_is_installed = MagicMock(return_value=True)
mongo_system.PACKAGER = packager_mock
self.mongoDbApp.install_if_needed(['package'])
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_install_when_db_not_installed(self):
- packager_mock = mock()
- when(packager_mock).pkg_is_installed(any()).thenReturn(False)
+ packager_mock = MagicMock()
+ packager_mock.pkg_is_installed = MagicMock(return_value=False)
mongo_system.PACKAGER = packager_mock
self.mongoDbApp.install_if_needed(['package'])
- verify(packager_mock).pkg_install(any(), {}, any())
+ packager_mock.pkg_install.assert_any_call(ANY, {}, ANY)
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
diff --git a/trove/tests/unittests/guestagent/test_mongodb_manager.py b/trove/tests/unittests/guestagent/test_mongodb_manager.py
index 2e4dd69d..ecc32d02 100644
--- a/trove/tests/unittests/guestagent/test_mongodb_manager.py
+++ b/trove/tests/unittests/guestagent/test_mongodb_manager.py
@@ -15,7 +15,7 @@
import os
import testtools
-from mockito import verify, when, unstub, any, mock
+from mock import MagicMock
from trove.common.context import TroveContext
from trove.guestagent import volume
from trove.guestagent.datastore.mongodb import service as mongo_service
@@ -46,12 +46,11 @@ class GuestAgentMongoDBManagerTest(testtools.TestCase):
volume.VolumeDevice.mount = self.origin_mount
mongo_service.MongoDBApp.stop_db = self.origin_stop_db
mongo_service.MongoDBApp.start_db = self.origin_start_db
- unstub()
def test_update_status(self):
- self.manager.status = mock()
+ self.manager.status = MagicMock()
self.manager.update_status(self.context)
- verify(self.manager.status).update()
+ self.manager.status.update.assert_any_call()
def test_prepare_from_backup(self):
self._prepare_dynamic(backup_id='backup_id_123abc')
@@ -65,20 +64,20 @@ class GuestAgentMongoDBManagerTest(testtools.TestCase):
'type': 'MongoDBDump',
'checksum': 'fake-checksum'} if backup_id else None
- mock_status = mock()
+ mock_status = MagicMock()
+ mock_app = MagicMock()
self.manager.status = mock_status
- when(mock_status).begin_install().thenReturn(None)
+ self.manager.app = mock_app
- when(VolumeDevice).format().thenReturn(None)
- when(VolumeDevice).migrate_data(any()).thenReturn(None)
- when(VolumeDevice).mount().thenReturn(None)
+ mock_status.begin_install = MagicMock(return_value=None)
+ volume.VolumeDevice.format = MagicMock(return_value=None)
+ volume.VolumeDevice.migrate_data = MagicMock(return_value=None)
+ volume.VolumeDevice.mount = MagicMock(return_value=None)
- mock_app = mock()
- self.manager.app = mock_app
- when(mock_app).stop_db().thenReturn(None)
- when(mock_app).start_db().thenReturn(None)
- when(mock_app).clear_storage().thenReturn(None)
- when(os.path).exists(any()).thenReturn(is_db_installed)
+ mock_app.stop_db = MagicMock(return_value=None)
+ mock_app.start_db = MagicMock(return_value=None)
+ mock_app.clear_storage = MagicMock(return_value=None)
+ os.path.exists = MagicMock(return_value=is_db_installed)
# invocation
self.manager.prepare(context=self.context, databases=None,
@@ -87,9 +86,10 @@ class GuestAgentMongoDBManagerTest(testtools.TestCase):
device_path=device_path,
mount_point='/var/lib/mongodb',
backup_info=backup_info)
+
# verification/assertion
- verify(mock_status).begin_install()
- verify(VolumeDevice).format()
- verify(mock_app).stop_db()
- verify(VolumeDevice).migrate_data(any())
- verify(mock_app).install_if_needed(any())
+ mock_status.begin_install.assert_any_call()
+ mock_app.install_if_needed.assert_any_call(['package'])
+ mock_app.stop_db.assert_any_call()
+ VolumeDevice.format.assert_any_call()
+ VolumeDevice.migrate_data.assert_any_call('/var/lib/mongodb')
diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py
index ee0157fc..30af10e2 100644
--- a/trove/tests/unittests/guestagent/test_mysql_manager.py
+++ b/trove/tests/unittests/guestagent/test_mysql_manager.py
@@ -15,7 +15,7 @@
import os
import testtools
-from mockito import verify, when, unstub, any, mock, never
+from mock import MagicMock
from testtools.matchers import Is, Equals, Not
from trove.common.context import TroveContext
from trove.guestagent import volume
@@ -23,7 +23,7 @@ from trove.guestagent.datastore.mysql.manager import Manager
import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent import backup
from trove.guestagent.volume import VolumeDevice
-from trove.guestagent import pkg
+from trove.guestagent import pkg as pkg
class GuestAgentManagerTest(testtools.TestCase):
@@ -52,94 +52,89 @@ class GuestAgentManagerTest(testtools.TestCase):
dbaas.MySqlApp.start_mysql = self.origin_start_mysql
pkg.Package.pkg_is_installed = self.origin_pkg_is_installed
os.path.exists = self.origin_os_path_exists
- unstub()
def test_update_status(self):
- mock_status = mock()
- when(dbaas.MySqlAppStatus).get().thenReturn(mock_status)
+ mock_status = MagicMock()
+ dbaas.MySqlAppStatus.get = MagicMock(return_value=mock_status)
self.manager.update_status(self.context)
- verify(dbaas.MySqlAppStatus).get()
- verify(mock_status).update()
+ dbaas.MySqlAppStatus.get.assert_any_call()
+ mock_status.update.assert_any_call()
def test_create_database(self):
- when(dbaas.MySqlAdmin).create_database(['db1']).thenReturn(None)
+ dbaas.MySqlAdmin.create_database = MagicMock(return_value=None)
self.manager.create_database(self.context, ['db1'])
- verify(dbaas.MySqlAdmin).create_database(['db1'])
+ dbaas.MySqlAdmin.create_database.assert_any_call(['db1'])
def test_create_user(self):
- when(dbaas.MySqlAdmin).create_user(['user1']).thenReturn(None)
+ dbaas.MySqlAdmin.create_user = MagicMock(return_value=None)
self.manager.create_user(self.context, ['user1'])
- verify(dbaas.MySqlAdmin).create_user(['user1'])
+ dbaas.MySqlAdmin.create_user.assert_any_call(['user1'])
def test_delete_database(self):
databases = ['db1']
- when(dbaas.MySqlAdmin).delete_database(databases).thenReturn(None)
+ dbaas.MySqlAdmin.delete_database = MagicMock(return_value=None)
self.manager.delete_database(self.context, databases)
- verify(dbaas.MySqlAdmin).delete_database(databases)
+ dbaas.MySqlAdmin.delete_database.assert_any_call(databases)
def test_delete_user(self):
user = ['user1']
- when(dbaas.MySqlAdmin).delete_user(user).thenReturn(None)
+ dbaas.MySqlAdmin.delete_user = MagicMock(return_value=None)
self.manager.delete_user(self.context, user)
- verify(dbaas.MySqlAdmin).delete_user(user)
+ dbaas.MySqlAdmin.delete_user.assert_any_call(user)
def test_grant_access(self):
username = "test_user"
hostname = "test_host"
databases = ["test_database"]
- when(dbaas.MySqlAdmin).grant_access(username,
- hostname,
- databases).thenReturn(None)
-
+ dbaas.MySqlAdmin.grant_access = MagicMock(return_value=None)
self.manager.grant_access(self.context,
username,
hostname,
databases)
- verify(dbaas.MySqlAdmin).grant_access(username, hostname, databases)
+ dbaas.MySqlAdmin.grant_access.assert_any_call(username,
+ hostname,
+ databases)
def test_list_databases(self):
- when(dbaas.MySqlAdmin).list_databases(None, None,
- False).thenReturn(['database1'])
+ dbaas.MySqlAdmin.list_databases = MagicMock(return_value=['database1'])
databases = self.manager.list_databases(self.context)
self.assertThat(databases, Not(Is(None)))
self.assertThat(databases, Equals(['database1']))
- verify(dbaas.MySqlAdmin).list_databases(None, None, False)
+ dbaas.MySqlAdmin.list_databases.assert_any_call(None, None, False)
def test_list_users(self):
- when(dbaas.MySqlAdmin).list_users(None, None,
- False).thenReturn(['user1'])
+ dbaas.MySqlAdmin.list_users = MagicMock(return_value=['user1'])
users = self.manager.list_users(self.context)
self.assertThat(users, Equals(['user1']))
- verify(dbaas.MySqlAdmin).list_users(None, None, False)
+ dbaas.MySqlAdmin.list_users.assert_any_call(None, None, False)
def test_get_users(self):
username = ['user1']
hostname = ['host']
- when(dbaas.MySqlAdmin).get_user(username,
- hostname).thenReturn(['user1'])
+ dbaas.MySqlAdmin.get_user = MagicMock(return_value=['user1'])
users = self.manager.get_user(self.context, username, hostname)
self.assertThat(users, Equals(['user1']))
- verify(dbaas.MySqlAdmin).get_user(username, hostname)
+ dbaas.MySqlAdmin.get_user.assert_any_call(username, hostname)
def test_enable_root(self):
- when(dbaas.MySqlAdmin).enable_root().thenReturn('user_id_stuff')
+ dbaas.MySqlAdmin.enable_root = MagicMock(return_value='user_id_stuff')
user_id = self.manager.enable_root(self.context)
self.assertThat(user_id, Is('user_id_stuff'))
- verify(dbaas.MySqlAdmin).enable_root()
+ dbaas.MySqlAdmin.enable_root.assert_any_call()
def test_is_root_enabled(self):
- when(dbaas.MySqlAdmin).is_root_enabled().thenReturn(True)
+ dbaas.MySqlAdmin.is_root_enabled = MagicMock(return_value=True)
is_enabled = self.manager.is_root_enabled(self.context)
self.assertThat(is_enabled, Is(True))
- verify(dbaas.MySqlAdmin).is_root_enabled()
+ dbaas.MySqlAdmin.is_root_enabled.assert_any_call()
def test_create_backup(self):
- when(backup).backup(self.context, 'backup_id_123').thenReturn(None)
+ backup.backup = MagicMock(return_value=None)
# entry point
Manager().create_backup(self.context, 'backup_id_123')
# assertions
- verify(backup).backup(self.context, 'backup_id_123')
+ backup.backup.assert_any_call(self.context, 'backup_id_123')
def test_prepare_device_path_true(self):
self._prepare_dynamic()
@@ -171,27 +166,27 @@ class GuestAgentManagerTest(testtools.TestCase):
}
# TODO(juice): this should stub an instance of the MySqlAppStatus
- mock_status = mock()
- when(dbaas.MySqlAppStatus).get().thenReturn(mock_status)
- when(mock_status).begin_install().thenReturn(None)
- when(VolumeDevice).format().thenReturn(None)
- when(VolumeDevice).migrate_data(any()).thenReturn(None)
- when(VolumeDevice).mount().thenReturn(None)
- when(dbaas.MySqlApp).stop_db().thenReturn(None)
- when(dbaas.MySqlApp).start_mysql().thenReturn(None)
- when(dbaas.MySqlApp).install_if_needed(any()).thenReturn(None)
- when(backup).restore(self.context,
- backup_info,
- '/var/lib/mysql').thenReturn(None)
- when(dbaas.MySqlApp).secure(any()).thenReturn(None)
- when(dbaas.MySqlApp).secure_root(any()).thenReturn(None)
- (when(pkg.Package).pkg_is_installed(any()).
- thenReturn(is_mysql_installed))
- when(dbaas.MySqlAdmin).is_root_enabled().thenReturn(is_root_enabled)
- when(dbaas.MySqlAdmin).create_user().thenReturn(None)
- when(dbaas.MySqlAdmin).create_database().thenReturn(None)
-
- when(os.path).exists(any()).thenReturn(True)
+ mock_status = MagicMock()
+
+ dbaas.MySqlAppStatus.get = MagicMock(return_value=mock_status)
+ mock_status.begin_install = MagicMock(return_value=None)
+ VolumeDevice.format = MagicMock(return_value=None)
+ VolumeDevice.migrate_data = MagicMock(return_value=None)
+ VolumeDevice.mount = MagicMock(return_value=None)
+ dbaas.MySqlApp.stop_db = MagicMock(return_value=None)
+ dbaas.MySqlApp.start_mysql = MagicMock(return_value=None)
+ dbaas.MySqlApp.install_if_needed = MagicMock(return_value=None)
+ backup.restore = MagicMock(return_value=None)
+ dbaas.MySqlApp.secure = MagicMock(return_value=None)
+ dbaas.MySqlApp.secure_root = MagicMock(return_value=None)
+ pkg.Package.pkg_is_installed = MagicMock(
+ return_value=is_mysql_installed)
+ dbaas.MySqlAdmin.is_root_enabled = MagicMock(
+ return_value=is_root_enabled)
+ dbaas.MySqlAdmin.create_user = MagicMock(return_value=None)
+ dbaas.MySqlAdmin.create_database = MagicMock(return_value=None)
+
+ os.path.exists = MagicMock(return_value=True)
# invocation
self.manager.prepare(context=self.context,
packages=None,
@@ -202,18 +197,21 @@ class GuestAgentManagerTest(testtools.TestCase):
mount_point='/var/lib/mysql',
backup_info=backup_info,
overrides=overrides)
+
# verification/assertion
- verify(mock_status).begin_install()
+ mock_status.begin_install.assert_any_call()
- verify(VolumeDevice, times=COUNT).format()
- verify(dbaas.MySqlApp, times=COUNT).stop_db()
- verify(VolumeDevice, times=COUNT).migrate_data(
- any())
+ self.assertEqual(VolumeDevice.format.call_count, COUNT)
+ self.assertEqual(VolumeDevice.migrate_data.call_count, COUNT)
+ self.assertEqual(dbaas.MySqlApp.stop_db.call_count, COUNT)
if backup_info:
- verify(backup).restore(self.context, backup_info, '/var/lib/mysql')
- verify(dbaas.MySqlApp).install_if_needed(any())
+ backup.restore.assert_any_call(self.context,
+ backup_info,
+ '/var/lib/mysql')
+ dbaas.MySqlApp.install_if_needed.assert_any_call(None)
# We dont need to make sure the exact contents are there
- verify(dbaas.MySqlApp).secure(any(), overrides)
- verify(dbaas.MySqlAdmin, never).create_database()
- verify(dbaas.MySqlAdmin, never).create_user()
- verify(dbaas.MySqlApp).secure_root(secure_remote_root=any())
+ dbaas.MySqlApp.secure.assert_any_call(None, None)
+ self.assertFalse(dbaas.MySqlAdmin.create_database.called)
+ self.assertFalse(dbaas.MySqlAdmin.create_user.called)
+ dbaas.MySqlApp.secure_root.assert_any_call(
+ secure_remote_root=not is_root_enabled)
diff --git a/trove/tests/unittests/guestagent/test_pkg.py b/trove/tests/unittests/guestagent/test_pkg.py
index bce5b315..864b2f52 100644
--- a/trove/tests/unittests/guestagent/test_pkg.py
+++ b/trove/tests/unittests/guestagent/test_pkg.py
@@ -14,8 +14,7 @@
# under the License.
import testtools
-from mock import Mock
-from mockito import when, any
+from mock import Mock, MagicMock
import pexpect
from trove.common import utils
from trove.guestagent import pkg
@@ -58,15 +57,12 @@ class PkgDEBInstallTestCase(testtools.TestCase):
def test_pkg_is_instaled_yes(self):
packages = "package1=1.0 package2"
- when(self.pkg).pkg_version("package1").thenReturn("1.0")
- when(self.pkg).pkg_version("package2").thenReturn("2.0")
+ self.pkg.pkg_version = MagicMock(side_effect=["1.0", "2.0"])
self.assertTrue(self.pkg.pkg_is_installed(packages))
def test_pkg_is_instaled_no(self):
packages = "package1=1.0 package2 package3=3.1"
- when(self.pkg).pkg_version("package1").thenReturn("1.0")
- when(self.pkg).pkg_version("package2").thenReturn("2.0")
- when(self.pkg).pkg_version("package3").thenReturn("3.0")
+ self.pkg.pkg_version = MagicMock(side_effect=["1.0", "2.0", "3.0"])
self.assertFalse(self.pkg.pkg_is_installed(packages))
def test_success_install(self):
@@ -311,13 +307,14 @@ class PkgRPMInstallTestCase(testtools.TestCase):
def test_pkg_is_instaled_yes(self):
packages = "package1=1.0 package2"
- when(commands).getstatusoutput(any()).thenReturn({1: "package1=1.0\n"
- "package2=2.0"})
+ commands.getstatusoutput = MagicMock(return_value={1: "package1=1.0\n"
+ "package2=2.0"})
self.assertTrue(self.pkg.pkg_is_installed(packages))
def test_pkg_is_instaled_no(self):
packages = "package1=1.0 package2 package3=3.0"
- when(commands).getstatusoutput({1: "package1=1.0\npackage2=2.0"})
+ commands.getstatusoutput = MagicMock(return_value={1: "package1=1.0\n"
+ "package2=2.0"})
self.assertFalse(self.pkg.pkg_is_installed(packages))
def test_permission_error(self):
diff --git a/trove/tests/unittests/guestagent/test_redis_manager.py b/trove/tests/unittests/guestagent/test_redis_manager.py
index d3857f89..0516e21c 100644
--- a/trove/tests/unittests/guestagent/test_redis_manager.py
+++ b/trove/tests/unittests/guestagent/test_redis_manager.py
@@ -13,7 +13,7 @@
# under the License.
import testtools
-from mockito import verify, when, unstub, any, mock
+from mock import MagicMock
from trove.common.context import TroveContext
from trove.guestagent.datastore.redis.manager import Manager as RedisManager
import trove.guestagent.datastore.redis.service as redis_service
@@ -29,9 +29,17 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
self.manager = RedisManager()
self.packages = 'redis-server'
self.origin_RedisAppStatus = redis_service.RedisAppStatus
- self.origin_stop_redis = redis_service.RedisApp.stop_db
self.origin_start_redis = redis_service.RedisApp.start_redis
+ self.origin_stop_redis = redis_service.RedisApp.stop_db
self.origin_install_redis = redis_service.RedisApp._install_redis
+ self.origin_write_config = redis_service.RedisApp.write_config
+ self.origin_install_if_needed = \
+ redis_service.RedisApp.install_if_needed
+ self.origin_complete_install_or_restart = \
+ redis_service.RedisApp.complete_install_or_restart
+ self.origin_format = VolumeDevice.format
+ self.origin_mount = VolumeDevice.mount
+ self.origin_restore = backup.restore
def tearDown(self):
super(RedisGuestAgentManagerTest, self).tearDown()
@@ -39,14 +47,22 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
redis_service.RedisApp.stop_db = self.origin_stop_redis
redis_service.RedisApp.start_redis = self.origin_start_redis
redis_service.RedisApp._install_redis = self.origin_install_redis
- unstub()
+ redis_service.RedisApp.write_config = self.origin_write_config
+ redis_service.RedisApp.install_if_needed = \
+ self.origin_install_if_needed
+ redis_service.RedisApp.complete_install_or_restart = \
+ self.origin_complete_install_or_restart
+ VolumeDevice.format = self.origin_format
+ VolumeDevice.mount = self.origin_mount
+ backup.restore = self.origin_restore
def test_update_status(self):
- mock_status = mock()
- when(redis_service.RedisAppStatus).get().thenReturn(mock_status)
+ mock_status = MagicMock()
+ self.manager.appStatus = mock_status
+ redis_service.RedisAppStatus.get = MagicMock(return_value=mock_status)
self.manager.update_status(self.context)
- verify(redis_service.RedisAppStatus).get()
- verify(mock_status).update()
+ redis_service.RedisAppStatus.get.assert_any_call()
+ mock_status.update.assert_any_call()
def test_prepare_redis_not_installed(self):
self._prepare_dynamic(is_redis_installed=False)
@@ -56,42 +72,46 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
mount_point='var/lib/redis'):
# covering all outcomes is starting to cause trouble here
- mock_status = mock()
- when(redis_service.RedisAppStatus).get().thenReturn(mock_status)
- when(mock_status).begin_install().thenReturn(None)
- when(VolumeDevice).format().thenReturn(None)
- when(VolumeDevice).mount().thenReturn(None)
- when(redis_service.RedisApp).start_redis().thenReturn(None)
- when(redis_service.RedisApp).install_if_needed().thenReturn(None)
- when(backup).restore(self.context, backup_info).thenReturn(None)
- when(redis_service.RedisApp).write_config(any()).thenReturn(None)
- when(redis_service.RedisApp).complete_install_or_restart(
- any()).thenReturn(None)
+ mock_status = MagicMock()
+ redis_service.RedisAppStatus.get = MagicMock(return_value=mock_status)
+ redis_service.RedisApp.start_redis = MagicMock(return_value=None)
+ redis_service.RedisApp.install_if_needed = MagicMock(return_value=None)
+ redis_service.RedisApp.write_config = MagicMock(return_value=None)
+ redis_service.RedisApp.complete_install_or_restart = MagicMock(
+ return_value=None)
+ mock_status.begin_install = MagicMock(return_value=None)
+ VolumeDevice.format = MagicMock(return_value=None)
+ VolumeDevice.mount = MagicMock(return_value=None)
+ backup.restore = MagicMock(return_value=None)
+
self.manager.prepare(self.context, self.packages,
None, '2048',
None, device_path=device_path,
mount_point='/var/lib/redis',
backup_info=backup_info)
- verify(redis_service.RedisAppStatus, times=2).get()
- verify(mock_status).begin_install()
- verify(VolumeDevice).format()
- verify(redis_service.RedisApp).install_if_needed(self.packages)
- verify(redis_service.RedisApp).write_config(None)
- verify(redis_service.RedisApp).complete_install_or_restart()
+
+ self.assertEqual(redis_service.RedisAppStatus.get.call_count, 2)
+ mock_status.begin_install.assert_any_call()
+ VolumeDevice.format.assert_any_call()
+ redis_service.RedisApp.install_if_needed.assert_any_call(self.packages)
+ redis_service.RedisApp.write_config.assert_any_call(None)
+ redis_service.RedisApp.complete_install_or_restart.assert_any_call()
def test_restart(self):
- mock_status = mock()
- when(redis_service.RedisAppStatus).get().thenReturn(mock_status)
- when(redis_service.RedisApp).restart().thenReturn(None)
+ mock_status = MagicMock()
+ self.manager.appStatus = mock_status
+ redis_service.RedisAppStatus.get = MagicMock(return_value=mock_status)
+ redis_service.RedisApp.restart = MagicMock(return_value=None)
self.manager.restart(self.context)
- verify(redis_service.RedisAppStatus).get()
- verify(redis_service.RedisApp).restart()
+ redis_service.RedisAppStatus.get.assert_any_call()
+ redis_service.RedisApp.restart.assert_any_call()
def test_stop_db(self):
- mock_status = mock()
- when(redis_service.RedisAppStatus).get().thenReturn(mock_status)
- when(redis_service.RedisApp).stop_db(do_not_start_on_reboot=
- False).thenReturn(None)
+ mock_status = MagicMock()
+ self.manager.appStatus = mock_status
+ redis_service.RedisAppStatus.get = MagicMock(return_value=mock_status)
+ redis_service.RedisApp.stop_db = MagicMock(return_value=None)
self.manager.stop_db(self.context)
- verify(redis_service.RedisAppStatus).get()
- verify(redis_service.RedisApp).stop_db(do_not_start_on_reboot=False)
+ redis_service.RedisAppStatus.get.assert_any_call()
+ redis_service.RedisApp.stop_db.assert_any_call(
+ do_not_start_on_reboot=False)
diff --git a/trove/tests/unittests/mgmt/test_models.py b/trove/tests/unittests/mgmt/test_models.py
index 6bd637e2..5d92b882 100644
--- a/trove/tests/unittests/mgmt/test_models.py
+++ b/trove/tests/unittests/mgmt/test_models.py
@@ -13,14 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
#
-from mockito import mock, when, verify, unstub, any
+from mock import MagicMock, patch, ANY
from testtools import TestCase
from testtools.matchers import Equals, Is, Not
from novaclient.v1_1 import Client
from novaclient.v1_1.flavors import FlavorManager, Flavor
from novaclient.v1_1.servers import Server, ServerManager
-from oslo.config.cfg import ConfigOpts
+from oslo.config import cfg
from trove.backup.models import Backup
from trove.common.context import TroveContext
from trove.common import instance as rd_instance
@@ -34,28 +34,27 @@ from trove.openstack.common.notifier import api as notifier
from trove.common import remote
from trove.tests.util import test_config
+CONF = cfg.CONF
+
class MockMgmtInstanceTest(TestCase):
def setUp(self):
super(MockMgmtInstanceTest, self).setUp()
self.context = TroveContext()
self.context.auth_token = 'some_secret_password'
- self.client = mock(Client)
- self.server_mgr = mock(ServerManager)
+ self.client = MagicMock(spec=Client)
+ self.server_mgr = MagicMock(spec=ServerManager)
self.client.servers = self.server_mgr
- self.flavor_mgr = mock(FlavorManager)
+ self.flavor_mgr = MagicMock(spec=FlavorManager)
self.client.flavors = self.flavor_mgr
- when(remote).create_admin_nova_client(self.context).thenReturn(
- self.client)
- when(ConfigOpts)._get('host').thenReturn('test_host')
- when(ConfigOpts)._get('exists_notification_ticks').thenReturn(1)
- when(ConfigOpts)._get('report_interval').thenReturn(20)
- when(ConfigOpts)._get('notification_service_id').thenReturn(
- {'mysql': '123'})
+ remote.create_admin_nova_client = MagicMock(return_value=self.client)
+ CONF.set_override('host', 'test_host')
+ CONF.set_override('exists_notification_ticks', 1)
+ CONF.set_override('report_interval', 20)
+ CONF.set_override('notification_service_id', {'mysql': '123'})
def tearDown(self):
super(MockMgmtInstanceTest, self).tearDown()
- unstub()
@staticmethod
def build_db_instance(status, task_status=InstanceTasks.DELETING):
@@ -73,37 +72,44 @@ class MockMgmtInstanceTest(TestCase):
class TestNotificationTransformer(MockMgmtInstanceTest):
+
def test_tranformer(self):
transformer = mgmtmodels.NotificationTransformer(context=self.context)
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, InstanceTasks.BUILDING)
- when(DatabaseModelBase).find_all(deleted=False).thenReturn(
- [db_instance])
- stub_dsv_db_info = mock(datastore_models.DBDatastoreVersion)
- stub_dsv_db_info.id = "test_datastore_version"
- stub_dsv_db_info.datastore_id = "mysql_test_version"
- stub_dsv_db_info.name = "test_datastore_name"
- stub_dsv_db_info.image_id = "test_datastore_image_id"
- stub_dsv_db_info.packages = "test_datastore_pacakges"
- stub_dsv_db_info.active = 1
- stub_dsv_db_info.manager = "mysql"
- stub_datastore_version = datastore_models.DatastoreVersion(
- stub_dsv_db_info)
- when(DatabaseModelBase).find_by(id=any()).thenReturn(
- stub_datastore_version)
-
- when(DatabaseModelBase).find_by(instance_id='1').thenReturn(
- InstanceServiceStatus(rd_instance.ServiceStatuses.BUILDING))
-
- payloads = transformer()
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'], Not(Is(None)))
- self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
+ with patch.object(DatabaseModelBase, 'find_all',
+ return_value=[db_instance]):
+ stub_dsv_db_info = MagicMock(
+ spec=datastore_models.DBDatastoreVersion)
+ stub_dsv_db_info.id = "test_datastore_version"
+ stub_dsv_db_info.datastore_id = "mysql_test_version"
+ stub_dsv_db_info.name = "test_datastore_name"
+ stub_dsv_db_info.image_id = "test_datastore_image_id"
+ stub_dsv_db_info.packages = "test_datastore_pacakges"
+ stub_dsv_db_info.active = 1
+ stub_dsv_db_info.manager = "mysql"
+ stub_datastore_version = datastore_models.DatastoreVersion(
+ stub_dsv_db_info)
+
+ def side_effect_func(*args, **kwargs):
+ if 'instance_id' in kwargs:
+ return InstanceServiceStatus(
+ rd_instance.ServiceStatuses.BUILDING)
+ else:
+ return stub_datastore_version
+
+ with patch.object(DatabaseModelBase, 'find_by',
+ side_effect=side_effect_func):
+ payloads = transformer()
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(1))
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'], Not(Is(None)))
+ self.assertThat(payload['state'], Equals(status.lower()))
def test_get_service_id(self):
id_map = {
@@ -126,31 +132,33 @@ class TestNotificationTransformer(MockMgmtInstanceTest):
class TestNovaNotificationTransformer(MockMgmtInstanceTest):
def test_transformer_cache(self):
- flavor = mock(Flavor)
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- transformer2 = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- self.assertThat(transformer._flavor_cache,
- Not(Is(transformer2._flavor_cache)))
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ transformer2 = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ self.assertThat(transformer._flavor_cache,
+ Not(Is(transformer2._flavor_cache)))
def test_lookup_flavor(self):
- flavor = mock(Flavor)
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'flav_1'
- when(self.flavor_mgr).get('1').thenReturn(flavor)
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- self.assertThat(transformer._lookup_flavor('1'), Equals(flavor.name))
- self.assertThat(transformer._lookup_flavor('2'), Equals('unknown'))
+ with patch.object(self.flavor_mgr, 'get', side_effect=[flavor, None]):
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ self.assertThat(transformer._lookup_flavor('1'),
+ Equals(flavor.name))
+ self.assertThat(transformer._lookup_flavor('2'),
+ Equals('unknown'))
def test_tranformer(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
- stub_dsv_db_info = mock(datastore_models.DBDatastoreVersion)
+ stub_dsv_db_info = MagicMock(spec=datastore_models.DBDatastoreVersion)
stub_dsv_db_info.id = "test_datastore_version"
stub_dsv_db_info.datastore_id = "mysql_test_version"
stub_dsv_db_info.name = "test_datastore_name"
@@ -160,118 +168,129 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
stub_dsv_db_info.manager = "mysql"
stub_datastore_version = datastore_models.DatastoreVersion(
stub_dsv_db_info)
- when(DatabaseModelBase).find_by(id=any()).thenReturn(
- stub_datastore_version)
- server = mock(Server)
+ flavor = MagicMock(spec=Flavor)
+ flavor.name = 'db.small'
+
+ server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
server,
None)
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance])
- flavor = mock(Flavor)
- flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
-
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- payloads = transformer()
- # assertions
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'], Not(Is(None)))
- self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
- self.assertThat(payload['instance_type'], Equals('db.small'))
- self.assertThat(payload['instance_type_id'], Equals('flavor_1'))
- self.assertThat(payload['user_id'], Equals('test_user_id'))
- self.assertThat(payload['service_id'], Equals('123'))
+
+ with patch.object(DatabaseModelBase, 'find_by',
+ return_value=stub_datastore_version):
+
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+
+ # invocation
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ payloads = transformer()
+
+ # assertions
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(1))
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'],
+ Not(Is(None)))
+ self.assertThat(payload['state'], Equals(status.lower()))
+ self.assertThat(payload['instance_type'],
+ Equals('db.small'))
+ self.assertThat(payload['instance_type_id'],
+ Equals('flavor_1'))
+ self.assertThat(payload['user_id'], Equals('test_user_id'))
+ self.assertThat(payload['service_id'], Equals('123'))
def test_tranformer_invalid_datastore_manager(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
- server = mock(Server)
+ server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
- stub_datastore_version = mock()
+ stub_datastore_version = MagicMock()
stub_datastore_version.id = "stub_datastore_version"
stub_datastore_version.manager = "m0ng0"
- when(datastore_models.
- DatastoreVersion).load(any(), any()).thenReturn(
- stub_datastore_version)
- when(datastore_models.
- DatastoreVersion).load_by_uuid(any()).thenReturn(
- stub_datastore_version)
-
- stub_datastore = mock()
+ stub_datastore = MagicMock()
stub_datastore.default_datastore_version = "stub_datastore_version"
- when(datastore_models.
- Datastore).load(any()).thenReturn(stub_datastore)
- mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
- db_instance,
- server,
- None)
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance])
- flavor = mock(Flavor)
+
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
-
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- payloads = transformer()
- # assertions
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'], Not(Is(None)))
- self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
- self.assertThat(payload['instance_type'], Equals('db.small'))
- self.assertThat(payload['instance_type_id'], Equals('flavor_1'))
- self.assertThat(payload['user_id'], Equals('test_user_id'))
- self.assertThat(payload['service_id'],
- Equals('unknown-service-id-error'))
+
+ with patch.object(datastore_models.DatastoreVersion, 'load',
+ return_value=stub_datastore_version):
+ with patch.object(datastore_models.DatastoreVersion,
+ 'load_by_uuid',
+ return_value=stub_datastore_version):
+ with patch.object(datastore_models.Datastore, 'load',
+ return_value=stub_datastore):
+ mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
+ db_instance,
+ server,
+ None)
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr,
+ 'get', return_value=flavor):
+
+ # invocation
+ transformer = (
+ mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ )
+
+ payloads = transformer()
+ # assertions
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(1))
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'],
+ Not(Is(None)))
+ self.assertThat(payload['state'],
+ Equals(status.lower()))
+ self.assertThat(payload['instance_type'],
+ Equals('db.small'))
+ self.assertThat(payload['instance_type_id'],
+ Equals('flavor_1'))
+ self.assertThat(payload['user_id'],
+ Equals('test_user_id'))
+ self.assertThat(payload['service_id'],
+ Equals('unknown-service-id-error'))
def test_tranformer_shutdown_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
db_instance = self.build_db_instance(status)
- server = mock(Server)
+ server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
server,
None)
- when(Backup).running('1').thenReturn(None)
- self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance])
- flavor = mock(Flavor)
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- payloads = transformer()
- # assertion that SHUTDOWN instances are not reported
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(0))
+
+ with patch.object(Backup, 'running', return_value=None):
+ self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ # invocation
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ payloads = transformer()
+ # assertion that SHUTDOWN instances are not reported
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(0))
def test_tranformer_no_nova_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
@@ -281,59 +300,57 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
db_instance,
None,
None)
- when(Backup).running('1').thenReturn(None)
- self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance])
- flavor = mock(Flavor)
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
- # invocation
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- payloads = transformer()
- # assertion that SHUTDOWN instances are not reported
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(0))
+
+ with patch.object(Backup, 'running', return_value=None):
+ self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ # invocation
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ payloads = transformer()
+ # assertion that SHUTDOWN instances are not reported
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(0))
def test_tranformer_flavor_cache(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, InstanceTasks.BUILDING)
- server = mock(Server)
+ server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
server,
None)
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance])
- flavor = mock(Flavor)
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
- transformer = mgmtmodels.NovaNotificationTransformer(
- context=self.context)
- transformer()
- # call twice ensure client.flavor invoked once
- payloads = transformer()
- self.assertIsNotNone(payloads)
- self.assertThat(len(payloads), Equals(1))
- payload = payloads[0]
- self.assertThat(payload['audit_period_beginning'], Not(Is(None)))
- self.assertThat(payload['audit_period_ending'], Not(Is(None)))
- self.assertThat(payload['state'], Equals(status.lower()))
- self.assertThat(payload['instance_type'], Equals('db.small'))
- self.assertThat(payload['instance_type_id'], Equals('flavor_1'))
- self.assertThat(payload['user_id'], Equals('test_user_id'))
- # ensure cache was used to get flavor second time
- verify(self.flavor_mgr).get('flavor_1')
+
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ transformer = mgmtmodels.NovaNotificationTransformer(
+ context=self.context)
+ transformer()
+ # call twice ensure client.flavor invoked once
+ payloads = transformer()
+ self.assertIsNotNone(payloads)
+ self.assertThat(len(payloads), Equals(1))
+ payload = payloads[0]
+ self.assertThat(payload['audit_period_beginning'],
+ Not(Is(None)))
+ self.assertThat(payload['audit_period_ending'], Not(Is(None)))
+ self.assertThat(payload['state'], Equals(status.lower()))
+ self.assertThat(payload['instance_type'], Equals('db.small'))
+ self.assertThat(payload['instance_type_id'],
+ Equals('flavor_1'))
+ self.assertThat(payload['user_id'], Equals('test_user_id'))
+ # ensure cache was used to get flavor second time
+ self.flavor_mgr.get.assert_any_call('flavor_1')
class TestMgmtInstanceTasks(MockMgmtInstanceTest):
@@ -342,34 +359,31 @@ class TestMgmtInstanceTasks(MockMgmtInstanceTest):
db_instance = MockMgmtInstanceTest.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
- server = mock(Server)
+ server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
server,
None)
- when(mgmtmodels).load_mgmt_instances(
- self.context,
- deleted=False,
- client=self.client).thenReturn(
- [mgmt_instance, mgmt_instance])
- flavor = mock(Flavor)
+
+ flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
- when(self.flavor_mgr).get('flavor_1').thenReturn(flavor)
- self.assertThat(self.context.auth_token, Is('some_secret_password'))
- when(notifier).notify(self.context,
- any(str),
- 'trove.instance.exists',
- 'INFO',
- any(dict)).thenReturn(None)
- # invocation
- mgmtmodels.publish_exist_events(
- mgmtmodels.NovaNotificationTransformer(context=self.context),
- self.context)
- # assertion
- verify(notifier, times=2).notify(self.context,
- any(str),
- 'trove.instance.exists',
- 'INFO',
- any(dict))
- self.assertThat(self.context.auth_token, Is(None))
+
+ with patch.object(mgmtmodels, 'load_mgmt_instances',
+ return_value=[mgmt_instance]):
+ with patch.object(self.flavor_mgr, 'get', return_value=flavor):
+ self.assertThat(self.context.auth_token,
+ Is('some_secret_password'))
+ with patch.object(notifier, 'notify', return_value=None):
+ # invocation
+ mgmtmodels.publish_exist_events(
+ mgmtmodels.NovaNotificationTransformer(
+ context=self.context),
+ self.context)
+ # assertion
+ notifier.notify.assert_any_call(self.context,
+ 'test_host',
+ 'trove.instance.exists',
+ 'INFO',
+ ANY)
+ self.assertThat(self.context.auth_token, Is(None))
diff --git a/trove/tests/unittests/quota/test_quota.py b/trove/tests/unittests/quota/test_quota.py
index 685c4969..c7628359 100644
--- a/trove/tests/unittests/quota/test_quota.py
+++ b/trove/tests/unittests/quota/test_quota.py
@@ -12,8 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
-from mockito import mock, when, unstub, any, verify, never
-from mock import Mock
+from mock import Mock, MagicMock, patch
from trove.quota.quota import DbQuotaDriver
from trove.quota.models import Resource
from trove.quota.models import Quota
@@ -82,17 +81,16 @@ class QuotaControllerTest(testtools.TestCase):
def setUp(self):
super(QuotaControllerTest, self).setUp()
- context = mock()
+ context = MagicMock()
context.is_admin = True
- req = mock()
- req.environ = mock()
- when(req.environ).get(any()).thenReturn(context)
+ req = MagicMock()
+ req.environ = MagicMock()
+ req.environ.get = MagicMock(return_value=context)
self.req = req
self.controller = QuotaController()
def tearDown(self):
super(QuotaControllerTest, self).tearDown()
- unstub()
def test_update_unknown_resource(self):
body = {'quotas': {'unknown_resource': 5}}
@@ -101,47 +99,46 @@ class QuotaControllerTest(testtools.TestCase):
FAKE_TENANT1, FAKE_TENANT2)
def test_update_resource_no_value(self):
- quota = mock(Quota)
- when(DatabaseModelBase).find_by(tenant_id=FAKE_TENANT2,
- resource='instances').thenReturn(quota)
- body = {'quotas': {'instances': None}}
- result = self.controller.update(self.req, body, FAKE_TENANT1,
- FAKE_TENANT2)
- verify(quota, never).save()
- self.assertEqual(200, result.status)
+ quota = MagicMock(spec=Quota)
+ with patch.object(DatabaseModelBase, 'find_by', return_value=quota):
+ body = {'quotas': {'instances': None}}
+ result = self.controller.update(self.req, body, FAKE_TENANT1,
+ FAKE_TENANT2)
+ self.assertEqual(quota.save.call_count, 0)
+ self.assertEqual(200, result.status)
def test_update_resource_instance(self):
- instance_quota = mock(Quota)
- when(DatabaseModelBase).find_by(
- tenant_id=FAKE_TENANT2,
- resource='instances').thenReturn(instance_quota)
- body = {'quotas': {'instances': 2}}
- result = self.controller.update(self.req, body, FAKE_TENANT1,
- FAKE_TENANT2)
- verify(instance_quota, times=1).save()
- self.assertTrue('instances' in result._data['quotas'])
- self.assertEqual(200, result.status)
- self.assertEqual(2, result._data['quotas']['instances'])
+ instance_quota = MagicMock(spec=Quota)
+ with patch.object(DatabaseModelBase, 'find_by',
+ return_value=instance_quota):
+ body = {'quotas': {'instances': 2}}
+ result = self.controller.update(self.req, body, FAKE_TENANT1,
+ FAKE_TENANT2)
+ self.assertEqual(instance_quota.save.call_count, 1)
+ self.assertTrue('instances' in result._data['quotas'])
+ self.assertEqual(200, result.status)
+ self.assertEqual(2, result._data['quotas']['instances'])
@testtools.skipIf(not CONF.trove_volume_support,
'Volume support is not enabled')
def test_update_resource_volume(self):
- instance_quota = mock(Quota)
- when(DatabaseModelBase).find_by(
- tenant_id=FAKE_TENANT2,
- resource='instances').thenReturn(instance_quota)
- volume_quota = mock(Quota)
- when(DatabaseModelBase).find_by(
- tenant_id=FAKE_TENANT2,
- resource='volumes').thenReturn(volume_quota)
- body = {'quotas': {'instances': None, 'volumes': 10}}
- result = self.controller.update(self.req, body, FAKE_TENANT1,
- FAKE_TENANT2)
- verify(instance_quota, never).save()
- self.assertFalse('instances' in result._data['quotas'])
- verify(volume_quota, times=1).save()
- self.assertEqual(200, result.status)
- self.assertEqual(10, result._data['quotas']['volumes'])
+ instance_quota = MagicMock(spec=Quota)
+ volume_quota = MagicMock(spec=Quota)
+
+ def side_effect_func(*args, **kwargs):
+ return (instance_quota if kwargs['resource'] == 'instances'
+ else volume_quota)
+
+ with patch.object(DatabaseModelBase, 'find_by',
+ side_effect=side_effect_func):
+ body = {'quotas': {'instances': None, 'volumes': 10}}
+ result = self.controller.update(self.req, body, FAKE_TENANT1,
+ FAKE_TENANT2)
+ self.assertEqual(instance_quota.save.call_count, 0)
+ self.assertFalse('instances' in result._data['quotas'])
+ self.assertEqual(volume_quota.save.call_count, 1)
+ self.assertEqual(200, result.status)
+ self.assertEqual(10, result._data['quotas']['volumes'])
class DbQuotaDriverTest(testtools.TestCase):
@@ -336,8 +333,12 @@ class DbQuotaDriverTest(testtools.TestCase):
in_use=0,
reserved=0)]
+ def side_effect_func(*args, **kwargs):
+ return (FAKE_QUOTAS[0] if kwargs['resource'] == 'instances'
+ else FAKE_QUOTAS[1])
+
self.mock_usage_result.all = Mock(return_value=[])
- QuotaUsage.create = Mock(side_effect=FAKE_QUOTAS)
+ QuotaUsage.create = Mock(side_effect=side_effect_func)
usages = self.driver.get_all_quota_usages_by_tenant(FAKE_TENANT1,
resources.keys())
diff --git a/trove/tests/unittests/secgroups/test_security_group.py b/trove/tests/unittests/secgroups/test_security_group.py
index 2ef08b0b..ccf7f08b 100644
--- a/trove/tests/unittests/secgroups/test_security_group.py
+++ b/trove/tests/unittests/secgroups/test_security_group.py
@@ -16,7 +16,6 @@ import testtools
import uuid
import trove.common.remote
from mock import Mock
-from mockito import mock, unstub
from trove.common import exception
from trove.tests.fakes import nova
from trove.extensions.security_group import models as sec_mod
@@ -103,8 +102,8 @@ class SecurityGroupDeleteTest(testtools.TestCase):
def setUp(self):
super(SecurityGroupDeleteTest, self).setUp()
- inst_model.CONF = mock()
- self.context = mock()
+ inst_model.CONF = Mock()
+ self.context = Mock()
self. original_find_by = (sec_mod.
SecurityGroupInstanceAssociation.
find_by)
@@ -119,7 +118,6 @@ class SecurityGroupDeleteTest(testtools.TestCase):
find_by) = self.original_find_by
(sec_mod.SecurityGroupInstanceAssociation.
delete) = self.original_delete
- unstub()
def _raise(self, ex):
raise ex
diff --git a/trove/tests/unittests/taskmanager/test_models.py b/trove/tests/unittests/taskmanager/test_models.py
index 5d931d31..8a6bbd15 100644
--- a/trove/tests/unittests/taskmanager/test_models.py
+++ b/trove/tests/unittests/taskmanager/test_models.py
@@ -14,9 +14,8 @@
import datetime
import testtools
-from mock import Mock
+from mock import Mock, MagicMock, patch
from testtools.matchers import Equals, Is
-from mockito import mock, when, unstub, any, verify, never
from cinderclient import exceptions as cinder_exceptions
import novaclient.v1_1.servers
import novaclient.v1_1.flavors
@@ -102,6 +101,7 @@ class fake_InstanceServiceStatus(object):
def __init__(self):
self.deleted = False
+ self.status = None
pass
def set_status(self, status):
@@ -163,23 +163,25 @@ class fake_DBInstance(object):
class FreshInstanceTasksTest(testtools.TestCase):
def setUp(self):
super(FreshInstanceTasksTest, self).setUp()
+ mock_instance = patch('trove.instance.models.FreshInstance')
+ mock_instance.start()
+ self.addCleanup(mock_instance.stop)
+ mock_instance.id = Mock(return_value='instance_id')
+ mock_instance.tenant_id = Mock(return_value="tenant_id")
+ mock_instance.hostname = Mock(return_value="hostname")
+ mock_instance.name = Mock(return_value='name')
+ mock_instance.nova_client = Mock(
+ return_value=fake_nova_client())
+ mock_datastore_v = patch(
+ 'trove.datastore.models.DatastoreVersion')
+ mock_datastore_v.start()
+ self.addCleanup(mock_datastore_v.stop)
+ mock_datastore = patch(
+ 'trove.datastore.models.Datastore')
+ mock_datastore.start()
+ self.addCleanup(mock_datastore.stop)
- when(taskmanager_models.FreshInstanceTasks).id().thenReturn(
- "instance_id")
- when(taskmanager_models.FreshInstanceTasks).tenant_id().thenReturn(
- "tenant_id")
- when(taskmanager_models.FreshInstanceTasks).hostname().thenReturn(
- "hostname")
- when(taskmanager_models.FreshInstanceTasks).name().thenReturn(
- 'name')
- when(datastore_models.
- DatastoreVersion).load(any(), any()).thenReturn(mock())
- when(datastore_models.
- DatastoreVersion).load_by_uuid(any()).thenReturn(mock())
- when(datastore_models.
- Datastore).load(any()).thenReturn(mock())
taskmanager_models.FreshInstanceTasks.nova_client = fake_nova_client()
- taskmanager_models.CONF = mock()
self.orig_ISS_find_by = InstanceServiceStatus.find_by
self.orig_DBI_find_by = DBInstance.find_by
self.userdata = "hello moto"
@@ -191,7 +193,7 @@ class FreshInstanceTasksTest(testtools.TestCase):
self.guestconfig = f.name
f.write(self.guestconfig_content)
self.freshinstancetasks = taskmanager_models.FreshInstanceTasks(
- None, mock(), None, None)
+ None, Mock(), None, None)
def tearDown(self):
super(FreshInstanceTasksTest, self).tearDown()
@@ -199,55 +201,74 @@ class FreshInstanceTasksTest(testtools.TestCase):
os.remove(self.guestconfig)
InstanceServiceStatus.find_by = self.orig_ISS_find_by
DBInstance.find_by = self.orig_DBI_find_by
- unstub()
- def test_create_instance_userdata(self):
- when(taskmanager_models.CONF).get(any()).thenReturn('')
+ @patch('trove.taskmanager.models.CONF')
+ def test_create_instance_userdata(self, mock_conf):
cloudinit_location = os.path.dirname(self.cloudinit)
datastore_manager = os.path.splitext(os.path.basename(self.
cloudinit))[0]
- when(taskmanager_models.CONF).get("cloudinit_location").thenReturn(
- cloudinit_location)
+
+ def fake_conf_getter(*args, **kwargs):
+ if args[0] == 'cloudinit_location':
+ return cloudinit_location
+ else:
+ return ''
+ mock_conf.get.side_effect = fake_conf_getter
+
server = self.freshinstancetasks._create_server(
None, None, None, datastore_manager, None, None, None)
self.assertEqual(server.userdata, self.userdata)
- def test_create_instance_guestconfig(self):
- when(taskmanager_models.CONF).get(any()).thenReturn('')
- when(taskmanager_models.CONF).get("guest_config").thenReturn(
- self.guestconfig)
+ @patch('trove.taskmanager.models.CONF')
+ def test_create_instance_guestconfig(self, mock_conf):
+ def fake_conf_getter(*args, **kwargs):
+ if args[0] == 'guest_config':
+ return self.guestconfig
+ else:
+ return ''
+ mock_conf.get.side_effect = fake_conf_getter
+ # execute
server = self.freshinstancetasks._create_server(
None, None, None, "test", None, None, None)
+ # verify
self.assertTrue('/etc/trove-guestagent.conf' in server.files)
self.assertEqual(server.files['/etc/trove-guestagent.conf'],
self.guestconfig_content)
- def test_create_instance_with_az_kwarg(self):
- when(taskmanager_models.CONF).get(any()).thenReturn('')
+ @patch('trove.taskmanager.models.CONF')
+ def test_create_instance_with_az_kwarg(self, mock_conf):
+ mock_conf.get.return_value = ''
+ # execute
server = self.freshinstancetasks._create_server(
None, None, None, None, None, availability_zone='nova', nics=None)
-
+ # verify
self.assertIsNotNone(server)
- def test_create_instance_with_az(self):
- when(taskmanager_models.CONF).get(any()).thenReturn('')
+ @patch('trove.taskmanager.models.CONF')
+ def test_create_instance_with_az(self, mock_conf):
+ mock_conf.get.return_value = ''
+ # execute
server = self.freshinstancetasks._create_server(
None, None, None, None, None, 'nova', None)
-
+ # verify
self.assertIsNotNone(server)
- def test_create_instance_with_az_none(self):
- when(taskmanager_models.CONF).get(any()).thenReturn('')
+ @patch('trove.taskmanager.models.CONF')
+ def test_create_instance_with_az_none(self, mock_conf):
+ mock_conf.get.return_value = ''
+ # execute
server = self.freshinstancetasks._create_server(
None, None, None, None, None, None, None)
-
+ # verify
self.assertIsNotNone(server)
- def test_update_status_of_intance_failure(self):
-
+ @patch('trove.taskmanager.models.CONF')
+ def test_update_status_of_intance_failure(self, mock_conf):
+ mock_conf.get.return_value = ''
InstanceServiceStatus.find_by = Mock(
return_value=fake_InstanceServiceStatus.find_by())
- DBInstance.find_by = Mock(return_value=fake_DBInstance.find_by())
+ DBInstance.find_by = Mock(
+ return_value=fake_DBInstance.find_by())
self.freshinstancetasks.update_statuses_on_time_out()
self.assertEqual(fake_InstanceServiceStatus.find_by().get_status(),
ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT)
@@ -260,7 +281,8 @@ class FreshInstanceTasksTest(testtools.TestCase):
Mock(return_value={'id': uuid.uuid4(),
'name': uuid.uuid4()}))
taskmanager_models.CONF.get = Mock(return_value=FakeOptGroup())
- taskmanager_models.SecurityGroupRule.create_sec_group_rule = Mock()
+ taskmanager_models.SecurityGroupRule.create_sec_group_rule = (
+ Mock())
self.freshinstancetasks._create_secgroup(datastore_manager)
self.assertEqual(2, taskmanager_models.SecurityGroupRule.
create_sec_group_rule.call_count)
@@ -273,7 +295,8 @@ class FreshInstanceTasksTest(testtools.TestCase):
taskmanager_models.CONF.get = Mock(
return_value=FakeOptGroup(tcp_ports=['3306', '-3306']))
self.freshinstancetasks.update_db = Mock()
- taskmanager_models.SecurityGroupRule.create_sec_group_rule = Mock()
+ taskmanager_models.SecurityGroupRule.create_sec_group_rule = (
+ Mock())
self.assertRaises(MalformedSecurityGroupRuleError,
self.freshinstancetasks._create_secgroup,
datastore_manager)
@@ -286,7 +309,8 @@ class FreshInstanceTasksTest(testtools.TestCase):
taskmanager_models.CONF.get = Mock(
return_value=FakeOptGroup(tcp_ports=['3306', '33060-3306']))
self.freshinstancetasks.update_db = Mock()
- taskmanager_models.SecurityGroupRule.create_sec_group_rule = Mock()
+ taskmanager_models.SecurityGroupRule.create_sec_group_rule = (
+ Mock())
self.assertRaises(MalformedSecurityGroupRuleError,
self.freshinstancetasks._create_secgroup,
datastore_manager)
@@ -299,7 +323,8 @@ class FreshInstanceTasksTest(testtools.TestCase):
taskmanager_models.CONF.get = Mock(
return_value=FakeOptGroup(
tcp_ports=['3306', '3306', '3306-3307', '3306-3307']))
- taskmanager_models.SecurityGroupRule.create_sec_group_rule = Mock()
+ taskmanager_models.SecurityGroupRule.create_sec_group_rule = (
+ Mock())
self.freshinstancetasks.update_db = Mock()
self.freshinstancetasks._create_secgroup(datastore_manager)
self.assertEqual(2, taskmanager_models.SecurityGroupRule.
@@ -308,7 +333,8 @@ class FreshInstanceTasksTest(testtools.TestCase):
def test_create_sg_rules_exception_with_malformed_ports_or_range(self):
datastore_manager = 'mysql'
taskmanager_models.SecurityGroup.create_for_instance = (
- Mock(return_value={'id': uuid.uuid4(), 'name': uuid.uuid4()}))
+ Mock(return_value={'id': uuid.uuid4(),
+ 'name': uuid.uuid4()}))
taskmanager_models.CONF.get = Mock(
return_value=FakeOptGroup(tcp_ports=['A', 'B-C']))
self.freshinstancetasks.update_db = Mock()
@@ -372,7 +398,8 @@ class ResizeVolumeTest(testtools.TestCase):
self.instance.reset_mock()
def test_resize_volume_verify_extend_no_volume(self):
- self.instance.volume_client.volumes.get = Mock(return_value=None)
+ self.instance.volume_client.volumes.get = Mock(
+ return_value=None)
self.assertRaises(cinder_exceptions.ClientException,
self.action._verify_extend)
self.instance.reset_mock()
@@ -410,36 +437,30 @@ class ResizeVolumeTest(testtools.TestCase):
class BuiltInstanceTasksTest(testtools.TestCase):
- def stub_inst_service_status(self, status_id, statuses):
+ def get_inst_service_status(self, status_id, statuses):
answers = []
for i, status in enumerate(statuses):
inst_svc_status = InstanceServiceStatus(status,
id="%s-%s" % (status_id,
i))
- when(inst_svc_status).save().thenReturn(None)
+ inst_svc_status.save = MagicMock(return_value=None)
answers.append(inst_svc_status)
-
- when(trove.db.models.DatabaseModelBase).find_by(
- instance_id=any()).thenReturn(*answers)
+ return answers
def _stub_volume_client(self):
- self.instance_task._volume_client = mock(cinderclient.Client)
- stub_volume_mgr = mock(cinderclient.volumes.VolumeManager)
+ self.instance_task._volume_client = MagicMock(spec=cinderclient.Client)
+ stub_volume_mgr = MagicMock(spec=cinderclient.volumes.VolumeManager)
self.instance_task.volume_client.volumes = stub_volume_mgr
- stub_volume = cinderclient.volumes.Volume(stub_volume_mgr,
- {'status': 'available'},
- True)
- when(stub_volume_mgr).extend(VOLUME_ID, 2).thenReturn(None)
+ stub_volume_mgr.extend = MagicMock(return_value=None)
stub_new_volume = cinderclient.volumes.Volume(
stub_volume_mgr, {'status': 'available', 'size': 2}, True)
- when(stub_volume_mgr).get(any()).thenReturn(
- stub_volume).thenReturn(stub_new_volume)
- when(stub_volume_mgr).attach(any(), VOLUME_ID).thenReturn(None)
+ stub_volume_mgr.get = MagicMock(return_value=stub_new_volume)
+ stub_volume_mgr.attach = MagicMock(return_value=None)
def setUp(self):
super(BuiltInstanceTasksTest, self).setUp()
self.new_flavor = {'id': 8, 'ram': 768, 'name': 'bigger_flavor'}
- stub_nova_server = mock(novaclient.v1_1.servers.Server)
+ stub_nova_server = MagicMock()
db_instance = DBInstance(InstanceTasks.NONE,
id=INST_ID,
name='resize-inst-name',
@@ -453,13 +474,14 @@ class BuiltInstanceTasksTest(testtools.TestCase):
tenant_id='testresize-tenant-id',
volume_size='1',
volume_id=VOLUME_ID)
+
# this is used during the final check of whether the resize successful
db_instance.server_status = 'ACTIVE'
self.db_instance = db_instance
- when(datastore_models.DatastoreVersion).load_by_uuid(any()).thenReturn(
- datastore_models.DatastoreVersion(db_instance))
- when(datastore_models.Datastore).load('id-1').thenReturn(
- datastore_models.Datastore(db_instance))
+ datastore_models.DatastoreVersion.load_by_uuid = MagicMock(
+ return_value=datastore_models.DatastoreVersion(db_instance))
+ datastore_models.Datastore.load = MagicMock(
+ return_value=datastore_models.Datastore(db_instance))
self.instance_task = taskmanager_models.BuiltInstanceTasks(
trove.common.context.TroveContext(),
@@ -468,47 +490,59 @@ class BuiltInstanceTasksTest(testtools.TestCase):
InstanceServiceStatus(ServiceStatuses.RUNNING,
id='inst-stat-id-0'))
- self.instance_task._guest = mock(trove.guestagent.api.API)
- self.instance_task._nova_client = mock(novaclient.v1_1.Client)
- self.stub_server_mgr = mock(novaclient.v1_1.servers.ServerManager)
- self.stub_running_server = mock(novaclient.v1_1.servers.Server)
+ self.instance_task._guest = MagicMock(spec=trove.guestagent.api.API)
+ self.instance_task._nova_client = MagicMock(
+ spec=novaclient.v1_1.Client)
+ self.stub_server_mgr = MagicMock(
+ spec=novaclient.v1_1.servers.ServerManager)
+ self.stub_running_server = MagicMock(
+ spec=novaclient.v1_1.servers.Server)
self.stub_running_server.status = 'ACTIVE'
self.stub_running_server.flavor = {'id': 6, 'ram': 512}
- self.stub_verifying_server = mock(novaclient.v1_1.servers.Server)
+ self.stub_verifying_server = MagicMock(
+ spec=novaclient.v1_1.servers.Server)
self.stub_verifying_server.status = 'VERIFY_RESIZE'
self.stub_verifying_server.flavor = {'id': 8, 'ram': 768}
- when(self.stub_server_mgr).get(any()).thenReturn(
- self.stub_verifying_server)
+ self.stub_server_mgr.get = MagicMock(
+ return_value=self.stub_verifying_server)
self.instance_task._nova_client.servers = self.stub_server_mgr
- stub_flavor_manager = mock(novaclient.v1_1.flavors.FlavorManager)
+ stub_flavor_manager = MagicMock(
+ spec=novaclient.v1_1.flavors.FlavorManager)
self.instance_task._nova_client.flavors = stub_flavor_manager
nova_flavor = novaclient.v1_1.flavors.Flavor(stub_flavor_manager,
self.new_flavor,
True)
- when(stub_flavor_manager).get(any()).thenReturn(nova_flavor)
-
- self.stub_inst_service_status('inst_stat-id',
- [ServiceStatuses.SHUTDOWN,
- ServiceStatuses.RUNNING,
- ServiceStatuses.RUNNING])
-
- when(template).SingleInstanceConfigTemplate(
- any(), any(), any()).thenReturn(
- mock(template.SingleInstanceConfigTemplate))
-
- when(trove.db.models.DatabaseModelBase).find_by(
- id=any(), deleted=False).thenReturn(db_instance)
- when(db_instance).save().thenReturn(None)
-
- when(trove.backup.models.Backup).running(any()).thenReturn(None)
+ stub_flavor_manager.get = MagicMock(return_value=nova_flavor)
+
+ answers = (status for status in
+ self.get_inst_service_status('inst_stat-id',
+ [ServiceStatuses.SHUTDOWN,
+ ServiceStatuses.RUNNING,
+ ServiceStatuses.RUNNING,
+ ServiceStatuses.RUNNING]))
+
+ def side_effect_func(*args, **kwargs):
+ if 'instance_id' in kwargs:
+ return answers.next()
+ elif ('id' in kwargs and 'deleted' in kwargs
+ and not kwargs['deleted']):
+ return db_instance
+ else:
+ return MagicMock()
+ trove.db.models.DatabaseModelBase.find_by = MagicMock(
+ side_effect=side_effect_func)
+
+ template.SingleInstanceConfigTemplate = MagicMock(
+ spec=template.SingleInstanceConfigTemplate)
+ db_instance.save = MagicMock(return_value=None)
+ trove.backup.models.Backup.running = MagicMock(return_value=None)
if 'volume' in self._testMethodName:
self._stub_volume_client()
def tearDown(self):
super(BuiltInstanceTasksTest, self).tearDown()
- unstub()
def test_resize_flavor(self):
orig_server = self.instance_task.server
@@ -516,27 +550,31 @@ class BuiltInstanceTasksTest(testtools.TestCase):
self.new_flavor)
# verify
self.assertIsNot(self.instance_task.server, orig_server)
- verify(self.instance_task._guest).stop_db(do_not_start_on_reboot=True)
- verify(orig_server).resize(self.new_flavor['id'])
+ self.instance_task._guest.stop_db.assert_any_call(
+ do_not_start_on_reboot=True)
+ orig_server.resize.assert_any_call(self.new_flavor['id'])
self.assertThat(self.db_instance.task_status, Is(InstanceTasks.NONE))
- verify(self.stub_server_mgr, times=1).get(any())
+ self.assertEqual(self.stub_server_mgr.get.call_count, 1)
self.assertThat(self.db_instance.flavor_id, Is(self.new_flavor['id']))
def test_resize_flavor_resize_failure(self):
orig_server = self.instance_task.server
self.stub_verifying_server.status = 'ERROR'
- when(self.instance_task._nova_client.servers).get(any()).thenReturn(
- self.stub_verifying_server)
- # execute
- self.assertRaises(TroveError, self.instance_task.resize_flavor,
- {'id': 1, 'ram': 512}, self.new_flavor)
- # verify
- verify(self.stub_server_mgr, times=1).get(any())
- self.assertIs(self.instance_task.server, self.stub_verifying_server)
- verify(self.instance_task._guest).stop_db(do_not_start_on_reboot=True)
- verify(orig_server).resize(self.new_flavor['id'])
- self.assertThat(self.db_instance.task_status, Is(InstanceTasks.NONE))
- self.assertThat(self.db_instance.flavor_id, Is('6'))
+ with patch.object(self.instance_task._nova_client.servers, 'get',
+ return_value=self.stub_verifying_server):
+ # execute
+ self.assertRaises(TroveError, self.instance_task.resize_flavor,
+ {'id': 1, 'ram': 512}, self.new_flavor)
+ # verify
+ self.assertTrue(self.stub_server_mgr.get.called)
+ self.assertIs(self.instance_task.server,
+ self.stub_verifying_server)
+ self.instance_task._guest.stop_db.assert_any_call(
+ do_not_start_on_reboot=True)
+ orig_server.resize.assert_any_call(self.new_flavor['id'])
+ self.assertThat(self.db_instance.task_status,
+ Is(InstanceTasks.NONE))
+ self.assertThat(self.db_instance.flavor_id, Is('6'))
class BackupTasksTest(testtools.TestCase):
@@ -556,62 +594,58 @@ class BackupTasksTest(testtools.TestCase):
[{'name': 'first'},
{'name': 'second'},
{'name': 'third'}])
- when(backup_models.Backup).delete(any()).thenReturn(None)
- when(backup_models.Backup).get_by_id(
- any(), self.backup.id).thenReturn(self.backup)
- when(backup_models.DBBackup).save(any()).thenReturn(self.backup)
- when(self.backup).delete(any()).thenReturn(None)
- self.swift_client = mock()
- when(remote).create_swift_client(
- any()).thenReturn(self.swift_client)
- when(self.swift_client).head_container(
- any()).thenRaise(ClientException("foo"))
- when(self.swift_client).head_object(
- any(), any()).thenRaise(ClientException("foo"))
- when(self.swift_client).get_container(any()).thenReturn(
- self.container_content)
- when(self.swift_client).delete_object(any(), any()).thenReturn(None)
- when(self.swift_client).delete_container(any()).thenReturn(None)
+ backup_models.Backup.delete = MagicMock(return_value=None)
+ backup_models.Backup.get_by_id = MagicMock(return_value=self.backup)
+ backup_models.DBBackup.save = MagicMock(return_value=self.backup)
+ self.backup.delete = MagicMock(return_value=None)
+ self.swift_client = MagicMock()
+ remote.create_swift_client = MagicMock(return_value=self.swift_client)
+
+ self.swift_client.head_container = MagicMock(
+ side_effect=ClientException("foo"))
+ self.swift_client.head_object = MagicMock(
+ side_effect=ClientException("foo"))
+ self.swift_client.get_container = MagicMock(
+ return_value=self.container_content)
+ self.swift_client.delete_object = MagicMock(return_value=None)
+ self.swift_client.delete_container = MagicMock(return_value=None)
def tearDown(self):
super(BackupTasksTest, self).tearDown()
- unstub()
def test_delete_backup_nolocation(self):
self.backup.location = ''
taskmanager_models.BackupTasks.delete_backup('dummy context',
self.backup.id)
- verify(self.backup).delete()
+ self.backup.delete.assert_any_call()
def test_delete_backup_fail_delete_manifest(self):
- filename = self.backup.location[self.backup.location.rfind("/") + 1:]
- when(self.swift_client).delete_object(
- any(),
- filename).thenRaise(ClientException("foo"))
- when(self.swift_client).head_object(any(), any()).thenReturn({})
- self.assertRaises(
- TroveError,
- taskmanager_models.BackupTasks.delete_backup,
- 'dummy context', self.backup.id)
- verify(backup_models.Backup, never).delete(self.backup.id)
- self.assertEqual(
- backup_models.BackupState.DELETE_FAILED,
- self.backup.state,
- "backup should be in DELETE_FAILED status")
+ with patch.object(self.swift_client, 'delete_object',
+ side_effect=ClientException("foo")):
+ with patch.object(self.swift_client, 'head_object',
+ return_value={}):
+ self.assertRaises(
+ TroveError,
+ taskmanager_models.BackupTasks.delete_backup,
+ 'dummy context', self.backup.id)
+ self.assertFalse(backup_models.Backup.delete.called)
+ self.assertEqual(
+ backup_models.BackupState.DELETE_FAILED,
+ self.backup.state,
+ "backup should be in DELETE_FAILED status")
def test_delete_backup_fail_delete_segment(self):
- when(self.swift_client).delete_object(
- any(),
- 'second').thenRaise(ClientException("foo"))
- self.assertRaises(
- TroveError,
- taskmanager_models.BackupTasks.delete_backup,
- 'dummy context', self.backup.id)
- verify(backup_models.Backup, never).delete(self.backup.id)
- self.assertEqual(
- backup_models.BackupState.DELETE_FAILED,
- self.backup.state,
- "backup should be in DELETE_FAILED status")
+ with patch.object(self.swift_client, 'delete_object',
+ side_effect=ClientException("foo")):
+ self.assertRaises(
+ TroveError,
+ taskmanager_models.BackupTasks.delete_backup,
+ 'dummy context', self.backup.id)
+ self.assertFalse(backup_models.Backup.delete.called)
+ self.assertEqual(
+ backup_models.BackupState.DELETE_FAILED,
+ self.backup.state,
+ "backup should be in DELETE_FAILED status")
def test_parse_manifest(self):
manifest = 'container/prefix'