summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-08-23 23:08:03 +0000
committerGerrit Code Review <review@openstack.org>2012-08-23 23:08:03 +0000
commitd8c02dccc098177c9b63536353b986b27645909d (patch)
tree7d2c05367a1a868a587158a8a18b9b97810183f8
parent3f01f889d5d61c0352ba73e7c1bca32b4b48d952 (diff)
parenteb4af8f84097ae6f1c0057313dd8c0c002b3d49d (diff)
downloadswift-d8c02dccc098177c9b63536353b986b27645909d.tar.gz
Merge "split proxy controllers into individual modules"
-rw-r--r--swift/common/middleware/ratelimit.py2
-rw-r--r--swift/proxy/controllers/__init__.py4
-rw-r--r--swift/proxy/controllers/account.py164
-rw-r--r--swift/proxy/controllers/base.py678
-rw-r--r--swift/proxy/controllers/container.py228
-rw-r--r--swift/proxy/controllers/obj.py945
-rw-r--r--swift/proxy/server.py1881
-rw-r--r--test/unit/common/middleware/test_ratelimit.py2
-rw-r--r--test/unit/proxy/test_server.py662
9 files changed, 2321 insertions, 2245 deletions
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 86cf9e0d5..dad907c6d 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -16,7 +16,7 @@ import eventlet
from webob import Request, Response
from swift.common.utils import split_path, cache_from_env, get_logger
-from swift.proxy.server import get_container_memcache_key
+from swift.proxy.controllers.base import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
diff --git a/swift/proxy/controllers/__init__.py b/swift/proxy/controllers/__init__.py
new file mode 100644
index 000000000..516f3c285
--- /dev/null
+++ b/swift/proxy/controllers/__init__.py
@@ -0,0 +1,4 @@
+from swift.proxy.controllers.base import Controller
+from swift.proxy.controllers.obj import ObjectController
+from swift.proxy.controllers.account import AccountController
+from swift.proxy.controllers.container import ContainerController
diff --git a/swift/proxy/controllers/account.py b/swift/proxy/controllers/account.py
new file mode 100644
index 000000000..bd95d5555
--- /dev/null
+++ b/swift/proxy/controllers/account.py
@@ -0,0 +1,164 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: swift_conn
+# You'll see swift_conn passed around a few places in this file. This is the
+# source httplib connection of whatever it is attached to.
+# It is used when early termination of reading from the connection should
+# happen, such as when a range request is satisfied but there's still more the
+# source connection would like to send. To prevent having to read all the data
+# that could be left, the source connection can be .close()'d and then reads
+# commence to empty out any buffers.
+# These shenanigans are to ensure all related objects can be garbage
+# collected. We've seen objects hang around forever otherwise.
+
+import time
+from urllib import unquote
+from random import shuffle
+
+from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed
+from webob import Request
+
+from swift.common.utils import normalize_timestamp, public
+from swift.common.constraints import check_metadata, MAX_ACCOUNT_NAME_LENGTH
+from swift.common.http import is_success, HTTP_NOT_FOUND
+from swift.proxy.controllers.base import Controller
+
+
+class AccountController(Controller):
+ """WSGI controller for account requests"""
+ server_type = _('Account')
+
+ def __init__(self, app, account_name, **kwargs):
+ Controller.__init__(self, app)
+ self.account_name = unquote(account_name)
+
+ def GETorHEAD(self, req, stats_type):
+ """Handler for HTTP GET/HEAD requests."""
+ start_time = time.time()
+ partition, nodes = self.app.account_ring.get_nodes(self.account_name)
+ shuffle(nodes)
+ resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
+ req.path_info.rstrip('/'), len(nodes))
+ if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
+ if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
+ resp = HTTPBadRequest(request=req)
+ resp.body = 'Account name length of %d longer than %d' % \
+ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
+ self.app.logger.timing_since(
+ '%s.timing' % (stats_type,), start_time)
+ return resp
+ headers = {'X-Timestamp': normalize_timestamp(time.time()),
+ 'X-Trans-Id': self.trans_id,
+ 'Connection': 'close'}
+ resp = self.make_requests(
+ Request.blank('/v1/' + self.account_name),
+ self.app.account_ring, partition, 'PUT',
+ '/' + self.account_name, [headers] * len(nodes))
+ if not is_success(resp.status_int):
+ self.app.logger.warning('Could not autocreate account %r' %
+ self.account_name)
+ return resp
+ resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
+ req.path_info.rstrip('/'), len(nodes))
+ self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
+ return resp
+
+ @public
+ def PUT(self, req):
+ """HTTP PUT request handler."""
+ start_time = time.time()
+ if not self.app.allow_account_management:
+ self.app.logger.timing_since('PUT.timing', start_time)
+ return HTTPMethodNotAllowed(request=req)
+ error_response = check_metadata(req, 'account')
+ if error_response:
+ self.app.logger.increment('errors')
+ return error_response
+ if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
+ resp = HTTPBadRequest(request=req)
+ resp.body = 'Account name length of %d longer than %d' % \
+ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
+ self.app.logger.increment('errors')
+ return resp
+ account_partition, accounts = \
+ self.app.account_ring.get_nodes(self.account_name)
+ headers = {'X-Timestamp': normalize_timestamp(time.time()),
+ 'x-trans-id': self.trans_id,
+ 'Connection': 'close'}
+ self.transfer_headers(req.headers, headers)
+ if self.app.memcache:
+ self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
+ resp = self.make_requests(req, self.app.account_ring,
+ account_partition, 'PUT', req.path_info, [headers] * len(accounts))
+ self.app.logger.timing_since('PUT.timing', start_time)
+ return resp
+
+ @public
+ def POST(self, req):
+ """HTTP POST request handler."""
+ start_time = time.time()
+ error_response = check_metadata(req, 'account')
+ if error_response:
+ self.app.logger.increment('errors')
+ return error_response
+ account_partition, accounts = \
+ self.app.account_ring.get_nodes(self.account_name)
+ headers = {'X-Timestamp': normalize_timestamp(time.time()),
+ 'X-Trans-Id': self.trans_id,
+ 'Connection': 'close'}
+ self.transfer_headers(req.headers, headers)
+ if self.app.memcache:
+ self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
+ resp = self.make_requests(req, self.app.account_ring,
+ account_partition, 'POST', req.path_info,
+ [headers] * len(accounts))
+ if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
+ if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
+ resp = HTTPBadRequest(request=req)
+ resp.body = 'Account name length of %d longer than %d' % \
+ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
+ self.app.logger.increment('errors')
+ return resp
+ resp = self.make_requests(
+ Request.blank('/v1/' + self.account_name),
+ self.app.account_ring, account_partition, 'PUT',
+ '/' + self.account_name, [headers] * len(accounts))
+ if not is_success(resp.status_int):
+ self.app.logger.warning('Could not autocreate account %r' %
+ self.account_name)
+ return resp
+ self.app.logger.timing_since('POST.timing', start_time)
+ return resp
+
+ @public
+ def DELETE(self, req):
+ """HTTP DELETE request handler."""
+ start_time = time.time()
+ if not self.app.allow_account_management:
+ self.app.logger.timing_since('DELETE.timing', start_time)
+ return HTTPMethodNotAllowed(request=req)
+ account_partition, accounts = \
+ self.app.account_ring.get_nodes(self.account_name)
+ headers = {'X-Timestamp': normalize_timestamp(time.time()),
+ 'X-Trans-Id': self.trans_id,
+ 'Connection': 'close'}
+ if self.app.memcache:
+ self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
+ resp = self.make_requests(req, self.app.account_ring,
+ account_partition, 'DELETE', req.path_info,
+ [headers] * len(accounts))
+ self.app.logger.timing_since('DELETE.timing', start_time)
+ return resp
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
new file mode 100644
index 000000000..ac032676f
--- /dev/null
+++ b/swift/proxy/controllers/base.py
@@ -0,0 +1,678 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: swift_conn
+# You'll see swift_conn passed around a few places in this file. This is the
+# source httplib connection of whatever it is attached to.
+# It is used when early termination of reading from the connection should
+# happen, such as when a range request is satisfied but there's still more the
+# source connection would like to send. To prevent having to read all the data
+# that could be left, the source connection can be .close()'d and then reads
+# commence to empty out any buffers.
+# These shenanigans are to ensure all related objects can be garbage
+# collected. We've seen objects hang around forever otherwise.
+
+import time
+import functools
+
+from eventlet import spawn_n, GreenPile, Timeout
+from eventlet.queue import Queue, Empty, Full
+from eventlet.timeout import Timeout
+from webob.exc import \
+ status_map
+from webob import Request, Response
+
+from swift.common.utils import normalize_timestamp, TRUE_VALUES, public
+from swift.common.bufferedhttp import http_connect
+from swift.common.constraints import MAX_ACCOUNT_NAME_LENGTH
+from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
+from swift.common.http import is_informational, is_success, is_redirection, \
+ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
+ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
+ HTTP_INSUFFICIENT_STORAGE
+
+
+def update_headers(response, headers):
+ """
+ Helper function to update headers in the response.
+
+ :param response: webob.Response object
+ :param headers: dictionary headers
+ """
+ if hasattr(headers, 'items'):
+ headers = headers.items()
+ for name, value in headers:
+ if name == 'etag':
+ response.headers[name] = value.replace('"', '')
+ elif name not in ('date', 'content-length', 'content-type',
+ 'connection', 'x-put-timestamp', 'x-delete-after'):
+ response.headers[name] = value
+
+
+def delay_denial(func):
+ """
+ Decorator to declare which methods should have any swift.authorize call
+ delayed. This is so the method can load the Request object up with
+ additional information that may be needed by the authorization system.
+
+ :param func: function for which authorization will be delayed
+ """
+ func.delay_denial = True
+
+ @functools.wraps(func)
+ def wrapped(*a, **kw):
+ return func(*a, **kw)
+ return wrapped
+
+
+def get_account_memcache_key(account):
+ return 'account/%s' % account
+
+
+def get_container_memcache_key(account, container):
+ return 'container/%s/%s' % (account, container)
+
+
+class Controller(object):
+ """Base WSGI controller class for the proxy"""
+ server_type = _('Base')
+
+ # Ensure these are all lowercase
+ pass_through_headers = []
+
+ def __init__(self, app):
+ self.account_name = None
+ self.app = app
+ self.trans_id = '-'
+
+ def transfer_headers(self, src_headers, dst_headers):
+ x_remove = 'x-remove-%s-meta-' % self.server_type.lower()
+ x_meta = 'x-%s-meta-' % self.server_type.lower()
+ dst_headers.update((k.lower().replace('-remove', '', 1), '')
+ for k in src_headers
+ if k.lower().startswith(x_remove))
+ dst_headers.update((k.lower(), v)
+ for k, v in src_headers.iteritems()
+ if k.lower() in self.pass_through_headers or
+ k.lower().startswith(x_meta))
+
+ def error_increment(self, node):
+ """
+ Handles incrementing error counts when talking to nodes.
+
+ :param node: dictionary of node to increment the error count for
+ """
+ node['errors'] = node.get('errors', 0) + 1
+ node['last_error'] = time.time()
+
+ def error_occurred(self, node, msg):
+ """
+ Handle logging, and handling of errors.
+
+ :param node: dictionary of node to handle errors for
+ :param msg: error message
+ """
+ self.error_increment(node)
+ self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
+ {'msg': msg, 'ip': node['ip'], 'port': node['port']})
+
+ def exception_occurred(self, node, typ, additional_info):
+ """
+ Handle logging of generic exceptions.
+
+ :param node: dictionary of node to log the error for
+ :param typ: server type
+ :param additional_info: additional information to log
+ """
+ self.app.logger.exception(
+ _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
+ '%(info)s'),
+ {'type': typ, 'ip': node['ip'], 'port': node['port'],
+ 'device': node['device'], 'info': additional_info})
+
+ def error_limited(self, node):
+ """
+ Check if the node is currently error limited.
+
+ :param node: dictionary of node to check
+ :returns: True if error limited, False otherwise
+ """
+ now = time.time()
+ if not 'errors' in node:
+ return False
+ if 'last_error' in node and node['last_error'] < \
+ now - self.app.error_suppression_interval:
+ del node['last_error']
+ if 'errors' in node:
+ del node['errors']
+ return False
+ limited = node['errors'] > self.app.error_suppression_limit
+ if limited:
+ self.app.logger.debug(
+ _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
+ return limited
+
+ def error_limit(self, node):
+ """
+ Mark a node as error limited.
+
+ :param node: dictionary of node to error limit
+ """
+ node['errors'] = self.app.error_suppression_limit + 1
+ node['last_error'] = time.time()
+
+ def account_info(self, account, autocreate=False):
+ """
+ Get account information, and also verify that the account exists.
+
+ :param account: name of the account to get the info for
+ :returns: tuple of (account partition, account nodes, container_count)
+ or (None, None, None) if it does not exist
+ """
+ partition, nodes = self.app.account_ring.get_nodes(account)
+ # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
+ if self.app.memcache:
+ cache_key = get_account_memcache_key(account)
+ cache_value = self.app.memcache.get(cache_key)
+ if not isinstance(cache_value, dict):
+ result_code = cache_value
+ container_count = 0
+ else:
+ result_code = cache_value['status']
+ container_count = cache_value['container_count']
+ if result_code == HTTP_OK:
+ return partition, nodes, container_count
+ elif result_code == HTTP_NOT_FOUND and not autocreate:
+ return None, None, None
+ result_code = 0
+ container_count = 0
+ attempts_left = len(nodes)
+ path = '/%s' % account
+ headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
+ iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
+ while attempts_left > 0:
+ try:
+ node = iternodes.next()
+ except StopIteration:
+ break
+ attempts_left -= 1
+ try:
+ with ConnectionTimeout(self.app.conn_timeout):
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], partition, 'HEAD', path, headers)
+ with Timeout(self.app.node_timeout):
+ resp = conn.getresponse()
+ body = resp.read()
+ if is_success(resp.status):
+ result_code = HTTP_OK
+ container_count = int(
+ resp.getheader('x-account-container-count') or 0)
+ break
+ elif resp.status == HTTP_NOT_FOUND:
+ if result_code == 0:
+ result_code = HTTP_NOT_FOUND
+ elif result_code != HTTP_NOT_FOUND:
+ result_code = -1
+ elif resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.error_limit(node)
+ continue
+ else:
+ result_code = -1
+ except (Exception, Timeout):
+ self.exception_occurred(node, _('Account'),
+ _('Trying to get account info for %s') % path)
+ if result_code == HTTP_NOT_FOUND and autocreate:
+ if len(account) > MAX_ACCOUNT_NAME_LENGTH:
+ return None, None, None
+ headers = {'X-Timestamp': normalize_timestamp(time.time()),
+ 'X-Trans-Id': self.trans_id,
+ 'Connection': 'close'}
+ resp = self.make_requests(Request.blank('/v1' + path),
+ self.app.account_ring, partition, 'PUT',
+ path, [headers] * len(nodes))
+ if not is_success(resp.status_int):
+ self.app.logger.warning('Could not autocreate account %r' % \
+ path)
+ return None, None, None
+ result_code = HTTP_OK
+ if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
+ if result_code == HTTP_OK:
+ cache_timeout = self.app.recheck_account_existence
+ else:
+ cache_timeout = self.app.recheck_account_existence * 0.1
+ self.app.memcache.set(cache_key,
+ {'status': result_code, 'container_count': container_count},
+ timeout=cache_timeout)
+ if result_code == HTTP_OK:
+ return partition, nodes, container_count
+ return None, None, None
+
+ def container_info(self, account, container, account_autocreate=False):
+ """
+        Get container information and thus verify container existence.
+ This will also make a call to account_info to verify that the
+ account exists.
+
+ :param account: account name for the container
+ :param container: container name to look up
+ :returns: tuple of (container partition, container nodes, container
+ read acl, container write acl, container sync key) or (None,
+ None, None, None, None) if the container does not exist
+ """
+ partition, nodes = self.app.container_ring.get_nodes(
+ account, container)
+ path = '/%s/%s' % (account, container)
+ if self.app.memcache:
+ cache_key = get_container_memcache_key(account, container)
+ cache_value = self.app.memcache.get(cache_key)
+ if isinstance(cache_value, dict):
+ status = cache_value['status']
+ read_acl = cache_value['read_acl']
+ write_acl = cache_value['write_acl']
+ sync_key = cache_value.get('sync_key')
+ versions = cache_value.get('versions')
+ if status == HTTP_OK:
+ return partition, nodes, read_acl, write_acl, sync_key, \
+ versions
+ elif status == HTTP_NOT_FOUND:
+ return None, None, None, None, None, None
+ if not self.account_info(account, autocreate=account_autocreate)[1]:
+ return None, None, None, None, None, None
+ result_code = 0
+ read_acl = None
+ write_acl = None
+ sync_key = None
+ container_size = None
+ versions = None
+ attempts_left = len(nodes)
+ headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
+ iternodes = self.iter_nodes(partition, nodes, self.app.container_ring)
+ while attempts_left > 0:
+ try:
+ node = iternodes.next()
+ except StopIteration:
+ break
+ attempts_left -= 1
+ try:
+ with ConnectionTimeout(self.app.conn_timeout):
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], partition, 'HEAD', path, headers)
+ with Timeout(self.app.node_timeout):
+ resp = conn.getresponse()
+ body = resp.read()
+ if is_success(resp.status):
+ result_code = HTTP_OK
+ read_acl = resp.getheader('x-container-read')
+ write_acl = resp.getheader('x-container-write')
+ sync_key = resp.getheader('x-container-sync-key')
+ container_size = \
+ resp.getheader('X-Container-Object-Count')
+ versions = resp.getheader('x-versions-location')
+ break
+ elif resp.status == HTTP_NOT_FOUND:
+ if result_code == 0:
+ result_code = HTTP_NOT_FOUND
+ elif result_code != HTTP_NOT_FOUND:
+ result_code = -1
+ elif resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.error_limit(node)
+ continue
+ else:
+ result_code = -1
+ except (Exception, Timeout):
+ self.exception_occurred(node, _('Container'),
+ _('Trying to get container info for %s') % path)
+ if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
+ if result_code == HTTP_OK:
+ cache_timeout = self.app.recheck_container_existence
+ else:
+ cache_timeout = self.app.recheck_container_existence * 0.1
+ self.app.memcache.set(cache_key,
+ {'status': result_code,
+ 'read_acl': read_acl,
+ 'write_acl': write_acl,
+ 'sync_key': sync_key,
+ 'container_size': container_size,
+ 'versions': versions},
+ timeout=cache_timeout)
+ if result_code == HTTP_OK:
+ return partition, nodes, read_acl, write_acl, sync_key, versions
+ return None, None, None, None, None, None
+
+ def iter_nodes(self, partition, nodes, ring):
+ """
+ Node iterator that will first iterate over the normal nodes for a
+        partition and then the handoff nodes for the partition.
+
+ :param partition: partition to iterate nodes for
+ :param nodes: list of node dicts from the ring
+ :param ring: ring to get handoff nodes from
+ """
+ for node in nodes:
+ if not self.error_limited(node):
+ yield node
+ handoffs = 0
+ for node in ring.get_more_nodes(partition):
+ if not self.error_limited(node):
+ handoffs += 1
+ if self.app.log_handoffs:
+ self.app.logger.increment('handoff_count')
+ self.app.logger.warning(
+ 'Handoff requested (%d)' % handoffs)
+ if handoffs == len(nodes):
+ self.app.logger.increment('handoff_all_count')
+ yield node
+
+ def _make_request(self, nodes, part, method, path, headers, query,
+ logger_thread_locals):
+ self.app.logger.thread_locals = logger_thread_locals
+ for node in nodes:
+ try:
+ with ConnectionTimeout(self.app.conn_timeout):
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], part, method, path,
+ headers=headers, query_string=query)
+ conn.node = node
+ with Timeout(self.app.node_timeout):
+ resp = conn.getresponse()
+ if not is_informational(resp.status) and \
+ not is_server_error(resp.status):
+ return resp.status, resp.reason, resp.read()
+ elif resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.error_limit(node)
+ except (Exception, Timeout):
+ self.exception_occurred(node, self.server_type,
+ _('Trying to %(method)s %(path)s') %
+ {'method': method, 'path': path})
+
+ def make_requests(self, req, ring, part, method, path, headers,
+ query_string=''):
+ """
+ Sends an HTTP request to multiple nodes and aggregates the results.
+ It attempts the primary nodes concurrently, then iterates over the
+ handoff nodes as needed.
+
+ :param headers: a list of dicts, where each dict represents one
+ backend request that should be made.
+ :returns: a webob Response object
+ """
+ start_nodes = ring.get_part_nodes(part)
+ nodes = self.iter_nodes(part, start_nodes, ring)
+ pile = GreenPile(len(start_nodes))
+ for head in headers:
+ pile.spawn(self._make_request, nodes, part, method, path,
+ head, query_string, self.app.logger.thread_locals)
+ response = [resp for resp in pile if resp]
+ while len(response) < len(start_nodes):
+ response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
+ statuses, reasons, bodies = zip(*response)
+ return self.best_response(req, statuses, reasons, bodies,
+ '%s %s' % (self.server_type, req.method))
+
+ def best_response(self, req, statuses, reasons, bodies, server_type,
+ etag=None):
+ """
+ Given a list of responses from several servers, choose the best to
+ return to the API.
+
+ :param req: webob.Request object
+ :param statuses: list of statuses returned
+ :param reasons: list of reasons for each status
+ :param bodies: bodies of each response
+ :param server_type: type of server the responses came from
+ :param etag: etag
+ :returns: webob.Response object with the correct status, body, etc. set
+ """
+ resp = Response(request=req)
+ if len(statuses):
+ for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
+ hstatuses = \
+ [s for s in statuses if hundred <= s < hundred + 100]
+ if len(hstatuses) > len(statuses) / 2:
+ status = max(hstatuses)
+ status_index = statuses.index(status)
+ resp.status = '%s %s' % (status, reasons[status_index])
+ resp.body = bodies[status_index]
+ resp.content_type = 'text/html'
+ if etag:
+ resp.headers['etag'] = etag.strip('"')
+ return resp
+ self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
+ {'type': server_type, 'statuses': statuses})
+ resp.status = '503 Internal Server Error'
+ return resp
+
+ @public
+ def GET(self, req):
+ """Handler for HTTP GET requests."""
+ return self.GETorHEAD(req, stats_type='GET')
+
+ @public
+ def HEAD(self, req):
+ """Handler for HTTP HEAD requests."""
+ return self.GETorHEAD(req, stats_type='HEAD')
+
+ def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
+ """
+ Reads from the source and places data in the queue. It expects
+ something else be reading from the queue and, if nothing does within
+ self.app.client_timeout seconds, the process will be aborted.
+
+ :param node: The node dict that the source is connected to, for
+ logging/error-limiting purposes.
+ :param source: The httplib.Response object to read from.
+ :param queue: The eventlet.queue.Queue to place read source data into.
+ :param logger_thread_locals: The thread local values to be set on the
+ self.app.logger to retain transaction
+ logging information.
+ """
+ self.app.logger.thread_locals = logger_thread_locals
+ success = True
+ try:
+ try:
+ while True:
+ with ChunkReadTimeout(self.app.node_timeout):
+ chunk = source.read(self.app.object_chunk_size)
+ if not chunk:
+ break
+ queue.put(chunk, timeout=self.app.client_timeout)
+ except Full:
+ self.app.logger.warn(
+ _('Client did not read from queue within %ss') %
+ self.app.client_timeout)
+ self.app.logger.increment('client_timeouts')
+ success = False
+ except (Exception, Timeout):
+ self.exception_occurred(node, _('Object'),
+ _('Trying to read during GET'))
+ success = False
+ finally:
+ # Ensure the queue getter gets a terminator.
+ queue.resize(2)
+ queue.put(success)
+ # Close-out the connection as best as possible.
+ if getattr(source, 'swift_conn', None):
+ try:
+ source.swift_conn.close()
+ except Exception:
+ pass
+ source.swift_conn = None
+ try:
+ while source.read(self.app.object_chunk_size):
+ pass
+ except Exception:
+ pass
+ try:
+ source.close()
+ except Exception:
+ pass
+
+ def _make_app_iter(self, node, source, response):
+ """
+ Returns an iterator over the contents of the source (via its read
+ func). There is also quite a bit of cleanup to ensure garbage
+ collection works and the underlying socket of the source is closed.
+
+ :param response: The webob.Response object this iterator should be
+ assigned to via response.app_iter.
+ :param source: The httplib.Response object this iterator should read
+ from.
+ :param node: The node the source is reading from, for logging purposes.
+ """
+ try:
+ try:
+ # Spawn reader to read from the source and place in the queue.
+ # We then drop any reference to the source or node, for garbage
+ # collection purposes.
+ queue = Queue(1)
+ spawn_n(self._make_app_iter_reader, node, source, queue,
+ self.app.logger.thread_locals)
+ source = node = None
+ while True:
+ chunk = queue.get(timeout=self.app.node_timeout)
+ if isinstance(chunk, bool): # terminator
+ success = chunk
+ if not success:
+ raise Exception(_('Failed to read all data'
+ ' from the source'))
+ break
+ yield chunk
+ except Empty:
+ raise ChunkReadTimeout()
+ except (GeneratorExit, Timeout):
+ self.app.logger.warn(_('Client disconnected on read'))
+ except Exception:
+ self.app.logger.exception(_('Trying to send to client'))
+ raise
+ finally:
+ response.app_iter = None
+
+ def GETorHEAD_base(self, req, server_type, partition, nodes, path,
+ attempts):
+ """
+ Base handler for HTTP GET or HEAD requests.
+
+ :param req: webob.Request object
+ :param server_type: server type
+ :param partition: partition
+ :param nodes: nodes
+ :param path: path for the request
+ :param attempts: number of attempts to try
+ :returns: webob.Response object
+ """
+ statuses = []
+ reasons = []
+ bodies = []
+ source = None
+ newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
+ nodes = iter(nodes)
+ while len(statuses) < attempts:
+ try:
+ node = nodes.next()
+ except StopIteration:
+ break
+ if self.error_limited(node):
+ continue
+ try:
+ with ConnectionTimeout(self.app.conn_timeout):
+ headers = dict(req.headers)
+ headers['Connection'] = 'close'
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], partition, req.method, path,
+ headers=headers,
+ query_string=req.query_string)
+ with Timeout(self.app.node_timeout):
+ possible_source = conn.getresponse()
+ # See NOTE: swift_conn at top of file about this.
+ possible_source.swift_conn = conn
+ except (Exception, Timeout):
+ self.exception_occurred(node, server_type,
+ _('Trying to %(method)s %(path)s') %
+ {'method': req.method, 'path': req.path})
+ continue
+ if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
+ self.error_limit(node)
+ continue
+ if is_success(possible_source.status) or \
+ is_redirection(possible_source.status):
+ # 404 if we know we don't have a synced copy
+ if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
+ statuses.append(HTTP_NOT_FOUND)
+ reasons.append('')
+ bodies.append('')
+ possible_source.read()
+ continue
+ if newest:
+ if source:
+ ts = float(source.getheader('x-put-timestamp') or
+ source.getheader('x-timestamp') or 0)
+ pts = float(
+ possible_source.getheader('x-put-timestamp') or
+ possible_source.getheader('x-timestamp') or 0)
+ if pts > ts:
+ source = possible_source
+ else:
+ source = possible_source
+ statuses.append(source.status)
+ reasons.append(source.reason)
+ bodies.append('')
+ continue
+ else:
+ source = possible_source
+ break
+ statuses.append(possible_source.status)
+ reasons.append(possible_source.reason)
+ bodies.append(possible_source.read())
+ if is_server_error(possible_source.status):
+ self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
+ 'From %(type)s Server') %
+ {'status': possible_source.status,
+ 'body': bodies[-1][:1024], 'type': server_type})
+ if source:
+ if req.method == 'GET' and \
+ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
+ res = Response(request=req, conditional_response=True)
+ res.app_iter = self._make_app_iter(node, source, res)
+ # See NOTE: swift_conn at top of file about this.
+ res.swift_conn = source.swift_conn
+ update_headers(res, source.getheaders())
+ # Used by container sync feature
+ if res.environ is None:
+ res.environ = dict()
+ res.environ['swift_x_timestamp'] = \
+ source.getheader('x-timestamp')
+ update_headers(res, {'accept-ranges': 'bytes'})
+ res.status = source.status
+ res.content_length = source.getheader('Content-Length')
+ if source.getheader('Content-Type'):
+ res.charset = None
+ res.content_type = source.getheader('Content-Type')
+ return res
+ elif is_success(source.status) or is_redirection(source.status):
+ res = status_map[source.status](request=req)
+ update_headers(res, source.getheaders())
+ # Used by container sync feature
+ if res.environ is None:
+ res.environ = dict()
+ res.environ['swift_x_timestamp'] = \
+ source.getheader('x-timestamp')
+ update_headers(res, {'accept-ranges': 'bytes'})
+ res.content_length = source.getheader('Content-Length')
+ if source.getheader('Content-Type'):
+ res.charset = None
+ res.content_type = source.getheader('Content-Type')
+ return res
+ return self.best_response(req, statuses, reasons, bodies,
+ '%s %s' % (server_type, req.method))
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
new file mode 100644
index 000000000..34c733910
--- /dev/null
+++ b/swift/proxy/controllers/container.py
@@ -0,0 +1,228 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: swift_conn
+# You'll see swift_conn passed around a few places in this file. This is the
+# source httplib connection of whatever it is attached to.
+# It is used when early termination of reading from the connection should
+# happen, such as when a range request is satisfied but there's still more the
+# source connection would like to send. To prevent having to read all the data
+# that could be left, the source connection can be .close() and then reads
+# commence to empty out any buffers.
+# These shenanigans are to ensure all related objects can be garbage
+# collected. We've seen objects hang around forever otherwise.
+
+import time
+from urllib import unquote
+from random import shuffle
+
+from webob.exc import HTTPBadRequest, HTTPForbidden, \
+ HTTPNotFound
+
+from swift.common.utils import normalize_timestamp, public
+from swift.common.constraints import check_metadata, MAX_CONTAINER_NAME_LENGTH
+from swift.common.http import HTTP_ACCEPTED
+from swift.proxy.controllers.base import Controller, delay_denial, \
+ get_container_memcache_key
+
+
class ContainerController(Controller):
    """WSGI controller for container requests"""
    server_type = _('Container')

    # Ensure these are all lowercase
    # Request headers copied verbatim onto backend requests by
    # Controller.transfer_headers().
    pass_through_headers = ['x-container-read', 'x-container-write',
                            'x-container-sync-key', 'x-container-sync-to',
                            'x-versions-location']

    def __init__(self, app, account_name, container_name, **kwargs):
        # Path components arrive URL-encoded; store them decoded.
        Controller.__init__(self, app)
        self.account_name = unquote(account_name)
        self.container_name = unquote(container_name)

    def clean_acls(self, req):
        """
        Run the auth middleware's ACL validator (installed as
        'swift.clean_acl' in the WSGI environ, if at all) over the
        container read/write ACL headers, rewriting each in canonical
        form in place.

        :returns: HTTPBadRequest response if an ACL fails validation,
                  else None
        """
        if 'swift.clean_acl' in req.environ:
            for header in ('x-container-read', 'x-container-write'):
                if header in req.headers:
                    try:
                        req.headers[header] = \
                            req.environ['swift.clean_acl'](header,
                                req.headers[header])
                    except ValueError, err:
                        return HTTPBadRequest(request=req, body=str(err))
        return None

    def GETorHEAD(self, req, stats_type):
        """Handler for HTTP GET/HEAD requests."""
        start_time = time.time()
        # [1] of account_info is the account's container count; falsy
        # means the account itself doesn't exist -> 404 the container.
        if not self.account_info(self.account_name)[1]:
            self.app.logger.timing_since(
                '%s.timing' % (stats_type,), start_time)
            return HTTPNotFound(request=req)
        part, nodes = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        # Randomize replica order so read load spreads across servers.
        shuffle(nodes)
        resp = self.GETorHEAD_base(req, _('Container'), part, nodes,
                req.path_info, len(nodes))

        if self.app.memcache:
            # set the memcache container size for ratelimiting
            # (also cached: ACLs, sync key and versions location, so
            # later requests can skip the backend HEAD).
            cache_key = get_container_memcache_key(self.account_name,
                                                   self.container_name)
            self.app.memcache.set(cache_key,
                {'status': resp.status_int,
                 'read_acl': resp.headers.get('x-container-read'),
                 'write_acl': resp.headers.get('x-container-write'),
                 'sync_key': resp.headers.get('x-container-sync-key'),
                 'container_size': resp.headers.get('x-container-object-count'),
                 'versions': resp.headers.get('x-versions-location')},
                timeout=self.app.recheck_container_existence)

        if 'swift.authorize' in req.environ:
            # Authorization was delayed (see delay_denial) until the
            # read ACL was known; enforce it now.
            req.acl = resp.headers.get('x-container-read')
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                self.app.logger.increment('auth_short_circuits')
                return aresp
        if not req.environ.get('swift_owner', False):
            # Hide privileged headers from non-owners.
            for key in ('x-container-read', 'x-container-write',
                        'x-container-sync-key', 'x-container-sync-to'):
                if key in resp.headers:
                    del resp.headers[key]
        self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
        return resp

    @public
    @delay_denial
    def GET(self, req):
        """Handler for HTTP GET requests."""
        return self.GETorHEAD(req, stats_type='GET')

    @public
    @delay_denial
    def HEAD(self, req):
        """Handler for HTTP HEAD requests."""
        return self.GETorHEAD(req, stats_type='HEAD')

    @public
    def PUT(self, req):
        """HTTP PUT request handler."""
        start_time = time.time()
        error_response = \
            self.clean_acls(req) or check_metadata(req, 'container')
        if error_response:
            self.app.logger.increment('errors')
            return error_response
        if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Container name length of %d longer than %d' % \
                        (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
            self.app.logger.increment('errors')
            return resp
        account_partition, accounts, container_count = \
            self.account_info(self.account_name,
                autocreate=self.app.account_autocreate)
        # Enforce the per-account container quota, unless this account
        # is whitelisted.
        if self.app.max_containers_per_account > 0 and \
                container_count >= self.app.max_containers_per_account and \
                self.account_name not in self.app.max_containers_whitelist:
            resp = HTTPForbidden(request=req)
            resp.body = 'Reached container limit of %s' % \
                self.app.max_containers_per_account
            return resp
        if not accounts:
            self.app.logger.timing_since('PUT.timing', start_time)
            return HTTPNotFound(request=req)
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        # One header dict per account replica: each container server is
        # told which account server to notify of the new container.
        headers = []
        for account in accounts:
            nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
                        'x-trans-id': self.trans_id,
                        'X-Account-Host': '%(ip)s:%(port)s' % account,
                        'X-Account-Partition': account_partition,
                        'X-Account-Device': account['device'],
                        'Connection': 'close'}
            self.transfer_headers(req.headers, nheaders)
            headers.append(nheaders)
        if self.app.memcache:
            # Invalidate any cached (stale) container info before the write.
            cache_key = get_container_memcache_key(self.account_name,
                                                   self.container_name)
            self.app.memcache.delete(cache_key)
        resp = self.make_requests(req, self.app.container_ring,
                container_partition, 'PUT', req.path_info, headers)
        self.app.logger.timing_since('PUT.timing', start_time)
        return resp

    @public
    def POST(self, req):
        """HTTP POST request handler."""
        start_time = time.time()
        error_response = \
            self.clean_acls(req) or check_metadata(req, 'container')
        if error_response:
            self.app.logger.increment('errors')
            return error_response
        account_partition, accounts, container_count = \
            self.account_info(self.account_name,
                autocreate=self.app.account_autocreate)
        if not accounts:
            self.app.logger.timing_since('POST.timing', start_time)
            return HTTPNotFound(request=req)
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        # POST carries no account update, so the same header dict is
        # reused for every container replica.
        headers = {'X-Timestamp': normalize_timestamp(time.time()),
                   'x-trans-id': self.trans_id,
                   'Connection': 'close'}
        self.transfer_headers(req.headers, headers)
        if self.app.memcache:
            # Invalidate cached container info; metadata is changing.
            cache_key = get_container_memcache_key(self.account_name,
                                                   self.container_name)
            self.app.memcache.delete(cache_key)
        resp = self.make_requests(req, self.app.container_ring,
                container_partition, 'POST', req.path_info,
                [headers] * len(containers))
        self.app.logger.timing_since('POST.timing', start_time)
        return resp

    @public
    def DELETE(self, req):
        """HTTP DELETE request handler."""
        start_time = time.time()
        account_partition, accounts, container_count = \
            self.account_info(self.account_name)
        if not accounts:
            self.app.logger.timing_since('DELETE.timing', start_time)
            return HTTPNotFound(request=req)
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        # As with PUT, each request tells a container server which
        # account replica to update.
        headers = []
        for account in accounts:
            headers.append({'X-Timestamp': normalize_timestamp(time.time()),
                            'X-Trans-Id': self.trans_id,
                            'X-Account-Host': '%(ip)s:%(port)s' % account,
                            'X-Account-Partition': account_partition,
                            'X-Account-Device': account['device'],
                            'Connection': 'close'})
        if self.app.memcache:
            cache_key = get_container_memcache_key(self.account_name,
                                                   self.container_name)
            self.app.memcache.delete(cache_key)
        resp = self.make_requests(req, self.app.container_ring,
                container_partition, 'DELETE', req.path_info, headers)
        self.app.logger.timing_since('DELETE.timing', start_time)
        # 202 Accepted here indicates no server had the container, so
        # report it to the client as not found.
        if resp.status_int == HTTP_ACCEPTED:
            return HTTPNotFound(request=req)
        return resp
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
new file mode 100644
index 000000000..79963d9fb
--- /dev/null
+++ b/swift/proxy/controllers/obj.py
@@ -0,0 +1,945 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: swift_conn
+# You'll see swift_conn passed around a few places in this file. This is the
+# source httplib connection of whatever it is attached to.
+# It is used when early termination of reading from the connection should
+# happen, such as when a range request is satisfied but there's still more the
+# source connection would like to send. To prevent having to read all the data
+# that could be left, the source connection can be .close() and then reads
+# commence to empty out any buffers.
+# These shenanigans are to ensure all related objects can be garbage
+# collected. We've seen objects hang around forever otherwise.
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+import mimetypes
+import re
+import time
+from datetime import datetime
+from urllib import unquote, quote
+from hashlib import md5
+from random import shuffle
+
+from eventlet import sleep, GreenPile, Timeout
+from eventlet.queue import Queue
+from eventlet.timeout import Timeout
+from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
+ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
+ HTTPServerError, HTTPServiceUnavailable
+from webob import Request, Response
+
+from swift.common.utils import ContextPool, normalize_timestamp, TRUE_VALUES, \
+ public
+from swift.common.bufferedhttp import http_connect
+from swift.common.constraints import check_metadata, check_object_creation, \
+ CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
+from swift.common.exceptions import ChunkReadTimeout, \
+ ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
+ ListingIterNotAuthorized, ListingIterError
+from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
+ HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, \
+ HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
+ HTTP_INSUFFICIENT_STORAGE, HTTPClientDisconnect
+from swift.proxy.controllers.base import Controller, delay_denial
+
+
class SegmentedIterable(object):
    """
    Iterable that returns the object contents for a segmented object in Swift.

    If there's a failure that cuts the transfer short, the response's
    `status_int` will be updated (again, just for logging since the original
    status would have already been sent to the client).

    :param controller: The ObjectController instance to work with.
    :param container: The container the object segments are within.
    :param listing: The listing of object segments to iterate over; this may
                    be an iterator or list that returns dicts with 'name' and
                    'bytes' keys.
    :param response: The webob.Response this iterable is associated with, if
                     any (default: None)
    """

    def __init__(self, controller, container, listing, response=None):
        self.controller = controller
        self.container = container
        self.listing = iter(listing)
        # Index of the segment currently being served; -1 means none yet.
        self.segment = -1
        self.segment_dict = None
        # Listing entry read ahead by app_iter_range() while skipping
        # segments; consumed by the next _load_next_segment() call.
        self.segment_peek = None
        # Byte offset into the next segment to start from (range support).
        self.seek = 0
        self.segment_iter = None
        # See NOTE: swift_conn at top of file about this.
        self.segment_iter_swift_conn = None
        # Total bytes yielded so far across all segments.
        self.position = 0
        self.response = response
        if not self.response:
            self.response = Response()
        # Earliest wall-clock time the next segment GET may be issued
        # (client-side rate limit for manifests with many segments).
        self.next_get_time = 0

    def _load_next_segment(self):
        """
        Loads the self.segment_iter with the next object segment's contents.

        :raises: StopIteration when there are no more object segments.
        """
        try:
            self.segment += 1
            self.segment_dict = self.segment_peek or self.listing.next()
            self.segment_peek = None
            partition, nodes = self.controller.app.object_ring.get_nodes(
                self.controller.account_name, self.container,
                self.segment_dict['name'])
            path = '/%s/%s/%s' % (self.controller.account_name, self.container,
                self.segment_dict['name'])
            req = Request.blank(path)
            if self.seek:
                # Resume mid-segment for a range request, then reset.
                req.range = 'bytes=%s-' % self.seek
                self.seek = 0
            if self.segment > self.controller.app.rate_limit_after_segment:
                # Past the free-run threshold: space out segment GETs.
                sleep(max(self.next_get_time - time.time(), 0))
            self.next_get_time = time.time() + \
                1.0 / self.controller.app.rate_limit_segments_per_sec
            shuffle(nodes)
            resp = self.controller.GETorHEAD_base(req, _('Object'), partition,
                self.controller.iter_nodes(partition, nodes,
                self.controller.app.object_ring), path,
                len(nodes))
            if not is_success(resp.status_int):
                raise Exception(_('Could not load object segment %(path)s:' \
                    ' %(status)s') % {'path': path, 'status': resp.status_int})
            self.segment_iter = resp.app_iter
            # See NOTE: swift_conn at top of file about this.
            self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
        except StopIteration:
            raise
        except (Exception, Timeout), err:
            # Log once per error (swift_logged flag) and mark the
            # response as failed for logging purposes, then re-raise.
            if not getattr(err, 'swift_logged', False):
                self.controller.app.logger.exception(_('ERROR: While '
                    'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
                    {'acc': self.controller.account_name,
                     'cont': self.controller.container_name,
                     'obj': self.controller.object_name})
                err.swift_logged = True
                self.response.status_int = HTTP_SERVICE_UNAVAILABLE
            raise

    def next(self):
        # Iterator protocol entry point.  iter(self) builds a fresh
        # generator each call, but all progress state (segment_iter,
        # position, listing) lives on self, so each call resumes where
        # the previous chunk left off.
        return iter(self).next()

    def __iter__(self):
        """ Standard iterator function that returns the object's contents. """
        try:
            while True:
                if not self.segment_iter:
                    self._load_next_segment()
                while True:
                    with ChunkReadTimeout(self.controller.app.node_timeout):
                        try:
                            chunk = self.segment_iter.next()
                            break
                        except StopIteration:
                            # Current segment exhausted; move to the next
                            # (raises StopIteration when none remain).
                            self._load_next_segment()
                self.position += len(chunk)
                yield chunk
        except StopIteration:
            raise
        except (Exception, Timeout), err:
            if not getattr(err, 'swift_logged', False):
                self.controller.app.logger.exception(_('ERROR: While '
                    'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
                    {'acc': self.controller.account_name,
                     'cont': self.controller.container_name,
                     'obj': self.controller.object_name})
                err.swift_logged = True
                self.response.status_int = HTTP_SERVICE_UNAVAILABLE
            raise

    def app_iter_range(self, start, stop):
        """
        Non-standard iterator function for use with Webob in serving Range
        requests more quickly. This will skip over segments and do a range
        request on the first segment to return data from, if needed.

        :param start: The first byte (zero-based) to return. None for 0.
        :param stop: The last byte (zero-based) to return. None for end.
        """
        try:
            if start:
                # Skip whole segments that end before `start`, tracking
                # skipped bytes in self.position; then seek into the
                # first segment that contains `start`.
                self.segment_peek = self.listing.next()
                while start >= self.position + self.segment_peek['bytes']:
                    self.segment += 1
                    self.position += self.segment_peek['bytes']
                    self.segment_peek = self.listing.next()
                self.seek = start - self.position
            else:
                start = 0
            if stop is not None:
                length = stop - start
            else:
                length = None
            for chunk in self:
                if length is not None:
                    length -= len(chunk)
                    if length < 0:
                        # Chop off the extra:
                        yield chunk[:length]
                        break
                yield chunk
            # See NOTE: swift_conn at top of file about this.
            if self.segment_iter_swift_conn:
                try:
                    self.segment_iter_swift_conn.close()
                except Exception:
                    pass
                self.segment_iter_swift_conn = None
            if self.segment_iter:
                try:
                    # Drain any remaining buffered data so the source
                    # connection and related objects can be collected.
                    while self.segment_iter.next():
                        pass
                except Exception:
                    pass
                self.segment_iter = None
        except StopIteration:
            raise
        except (Exception, Timeout), err:
            if not getattr(err, 'swift_logged', False):
                self.controller.app.logger.exception(_('ERROR: While '
                    'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
                    {'acc': self.controller.account_name,
                     'cont': self.controller.container_name,
                     'obj': self.controller.object_name})
                err.swift_logged = True
                self.response.status_int = HTTP_SERVICE_UNAVAILABLE
            raise
+
+
+class ObjectController(Controller):
+ """WSGI controller for object requests."""
+ server_type = _('Object')
+
+ def __init__(self, app, account_name, container_name, object_name,
+ **kwargs):
+ Controller.__init__(self, app)
+ self.account_name = unquote(account_name)
+ self.container_name = unquote(container_name)
+ self.object_name = unquote(object_name)
+
+ def _listing_iter(self, lcontainer, lprefix, env):
+ lpartition, lnodes = self.app.container_ring.get_nodes(
+ self.account_name, lcontainer)
+ marker = ''
+ while True:
+ lreq = Request.blank('i will be overridden by env', environ=env)
+ # Don't quote PATH_INFO, by WSGI spec
+ lreq.environ['PATH_INFO'] = \
+ '/%s/%s' % (self.account_name, lcontainer)
+ lreq.environ['REQUEST_METHOD'] = 'GET'
+ lreq.environ['QUERY_STRING'] = \
+ 'format=json&prefix=%s&marker=%s' % (quote(lprefix),
+ quote(marker))
+ shuffle(lnodes)
+ lresp = self.GETorHEAD_base(lreq, _('Container'),
+ lpartition, lnodes, lreq.path_info,
+ len(lnodes))
+ if 'swift.authorize' in env:
+ lreq.acl = lresp.headers.get('x-container-read')
+ aresp = env['swift.authorize'](lreq)
+ if aresp:
+ raise ListingIterNotAuthorized(aresp)
+ if lresp.status_int == HTTP_NOT_FOUND:
+ raise ListingIterNotFound()
+ elif not is_success(lresp.status_int):
+ raise ListingIterError()
+ if not lresp.body:
+ break
+ sublisting = json.loads(lresp.body)
+ if not sublisting:
+ break
+ marker = sublisting[-1]['name']
+ for obj in sublisting:
+ yield obj
+
    def GETorHEAD(self, req, stats_type):
        """Handle HTTP GET or HEAD requests."""
        start_time = time.time()
        # Only the read ACL and versions location from container_info
        # are needed here; the rest of the tuple is discarded.
        _junk, _junk, req.acl, _junk, _junk, object_versions = \
            self.container_info(self.account_name, self.container_name)
        if 'swift.authorize' in req.environ:
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                self.app.logger.increment('auth_short_circuits')
                return aresp
        partition, nodes = self.app.object_ring.get_nodes(
            self.account_name, self.container_name, self.object_name)
        shuffle(nodes)
        resp = self.GETorHEAD_base(req, _('Object'), partition,
            self.iter_nodes(partition, nodes, self.app.object_ring),
            req.path_info, len(nodes))
        # Whether we get a 416 Requested Range Not Satisfiable or not,
        # we should request a manifest because size of manifest file
        # can be not 0. After checking a manifest, redo the range request
        # on the whole object.
        if req.range:
            req_range = req.range
            req.range = None
            resp2 = self.GETorHEAD_base(req, _('Object'), partition,
                                        self.iter_nodes(partition,
                                                        nodes,
                                                        self.app.object_ring),
                                        req.path_info, len(nodes))
            if 'x-object-manifest' not in resp2.headers:
                # Plain object: the first (ranged) response stands.
                self.app.logger.timing_since(
                    '%s.timing' % (stats_type,), start_time)
                return resp
            resp = resp2
            req.range = str(req_range)

        if 'x-object-manifest' in resp.headers:
            # Large-object manifest: serve the concatenation of the
            # segments named <lcontainer>/<lprefix>*.
            lcontainer, lprefix = \
                resp.headers['x-object-manifest'].split('/', 1)
            lcontainer = unquote(lcontainer)
            lprefix = unquote(lprefix)
            try:
                listing = list(self._listing_iter(lcontainer, lprefix,
                               req.environ))
            except ListingIterNotFound:
                self.app.logger.timing_since(
                    '%s.timing' % (stats_type,), start_time)
                return HTTPNotFound(request=req)
            except ListingIterNotAuthorized, err:
                self.app.logger.increment('auth_short_circuits')
                return err.aresp
            except ListingIterError:
                self.app.logger.increment('errors')
                return HTTPServerError(request=req)

            if len(listing) > CONTAINER_LISTING_LIMIT:
                # Too many segments to compute a length/etag up front;
                # stream via a fresh listing iterator instead.
                resp = Response(headers=resp.headers, request=req,
                                conditional_response=True)
                if req.method == 'HEAD':
                    # These shenanigans are because webob translates the HEAD
                    # request into a webob EmptyResponse for the body, which
                    # has a len, which eventlet translates as needing a
                    # content-length header added. So we call the original
                    # webob resp for the headers but return an empty iterator
                    # for the body.

                    def head_response(environ, start_response):
                        resp(environ, start_response)
                        return iter([])

                    head_response.status_int = resp.status_int
                    self.app.logger.timing_since(
                        '%s.timing' % (stats_type,), start_time)
                    return head_response
                else:
                    resp.app_iter = SegmentedIterable(self, lcontainer,
                        self._listing_iter(lcontainer, lprefix, req.environ),
                        resp)

            else:
                # For objects with a reasonable number of segments, we'll serve
                # them with a set content-length and computed etag.
                if listing:
                    content_length = sum(o['bytes'] for o in listing)
                    last_modified = max(o['last_modified'] for o in listing)
                    last_modified = datetime(*map(int, re.split('[^\d]',
                        last_modified)[:-1]))
                    # Manifest etag is the md5 of the segment etags.
                    etag = md5(
                        ''.join(o['hash'] for o in listing)).hexdigest()
                else:
                    content_length = 0
                    last_modified = resp.last_modified
                    etag = md5().hexdigest()
                resp = Response(headers=resp.headers, request=req,
                                conditional_response=True)
                resp.app_iter = SegmentedIterable(self, lcontainer, listing,
                                                  resp)
                resp.content_length = content_length
                resp.last_modified = last_modified
                resp.etag = etag
                resp.headers['accept-ranges'] = 'bytes'

        self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
        return resp
+
+ @public
+ @delay_denial
+ def GET(self, req):
+ """Handler for HTTP GET requests."""
+ return self.GETorHEAD(req, stats_type='GET')
+
+ @public
+ @delay_denial
+ def HEAD(self, req):
+ """Handler for HTTP HEAD requests."""
+ return self.GETorHEAD(req, stats_type='HEAD')
+
    @public
    @delay_denial
    def POST(self, req):
        """HTTP POST request handler."""
        start_time = time.time()
        # Translate a relative X-Delete-After into an absolute
        # X-Delete-At before either branch below runs.
        if 'x-delete-after' in req.headers:
            try:
                x_delete_after = int(req.headers['x-delete-after'])
            except ValueError:
                self.app.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      content_type='text/plain',
                                      body='Non-integer X-Delete-After')
            req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
        if self.app.object_post_as_copy:
            # POST-as-COPY mode: implement the metadata update by
            # rewriting req in place as a PUT that copies the object
            # onto itself with fresh metadata, then delegating to PUT.
            req.method = 'PUT'
            req.path_info = '/%s/%s/%s' % (self.account_name,
                self.container_name, self.object_name)
            req.headers['Content-Length'] = 0
            req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name,
                self.object_name))
            req.headers['X-Fresh-Metadata'] = 'true'
            req.environ['swift_versioned_copy'] = True
            resp = self.PUT(req, start_time=start_time, stats_type='POST')
            # Older editions returned 202 Accepted on object POSTs, so we'll
            # convert any 201 Created responses to that for compatibility with
            # picky clients.
            if resp.status_int != HTTP_CREATED:
                return resp
            return HTTPAccepted(request=req)
        else:
            # Direct POST mode: send the metadata update to the object
            # servers themselves.
            error_response = check_metadata(req, 'object')
            if error_response:
                self.app.logger.increment('errors')
                return error_response
            container_partition, containers, _junk, req.acl, _junk, _junk = \
                self.container_info(self.account_name, self.container_name,
                    account_autocreate=self.app.account_autocreate)
            if 'swift.authorize' in req.environ:
                aresp = req.environ['swift.authorize'](req)
                if aresp:
                    self.app.logger.increment('auth_short_circuits')
                    return aresp
            if not containers:
                self.app.logger.timing_since('POST.timing', start_time)
                return HTTPNotFound(request=req)
            if 'x-delete-at' in req.headers:
                try:
                    x_delete_at = int(req.headers['x-delete-at'])
                    if x_delete_at < time.time():
                        self.app.logger.increment('errors')
                        return HTTPBadRequest(body='X-Delete-At in past',
                            request=req, content_type='text/plain')
                except ValueError:
                    self.app.logger.increment('errors')
                    return HTTPBadRequest(request=req,
                                          content_type='text/plain',
                                          body='Non-integer X-Delete-At')
                # Expiring objects are tracked in a hidden account,
                # bucketed into containers by rounded expiration time.
                delete_at_container = str(x_delete_at /
                    self.app.expiring_objects_container_divisor *
                    self.app.expiring_objects_container_divisor)
                delete_at_part, delete_at_nodes = \
                    self.app.container_ring.get_nodes(
                        self.app.expiring_objects_account, delete_at_container)
            else:
                delete_at_part = delete_at_nodes = None
            partition, nodes = self.app.object_ring.get_nodes(
                self.account_name, self.container_name, self.object_name)
            req.headers['X-Timestamp'] = normalize_timestamp(time.time())
            # One header dict per container replica: each object server
            # is told which container (and expiring-object container, if
            # any) to update.
            headers = []
            for container in containers:
                nheaders = dict(req.headers.iteritems())
                nheaders['Connection'] = 'close'
                nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
                nheaders['X-Container-Partition'] = container_partition
                nheaders['X-Container-Device'] = container['device']
                if delete_at_nodes:
                    node = delete_at_nodes.pop(0)
                    nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
                    nheaders['X-Delete-At-Partition'] = delete_at_part
                    nheaders['X-Delete-At-Device'] = node['device']
                headers.append(nheaders)
            resp = self.make_requests(req, self.app.object_ring, partition,
                                      'POST', req.path_info, headers)
            self.app.logger.timing_since('POST.timing', start_time)
            return resp
