diff options
Diffstat (limited to 'swift/cli/relinker.py')
-rw-r--r-- | swift/cli/relinker.py | 181 |
1 files changed, 173 insertions, 8 deletions
diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py index b7b4aaf73..630c0e98e 100644 --- a/swift/cli/relinker.py +++ b/swift/cli/relinker.py @@ -14,8 +14,12 @@ # limitations under the License. +import errno +import fcntl +import json import logging import os +from functools import partial from swift.common.storage_policy import POLICIES from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \ DiskFileQuarantined @@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \ from swift.obj import diskfile +LOCK_FILE = '.relink.{datadir}.lock' +STATE_FILE = 'relink.{datadir}.json' +STATE_TMP_FILE = '.relink.{datadir}.json.tmp' +STEP_RELINK = 'relink' +STEP_CLEANUP = 'cleanup' + + +def devices_filter(device, _, devices): + if device: + devices = [d for d in devices if d == device] + + return set(devices) + + +def hook_pre_device(locks, states, datadir, device_path): + lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir)) + + fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY) + fcntl.flock(fd, fcntl.LOCK_EX) + locks[0] = fd + + state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir)) + states.clear() + try: + with open(state_file, 'rt') as f: + tmp = json.load(f) + states.update(tmp) + except ValueError: + # Invalid JSON: remove the file to restart from scratch + os.unlink(state_file) + except IOError as err: + # Ignore file not found error + if err.errno != errno.ENOENT: + raise + + +def hook_post_device(locks, _): + os.close(locks[0]) + locks[0] = None + + +def partitions_filter(states, step, part_power, next_part_power, + datadir_path, partitions): + # Remove all non partitions first (eg: auditor_status_ALL.json) + partitions = [p for p in partitions if p.isdigit()] + + if not (step == STEP_CLEANUP and part_power == next_part_power): + # This is not a cleanup after cancel, partitions in the upper half are + # new partitions, there is nothing to relink/cleanup from there + partitions = [p 
for p in partitions + if int(p) < 2 ** next_part_power / 2] + + # Format: { 'part': [relinked, cleaned] } + if states: + missing = list(set(partitions) - set(states.keys())) + if missing: + # All missing partitions were created after the first run of + # relink, so after the new ring was distributed, so they already + # are hardlinked in both partitions, but they will need to be + # cleaned. Just update the state file. + for part in missing: + states[part] = [True, False] + if step == STEP_RELINK: + partitions = [str(p) for p, (r, c) in states.items() if not r] + elif step == STEP_CLEANUP: + partitions = [str(p) for p, (r, c) in states.items() if not c] + else: + states.update({str(p): [False, False] for p in partitions}) + + # Always scan the partitions in reverse order to minimize the amount of IO + # (it actually only matters for relink, not for cleanup). + # + # Initial situation: + # objects/0/000/00000000000000000000000000000000/12345.data + # -> relinked to objects/1/000/10000000000000000000000000000000/12345.data + # + # If the relinker then scans partition 1, it will listdir that object even + # though it's unnecessary. By working in reverse order of partitions, this is + # avoided. 
+ partitions = sorted(partitions, key=lambda x: int(x), reverse=True) + + return partitions + + +# Save states when a partition is done +def hook_post_partition(states, step, + partition_path): + part = os.path.basename(os.path.abspath(partition_path)) + datadir_path = os.path.dirname(os.path.abspath(partition_path)) + device_path = os.path.dirname(os.path.abspath(datadir_path)) + datadir_name = os.path.basename(os.path.abspath(datadir_path)) + state_tmp_file = os.path.join(device_path, + STATE_TMP_FILE.format(datadir=datadir_name)) + state_file = os.path.join(device_path, + STATE_FILE.format(datadir=datadir_name)) + + if step == STEP_RELINK: + states[part][0] = True + elif step == STEP_CLEANUP: + states[part][1] = True + with open(state_tmp_file, 'wt') as f: + json.dump(states, f) + os.fsync(f.fileno()) + os.rename(state_tmp_file, state_file) + + +def hashes_filter(next_part_power, suff_path, hashes): + hashes = list(hashes) + for hsh in hashes: + fname = os.path.join(suff_path, hsh, 'fake-file-name') + if replace_partition_in_path(fname, next_part_power) == fname: + hashes.remove(hsh) + return hashes + + def relink(swift_dir='/etc/swift', devices='/srv/node', skip_mount_check=False, - logger=logging.getLogger()): + logger=logging.getLogger(), + device=None): mount_check = not skip_mount_check run = False relinked = errors = 0 @@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift', logging.info('Relinking files for policy %s under %s', policy.name, devices) run = True + datadir = diskfile.get_data_dir(policy) + + locks = [None] + states = {} + relink_devices_filter = partial(devices_filter, device) + relink_hook_pre_device = partial(hook_pre_device, locks, states, + datadir) + relink_hook_post_device = partial(hook_post_device, locks) + relink_partition_filter = partial(partitions_filter, + states, STEP_RELINK, + part_power, next_part_power) + relink_hook_post_partition = partial(hook_post_partition, + states, STEP_RELINK) + relink_hashes_filter = 
partial(hashes_filter, next_part_power) + locations = audit_location_generator( devices, - diskfile.get_data_dir(policy), - mount_check=mount_check) + datadir, + mount_check=mount_check, + devices_filter=relink_devices_filter, + hook_pre_device=relink_hook_pre_device, + hook_post_device=relink_hook_post_device, + partitions_filter=relink_partition_filter, + hook_post_partition=relink_hook_post_partition, + hashes_filter=relink_hashes_filter) for fname, _, _ in locations: newfname = replace_partition_in_path(fname, next_part_power) try: @@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift', def cleanup(swift_dir='/etc/swift', devices='/srv/node', skip_mount_check=False, - logger=logging.getLogger()): + logger=logging.getLogger(), + device=None): mount_check = not skip_mount_check conf = {'devices': devices, 'mount_check': mount_check} diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf)) @@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift', logging.info('Cleaning up files for policy %s under %s', policy.name, devices) run = True + datadir = diskfile.get_data_dir(policy) + + locks = [None] + states = {} + cleanup_devices_filter = partial(devices_filter, device) + cleanup_hook_pre_device = partial(hook_pre_device, locks, states, + datadir) + cleanup_hook_post_device = partial(hook_post_device, locks) + cleanup_partition_filter = partial(partitions_filter, + states, STEP_CLEANUP, + part_power, next_part_power) + cleanup_hook_post_partition = partial(hook_post_partition, + states, STEP_CLEANUP) + cleanup_hashes_filter = partial(hashes_filter, next_part_power) + locations = audit_location_generator( devices, - diskfile.get_data_dir(policy), - mount_check=mount_check) + datadir, + mount_check=mount_check, + devices_filter=cleanup_devices_filter, + hook_pre_device=cleanup_hook_pre_device, + hook_post_device=cleanup_hook_post_device, + partitions_filter=cleanup_partition_filter, + hook_post_partition=cleanup_hook_post_partition, + 
hashes_filter=cleanup_hashes_filter) for fname, device, partition in locations: expected_fname = replace_partition_in_path(fname, part_power) if fname == expected_fname: @@ -152,8 +315,10 @@ def main(args): if args.action == 'relink': return relink( - args.swift_dir, args.devices, args.skip_mount_check, logger) + args.swift_dir, args.devices, args.skip_mount_check, logger, + device=args.device) if args.action == 'cleanup': return cleanup( - args.swift_dir, args.devices, args.skip_mount_check, logger) + args.swift_dir, args.devices, args.skip_mount_check, logger, + device=args.device) |