diff options
author | Max Illfelder <illfelder@users.noreply.github.com> | 2016-10-19 19:41:37 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-19 19:41:37 -0700 |
commit | c502fb72eb2190dfabeeb87e63baef5862559cbc (patch) | |
tree | fc391a10de58d61a8f6f526d73d20c5d0791584c | |
parent | 985e071dba939b62dd451da6848e36088b54aa92 (diff) | |
download | google-compute-image-packages-c502fb72eb2190dfabeeb87e63baef5862559cbc.tar.gz |
Add a timeout option to metadata watcher calls. (#344)
When watching metadata changes, this allows a daemon to be notified
after a timeout, rather than waiting for a change. The accounts daemon
uses this logic to remove expired keys in the guest. The IP forwarding
daemon uses this logic to setup local routes after networking reboot.
6 files changed, 61 insertions, 31 deletions
diff --git a/google_compute_engine/accounts/accounts_daemon.py b/google_compute_engine/accounts/accounts_daemon.py index 5b9779d..fd20f80 100755 --- a/google_compute_engine/accounts/accounts_daemon.py +++ b/google_compute_engine/accounts/accounts_daemon.py @@ -19,6 +19,7 @@ import datetime import json import logging.handlers import optparse +import random from google_compute_engine import config_manager from google_compute_engine import file_utils @@ -51,7 +52,9 @@ class AccountsDaemon(object): try: with file_utils.LockFile(LOCKFILE): self.logger.info('Starting Google Accounts daemon.') - self.watcher.WatchMetadata(self.HandleAccounts, recursive=True) + timeout = 60 + random.randint(0, 30) + self.watcher.WatchMetadata( + self.HandleAccounts, recursive=True, timeout=timeout) except (IOError, OSError) as e: self.logger.warning(str(e)) diff --git a/google_compute_engine/accounts/tests/accounts_daemon_test.py b/google_compute_engine/accounts/tests/accounts_daemon_test.py index 3fd830d..2b6700b 100644 --- a/google_compute_engine/accounts/tests/accounts_daemon_test.py +++ b/google_compute_engine/accounts/tests/accounts_daemon_test.py @@ -59,7 +59,7 @@ class AccountsDaemonTest(unittest.TestCase): mock.call.lock.LockFile().__enter__(), mock.call.logger.Logger().info(mock.ANY), mock.call.watcher.MetadataWatcher().WatchMetadata( - mock_handle, recursive=True), + mock_handle, recursive=True, timeout=mock.ANY), mock.call.lock.LockFile().__exit__(None, None, None), ] self.assertEqual(mocks.mock_calls, expected_calls) diff --git a/google_compute_engine/ip_forwarding/ip_forwarding_daemon.py b/google_compute_engine/ip_forwarding/ip_forwarding_daemon.py index 04c0e09..a2e43c9 100755 --- a/google_compute_engine/ip_forwarding/ip_forwarding_daemon.py +++ b/google_compute_engine/ip_forwarding/ip_forwarding_daemon.py @@ -16,8 +16,9 @@ """Manage IP forwarding on a Google Compute Engine instance. Fetch a list of public endpoint IPs from the metadata server, compare it with -the IPs configured on eth0, and add or remove addresses from eth0 to make them -match. Only remove those which match our proto code. +the IPs configured the associated interfaces, and add or remove addresses from +the interfaces to make them match. Only remove those which match our proto +code. Command used to add IPs: ip route add to local $IP/32 dev eth0 proto 66 @@ -27,6 +28,7 @@ Command used to fetch list of configured IPs: import logging.handlers import optparse +import random from google_compute_engine import config_manager from google_compute_engine import file_utils @@ -61,9 +63,10 @@ class IpForwardingDaemon(object): try: with file_utils.LockFile(LOCKFILE): self.logger.info('Starting Google IP Forwarding daemon.') + timeout = 60 + random.randint(0, 30) self.watcher.WatchMetadata( self.HandleNetworkInterfaces, metadata_key=self.network_interfaces, - recursive=True) + recursive=True, timeout=timeout) except (IOError, OSError) as e: self.logger.warning(str(e)) diff --git a/google_compute_engine/ip_forwarding/tests/ip_forwarding_daemon_test.py b/google_compute_engine/ip_forwarding/tests/ip_forwarding_daemon_test.py index 71dee68..a34e9e8 100644 --- a/google_compute_engine/ip_forwarding/tests/ip_forwarding_daemon_test.py +++ b/google_compute_engine/ip_forwarding/tests/ip_forwarding_daemon_test.py @@ -66,7 +66,8 @@ class IpForwardingDaemonTest(unittest.TestCase): mock.call.lock.LockFile().__enter__(), mock.call.logger.Logger().info(mock.ANY), mock.call.watcher.MetadataWatcher().WatchMetadata( - mock_handle, metadata_key=metadata_key, recursive=True), + mock_handle, metadata_key=metadata_key, recursive=True, + timeout=mock.ANY), mock.call.lock.LockFile().__exit__(None, None, None), ] self.assertEqual(mocks.mock_calls, expected_calls) diff --git a/google_compute_engine/metadata_watcher.py b/google_compute_engine/metadata_watcher.py index e7da49b..3ff648d 100644 --- a/google_compute_engine/metadata_watcher.py +++ b/google_compute_engine/metadata_watcher.py @@ -76,12 +76,13 @@ class MetadataWatcher(object): self.timeout = timeout @RetryOnUnavailable - def _GetMetadataRequest(self, metadata_url, params=None): + def _GetMetadataRequest(self, metadata_url, params=None, timeout=None): """Performs a GET request with the metadata headers. Args: metadata_url: string, the URL to perform a GET request on. params: dictionary, the query parameters in the GET request. + timeout: int, timeout in seconds for metadata requests. Returns: HTTP response from the GET request. @@ -94,7 +95,8 @@ class MetadataWatcher(object): url = '%s?%s' % (metadata_url, params) request = urlrequest.Request(url, headers=headers) request_opener = urlrequest.build_opener(urlrequest.ProxyHandler({})) - return request_opener.open(request, timeout=self.timeout*1.1) + timeout = timeout or self.timeout + return request_opener.open(request, timeout=timeout*1.1) def _UpdateEtag(self, response): """Update the etag from an API response. @@ -110,13 +112,15 @@ class MetadataWatcher(object): self.etag = etag return etag_updated - def _GetMetadataUpdate(self, metadata_key='', recursive=True, wait=True): + def _GetMetadataUpdate( + self, metadata_key='', recursive=True, wait=True, timeout=None): """Request the contents of metadata server and deserialize the response. Args: metadata_key: string, the metadata key to watch for changes. recursive: bool, True if we should recursively watch for metadata changes. wait: bool, True if we should wait for a metadata change. + timeout: int, timeout in seconds for returning metadata output. Returns: json, the deserialized contents of the metadata server. @@ -127,27 +131,33 @@ class MetadataWatcher(object): 'alt': 'json', 'last_etag': self.etag, 'recursive': recursive, - 'timeout_sec': self.timeout, + 'timeout_sec': timeout or self.timeout, 'wait_for_change': wait, } while True: - response = self._GetMetadataRequest(metadata_url, params=params) + response = self._GetMetadataRequest( + metadata_url, params=params, timeout=timeout) etag_updated = self._UpdateEtag(response) - if wait and not etag_updated: + if wait and not etag_updated and not timeout: # Retry until the etag is updated. continue else: - # Waiting for change is not required or the etag is updated. + # One of the following are true: + # - Waiting for change is not required. + # - The etag is updated. + # - The user specified a request timeout. break return json.loads(response.read().decode('utf-8')) - def _HandleMetadataUpdate(self, metadata_key='', recursive=True, wait=True): + def _HandleMetadataUpdate( + self, metadata_key='', recursive=True, wait=True, timeout=None): """Wait for a successful metadata response. Args: metadata_key: string, the metadata key to watch for changes. recursive: bool, True if we should recursively watch for metadata changes. wait: bool, True if we should wait for a metadata change. + timeout: int, timeout in seconds for returning metadata output. Returns: json, the deserialized contents of the metadata server. @@ -156,7 +166,8 @@ class MetadataWatcher(object): while True: try: return self._GetMetadataUpdate( - metadata_key=metadata_key, recursive=recursive, wait=wait) + metadata_key=metadata_key, recursive=recursive, wait=wait, + timeout=timeout) except (httpclient.HTTPException, socket.error, urlerror.URLError) as e: if isinstance(e, type(exception)): continue @@ -164,31 +175,36 @@ class MetadataWatcher(object): exception = e self.logger.exception('GET request error retrieving metadata.') - def WatchMetadata(self, handler, metadata_key='', recursive=True): + def WatchMetadata( + self, handler, metadata_key='', recursive=True, timeout=None): """Watch for changes to the contents of the metadata server. Args: handler: callable, a function to call with the updated metadata contents. metadata_key: string, the metadata key to watch for changes. recursive: bool, True if we should recursively watch for metadata changes. + timeout: int, timeout in seconds for returning metadata output. """ while True: response = self._HandleMetadataUpdate( - metadata_key=metadata_key, recursive=recursive, wait=True) + metadata_key=metadata_key, recursive=recursive, wait=True, + timeout=timeout) try: handler(response) except Exception as e: self.logger.exception('Exception calling the response handler. %s.', e) - def GetMetadata(self, metadata_key='', recursive=True): + def GetMetadata(self, metadata_key='', recursive=True, timeout=None): """Retrieve the contents of metadata server for a metadata key. Args: metadata_key: string, the metadata key to watch for changes. recursive: bool, True if we should recursively watch for metadata changes. + timeout: int, timeout in seconds for returning metadata output. Returns: json, the deserialized contents of the metadata server or None if error. """ return self._HandleMetadataUpdate( - metadata_key=metadata_key, recursive=recursive, wait=False) + metadata_key=metadata_key, recursive=recursive, wait=False, + timeout=timeout) diff --git a/google_compute_engine/tests/metadata_watcher_test.py b/google_compute_engine/tests/metadata_watcher_test.py index a1e5240..2245711 100644 --- a/google_compute_engine/tests/metadata_watcher_test.py +++ b/google_compute_engine/tests/metadata_watcher_test.py @@ -191,7 +191,8 @@ class MetadataWatcherTest(unittest.TestCase): self.assertEqual(self.mock_watcher._GetMetadataUpdate(), {}) self.assertEqual(self.mock_watcher.etag, 1) - mock_response.assert_called_once_with(request_url, params=self.params) + mock_response.assert_called_once_with( + request_url, params=self.params, timeout=None) def testGetMetadataUpdateArgs(self): mock_response = mock.Mock() @@ -205,9 +206,10 @@ class MetadataWatcherTest(unittest.TestCase): request_url = os.path.join(self.url, metadata_key) self.mock_watcher._GetMetadataUpdate( - metadata_key=metadata_key, recursive=False, wait=False) + metadata_key=metadata_key, recursive=False, wait=False, timeout=60) self.assertEqual(self.mock_watcher.etag, 0) - mock_response.assert_called_once_with(request_url, params=self.params) + mock_response.assert_called_once_with( + request_url, params=self.params, timeout=60) def testGetMetadataUpdateWait(self): self.params['last_etag'] = 1 @@ -225,7 +227,9 @@ class MetadataWatcherTest(unittest.TestCase): self.mock_watcher._GetMetadataUpdate() self.assertEqual(self.mock_watcher.etag, 2) - expected_calls = [mock.call(request_url, params=self.params)] * 3 + expected_calls = [ + mock.call(request_url, params=self.params, timeout=None), + ] * 3 self.assertEqual(mock_response.mock_calls, expected_calls) def testHandleMetadataUpdate(self): @@ -235,7 +239,7 @@ class MetadataWatcherTest(unittest.TestCase): self.assertEqual(self.mock_watcher.GetMetadata(), {}) mock_response.assert_called_once_with( - metadata_key='', recursive=True, wait=False) + metadata_key='', recursive=True, wait=False, timeout=None) self.mock_watcher.logger.exception.assert_not_called() def testHandleMetadataUpdateException(self): @@ -250,10 +254,13 @@ class MetadataWatcherTest(unittest.TestCase): self.assertEqual( self.mock_watcher._HandleMetadataUpdate( - metadata_key=metadata_key, recursive=recursive, wait=wait), + metadata_key=metadata_key, recursive=recursive, wait=wait, + timeout=None), {}) expected_calls = [ - mock.call(metadata_key=metadata_key, recursive=recursive, wait=wait), + mock.call( + metadata_key=metadata_key, recursive=recursive, wait=wait, + timeout=None), ] * 4 self.assertEqual(mock_response.mock_calls, expected_calls) expected_calls = [mock.call.exception(mock.ANY)] * 2 @@ -272,7 +279,7 @@ class MetadataWatcherTest(unittest.TestCase): self.mock_watcher.WatchMetadata(mock_handler, recursive=recursive) mock_handler.assert_called_once_with({}) mock_response.assert_called_once_with( - metadata_key='', recursive=recursive, wait=True) + metadata_key='', recursive=recursive, wait=True, timeout=None) def testWatchMetadataException(self): mock_response = mock.Mock() @@ -286,7 +293,7 @@ class MetadataWatcherTest(unittest.TestCase): self.mock_watcher.WatchMetadata( None, metadata_key=metadata_key, recursive=recursive) mock_response.assert_called_once_with( - metadata_key=metadata_key, recursive=recursive, wait=True) + metadata_key=metadata_key, recursive=recursive, wait=True, timeout=None) def testGetMetadata(self): mock_response = mock.Mock() @@ -295,7 +302,7 @@ class MetadataWatcherTest(unittest.TestCase): self.assertEqual(self.mock_watcher.GetMetadata(), {}) mock_response.assert_called_once_with( - metadata_key='', recursive=True, wait=False) + metadata_key='', recursive=True, wait=False, timeout=None) self.mock_watcher.logger.exception.assert_not_called() def testGetMetadataArgs(self): @@ -306,10 +313,10 @@ class MetadataWatcherTest(unittest.TestCase): recursive = False response = self.mock_watcher.GetMetadata( - metadata_key=metadata_key, recursive=recursive) + metadata_key=metadata_key, recursive=recursive, timeout=60) self.assertEqual(response, {}) mock_response.assert_called_once_with( - metadata_key=metadata_key, recursive=False, wait=False) + metadata_key=metadata_key, recursive=False, wait=False, timeout=60) self.mock_watcher.logger.exception.assert_not_called() |