diff options
Diffstat (limited to 'swift/obj/replicator.py')
-rw-r--r-- | swift/obj/replicator.py | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index e3634bb8f..e9d19975a 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -16,9 +16,8 @@ from collections import defaultdict import os import errno -from os.path import isdir, isfile, join, dirname +from os.path import join, dirname import random -import shutil import time import itertools from six import viewkeys @@ -33,7 +32,7 @@ from swift.common.constraints import check_drive from swift.common.ring.utils import is_local_device from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ - rsync_module_interpolation, mkdirs, config_true_value, \ + rsync_module_interpolation, config_true_value, \ config_auto_int_value, storage_directory, \ load_recon_cache, PrefixLoggerAdapter, parse_override_options, \ distribute_evenly @@ -485,9 +484,11 @@ class ObjectReplicator(Daemon): :param job: a dict containing info about the partition to be replicated """ + df_mgr = self._df_router[job['policy']] + def tpool_get_suffixes(path): - return [suff for suff in os.listdir(path) - if len(suff) == 3 and isdir(join(path, suff))] + return [suff for suff in df_mgr.listdir(path) + if len(suff) == 3 and df_mgr.isdir(join(path, suff))] stats = self.stats_for_dev[job['device']] stats.attempted += 1 @@ -562,10 +563,10 @@ class ObjectReplicator(Daemon): failure_dev['device']) for failure_dev in job['nodes']]) else: - self.delete_partition(job['path']) + self.delete_partition(df_mgr, job['path']) handoff_partition_deleted = True elif not suffixes: - self.delete_partition(job['path']) + self.delete_partition(df_mgr, job['path']) handoff_partition_deleted = True except (Exception, Timeout): self.logger.exception(_("Error syncing handoff partition")) @@ -580,25 +581,26 @@ class ObjectReplicator(Daemon): self.partition_times.append(time.time() - begin) self.logger.timing_since('partition.delete.timing', begin) - def delete_partition(self, path): + def delete_partition(self, df_mgr, path): self.logger.info(_("Removing partition: %s"), path) try: - tpool.execute(shutil.rmtree, path) + tpool.execute(df_mgr.rmtree, path) except OSError as e: if e.errno not in (errno.ENOENT, errno.ENOTEMPTY): # If there was a race to create or delete, don't worry raise def delete_handoff_objs(self, job, delete_objs): + df_mgr = self._df_router[job['policy']] success_paths = [] error_paths = [] for object_hash in delete_objs: object_path = storage_directory(job['obj_path'], job['partition'], object_hash) - tpool.execute(shutil.rmtree, object_path, ignore_errors=True) + tpool.execute(df_mgr.rmtree, object_path, ignore_errors=True) suffix_dir = dirname(object_path) try: - os.rmdir(suffix_dir) + df_mgr.rmdir(suffix_dir) success_paths.append(object_path) except OSError as e: if e.errno not in (errno.ENOENT, errno.ENOTEMPTY): @@ -816,13 +818,14 @@ class ObjectReplicator(Daemon): tmp_path = join(dev_path, get_tmp_dir(policy)) unlink_older_than(tmp_path, time.time() - df_mgr.reclaim_age) - if not os.path.exists(obj_path): + if not df_mgr.exists(obj_path): try: - mkdirs(obj_path) + df_mgr.mkdirs(obj_path) except Exception: self.logger.exception('ERROR creating %s' % obj_path) continue - for partition in os.listdir(obj_path): + + for partition in df_mgr.listdir(obj_path): if (override_partitions is not None and partition.isdigit() and int(partition) not in override_partitions): continue @@ -937,6 +940,7 @@ class ObjectReplicator(Daemon): override_partitions=override_partitions, override_policies=override_policies) for job in jobs: + df_mgr = self._df_router[job['policy']] dev_stats = self.stats_for_dev[job['device']] num_jobs += 1 current_nodes = job['nodes'] @@ -964,13 +968,13 @@ class ObjectReplicator(Daemon): return try: - if isfile(job['path']): + if df_mgr.isfile(job['path']): # Clean up any (probably zero-byte) files where a # partition should be. self.logger.warning( 'Removing partition directory ' 'which was a file: %s', job['path']) - os.remove(job['path']) + df_mgr.remove(job['path']) continue except OSError: continue |