Diffstat (limited to 'swift/cli/relinker.py')
 -rw-r--r--  swift/cli/relinker.py  181
 1 file changed, 173 insertions(+), 8 deletions(-)
diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py
index b7b4aaf73..630c0e98e 100644
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -14,8 +14,12 @@
# limitations under the License.
+import errno
+import fcntl
+import json
import logging
import os
+from functools import partial
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
from swift.obj import diskfile
+LOCK_FILE = '.relink.{datadir}.lock'
+STATE_FILE = 'relink.{datadir}.json'
+STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
+STEP_RELINK = 'relink'
+STEP_CLEANUP = 'cleanup'
+
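# A quick illustration of how the templates above expand; the datadir name
# for the default object policy is 'objects', so each policy's data directory
# gets its own lock and state files on every device:
#   LOCK_FILE.format(datadir='objects')       -> '.relink.objects.lock'
#   STATE_FILE.format(datadir='objects')      -> 'relink.objects.json'
#   STATE_TMP_FILE.format(datadir='objects')  -> '.relink.objects.json.tmp'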
+
+def devices_filter(device, _, devices):
+ if device:
+ devices = [d for d in devices if d == device]
+
+ return set(devices)
+
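# A short usage sketch of devices_filter above (device names are made up):
# when a single device is requested only that device survives, otherwise the
# listing is passed through unchanged, as a set in both cases.
assert devices_filter('sdb1', '/srv/node', ['sdb1', 'sdb2']) == {'sdb1'}
assert devices_filter(None, '/srv/node', ['sdb1', 'sdb2']) == {'sdb1', 'sdb2'}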
+
+def hook_pre_device(locks, states, datadir, device_path):
+ lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
+
+ fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
+ fcntl.flock(fd, fcntl.LOCK_EX)
+ locks[0] = fd
+
+ state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
+ states.clear()
+ try:
+ with open(state_file, 'rt') as f:
+ tmp = json.load(f)
+ states.update(tmp)
+ except ValueError:
+ # Invalid JSON: remove the file to restart from scratch
+ os.unlink(state_file)
+ except IOError as err:
+ # Ignore file not found error
+ if err.errno != errno.ENOENT:
+ raise
+
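# A minimal standalone sketch of the per-device locking done in
# hook_pre_device above: an exclusive flock on a small lock file makes a
# second relinker run against the same device block until the first one
# finishes (the path below is illustrative only).
import fcntl
import os

lock_path = '/srv/node/sdb1/.relink.objects.lock'  # hypothetical device path
fd = os.open(lock_path, os.O_CREAT | os.O_WRONLY)
fcntl.flock(fd, fcntl.LOCK_EX)      # blocks while another process holds it
try:
    pass                            # per-device relink/cleanup work goes here
finally:
    os.close(fd)                    # closing the fd releases the flock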
+
+def hook_post_device(locks, _):
+ os.close(locks[0])
+ locks[0] = None
+
+
+def partitions_filter(states, step, part_power, next_part_power,
+ datadir_path, partitions):
+ # Remove all non-partition entries first (e.g. auditor_status_ALL.json)
+ partitions = [p for p in partitions if p.isdigit()]
+
+ if not (step == STEP_CLEANUP and part_power == next_part_power):
+ # This is not a cleanup after a cancelled partition power increase:
+ # partitions in the upper half are new partitions, so there is nothing
+ # to relink or clean up there
+ partitions = [p for p in partitions
+ if int(p) < 2 ** next_part_power / 2]
+
+ # Format: { 'part': [relinked, cleaned] }
+ if states:
+ missing = list(set(partitions) - set(states.keys()))
+ if missing:
+ # All missing partitions were created after the first run of
+ # relink, i.e. after the new ring was distributed, so they are
+ # already hardlinked in both partitions, but they will still need
+ # to be cleaned up. Just update the state file.
+ for part in missing:
+ states[part] = [True, False]
+ if step == STEP_RELINK:
+ partitions = [str(p) for p, (r, c) in states.items() if not r]
+ elif step == STEP_CLEANUP:
+ partitions = [str(p) for p, (r, c) in states.items() if not c]
+ else:
+ states.update({str(p): [False, False] for p in partitions})
+
+ # Always scan the partitions in reverse order to minimize the amount of IO
+ # (it actually only matters for relink, not for cleanup).
+ #
+ # Initial situation:
+ # objects/0/000/00000000000000000000000000000000/12345.data
+ # -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
+ #
+ # If the relinker then scans partition 1, it would listdir that object
+ # needlessly. Walking the partitions in reverse order avoids that extra
+ # work.
+ partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
+
+ return partitions
+
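# A worked example of partitions_filter above, assuming part_power=2 and
# next_part_power=3 (paths and listings are illustrative). On a first run the
# state dict is empty, so every existing partition is queued in reverse order
# and recorded as not-yet-relinked, not-yet-cleaned:
states = {}
parts = partitions_filter(states, STEP_RELINK, 2, 3,
                          '/srv/node/sdb1/objects',
                          ['auditor_status_ALL.json', '0', '1', '2', '3'])
# parts  == ['3', '2', '1', '0']
# states == {'0': [False, False], '1': [False, False],
#            '2': [False, False], '3': [False, False]}

# On a resumed run the state loaded by hook_pre_device trims the work down to
# the partitions whose relink step has not completed yet:
states = {'0': [True, False], '1': [True, False],
          '2': [False, False], '3': [False, False]}
parts = partitions_filter(states, STEP_RELINK, 2, 3,
                          '/srv/node/sdb1/objects', ['0', '1', '2', '3'])
# parts == ['3', '2']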
+
+# Save the state file each time a partition has been fully processed
+def hook_post_partition(states, step,
+ partition_path):
+ part = os.path.basename(os.path.abspath(partition_path))
+ datadir_path = os.path.dirname(os.path.abspath(partition_path))
+ device_path = os.path.dirname(os.path.abspath(datadir_path))
+ datadir_name = os.path.basename(os.path.abspath(datadir_path))
+ state_tmp_file = os.path.join(device_path,
+ STATE_TMP_FILE.format(datadir=datadir_name))
+ state_file = os.path.join(device_path,
+ STATE_FILE.format(datadir=datadir_name))
+
+ if step == STEP_RELINK:
+ states[part][0] = True
+ elif step == STEP_CLEANUP:
+ states[part][1] = True
+ with open(state_tmp_file, 'wt') as f:
+ json.dump(states, f)
+ os.fsync(f.fileno())
+ os.rename(state_tmp_file, state_file)
+
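# hook_post_partition above persists progress with the usual crash-safe
# sequence: write the JSON to a temporary file, fsync it, then rename it over
# the real state file so a reader never sees a partially written document.
# A generic sketch of the same pattern (names are illustrative):
import json
import os

def atomic_json_dump(data, path, tmp_path):
    with open(tmp_path, 'wt') as f:
        json.dump(data, f)
        f.flush()                  # push Python's buffer to the OS
        os.fsync(f.fileno())       # push the OS buffers to disk
    os.rename(tmp_path, path)      # atomic replacement on POSIX filesystems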
+
+def hashes_filter(next_part_power, suff_path, hashes):
+ hashes = list(hashes)
+ for hsh in hashes:
+ fname = os.path.join(suff_path, hsh, 'fake-file-name')
+ if replace_partition_in_path(fname, next_part_power) == fname:
+ hashes.remove(hsh)
+ return hashes
+
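# Note on hashes_filter above: calling remove() on the list being iterated
# can skip the element that follows a removed one, so a few unchanged hashes
# may survive the filter. That only costs some extra work later; it never
# drops a hash that still needs relinking. An equivalent formulation, shown
# as a sketch only, builds a new list instead:
def hashes_filter_sketch(next_part_power, suff_path, hashes):
    kept = []
    for hsh in hashes:
        fname = os.path.join(suff_path, hsh, 'fake-file-name')
        if replace_partition_in_path(fname, next_part_power) != fname:
            kept.append(hsh)
    return kept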
+
def relink(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
- logger=logging.getLogger()):
+ logger=logging.getLogger(),
+ device=None):
mount_check = not skip_mount_check
run = False
relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
logging.info('Relinking files for policy %s under %s',
policy.name, devices)
run = True
+ datadir = diskfile.get_data_dir(policy)
+
+ locks = [None]
+ states = {}
+ relink_devices_filter = partial(devices_filter, device)
+ relink_hook_pre_device = partial(hook_pre_device, locks, states,
+ datadir)
+ relink_hook_post_device = partial(hook_post_device, locks)
+ relink_partition_filter = partial(partitions_filter,
+ states, STEP_RELINK,
+ part_power, next_part_power)
+ relink_hook_post_partition = partial(hook_post_partition,
+ states, STEP_RELINK)
+ relink_hashes_filter = partial(hashes_filter, next_part_power)
+
locations = audit_location_generator(
devices,
- diskfile.get_data_dir(policy),
- mount_check=mount_check)
+ datadir,
+ mount_check=mount_check,
+ devices_filter=relink_devices_filter,
+ hook_pre_device=relink_hook_pre_device,
+ hook_post_device=relink_hook_post_device,
+ partitions_filter=relink_partition_filter,
+ hook_post_partition=relink_hook_post_partition,
+ hashes_filter=relink_hashes_filter)
for fname, _, _ in locations:
newfname = replace_partition_in_path(fname, next_part_power)
try:
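# The relink-specific context (locks, states, datadir, step) is frozen into
# the hooks with functools.partial, so audit_location_generator only has to
# pass the location it is currently visiting. A minimal illustration of the
# same pattern (names below are made up):
import os
from functools import partial

state = {}

def post_partition_hook(states, step, partition_path):
    states[os.path.basename(partition_path)] = step

bound_hook = partial(post_partition_hook, state, 'relink')  # context bound
bound_hook('/srv/node/sdb1/objects/123')  # caller supplies only the path
# state == {'123': 'relink'}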
@@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift',
def cleanup(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
- logger=logging.getLogger()):
+ logger=logging.getLogger(),
+ device=None):
mount_check = not skip_mount_check
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift',
logging.info('Cleaning up files for policy %s under %s',
policy.name, devices)
run = True
+ datadir = diskfile.get_data_dir(policy)
+
+ locks = [None]
+ states = {}
+ cleanup_devices_filter = partial(devices_filter, device)
+ cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
+ datadir)
+ cleanup_hook_post_device = partial(hook_post_device, locks)
+ cleanup_partition_filter = partial(partitions_filter,
+ states, STEP_CLEANUP,
+ part_power, next_part_power)
+ cleanup_hook_post_partition = partial(hook_post_partition,
+ states, STEP_CLEANUP)
+ cleanup_hashes_filter = partial(hashes_filter, next_part_power)
+
locations = audit_location_generator(
devices,
- diskfile.get_data_dir(policy),
- mount_check=mount_check)
+ datadir,
+ mount_check=mount_check,
+ devices_filter=cleanup_devices_filter,
+ hook_pre_device=cleanup_hook_pre_device,
+ hook_post_device=cleanup_hook_post_device,
+ partitions_filter=cleanup_partition_filter,
+ hook_post_partition=cleanup_hook_post_partition,
+ hashes_filter=cleanup_hashes_filter)
for fname, device, partition in locations:
expected_fname = replace_partition_in_path(fname, part_power)
if fname == expected_fname:
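# Both phases write the same per-device state file next to the datadir
# (e.g. /srv/node/sdb1/relink.objects.json for the default object policy;
# the device name is illustrative). It is plain JSON mapping each partition
# to [relinked, cleaned], so an interrupted run can resume where it stopped:
import json

states = {'3': [True, True],     # relinked and cleaned up
          '2': [True, False],    # relinked, cleanup still pending
          '1': [False, False]}   # not processed yet
print(json.dumps(states))
# {"3": [true, true], "2": [true, false], "1": [false, false]}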
@@ -152,8 +315,10 @@ def main(args):
if args.action == 'relink':
return relink(
- args.swift_dir, args.devices, args.skip_mount_check, logger)
+ args.swift_dir, args.devices, args.skip_mount_check, logger,
+ device=args.device)
if args.action == 'cleanup':
return cleanup(
- args.swift_dir, args.devices, args.skip_mount_check, logger)
+ args.swift_dir, args.devices, args.skip_mount_check, logger,
+ device=args.device)
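# With the new device keyword, a single disk can also be targeted directly
# from Python; a sketch only, assuming the matching --device command-line
# option is added to the argument parser elsewhere in this change and that
# 'sdb1' is a valid device name on this node:
from swift.cli import relinker

relinker.relink(swift_dir='/etc/swift', devices='/srv/node',
                skip_mount_check=False, device='sdb1')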