+
+ def _send_file(self, conn, path):
+ """Method for a file PUT coro"""
+ while True:
+ chunk = conn.queue.get()
+ if not conn.failed:
+ try:
+ with ChunkWriteTimeout(self.app.node_timeout):
+ conn.send(chunk)
+ except (Exception, ChunkWriteTimeout):
+ conn.failed = True
+ self.exception_occurred(conn.node, _('Object'),
+ _('Trying to write to %s') % path)
+ conn.queue.task_done()
+
+ def _connect_put_node(self, nodes, part, path, headers,
+ logger_thread_locals):
+ """Method for a file PUT connect"""
+ self.app.logger.thread_locals = logger_thread_locals
+ for node in nodes:
+ try:
+ with ConnectionTimeout(self.app.conn_timeout):
+ conn = http_connect(node['ip'], node['port'],
+ node['device'], part, 'PUT', path, headers)
+ with Timeout(self.app.node_timeout):
+ resp = conn.getexpect()
+ if resp.status == HTTP_CONTINUE:
+ conn.node = node
+ return conn
+ elif resp.status == HTTP_INSUFFICIENT_STORAGE:
+ self.error_limit(node)
+ except:
+ self.exception_occurred(node, _('Object'),
+ _('Expect: 100-continue on %s') % path)
+
+ @public
+ @delay_denial
+ def PUT(self, req, start_time=None, stats_type='PUT'):
+ """HTTP PUT request handler."""
+ if not start_time:
+ start_time = time.time()
+ (container_partition, containers, _junk, req.acl,
+ req.environ['swift_sync_key'], object_versions) = \
+ self.container_info(self.account_name, self.container_name,
+ account_autocreate=self.app.account_autocreate)
+ if 'swift.authorize' in req.environ:
+ aresp = req.environ['swift.authorize'](req)
+ if aresp:
+ self.app.logger.increment('auth_short_circuits')
+ return aresp
+ if not containers:
+ self.app.logger.timing_since(
+ '%s.timing' % (stats_type,), start_time)
+ return HTTPNotFound(request=req)
+ if 'x-delete-after' in req.headers:
+ try:
+ x_delete_after = int(req.headers['x-delete-after'])
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPBadRequest(request=req,
+ content_type='text/plain',
+ body='Non-integer X-Delete-After')
+ req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
+ if 'x-delete-at' in req.headers:
+ try:
+ x_delete_at = int(req.headers['x-delete-at'])
+ if x_delete_at < time.time():
+ self.app.logger.increment('errors')
+ return HTTPBadRequest(body='X-Delete-At in past',
+ request=req, content_type='text/plain')
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='Non-integer X-Delete-At')
+ delete_at_container = str(x_delete_at /
+ self.app.expiring_objects_container_divisor *
+ self.app.expiring_objects_container_divisor)
+ delete_at_part, delete_at_nodes = \
+ self.app.container_ring.get_nodes(
+ self.app.expiring_objects_account, delete_at_container)
+ else:
+ delete_at_part = delete_at_nodes = None
+ partition, nodes = self.app.object_ring.get_nodes(
+ self.account_name, self.container_name, self.object_name)
+ # do a HEAD request for container sync and checking object versions
+ if 'x-timestamp' in req.headers or (object_versions and not
+ req.environ.get('swift_versioned_copy')):
+ hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
+ environ={'REQUEST_METHOD': 'HEAD'})
+ hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
+ hreq.path_info, len(nodes))
+ # Used by container sync feature
+ if 'x-timestamp' in req.headers:
+ try:
+ req.headers['X-Timestamp'] = \
+ normalize_timestamp(float(req.headers['x-timestamp']))
+ if hresp.environ and 'swift_x_timestamp' in hresp.environ and \
+ float(hresp.environ['swift_x_timestamp']) >= \
+ float(req.headers['x-timestamp']):
+                    self.app.logger.timing_since(
+                        '%s.timing' % (stats_type,), start_time)
+ return HTTPAccepted(request=req)
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='X-Timestamp should be a UNIX timestamp float value; '
+ 'was %r' % req.headers['x-timestamp'])
+ else:
+ req.headers['X-Timestamp'] = normalize_timestamp(time.time())
+ # Sometimes the 'content-type' header exists, but is set to None.
+ content_type_manually_set = True
+ if not req.headers.get('content-type'):
+ guessed_type, _junk = mimetypes.guess_type(req.path_info)
+ req.headers['Content-Type'] = guessed_type or \
+ 'application/octet-stream'
+ content_type_manually_set = False
+ error_response = check_object_creation(req, self.object_name)
+ if error_response:
+ self.app.logger.increment('errors')
+ return error_response
+ if object_versions and not req.environ.get('swift_versioned_copy'):
+ is_manifest = 'x-object-manifest' in req.headers or \
+ 'x-object-manifest' in hresp.headers
+ if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
+ # This is a version manifest and needs to be handled
+ # differently. First copy the existing data to a new object,
+ # then write the data from this request to the version manifest
+ # object.
+ lcontainer = object_versions.split('/')[0]
+ prefix_len = '%03x' % len(self.object_name)
+ lprefix = prefix_len + self.object_name + '/'
+ ts_source = hresp.environ.get('swift_x_timestamp')
+ if ts_source is None:
+ ts_source = time.mktime(time.strptime(
+ hresp.headers['last-modified'],
+ '%a, %d %b %Y %H:%M:%S GMT'))
+ new_ts = normalize_timestamp(ts_source)
+ vers_obj_name = lprefix + new_ts
+ copy_headers = {
+ 'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
+ copy_environ = {'REQUEST_METHOD': 'COPY',
+ 'swift_versioned_copy': True
+ }
+ copy_req = Request.blank(req.path_info, headers=copy_headers,
+ environ=copy_environ)
+ copy_resp = self.COPY(copy_req)
+ if is_client_error(copy_resp.status_int):
+ # missing container or bad permissions
+ return HTTPPreconditionFailed(request=req)
+ elif not is_success(copy_resp.status_int):
+ # could not copy the data, bail
+ return HTTPServiceUnavailable(request=req)
+
+ reader = req.environ['wsgi.input'].read
+ data_source = iter(lambda: reader(self.app.client_chunk_size), '')
+ source_header = req.headers.get('X-Copy-From')
+ source_resp = None
+ if source_header:
+ source_header = unquote(source_header)
+ acct = req.path_info.split('/', 2)[1]
+ if isinstance(acct, unicode):
+ acct = acct.encode('utf-8')
+ if not source_header.startswith('/'):
+ source_header = '/' + source_header
+ source_header = '/' + acct + source_header
+ try:
+ src_container_name, src_obj_name = \
+ source_header.split('/', 3)[2:]
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPPreconditionFailed(request=req,
+ body='X-Copy-From header must be of the form'
+ '<container name>/<object name>')
+ source_req = req.copy_get()
+ source_req.path_info = source_header
+ source_req.headers['X-Newest'] = 'true'
+ orig_obj_name = self.object_name
+ orig_container_name = self.container_name
+ self.object_name = src_obj_name
+ self.container_name = src_container_name
+ source_resp = self.GET(source_req)
+ if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
+ self.app.logger.timing_since(
+ '%s.timing' % (stats_type,), start_time)
+ return source_resp
+ self.object_name = orig_obj_name
+ self.container_name = orig_container_name
+ new_req = Request.blank(req.path_info,
+ environ=req.environ, headers=req.headers)
+ data_source = source_resp.app_iter
+ new_req.content_length = source_resp.content_length
+ if new_req.content_length is None:
+ # This indicates a transfer-encoding: chunked source object,
+ # which currently only happens because there are more than
+ # CONTAINER_LISTING_LIMIT segments in a segmented object. In
+ # this case, we're going to refuse to do the server-side copy.
+ self.app.logger.increment('errors')
+ return HTTPRequestEntityTooLarge(request=req)
+ new_req.etag = source_resp.etag
+ # we no longer need the X-Copy-From header
+ del new_req.headers['X-Copy-From']
+ if not content_type_manually_set:
+ new_req.headers['Content-Type'] = \
+ source_resp.headers['Content-Type']
+ if new_req.headers.get('x-fresh-metadata', 'false').lower() \
+ not in TRUE_VALUES:
+ for k, v in source_resp.headers.items():
+ if k.lower().startswith('x-object-meta-'):
+ new_req.headers[k] = v
+ for k, v in req.headers.items():
+ if k.lower().startswith('x-object-meta-'):
+ new_req.headers[k] = v
+ req = new_req
+ node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
+ pile = GreenPile(len(nodes))
+ for container in containers:
+ nheaders = dict(req.headers.iteritems())
+ nheaders['Connection'] = 'close'
+ nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
+ nheaders['X-Container-Partition'] = container_partition
+ nheaders['X-Container-Device'] = container['device']
+ nheaders['Expect'] = '100-continue'
+ if delete_at_nodes:
+ node = delete_at_nodes.pop(0)
+ nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
+ nheaders['X-Delete-At-Partition'] = delete_at_part
+ nheaders['X-Delete-At-Device'] = node['device']
+ pile.spawn(self._connect_put_node, node_iter, partition,
+ req.path_info, nheaders, self.app.logger.thread_locals)
+ conns = [conn for conn in pile if conn]
+ if len(conns) <= len(nodes) / 2:
+ self.app.logger.error(
+ _('Object PUT returning 503, %(conns)s/%(nodes)s '
+ 'required connections'),
+ {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
+ self.app.logger.increment('errors')
+ return HTTPServiceUnavailable(request=req)
+ chunked = req.headers.get('transfer-encoding')
+ bytes_transferred = 0
+ try:
+ with ContextPool(len(nodes)) as pool:
+ for conn in conns:
+ conn.failed = False
+ conn.queue = Queue(self.app.put_queue_depth)
+ pool.spawn(self._send_file, conn, req.path)
+ while True:
+ with ChunkReadTimeout(self.app.client_timeout):
+ try:
+ chunk = next(data_source)
+ except StopIteration:
+ if chunked:
+ [conn.queue.put('0\r\n\r\n') for conn in conns]
+ break
+ bytes_transferred += len(chunk)
+ if bytes_transferred > MAX_FILE_SIZE:
+ self.app.logger.increment('errors')
+ return HTTPRequestEntityTooLarge(request=req)
+ for conn in list(conns):
+ if not conn.failed:
+ conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
+ if chunked else chunk)
+ else:
+ conns.remove(conn)
+ if len(conns) <= len(nodes) / 2:
+ self.app.logger.error(_('Object PUT exceptions during'
+ ' send, %(conns)s/%(nodes)s required connections'),
+ {'conns': len(conns), 'nodes': len(nodes) / 2 + 1})
+ self.app.logger.increment('errors')
+ return HTTPServiceUnavailable(request=req)
+ for conn in conns:
+ if conn.queue.unfinished_tasks:
+ conn.queue.join()
+ conns = [conn for conn in conns if not conn.failed]
+ except ChunkReadTimeout, err:
+ self.app.logger.warn(
+ _('ERROR Client read timeout (%ss)'), err.seconds)
+ self.app.logger.increment('client_timeouts')
+ return HTTPRequestTimeout(request=req)
+ except (Exception, Timeout):
+ self.app.logger.exception(
+ _('ERROR Exception causing client disconnect'))
+ self.app.logger.increment('client_disconnects')
+ self.app.logger.timing_since(
+ '%s.timing' % (stats_type,), start_time)
+ return HTTPClientDisconnect(request=req)
+ if req.content_length and bytes_transferred < req.content_length:
+ req.client_disconnect = True
+ self.app.logger.warn(
+ _('Client disconnected without sending enough data'))
+ self.app.logger.increment('client_disconnects')
+ self.app.logger.timing_since(
+ '%s.timing' % (stats_type,), start_time)
+ return HTTPClientDisconnect(request=req)
+ statuses = []
+ reasons = []
+ bodies = []
+ etags = set()
+ for conn in conns:
+ try:
+ with Timeout(self.app.node_timeout):
+ response = conn.getresponse()
+ statuses.append(response.status)
+ reasons.append(response.reason)
+ bodies.append(response.read())
+ if response.status >= HTTP_INTERNAL_SERVER_ERROR:
+ self.error_occurred(conn.node,
+ _('ERROR %(status)d %(body)s From Object Server ' \
+ 're: %(path)s') % {'status': response.status,
+ 'body': bodies[-1][:1024], 'path': req.path})
+ elif is_success(response.status):
+ etags.add(response.getheader('etag').strip('"'))
+ except (Exception, Timeout):
+ self.exception_occurred(conn.node, _('Object'),
+ _('Trying to get final status of PUT to %s') % req.path)
+ if len(etags) > 1:
+ self.app.logger.error(
+ _('Object servers returned %s mismatched etags'), len(etags))
+ self.app.logger.increment('errors')
+ return HTTPServerError(request=req)
+ etag = len(etags) and etags.pop() or None
+ while len(statuses) < len(nodes):
+ statuses.append(HTTP_SERVICE_UNAVAILABLE)
+ reasons.append('')
+ bodies.append('')
+ resp = self.best_response(req, statuses, reasons, bodies,
+ _('Object PUT'), etag=etag)
+ if source_header:
+ resp.headers['X-Copied-From'] = quote(
+ source_header.split('/', 2)[2])
+ if 'last-modified' in source_resp.headers:
+ resp.headers['X-Copied-From-Last-Modified'] = \
+ source_resp.headers['last-modified']
+ for k, v in req.headers.items():
+ if k.lower().startswith('x-object-meta-'):
+ resp.headers[k] = v
+ resp.last_modified = float(req.headers['X-Timestamp'])
+ self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
+ return resp
+
+ @public
+ @delay_denial
+ def DELETE(self, req):
+ """HTTP DELETE request handler."""
+ start_time = time.time()
+ (container_partition, containers, _junk, req.acl,
+ req.environ['swift_sync_key'], object_versions) = \
+ self.container_info(self.account_name, self.container_name)
+ if object_versions:
+ # this is a version manifest and needs to be handled differently
+ lcontainer = object_versions.split('/')[0]
+ prefix_len = '%03x' % len(self.object_name)
+ lprefix = prefix_len + self.object_name + '/'
+ last_item = None
+ try:
+ for last_item in self._listing_iter(lcontainer, lprefix,
+ req.environ):
+ pass
+ except ListingIterNotFound:
+ # no worries, last_item is None
+ pass
+ except ListingIterNotAuthorized, err:
+ self.app.logger.increment('auth_short_circuits')
+ return err.aresp
+ except ListingIterError:
+ self.app.logger.increment('errors')
+ return HTTPServerError(request=req)
+ if last_item:
+ # there are older versions so copy the previous version to the
+ # current object and delete the previous version
+ orig_container = self.container_name
+ orig_obj = self.object_name
+ self.container_name = lcontainer
+ self.object_name = last_item['name']
+ copy_path = '/' + self.account_name + '/' + \
+ self.container_name + '/' + self.object_name
+ copy_headers = {'X-Newest': 'True',
+ 'Destination': orig_container + '/' + orig_obj
+ }
+ copy_environ = {'REQUEST_METHOD': 'COPY',
+ 'swift_versioned_copy': True
+ }
+ creq = Request.blank(copy_path, headers=copy_headers,
+ environ=copy_environ)
+ copy_resp = self.COPY(creq)
+ if is_client_error(copy_resp.status_int):
+ # some user error, maybe permissions
+ return HTTPPreconditionFailed(request=req)
+ elif not is_success(copy_resp.status_int):
+ # could not copy the data, bail
+ return HTTPServiceUnavailable(request=req)
+ # reset these because the COPY changed them
+ self.container_name = lcontainer
+ self.object_name = last_item['name']
+ new_del_req = Request.blank(copy_path, environ=req.environ)
+ (container_partition, containers,
+ _junk, new_del_req.acl, _junk, _junk) = \
+ self.container_info(self.account_name, self.container_name)
+ new_del_req.path_info = copy_path
+ req = new_del_req
+ if 'swift.authorize' in req.environ:
+ aresp = req.environ['swift.authorize'](req)
+ if aresp:
+ self.app.logger.increment('auth_short_circuits')
+ return aresp
+ if not containers:
+ self.app.logger.timing_since('DELETE.timing', start_time)
+ return HTTPNotFound(request=req)
+ partition, nodes = self.app.object_ring.get_nodes(
+ self.account_name, self.container_name, self.object_name)
+ # Used by container sync feature
+ if 'x-timestamp' in req.headers:
+ try:
+ req.headers['X-Timestamp'] = \
+ normalize_timestamp(float(req.headers['x-timestamp']))
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPBadRequest(request=req, content_type='text/plain',
+ body='X-Timestamp should be a UNIX timestamp float value; '
+ 'was %r' % req.headers['x-timestamp'])
+ else:
+ req.headers['X-Timestamp'] = normalize_timestamp(time.time())
+ headers = []
+ for container in containers:
+ nheaders = dict(req.headers.iteritems())
+ nheaders['Connection'] = 'close'
+ nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
+ nheaders['X-Container-Partition'] = container_partition
+ nheaders['X-Container-Device'] = container['device']
+ headers.append(nheaders)
+ resp = self.make_requests(req, self.app.object_ring,
+ partition, 'DELETE', req.path_info, headers)
+ self.app.logger.timing_since('DELETE.timing', start_time)
+ return resp
+
+ @public
+ @delay_denial
+ def COPY(self, req):
+ """HTTP COPY request handler."""
+ start_time = time.time()
+ dest = req.headers.get('Destination')
+ if not dest:
+ self.app.logger.increment('errors')
+ return HTTPPreconditionFailed(request=req,
+ body='Destination header required')
+ dest = unquote(dest)
+ if not dest.startswith('/'):
+ dest = '/' + dest
+ try:
+ _junk, dest_container, dest_object = dest.split('/', 2)
+ except ValueError:
+ self.app.logger.increment('errors')
+ return HTTPPreconditionFailed(request=req,
+ body='Destination header must be of the form '
+ '<container name>/<object name>')
+ source = '/' + self.container_name + '/' + self.object_name
+ self.container_name = dest_container
+ self.object_name = dest_object
+ # re-write the existing request as a PUT instead of creating a new one
+ # since this one is already attached to the posthooklogger
+ req.method = 'PUT'
+ req.path_info = '/' + self.account_name + dest
+ req.headers['Content-Length'] = 0
+ req.headers['X-Copy-From'] = quote(source)
+ del req.headers['Destination']
+ return self.PUT(req, start_time=start_time, stats_type='COPY')
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index c42632cc2..b7aac4c3f 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -24,1886 +24,23 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.
-from __future__ import with_statement
-try:
- import simplejson as json
-except ImportError:
- import json
import mimetypes
import os
-import re
import time
-import traceback
from ConfigParser import ConfigParser
-from datetime import datetime
-from urllib import unquote, quote
import uuid
-import functools
-from hashlib import md5
-from random import shuffle
-from eventlet import sleep, spawn_n, GreenPile, Timeout
-from eventlet.queue import Queue, Empty, Full
-from eventlet.timeout import Timeout
-from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPForbidden, \
- HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
- HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \
- HTTPServiceUnavailable, status_map
-from webob import Request, Response
+from eventlet import Timeout
+from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPMethodNotAllowed, \
+ HTTPNotFound, HTTPPreconditionFailed, HTTPServerError
+from webob import Request
from swift.common.ring import Ring
-from swift.common.utils import cache_from_env, ContextPool, get_logger, \
- get_remote_client, normalize_timestamp, split_path, TRUE_VALUES, public
-from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_metadata, check_object_creation, \
- check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
- MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
-from swift.common.exceptions import ChunkReadTimeout, \
- ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
- ListingIterNotAuthorized, ListingIterError
-from swift.common.http import is_informational, is_success, is_redirection, \
- is_client_error, is_server_error, HTTP_CONTINUE, HTTP_OK, HTTP_CREATED, \
- HTTP_ACCEPTED, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
- HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, \
- HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
- HTTP_INSUFFICIENT_STORAGE, HTTPClientDisconnect
-
-
-def update_headers(response, headers):
- """
- Helper function to update headers in the response.
-
- :param response: webob.Response object
- :param headers: dictionary headers
- """
- if hasattr(headers, 'items'):
- headers = headers.items()
- for name, value in headers:
- if name == 'etag':
- response.headers[name] = value.replace('"', '')
- elif name not in ('date', 'content-length', 'content-type',
- 'connection', 'x-put-timestamp', 'x-delete-after'):
- response.headers[name] = value
-
-
-def delay_denial(func):
- """
- Decorator to declare which methods should have any swift.authorize call
- delayed. This is so the method can load the Request object up with
- additional information that may be needed by the authorization system.
-
- :param func: function for which authorization will be delayed
- """
- func.delay_denial = True
-
- @functools.wraps(func)
- def wrapped(*a, **kw):
- return func(*a, **kw)
- return wrapped
-
-
-def get_account_memcache_key(account):
- return 'account/%s' % account
-
-
-def get_container_memcache_key(account, container):
- return 'container/%s/%s' % (account, container)
-
-
-class SegmentedIterable(object):
- """
- Iterable that returns the object contents for a segmented object in Swift.
-
- If there's a failure that cuts the transfer short, the response's
- `status_int` will be updated (again, just for logging since the original
- status would have already been sent to the client).
-
- :param controller: The ObjectController instance to work with.
- :param container: The container the object segments are within.
- :param listing: The listing of object segments to iterate over; this may
- be an iterator or list that returns dicts with 'name' and
- 'bytes' keys.
- :param response: The webob.Response this iterable is associated with, if
- any (default: None)
- """
-
- def __init__(self, controller, container, listing, response=None):
- self.controller = controller
- self.container = container
- self.listing = iter(listing)
- self.segment = -1
- self.segment_dict = None
- self.segment_peek = None
- self.seek = 0
- self.segment_iter = None
- # See NOTE: swift_conn at top of file about this.
- self.segment_iter_swift_conn = None
- self.position = 0
- self.response = response
- if not self.response:
- self.response = Response()
- self.next_get_time = 0
-
- def _load_next_segment(self):
- """
- Loads the self.segment_iter with the next object segment's contents.
-
- :raises: StopIteration when there are no more object segments.
- """
- try:
- self.segment += 1
- self.segment_dict = self.segment_peek or self.listing.next()
- self.segment_peek = None
- partition, nodes = self.controller.app.object_ring.get_nodes(
- self.controller.account_name, self.container,
- self.segment_dict['name'])
- path = '/%s/%s/%s' % (self.controller.account_name, self.container,
- self.segment_dict['name'])
- req = Request.blank(path)
- if self.seek:
- req.range = 'bytes=%s-' % self.seek
- self.seek = 0
- if self.segment > self.controller.app.rate_limit_after_segment:
- sleep(max(self.next_get_time - time.time(), 0))
- self.next_get_time = time.time() + \
- 1.0 / self.controller.app.rate_limit_segments_per_sec
- shuffle(nodes)
- resp = self.controller.GETorHEAD_base(req, _('Object'), partition,
- self.controller.iter_nodes(partition, nodes,
- self.controller.app.object_ring), path,
- len(nodes))
- if not is_success(resp.status_int):
- raise Exception(_('Could not load object segment %(path)s:' \
- ' %(status)s') % {'path': path, 'status': resp.status_int})
- self.segment_iter = resp.app_iter
- # See NOTE: swift_conn at top of file about this.
- self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
- except StopIteration:
- raise
- except (Exception, Timeout), err:
- if not getattr(err, 'swift_logged', False):
- self.controller.app.logger.exception(_('ERROR: While '
- 'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
- {'acc': self.controller.account_name,
- 'cont': self.controller.container_name,
- 'obj': self.controller.object_name})
- err.swift_logged = True
- self.response.status_int = HTTP_SERVICE_UNAVAILABLE
- raise
-
- def next(self):
- return iter(self).next()
-
- def __iter__(self):
- """ Standard iterator function that returns the object's contents. """
- try:
- while True:
- if not self.segment_iter:
- self._load_next_segment()
- while True:
- with ChunkReadTimeout(self.controller.app.node_timeout):
- try:
- chunk = self.segment_iter.next()
- break
- except StopIteration:
- self._load_next_segment()
- self.position += len(chunk)
- yield chunk
- except StopIteration:
- raise
- except (Exception, Timeout), err:
- if not getattr(err, 'swift_logged', False):
- self.controller.app.logger.exception(_('ERROR: While '
- 'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
- {'acc': self.controller.account_name,
- 'cont': self.controller.container_name,
- 'obj': self.controller.object_name})
- err.swift_logged = True
- self.response.status_int = HTTP_SERVICE_UNAVAILABLE
- raise
-
- def app_iter_range(self, start, stop):
- """
- Non-standard iterator function for use with Webob in serving Range
- requests more quickly. This will skip over segments and do a range
- request on the first segment to return data from, if needed.
-
- :param start: The first byte (zero-based) to return. None for 0.
- :param stop: The last byte (zero-based) to return. None for end.
- """
- try:
- if start:
- self.segment_peek = self.listing.next()
- while start >= self.position + self.segment_peek['bytes']:
- self.segment += 1
- self.position += self.segment_peek['bytes']
- self.segment_peek = self.listing.next()
- self.seek = start - self.position
- else:
- start = 0
- if stop is not None:
- length = stop - start
- else:
- length = None
- for chunk in self:
- if length is not None:
- length -= len(chunk)
- if length < 0:
- # Chop off the extra:
- yield chunk[:length]
- break
- yield chunk
- # See NOTE: swift_conn at top of file about this.
- if self.segment_iter_swift_conn:
- try:
- self.segment_iter_swift_conn.close()
- except Exception:
- pass
- self.segment_iter_swift_conn = None
- if self.segment_iter:
- try:
- while self.segment_iter.next():
- pass
- except Exception:
- pass
- self.segment_iter = None
- except StopIteration:
- raise
- except (Exception, Timeout), err:
- if not getattr(err, 'swift_logged', False):
- self.controller.app.logger.exception(_('ERROR: While '
- 'processing manifest /%(acc)s/%(cont)s/%(obj)s'),
- {'acc': self.controller.account_name,
- 'cont': self.controller.container_name,
- 'obj': self.controller.object_name})
- err.swift_logged = True
- self.response.status_int = HTTP_SERVICE_UNAVAILABLE
- raise
-
-
-class Controller(object):
- """Base WSGI controller class for the proxy"""
- server_type = _('Base')
-
- # Ensure these are all lowercase
- pass_through_headers = []
-
- def __init__(self, app):
- self.account_name = None
- self.app = app
- self.trans_id = '-'
-
- def transfer_headers(self, src_headers, dst_headers):
- x_remove = 'x-remove-%s-meta-' % self.server_type.lower()
- x_meta = 'x-%s-meta-' % self.server_type.lower()
- dst_headers.update((k.lower().replace('-remove', '', 1), '')
- for k in src_headers
- if k.lower().startswith(x_remove))
- dst_headers.update((k.lower(), v)
- for k, v in src_headers.iteritems()
- if k.lower() in self.pass_through_headers or
- k.lower().startswith(x_meta))
-
- def error_increment(self, node):
- """
- Handles incrementing error counts when talking to nodes.
-
- :param node: dictionary of node to increment the error count for
- """
- node['errors'] = node.get('errors', 0) + 1
- node['last_error'] = time.time()
-
- def error_occurred(self, node, msg):
- """
- Handle logging, and handling of errors.
-
- :param node: dictionary of node to handle errors for
- :param msg: error message
- """
- self.error_increment(node)
- self.app.logger.error(_('%(msg)s %(ip)s:%(port)s'),
- {'msg': msg, 'ip': node['ip'], 'port': node['port']})
-
- def exception_occurred(self, node, typ, additional_info):
- """
- Handle logging of generic exceptions.
-
- :param node: dictionary of node to log the error for
- :param typ: server type
- :param additional_info: additional information to log
- """
- self.app.logger.exception(
- _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
- '%(info)s'),
- {'type': typ, 'ip': node['ip'], 'port': node['port'],
- 'device': node['device'], 'info': additional_info})
-
- def error_limited(self, node):
- """
- Check if the node is currently error limited.
-
- :param node: dictionary of node to check
- :returns: True if error limited, False otherwise
- """
- now = time.time()
- if not 'errors' in node:
- return False
- if 'last_error' in node and node['last_error'] < \
- now - self.app.error_suppression_interval:
- del node['last_error']
- if 'errors' in node:
- del node['errors']
- return False
- limited = node['errors'] > self.app.error_suppression_limit
- if limited:
- self.app.logger.debug(
- _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
- return limited
-
- def error_limit(self, node):
- """
- Mark a node as error limited.
-
- :param node: dictionary of node to error limit
- """
- node['errors'] = self.app.error_suppression_limit + 1
- node['last_error'] = time.time()
-
- def account_info(self, account, autocreate=False):
- """
- Get account information, and also verify that the account exists.
-
- :param account: name of the account to get the info for
- :returns: tuple of (account partition, account nodes, container_count)
- or (None, None, None) if it does not exist
- """
- partition, nodes = self.app.account_ring.get_nodes(account)
- # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
- if self.app.memcache:
- cache_key = get_account_memcache_key(account)
- cache_value = self.app.memcache.get(cache_key)
- if not isinstance(cache_value, dict):
- result_code = cache_value
- container_count = 0
- else:
- result_code = cache_value['status']
- container_count = cache_value['container_count']
- if result_code == HTTP_OK:
- return partition, nodes, container_count
- elif result_code == HTTP_NOT_FOUND and not autocreate:
- return None, None, None
- result_code = 0
- container_count = 0
- attempts_left = len(nodes)
- path = '/%s' % account
- headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
- iternodes = self.iter_nodes(partition, nodes, self.app.account_ring)
- while attempts_left > 0:
- try:
- node = iternodes.next()
- except StopIteration:
- break
- attempts_left -= 1
- try:
- with ConnectionTimeout(self.app.conn_timeout):
- conn = http_connect(node['ip'], node['port'],
- node['device'], partition, 'HEAD', path, headers)
- with Timeout(self.app.node_timeout):
- resp = conn.getresponse()
- body = resp.read()
- if is_success(resp.status):
- result_code = HTTP_OK
- container_count = int(
- resp.getheader('x-account-container-count') or 0)
- break
- elif resp.status == HTTP_NOT_FOUND:
- if result_code == 0:
- result_code = HTTP_NOT_FOUND
- elif result_code != HTTP_NOT_FOUND:
- result_code = -1
- elif resp.status == HTTP_INSUFFICIENT_STORAGE:
- self.error_limit(node)
- continue
- else:
- result_code = -1
- except (Exception, Timeout):
- self.exception_occurred(node, _('Account'),
- _('Trying to get account info for %s') % path)
- if result_code == HTTP_NOT_FOUND and autocreate:
- if len(account) > MAX_ACCOUNT_NAME_LENGTH:
- return None, None, None
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'X-Trans-Id': self.trans_id,
- 'Connection': 'close'}
- resp = self.make_requests(Request.blank('/v1' + path),
- self.app.account_ring, partition, 'PUT',
- path, [headers] * len(nodes))
- if not is_success(resp.status_int):
- self.app.logger.warning('Could not autocreate account %r' % \
- path)
- return None, None, None
- result_code = HTTP_OK
- if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
- if result_code == HTTP_OK:
- cache_timeout = self.app.recheck_account_existence
- else:
- cache_timeout = self.app.recheck_account_existence * 0.1
- self.app.memcache.set(cache_key,
- {'status': result_code, 'container_count': container_count},
- timeout=cache_timeout)
- if result_code == HTTP_OK:
- return partition, nodes, container_count
- return None, None, None
-
- def container_info(self, account, container, account_autocreate=False):
- """
- Get container information and thusly verify container existance.
- This will also make a call to account_info to verify that the
- account exists.
-
- :param account: account name for the container
- :param container: container name to look up
- :returns: tuple of (container partition, container nodes, container
- read acl, container write acl, container sync key) or (None,
- None, None, None, None) if the container does not exist
- """
- partition, nodes = self.app.container_ring.get_nodes(
- account, container)
- path = '/%s/%s' % (account, container)
- if self.app.memcache:
- cache_key = get_container_memcache_key(account, container)
- cache_value = self.app.memcache.get(cache_key)
- if isinstance(cache_value, dict):
- status = cache_value['status']
- read_acl = cache_value['read_acl']
- write_acl = cache_value['write_acl']
- sync_key = cache_value.get('sync_key')
- versions = cache_value.get('versions')
- if status == HTTP_OK:
- return partition, nodes, read_acl, write_acl, sync_key, \
- versions
- elif status == HTTP_NOT_FOUND:
- return None, None, None, None, None, None
- if not self.account_info(account, autocreate=account_autocreate)[1]:
- return None, None, None, None, None, None
- result_code = 0
- read_acl = None
- write_acl = None
- sync_key = None
- container_size = None
- versions = None
- attempts_left = len(nodes)
- headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
- iternodes = self.iter_nodes(partition, nodes, self.app.container_ring)
- while attempts_left > 0:
- try:
- node = iternodes.next()
- except StopIteration:
- break
- attempts_left -= 1
- try:
- with ConnectionTimeout(self.app.conn_timeout):
- conn = http_connect(node['ip'], node['port'],
- node['device'], partition, 'HEAD', path, headers)
- with Timeout(self.app.node_timeout):
- resp = conn.getresponse()
- body = resp.read()
- if is_success(resp.status):
- result_code = HTTP_OK
- read_acl = resp.getheader('x-container-read')
- write_acl = resp.getheader('x-container-write')
- sync_key = resp.getheader('x-container-sync-key')
- container_size = \
- resp.getheader('X-Container-Object-Count')
- versions = resp.getheader('x-versions-location')
- break
- elif resp.status == HTTP_NOT_FOUND:
- if result_code == 0:
- result_code = HTTP_NOT_FOUND
- elif result_code != HTTP_NOT_FOUND:
- result_code = -1
- elif resp.status == HTTP_INSUFFICIENT_STORAGE:
- self.error_limit(node)
- continue
- else:
- result_code = -1
- except (Exception, Timeout):
- self.exception_occurred(node, _('Container'),
- _('Trying to get container info for %s') % path)
- if self.app.memcache and result_code in (HTTP_OK, HTTP_NOT_FOUND):
- if result_code == HTTP_OK:
- cache_timeout = self.app.recheck_container_existence
- else:
- cache_timeout = self.app.recheck_container_existence * 0.1
- self.app.memcache.set(cache_key,
- {'status': result_code,
- 'read_acl': read_acl,
- 'write_acl': write_acl,
- 'sync_key': sync_key,
- 'container_size': container_size,
- 'versions': versions},
- timeout=cache_timeout)
- if result_code == HTTP_OK:
- return partition, nodes, read_acl, write_acl, sync_key, versions
- return None, None, None, None, None, None
-
- def iter_nodes(self, partition, nodes, ring):
- """
- Node iterator that will first iterate over the normal nodes for a
- partition and then the handoff partitions for the node.
-
- :param partition: partition to iterate nodes for
- :param nodes: list of node dicts from the ring
- :param ring: ring to get handoff nodes from
- """
- for node in nodes:
- if not self.error_limited(node):
- yield node
- handoffs = 0
- for node in ring.get_more_nodes(partition):
- if not self.error_limited(node):
- handoffs += 1
- if self.app.log_handoffs:
- self.app.logger.increment('handoff_count')
- self.app.logger.warning(
- 'Handoff requested (%d)' % handoffs)
- if handoffs == len(nodes):
- self.app.logger.increment('handoff_all_count')
- yield node
-
- def _make_request(self, nodes, part, method, path, headers, query,
- logger_thread_locals):
- self.app.logger.thread_locals = logger_thread_locals
- for node in nodes:
- try:
- with ConnectionTimeout(self.app.conn_timeout):
- conn = http_connect(node['ip'], node['port'],
- node['device'], part, method, path,
- headers=headers, query_string=query)
- conn.node = node
- with Timeout(self.app.node_timeout):
- resp = conn.getresponse()
- if not is_informational(resp.status) and \
- not is_server_error(resp.status):
- return resp.status, resp.reason, resp.read()
- elif resp.status == HTTP_INSUFFICIENT_STORAGE:
- self.error_limit(node)
- except (Exception, Timeout):
- self.exception_occurred(node, self.server_type,
- _('Trying to %(method)s %(path)s') %
- {'method': method, 'path': path})
-
- def make_requests(self, req, ring, part, method, path, headers,
- query_string=''):
- """
- Sends an HTTP request to multiple nodes and aggregates the results.
- It attempts the primary nodes concurrently, then iterates over the
- handoff nodes as needed.
-
- :param headers: a list of dicts, where each dict represents one
- backend request that should be made.
- :returns: a webob Response object
- """
- start_nodes = ring.get_part_nodes(part)
- nodes = self.iter_nodes(part, start_nodes, ring)
- pile = GreenPile(len(start_nodes))
- for head in headers:
- pile.spawn(self._make_request, nodes, part, method, path,
- head, query_string, self.app.logger.thread_locals)
- response = [resp for resp in pile if resp]
- while len(response) < len(start_nodes):
- response.append((HTTP_SERVICE_UNAVAILABLE, '', ''))
- statuses, reasons, bodies = zip(*response)
- return self.best_response(req, statuses, reasons, bodies,
- '%s %s' % (self.server_type, req.method))
-
- def best_response(self, req, statuses, reasons, bodies, server_type,
- etag=None):
- """
- Given a list of responses from several servers, choose the best to
- return to the API.
-
- :param req: webob.Request object
- :param statuses: list of statuses returned
- :param reasons: list of reasons for each status
- :param bodies: bodies of each response
- :param server_type: type of server the responses came from
- :param etag: etag
- :returns: webob.Response object with the correct status, body, etc. set
- """
- resp = Response(request=req)
- if len(statuses):
- for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
- hstatuses = \
- [s for s in statuses if hundred <= s < hundred + 100]
- if len(hstatuses) > len(statuses) / 2:
- status = max(hstatuses)
- status_index = statuses.index(status)
- resp.status = '%s %s' % (status, reasons[status_index])
- resp.body = bodies[status_index]
- resp.content_type = 'text/html'
- if etag:
- resp.headers['etag'] = etag.strip('"')
- return resp
- self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
- {'type': server_type, 'statuses': statuses})
- resp.status = '503 Internal Server Error'
- return resp
-
- @public
- def GET(self, req):
- """Handler for HTTP GET requests."""
- return self.GETorHEAD(req, stats_type='GET')
-
- @public
- def HEAD(self, req):
- """Handler for HTTP HEAD requests."""
- return self.GETorHEAD(req, stats_type='HEAD')
-
- def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
- """
- Reads from the source and places data in the queue. It expects
- something else be reading from the queue and, if nothing does within
- self.app.client_timeout seconds, the process will be aborted.
-
- :param node: The node dict that the source is connected to, for
- logging/error-limiting purposes.
- :param source: The httplib.Response object to read from.
- :param queue: The eventlet.queue.Queue to place read source data into.
- :param logger_thread_locals: The thread local values to be set on the
- self.app.logger to retain transaction
- logging information.
- """
- self.app.logger.thread_locals = logger_thread_locals
- success = True
- try:
- try:
- while True:
- with ChunkReadTimeout(self.app.node_timeout):
- chunk = source.read(self.app.object_chunk_size)
- if not chunk:
- break
- queue.put(chunk, timeout=self.app.client_timeout)
- except Full:
- self.app.logger.warn(
- _('Client did not read from queue within %ss') %
- self.app.client_timeout)
- self.app.logger.increment('client_timeouts')
- success = False
- except (Exception, Timeout):
- self.exception_occurred(node, _('Object'),
- _('Trying to read during GET'))
- success = False
- finally:
- # Ensure the queue getter gets a terminator.
- queue.resize(2)
- queue.put(success)
- # Close-out the connection as best as possible.
- if getattr(source, 'swift_conn', None):
- try:
- source.swift_conn.close()
- except Exception:
- pass
- source.swift_conn = None
- try:
- while source.read(self.app.object_chunk_size):
- pass
- except Exception:
- pass
- try:
- source.close()
- except Exception:
- pass
-
- def _make_app_iter(self, node, source, response):
- """
- Returns an iterator over the contents of the source (via its read
- func). There is also quite a bit of cleanup to ensure garbage
- collection works and the underlying socket of the source is closed.
-
- :param response: The webob.Response object this iterator should be
- assigned to via response.app_iter.
- :param source: The httplib.Response object this iterator should read
- from.
- :param node: The node the source is reading from, for logging purposes.
- """
- try:
- try:
- # Spawn reader to read from the source and place in the queue.
- # We then drop any reference to the source or node, for garbage
- # collection purposes.
- queue = Queue(1)
- spawn_n(self._make_app_iter_reader, node, source, queue,
- self.app.logger.thread_locals)
- source = node = None
- while True:
- chunk = queue.get(timeout=self.app.node_timeout)
- if isinstance(chunk, bool): # terminator
- success = chunk
- if not success:
- raise Exception(_('Failed to read all data'
- ' from the source'))
- break
- yield chunk
- except Empty:
- raise ChunkReadTimeout()
- except (GeneratorExit, Timeout):
- self.app.logger.warn(_('Client disconnected on read'))
- except Exception:
- self.app.logger.exception(_('Trying to send to client'))
- raise
- finally:
- response.app_iter = None
-
- def GETorHEAD_base(self, req, server_type, partition, nodes, path,
- attempts):
- """
- Base handler for HTTP GET or HEAD requests.
-
- :param req: webob.Request object
- :param server_type: server type
- :param partition: partition
- :param nodes: nodes
- :param path: path for the request
- :param attempts: number of attempts to try
- :returns: webob.Response object
- """
- statuses = []
- reasons = []
- bodies = []
- source = None
- newest = req.headers.get('x-newest', 'f').lower() in TRUE_VALUES
- nodes = iter(nodes)
- while len(statuses) < attempts:
- try:
- node = nodes.next()
- except StopIteration:
- break
- if self.error_limited(node):
- continue
- try:
- with ConnectionTimeout(self.app.conn_timeout):
- headers = dict(req.headers)
- headers['Connection'] = 'close'
- conn = http_connect(node['ip'], node['port'],
- node['device'], partition, req.method, path,
- headers=headers,
- query_string=req.query_string)
- with Timeout(self.app.node_timeout):
- possible_source = conn.getresponse()
- # See NOTE: swift_conn at top of file about this.
- possible_source.swift_conn = conn
- except (Exception, Timeout):
- self.exception_occurred(node, server_type,
- _('Trying to %(method)s %(path)s') %
- {'method': req.method, 'path': req.path})
- continue
- if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
- self.error_limit(node)
- continue
- if is_success(possible_source.status) or \
- is_redirection(possible_source.status):
- # 404 if we know we don't have a synced copy
- if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
- statuses.append(HTTP_NOT_FOUND)
- reasons.append('')
- bodies.append('')
- possible_source.read()
- continue
- if newest:
- if source:
- ts = float(source.getheader('x-put-timestamp') or
- source.getheader('x-timestamp') or 0)
- pts = float(
- possible_source.getheader('x-put-timestamp') or
- possible_source.getheader('x-timestamp') or 0)
- if pts > ts:
- source = possible_source
- else:
- source = possible_source
- statuses.append(source.status)
- reasons.append(source.reason)
- bodies.append('')
- continue
- else:
- source = possible_source
- break
- statuses.append(possible_source.status)
- reasons.append(possible_source.reason)
- bodies.append(possible_source.read())
- if is_server_error(possible_source.status):
- self.error_occurred(node, _('ERROR %(status)d %(body)s ' \
- 'From %(type)s Server') %
- {'status': possible_source.status,
- 'body': bodies[-1][:1024], 'type': server_type})
- if source:
- if req.method == 'GET' and \
- source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
- res = Response(request=req, conditional_response=True)
- res.app_iter = self._make_app_iter(node, source, res)
- # See NOTE: swift_conn at top of file about this.
- res.swift_conn = source.swift_conn
- update_headers(res, source.getheaders())
- # Used by container sync feature
- if res.environ is None:
- res.environ = dict()
- res.environ['swift_x_timestamp'] = \
- source.getheader('x-timestamp')
- update_headers(res, {'accept-ranges': 'bytes'})
- res.status = source.status
- res.content_length = source.getheader('Content-Length')
- if source.getheader('Content-Type'):
- res.charset = None
- res.content_type = source.getheader('Content-Type')
- return res
- elif is_success(source.status) or is_redirection(source.status):
- res = status_map[source.status](request=req)
- update_headers(res, source.getheaders())
- # Used by container sync feature
- if res.environ is None:
- res.environ = dict()
- res.environ['swift_x_timestamp'] = \
- source.getheader('x-timestamp')
- update_headers(res, {'accept-ranges': 'bytes'})
- res.content_length = source.getheader('Content-Length')
- if source.getheader('Content-Type'):
- res.charset = None
- res.content_type = source.getheader('Content-Type')
- return res
- return self.best_response(req, statuses, reasons, bodies,
- '%s %s' % (server_type, req.method))
-
-
-class ObjectController(Controller):
- """WSGI controller for object requests."""
- server_type = _('Object')
-
- def __init__(self, app, account_name, container_name, object_name,
- **kwargs):
- Controller.__init__(self, app)
- self.account_name = unquote(account_name)
- self.container_name = unquote(container_name)
- self.object_name = unquote(object_name)
-
- def _listing_iter(self, lcontainer, lprefix, env):
- lpartition, lnodes = self.app.container_ring.get_nodes(
- self.account_name, lcontainer)
- marker = ''
- while True:
- lreq = Request.blank('i will be overridden by env', environ=env)
- # Don't quote PATH_INFO, by WSGI spec
- lreq.environ['PATH_INFO'] = \
- '/%s/%s' % (self.account_name, lcontainer)
- lreq.environ['REQUEST_METHOD'] = 'GET'
- lreq.environ['QUERY_STRING'] = \
- 'format=json&prefix=%s&marker=%s' % (quote(lprefix),
- quote(marker))
- shuffle(lnodes)
- lresp = self.GETorHEAD_base(lreq, _('Container'),
- lpartition, lnodes, lreq.path_info,
- len(lnodes))
- if 'swift.authorize' in env:
- lreq.acl = lresp.headers.get('x-container-read')
- aresp = env['swift.authorize'](lreq)
- if aresp:
- raise ListingIterNotAuthorized(aresp)
- if lresp.status_int == HTTP_NOT_FOUND:
- raise ListingIterNotFound()
- elif not is_success(lresp.status_int):
- raise ListingIterError()
- if not lresp.body:
- break
- sublisting = json.loads(lresp.body)
- if not sublisting:
- break
- marker = sublisting[-1]['name']
- for obj in sublisting:
- yield obj
-
- def GETorHEAD(self, req, stats_type):
- """Handle HTTP GET or HEAD requests."""
- start_time = time.time()
- _junk, _junk, req.acl, _junk, _junk, object_versions = \
- self.container_info(self.account_name, self.container_name)
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- self.app.logger.increment('auth_short_circuits')
- return aresp
- partition, nodes = self.app.object_ring.get_nodes(
- self.account_name, self.container_name, self.object_name)
- shuffle(nodes)
- resp = self.GETorHEAD_base(req, _('Object'), partition,
- self.iter_nodes(partition, nodes, self.app.object_ring),
- req.path_info, len(nodes))
- # Whether we get a 416 Requested Range Not Satisfiable or not,
- # we should request a manifest because size of manifest file
- # can be not 0. After checking a manifest, redo the range request
- # on the whole object.
- if req.range:
- req_range = req.range
- req.range = None
- resp2 = self.GETorHEAD_base(req, _('Object'), partition,
- self.iter_nodes(partition,
- nodes,
- self.app.object_ring),
- req.path_info, len(nodes))
- if 'x-object-manifest' not in resp2.headers:
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return resp
- resp = resp2
- req.range = str(req_range)
-
- if 'x-object-manifest' in resp.headers:
- lcontainer, lprefix = \
- resp.headers['x-object-manifest'].split('/', 1)
- lcontainer = unquote(lcontainer)
- lprefix = unquote(lprefix)
- try:
- listing = list(self._listing_iter(lcontainer, lprefix,
- req.environ))
- except ListingIterNotFound:
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return HTTPNotFound(request=req)
- except ListingIterNotAuthorized, err:
- self.app.logger.increment('auth_short_circuits')
- return err.aresp
- except ListingIterError:
- self.app.logger.increment('errors')
- return HTTPServerError(request=req)
-
- if len(listing) > CONTAINER_LISTING_LIMIT:
- resp = Response(headers=resp.headers, request=req,
- conditional_response=True)
- if req.method == 'HEAD':
- # These shenanigans are because webob translates the HEAD
- # request into a webob EmptyResponse for the body, which
- # has a len, which eventlet translates as needing a
- # content-length header added. So we call the original
- # webob resp for the headers but return an empty iterator
- # for the body.
-
- def head_response(environ, start_response):
- resp(environ, start_response)
- return iter([])
-
- head_response.status_int = resp.status_int
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return head_response
- else:
- resp.app_iter = SegmentedIterable(self, lcontainer,
- self._listing_iter(lcontainer, lprefix, req.environ),
- resp)
-
- else:
- # For objects with a reasonable number of segments, we'll serve
- # them with a set content-length and computed etag.
- if listing:
- content_length = sum(o['bytes'] for o in listing)
- last_modified = max(o['last_modified'] for o in listing)
- last_modified = datetime(*map(int, re.split('[^\d]',
- last_modified)[:-1]))
- etag = md5(
- ''.join(o['hash'] for o in listing)).hexdigest()
- else:
- content_length = 0
- last_modified = resp.last_modified
- etag = md5().hexdigest()
- resp = Response(headers=resp.headers, request=req,
- conditional_response=True)
- resp.app_iter = SegmentedIterable(self, lcontainer, listing,
- resp)
- resp.content_length = content_length
- resp.last_modified = last_modified
- resp.etag = etag
- resp.headers['accept-ranges'] = 'bytes'
-
- self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
- return resp
-
- @public
- @delay_denial
- def GET(self, req):
- """Handler for HTTP GET requests."""
- return self.GETorHEAD(req, stats_type='GET')
-
- @public
- @delay_denial
- def HEAD(self, req):
- """Handler for HTTP HEAD requests."""
- return self.GETorHEAD(req, stats_type='HEAD')
-
- @public
- @delay_denial
- def POST(self, req):
- """HTTP POST request handler."""
- start_time = time.time()
- if 'x-delete-after' in req.headers:
- try:
- x_delete_after = int(req.headers['x-delete-after'])
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req,
- content_type='text/plain',
- body='Non-integer X-Delete-After')
- req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
- if self.app.object_post_as_copy:
- req.method = 'PUT'
- req.path_info = '/%s/%s/%s' % (self.account_name,
- self.container_name, self.object_name)
- req.headers['Content-Length'] = 0
- req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name,
- self.object_name))
- req.headers['X-Fresh-Metadata'] = 'true'
- req.environ['swift_versioned_copy'] = True
- resp = self.PUT(req, start_time=start_time, stats_type='POST')
- # Older editions returned 202 Accepted on object POSTs, so we'll
- # convert any 201 Created responses to that for compatibility with
- # picky clients.
- if resp.status_int != HTTP_CREATED:
- return resp
- return HTTPAccepted(request=req)
- else:
- error_response = check_metadata(req, 'object')
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- container_partition, containers, _junk, req.acl, _junk, _junk = \
- self.container_info(self.account_name, self.container_name,
- account_autocreate=self.app.account_autocreate)
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- self.app.logger.increment('auth_short_circuits')
- return aresp
- if not containers:
- self.app.logger.timing_since('POST.timing', start_time)
- return HTTPNotFound(request=req)
- if 'x-delete-at' in req.headers:
- try:
- x_delete_at = int(req.headers['x-delete-at'])
- if x_delete_at < time.time():
- self.app.logger.increment('errors')
- return HTTPBadRequest(body='X-Delete-At in past',
- request=req, content_type='text/plain')
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req,
- content_type='text/plain',
- body='Non-integer X-Delete-At')
- delete_at_container = str(x_delete_at /
- self.app.expiring_objects_container_divisor *
- self.app.expiring_objects_container_divisor)
- delete_at_part, delete_at_nodes = \
- self.app.container_ring.get_nodes(
- self.app.expiring_objects_account, delete_at_container)
- else:
- delete_at_part = delete_at_nodes = None
- partition, nodes = self.app.object_ring.get_nodes(
- self.account_name, self.container_name, self.object_name)
- req.headers['X-Timestamp'] = normalize_timestamp(time.time())
- headers = []
- for container in containers:
- nheaders = dict(req.headers.iteritems())
- nheaders['Connection'] = 'close'
- nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
- nheaders['X-Container-Partition'] = container_partition
- nheaders['X-Container-Device'] = container['device']
- if delete_at_nodes:
- node = delete_at_nodes.pop(0)
- nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
- nheaders['X-Delete-At-Partition'] = delete_at_part
- nheaders['X-Delete-At-Device'] = node['device']
- headers.append(nheaders)
- resp = self.make_requests(req, self.app.object_ring, partition,
- 'POST', req.path_info, headers)
- self.app.logger.timing_since('POST.timing', start_time)
- return resp
-
- def _send_file(self, conn, path):
- """Method for a file PUT coro"""
- while True:
- chunk = conn.queue.get()
- if not conn.failed:
- try:
- with ChunkWriteTimeout(self.app.node_timeout):
- conn.send(chunk)
- except (Exception, ChunkWriteTimeout):
- conn.failed = True
- self.exception_occurred(conn.node, _('Object'),
- _('Trying to write to %s') % path)
- conn.queue.task_done()
-
- def _connect_put_node(self, nodes, part, path, headers,
- logger_thread_locals):
- """Method for a file PUT connect"""
- self.app.logger.thread_locals = logger_thread_locals
- for node in nodes:
- try:
- with ConnectionTimeout(self.app.conn_timeout):
- conn = http_connect(node['ip'], node['port'],
- node['device'], part, 'PUT', path, headers)
- with Timeout(self.app.node_timeout):
- resp = conn.getexpect()
- if resp.status == HTTP_CONTINUE:
- conn.node = node
- return conn
- elif resp.status == HTTP_INSUFFICIENT_STORAGE:
- self.error_limit(node)
- except:
- self.exception_occurred(node, _('Object'),
- _('Expect: 100-continue on %s') % path)
-
- @public
- @delay_denial
- def PUT(self, req, start_time=None, stats_type='PUT'):
- """HTTP PUT request handler."""
- if not start_time:
- start_time = time.time()
- (container_partition, containers, _junk, req.acl,
- req.environ['swift_sync_key'], object_versions) = \
- self.container_info(self.account_name, self.container_name,
- account_autocreate=self.app.account_autocreate)
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- self.app.logger.increment('auth_short_circuits')
- return aresp
- if not containers:
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return HTTPNotFound(request=req)
- if 'x-delete-after' in req.headers:
- try:
- x_delete_after = int(req.headers['x-delete-after'])
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req,
- content_type='text/plain',
- body='Non-integer X-Delete-After')
- req.headers['x-delete-at'] = '%d' % (time.time() + x_delete_after)
- if 'x-delete-at' in req.headers:
- try:
- x_delete_at = int(req.headers['x-delete-at'])
- if x_delete_at < time.time():
- self.app.logger.increment('errors')
- return HTTPBadRequest(body='X-Delete-At in past',
- request=req, content_type='text/plain')
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req, content_type='text/plain',
- body='Non-integer X-Delete-At')
- delete_at_container = str(x_delete_at /
- self.app.expiring_objects_container_divisor *
- self.app.expiring_objects_container_divisor)
- delete_at_part, delete_at_nodes = \
- self.app.container_ring.get_nodes(
- self.app.expiring_objects_account, delete_at_container)
- else:
- delete_at_part = delete_at_nodes = None
- partition, nodes = self.app.object_ring.get_nodes(
- self.account_name, self.container_name, self.object_name)
- # do a HEAD request for container sync and checking object versions
- if 'x-timestamp' in req.headers or (object_versions and not
- req.environ.get('swift_versioned_copy')):
- hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
- environ={'REQUEST_METHOD': 'HEAD'})
- hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
- hreq.path_info, len(nodes))
- # Used by container sync feature
- if 'x-timestamp' in req.headers:
- try:
- req.headers['X-Timestamp'] = \
- normalize_timestamp(float(req.headers['x-timestamp']))
- if hresp.environ and 'swift_x_timestamp' in hresp.environ and \
- float(hresp.environ['swift_x_timestamp']) >= \
- float(req.headers['x-timestamp']):
- self.app.logger.timing_since(
- '%.timing' % (stats_type,), start_time)
- return HTTPAccepted(request=req)
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req, content_type='text/plain',
- body='X-Timestamp should be a UNIX timestamp float value; '
- 'was %r' % req.headers['x-timestamp'])
- else:
- req.headers['X-Timestamp'] = normalize_timestamp(time.time())
- # Sometimes the 'content-type' header exists, but is set to None.
- content_type_manually_set = True
- if not req.headers.get('content-type'):
- guessed_type, _junk = mimetypes.guess_type(req.path_info)
- req.headers['Content-Type'] = guessed_type or \
- 'application/octet-stream'
- content_type_manually_set = False
- error_response = check_object_creation(req, self.object_name)
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- if object_versions and not req.environ.get('swift_versioned_copy'):
- is_manifest = 'x-object-manifest' in req.headers or \
- 'x-object-manifest' in hresp.headers
- if hresp.status_int != HTTP_NOT_FOUND and not is_manifest:
- # This is a version manifest and needs to be handled
- # differently. First copy the existing data to a new object,
- # then write the data from this request to the version manifest
- # object.
- lcontainer = object_versions.split('/')[0]
- prefix_len = '%03x' % len(self.object_name)
- lprefix = prefix_len + self.object_name + '/'
- ts_source = hresp.environ.get('swift_x_timestamp')
- if ts_source is None:
- ts_source = time.mktime(time.strptime(
- hresp.headers['last-modified'],
- '%a, %d %b %Y %H:%M:%S GMT'))
- new_ts = normalize_timestamp(ts_source)
- vers_obj_name = lprefix + new_ts
- copy_headers = {
- 'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
- copy_environ = {'REQUEST_METHOD': 'COPY',
- 'swift_versioned_copy': True
- }
- copy_req = Request.blank(req.path_info, headers=copy_headers,
- environ=copy_environ)
- copy_resp = self.COPY(copy_req)
- if is_client_error(copy_resp.status_int):
- # missing container or bad permissions
- return HTTPPreconditionFailed(request=req)
- elif not is_success(copy_resp.status_int):
- # could not copy the data, bail
- return HTTPServiceUnavailable(request=req)
-
- reader = req.environ['wsgi.input'].read
- data_source = iter(lambda: reader(self.app.client_chunk_size), '')
- source_header = req.headers.get('X-Copy-From')
- source_resp = None
- if source_header:
- source_header = unquote(source_header)
- acct = req.path_info.split('/', 2)[1]
- if isinstance(acct, unicode):
- acct = acct.encode('utf-8')
- if not source_header.startswith('/'):
- source_header = '/' + source_header
- source_header = '/' + acct + source_header
- try:
- src_container_name, src_obj_name = \
- source_header.split('/', 3)[2:]
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPPreconditionFailed(request=req,
- body='X-Copy-From header must be of the form'
- '<container name>/<object name>')
- source_req = req.copy_get()
- source_req.path_info = source_header
- source_req.headers['X-Newest'] = 'true'
- orig_obj_name = self.object_name
- orig_container_name = self.container_name
- self.object_name = src_obj_name
- self.container_name = src_container_name
- source_resp = self.GET(source_req)
- if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return source_resp
- self.object_name = orig_obj_name
- self.container_name = orig_container_name
- new_req = Request.blank(req.path_info,
- environ=req.environ, headers=req.headers)
- data_source = source_resp.app_iter
- new_req.content_length = source_resp.content_length
- if new_req.content_length is None:
- # This indicates a transfer-encoding: chunked source object,
- # which currently only happens because there are more than
- # CONTAINER_LISTING_LIMIT segments in a segmented object. In
- # this case, we're going to refuse to do the server-side copy.
- self.app.logger.increment('errors')
- return HTTPRequestEntityTooLarge(request=req)
- new_req.etag = source_resp.etag
- # we no longer need the X-Copy-From header
- del new_req.headers['X-Copy-From']
- if not content_type_manually_set:
- new_req.headers['Content-Type'] = \
- source_resp.headers['Content-Type']
- if new_req.headers.get('x-fresh-metadata', 'false').lower() \
- not in TRUE_VALUES:
- for k, v in source_resp.headers.items():
- if k.lower().startswith('x-object-meta-'):
- new_req.headers[k] = v
- for k, v in req.headers.items():
- if k.lower().startswith('x-object-meta-'):
- new_req.headers[k] = v
- req = new_req
- node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
- pile = GreenPile(len(nodes))
- for container in containers:
- nheaders = dict(req.headers.iteritems())
- nheaders['Connection'] = 'close'
- nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
- nheaders['X-Container-Partition'] = container_partition
- nheaders['X-Container-Device'] = container['device']
- nheaders['Expect'] = '100-continue'
- if delete_at_nodes:
- node = delete_at_nodes.pop(0)
- nheaders['X-Delete-At-Host'] = '%(ip)s:%(port)s' % node
- nheaders['X-Delete-At-Partition'] = delete_at_part
- nheaders['X-Delete-At-Device'] = node['device']
- pile.spawn(self._connect_put_node, node_iter, partition,
- req.path_info, nheaders, self.app.logger.thread_locals)
- conns = [conn for conn in pile if conn]
- if len(conns) <= len(nodes) / 2:
- self.app.logger.error(
- _('Object PUT returning 503, %(conns)s/%(nodes)s '
- 'required connections'),
- {'conns': len(conns), 'nodes': len(nodes) // 2 + 1})
- self.app.logger.increment('errors')
- return HTTPServiceUnavailable(request=req)
- chunked = req.headers.get('transfer-encoding')
- bytes_transferred = 0
- try:
- with ContextPool(len(nodes)) as pool:
- for conn in conns:
- conn.failed = False
- conn.queue = Queue(self.app.put_queue_depth)
- pool.spawn(self._send_file, conn, req.path)
- while True:
- with ChunkReadTimeout(self.app.client_timeout):
- try:
- chunk = next(data_source)
- except StopIteration:
- if chunked:
- [conn.queue.put('0\r\n\r\n') for conn in conns]
- break
- bytes_transferred += len(chunk)
- if bytes_transferred > MAX_FILE_SIZE:
- self.app.logger.increment('errors')
- return HTTPRequestEntityTooLarge(request=req)
- for conn in list(conns):
- if not conn.failed:
- conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk)
- if chunked else chunk)
- else:
- conns.remove(conn)
- if len(conns) <= len(nodes) / 2:
- self.app.logger.error(_('Object PUT exceptions during'
- ' send, %(conns)s/%(nodes)s required connections'),
- {'conns': len(conns), 'nodes': len(nodes) / 2 + 1})
- self.app.logger.increment('errors')
- return HTTPServiceUnavailable(request=req)
- for conn in conns:
- if conn.queue.unfinished_tasks:
- conn.queue.join()
- conns = [conn for conn in conns if not conn.failed]
- except ChunkReadTimeout, err:
- self.app.logger.warn(
- _('ERROR Client read timeout (%ss)'), err.seconds)
- self.app.logger.increment('client_timeouts')
- return HTTPRequestTimeout(request=req)
- except (Exception, Timeout):
- self.app.logger.exception(
- _('ERROR Exception causing client disconnect'))
- self.app.logger.increment('client_disconnects')
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return HTTPClientDisconnect(request=req)
- if req.content_length and bytes_transferred < req.content_length:
- req.client_disconnect = True
- self.app.logger.warn(
- _('Client disconnected without sending enough data'))
- self.app.logger.increment('client_disconnects')
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return HTTPClientDisconnect(request=req)
- statuses = []
- reasons = []
- bodies = []
- etags = set()
- for conn in conns:
- try:
- with Timeout(self.app.node_timeout):
- response = conn.getresponse()
- statuses.append(response.status)
- reasons.append(response.reason)
- bodies.append(response.read())
- if response.status >= HTTP_INTERNAL_SERVER_ERROR:
- self.error_occurred(conn.node,
- _('ERROR %(status)d %(body)s From Object Server ' \
- 're: %(path)s') % {'status': response.status,
- 'body': bodies[-1][:1024], 'path': req.path})
- elif is_success(response.status):
- etags.add(response.getheader('etag').strip('"'))
- except (Exception, Timeout):
- self.exception_occurred(conn.node, _('Object'),
- _('Trying to get final status of PUT to %s') % req.path)
- if len(etags) > 1:
- self.app.logger.error(
- _('Object servers returned %s mismatched etags'), len(etags))
- self.app.logger.increment('errors')
- return HTTPServerError(request=req)
- etag = len(etags) and etags.pop() or None
- while len(statuses) < len(nodes):
- statuses.append(HTTP_SERVICE_UNAVAILABLE)
- reasons.append('')
- bodies.append('')
- resp = self.best_response(req, statuses, reasons, bodies,
- _('Object PUT'), etag=etag)
- if source_header:
- resp.headers['X-Copied-From'] = quote(
- source_header.split('/', 2)[2])
- if 'last-modified' in source_resp.headers:
- resp.headers['X-Copied-From-Last-Modified'] = \
- source_resp.headers['last-modified']
- for k, v in req.headers.items():
- if k.lower().startswith('x-object-meta-'):
- resp.headers[k] = v
- resp.last_modified = float(req.headers['X-Timestamp'])
- self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
- return resp
-
- @public
- @delay_denial
- def DELETE(self, req):
- """HTTP DELETE request handler."""
- start_time = time.time()
- (container_partition, containers, _junk, req.acl,
- req.environ['swift_sync_key'], object_versions) = \
- self.container_info(self.account_name, self.container_name)
- if object_versions:
- # this is a version manifest and needs to be handled differently
- lcontainer = object_versions.split('/')[0]
- prefix_len = '%03x' % len(self.object_name)
- lprefix = prefix_len + self.object_name + '/'
- last_item = None
- try:
- for last_item in self._listing_iter(lcontainer, lprefix,
- req.environ):
- pass
- except ListingIterNotFound:
- # no worries, last_item is None
- pass
- except ListingIterNotAuthorized, err:
- self.app.logger.increment('auth_short_circuits')
- return err.aresp
- except ListingIterError:
- self.app.logger.increment('errors')
- return HTTPServerError(request=req)
- if last_item:
- # there are older versions so copy the previous version to the
- # current object and delete the previous version
- orig_container = self.container_name
- orig_obj = self.object_name
- self.container_name = lcontainer
- self.object_name = last_item['name']
- copy_path = '/' + self.account_name + '/' + \
- self.container_name + '/' + self.object_name
- copy_headers = {'X-Newest': 'True',
- 'Destination': orig_container + '/' + orig_obj
- }
- copy_environ = {'REQUEST_METHOD': 'COPY',
- 'swift_versioned_copy': True
- }
- creq = Request.blank(copy_path, headers=copy_headers,
- environ=copy_environ)
- copy_resp = self.COPY(creq)
- if is_client_error(copy_resp.status_int):
- # some user error, maybe permissions
- return HTTPPreconditionFailed(request=req)
- elif not is_success(copy_resp.status_int):
- # could not copy the data, bail
- return HTTPServiceUnavailable(request=req)
- # reset these because the COPY changed them
- self.container_name = lcontainer
- self.object_name = last_item['name']
- new_del_req = Request.blank(copy_path, environ=req.environ)
- (container_partition, containers,
- _junk, new_del_req.acl, _junk, _junk) = \
- self.container_info(self.account_name, self.container_name)
- new_del_req.path_info = copy_path
- req = new_del_req
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- self.app.logger.increment('auth_short_circuits')
- return aresp
- if not containers:
- self.app.logger.timing_since('DELETE.timing', start_time)
- return HTTPNotFound(request=req)
- partition, nodes = self.app.object_ring.get_nodes(
- self.account_name, self.container_name, self.object_name)
- # Used by container sync feature
- if 'x-timestamp' in req.headers:
- try:
- req.headers['X-Timestamp'] = \
- normalize_timestamp(float(req.headers['x-timestamp']))
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPBadRequest(request=req, content_type='text/plain',
- body='X-Timestamp should be a UNIX timestamp float value; '
- 'was %r' % req.headers['x-timestamp'])
- else:
- req.headers['X-Timestamp'] = normalize_timestamp(time.time())
- headers = []
- for container in containers:
- nheaders = dict(req.headers.iteritems())
- nheaders['Connection'] = 'close'
- nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
- nheaders['X-Container-Partition'] = container_partition
- nheaders['X-Container-Device'] = container['device']
- headers.append(nheaders)
- resp = self.make_requests(req, self.app.object_ring,
- partition, 'DELETE', req.path_info, headers)
- self.app.logger.timing_since('DELETE.timing', start_time)
- return resp
-
- @public
- @delay_denial
- def COPY(self, req):
- """HTTP COPY request handler."""
- start_time = time.time()
- dest = req.headers.get('Destination')
- if not dest:
- self.app.logger.increment('errors')
- return HTTPPreconditionFailed(request=req,
- body='Destination header required')
- dest = unquote(dest)
- if not dest.startswith('/'):
- dest = '/' + dest
- try:
- _junk, dest_container, dest_object = dest.split('/', 2)
- except ValueError:
- self.app.logger.increment('errors')
- return HTTPPreconditionFailed(request=req,
- body='Destination header must be of the form '
- '<container name>/<object name>')
- source = '/' + self.container_name + '/' + self.object_name
- self.container_name = dest_container
- self.object_name = dest_object
- # re-write the existing request as a PUT instead of creating a new one
- # since this one is already attached to the posthooklogger
- req.method = 'PUT'
- req.path_info = '/' + self.account_name + dest
- req.headers['Content-Length'] = 0
- req.headers['X-Copy-From'] = quote(source)
- del req.headers['Destination']
- return self.PUT(req, start_time=start_time, stats_type='COPY')
-
-
-class ContainerController(Controller):
- """WSGI controller for container requests"""
- server_type = _('Container')
-
- # Ensure these are all lowercase
- pass_through_headers = ['x-container-read', 'x-container-write',
- 'x-container-sync-key', 'x-container-sync-to',
- 'x-versions-location']
-
- def __init__(self, app, account_name, container_name, **kwargs):
- Controller.__init__(self, app)
- self.account_name = unquote(account_name)
- self.container_name = unquote(container_name)
-
- def clean_acls(self, req):
- if 'swift.clean_acl' in req.environ:
- for header in ('x-container-read', 'x-container-write'):
- if header in req.headers:
- try:
- req.headers[header] = \
- req.environ['swift.clean_acl'](header,
- req.headers[header])
- except ValueError, err:
- return HTTPBadRequest(request=req, body=str(err))
- return None
-
- def GETorHEAD(self, req, stats_type):
- """Handler for HTTP GET/HEAD requests."""
- start_time = time.time()
- if not self.account_info(self.account_name)[1]:
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return HTTPNotFound(request=req)
- part, nodes = self.app.container_ring.get_nodes(
- self.account_name, self.container_name)
- shuffle(nodes)
- resp = self.GETorHEAD_base(req, _('Container'), part, nodes,
- req.path_info, len(nodes))
-
- if self.app.memcache:
- # set the memcache container size for ratelimiting
- cache_key = get_container_memcache_key(self.account_name,
- self.container_name)
- self.app.memcache.set(cache_key,
- {'status': resp.status_int,
- 'read_acl': resp.headers.get('x-container-read'),
- 'write_acl': resp.headers.get('x-container-write'),
- 'sync_key': resp.headers.get('x-container-sync-key'),
- 'container_size': resp.headers.get('x-container-object-count'),
- 'versions': resp.headers.get('x-versions-location')},
- timeout=self.app.recheck_container_existence)
-
- if 'swift.authorize' in req.environ:
- req.acl = resp.headers.get('x-container-read')
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- self.app.logger.increment('auth_short_circuits')
- return aresp
- if not req.environ.get('swift_owner', False):
- for key in ('x-container-read', 'x-container-write',
- 'x-container-sync-key', 'x-container-sync-to'):
- if key in resp.headers:
- del resp.headers[key]
- self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
- return resp
-
- @public
- @delay_denial
- def GET(self, req):
- """Handler for HTTP GET requests."""
- return self.GETorHEAD(req, stats_type='GET')
-
- @public
- @delay_denial
- def HEAD(self, req):
- """Handler for HTTP HEAD requests."""
- return self.GETorHEAD(req, stats_type='HEAD')
-
- @public
- def PUT(self, req):
- """HTTP PUT request handler."""
- start_time = time.time()
- error_response = \
- self.clean_acls(req) or check_metadata(req, 'container')
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
- resp = HTTPBadRequest(request=req)
- resp.body = 'Container name length of %d longer than %d' % \
- (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
- self.app.logger.increment('errors')
- return resp
- account_partition, accounts, container_count = \
- self.account_info(self.account_name,
- autocreate=self.app.account_autocreate)
- if self.app.max_containers_per_account > 0 and \
- container_count >= self.app.max_containers_per_account and \
- self.account_name not in self.app.max_containers_whitelist:
- resp = HTTPForbidden(request=req)
- resp.body = 'Reached container limit of %s' % \
- self.app.max_containers_per_account
- return resp
- if not accounts:
- self.app.logger.timing_since('PUT.timing', start_time)
- return HTTPNotFound(request=req)
- container_partition, containers = self.app.container_ring.get_nodes(
- self.account_name, self.container_name)
- headers = []
- for account in accounts:
- nheaders = {'X-Timestamp': normalize_timestamp(time.time()),
- 'x-trans-id': self.trans_id,
- 'X-Account-Host': '%(ip)s:%(port)s' % account,
- 'X-Account-Partition': account_partition,
- 'X-Account-Device': account['device'],
- 'Connection': 'close'}
- self.transfer_headers(req.headers, nheaders)
- headers.append(nheaders)
- if self.app.memcache:
- cache_key = get_container_memcache_key(self.account_name,
- self.container_name)
- self.app.memcache.delete(cache_key)
- resp = self.make_requests(req, self.app.container_ring,
- container_partition, 'PUT', req.path_info, headers)
- self.app.logger.timing_since('PUT.timing', start_time)
- return resp
-
- @public
- def POST(self, req):
- """HTTP POST request handler."""
- start_time = time.time()
- error_response = \
- self.clean_acls(req) or check_metadata(req, 'container')
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- account_partition, accounts, container_count = \
- self.account_info(self.account_name,
- autocreate=self.app.account_autocreate)
- if not accounts:
- self.app.logger.timing_since('POST.timing', start_time)
- return HTTPNotFound(request=req)
- container_partition, containers = self.app.container_ring.get_nodes(
- self.account_name, self.container_name)
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'x-trans-id': self.trans_id,
- 'Connection': 'close'}
- self.transfer_headers(req.headers, headers)
- if self.app.memcache:
- cache_key = get_container_memcache_key(self.account_name,
- self.container_name)
- self.app.memcache.delete(cache_key)
- resp = self.make_requests(req, self.app.container_ring,
- container_partition, 'POST', req.path_info,
- [headers] * len(containers))
- self.app.logger.timing_since('POST.timing', start_time)
- return resp
-
- @public
- def DELETE(self, req):
- """HTTP DELETE request handler."""
- start_time = time.time()
- account_partition, accounts, container_count = \
- self.account_info(self.account_name)
- if not accounts:
- self.app.logger.timing_since('DELETE.timing', start_time)
- return HTTPNotFound(request=req)
- container_partition, containers = self.app.container_ring.get_nodes(
- self.account_name, self.container_name)
- headers = []
- for account in accounts:
- headers.append({'X-Timestamp': normalize_timestamp(time.time()),
- 'X-Trans-Id': self.trans_id,
- 'X-Account-Host': '%(ip)s:%(port)s' % account,
- 'X-Account-Partition': account_partition,
- 'X-Account-Device': account['device'],
- 'Connection': 'close'})
- if self.app.memcache:
- cache_key = get_container_memcache_key(self.account_name,
- self.container_name)
- self.app.memcache.delete(cache_key)
- resp = self.make_requests(req, self.app.container_ring,
- container_partition, 'DELETE', req.path_info, headers)
- # Indicates no server had the container
- self.app.logger.timing_since('DELETE.timing', start_time)
- if resp.status_int == HTTP_ACCEPTED:
- return HTTPNotFound(request=req)
- return resp
-
-
-class AccountController(Controller):
- """WSGI controller for account requests"""
- server_type = _('Account')
-
- def __init__(self, app, account_name, **kwargs):
- Controller.__init__(self, app)
- self.account_name = unquote(account_name)
-
- def GETorHEAD(self, req, stats_type):
- """Handler for HTTP GET/HEAD requests."""
- start_time = time.time()
- partition, nodes = self.app.account_ring.get_nodes(self.account_name)
- shuffle(nodes)
- resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
- req.path_info.rstrip('/'), len(nodes))
- if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
- if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
- resp = HTTPBadRequest(request=req)
- resp.body = 'Account name length of %d longer than %d' % \
- (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
- self.app.logger.timing_since(
- '%s.timing' % (stats_type,), start_time)
- return resp
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'X-Trans-Id': self.trans_id,
- 'Connection': 'close'}
- resp = self.make_requests(
- Request.blank('/v1/' + self.account_name),
- self.app.account_ring, partition, 'PUT',
- '/' + self.account_name, [headers] * len(nodes))
- if not is_success(resp.status_int):
- self.app.logger.warning('Could not autocreate account %r' %
- self.account_name)
- return resp
- resp = self.GETorHEAD_base(req, _('Account'), partition, nodes,
- req.path_info.rstrip('/'), len(nodes))
- self.app.logger.timing_since('%s.timing' % (stats_type,), start_time)
- return resp
-
- @public
- def PUT(self, req):
- """HTTP PUT request handler."""
- start_time = time.time()
- if not self.app.allow_account_management:
- self.app.logger.timing_since('PUT.timing', start_time)
- return HTTPMethodNotAllowed(request=req)
- error_response = check_metadata(req, 'account')
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
- resp = HTTPBadRequest(request=req)
- resp.body = 'Account name length of %d longer than %d' % \
- (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
- self.app.logger.increment('errors')
- return resp
- account_partition, accounts = \
- self.app.account_ring.get_nodes(self.account_name)
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'x-trans-id': self.trans_id,
- 'Connection': 'close'}
- self.transfer_headers(req.headers, headers)
- if self.app.memcache:
- self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
- resp = self.make_requests(req, self.app.account_ring,
- account_partition, 'PUT', req.path_info, [headers] * len(accounts))
- self.app.logger.timing_since('PUT.timing', start_time)
- return resp
-
- @public
- def POST(self, req):
- """HTTP POST request handler."""
- start_time = time.time()
- error_response = check_metadata(req, 'account')
- if error_response:
- self.app.logger.increment('errors')
- return error_response
- account_partition, accounts = \
- self.app.account_ring.get_nodes(self.account_name)
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'X-Trans-Id': self.trans_id,
- 'Connection': 'close'}
- self.transfer_headers(req.headers, headers)
- if self.app.memcache:
- self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
- resp = self.make_requests(req, self.app.account_ring,
- account_partition, 'POST', req.path_info,
- [headers] * len(accounts))
- if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
- if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
- resp = HTTPBadRequest(request=req)
- resp.body = 'Account name length of %d longer than %d' % \
- (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
- self.app.logger.increment('errors')
- return resp
- resp = self.make_requests(
- Request.blank('/v1/' + self.account_name),
- self.app.account_ring, account_partition, 'PUT',
- '/' + self.account_name, [headers] * len(accounts))
- if not is_success(resp.status_int):
- self.app.logger.warning('Could not autocreate account %r' %
- self.account_name)
- return resp
- self.app.logger.timing_since('POST.timing', start_time)
- return resp
-
- @public
- def DELETE(self, req):
- """HTTP DELETE request handler."""
- start_time = time.time()
- if not self.app.allow_account_management:
- self.app.logger.timing_since('DELETE.timing', start_time)
- return HTTPMethodNotAllowed(request=req)
- account_partition, accounts = \
- self.app.account_ring.get_nodes(self.account_name)
- headers = {'X-Timestamp': normalize_timestamp(time.time()),
- 'X-Trans-Id': self.trans_id,
- 'Connection': 'close'}
- if self.app.memcache:
- self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
- resp = self.make_requests(req, self.app.account_ring,
- account_partition, 'DELETE', req.path_info,
- [headers] * len(accounts))
- self.app.logger.timing_since('DELETE.timing', start_time)
- return resp
+from swift.common.utils import cache_from_env, get_logger, \
+ get_remote_client, split_path, TRUE_VALUES
+from swift.common.constraints import check_utf8
+from swift.proxy.controllers import AccountController, ObjectController, \
+ ContainerController, Controller
class BaseApplication(object):
diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py
index bf5973a9f..3db4e01bd 100644
--- a/test/unit/common/middleware/test_ratelimit.py
+++ b/test/unit/common/middleware/test_ratelimit.py
@@ -22,7 +22,7 @@ from webob import Request
from test.unit import FakeLogger
from swift.common.middleware import ratelimit
-from swift.proxy.server import get_container_memcache_key
+from swift.proxy.controllers.base import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 6eddf9e15..5a2fd5eea 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -43,10 +43,15 @@ from swift.account import server as account_server
from swift.container import server as container_server
from swift.obj import server as object_server
from swift.common import ring
+from swift.common.exceptions import ChunkReadTimeout
from swift.common.constraints import MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.common.wsgi import monkey_patch_mimetools
+from swift.proxy.controllers.obj import SegmentedIterable
+from swift.proxy.controllers.base import get_container_memcache_key, \
+ get_account_memcache_key
+import swift.proxy.controllers
# mocks
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@@ -67,7 +72,8 @@ def setup():
mkdirs(os.path.join(_testdir, 'sda1', 'tmp'))
mkdirs(os.path.join(_testdir, 'sdb1'))
mkdirs(os.path.join(_testdir, 'sdb1', 'tmp'))
- _orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT
+ _orig_container_listing_limit = \
+ swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT
conf = {'devices': _testdir, 'swift_dir': _testdir,
'mount_check': 'false', 'allowed_headers':
'content-encoding, x-object-manifest, content-disposition, foo',
@@ -122,8 +128,8 @@ def setup():
ts = normalize_timestamp(time())
partition, nodes = prosrv.account_ring.get_nodes('a')
for node in nodes:
- conn = proxy_server.http_connect(node['ip'], node['port'],
- node['device'], partition, 'PUT', '/a',
+ conn = swift.proxy.controllers.obj.http_connect(node['ip'],
+ node['port'], node['device'], partition, 'PUT', '/a',
{'X-Timestamp': ts, 'x-trans-id': 'test'})
resp = conn.getresponse()
assert(resp.status == 201)
@@ -142,7 +148,8 @@ def setup():
def teardown():
for server in _test_coros:
server.kill()
- proxy_server.CONTAINER_LISTING_LIMIT = _orig_container_listing_limit
+ swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
+ _orig_container_listing_limit
rmtree(os.path.dirname(_testdir))
@@ -311,13 +318,36 @@ class FakeMemcacheReturnsNone(FakeMemcache):
@contextmanager
def save_globals():
- orig_http_connect = getattr(proxy_server, 'http_connect', None)
+ orig_http_connect = getattr(swift.proxy.controllers.base, 'http_connect',
+ None)
orig_account_info = getattr(proxy_server.Controller, 'account_info', None)
try:
yield True
finally:
- proxy_server.http_connect = orig_http_connect
proxy_server.Controller.account_info = orig_account_info
+ proxy_server.http_connect = orig_http_connect
+ swift.proxy.controllers.base.http_connect = orig_http_connect
+ swift.proxy.controllers.obj.http_connect = orig_http_connect
+ swift.proxy.controllers.account.http_connect = orig_http_connect
+ swift.proxy.controllers.container.http_connect = orig_http_connect
+
+
+def set_http_connect(*args, **kwargs):
+ new_connect = fake_http_connect(*args, **kwargs)
+ proxy_server.http_connect = new_connect
+ swift.proxy.controllers.base.http_connect = new_connect
+ swift.proxy.controllers.obj.http_connect = new_connect
+ swift.proxy.controllers.account.http_connect = new_connect
+ swift.proxy.controllers.container.http_connect = new_connect
+
+
+def set_shuffle():
+ shuffle = lambda l: None
+ proxy_server.shuffle = shuffle
+ swift.proxy.controllers.base.shuffle = shuffle
+ swift.proxy.controllers.obj.shuffle = shuffle
+ swift.proxy.controllers.account.shuffle = shuffle
+ swift.proxy.controllers.container.shuffle = shuffle
# tests
@@ -349,11 +379,10 @@ class TestController(unittest.TestCase):
def test_make_requests(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200)
+ set_http_connect(200)
partition, nodes, count = \
self.controller.account_info(self.account)
- proxy_server.http_connect = fake_http_connect(201,
- raise_timeout_exc=True)
+ set_http_connect(201, raise_timeout_exc=True)
self.controller._make_request(
nodes, partition, 'POST', '/', '', '',
self.controller.app.logger.thread_locals)
@@ -361,17 +390,17 @@ class TestController(unittest.TestCase):
# tests if 200 is cached and used
def test_account_info_200(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200)
+ set_http_connect(200)
partition, nodes, count = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes)
self.assertEquals(count, 12345)
- cache_key = proxy_server.get_account_memcache_key(self.account)
+ cache_key = get_account_memcache_key(self.account)
self.assertEquals({'status': 200, 'container_count': 12345},
self.memcache.get(cache_key))
- proxy_server.http_connect = fake_http_connect()
+ set_http_connect()
partition, nodes, count = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes)
@@ -380,17 +409,17 @@ class TestController(unittest.TestCase):
# tests if 404 is cached and used
def test_account_info_404(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(404, 404, 404)
+ set_http_connect(404, 404, 404)
partition, nodes, count = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes, True)
self.assertEquals(count, None)
- cache_key = proxy_server.get_account_memcache_key(self.account)
+ cache_key = get_account_memcache_key(self.account)
self.assertEquals({'status': 404, 'container_count': 0},
self.memcache.get(cache_key))
- proxy_server.http_connect = fake_http_connect()
+ set_http_connect()
partition, nodes, count = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes, True)
@@ -399,7 +428,7 @@ class TestController(unittest.TestCase):
# tests if some http status codes are not cached
def test_account_info_no_cache(self):
def test(*status_list):
- proxy_server.http_connect = fake_http_connect(*status_list)
+ set_http_connect(*status_list)
partition, nodes, count = \
self.controller.account_info(self.account)
self.assertEqual(len(self.memcache.keys()), 0)
@@ -415,40 +444,35 @@ class TestController(unittest.TestCase):
def test_account_info_account_autocreate(self):
with save_globals():
self.memcache.store = {}
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 201, 201, 201)
+ set_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes, count = \
self.controller.account_info(self.account, autocreate=False)
self.check_account_info_return(partition, nodes, is_none=True)
self.assertEquals(count, None)
self.memcache.store = {}
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 201, 201, 201)
+ set_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes, count = \
self.controller.account_info(self.account)
self.check_account_info_return(partition, nodes, is_none=True)
self.assertEquals(count, None)
self.memcache.store = {}
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 201, 201, 201)
+ set_http_connect(404, 404, 404, 201, 201, 201)
partition, nodes, count = \
self.controller.account_info(self.account, autocreate=True)
self.check_account_info_return(partition, nodes)
self.assertEquals(count, 0)
self.memcache.store = {}
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 503, 201, 201)
+ set_http_connect(404, 404, 404, 503, 201, 201)
partition, nodes, count = \
self.controller.account_info(self.account, autocreate=True)
self.check_account_info_return(partition, nodes)
self.assertEquals(count, 0)
self.memcache.store = {}
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 503, 201, 503)
+ set_http_connect(404, 404, 404, 503, 201, 503)
exc = None
partition, nodes, count = \
self.controller.account_info(self.account, autocreate=True)
@@ -504,19 +528,18 @@ class TestController(unittest.TestCase):
headers = {'x-container-read': self.read_acl,
'x-container-write': self.write_acl}
proxy_server.Controller.account_info = account_info
- proxy_server.http_connect = fake_http_connect(200,
- headers=headers)
+ set_http_connect(200, headers=headers)
ret = self.controller.container_info(self.account,
self.container)
self.check_container_info_return(ret)
- cache_key = proxy_server.get_container_memcache_key(self.account,
+ cache_key = get_container_memcache_key(self.account,
self.container)
cache_value = self.memcache.get(cache_key)
self.assertTrue(isinstance(cache_value, dict))
self.assertEquals(200, cache_value.get('status'))
- proxy_server.http_connect = fake_http_connect()
+ set_http_connect()
ret = self.controller.container_info(self.account,
self.container)
self.check_container_info_return(ret)
@@ -528,18 +551,18 @@ class TestController(unittest.TestCase):
with save_globals():
proxy_server.Controller.account_info = account_info
- proxy_server.http_connect = fake_http_connect(404, 404, 404)
+ set_http_connect(404, 404, 404)
ret = self.controller.container_info(self.account,
self.container)
self.check_container_info_return(ret, True)
- cache_key = proxy_server.get_container_memcache_key(self.account,
+ cache_key = get_container_memcache_key(self.account,
self.container)
cache_value = self.memcache.get(cache_key)
self.assertTrue(isinstance(cache_value, dict))
self.assertEquals(404, cache_value.get('status'))
- proxy_server.http_connect = fake_http_connect()
+ set_http_connect()
ret = self.controller.container_info(self.account,
self.container)
self.check_container_info_return(ret, True)
@@ -547,7 +570,7 @@ class TestController(unittest.TestCase):
# tests if some http status codes are not cached
def test_container_info_no_cache(self):
def test(*status_list):
- proxy_server.http_connect = fake_http_connect(*status_list)
+ set_http_connect(*status_list)
ret = self.controller.container_info(self.account,
self.container)
self.assertEqual(len(self.memcache.keys()), 0)
@@ -598,7 +621,7 @@ class TestProxyServer(unittest.TestCase):
def authorize(req):
called[0] = True
with save_globals():
- proxy_server.http_connect = fake_http_connect(200)
+ set_http_connect(200)
app = proxy_server.Application(None, FakeMemcache(),
account_ring=FakeRing(), container_ring=FakeRing(),
object_ring=FakeRing())
@@ -662,16 +685,13 @@ class TestObjectController(unittest.TestCase):
object_ring=FakeRing())
monkey_patch_mimetools()
- def tearDown(self):
- proxy_server.CONTAINER_LISTING_LIMIT = _orig_container_listing_limit
-
def assert_status_map(self, method, statuses, expected, raise_exc=False):
with save_globals():
kwargs = {}
if raise_exc:
kwargs['raise_exc'] = raise_exc
- proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
@@ -680,7 +700,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(res.status_int, expected)
# repeat test
- proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
@@ -697,7 +717,7 @@ class TestObjectController(unittest.TestCase):
# The three responses here are for account_info() (HEAD to account server),
# container_info() (HEAD to container server) and three calls to
# _connect_put_node() (PUT to three object servers)
- proxy_server.http_connect = fake_http_connect(201, 201, 201, 201, 201,
+ set_http_connect(201, 201, 201, 201, 201,
give_content_type=lambda content_type:
self.assertEquals(content_type, expected.next()))
# We need into include a transfer-encoding to get past
@@ -737,7 +757,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
req = Request.blank('/a/c/o.jpg', {})
req.content_length = 0
self.app.update_request(req)
@@ -756,7 +776,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
self.app.memcache.store = {}
req = Request.blank('/a/c/o.jpg', {})
req.content_length = 0
@@ -777,7 +797,7 @@ class TestObjectController(unittest.TestCase):
def test_status_map(statuses, expected):
self.app.memcache.store = {}
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
req = Request.blank('/a/c/o.jpg',
environ={'REQUEST_METHOD': 'PUT'}, body='some data')
self.app.update_request(req)
@@ -790,7 +810,7 @@ class TestObjectController(unittest.TestCase):
def test_PUT_max_size(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', {}, headers={
@@ -808,7 +828,7 @@ class TestObjectController(unittest.TestCase):
def test_status_map(statuses, expected):
self.app.memcache.store = {}
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
req = Request.blank('/a/c/o.jpg', {})
req.content_length = 0
self.app.update_request(req)
@@ -827,7 +847,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar'})
@@ -849,7 +869,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar'})
@@ -871,7 +891,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {})
self.app.update_request(req)
@@ -891,7 +911,7 @@ class TestObjectController(unittest.TestCase):
'container', 'object')
def test_status_map(statuses, expected):
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {})
self.app.update_request(req)
@@ -918,8 +938,7 @@ class TestObjectController(unittest.TestCase):
def test_status_map(statuses, expected, timestamps,
expected_timestamp):
- proxy_server.http_connect = \
- fake_http_connect(*statuses, timestamps=timestamps)
+ set_http_connect(*statuses, timestamps=timestamps)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {}, headers={'x-newest': 'true'})
self.app.update_request(req)
@@ -944,8 +963,7 @@ class TestObjectController(unittest.TestCase):
def test_status_map(statuses, expected, timestamps,
expected_timestamp):
- proxy_server.http_connect = \
- fake_http_connect(*statuses, timestamps=timestamps)
+ set_http_connect(*statuses, timestamps=timestamps)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {}, headers={'x-newest': 'true'})
self.app.update_request(req)
@@ -968,8 +986,7 @@ class TestObjectController(unittest.TestCase):
def test_status_map(statuses, expected, timestamps,
expected_timestamp):
- proxy_server.http_connect = \
- fake_http_connect(*statuses, timestamps=timestamps)
+ set_http_connect(*statuses, timestamps=timestamps)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {})
self.app.update_request(req)
@@ -990,16 +1007,15 @@ class TestObjectController(unittest.TestCase):
self.app.object_post_as_copy = False
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 202, 202, 202)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 202, 202, 202)
+ # acct cont obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x' * 256})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x' * 257})
@@ -1011,16 +1027,15 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
+ # acct cont objc objc objc obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x' * 256})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x' * 257})
@@ -1033,16 +1048,15 @@ class TestObjectController(unittest.TestCase):
self.app.object_post_as_copy = False
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 202, 202, 202)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 202, 202, 202)
+ # acct cont obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 128): 'x'})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 129): 'x'})
@@ -1054,16 +1068,15 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
+ # acct cont objc objc objc obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 128): 'x'})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 129): 'x'})
@@ -1078,7 +1091,7 @@ class TestObjectController(unittest.TestCase):
headers = dict(
(('X-Object-Meta-' + str(i), 'a') for i in xrange(91)))
headers.update({'Content-Type': 'foo/bar'})
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers=headers)
self.app.update_request(req)
res = controller.POST(req)
@@ -1091,7 +1104,7 @@ class TestObjectController(unittest.TestCase):
headers = dict(
(('X-Object-Meta-' + str(i), 'a' * 256) for i in xrange(1000)))
headers.update({'Content-Type': 'foo/bar'})
- proxy_server.http_connect = fake_http_connect(202, 202, 202)
+ set_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers=headers)
self.app.update_request(req)
res = controller.POST(req)
@@ -1130,9 +1143,8 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 201, 201, 201)
+ # acct cont obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.app.client_timeout = 0.1
@@ -1140,9 +1152,8 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(201, 201, 201)
- # obj obj obj
+ set_http_connect(201, 201, 201)
+ # obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 408)
@@ -1175,9 +1186,8 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 201, 201, 201)
+ # acct cont obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 499)
@@ -1199,24 +1209,22 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, slow=True)
+ set_http_connect(200, 200, 200, slow=True)
req.sent_size = 0
resp = controller.GET(req)
got_exc = False
try:
resp.body
- except proxy_server.ChunkReadTimeout:
+ except ChunkReadTimeout:
got_exc = True
self.assert_(not got_exc)
self.app.node_timeout = 0.1
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, slow=True)
+ set_http_connect(200, 200, 200, slow=True)
resp = controller.GET(req)
got_exc = False
try:
resp.body
- except proxy_server.ChunkReadTimeout:
+ except ChunkReadTimeout:
got_exc = True
self.assert_(got_exc)
@@ -1241,13 +1249,11 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201, slow=True)
+ set_http_connect(200, 200, 201, 201, 201, slow=True)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.app.node_timeout = 0.1
- proxy_server.http_connect = \
- fake_http_connect(201, 201, 201, slow=True)
+ set_http_connect(201, 201, 201, slow=True)
req = Request.blank('/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'},
@@ -1331,16 +1337,15 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = fake_http_connect(200, 200, 200)
+ set_http_connect(200, 200, 200)
resp = controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_type, 'x-application/test')
- proxy_server.http_connect = fake_http_connect(200, 200, 200)
+ set_http_connect(200, 200, 200)
resp = controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 0)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, slow=True)
+ set_http_connect(200, 200, 200, slow=True)
resp = controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 4)
@@ -1351,22 +1356,22 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = fake_http_connect(200, 200, 200)
+ set_http_connect(200, 200, 200)
resp = controller.HEAD(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 0)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, slow=True)
+ set_http_connect(200, 200, 200, slow=True)
resp = controller.HEAD(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 4)
def test_error_limiting(self):
with save_globals():
- proxy_server.shuffle = lambda l: None
+ set_shuffle()
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200), 200)
+ print controller.app.object_ring.devs
self.assertEquals(controller.app.object_ring.devs[0]['errors'], 2)
self.assert_('last_error' in controller.app.object_ring.devs[0])
for _junk in xrange(self.app.error_suppression_limit):
@@ -1397,62 +1402,53 @@ class TestObjectController(unittest.TestCase):
del dev['last_error']
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200)
+ set_http_connect(200, 200, 200, 200, 200, 200)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'DELETE'})
self.app.update_request(req)
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 200)
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404)
- # acct acct acct
+ set_http_connect(404, 404, 404)
+ # acct acct acct
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(503, 404, 404)
- # acct acct acct
+ set_http_connect(503, 404, 404)
+ # acct acct acct
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(503, 503, 404)
- # acct acct acct
+ set_http_connect(503, 503, 404)
+ # acct acct acct
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(503, 503, 503)
- # acct acct acct
+ set_http_connect(503, 503, 503)
+ # acct acct acct
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 204, 204, 204)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 204, 204, 204)
+ # acct cont obj obj obj
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 204)
- proxy_server.http_connect = \
- fake_http_connect(200, 404, 404, 404)
- # acct cont cont cont
+ set_http_connect(200, 404, 404, 404)
+ # acct cont cont cont
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(200, 503, 503, 503)
- # acct cont cont cont
+ set_http_connect(200, 503, 503, 503)
+ # acct cont cont cont
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
for dev in self.app.account_ring.devs.values():
dev['errors'] = self.app.error_suppression_limit + 1
dev['last_error'] = time()
- proxy_server.http_connect = \
- fake_http_connect(200)
- # acct [isn't actually called since everything
- # is error limited]
+ set_http_connect(200)
+ # acct [isn't actually called since everything
+ # is error limited]
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
@@ -1461,10 +1457,9 @@ class TestObjectController(unittest.TestCase):
for dev in self.app.container_ring.devs.values():
dev['errors'] = self.app.error_suppression_limit + 1
dev['last_error'] = time()
- proxy_server.http_connect = \
- fake_http_connect(200, 200)
- # acct cont [isn't actually called since
- # everything is error limited]
+ set_http_connect(200, 200)
+ # acct cont [isn't actually called since
+ # everything is error limited]
resp = getattr(controller, 'DELETE')(req)
self.assertEquals(resp.status_int, 404)
@@ -1475,15 +1470,13 @@ class TestObjectController(unittest.TestCase):
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 404, 404, 404, 200, 200, 200)
+ set_http_connect(200, 404, 404, 404, 200, 200, 200)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(200, 404, 404, 404, 200, 200)
+ set_http_connect(200, 404, 404, 404, 200, 200)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Type': 'text/plain'})
self.app.update_request(req)
@@ -1495,16 +1488,13 @@ class TestObjectController(unittest.TestCase):
self.app.memcache = FakeMemcacheReturnsNone()
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 404, 404, 404, 200, 200, 200)
+ set_http_connect(200, 404, 404, 404, 200, 200, 200)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(200, 404, 404, 404, 200, 200, 200, 200, 200,
- 200)
+ set_http_connect(200, 404, 404, 404, 200, 200, 200, 200, 200, 200)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'Content-Type': 'text/plain'})
self.app.update_request(req)
@@ -1515,16 +1505,15 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 201, 201, 201)
+ # acct cont obj obj obj
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-' + ('a' *
@@ -1532,7 +1521,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-' + ('a' *
@@ -1541,7 +1530,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-Too-Long': 'a' *
@@ -1549,7 +1538,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-Too-Long': 'a' *
@@ -1558,7 +1547,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {'Content-Length': '0'}
for x in xrange(MAX_META_COUNT):
headers['X-Object-Meta-%d' % x] = 'v'
@@ -1567,7 +1556,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {'Content-Length': '0'}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Object-Meta-%d' % x] = 'v'
@@ -1577,7 +1566,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {'Content-Length': '0'}
header_value = 'a' * MAX_META_VALUE_LENGTH
size = 0
@@ -1595,7 +1584,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers['X-Object-Meta-a'] = \
'a' * (MAX_META_OVERALL_SIZE - size)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@@ -1612,9 +1601,8 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 201, 201, 201)
+ # acct cont obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1623,11 +1611,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': 'c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1638,9 +1623,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '5',
'X-Copy-From': 'c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200)
- # acct cont acct cont objc objc objc
+ set_http_connect(200, 200, 200, 200, 200, 200, 200)
+ # acct cont acct cont objc objc objc
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
@@ -1650,11 +1634,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': 'c/o/o2'})
req.account = 'a'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1665,11 +1646,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': 'c/o%20o2'})
req.account = 'a'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1680,11 +1658,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1694,11 +1669,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': '/c/o/o2'})
req.account = 'a'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1720,9 +1692,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 503, 503, 503)
- # acct cont objc objc objc
+ set_http_connect(200, 200, 503, 503, 503)
+ # acct cont objc objc objc
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 503)
@@ -1732,9 +1703,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 404, 404, 404)
- # acct cont objc objc objc
+ set_http_connect(200, 200, 404, 404, 404)
+ # acct cont objc objc objc
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 404)
@@ -1744,9 +1714,8 @@ class TestObjectController(unittest.TestCase):
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
+ # acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1757,9 +1726,8 @@ class TestObjectController(unittest.TestCase):
'X-Copy-From': '/c/o',
'X-Object-Meta-Ours': 'okay'})
self.app.update_request(req)
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@@ -1773,20 +1741,16 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
req.account = 'a'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
- # acct cont obj obj obj
+ set_http_connect(200, 200, 201, 201, 201)
+ # acct cont obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
req.account = 'a'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1797,11 +1761,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': 'c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1811,11 +1772,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1826,11 +1784,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201,
- 201)
- # acct cont acct cont objc objc objc obj obj
- # obj
+ set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1840,9 +1795,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': 'c_o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200)
- # acct cont
+ set_http_connect(200, 200)
+ # acct cont
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 412)
@@ -1851,9 +1805,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 503, 503, 503)
- # acct cont objc objc objc
+ set_http_connect(200, 200, 503, 503, 503)
+ # acct cont objc objc objc
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 503)
@@ -1862,9 +1815,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 404, 404, 404)
- # acct cont objc objc objc
+ set_http_connect(200, 200, 404, 404, 404)
+ # acct cont objc objc objc
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 404)
@@ -1873,9 +1825,8 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
+ # acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1885,9 +1836,8 @@ class TestObjectController(unittest.TestCase):
'X-Object-Meta-Ours': 'okay'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ # acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1902,10 +1852,9 @@ class TestObjectController(unittest.TestCase):
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
- timestamps=('1', '1', '1', '3', '2', '4', '4', '4'))
- # acct cont objc objc objc obj obj obj
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
+ # acct cont objc objc objc obj obj obj
+ timestamps=('1', '1', '1', '3', '2', '4', '4', '4'))
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
@@ -1934,7 +1883,7 @@ class TestObjectController(unittest.TestCase):
return data
with save_globals():
- proxy_server.http_connect = fake_http_connect(201, 201, 201, 201)
+ set_http_connect(201, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'},
@@ -1948,8 +1897,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(res.status_int // 100, 2) # success
# test 413 entity to large
- from swift.proxy import server
- proxy_server.http_connect = fake_http_connect(201, 201, 201, 201)
+ set_http_connect(201, 201, 201, 201)
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'},
headers={'Transfer-Encoding': 'chunked',
'Content-Type': 'foo/bar'})
@@ -1957,11 +1905,11 @@ class TestObjectController(unittest.TestCase):
self.app.memcache.store = {}
self.app.update_request(req)
try:
- server.MAX_FILE_SIZE = 10
+ swift.proxy.controllers.obj.MAX_FILE_SIZE = 10
res = controller.PUT(req)
self.assertEquals(res.status_int, 413)
finally:
- server.MAX_FILE_SIZE = MAX_FILE_SIZE
+ swift.proxy.controllers.obj.MAX_FILE_SIZE = MAX_FILE_SIZE
def test_chunked_put_bad_version(self):
# Check bad version
@@ -2727,7 +2675,7 @@ class TestObjectController(unittest.TestCase):
body = fd.read()
self.assertEquals(body, '1234 1234 1234 1234 1234 ')
# Do it again but exceeding the container listing limit
- proxy_server.CONTAINER_LISTING_LIMIT = 2
+ swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = 2
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a/segmented%20object/object%20name HTTP/1.1\r\n'
@@ -2766,7 +2714,7 @@ class TestObjectController(unittest.TestCase):
body = fd.read()
# After adjusting the CONTAINER_LISTING_LIMIT, make a copy of
# the manifested object which should consolidate the segments.
- proxy_server.CONTAINER_LISTING_LIMIT = 10000
+ swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = 10000
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented%20object/copy HTTP/1.1\r\n'
@@ -2933,7 +2881,7 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
self.app.update_request(req)
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201,
+ set_http_connect(200, 201, 201, 201,
etags=[None,
'68b329da9893e34099c7d8ad5cb9c940',
'68b329da9893e34099c7d8ad5cb9c940',
@@ -2948,7 +2896,7 @@ class TestObjectController(unittest.TestCase):
'ETag': '68b329da9893e34099c7d8ad5cb9c940',
})
self.app.update_request(req)
- proxy_server.http_connect = fake_http_connect(200, 422, 422, 503,
+ set_http_connect(200, 422, 422, 503,
etags=['68b329da9893e34099c7d8ad5cb9c940',
'68b329da9893e34099c7d8ad5cb9c941',
None,
@@ -2962,7 +2910,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = fake_http_connect(200, 200, 200)
+ set_http_connect(200, 200, 200)
resp = controller.GET(req)
self.assert_('accept-ranges' in resp.headers)
self.assertEquals(resp.headers['accept-ranges'], 'bytes')
@@ -2973,7 +2921,7 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = fake_http_connect(200, 200, 200)
+ set_http_connect(200, 200, 200)
resp = controller.HEAD(req)
self.assert_('accept-ranges' in resp.headers)
self.assertEquals(resp.headers['accept-ranges'], 'bytes')
@@ -2985,8 +2933,7 @@ class TestObjectController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o')
@@ -3002,8 +2949,7 @@ class TestObjectController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', {'REQUEST_METHOD': 'HEAD'})
@@ -3020,8 +2966,7 @@ class TestObjectController(unittest.TestCase):
return HTTPUnauthorized(request=req)
with save_globals():
self.app.object_post_as_copy = False
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
@@ -3038,8 +2983,7 @@ class TestObjectController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'POST'},
@@ -3056,8 +3000,7 @@ class TestObjectController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@@ -3074,8 +3017,7 @@ class TestObjectController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'},
@@ -3089,8 +3031,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
+ set_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
self.app.memcache.store = {}
orig_time = proxy_server.time.time
try:
@@ -3107,8 +3048,7 @@ class TestObjectController(unittest.TestCase):
self.app.object_post_as_copy = False
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 202, 202, 202)
+ set_http_connect(200, 200, 202, 202, 202)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {},
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60'})
@@ -3124,8 +3064,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
+ set_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {},
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '60.1'})
@@ -3138,8 +3077,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
+ set_http_connect(200, 200, 200, 200, 200, 202, 202, 202)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {},
headers={'Content-Type': 'foo/bar', 'X-Delete-After': '-60'})
@@ -3160,7 +3098,7 @@ class TestObjectController(unittest.TestCase):
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
controller.make_requests = fake_make_requests
- proxy_server.http_connect = fake_http_connect(200, 200)
+ set_http_connect(200, 200)
self.app.memcache.store = {}
t = str(int(time() + 100))
req = Request.blank('/a/c/o', {},
@@ -3192,8 +3130,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
self.app.memcache.store = {}
orig_time = proxy_server.time.time
try:
@@ -3214,8 +3151,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {},
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
@@ -3229,8 +3165,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 201, 201, 201)
+ set_http_connect(200, 200, 201, 201, 201)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', {},
headers={'Content-Length': '0', 'Content-Type': 'foo/bar',
@@ -3251,7 +3186,7 @@ class TestObjectController(unittest.TestCase):
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
controller._connect_put_node = fake_connect_put_node
- proxy_server.http_connect = fake_http_connect(200, 200)
+ set_http_connect(200, 200)
self.app.memcache.store = {}
t = str(int(time() + 100))
req = Request.blank('/a/c/o', {},
@@ -3298,14 +3233,14 @@ class TestContainerController(unittest.TestCase):
if raise_exc:
kwargs['raise_exc'] = raise_exc
kwargs['missing_container'] = missing_container
- proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
self.app.update_request(req)
res = method(req)
self.assertEquals(res.status_int, expected)
- proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c/', headers={'Content-Length': '0',
'Content-Type': 'text/plain'})
@@ -3319,8 +3254,7 @@ class TestContainerController(unittest.TestCase):
'container')
def test_status_map(statuses, expected, **kwargs):
- proxy_server.http_connect = fake_http_connect(*statuses,
- **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c', {})
self.app.update_request(req)
@@ -3343,8 +3277,7 @@ class TestContainerController(unittest.TestCase):
'container')
def test_status_map(statuses, expected, **kwargs):
- proxy_server.http_connect = fake_http_connect(*statuses,
- **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c', {})
req.content_length = 0
@@ -3413,38 +3346,32 @@ class TestContainerController(unittest.TestCase):
controller = proxy_server.ContainerController(self.app,
'account', 'container')
if meth == 'PUT':
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200,
+ set_http_connect(200, 200, 200, 200, 200, 200,
missing_container=True)
else:
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200)
+ set_http_connect(200, 200, 200, 200)
self.app.memcache.store = {}
req = Request.blank('/a/c', environ={'REQUEST_METHOD': meth})
self.app.update_request(req)
resp = getattr(controller, meth)(req)
self.assertEquals(resp.status_int, 200)
- proxy_server.http_connect = \
- fake_http_connect(404, 404, 404, 200, 200, 200)
+ set_http_connect(404, 404, 404, 200, 200, 200)
resp = getattr(controller, meth)(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(503, 404, 404)
+ set_http_connect(503, 404, 404)
resp = getattr(controller, meth)(req)
self.assertEquals(resp.status_int, 404)
- proxy_server.http_connect = \
- fake_http_connect(503, 404, raise_exc=True)
+ set_http_connect(503, 404, raise_exc=True)
resp = getattr(controller, meth)(req)
self.assertEquals(resp.status_int, 404)
for dev in self.app.account_ring.devs.values():
dev['errors'] = self.app.error_suppression_limit + 1
dev['last_error'] = time()
- proxy_server.http_connect = \
- fake_http_connect(200, 200, 200, 200, 200, 200)
+ set_http_connect(200, 200, 200, 200, 200, 200)
resp = getattr(controller, meth)(req)
self.assertEquals(resp.status_int, 404)
@@ -3467,8 +3394,8 @@ class TestContainerController(unittest.TestCase):
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.app.memcache = MockMemcache(allow_lock=True)
- proxy_server.http_connect = fake_http_connect(
- 200, 200, 200, 201, 201, 201, missing_container=True)
+ set_http_connect(200, 200, 200, 201, 201, 201,
+ missing_container=True)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)
res = controller.PUT(req)
@@ -3476,7 +3403,7 @@ class TestContainerController(unittest.TestCase):
def test_error_limiting(self):
with save_globals():
- proxy_server.shuffle = lambda l: None
+ set_shuffle()
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
@@ -3524,7 +3451,7 @@ class TestContainerController(unittest.TestCase):
def test_response_get_accept_ranges_header(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
+ set_http_connect(200, 200, body='{}')
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c?format=json')
@@ -3535,7 +3462,7 @@ class TestContainerController(unittest.TestCase):
def test_response_head_accept_ranges_header(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
+ set_http_connect(200, 200, body='{}')
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c?format=json')
@@ -3576,8 +3503,7 @@ class TestContainerController(unittest.TestCase):
with save_globals():
controller = \
proxy_server.ContainerController(self.app, 'a', 'c')
- proxy_server.http_connect = fake_http_connect(200, 201, 201,
- 201, give_connect=test_connect)
+ set_http_connect(200, 201, 201, 201, give_connect=test_connect)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={test_header: test_value})
self.app.update_request(req)
@@ -3593,20 +3519,20 @@ class TestContainerController(unittest.TestCase):
def bad_metadata_helper(self, method):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'a', 'c')
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Container-Meta-' +
('a' * MAX_META_NAME_LENGTH): 'v'})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Container-Meta-' +
('a' * (MAX_META_NAME_LENGTH + 1)): 'v'})
@@ -3614,14 +3540,14 @@ class TestContainerController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Container-Meta-Too-Long':
'a' * MAX_META_VALUE_LENGTH})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Container-Meta-Too-Long':
'a' * (MAX_META_VALUE_LENGTH + 1)})
@@ -3629,7 +3555,7 @@ class TestContainerController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
@@ -3638,7 +3564,7 @@ class TestContainerController(unittest.TestCase):
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
@@ -3648,7 +3574,7 @@ class TestContainerController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
header_value = 'a' * MAX_META_VALUE_LENGTH
size = 0
@@ -3665,7 +3591,7 @@ class TestContainerController(unittest.TestCase):
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers['X-Container-Meta-a'] = \
'a' * (MAX_META_OVERALL_SIZE - size)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
@@ -3681,7 +3607,7 @@ class TestContainerController(unittest.TestCase):
called[0] = True
raise ValueError('fake error')
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'},
@@ -3692,7 +3618,7 @@ class TestContainerController(unittest.TestCase):
self.assert_(called[0])
called[0] = False
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'POST'},
@@ -3709,7 +3635,7 @@ class TestContainerController(unittest.TestCase):
called[0] = True
raise ValueError('fake error')
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'},
@@ -3720,7 +3646,7 @@ class TestContainerController(unittest.TestCase):
self.assert_(called[0])
called[0] = False
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'},
@@ -3732,8 +3658,7 @@ class TestContainerController(unittest.TestCase):
def test_GET_no_content(self):
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 204, 204, 204)
+ set_http_connect(200, 204, 204, 204)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c')
@@ -3749,8 +3674,7 @@ class TestContainerController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c')
@@ -3766,8 +3690,7 @@ class TestContainerController(unittest.TestCase):
called[0] = True
return HTTPUnauthorized(request=req)
with save_globals():
- proxy_server.http_connect = \
- fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c', {'REQUEST_METHOD': 'HEAD'})
@@ -3786,12 +3709,12 @@ class TestAccountController(unittest.TestCase):
def assert_status_map(self, method, statuses, expected):
with save_globals():
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
req = Request.blank('/a', {})
self.app.update_request(req)
res = method(req)
self.assertEquals(res.status_int, expected)
- proxy_server.http_connect = fake_http_connect(*statuses)
+ set_http_connect(*statuses)
req = Request.blank('/a/', {})
self.app.update_request(req)
res = method(req)
@@ -3903,7 +3826,7 @@ class TestAccountController(unittest.TestCase):
def test_response_get_accept_ranges_header(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
+ set_http_connect(200, 200, body='{}')
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/a?format=json')
self.app.update_request(req)
@@ -3913,7 +3836,7 @@ class TestAccountController(unittest.TestCase):
def test_response_head_accept_ranges_header(self):
with save_globals():
- proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
+ set_http_connect(200, 200, body='{}')
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/a?format=json')
self.app.update_request(req)
@@ -3927,8 +3850,7 @@ class TestAccountController(unittest.TestCase):
controller = proxy_server.AccountController(self.app, 'account')
def test_status_map(statuses, expected, **kwargs):
- proxy_server.http_connect = \
- fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a', {})
req.content_length = 0
@@ -3992,8 +3914,7 @@ class TestAccountController(unittest.TestCase):
self.app.allow_account_management = True
controller = \
proxy_server.AccountController(self.app, 'a')
- proxy_server.http_connect = fake_http_connect(201, 201, 201,
- give_connect=test_connect)
+ set_http_connect(201, 201, 201, give_connect=test_connect)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={test_header: test_value})
self.app.update_request(req)
@@ -4010,20 +3931,20 @@ class TestAccountController(unittest.TestCase):
with save_globals():
self.app.allow_account_management = True
controller = proxy_server.AccountController(self.app, 'a')
- proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
+ set_http_connect(200, 201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-' +
('a' * MAX_META_NAME_LENGTH): 'v'})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-' +
('a' * (MAX_META_NAME_LENGTH + 1)): 'v'})
@@ -4031,14 +3952,14 @@ class TestAccountController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-Too-Long':
'a' * MAX_META_VALUE_LENGTH})
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-Too-Long':
'a' * (MAX_META_VALUE_LENGTH + 1)})
@@ -4046,7 +3967,7 @@ class TestAccountController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
@@ -4055,7 +3976,7 @@ class TestAccountController(unittest.TestCase):
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
@@ -4065,7 +3986,7 @@ class TestAccountController(unittest.TestCase):
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers = {}
header_value = 'a' * MAX_META_VALUE_LENGTH
size = 0
@@ -4082,7 +4003,7 @@ class TestAccountController(unittest.TestCase):
self.app.update_request(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
- proxy_server.http_connect = fake_http_connect(201, 201, 201)
+ set_http_connect(201, 201, 201)
headers['X-Account-Meta-a'] = \
'a' * (MAX_META_OVERALL_SIZE - size)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
@@ -4096,8 +4017,7 @@ class TestAccountController(unittest.TestCase):
controller = proxy_server.AccountController(self.app, 'account')
def test_status_map(statuses, expected, **kwargs):
- proxy_server.http_connect = \
- fake_http_connect(*statuses, **kwargs)
+ set_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a', {'REQUEST_METHOD': 'DELETE'})
req.content_length = 0
@@ -4163,18 +4083,18 @@ class TestSegmentedIterable(unittest.TestCase):
def test_load_next_segment_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception,
- proxy_server.SegmentedIterable(self.controller, None,
+ SegmentedIterable(self.controller, None,
[None])._load_next_segment)
self.assert_(self.controller.exception_args[0].startswith(
'ERROR: While processing manifest'))
def test_load_next_segment_with_no_segments(self):
self.assertRaises(StopIteration,
- proxy_server.SegmentedIterable(self.controller, 'lc',
+ SegmentedIterable(self.controller, 'lc',
[])._load_next_segment)
def test_load_next_segment_with_one_segment(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
@@ -4182,7 +4102,7 @@ class TestSegmentedIterable(unittest.TestCase):
self.assertEquals(data, '1')
def test_load_next_segment_with_two_segments(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
@@ -4194,7 +4114,7 @@ class TestSegmentedIterable(unittest.TestCase):
self.assertEquals(data, '22')
def test_load_next_segment_with_two_segments_skip_first(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit.listing.next()
@@ -4204,7 +4124,7 @@ class TestSegmentedIterable(unittest.TestCase):
self.assertEquals(data, '22')
def test_load_next_segment_with_seek(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit.listing.next()
@@ -4223,7 +4143,7 @@ class TestSegmentedIterable(unittest.TestCase):
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception,
- proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])._load_next_segment)
self.assert_(self.controller.exception_args[0].startswith(
'ERROR: While processing manifest'))
@@ -4233,22 +4153,22 @@ class TestSegmentedIterable(unittest.TestCase):
def test_iter_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception, ''.join,
- proxy_server.SegmentedIterable(self.controller, None, [None]))
+ SegmentedIterable(self.controller, None, [None]))
self.assert_(self.controller.exception_args[0].startswith(
'ERROR: While processing manifest'))
def test_iter_with_no_segments(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [])
+ segit = SegmentedIterable(self.controller, 'lc', [])
self.assertEquals(''.join(segit), '')
def test_iter_with_one_segment(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '1')
def test_iter_with_two_segments(self):
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ segit = SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '122')
@@ -4260,7 +4180,7 @@ class TestSegmentedIterable(unittest.TestCase):
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception, ''.join,
- proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
+ SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}]))
self.assert_(self.controller.exception_args[0].startswith(
'ERROR: While processing manifest'))
@@ -4270,54 +4190,54 @@ class TestSegmentedIterable(unittest.TestCase):
def test_app_iter_range_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception,
- proxy_server.SegmentedIterable(self.controller, None,
+ SegmentedIterable(self.controller, None,
[None]).app_iter_range(None, None).next)
self.assert_(self.controller.exception_args[0].startswith(
'ERROR: While processing manifest'))
def test_app_iter_range_with_no_segments(self):
- self.assertEquals(''.join(proxy_server.SegmentedIterable(
+ self.assertEquals(''.join(SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, None)), '')
- self.assertEquals(''.join(proxy_server.SegmentedIterable(
+ self.assertEquals(''.join(SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, None)), '')
- self.assertEquals(''.join(proxy_server.SegmentedIterable(
+ self.assertEquals(''.join(SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, 5)), '')
- self.assertEquals(''.join(proxy_server.SegmentedIterable(
+ self.assertEquals(''.join(SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, 5)), '')
def test_app_iter_range_with_one_segment(self):
listing = [{'name': 'o1', 'bytes': 1}]
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '1')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, None)), '')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, 5)), '')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1')
def test_app_iter_range_with_two_segments(self):
listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}]
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '122')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, None)), '22')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12')
@@ -4326,33 +4246,33 @@ class TestSegmentedIterable(unittest.TestCase):
{'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name':
'o5', 'bytes': 5}]
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)),
'122333444455555')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, None)),
'333444455555')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 6)), '122333')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334')
- segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
+ segit = SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34')