author     Tim Burke <tim.burke@gmail.com>    2020-06-18 09:41:46 -0700
committer  Tim Burke <tim.burke@gmail.com>    2020-06-18 09:41:46 -0700
commit     481f126e6b59689599f438e5d27f7328f5b3e813 (patch)
tree       14212db13aee782e95ffd36993d74c6bf35df0cb /swift
parent     b3fd0bd9d82160305a821e742b2cd968036911b2 (diff)
parent     51a587ed8dd5700b558ad26d70dcb7facc0f91e4 (diff)
download   swift-feature/losf.tar.gz
Merge remote-tracking branch 'gerrit/master' into feature/losf
Change-Id: If9d7c63f3c4c15fbccff31e2b77a6911bb95972a
Diffstat (limited to 'swift')
-rw-r--r--  swift/cli/info.py                                              |  17
-rw-r--r--  swift/cli/relinker.py                                          | 181
-rw-r--r--  swift/common/db.py                                             |  53
-rw-r--r--  swift/common/memcached.py                                      |  21
-rw-r--r--  swift/common/middleware/memcache.py                            |   5
-rw-r--r--  swift/common/middleware/ratelimit.py                           |   4
-rw-r--r--  swift/common/middleware/s3api/controllers/obj.py               |  24
-rw-r--r--  swift/common/middleware/symlink.py                             |  31
-rw-r--r--  swift/common/middleware/versioned_writes/object_versioning.py |   6
-rw-r--r--  swift/common/swob.py                                           |  27
-rw-r--r--  swift/common/utils.py                                          |  86
-rw-r--r--  swift/container/backend.py                                     |  70
-rw-r--r--  swift/container/server.py                                      |  44
-rw-r--r--  swift/container/sharder.py                                     |  17
-rw-r--r--  swift/obj/updater.py                                           |  13
15 files changed, 484 insertions, 115 deletions
diff --git a/swift/cli/info.py b/swift/cli/info.py
index 49c8da88e..dc29faded 100644
--- a/swift/cli/info.py
+++ b/swift/cli/info.py
@@ -57,6 +57,8 @@ def parse_get_node_args(options, args):
else:
raise InfoSystemExit('Ring file does not exist')
+ if options.quoted:
+ args = [urllib.parse.unquote(arg) for arg in args]
if len(args) == 1:
args = args[0].strip('/').split('/', 2)
@@ -614,15 +616,15 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
ring = POLICIES.get_object_ring(policy_index, swift_dir)
ring_name = (POLICIES.get_by_name(policy_name)).ring_name
- if account is None and (container is not None or obj is not None):
+ if (container or obj) and not account:
print('No account specified')
raise InfoSystemExit()
- if container is None and obj is not None:
+ if obj and not container:
print('No container specified')
raise InfoSystemExit()
- if account is None and part is None:
+ if not account and not part:
print('No target specified')
raise InfoSystemExit()
@@ -654,8 +656,11 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
print('Warning: account specified ' +
'but ring not named "account"')
- print('\nAccount \t%s' % account)
- print('Container\t%s' % container)
- print('Object \t%s\n\n' % obj)
+ if account:
+ print('\nAccount \t%s' % urllib.parse.quote(account))
+ if container:
+ print('Container\t%s' % urllib.parse.quote(container))
+ if obj:
+ print('Object \t%s\n\n' % urllib.parse.quote(obj))
print_ring_locations(ring, loc, account, container, obj, part, all_nodes,
policy_index=policy_index)
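
Reviewer note on the info.py change above: the new --quoted handling percent-decodes each positional argument before the account/container/object split. A minimal standalone sketch of that behavior (parse_path_args is an illustrative name, not the actual CLI wiring):

    import urllib.parse

    def parse_path_args(args, quoted=False):
        # With --quoted, arguments arrive percent-encoded and are
        # decoded before being split into a/c/o components.
        if quoted:
            args = [urllib.parse.unquote(arg) for arg in args]
        if len(args) == 1:
            args = args[0].strip('/').split('/', 2)
        return args

    print(parse_path_args(['AUTH_test/c%20on/o%2Fbj'], quoted=True))
    # ['AUTH_test', 'c on', 'o/bj']
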
diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py
index b7b4aaf73..630c0e98e 100644
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -14,8 +14,12 @@
# limitations under the License.
+import errno
+import fcntl
+import json
import logging
import os
+from functools import partial
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
from swift.obj import diskfile
+LOCK_FILE = '.relink.{datadir}.lock'
+STATE_FILE = 'relink.{datadir}.json'
+STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
+STEP_RELINK = 'relink'
+STEP_CLEANUP = 'cleanup'
+
+
+def devices_filter(device, _, devices):
+ if device:
+ devices = [d for d in devices if d == device]
+
+ return set(devices)
+
+
+def hook_pre_device(locks, states, datadir, device_path):
+ lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
+
+ fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
+ fcntl.flock(fd, fcntl.LOCK_EX)
+ locks[0] = fd
+
+ state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
+ states.clear()
+ try:
+ with open(state_file, 'rt') as f:
+ tmp = json.load(f)
+ states.update(tmp)
+ except ValueError:
+ # Invalid JSON: remove the file to restart from scratch
+ os.unlink(state_file)
+ except IOError as err:
+ # Ignore file not found error
+ if err.errno != errno.ENOENT:
+ raise
+
+
+def hook_post_device(locks, _):
+ os.close(locks[0])
+ locks[0] = None
+
+
+def partitions_filter(states, step, part_power, next_part_power,
+ datadir_path, partitions):
+ # Remove all non-partitions first (e.g. auditor_status_ALL.json)
+ partitions = [p for p in partitions if p.isdigit()]
+
+ if not (step == STEP_CLEANUP and part_power == next_part_power):
+ # This is not a cleanup after a cancel: partitions in the upper half
+ # are new, and there is nothing to relink or clean up there
+ partitions = [p for p in partitions
+ if int(p) < 2 ** next_part_power / 2]
+
+ # Format: { 'part': [relinked, cleaned] }
+ if states:
+ missing = list(set(partitions) - set(states.keys()))
+ if missing:
+ # All missing partitions were created after the first relink
+ # run, i.e. after the new ring was distributed, so they are
+ # already hardlinked in both partitions, but they will still
+ # need to be cleaned up. Just update the state file.
+ for part in missing:
+ states[part] = [True, False]
+ if step == STEP_RELINK:
+ partitions = [str(p) for p, (r, c) in states.items() if not r]
+ elif step == STEP_CLEANUP:
+ partitions = [str(p) for p, (r, c) in states.items() if not c]
+ else:
+ states.update({str(p): [False, False] for p in partitions})
+
+ # Always scan the partitions in reverse order to minimize the amount of IO
+ # (it actually only matters for relink, not for cleanup).
+ #
+ # Initial situation:
+ # objects/0/000/00000000000000000000000000000000/12345.data
+ # -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
+ #
+ # If the relinker then scans partition 1, it will needlessly
+ # listdir that object. Working through the partitions in reverse
+ # order avoids this.
+ partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
+
+ return partitions
+
+
+# Save states when a partition is done
+def hook_post_partition(states, step,
+ partition_path):
+ part = os.path.basename(os.path.abspath(partition_path))
+ datadir_path = os.path.dirname(os.path.abspath(partition_path))
+ device_path = os.path.dirname(os.path.abspath(datadir_path))
+ datadir_name = os.path.basename(os.path.abspath(datadir_path))
+ state_tmp_file = os.path.join(device_path,
+ STATE_TMP_FILE.format(datadir=datadir_name))
+ state_file = os.path.join(device_path,
+ STATE_FILE.format(datadir=datadir_name))
+
+ if step == STEP_RELINK:
+ states[part][0] = True
+ elif step == STEP_CLEANUP:
+ states[part][1] = True
+ with open(state_tmp_file, 'wt') as f:
+ json.dump(states, f)
+ os.fsync(f.fileno())
+ os.rename(state_tmp_file, state_file)
+
+
+def hashes_filter(next_part_power, suff_path, hashes):
+ hashes = list(hashes)
+ for hsh in hashes:
+ fname = os.path.join(suff_path, hsh, 'fake-file-name')
+ if replace_partition_in_path(fname, next_part_power) == fname:
+ hashes.remove(hsh)
+ return hashes
+
+
def relink(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
- logger=logging.getLogger()):
+ logger=logging.getLogger(),
+ device=None):
mount_check = not skip_mount_check
run = False
relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
logging.info('Relinking files for policy %s under %s',
policy.name, devices)
run = True
+ datadir = diskfile.get_data_dir(policy)
+
+ locks = [None]
+ states = {}
+ relink_devices_filter = partial(devices_filter, device)
+ relink_hook_pre_device = partial(hook_pre_device, locks, states,
+ datadir)
+ relink_hook_post_device = partial(hook_post_device, locks)
+ relink_partition_filter = partial(partitions_filter,
+ states, STEP_RELINK,
+ part_power, next_part_power)
+ relink_hook_post_partition = partial(hook_post_partition,
+ states, STEP_RELINK)
+ relink_hashes_filter = partial(hashes_filter, next_part_power)
+
locations = audit_location_generator(
devices,
- diskfile.get_data_dir(policy),
- mount_check=mount_check)
+ datadir,
+ mount_check=mount_check,
+ devices_filter=relink_devices_filter,
+ hook_pre_device=relink_hook_pre_device,
+ hook_post_device=relink_hook_post_device,
+ partitions_filter=relink_partition_filter,
+ hook_post_partition=relink_hook_post_partition,
+ hashes_filter=relink_hashes_filter)
for fname, _, _ in locations:
newfname = replace_partition_in_path(fname, next_part_power)
try:
@@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift',
def cleanup(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
- logger=logging.getLogger()):
+ logger=logging.getLogger(),
+ device=None):
mount_check = not skip_mount_check
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift',
logging.info('Cleaning up files for policy %s under %s',
policy.name, devices)
run = True
+ datadir = diskfile.get_data_dir(policy)
+
+ locks = [None]
+ states = {}
+ cleanup_devices_filter = partial(devices_filter, device)
+ cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
+ datadir)
+ cleanup_hook_post_device = partial(hook_post_device, locks)
+ cleanup_partition_filter = partial(partitions_filter,
+ states, STEP_CLEANUP,
+ part_power, next_part_power)
+ cleanup_hook_post_partition = partial(hook_post_partition,
+ states, STEP_CLEANUP)
+ cleanup_hashes_filter = partial(hashes_filter, next_part_power)
+
locations = audit_location_generator(
devices,
- diskfile.get_data_dir(policy),
- mount_check=mount_check)
+ datadir,
+ mount_check=mount_check,
+ devices_filter=cleanup_devices_filter,
+ hook_pre_device=cleanup_hook_pre_device,
+ hook_post_device=cleanup_hook_post_device,
+ partitions_filter=cleanup_partition_filter,
+ hook_post_partition=cleanup_hook_post_partition,
+ hashes_filter=cleanup_hashes_filter)
for fname, device, partition in locations:
expected_fname = replace_partition_in_path(fname, part_power)
if fname == expected_fname:
@@ -152,8 +315,10 @@ def main(args):
if args.action == 'relink':
return relink(
- args.swift_dir, args.devices, args.skip_mount_check, logger)
+ args.swift_dir, args.devices, args.skip_mount_check, logger,
+ device=args.device)
if args.action == 'cleanup':
return cleanup(
- args.swift_dir, args.devices, args.skip_mount_check, logger)
+ args.swift_dir, args.devices, args.skip_mount_check, logger,
+ device=args.device)
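
Reviewer note on the relinker changes: progress is tracked per device in a JSON state file mapping each partition to a [relinked, cleaned] pair, and every write goes through a temp file, fsync, and rename so a crash never leaves a truncated state file. A minimal sketch of that write pattern (paths are illustrative):

    import json
    import os

    def save_states(state_file, tmp_file, states):
        # Write to a temp file and fsync before renaming into place,
        # so readers only ever see a complete JSON document.
        with open(tmp_file, 'wt') as f:
            json.dump(states, f)
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_file, state_file)

    save_states('/tmp/relink.objects.json',
                '/tmp/.relink.objects.json.tmp',
                {'96': [True, False], '227': [True, True]})
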
diff --git a/swift/common/db.py b/swift/common/db.py
index c6df12aa3..e06baf5c6 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
# records will be merged.
PENDING_CAP = 131072
+SQLITE_ARG_LIMIT = 999
+RECLAIM_PAGE_SIZE = 10000
+
def utf8encode(*args):
return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
@@ -981,16 +984,48 @@ class DatabaseBroker(object):
with lock_parent_directory(self.pending_file,
self.pending_timeout):
self._commit_puts()
- with self.get() as conn:
- self._reclaim(conn, age_timestamp, sync_timestamp)
- self._reclaim_metadata(conn, age_timestamp)
- conn.commit()
+ marker = ''
+ finished = False
+ while not finished:
+ with self.get() as conn:
+ marker = self._reclaim(conn, age_timestamp, marker)
+ if not marker:
+ finished = True
+ self._reclaim_other_stuff(
+ conn, age_timestamp, sync_timestamp)
+ conn.commit()
+
+ def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+ """
+ This is only called once at the end of reclaim after _reclaim has been
+ called for each page.
+ """
+ self._reclaim_sync(conn, sync_timestamp)
+ self._reclaim_metadata(conn, age_timestamp)
+
+ def _reclaim(self, conn, age_timestamp, marker):
+ clean_batch_qry = '''
+ DELETE FROM %s WHERE deleted = 1
+ AND name > ? AND %s < ?
+ ''' % (self.db_contains_type, self.db_reclaim_timestamp)
+ curs = conn.execute('''
+ SELECT name FROM %s WHERE deleted = 1
+ AND name > ?
+ ORDER BY NAME LIMIT 1 OFFSET ?
+ ''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
+ row = curs.fetchone()
+ if row:
+ # do a single book-ended DELETE and bounce out
+ end_marker = row[0]
+ conn.execute(clean_batch_qry + ' AND name <= ?', (
+ marker, age_timestamp, end_marker))
+ else:
+ # delete off the end and reset marker to indicate we're done
+ end_marker = ''
+ conn.execute(clean_batch_qry, (marker, age_timestamp))
+ return end_marker
- def _reclaim(self, conn, age_timestamp, sync_timestamp):
- conn.execute('''
- DELETE FROM %s WHERE deleted = 1 AND %s < ?
- ''' % (self.db_contains_type, self.db_reclaim_timestamp),
- (age_timestamp,))
+ def _reclaim_sync(self, conn, sync_timestamp):
try:
conn.execute('''
DELETE FROM outgoing_sync WHERE updated_at < ?
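
Reviewer note on the db.py rework: _reclaim now walks the deleted rows in pages of RECLAIM_PAGE_SIZE, using a name marker to bound each DELETE and committing between pages, so the broker lock is released periodically on large databases. A standalone sketch of the paging pattern against a toy table (the reclaim-timestamp condition is omitted for brevity):

    import sqlite3

    PAGE_SIZE = 2  # stands in for RECLAIM_PAGE_SIZE

    def reclaim_page(conn, marker):
        # Find the name one page beyond the marker, delete the rows in
        # (marker, end_marker], and return end_marker ('' means done).
        row = conn.execute(
            'SELECT name FROM obj WHERE deleted = 1 AND name > ? '
            'ORDER BY name LIMIT 1 OFFSET ?', (marker, PAGE_SIZE)).fetchone()
        if row:
            conn.execute('DELETE FROM obj WHERE deleted = 1 '
                         'AND name > ? AND name <= ?', (marker, row[0]))
            return row[0]
        conn.execute('DELETE FROM obj WHERE deleted = 1 AND name > ?',
                     (marker,))
        return ''

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE obj (name TEXT, deleted INTEGER)')
    conn.executemany('INSERT INTO obj VALUES (?, 1)',
                     [(n,) for n in 'abcde'])
    marker = ''
    while True:
        marker = reclaim_page(conn, marker)
        conn.commit()
        if not marker:
            break
    print(conn.execute('SELECT COUNT(*) FROM obj').fetchone()[0])  # 0
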
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index a80fa0fb6..08da5c7ba 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -160,7 +160,7 @@ class MemcacheRing(object):
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
- max_conns=2):
+ max_conns=2, logger=None):
self._ring = {}
self._errors = dict(((serv, []) for serv in servers))
self._error_limited = dict(((serv, 0) for serv in servers))
@@ -178,18 +178,23 @@ class MemcacheRing(object):
self._pool_timeout = pool_timeout
self._allow_pickle = allow_pickle
self._allow_unpickle = allow_unpickle or allow_pickle
+ if logger is None:
+ self.logger = logging.getLogger()
+ else:
+ self.logger = logger
def _exception_occurred(self, server, e, action='talking',
sock=None, fp=None, got_connection=True):
if isinstance(e, Timeout):
- logging.error("Timeout %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.error("Timeout %(action)s to memcached: %(server)s",
+ {'action': action, 'server': server})
elif isinstance(e, (socket.error, MemcacheConnectionError)):
- logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
- {'action': action, 'server': server, 'err': e})
+ self.logger.error(
+ "Error %(action)s to memcached: %(server)s: %(err)s",
+ {'action': action, 'server': server, 'err': e})
else:
- logging.exception("Error %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.exception("Error %(action)s to memcached: %(server)s",
+ {'action': action, 'server': server})
try:
if fp:
fp.close()
@@ -213,7 +218,7 @@ class MemcacheRing(object):
if err > now - ERROR_LIMIT_TIME]
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._error_limited[server] = now + ERROR_LIMIT_DURATION
- logging.error('Error limiting server %s', server)
+ self.logger.error('Error limiting server %s', server)
def _get_conns(self, key):
"""
diff --git a/swift/common/middleware/memcache.py b/swift/common/middleware/memcache.py
index e846749cb..b5b9569a5 100644
--- a/swift/common/middleware/memcache.py
+++ b/swift/common/middleware/memcache.py
@@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
IO_TIMEOUT, TRY_COUNT)
+from swift.common.utils import get_logger
class MemcacheMiddleware(object):
@@ -28,6 +29,7 @@ class MemcacheMiddleware(object):
def __init__(self, app, conf):
self.app = app
+ self.logger = get_logger(conf, log_route='memcache')
self.memcache_servers = conf.get('memcache_servers')
serialization_format = conf.get('memcache_serialization_support')
try:
@@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
io_timeout=io_timeout,
allow_pickle=(serialization_format == 0),
allow_unpickle=(serialization_format <= 1),
- max_conns=max_conns)
+ max_conns=max_conns,
+ logger=self.logger)
def __call__(self, env, start_response):
env['swift.cache'] = self.memcache
diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py
index 72e1d6a40..9d3ff2fdd 100644
--- a/swift/common/middleware/ratelimit.py
+++ b/swift/common/middleware/ratelimit.py
@@ -242,6 +242,10 @@ class RateLimitMiddleware(object):
if not self.memcache_client:
return None
+ if req.environ.get('swift.ratelimit.handled'):
+ return None
+ req.environ['swift.ratelimit.handled'] = True
+
try:
account_info = get_account_info(req.environ, self.app,
swift_source='RL')
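
Reviewer note on ratelimit.py: the new swift.ratelimit.handled environ flag makes the middleware idempotent per request, so a pipeline that routes the same request through ratelimit more than once only accounts for it once. A minimal sketch of the guard:

    def should_ratelimit(environ):
        # Only the first ratelimit pass on a request does any work;
        # later passes see the flag and skip.
        if environ.get('swift.ratelimit.handled'):
            return False
        environ['swift.ratelimit.handled'] = True
        return True

    env = {}
    print(should_ratelimit(env))  # True
    print(should_ratelimit(env))  # False
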
diff --git a/swift/common/middleware/s3api/controllers/obj.py b/swift/common/middleware/s3api/controllers/obj.py
index 293b14702..716c837b6 100644
--- a/swift/common/middleware/s3api/controllers/obj.py
+++ b/swift/common/middleware/s3api/controllers/obj.py
@@ -26,7 +26,7 @@ from swift.common.middleware.versioned_writes.object_versioning import \
from swift.common.middleware.s3api.utils import S3Timestamp, sysmeta_header
from swift.common.middleware.s3api.controllers.base import Controller
from swift.common.middleware.s3api.s3response import S3NotImplemented, \
- InvalidRange, NoSuchKey, InvalidArgument, HTTPNoContent, \
+ InvalidRange, NoSuchKey, NoSuchVersion, InvalidArgument, HTTPNoContent, \
PreconditionFailed
@@ -88,7 +88,15 @@ class ObjectController(Controller):
if version_id not in ('null', None) and \
'object_versioning' not in get_swift_info():
raise S3NotImplemented()
+
query = {} if version_id is None else {'version-id': version_id}
+ if version_id not in ('null', None):
+ container_info = req.get_container_info(self.app)
+ if not container_info.get(
+ 'sysmeta', {}).get('versions-container', ''):
+ # Versioning has never been enabled
+ raise NoSuchVersion(object_name, version_id)
+
resp = req.get_response(self.app, query=query)
if req.method == 'HEAD':
@@ -193,17 +201,25 @@ class ObjectController(Controller):
'object_versioning' not in get_swift_info():
raise S3NotImplemented()
+ version_id = req.params.get('versionId')
+ if version_id not in ('null', None):
+ container_info = req.get_container_info(self.app)
+ if not container_info.get(
+ 'sysmeta', {}).get('versions-container', ''):
+ # Versioning has never been enabled
+ return HTTPNoContent(headers={'x-amz-version-id': version_id})
+
try:
try:
query = req.gen_multipart_manifest_delete_query(
- self.app, version=req.params.get('versionId'))
+ self.app, version=version_id)
except NoSuchKey:
query = {}
req.headers['Content-Type'] = None # Ignore client content-type
- if 'versionId' in req.params:
- query['version-id'] = req.params['versionId']
+ if version_id is not None:
+ query['version-id'] = version_id
query['symlink'] = 'get'
resp = req.get_response(self.app, query=query)
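
Reviewer note on the s3api object controller: when a request names a specific versionId, the controller now checks the container's versions-container sysmeta first; if versioning was never enabled it can answer directly (NoSuchVersion on GET/HEAD, 204 on DELETE). A minimal sketch of the check (the container_info shape follows the diff above):

    def versioning_ever_enabled(container_info):
        # A container that never had versioning enabled carries no
        # 'versions-container' sysmeta entry.
        return bool(container_info.get('sysmeta', {})
                    .get('versions-container', ''))

    print(versioning_ever_enabled({'sysmeta': {}}))            # False
    print(versioning_ever_enabled(
        {'sysmeta': {'versions-container': '.versions'}}))     # True
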
diff --git a/swift/common/middleware/symlink.py b/swift/common/middleware/symlink.py
index d2c644438..bde163aa0 100644
--- a/swift/common/middleware/symlink.py
+++ b/swift/common/middleware/symlink.py
@@ -205,7 +205,8 @@ from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value, drain_and_close
from swift.common.constraints import check_account_format
-from swift.common.wsgi import WSGIContext, make_subrequest
+from swift.common.wsgi import WSGIContext, make_subrequest, \
+ make_pre_authed_request
from swift.common.request_helpers import get_sys_meta_prefix, \
check_path_header, get_container_update_override_key, \
update_ignore_range_header
@@ -442,7 +443,9 @@ class SymlinkObjectContext(WSGIContext):
content_type='text/plain')
def _recursive_get_head(self, req, target_etag=None,
- follow_softlinks=True):
+ follow_softlinks=True, orig_req=None):
+ if not orig_req:
+ orig_req = req
resp = self._app_call(req.environ)
def build_traversal_req(symlink_target):
@@ -457,9 +460,20 @@ class SymlinkObjectContext(WSGIContext):
'/', version, account,
symlink_target.lstrip('/'))
self._last_target_path = target_path
- new_req = make_subrequest(
- req.environ, path=target_path, method=req.method,
- headers=req.headers, swift_source='SYM')
+
+ subreq_headers = dict(req.headers)
+ if self._response_header_value(ALLOW_RESERVED_NAMES):
+ # this symlink's sysmeta says it can point to reserved names;
+ # we infer that some piece of middleware previously authorized
+ # this request, because users can't access reserved names
+ # directly
+ subreq_meth = make_pre_authed_request
+ subreq_headers['X-Backend-Allow-Reserved-Names'] = 'true'
+ else:
+ subreq_meth = make_subrequest
+ new_req = subreq_meth(orig_req.environ, path=target_path,
+ method=req.method, headers=subreq_headers,
+ swift_source='SYM')
new_req.headers.pop('X-Backend-Storage-Policy-Index', None)
return new_req
@@ -484,11 +498,8 @@ class SymlinkObjectContext(WSGIContext):
if not config_true_value(
self._response_header_value(SYMLOOP_EXTEND)):
self._loop_count += 1
- if config_true_value(
- self._response_header_value(ALLOW_RESERVED_NAMES)):
- new_req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
-
- return self._recursive_get_head(new_req, target_etag=resp_etag)
+ return self._recursive_get_head(new_req, target_etag=resp_etag,
+ orig_req=req)
else:
final_etag = self._response_header_value('etag')
if final_etag and target_etag and target_etag != final_etag:
diff --git a/swift/common/middleware/versioned_writes/object_versioning.py b/swift/common/middleware/versioned_writes/object_versioning.py
index 5c9b72d5c..508972f72 100644
--- a/swift/common/middleware/versioned_writes/object_versioning.py
+++ b/swift/common/middleware/versioned_writes/object_versioning.py
@@ -152,7 +152,7 @@ from cgi import parse_header
from six.moves.urllib.parse import unquote
from swift.common.constraints import MAX_FILE_SIZE, valid_api_version, \
- ACCOUNT_LISTING_LIMIT
+ ACCOUNT_LISTING_LIMIT, CONTAINER_LISTING_LIMIT
from swift.common.http import is_success, is_client_error, HTTP_NOT_FOUND, \
HTTP_CONFLICT
from swift.common.request_helpers import get_sys_meta_prefix, \
@@ -1191,7 +1191,7 @@ class ContainerContext(ObjectVersioningContext):
'hash': item['hash'],
'last_modified': item['last_modified'],
})
- limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+ limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
body = build_listing(
null_listing, subdir_listing, broken_listing,
reverse=config_true_value(params.get('reverse', 'no')),
@@ -1256,7 +1256,7 @@ class ContainerContext(ObjectVersioningContext):
'last_modified': item['last_modified'],
})
- limit = constrain_req_limit(req, ACCOUNT_LISTING_LIMIT)
+ limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
body = build_listing(
null_listing, versions_listing,
subdir_listing, broken_listing,
diff --git a/swift/common/swob.py b/swift/common/swob.py
index 61b66793c..76fb2fbc9 100644
--- a/swift/common/swob.py
+++ b/swift/common/swob.py
@@ -43,7 +43,6 @@ from email.utils import parsedate
import re
import random
import functools
-import inspect
from io import BytesIO
import six
@@ -1563,23 +1562,15 @@ def wsgify(func):
return a Response object into WSGI callables. Also catches any raised
HTTPExceptions and treats them as a returned Response.
"""
- argspec = inspect.getargspec(func)
- if argspec.args and argspec.args[0] == 'self':
- @functools.wraps(func)
- def _wsgify_self(self, env, start_response):
- try:
- return func(self, Request(env))(env, start_response)
- except HTTPException as err_resp:
- return err_resp(env, start_response)
- return _wsgify_self
- else:
- @functools.wraps(func)
- def _wsgify_bare(env, start_response):
- try:
- return func(Request(env))(env, start_response)
- except HTTPException as err_resp:
- return err_resp(env, start_response)
- return _wsgify_bare
+ @functools.wraps(func)
+ def _wsgify(*args):
+ env, start_response = args[-2:]
+ new_args = args[:-2] + (Request(env), )
+ try:
+ return func(*new_args)(env, start_response)
+ except HTTPException as err_resp:
+ return err_resp(env, start_response)
+ return _wsgify
class StatusMap(object):
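
Reviewer note on the swob.py change: wsgify no longer inspects the wrapped function for a self argument; it simply treats the last two positional arguments as (env, start_response), which works for bare functions and bound methods alike. A short usage sketch, assuming a Swift checkout on the path:

    from swift.common.swob import wsgify, Request, Response

    @wsgify
    def bare_app(req):
        # req is a swob.Request built from the WSGI environ
        return Response(body=b'hello', request=req)

    class MethodApp(object):
        @wsgify
        def __call__(self, req):
            return Response(body=b'hello', request=req)

    env = Request.blank('/v1/a/c/o').environ
    start = lambda status, headers: None
    print(bare_app(env, start))      # both behave as plain WSGI apps
    print(MethodApp()(env, start))
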
diff --git a/swift/common/utils.py b/swift/common/utils.py
index df8713d3a..23a137e6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3152,11 +3152,26 @@ def remove_directory(path):
def audit_location_generator(devices, datadir, suffix='',
- mount_check=True, logger=None):
+ mount_check=True, logger=None,
+ devices_filter=None, partitions_filter=None,
+ suffixes_filter=None, hashes_filter=None,
+ hook_pre_device=None, hook_post_device=None,
+ hook_pre_partition=None, hook_post_partition=None,
+ hook_pre_suffix=None, hook_post_suffix=None,
+ hook_pre_hash=None, hook_post_hash=None):
"""
Given a devices path and a data directory, yield (path, device,
partition) for all files in that directory
+ (devices|partitions|suffixes|hashes)_filter are meant to modify the
+ list of elements that will be iterated, e.g. to exclude some elements
+ based on a custom condition defined by the caller.
+
+ hook_pre_(device|partition|suffix|hash) are called before yielding the
+ element, and hook_post_(device|partition|suffix|hash) are called after
+ the element was yielded. They are meant to do some pre/post
+ processing, e.g. saving a progress status.
+
:param devices: parent directory of the devices to be audited
:param datadir: a directory located under self.devices. This should be
one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@ def audit_location_generator(devices, datadir, suffix='',
:param mount_check: Flag to check if a mount check should be performed
on devices
:param logger: a logger object
+ :param devices_filter: a callable taking (devices, [list of devices])
+ as parameters and returning a [list of devices]
+ :param partitions_filter: a callable taking (datadir_path, [list of
+ parts]) as parameters and returning a [list of parts]
+ :param suffixes_filter: a callable taking (part_path, [list of
+ suffixes]) as parameters and returning a [list of suffixes]
+ :param hashes_filter: a callable taking (suff_path, [list of hashes])
+ as parameters and returning a [list of hashes]
+ :param hook_pre_device: a callable taking device_path as parameter
+ :param hook_post_device: a callable taking device_path as parameter
+ :param hook_pre_partition: a callable taking part_path as parameter
+ :param hook_post_partition: a callable taking part_path as parameter
+ :param hook_pre_suffix: a callable taking suff_path as parameter
+ :param hook_post_suffix: a callable taking suff_path as parameter
+ :param hook_pre_hash: a callable taking hash_path as parameter
+ :param hook_post_hash: a callable taking hash_path as parameter
"""
device_dir = listdir(devices)
# randomize devices in case of process restart before sweep completed
shuffle(device_dir)
+ if devices_filter:
+ device_dir = devices_filter(devices, device_dir)
for device in device_dir:
+ if hook_pre_device:
+ hook_pre_device(os.path.join(devices, device))
if mount_check and not ismount(os.path.join(devices, device)):
if logger:
logger.warning(
@@ -3183,24 +3218,36 @@ def audit_location_generator(devices, datadir, suffix='',
logger.warning(_('Skipping %(datadir)s because %(err)s'),
{'datadir': datadir_path, 'err': e})
continue
+ if partitions_filter:
+ partitions = partitions_filter(datadir_path, partitions)
for partition in partitions:
part_path = os.path.join(datadir_path, partition)
+ if hook_pre_partition:
+ hook_pre_partition(part_path)
try:
suffixes = listdir(part_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
+ if suffixes_filter:
+ suffixes = suffixes_filter(part_path, suffixes)
for asuffix in suffixes:
suff_path = os.path.join(part_path, asuffix)
+ if hook_pre_suffix:
+ hook_pre_suffix(suff_path)
try:
hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
+ if hashes_filter:
+ hashes = hashes_filter(suff_path, hashes)
for hsh in hashes:
hash_path = os.path.join(suff_path, hsh)
+ if hook_pre_hash:
+ hook_pre_hash(hash_path)
try:
files = sorted(listdir(hash_path), reverse=True)
except OSError as e:
@@ -3212,6 +3259,14 @@ def audit_location_generator(devices, datadir, suffix='',
continue
path = os.path.join(hash_path, fname)
yield path, device, partition
+ if hook_post_hash:
+ hook_post_hash(hash_path)
+ if hook_post_suffix:
+ hook_post_suffix(suff_path)
+ if hook_post_partition:
+ hook_post_partition(part_path)
+ if hook_post_device:
+ hook_post_device(os.path.join(devices, device))
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
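
Reviewer note on audit_location_generator: the filters and hooks are plain callables, so callers typically bind their own state into them with functools.partial, as the relinker does above. A minimal sketch of the wiring (directory names are illustrative; the generator call is commented out since it needs a real on-disk layout):

    from functools import partial

    def devices_filter(wanted, devices_dir, devices):
        # Restrict the walk to one device when the operator asked
        # for it, otherwise keep them all.
        return set(d for d in devices if not wanted or d == wanted)

    def hook_post_partition(done, part_path):
        # Record completed partitions so a restart can skip them.
        done.add(part_path)

    done = set()
    kwargs = dict(
        devices_filter=partial(devices_filter, 'sdb1'),
        hook_post_partition=partial(hook_post_partition, done),
    )
    # for path, device, partition in audit_location_generator(
    #         '/srv/node', 'objects', mount_check=False, **kwargs):
    #     process(path)
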
@@ -4814,6 +4869,8 @@ class ShardRange(object):
value.
:param epoch: optional epoch timestamp which represents the time at which
sharding was enabled for a container.
+ :param reported: optional indicator that this shard and its stats have
+ been reported to the root container.
"""
FOUND = 10
CREATED = 20
@@ -4864,7 +4921,8 @@ class ShardRange(object):
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
- deleted=False, state=None, state_timestamp=None, epoch=None):
+ deleted=False, state=None, state_timestamp=None, epoch=None,
+ reported=False):
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
@@ -4883,6 +4941,7 @@ class ShardRange(object):
self.state = self.FOUND if state is None else state
self.state_timestamp = state_timestamp
self.epoch = epoch
+ self.reported = reported
@classmethod
def _encode(cls, value):
@@ -5063,8 +5122,14 @@ class ShardRange(object):
cast to an int, or if meta_timestamp is neither None nor can be
cast to a :class:`~swift.common.utils.Timestamp`.
"""
- self.object_count = int(object_count)
- self.bytes_used = int(bytes_used)
+ if self.object_count != int(object_count):
+ self.object_count = int(object_count)
+ self.reported = False
+
+ if self.bytes_used != int(bytes_used):
+ self.bytes_used = int(bytes_used)
+ self.reported = False
+
if meta_timestamp is None:
self.meta_timestamp = Timestamp.now()
else:
@@ -5145,6 +5210,14 @@ class ShardRange(object):
def epoch(self, epoch):
self._epoch = self._to_timestamp(epoch)
+ @property
+ def reported(self):
+ return self._reported
+
+ @reported.setter
+ def reported(self, value):
+ self._reported = bool(value)
+
def update_state(self, state, state_timestamp=None):
"""
Set state to the given value and optionally update the state_timestamp
@@ -5161,6 +5234,7 @@ class ShardRange(object):
self.state = state
if state_timestamp is not None:
self.state_timestamp = state_timestamp
+ self.reported = False
return True
@property
@@ -5283,6 +5357,7 @@ class ShardRange(object):
yield 'state', self.state
yield 'state_timestamp', self.state_timestamp.internal
yield 'epoch', self.epoch.internal if self.epoch is not None else None
+ yield 'reported', 1 if self.reported else 0
def copy(self, timestamp=None, **kwargs):
"""
@@ -5314,7 +5389,8 @@ class ShardRange(object):
params['name'], params['timestamp'], params['lower'],
params['upper'], params['object_count'], params['bytes_used'],
params['meta_timestamp'], params['deleted'], params['state'],
- params['state_timestamp'], params['epoch'])
+ params['state_timestamp'], params['epoch'],
+ params.get('reported', 0))
def find_shard_range(item, ranges):
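
Reviewer note on ShardRange.reported: the flag latches False whenever the stats, state, or epoch meaningfully change, so the sharder re-reports to the root only when there is something new. A minimal sketch of the latch behavior (a toy class, not the real ShardRange):

    class ShardStats(object):
        def __init__(self):
            self.object_count = 0
            self.bytes_used = 0
            self.reported = False

        def update_meta(self, object_count, bytes_used):
            # A no-op update leaves 'reported' alone; any real change
            # clears it so the range gets re-sent to the root.
            if self.object_count != int(object_count):
                self.object_count = int(object_count)
                self.reported = False
            if self.bytes_used != int(bytes_used):
                self.bytes_used = int(bytes_used)
                self.reported = False

    sr = ShardStats()
    sr.update_meta(10, 1024)
    sr.reported = True         # pretend the root acked the report
    sr.update_meta(10, 1024)
    print(sr.reported)         # True: unchanged stats stay latched
    sr.update_meta(11, 2048)
    print(sr.reported)         # False: new stats need re-reporting
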
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 0a18fe48f..bdf34f7d8 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
get_db_files, parse_db_filename, make_db_file_path, split_path, \
RESERVED_BYTE
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
- zero_like, DatabaseAlreadyExists
-
-SQLITE_ARG_LIMIT = 999
+ zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
DATADIR = 'containers'
@@ -62,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
'bytes_used', 'meta_timestamp', 'deleted', 'state',
- 'state_timestamp', 'epoch')
+ 'state_timestamp', 'epoch', 'reported')
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
@@ -269,6 +267,7 @@ def merge_shards(shard_data, existing):
if existing['timestamp'] < shard_data['timestamp']:
# note that currently we do not roll forward any meta or state from
# an item that was created at older time, newer created time trumps
+ shard_data['reported'] = 0 # reset the latch
return True
elif existing['timestamp'] > shard_data['timestamp']:
return False
@@ -285,6 +284,18 @@ def merge_shards(shard_data, existing):
else:
new_content = True
+ # We can latch the reported flag
+ if existing['reported'] and \
+ existing['object_count'] == shard_data['object_count'] and \
+ existing['bytes_used'] == shard_data['bytes_used'] and \
+ existing['state'] == shard_data['state'] and \
+ existing['epoch'] == shard_data['epoch']:
+ shard_data['reported'] = 1
+ else:
+ shard_data.setdefault('reported', 0)
+ if shard_data['reported'] and not existing['reported']:
+ new_content = True
+
if (existing['state_timestamp'] == shard_data['state_timestamp']
and shard_data['state'] > existing['state']):
new_content = True
@@ -597,7 +608,8 @@ class ContainerBroker(DatabaseBroker):
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
- epoch TEXT
+ epoch TEXT,
+ reported INTEGER DEFAULT 0
);
""" % SHARD_RANGE_TABLE)
@@ -1430,10 +1442,13 @@ class ContainerBroker(DatabaseBroker):
# sqlite3.OperationalError: cannot start a transaction
# within a transaction
conn.rollback()
- if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
- raise
- self.create_shard_range_table(conn)
- return _really_merge_items(conn)
+ if 'no such column: reported' in str(err):
+ self._migrate_add_shard_range_reported(conn)
+ return _really_merge_items(conn)
+ if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
+ self.create_shard_range_table(conn)
+ return _really_merge_items(conn)
+ raise
def get_reconciler_sync(self):
with self.get() as conn:
@@ -1581,9 +1596,20 @@ class ContainerBroker(DatabaseBroker):
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
- def _reclaim(self, conn, age_timestamp, sync_timestamp):
- super(ContainerBroker, self)._reclaim(conn, age_timestamp,
- sync_timestamp)
+ def _migrate_add_shard_range_reported(self, conn):
+ """
+ Add the reported column to the 'shard_range' table.
+ """
+ conn.executescript('''
+ BEGIN;
+ ALTER TABLE %s
+ ADD COLUMN reported INTEGER DEFAULT 0;
+ COMMIT;
+ ''' % SHARD_RANGE_TABLE)
+
+ def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
+ super(ContainerBroker, self)._reclaim_other_stuff(
+ conn, age_timestamp, sync_timestamp)
# populate instance cache, but use existing conn to avoid deadlock
# when it has a pending update
self._populate_instance_cache(conn=conn)
@@ -1630,7 +1656,7 @@ class ContainerBroker(DatabaseBroker):
elif states is not None:
included_states.add(states)
- def do_query(conn):
+ def do_query(conn, use_reported_column=True):
condition = ''
conditions = []
params = []
@@ -1648,21 +1674,27 @@ class ContainerBroker(DatabaseBroker):
params.append(self.path)
if conditions:
condition = ' WHERE ' + ' AND '.join(conditions)
+ if use_reported_column:
+ columns = SHARD_RANGE_KEYS
+ else:
+ columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', )
sql = '''
SELECT %s
FROM %s%s;
- ''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition)
+ ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition)
data = conn.execute(sql, params)
data.row_factory = None
return [row for row in data]
- try:
- with self.maybe_get(connection) as conn:
+ with self.maybe_get(connection) as conn:
+ try:
return do_query(conn)
- except sqlite3.OperationalError as err:
- if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
+ except sqlite3.OperationalError as err:
+ if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
+ return []
+ if 'no such column: reported' in str(err):
+ return do_query(conn, use_reported_column=False)
raise
- return []
@classmethod
def resolve_shard_range_states(cls, states):
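
Reviewer note on the backend.py error handling: pre-upgrade container databases lack the reported column, so both the merge path and the query path catch sqlite3.OperationalError, migrate (or fall back to selecting a literal 0 as reported), and retry. A standalone sketch of the migrate-on-demand pattern:

    import sqlite3

    def add_reported_column(conn):
        # Lazy migration, applied only when an old schema is detected.
        conn.execute('ALTER TABLE shard_range '
                     'ADD COLUMN reported INTEGER DEFAULT 0')

    def get_ranges(conn):
        sql = 'SELECT name, reported FROM shard_range'
        try:
            return conn.execute(sql).fetchall()
        except sqlite3.OperationalError as err:
            if 'no such column: reported' not in str(err):
                raise
            add_reported_column(conn)
            return conn.execute(sql).fetchall()

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE shard_range (name TEXT)')  # old schema
    conn.execute("INSERT INTO shard_range VALUES ('a-b')")
    print(get_ranges(conn))  # [('a-b', 0)]
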
diff --git a/swift/container/server.py b/swift/container/server.py
index c8d7647aa..db9ac0291 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -155,6 +155,8 @@ class ContainerController(BaseStorageServer):
conf['auto_create_account_prefix']
else:
self.auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
+ self.shards_account_prefix = (
+ self.auto_create_account_prefix + 'shards_')
if config_true_value(conf.get('allow_versions', 'f')):
self.save_headers.append('x-versions-location')
if 'allow_versions' in conf:
@@ -375,14 +377,12 @@ class ContainerController(BaseStorageServer):
# auto create accounts)
obj_policy_index = self.get_and_validate_policy_index(req) or 0
broker = self._get_container_broker(drive, part, account, container)
- if account.startswith(self.auto_create_account_prefix) and obj and \
- not os.path.exists(broker.db_file):
- try:
- broker.initialize(req_timestamp.internal, obj_policy_index)
- except DatabaseAlreadyExists:
- pass
- if not os.path.exists(broker.db_file):
+ if obj:
+ self._maybe_autocreate(broker, req_timestamp, account,
+ obj_policy_index, req)
+ elif not os.path.exists(broker.db_file):
return HTTPNotFound()
+
if obj: # delete object
# redirect if a shard range exists for the object name
redirect = self._redirect_to_shard(req, broker, obj)
@@ -449,11 +449,25 @@ class ContainerController(BaseStorageServer):
broker.update_status_changed_at(timestamp)
return recreated
+ def _should_autocreate(self, account, req):
+ auto_create_header = req.headers.get('X-Backend-Auto-Create')
+ if auto_create_header:
+ # If the caller included an explicit X-Backend-Auto-Create header,
+ # assume they know the behavior they want
+ return config_true_value(auto_create_header)
+ if account.startswith(self.shards_account_prefix):
+ # we have to special-case this subset of the
+ # auto_create_account_prefix because we don't want the updater
+ # accidentally auto-creating shards; only the sharder creates
+ # shards, and it will explicitly tell the server to do so
+ return False
+ return account.startswith(self.auto_create_account_prefix)
+
def _maybe_autocreate(self, broker, req_timestamp, account,
- policy_index):
+ policy_index, req):
created = False
- if account.startswith(self.auto_create_account_prefix) and \
- not os.path.exists(broker.db_file):
+ should_autocreate = self._should_autocreate(account, req)
+ if should_autocreate and not os.path.exists(broker.db_file):
if policy_index is None:
raise HTTPBadRequest(
'X-Backend-Storage-Policy-Index header is required')
@@ -506,8 +520,8 @@ class ContainerController(BaseStorageServer):
# obj put expects the policy_index header, default is for
# legacy support during upgrade.
obj_policy_index = requested_policy_index or 0
- self._maybe_autocreate(broker, req_timestamp, account,
- obj_policy_index)
+ self._maybe_autocreate(
+ broker, req_timestamp, account, obj_policy_index, req)
# redirect if a shard exists for this object name
response = self._redirect_to_shard(req, broker, obj)
if response:
@@ -531,8 +545,8 @@ class ContainerController(BaseStorageServer):
for sr in json.loads(req.body)]
except (ValueError, KeyError, TypeError) as err:
return HTTPBadRequest('Invalid body: %r' % err)
- created = self._maybe_autocreate(broker, req_timestamp, account,
- requested_policy_index)
+ created = self._maybe_autocreate(
+ broker, req_timestamp, account, requested_policy_index, req)
self._update_metadata(req, broker, req_timestamp, 'PUT')
if shard_ranges:
# TODO: consider writing the shard ranges into the pending
@@ -805,7 +819,7 @@ class ContainerController(BaseStorageServer):
requested_policy_index = self.get_and_validate_policy_index(req)
broker = self._get_container_broker(drive, part, account, container)
self._maybe_autocreate(broker, req_timestamp, account,
- requested_policy_index)
+ requested_policy_index, req)
try:
objs = json.load(req.environ['wsgi.input'])
except ValueError as err:
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index d9aa7c66d..dd33043ae 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -618,7 +618,8 @@ class ContainerSharder(ContainerReplicator):
def _send_shard_ranges(self, account, container, shard_ranges,
headers=None):
- body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
+ body = json.dumps([dict(sr, reported=0)
+ for sr in shard_ranges]).encode('ascii')
part, nodes = self.ring.get_nodes(account, container)
headers = headers or {}
headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
@@ -1148,7 +1149,8 @@ class ContainerSharder(ContainerReplicator):
'X-Backend-Storage-Policy-Index': broker.storage_policy_index,
'X-Container-Sysmeta-Shard-Quoted-Root': quote(
broker.root_path),
- 'X-Container-Sysmeta-Sharding': True}
+ 'X-Container-Sysmeta-Sharding': 'True',
+ 'X-Backend-Auto-Create': 'True'}
# NB: we *used* to send along
# 'X-Container-Sysmeta-Shard-Root': broker.root_path
# but that isn't safe for container names with nulls or newlines
@@ -1468,7 +1470,7 @@ class ContainerSharder(ContainerReplicator):
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
- if not own_shard_range:
+ if not own_shard_range or own_shard_range.reported:
return
# persist the reported shard metadata
@@ -1478,9 +1480,12 @@ class ContainerSharder(ContainerReplicator):
include_own=True,
include_deleted=True)
# send everything
- self._send_shard_ranges(
- broker.root_account, broker.root_container,
- shard_ranges)
+ if self._send_shard_ranges(
+ broker.root_account, broker.root_container, shard_ranges):
+ # on success, mark ourselves as reported so we don't keep
+ # hammering the root
+ own_shard_range.reported = True
+ broker.merge_shard_ranges(own_shard_range)
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 466f294c0..f3a01a824 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -14,12 +14,13 @@
# limitations under the License.
import six.moves.cPickle as pickle
+import errno
import os
import signal
import sys
import time
from swift import gettext_ as _
-from random import random
+from random import random, shuffle
from eventlet import spawn, Timeout
@@ -230,7 +231,9 @@ class ObjectUpdater(Daemon):
'to a valid policy (%(error)s)') % {
'directory': asyncdir, 'error': e})
continue
- for prefix in self._listdir(async_pending):
+ prefix_dirs = self._listdir(async_pending)
+ shuffle(prefix_dirs)
+ for prefix in prefix_dirs:
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
@@ -271,7 +274,11 @@ class ObjectUpdater(Daemon):
if obj_hash == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
- os.unlink(update_path)
+ try:
+ os.unlink(update_path)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
else:
last_obj_hash = obj_hash
yield {'device': device, 'policy': policy,
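
Reviewer note on the updater change: two updaters sweeping the same async_pending directory can race to unlink a stale update, so losing the race is tolerated while other errors still propagate. A minimal sketch of the ENOENT-tolerant unlink:

    import errno
    import os

    def unlink_if_exists(path):
        # A concurrent sweeper may have removed the file already;
        # that's fine, but surface anything else.
        try:
            os.unlink(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    unlink_if_exists('/tmp/definitely-not-here')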