author     Tim Burke <tim.burke@gmail.com>    2020-06-18 09:41:46 -0700
committer  Tim Burke <tim.burke@gmail.com>    2020-06-18 09:41:46 -0700
commit     481f126e6b59689599f438e5d27f7328f5b3e813 (patch)
tree       14212db13aee782e95ffd36993d74c6bf35df0cb /swift
parent     b3fd0bd9d82160305a821e742b2cd968036911b2 (diff)
parent     51a587ed8dd5700b558ad26d70dcb7facc0f91e4 (diff)
download   swift-feature/losf.tar.gz
Merge remote-tracking branch 'gerrit/master' into feature/losf
Change-Id: If9d7c63f3c4c15fbccff31e2b77a6911bb95972a
Diffstat (limited to 'swift')
-rw-r--r--  swift/cli/info.py                                                17
-rw-r--r--  swift/cli/relinker.py                                           181
-rw-r--r--  swift/common/db.py                                               53
-rw-r--r--  swift/common/memcached.py                                        21
-rw-r--r--  swift/common/middleware/memcache.py                               5
-rw-r--r--  swift/common/middleware/ratelimit.py                              4
-rw-r--r--  swift/common/middleware/s3api/controllers/obj.py                 24
-rw-r--r--  swift/common/middleware/symlink.py                               31
-rw-r--r--  swift/common/middleware/versioned_writes/object_versioning.py    6
-rw-r--r--  swift/common/swob.py                                             27
-rw-r--r--  swift/common/utils.py                                            86
-rw-r--r--  swift/container/backend.py                                       70
-rw-r--r--  swift/container/server.py                                        44
-rw-r--r--  swift/container/sharder.py                                       17
-rw-r--r--  swift/obj/updater.py                                             13
15 files changed, 484 insertions(+), 115 deletions(-)
diff --git a/swift/cli/info.py b/swift/cli/info.py
index 49c8da88e..dc29faded 100644
--- a/swift/cli/info.py
+++ b/swift/cli/info.py
@@ -57,6 +57,8 @@ def parse_get_node_args(options, args):
         else:
             raise InfoSystemExit('Ring file does not exist')

+    if options.quoted:
+        args = [urllib.parse.unquote(arg) for arg in args]
     if len(args) == 1:
         args = args[0].strip('/').split('/', 2)

@@ -614,15 +616,15 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
         ring = POLICIES.get_object_ring(policy_index, swift_dir)
         ring_name = (POLICIES.get_by_name(policy_name)).ring_name

-    if account is None and (container is not None or obj is not None):
+    if (container or obj) and not account:
         print('No account specified')
         raise InfoSystemExit()

-    if container is None and obj is not None:
+    if obj and not container:
         print('No container specified')
         raise InfoSystemExit()

-    if account is None and part is None:
+    if not account and not part:
         print('No target specified')
         raise InfoSystemExit()

@@ -654,8 +656,11 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
         print('Warning: account specified ' +
               'but ring not named "account"')

-    print('\nAccount \t%s' % account)
-    print('Container\t%s' % container)
-    print('Object  \t%s\n\n' % obj)
+    if account:
+        print('\nAccount \t%s' % urllib.parse.quote(account))
+    if container:
+        print('Container\t%s' % urllib.parse.quote(container))
+    if obj:
+        print('Object  \t%s\n\n' % urllib.parse.quote(obj))

     print_ring_locations(ring, loc, account, container, obj, part, all_nodes,
                          policy_index=policy_index)
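The new unquote step lets operators pass percent-encoded paths on the command line (the CLI flag backing options.quoted is implied but not shown in this hunk). A minimal sketch of the behavior, assuming Python 3's urllib:

    import urllib.parse

    args = ['AUTH_test/container%0Awith-newline/obj']
    # what parse_get_node_args() now does before splitting on '/':
    args = [urllib.parse.unquote(arg) for arg in args]
    # -> ['AUTH_test/container\nwith-newline/obj']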
diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py
index b7b4aaf73..630c0e98e 100644
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -14,8 +14,12 @@
 #    limitations under the License.

+import errno
+import fcntl
+import json
 import logging
 import os
+from functools import partial

 from swift.common.storage_policy import POLICIES
 from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
     DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
 from swift.obj import diskfile


+LOCK_FILE = '.relink.{datadir}.lock'
+STATE_FILE = 'relink.{datadir}.json'
+STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
+STEP_RELINK = 'relink'
+STEP_CLEANUP = 'cleanup'
+
+
+def devices_filter(device, _, devices):
+    if device:
+        devices = [d for d in devices if d == device]
+
+    return set(devices)
+
+
+def hook_pre_device(locks, states, datadir, device_path):
+    lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
+
+    fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
+    fcntl.flock(fd, fcntl.LOCK_EX)
+    locks[0] = fd
+
+    state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
+    states.clear()
+    try:
+        with open(state_file, 'rt') as f:
+            tmp = json.load(f)
+            states.update(tmp)
+    except ValueError:
+        # Invalid JSON: remove the file to restart from scratch
+        os.unlink(state_file)
+    except IOError as err:
+        # Ignore file not found error
+        if err.errno != errno.ENOENT:
+            raise
+
+
+def hook_post_device(locks, _):
+    os.close(locks[0])
+    locks[0] = None
+
+
+def partitions_filter(states, step, part_power, next_part_power,
+                      datadir_path, partitions):
+    # Remove all non-partitions first (e.g. auditor_status_ALL.json)
+    partitions = [p for p in partitions if p.isdigit()]
+
+    if not (step == STEP_CLEANUP and part_power == next_part_power):
+        # This is not a cleanup after cancel; partitions in the upper half
+        # are new partitions, so there is nothing to relink/clean up there
+        partitions = [p for p in partitions
+                      if int(p) < 2 ** next_part_power / 2]
+
+    # Format: { 'part': [relinked, cleaned] }
+    if states:
+        missing = list(set(partitions) - set(states.keys()))
+        if missing:
+            # All missing partitions were created after the first run of the
+            # relinker, i.e. after the new ring was distributed, so they are
+            # already hardlinked into both partitions, but they still need
+            # to be cleaned up. Just update the state file.
+            for part in missing:
+                states[part] = [True, False]
+        if step == STEP_RELINK:
+            partitions = [str(p) for p, (r, c) in states.items() if not r]
+        elif step == STEP_CLEANUP:
+            partitions = [str(p) for p, (r, c) in states.items() if not c]
+    else:
+        states.update({str(p): [False, False] for p in partitions})
+
+    # Always scan the partitions in reverse order to minimize the amount of
+    # IO (it actually only matters for relink, not for cleanup).
+    #
+    # Initial situation:
+    #  objects/0/000/00000000000000000000000000000000/12345.data
+    #  -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
+    #
+    # If the relinker then scanned partition 1, it would listdir that object
+    # unnecessarily. By working in reverse partition order, this is avoided.
+    partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
+
+    return partitions
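For reference, the per-device state file this machinery maintains is plain JSON mapping each partition to its [relinked, cleaned] pair. A sketch of illustrative contents for a hypothetical relink.objects.json (the file name comes from STATE_FILE above; the values are made up):

    import json

    states = {"1023": [True, True],   # relinked and cleaned
              "1022": [True, False],  # relinked, cleanup still pending
              "53": [False, False]}   # untouched so far
    print(json.dumps(states))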
+
+# Save states when a partition is done
+def hook_post_partition(states, step,
+                        partition_path):
+    part = os.path.basename(os.path.abspath(partition_path))
+    datadir_path = os.path.dirname(os.path.abspath(partition_path))
+    device_path = os.path.dirname(os.path.abspath(datadir_path))
+    datadir_name = os.path.basename(os.path.abspath(datadir_path))
+    state_tmp_file = os.path.join(device_path,
+                                  STATE_TMP_FILE.format(datadir=datadir_name))
+    state_file = os.path.join(device_path,
+                              STATE_FILE.format(datadir=datadir_name))
+
+    if step == STEP_RELINK:
+        states[part][0] = True
+    elif step == STEP_CLEANUP:
+        states[part][1] = True
+    with open(state_tmp_file, 'wt') as f:
+        json.dump(states, f)
+        os.fsync(f.fileno())
+    os.rename(state_tmp_file, state_file)
+
+
+def hashes_filter(next_part_power, suff_path, hashes):
+    hashes = list(hashes)
+    for hsh in hashes:
+        fname = os.path.join(suff_path, hsh, 'fake-file-name')
+        if replace_partition_in_path(fname, next_part_power) == fname:
+            hashes.remove(hsh)
+    return hashes
+
+
 def relink(swift_dir='/etc/swift',
            devices='/srv/node',
            skip_mount_check=False,
-           logger=logging.getLogger()):
+           logger=logging.getLogger(),
+           device=None):
     mount_check = not skip_mount_check
     run = False
     relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
         logging.info('Relinking files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        relink_devices_filter = partial(devices_filter, device)
+        relink_hook_pre_device = partial(hook_pre_device, locks, states,
+                                         datadir)
+        relink_hook_post_device = partial(hook_post_device, locks)
+        relink_partition_filter = partial(partitions_filter,
+                                          states, STEP_RELINK,
+                                          part_power, next_part_power)
+        relink_hook_post_partition = partial(hook_post_partition,
+                                             states, STEP_RELINK)
+        relink_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=relink_devices_filter,
+            hook_pre_device=relink_hook_pre_device,
+            hook_post_device=relink_hook_post_device,
+            partitions_filter=relink_partition_filter,
+            hook_post_partition=relink_hook_post_partition,
+            hashes_filter=relink_hashes_filter)
         for fname, _, _ in locations:
             newfname = replace_partition_in_path(fname, next_part_power)
             try:
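Because audit_location_generator calls each filter with a path plus the directory listing, functools.partial is used above to bind the shared state in front of those trailing arguments. A minimal sketch of the calling convention, with a simplified filter body and made-up values:

    from functools import partial

    def partitions_filter(states, step, part_power, next_part_power,
                          datadir_path, partitions):
        return [p for p in partitions if p.isdigit()]  # simplified body

    states = {}
    f = partial(partitions_filter, states, 'relink', 9, 10)
    # the generator invokes it as f(datadir_path, [listing...]):
    print(f('/srv/node/sdb1/objects', ['512', 'auditor_status_ALL.json']))
    # -> ['512']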
@@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift',
 def cleanup(swift_dir='/etc/swift',
             devices='/srv/node',
             skip_mount_check=False,
-            logger=logging.getLogger()):
+            logger=logging.getLogger(),
+            device=None):
     mount_check = not skip_mount_check
     conf = {'devices': devices, 'mount_check': mount_check}
     diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift',
         logging.info('Cleaning up files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        cleanup_devices_filter = partial(devices_filter, device)
+        cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
+                                          datadir)
+        cleanup_hook_post_device = partial(hook_post_device, locks)
+        cleanup_partition_filter = partial(partitions_filter,
+                                           states, STEP_CLEANUP,
+                                           part_power, next_part_power)
+        cleanup_hook_post_partition = partial(hook_post_partition,
+                                              states, STEP_CLEANUP)
+        cleanup_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=cleanup_devices_filter,
+            hook_pre_device=cleanup_hook_pre_device,
+            hook_post_device=cleanup_hook_post_device,
+            partitions_filter=cleanup_partition_filter,
+            hook_post_partition=cleanup_hook_post_partition,
+            hashes_filter=cleanup_hashes_filter)
         for fname, device, partition in locations:
             expected_fname = replace_partition_in_path(fname, part_power)
             if fname == expected_fname:
@@ -152,8 +315,10 @@ def main(args):

     if args.action == 'relink':
         return relink(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)

     if args.action == 'cleanup':
         return cleanup(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)
diff --git a/swift/common/db.py b/swift/common/db.py
index c6df12aa3..e06baf5c6 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
 # records will be merged.
 PENDING_CAP = 131072

+SQLITE_ARG_LIMIT = 999
+RECLAIM_PAGE_SIZE = 10000
+

 def utf8encode(*args):
     return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
@@ -981,16 +984,48 @@ class DatabaseBroker(object):
             with lock_parent_directory(self.pending_file,
                                        self.pending_timeout):
                 self._commit_puts()
-        with self.get() as conn:
-            self._reclaim(conn, age_timestamp, sync_timestamp)
-            self._reclaim_metadata(conn, age_timestamp)
-            conn.commit()
+        marker = ''
+        finished = False
+        while not finished:
+            with self.get() as conn:
+                marker = self._reclaim(conn, age_timestamp, marker)
+                if not marker:
+                    finished = True
+                    self._reclaim_other_stuff(
+                        conn, age_timestamp, sync_timestamp)
+                conn.commit()
+
+    def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+        """
+        This is only called once at the end of reclaim after _reclaim has
+        been called for each page.
+        """
+        self._reclaim_sync(conn, sync_timestamp)
+        self._reclaim_metadata(conn, age_timestamp)
+
+    def _reclaim(self, conn, age_timestamp, marker):
+        clean_batch_qry = '''
+            DELETE FROM %s WHERE deleted = 1
+            AND name > ? AND %s < ?
+        ''' % (self.db_contains_type, self.db_reclaim_timestamp)
+        curs = conn.execute('''
+            SELECT name FROM %s WHERE deleted = 1
+            AND name > ?
+            ORDER BY NAME LIMIT 1 OFFSET ?
+        ''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
+        row = curs.fetchone()
+        if row:
+            # do a single book-ended DELETE and bounce out
+            end_marker = row[0]
+            conn.execute(clean_batch_qry + ' AND name <= ?', (
+                marker, age_timestamp, end_marker))
+        else:
+            # delete off the end and reset marker to indicate we're done
+            end_marker = ''
+            conn.execute(clean_batch_qry, (marker, age_timestamp))
+        return end_marker

-    def _reclaim(self, conn, age_timestamp, sync_timestamp):
-        conn.execute('''
-            DELETE FROM %s WHERE deleted = 1 AND %s < ?
-        ''' % (self.db_contains_type, self.db_reclaim_timestamp),
-            (age_timestamp,))
+    def _reclaim_sync(self, conn, sync_timestamp):
         try:
             conn.execute('''
                 DELETE FROM outgoing_sync WHERE updated_at < ?
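The paged _reclaim above replaces one unbounded DELETE with book-ended batches, so the broker never holds the database lock across a huge sweep. A standalone sqlite3 sketch of the same pattern (table, column names, and page size simplified):

    import sqlite3

    PAGE = 2  # RECLAIM_PAGE_SIZE is 10000 in the real code
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE object (name TEXT, deleted INT, created_at INT)')
    conn.executemany('INSERT INTO object VALUES (?, 1, 0)',
                     [('a',), ('b',), ('c',), ('d',), ('e',)])
    marker = ''
    while True:
        row = conn.execute('SELECT name FROM object WHERE deleted = 1 '
                           'AND name > ? ORDER BY name LIMIT 1 OFFSET ?',
                           (marker, PAGE)).fetchone()
        if row:   # book-ended delete, then continue from the end marker
            conn.execute('DELETE FROM object WHERE deleted = 1 AND name > ? '
                         'AND created_at < ? AND name <= ?',
                         (marker, 1, row[0]))
            marker = row[0]
        else:     # delete off the end and stop
            conn.execute('DELETE FROM object WHERE deleted = 1 AND name > ? '
                         'AND created_at < ?', (marker, 1))
            break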
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index a80fa0fb6..08da5c7ba 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -160,7 +160,7 @@ class MemcacheRing(object):
     def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
                  io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
                  tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
-                 max_conns=2):
+                 max_conns=2, logger=None):
         self._ring = {}
         self._errors = dict(((serv, []) for serv in servers))
         self._error_limited = dict(((serv, 0) for serv in servers))
@@ -178,18 +178,23 @@ class MemcacheRing(object):
         self._pool_timeout = pool_timeout
         self._allow_pickle = allow_pickle
         self._allow_unpickle = allow_unpickle or allow_pickle
+        if logger is None:
+            self.logger = logging.getLogger()
+        else:
+            self.logger = logger

     def _exception_occurred(self, server, e, action='talking',
                             sock=None, fp=None, got_connection=True):
         if isinstance(e, Timeout):
-            logging.error("Timeout %(action)s to memcached: %(server)s",
-                          {'action': action, 'server': server})
+            self.logger.error("Timeout %(action)s to memcached: %(server)s",
+                              {'action': action, 'server': server})
         elif isinstance(e, (socket.error, MemcacheConnectionError)):
-            logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
-                          {'action': action, 'server': server, 'err': e})
+            self.logger.error(
+                "Error %(action)s to memcached: %(server)s: %(err)s",
+                {'action': action, 'server': server, 'err': e})
         else:
-            logging.exception("Error %(action)s to memcached: %(server)s",
-                              {'action': action, 'server': server})
+            self.logger.exception("Error %(action)s to memcached: %(server)s",
+                                  {'action': action, 'server': server})
         try:
             if fp:
                 fp.close()
@@ -213,7 +218,7 @@ class MemcacheRing(object):
                                     if err > now - ERROR_LIMIT_TIME]
         if len(self._errors[server]) > ERROR_LIMIT_COUNT:
             self._error_limited[server] = now + ERROR_LIMIT_DURATION
-            logging.error('Error limiting server %s', server)
+            self.logger.error('Error limiting server %s', server)

     def _get_conns(self, key):
         """
diff --git a/swift/common/middleware/memcache.py b/swift/common/middleware/memcache.py
index e846749cb..b5b9569a5 100644
--- a/swift/common/middleware/memcache.py
+++ b/swift/common/middleware/memcache.py
@@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError

 from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
                                     IO_TIMEOUT, TRY_COUNT)
+from swift.common.utils import get_logger


 class MemcacheMiddleware(object):
@@ -28,6 +29,7 @@ class MemcacheMiddleware(object):

     def __init__(self, app, conf):
         self.app = app
+        self.logger = get_logger(conf, log_route='memcache')
         self.memcache_servers = conf.get('memcache_servers')
         serialization_format = conf.get('memcache_serialization_support')
         try:
@@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
             io_timeout=io_timeout,
             allow_pickle=(serialization_format == 0),
             allow_unpickle=(serialization_format <= 1),
-            max_conns=max_conns)
+            max_conns=max_conns,
+            logger=self.logger)

     def __call__(self, env, start_response):
         env['swift.cache'] = self.memcache
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 72e1d6a40..9d3ff2fdd 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -242,6 +242,10 @@ class RateLimitMiddleware(object):
         if not self.memcache_client:
             return None

+        if req.environ.get('swift.ratelimit.handled'):
+            return None
+        req.environ['swift.ratelimit.handled'] = True
+
         try:
             account_info = get_account_info(req.environ, self.app,
                                             swift_source='RL')
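The swift.ratelimit.handled guard makes ratelimiting apply at most once per request: if the middleware appears twice in a pipeline, or a subrequest re-enters it, only the first pass counts. A minimal sketch of the idea, outside the middleware:

    def handle_ratelimit(environ):
        # simplified restatement of the new guard in ratelimit.py
        if environ.get('swift.ratelimit.handled'):
            return None  # an earlier instance already rate-limited this
        environ['swift.ratelimit.handled'] = True
        # ... normal ratelimit logic would run here ...

    env = {}
    handle_ratelimit(env)   # first pass does the work
    handle_ratelimit(env)   # second pass is a no-op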
diff --git a/swift/common/middleware/s3api/controllers/obj.py b/swift/common/middleware/s3api/controllers/obj.py
index 293b14702..716c837b6 100644
--- a/swift/common/middleware/s3api/controllers/obj.py
+++ b/swift/common/middleware/s3api/controllers/obj.py
@@ -26,7 +26,7 @@ from swift.common.middleware.versioned_writes.object_versioning import \
 from swift.common.middleware.s3api.utils import S3Timestamp, sysmeta_header
 from swift.common.middleware.s3api.controllers.base import Controller
 from swift.common.middleware.s3api.s3response import S3NotImplemented, \
-    InvalidRange, NoSuchKey, InvalidArgument, HTTPNoContent, \
+    InvalidRange, NoSuchKey, NoSuchVersion, InvalidArgument, HTTPNoContent, \
     PreconditionFailed


@@ -88,7 +88,15 @@ class ObjectController(Controller):
         if version_id not in ('null', None) and \
                 'object_versioning' not in get_swift_info():
             raise S3NotImplemented()
+
         query = {} if version_id is None else {'version-id': version_id}
+        if version_id not in ('null', None):
+            container_info = req.get_container_info(self.app)
+            if not container_info.get(
+                    'sysmeta', {}).get('versions-container', ''):
+                # Versioning has never been enabled
+                raise NoSuchVersion(object_name, version_id)
+
         resp = req.get_response(self.app, query=query)

         if req.method == 'HEAD':
@@ -193,17 +201,25 @@ class ObjectController(Controller):
                 'object_versioning' not in get_swift_info():
             raise S3NotImplemented()

+        version_id = req.params.get('versionId')
+        if version_id not in ('null', None):
+            container_info = req.get_container_info(self.app)
+            if not container_info.get(
+                    'sysmeta', {}).get('versions-container', ''):
+                # Versioning has never been enabled
+                return HTTPNoContent(headers={'x-amz-version-id': version_id})
+
         try:
             try:
                 query = req.gen_multipart_manifest_delete_query(
-                    self.app, version=req.params.get('versionId'))
+                    self.app, version=version_id)
             except NoSuchKey:
                 query = {}

             req.headers['Content-Type'] = None  # Ignore client content-type

-            if 'versionId' in req.params:
-                query['version-id'] = req.params['versionId']
+            if version_id is not None:
+                query['version-id'] = version_id
             query['symlink'] = 'get'

             resp = req.get_response(self.app, query=query)
diff --git a/swift/common/middleware/symlink.py b/swift/common/middleware/symlink.py
index d2c644438..bde163aa0 100644
--- a/swift/common/middleware/symlink.py
+++ b/swift/common/middleware/symlink.py
@@ -205,7 +205,8 @@ from swift.common.utils import get_logger, register_swift_info, split_path, \
     MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
     config_true_value, drain_and_close
 from swift.common.constraints import check_account_format
-from swift.common.wsgi import WSGIContext, make_subrequest
+from swift.common.wsgi import WSGIContext, make_subrequest, \
+    make_pre_authed_request
 from swift.common.request_helpers import get_sys_meta_prefix, \
     check_path_header, get_container_update_override_key, \
     update_ignore_range_header
@@ -442,7 +443,9 @@ class SymlinkObjectContext(WSGIContext):
                                content_type='text/plain')

     def _recursive_get_head(self, req, target_etag=None,
-                            follow_softlinks=True):
+                            follow_softlinks=True, orig_req=None):
+        if not orig_req:
+            orig_req = req
         resp = self._app_call(req.environ)

         def build_traversal_req(symlink_target):
@@ -457,9 +460,20 @@ class SymlinkObjectContext(WSGIContext):
                 '/', version, account,
                 symlink_target.lstrip('/'))
             self._last_target_path = target_path
-            new_req = make_subrequest(
-                req.environ, path=target_path, method=req.method,
-                headers=req.headers, swift_source='SYM')
+
+            subreq_headers = dict(req.headers)
+            if self._response_header_value(ALLOW_RESERVED_NAMES):
+                # this symlink's sysmeta says it can point to reserved
+                # names; we're inferring that some piece of middleware had
+                # previously authorized this request, because users can't
+                # access reserved names directly
+                subreq_meth = make_pre_authed_request
+                subreq_headers['X-Backend-Allow-Reserved-Names'] = 'true'
+            else:
+                subreq_meth = make_subrequest
+            new_req = subreq_meth(orig_req.environ, path=target_path,
+                                  method=req.method, headers=subreq_headers,
+                                  swift_source='SYM')
             new_req.headers.pop('X-Backend-Storage-Policy-Index', None)
             return new_req
@@ -484,11 +498,8 @@ class SymlinkObjectContext(WSGIContext):
             if not config_true_value(
                     self._response_header_value(SYMLOOP_EXTEND)):
                 self._loop_count += 1
-            if config_true_value(
-                    self._response_header_value(ALLOW_RESERVED_NAMES)):
-                new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
-
-            return self._recursive_get_head(new_req, target_etag=resp_etag)
+            return self._recursive_get_head(new_req, target_etag=resp_etag,
+                                            orig_req=req)
         else:
             final_etag = self._response_header_value('etag')
             if final_etag and target_etag and target_etag != final_etag:
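The effect of the symlink change: when a symlink's sysmeta says it may point at reserved names, traversal builds the subrequest with make_pre_authed_request against the original request's environ, so authorize callbacks are not re-run for a target the user could never name directly. A minimal sketch of just the constructor choice (the boolean argument stands in for the ALLOW_RESERVED_NAMES sysmeta value):

    from swift.common.wsgi import make_subrequest, make_pre_authed_request

    def choose_subrequest_method(allow_reserved):
        if allow_reserved:
            # skip authorize callbacks; middleware vouched for this already
            return make_pre_authed_request
        return make_subrequest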
diff --git a/swift/common/middleware/versioned_writes/object_versioning.py b/swift/common/middleware/versioned_writes/object_versioning.py
index 5c9b72d5c..508972f72 100644
--- a/swift/common/middleware/versioned_writes/object_versioning.py
+++ b/swift/common/middleware/versioned_writes/object_versioning.py
@@ -152,7 +152,7 @@ from cgi import parse_header
 from six.moves.urllib.parse import unquote

 from swift.common.constraints import MAX_FILE_SIZE, valid_api_version, \
-    ACCOUNT_LISTING_LIMIT
+    ACCOUNT_LISTING_LIMIT, CONTAINER_LISTING_LIMIT
 from swift.common.http import is_success, is_client_error, HTTP_NOT_FOUND, \
     HTTP_CONFLICT
 from swift.common.request_helpers import get_sys_meta_prefix, \
@@ -1191,7 +1191,7 @@ class ContainerContext(ObjectVersioningContext):
                     'hash': item['hash'],
                     'last_modified': item['last_modified'],
                 })
-        limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+        limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
         body = build_listing(
             null_listing, subdir_listing, broken_listing,
             reverse=config_true_value(params.get('reverse', 'no')),
@@ -1256,7 +1256,7 @@ class ContainerContext(ObjectVersioningContext):
                     'last_modified': item['last_modified'],
                 })

-        limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+        limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
         body = build_listing(
             null_listing, versions_listing,
             subdir_listing, broken_listing,
""" - argspec = inspect.getargspec(func) - if argspec.args and argspec.args[0] == 'self': - @functools.wraps(func) - def _wsgify_self(self, env, start_response): - try: - return func(self, Request(env))(env, start_response) - except HTTPException as err_resp: - return err_resp(env, start_response) - return _wsgify_self - else: - @functools.wraps(func) - def _wsgify_bare(env, start_response): - try: - return func(Request(env))(env, start_response) - except HTTPException as err_resp: - return err_resp(env, start_response) - return _wsgify_bare + @functools.wraps(func) + def _wsgify(*args): + env, start_response = args[-2:] + new_args = args[:-2] + (Request(env), ) + try: + return func(*new_args)(env, start_response) + except HTTPException as err_resp: + return err_resp(env, start_response) + return _wsgify class StatusMap(object): diff --git a/swift/common/utils.py b/swift/common/utils.py index df8713d3a..23a137e6c 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3152,11 +3152,26 @@ def remove_directory(path): def audit_location_generator(devices, datadir, suffix='', - mount_check=True, logger=None): + mount_check=True, logger=None, + devices_filter=None, partitions_filter=None, + suffixes_filter=None, hashes_filter=None, + hook_pre_device=None, hook_post_device=None, + hook_pre_partition=None, hook_post_partition=None, + hook_pre_suffix=None, hook_post_suffix=None, + hook_pre_hash=None, hook_post_hash=None): """ Given a devices path and a data directory, yield (path, device, partition) for all files in that directory + (devices|partitions|suffixes|hashes)_filter are meant to modify the list of + elements that will be iterated. eg: they can be used to exclude some + elements based on a custom condition defined by the caller. + + hook_pre_(device|partition|suffix|hash) are called before yielding the + element, hook_pos_(device|partition|suffix|hash) are called after the + element was yielded. They are meant to do some pre/post processing. + eg: saving a progress status. + :param devices: parent directory of the devices to be audited :param datadir: a directory located under self.devices. 
diff --git a/swift/common/utils.py b/swift/common/utils.py
index df8713d3a..23a137e6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3152,11 +3152,26 @@ def remove_directory(path):

 def audit_location_generator(devices, datadir, suffix='',
-                             mount_check=True, logger=None):
+                             mount_check=True, logger=None,
+                             devices_filter=None, partitions_filter=None,
+                             suffixes_filter=None, hashes_filter=None,
+                             hook_pre_device=None, hook_post_device=None,
+                             hook_pre_partition=None, hook_post_partition=None,
+                             hook_pre_suffix=None, hook_post_suffix=None,
+                             hook_pre_hash=None, hook_post_hash=None):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory

+    (devices|partitions|suffixes|hashes)_filter are meant to modify the list
+    of elements that will be iterated, e.g. they can be used to exclude some
+    elements based on a custom condition defined by the caller.
+
+    hook_pre_(device|partition|suffix|hash) are called before yielding the
+    element; hook_post_(device|partition|suffix|hash) are called after the
+    element was yielded. They are meant to do some pre/post processing,
+    e.g. saving a progress status.
+
     :param devices: parent directory of the devices to be audited
     :param datadir: a directory located under self.devices. This should be
                     one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@ def audit_location_generator(devices, datadir, suffix='',
     :param mount_check: Flag to check if a mount check should be performed
                         on devices
     :param logger: a logger object
+    :devices_filter: a callable taking (devices, [list of devices]) as
+                     parameters and returning a [list of devices]
+    :partitions_filter: a callable taking (datadir_path, [list of parts]) as
+                        parameters and returning a [list of parts]
+    :suffixes_filter: a callable taking (part_path, [list of suffixes]) as
+                      parameters and returning a [list of suffixes]
+    :hashes_filter: a callable taking (suff_path, [list of hashes]) as
+                    parameters and returning a [list of hashes]
+    :hook_pre_device: a callable taking device_path as parameter
+    :hook_post_device: a callable taking device_path as parameter
+    :hook_pre_partition: a callable taking part_path as parameter
+    :hook_post_partition: a callable taking part_path as parameter
+    :hook_pre_suffix: a callable taking suff_path as parameter
+    :hook_post_suffix: a callable taking suff_path as parameter
+    :hook_pre_hash: a callable taking hash_path as parameter
+    :hook_post_hash: a callable taking hash_path as parameter
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dir)
+    if devices_filter:
+        device_dir = devices_filter(devices, device_dir)
     for device in device_dir:
+        if hook_pre_device:
+            hook_pre_device(os.path.join(devices, device))
         if mount_check and not ismount(os.path.join(devices, device)):
             if logger:
                 logger.warning(
@@ -3183,24 +3218,36 @@ def audit_location_generator(devices, datadir, suffix='',
                 logger.warning(_('Skipping %(datadir)s because %(err)s'),
                                {'datadir': datadir_path, 'err': e})
             continue
+        if partitions_filter:
+            partitions = partitions_filter(datadir_path, partitions)
         for partition in partitions:
             part_path = os.path.join(datadir_path, partition)
+            if hook_pre_partition:
+                hook_pre_partition(part_path)
             try:
                 suffixes = listdir(part_path)
             except OSError as e:
                 if e.errno != errno.ENOTDIR:
                     raise
                 continue
+            if suffixes_filter:
+                suffixes = suffixes_filter(part_path, suffixes)
             for asuffix in suffixes:
                 suff_path = os.path.join(part_path, asuffix)
+                if hook_pre_suffix:
+                    hook_pre_suffix(suff_path)
                 try:
                     hashes = listdir(suff_path)
                 except OSError as e:
                     if e.errno != errno.ENOTDIR:
                         raise
                     continue
+                if hashes_filter:
+                    hashes = hashes_filter(suff_path, hashes)
                 for hsh in hashes:
                     hash_path = os.path.join(suff_path, hsh)
+                    if hook_pre_hash:
+                        hook_pre_hash(hash_path)
                     try:
                         files = sorted(listdir(hash_path), reverse=True)
                     except OSError as e:
@@ -3212,6 +3259,14 @@ def audit_location_generator(devices, datadir, suffix='',
                             continue
                         path = os.path.join(hash_path, fname)
                         yield path, device, partition
+                    if hook_post_hash:
+                        hook_post_hash(hash_path)
+                if hook_post_suffix:
+                    hook_post_suffix(suff_path)
+            if hook_post_partition:
+                hook_post_partition(part_path)
+        if hook_post_device:
+            hook_post_device(os.path.join(devices, device))


 def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
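A minimal sketch of driving the new hooks, in the spirit of the relinker above (the paths and the progress bookkeeping are illustrative):

    from swift.common.utils import audit_location_generator

    done = []
    locations = audit_location_generator(
        '/srv/node', 'objects', '.data', mount_check=False,
        partitions_filter=lambda datadir_path, parts:
            [p for p in parts if p.isdigit()],
        hook_post_partition=done.append)  # e.g. persist progress here
    for path, device, partition in locations:
        pass  # audit/relink the file at `path`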
""" FOUND = 10 CREATED = 20 @@ -4864,7 +4921,8 @@ class ShardRange(object): def __init__(self, name, timestamp, lower=MIN, upper=MAX, object_count=0, bytes_used=0, meta_timestamp=None, - deleted=False, state=None, state_timestamp=None, epoch=None): + deleted=False, state=None, state_timestamp=None, epoch=None, + reported=False): self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None self._lower = ShardRange.MIN @@ -4883,6 +4941,7 @@ class ShardRange(object): self.state = self.FOUND if state is None else state self.state_timestamp = state_timestamp self.epoch = epoch + self.reported = reported @classmethod def _encode(cls, value): @@ -5063,8 +5122,14 @@ class ShardRange(object): cast to an int, or if meta_timestamp is neither None nor can be cast to a :class:`~swift.common.utils.Timestamp`. """ - self.object_count = int(object_count) - self.bytes_used = int(bytes_used) + if self.object_count != int(object_count): + self.object_count = int(object_count) + self.reported = False + + if self.bytes_used != int(bytes_used): + self.bytes_used = int(bytes_used) + self.reported = False + if meta_timestamp is None: self.meta_timestamp = Timestamp.now() else: @@ -5145,6 +5210,14 @@ class ShardRange(object): def epoch(self, epoch): self._epoch = self._to_timestamp(epoch) + @property + def reported(self): + return self._reported + + @reported.setter + def reported(self, value): + self._reported = bool(value) + def update_state(self, state, state_timestamp=None): """ Set state to the given value and optionally update the state_timestamp @@ -5161,6 +5234,7 @@ class ShardRange(object): self.state = state if state_timestamp is not None: self.state_timestamp = state_timestamp + self.reported = False return True @property @@ -5283,6 +5357,7 @@ class ShardRange(object): yield 'state', self.state yield 'state_timestamp', self.state_timestamp.internal yield 'epoch', self.epoch.internal if self.epoch is not None else None + yield 'reported', 1 if self.reported else 0 def copy(self, timestamp=None, **kwargs): """ @@ -5314,7 +5389,8 @@ class ShardRange(object): params['name'], params['timestamp'], params['lower'], params['upper'], params['object_count'], params['bytes_used'], params['meta_timestamp'], params['deleted'], params['state'], - params['state_timestamp'], params['epoch']) + params['state_timestamp'], params['epoch'], + params.get('reported', 0)) def find_shard_range(item, ranges): diff --git a/swift/container/backend.py b/swift/container/backend.py index 0a18fe48f..bdf34f7d8 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \ get_db_files, parse_db_filename, make_db_file_path, split_path, \ RESERVED_BYTE from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \ - zero_like, DatabaseAlreadyExists - -SQLITE_ARG_LIMIT = 999 + zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT DATADIR = 'containers' @@ -62,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED, # tuples and vice-versa SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count', 'bytes_used', 'meta_timestamp', 'deleted', 'state', - 'state_timestamp', 'epoch') + 'state_timestamp', 'epoch', 'reported') POLICY_STAT_TABLE_CREATE = ''' CREATE TABLE policy_stat ( @@ -269,6 +267,7 @@ def merge_shards(shard_data, existing): if existing['timestamp'] < shard_data['timestamp']: # note that currently we do not roll forward any meta or state from 
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 0a18fe48f..bdf34f7d8 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
     get_db_files, parse_db_filename, make_db_file_path, split_path, \
     RESERVED_BYTE
 from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
-    zero_like, DatabaseAlreadyExists
-
-SQLITE_ARG_LIMIT = 999
+    zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT

 DATADIR = 'containers'

@@ -62,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
 # tuples and vice-versa
 SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
                     'bytes_used', 'meta_timestamp', 'deleted', 'state',
-                    'state_timestamp', 'epoch')
+                    'state_timestamp', 'epoch', 'reported')

 POLICY_STAT_TABLE_CREATE = '''
     CREATE TABLE policy_stat (
@@ -269,6 +267,7 @@ def merge_shards(shard_data, existing):
     if existing['timestamp'] < shard_data['timestamp']:
         # note that currently we do not roll forward any meta or state from
         # an item that was created at an older time; newer created time trumps
+        shard_data['reported'] = 0  # reset the latch
         return True
     elif existing['timestamp'] > shard_data['timestamp']:
         return False
@@ -285,6 +284,18 @@ def merge_shards(shard_data, existing):
     else:
         new_content = True

+    # We can latch the reported flag
+    if existing['reported'] and \
+            existing['object_count'] == shard_data['object_count'] and \
+            existing['bytes_used'] == shard_data['bytes_used'] and \
+            existing['state'] == shard_data['state'] and \
+            existing['epoch'] == shard_data['epoch']:
+        shard_data['reported'] = 1
+    else:
+        shard_data.setdefault('reported', 0)
+    if shard_data['reported'] and not existing['reported']:
+        new_content = True
+
     if (existing['state_timestamp'] == shard_data['state_timestamp']
             and shard_data['state'] > existing['state']):
         new_content = True
@@ -597,7 +608,8 @@ class ContainerBroker(DatabaseBroker):
                 deleted INTEGER DEFAULT 0,
                 state INTEGER,
                 state_timestamp TEXT,
-                epoch TEXT
+                epoch TEXT,
+                reported INTEGER DEFAULT 0
             );
         """ % SHARD_RANGE_TABLE)

@@ -1430,10 +1442,13 @@ class ContainerBroker(DatabaseBroker):
                 # sqlite3.OperationalError: cannot start a transaction
                 # within a transaction
                 conn.rollback()
-            if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
-                raise
-            self.create_shard_range_table(conn)
-            return _really_merge_items(conn)
+            if 'no such column: reported' in str(err):
+                self._migrate_add_shard_range_reported(conn)
+                return _really_merge_items(conn)
+            if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
+                self.create_shard_range_table(conn)
+                return _really_merge_items(conn)
+            raise

     def get_reconciler_sync(self):
         with self.get() as conn:
+ """ + conn.executescript(''' + BEGIN; + ALTER TABLE %s + ADD COLUMN reported INTEGER DEFAULT 0; + COMMIT; + ''' % SHARD_RANGE_TABLE) + + def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): + super(ContainerBroker, self)._reclaim_other_stuff( + conn, age_timestamp, sync_timestamp) # populate instance cache, but use existing conn to avoid deadlock # when it has a pending update self._populate_instance_cache(conn=conn) @@ -1630,7 +1656,7 @@ class ContainerBroker(DatabaseBroker): elif states is not None: included_states.add(states) - def do_query(conn): + def do_query(conn, use_reported_column=True): condition = '' conditions = [] params = [] @@ -1648,21 +1674,27 @@ class ContainerBroker(DatabaseBroker): params.append(self.path) if conditions: condition = ' WHERE ' + ' AND '.join(conditions) + if use_reported_column: + columns = SHARD_RANGE_KEYS + else: + columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', ) sql = ''' SELECT %s FROM %s%s; - ''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition) + ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition) data = conn.execute(sql, params) data.row_factory = None return [row for row in data] - try: - with self.maybe_get(connection) as conn: + with self.maybe_get(connection) as conn: + try: return do_query(conn) - except sqlite3.OperationalError as err: - if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): + except sqlite3.OperationalError as err: + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + return [] + if 'no such column: reported' in str(err): + return do_query(conn, use_reported_column=False) raise - return [] @classmethod def resolve_shard_range_states(cls, states): diff --git a/swift/container/server.py b/swift/container/server.py index c8d7647aa..db9ac0291 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -155,6 +155,8 @@ class ContainerController(BaseStorageServer): conf['auto_create_account_prefix'] else: self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX + self.shards_account_prefix = ( + self.auto_create_account_prefix + 'shards_') if config_true_value(conf.get('allow_versions', 'f')): self.save_headers.append('x-versions-location') if 'allow_versions' in conf: @@ -375,14 +377,12 @@ class ContainerController(BaseStorageServer): # auto create accounts) obj_policy_index = self.get_and_validate_policy_index(req) or 0 broker = self._get_container_broker(drive, part, account, container) - if account.startswith(self.auto_create_account_prefix) and obj and \ - not os.path.exists(broker.db_file): - try: - broker.initialize(req_timestamp.internal, obj_policy_index) - except DatabaseAlreadyExists: - pass - if not os.path.exists(broker.db_file): + if obj: + self._maybe_autocreate(broker, req_timestamp, account, + obj_policy_index, req) + elif not os.path.exists(broker.db_file): return HTTPNotFound() + if obj: # delete object # redirect if a shard range exists for the object name redirect = self._redirect_to_shard(req, broker, obj) @@ -449,11 +449,25 @@ class ContainerController(BaseStorageServer): broker.update_status_changed_at(timestamp) return recreated + def _should_autocreate(self, account, req): + auto_create_header = req.headers.get('X-Backend-Auto-Create') + if auto_create_header: + # If the caller included an explicit X-Backend-Auto-Create header, + # assume they know the behavior they want + return config_true_value(auto_create_header) + if account.startswith(self.shards_account_prefix): + # we have to specical case this subset of the + # 
diff --git a/swift/container/server.py b/swift/container/server.py
index c8d7647aa..db9ac0291 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -155,6 +155,8 @@ class ContainerController(BaseStorageServer):
                 conf['auto_create_account_prefix']
         else:
             self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
+        self.shards_account_prefix = (
+            self.auto_create_account_prefix + 'shards_')
         if config_true_value(conf.get('allow_versions', 'f')):
             self.save_headers.append('x-versions-location')
         if 'allow_versions' in conf:
@@ -375,14 +377,12 @@ class ContainerController(BaseStorageServer):
         # auto create accounts)
         obj_policy_index = self.get_and_validate_policy_index(req) or 0
         broker = self._get_container_broker(drive, part, account, container)
-        if account.startswith(self.auto_create_account_prefix) and obj and \
-                not os.path.exists(broker.db_file):
-            try:
-                broker.initialize(req_timestamp.internal, obj_policy_index)
-            except DatabaseAlreadyExists:
-                pass
-        if not os.path.exists(broker.db_file):
+        if obj:
+            self._maybe_autocreate(broker, req_timestamp, account,
+                                   obj_policy_index, req)
+        elif not os.path.exists(broker.db_file):
             return HTTPNotFound()
+
         if obj:  # delete object
             # redirect if a shard range exists for the object name
             redirect = self._redirect_to_shard(req, broker, obj)
@@ -449,11 +449,25 @@ class ContainerController(BaseStorageServer):
             broker.update_status_changed_at(timestamp)
         return recreated

+    def _should_autocreate(self, account, req):
+        auto_create_header = req.headers.get('X-Backend-Auto-Create')
+        if auto_create_header:
+            # If the caller included an explicit X-Backend-Auto-Create
+            # header, assume they know the behavior they want
+            return config_true_value(auto_create_header)
+        if account.startswith(self.shards_account_prefix):
+            # we have to special-case this subset of the
+            # auto_create_account_prefix because we don't want the updater
+            # accidentally auto-creating shards; only the sharder creates
+            # shards and it will explicitly tell the server to do so
+            return False
+        return account.startswith(self.auto_create_account_prefix)
+
     def _maybe_autocreate(self, broker, req_timestamp, account,
-                          policy_index):
+                          policy_index, req):
         created = False
-        if account.startswith(self.auto_create_account_prefix) and \
-                not os.path.exists(broker.db_file):
+        should_autocreate = self._should_autocreate(account, req)
+        if should_autocreate and not os.path.exists(broker.db_file):
             if policy_index is None:
                 raise HTTPBadRequest(
                     'X-Backend-Storage-Policy-Index header is required')
@@ -506,8 +520,8 @@ class ContainerController(BaseStorageServer):
             # obj put expects the policy_index header, default is for
             # legacy support during upgrade.
             obj_policy_index = requested_policy_index or 0
-            self._maybe_autocreate(broker, req_timestamp, account,
-                                   obj_policy_index)
+            self._maybe_autocreate(
+                broker, req_timestamp, account, obj_policy_index, req)
             # redirect if a shard exists for this object name
             response = self._redirect_to_shard(req, broker, obj)
             if response:
@@ -531,8 +545,8 @@ class ContainerController(BaseStorageServer):
                                for sr in json.loads(req.body)]
             except (ValueError, KeyError, TypeError) as err:
                 return HTTPBadRequest('Invalid body: %r' % err)
-            created = self._maybe_autocreate(broker, req_timestamp, account,
-                                             requested_policy_index)
+            created = self._maybe_autocreate(
+                broker, req_timestamp, account, requested_policy_index, req)
             self._update_metadata(req, broker, req_timestamp, 'PUT')
             if shard_ranges:
                 # TODO: consider writing the shard ranges into the pending
@@ -805,7 +819,7 @@ class ContainerController(BaseStorageServer):
         requested_policy_index = self.get_and_validate_policy_index(req)
         broker = self._get_container_broker(drive, part, account, container)
         self._maybe_autocreate(broker, req_timestamp, account,
-                               requested_policy_index)
+                               requested_policy_index, req)
         try:
             objs = json.load(req.environ['wsgi.input'])
         except ValueError as err:
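The precedence in _should_autocreate, restated outside the controller (prefixes shown with their usual defaults; config_true_value semantics approximated):

    def should_autocreate(account, headers,
                          auto_prefix='.', shards_prefix='.shards_'):
        # 1. an explicit X-Backend-Auto-Create header always wins
        hdr = headers.get('X-Backend-Auto-Create')
        if hdr:
            return hdr.lower() in ('true', '1', 'yes', 'on', 't', 'y')
        # 2. shard DBs are never implicitly auto-created; only the sharder
        #    may create them, and it sends the header explicitly
        if account.startswith(shards_prefix):
            return False
        # 3. otherwise, fall back to the normal auto-create prefix
        return account.startswith(auto_prefix)

    assert should_autocreate('.expiring_objects', {})
    assert not should_autocreate('.shards_AUTH_test', {})
    assert should_autocreate('.shards_AUTH_test',
                             {'X-Backend-Auto-Create': 'True'})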
include_deleted=True) # send everything - self._send_shard_ranges( - broker.root_account, broker.root_container, - shard_ranges) + if self._send_shard_ranges( + broker.root_account, broker.root_container, shard_ranges): + # on success, mark ourselves as reported so we don't keep + # hammering the root + own_shard_range.reported = True + broker.merge_shard_ranges(own_shard_range) def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 466f294c0..f3a01a824 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -14,12 +14,13 @@ # limitations under the License. import six.moves.cPickle as pickle +import errno import os import signal import sys import time from swift import gettext_ as _ -from random import random +from random import random, shuffle from eventlet import spawn, Timeout @@ -230,7 +231,9 @@ class ObjectUpdater(Daemon): 'to a valid policy (%(error)s)') % { 'directory': asyncdir, 'error': e}) continue - for prefix in self._listdir(async_pending): + prefix_dirs = self._listdir(async_pending) + shuffle(prefix_dirs) + for prefix in prefix_dirs: prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): continue @@ -271,7 +274,11 @@ class ObjectUpdater(Daemon): if obj_hash == last_obj_hash: self.stats.unlinks += 1 self.logger.increment('unlinks') - os.unlink(update_path) + try: + os.unlink(update_path) + except OSError as e: + if e.errno != errno.ENOENT: + raise else: last_obj_hash = obj_hash yield {'device': device, 'policy': policy, |