summaryrefslogtreecommitdiff
path: root/swift/obj/replicator.py
diff options
context:
space:
mode:
Diffstat (limited to 'swift/obj/replicator.py')
-rw-r--r--swift/obj/replicator.py36
1 files changed, 20 insertions, 16 deletions
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index e3634bb8f..e9d19975a 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -16,9 +16,8 @@
from collections import defaultdict
import os
import errno
-from os.path import isdir, isfile, join, dirname
+from os.path import join, dirname
import random
-import shutil
import time
import itertools
from six import viewkeys
@@ -33,7 +32,7 @@ from swift.common.constraints import check_drive
from swift.common.ring.utils import is_local_device
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
- rsync_module_interpolation, mkdirs, config_true_value, \
+ rsync_module_interpolation, config_true_value, \
config_auto_int_value, storage_directory, \
load_recon_cache, PrefixLoggerAdapter, parse_override_options, \
distribute_evenly
@@ -485,9 +484,11 @@ class ObjectReplicator(Daemon):
:param job: a dict containing info about the partition to be replicated
"""
+ df_mgr = self._df_router[job['policy']]
+
def tpool_get_suffixes(path):
- return [suff for suff in os.listdir(path)
- if len(suff) == 3 and isdir(join(path, suff))]
+ return [suff for suff in df_mgr.listdir(path)
+ if len(suff) == 3 and df_mgr.isdir(join(path, suff))]
stats = self.stats_for_dev[job['device']]
stats.attempted += 1
@@ -562,10 +563,10 @@ class ObjectReplicator(Daemon):
failure_dev['device'])
for failure_dev in job['nodes']])
else:
- self.delete_partition(job['path'])
+ self.delete_partition(df_mgr, job['path'])
handoff_partition_deleted = True
elif not suffixes:
- self.delete_partition(job['path'])
+ self.delete_partition(df_mgr, job['path'])
handoff_partition_deleted = True
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
@@ -580,25 +581,26 @@ class ObjectReplicator(Daemon):
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.delete.timing', begin)
- def delete_partition(self, path):
+ def delete_partition(self, df_mgr, path):
self.logger.info(_("Removing partition: %s"), path)
try:
- tpool.execute(shutil.rmtree, path)
+ tpool.execute(df_mgr.rmtree, path)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
# If there was a race to create or delete, don't worry
raise
def delete_handoff_objs(self, job, delete_objs):
+ df_mgr = self._df_router[job['policy']]
success_paths = []
error_paths = []
for object_hash in delete_objs:
object_path = storage_directory(job['obj_path'], job['partition'],
object_hash)
- tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
+ tpool.execute(df_mgr.rmtree, object_path, ignore_errors=True)
suffix_dir = dirname(object_path)
try:
- os.rmdir(suffix_dir)
+ df_mgr.rmdir(suffix_dir)
success_paths.append(object_path)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
@@ -816,13 +818,14 @@ class ObjectReplicator(Daemon):
tmp_path = join(dev_path, get_tmp_dir(policy))
unlink_older_than(tmp_path, time.time() -
df_mgr.reclaim_age)
- if not os.path.exists(obj_path):
+ if not df_mgr.exists(obj_path):
try:
- mkdirs(obj_path)
+ df_mgr.mkdirs(obj_path)
except Exception:
self.logger.exception('ERROR creating %s' % obj_path)
continue
- for partition in os.listdir(obj_path):
+
+ for partition in df_mgr.listdir(obj_path):
if (override_partitions is not None and partition.isdigit()
and int(partition) not in override_partitions):
continue
@@ -937,6 +940,7 @@ class ObjectReplicator(Daemon):
override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs:
+ df_mgr = self._df_router[job['policy']]
dev_stats = self.stats_for_dev[job['device']]
num_jobs += 1
current_nodes = job['nodes']
@@ -964,13 +968,13 @@ class ObjectReplicator(Daemon):
return
try:
- if isfile(job['path']):
+ if df_mgr.isfile(job['path']):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job['path'])
- os.remove(job['path'])
+ df_mgr.remove(job['path'])
continue
except OSError:
continue