summaryrefslogtreecommitdiff
path: root/packages/python-google-compute-engine/google_compute_engine/tests
diff options
context:
space:
mode:
authorLiam Hopkins <liamh@google.com>2018-12-14 12:44:47 -0800
committerGitHub <noreply@github.com>2018-12-14 12:44:47 -0800
commitf773905cc0a70927c7180dd60d939fbf21264c92 (patch)
treea8aa77f094f896d6689fcee711eb490822b6b1f0 /packages/python-google-compute-engine/google_compute_engine/tests
parent091c4251a0d5e4af7c006af747251af7d7bcee62 (diff)
downloadgoogle-compute-image-packages-f773905cc0a70927c7180dd60d939fbf21264c92.tar.gz
Repo layout changes (#688)
Diffstat (limited to 'packages/python-google-compute-engine/google_compute_engine/tests')
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/compat_test.py123
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/config_manager_test.py177
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/file_utils_test.py198
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/logger_test.py54
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/metadata_watcher_test.py358
-rw-r--r--packages/python-google-compute-engine/google_compute_engine/tests/network_utils_test.py106
6 files changed, 1016 insertions, 0 deletions
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/compat_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/compat_test.py
new file mode 100644
index 0000000..ec28323
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/compat_test.py
@@ -0,0 +1,123 @@
+#!/usr/bin/python
+# Copyright 2017 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for compat.py module."""
+
+import sys
+
+import google_compute_engine.compat
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import reload_import
+from google_compute_engine.test_compat import unittest
+from google_compute_engine.test_compat import urlretrieve
+
+
+class CompatTest(unittest.TestCase):
+
+ @mock.patch('google_compute_engine.compat.subprocess.check_call')
+ def testCurlRetrieve(self, mock_call):
+ url = 'http://www.example.com/script.sh'
+ filename = None
+ expected = ['curl', '--max-time', mock.ANY, '--retry', mock.ANY, '--', url]
+
+ if sys.version_info < (2, 7, 9):
+ urlretrieve.urlretrieve(url, filename)
+ mock_call.assert_called_with(expected)
+ else:
+ pass
+
+ @mock.patch('google_compute_engine.compat.subprocess.check_call')
+ def testCurlRetrieveFilename(self, mock_call):
+ url = 'http://www.example.com/script.sh'
+ filename = '/tmp/filename.txt'
+ expected = [
+ 'curl', '--max-time', mock.ANY, '--retry', mock.ANY, '-o', filename,
+ '--', url,
+ ]
+
+ if sys.version_info < (2, 7, 9):
+ urlretrieve.urlretrieve(url, filename)
+ mock_call.assert_called_with(expected)
+ else:
+ pass
+
+ @mock.patch('google_compute_engine.compat.subprocess.check_call')
+ @mock.patch('google_compute_engine.compat.urlretrieve.urlretrieve')
+ def testUrlRetrieve(self, mock_retrieve, mock_call):
+ url = 'http://www.example.com/script.sh'
+ filename = '/tmp/filename.txt'
+ args = ['arg1', 'arg2', 'arg3']
+ kwargs = {'kwarg1': 1, 'kwarg2': 2}
+
+ if sys.version_info >= (2, 7, 9):
+ urlretrieve.urlretrieve(url, filename, *args, **kwargs)
+ mock_retrieve.assert_called_once_with(url, filename, *args, **kwargs)
+ mock_call.assert_not_called()
+ else:
+ pass
+
+ @mock.patch('google_compute_engine.compat.distro.linux_distribution')
+ def testDistroCompatLinux(self, mock_call):
+ test_cases = {
+ ('Fedora', '28', ''):
+ google_compute_engine.distro_lib.el_7.utils,
+ ('debian', '8.10', ''):
+ google_compute_engine.distro_lib.debian_8.utils,
+ ('debian', '9.3', ''):
+ google_compute_engine.distro_lib.debian_9.utils,
+ ('debian', '10.3', ''):
+ google_compute_engine.distro_lib.debian_9.utils,
+ ('SUSE Linux Enterprise Server', '11', 'x86_64'):
+ google_compute_engine.distro_lib.sles_11.utils,
+ ('SUSE Linux Enterprise Server', '12', 'x86_64'):
+ google_compute_engine.distro_lib.sles_12.utils,
+ ('SUSE Linux Enterprise Server', '13', 'x86_64'):
+ google_compute_engine.distro_lib.sles_12.utils,
+ ('CentOS Linux', '6.4.3', 'Core'):
+ google_compute_engine.distro_lib.el_6.utils,
+ ('CentOS Linux', '7.4.1708', 'Core'):
+ google_compute_engine.distro_lib.el_7.utils,
+ ('CentOS Linux', '8.4.3', 'Core'):
+ google_compute_engine.distro_lib.el_7.utils,
+ ('Red Hat Enterprise Linux Server', '6.3.2', ''):
+ google_compute_engine.distro_lib.el_6.utils,
+ ('Red Hat Enterprise Linux Server', '7.4', ''):
+ google_compute_engine.distro_lib.el_7.utils,
+ ('Red Hat Enterprise Linux Server', '8.5.1', ''):
+ google_compute_engine.distro_lib.el_7.utils,
+ ('', '', ''):
+ google_compute_engine.distro_lib.debian_9.utils,
+ ('xxxx', 'xxxx', 'xxxx'):
+ google_compute_engine.distro_lib.debian_9.utils,
+ }
+
+ for distro in test_cases:
+ mock_call.return_value = distro
+ reload_import(google_compute_engine.compat)
+ self.assertEqual(
+ test_cases[distro], google_compute_engine.compat.distro_utils)
+
+ @mock.patch('google_compute_engine.compat.sys.platform', 'freebsd')
+ @mock.patch('google_compute_engine.compat.distro.version')
+ def testDistroCompatFreeBSD(self, mock_call):
+ mock_call.return_value = 'FreeBSD 11.1-RELEASE-p4 #0: Tue Nov 14 06:12:40'
+ reload_import(google_compute_engine.compat)
+ self.assertEqual(
+ google_compute_engine.distro_lib.freebsd_11.utils,
+ google_compute_engine.compat.distro_utils)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/config_manager_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/config_manager_test.py
new file mode 100644
index 0000000..185989a
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/config_manager_test.py
@@ -0,0 +1,177 @@
+#!/usr/bin/python
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for config_manager.py module."""
+
+from google_compute_engine import config_manager
+from google_compute_engine.test_compat import builtin
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import unittest
+
+
+def _HasOption(_, option):
+ """Validate the option exists in the config file.
+
+ Args:
+ option: string, the config option to check.
+
+ Returns:
+ bool, True if test is not in the option name.
+ """
+ return 'test' not in option
+
+
+def _HasSection(section):
+ """Validate the section exists in the config file.
+
+ Args:
+ section: string, the config section to check.
+
+ Returns:
+ bool, True if test is not in the section name.
+ """
+ return 'test' not in section
+
+
+class ConfigManagerTest(unittest.TestCase):
+
+ option = 'option'
+ section = 'section'
+ value = 'value'
+
+ def setUp(self):
+ self.mock_config = mock.Mock()
+ self.mock_config.has_option.side_effect = _HasOption
+ self.mock_config.has_section.side_effect = _HasSection
+ config_manager.parser.Parser = mock.Mock()
+ config_manager.parser.Parser.return_value = self.mock_config
+
+ self.config_file = 'test.cfg'
+ self.config_header = 'Config file header.'
+
+ self.mock_config_manager = config_manager.ConfigManager(
+ config_file=self.config_file, config_header=self.config_header)
+
+ def testAddHeader(self):
+ mock_fp = mock.Mock()
+ self.mock_config_manager._AddHeader(mock_fp)
+ expected_calls = [
+ mock.call('# %s' % self.config_header),
+ mock.call('\n\n'),
+ ]
+ self.assertEqual(mock_fp.write.mock_calls, expected_calls)
+
+ def testGetOptionString(self):
+ self.mock_config_manager.GetOptionString(self.section, self.option)
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_option(self.section, self.option),
+ mock.call.get(self.section, self.option),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testGetOptionStringNoOption(self):
+ option = 'test-option'
+ self.assertIsNone(
+ self.mock_config_manager.GetOptionString(self.section, option))
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_option(self.section, option),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testGetOptionBool(self):
+ self.mock_config_manager.GetOptionBool(self.section, self.option)
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_option(self.section, self.option),
+ mock.call.getboolean(self.section, self.option),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testGetOptionBoolNoOption(self):
+ option = 'test-option'
+ self.assertTrue(
+ self.mock_config_manager.GetOptionBool(self.section, option))
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_option(self.section, option),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testSetOption(self):
+ self.mock_config_manager.SetOption(self.section, self.option, self.value)
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_section(self.section),
+ mock.call.set(self.section, self.option, self.value),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testSetOptionNoOverwrite(self):
+ self.mock_config_manager.SetOption(
+ self.section, self.option, self.value, overwrite=False)
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_option(self.section, self.option),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testSetOptionNewSection(self):
+ section = 'test-section'
+ self.mock_config_manager.SetOption(section, self.option, self.value)
+ expected_calls = [
+ mock.call.read(self.config_file),
+ mock.call.has_section(section),
+ mock.call.add_section(section),
+ mock.call.set(section, self.option, self.value),
+ ]
+ self.assertEqual(self.mock_config.mock_calls, expected_calls)
+
+ def testWriteConfig(self):
+ mock_open = mock.mock_open()
+ with mock.patch('%s.open' % builtin, mock_open, create=False):
+ self.mock_config_manager.WriteConfig()
+ expected_calls = [
+ mock.call('# %s' % self.config_header),
+ mock.call('\n\n'),
+ ]
+ self.assertEqual(mock_open().write.mock_calls, expected_calls)
+
+ @mock.patch('google_compute_engine.config_manager.file_utils')
+ def testWriteConfigNoHeader(self, mock_lock):
+ self.mock_config_manager = config_manager.ConfigManager(
+ config_file='/tmp/file.cfg')
+ mock_open = mock.mock_open()
+ with mock.patch('%s.open' % builtin, mock_open, create=False):
+ self.mock_config_manager.WriteConfig()
+ mock_open().write.assert_not_called()
+ mock_lock.LockFile.assert_called_once_with('/var/lock/google_file.lock')
+
+ @mock.patch('google_compute_engine.config_manager.file_utils')
+ def testWriteConfigLocked(self, mock_lock):
+ ioerror = IOError('Test Error')
+ mock_lock.LockFile.side_effect = ioerror
+ mock_open = mock.mock_open()
+ with mock.patch('%s.open' % builtin, mock_open, create=False):
+ with self.assertRaises(IOError) as error:
+ self.mock_config_manager.WriteConfig()
+ self.assertEqual(error.exception, ioerror)
+ mock_open().write.assert_not_called()
+ mock_lock.LockFile.assert_called_once_with('/var/lock/google_test.lock')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/file_utils_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/file_utils_test.py
new file mode 100644
index 0000000..1b170a3
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/file_utils_test.py
@@ -0,0 +1,198 @@
+#!/usr/bin/python
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for file_utils.py module."""
+
+from google_compute_engine import file_utils
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import unittest
+
+
+class FileUtilsTest(unittest.TestCase):
+
+ def setUp(self):
+ self.fd = 1
+ self.path = '/tmp/path'
+
+ @mock.patch('google_compute_engine.file_utils.subprocess.call')
+ @mock.patch('google_compute_engine.file_utils.os.access')
+ @mock.patch('google_compute_engine.file_utils.os.path.isfile')
+ def testSetSELinuxContext(self, mock_isfile, mock_access, mock_call):
+ restorecon = '/sbin/restorecon'
+ path = 'path'
+ mock_isfile.return_value = True
+ mock_access.return_value = True
+ file_utils._SetSELinuxContext(path)
+ mock_isfile.assert_called_once_with(restorecon)
+ mock_access.assert_called_once_with(restorecon, file_utils.os.X_OK)
+ mock_call.assert_called_once_with([restorecon, path])
+
+ @mock.patch('google_compute_engine.file_utils.subprocess.call')
+ @mock.patch('google_compute_engine.file_utils.os.access')
+ @mock.patch('google_compute_engine.file_utils.os.path.isfile')
+ def testSetSELinuxContextSkip(self, mock_isfile, mock_access, mock_call):
+ mock_isfile.side_effect = [True, False, False]
+ mock_access.side_effect = [False, True, False]
+ file_utils._SetSELinuxContext('1')
+ file_utils._SetSELinuxContext('2')
+ file_utils._SetSELinuxContext('3')
+ mock_call.assert_not_called()
+
+ @mock.patch('google_compute_engine.file_utils._SetSELinuxContext')
+ @mock.patch('google_compute_engine.file_utils.os.path.exists')
+ @mock.patch('google_compute_engine.file_utils.os.mkdir')
+ @mock.patch('google_compute_engine.file_utils.os.chown')
+ @mock.patch('google_compute_engine.file_utils.os.chmod')
+ def testSetPermissions(
+ self, mock_chmod, mock_chown, mock_mkdir, mock_exists, mock_context):
+ mocks = mock.Mock()
+ mocks.attach_mock(mock_chmod, 'chmod')
+ mocks.attach_mock(mock_chown, 'chown')
+ mocks.attach_mock(mock_mkdir, 'mkdir')
+ mocks.attach_mock(mock_exists, 'exists')
+ mocks.attach_mock(mock_context, 'context')
+ path = 'path'
+ mode = 'mode'
+ uid = 'uid'
+ gid = 'gid'
+ mock_exists.side_effect = [False, True, False]
+
+ # Create a new directory.
+ file_utils.SetPermissions(path, mode=mode, uid=uid, gid=gid, mkdir=True)
+ # The path exists, so do not create a new directory.
+ file_utils.SetPermissions(path, mode=mode, uid=uid, gid=gid, mkdir=True)
+ # Create a new directory without a mode specified.
+ file_utils.SetPermissions(path, uid=uid, gid=gid, mkdir=True)
+ # Do not create the path even though it does not exist.
+ file_utils.SetPermissions(path, mode=mode, uid=uid, gid=gid, mkdir=False)
+ # Do not set an owner when a UID or GID is not specified.
+ file_utils.SetPermissions(path, mode=mode, mkdir=False)
+ # Set the SELinux context when no parameters are specified.
+ file_utils.SetPermissions(path)
+ expected_calls = [
+ # Create a new directory.
+ mock.call.exists(path),
+ mock.call.mkdir(path, mode),
+ mock.call.chown(path, uid, gid),
+ mock.call.context(path),
+ # Attempt to create a new path but reuse existing path.
+ mock.call.exists(path),
+ mock.call.chmod(path, mode),
+ mock.call.chown(path, uid, gid),
+ mock.call.context(path),
+ # Create a new directory with default mode.
+ mock.call.exists(path),
+ mock.call.mkdir(path, 0o777),
+ mock.call.chown(path, uid, gid),
+ mock.call.context(path),
+ # Set permissions and owner on an existing path.
+ mock.call.chmod(path, mode),
+ mock.call.chown(path, uid, gid),
+ mock.call.context(path),
+ # Set permissions, without changing ownership, of an existing path.
+ mock.call.chmod(path, mode),
+ mock.call.context(path),
+ # Set SELinux context on an existing path.
+ mock.call.context(path),
+ ]
+ self.assertEqual(mocks.mock_calls, expected_calls)
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testLock(self, mock_flock):
+ operation = file_utils.fcntl.LOCK_EX | file_utils.fcntl.LOCK_NB
+ file_utils.Lock(self.fd, self.path, False)
+ mock_flock.assert_called_once_with(self.fd, operation)
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testLockBlocking(self, mock_flock):
+ operation = file_utils.fcntl.LOCK_EX
+ file_utils.Lock(self.fd, self.path, True)
+ mock_flock.assert_called_once_with(self.fd, operation)
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testLockTakenException(self, mock_flock):
+ error = IOError('Test Error')
+ error.errno = file_utils.errno.EWOULDBLOCK
+ mock_flock.side_effect = error
+ try:
+ file_utils.Lock(self.fd, self.path, False)
+ except IOError as e:
+ self.assertTrue(self.path in str(e))
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testLockException(self, mock_flock):
+ error = IOError('Test Error')
+ mock_flock.side_effect = error
+ try:
+ file_utils.Lock(self.fd, self.path, False)
+ except IOError as e:
+ self.assertTrue(self.path in str(e))
+ self.assertTrue('Test Error' in str(e))
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testUnlock(self, mock_flock):
+ operation = file_utils.fcntl.LOCK_UN | file_utils.fcntl.LOCK_NB
+ file_utils.Unlock(self.fd, self.path)
+ mock_flock.assert_called_once_with(self.fd, operation)
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testUnlockTakenException(self, mock_flock):
+ error = IOError('Test Error')
+ error.errno = file_utils.errno.EWOULDBLOCK
+ mock_flock.side_effect = error
+ try:
+ file_utils.Unlock(self.fd, self.path)
+ except IOError as e:
+ self.assertTrue(self.path in str(e))
+
+ @mock.patch('google_compute_engine.file_utils.fcntl.flock')
+ def testUnlockException(self, mock_flock):
+ error = IOError('Test Error')
+ mock_flock.side_effect = error
+ try:
+ file_utils.Unlock(self.fd, self.path)
+ except IOError as e:
+ self.assertTrue(self.path in str(e))
+ self.assertTrue('Test Error' in str(e))
+
+ @mock.patch('google_compute_engine.file_utils.Unlock')
+ @mock.patch('google_compute_engine.file_utils.Lock')
+ @mock.patch('google_compute_engine.file_utils.os')
+ def testLockFile(self, mock_os, mock_lock, mock_unlock):
+ mock_callable = mock.Mock()
+ mock_os.open.return_value = self.fd
+ mock_os.O_CREAT = 1
+ mocks = mock.Mock()
+ mocks.attach_mock(mock_callable, 'callable')
+ mocks.attach_mock(mock_lock, 'lock')
+ mocks.attach_mock(mock_unlock, 'unlock')
+ mocks.attach_mock(mock_os.open, 'open')
+ mocks.attach_mock(mock_os.close, 'close')
+
+ with file_utils.LockFile(self.path, blocking=True):
+ mock_callable('test')
+
+ expected_calls = [
+ mock.call.open(self.path, 1),
+ mock.call.lock(self.fd, self.path, True),
+ mock.call.callable('test'),
+ mock.call.unlock(self.fd, self.path),
+ mock.call.close(1),
+ ]
+ self.assertEqual(mocks.mock_calls, expected_calls)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/logger_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/logger_test.py
new file mode 100644
index 0000000..a0b6100
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/logger_test.py
@@ -0,0 +1,54 @@
+#!/usr/bin/python
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for logger.py module."""
+
+from google_compute_engine import logger
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import unittest
+
+
+class LoggerTest(unittest.TestCase):
+
+ @mock.patch('google_compute_engine.logger.logging.handlers.SysLogHandler')
+ @mock.patch('google_compute_engine.logger.logging.StreamHandler')
+ @mock.patch('google_compute_engine.logger.logging.NullHandler')
+ def testLogger(self, mock_null, mock_stream, mock_syslog):
+ mock_null.return_value = mock_null
+ mock_stream.return_value = mock_stream
+ mock_syslog.return_value = mock_syslog
+ name = 'test'
+
+ # Verify basic logger setup.
+ named_logger = logger.Logger(name=name, debug=True)
+ mock_stream.setLevel.assert_called_once_with(logger.logging.DEBUG)
+ self.assertEqual(named_logger.handlers, [mock_null, mock_stream])
+
+ # Verify logger setup with a facility.
+ address = '/dev/log'
+ facility = 1
+ named_logger = logger.Logger(name=name, debug=True, facility=facility)
+ mock_syslog.assert_called_once_with(address=address, facility=facility)
+ mock_syslog.setLevel.assert_called_once_with(logger.logging.INFO)
+ self.assertEqual(
+ named_logger.handlers, [mock_null, mock_stream, mock_syslog])
+
+ # Verify the handlers are reset during repeated calls.
+ named_logger = logger.Logger(name=name, debug=False)
+ self.assertEqual(named_logger.handlers, [mock_null])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/metadata_watcher_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/metadata_watcher_test.py
new file mode 100644
index 0000000..1bce509
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/metadata_watcher_test.py
@@ -0,0 +1,358 @@
+#!/usr/bin/python
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for metadata_watcher.py module."""
+
+import os
+
+from google_compute_engine import metadata_watcher
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import unittest
+
+
+class MetadataWatcherTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mock_logger = mock.Mock()
+ self.timeout = 60
+ self.url = 'http://metadata.google.internal/computeMetadata/v1'
+ self.params = {
+ 'alt': 'json',
+ 'last_etag': 0,
+ 'recursive': True,
+ 'timeout_sec': self.timeout,
+ 'wait_for_change': True,
+ }
+ self.mock_watcher = metadata_watcher.MetadataWatcher(
+ logger=self.mock_logger, timeout=self.timeout)
+
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.build_opener')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.ProxyHandler')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.Request')
+ def testGetMetadataRequest(self, mock_request, mock_proxy, mock_opener):
+ mock_open = mock.Mock()
+ mock_handler = mock.Mock()
+ mock_response = mock.Mock()
+ mocks = mock.Mock()
+ mocks.attach_mock(mock_request, 'request')
+ mocks.attach_mock(mock_proxy, 'proxy')
+ mocks.attach_mock(mock_handler, 'handler')
+ mocks.attach_mock(mock_opener, 'opener')
+ mocks.attach_mock(mock_open, 'open')
+ mocks.attach_mock(mock_response, 'response')
+ mock_request.return_value = mock_request
+ mock_proxy.return_value = mock_handler
+ mock_opener.return_value = mock_open
+ mock_response.getcode.return_value = metadata_watcher.httpclient.OK
+ mock_open.open.return_value = mock_response
+ params = {'hello': 'world'}
+ request_url = '%s?hello=world' % self.url
+ headers = {'Metadata-Flavor': 'Google'}
+ timeout = self.timeout * 1.1
+
+ self.mock_watcher._GetMetadataRequest(self.url, params=params)
+ expected_calls = [
+ mock.call.request(request_url, headers=headers),
+ mock.call.proxy({}),
+ mock.call.opener(mock_handler),
+ mock.call.open.open(mock_request, timeout=timeout),
+ mock.call.response.getcode(),
+ ]
+ self.assertEqual(mocks.mock_calls, expected_calls)
+
+ @mock.patch('google_compute_engine.metadata_watcher.time')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.build_opener')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.ProxyHandler')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.Request')
+ def testGetMetadataRequestRetry(
+ self, mock_request, mock_proxy, mock_opener, mock_time):
+ mock_open = mock.Mock()
+ mock_handler = mock.Mock()
+ mocks = mock.Mock()
+ mocks.attach_mock(mock_request, 'request')
+ mocks.attach_mock(mock_proxy, 'proxy')
+ mocks.attach_mock(mock_handler, 'handler')
+ mocks.attach_mock(mock_opener, 'opener')
+ mocks.attach_mock(mock_open, 'open')
+ mocks.attach_mock(mock_time, 'time')
+ mock_request.return_value = mock_request
+ mock_proxy.return_value = mock_handler
+ mock_opener.return_value = mock_open
+
+ mock_unavailable = mock.Mock()
+ mock_unavailable.getcode.return_value = (
+ metadata_watcher.httpclient.SERVICE_UNAVAILABLE)
+ mock_timeout = metadata_watcher.socket.timeout('Test timeout')
+ mock_success = mock.Mock()
+ mock_success.getcode.return_value = metadata_watcher.httpclient.OK
+
+ # Retry after a service unavailable error response.
+ mock_open.open.side_effect = [
+ metadata_watcher.StatusException(mock_unavailable),
+ mock_timeout,
+ mock_success,
+ ]
+ request_url = '%s?' % self.url
+ headers = {'Metadata-Flavor': 'Google'}
+ timeout = self.timeout * 1.1
+
+ self.mock_watcher._GetMetadataRequest(self.url)
+ expected_calls = [
+ mock.call.request(request_url, headers=headers),
+ mock.call.proxy({}),
+ mock.call.opener(mock_handler),
+ mock.call.open.open(mock_request, timeout=timeout),
+ mock.call.time.sleep(mock.ANY),
+ mock.call.request(request_url, headers=headers),
+ mock.call.proxy({}),
+ mock.call.opener(mock_handler),
+ mock.call.open.open(mock_request, timeout=timeout),
+ mock.call.time.sleep(mock.ANY),
+ mock.call.request(request_url, headers=headers),
+ mock.call.proxy({}),
+ mock.call.opener(mock_handler),
+ mock.call.open.open(mock_request, timeout=timeout),
+ ]
+ self.assertEqual(mocks.mock_calls, expected_calls)
+
+ @mock.patch('google_compute_engine.metadata_watcher.time')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.build_opener')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.ProxyHandler')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.Request')
+ def testGetMetadataRequestHttpException(
+ self, mock_request, mock_proxy, mock_opener, mock_time):
+ mock_open = mock.Mock()
+ mock_handler = mock.Mock()
+ mock_response = mock.Mock()
+ mock_request.return_value = mock_request
+ mock_proxy.return_value = mock_handler
+ mock_opener.return_value = mock_open
+ mock_response.getcode.return_value = metadata_watcher.httpclient.NOT_FOUND
+ mock_open.open.side_effect = metadata_watcher.StatusException(mock_response)
+
+ with self.assertRaises(metadata_watcher.StatusException):
+ self.mock_watcher._GetMetadataRequest(self.url)
+ self.assertEqual(mock_request.call_count, 1)
+ self.assertEqual(mock_proxy.call_count, 1)
+ self.assertEqual(mock_opener.call_count, 1)
+ self.assertEqual(mock_open.open.call_count, 1)
+ self.assertEqual(mock_response.getcode.call_count, 1)
+
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.build_opener')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.ProxyHandler')
+ @mock.patch('google_compute_engine.metadata_watcher.urlrequest.Request')
+ def testGetMetadataRequestException(
+ self, mock_request, mock_proxy, mock_opener):
+ mock_open = mock.Mock()
+ mock_handler = mock.Mock()
+ mock_response = mock.Mock()
+ mock_request.return_value = mock_request
+ mock_proxy.return_value = mock_handler
+ mock_opener.return_value = mock_open
+ mock_response.getcode.return_value = metadata_watcher.httpclient.NOT_FOUND
+ mock_open.open.side_effect = mock_response
+
+ with self.assertRaises(metadata_watcher.StatusException):
+ self.mock_watcher._GetMetadataRequest(self.url)
+ self.assertEqual(mock_request.call_count, 1)
+ self.assertEqual(mock_proxy.call_count, 1)
+ self.assertEqual(mock_opener.call_count, 1)
+ self.assertEqual(mock_open.open.call_count, 1)
+
+ def testUpdateEtag(self):
+ mock_response = mock.Mock()
+ mock_response.headers = {'etag': 1}
+ self.assertEqual(self.mock_watcher.etag, 0)
+
+ # Update the etag if the etag is set.
+ self.assertTrue(self.mock_watcher._UpdateEtag(mock_response))
+ self.assertEqual(self.mock_watcher.etag, 1)
+
+ # Do not update the etag if the etag is unchanged.
+ self.assertFalse(self.mock_watcher._UpdateEtag(mock_response))
+ self.assertEqual(self.mock_watcher.etag, 1)
+
+ # Do not update the etag if the etag is not set.
+ mock_response.headers = {}
+ self.assertFalse(self.mock_watcher._UpdateEtag(mock_response))
+ self.assertEqual(self.mock_watcher.etag, 1)
+
+ def testGetMetadataUpdate(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = mock_response
+ mock_response.headers = {'etag': 1}
+ mock_response.read.return_value = bytes(b'{}')
+ self.mock_watcher._GetMetadataRequest = mock_response
+ request_url = os.path.join(self.url, '')
+
+ self.assertEqual(self.mock_watcher._GetMetadataUpdate(), {})
+ self.assertEqual(self.mock_watcher.etag, 1)
+ mock_response.assert_called_once_with(
+ request_url, params=self.params, timeout=None)
+
+ def testGetMetadataUpdateArgs(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = mock_response
+ mock_response.headers = {'etag': 0}
+ mock_response.read.return_value = bytes(b'{}')
+ self.mock_watcher._GetMetadataRequest = mock_response
+ metadata_key = 'instance/id'
+ self.params['recursive'] = False
+ self.params['wait_for_change'] = False
+ request_url = os.path.join(self.url, metadata_key)
+
+ self.mock_watcher._GetMetadataUpdate(
+ metadata_key=metadata_key, recursive=False, wait=False, timeout=60)
+ self.assertEqual(self.mock_watcher.etag, 0)
+ mock_response.assert_called_once_with(
+ request_url, params=self.params, timeout=60)
+
+ def testGetMetadataUpdateWait(self):
+ self.params['last_etag'] = 1
+ self.mock_watcher.etag = 1
+ mock_unchanged = mock.Mock()
+ mock_unchanged.headers = {'etag': 1}
+ mock_unchanged.read.return_value = bytes(b'{}')
+ mock_changed = mock.Mock()
+ mock_changed.headers = {'etag': 2}
+ mock_changed.read.return_value = bytes(b'{}')
+ mock_response = mock.Mock()
+ mock_response.side_effect = [mock_unchanged, mock_unchanged, mock_changed]
+ self.mock_watcher._GetMetadataRequest = mock_response
+ request_url = os.path.join(self.url, '')
+
+ self.mock_watcher._GetMetadataUpdate()
+ self.assertEqual(self.mock_watcher.etag, 2)
+ expected_calls = [
+ mock.call(request_url, params=self.params, timeout=None),
+ ] * 3
+ self.assertEqual(mock_response.mock_calls, expected_calls)
+
+ def testHandleMetadataUpdate(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = {}
+ self.mock_watcher._GetMetadataUpdate = mock_response
+
+ self.assertEqual(self.mock_watcher.GetMetadata(), {})
+ mock_response.assert_called_once_with(
+ metadata_key='', recursive=True, wait=False, timeout=None)
+ self.mock_watcher.logger.exception.assert_not_called()
+
+ def testHandleMetadataUpdateException(self):
+ mock_response = mock.Mock()
+ first = metadata_watcher.socket.timeout()
+ second = metadata_watcher.urlerror.URLError('Test')
+ mock_response.side_effect = [first, first, second, {}]
+ self.mock_watcher._GetMetadataUpdate = mock_response
+ metadata_key = 'instance/id'
+ recursive = False
+ wait = False
+ retry = True
+
+ self.assertEqual(
+ self.mock_watcher._HandleMetadataUpdate(
+ metadata_key=metadata_key, recursive=recursive, wait=wait,
+ timeout=None, retry=retry),
+ {})
+ expected_calls = [
+ mock.call(
+ metadata_key=metadata_key, recursive=recursive, wait=wait,
+ timeout=None),
+ ] * 4
+ self.assertEqual(mock_response.mock_calls, expected_calls)
+ expected_calls = [mock.call.error(mock.ANY, mock.ANY)] * 2
+ self.assertEqual(self.mock_logger.mock_calls, expected_calls)
+
+ def testHandleMetadataUpdateExceptionNoRetry(self):
+ mock_response = mock.Mock()
+ mock_response.side_effect = metadata_watcher.socket.timeout()
+ self.mock_watcher._GetMetadataUpdate = mock_response
+ metadata_key = 'instance/id'
+ recursive = False
+ wait = False
+ retry = False
+
+ self.assertIsNone(
+ self.mock_watcher._HandleMetadataUpdate(
+ metadata_key=metadata_key, recursive=recursive, wait=wait,
+ timeout=None, retry=retry))
+ expected_calls = [
+ mock.call(
+ metadata_key=metadata_key, recursive=recursive, wait=wait,
+ timeout=None),
+ ]
+ self.assertEqual(mock_response.mock_calls, expected_calls)
+ expected_calls = [mock.call.error(mock.ANY, mock.ANY)]
+ self.assertEqual(self.mock_logger.mock_calls, expected_calls)
+
+ def testWatchMetadata(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = {}
+ self.mock_watcher._HandleMetadataUpdate = mock_response
+ mock_handler = mock.Mock()
+ mock_handler.side_effect = Exception()
+ self.mock_logger.exception.side_effect = RuntimeError()
+ recursive = True
+
+ with self.assertRaises(RuntimeError):
+ self.mock_watcher.WatchMetadata(mock_handler, recursive=recursive)
+ mock_handler.assert_called_once_with({})
+ mock_response.assert_called_once_with(
+ metadata_key='', recursive=recursive, wait=True, timeout=None)
+
+ def testWatchMetadataException(self):
+ mock_response = mock.Mock()
+ mock_response.side_effect = metadata_watcher.socket.timeout()
+ self.mock_watcher._GetMetadataUpdate = mock_response
+ self.mock_logger.error.side_effect = RuntimeError()
+ metadata_key = 'instance/id'
+ recursive = False
+
+ with self.assertRaises(RuntimeError):
+ self.mock_watcher.WatchMetadata(
+ None, metadata_key=metadata_key, recursive=recursive)
+ mock_response.assert_called_once_with(
+ metadata_key=metadata_key, recursive=recursive, wait=True, timeout=None)
+
+ def testGetMetadata(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = {}
+ self.mock_watcher._HandleMetadataUpdate = mock_response
+
+ self.assertEqual(self.mock_watcher.GetMetadata(), {})
+ mock_response.assert_called_once_with(
+ metadata_key='', recursive=True, wait=False, timeout=None, retry=True)
+ self.mock_watcher.logger.exception.assert_not_called()
+
+ def testGetMetadataArgs(self):
+ mock_response = mock.Mock()
+ mock_response.return_value = {}
+ self.mock_watcher._HandleMetadataUpdate = mock_response
+ metadata_key = 'instance/id'
+ recursive = False
+ retry = False
+
+ response = self.mock_watcher.GetMetadata(
+ metadata_key=metadata_key, recursive=recursive, timeout=60,
+ retry=retry)
+ self.assertEqual(response, {})
+ mock_response.assert_called_once_with(
+ metadata_key=metadata_key, recursive=False, wait=False, timeout=60,
+ retry=False)
+ self.mock_watcher.logger.exception.assert_not_called()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/packages/python-google-compute-engine/google_compute_engine/tests/network_utils_test.py b/packages/python-google-compute-engine/google_compute_engine/tests/network_utils_test.py
new file mode 100644
index 0000000..5660f7f
--- /dev/null
+++ b/packages/python-google-compute-engine/google_compute_engine/tests/network_utils_test.py
@@ -0,0 +1,106 @@
+#!/usr/bin/python
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unittest for network_utils.py module."""
+
+from google_compute_engine import network_utils
+from google_compute_engine.test_compat import builtin
+from google_compute_engine.test_compat import mock
+from google_compute_engine.test_compat import unittest
+
+
+class NetworkUtilsTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mock_logger = mock.Mock()
+ self.interfaces = {'address': 'interface'}
+ self.mock_utils = network_utils.NetworkUtils(self.mock_logger)
+ self.mock_utils.interfaces = self.interfaces
+
+ @mock.patch('google_compute_engine.network_utils.netifaces', False)
+ @mock.patch('google_compute_engine.network_utils.os.listdir')
+ def testCreateInterfaceMapSysfs(self, mock_listdir):
+ mock_open = mock.mock_open()
+ interface_map = {
+ '1': 'a',
+ '2': 'b',
+ '3': 'c',
+ }
+ mock_listdir.return_value = interface_map.values()
+
+ with mock.patch('%s.open' % builtin, mock_open, create=False):
+ addresses = interface_map.keys()
+ addresses = ['%s\n' % address for address in addresses]
+ mock_open().read.side_effect = interface_map.keys()
+ self.assertEqual(self.mock_utils._CreateInterfaceMap(), interface_map)
+
+ @mock.patch('google_compute_engine.network_utils.netifaces', False)
+ @mock.patch('google_compute_engine.network_utils.os.listdir')
+ def testCreateInterfaceMapSysfsError(self, mock_listdir):
+ mock_open = mock.mock_open()
+ mock_listdir.return_value = ['a', 'b', 'c']
+
+ with mock.patch('%s.open' % builtin, mock_open, create=False):
+ mock_open().read.side_effect = [
+ '1', OSError('OSError'), IOError('IOError')]
+ self.assertEqual(self.mock_utils._CreateInterfaceMap(), {'1': 'a'})
+ expected_calls = [
+ mock.call.warning(mock.ANY, 'b', 'OSError'),
+ mock.call.warning(mock.ANY, 'c', 'IOError'),
+ ]
+ self.assertEqual(self.mock_logger.mock_calls, expected_calls)
+
+ @mock.patch('google_compute_engine.network_utils.netifaces.AF_LINK', 88)
+ @mock.patch('google_compute_engine.network_utils.netifaces')
+ def testCreateInterfaceMapNetifaces(self, mock_netifaces):
+ interface_map = {
+ '11:11:11:11:11:11': 'a',
+ '22:22:22:22:22:22': 'b',
+ '33:33:33:33:33:33': 'c',
+ }
+ ifaddress_map = {
+ 'a': {mock_netifaces.AF_LINK: [{'addr': '11:11:11:11:11:11'}]},
+ 'b': {mock_netifaces.AF_LINK: [{'addr': '22:22:22:22:22:22'}]},
+ 'c': {mock_netifaces.AF_LINK: [{'addr': '33:33:33:33:33:33'}]},
+ }
+ mock_netifaces.interfaces.return_value = interface_map.values()
+ mock_netifaces.ifaddresses.side_effect = (
+ lambda interface: ifaddress_map[interface])
+ self.assertEqual(self.mock_utils._CreateInterfaceMap(), interface_map)
+
+ @mock.patch('google_compute_engine.network_utils.netifaces.AF_LINK', 88)
+ @mock.patch('google_compute_engine.network_utils.netifaces')
+ def testCreateInterfaceMapNetifacesError(self, mock_netifaces):
+ ifaddress_map = {
+ 'a': {mock_netifaces.AF_LINK: [{'addr': '11:11:11:11:11:11'}]},
+ 'b': {},
+ 'c': {mock_netifaces.AF_LINK: [{'addr': ''}]},
+ }
+ mock_netifaces.interfaces.return_value = ['a', 'b', 'c']
+ mock_netifaces.ifaddresses.side_effect = (
+ lambda interface: ifaddress_map[interface])
+
+ self.assertEqual(
+ self.mock_utils._CreateInterfaceMap(), {'11:11:11:11:11:11': 'a'})
+ expected_calls = [
+ mock.call.warning(mock.ANY, 'b'),
+ mock.call.warning(mock.ANY, 'c'),
+ ]
+ self.assertEqual(self.mock_logger.mock_calls, expected_calls)
+
+ def testGetNetworkInterface(self):
+ self.assertIsNone(self.mock_utils.GetNetworkInterface('invalid'))
+ self.assertEqual(
+ self.mock_utils.GetNetworkInterface('address'), 'interface')