summaryrefslogtreecommitdiff
path: root/swift/common
diff options
context:
space:
mode:
authorTim Burke <tim.burke@gmail.com>2020-06-18 09:41:46 -0700
committerTim Burke <tim.burke@gmail.com>2020-06-18 09:41:46 -0700
commit481f126e6b59689599f438e5d27f7328f5b3e813 (patch)
tree14212db13aee782e95ffd36993d74c6bf35df0cb /swift/common
parentb3fd0bd9d82160305a821e742b2cd968036911b2 (diff)
parent51a587ed8dd5700b558ad26d70dcb7facc0f91e4 (diff)
downloadswift-feature/losf.tar.gz
Merge remote-tracking branch 'gerrit/master' into feature/losffeature/losf
Change-Id: If9d7c63f3c4c15fbccff31e2b77a6911bb95972a
Diffstat (limited to 'swift/common')
-rw-r--r--swift/common/db.py53
-rw-r--r--swift/common/memcached.py21
-rw-r--r--swift/common/middleware/memcache.py5
-rw-r--r--swift/common/middleware/ratelimit.py4
-rw-r--r--swift/common/middleware/s3api/controllers/obj.py24
-rw-r--r--swift/common/middleware/symlink.py31
-rw-r--r--swift/common/middleware/versioned_writes/object_versioning.py6
-rw-r--r--swift/common/swob.py27
-rw-r--r--swift/common/utils.py86
9 files changed, 199 insertions, 58 deletions
diff --git a/swift/common/db.py b/swift/common/db.py
index c6df12aa3..e06baf5c6 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
# records will be merged.
PENDING_CAP = 131072
+SQLITE_ARG_LIMIT = 999
+RECLAIM_PAGE_SIZE = 10000
+
def utf8encode(*args):
return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
@@ -981,16 +984,48 @@ class DatabaseBroker(object):
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
- with self.get() as conn:
- self._reclaim(conn, age_timestamp, sync_timestamp)
- self._reclaim_metadata(conn, age_timestamp)
- conn.commit()
+ marker = ''
+ finished = False
+ while not finished:
+ with self.get() as conn:
+ marker = self._reclaim(conn, age_timestamp, marker)
+ if not marker:
+ finished = True
+ self._reclaim_other_stuff(
+ conn, age_timestamp, sync_timestamp)
+ conn.commit()
+
+ def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+ """
+ This is only called once at the end of reclaim after _reclaim has been
+ called for each page.
+ """
+ self._reclaim_sync(conn, sync_timestamp)
+ self._reclaim_metadata(conn, age_timestamp)
+
+ def _reclaim(self, conn, age_timestamp, marker):
+ clean_batch_qry = '''
+ DELETE FROM %s WHERE deleted = 1
+ AND name > ? AND %s < ?
+ ''' % (self.db_contains_type, self.db_reclaim_timestamp)
+ curs = conn.execute('''
+ SELECT name FROM %s WHERE deleted = 1
+ AND name > ?
+ ORDER BY NAME LIMIT 1 OFFSET ?
+ ''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
+ row = curs.fetchone()
+ if row:
+ # do a single book-ended DELETE and bounce out
+ end_marker = row[0]
+ conn.execute(clean_batch_qry + ' AND name <= ?', (
+ marker, age_timestamp, end_marker))
+ else:
+ # delete off the end and reset marker to indicate we're done
+ end_marker = ''
+ conn.execute(clean_batch_qry, (marker, age_timestamp))
+ return end_marker
- def _reclaim(self, conn, age_timestamp, sync_timestamp):
- conn.execute('''
- DELETE FROM %s WHERE deleted = 1 AND %s < ?
- ''' % (self.db_contains_type, self.db_reclaim_timestamp),
- (age_timestamp,))
+ def _reclaim_sync(self, conn, sync_timestamp):
try:
conn.execute('''
DELETE FROM outgoing_sync WHERE updated_at < ?
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index a80fa0fb6..08da5c7ba 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -160,7 +160,7 @@ class MemcacheRing(object):
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
- max_conns=2):
+ max_conns=2, logger=None):
self._ring = {}
self._errors = dict(((serv, []) for serv in servers))
self._error_limited = dict(((serv, 0) for serv in servers))
@@ -178,18 +178,23 @@ class MemcacheRing(object):
self._pool_timeout = pool_timeout
self._allow_pickle = allow_pickle
self._allow_unpickle = allow_unpickle or allow_pickle
+ if logger is None:
+ self.logger = logging.getLogger()
+ else:
+ self.logger = logger
def _exception_occurred(self, server, e, action='talking',
sock=None, fp=None, got_connection=True):
if isinstance(e, Timeout):
- logging.error("Timeout %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.error("Timeout %(action)s to memcached: %(server)s",
+ {'action': action, 'server': server})
elif isinstance(e, (socket.error, MemcacheConnectionError)):
- logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
- {'action': action, 'server': server, 'err': e})
+ self.logger.error(
+ "Error %(action)s to memcached: %(server)s: %(err)s",
+ {'action': action, 'server': server, 'err': e})
else:
- logging.exception("Error %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.exception("Error %(action)s to memcached: %(server)s",
+ {'action': action, 'server': server})
try:
if fp:
fp.close()
@@ -213,7 +218,7 @@ class MemcacheRing(object):
if err > now - ERROR_LIMIT_TIME]
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._error_limited[server] = now + ERROR_LIMIT_DURATION
- logging.error('Error limiting server %s', server)
+ self.logger.error('Error limiting server %s', server)
def _get_conns(self, key):
"""
diff --git a/swift/common/middleware/memcache.py b/swift/common/middleware/memcache.py
index e846749cb..b5b9569a5 100644
--- a/swift/common/middleware/memcache.py
+++ b/swift/common/middleware/memcache.py
@@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
IO_TIMEOUT, TRY_COUNT)
+from swift.common.utils import get_logger
class MemcacheMiddleware(object):
@@ -28,6 +29,7 @@ class MemcacheMiddleware(object):
def __init__(self, app, conf):
self.app = app
+ self.logger = get_logger(conf, log_route='memcache')
self.memcache_servers = conf.get('memcache_servers')
serialization_format = conf.get('memcache_serialization_support')
try:
@@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
io_timeout=io_timeout,
allow_pickle=(serialization_format == 0),
allow_unpickle=(serialization_format <= 1),
- max_conns=max_conns)
+ max_conns=max_conns,
+ logger=self.logger)
def __call__(self, env, start_response):
env['swift.cache'] = self.memcache
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 72e1d6a40..9d3ff2fdd 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -242,6 +242,10 @@ class RateLimitMiddleware(object):
if not self.memcache_client:
return None
+ if req.environ.get('swift.ratelimit.handled'):
+ return None
+ req.environ['swift.ratelimit.handled'] = True
+
try:
account_info = get_account_info(req.environ, self.app,
swift_source='RL')
diff --git a/swift/common/middleware/s3api/controllers/obj.py b/swift/common/middleware/s3api/controllers/obj.py
index 293b14702..716c837b6 100644
--- a/swift/common/middleware/s3api/controllers/obj.py
+++ b/swift/common/middleware/s3api/controllers/obj.py
@@ -26,7 +26,7 @@ from swift.common.middleware.versioned_writes.object_versioning import \
from swift.common.middleware.s3api.utils import S3Timestamp, sysmeta_header
from swift.common.middleware.s3api.controllers.base import Controller
from swift.common.middleware.s3api.s3response import S3NotImplemented, \
- InvalidRange, NoSuchKey, InvalidArgument, HTTPNoContent, \
+ InvalidRange, NoSuchKey, NoSuchVersion, InvalidArgument, HTTPNoContent, \
PreconditionFailed
@@ -88,7 +88,15 @@ class ObjectController(Controller):
if version_id not in ('null', None) and \
'object_versioning' not in get_swift_info():
raise S3NotImplemented()
+
query = {} if version_id is None else {'version-id': version_id}
+ if version_id not in ('null', None):
+ container_info = req.get_container_info(self.app)
+ if not container_info.get(
+ 'sysmeta', {}).get('versions-container', ''):
+ # Versioning has never been enabled
+ raise NoSuchVersion(object_name, version_id)
+
resp = req.get_response(self.app, query=query)
if req.method == 'HEAD':
@@ -193,17 +201,25 @@ class ObjectController(Controller):
'object_versioning' not in get_swift_info():
raise S3NotImplemented()
+ version_id = req.params.get('versionId')
+ if version_id not in ('null', None):
+ container_info = req.get_container_info(self.app)
+ if not container_info.get(
+ 'sysmeta', {}).get('versions-container', ''):
+ # Versioning has never been enabled
+ return HTTPNoContent(headers={'x-amz-version-id': version_id})
+
try:
try:
query = req.gen_multipart_manifest_delete_query(
- self.app, version=req.params.get('versionId'))
+ self.app, version=version_id)
except NoSuchKey:
query = {}
req.headers['Content-Type'] = None # Ignore client content-type
- if 'versionId' in req.params:
- query['version-id'] = req.params['versionId']
+ if version_id is not None:
+ query['version-id'] = version_id
query['symlink'] = 'get'
resp = req.get_response(self.app, query=query)
diff --git a/swift/common/middleware/symlink.py b/swift/common/middleware/symlink.py
index d2c644438..bde163aa0 100644
--- a/swift/common/middleware/symlink.py
+++ b/swift/common/middleware/symlink.py
@@ -205,7 +205,8 @@ from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value, drain_and_close
from swift.common.constraints import check_account_format
-from swift.common.wsgi import WSGIContext, make_subrequest
+from swift.common.wsgi import WSGIContext, make_subrequest, \
+ make_pre_authed_request
from swift.common.request_helpers import get_sys_meta_prefix, \
check_path_header, get_container_update_override_key, \
update_ignore_range_header
@@ -442,7 +443,9 @@ class SymlinkObjectContext(WSGIContext):
content_type='text/plain')
def _recursive_get_head(self, req, target_etag=None,
- follow_softlinks=True):
+ follow_softlinks=True, orig_req=None):
+ if not orig_req:
+ orig_req = req
resp = self._app_call(req.environ)
def build_traversal_req(symlink_target):
@@ -457,9 +460,20 @@ class SymlinkObjectContext(WSGIContext):
'/', version, account,
symlink_target.lstrip('/'))
self._last_target_path = target_path
- new_req = make_subrequest(
- req.environ, path=target_path, method=req.method,
- headers=req.headers, swift_source='SYM')
+
+ subreq_headers = dict(req.headers)
+ if self._response_header_value(ALLOW_RESERVED_NAMES):
+ # this symlink's sysmeta says it can point to reserved names,
+ # we're infering that some piece of middleware had previously
+ # authorized this request because users can't access reserved
+ # names directly
+ subreq_meth = make_pre_authed_request
+ subreq_headers['X-Backend-Allow-Reserved-Names'] = 'true'
+ else:
+ subreq_meth = make_subrequest
+ new_req = subreq_meth(orig_req.environ, path=target_path,
+ method=req.method, headers=subreq_headers,
+ swift_source='SYM')
new_req.headers.pop('X-Backend-Storage-Policy-Index', None)
return new_req
@@ -484,11 +498,8 @@ class SymlinkObjectContext(WSGIContext):
if not config_true_value(
self._response_header_value(SYMLOOP_EXTEND)):
self._loop_count += 1
- if config_true_value(
- self._response_header_value(ALLOW_RESERVED_NAMES)):
- new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
-
- return self._recursive_get_head(new_req, target_etag=resp_etag)
+ return self._recursive_get_head(new_req, target_etag=resp_etag,
+ orig_req=req)
else:
final_etag = self._response_header_value('etag')
if final_etag and target_etag and target_etag != final_etag:
diff --git a/swift/common/middleware/versioned_writes/object_versioning.py b/swift/common/middleware/versioned_writes/object_versioning.py
index 5c9b72d5c..508972f72 100644
--- a/swift/common/middleware/versioned_writes/object_versioning.py
+++ b/swift/common/middleware/versioned_writes/object_versioning.py
@@ -152,7 +152,7 @@ from cgi import parse_header
from six.moves.urllib.parse import unquote
from swift.common.constraints import MAX_FILE_SIZE, valid_api_version, \
- ACCOUNT_LISTING_LIMIT
+ ACCOUNT_LISTING_LIMIT, CONTAINER_LISTING_LIMIT
from swift.common.http import is_success, is_client_error, HTTP_NOT_FOUND, \
HTTP_CONFLICT
from swift.common.request_helpers import get_sys_meta_prefix, \
@@ -1191,7 +1191,7 @@ class ContainerContext(ObjectVersioningContext):
'hash': item['hash'],
'last_modified': item['last_modified'],
})
- limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+ limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
body = build_listing(
null_listing, subdir_listing, broken_listing,
reverse=config_true_value(params.get('reverse', 'no')),
@@ -1256,7 +1256,7 @@ class ContainerContext(ObjectVersioningContext):
'last_modified': item['last_modified'],
})
- limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+ limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
body = build_listing(
null_listing, versions_listing,
subdir_listing, broken_listing,
diff --git a/swift/common/swob.py b/swift/common/swob.py
index 61b66793c..76fb2fbc9 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -43,7 +43,6 @@ from email.utils import parsedate
import re
import random
import functools
-import inspect
from io import BytesIO
import six
@@ -1563,23 +1562,15 @@ def wsgify(func):
return a Response object into WSGI callables. Also catches any raised
HTTPExceptions and treats them as a returned Response.
"""
- argspec = inspect.getargspec(func)
- if argspec.args and argspec.args[0] == 'self':
- @functools.wraps(func)
- def _wsgify_self(self, env, start_response):
- try:
- return func(self, Request(env))(env, start_response)
- except HTTPException as err_resp:
- return err_resp(env, start_response)
- return _wsgify_self
- else:
- @functools.wraps(func)
- def _wsgify_bare(env, start_response):
- try:
- return func(Request(env))(env, start_response)
- except HTTPException as err_resp:
- return err_resp(env, start_response)
- return _wsgify_bare
+ @functools.wraps(func)
+ def _wsgify(*args):
+ env, start_response = args[-2:]
+ new_args = args[:-2] + (Request(env), )
+ try:
+ return func(*new_args)(env, start_response)
+ except HTTPException as err_resp:
+ return err_resp(env, start_response)
+ return _wsgify
class StatusMap(object):
diff --git a/swift/common/utils.py b/swift/common/utils.py
index df8713d3a..23a137e6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3152,11 +3152,26 @@ def remove_directory(path):
def audit_location_generator(devices, datadir, suffix='',
- mount_check=True, logger=None):
+ mount_check=True, logger=None,
+ devices_filter=None, partitions_filter=None,
+ suffixes_filter=None, hashes_filter=None,
+ hook_pre_device=None, hook_post_device=None,
+ hook_pre_partition=None, hook_post_partition=None,
+ hook_pre_suffix=None, hook_post_suffix=None,
+ hook_pre_hash=None, hook_post_hash=None):
"""
Given a devices path and a data directory, yield (path, device,
partition) for all files in that directory
+ (devices|partitions|suffixes|hashes)_filter are meant to modify the list of
+ elements that will be iterated. eg: they can be used to exclude some
+ elements based on a custom condition defined by the caller.
+
+ hook_pre_(device|partition|suffix|hash) are called before yielding the
+ element, hook_pos_(device|partition|suffix|hash) are called after the
+ element was yielded. They are meant to do some pre/post processing.
+ eg: saving a progress status.
+
:param devices: parent directory of the devices to be audited
:param datadir: a directory located under self.devices. This should be
one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@ def audit_location_generator(devices, datadir, suffix='',
:param mount_check: Flag to check if a mount check should be performed
on devices
:param logger: a logger object
+ :devices_filter: a callable taking (devices, [list of devices]) as
+ parameters and returning a [list of devices]
+ :partitions_filter: a callable taking (datadir_path, [list of parts]) as
+ parameters and returning a [list of parts]
+ :suffixes_filter: a callable taking (part_path, [list of suffixes]) as
+ parameters and returning a [list of suffixes]
+ :hashes_filter: a callable taking (suff_path, [list of hashes]) as
+ parameters and returning a [list of hashes]
+ :hook_pre_device: a callable taking device_path as parameter
+ :hook_post_device: a callable taking device_path as parameter
+ :hook_pre_partition: a callable taking part_path as parameter
+ :hook_post_partition: a callable taking part_path as parameter
+ :hook_pre_suffix: a callable taking suff_path as parameter
+ :hook_post_suffix: a callable taking suff_path as parameter
+ :hook_pre_hash: a callable taking hash_path as parameter
+ :hook_post_hash: a callable taking hash_path as parameter
"""
device_dir = listdir(devices)
# randomize devices in case of process restart before sweep completed
shuffle(device_dir)
+ if devices_filter:
+ device_dir = devices_filter(devices, device_dir)
for device in device_dir:
+ if hook_pre_device:
+ hook_pre_device(os.path.join(devices, device))
if mount_check and not ismount(os.path.join(devices, device)):
if logger:
logger.warning(
@@ -3183,24 +3218,36 @@ def audit_location_generator(devices, datadir, suffix='',
logger.warning(_('Skipping %(datadir)s because %(err)s'),
{'datadir': datadir_path, 'err': e})
continue
+ if partitions_filter:
+ partitions = partitions_filter(datadir_path, partitions)
for partition in partitions:
part_path = os.path.join(datadir_path, partition)
+ if hook_pre_partition:
+ hook_pre_partition(part_path)
try:
suffixes = listdir(part_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
+ if suffixes_filter:
+ suffixes = suffixes_filter(part_path, suffixes)
for asuffix in suffixes:
suff_path = os.path.join(part_path, asuffix)
+ if hook_pre_suffix:
+ hook_pre_suffix(suff_path)
try:
hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
+ if hashes_filter:
+ hashes = hashes_filter(suff_path, hashes)
for hsh in hashes:
hash_path = os.path.join(suff_path, hsh)
+ if hook_pre_hash:
+ hook_pre_hash(hash_path)
try:
files = sorted(listdir(hash_path), reverse=True)
except OSError as e:
@@ -3212,6 +3259,14 @@ def audit_location_generator(devices, datadir, suffix='',
continue
path = os.path.join(hash_path, fname)
yield path, device, partition
+ if hook_post_hash:
+ hook_post_hash(hash_path)
+ if hook_post_suffix:
+ hook_post_suffix(suff_path)
+ if hook_post_partition:
+ hook_post_partition(part_path)
+ if hook_post_device:
+ hook_post_device(os.path.join(devices, device))
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
@@ -4814,6 +4869,8 @@ class ShardRange(object):
value.
:param epoch: optional epoch timestamp which represents the time at which
sharding was enabled for a container.
+ :param reported: optional indicator that this shard and its stats have
+ been reported to the root container.
"""
FOUND = 10
CREATED = 20
@@ -4864,7 +4921,8 @@ class ShardRange(object):
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
- deleted=False, state=None, state_timestamp=None, epoch=None):
+ deleted=False, state=None, state_timestamp=None, epoch=None,
+ reported=False):
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
@@ -4883,6 +4941,7 @@ class ShardRange(object):
self.state = self.FOUND if state is None else state
self.state_timestamp = state_timestamp
self.epoch = epoch
+ self.reported = reported
@classmethod
def _encode(cls, value):
@@ -5063,8 +5122,14 @@ class ShardRange(object):
cast to an int, or if meta_timestamp is neither None nor can be
cast to a :class:`~swift.common.utils.Timestamp`.
"""
- self.object_count = int(object_count)
- self.bytes_used = int(bytes_used)
+ if self.object_count != int(object_count):
+ self.object_count = int(object_count)
+ self.reported = False
+
+ if self.bytes_used != int(bytes_used):
+ self.bytes_used = int(bytes_used)
+ self.reported = False
+
if meta_timestamp is None:
self.meta_timestamp = Timestamp.now()
else:
@@ -5145,6 +5210,14 @@ class ShardRange(object):
def epoch(self, epoch):
self._epoch = self._to_timestamp(epoch)
+ @property
+ def reported(self):
+ return self._reported
+
+ @reported.setter
+ def reported(self, value):
+ self._reported = bool(value)
+
def update_state(self, state, state_timestamp=None):
"""
Set state to the given value and optionally update the state_timestamp
@@ -5161,6 +5234,7 @@ class ShardRange(object):
self.state = state
if state_timestamp is not None:
self.state_timestamp = state_timestamp
+ self.reported = False
return True
@property
@@ -5283,6 +5357,7 @@ class ShardRange(object):
yield 'state', self.state
yield 'state_timestamp', self.state_timestamp.internal
yield 'epoch', self.epoch.internal if self.epoch is not None else None
+ yield 'reported', 1 if self.reported else 0
def copy(self, timestamp=None, **kwargs):
"""
@@ -5314,7 +5389,8 @@ class ShardRange(object):
params['name'], params['timestamp'], params['lower'],
params['upper'], params['object_count'], params['bytes_used'],
params['meta_timestamp'], params['deleted'], params['state'],
- params['state_timestamp'], params['epoch'])
+ params['state_timestamp'], params['epoch'],
+ params.get('reported', 0))
def find_shard_range(item, ranges):