| author | Tim Burke <tim.burke@gmail.com> | 2020-06-18 09:41:46 -0700 |
|---|---|---|
| committer | Tim Burke <tim.burke@gmail.com> | 2020-06-18 09:41:46 -0700 |
| commit | 481f126e6b59689599f438e5d27f7328f5b3e813 (patch) | |
| tree | 14212db13aee782e95ffd36993d74c6bf35df0cb /swift/common | |
| parent | b3fd0bd9d82160305a821e742b2cd968036911b2 (diff) | |
| parent | 51a587ed8dd5700b558ad26d70dcb7facc0f91e4 (diff) | |
| download | swift-feature/losf.tar.gz | |
Merge remote-tracking branch 'gerrit/master' into feature/losf
Change-Id: If9d7c63f3c4c15fbccff31e2b77a6911bb95972a
Diffstat (limited to 'swift/common')
| -rw-r--r-- | swift/common/db.py | 53 |
| -rw-r--r-- | swift/common/memcached.py | 21 |
| -rw-r--r-- | swift/common/middleware/memcache.py | 5 |
| -rw-r--r-- | swift/common/middleware/ratelimit.py | 4 |
| -rw-r--r-- | swift/common/middleware/s3api/controllers/obj.py | 24 |
| -rw-r--r-- | swift/common/middleware/symlink.py | 31 |
| -rw-r--r-- | swift/common/middleware/versioned_writes/object_versioning.py | 6 |
| -rw-r--r-- | swift/common/swob.py | 27 |
| -rw-r--r-- | swift/common/utils.py | 86 |
9 files changed, 199 insertions, 58 deletions
```diff
diff --git a/swift/common/db.py b/swift/common/db.py
index c6df12aa3..e06baf5c6 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
 # records will be merged.
 PENDING_CAP = 131072
 
+SQLITE_ARG_LIMIT = 999
+RECLAIM_PAGE_SIZE = 10000
+
 
 def utf8encode(*args):
     return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
@@ -981,16 +984,48 @@ class DatabaseBroker(object):
             with lock_parent_directory(self.pending_file,
                                        self.pending_timeout):
                 self._commit_puts()
-        with self.get() as conn:
-            self._reclaim(conn, age_timestamp, sync_timestamp)
-            self._reclaim_metadata(conn, age_timestamp)
-            conn.commit()
+        marker = ''
+        finished = False
+        while not finished:
+            with self.get() as conn:
+                marker = self._reclaim(conn, age_timestamp, marker)
+                if not marker:
+                    finished = True
+                    self._reclaim_other_stuff(
+                        conn, age_timestamp, sync_timestamp)
+                conn.commit()
+
+    def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+        """
+        This is only called once at the end of reclaim after _reclaim has been
+        called for each page.
+        """
+        self._reclaim_sync(conn, sync_timestamp)
+        self._reclaim_metadata(conn, age_timestamp)
+
+    def _reclaim(self, conn, age_timestamp, marker):
+        clean_batch_qry = '''
+            DELETE FROM %s WHERE deleted = 1
+            AND name > ? AND %s < ?
+        ''' % (self.db_contains_type, self.db_reclaim_timestamp)
+        curs = conn.execute('''
+            SELECT name FROM %s WHERE deleted = 1
+            AND name > ?
+            ORDER BY NAME LIMIT 1 OFFSET ?
+        ''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
+        row = curs.fetchone()
+        if row:
+            # do a single book-ended DELETE and bounce out
+            end_marker = row[0]
+            conn.execute(clean_batch_qry + ' AND name <= ?', (
+                marker, age_timestamp, end_marker))
+        else:
+            # delete off the end and reset marker to indicate we're done
+            end_marker = ''
+            conn.execute(clean_batch_qry, (marker, age_timestamp))
+        return end_marker
 
-    def _reclaim(self, conn, age_timestamp, sync_timestamp):
-        conn.execute('''
-            DELETE FROM %s WHERE deleted = 1 AND %s < ?
-        ''' % (self.db_contains_type, self.db_reclaim_timestamp),
-            (age_timestamp,))
+    def _reclaim_sync(self, conn, sync_timestamp):
         try:
             conn.execute('''
                 DELETE FROM outgoing_sync WHERE updated_at < ?
```
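The db.py change replaces one unbounded DELETE with bounded, name-ordered pages, so each transaction (and the lock it holds) stays short. Below is a self-contained sketch of the same book-ended pagination pattern against plain sqlite3; the `object` table, column names, and tiny page size are illustrative, not Swift's actual schema:

```python
# Sketch of the book-ended, paginated DELETE pattern used above.
# Table/column names here are illustrative, not Swift's schema.
import sqlite3

PAGE_SIZE = 3  # Swift uses RECLAIM_PAGE_SIZE = 10000

def reclaim_page(conn, age_timestamp, marker):
    # Find the name that closes this page: the row PAGE_SIZE places
    # past the marker, in name order.
    row = conn.execute('''
        SELECT name FROM object WHERE deleted = 1 AND name > ?
        ORDER BY name LIMIT 1 OFFSET ?''', (marker, PAGE_SIZE)).fetchone()
    if row:
        end_marker = row[0]
        conn.execute('''
            DELETE FROM object WHERE deleted = 1
            AND name > ? AND created_at < ? AND name <= ?''',
            (marker, age_timestamp, end_marker))
    else:
        end_marker = ''  # no more pages; delete to the end
        conn.execute('''
            DELETE FROM object WHERE deleted = 1
            AND name > ? AND created_at < ?''', (marker, age_timestamp))
    return end_marker

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE object (name TEXT, created_at TEXT, deleted INT)')
conn.executemany('INSERT INTO object VALUES (?, ?, 1)',
                 [('obj-%03d' % i, '5') for i in range(10)])

marker = ''
while True:
    marker = reclaim_page(conn, '9', marker)
    conn.commit()  # committing per page keeps lock hold times short
    if not marker:
        break
print(conn.execute('SELECT COUNT(*) FROM object').fetchone())  # (0,)
```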
```diff
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index a80fa0fb6..08da5c7ba 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -160,7 +160,7 @@ class MemcacheRing(object):
     def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
                  io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
                  tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
-                 max_conns=2):
+                 max_conns=2, logger=None):
         self._ring = {}
         self._errors = dict(((serv, []) for serv in servers))
         self._error_limited = dict(((serv, 0) for serv in servers))
@@ -178,18 +178,23 @@ class MemcacheRing(object):
         self._pool_timeout = pool_timeout
         self._allow_pickle = allow_pickle
         self._allow_unpickle = allow_unpickle or allow_pickle
+        if logger is None:
+            self.logger = logging.getLogger()
+        else:
+            self.logger = logger
 
     def _exception_occurred(self, server, e, action='talking',
                             sock=None, fp=None, got_connection=True):
         if isinstance(e, Timeout):
-            logging.error("Timeout %(action)s to memcached: %(server)s",
-                          {'action': action, 'server': server})
+            self.logger.error("Timeout %(action)s to memcached: %(server)s",
+                              {'action': action, 'server': server})
         elif isinstance(e, (socket.error, MemcacheConnectionError)):
-            logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
-                          {'action': action, 'server': server, 'err': e})
+            self.logger.error(
+                "Error %(action)s to memcached: %(server)s: %(err)s",
+                {'action': action, 'server': server, 'err': e})
         else:
-            logging.exception("Error %(action)s to memcached: %(server)s",
-                              {'action': action, 'server': server})
+            self.logger.exception("Error %(action)s to memcached: %(server)s",
+                                  {'action': action, 'server': server})
         try:
             if fp:
                 fp.close()
@@ -213,7 +218,7 @@ class MemcacheRing(object):
                                         if err > now - ERROR_LIMIT_TIME]
             if len(self._errors[server]) > ERROR_LIMIT_COUNT:
                 self._error_limited[server] = now + ERROR_LIMIT_DURATION
-                logging.error('Error limiting server %s', server)
+                self.logger.error('Error limiting server %s', server)
 
     def _get_conns(self, key):
         """
diff --git a/swift/common/middleware/memcache.py b/swift/common/middleware/memcache.py
index e846749cb..b5b9569a5 100644
--- a/swift/common/middleware/memcache.py
+++ b/swift/common/middleware/memcache.py
@@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
 
 from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
                                     IO_TIMEOUT, TRY_COUNT)
+from swift.common.utils import get_logger
 
 
 class MemcacheMiddleware(object):
@@ -28,6 +29,7 @@ class MemcacheMiddleware(object):
 
     def __init__(self, app, conf):
         self.app = app
+        self.logger = get_logger(conf, log_route='memcache')
         self.memcache_servers = conf.get('memcache_servers')
         serialization_format = conf.get('memcache_serialization_support')
         try:
@@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
             io_timeout=io_timeout,
             allow_pickle=(serialization_format == 0),
             allow_unpickle=(serialization_format <= 1),
-            max_conns=max_conns)
+            max_conns=max_conns,
+            logger=self.logger)
 
     def __call__(self, env, start_response):
         env['swift.cache'] = self.memcache
```
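The net effect is that memcache errors are emitted through the middleware's configured logger instead of the root logger, so they pick up Swift's log routing and formatting. A short sketch of the resulting wiring, assuming a Swift install; the server address and conf values are made up:

```python
# Sketch: the middleware-level logger now reaches MemcacheRing.
from swift.common.memcached import MemcacheRing
from swift.common.utils import get_logger

conf = {'log_name': 'proxy-server'}
logger = get_logger(conf, log_route='memcache')

# Before this change MemcacheRing logged via the root logger; now the
# caller's logger is injected and used for all timeout/error reporting.
ring = MemcacheRing(['127.0.0.1:11211'], logger=logger)
```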
```diff
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 72e1d6a40..9d3ff2fdd 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -242,6 +242,10 @@ class RateLimitMiddleware(object):
         if not self.memcache_client:
             return None
 
+        if req.environ.get('swift.ratelimit.handled'):
+            return None
+        req.environ['swift.ratelimit.handled'] = True
+
         try:
             account_info = get_account_info(req.environ, self.app,
                                             swift_source='RL')
diff --git a/swift/common/middleware/s3api/controllers/obj.py b/swift/common/middleware/s3api/controllers/obj.py
index 293b14702..716c837b6 100644
--- a/swift/common/middleware/s3api/controllers/obj.py
+++ b/swift/common/middleware/s3api/controllers/obj.py
@@ -26,7 +26,7 @@ from swift.common.middleware.versioned_writes.object_versioning import \
 from swift.common.middleware.s3api.utils import S3Timestamp, sysmeta_header
 from swift.common.middleware.s3api.controllers.base import Controller
 from swift.common.middleware.s3api.s3response import S3NotImplemented, \
-    InvalidRange, NoSuchKey, InvalidArgument, HTTPNoContent, \
+    InvalidRange, NoSuchKey, NoSuchVersion, InvalidArgument, HTTPNoContent, \
     PreconditionFailed
 
 
@@ -88,7 +88,15 @@ class ObjectController(Controller):
         if version_id not in ('null', None) and \
                 'object_versioning' not in get_swift_info():
             raise S3NotImplemented()
+
         query = {} if version_id is None else {'version-id': version_id}
+        if version_id not in ('null', None):
+            container_info = req.get_container_info(self.app)
+            if not container_info.get(
+                    'sysmeta', {}).get('versions-container', ''):
+                # Versioning has never been enabled
+                raise NoSuchVersion(object_name, version_id)
+
         resp = req.get_response(self.app, query=query)
 
         if req.method == 'HEAD':
@@ -193,17 +201,25 @@ class ObjectController(Controller):
                 'object_versioning' not in get_swift_info():
             raise S3NotImplemented()
 
+        version_id = req.params.get('versionId')
+        if version_id not in ('null', None):
+            container_info = req.get_container_info(self.app)
+            if not container_info.get(
+                    'sysmeta', {}).get('versions-container', ''):
+                # Versioning has never been enabled
+                return HTTPNoContent(headers={'x-amz-version-id': version_id})
+
         try:
             try:
                 query = req.gen_multipart_manifest_delete_query(
-                    self.app, version=req.params.get('versionId'))
+                    self.app, version=version_id)
             except NoSuchKey:
                 query = {}
 
             req.headers['Content-Type'] = None  # Ignore client content-type
 
-            if 'versionId' in req.params:
-                query['version-id'] = req.params['versionId']
+            if version_id is not None:
+                query['version-id'] = version_id
             query['symlink'] = 'get'
 
             resp = req.get_response(self.app, query=query)
```
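The new `swift.ratelimit.handled` environ flag makes rate limiting idempotent when a request passes through the middleware more than once (for example, when it re-enters the pipeline). A minimal, generic sketch of that run-once guard as a standalone WSGI middleware; the class name is hypothetical:

```python
# Minimal sketch of the run-once-per-request guard pattern used by the
# ratelimit change above. The class name is hypothetical.
class RunOncePerRequest(object):
    def __init__(self, app):
        self.app = app

    def __call__(self, env, start_response):
        if not env.get('swift.ratelimit.handled'):
            # First pass through the pipeline: mark the request so any
            # later instance of this middleware becomes a no-op.
            env['swift.ratelimit.handled'] = True
            # ... apply rate-limiting checks exactly once here ...
        return self.app(env, start_response)
```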
```diff
diff --git a/swift/common/middleware/symlink.py b/swift/common/middleware/symlink.py
index d2c644438..bde163aa0 100644
--- a/swift/common/middleware/symlink.py
+++ b/swift/common/middleware/symlink.py
@@ -205,7 +205,8 @@ from swift.common.utils import get_logger, register_swift_info, split_path, \
     MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
     config_true_value, drain_and_close
 from swift.common.constraints import check_account_format
-from swift.common.wsgi import WSGIContext, make_subrequest
+from swift.common.wsgi import WSGIContext, make_subrequest, \
+    make_pre_authed_request
 from swift.common.request_helpers import get_sys_meta_prefix, \
     check_path_header, get_container_update_override_key, \
     update_ignore_range_header
@@ -442,7 +443,9 @@ class SymlinkObjectContext(WSGIContext):
                 content_type='text/plain')
 
     def _recursive_get_head(self, req, target_etag=None,
-                            follow_softlinks=True):
+                            follow_softlinks=True, orig_req=None):
+        if not orig_req:
+            orig_req = req
         resp = self._app_call(req.environ)
 
         def build_traversal_req(symlink_target):
@@ -457,9 +460,20 @@ class SymlinkObjectContext(WSGIContext):
                     '/', version, account,
                     symlink_target.lstrip('/'))
             self._last_target_path = target_path
-            new_req = make_subrequest(
-                req.environ, path=target_path, method=req.method,
-                headers=req.headers, swift_source='SYM')
+
+            subreq_headers = dict(req.headers)
+            if self._response_header_value(ALLOW_RESERVED_NAMES):
+                # this symlink's sysmeta says it can point to reserved
+                # names, we're infering that some piece of middleware
+                # had previously authorized this request because users
+                # can't access reserved names directly
+                subreq_meth = make_pre_authed_request
+                subreq_headers['X-Backend-Allow-Reserved-Names'] = 'true'
+            else:
+                subreq_meth = make_subrequest
+            new_req = subreq_meth(orig_req.environ, path=target_path,
+                                  method=req.method, headers=subreq_headers,
+                                  swift_source='SYM')
             new_req.headers.pop('X-Backend-Storage-Policy-Index', None)
             return new_req
 
@@ -484,11 +498,8 @@ class SymlinkObjectContext(WSGIContext):
             if not config_true_value(
                     self._response_header_value(SYMLOOP_EXTEND)):
                 self._loop_count += 1
-            if config_true_value(
-                    self._response_header_value(ALLOW_RESERVED_NAMES)):
-                new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
-
-            return self._recursive_get_head(new_req, target_etag=resp_etag)
+            return self._recursive_get_head(new_req, target_etag=resp_etag,
+                                            orig_req=req)
         else:
             final_etag = self._response_header_value('etag')
             if final_etag and target_etag and target_etag != final_etag:
```
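When a symlink's sysmeta says it may point at reserved names, traversal now issues a pre-authed subrequest, on the reasoning that some earlier middleware must already have authorized the request (users cannot reach reserved names directly). A hedged sketch of the two subrequest factories the diff switches between, assuming a Swift install; the paths and header values are made up:

```python
# Sketch of the two subrequest flavors chosen per hop above.
from swift.common.swob import Request
from swift.common.wsgi import make_subrequest, make_pre_authed_request

req = Request.blank('/v1/AUTH_test/c/symlink', method='GET')

# Normal hop: the target subrequest re-runs authorization callbacks.
hop = make_subrequest(req.environ, path='/v1/AUTH_test/c2/target',
                      method='GET', swift_source='SYM')

# Reserved-name hop: pre-authed, plus the backend override header.
hop = make_pre_authed_request(req.environ, path='/v1/AUTH_test/c2/target',
                              method='GET', swift_source='SYM')
hop.headers['X-Backend-Allow-Reserved-Names'] = 'true'
```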
""" - argspec = inspect.getargspec(func) - if argspec.args and argspec.args[0] == 'self': - @functools.wraps(func) - def _wsgify_self(self, env, start_response): - try: - return func(self, Request(env))(env, start_response) - except HTTPException as err_resp: - return err_resp(env, start_response) - return _wsgify_self - else: - @functools.wraps(func) - def _wsgify_bare(env, start_response): - try: - return func(Request(env))(env, start_response) - except HTTPException as err_resp: - return err_resp(env, start_response) - return _wsgify_bare + @functools.wraps(func) + def _wsgify(*args): + env, start_response = args[-2:] + new_args = args[:-2] + (Request(env), ) + try: + return func(*new_args)(env, start_response) + except HTTPException as err_resp: + return err_resp(env, start_response) + return _wsgify class StatusMap(object): diff --git a/swift/common/utils.py b/swift/common/utils.py index df8713d3a..23a137e6c 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3152,11 +3152,26 @@ def remove_directory(path): def audit_location_generator(devices, datadir, suffix='', - mount_check=True, logger=None): + mount_check=True, logger=None, + devices_filter=None, partitions_filter=None, + suffixes_filter=None, hashes_filter=None, + hook_pre_device=None, hook_post_device=None, + hook_pre_partition=None, hook_post_partition=None, + hook_pre_suffix=None, hook_post_suffix=None, + hook_pre_hash=None, hook_post_hash=None): """ Given a devices path and a data directory, yield (path, device, partition) for all files in that directory + (devices|partitions|suffixes|hashes)_filter are meant to modify the list of + elements that will be iterated. eg: they can be used to exclude some + elements based on a custom condition defined by the caller. + + hook_pre_(device|partition|suffix|hash) are called before yielding the + element, hook_pos_(device|partition|suffix|hash) are called after the + element was yielded. They are meant to do some pre/post processing. + eg: saving a progress status. + :param devices: parent directory of the devices to be audited :param datadir: a directory located under self.devices. 
```diff
diff --git a/swift/common/utils.py b/swift/common/utils.py
index df8713d3a..23a137e6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3152,11 +3152,26 @@ def remove_directory(path):
 
 
 def audit_location_generator(devices, datadir, suffix='',
-                             mount_check=True, logger=None):
+                             mount_check=True, logger=None,
+                             devices_filter=None, partitions_filter=None,
+                             suffixes_filter=None, hashes_filter=None,
+                             hook_pre_device=None, hook_post_device=None,
+                             hook_pre_partition=None, hook_post_partition=None,
+                             hook_pre_suffix=None, hook_post_suffix=None,
+                             hook_pre_hash=None, hook_post_hash=None):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory
 
+    (devices|partitions|suffixes|hashes)_filter are meant to modify the list
+    of elements that will be iterated. eg: they can be used to exclude some
+    elements based on a custom condition defined by the caller.
+
+    hook_pre_(device|partition|suffix|hash) are called before yielding the
+    element, hook_post_(device|partition|suffix|hash) are called after the
+    element was yielded. They are meant to do some pre/post processing.
+    eg: saving a progress status.
+
     :param devices: parent directory of the devices to be audited
     :param datadir: a directory located under self.devices. This should be
                     one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@ def audit_location_generator(devices, datadir, suffix='',
     :param mount_check: Flag to check if a mount check should be performed
                         on devices
     :param logger: a logger object
+    :devices_filter: a callable taking (devices, [list of devices]) as
+                     parameters and returning a [list of devices]
+    :partitions_filter: a callable taking (datadir_path, [list of parts]) as
+                        parameters and returning a [list of parts]
+    :suffixes_filter: a callable taking (part_path, [list of suffixes]) as
+                      parameters and returning a [list of suffixes]
+    :hashes_filter: a callable taking (suff_path, [list of hashes]) as
+                    parameters and returning a [list of hashes]
+    :hook_pre_device: a callable taking device_path as parameter
+    :hook_post_device: a callable taking device_path as parameter
+    :hook_pre_partition: a callable taking part_path as parameter
+    :hook_post_partition: a callable taking part_path as parameter
+    :hook_pre_suffix: a callable taking suff_path as parameter
+    :hook_post_suffix: a callable taking suff_path as parameter
+    :hook_pre_hash: a callable taking hash_path as parameter
+    :hook_post_hash: a callable taking hash_path as parameter
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dir)
+    if devices_filter:
+        device_dir = devices_filter(devices, device_dir)
     for device in device_dir:
+        if hook_pre_device:
+            hook_pre_device(os.path.join(devices, device))
         if mount_check and not ismount(os.path.join(devices, device)):
             if logger:
                 logger.warning(
@@ -3183,24 +3218,36 @@ def audit_location_generator(devices, datadir, suffix='',
                     logger.warning(_('Skipping %(datadir)s because %(err)s'),
                                    {'datadir': datadir_path, 'err': e})
             continue
+        if partitions_filter:
+            partitions = partitions_filter(datadir_path, partitions)
         for partition in partitions:
             part_path = os.path.join(datadir_path, partition)
+            if hook_pre_partition:
+                hook_pre_partition(part_path)
             try:
                 suffixes = listdir(part_path)
             except OSError as e:
                 if e.errno != errno.ENOTDIR:
                     raise
                 continue
+            if suffixes_filter:
+                suffixes = suffixes_filter(part_path, suffixes)
             for asuffix in suffixes:
                 suff_path = os.path.join(part_path, asuffix)
+                if hook_pre_suffix:
+                    hook_pre_suffix(suff_path)
                 try:
                     hashes = listdir(suff_path)
                 except OSError as e:
                     if e.errno != errno.ENOTDIR:
                         raise
                     continue
+                if hashes_filter:
+                    hashes = hashes_filter(suff_path, hashes)
                 for hsh in hashes:
                     hash_path = os.path.join(suff_path, hsh)
+                    if hook_pre_hash:
+                        hook_pre_hash(hash_path)
                     try:
                         files = sorted(listdir(hash_path), reverse=True)
                     except OSError as e:
@@ -3212,6 +3259,14 @@ def audit_location_generator(devices, datadir, suffix='',
                             continue
                         path = os.path.join(hash_path, fname)
                         yield path, device, partition
+                    if hook_post_hash:
+                        hook_post_hash(hash_path)
+                if hook_post_suffix:
+                    hook_post_suffix(suff_path)
+            if hook_post_partition:
+                hook_post_partition(part_path)
+        if hook_post_device:
+            hook_post_device(os.path.join(devices, device))
 
 
 def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
```
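The new filters and hooks let a caller prune the directory walk and checkpoint progress between process restarts. A hedged sketch of resumable iteration, assuming a Swift install; the `/srv/node` layout and `objects` datadir are illustrative:

```python
# Sketch: using the new hooks/filters to checkpoint an audit sweep.
from swift.common.utils import audit_location_generator

done_partitions = set()

def skip_done(datadir_path, parts):
    # filter: drop partitions finished in a previous run
    return [p for p in parts if p not in done_partitions]

def checkpoint(part_path):
    # post hook: record progress once a partition is fully yielded
    done_partitions.add(part_path.rsplit('/', 1)[-1])

locations = audit_location_generator(
    '/srv/node', 'objects', '.data', mount_check=False,
    partitions_filter=skip_done, hook_post_partition=checkpoint)
for path, device, partition in locations:
    pass  # audit each .data file here
```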
""" FOUND = 10 CREATED = 20 @@ -4864,7 +4921,8 @@ class ShardRange(object): def __init__(self, name, timestamp, lower=MIN, upper=MAX, object_count=0, bytes_used=0, meta_timestamp=None, - deleted=False, state=None, state_timestamp=None, epoch=None): + deleted=False, state=None, state_timestamp=None, epoch=None, + reported=False): self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None self._lower = ShardRange.MIN @@ -4883,6 +4941,7 @@ class ShardRange(object): self.state = self.FOUND if state is None else state self.state_timestamp = state_timestamp self.epoch = epoch + self.reported = reported @classmethod def _encode(cls, value): @@ -5063,8 +5122,14 @@ class ShardRange(object): cast to an int, or if meta_timestamp is neither None nor can be cast to a :class:`~swift.common.utils.Timestamp`. """ - self.object_count = int(object_count) - self.bytes_used = int(bytes_used) + if self.object_count != int(object_count): + self.object_count = int(object_count) + self.reported = False + + if self.bytes_used != int(bytes_used): + self.bytes_used = int(bytes_used) + self.reported = False + if meta_timestamp is None: self.meta_timestamp = Timestamp.now() else: @@ -5145,6 +5210,14 @@ class ShardRange(object): def epoch(self, epoch): self._epoch = self._to_timestamp(epoch) + @property + def reported(self): + return self._reported + + @reported.setter + def reported(self, value): + self._reported = bool(value) + def update_state(self, state, state_timestamp=None): """ Set state to the given value and optionally update the state_timestamp @@ -5161,6 +5234,7 @@ class ShardRange(object): self.state = state if state_timestamp is not None: self.state_timestamp = state_timestamp + self.reported = False return True @property @@ -5283,6 +5357,7 @@ class ShardRange(object): yield 'state', self.state yield 'state_timestamp', self.state_timestamp.internal yield 'epoch', self.epoch.internal if self.epoch is not None else None + yield 'reported', 1 if self.reported else 0 def copy(self, timestamp=None, **kwargs): """ @@ -5314,7 +5389,8 @@ class ShardRange(object): params['name'], params['timestamp'], params['lower'], params['upper'], params['object_count'], params['bytes_used'], params['meta_timestamp'], params['deleted'], params['state'], - params['state_timestamp'], params['epoch']) + params['state_timestamp'], params['epoch'], + params.get('reported', 0)) def find_shard_range(item, ranges): |