diff options
author | Jenkins <jenkins@review.openstack.org> | 2012-08-23 23:08:03 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2012-08-23 23:08:03 +0000 |
commit | d8c02dccc098177c9b63536353b986b27645909d (patch) | |
tree | 7d2c05367a1a868a587158a8a18b9b97810183f8 | |
parent | 3f01f889d5d61c0352ba73e7c1bca32b4b48d952 (diff) | |
parent | eb4af8f84097ae6f1c0057313dd8c0c002b3d49d (diff) | |
download | swift-d8c02dccc098177c9b63536353b986b27645909d.tar.gz |
Merge "split proxy controllers into individual modules"
-rw-r--r-- | swift/common/middleware/ratelimit.py | 2 | ||||
-rw-r--r-- | swift/proxy/controllers/__init__.py | 4 | ||||
-rw-r--r-- | swift/proxy/controllers/account.py | 164 | ||||
-rw-r--r-- | swift/proxy/controllers/base.py | 678 | ||||
-rw-r--r-- | swift/proxy/controllers/container.py | 228 | ||||
-rw-r--r-- | swift/proxy/controllers/obj.py | 945 | ||||
-rw-r--r-- | swift/proxy/server.py | 1881 | ||||
-rw-r--r-- | test/unit/common/middleware/test_ratelimit.py | 2 | ||||
-rw-r--r-- | test/unit/proxy/test_server.py | 662 |
9 files changed, 2321 insertions, 2245 deletions
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py index 86cf9e0d5..dad907c6d 100644 --- a/swift/common/middleware/ratelimit.py +++ b/swift/common/middleware/ratelimit.py @@ -16,7 +16,7 @@ import eventlet from webob import Request, Response from swift.common.utils import split_path, cache_from_env, get_logger -from swift.proxy.server import get_container_memcache_key +from swift.proxy.controllers.base import get_container_memcache_key from swift.common.memcached import MemcacheConnectionError diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py new file mode 100644 index 000000000..516f3c285 --- /dev/null +++ b/swift/proxy/controllers/__init__.py @@ -0,0 +1,4 @@ +from swift.proxy.controllers.base import Controller +from swift.proxy.controllers.obj import ObjectController +from swift.proxy.controllers.account import AccountController +from swift.proxy.controllers.container import ContainerController diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py new file mode 100644 index 000000000..bd95d5555 --- /dev/null +++ b/swift/proxy/controllers/account.py @@ -0,0 +1,164 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: swift_conn +# You'll see swift_conn passed around a few places in this file. This is the +# source httplib connection of whatever it is attached to. +# It is used when early termination of reading from the connection should +# happen, such as when a range request is satisfied but there's still more the +# source connection would like to send. To prevent having to read all the data +# that could be left, the source connection can be .close() and then reads +# commence to empty out any buffers. +# These shenanigans are to ensure all related objects can be garbage +# collected. We've seen objects hang around forever otherwise. + +import time +from urllib import unquote +from random import shuffle + +from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed +from webob import Request + +from swift.common.utils import normalize_timestamp, public +from swift.common.constraints import check_metadata, MAX_ACCOUNT_NAME_LENGTH +from swift.common.http import is_success, HTTP_NOT_FOUND +from swift.proxy.controllers.base import Controller + + +class AccountController(Controller): + """WSGI controller for account requests""" + server_type = _('Account') + + def __init__(self, app, account_name, **kwargs): + Controller.__init__(self, app) + self.account_name = unquote(account_name) + + def GETorHEAD(self, req, stats_type): + """Handler for HTTP GET/HEAD requests.""" + start_time = time.time() + partition, nodes = self.app.account_ring.get_nodes(self.account_name) + shuffle(nodes) + resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, + req.path_info.rstrip('/'), len(nodes)) + if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: + if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: + resp = HTTPBadRequest(request=req) + resp.body = 'Account name length of %d longer than %d' % \ + (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return resp + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'X-Trans-Id': self.trans_id, + 'Connection': 'close'} + resp = self.make_requests( + Request.blank('/v1/' + self.account_name), + self.app.account_ring, partition, 'PUT', + '/' + self.account_name, [headers] * len(nodes)) + if not is_success(resp.status_int): + self.app.logger.warning('Could not autocreate account %r' % + self.account_name) + return resp + resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, + req.path_info.rstrip('/'), len(nodes)) + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) + return resp + + @public + def PUT(self, req): + """HTTP PUT request handler.""" + start_time = time.time() + if not self.app.allow_account_management: + self.app.logger.timing_since('PUT.timing', start_time) + return HTTPMethodNotAllowed(request=req) + error_response = check_metadata(req, 'account') + if error_response: + self.app.logger.increment('errors') + return error_response + if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: + resp = HTTPBadRequest(request=req) + resp.body = 'Account name length of %d longer than %d' % \ + (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.increment('errors') + return resp + account_partition, accounts = \ + self.app.account_ring.get_nodes(self.account_name) + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'x-trans-id': self.trans_id, + 'Connection': 'close'} + self.transfer_headers(req.headers, headers) + if self.app.memcache: + self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) + resp = self.make_requests(req, self.app.account_ring, + account_partition, 'PUT', req.path_info, [headers] * len(accounts)) + self.app.logger.timing_since('PUT.timing', start_time) + return resp + + @public + def POST(self, req): + """HTTP POST request handler.""" + start_time = time.time() + error_response = check_metadata(req, 'account') + if error_response: + self.app.logger.increment('errors') + return error_response + account_partition, accounts = \ + self.app.account_ring.get_nodes(self.account_name) + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'X-Trans-Id': self.trans_id, + 'Connection': 'close'} + self.transfer_headers(req.headers, headers) + if self.app.memcache: + self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) + resp = self.make_requests(req, self.app.account_ring, + account_partition, 'POST', req.path_info, + [headers] * len(accounts)) + if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: + if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: + resp = HTTPBadRequest(request=req) + resp.body = 'Account name length of %d longer than %d' % \ + (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) + self.app.logger.increment('errors') + return resp + resp = self.make_requests( + Request.blank('/v1/' + self.account_name), + self.app.account_ring, account_partition, 'PUT', + '/' + self.account_name, [headers] * len(accounts)) + if not is_success(resp.status_int): + self.app.logger.warning('Could not autocreate account %r' % + self.account_name) + return resp + self.app.logger.timing_since('POST.timing', start_time) + return resp + + @public + def DELETE(self, req): + """HTTP DELETE request handler.""" + start_time = time.time() + if not self.app.allow_account_management: + self.app.logger.timing_since('DELETE.timing', start_time) + return HTTPMethodNotAllowed(request=req) + account_partition, accounts = \ + self.app.account_ring.get_nodes(self.account_name) + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'X-Trans-Id': self.trans_id, + 'Connection': 'close'} + if self.app.memcache: + self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) + resp = self.make_requests(req, self.app.account_ring, + account_partition, 'DELETE', req.path_info, + [headers] * len(accounts)) + self.app.logger.timing_since('DELETE.timing', start_time) + return resp diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py new file mode 100644 index 000000000..ac032676f --- /dev/null +++ b/swift/proxy/controllers/base.py @@ -0,0 +1,678 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: swift_conn +# You'll see swift_conn passed around a few places in this file. This is the +# source httplib connection of whatever it is attached to. +# It is used when early termination of reading from the connection should +# happen, such as when a range request is satisfied but there's still more the +# source connection would like to send. To prevent having to read all the data +# that could be left, the source connection can be .close() and then reads +# commence to empty out any buffers. +# These shenanigans are to ensure all related objects can be garbage +# collected. We've seen objects hang around forever otherwise. + +import time +import functools + +from eventlet import spawn_n, GreenPile, Timeout +from eventlet.queue import Queue, Empty, Full +from eventlet.timeout import Timeout +from webob.exc import \ + status_map +from webob import Request, Response + +from swift.common.utils import normalize_timestamp, TRUE_VALUES, public +from swift.common.bufferedhttp import http_connect +from swift.common.constraints import MAX_ACCOUNT_NAME_LENGTH +from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout +from swift.common.http import is_informational, is_success, is_redirection, \ + is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ + HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ + HTTP_INSUFFICIENT_STORAGE + + +def update_headers(response, headers): + """ + Helper function to update headers in the response. + + :param response: webob.Response object + :param headers: dictionary headers + """ + if hasattr(headers, 'items'): + headers = headers.items() + for name, value in headers: + if name == 'etag': + response.headers[name] = value.replace('"', '') + elif name not in ('date', 'content-length', 'content-type', + 'connection', 'x-put-timestamp', 'x-delete-after'): + response.headers[name] = value + + +def delay_denial(func): + """ + Decorator to declare which methods should have any swift.authorize call + delayed. This is so the method can load the Request object up with + additional information that may be needed by the authorization system. + + :param func: function for which authorization will be delayed + """ + func.delay_denial = True + + @functools.wraps(func) + def wrapped(*a, **kw): + return func(*a, **kw) + return wrapped + + +def get_account_memcache_key(account): + return 'account/%s' % account + + +def get_container_memcache_key(account, container): + return 'container/%s/%s' % (account, container) + + +class Controller(object): + """Base WSGI controller class for the proxy""" + server_type = _('Base') + + # Ensure these are all lowercase + pass_through_headers = [] + + def __init__(self, app): + self.account_name = None + self.app = app + self.trans_id = '-' + + def transfer_headers(self, src_headers, dst_headers): + x_remove = 'x-remove-%s-meta-' % self.server_type.lower() + x_meta = 'x-%s-meta-' % self.server_type.lower() + dst_headers.update((k.lower().replace('-remove', '', 1), '') + for k in src_headers + if k.lower().startswith(x_remove)) + dst_headers.update((k.lower(), v) + for k, v in src_headers.iteritems() + if k.lower() in self.pass_through_headers or + k.lower().startswith(x_meta)) + + def error_increment(self, node): + """ + Handles incrementing error counts when talking to nodes. + + :param node: dictionary of node to increment the error count for + """ + node['errors'] = node.get('errors', 0) + 1 + node['last_error'] = time.time() + + def error_occurred(self, node, msg): + """ + Handle logging, and handling of errors. + + :param node: dictionary of node to handle errors for + :param msg: error message + """ + self.error_increment(node) + self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'), + {'msg': msg, 'ip': node['ip'], 'port': node['port']}) + + def exception_occurred(self, node, typ, additional_info): + """ + Handle logging of generic exceptions. + + :param node: dictionary of node to log the error for + :param typ: server type + :param additional_info: additional information to log + """ + self.app.logger.exception( + _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: ' + '%(info)s'), + {'type': typ, 'ip': node['ip'], 'port': node['port'], + 'device': node['device'], 'info': additional_info}) + + def error_limited(self, node): + """ + Check if the node is currently error limited. + + :param node: dictionary of node to check + :returns: True if error limited, False otherwise + """ + now = time.time() + if not 'errors' in node: + return False + if 'last_error' in node and node['last_error'] < \ + now - self.app.error_suppression_interval: + del node['last_error'] + if 'errors' in node: + del node['errors'] + return False + limited = node['errors'] > self.app.error_suppression_limit + if limited: + self.app.logger.debug( + _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) + return limited + + def error_limit(self, node): + """ + Mark a node as error limited. + + :param node: dictionary of node to error limit + """ + node['errors'] = self.app.error_suppression_limit + 1 + node['last_error'] = time.time() + + def account_info(self, account, autocreate=False): + """ + Get account information, and also verify that the account exists. + + :param account: name of the account to get the info for + :returns: tuple of (account partition, account nodes, container_count) + or (None, None, None) if it does not exist + """ + partition, nodes = self.app.account_ring.get_nodes(account) + # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses + if self.app.memcache: + cache_key = get_account_memcache_key(account) + cache_value = self.app.memcache.get(cache_key) + if not isinstance(cache_value, dict): + result_code = cache_value + container_count = 0 + else: + result_code = cache_value['status'] + container_count = cache_value['container_count'] + if result_code == HTTP_OK: + return partition, nodes, container_count + elif result_code == HTTP_NOT_FOUND and not autocreate: + return None, None, None + result_code = 0 + container_count = 0 + attempts_left = len(nodes) + path = '/%s' % account + headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} + iternodes = self.iter_nodes(partition, nodes, self.app.account_ring) + while attempts_left > 0: + try: + node = iternodes.next() + except StopIteration: + break + attempts_left -= 1 + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], partition, 'HEAD', path, headers) + with Timeout(self.app.node_timeout): + resp = conn.getresponse() + body = resp.read() + if is_success(resp.status): + result_code = HTTP_OK + container_count = int( + resp.getheader('x-account-container-count') or 0) + break + elif resp.status == HTTP_NOT_FOUND: + if result_code == 0: + result_code = HTTP_NOT_FOUND + elif result_code != HTTP_NOT_FOUND: + result_code = -1 + elif resp.status == HTTP_INSUFFICIENT_STORAGE: + self.error_limit(node) + continue + else: + result_code = -1 + except (Exception, Timeout): + self.exception_occurred(node, _('Account'), + _('Trying to get account info for %s') % path) + if result_code == HTTP_NOT_FOUND and autocreate: + if len(account) > MAX_ACCOUNT_NAME_LENGTH: + return None, None, None + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'X-Trans-Id': self.trans_id, + 'Connection': 'close'} + resp = self.make_requests(Request.blank('/v1' + path), + self.app.account_ring, partition, 'PUT', + path, [headers] * len(nodes)) + if not is_success(resp.status_int): + self.app.logger.warning('Could not autocreate account %r' % \ + path) + return None, None, None + result_code = HTTP_OK + if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND): + if result_code == HTTP_OK: + cache_timeout = self.app.recheck_account_existence + else: + cache_timeout = self.app.recheck_account_existence * 0.1 + self.app.memcache.set(cache_key, + {'status': result_code, 'container_count': container_count}, + timeout=cache_timeout) + if result_code == HTTP_OK: + return partition, nodes, container_count + return None, None, None + + def container_info(self, account, container, account_autocreate=False): + """ + Get container information and thusly verify container existance. + This will also make a call to account_info to verify that the + account exists. + + :param account: account name for the container + :param container: container name to look up + :returns: tuple of (container partition, container nodes, container + read acl, container write acl, container sync key) or (None, + None, None, None, None) if the container does not exist + """ + partition, nodes = self.app.container_ring.get_nodes( + account, container) + path = '/%s/%s' % (account, container) + if self.app.memcache: + cache_key = get_container_memcache_key(account, container) + cache_value = self.app.memcache.get(cache_key) + if isinstance(cache_value, dict): + status = cache_value['status'] + read_acl = cache_value['read_acl'] + write_acl = cache_value['write_acl'] + sync_key = cache_value.get('sync_key') + versions = cache_value.get('versions') + if status == HTTP_OK: + return partition, nodes, read_acl, write_acl, sync_key, \ + versions + elif status == HTTP_NOT_FOUND: + return None, None, None, None, None, None + if not self.account_info(account, autocreate=account_autocreate)[1]: + return None, None, None, None, None, None + result_code = 0 + read_acl = None + write_acl = None + sync_key = None + container_size = None + versions = None + attempts_left = len(nodes) + headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} + iternodes = self.iter_nodes(partition, nodes, self.app.container_ring) + while attempts_left > 0: + try: + node = iternodes.next() + except StopIteration: + break + attempts_left -= 1 + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], partition, 'HEAD', path, headers) + with Timeout(self.app.node_timeout): + resp = conn.getresponse() + body = resp.read() + if is_success(resp.status): + result_code = HTTP_OK + read_acl = resp.getheader('x-container-read') + write_acl = resp.getheader('x-container-write') + sync_key = resp.getheader('x-container-sync-key') + container_size = \ + resp.getheader('X-Container-Object-Count') + versions = resp.getheader('x-versions-location') + break + elif resp.status == HTTP_NOT_FOUND: + if result_code == 0: + result_code = HTTP_NOT_FOUND + elif result_code != HTTP_NOT_FOUND: + result_code = -1 + elif resp.status == HTTP_INSUFFICIENT_STORAGE: + self.error_limit(node) + continue + else: + result_code = -1 + except (Exception, Timeout): + self.exception_occurred(node, _('Container'), + _('Trying to get container info for %s') % path) + if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND): + if result_code == HTTP_OK: + cache_timeout = self.app.recheck_container_existence + else: + cache_timeout = self.app.recheck_container_existence * 0.1 + self.app.memcache.set(cache_key, + {'status': result_code, + 'read_acl': read_acl, + 'write_acl': write_acl, + 'sync_key': sync_key, + 'container_size': container_size, + 'versions': versions}, + timeout=cache_timeout) + if result_code == HTTP_OK: + return partition, nodes, read_acl, write_acl, sync_key, versions + return None, None, None, None, None, None + + def iter_nodes(self, partition, nodes, ring): + """ + Node iterator that will first iterate over the normal nodes for a + partition and then the handoff partitions for the node. + + :param partition: partition to iterate nodes for + :param nodes: list of node dicts from the ring + :param ring: ring to get handoff nodes from + """ + for node in nodes: + if not self.error_limited(node): + yield node + handoffs = 0 + for node in ring.get_more_nodes(partition): + if not self.error_limited(node): + handoffs += 1 + if self.app.log_handoffs: + self.app.logger.increment('handoff_count') + self.app.logger.warning( + 'Handoff requested (%d)' % handoffs) + if handoffs == len(nodes): + self.app.logger.increment('handoff_all_count') + yield node + + def _make_request(self, nodes, part, method, path, headers, query, + logger_thread_locals): + self.app.logger.thread_locals = logger_thread_locals + for node in nodes: + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], part, method, path, + headers=headers, query_string=query) + conn.node = node + with Timeout(self.app.node_timeout): + resp = conn.getresponse() + if not is_informational(resp.status) and \ + not is_server_error(resp.status): + return resp.status, resp.reason, resp.read() + elif resp.status == HTTP_INSUFFICIENT_STORAGE: + self.error_limit(node) + except (Exception, Timeout): + self.exception_occurred(node, self.server_type, + _('Trying to %(method)s %(path)s') % + {'method': method, 'path': path}) + + def make_requests(self, req, ring, part, method, path, headers, + query_string=''): + """ + Sends an HTTP request to multiple nodes and aggregates the results. + It attempts the primary nodes concurrently, then iterates over the + handoff nodes as needed. + + :param headers: a list of dicts, where each dict represents one + backend request that should be made. + :returns: a webob Response object + """ + start_nodes = ring.get_part_nodes(part) + nodes = self.iter_nodes(part, start_nodes, ring) + pile = GreenPile(len(start_nodes)) + for head in headers: + pile.spawn(self._make_request, nodes, part, method, path, + head, query_string, self.app.logger.thread_locals) + response = [resp for resp in pile if resp] + while len(response) < len(start_nodes): + response.append((HTTP_SERVICE_UNAVAILABLE, '', '')) + statuses, reasons, bodies = zip(*response) + return self.best_response(req, statuses, reasons, bodies, + '%s %s' % (self.server_type, req.method)) + + def best_response(self, req, statuses, reasons, bodies, server_type, + etag=None): + """ + Given a list of responses from several servers, choose the best to + return to the API. + + :param req: webob.Request object + :param statuses: list of statuses returned + :param reasons: list of reasons for each status + :param bodies: bodies of each response + :param server_type: type of server the responses came from + :param etag: etag + :returns: webob.Response object with the correct status, body, etc. set + """ + resp = Response(request=req) + if len(statuses): + for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): + hstatuses = \ + [s for s in statuses if hundred <= s < hundred + 100] + if len(hstatuses) > len(statuses) / 2: + status = max(hstatuses) + status_index = statuses.index(status) + resp.status = '%s %s' % (status, reasons[status_index]) + resp.body = bodies[status_index] + resp.content_type = 'text/html' + if etag: + resp.headers['etag'] = etag.strip('"') + return resp + self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'), + {'type': server_type, 'statuses': statuses}) + resp.status = '503 Internal Server Error' + return resp + + @public + def GET(self, req): + """Handler for HTTP GET requests.""" + return self.GETorHEAD(req, stats_type='GET') + + @public + def HEAD(self, req): + """Handler for HTTP HEAD requests.""" + return self.GETorHEAD(req, stats_type='HEAD') + + def _make_app_iter_reader(self, node, source, queue, logger_thread_locals): + """ + Reads from the source and places data in the queue. It expects + something else be reading from the queue and, if nothing does within + self.app.client_timeout seconds, the process will be aborted. + + :param node: The node dict that the source is connected to, for + logging/error-limiting purposes. + :param source: The httplib.Response object to read from. + :param queue: The eventlet.queue.Queue to place read source data into. + :param logger_thread_locals: The thread local values to be set on the + self.app.logger to retain transaction + logging information. + """ + self.app.logger.thread_locals = logger_thread_locals + success = True + try: + try: + while True: + with ChunkReadTimeout(self.app.node_timeout): + chunk = source.read(self.app.object_chunk_size) + if not chunk: + break + queue.put(chunk, timeout=self.app.client_timeout) + except Full: + self.app.logger.warn( + _('Client did not read from queue within %ss') % + self.app.client_timeout) + self.app.logger.increment('client_timeouts') + success = False + except (Exception, Timeout): + self.exception_occurred(node, _('Object'), + _('Trying to read during GET')) + success = False + finally: + # Ensure the queue getter gets a terminator. + queue.resize(2) + queue.put(success) + # Close-out the connection as best as possible. + if getattr(source, 'swift_conn', None): + try: + source.swift_conn.close() + except Exception: + pass + source.swift_conn = None + try: + while source.read(self.app.object_chunk_size): + pass + except Exception: + pass + try: + source.close() + except Exception: + pass + + def _make_app_iter(self, node, source, response): + """ + Returns an iterator over the contents of the source (via its read + func). There is also quite a bit of cleanup to ensure garbage + collection works and the underlying socket of the source is closed. + + :param response: The webob.Response object this iterator should be + assigned to via response.app_iter. + :param source: The httplib.Response object this iterator should read + from. + :param node: The node the source is reading from, for logging purposes. + """ + try: + try: + # Spawn reader to read from the source and place in the queue. + # We then drop any reference to the source or node, for garbage + # collection purposes. + queue = Queue(1) + spawn_n(self._make_app_iter_reader, node, source, queue, + self.app.logger.thread_locals) + source = node = None + while True: + chunk = queue.get(timeout=self.app.node_timeout) + if isinstance(chunk, bool): # terminator + success = chunk + if not success: + raise Exception(_('Failed to read all data' + ' from the source')) + break + yield chunk + except Empty: + raise ChunkReadTimeout() + except (GeneratorExit, Timeout): + self.app.logger.warn(_('Client disconnected on read')) + except Exception: + self.app.logger.exception(_('Trying to send to client')) + raise + finally: + response.app_iter = None + + def GETorHEAD_base(self, req, server_type, partition, nodes, path, + attempts): + """ + Base handler for HTTP GET or HEAD requests. + + :param req: webob.Request object + :param server_type: server type + :param partition: partition + :param nodes: nodes + :param path: path for the request + :param attempts: number of attempts to try + :returns: webob.Response object + """ + statuses = [] + reasons = [] + bodies = [] + source = None + newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES + nodes = iter(nodes) + while len(statuses) < attempts: + try: + node = nodes.next() + except StopIteration: + break + if self.error_limited(node): + continue + try: + with ConnectionTimeout(self.app.conn_timeout): + headers = dict(req.headers) + headers['Connection'] = 'close' + conn = http_connect(node['ip'], node['port'], + node['device'], partition, req.method, path, + headers=headers, + query_string=req.query_string) + with Timeout(self.app.node_timeout): + possible_source = conn.getresponse() + # See NOTE: swift_conn at top of file about this. + possible_source.swift_conn = conn + except (Exception, Timeout): + self.exception_occurred(node, server_type, + _('Trying to %(method)s %(path)s') % + {'method': req.method, 'path': req.path}) + continue + if possible_source.status == HTTP_INSUFFICIENT_STORAGE: + self.error_limit(node) + continue + if is_success(possible_source.status) or \ + is_redirection(possible_source.status): + # 404 if we know we don't have a synced copy + if not float(possible_source.getheader('X-PUT-Timestamp', 1)): + statuses.append(HTTP_NOT_FOUND) + reasons.append('') + bodies.append('') + possible_source.read() + continue + if newest: + if source: + ts = float(source.getheader('x-put-timestamp') or + source.getheader('x-timestamp') or 0) + pts = float( + possible_source.getheader('x-put-timestamp') or + possible_source.getheader('x-timestamp') or 0) + if pts > ts: + source = possible_source + else: + source = possible_source + statuses.append(source.status) + reasons.append(source.reason) + bodies.append('') + continue + else: + source = possible_source + break + statuses.append(possible_source.status) + reasons.append(possible_source.reason) + bodies.append(possible_source.read()) + if is_server_error(possible_source.status): + self.error_occurred(node, _('ERROR %(status)d %(body)s ' \ + 'From %(type)s Server') % + {'status': possible_source.status, + 'body': bodies[-1][:1024], 'type': server_type}) + if source: + if req.method == 'GET' and \ + source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): + res = Response(request=req, conditional_response=True) + res.app_iter = self._make_app_iter(node, source, res) + # See NOTE: swift_conn at top of file about this. + res.swift_conn = source.swift_conn + update_headers(res, source.getheaders()) + # Used by container sync feature + if res.environ is None: + res.environ = dict() + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') + update_headers(res, {'accept-ranges': 'bytes'}) + res.status = source.status + res.content_length = source.getheader('Content-Length') + if source.getheader('Content-Type'): + res.charset = None + res.content_type = source.getheader('Content-Type') + return res + elif is_success(source.status) or is_redirection(source.status): + res = status_map[source.status](request=req) + update_headers(res, source.getheaders()) + # Used by container sync feature + if res.environ is None: + res.environ = dict() + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') + update_headers(res, {'accept-ranges': 'bytes'}) + res.content_length = source.getheader('Content-Length') + if source.getheader('Content-Type'): + res.charset = None + res.content_type = source.getheader('Content-Type') + return res + return self.best_response(req, statuses, reasons, bodies, + '%s %s' % (server_type, req.method)) diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py new file mode 100644 index 000000000..34c733910 --- /dev/null +++ b/swift/proxy/controllers/container.py @@ -0,0 +1,228 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: swift_conn +# You'll see swift_conn passed around a few places in this file. This is the +# source httplib connection of whatever it is attached to. +# It is used when early termination of reading from the connection should +# happen, such as when a range request is satisfied but there's still more the +# source connection would like to send. To prevent having to read all the data +# that could be left, the source connection can be .close() and then reads +# commence to empty out any buffers. +# These shenanigans are to ensure all related objects can be garbage +# collected. We've seen objects hang around forever otherwise. + +import time +from urllib import unquote +from random import shuffle + +from webob.exc import HTTPBadRequest, HTTPForbidden, \ + HTTPNotFound + +from swift.common.utils import normalize_timestamp, public +from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH +from swift.common.http import HTTP_ACCEPTED +from swift.proxy.controllers.base import Controller, delay_denial, \ + get_container_memcache_key + + +class ContainerController(Controller): + """WSGI controller for container requests""" + server_type = _('Container') + + # Ensure these are all lowercase + pass_through_headers = ['x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to', + 'x-versions-location'] + + def __init__(self, app, account_name, container_name, **kwargs): + Controller.__init__(self, app) + self.account_name = unquote(account_name) + self.container_name = unquote(container_name) + + def clean_acls(self, req): + if 'swift.clean_acl' in req.environ: + for header in ('x-container-read', 'x-container-write'): + if header in req.headers: + try: + req.headers[header] = \ + req.environ['swift.clean_acl'](header, + req.headers[header]) + except ValueError, err: + return HTTPBadRequest(request=req, body=str(err)) + return None + + def GETorHEAD(self, req, stats_type): + """Handler for HTTP GET/HEAD requests.""" + start_time = time.time() + if not self.account_info(self.account_name)[1]: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return HTTPNotFound(request=req) + part, nodes = self.app.container_ring.get_nodes( + self.account_name, self.container_name) + shuffle(nodes) + resp = self.GETorHEAD_base(req, _('Container'), part, nodes, + req.path_info, len(nodes)) + + if self.app.memcache: + # set the memcache container size for ratelimiting + cache_key = get_container_memcache_key(self.account_name, + self.container_name) + self.app.memcache.set(cache_key, + {'status': resp.status_int, + 'read_acl': resp.headers.get('x-container-read'), + 'write_acl': resp.headers.get('x-container-write'), + 'sync_key': resp.headers.get('x-container-sync-key'), + 'container_size': resp.headers.get('x-container-object-count'), + 'versions': resp.headers.get('x-versions-location')}, + timeout=self.app.recheck_container_existence) + + if 'swift.authorize' in req.environ: + req.acl = resp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + self.app.logger.increment('auth_short_circuits') + return aresp + if not req.environ.get('swift_owner', False): + for key in ('x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'): + if key in resp.headers: + del resp.headers[key] + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) + return resp + + @public + @delay_denial + def GET(self, req): + """Handler for HTTP GET requests.""" + return self.GETorHEAD(req, stats_type='GET') + + @public + @delay_denial + def HEAD(self, req): + """Handler for HTTP HEAD requests.""" + return self.GETorHEAD(req, stats_type='HEAD') + + @public + def PUT(self, req): + """HTTP PUT request handler.""" + start_time = time.time() + error_response = \ + self.clean_acls(req) or check_metadata(req, 'container') + if error_response: + self.app.logger.increment('errors') + return error_response + if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH: + resp = HTTPBadRequest(request=req) + resp.body = 'Container name length of %d longer than %d' % \ + (len(self.container_name), MAX_CONTAINER_NAME_LENGTH) + self.app.logger.increment('errors') + return resp + account_partition, accounts, container_count = \ + self.account_info(self.account_name, + autocreate=self.app.account_autocreate) + if self.app.max_containers_per_account > 0 and \ + container_count >= self.app.max_containers_per_account and \ + self.account_name not in self.app.max_containers_whitelist: + resp = HTTPForbidden(request=req) + resp.body = 'Reached container limit of %s' % \ + self.app.max_containers_per_account + return resp + if not accounts: + self.app.logger.timing_since('PUT.timing', start_time) + return HTTPNotFound(request=req) + container_partition, containers = self.app.container_ring.get_nodes( + self.account_name, self.container_name) + headers = [] + for account in accounts: + nheaders = {'X-Timestamp': normalize_timestamp(time.time()), + 'x-trans-id': self.trans_id, + 'X-Account-Host': '%(ip)s:%(port)s' % account, + 'X-Account-Partition': account_partition, + 'X-Account-Device': account['device'], + 'Connection': 'close'} + self.transfer_headers(req.headers, nheaders) + headers.append(nheaders) + if self.app.memcache: + cache_key = get_container_memcache_key(self.account_name, + self.container_name) + self.app.memcache.delete(cache_key) + resp = self.make_requests(req, self.app.container_ring, + container_partition, 'PUT', req.path_info, headers) + self.app.logger.timing_since('PUT.timing', start_time) + return resp + + @public + def POST(self, req): + """HTTP POST request handler.""" + start_time = time.time() + error_response = \ + self.clean_acls(req) or check_metadata(req, 'container') + if error_response: + self.app.logger.increment('errors') + return error_response + account_partition, accounts, container_count = \ + self.account_info(self.account_name, + autocreate=self.app.account_autocreate) + if not accounts: + self.app.logger.timing_since('POST.timing', start_time) + return HTTPNotFound(request=req) + container_partition, containers = self.app.container_ring.get_nodes( + self.account_name, self.container_name) + headers = {'X-Timestamp': normalize_timestamp(time.time()), + 'x-trans-id': self.trans_id, + 'Connection': 'close'} + self.transfer_headers(req.headers, headers) + if self.app.memcache: + cache_key = get_container_memcache_key(self.account_name, + self.container_name) + self.app.memcache.delete(cache_key) + resp = self.make_requests(req, self.app.container_ring, + container_partition, 'POST', req.path_info, + [headers] * len(containers)) + self.app.logger.timing_since('POST.timing', start_time) + return resp + + @public + def DELETE(self, req): + """HTTP DELETE request handler.""" + start_time = time.time() + account_partition, accounts, container_count = \ + self.account_info(self.account_name) + if not accounts: + self.app.logger.timing_since('DELETE.timing', start_time) + return HTTPNotFound(request=req) + container_partition, containers = self.app.container_ring.get_nodes( + self.account_name, self.container_name) + headers = [] + for account in accounts: + headers.append({'X-Timestamp': normalize_timestamp(time.time()), + 'X-Trans-Id': self.trans_id, + 'X-Account-Host': '%(ip)s:%(port)s' % account, + 'X-Account-Partition': account_partition, + 'X-Account-Device': account['device'], + 'Connection': 'close'}) + if self.app.memcache: + cache_key = get_container_memcache_key(self.account_name, + self.container_name) + self.app.memcache.delete(cache_key) + resp = self.make_requests(req, self.app.container_ring, + container_partition, 'DELETE', req.path_info, headers) + # Indicates no server had the container + self.app.logger.timing_since('DELETE.timing', start_time) + if resp.status_int == HTTP_ACCEPTED: + return HTTPNotFound(request=req) + return resp diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py new file mode 100644 index 000000000..79963d9fb --- /dev/null +++ b/swift/proxy/controllers/obj.py @@ -0,0 +1,945 @@ +# Copyright (c) 2010-2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: swift_conn +# You'll see swift_conn passed around a few places in this file. This is the +# source httplib connection of whatever it is attached to. +# It is used when early termination of reading from the connection should +# happen, such as when a range request is satisfied but there's still more the +# source connection would like to send. To prevent having to read all the data +# that could be left, the source connection can be .close() and then reads +# commence to empty out any buffers. +# These shenanigans are to ensure all related objects can be garbage +# collected. We've seen objects hang around forever otherwise. + +try: + import simplejson as json +except ImportError: + import json +import mimetypes +import re +import time +from datetime import datetime +from urllib import unquote, quote +from hashlib import md5 +from random import shuffle + +from eventlet import sleep, GreenPile, Timeout +from eventlet.queue import Queue +from eventlet.timeout import Timeout +from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ + HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ + HTTPServerError, HTTPServiceUnavailable +from webob import Request, Response + +from swift.common.utils import ContextPool, normalize_timestamp, TRUE_VALUES, \ + public +from swift.common.bufferedhttp import http_connect +from swift.common.constraints import check_metadata, check_object_creation, \ + CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE +from swift.common.exceptions import ChunkReadTimeout, \ + ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ + ListingIterNotAuthorized, ListingIterError +from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \ + HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, \ + HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \ + HTTP_INSUFFICIENT_STORAGE, HTTPClientDisconnect +from swift.proxy.controllers.base import Controller, delay_denial + + +class SegmentedIterable(object): + """ + Iterable that returns the object contents for a segmented object in Swift. + + If there's a failure that cuts the transfer short, the response's + `status_int` will be updated (again, just for logging since the original + status would have already been sent to the client). + + :param controller: The ObjectController instance to work with. + :param container: The container the object segments are within. + :param listing: The listing of object segments to iterate over; this may + be an iterator or list that returns dicts with 'name' and + 'bytes' keys. + :param response: The webob.Response this iterable is associated with, if + any (default: None) + """ + + def __init__(self, controller, container, listing, response=None): + self.controller = controller + self.container = container + self.listing = iter(listing) + self.segment = -1 + self.segment_dict = None + self.segment_peek = None + self.seek = 0 + self.segment_iter = None + # See NOTE: swift_conn at top of file about this. + self.segment_iter_swift_conn = None + self.position = 0 + self.response = response + if not self.response: + self.response = Response() + self.next_get_time = 0 + + def _load_next_segment(self): + """ + Loads the self.segment_iter with the next object segment's contents. + + :raises: StopIteration when there are no more object segments. + """ + try: + self.segment += 1 + self.segment_dict = self.segment_peek or self.listing.next() + self.segment_peek = None + partition, nodes = self.controller.app.object_ring.get_nodes( + self.controller.account_name, self.container, + self.segment_dict['name']) + path = '/%s/%s/%s' % (self.controller.account_name, self.container, + self.segment_dict['name']) + req = Request.blank(path) + if self.seek: + req.range = 'bytes=%s-' % self.seek + self.seek = 0 + if self.segment > self.controller.app.rate_limit_after_segment: + sleep(max(self.next_get_time - time.time(), 0)) + self.next_get_time = time.time() + \ + 1.0 / self.controller.app.rate_limit_segments_per_sec + shuffle(nodes) + resp = self.controller.GETorHEAD_base(req, _('Object'), partition, + self.controller.iter_nodes(partition, nodes, + self.controller.app.object_ring), path, + len(nodes)) + if not is_success(resp.status_int): + raise Exception(_('Could not load object segment %(path)s:' \ + ' %(status)s') % {'path': path, 'status': resp.status_int}) + self.segment_iter = resp.app_iter + # See NOTE: swift_conn at top of file about this. + self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None) + except StopIteration: + raise + except (Exception, Timeout), err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception(_('ERROR: While ' + 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), + {'acc': self.controller.account_name, + 'cont': self.controller.container_name, + 'obj': self.controller.object_name}) + err.swift_logged = True + self.response.status_int = HTTP_SERVICE_UNAVAILABLE + raise + + def next(self): + return iter(self).next() + + def __iter__(self): + """ Standard iterator function that returns the object's contents. """ + try: + while True: + if not self.segment_iter: + self._load_next_segment() + while True: + with ChunkReadTimeout(self.controller.app.node_timeout): + try: + chunk = self.segment_iter.next() + break + except StopIteration: + self._load_next_segment() + self.position += len(chunk) + yield chunk + except StopIteration: + raise + except (Exception, Timeout), err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception(_('ERROR: While ' + 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), + {'acc': self.controller.account_name, + 'cont': self.controller.container_name, + 'obj': self.controller.object_name}) + err.swift_logged = True + self.response.status_int = HTTP_SERVICE_UNAVAILABLE + raise + + def app_iter_range(self, start, stop): + """ + Non-standard iterator function for use with Webob in serving Range + requests more quickly. This will skip over segments and do a range + request on the first segment to return data from, if needed. + + :param start: The first byte (zero-based) to return. None for 0. + :param stop: The last byte (zero-based) to return. None for end. + """ + try: + if start: + self.segment_peek = self.listing.next() + while start >= self.position + self.segment_peek['bytes']: + self.segment += 1 + self.position += self.segment_peek['bytes'] + self.segment_peek = self.listing.next() + self.seek = start - self.position + else: + start = 0 + if stop is not None: + length = stop - start + else: + length = None + for chunk in self: + if length is not None: + length -= len(chunk) + if length < 0: + # Chop off the extra: + yield chunk[:length] + break + yield chunk + # See NOTE: swift_conn at top of file about this. + if self.segment_iter_swift_conn: + try: + self.segment_iter_swift_conn.close() + except Exception: + pass + self.segment_iter_swift_conn = None + if self.segment_iter: + try: + while self.segment_iter.next(): + pass + except Exception: + pass + self.segment_iter = None + except StopIteration: + raise + except (Exception, Timeout), err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception(_('ERROR: While ' + 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), + {'acc': self.controller.account_name, + 'cont': self.controller.container_name, + 'obj': self.controller.object_name}) + err.swift_logged = True + self.response.status_int = HTTP_SERVICE_UNAVAILABLE + raise + + +class ObjectController(Controller): + """WSGI controller for object requests.""" + server_type = _('Object') + + def __init__(self, app, account_name, container_name, object_name, + **kwargs): + Controller.__init__(self, app) + self.account_name = unquote(account_name) + self.container_name = unquote(container_name) + self.object_name = unquote(object_name) + + def _listing_iter(self, lcontainer, lprefix, env): + lpartition, lnodes = self.app.container_ring.get_nodes( + self.account_name, lcontainer) + marker = '' + while True: + lreq = Request.blank('i will be overridden by env', environ=env) + # Don't quote PATH_INFO, by WSGI spec + lreq.environ['PATH_INFO'] = \ + '/%s/%s' % (self.account_name, lcontainer) + lreq.environ['REQUEST_METHOD'] = 'GET' + lreq.environ['QUERY_STRING'] = \ + 'format=json&prefix=%s&marker=%s' % (quote(lprefix), + quote(marker)) + shuffle(lnodes) + lresp = self.GETorHEAD_base(lreq, _('Container'), + lpartition, lnodes, lreq.path_info, + len(lnodes)) + if 'swift.authorize' in env: + lreq.acl = lresp.headers.get('x-container-read') + aresp = env['swift.authorize'](lreq) + if aresp: + raise ListingIterNotAuthorized(aresp) + if lresp.status_int == HTTP_NOT_FOUND: + raise ListingIterNotFound() + elif not is_success(lresp.status_int): + raise ListingIterError() + if not lresp.body: + break + sublisting = json.loads(lresp.body) + if not sublisting: + break + marker = sublisting[-1]['name'] + for obj in sublisting: + yield obj + + def GETorHEAD(self, req, stats_type): + """Handle HTTP GET or HEAD requests.""" + start_time = time.time() + _junk, _junk, req.acl, _junk, _junk, object_versions = \ + self.container_info(self.account_name, self.container_name) + if 'swift.authorize' in req.environ: + aresp = req.environ['swift.authorize'](req) + if aresp: + self.app.logger.increment('auth_short_circuits') + return aresp + partition, nodes = self.app.object_ring.get_nodes( + self.account_name, self.container_name, self.object_name) + shuffle(nodes) + resp = self.GETorHEAD_base(req, _('Object'), partition, + self.iter_nodes(partition, nodes, self.app.object_ring), + req.path_info, len(nodes)) + # Whether we get a 416 Requested Range Not Satisfiable or not, + # we should request a manifest because size of manifest file + # can be not 0. After checking a manifest, redo the range request + # on the whole object. + if req.range: + req_range = req.range + req.range = None + resp2 = self.GETorHEAD_base(req, _('Object'), partition, + self.iter_nodes(partition, + nodes, + self.app.object_ring), + req.path_info, len(nodes)) + if 'x-object-manifest' not in resp2.headers: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return resp + resp = resp2 + req.range = str(req_range) + + if 'x-object-manifest' in resp.headers: + lcontainer, lprefix = \ + resp.headers['x-object-manifest'].split('/', 1) + lcontainer = unquote(lcontainer) + lprefix = unquote(lprefix) + try: + listing = list(self._listing_iter(lcontainer, lprefix, + req.environ)) + except ListingIterNotFound: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return HTTPNotFound(request=req) + except ListingIterNotAuthorized, err: + self.app.logger.increment('auth_short_circuits') + return err.aresp + except ListingIterError: + self.app.logger.increment('errors') + return HTTPServerError(request=req) + + if len(listing) > CONTAINER_LISTING_LIMIT: + resp = Response(headers=resp.headers, request=req, + conditional_response=True) + if req.method == 'HEAD': + # These shenanigans are because webob translates the HEAD + # request into a webob EmptyResponse for the body, which + # has a len, which eventlet translates as needing a + # content-length header added. So we call the original + # webob resp for the headers but return an empty iterator + # for the body. + + def head_response(environ, start_response): + resp(environ, start_response) + return iter([]) + + head_response.status_int = resp.status_int + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return head_response + else: + resp.app_iter = SegmentedIterable(self, lcontainer, + self._listing_iter(lcontainer, lprefix, req.environ), + resp) + + else: + # For objects with a reasonable number of segments, we'll serve + # them with a set content-length and computed etag. + if listing: + content_length = sum(o['bytes'] for o in listing) + last_modified = max(o['last_modified'] for o in listing) + last_modified = datetime(*map(int, re.split('[^\d]', + last_modified)[:-1])) + etag = md5( + ''.join(o['hash'] for o in listing)).hexdigest() + else: + content_length = 0 + last_modified = resp.last_modified + etag = md5().hexdigest() + resp = Response(headers=resp.headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, listing, + resp) + resp.content_length = content_length + resp.last_modified = last_modified + resp.etag = etag + resp.headers['accept-ranges'] = 'bytes' + + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) + return resp + + @public + @delay_denial + def GET(self, req): + """Handler for HTTP GET requests.""" + return self.GETorHEAD(req, stats_type='GET') + + @public + @delay_denial + def HEAD(self, req): + """Handler for HTTP HEAD requests.""" + return self.GETorHEAD(req, stats_type='HEAD') + + @public + @delay_denial + def POST(self, req): + """HTTP POST request handler.""" + start_time = time.time() + if 'x-delete-after' in req.headers: + try: + x_delete_after = int(req.headers['x-delete-after']) + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, + content_type='text/plain', + body='Non-integer X-Delete-After') + req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after) + if self.app.object_post_as_copy: + req.method = 'PUT' + req.path_info = '/%s/%s/%s' % (self.account_name, + self.container_name, self.object_name) + req.headers['Content-Length'] = 0 + req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, + self.object_name)) + req.headers['X-Fresh-Metadata'] = 'true' + req.environ['swift_versioned_copy'] = True + resp = self.PUT(req, start_time=start_time, stats_type='POST') + # Older editions returned 202 Accepted on object POSTs, so we'll + # convert any 201 Created responses to that for compatibility with + # picky clients. + if resp.status_int != HTTP_CREATED: + return resp + return HTTPAccepted(request=req) + else: + error_response = check_metadata(req, 'object') + if error_response: + self.app.logger.increment('errors') + return error_response + container_partition, containers, _junk, req.acl, _junk, _junk = \ + self.container_info(self.account_name, self.container_name, + account_autocreate=self.app.account_autocreate) + if 'swift.authorize' in req.environ: + aresp = req.environ['swift.authorize'](req) + if aresp: + self.app.logger.increment('auth_short_circuits') + return aresp + if not containers: + self.app.logger.timing_since('POST.timing', start_time) + return HTTPNotFound(request=req) + if 'x-delete-at' in req.headers: + try: + x_delete_at = int(req.headers['x-delete-at']) + if x_delete_at < time.time(): + self.app.logger.increment('errors') + return HTTPBadRequest(body='X-Delete-At in past', + request=req, content_type='text/plain') + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, + content_type='text/plain', + body='Non-integer X-Delete-At') + delete_at_container = str(x_delete_at / + self.app.expiring_objects_container_divisor * + self.app.expiring_objects_container_divisor) + delete_at_part, delete_at_nodes = \ + self.app.container_ring.get_nodes( + self.app.expiring_objects_account, delete_at_container) + else: + delete_at_part = delete_at_nodes = None + partition, nodes = self.app.object_ring.get_nodes( + self.account_name, self.container_name, self.object_name) + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + headers = [] + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['Connection'] = 'close' + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + if delete_at_nodes: + node = delete_at_nodes.pop(0) + nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node + nheaders['X-Delete-At-Partition'] = delete_at_part + nheaders['X-Delete-At-Device'] = node['device'] + headers.append(nheaders) + resp = self.make_requests(req, self.app.object_ring, partition, + 'POST', req.path_info, headers) + self.app.logger.timing_since('POST.timing', start_time) + return resp + + def _send_file(self, conn, path): + """Method for a file PUT coro""" + while True: + chunk = conn.queue.get() + if not conn.failed: + try: + with ChunkWriteTimeout(self.app.node_timeout): + conn.send(chunk) + except (Exception, ChunkWriteTimeout): + conn.failed = True + self.exception_occurred(conn.node, _('Object'), + _('Trying to write to %s') % path) + conn.queue.task_done() + + def _connect_put_node(self, nodes, part, path, headers, + logger_thread_locals): + """Method for a file PUT connect""" + self.app.logger.thread_locals = logger_thread_locals + for node in nodes: + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], part, 'PUT', path, headers) + with Timeout(self.app.node_timeout): + resp = conn.getexpect() + if resp.status == HTTP_CONTINUE: + conn.node = node + return conn + elif resp.status == HTTP_INSUFFICIENT_STORAGE: + self.error_limit(node) + except: + self.exception_occurred(node, _('Object'), + _('Expect: 100-continue on %s') % path) + + @public + @delay_denial + def PUT(self, req, start_time=None, stats_type='PUT'): + """HTTP PUT request handler.""" + if not start_time: + start_time = time.time() + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key'], object_versions) = \ + self.container_info(self.account_name, self.container_name, + account_autocreate=self.app.account_autocreate) + if 'swift.authorize' in req.environ: + aresp = req.environ['swift.authorize'](req) + if aresp: + self.app.logger.increment('auth_short_circuits') + return aresp + if not containers: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return HTTPNotFound(request=req) + if 'x-delete-after' in req.headers: + try: + x_delete_after = int(req.headers['x-delete-after']) + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, + content_type='text/plain', + body='Non-integer X-Delete-After') + req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after) + if 'x-delete-at' in req.headers: + try: + x_delete_at = int(req.headers['x-delete-at']) + if x_delete_at < time.time(): + self.app.logger.increment('errors') + return HTTPBadRequest(body='X-Delete-At in past', + request=req, content_type='text/plain') + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, content_type='text/plain', + body='Non-integer X-Delete-At') + delete_at_container = str(x_delete_at / + self.app.expiring_objects_container_divisor * + self.app.expiring_objects_container_divisor) + delete_at_part, delete_at_nodes = \ + self.app.container_ring.get_nodes( + self.app.expiring_objects_account, delete_at_container) + else: + delete_at_part = delete_at_nodes = None + partition, nodes = self.app.object_ring.get_nodes( + self.account_name, self.container_name, self.object_name) + # do a HEAD request for container sync and checking object versions + if 'x-timestamp' in req.headers or (object_versions and not + req.environ.get('swift_versioned_copy')): + hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, + environ={'REQUEST_METHOD': 'HEAD'}) + hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes, + hreq.path_info, len(nodes)) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + if hresp.environ and 'swift_x_timestamp' in hresp.environ and \ + float(hresp.environ['swift_x_timestamp']) >= \ + float(req.headers['x-timestamp']): + self.app.logger.timing_since( + '%.timing' % (stats_type,), start_time) + return HTTPAccepted(request=req) + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + # Sometimes the 'content-type' header exists, but is set to None. + content_type_manually_set = True + if not req.headers.get('content-type'): + guessed_type, _junk = mimetypes.guess_type(req.path_info) + req.headers['Content-Type'] = guessed_type or \ + 'application/octet-stream' + content_type_manually_set = False + error_response = check_object_creation(req, self.object_name) + if error_response: + self.app.logger.increment('errors') + return error_response + if object_versions and not req.environ.get('swift_versioned_copy'): + is_manifest = 'x-object-manifest' in req.headers or \ + 'x-object-manifest' in hresp.headers + if hresp.status_int != HTTP_NOT_FOUND and not is_manifest: + # This is a version manifest and needs to be handled + # differently. First copy the existing data to a new object, + # then write the data from this request to the version manifest + # object. + lcontainer = object_versions.split('/')[0] + prefix_len = '%03x' % len(self.object_name) + lprefix = prefix_len + self.object_name + '/' + ts_source = hresp.environ.get('swift_x_timestamp') + if ts_source is None: + ts_source = time.mktime(time.strptime( + hresp.headers['last-modified'], + '%a, %d %b %Y %H:%M:%S GMT')) + new_ts = normalize_timestamp(ts_source) + vers_obj_name = lprefix + new_ts + copy_headers = { + 'Destination': '%s/%s' % (lcontainer, vers_obj_name)} + copy_environ = {'REQUEST_METHOD': 'COPY', + 'swift_versioned_copy': True + } + copy_req = Request.blank(req.path_info, headers=copy_headers, + environ=copy_environ) + copy_resp = self.COPY(copy_req) + if is_client_error(copy_resp.status_int): + # missing container or bad permissions + return HTTPPreconditionFailed(request=req) + elif not is_success(copy_resp.status_int): + # could not copy the data, bail + return HTTPServiceUnavailable(request=req) + + reader = req.environ['wsgi.input'].read + data_source = iter(lambda: reader(self.app.client_chunk_size), '') + source_header = req.headers.get('X-Copy-From') + source_resp = None + if source_header: + source_header = unquote(source_header) + acct = req.path_info.split('/', 2)[1] + if isinstance(acct, unicode): + acct = acct.encode('utf-8') + if not source_header.startswith('/'): + source_header = '/' + source_header + source_header = '/' + acct + source_header + try: + src_container_name, src_obj_name = \ + source_header.split('/', 3)[2:] + except ValueError: + self.app.logger.increment('errors') + return HTTPPreconditionFailed(request=req, + body='X-Copy-From header must be of the form' + '<container name>/<object name>') + source_req = req.copy_get() + source_req.path_info = source_header + source_req.headers['X-Newest'] = 'true' + orig_obj_name = self.object_name + orig_container_name = self.container_name + self.object_name = src_obj_name + self.container_name = src_container_name + source_resp = self.GET(source_req) + if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return source_resp + self.object_name = orig_obj_name + self.container_name = orig_container_name + new_req = Request.blank(req.path_info, + environ=req.environ, headers=req.headers) + data_source = source_resp.app_iter + new_req.content_length = source_resp.content_length + if new_req.content_length is None: + # This indicates a transfer-encoding: chunked source object, + # which currently only happens because there are more than + # CONTAINER_LISTING_LIMIT segments in a segmented object. In + # this case, we're going to refuse to do the server-side copy. + self.app.logger.increment('errors') + return HTTPRequestEntityTooLarge(request=req) + new_req.etag = source_resp.etag + # we no longer need the X-Copy-From header + del new_req.headers['X-Copy-From'] + if not content_type_manually_set: + new_req.headers['Content-Type'] = \ + source_resp.headers['Content-Type'] + if new_req.headers.get('x-fresh-metadata', 'false').lower() \ + not in TRUE_VALUES: + for k, v in source_resp.headers.items(): + if k.lower().startswith('x-object-meta-'): + new_req.headers[k] = v + for k, v in req.headers.items(): + if k.lower().startswith('x-object-meta-'): + new_req.headers[k] = v + req = new_req + node_iter = self.iter_nodes(partition, nodes, self.app.object_ring) + pile = GreenPile(len(nodes)) + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['Connection'] = 'close' + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + nheaders['Expect'] = '100-continue' + if delete_at_nodes: + node = delete_at_nodes.pop(0) + nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node + nheaders['X-Delete-At-Partition'] = delete_at_part + nheaders['X-Delete-At-Device'] = node['device'] + pile.spawn(self._connect_put_node, node_iter, partition, + req.path_info, nheaders, self.app.logger.thread_locals) + conns = [conn for conn in pile if conn] + if len(conns) <= len(nodes) / 2: + self.app.logger.error( + _('Object PUT returning 503, %(conns)s/%(nodes)s ' + 'required connections'), + {'conns': len(conns), 'nodes': len(nodes) // 2 + 1}) + self.app.logger.increment('errors') + return HTTPServiceUnavailable(request=req) + chunked = req.headers.get('transfer-encoding') + bytes_transferred = 0 + try: + with ContextPool(len(nodes)) as pool: + for conn in conns: + conn.failed = False + conn.queue = Queue(self.app.put_queue_depth) + pool.spawn(self._send_file, conn, req.path) + while True: + with ChunkReadTimeout(self.app.client_timeout): + try: + chunk = next(data_source) + except StopIteration: + if chunked: + [conn.queue.put('0\r\n\r\n') for conn in conns] + break + bytes_transferred += len(chunk) + if bytes_transferred > MAX_FILE_SIZE: + self.app.logger.increment('errors') + return HTTPRequestEntityTooLarge(request=req) + for conn in list(conns): + if not conn.failed: + conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) + if chunked else chunk) + else: + conns.remove(conn) + if len(conns) <= len(nodes) / 2: + self.app.logger.error(_('Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections'), + {'conns': len(conns), 'nodes': len(nodes) / 2 + 1}) + self.app.logger.increment('errors') + return HTTPServiceUnavailable(request=req) + for conn in conns: + if conn.queue.unfinished_tasks: + conn.queue.join() + conns = [conn for conn in conns if not conn.failed] + except ChunkReadTimeout, err: + self.app.logger.warn( + _('ERROR Client read timeout (%ss)'), err.seconds) + self.app.logger.increment('client_timeouts') + return HTTPRequestTimeout(request=req) + except (Exception, Timeout): + self.app.logger.exception( + _('ERROR Exception causing client disconnect')) + self.app.logger.increment('client_disconnects') + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return HTTPClientDisconnect(request=req) + if req.content_length and bytes_transferred < req.content_length: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + self.app.logger.timing_since( + '%s.timing' % (stats_type,), start_time) + return HTTPClientDisconnect(request=req) + statuses = [] + reasons = [] + bodies = [] + etags = set() + for conn in conns: + try: + with Timeout(self.app.node_timeout): + response = conn.getresponse() + statuses.append(response.status) + reasons.append(response.reason) + bodies.append(response.read()) + if response.status >= HTTP_INTERNAL_SERVER_ERROR: + self.error_occurred(conn.node, + _('ERROR %(status)d %(body)s From Object Server ' \ + 're: %(path)s') % {'status': response.status, + 'body': bodies[-1][:1024], 'path': req.path}) + elif is_success(response.status): + etags.add(response.getheader('etag').strip('"')) + except (Exception, Timeout): + self.exception_occurred(conn.node, _('Object'), + _('Trying to get final status of PUT to %s') % req.path) + if len(etags) > 1: + self.app.logger.error( + _('Object servers returned %s mismatched etags'), len(etags)) + self.app.logger.increment('errors') + return HTTPServerError(request=req) + etag = len(etags) and etags.pop() or None + while len(statuses) < len(nodes): + statuses.append(HTTP_SERVICE_UNAVAILABLE) + reasons.append('') + bodies.append('') + resp = self.best_response(req, statuses, reasons, bodies, + _('Object PUT'), etag=etag) + if source_header: + resp.headers['X-Copied-From'] = quote( + source_header.split('/', 2)[2]) + if 'last-modified' in source_resp.headers: + resp.headers['X-Copied-From-Last-Modified'] = \ + source_resp.headers['last-modified'] + for k, v in req.headers.items(): + if k.lower().startswith('x-object-meta-'): + resp.headers[k] = v + resp.last_modified = float(req.headers['X-Timestamp']) + self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) + return resp + + @public + @delay_denial + def DELETE(self, req): + """HTTP DELETE request handler.""" + start_time = time.time() + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key'], object_versions) = \ + self.container_info(self.account_name, self.container_name) + if object_versions: + # this is a version manifest and needs to be handled differently + lcontainer = object_versions.split('/')[0] + prefix_len = '%03x' % len(self.object_name) + lprefix = prefix_len + self.object_name + '/' + last_item = None + try: + for last_item in self._listing_iter(lcontainer, lprefix, + req.environ): + pass + except ListingIterNotFound: + # no worries, last_item is None + pass + except ListingIterNotAuthorized, err: + self.app.logger.increment('auth_short_circuits') + return err.aresp + except ListingIterError: + self.app.logger.increment('errors') + return HTTPServerError(request=req) + if last_item: + # there are older versions so copy the previous version to the + # current object and delete the previous version + orig_container = self.container_name + orig_obj = self.object_name + self.container_name = lcontainer + self.object_name = last_item['name'] + copy_path = '/' + self.account_name + '/' + \ + self.container_name + '/' + self.object_name + copy_headers = {'X-Newest': 'True', + 'Destination': orig_container + '/' + orig_obj + } + copy_environ = {'REQUEST_METHOD': 'COPY', + 'swift_versioned_copy': True + } + creq = Request.blank(copy_path, headers=copy_headers, + environ=copy_environ) + copy_resp = self.COPY(creq) + if is_client_error(copy_resp.status_int): + # some user error, maybe permissions + return HTTPPreconditionFailed(request=req) + elif not is_success(copy_resp.status_int): + # could not copy the data, bail + return HTTPServiceUnavailable(request=req) + # reset these because the COPY changed them + self.container_name = lcontainer + self.object_name = last_item['name'] + new_del_req = Request.blank(copy_path, environ=req.environ) + (container_partition, containers, + _junk, new_del_req.acl, _junk, _junk) = \ + self.container_info(self.account_name, self.container_name) + new_del_req.path_info = copy_path + req = new_del_req + if 'swift.authorize' in req.environ: + aresp = req.environ['swift.authorize'](req) + if aresp: + self.app.logger.increment('auth_short_circuits') + return aresp + if not containers: + self.app.logger.timing_since('DELETE.timing', start_time) + return HTTPNotFound(request=req) + partition, nodes = self.app.object_ring.get_nodes( + self.account_name, self.container_name, self.object_name) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + except ValueError: + self.app.logger.increment('errors') + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + headers = [] + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['Connection'] = 'close' + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + headers.append(nheaders) + resp = self.make_requests(req, self.app.object_ring, + partition, 'DELETE', req.path_info, headers) + self.app.logger.timing_since('DELETE.timing', start_time) + return resp + + @public + @delay_denial + def COPY(self, req): + """HTTP COPY request handler.""" + start_time = time.time() + dest = req.headers.get('Destination') + if not dest: + self.app.logger.increment('errors') + return HTTPPreconditionFailed(request=req, + body='Destination header required') + dest = unquote(dest) + if not dest.startswith('/'): + dest = '/' + dest + try: + _junk, dest_container, dest_object = dest.split('/', 2) + except ValueError: + self.app.logger.increment('errors') + return HTTPPreconditionFailed(request=req, + body='Destination header must be of the form ' + '<container name>/<object name>') + source = '/' + self.container_name + '/' + self.object_name + self.container_name = dest_container + self.object_name = dest_object + # re-write the existing request as a PUT instead of creating a new one + # since this one is already attached to the posthooklogger + req.method = 'PUT' + req.path_info = '/' + self.account_name + dest + req.headers['Content-Length'] = 0 + req.headers['X-Copy-From'] = quote(source) + del req.headers['Destination'] + return self.PUT(req, start_time=start_time, stats_type='COPY') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index c42632cc2..b7aac4c3f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -24,1886 +24,23 @@ # These shenanigans are to ensure all related objects can be garbage # collected. We've seen objects hang around forever otherwise. -from __future__ import with_statement -try: - import simplejson as json -except ImportError: - import json import mimetypes import os -import re import time -import traceback from ConfigParser import ConfigParser -from datetime import datetime -from urllib import unquote, quote import uuid -import functools -from hashlib import md5 -from random import shuffle -from eventlet import sleep, spawn_n, GreenPile, Timeout -from eventlet.queue import Queue, Empty, Full -from eventlet.timeout import Timeout -from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPForbidden, \ - HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ - HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \ - HTTPServiceUnavailable, status_map -from webob import Request, Response +from eventlet import Timeout +from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPMethodNotAllowed, \ + HTTPNotFound, HTTPPreconditionFailed, HTTPServerError +from webob import Request from swift.common.ring import Ring -from swift.common.utils import cache_from_env, ContextPool, get_logger, \ - get_remote_client, normalize_timestamp, split_path, TRUE_VALUES, public -from swift.common.bufferedhttp import http_connect -from swift.common.constraints import check_metadata, check_object_creation, \ - check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ - MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE -from swift.common.exceptions import ChunkReadTimeout, \ - ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ - ListingIterNotAuthorized, ListingIterError -from swift.common.http import is_informational, is_success, is_redirection, \ - is_client_error, is_server_error, HTTP_CONTINUE, HTTP_OK, HTTP_CREATED, \ - HTTP_ACCEPTED, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ - HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, \ - HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \ - HTTP_INSUFFICIENT_STORAGE, HTTPClientDisconnect - - -def update_headers(response, headers): - """ - Helper function to update headers in the response. - - :param response: webob.Response object - :param headers: dictionary headers - """ - if hasattr(headers, 'items'): - headers = headers.items() - for name, value in headers: - if name == 'etag': - response.headers[name] = value.replace('"', '') - elif name not in ('date', 'content-length', 'content-type', - 'connection', 'x-put-timestamp', 'x-delete-after'): - response.headers[name] = value - - -def delay_denial(func): - """ - Decorator to declare which methods should have any swift.authorize call - delayed. This is so the method can load the Request object up with - additional information that may be needed by the authorization system. - - :param func: function for which authorization will be delayed - """ - func.delay_denial = True - - @functools.wraps(func) - def wrapped(*a, **kw): - return func(*a, **kw) - return wrapped - - -def get_account_memcache_key(account): - return 'account/%s' % account - - -def get_container_memcache_key(account, container): - return 'container/%s/%s' % (account, container) - - -class SegmentedIterable(object): - """ - Iterable that returns the object contents for a segmented object in Swift. - - If there's a failure that cuts the transfer short, the response's - `status_int` will be updated (again, just for logging since the original - status would have already been sent to the client). - - :param controller: The ObjectController instance to work with. - :param container: The container the object segments are within. - :param listing: The listing of object segments to iterate over; this may - be an iterator or list that returns dicts with 'name' and - 'bytes' keys. - :param response: The webob.Response this iterable is associated with, if - any (default: None) - """ - - def __init__(self, controller, container, listing, response=None): - self.controller = controller - self.container = container - self.listing = iter(listing) - self.segment = -1 - self.segment_dict = None - self.segment_peek = None - self.seek = 0 - self.segment_iter = None - # See NOTE: swift_conn at top of file about this. - self.segment_iter_swift_conn = None - self.position = 0 - self.response = response - if not self.response: - self.response = Response() - self.next_get_time = 0 - - def _load_next_segment(self): - """ - Loads the self.segment_iter with the next object segment's contents. - - :raises: StopIteration when there are no more object segments. - """ - try: - self.segment += 1 - self.segment_dict = self.segment_peek or self.listing.next() - self.segment_peek = None - partition, nodes = self.controller.app.object_ring.get_nodes( - self.controller.account_name, self.container, - self.segment_dict['name']) - path = '/%s/%s/%s' % (self.controller.account_name, self.container, - self.segment_dict['name']) - req = Request.blank(path) - if self.seek: - req.range = 'bytes=%s-' % self.seek - self.seek = 0 - if self.segment > self.controller.app.rate_limit_after_segment: - sleep(max(self.next_get_time - time.time(), 0)) - self.next_get_time = time.time() + \ - 1.0 / self.controller.app.rate_limit_segments_per_sec - shuffle(nodes) - resp = self.controller.GETorHEAD_base(req, _('Object'), partition, - self.controller.iter_nodes(partition, nodes, - self.controller.app.object_ring), path, - len(nodes)) - if not is_success(resp.status_int): - raise Exception(_('Could not load object segment %(path)s:' \ - ' %(status)s') % {'path': path, 'status': resp.status_int}) - self.segment_iter = resp.app_iter - # See NOTE: swift_conn at top of file about this. - self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None) - except StopIteration: - raise - except (Exception, Timeout), err: - if not getattr(err, 'swift_logged', False): - self.controller.app.logger.exception(_('ERROR: While ' - 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), - {'acc': self.controller.account_name, - 'cont': self.controller.container_name, - 'obj': self.controller.object_name}) - err.swift_logged = True - self.response.status_int = HTTP_SERVICE_UNAVAILABLE - raise - - def next(self): - return iter(self).next() - - def __iter__(self): - """ Standard iterator function that returns the object's contents. """ - try: - while True: - if not self.segment_iter: - self._load_next_segment() - while True: - with ChunkReadTimeout(self.controller.app.node_timeout): - try: - chunk = self.segment_iter.next() - break - except StopIteration: - self._load_next_segment() - self.position += len(chunk) - yield chunk - except StopIteration: - raise - except (Exception, Timeout), err: - if not getattr(err, 'swift_logged', False): - self.controller.app.logger.exception(_('ERROR: While ' - 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), - {'acc': self.controller.account_name, - 'cont': self.controller.container_name, - 'obj': self.controller.object_name}) - err.swift_logged = True - self.response.status_int = HTTP_SERVICE_UNAVAILABLE - raise - - def app_iter_range(self, start, stop): - """ - Non-standard iterator function for use with Webob in serving Range - requests more quickly. This will skip over segments and do a range - request on the first segment to return data from, if needed. - - :param start: The first byte (zero-based) to return. None for 0. - :param stop: The last byte (zero-based) to return. None for end. - """ - try: - if start: - self.segment_peek = self.listing.next() - while start >= self.position + self.segment_peek['bytes']: - self.segment += 1 - self.position += self.segment_peek['bytes'] - self.segment_peek = self.listing.next() - self.seek = start - self.position - else: - start = 0 - if stop is not None: - length = stop - start - else: - length = None - for chunk in self: - if length is not None: - length -= len(chunk) - if length < 0: - # Chop off the extra: - yield chunk[:length] - break - yield chunk - # See NOTE: swift_conn at top of file about this. - if self.segment_iter_swift_conn: - try: - self.segment_iter_swift_conn.close() - except Exception: - pass - self.segment_iter_swift_conn = None - if self.segment_iter: - try: - while self.segment_iter.next(): - pass - except Exception: - pass - self.segment_iter = None - except StopIteration: - raise - except (Exception, Timeout), err: - if not getattr(err, 'swift_logged', False): - self.controller.app.logger.exception(_('ERROR: While ' - 'processing manifest /%(acc)s/%(cont)s/%(obj)s'), - {'acc': self.controller.account_name, - 'cont': self.controller.container_name, - 'obj': self.controller.object_name}) - err.swift_logged = True - self.response.status_int = HTTP_SERVICE_UNAVAILABLE - raise - - -class Controller(object): - """Base WSGI controller class for the proxy""" - server_type = _('Base') - - # Ensure these are all lowercase - pass_through_headers = [] - - def __init__(self, app): - self.account_name = None - self.app = app - self.trans_id = '-' - - def transfer_headers(self, src_headers, dst_headers): - x_remove = 'x-remove-%s-meta-' % self.server_type.lower() - x_meta = 'x-%s-meta-' % self.server_type.lower() - dst_headers.update((k.lower().replace('-remove', '', 1), '') - for k in src_headers - if k.lower().startswith(x_remove)) - dst_headers.update((k.lower(), v) - for k, v in src_headers.iteritems() - if k.lower() in self.pass_through_headers or - k.lower().startswith(x_meta)) - - def error_increment(self, node): - """ - Handles incrementing error counts when talking to nodes. - - :param node: dictionary of node to increment the error count for - """ - node['errors'] = node.get('errors', 0) + 1 - node['last_error'] = time.time() - - def error_occurred(self, node, msg): - """ - Handle logging, and handling of errors. - - :param node: dictionary of node to handle errors for - :param msg: error message - """ - self.error_increment(node) - self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'), - {'msg': msg, 'ip': node['ip'], 'port': node['port']}) - - def exception_occurred(self, node, typ, additional_info): - """ - Handle logging of generic exceptions. - - :param node: dictionary of node to log the error for - :param typ: server type - :param additional_info: additional information to log - """ - self.app.logger.exception( - _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: ' - '%(info)s'), - {'type': typ, 'ip': node['ip'], 'port': node['port'], - 'device': node['device'], 'info': additional_info}) - - def error_limited(self, node): - """ - Check if the node is currently error limited. - - :param node: dictionary of node to check - :returns: True if error limited, False otherwise - """ - now = time.time() - if not 'errors' in node: - return False - if 'last_error' in node and node['last_error'] < \ - now - self.app.error_suppression_interval: - del node['last_error'] - if 'errors' in node: - del node['errors'] - return False - limited = node['errors'] > self.app.error_suppression_limit - if limited: - self.app.logger.debug( - _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) - return limited - - def error_limit(self, node): - """ - Mark a node as error limited. - - :param node: dictionary of node to error limit - """ - node['errors'] = self.app.error_suppression_limit + 1 - node['last_error'] = time.time() - - def account_info(self, account, autocreate=False): - """ - Get account information, and also verify that the account exists. - - :param account: name of the account to get the info for - :returns: tuple of (account partition, account nodes, container_count) - or (None, None, None) if it does not exist - """ - partition, nodes = self.app.account_ring.get_nodes(account) - # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses - if self.app.memcache: - cache_key = get_account_memcache_key(account) - cache_value = self.app.memcache.get(cache_key) - if not isinstance(cache_value, dict): - result_code = cache_value - container_count = 0 - else: - result_code = cache_value['status'] - container_count = cache_value['container_count'] - if result_code == HTTP_OK: - return partition, nodes, container_count - elif result_code == HTTP_NOT_FOUND and not autocreate: - return None, None, None - result_code = 0 - container_count = 0 - attempts_left = len(nodes) - path = '/%s' % account - headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - iternodes = self.iter_nodes(partition, nodes, self.app.account_ring) - while attempts_left > 0: - try: - node = iternodes.next() - except StopIteration: - break - attempts_left -= 1 - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], partition, 'HEAD', path, headers) - with Timeout(self.app.node_timeout): - resp = conn.getresponse() - body = resp.read() - if is_success(resp.status): - result_code = HTTP_OK - container_count = int( - resp.getheader('x-account-container-count') or 0) - break - elif resp.status == HTTP_NOT_FOUND: - if result_code == 0: - result_code = HTTP_NOT_FOUND - elif result_code != HTTP_NOT_FOUND: - result_code = -1 - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) - continue - else: - result_code = -1 - except (Exception, Timeout): - self.exception_occurred(node, _('Account'), - _('Trying to get account info for %s') % path) - if result_code == HTTP_NOT_FOUND and autocreate: - if len(account) > MAX_ACCOUNT_NAME_LENGTH: - return None, None, None - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'X-Trans-Id': self.trans_id, - 'Connection': 'close'} - resp = self.make_requests(Request.blank('/v1' + path), - self.app.account_ring, partition, 'PUT', - path, [headers] * len(nodes)) - if not is_success(resp.status_int): - self.app.logger.warning('Could not autocreate account %r' % \ - path) - return None, None, None - result_code = HTTP_OK - if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND): - if result_code == HTTP_OK: - cache_timeout = self.app.recheck_account_existence - else: - cache_timeout = self.app.recheck_account_existence * 0.1 - self.app.memcache.set(cache_key, - {'status': result_code, 'container_count': container_count}, - timeout=cache_timeout) - if result_code == HTTP_OK: - return partition, nodes, container_count - return None, None, None - - def container_info(self, account, container, account_autocreate=False): - """ - Get container information and thusly verify container existance. - This will also make a call to account_info to verify that the - account exists. - - :param account: account name for the container - :param container: container name to look up - :returns: tuple of (container partition, container nodes, container - read acl, container write acl, container sync key) or (None, - None, None, None, None) if the container does not exist - """ - partition, nodes = self.app.container_ring.get_nodes( - account, container) - path = '/%s/%s' % (account, container) - if self.app.memcache: - cache_key = get_container_memcache_key(account, container) - cache_value = self.app.memcache.get(cache_key) - if isinstance(cache_value, dict): - status = cache_value['status'] - read_acl = cache_value['read_acl'] - write_acl = cache_value['write_acl'] - sync_key = cache_value.get('sync_key') - versions = cache_value.get('versions') - if status == HTTP_OK: - return partition, nodes, read_acl, write_acl, sync_key, \ - versions - elif status == HTTP_NOT_FOUND: - return None, None, None, None, None, None - if not self.account_info(account, autocreate=account_autocreate)[1]: - return None, None, None, None, None, None - result_code = 0 - read_acl = None - write_acl = None - sync_key = None - container_size = None - versions = None - attempts_left = len(nodes) - headers = {'x-trans-id': self.trans_id, 'Connection': 'close'} - iternodes = self.iter_nodes(partition, nodes, self.app.container_ring) - while attempts_left > 0: - try: - node = iternodes.next() - except StopIteration: - break - attempts_left -= 1 - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], partition, 'HEAD', path, headers) - with Timeout(self.app.node_timeout): - resp = conn.getresponse() - body = resp.read() - if is_success(resp.status): - result_code = HTTP_OK - read_acl = resp.getheader('x-container-read') - write_acl = resp.getheader('x-container-write') - sync_key = resp.getheader('x-container-sync-key') - container_size = \ - resp.getheader('X-Container-Object-Count') - versions = resp.getheader('x-versions-location') - break - elif resp.status == HTTP_NOT_FOUND: - if result_code == 0: - result_code = HTTP_NOT_FOUND - elif result_code != HTTP_NOT_FOUND: - result_code = -1 - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) - continue - else: - result_code = -1 - except (Exception, Timeout): - self.exception_occurred(node, _('Container'), - _('Trying to get container info for %s') % path) - if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND): - if result_code == HTTP_OK: - cache_timeout = self.app.recheck_container_existence - else: - cache_timeout = self.app.recheck_container_existence * 0.1 - self.app.memcache.set(cache_key, - {'status': result_code, - 'read_acl': read_acl, - 'write_acl': write_acl, - 'sync_key': sync_key, - 'container_size': container_size, - 'versions': versions}, - timeout=cache_timeout) - if result_code == HTTP_OK: - return partition, nodes, read_acl, write_acl, sync_key, versions - return None, None, None, None, None, None - - def iter_nodes(self, partition, nodes, ring): - """ - Node iterator that will first iterate over the normal nodes for a - partition and then the handoff partitions for the node. - - :param partition: partition to iterate nodes for - :param nodes: list of node dicts from the ring - :param ring: ring to get handoff nodes from - """ - for node in nodes: - if not self.error_limited(node): - yield node - handoffs = 0 - for node in ring.get_more_nodes(partition): - if not self.error_limited(node): - handoffs += 1 - if self.app.log_handoffs: - self.app.logger.increment('handoff_count') - self.app.logger.warning( - 'Handoff requested (%d)' % handoffs) - if handoffs == len(nodes): - self.app.logger.increment('handoff_all_count') - yield node - - def _make_request(self, nodes, part, method, path, headers, query, - logger_thread_locals): - self.app.logger.thread_locals = logger_thread_locals - for node in nodes: - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, method, path, - headers=headers, query_string=query) - conn.node = node - with Timeout(self.app.node_timeout): - resp = conn.getresponse() - if not is_informational(resp.status) and \ - not is_server_error(resp.status): - return resp.status, resp.reason, resp.read() - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) - except (Exception, Timeout): - self.exception_occurred(node, self.server_type, - _('Trying to %(method)s %(path)s') % - {'method': method, 'path': path}) - - def make_requests(self, req, ring, part, method, path, headers, - query_string=''): - """ - Sends an HTTP request to multiple nodes and aggregates the results. - It attempts the primary nodes concurrently, then iterates over the - handoff nodes as needed. - - :param headers: a list of dicts, where each dict represents one - backend request that should be made. - :returns: a webob Response object - """ - start_nodes = ring.get_part_nodes(part) - nodes = self.iter_nodes(part, start_nodes, ring) - pile = GreenPile(len(start_nodes)) - for head in headers: - pile.spawn(self._make_request, nodes, part, method, path, - head, query_string, self.app.logger.thread_locals) - response = [resp for resp in pile if resp] - while len(response) < len(start_nodes): - response.append((HTTP_SERVICE_UNAVAILABLE, '', '')) - statuses, reasons, bodies = zip(*response) - return self.best_response(req, statuses, reasons, bodies, - '%s %s' % (self.server_type, req.method)) - - def best_response(self, req, statuses, reasons, bodies, server_type, - etag=None): - """ - Given a list of responses from several servers, choose the best to - return to the API. - - :param req: webob.Request object - :param statuses: list of statuses returned - :param reasons: list of reasons for each status - :param bodies: bodies of each response - :param server_type: type of server the responses came from - :param etag: etag - :returns: webob.Response object with the correct status, body, etc. set - """ - resp = Response(request=req) - if len(statuses): - for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): - hstatuses = \ - [s for s in statuses if hundred <= s < hundred + 100] - if len(hstatuses) > len(statuses) / 2: - status = max(hstatuses) - status_index = statuses.index(status) - resp.status = '%s %s' % (status, reasons[status_index]) - resp.body = bodies[status_index] - resp.content_type = 'text/html' - if etag: - resp.headers['etag'] = etag.strip('"') - return resp - self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'), - {'type': server_type, 'statuses': statuses}) - resp.status = '503 Internal Server Error' - return resp - - @public - def GET(self, req): - """Handler for HTTP GET requests.""" - return self.GETorHEAD(req, stats_type='GET') - - @public - def HEAD(self, req): - """Handler for HTTP HEAD requests.""" - return self.GETorHEAD(req, stats_type='HEAD') - - def _make_app_iter_reader(self, node, source, queue, logger_thread_locals): - """ - Reads from the source and places data in the queue. It expects - something else be reading from the queue and, if nothing does within - self.app.client_timeout seconds, the process will be aborted. - - :param node: The node dict that the source is connected to, for - logging/error-limiting purposes. - :param source: The httplib.Response object to read from. - :param queue: The eventlet.queue.Queue to place read source data into. - :param logger_thread_locals: The thread local values to be set on the - self.app.logger to retain transaction - logging information. - """ - self.app.logger.thread_locals = logger_thread_locals - success = True - try: - try: - while True: - with ChunkReadTimeout(self.app.node_timeout): - chunk = source.read(self.app.object_chunk_size) - if not chunk: - break - queue.put(chunk, timeout=self.app.client_timeout) - except Full: - self.app.logger.warn( - _('Client did not read from queue within %ss') % - self.app.client_timeout) - self.app.logger.increment('client_timeouts') - success = False - except (Exception, Timeout): - self.exception_occurred(node, _('Object'), - _('Trying to read during GET')) - success = False - finally: - # Ensure the queue getter gets a terminator. - queue.resize(2) - queue.put(success) - # Close-out the connection as best as possible. - if getattr(source, 'swift_conn', None): - try: - source.swift_conn.close() - except Exception: - pass - source.swift_conn = None - try: - while source.read(self.app.object_chunk_size): - pass - except Exception: - pass - try: - source.close() - except Exception: - pass - - def _make_app_iter(self, node, source, response): - """ - Returns an iterator over the contents of the source (via its read - func). There is also quite a bit of cleanup to ensure garbage - collection works and the underlying socket of the source is closed. - - :param response: The webob.Response object this iterator should be - assigned to via response.app_iter. - :param source: The httplib.Response object this iterator should read - from. - :param node: The node the source is reading from, for logging purposes. - """ - try: - try: - # Spawn reader to read from the source and place in the queue. - # We then drop any reference to the source or node, for garbage - # collection purposes. - queue = Queue(1) - spawn_n(self._make_app_iter_reader, node, source, queue, - self.app.logger.thread_locals) - source = node = None - while True: - chunk = queue.get(timeout=self.app.node_timeout) - if isinstance(chunk, bool): # terminator - success = chunk - if not success: - raise Exception(_('Failed to read all data' - ' from the source')) - break - yield chunk - except Empty: - raise ChunkReadTimeout() - except (GeneratorExit, Timeout): - self.app.logger.warn(_('Client disconnected on read')) - except Exception: - self.app.logger.exception(_('Trying to send to client')) - raise - finally: - response.app_iter = None - - def GETorHEAD_base(self, req, server_type, partition, nodes, path, - attempts): - """ - Base handler for HTTP GET or HEAD requests. - - :param req: webob.Request object - :param server_type: server type - :param partition: partition - :param nodes: nodes - :param path: path for the request - :param attempts: number of attempts to try - :returns: webob.Response object - """ - statuses = [] - reasons = [] - bodies = [] - source = None - newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES - nodes = iter(nodes) - while len(statuses) < attempts: - try: - node = nodes.next() - except StopIteration: - break - if self.error_limited(node): - continue - try: - with ConnectionTimeout(self.app.conn_timeout): - headers = dict(req.headers) - headers['Connection'] = 'close' - conn = http_connect(node['ip'], node['port'], - node['device'], partition, req.method, path, - headers=headers, - query_string=req.query_string) - with Timeout(self.app.node_timeout): - possible_source = conn.getresponse() - # See NOTE: swift_conn at top of file about this. - possible_source.swift_conn = conn - except (Exception, Timeout): - self.exception_occurred(node, server_type, - _('Trying to %(method)s %(path)s') % - {'method': req.method, 'path': req.path}) - continue - if possible_source.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) - continue - if is_success(possible_source.status) or \ - is_redirection(possible_source.status): - # 404 if we know we don't have a synced copy - if not float(possible_source.getheader('X-PUT-Timestamp', 1)): - statuses.append(HTTP_NOT_FOUND) - reasons.append('') - bodies.append('') - possible_source.read() - continue - if newest: - if source: - ts = float(source.getheader('x-put-timestamp') or - source.getheader('x-timestamp') or 0) - pts = float( - possible_source.getheader('x-put-timestamp') or - possible_source.getheader('x-timestamp') or 0) - if pts > ts: - source = possible_source - else: - source = possible_source - statuses.append(source.status) - reasons.append(source.reason) - bodies.append('') - continue - else: - source = possible_source - break - statuses.append(possible_source.status) - reasons.append(possible_source.reason) - bodies.append(possible_source.read()) - if is_server_error(possible_source.status): - self.error_occurred(node, _('ERROR %(status)d %(body)s ' \ - 'From %(type)s Server') % - {'status': possible_source.status, - 'body': bodies[-1][:1024], 'type': server_type}) - if source: - if req.method == 'GET' and \ - source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): - res = Response(request=req, conditional_response=True) - res.app_iter = self._make_app_iter(node, source, res) - # See NOTE: swift_conn at top of file about this. - res.swift_conn = source.swift_conn - update_headers(res, source.getheaders()) - # Used by container sync feature - if res.environ is None: - res.environ = dict() - res.environ['swift_x_timestamp'] = \ - source.getheader('x-timestamp') - update_headers(res, {'accept-ranges': 'bytes'}) - res.status = source.status - res.content_length = source.getheader('Content-Length') - if source.getheader('Content-Type'): - res.charset = None - res.content_type = source.getheader('Content-Type') - return res - elif is_success(source.status) or is_redirection(source.status): - res = status_map[source.status](request=req) - update_headers(res, source.getheaders()) - # Used by container sync feature - if res.environ is None: - res.environ = dict() - res.environ['swift_x_timestamp'] = \ - source.getheader('x-timestamp') - update_headers(res, {'accept-ranges': 'bytes'}) - res.content_length = source.getheader('Content-Length') - if source.getheader('Content-Type'): - res.charset = None - res.content_type = source.getheader('Content-Type') - return res - return self.best_response(req, statuses, reasons, bodies, - '%s %s' % (server_type, req.method)) - - -class ObjectController(Controller): - """WSGI controller for object requests.""" - server_type = _('Object') - - def __init__(self, app, account_name, container_name, object_name, - **kwargs): - Controller.__init__(self, app) - self.account_name = unquote(account_name) - self.container_name = unquote(container_name) - self.object_name = unquote(object_name) - - def _listing_iter(self, lcontainer, lprefix, env): - lpartition, lnodes = self.app.container_ring.get_nodes( - self.account_name, lcontainer) - marker = '' - while True: - lreq = Request.blank('i will be overridden by env', environ=env) - # Don't quote PATH_INFO, by WSGI spec - lreq.environ['PATH_INFO'] = \ - '/%s/%s' % (self.account_name, lcontainer) - lreq.environ['REQUEST_METHOD'] = 'GET' - lreq.environ['QUERY_STRING'] = \ - 'format=json&prefix=%s&marker=%s' % (quote(lprefix), - quote(marker)) - shuffle(lnodes) - lresp = self.GETorHEAD_base(lreq, _('Container'), - lpartition, lnodes, lreq.path_info, - len(lnodes)) - if 'swift.authorize' in env: - lreq.acl = lresp.headers.get('x-container-read') - aresp = env['swift.authorize'](lreq) - if aresp: - raise ListingIterNotAuthorized(aresp) - if lresp.status_int == HTTP_NOT_FOUND: - raise ListingIterNotFound() - elif not is_success(lresp.status_int): - raise ListingIterError() - if not lresp.body: - break - sublisting = json.loads(lresp.body) - if not sublisting: - break - marker = sublisting[-1]['name'] - for obj in sublisting: - yield obj - - def GETorHEAD(self, req, stats_type): - """Handle HTTP GET or HEAD requests.""" - start_time = time.time() - _junk, _junk, req.acl, _junk, _junk, object_versions = \ - self.container_info(self.account_name, self.container_name) - if 'swift.authorize' in req.environ: - aresp = req.environ['swift.authorize'](req) - if aresp: - self.app.logger.increment('auth_short_circuits') - return aresp - partition, nodes = self.app.object_ring.get_nodes( - self.account_name, self.container_name, self.object_name) - shuffle(nodes) - resp = self.GETorHEAD_base(req, _('Object'), partition, - self.iter_nodes(partition, nodes, self.app.object_ring), - req.path_info, len(nodes)) - # Whether we get a 416 Requested Range Not Satisfiable or not, - # we should request a manifest because size of manifest file - # can be not 0. After checking a manifest, redo the range request - # on the whole object. - if req.range: - req_range = req.range - req.range = None - resp2 = self.GETorHEAD_base(req, _('Object'), partition, - self.iter_nodes(partition, - nodes, - self.app.object_ring), - req.path_info, len(nodes)) - if 'x-object-manifest' not in resp2.headers: - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return resp - resp = resp2 - req.range = str(req_range) - - if 'x-object-manifest' in resp.headers: - lcontainer, lprefix = \ - resp.headers['x-object-manifest'].split('/', 1) - lcontainer = unquote(lcontainer) - lprefix = unquote(lprefix) - try: - listing = list(self._listing_iter(lcontainer, lprefix, - req.environ)) - except ListingIterNotFound: - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return HTTPNotFound(request=req) - except ListingIterNotAuthorized, err: - self.app.logger.increment('auth_short_circuits') - return err.aresp - except ListingIterError: - self.app.logger.increment('errors') - return HTTPServerError(request=req) - - if len(listing) > CONTAINER_LISTING_LIMIT: - resp = Response(headers=resp.headers, request=req, - conditional_response=True) - if req.method == 'HEAD': - # These shenanigans are because webob translates the HEAD - # request into a webob EmptyResponse for the body, which - # has a len, which eventlet translates as needing a - # content-length header added. So we call the original - # webob resp for the headers but return an empty iterator - # for the body. - - def head_response(environ, start_response): - resp(environ, start_response) - return iter([]) - - head_response.status_int = resp.status_int - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return head_response - else: - resp.app_iter = SegmentedIterable(self, lcontainer, - self._listing_iter(lcontainer, lprefix, req.environ), - resp) - - else: - # For objects with a reasonable number of segments, we'll serve - # them with a set content-length and computed etag. - if listing: - content_length = sum(o['bytes'] for o in listing) - last_modified = max(o['last_modified'] for o in listing) - last_modified = datetime(*map(int, re.split('[^\d]', - last_modified)[:-1])) - etag = md5( - ''.join(o['hash'] for o in listing)).hexdigest() - else: - content_length = 0 - last_modified = resp.last_modified - etag = md5().hexdigest() - resp = Response(headers=resp.headers, request=req, - conditional_response=True) - resp.app_iter = SegmentedIterable(self, lcontainer, listing, - resp) - resp.content_length = content_length - resp.last_modified = last_modified - resp.etag = etag - resp.headers['accept-ranges'] = 'bytes' - - self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) - return resp - - @public - @delay_denial - def GET(self, req): - """Handler for HTTP GET requests.""" - return self.GETorHEAD(req, stats_type='GET') - - @public - @delay_denial - def HEAD(self, req): - """Handler for HTTP HEAD requests.""" - return self.GETorHEAD(req, stats_type='HEAD') - - @public - @delay_denial - def POST(self, req): - """HTTP POST request handler.""" - start_time = time.time() - if 'x-delete-after' in req.headers: - try: - x_delete_after = int(req.headers['x-delete-after']) - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, - content_type='text/plain', - body='Non-integer X-Delete-After') - req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after) - if self.app.object_post_as_copy: - req.method = 'PUT' - req.path_info = '/%s/%s/%s' % (self.account_name, - self.container_name, self.object_name) - req.headers['Content-Length'] = 0 - req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, - self.object_name)) - req.headers['X-Fresh-Metadata'] = 'true' - req.environ['swift_versioned_copy'] = True - resp = self.PUT(req, start_time=start_time, stats_type='POST') - # Older editions returned 202 Accepted on object POSTs, so we'll - # convert any 201 Created responses to that for compatibility with - # picky clients. - if resp.status_int != HTTP_CREATED: - return resp - return HTTPAccepted(request=req) - else: - error_response = check_metadata(req, 'object') - if error_response: - self.app.logger.increment('errors') - return error_response - container_partition, containers, _junk, req.acl, _junk, _junk = \ - self.container_info(self.account_name, self.container_name, - account_autocreate=self.app.account_autocreate) - if 'swift.authorize' in req.environ: - aresp = req.environ['swift.authorize'](req) - if aresp: - self.app.logger.increment('auth_short_circuits') - return aresp - if not containers: - self.app.logger.timing_since('POST.timing', start_time) - return HTTPNotFound(request=req) - if 'x-delete-at' in req.headers: - try: - x_delete_at = int(req.headers['x-delete-at']) - if x_delete_at < time.time(): - self.app.logger.increment('errors') - return HTTPBadRequest(body='X-Delete-At in past', - request=req, content_type='text/plain') - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, - content_type='text/plain', - body='Non-integer X-Delete-At') - delete_at_container = str(x_delete_at / - self.app.expiring_objects_container_divisor * - self.app.expiring_objects_container_divisor) - delete_at_part, delete_at_nodes = \ - self.app.container_ring.get_nodes( - self.app.expiring_objects_account, delete_at_container) - else: - delete_at_part = delete_at_nodes = None - partition, nodes = self.app.object_ring.get_nodes( - self.account_name, self.container_name, self.object_name) - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - headers = [] - for container in containers: - nheaders = dict(req.headers.iteritems()) - nheaders['Connection'] = 'close' - nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container - nheaders['X-Container-Partition'] = container_partition - nheaders['X-Container-Device'] = container['device'] - if delete_at_nodes: - node = delete_at_nodes.pop(0) - nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node - nheaders['X-Delete-At-Partition'] = delete_at_part - nheaders['X-Delete-At-Device'] = node['device'] - headers.append(nheaders) - resp = self.make_requests(req, self.app.object_ring, partition, - 'POST', req.path_info, headers) - self.app.logger.timing_since('POST.timing', start_time) - return resp - - def _send_file(self, conn, path): - """Method for a file PUT coro""" - while True: - chunk = conn.queue.get() - if not conn.failed: - try: - with ChunkWriteTimeout(self.app.node_timeout): - conn.send(chunk) - except (Exception, ChunkWriteTimeout): - conn.failed = True - self.exception_occurred(conn.node, _('Object'), - _('Trying to write to %s') % path) - conn.queue.task_done() - - def _connect_put_node(self, nodes, part, path, headers, - logger_thread_locals): - """Method for a file PUT connect""" - self.app.logger.thread_locals = logger_thread_locals - for node in nodes: - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, 'PUT', path, headers) - with Timeout(self.app.node_timeout): - resp = conn.getexpect() - if resp.status == HTTP_CONTINUE: - conn.node = node - return conn - elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node) - except: - self.exception_occurred(node, _('Object'), - _('Expect: 100-continue on %s') % path) - - @public - @delay_denial - def PUT(self, req, start_time=None, stats_type='PUT'): - """HTTP PUT request handler.""" - if not start_time: - start_time = time.time() - (container_partition, containers, _junk, req.acl, - req.environ['swift_sync_key'], object_versions) = \ - self.container_info(self.account_name, self.container_name, - account_autocreate=self.app.account_autocreate) - if 'swift.authorize' in req.environ: - aresp = req.environ['swift.authorize'](req) - if aresp: - self.app.logger.increment('auth_short_circuits') - return aresp - if not containers: - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return HTTPNotFound(request=req) - if 'x-delete-after' in req.headers: - try: - x_delete_after = int(req.headers['x-delete-after']) - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, - content_type='text/plain', - body='Non-integer X-Delete-After') - req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after) - if 'x-delete-at' in req.headers: - try: - x_delete_at = int(req.headers['x-delete-at']) - if x_delete_at < time.time(): - self.app.logger.increment('errors') - return HTTPBadRequest(body='X-Delete-At in past', - request=req, content_type='text/plain') - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, content_type='text/plain', - body='Non-integer X-Delete-At') - delete_at_container = str(x_delete_at / - self.app.expiring_objects_container_divisor * - self.app.expiring_objects_container_divisor) - delete_at_part, delete_at_nodes = \ - self.app.container_ring.get_nodes( - self.app.expiring_objects_account, delete_at_container) - else: - delete_at_part = delete_at_nodes = None - partition, nodes = self.app.object_ring.get_nodes( - self.account_name, self.container_name, self.object_name) - # do a HEAD request for container sync and checking object versions - if 'x-timestamp' in req.headers or (object_versions and not - req.environ.get('swift_versioned_copy')): - hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'}, - environ={'REQUEST_METHOD': 'HEAD'}) - hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes, - hreq.path_info, len(nodes)) - # Used by container sync feature - if 'x-timestamp' in req.headers: - try: - req.headers['X-Timestamp'] = \ - normalize_timestamp(float(req.headers['x-timestamp'])) - if hresp.environ and 'swift_x_timestamp' in hresp.environ and \ - float(hresp.environ['swift_x_timestamp']) >= \ - float(req.headers['x-timestamp']): - self.app.logger.timing_since( - '%.timing' % (stats_type,), start_time) - return HTTPAccepted(request=req) - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, content_type='text/plain', - body='X-Timestamp should be a UNIX timestamp float value; ' - 'was %r' % req.headers['x-timestamp']) - else: - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - # Sometimes the 'content-type' header exists, but is set to None. - content_type_manually_set = True - if not req.headers.get('content-type'): - guessed_type, _junk = mimetypes.guess_type(req.path_info) - req.headers['Content-Type'] = guessed_type or \ - 'application/octet-stream' - content_type_manually_set = False - error_response = check_object_creation(req, self.object_name) - if error_response: - self.app.logger.increment('errors') - return error_response - if object_versions and not req.environ.get('swift_versioned_copy'): - is_manifest = 'x-object-manifest' in req.headers or \ - 'x-object-manifest' in hresp.headers - if hresp.status_int != HTTP_NOT_FOUND and not is_manifest: - # This is a version manifest and needs to be handled - # differently. First copy the existing data to a new object, - # then write the data from this request to the version manifest - # object. - lcontainer = object_versions.split('/')[0] - prefix_len = '%03x' % len(self.object_name) - lprefix = prefix_len + self.object_name + '/' - ts_source = hresp.environ.get('swift_x_timestamp') - if ts_source is None: - ts_source = time.mktime(time.strptime( - hresp.headers['last-modified'], - '%a, %d %b %Y %H:%M:%S GMT')) - new_ts = normalize_timestamp(ts_source) - vers_obj_name = lprefix + new_ts - copy_headers = { - 'Destination': '%s/%s' % (lcontainer, vers_obj_name)} - copy_environ = {'REQUEST_METHOD': 'COPY', - 'swift_versioned_copy': True - } - copy_req = Request.blank(req.path_info, headers=copy_headers, - environ=copy_environ) - copy_resp = self.COPY(copy_req) - if is_client_error(copy_resp.status_int): - # missing container or bad permissions - return HTTPPreconditionFailed(request=req) - elif not is_success(copy_resp.status_int): - # could not copy the data, bail - return HTTPServiceUnavailable(request=req) - - reader = req.environ['wsgi.input'].read - data_source = iter(lambda: reader(self.app.client_chunk_size), '') - source_header = req.headers.get('X-Copy-From') - source_resp = None - if source_header: - source_header = unquote(source_header) - acct = req.path_info.split('/', 2)[1] - if isinstance(acct, unicode): - acct = acct.encode('utf-8') - if not source_header.startswith('/'): - source_header = '/' + source_header - source_header = '/' + acct + source_header - try: - src_container_name, src_obj_name = \ - source_header.split('/', 3)[2:] - except ValueError: - self.app.logger.increment('errors') - return HTTPPreconditionFailed(request=req, - body='X-Copy-From header must be of the form' - '<container name>/<object name>') - source_req = req.copy_get() - source_req.path_info = source_header - source_req.headers['X-Newest'] = 'true' - orig_obj_name = self.object_name - orig_container_name = self.container_name - self.object_name = src_obj_name - self.container_name = src_container_name - source_resp = self.GET(source_req) - if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return source_resp - self.object_name = orig_obj_name - self.container_name = orig_container_name - new_req = Request.blank(req.path_info, - environ=req.environ, headers=req.headers) - data_source = source_resp.app_iter - new_req.content_length = source_resp.content_length - if new_req.content_length is None: - # This indicates a transfer-encoding: chunked source object, - # which currently only happens because there are more than - # CONTAINER_LISTING_LIMIT segments in a segmented object. In - # this case, we're going to refuse to do the server-side copy. - self.app.logger.increment('errors') - return HTTPRequestEntityTooLarge(request=req) - new_req.etag = source_resp.etag - # we no longer need the X-Copy-From header - del new_req.headers['X-Copy-From'] - if not content_type_manually_set: - new_req.headers['Content-Type'] = \ - source_resp.headers['Content-Type'] - if new_req.headers.get('x-fresh-metadata', 'false').lower() \ - not in TRUE_VALUES: - for k, v in source_resp.headers.items(): - if k.lower().startswith('x-object-meta-'): - new_req.headers[k] = v - for k, v in req.headers.items(): - if k.lower().startswith('x-object-meta-'): - new_req.headers[k] = v - req = new_req - node_iter = self.iter_nodes(partition, nodes, self.app.object_ring) - pile = GreenPile(len(nodes)) - for container in containers: - nheaders = dict(req.headers.iteritems()) - nheaders['Connection'] = 'close' - nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container - nheaders['X-Container-Partition'] = container_partition - nheaders['X-Container-Device'] = container['device'] - nheaders['Expect'] = '100-continue' - if delete_at_nodes: - node = delete_at_nodes.pop(0) - nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node - nheaders['X-Delete-At-Partition'] = delete_at_part - nheaders['X-Delete-At-Device'] = node['device'] - pile.spawn(self._connect_put_node, node_iter, partition, - req.path_info, nheaders, self.app.logger.thread_locals) - conns = [conn for conn in pile if conn] - if len(conns) <= len(nodes) / 2: - self.app.logger.error( - _('Object PUT returning 503, %(conns)s/%(nodes)s ' - 'required connections'), - {'conns': len(conns), 'nodes': len(nodes) // 2 + 1}) - self.app.logger.increment('errors') - return HTTPServiceUnavailable(request=req) - chunked = req.headers.get('transfer-encoding') - bytes_transferred = 0 - try: - with ContextPool(len(nodes)) as pool: - for conn in conns: - conn.failed = False - conn.queue = Queue(self.app.put_queue_depth) - pool.spawn(self._send_file, conn, req.path) - while True: - with ChunkReadTimeout(self.app.client_timeout): - try: - chunk = next(data_source) - except StopIteration: - if chunked: - [conn.queue.put('0\r\n\r\n') for conn in conns] - break - bytes_transferred += len(chunk) - if bytes_transferred > MAX_FILE_SIZE: - self.app.logger.increment('errors') - return HTTPRequestEntityTooLarge(request=req) - for conn in list(conns): - if not conn.failed: - conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) - if chunked else chunk) - else: - conns.remove(conn) - if len(conns) <= len(nodes) / 2: - self.app.logger.error(_('Object PUT exceptions during' - ' send, %(conns)s/%(nodes)s required connections'), - {'conns': len(conns), 'nodes': len(nodes) / 2 + 1}) - self.app.logger.increment('errors') - return HTTPServiceUnavailable(request=req) - for conn in conns: - if conn.queue.unfinished_tasks: - conn.queue.join() - conns = [conn for conn in conns if not conn.failed] - except ChunkReadTimeout, err: - self.app.logger.warn( - _('ERROR Client read timeout (%ss)'), err.seconds) - self.app.logger.increment('client_timeouts') - return HTTPRequestTimeout(request=req) - except (Exception, Timeout): - self.app.logger.exception( - _('ERROR Exception causing client disconnect')) - self.app.logger.increment('client_disconnects') - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return HTTPClientDisconnect(request=req) - if req.content_length and bytes_transferred < req.content_length: - req.client_disconnect = True - self.app.logger.warn( - _('Client disconnected without sending enough data')) - self.app.logger.increment('client_disconnects') - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return HTTPClientDisconnect(request=req) - statuses = [] - reasons = [] - bodies = [] - etags = set() - for conn in conns: - try: - with Timeout(self.app.node_timeout): - response = conn.getresponse() - statuses.append(response.status) - reasons.append(response.reason) - bodies.append(response.read()) - if response.status >= HTTP_INTERNAL_SERVER_ERROR: - self.error_occurred(conn.node, - _('ERROR %(status)d %(body)s From Object Server ' \ - 're: %(path)s') % {'status': response.status, - 'body': bodies[-1][:1024], 'path': req.path}) - elif is_success(response.status): - etags.add(response.getheader('etag').strip('"')) - except (Exception, Timeout): - self.exception_occurred(conn.node, _('Object'), - _('Trying to get final status of PUT to %s') % req.path) - if len(etags) > 1: - self.app.logger.error( - _('Object servers returned %s mismatched etags'), len(etags)) - self.app.logger.increment('errors') - return HTTPServerError(request=req) - etag = len(etags) and etags.pop() or None - while len(statuses) < len(nodes): - statuses.append(HTTP_SERVICE_UNAVAILABLE) - reasons.append('') - bodies.append('') - resp = self.best_response(req, statuses, reasons, bodies, - _('Object PUT'), etag=etag) - if source_header: - resp.headers['X-Copied-From'] = quote( - source_header.split('/', 2)[2]) - if 'last-modified' in source_resp.headers: - resp.headers['X-Copied-From-Last-Modified'] = \ - source_resp.headers['last-modified'] - for k, v in req.headers.items(): - if k.lower().startswith('x-object-meta-'): - resp.headers[k] = v - resp.last_modified = float(req.headers['X-Timestamp']) - self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) - return resp - - @public - @delay_denial - def DELETE(self, req): - """HTTP DELETE request handler.""" - start_time = time.time() - (container_partition, containers, _junk, req.acl, - req.environ['swift_sync_key'], object_versions) = \ - self.container_info(self.account_name, self.container_name) - if object_versions: - # this is a version manifest and needs to be handled differently - lcontainer = object_versions.split('/')[0] - prefix_len = '%03x' % len(self.object_name) - lprefix = prefix_len + self.object_name + '/' - last_item = None - try: - for last_item in self._listing_iter(lcontainer, lprefix, - req.environ): - pass - except ListingIterNotFound: - # no worries, last_item is None - pass - except ListingIterNotAuthorized, err: - self.app.logger.increment('auth_short_circuits') - return err.aresp - except ListingIterError: - self.app.logger.increment('errors') - return HTTPServerError(request=req) - if last_item: - # there are older versions so copy the previous version to the - # current object and delete the previous version - orig_container = self.container_name - orig_obj = self.object_name - self.container_name = lcontainer - self.object_name = last_item['name'] - copy_path = '/' + self.account_name + '/' + \ - self.container_name + '/' + self.object_name - copy_headers = {'X-Newest': 'True', - 'Destination': orig_container + '/' + orig_obj - } - copy_environ = {'REQUEST_METHOD': 'COPY', - 'swift_versioned_copy': True - } - creq = Request.blank(copy_path, headers=copy_headers, - environ=copy_environ) - copy_resp = self.COPY(creq) - if is_client_error(copy_resp.status_int): - # some user error, maybe permissions - return HTTPPreconditionFailed(request=req) - elif not is_success(copy_resp.status_int): - # could not copy the data, bail - return HTTPServiceUnavailable(request=req) - # reset these because the COPY changed them - self.container_name = lcontainer - self.object_name = last_item['name'] - new_del_req = Request.blank(copy_path, environ=req.environ) - (container_partition, containers, - _junk, new_del_req.acl, _junk, _junk) = \ - self.container_info(self.account_name, self.container_name) - new_del_req.path_info = copy_path - req = new_del_req - if 'swift.authorize' in req.environ: - aresp = req.environ['swift.authorize'](req) - if aresp: - self.app.logger.increment('auth_short_circuits') - return aresp - if not containers: - self.app.logger.timing_since('DELETE.timing', start_time) - return HTTPNotFound(request=req) - partition, nodes = self.app.object_ring.get_nodes( - self.account_name, self.container_name, self.object_name) - # Used by container sync feature - if 'x-timestamp' in req.headers: - try: - req.headers['X-Timestamp'] = \ - normalize_timestamp(float(req.headers['x-timestamp'])) - except ValueError: - self.app.logger.increment('errors') - return HTTPBadRequest(request=req, content_type='text/plain', - body='X-Timestamp should be a UNIX timestamp float value; ' - 'was %r' % req.headers['x-timestamp']) - else: - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - headers = [] - for container in containers: - nheaders = dict(req.headers.iteritems()) - nheaders['Connection'] = 'close' - nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container - nheaders['X-Container-Partition'] = container_partition - nheaders['X-Container-Device'] = container['device'] - headers.append(nheaders) - resp = self.make_requests(req, self.app.object_ring, - partition, 'DELETE', req.path_info, headers) - self.app.logger.timing_since('DELETE.timing', start_time) - return resp - - @public - @delay_denial - def COPY(self, req): - """HTTP COPY request handler.""" - start_time = time.time() - dest = req.headers.get('Destination') - if not dest: - self.app.logger.increment('errors') - return HTTPPreconditionFailed(request=req, - body='Destination header required') - dest = unquote(dest) - if not dest.startswith('/'): - dest = '/' + dest - try: - _junk, dest_container, dest_object = dest.split('/', 2) - except ValueError: - self.app.logger.increment('errors') - return HTTPPreconditionFailed(request=req, - body='Destination header must be of the form ' - '<container name>/<object name>') - source = '/' + self.container_name + '/' + self.object_name - self.container_name = dest_container - self.object_name = dest_object - # re-write the existing request as a PUT instead of creating a new one - # since this one is already attached to the posthooklogger - req.method = 'PUT' - req.path_info = '/' + self.account_name + dest - req.headers['Content-Length'] = 0 - req.headers['X-Copy-From'] = quote(source) - del req.headers['Destination'] - return self.PUT(req, start_time=start_time, stats_type='COPY') - - -class ContainerController(Controller): - """WSGI controller for container requests""" - server_type = _('Container') - - # Ensure these are all lowercase - pass_through_headers = ['x-container-read', 'x-container-write', - 'x-container-sync-key', 'x-container-sync-to', - 'x-versions-location'] - - def __init__(self, app, account_name, container_name, **kwargs): - Controller.__init__(self, app) - self.account_name = unquote(account_name) - self.container_name = unquote(container_name) - - def clean_acls(self, req): - if 'swift.clean_acl' in req.environ: - for header in ('x-container-read', 'x-container-write'): - if header in req.headers: - try: - req.headers[header] = \ - req.environ['swift.clean_acl'](header, - req.headers[header]) - except ValueError, err: - return HTTPBadRequest(request=req, body=str(err)) - return None - - def GETorHEAD(self, req, stats_type): - """Handler for HTTP GET/HEAD requests.""" - start_time = time.time() - if not self.account_info(self.account_name)[1]: - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return HTTPNotFound(request=req) - part, nodes = self.app.container_ring.get_nodes( - self.account_name, self.container_name) - shuffle(nodes) - resp = self.GETorHEAD_base(req, _('Container'), part, nodes, - req.path_info, len(nodes)) - - if self.app.memcache: - # set the memcache container size for ratelimiting - cache_key = get_container_memcache_key(self.account_name, - self.container_name) - self.app.memcache.set(cache_key, - {'status': resp.status_int, - 'read_acl': resp.headers.get('x-container-read'), - 'write_acl': resp.headers.get('x-container-write'), - 'sync_key': resp.headers.get('x-container-sync-key'), - 'container_size': resp.headers.get('x-container-object-count'), - 'versions': resp.headers.get('x-versions-location')}, - timeout=self.app.recheck_container_existence) - - if 'swift.authorize' in req.environ: - req.acl = resp.headers.get('x-container-read') - aresp = req.environ['swift.authorize'](req) - if aresp: - self.app.logger.increment('auth_short_circuits') - return aresp - if not req.environ.get('swift_owner', False): - for key in ('x-container-read', 'x-container-write', - 'x-container-sync-key', 'x-container-sync-to'): - if key in resp.headers: - del resp.headers[key] - self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) - return resp - - @public - @delay_denial - def GET(self, req): - """Handler for HTTP GET requests.""" - return self.GETorHEAD(req, stats_type='GET') - - @public - @delay_denial - def HEAD(self, req): - """Handler for HTTP HEAD requests.""" - return self.GETorHEAD(req, stats_type='HEAD') - - @public - def PUT(self, req): - """HTTP PUT request handler.""" - start_time = time.time() - error_response = \ - self.clean_acls(req) or check_metadata(req, 'container') - if error_response: - self.app.logger.increment('errors') - return error_response - if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH: - resp = HTTPBadRequest(request=req) - resp.body = 'Container name length of %d longer than %d' % \ - (len(self.container_name), MAX_CONTAINER_NAME_LENGTH) - self.app.logger.increment('errors') - return resp - account_partition, accounts, container_count = \ - self.account_info(self.account_name, - autocreate=self.app.account_autocreate) - if self.app.max_containers_per_account > 0 and \ - container_count >= self.app.max_containers_per_account and \ - self.account_name not in self.app.max_containers_whitelist: - resp = HTTPForbidden(request=req) - resp.body = 'Reached container limit of %s' % \ - self.app.max_containers_per_account - return resp - if not accounts: - self.app.logger.timing_since('PUT.timing', start_time) - return HTTPNotFound(request=req) - container_partition, containers = self.app.container_ring.get_nodes( - self.account_name, self.container_name) - headers = [] - for account in accounts: - nheaders = {'X-Timestamp': normalize_timestamp(time.time()), - 'x-trans-id': self.trans_id, - 'X-Account-Host': '%(ip)s:%(port)s' % account, - 'X-Account-Partition': account_partition, - 'X-Account-Device': account['device'], - 'Connection': 'close'} - self.transfer_headers(req.headers, nheaders) - headers.append(nheaders) - if self.app.memcache: - cache_key = get_container_memcache_key(self.account_name, - self.container_name) - self.app.memcache.delete(cache_key) - resp = self.make_requests(req, self.app.container_ring, - container_partition, 'PUT', req.path_info, headers) - self.app.logger.timing_since('PUT.timing', start_time) - return resp - - @public - def POST(self, req): - """HTTP POST request handler.""" - start_time = time.time() - error_response = \ - self.clean_acls(req) or check_metadata(req, 'container') - if error_response: - self.app.logger.increment('errors') - return error_response - account_partition, accounts, container_count = \ - self.account_info(self.account_name, - autocreate=self.app.account_autocreate) - if not accounts: - self.app.logger.timing_since('POST.timing', start_time) - return HTTPNotFound(request=req) - container_partition, containers = self.app.container_ring.get_nodes( - self.account_name, self.container_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'x-trans-id': self.trans_id, - 'Connection': 'close'} - self.transfer_headers(req.headers, headers) - if self.app.memcache: - cache_key = get_container_memcache_key(self.account_name, - self.container_name) - self.app.memcache.delete(cache_key) - resp = self.make_requests(req, self.app.container_ring, - container_partition, 'POST', req.path_info, - [headers] * len(containers)) - self.app.logger.timing_since('POST.timing', start_time) - return resp - - @public - def DELETE(self, req): - """HTTP DELETE request handler.""" - start_time = time.time() - account_partition, accounts, container_count = \ - self.account_info(self.account_name) - if not accounts: - self.app.logger.timing_since('DELETE.timing', start_time) - return HTTPNotFound(request=req) - container_partition, containers = self.app.container_ring.get_nodes( - self.account_name, self.container_name) - headers = [] - for account in accounts: - headers.append({'X-Timestamp': normalize_timestamp(time.time()), - 'X-Trans-Id': self.trans_id, - 'X-Account-Host': '%(ip)s:%(port)s' % account, - 'X-Account-Partition': account_partition, - 'X-Account-Device': account['device'], - 'Connection': 'close'}) - if self.app.memcache: - cache_key = get_container_memcache_key(self.account_name, - self.container_name) - self.app.memcache.delete(cache_key) - resp = self.make_requests(req, self.app.container_ring, - container_partition, 'DELETE', req.path_info, headers) - # Indicates no server had the container - self.app.logger.timing_since('DELETE.timing', start_time) - if resp.status_int == HTTP_ACCEPTED: - return HTTPNotFound(request=req) - return resp - - -class AccountController(Controller): - """WSGI controller for account requests""" - server_type = _('Account') - - def __init__(self, app, account_name, **kwargs): - Controller.__init__(self, app) - self.account_name = unquote(account_name) - - def GETorHEAD(self, req, stats_type): - """Handler for HTTP GET/HEAD requests.""" - start_time = time.time() - partition, nodes = self.app.account_ring.get_nodes(self.account_name) - shuffle(nodes) - resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, - req.path_info.rstrip('/'), len(nodes)) - if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: - if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: - resp = HTTPBadRequest(request=req) - resp.body = 'Account name length of %d longer than %d' % \ - (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) - self.app.logger.timing_since( - '%s.timing' % (stats_type,), start_time) - return resp - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'X-Trans-Id': self.trans_id, - 'Connection': 'close'} - resp = self.make_requests( - Request.blank('/v1/' + self.account_name), - self.app.account_ring, partition, 'PUT', - '/' + self.account_name, [headers] * len(nodes)) - if not is_success(resp.status_int): - self.app.logger.warning('Could not autocreate account %r' % - self.account_name) - return resp - resp = self.GETorHEAD_base(req, _('Account'), partition, nodes, - req.path_info.rstrip('/'), len(nodes)) - self.app.logger.timing_since('%s.timing' % (stats_type,), start_time) - return resp - - @public - def PUT(self, req): - """HTTP PUT request handler.""" - start_time = time.time() - if not self.app.allow_account_management: - self.app.logger.timing_since('PUT.timing', start_time) - return HTTPMethodNotAllowed(request=req) - error_response = check_metadata(req, 'account') - if error_response: - self.app.logger.increment('errors') - return error_response - if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: - resp = HTTPBadRequest(request=req) - resp.body = 'Account name length of %d longer than %d' % \ - (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) - self.app.logger.increment('errors') - return resp - account_partition, accounts = \ - self.app.account_ring.get_nodes(self.account_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'x-trans-id': self.trans_id, - 'Connection': 'close'} - self.transfer_headers(req.headers, headers) - if self.app.memcache: - self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - resp = self.make_requests(req, self.app.account_ring, - account_partition, 'PUT', req.path_info, [headers] * len(accounts)) - self.app.logger.timing_since('PUT.timing', start_time) - return resp - - @public - def POST(self, req): - """HTTP POST request handler.""" - start_time = time.time() - error_response = check_metadata(req, 'account') - if error_response: - self.app.logger.increment('errors') - return error_response - account_partition, accounts = \ - self.app.account_ring.get_nodes(self.account_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'X-Trans-Id': self.trans_id, - 'Connection': 'close'} - self.transfer_headers(req.headers, headers) - if self.app.memcache: - self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - resp = self.make_requests(req, self.app.account_ring, - account_partition, 'POST', req.path_info, - [headers] * len(accounts)) - if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: - if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: - resp = HTTPBadRequest(request=req) - resp.body = 'Account name length of %d longer than %d' % \ - (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) - self.app.logger.increment('errors') - return resp - resp = self.make_requests( - Request.blank('/v1/' + self.account_name), - self.app.account_ring, account_partition, 'PUT', - '/' + self.account_name, [headers] * len(accounts)) - if not is_success(resp.status_int): - self.app.logger.warning('Could not autocreate account %r' % - self.account_name) - return resp - self.app.logger.timing_since('POST.timing', start_time) - return resp - - @public - def DELETE(self, req): - """HTTP DELETE request handler.""" - start_time = time.time() - if not self.app.allow_account_management: - self.app.logger.timing_since('DELETE.timing', start_time) - return HTTPMethodNotAllowed(request=req) - account_partition, accounts = \ - self.app.account_ring.get_nodes(self.account_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'X-Trans-Id': self.trans_id, - 'Connection': 'close'} - if self.app.memcache: - self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - resp = self.make_requests(req, self.app.account_ring, - account_partition, 'DELETE', req.path_info, - [headers] * len(accounts)) - self.app.logger.timing_since('DELETE.timing', start_time) - return resp +from swift.common.utils import cache_from_env, get_logger, \ + get_remote_client, split_path, TRUE_VALUES +from swift.common.constraints import check_utf8 +from swift.proxy.controllers import AccountController, ObjectController, \ + ContainerController, Controller class BaseApplication(object): diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py index bf5973a9f..3db4e01bd 100644 --- a/test/unit/common/middleware/test_ratelimit.py +++ b/test/unit/common/middleware/test_ratelimit.py @@ -22,7 +22,7 @@ from webob import Request from test.unit import FakeLogger from swift.common.middleware import ratelimit -from swift.proxy.server import get_container_memcache_key +from swift.proxy.controllers.base import get_container_memcache_key from swift.common.memcached import MemcacheConnectionError diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 6eddf9e15..5a2fd5eea 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -43,10 +43,15 @@ from swift.account import server as account_server from swift.container import server as container_server from swift.obj import server as object_server from swift.common import ring +from swift.common.exceptions import ChunkReadTimeout from swift.common.constraints import MAX_META_NAME_LENGTH, \ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE from swift.common.utils import mkdirs, normalize_timestamp, NullLogger from swift.common.wsgi import monkey_patch_mimetools +from swift.proxy.controllers.obj import SegmentedIterable +from swift.proxy.controllers.base import get_container_memcache_key, \ + get_account_memcache_key +import swift.proxy.controllers # mocks logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) @@ -67,7 +72,8 @@ def setup(): mkdirs(os.path.join(_testdir, 'sda1', 'tmp')) mkdirs(os.path.join(_testdir, 'sdb1')) mkdirs(os.path.join(_testdir, 'sdb1', 'tmp')) - _orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT + _orig_container_listing_limit = \ + swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT conf = {'devices': _testdir, 'swift_dir': _testdir, 'mount_check': 'false', 'allowed_headers': 'content-encoding, x-object-manifest, content-disposition, foo', @@ -122,8 +128,8 @@ def setup(): ts = normalize_timestamp(time()) partition, nodes = prosrv.account_ring.get_nodes('a') for node in nodes: - conn = proxy_server.http_connect(node['ip'], node['port'], - node['device'], partition, 'PUT', '/a', + conn = swift.proxy.controllers.obj.http_connect(node['ip'], + node['port'], node['device'], partition, 'PUT', '/a', {'X-Timestamp': ts, 'x-trans-id': 'test'}) resp = conn.getresponse() assert(resp.status == 201) @@ -142,7 +148,8 @@ def setup(): def teardown(): for server in _test_coros: server.kill() - proxy_server.CONTAINER_LISTING_LIMIT = _orig_container_listing_limit + swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \ + _orig_container_listing_limit rmtree(os.path.dirname(_testdir)) @@ -311,13 +318,36 @@ class FakeMemcacheReturnsNone(FakeMemcache): @contextmanager def save_globals(): - orig_http_connect = getattr(proxy_server, 'http_connect', None) + orig_http_connect = getattr(swift.proxy.controllers.base, 'http_connect', + None) orig_account_info = getattr(proxy_server.Controller, 'account_info', None) try: yield True finally: - proxy_server.http_connect = orig_http_connect proxy_server.Controller.account_info = orig_account_info + proxy_server.http_connect = orig_http_connect + swift.proxy.controllers.base.http_connect = orig_http_connect + swift.proxy.controllers.obj.http_connect = orig_http_connect + swift.proxy.controllers.account.http_connect = orig_http_connect + swift.proxy.controllers.container.http_connect = orig_http_connect + + +def set_http_connect(*args, **kwargs): + new_connect = fake_http_connect(*args, **kwargs) + proxy_server.http_connect = new_connect + swift.proxy.controllers.base.http_connect = new_connect + swift.proxy.controllers.obj.http_connect = new_connect + swift.proxy.controllers.account.http_connect = new_connect + swift.proxy.controllers.container.http_connect = new_connect + + +def set_shuffle(): + shuffle = lambda l: None + proxy_server.shuffle = shuffle + swift.proxy.controllers.base.shuffle = shuffle + swift.proxy.controllers.obj.shuffle = shuffle + swift.proxy.controllers.account.shuffle = shuffle + swift.proxy.controllers.container.shuffle = shuffle # tests @@ -349,11 +379,10 @@ class TestController(unittest.TestCase): def test_make_requests(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200) + set_http_connect(200) partition, nodes, count = \ self.controller.account_info(self.account) - proxy_server.http_connect = fake_http_connect(201, - raise_timeout_exc=True) + set_http_connect(201, raise_timeout_exc=True) self.controller._make_request( nodes, partition, 'POST', '/', '', '', self.controller.app.logger.thread_locals) @@ -361,17 +390,17 @@ class TestController(unittest.TestCase): # tests if 200 is cached and used def test_account_info_200(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200) + set_http_connect(200) partition, nodes, count = \ self.controller.account_info(self.account) self.check_account_info_return(partition, nodes) self.assertEquals(count, 12345) - cache_key = proxy_server.get_account_memcache_key(self.account) + cache_key = get_account_memcache_key(self.account) self.assertEquals({'status': 200, 'container_count': 12345}, self.memcache.get(cache_key)) - proxy_server.http_connect = fake_http_connect() + set_http_connect() partition, nodes, count = \ self.controller.account_info(self.account) self.check_account_info_return(partition, nodes) @@ -380,17 +409,17 @@ class TestController(unittest.TestCase): # tests if 404 is cached and used def test_account_info_404(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(404, 404, 404) + set_http_connect(404, 404, 404) partition, nodes, count = \ self.controller.account_info(self.account) self.check_account_info_return(partition, nodes, True) self.assertEquals(count, None) - cache_key = proxy_server.get_account_memcache_key(self.account) + cache_key = get_account_memcache_key(self.account) self.assertEquals({'status': 404, 'container_count': 0}, self.memcache.get(cache_key)) - proxy_server.http_connect = fake_http_connect() + set_http_connect() partition, nodes, count = \ self.controller.account_info(self.account) self.check_account_info_return(partition, nodes, True) @@ -399,7 +428,7 @@ class TestController(unittest.TestCase): # tests if some http status codes are not cached def test_account_info_no_cache(self): def test(*status_list): - proxy_server.http_connect = fake_http_connect(*status_list) + set_http_connect(*status_list) partition, nodes, count = \ self.controller.account_info(self.account) self.assertEqual(len(self.memcache.keys()), 0) @@ -415,40 +444,35 @@ class TestController(unittest.TestCase): def test_account_info_account_autocreate(self): with save_globals(): self.memcache.store = {} - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 201, 201, 201) + set_http_connect(404, 404, 404, 201, 201, 201) partition, nodes, count = \ self.controller.account_info(self.account, autocreate=False) self.check_account_info_return(partition, nodes, is_none=True) self.assertEquals(count, None) self.memcache.store = {} - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 201, 201, 201) + set_http_connect(404, 404, 404, 201, 201, 201) partition, nodes, count = \ self.controller.account_info(self.account) self.check_account_info_return(partition, nodes, is_none=True) self.assertEquals(count, None) self.memcache.store = {} - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 201, 201, 201) + set_http_connect(404, 404, 404, 201, 201, 201) partition, nodes, count = \ self.controller.account_info(self.account, autocreate=True) self.check_account_info_return(partition, nodes) self.assertEquals(count, 0) self.memcache.store = {} - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 503, 201, 201) + set_http_connect(404, 404, 404, 503, 201, 201) partition, nodes, count = \ self.controller.account_info(self.account, autocreate=True) self.check_account_info_return(partition, nodes) self.assertEquals(count, 0) self.memcache.store = {} - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 503, 201, 503) + set_http_connect(404, 404, 404, 503, 201, 503) exc = None partition, nodes, count = \ self.controller.account_info(self.account, autocreate=True) @@ -504,19 +528,18 @@ class TestController(unittest.TestCase): headers = {'x-container-read': self.read_acl, 'x-container-write': self.write_acl} proxy_server.Controller.account_info = account_info - proxy_server.http_connect = fake_http_connect(200, - headers=headers) + set_http_connect(200, headers=headers) ret = self.controller.container_info(self.account, self.container) self.check_container_info_return(ret) - cache_key = proxy_server.get_container_memcache_key(self.account, + cache_key = get_container_memcache_key(self.account, self.container) cache_value = self.memcache.get(cache_key) self.assertTrue(isinstance(cache_value, dict)) self.assertEquals(200, cache_value.get('status')) - proxy_server.http_connect = fake_http_connect() + set_http_connect() ret = self.controller.container_info(self.account, self.container) self.check_container_info_return(ret) @@ -528,18 +551,18 @@ class TestController(unittest.TestCase): with save_globals(): proxy_server.Controller.account_info = account_info - proxy_server.http_connect = fake_http_connect(404, 404, 404) + set_http_connect(404, 404, 404) ret = self.controller.container_info(self.account, self.container) self.check_container_info_return(ret, True) - cache_key = proxy_server.get_container_memcache_key(self.account, + cache_key = get_container_memcache_key(self.account, self.container) cache_value = self.memcache.get(cache_key) self.assertTrue(isinstance(cache_value, dict)) self.assertEquals(404, cache_value.get('status')) - proxy_server.http_connect = fake_http_connect() + set_http_connect() ret = self.controller.container_info(self.account, self.container) self.check_container_info_return(ret, True) @@ -547,7 +570,7 @@ class TestController(unittest.TestCase): # tests if some http status codes are not cached def test_container_info_no_cache(self): def test(*status_list): - proxy_server.http_connect = fake_http_connect(*status_list) + set_http_connect(*status_list) ret = self.controller.container_info(self.account, self.container) self.assertEqual(len(self.memcache.keys()), 0) @@ -598,7 +621,7 @@ class TestProxyServer(unittest.TestCase): def authorize(req): called[0] = True with save_globals(): - proxy_server.http_connect = fake_http_connect(200) + set_http_connect(200) app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), container_ring=FakeRing(), object_ring=FakeRing()) @@ -662,16 +685,13 @@ class TestObjectController(unittest.TestCase): object_ring=FakeRing()) monkey_patch_mimetools() - def tearDown(self): - proxy_server.CONTAINER_LISTING_LIMIT = _orig_container_listing_limit - def assert_status_map(self, method, statuses, expected, raise_exc=False): with save_globals(): kwargs = {} if raise_exc: kwargs['raise_exc'] = raise_exc - proxy_server.http_connect = fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c/o', headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) @@ -680,7 +700,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.status_int, expected) # repeat test - proxy_server.http_connect = fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c/o', headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) @@ -697,7 +717,7 @@ class TestObjectController(unittest.TestCase): # The three responses here are for account_info() (HEAD to account server), # container_info() (HEAD to container server) and three calls to # _connect_put_node() (PUT to three object servers) - proxy_server.http_connect = fake_http_connect(201, 201, 201, 201, 201, + set_http_connect(201, 201, 201, 201, 201, give_content_type=lambda content_type: self.assertEquals(content_type, expected.next())) # We need into include a transfer-encoding to get past @@ -737,7 +757,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) req = Request.blank('/a/c/o.jpg', {}) req.content_length = 0 self.app.update_request(req) @@ -756,7 +776,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o.jpg', {}) req.content_length = 0 @@ -777,7 +797,7 @@ class TestObjectController(unittest.TestCase): def test_status_map(statuses, expected): self.app.memcache.store = {} - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) req = Request.blank('/a/c/o.jpg', environ={'REQUEST_METHOD': 'PUT'}, body='some data') self.app.update_request(req) @@ -790,7 +810,7 @@ class TestObjectController(unittest.TestCase): def test_PUT_max_size(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', {}, headers={ @@ -808,7 +828,7 @@ class TestObjectController(unittest.TestCase): def test_status_map(statuses, expected): self.app.memcache.store = {} - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) req = Request.blank('/a/c/o.jpg', {}) req.content_length = 0 self.app.update_request(req) @@ -827,7 +847,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar'}) @@ -849,7 +869,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar'}) @@ -871,7 +891,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}) self.app.update_request(req) @@ -891,7 +911,7 @@ class TestObjectController(unittest.TestCase): 'container', 'object') def test_status_map(statuses, expected): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}) self.app.update_request(req) @@ -918,8 +938,7 @@ class TestObjectController(unittest.TestCase): def test_status_map(statuses, expected, timestamps, expected_timestamp): - proxy_server.http_connect = \ - fake_http_connect(*statuses, timestamps=timestamps) + set_http_connect(*statuses, timestamps=timestamps) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'x-newest': 'true'}) self.app.update_request(req) @@ -944,8 +963,7 @@ class TestObjectController(unittest.TestCase): def test_status_map(statuses, expected, timestamps, expected_timestamp): - proxy_server.http_connect = \ - fake_http_connect(*statuses, timestamps=timestamps) + set_http_connect(*statuses, timestamps=timestamps) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'x-newest': 'true'}) self.app.update_request(req) @@ -968,8 +986,7 @@ class TestObjectController(unittest.TestCase): def test_status_map(statuses, expected, timestamps, expected_timestamp): - proxy_server.http_connect = \ - fake_http_connect(*statuses, timestamps=timestamps) + set_http_connect(*statuses, timestamps=timestamps) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}) self.app.update_request(req) @@ -990,16 +1007,15 @@ class TestObjectController(unittest.TestCase): self.app.object_post_as_copy = False controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 202, 202, 202) - # acct cont obj obj obj + set_http_connect(200, 200, 202, 202, 202) + # acct cont obj obj obj req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', 'X-Object-Meta-Foo': 'x' * 256}) self.app.update_request(req) res = controller.POST(req) self.assertEquals(res.status_int, 202) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', 'X-Object-Meta-Foo': 'x' * 257}) @@ -1011,16 +1027,15 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 200, 200, 200, 202, 202, 202) + # acct cont objc objc objc obj obj obj req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', 'X-Object-Meta-Foo': 'x' * 256}) self.app.update_request(req) res = controller.POST(req) self.assertEquals(res.status_int, 202) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', 'X-Object-Meta-Foo': 'x' * 257}) @@ -1033,16 +1048,15 @@ class TestObjectController(unittest.TestCase): self.app.object_post_as_copy = False controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 202, 202, 202) - # acct cont obj obj obj + set_http_connect(200, 200, 202, 202, 202) + # acct cont obj obj obj req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', ('X-Object-Meta-' + 'x' * 128): 'x'}) self.app.update_request(req) res = controller.POST(req) self.assertEquals(res.status_int, 202) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', ('X-Object-Meta-' + 'x' * 129): 'x'}) @@ -1054,16 +1068,15 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 200, 200, 200, 202, 202, 202) + # acct cont objc objc objc obj obj obj req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', ('X-Object-Meta-' + 'x' * 128): 'x'}) self.app.update_request(req) res = controller.POST(req) self.assertEquals(res.status_int, 202) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers={ 'Content-Type': 'foo/bar', ('X-Object-Meta-' + 'x' * 129): 'x'}) @@ -1078,7 +1091,7 @@ class TestObjectController(unittest.TestCase): headers = dict( (('X-Object-Meta-' + str(i), 'a') for i in xrange(91))) headers.update({'Content-Type': 'foo/bar'}) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers=headers) self.app.update_request(req) res = controller.POST(req) @@ -1091,7 +1104,7 @@ class TestObjectController(unittest.TestCase): headers = dict( (('X-Object-Meta-' + str(i), 'a' * 256) for i in xrange(1000))) headers.update({'Content-Type': 'foo/bar'}) - proxy_server.http_connect = fake_http_connect(202, 202, 202) + set_http_connect(202, 202, 202) req = Request.blank('/a/c/o', {}, headers=headers) self.app.update_request(req) res = controller.POST(req) @@ -1130,9 +1143,8 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) - # acct cont obj obj obj + set_http_connect(200, 200, 201, 201, 201) + # acct cont obj obj obj resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) self.app.client_timeout = 0.1 @@ -1140,9 +1152,8 @@ class TestObjectController(unittest.TestCase): environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(201, 201, 201) - # obj obj obj + set_http_connect(201, 201, 201) + # obj obj obj resp = controller.PUT(req) self.assertEquals(resp.status_int, 408) @@ -1175,9 +1186,8 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) - # acct cont obj obj obj + set_http_connect(200, 200, 201, 201, 201) + # acct cont obj obj obj resp = controller.PUT(req) self.assertEquals(resp.status_int, 499) @@ -1199,24 +1209,22 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, slow=True) + set_http_connect(200, 200, 200, slow=True) req.sent_size = 0 resp = controller.GET(req) got_exc = False try: resp.body - except proxy_server.ChunkReadTimeout: + except ChunkReadTimeout: got_exc = True self.assert_(not got_exc) self.app.node_timeout = 0.1 - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, slow=True) + set_http_connect(200, 200, 200, slow=True) resp = controller.GET(req) got_exc = False try: resp.body - except proxy_server.ChunkReadTimeout: + except ChunkReadTimeout: got_exc = True self.assert_(got_exc) @@ -1241,13 +1249,11 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201, slow=True) + set_http_connect(200, 200, 201, 201, 201, slow=True) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) self.app.node_timeout = 0.1 - proxy_server.http_connect = \ - fake_http_connect(201, 201, 201, slow=True) + set_http_connect(201, 201, 201, slow=True) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}, @@ -1331,16 +1337,15 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = fake_http_connect(200, 200, 200) + set_http_connect(200, 200, 200) resp = controller.GET(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.content_type, 'x-application/test') - proxy_server.http_connect = fake_http_connect(200, 200, 200) + set_http_connect(200, 200, 200) resp = controller.GET(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.content_length, 0) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, slow=True) + set_http_connect(200, 200, 200, slow=True) resp = controller.GET(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.content_length, 4) @@ -1351,22 +1356,22 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = fake_http_connect(200, 200, 200) + set_http_connect(200, 200, 200) resp = controller.HEAD(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.content_length, 0) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, slow=True) + set_http_connect(200, 200, 200, slow=True) resp = controller.HEAD(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.content_length, 4) def test_error_limiting(self): with save_globals(): - proxy_server.shuffle = lambda l: None + set_shuffle() controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200) + print controller.app.object_ring.devs self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2) self.assert_('last_error' in controller.app.object_ring.devs[0]) for _junk in xrange(self.app.error_suppression_limit): @@ -1397,62 +1402,53 @@ class TestObjectController(unittest.TestCase): del dev['last_error'] controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200) + set_http_connect(200, 200, 200, 200, 200, 200) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'DELETE'}) self.app.update_request(req) resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 200) - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404) - # acct acct acct + set_http_connect(404, 404, 404) + # acct acct acct resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(503, 404, 404) - # acct acct acct + set_http_connect(503, 404, 404) + # acct acct acct resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(503, 503, 404) - # acct acct acct + set_http_connect(503, 503, 404) + # acct acct acct resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(503, 503, 503) - # acct acct acct + set_http_connect(503, 503, 503) + # acct acct acct resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 204, 204, 204) - # acct cont obj obj obj + set_http_connect(200, 200, 204, 204, 204) + # acct cont obj obj obj resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 204) - proxy_server.http_connect = \ - fake_http_connect(200, 404, 404, 404) - # acct cont cont cont + set_http_connect(200, 404, 404, 404) + # acct cont cont cont resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(200, 503, 503, 503) - # acct cont cont cont + set_http_connect(200, 503, 503, 503) + # acct cont cont cont resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) for dev in self.app.account_ring.devs.values(): dev['errors'] = self.app.error_suppression_limit + 1 dev['last_error'] = time() - proxy_server.http_connect = \ - fake_http_connect(200) - # acct [isn't actually called since everything - # is error limited] + set_http_connect(200) + # acct [isn't actually called since everything + # is error limited] resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) @@ -1461,10 +1457,9 @@ class TestObjectController(unittest.TestCase): for dev in self.app.container_ring.devs.values(): dev['errors'] = self.app.error_suppression_limit + 1 dev['last_error'] = time() - proxy_server.http_connect = \ - fake_http_connect(200, 200) - # acct cont [isn't actually called since - # everything is error limited] + set_http_connect(200, 200) + # acct cont [isn't actually called since + # everything is error limited] resp = getattr(controller, 'DELETE')(req) self.assertEquals(resp.status_int, 404) @@ -1475,15 +1470,13 @@ class TestObjectController(unittest.TestCase): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 404, 404, 404, 200, 200, 200) + set_http_connect(200, 404, 404, 404, 200, 200, 200) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}) self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(200, 404, 404, 404, 200, 200) + set_http_connect(200, 404, 404, 404, 200, 200) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'}, headers={'Content-Type': 'text/plain'}) self.app.update_request(req) @@ -1495,16 +1488,13 @@ class TestObjectController(unittest.TestCase): self.app.memcache = FakeMemcacheReturnsNone() controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 404, 404, 404, 200, 200, 200) + set_http_connect(200, 404, 404, 404, 200, 200, 200) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}) self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(200, 404, 404, 404, 200, 200, 200, 200, 200, - 200) + set_http_connect(200, 404, 404, 404, 200, 200, 200, 200, 200, 200) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'}, headers={'Content-Type': 'text/plain'}) self.app.update_request(req) @@ -1515,16 +1505,15 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) - # acct cont obj obj obj + set_http_connect(200, 200, 201, 201, 201) + # acct cont obj obj obj req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0'}) self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0', 'X-Object-Meta-' + ('a' * @@ -1532,7 +1521,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0', 'X-Object-Meta-' + ('a' * @@ -1541,7 +1530,7 @@ class TestObjectController(unittest.TestCase): resp = controller.PUT(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0', 'X-Object-Meta-Too-Long': 'a' * @@ -1549,7 +1538,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0', 'X-Object-Meta-Too-Long': 'a' * @@ -1558,7 +1547,7 @@ class TestObjectController(unittest.TestCase): resp = controller.PUT(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {'Content-Length': '0'} for x in xrange(MAX_META_COUNT): headers['X-Object-Meta-%d' % x] = 'v' @@ -1567,7 +1556,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {'Content-Length': '0'} for x in xrange(MAX_META_COUNT + 1): headers['X-Object-Meta-%d' % x] = 'v' @@ -1577,7 +1566,7 @@ class TestObjectController(unittest.TestCase): resp = controller.PUT(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {'Content-Length': '0'} header_value = 'a' * MAX_META_VALUE_LENGTH size = 0 @@ -1595,7 +1584,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers['X-Object-Meta-a'] = \ 'a' * (MAX_META_OVERALL_SIZE - size) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -1612,9 +1601,8 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) - # acct cont obj obj obj + set_http_connect(200, 200, 201, 201, 201) + # acct cont obj obj obj resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1623,11 +1611,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': 'c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1638,9 +1623,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '5', 'X-Copy-From': 'c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200) - # acct cont acct cont objc objc objc + set_http_connect(200, 200, 200, 200, 200, 200, 200) + # acct cont acct cont objc objc objc self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 400) @@ -1650,11 +1634,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': 'c/o/o2'}) req.account = 'a' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1665,11 +1646,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': 'c/o%20o2'}) req.account = 'a' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1680,11 +1658,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': '/c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1694,11 +1669,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': '/c/o/o2'}) req.account = 'a' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1720,9 +1692,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': '/c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 503, 503, 503) - # acct cont objc objc objc + set_http_connect(200, 200, 503, 503, 503) + # acct cont objc objc objc self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 503) @@ -1732,9 +1703,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': '/c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 404, 404, 404) - # acct cont objc objc objc + set_http_connect(200, 200, 404, 404, 404) + # acct cont objc objc objc self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 404) @@ -1744,9 +1714,8 @@ class TestObjectController(unittest.TestCase): headers={'Content-Length': '0', 'X-Copy-From': '/c/o'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 404, 404, 200, 201, 201, 201) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 404, 404, 200, 201, 201, 201) + # acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1757,9 +1726,8 @@ class TestObjectController(unittest.TestCase): 'X-Copy-From': '/c/o', 'X-Object-Meta-Ours': 'okay'}) self.app.update_request(req) - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) + # acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) @@ -1773,20 +1741,16 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0'}) req.account = 'a' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) - # acct cont obj obj obj + set_http_connect(200, 200, 201, 201, 201) + # acct cont obj obj obj resp = controller.PUT(req) self.assertEquals(resp.status_int, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, headers={'Destination': 'c/o'}) req.account = 'a' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1797,11 +1761,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': 'c/o'}) req.account = 'a' controller.object_name = 'o/o2' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1811,11 +1772,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1826,11 +1784,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o/o2' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, - 201) - # acct cont acct cont objc objc objc obj obj - # obj + set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201) + # acct cont acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1840,9 +1795,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': 'c_o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200) - # acct cont + set_http_connect(200, 200) + # acct cont self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 412) @@ -1851,9 +1805,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 503, 503, 503) - # acct cont objc objc objc + set_http_connect(200, 200, 503, 503, 503) + # acct cont objc objc objc self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 503) @@ -1862,9 +1815,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 404, 404, 404) - # acct cont objc objc objc + set_http_connect(200, 200, 404, 404, 404) + # acct cont objc objc objc self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 404) @@ -1873,9 +1825,8 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 404, 404, 200, 201, 201, 201) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 404, 404, 200, 201, 201, 201) + # acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1885,9 +1836,8 @@ class TestObjectController(unittest.TestCase): 'X-Object-Meta-Ours': 'okay'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) + # acct cont objc objc objc obj obj obj self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1902,10 +1852,9 @@ class TestObjectController(unittest.TestCase): headers={'Destination': '/c/o'}) req.account = 'a' controller.object_name = 'o' - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201, - timestamps=('1', '1', '1', '3', '2', '4', '4', '4')) - # acct cont objc objc objc obj obj obj + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, + # acct cont objc objc objc obj obj obj + timestamps=('1', '1', '1', '3', '2', '4', '4', '4')) self.app.memcache.store = {} resp = controller.COPY(req) self.assertEquals(resp.status_int, 201) @@ -1934,7 +1883,7 @@ class TestObjectController(unittest.TestCase): return data with save_globals(): - proxy_server.http_connect = fake_http_connect(201, 201, 201, 201) + set_http_connect(201, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, @@ -1948,8 +1897,7 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.status_int // 100, 2) # success # test 413 entity to large - from swift.proxy import server - proxy_server.http_connect = fake_http_connect(201, 201, 201, 201) + set_http_connect(201, 201, 201, 201) req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, headers={'Transfer-Encoding': 'chunked', 'Content-Type': 'foo/bar'}) @@ -1957,11 +1905,11 @@ class TestObjectController(unittest.TestCase): self.app.memcache.store = {} self.app.update_request(req) try: - server.MAX_FILE_SIZE = 10 + swift.proxy.controllers.obj.MAX_FILE_SIZE = 10 res = controller.PUT(req) self.assertEquals(res.status_int, 413) finally: - server.MAX_FILE_SIZE = MAX_FILE_SIZE + swift.proxy.controllers.obj.MAX_FILE_SIZE = MAX_FILE_SIZE def test_chunked_put_bad_version(self): # Check bad version @@ -2727,7 +2675,7 @@ class TestObjectController(unittest.TestCase): body = fd.read() self.assertEquals(body, '1234 1234 1234 1234 1234 ') # Do it again but exceeding the container listing limit - proxy_server.CONTAINER_LISTING_LIMIT = 2 + swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = 2 sock = connect_tcp(('localhost', prolis.getsockname()[1])) fd = sock.makefile() fd.write('GET /v1/a/segmented%20object/object%20name HTTP/1.1\r\n' @@ -2766,7 +2714,7 @@ class TestObjectController(unittest.TestCase): body = fd.read() # After adjusting the CONTAINER_LISTING_LIMIT, make a copy of # the manifested object which should consolidate the segments. - proxy_server.CONTAINER_LISTING_LIMIT = 10000 + swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = 10000 sock = connect_tcp(('localhost', prolis.getsockname()[1])) fd = sock.makefile() fd.write('PUT /v1/a/segmented%20object/copy HTTP/1.1\r\n' @@ -2933,7 +2881,7 @@ class TestObjectController(unittest.TestCase): req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, headers={'Content-Length': '0'}) self.app.update_request(req) - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201, + set_http_connect(200, 201, 201, 201, etags=[None, '68b329da9893e34099c7d8ad5cb9c940', '68b329da9893e34099c7d8ad5cb9c940', @@ -2948,7 +2896,7 @@ class TestObjectController(unittest.TestCase): 'ETag': '68b329da9893e34099c7d8ad5cb9c940', }) self.app.update_request(req) - proxy_server.http_connect = fake_http_connect(200, 422, 422, 503, + set_http_connect(200, 422, 422, 503, etags=['68b329da9893e34099c7d8ad5cb9c940', '68b329da9893e34099c7d8ad5cb9c941', None, @@ -2962,7 +2910,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = fake_http_connect(200, 200, 200) + set_http_connect(200, 200, 200) resp = controller.GET(req) self.assert_('accept-ranges' in resp.headers) self.assertEquals(resp.headers['accept-ranges'], 'bytes') @@ -2973,7 +2921,7 @@ class TestObjectController(unittest.TestCase): self.app.update_request(req) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = fake_http_connect(200, 200, 200) + set_http_connect(200, 200, 200) resp = controller.HEAD(req) self.assert_('accept-ranges' in resp.headers) self.assertEquals(resp.headers['accept-ranges'], 'bytes') @@ -2985,8 +2933,7 @@ class TestObjectController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o') @@ -3002,8 +2949,7 @@ class TestObjectController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', {'REQUEST_METHOD': 'HEAD'}) @@ -3020,8 +2966,7 @@ class TestObjectController(unittest.TestCase): return HTTPUnauthorized(request=req) with save_globals(): self.app.object_post_as_copy = False - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'}, @@ -3038,8 +2983,7 @@ class TestObjectController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201) + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'}, @@ -3056,8 +3000,7 @@ class TestObjectController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, @@ -3074,8 +3017,7 @@ class TestObjectController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201) + set_http_connect(200, 200, 200, 200, 200, 201, 201, 201) controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'}, @@ -3089,8 +3031,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202) + set_http_connect(200, 200, 200, 200, 200, 202, 202, 202) self.app.memcache.store = {} orig_time = proxy_server.time.time try: @@ -3107,8 +3048,7 @@ class TestObjectController(unittest.TestCase): self.app.object_post_as_copy = False controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 202, 202, 202) + set_http_connect(200, 200, 202, 202, 202) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60'}) @@ -3124,8 +3064,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202) + set_http_connect(200, 200, 200, 200, 200, 202, 202, 202) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60.1'}) @@ -3138,8 +3077,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202) + set_http_connect(200, 200, 200, 200, 200, 202, 202, 202) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'Content-Type': 'foo/bar', 'X-Delete-After': '-60'}) @@ -3160,7 +3098,7 @@ class TestObjectController(unittest.TestCase): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') controller.make_requests = fake_make_requests - proxy_server.http_connect = fake_http_connect(200, 200) + set_http_connect(200, 200) self.app.memcache.store = {} t = str(int(time() + 100)) req = Request.blank('/a/c/o', {}, @@ -3192,8 +3130,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) self.app.memcache.store = {} orig_time = proxy_server.time.time try: @@ -3214,8 +3151,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'Content-Length': '0', 'Content-Type': 'foo/bar', @@ -3229,8 +3165,7 @@ class TestObjectController(unittest.TestCase): with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') - proxy_server.http_connect = \ - fake_http_connect(200, 200, 201, 201, 201) + set_http_connect(200, 200, 201, 201, 201) self.app.memcache.store = {} req = Request.blank('/a/c/o', {}, headers={'Content-Length': '0', 'Content-Type': 'foo/bar', @@ -3251,7 +3186,7 @@ class TestObjectController(unittest.TestCase): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') controller._connect_put_node = fake_connect_put_node - proxy_server.http_connect = fake_http_connect(200, 200) + set_http_connect(200, 200) self.app.memcache.store = {} t = str(int(time() + 100)) req = Request.blank('/a/c/o', {}, @@ -3298,14 +3233,14 @@ class TestContainerController(unittest.TestCase): if raise_exc: kwargs['raise_exc'] = raise_exc kwargs['missing_container'] = missing_container - proxy_server.http_connect = fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c', headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) self.app.update_request(req) res = method(req) self.assertEquals(res.status_int, expected) - proxy_server.http_connect = fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c/', headers={'Content-Length': '0', 'Content-Type': 'text/plain'}) @@ -3319,8 +3254,7 @@ class TestContainerController(unittest.TestCase): 'container') def test_status_map(statuses, expected, **kwargs): - proxy_server.http_connect = fake_http_connect(*statuses, - **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c', {}) self.app.update_request(req) @@ -3343,8 +3277,7 @@ class TestContainerController(unittest.TestCase): 'container') def test_status_map(statuses, expected, **kwargs): - proxy_server.http_connect = fake_http_connect(*statuses, - **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a/c', {}) req.content_length = 0 @@ -3413,38 +3346,32 @@ class TestContainerController(unittest.TestCase): controller = proxy_server.ContainerController(self.app, 'account', 'container') if meth == 'PUT': - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200, + set_http_connect(200, 200, 200, 200, 200, 200, missing_container=True) else: - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200) + set_http_connect(200, 200, 200, 200) self.app.memcache.store = {} req = Request.blank('/a/c', environ={'REQUEST_METHOD': meth}) self.app.update_request(req) resp = getattr(controller, meth)(req) self.assertEquals(resp.status_int, 200) - proxy_server.http_connect = \ - fake_http_connect(404, 404, 404, 200, 200, 200) + set_http_connect(404, 404, 404, 200, 200, 200) resp = getattr(controller, meth)(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(503, 404, 404) + set_http_connect(503, 404, 404) resp = getattr(controller, meth)(req) self.assertEquals(resp.status_int, 404) - proxy_server.http_connect = \ - fake_http_connect(503, 404, raise_exc=True) + set_http_connect(503, 404, raise_exc=True) resp = getattr(controller, meth)(req) self.assertEquals(resp.status_int, 404) for dev in self.app.account_ring.devs.values(): dev['errors'] = self.app.error_suppression_limit + 1 dev['last_error'] = time() - proxy_server.http_connect = \ - fake_http_connect(200, 200, 200, 200, 200, 200) + set_http_connect(200, 200, 200, 200, 200, 200) resp = getattr(controller, meth)(req) self.assertEquals(resp.status_int, 404) @@ -3467,8 +3394,8 @@ class TestContainerController(unittest.TestCase): controller = proxy_server.ContainerController(self.app, 'account', 'container') self.app.memcache = MockMemcache(allow_lock=True) - proxy_server.http_connect = fake_http_connect( - 200, 200, 200, 201, 201, 201, missing_container=True) + set_http_connect(200, 200, 200, 201, 201, 201, + missing_container=True) req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'}) self.app.update_request(req) res = controller.PUT(req) @@ -3476,7 +3403,7 @@ class TestContainerController(unittest.TestCase): def test_error_limiting(self): with save_globals(): - proxy_server.shuffle = lambda l: None + set_shuffle() controller = proxy_server.ContainerController(self.app, 'account', 'container') self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200, @@ -3524,7 +3451,7 @@ class TestContainerController(unittest.TestCase): def test_response_get_accept_ranges_header(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + set_http_connect(200, 200, body='{}') controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c?format=json') @@ -3535,7 +3462,7 @@ class TestContainerController(unittest.TestCase): def test_response_head_accept_ranges_header(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + set_http_connect(200, 200, body='{}') controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c?format=json') @@ -3576,8 +3503,7 @@ class TestContainerController(unittest.TestCase): with save_globals(): controller = \ proxy_server.ContainerController(self.app, 'a', 'c') - proxy_server.http_connect = fake_http_connect(200, 201, 201, - 201, give_connect=test_connect) + set_http_connect(200, 201, 201, 201, give_connect=test_connect) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={test_header: test_value}) self.app.update_request(req) @@ -3593,20 +3519,20 @@ class TestContainerController(unittest.TestCase): def bad_metadata_helper(self, method): with save_globals(): controller = proxy_server.ContainerController(self.app, 'a', 'c') - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Container-Meta-' + ('a' * MAX_META_NAME_LENGTH): 'v'}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Container-Meta-' + ('a' * (MAX_META_NAME_LENGTH + 1)): 'v'}) @@ -3614,14 +3540,14 @@ class TestContainerController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Container-Meta-Too-Long': 'a' * MAX_META_VALUE_LENGTH}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Container-Meta-Too-Long': 'a' * (MAX_META_VALUE_LENGTH + 1)}) @@ -3629,7 +3555,7 @@ class TestContainerController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} for x in xrange(MAX_META_COUNT): headers['X-Container-Meta-%d' % x] = 'v' @@ -3638,7 +3564,7 @@ class TestContainerController(unittest.TestCase): self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} for x in xrange(MAX_META_COUNT + 1): headers['X-Container-Meta-%d' % x] = 'v' @@ -3648,7 +3574,7 @@ class TestContainerController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} header_value = 'a' * MAX_META_VALUE_LENGTH size = 0 @@ -3665,7 +3591,7 @@ class TestContainerController(unittest.TestCase): self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers['X-Container-Meta-a'] = \ 'a' * (MAX_META_OVERALL_SIZE - size) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, @@ -3681,7 +3607,7 @@ class TestContainerController(unittest.TestCase): called[0] = True raise ValueError('fake error') with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'}, @@ -3692,7 +3618,7 @@ class TestContainerController(unittest.TestCase): self.assert_(called[0]) called[0] = False with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'}, @@ -3709,7 +3635,7 @@ class TestContainerController(unittest.TestCase): called[0] = True raise ValueError('fake error') with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'}, @@ -3720,7 +3646,7 @@ class TestContainerController(unittest.TestCase): self.assert_(called[0]) called[0] = False with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'}, @@ -3732,8 +3658,7 @@ class TestContainerController(unittest.TestCase): def test_GET_no_content(self): with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 204, 204, 204) + set_http_connect(200, 204, 204, 204) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c') @@ -3749,8 +3674,7 @@ class TestContainerController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c') @@ -3766,8 +3690,7 @@ class TestContainerController(unittest.TestCase): called[0] = True return HTTPUnauthorized(request=req) with save_globals(): - proxy_server.http_connect = \ - fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) controller = proxy_server.ContainerController(self.app, 'account', 'container') req = Request.blank('/a/c', {'REQUEST_METHOD': 'HEAD'}) @@ -3786,12 +3709,12 @@ class TestAccountController(unittest.TestCase): def assert_status_map(self, method, statuses, expected): with save_globals(): - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) req = Request.blank('/a', {}) self.app.update_request(req) res = method(req) self.assertEquals(res.status_int, expected) - proxy_server.http_connect = fake_http_connect(*statuses) + set_http_connect(*statuses) req = Request.blank('/a/', {}) self.app.update_request(req) res = method(req) @@ -3903,7 +3826,7 @@ class TestAccountController(unittest.TestCase): def test_response_get_accept_ranges_header(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + set_http_connect(200, 200, body='{}') controller = proxy_server.AccountController(self.app, 'account') req = Request.blank('/a?format=json') self.app.update_request(req) @@ -3913,7 +3836,7 @@ class TestAccountController(unittest.TestCase): def test_response_head_accept_ranges_header(self): with save_globals(): - proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + set_http_connect(200, 200, body='{}') controller = proxy_server.AccountController(self.app, 'account') req = Request.blank('/a?format=json') self.app.update_request(req) @@ -3927,8 +3850,7 @@ class TestAccountController(unittest.TestCase): controller = proxy_server.AccountController(self.app, 'account') def test_status_map(statuses, expected, **kwargs): - proxy_server.http_connect = \ - fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a', {}) req.content_length = 0 @@ -3992,8 +3914,7 @@ class TestAccountController(unittest.TestCase): self.app.allow_account_management = True controller = \ proxy_server.AccountController(self.app, 'a') - proxy_server.http_connect = fake_http_connect(201, 201, 201, - give_connect=test_connect) + set_http_connect(201, 201, 201, give_connect=test_connect) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={test_header: test_value}) self.app.update_request(req) @@ -4010,20 +3931,20 @@ class TestAccountController(unittest.TestCase): with save_globals(): self.app.allow_account_management = True controller = proxy_server.AccountController(self.app, 'a') - proxy_server.http_connect = fake_http_connect(200, 201, 201, 201) + set_http_connect(200, 201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Account-Meta-' + ('a' * MAX_META_NAME_LENGTH): 'v'}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Account-Meta-' + ('a' * (MAX_META_NAME_LENGTH + 1)): 'v'}) @@ -4031,14 +3952,14 @@ class TestAccountController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Account-Meta-Too-Long': 'a' * MAX_META_VALUE_LENGTH}) self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, headers={'X-Account-Meta-Too-Long': 'a' * (MAX_META_VALUE_LENGTH + 1)}) @@ -4046,7 +3967,7 @@ class TestAccountController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} for x in xrange(MAX_META_COUNT): headers['X-Account-Meta-%d' % x] = 'v' @@ -4055,7 +3976,7 @@ class TestAccountController(unittest.TestCase): self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} for x in xrange(MAX_META_COUNT + 1): headers['X-Account-Meta-%d' % x] = 'v' @@ -4065,7 +3986,7 @@ class TestAccountController(unittest.TestCase): resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 400) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers = {} header_value = 'a' * MAX_META_VALUE_LENGTH size = 0 @@ -4082,7 +4003,7 @@ class TestAccountController(unittest.TestCase): self.app.update_request(req) resp = getattr(controller, method)(req) self.assertEquals(resp.status_int, 201) - proxy_server.http_connect = fake_http_connect(201, 201, 201) + set_http_connect(201, 201, 201) headers['X-Account-Meta-a'] = \ 'a' * (MAX_META_OVERALL_SIZE - size) req = Request.blank('/a/c', environ={'REQUEST_METHOD': method}, @@ -4096,8 +4017,7 @@ class TestAccountController(unittest.TestCase): controller = proxy_server.AccountController(self.app, 'account') def test_status_map(statuses, expected, **kwargs): - proxy_server.http_connect = \ - fake_http_connect(*statuses, **kwargs) + set_http_connect(*statuses, **kwargs) self.app.memcache.store = {} req = Request.blank('/a', {'REQUEST_METHOD': 'DELETE'}) req.content_length = 0 @@ -4163,18 +4083,18 @@ class TestSegmentedIterable(unittest.TestCase): def test_load_next_segment_unexpected_error(self): # Iterator value isn't a dict self.assertRaises(Exception, - proxy_server.SegmentedIterable(self.controller, None, + SegmentedIterable(self.controller, None, [None])._load_next_segment) self.assert_(self.controller.exception_args[0].startswith( 'ERROR: While processing manifest')) def test_load_next_segment_with_no_segments(self): self.assertRaises(StopIteration, - proxy_server.SegmentedIterable(self.controller, 'lc', + SegmentedIterable(self.controller, 'lc', [])._load_next_segment) def test_load_next_segment_with_one_segment(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}]) segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') @@ -4182,7 +4102,7 @@ class TestSegmentedIterable(unittest.TestCase): self.assertEquals(data, '1') def test_load_next_segment_with_two_segments(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') @@ -4194,7 +4114,7 @@ class TestSegmentedIterable(unittest.TestCase): self.assertEquals(data, '22') def test_load_next_segment_with_two_segments_skip_first(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 segit.listing.next() @@ -4204,7 +4124,7 @@ class TestSegmentedIterable(unittest.TestCase): self.assertEquals(data, '22') def test_load_next_segment_with_seek(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 segit.listing.next() @@ -4223,7 +4143,7 @@ class TestSegmentedIterable(unittest.TestCase): self.controller.GETorHEAD_base = local_GETorHEAD_base self.assertRaises(Exception, - proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}])._load_next_segment) self.assert_(self.controller.exception_args[0].startswith( 'ERROR: While processing manifest')) @@ -4233,22 +4153,22 @@ class TestSegmentedIterable(unittest.TestCase): def test_iter_unexpected_error(self): # Iterator value isn't a dict self.assertRaises(Exception, ''.join, - proxy_server.SegmentedIterable(self.controller, None, [None])) + SegmentedIterable(self.controller, None, [None])) self.assert_(self.controller.exception_args[0].startswith( 'ERROR: While processing manifest')) def test_iter_with_no_segments(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', []) + segit = SegmentedIterable(self.controller, 'lc', []) self.assertEquals(''.join(segit), '') def test_iter_with_one_segment(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}]) segit.response = Stub() self.assertEquals(''.join(segit), '1') def test_iter_with_two_segments(self): - segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + segit = SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.response = Stub() self.assertEquals(''.join(segit), '122') @@ -4260,7 +4180,7 @@ class TestSegmentedIterable(unittest.TestCase): self.controller.GETorHEAD_base = local_GETorHEAD_base self.assertRaises(Exception, ''.join, - proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}])) self.assert_(self.controller.exception_args[0].startswith( 'ERROR: While processing manifest')) @@ -4270,54 +4190,54 @@ class TestSegmentedIterable(unittest.TestCase): def test_app_iter_range_unexpected_error(self): # Iterator value isn't a dict self.assertRaises(Exception, - proxy_server.SegmentedIterable(self.controller, None, + SegmentedIterable(self.controller, None, [None]).app_iter_range(None, None).next) self.assert_(self.controller.exception_args[0].startswith( 'ERROR: While processing manifest')) def test_app_iter_range_with_no_segments(self): - self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.assertEquals(''.join(SegmentedIterable( self.controller, 'lc', []).app_iter_range(None, None)), '') - self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.assertEquals(''.join(SegmentedIterable( self.controller, 'lc', []).app_iter_range(3, None)), '') - self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.assertEquals(''.join(SegmentedIterable( self.controller, 'lc', []).app_iter_range(3, 5)), '') - self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.assertEquals(''.join(SegmentedIterable( self.controller, 'lc', []).app_iter_range(None, 5)), '') def test_app_iter_range_with_one_segment(self): listing = [{'name': 'o1', 'bytes': 1}] - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, None)), '1') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) self.assertEquals(''.join(segit.app_iter_range(3, None)), '') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) self.assertEquals(''.join(segit.app_iter_range(3, 5)), '') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1') def test_app_iter_range_with_two_segments(self): listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}] - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, None)), '122') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(1, None)), '22') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12') @@ -4326,33 +4246,33 @@ class TestSegmentedIterable(unittest.TestCase): {'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name': 'o5', 'bytes': 5}] - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, None)), '122333444455555') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(3, None)), '333444455555') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, 6)), '122333') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334') - segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit = SegmentedIterable(self.controller, 'lc', listing) segit.response = Stub() self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34') |