Diffstat (limited to 'swift/obj')
-rw-r--r--  swift/obj/diskfile.py          136
-rw-r--r--  swift/obj/fmgr.proto           225
-rw-r--r--  swift/obj/fmgr_pb2.py         2119
-rw-r--r--  swift/obj/header.py            394
-rw-r--r--  swift/obj/kvfile.py           1260
-rw-r--r--  swift/obj/meta.proto            14
-rw-r--r--  swift/obj/meta_pb2.py          115
-rw-r--r--  swift/obj/objectrpcmanager.py  157
-rw-r--r--  swift/obj/reconstructor.py      29
-rw-r--r--  swift/obj/replicator.py         36
-rw-r--r--  swift/obj/rpc_http.py          370
-rw-r--r--  swift/obj/vfile.py            1201
-rw-r--r--  swift/obj/vfile_utils.py       228
13 files changed, 6215 insertions, 69 deletions
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 68f23b3e4..9e5fdd93f 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -37,6 +37,7 @@ import errno
import fcntl
import json
import os
+import shutil
import re
import time
import uuid
@@ -44,7 +45,6 @@ from hashlib import md5
import logging
import traceback
import xattr
-from os.path import basename, dirname, exists, join, splitext
from random import shuffle
from tempfile import mkstemp
from contextlib import contextmanager
@@ -323,11 +323,11 @@ def quarantine_renamer(device_path, corrupted_file_path):
if policy is None:
# TODO: support a quarantine-unknown location
policy = POLICIES.legacy
- from_dir = dirname(corrupted_file_path)
- to_dir = join(device_path, 'quarantined',
- get_data_dir(policy),
- basename(from_dir))
- invalidate_hash(dirname(from_dir))
+ from_dir = os.path.dirname(corrupted_file_path)
+ to_dir = os.path.join(device_path, 'quarantined',
+ get_data_dir(policy),
+ os.path.basename(from_dir))
+ invalidate_hash(os.path.dirname(from_dir))
try:
renamer(from_dir, to_dir, fsync=False)
except OSError as e:
@@ -345,7 +345,7 @@ def read_hashes(partition_dir):
:returns: a dict, the suffix hashes (if any), the key 'valid' will be False
if hashes.pkl is corrupt, cannot be read or does not exist
"""
- hashes_file = join(partition_dir, HASH_FILE)
+ hashes_file = os.path.join(partition_dir, HASH_FILE)
hashes = {'valid': False}
try:
with open(hashes_file, 'rb') as hashes_fp:
@@ -378,7 +378,7 @@ def write_hashes(partition_dir, hashes):
The updated key is added to hashes before it is written.
"""
- hashes_file = join(partition_dir, HASH_FILE)
+ hashes_file = os.path.join(partition_dir, HASH_FILE)
# 'valid' key should always be set by the caller; however, if there's
# a bug, defaulting to invalid is safest
hashes.setdefault('valid', False)
@@ -397,7 +397,7 @@ def consolidate_hashes(partition_dir):
:returns: a dict, the suffix hashes (if any), the key 'valid' will be False
if hashes.pkl is corrupt, cannot be read or does not exist
"""
- invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
+ invalidations_file = os.path.join(partition_dir, HASH_INVALIDATIONS_FILE)
with lock_path(partition_dir):
hashes = read_hashes(partition_dir)
@@ -431,9 +431,9 @@ def invalidate_hash(suffix_dir):
invalidating
"""
- suffix = basename(suffix_dir)
- partition_dir = dirname(suffix_dir)
- invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
+ suffix = os.path.basename(suffix_dir)
+ partition_dir = os.path.dirname(suffix_dir)
+ invalidations_file = os.path.join(partition_dir, HASH_INVALIDATIONS_FILE)
if not isinstance(suffix, bytes):
suffix = suffix.encode('utf-8')
with lock_path(partition_dir), open(invalidations_file, 'ab') as inv_fh:
@@ -803,7 +803,7 @@ class BaseDiskFileManager(object):
validated.
"""
ts_ctype = None
- fname, ext = splitext(filename)
+ fname, ext = os.path.splitext(filename)
try:
if ext == '.meta':
timestamp, ts_ctype = decode_timestamps(
@@ -1032,7 +1032,8 @@ class BaseDiskFileManager(object):
for info_key in ('data_info', 'meta_info', 'ts_info', 'ctype_info'):
info = results.get(info_key)
key = info_key[:-5] + '_file'
- results[key] = join(datadir, info['filename']) if info else None
+ results[key] = os.path.join(
+ datadir, info['filename']) if info else None
if verify:
assert self._verify_ondisk_files(
@@ -1074,14 +1075,14 @@ class BaseDiskFileManager(object):
files, hsh_path, verify=False, **kwargs)
if 'ts_info' in results and is_reclaimable(
results['ts_info']['timestamp']):
- remove_file(join(hsh_path, results['ts_info']['filename']))
+ remove_file(os.path.join(hsh_path, results['ts_info']['filename']))
files.remove(results.pop('ts_info')['filename'])
for file_info in results.get('possible_reclaim', []):
# stray files are not deleted until reclaim-age
if is_reclaimable(file_info['timestamp']):
results.setdefault('obsolete', []).append(file_info)
for file_info in results.get('obsolete', []):
- remove_file(join(hsh_path, file_info['filename']))
+ remove_file(os.path.join(hsh_path, file_info['filename']))
files.remove(file_info['filename'])
results['files'] = files
if not files: # everything got unlinked
@@ -1135,23 +1136,23 @@ class BaseDiskFileManager(object):
raise PathNotDir()
raise
for hsh in path_contents:
- hsh_path = join(path, hsh)
+ hsh_path = os.path.join(path, hsh)
try:
ondisk_info = self.cleanup_ondisk_files(
hsh_path, policy=policy)
except OSError as err:
if err.errno == errno.ENOTDIR:
- partition_path = dirname(path)
- objects_path = dirname(partition_path)
- device_path = dirname(objects_path)
+ partition_path = os.path.dirname(path)
+ objects_path = os.path.dirname(partition_path)
+ device_path = os.path.dirname(objects_path)
# The made-up filename is so that the eventual dirpath()
# will result in this object directory that we care about.
# Some failures will result in an object directory
# becoming a file, thus causing the parent directory to
# be quarantined.
- quar_path = quarantine_renamer(device_path,
- join(hsh_path,
- "made-up-filename"))
+ quar_path = quarantine_renamer(
+ device_path, os.path.join(
+ hsh_path, "made-up-filename"))
logging.exception(
_('Quarantined %(hsh_path)s to %(quar_path)s because '
'it is not a directory'), {'hsh_path': hsh_path,
@@ -1236,7 +1237,7 @@ class BaseDiskFileManager(object):
hashed = 0
dev_path = self.get_dev_path(device)
partition_path = get_part_path(dev_path, policy, partition)
- hashes_file = join(partition_path, HASH_FILE)
+ hashes_file = os.path.join(partition_path, HASH_FILE)
modified = False
orig_hashes = {'valid': False}
@@ -1278,7 +1279,7 @@ class BaseDiskFileManager(object):
hashes.update((suffix, None) for suffix in recalculate)
for suffix, hash_ in list(hashes.items()):
if not hash_:
- suffix_dir = join(partition_path, suffix)
+ suffix_dir = os.path.join(partition_path, suffix)
try:
hashes[suffix] = self._hash_suffix(
suffix_dir, policy=policy)
@@ -1322,7 +1323,7 @@ class BaseDiskFileManager(object):
"""
if mount_check is False:
# explicitly forbidden from syscall, just return path
- return join(self.devices, device)
+ return os.path.join(self.devices, device)
# we'll do some kind of check if not explicitly forbidden
try:
return check_drive(self.devices, device,
@@ -1471,9 +1472,9 @@ class BaseDiskFileManager(object):
# Some failures will result in an object directory
# becoming a file, thus causing the parent directory to
# be quarantined.
- quar_path = self.quarantine_renamer(dev_path,
- join(object_path,
- "made-up-filename"))
+ quar_path = self.quarantine_renamer(
+ dev_path, os.path.join(
+ object_path, "made-up-filename"))
logging.exception(
_('Quarantined %(object_path)s to %(quar_path)s because '
'it is not a directory'), {'object_path': object_path,
@@ -1529,6 +1530,63 @@ class BaseDiskFileManager(object):
path, err)
return []
+ def exists(self, path):
+ """
+ :param path: full path to check
+ """
+ return os.path.exists(path)
+
+ def mkdirs(self, path):
+ """
+ :param path: full path to directory
+ """
+ return mkdirs(path)
+
+ def listdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ return os.listdir(path)
+
+ def rmtree(self, path, ignore_errors=False):
+ """
+ :param path: full path to directory
+ :param ignore_errors: if True, ignore errors from failed removals;
+ otherwise raise an exception.
+ """
+ return shutil.rmtree(path, ignore_errors)
+
+ def remove_file(self, path):
+ """
+ Quiet wrapper around os.unlink. TODO: can this be merged with remove()?
+ :param path: full path to file
+ """
+ return remove_file(path)
+
+ def remove(self, path):
+ """
+ :param path: full path to file
+ """
+ return os.remove(path)
+
+ def isdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ return os.path.isdir(path)
+
+ def isfile(self, path):
+ """
+ :param path: full path to file
+ """
+ return os.path.isfile(path)
+
+ def rmdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ return os.rmdir(path)
+
def yield_suffixes(self, device, partition, policy):
"""
Yields tuples of (full_path, suffix_only) for suffixes stored
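
The wrapper methods added in the hunk above (exists() through rmdir()) only forward to os/shutil helpers; the point of routing filesystem primitives through the manager is that a subclass backed by a different store (presumably the key-value code in kvfile.py added by this commit) can override them. A minimal sketch of that pattern follows; the subclass name and in-memory index are made up for illustration and are not part of the patch:

    # Hypothetical example (not from this commit): a manager that answers
    # exists()/listdir() from a dict instead of the filesystem.
    from swift.obj.diskfile import BaseDiskFileManager

    class InMemoryDiskFileManager(BaseDiskFileManager):
        def __init__(self, conf, logger):
            super(InMemoryDiskFileManager, self).__init__(conf, logger)
            self._index = {}          # path -> list of entry names

        def exists(self, path):
            return path in self._index

        def listdir(self, path):
            return list(self._index.get(path, []))
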
@@ -1701,9 +1759,9 @@ class BaseDiskFileWriter(object):
else:
raise
if not self.manager.use_linkat:
- tmpdir = join(self._diskfile._device_path,
- get_tmp_dir(self._diskfile.policy))
- if not exists(tmpdir):
+ tmpdir = os.path.join(self._diskfile._device_path,
+ get_tmp_dir(self._diskfile.policy))
+ if not os.path.exists(tmpdir):
mkdirs(tmpdir)
fd, tmppath = mkstemp(dir=tmpdir)
return fd, tmppath
@@ -1792,7 +1850,7 @@ class BaseDiskFileWriter(object):
# drop_cache() after fsync() to avoid redundant work (pages all
# clean).
drop_buffer_cache(self._fd, 0, self._upload_size)
- self.manager.invalidate_hash(dirname(self._datadir))
+ self.manager.invalidate_hash(os.path.dirname(self._datadir))
# After the rename/linkat completes, this object will be available for
# requests to reference.
if self._tmppath:
@@ -1852,7 +1910,7 @@ class BaseDiskFileWriter(object):
timestamp, self._extension, ctype_timestamp=ctype_timestamp,
*a, **kw)
metadata['name'] = self._name
- target_path = join(self._datadir, filename)
+ target_path = os.path.join(self._datadir, filename)
tpool.execute(self._finalize_put, metadata, target_path, cleanup)
@@ -2294,7 +2352,7 @@ class BaseDiskFile(object):
self._account = None
self._container = None
self._obj = None
- self._tmpdir = join(device_path, get_tmp_dir(policy))
+ self._tmpdir = os.path.join(device_path, get_tmp_dir(policy))
self._ondisk_info = None
self._metadata = None
self._datafile_metadata = None
@@ -2307,7 +2365,7 @@ class BaseDiskFile(object):
self._datadir = _datadir
else:
name_hash = hash_path(account, container, obj)
- self._datadir = join(
+ self._datadir = os.path.join(
device_path, storage_directory(get_data_dir(policy),
partition, name_hash))
@@ -3111,7 +3169,7 @@ class ECDiskFileWriter(BaseDiskFileWriter):
:raises DiskFileError: if the diskfile frag_index has not been set
(either during initialisation or a call to put())
"""
- data_file_path = join(
+ data_file_path = os.path.join(
self._datadir, self.manager.make_on_disk_filename(
timestamp, '.data', self._diskfile._frag_index))
durable_data_file_path = os.path.join(
@@ -3270,7 +3328,7 @@ class ECDiskFile(BaseDiskFile):
timestamp, ext='.data', frag_index=frag_index, durable=True)
remove_file(os.path.join(self._datadir, purge_file))
remove_directory(self._datadir)
- self.manager.invalidate_hash(dirname(self._datadir))
+ self.manager.invalidate_hash(os.path.dirname(self._datadir))
class ECDiskFileManager(BaseDiskFileManager):
@@ -3354,7 +3412,7 @@ class ECDiskFileManager(BaseDiskFileManager):
validated.
"""
frag_index = None
- float_frag, ext = splitext(filename)
+ float_frag, ext = os.path.splitext(filename)
if ext == '.data':
parts = float_frag.split('#')
try:
diff --git a/swift/obj/fmgr.proto b/swift/obj/fmgr.proto
new file mode 100644
index 000000000..2604305bb
--- /dev/null
+++ b/swift/obj/fmgr.proto
@@ -0,0 +1,225 @@
+syntax = "proto3";
+
+package filemgr;
+
+// Python: protoc -I. --python_out=. fmgr.proto
+// Golang: protoc -I proto proto/fmgr.proto --go_out=proto
+
+message RegisterVolumeRequest {
+ uint32 partition = 1; // Swift partition
+ VolumeType type = 2;
+ uint32 volume_index = 3;
+ uint64 offset = 4; // Next available offset to use in the volume.
+ VolumeState state = 5;
+ bool repair_tool = 6; // Request is coming from a repair tool
+}
+
+message RegisterVolumeReply {}
+
+message UnregisterVolumeRequest {
+ uint32 index = 1;
+ bool repair_tool = 2;
+}
+
+message UnregisterVolumeReply {}
+
+message UpdateVolumeStateRequest {
+ uint32 volume_index = 1;
+ VolumeState state = 2;
+ bool repair_tool = 3;
+}
+
+message UpdateVolumeStateReply {}
+
+message GetVolumeRequest {
+ uint32 index = 1;
+ bool repair_tool = 2;
+}
+
+message GetVolumeReply {
+ uint32 volume_index = 1;
+ VolumeType volume_type = 2;
+ uint32 volume_state = 3;
+ uint32 partition = 4;
+ uint64 next_offset = 5;
+}
+
+message ListVolumesRequest {
+ uint32 partition = 1;
+ VolumeType type = 2;
+ bool repair_tool = 3;
+}
+
+message ListVolumesReply {
+ repeated Volume volumes = 1;
+}
+
+message RegisterObjectRequest {
+ bytes name = 1;
+ uint32 volume_index = 2;
+ uint64 offset = 3; // Object offset within volume
+ uint64 next_offset = 4; // Next offset to start from in the volume
+ bool repair_tool = 5;
+}
+
+message RegisterObjectReply {}
+
+message UnregisterObjectRequest {
+ bytes name = 1;
+ bool repair_tool = 2;
+}
+
+message UnregisterObjectReply {}
+
+message RenameObjectRequest {
+ bytes name = 1;
+ bytes new_name = 2;
+ bool repair_tool = 3;
+}
+
+message RenameObjectReply {}
+
+message LoadObjectRequest {
+ bytes name = 1;
+ bool is_quarantined = 2;
+ bool repair_tool = 3;
+}
+
+message LoadObjectReply {
+ bytes name = 1;
+ uint32 volume_index = 2;
+ uint64 offset = 3;
+}
+
+message QuarantineObjectRequest {
+ bytes name = 1;
+ bool repair_tool = 2;
+}
+
+message QuarantineObjectReply {}
+
+message UnquarantineObjectRequest {
+ bytes name = 1;
+ bool repair_tool = 2;
+}
+
+message UnquarantineObjectReply {}
+
+message LoadObjectsByPrefixRequest {
+ bytes prefix = 1;
+ bool repair_tool = 2;
+}
+
+message LoadObjectsByPrefixReply {
+ repeated Object objects = 1;
+}
+
+message LoadObjectsByVolumeRequest {
+ uint32 index = 1;
+    bool quarantined = 2; // If true, list only quarantined files
+ bytes page_token = 3;
+ uint32 page_size = 4;
+ bool repair_tool = 5;
+}
+
+message LoadObjectsByVolumeReply {
+ repeated Object objects = 1;
+ bytes next_page_token = 2;
+}
+
+message ListPartitionsRequest {
+ uint32 partition_bits = 1;
+}
+
+message ListPartitionRequest {
+ uint32 partition = 1;
+ uint32 partition_bits = 2;
+}
+
+message ListSuffixRequest {
+ uint32 partition = 1;
+ bytes suffix = 2;
+ uint32 partition_bits = 3;
+}
+
+message ListQuarantinedOHashesRequest {
+ bytes page_token = 1;
+ uint32 page_size = 2;
+}
+
+message ListQuarantinedOHashesReply {
+ repeated QuarantinedObjectName objects = 1;
+ bytes next_page_token = 2;
+}
+
+message ListQuarantinedOHashRequest {
+ bytes prefix = 1;
+ bool repair_tool = 2;
+}
+
+message ListQuarantinedOHashReply {
+ repeated Object objects = 1;
+}
+
+message GetNextOffsetRequest {
+ uint32 volume_index = 1;
+ bool repair_tool = 2;
+}
+
+message GetNextOffsetReply {
+ uint64 offset = 1;
+}
+
+message GetStatsRequest {}
+
+message GetStatsReply {
+ map<string, uint64> stats = 1;
+}
+
+message SetKvStateReply {}
+
+message GetKvStateRequest {}
+
+message KvState {
+ bool isClean = 1;
+}
+
+// Generic messages
+message Volume {
+ uint32 volume_index = 1;
+ VolumeType volume_type = 2;
+ uint32 volume_state = 3;
+ uint32 partition = 4;
+ uint64 next_offset = 5;
+}
+
+message Object {
+ bytes name = 1;
+ uint32 volume_index = 2;
+ uint64 offset = 3;
+}
+
+message QuarantinedObjectName {
+ bytes name = 1;
+}
+
+// For listdir()-like functions
+message DirEntries {
+ repeated string entry = 1;
+}
+
+// Enums
+enum VolumeType {
+ VOLUME_DEFAULT = 0;
+ VOLUME_TOMBSTONE = 1;
+ VOLUME_X_DELETE_AT = 2;
+}
+
+enum VolumeState {
+ // Default state, volume can be read from and written to
+ STATE_RW = 0;
+ // Volume is being compacted (source). New objects cannot be appended
+ STATE_COMPACTION_SRC = 1;
+ // Volume is a compaction target. New objects cannot be appended
+ STATE_COMPACTION_TARGET = 2;
+}
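
Together these messages form the file-manager API: a volume is registered with a partition, a type and a next write offset; an object is located by a (volume_index, offset) pair; and the List*/Load* calls page through results with page_token/page_size. Below is a minimal round-trip sketch using the generated fmgr_pb2 module added by this commit; send() and the endpoint names are made-up stand-ins for the HTTP transport in rpc_http.py, not a real API:

    from swift.obj import fmgr_pb2

    # Register an object at offset 4096 of volume 7 (values illustrative).
    req = fmgr_pb2.RegisterObjectRequest(
        name=b'0a1b2c3d/1518632447.64997.data',   # made-up object key
        volume_index=7,
        offset=4096,          # where the object starts in the volume
        next_offset=8192)     # first free byte after the object
    reply = fmgr_pb2.RegisterObjectReply()
    reply.ParseFromString(send('/register_object', req.SerializeToString()))

    # Paging pattern shared by the page_token/page_size fields: resend the
    # previous reply's next_page_token until it comes back empty.
    token = b''
    while True:
        lreq = fmgr_pb2.LoadObjectsByVolumeRequest(
            index=7, page_token=token, page_size=100)
        lrep = fmgr_pb2.LoadObjectsByVolumeReply()
        lrep.ParseFromString(send('/load_objects_by_volume',
                                  lreq.SerializeToString()))
        for obj in lrep.objects:
            print(obj.name, obj.volume_index, obj.offset)
        if not lrep.next_page_token:
            break
        token = lrep.next_page_token
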
diff --git a/swift/obj/fmgr_pb2.py b/swift/obj/fmgr_pb2.py
new file mode 100644
index 000000000..161aa7693
--- /dev/null
+++ b/swift/obj/fmgr_pb2.py
@@ -0,0 +1,2119 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: fmgr.proto
+
+from google.protobuf.internal import enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='fmgr.proto',
+ package='filemgr',
+ syntax='proto3',
+ serialized_options=None,
+  serialized_pb=b'\n\nfmgr.proto\x12\x07\x66ilemgr\"\xad\x01\n\x15RegisterVolumeRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_index\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\x04\x12#\n\x05state\x18\x05 \x01(\x0e\x32\x14.filemgr.VolumeState\x12\x13\n\x0brepair_tool\x18\x06 \x01(\x08\"\x15\n\x13RegisterVolumeReply\"=\n\x17UnregisterVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15UnregisterVolumeReply\"j\n\x18UpdateVolumeStateRequest\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12#\n\x05state\x18\x02 \x01(\x0e\x32\x14.filemgr.VolumeState\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"\x18\n\x16UpdateVolumeStateReply\"6\n\x10GetVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x8e\x01\n\x0eGetVolumeReply\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12(\n\x0bvolume_type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_state\x18\x03 \x01(\r\x12\x11\n\tpartition\x18\x04 \x01(\r\x12\x13\n\x0bnext_offset\x18\x05 \x01(\x04\"_\n\x12ListVolumesRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"4\n\x10ListVolumesReply\x12 \n\x07volumes\x18\x01 \x03(\x0b\x32\x0f.filemgr.Volume\"u\n\x15RegisterObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\x12\x13\n\x0bnext_offset\x18\x04 \x01(\x04\x12\x13\n\x0brepair_tool\x18\x05 \x01(\x08\"\x15\n\x13RegisterObjectReply\"<\n\x17UnregisterObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15UnregisterObjectReply\"J\n\x13RenameObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x10\n\x08new_name\x18\x02 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"\x13\n\x11RenameObjectReply\"N\n\x11LoadObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x16\n\x0eis_quarantined\x18\x02 \x01(\x08\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"E\n\x0fLoadObjectReply\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"<\n\x17QuarantineObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15QuarantineObjectReply\">\n\x19UnquarantineObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x19\n\x17UnquarantineObjectReply\"A\n\x1aLoadObjectsByPrefixRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"<\n\x18LoadObjectsByPrefixReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\"|\n\x1aLoadObjectsByVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0bquarantined\x18\x02 \x01(\x08\x12\x12\n\npage_token\x18\x03 \x01(\x0c\x12\x11\n\tpage_size\x18\x04 \x01(\r\x12\x13\n\x0brepair_tool\x18\x05 \x01(\x08\"U\n\x18LoadObjectsByVolumeReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\x0c\"/\n\x15ListPartitionsRequest\x12\x16\n\x0epartition_bits\x18\x01 \x01(\r\"A\n\x14ListPartitionRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12\x16\n\x0epartition_bits\x18\x02 \x01(\r\"N\n\x11ListSuffixRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12\x0e\n\x06suffix\x18\x02 \x01(\x0c\x12\x16\n\x0epartition_bits\x18\x03 \x01(\r\"F\n\x1dListQuarantinedOHashesRequest\x12\x12\n\npage_token\x18\x01 \x01(\x0c\x12\x11\n\tpage_size\x18\x02 \x01(\r\"g\n\x1bListQuarantinedOHashesReply\x12/\n\x07objects\x18\x01 \x03(\x0b\x32\x1e.filemgr.QuarantinedObjectName\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\x0c\"%\n\x15QuarantinedObjectName\x12\x0c\n\x04name\x18\x01 \x01(\x0c\"B\n\x1bListQuarantinedOHashRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"=\n\x19ListQuarantinedOHashReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\"A\n\x14GetNextOffsetRequest\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"$\n\x12GetNextOffsetReply\x12\x0e\n\x06offset\x18\x01 \x01(\x04\"\x11\n\x0fGetStatsRequest\"o\n\rGetStatsReply\x12\x30\n\x05stats\x18\x01 \x03(\x0b\x32!.filemgr.GetStatsReply.StatsEntry\x1a,\n\nStatsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x04:\x02\x38\x01\"\x11\n\x0fSetKvStateReply\"\x13\n\x11GetKvStateRequest\"\x1a\n\x07KvState\x12\x0f\n\x07isClean\x18\x01 \x01(\x08\"\x86\x01\n\x06Volume\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12(\n\x0bvolume_type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_state\x18\x03 \x01(\r\x12\x11\n\tpartition\x18\x04 \x01(\r\x12\x13\n\x0bnext_offset\x18\x05 \x01(\x04\"<\n\x06Object\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"\x1b\n\nDirEntries\x12\r\n\x05\x65ntry\x18\x01 \x03(\t*N\n\nVolumeType\x12\x12\n\x0eVOLUME_DEFAULT\x10\x00\x12\x14\n\x10VOLUME_TOMBSTONE\x10\x01\x12\x16\n\x12VOLUME_X_DELETE_AT\x10\x02*R\n\x0bVolumeState\x12\x0c\n\x08STATE_RW\x10\x00\x12\x18\n\x14STATE_COMPACTION_SRC\x10\x01\x12\x1b\n\x17STATE_COMPACTION_TARGET\x10\x02\x62\x06proto3'
+)
+
+_VOLUMETYPE = _descriptor.EnumDescriptor(
+ name='VolumeType',
+ full_name='filemgr.VolumeType',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='VOLUME_DEFAULT', index=0, number=0,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='VOLUME_TOMBSTONE', index=1, number=1,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='VOLUME_X_DELETE_AT', index=2, number=2,
+ serialized_options=None,
+ type=None),
+ ],
+ containing_type=None,
+ serialized_options=None,
+ serialized_start=2869,
+ serialized_end=2947,
+)
+_sym_db.RegisterEnumDescriptor(_VOLUMETYPE)
+
+VolumeType = enum_type_wrapper.EnumTypeWrapper(_VOLUMETYPE)
+_VOLUMESTATE = _descriptor.EnumDescriptor(
+ name='VolumeState',
+ full_name='filemgr.VolumeState',
+ filename=None,
+ file=DESCRIPTOR,
+ values=[
+ _descriptor.EnumValueDescriptor(
+ name='STATE_RW', index=0, number=0,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='STATE_COMPACTION_SRC', index=1, number=1,
+ serialized_options=None,
+ type=None),
+ _descriptor.EnumValueDescriptor(
+ name='STATE_COMPACTION_TARGET', index=2, number=2,
+ serialized_options=None,
+ type=None),
+ ],
+ containing_type=None,
+ serialized_options=None,
+ serialized_start=2949,
+ serialized_end=3031,
+)
+_sym_db.RegisterEnumDescriptor(_VOLUMESTATE)
+
+VolumeState = enum_type_wrapper.EnumTypeWrapper(_VOLUMESTATE)
+VOLUME_DEFAULT = 0
+VOLUME_TOMBSTONE = 1
+VOLUME_X_DELETE_AT = 2
+STATE_RW = 0
+STATE_COMPACTION_SRC = 1
+STATE_COMPACTION_TARGET = 2
+
+
+
+_REGISTERVOLUMEREQUEST = _descriptor.Descriptor(
+ name='RegisterVolumeRequest',
+ full_name='filemgr.RegisterVolumeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.RegisterVolumeRequest.partition', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='type', full_name='filemgr.RegisterVolumeRequest.type', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.RegisterVolumeRequest.volume_index', index=2,
+ number=3, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='filemgr.RegisterVolumeRequest.offset', index=3,
+ number=4, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='state', full_name='filemgr.RegisterVolumeRequest.state', index=4,
+ number=5, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.RegisterVolumeRequest.repair_tool', index=5,
+ number=6, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=24,
+ serialized_end=197,
+)
+
+
+_REGISTERVOLUMEREPLY = _descriptor.Descriptor(
+ name='RegisterVolumeReply',
+ full_name='filemgr.RegisterVolumeReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=199,
+ serialized_end=220,
+)
+
+
+_UNREGISTERVOLUMEREQUEST = _descriptor.Descriptor(
+ name='UnregisterVolumeRequest',
+ full_name='filemgr.UnregisterVolumeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='index', full_name='filemgr.UnregisterVolumeRequest.index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.UnregisterVolumeRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=222,
+ serialized_end=283,
+)
+
+
+_UNREGISTERVOLUMEREPLY = _descriptor.Descriptor(
+ name='UnregisterVolumeReply',
+ full_name='filemgr.UnregisterVolumeReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=285,
+ serialized_end=308,
+)
+
+
+_UPDATEVOLUMESTATEREQUEST = _descriptor.Descriptor(
+ name='UpdateVolumeStateRequest',
+ full_name='filemgr.UpdateVolumeStateRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.UpdateVolumeStateRequest.volume_index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='state', full_name='filemgr.UpdateVolumeStateRequest.state', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.UpdateVolumeStateRequest.repair_tool', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=310,
+ serialized_end=416,
+)
+
+
+_UPDATEVOLUMESTATEREPLY = _descriptor.Descriptor(
+ name='UpdateVolumeStateReply',
+ full_name='filemgr.UpdateVolumeStateReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=418,
+ serialized_end=442,
+)
+
+
+_GETVOLUMEREQUEST = _descriptor.Descriptor(
+ name='GetVolumeRequest',
+ full_name='filemgr.GetVolumeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='index', full_name='filemgr.GetVolumeRequest.index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.GetVolumeRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=444,
+ serialized_end=498,
+)
+
+
+_GETVOLUMEREPLY = _descriptor.Descriptor(
+ name='GetVolumeReply',
+ full_name='filemgr.GetVolumeReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.GetVolumeReply.volume_index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_type', full_name='filemgr.GetVolumeReply.volume_type', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_state', full_name='filemgr.GetVolumeReply.volume_state', index=2,
+ number=3, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.GetVolumeReply.partition', index=3,
+ number=4, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='next_offset', full_name='filemgr.GetVolumeReply.next_offset', index=4,
+ number=5, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=501,
+ serialized_end=643,
+)
+
+
+_LISTVOLUMESREQUEST = _descriptor.Descriptor(
+ name='ListVolumesRequest',
+ full_name='filemgr.ListVolumesRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.ListVolumesRequest.partition', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='type', full_name='filemgr.ListVolumesRequest.type', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.ListVolumesRequest.repair_tool', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=645,
+ serialized_end=740,
+)
+
+
+_LISTVOLUMESREPLY = _descriptor.Descriptor(
+ name='ListVolumesReply',
+ full_name='filemgr.ListVolumesReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='volumes', full_name='filemgr.ListVolumesReply.volumes', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=742,
+ serialized_end=794,
+)
+
+
+_REGISTEROBJECTREQUEST = _descriptor.Descriptor(
+ name='RegisterObjectRequest',
+ full_name='filemgr.RegisterObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.RegisterObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.RegisterObjectRequest.volume_index', index=1,
+ number=2, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='filemgr.RegisterObjectRequest.offset', index=2,
+ number=3, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='next_offset', full_name='filemgr.RegisterObjectRequest.next_offset', index=3,
+ number=4, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.RegisterObjectRequest.repair_tool', index=4,
+ number=5, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=796,
+ serialized_end=913,
+)
+
+
+_REGISTEROBJECTREPLY = _descriptor.Descriptor(
+ name='RegisterObjectReply',
+ full_name='filemgr.RegisterObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=915,
+ serialized_end=936,
+)
+
+
+_UNREGISTEROBJECTREQUEST = _descriptor.Descriptor(
+ name='UnregisterObjectRequest',
+ full_name='filemgr.UnregisterObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.UnregisterObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.UnregisterObjectRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=938,
+ serialized_end=998,
+)
+
+
+_UNREGISTEROBJECTREPLY = _descriptor.Descriptor(
+ name='UnregisterObjectReply',
+ full_name='filemgr.UnregisterObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1000,
+ serialized_end=1023,
+)
+
+
+_RENAMEOBJECTREQUEST = _descriptor.Descriptor(
+ name='RenameObjectRequest',
+ full_name='filemgr.RenameObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.RenameObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='new_name', full_name='filemgr.RenameObjectRequest.new_name', index=1,
+ number=2, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.RenameObjectRequest.repair_tool', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1025,
+ serialized_end=1099,
+)
+
+
+_RENAMEOBJECTREPLY = _descriptor.Descriptor(
+ name='RenameObjectReply',
+ full_name='filemgr.RenameObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1101,
+ serialized_end=1120,
+)
+
+
+_LOADOBJECTREQUEST = _descriptor.Descriptor(
+ name='LoadObjectRequest',
+ full_name='filemgr.LoadObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.LoadObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='is_quarantined', full_name='filemgr.LoadObjectRequest.is_quarantined', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.LoadObjectRequest.repair_tool', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1122,
+ serialized_end=1200,
+)
+
+
+_LOADOBJECTREPLY = _descriptor.Descriptor(
+ name='LoadObjectReply',
+ full_name='filemgr.LoadObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.LoadObjectReply.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.LoadObjectReply.volume_index', index=1,
+ number=2, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='filemgr.LoadObjectReply.offset', index=2,
+ number=3, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1202,
+ serialized_end=1271,
+)
+
+
+_QUARANTINEOBJECTREQUEST = _descriptor.Descriptor(
+ name='QuarantineObjectRequest',
+ full_name='filemgr.QuarantineObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.QuarantineObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.QuarantineObjectRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1273,
+ serialized_end=1333,
+)
+
+
+_QUARANTINEOBJECTREPLY = _descriptor.Descriptor(
+ name='QuarantineObjectReply',
+ full_name='filemgr.QuarantineObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1335,
+ serialized_end=1358,
+)
+
+
+_UNQUARANTINEOBJECTREQUEST = _descriptor.Descriptor(
+ name='UnquarantineObjectRequest',
+ full_name='filemgr.UnquarantineObjectRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.UnquarantineObjectRequest.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.UnquarantineObjectRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1360,
+ serialized_end=1422,
+)
+
+
+_UNQUARANTINEOBJECTREPLY = _descriptor.Descriptor(
+ name='UnquarantineObjectReply',
+ full_name='filemgr.UnquarantineObjectReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1424,
+ serialized_end=1449,
+)
+
+
+_LOADOBJECTSBYPREFIXREQUEST = _descriptor.Descriptor(
+ name='LoadObjectsByPrefixRequest',
+ full_name='filemgr.LoadObjectsByPrefixRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='prefix', full_name='filemgr.LoadObjectsByPrefixRequest.prefix', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.LoadObjectsByPrefixRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1451,
+ serialized_end=1516,
+)
+
+
+_LOADOBJECTSBYPREFIXREPLY = _descriptor.Descriptor(
+ name='LoadObjectsByPrefixReply',
+ full_name='filemgr.LoadObjectsByPrefixReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='objects', full_name='filemgr.LoadObjectsByPrefixReply.objects', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1518,
+ serialized_end=1578,
+)
+
+
+_LOADOBJECTSBYVOLUMEREQUEST = _descriptor.Descriptor(
+ name='LoadObjectsByVolumeRequest',
+ full_name='filemgr.LoadObjectsByVolumeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='index', full_name='filemgr.LoadObjectsByVolumeRequest.index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='quarantined', full_name='filemgr.LoadObjectsByVolumeRequest.quarantined', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='page_token', full_name='filemgr.LoadObjectsByVolumeRequest.page_token', index=2,
+ number=3, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='page_size', full_name='filemgr.LoadObjectsByVolumeRequest.page_size', index=3,
+ number=4, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.LoadObjectsByVolumeRequest.repair_tool', index=4,
+ number=5, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1580,
+ serialized_end=1704,
+)
+
+
+_LOADOBJECTSBYVOLUMEREPLY = _descriptor.Descriptor(
+ name='LoadObjectsByVolumeReply',
+ full_name='filemgr.LoadObjectsByVolumeReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='objects', full_name='filemgr.LoadObjectsByVolumeReply.objects', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='next_page_token', full_name='filemgr.LoadObjectsByVolumeReply.next_page_token', index=1,
+ number=2, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1706,
+ serialized_end=1791,
+)
+
+
+_LISTPARTITIONSREQUEST = _descriptor.Descriptor(
+ name='ListPartitionsRequest',
+ full_name='filemgr.ListPartitionsRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='partition_bits', full_name='filemgr.ListPartitionsRequest.partition_bits', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1793,
+ serialized_end=1840,
+)
+
+
+_LISTPARTITIONREQUEST = _descriptor.Descriptor(
+ name='ListPartitionRequest',
+ full_name='filemgr.ListPartitionRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.ListPartitionRequest.partition', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='partition_bits', full_name='filemgr.ListPartitionRequest.partition_bits', index=1,
+ number=2, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1842,
+ serialized_end=1907,
+)
+
+
+_LISTSUFFIXREQUEST = _descriptor.Descriptor(
+ name='ListSuffixRequest',
+ full_name='filemgr.ListSuffixRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.ListSuffixRequest.partition', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='suffix', full_name='filemgr.ListSuffixRequest.suffix', index=1,
+ number=2, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='partition_bits', full_name='filemgr.ListSuffixRequest.partition_bits', index=2,
+ number=3, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1909,
+ serialized_end=1987,
+)
+
+
+_LISTQUARANTINEDOHASHESREQUEST = _descriptor.Descriptor(
+ name='ListQuarantinedOHashesRequest',
+ full_name='filemgr.ListQuarantinedOHashesRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='page_token', full_name='filemgr.ListQuarantinedOHashesRequest.page_token', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='page_size', full_name='filemgr.ListQuarantinedOHashesRequest.page_size', index=1,
+ number=2, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1989,
+ serialized_end=2059,
+)
+
+
+_LISTQUARANTINEDOHASHESREPLY = _descriptor.Descriptor(
+ name='ListQuarantinedOHashesReply',
+ full_name='filemgr.ListQuarantinedOHashesReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='objects', full_name='filemgr.ListQuarantinedOHashesReply.objects', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='next_page_token', full_name='filemgr.ListQuarantinedOHashesReply.next_page_token', index=1,
+ number=2, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2061,
+ serialized_end=2164,
+)
+
+
+_QUARANTINEDOBJECTNAME = _descriptor.Descriptor(
+ name='QuarantinedObjectName',
+ full_name='filemgr.QuarantinedObjectName',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.QuarantinedObjectName.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2166,
+ serialized_end=2203,
+)
+
+
+_LISTQUARANTINEDOHASHREQUEST = _descriptor.Descriptor(
+ name='ListQuarantinedOHashRequest',
+ full_name='filemgr.ListQuarantinedOHashRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='prefix', full_name='filemgr.ListQuarantinedOHashRequest.prefix', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.ListQuarantinedOHashRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2205,
+ serialized_end=2271,
+)
+
+
+_LISTQUARANTINEDOHASHREPLY = _descriptor.Descriptor(
+ name='ListQuarantinedOHashReply',
+ full_name='filemgr.ListQuarantinedOHashReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='objects', full_name='filemgr.ListQuarantinedOHashReply.objects', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2273,
+ serialized_end=2334,
+)
+
+
+_GETNEXTOFFSETREQUEST = _descriptor.Descriptor(
+ name='GetNextOffsetRequest',
+ full_name='filemgr.GetNextOffsetRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.GetNextOffsetRequest.volume_index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='repair_tool', full_name='filemgr.GetNextOffsetRequest.repair_tool', index=1,
+ number=2, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2336,
+ serialized_end=2401,
+)
+
+
+_GETNEXTOFFSETREPLY = _descriptor.Descriptor(
+ name='GetNextOffsetReply',
+ full_name='filemgr.GetNextOffsetReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='filemgr.GetNextOffsetReply.offset', index=0,
+ number=1, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2403,
+ serialized_end=2439,
+)
+
+
+_GETSTATSREQUEST = _descriptor.Descriptor(
+ name='GetStatsRequest',
+ full_name='filemgr.GetStatsRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2441,
+ serialized_end=2458,
+)
+
+
+_GETSTATSREPLY_STATSENTRY = _descriptor.Descriptor(
+ name='StatsEntry',
+ full_name='filemgr.GetStatsReply.StatsEntry',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='filemgr.GetStatsReply.StatsEntry.key', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"".decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='filemgr.GetStatsReply.StatsEntry.value', index=1,
+ number=2, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=b'8\001',
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2527,
+ serialized_end=2571,
+)
+
+_GETSTATSREPLY = _descriptor.Descriptor(
+ name='GetStatsReply',
+ full_name='filemgr.GetStatsReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='stats', full_name='filemgr.GetStatsReply.stats', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_GETSTATSREPLY_STATSENTRY, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2460,
+ serialized_end=2571,
+)
+
+
+_SETKVSTATEREPLY = _descriptor.Descriptor(
+ name='SetKvStateReply',
+ full_name='filemgr.SetKvStateReply',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2573,
+ serialized_end=2590,
+)
+
+
+_GETKVSTATEREQUEST = _descriptor.Descriptor(
+ name='GetKvStateRequest',
+ full_name='filemgr.GetKvStateRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2592,
+ serialized_end=2611,
+)
+
+
+_KVSTATE = _descriptor.Descriptor(
+ name='KvState',
+ full_name='filemgr.KvState',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='isClean', full_name='filemgr.KvState.isClean', index=0,
+ number=1, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2613,
+ serialized_end=2639,
+)
+
+
+_VOLUME = _descriptor.Descriptor(
+ name='Volume',
+ full_name='filemgr.Volume',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.Volume.volume_index', index=0,
+ number=1, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_type', full_name='filemgr.Volume.volume_type', index=1,
+ number=2, type=14, cpp_type=8, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_state', full_name='filemgr.Volume.volume_state', index=2,
+ number=3, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='partition', full_name='filemgr.Volume.partition', index=3,
+ number=4, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='next_offset', full_name='filemgr.Volume.next_offset', index=4,
+ number=5, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2642,
+ serialized_end=2776,
+)
+
+
+_OBJECT = _descriptor.Descriptor(
+ name='Object',
+ full_name='filemgr.Object',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='name', full_name='filemgr.Object.name', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='volume_index', full_name='filemgr.Object.volume_index', index=1,
+ number=2, type=13, cpp_type=3, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='offset', full_name='filemgr.Object.offset', index=2,
+ number=3, type=4, cpp_type=4, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2778,
+ serialized_end=2838,
+)
+
+
+_DIRENTRIES = _descriptor.Descriptor(
+ name='DirEntries',
+ full_name='filemgr.DirEntries',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='entry', full_name='filemgr.DirEntries.entry', index=0,
+ number=1, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=2840,
+ serialized_end=2867,
+)
+
+_REGISTERVOLUMEREQUEST.fields_by_name['type'].enum_type = _VOLUMETYPE
+_REGISTERVOLUMEREQUEST.fields_by_name['state'].enum_type = _VOLUMESTATE
+_UPDATEVOLUMESTATEREQUEST.fields_by_name['state'].enum_type = _VOLUMESTATE
+_GETVOLUMEREPLY.fields_by_name['volume_type'].enum_type = _VOLUMETYPE
+_LISTVOLUMESREQUEST.fields_by_name['type'].enum_type = _VOLUMETYPE
+_LISTVOLUMESREPLY.fields_by_name['volumes'].message_type = _VOLUME
+_LOADOBJECTSBYPREFIXREPLY.fields_by_name['objects'].message_type = _OBJECT
+_LOADOBJECTSBYVOLUMEREPLY.fields_by_name['objects'].message_type = _OBJECT
+_LISTQUARANTINEDOHASHESREPLY.fields_by_name['objects'].message_type = _QUARANTINEDOBJECTNAME
+_LISTQUARANTINEDOHASHREPLY.fields_by_name['objects'].message_type = _OBJECT
+_GETSTATSREPLY_STATSENTRY.containing_type = _GETSTATSREPLY
+_GETSTATSREPLY.fields_by_name['stats'].message_type = _GETSTATSREPLY_STATSENTRY
+_VOLUME.fields_by_name['volume_type'].enum_type = _VOLUMETYPE
+DESCRIPTOR.message_types_by_name['RegisterVolumeRequest'] = _REGISTERVOLUMEREQUEST
+DESCRIPTOR.message_types_by_name['RegisterVolumeReply'] = _REGISTERVOLUMEREPLY
+DESCRIPTOR.message_types_by_name['UnregisterVolumeRequest'] = _UNREGISTERVOLUMEREQUEST
+DESCRIPTOR.message_types_by_name['UnregisterVolumeReply'] = _UNREGISTERVOLUMEREPLY
+DESCRIPTOR.message_types_by_name['UpdateVolumeStateRequest'] = _UPDATEVOLUMESTATEREQUEST
+DESCRIPTOR.message_types_by_name['UpdateVolumeStateReply'] = _UPDATEVOLUMESTATEREPLY
+DESCRIPTOR.message_types_by_name['GetVolumeRequest'] = _GETVOLUMEREQUEST
+DESCRIPTOR.message_types_by_name['GetVolumeReply'] = _GETVOLUMEREPLY
+DESCRIPTOR.message_types_by_name['ListVolumesRequest'] = _LISTVOLUMESREQUEST
+DESCRIPTOR.message_types_by_name['ListVolumesReply'] = _LISTVOLUMESREPLY
+DESCRIPTOR.message_types_by_name['RegisterObjectRequest'] = _REGISTEROBJECTREQUEST
+DESCRIPTOR.message_types_by_name['RegisterObjectReply'] = _REGISTEROBJECTREPLY
+DESCRIPTOR.message_types_by_name['UnregisterObjectRequest'] = _UNREGISTEROBJECTREQUEST
+DESCRIPTOR.message_types_by_name['UnregisterObjectReply'] = _UNREGISTEROBJECTREPLY
+DESCRIPTOR.message_types_by_name['RenameObjectRequest'] = _RENAMEOBJECTREQUEST
+DESCRIPTOR.message_types_by_name['RenameObjectReply'] = _RENAMEOBJECTREPLY
+DESCRIPTOR.message_types_by_name['LoadObjectRequest'] = _LOADOBJECTREQUEST
+DESCRIPTOR.message_types_by_name['LoadObjectReply'] = _LOADOBJECTREPLY
+DESCRIPTOR.message_types_by_name['QuarantineObjectRequest'] = _QUARANTINEOBJECTREQUEST
+DESCRIPTOR.message_types_by_name['QuarantineObjectReply'] = _QUARANTINEOBJECTREPLY
+DESCRIPTOR.message_types_by_name['UnquarantineObjectRequest'] = _UNQUARANTINEOBJECTREQUEST
+DESCRIPTOR.message_types_by_name['UnquarantineObjectReply'] = _UNQUARANTINEOBJECTREPLY
+DESCRIPTOR.message_types_by_name['LoadObjectsByPrefixRequest'] = _LOADOBJECTSBYPREFIXREQUEST
+DESCRIPTOR.message_types_by_name['LoadObjectsByPrefixReply'] = _LOADOBJECTSBYPREFIXREPLY
+DESCRIPTOR.message_types_by_name['LoadObjectsByVolumeRequest'] = _LOADOBJECTSBYVOLUMEREQUEST
+DESCRIPTOR.message_types_by_name['LoadObjectsByVolumeReply'] = _LOADOBJECTSBYVOLUMEREPLY
+DESCRIPTOR.message_types_by_name['ListPartitionsRequest'] = _LISTPARTITIONSREQUEST
+DESCRIPTOR.message_types_by_name['ListPartitionRequest'] = _LISTPARTITIONREQUEST
+DESCRIPTOR.message_types_by_name['ListSuffixRequest'] = _LISTSUFFIXREQUEST
+DESCRIPTOR.message_types_by_name['ListQuarantinedOHashesRequest'] = _LISTQUARANTINEDOHASHESREQUEST
+DESCRIPTOR.message_types_by_name['ListQuarantinedOHashesReply'] = _LISTQUARANTINEDOHASHESREPLY
+DESCRIPTOR.message_types_by_name['QuarantinedObjectName'] = _QUARANTINEDOBJECTNAME
+DESCRIPTOR.message_types_by_name['ListQuarantinedOHashRequest'] = _LISTQUARANTINEDOHASHREQUEST
+DESCRIPTOR.message_types_by_name['ListQuarantinedOHashReply'] = _LISTQUARANTINEDOHASHREPLY
+DESCRIPTOR.message_types_by_name['GetNextOffsetRequest'] = _GETNEXTOFFSETREQUEST
+DESCRIPTOR.message_types_by_name['GetNextOffsetReply'] = _GETNEXTOFFSETREPLY
+DESCRIPTOR.message_types_by_name['GetStatsRequest'] = _GETSTATSREQUEST
+DESCRIPTOR.message_types_by_name['GetStatsReply'] = _GETSTATSREPLY
+DESCRIPTOR.message_types_by_name['SetKvStateReply'] = _SETKVSTATEREPLY
+DESCRIPTOR.message_types_by_name['GetKvStateRequest'] = _GETKVSTATEREQUEST
+DESCRIPTOR.message_types_by_name['KvState'] = _KVSTATE
+DESCRIPTOR.message_types_by_name['Volume'] = _VOLUME
+DESCRIPTOR.message_types_by_name['Object'] = _OBJECT
+DESCRIPTOR.message_types_by_name['DirEntries'] = _DIRENTRIES
+DESCRIPTOR.enum_types_by_name['VolumeType'] = _VOLUMETYPE
+DESCRIPTOR.enum_types_by_name['VolumeState'] = _VOLUMESTATE
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+RegisterVolumeRequest = _reflection.GeneratedProtocolMessageType('RegisterVolumeRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _REGISTERVOLUMEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RegisterVolumeRequest)
+ })
+_sym_db.RegisterMessage(RegisterVolumeRequest)
+
+RegisterVolumeReply = _reflection.GeneratedProtocolMessageType('RegisterVolumeReply', (_message.Message,), {
+ 'DESCRIPTOR' : _REGISTERVOLUMEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RegisterVolumeReply)
+ })
+_sym_db.RegisterMessage(RegisterVolumeReply)
+
+UnregisterVolumeRequest = _reflection.GeneratedProtocolMessageType('UnregisterVolumeRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _UNREGISTERVOLUMEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnregisterVolumeRequest)
+ })
+_sym_db.RegisterMessage(UnregisterVolumeRequest)
+
+UnregisterVolumeReply = _reflection.GeneratedProtocolMessageType('UnregisterVolumeReply', (_message.Message,), {
+ 'DESCRIPTOR' : _UNREGISTERVOLUMEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnregisterVolumeReply)
+ })
+_sym_db.RegisterMessage(UnregisterVolumeReply)
+
+UpdateVolumeStateRequest = _reflection.GeneratedProtocolMessageType('UpdateVolumeStateRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _UPDATEVOLUMESTATEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UpdateVolumeStateRequest)
+ })
+_sym_db.RegisterMessage(UpdateVolumeStateRequest)
+
+UpdateVolumeStateReply = _reflection.GeneratedProtocolMessageType('UpdateVolumeStateReply', (_message.Message,), {
+ 'DESCRIPTOR' : _UPDATEVOLUMESTATEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UpdateVolumeStateReply)
+ })
+_sym_db.RegisterMessage(UpdateVolumeStateReply)
+
+GetVolumeRequest = _reflection.GeneratedProtocolMessageType('GetVolumeRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _GETVOLUMEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetVolumeRequest)
+ })
+_sym_db.RegisterMessage(GetVolumeRequest)
+
+GetVolumeReply = _reflection.GeneratedProtocolMessageType('GetVolumeReply', (_message.Message,), {
+ 'DESCRIPTOR' : _GETVOLUMEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetVolumeReply)
+ })
+_sym_db.RegisterMessage(GetVolumeReply)
+
+ListVolumesRequest = _reflection.GeneratedProtocolMessageType('ListVolumesRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTVOLUMESREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListVolumesRequest)
+ })
+_sym_db.RegisterMessage(ListVolumesRequest)
+
+ListVolumesReply = _reflection.GeneratedProtocolMessageType('ListVolumesReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTVOLUMESREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListVolumesReply)
+ })
+_sym_db.RegisterMessage(ListVolumesReply)
+
+RegisterObjectRequest = _reflection.GeneratedProtocolMessageType('RegisterObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _REGISTEROBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RegisterObjectRequest)
+ })
+_sym_db.RegisterMessage(RegisterObjectRequest)
+
+RegisterObjectReply = _reflection.GeneratedProtocolMessageType('RegisterObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _REGISTEROBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RegisterObjectReply)
+ })
+_sym_db.RegisterMessage(RegisterObjectReply)
+
+UnregisterObjectRequest = _reflection.GeneratedProtocolMessageType('UnregisterObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _UNREGISTEROBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnregisterObjectRequest)
+ })
+_sym_db.RegisterMessage(UnregisterObjectRequest)
+
+UnregisterObjectReply = _reflection.GeneratedProtocolMessageType('UnregisterObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _UNREGISTEROBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnregisterObjectReply)
+ })
+_sym_db.RegisterMessage(UnregisterObjectReply)
+
+RenameObjectRequest = _reflection.GeneratedProtocolMessageType('RenameObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _RENAMEOBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RenameObjectRequest)
+ })
+_sym_db.RegisterMessage(RenameObjectRequest)
+
+RenameObjectReply = _reflection.GeneratedProtocolMessageType('RenameObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _RENAMEOBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.RenameObjectReply)
+ })
+_sym_db.RegisterMessage(RenameObjectReply)
+
+LoadObjectRequest = _reflection.GeneratedProtocolMessageType('LoadObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectRequest)
+ })
+_sym_db.RegisterMessage(LoadObjectRequest)
+
+LoadObjectReply = _reflection.GeneratedProtocolMessageType('LoadObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectReply)
+ })
+_sym_db.RegisterMessage(LoadObjectReply)
+
+QuarantineObjectRequest = _reflection.GeneratedProtocolMessageType('QuarantineObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _QUARANTINEOBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.QuarantineObjectRequest)
+ })
+_sym_db.RegisterMessage(QuarantineObjectRequest)
+
+QuarantineObjectReply = _reflection.GeneratedProtocolMessageType('QuarantineObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _QUARANTINEOBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.QuarantineObjectReply)
+ })
+_sym_db.RegisterMessage(QuarantineObjectReply)
+
+UnquarantineObjectRequest = _reflection.GeneratedProtocolMessageType('UnquarantineObjectRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _UNQUARANTINEOBJECTREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnquarantineObjectRequest)
+ })
+_sym_db.RegisterMessage(UnquarantineObjectRequest)
+
+UnquarantineObjectReply = _reflection.GeneratedProtocolMessageType('UnquarantineObjectReply', (_message.Message,), {
+ 'DESCRIPTOR' : _UNQUARANTINEOBJECTREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.UnquarantineObjectReply)
+ })
+_sym_db.RegisterMessage(UnquarantineObjectReply)
+
+LoadObjectsByPrefixRequest = _reflection.GeneratedProtocolMessageType('LoadObjectsByPrefixRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTSBYPREFIXREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByPrefixRequest)
+ })
+_sym_db.RegisterMessage(LoadObjectsByPrefixRequest)
+
+LoadObjectsByPrefixReply = _reflection.GeneratedProtocolMessageType('LoadObjectsByPrefixReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTSBYPREFIXREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByPrefixReply)
+ })
+_sym_db.RegisterMessage(LoadObjectsByPrefixReply)
+
+LoadObjectsByVolumeRequest = _reflection.GeneratedProtocolMessageType('LoadObjectsByVolumeRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTSBYVOLUMEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByVolumeRequest)
+ })
+_sym_db.RegisterMessage(LoadObjectsByVolumeRequest)
+
+LoadObjectsByVolumeReply = _reflection.GeneratedProtocolMessageType('LoadObjectsByVolumeReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LOADOBJECTSBYVOLUMEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByVolumeReply)
+ })
+_sym_db.RegisterMessage(LoadObjectsByVolumeReply)
+
+ListPartitionsRequest = _reflection.GeneratedProtocolMessageType('ListPartitionsRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTPARTITIONSREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListPartitionsRequest)
+ })
+_sym_db.RegisterMessage(ListPartitionsRequest)
+
+ListPartitionRequest = _reflection.GeneratedProtocolMessageType('ListPartitionRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTPARTITIONREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListPartitionRequest)
+ })
+_sym_db.RegisterMessage(ListPartitionRequest)
+
+ListSuffixRequest = _reflection.GeneratedProtocolMessageType('ListSuffixRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTSUFFIXREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListSuffixRequest)
+ })
+_sym_db.RegisterMessage(ListSuffixRequest)
+
+ListQuarantinedOHashesRequest = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashesRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTQUARANTINEDOHASHESREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashesRequest)
+ })
+_sym_db.RegisterMessage(ListQuarantinedOHashesRequest)
+
+ListQuarantinedOHashesReply = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashesReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTQUARANTINEDOHASHESREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashesReply)
+ })
+_sym_db.RegisterMessage(ListQuarantinedOHashesReply)
+
+QuarantinedObjectName = _reflection.GeneratedProtocolMessageType('QuarantinedObjectName', (_message.Message,), {
+ 'DESCRIPTOR' : _QUARANTINEDOBJECTNAME,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.QuarantinedObjectName)
+ })
+_sym_db.RegisterMessage(QuarantinedObjectName)
+
+ListQuarantinedOHashRequest = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTQUARANTINEDOHASHREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashRequest)
+ })
+_sym_db.RegisterMessage(ListQuarantinedOHashRequest)
+
+ListQuarantinedOHashReply = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashReply', (_message.Message,), {
+ 'DESCRIPTOR' : _LISTQUARANTINEDOHASHREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashReply)
+ })
+_sym_db.RegisterMessage(ListQuarantinedOHashReply)
+
+GetNextOffsetRequest = _reflection.GeneratedProtocolMessageType('GetNextOffsetRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _GETNEXTOFFSETREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetNextOffsetRequest)
+ })
+_sym_db.RegisterMessage(GetNextOffsetRequest)
+
+GetNextOffsetReply = _reflection.GeneratedProtocolMessageType('GetNextOffsetReply', (_message.Message,), {
+ 'DESCRIPTOR' : _GETNEXTOFFSETREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetNextOffsetReply)
+ })
+_sym_db.RegisterMessage(GetNextOffsetReply)
+
+GetStatsRequest = _reflection.GeneratedProtocolMessageType('GetStatsRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _GETSTATSREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetStatsRequest)
+ })
+_sym_db.RegisterMessage(GetStatsRequest)
+
+GetStatsReply = _reflection.GeneratedProtocolMessageType('GetStatsReply', (_message.Message,), {
+
+ 'StatsEntry' : _reflection.GeneratedProtocolMessageType('StatsEntry', (_message.Message,), {
+ 'DESCRIPTOR' : _GETSTATSREPLY_STATSENTRY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetStatsReply.StatsEntry)
+ })
+ ,
+ 'DESCRIPTOR' : _GETSTATSREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetStatsReply)
+ })
+_sym_db.RegisterMessage(GetStatsReply)
+_sym_db.RegisterMessage(GetStatsReply.StatsEntry)
+
+SetKvStateReply = _reflection.GeneratedProtocolMessageType('SetKvStateReply', (_message.Message,), {
+ 'DESCRIPTOR' : _SETKVSTATEREPLY,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.SetKvStateReply)
+ })
+_sym_db.RegisterMessage(SetKvStateReply)
+
+GetKvStateRequest = _reflection.GeneratedProtocolMessageType('GetKvStateRequest', (_message.Message,), {
+ 'DESCRIPTOR' : _GETKVSTATEREQUEST,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.GetKvStateRequest)
+ })
+_sym_db.RegisterMessage(GetKvStateRequest)
+
+KvState = _reflection.GeneratedProtocolMessageType('KvState', (_message.Message,), {
+ 'DESCRIPTOR' : _KVSTATE,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.KvState)
+ })
+_sym_db.RegisterMessage(KvState)
+
+Volume = _reflection.GeneratedProtocolMessageType('Volume', (_message.Message,), {
+ 'DESCRIPTOR' : _VOLUME,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.Volume)
+ })
+_sym_db.RegisterMessage(Volume)
+
+Object = _reflection.GeneratedProtocolMessageType('Object', (_message.Message,), {
+ 'DESCRIPTOR' : _OBJECT,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.Object)
+ })
+_sym_db.RegisterMessage(Object)
+
+DirEntries = _reflection.GeneratedProtocolMessageType('DirEntries', (_message.Message,), {
+ 'DESCRIPTOR' : _DIRENTRIES,
+ '__module__' : 'fmgr_pb2'
+ # @@protoc_insertion_point(class_scope:filemgr.DirEntries)
+ })
+_sym_db.RegisterMessage(DirEntries)
+
+
+_GETSTATSREPLY_STATSENTRY._options = None
+# @@protoc_insertion_point(module_scope)
diff --git a/swift/obj/header.py b/swift/obj/header.py
new file mode 100644
index 000000000..fd442ea83
--- /dev/null
+++ b/swift/obj/header.py
@@ -0,0 +1,394 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import six
+import os
+import struct
+
+from swift.common.utils import fdatasync
+
+PICKLE_PROTOCOL = 2
+
+# header version to use for new objects
+OBJECT_HEADER_VERSION = 4
+VOLUME_HEADER_VERSION = 1
+
+# maximum serialized header length
+MAX_OBJECT_HEADER_LEN = 512
+MAX_VOLUME_HEADER_LEN = 128
+
+OBJECT_START_MARKER = b"SWIFTOBJ"
+VOLUME_START_MARKER = b"SWIFTVOL"
+
+# object alignment within a volume.
+# this is needed so that FALLOC_FL_PUNCH_HOLE can actually return space back
+# to the filesystem (tested on XFS and ext4)
+# we may not need to align files in volumes dedicated to short-lived files,
+# such as tombstones (.ts extension),
+# but currently we do align for all volume types.
+ALIGNMENT = 4096
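+
+# For illustration only (not used elsewhere in this module): rounding an
+# offset up to the next ALIGNMENT boundary can be written as
+#     aligned = (offset + ALIGNMENT - 1) & ~(ALIGNMENT - 1)
+# e.g. an object ending at offset 5000 would be padded out to 8192.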
+
+# constants
+STATE_OBJ_FILE = 0
+STATE_OBJ_QUARANTINED = 1
+
+
+class HeaderException(IOError):
+ def __init__(self, message):
+ self.message = message
+ super(HeaderException, self).__init__(message)
+
+
+object_header_formats = {
+ 1: '8sBQQQ30sQQQQQ',
+ 2: '8sBQQQ64sQQQQQ', # 64 characters for the filename
+ 3: '8sBQQQ64sQQQQQB', # add state field
+ 4: '8sBQQQ64sQQQQQB32s' # add metadata checksum
+}
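+
+# Sanity sketch (assumes native struct alignment, since the format strings
+# above carry no byte-order prefix): every packed header must fit within
+# MAX_OBJECT_HEADER_LEN:
+#     import struct
+#     assert all(struct.calcsize(fmt) <= MAX_OBJECT_HEADER_LEN
+#                for fmt in object_header_formats.values())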
+
+
+class ObjectHeader(object):
+ """
+ Version 1:
+ Magic string (8 bytes)
+ Header version (1 byte)
+ Policy index (8 bytes)
+ Object hash (16 bytes) (__init__)
+ Filename (30 chars)
+ Metadata offset (8 bytes)
+ Metadata size (8 bytes)
+ Data offset (8 bytes)
+ Data size (8 bytes)
+ Total object size (8 bytes)
+
+    Version 2: similar, but with 64 chars for the filename
+    Version 3: adds a "state" field (unsigned char)
+    Version 4: adds a metadata checksum, "metastr_md5" (32 bytes)
+ """
+
+ def __init__(self, version=OBJECT_HEADER_VERSION):
+ if version not in object_header_formats.keys():
+ raise HeaderException('Unsupported object header version')
+ self.magic_string = OBJECT_START_MARKER
+ self.version = version
+
+ def __eq__(self, other):
+ return self.__dict__ == other.__dict__
+
+ def __len__(self):
+ try:
+ fmt = object_header_formats[self.version]
+ except KeyError:
+ raise HeaderException('Unsupported header version')
+ return struct.calcsize(fmt)
+
+ def pack(self):
+ version_to_pack = {
+ 1: self.__pack_v1,
+ 2: self.__pack_v2,
+ 3: self.__pack_v3,
+ 4: self.__pack_v4
+ }
+ return version_to_pack[self.version]()
+
+ def __pack_v1(self):
+ fmt = object_header_formats[1]
+ ohash_h = int(self.ohash, 16) >> 64
+ ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff
+
+ args = (self.magic_string, self.version,
+ self.policy_idx, ohash_h, ohash_l,
+ str(self.filename).encode('ascii'),
+ self.metadata_offset, self.metadata_size,
+ self.data_offset, self.data_size, self.total_size)
+
+ return struct.pack(fmt, *args)
+
+ def __pack_v2(self):
+ fmt = object_header_formats[2]
+ ohash_h = int(self.ohash, 16) >> 64
+ ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff
+
+ args = (self.magic_string, self.version,
+ self.policy_idx, ohash_h, ohash_l,
+ str(self.filename).encode('ascii'),
+ self.metadata_offset, self.metadata_size,
+ self.data_offset, self.data_size, self.total_size)
+
+ return struct.pack(fmt, *args)
+
+ def __pack_v3(self):
+ fmt = object_header_formats[3]
+ ohash_h = int(self.ohash, 16) >> 64
+ ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff
+
+ args = (self.magic_string, self.version,
+ self.policy_idx, ohash_h, ohash_l,
+ str(self.filename).encode('ascii'),
+ self.metadata_offset, self.metadata_size,
+ self.data_offset, self.data_size, self.total_size, self.state)
+
+ return struct.pack(fmt, *args)
+
+ def __pack_v4(self):
+ fmt = object_header_formats[4]
+ ohash_h = int(self.ohash, 16) >> 64
+ ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff
+
+ args = (self.magic_string, self.version,
+ self.policy_idx, ohash_h, ohash_l,
+ str(self.filename).encode('ascii'),
+ self.metadata_offset, self.metadata_size,
+ self.data_offset, self.data_size, self.total_size, self.state,
+ self.metastr_md5)
+
+ return struct.pack(fmt, *args)
+
+ @classmethod
+ def unpack(cls, buf):
+ version_to_unpack = {
+ 1: cls.__unpack_v1,
+ 2: cls.__unpack_v2,
+ 3: cls.__unpack_v3,
+ 4: cls.__unpack_v4
+ }
+
+ if buf[0:8] != OBJECT_START_MARKER:
+ raise HeaderException('Not a header')
+ version = struct.unpack('<B', buf[8:9])[0]
+ if version not in object_header_formats.keys():
+ raise HeaderException('Unsupported header version')
+
+ return version_to_unpack[version](buf)
+
+ @classmethod
+ def __unpack_v1(cls, buf):
+ fmt = object_header_formats[1]
+ raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)])
+ header = cls()
+ header.magic_string = raw_header[0]
+ header.version = raw_header[1]
+ header.policy_idx = raw_header[2]
+ header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4])
+ if six.PY2:
+ header.filename = raw_header[5].rstrip(b'\0')
+ else:
+ header.filename = raw_header[5].rstrip(b'\0').decode('ascii')
+ header.metadata_offset = raw_header[6]
+ header.metadata_size = raw_header[7]
+ header.data_offset = raw_header[8]
+ header.data_size = raw_header[9]
+ # currently, total_size gets padded to the next 4k boundary, so that
+ # fallocate can reclaim the block when hole punching.
+ header.total_size = raw_header[10]
+
+ return header
+
+ @classmethod
+ def __unpack_v2(cls, buf):
+ fmt = object_header_formats[2]
+ raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)])
+ header = cls()
+ header.magic_string = raw_header[0]
+ header.version = raw_header[1]
+ header.policy_idx = raw_header[2]
+ header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4])
+ if six.PY2:
+ header.filename = raw_header[5].rstrip(b'\0')
+ else:
+ header.filename = raw_header[5].rstrip(b'\0').decode('ascii')
+ header.metadata_offset = raw_header[6]
+ header.metadata_size = raw_header[7]
+ header.data_offset = raw_header[8]
+ header.data_size = raw_header[9]
+ # currently, total_size gets padded to the next 4k boundary, so that
+ # fallocate can reclaim the block when hole punching.
+ header.total_size = raw_header[10]
+
+ return header
+
+ @classmethod
+ def __unpack_v3(cls, buf):
+ fmt = object_header_formats[3]
+ raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)])
+ header = cls()
+ header.magic_string = raw_header[0]
+ header.version = raw_header[1]
+ header.policy_idx = raw_header[2]
+ header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4])
+ if six.PY2:
+ header.filename = raw_header[5].rstrip(b'\0')
+ else:
+ header.filename = raw_header[5].rstrip(b'\0').decode('ascii')
+ header.metadata_offset = raw_header[6]
+ header.metadata_size = raw_header[7]
+ header.data_offset = raw_header[8]
+ header.data_size = raw_header[9]
+ # currently, total_size gets padded to the next 4k boundary, so that
+ # fallocate can reclaim the block when hole punching.
+ header.total_size = raw_header[10]
+ header.state = raw_header[11]
+
+ return header
+
+ @classmethod
+ def __unpack_v4(cls, buf):
+ fmt = object_header_formats[4]
+ raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)])
+ header = cls()
+ header.magic_string = raw_header[0]
+ header.version = raw_header[1]
+ header.policy_idx = raw_header[2]
+ header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4])
+ if six.PY2:
+ header.filename = raw_header[5].rstrip(b'\0')
+ else:
+ header.filename = raw_header[5].rstrip(b'\0').decode('ascii')
+ header.metadata_offset = raw_header[6]
+ header.metadata_size = raw_header[7]
+ header.data_offset = raw_header[8]
+ header.data_size = raw_header[9]
+ # currently, total_size gets padded to the next 4k boundary, so that
+ # fallocate can reclaim the block when hole punching.
+ header.total_size = raw_header[10]
+ header.state = raw_header[11]
+ header.metastr_md5 = raw_header[12]
+
+ return header
+
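+# A minimal round-trip sketch (hypothetical field values, for
+# illustration only):
+#     h = ObjectHeader(version=3)
+#     h.policy_idx = 0
+#     h.ohash = "0" * 32
+#     h.filename = "1537344499.71234.data"
+#     h.metadata_offset, h.metadata_size = 512, 256
+#     h.data_offset, h.data_size, h.total_size = 4096, 1024, 8192
+#     h.state = STATE_OBJ_FILE
+#     assert ObjectHeader.unpack(h.pack()) == h
+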
+
+volume_header_formats = {
+ 1: '8sBQQQQLQ'
+}
+
+
+class VolumeHeader(object):
+ """
+ Version 1:
+ Magic string (8 bytes)
+ Header version (1 byte)
+ Volume index (8 bytes)
+ Partition index (8 bytes)
+ Volume type (8 bytes)
+ First object offset (8 bytes)
+ Volume state (4 bytes) (enum from fmgr.proto)
+ Volume compaction target (8 bytes)
+ (only valid if state is STATE_COMPACTION_SRC)
+ """
+ def __init__(self, version=VOLUME_HEADER_VERSION):
+ self.magic_string = VOLUME_START_MARKER
+ self.version = version
+ self.state = 0
+ self.compaction_target = 0
+
+ def __str__(self):
+ prop_list = ['volume_idx', 'partition', 'type',
+ 'state', 'compaction_target']
+ h_str = ""
+ for prop in prop_list:
+ h_str += "{}: {}\n".format(prop, getattr(self, prop))
+ return h_str[:-1]
+
+ def __len__(self):
+ try:
+ fmt = volume_header_formats[self.version]
+ except KeyError:
+ raise HeaderException('Unsupported header version')
+ return struct.calcsize(fmt)
+
+ def pack(self):
+ version_to_pack = {
+ 1: self.__pack_v1,
+ }
+ return version_to_pack[self.version]()
+
+ def __pack_v1(self):
+ fmt = volume_header_formats[1]
+
+ args = (self.magic_string, self.version,
+ self.volume_idx, self.partition, self.type,
+ self.first_obj_offset, self.state,
+ self.compaction_target)
+
+ return struct.pack(fmt, *args)
+
+ @classmethod
+ def unpack(cls, buf):
+ version_to_unpack = {
+ 1: cls.__unpack_v1
+ }
+ if buf[0:8] != VOLUME_START_MARKER:
+ raise HeaderException('Not a header')
+ version = struct.unpack('<B', buf[8:9])[0]
+ if version not in volume_header_formats.keys():
+ raise HeaderException('Unsupported header version')
+
+ return version_to_unpack[version](buf)
+
+ @classmethod
+ def __unpack_v1(cls, buf):
+ fmt = volume_header_formats[1]
+ raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)])
+ header = cls()
+ header.magic_string = raw_header[0]
+ header.version = raw_header[1]
+ header.volume_idx = raw_header[2]
+ header.partition = raw_header[3]
+ header.type = raw_header[4]
+ header.first_obj_offset = raw_header[5]
+ header.state = raw_header[6]
+ header.compaction_target = raw_header[7]
+
+ return header
+
+
+def read_volume_header(fp):
+    """
+    Read volume header
+    :param fp: opened volume, positioned at the header offset
+    :return: a VolumeHeader
+    """
+    buf = fp.read(MAX_VOLUME_HEADER_LEN)
+    header = VolumeHeader.unpack(buf)
+    return header
+
+
+def write_volume_header(header, fd):
+ os.write(fd, header.pack())
+
+
+def read_object_header(fp):
+ """
+ Read object header
+ :param fp: opened file, positioned at header start
+ :return: an ObjectHeader
+ """
+ buf = fp.read(MAX_OBJECT_HEADER_LEN)
+ header = ObjectHeader.unpack(buf)
+ return header
+
+
+def write_object_header(header, fp):
+ """
+ Rewrites header in open file
+ :param header: header to write
+ :param fp: opened volume
+ """
+ fp.write(header.pack())
+ fdatasync(fp.fileno())
+
+
+def erase_object_header(fd, offset):
+ """
+ Erase an object header by writing null bytes over it
+ :param fd: volume file descriptor
+ :param offset: absolute header offset
+ """
+ os.lseek(fd, offset, os.SEEK_SET)
+ os.write(fd, b"\x00" * MAX_OBJECT_HEADER_LEN)
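+
+
+# Usage sketch (hypothetical volume path; assumes the volume header sits
+# at offset 0, for illustration only):
+#     with open('/srv/node/sda/volumes/v0000001', 'rb') as fp:
+#         vol_header = read_volume_header(fp)
+#         fp.seek(vol_header.first_obj_offset)
+#         obj_header = read_object_header(fp)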
diff --git a/swift/obj/kvfile.py b/swift/obj/kvfile.py
new file mode 100644
index 000000000..9a0347329
--- /dev/null
+++ b/swift/obj/kvfile.py
@@ -0,0 +1,1260 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import errno
+import os
+import time
+import json
+from hashlib import md5
+import logging
+import traceback
+from os.path import basename, dirname, join
+from random import shuffle
+from contextlib import contextmanager
+from collections import defaultdict
+
+from eventlet import Timeout, tpool
+import six
+
+from swift import gettext_ as _
+from swift.common.constraints import check_drive
+from swift.common.request_helpers import is_sys_meta
+from swift.common.utils import fdatasync, \
+ config_true_value, listdir, split_path, lock_path
+from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
+ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
+ DiskFileError, PathNotDir, \
+ DiskFileExpired, DiskFileXattrNotSupported, \
+ DiskFileBadMetadataChecksum
+from swift.common.storage_policy import (
+ split_policy_string, POLICIES,
+ REPL_POLICY, EC_POLICY)
+
+from swift.obj import vfile
+from swift.obj.diskfile import BaseDiskFileManager, DiskFileManager, \
+ ECDiskFileManager, BaseDiskFile, DiskFile, DiskFileReader, DiskFileWriter,\
+ BaseDiskFileReader, BaseDiskFileWriter, ECDiskFile, ECDiskFileReader, \
+ ECDiskFileWriter, AuditLocation, RESERVED_DATAFILE_META, \
+ DATAFILE_SYSTEM_META, strip_self, DEFAULT_RECLAIM_AGE, _encode_metadata, \
+ get_part_path, get_data_dir, update_auditor_status, extract_policy, \
+ HASH_FILE, read_hashes, write_hashes, consolidate_hashes, invalidate_hash
+
+
+def quarantine_vrenamer(device_path, corrupted_file_path):
+ """
+ In the case that a file is corrupted, move it to a quarantined
+ area to allow replication to fix it.
+
+    :param device_path: The path to the device the corrupted file is on.
+    :param corrupted_file_path: The path to the file you want quarantined.
+
+ :returns: path (str) of directory the file was moved to
+ :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
+ exceptions from rename
+ """
+ policy = extract_policy(corrupted_file_path)
+ if policy is None:
+ # TODO: support a quarantine-unknown location
+ policy = POLICIES.legacy
+
+ # rename key in KV
+ return vfile.quarantine_ohash(dirname(corrupted_file_path), policy)
+
+
+def object_audit_location_generator(devices, datadir, mount_check=True,
+ logger=None, device_dirs=None,
+ auditor_type="ALL"):
+ """
+ Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
+ objects stored under that directory for the given datadir (policy),
+ if device_dirs isn't set. If device_dirs is set, only yield AuditLocation
+ for the objects under the entries in device_dirs. The AuditLocation only
+ knows the path to the hash directory, not to the .data file therein
+ (if any). This is to avoid a double listdir(hash_dir); the DiskFile object
+ will always do one, so we don't.
+
+ :param devices: parent directory of the devices to be audited
+ :param datadir: objects directory
+    :param mount_check: flag indicating whether a mount check should be
+                        performed on devices
+ :param logger: a logger object
+ :param device_dirs: a list of directories under devices to traverse
+ :param auditor_type: either ALL or ZBF
+ """
+ if not device_dirs:
+ device_dirs = listdir(devices)
+ else:
+ # remove bogus devices and duplicates from device_dirs
+ device_dirs = list(
+ set(listdir(devices)).intersection(set(device_dirs)))
+ # randomize devices in case of process restart before sweep completed
+ shuffle(device_dirs)
+
+ base, policy = split_policy_string(datadir)
+ for device in device_dirs:
+ if not check_drive(devices, device, mount_check):
+ if logger:
+ logger.debug(
+ 'Skipping %s as it is not %s', device,
+ 'mounted' if mount_check else 'a dir')
+ continue
+
+ datadir_path = os.path.join(devices, device, datadir)
+ if not os.path.exists(datadir_path):
+ continue
+
+ partitions = get_auditor_status(datadir_path, logger, auditor_type)
+
+ for pos, partition in enumerate(partitions):
+ update_auditor_status(datadir_path, logger,
+ partitions[pos:], auditor_type)
+ part_path = os.path.join(datadir_path, partition)
+ try:
+ suffixes = vfile.listdir(part_path)
+ except OSError as e:
+ if e.errno != errno.ENOTDIR:
+ raise
+ continue
+ for asuffix in suffixes:
+ suff_path = os.path.join(part_path, asuffix)
+ try:
+ hashes = vfile.listdir(suff_path)
+ except OSError as e:
+ if e.errno != errno.ENOTDIR:
+ raise
+ continue
+ for hsh in hashes:
+ hsh_path = os.path.join(suff_path, hsh)
+ yield AuditLocation(hsh_path, device, partition,
+ policy)
+
+ update_auditor_status(datadir_path, logger, [], auditor_type)
+
+
+def get_auditor_status(datadir_path, logger, auditor_type):
+ auditor_status = os.path.join(
+ datadir_path, "auditor_status_%s.json" % auditor_type)
+ status = {}
+ try:
+ if six.PY3:
+ statusfile = open(auditor_status, encoding='utf8')
+ else:
+ statusfile = open(auditor_status, 'rb')
+ with statusfile:
+ status = statusfile.read()
+ except (OSError, IOError) as e:
+ if e.errno != errno.ENOENT and logger:
+ logger.warning(_('Cannot read %(auditor_status)s (%(err)s)') %
+ {'auditor_status': auditor_status, 'err': e})
+ return vfile.listdir(datadir_path)
+ try:
+ status = json.loads(status)
+    except ValueError as e:
+        # logger may be None here, as in the IOError branch above
+        if logger:
+            logger.warning(_('Loading JSON from %(auditor_status)s failed'
+                             ' (%(err)s)') %
+                           {'auditor_status': auditor_status, 'err': e})
+        return vfile.listdir(datadir_path)
+ return status['partitions']
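+
+
+# For reference, the status file consumed above is the small JSON document
+# written by update_auditor_status(); a hypothetical example:
+#     {"partitions": ["1024", "2048", "3072"]}
+# i.e. the partitions the auditor has not yet swept.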
+
+
+class BaseKVFile(BaseDiskFile):
+    # TODO: we may want a separate __init__ to define KV-specific attributes
+ def open(self, modernize=False, current_time=None):
+ """
+ Open the object.
+
+ This implementation opens the data file representing the object, reads
+ the associated metadata in the extended attributes, additionally
+ combining metadata from fast-POST `.meta` files.
+
+ :param modernize: if set, update this diskfile to the latest format.
+ Currently, this means adding metadata checksums if none are
+ present.
+ :param current_time: Unix time used in checking expiration. If not
+ present, the current time will be used.
+
+ .. note::
+
+ An implementation is allowed to raise any of the following
+ exceptions, but is only required to raise `DiskFileNotExist` when
+ the object representation does not exist.
+
+ :raises DiskFileCollision: on name mis-match with metadata
+ :raises DiskFileNotExist: if the object does not exist
+ :raises DiskFileDeleted: if the object was previously deleted
+        :raises DiskFileQuarantined: if while reading metadata of the file
+                                     some data did not pass cross checks
+ :returns: itself for use as a context manager
+ """
+ try:
+ files = vfile.listdir(self._datadir)
+ except (OSError, vfile.VFileException) as err:
+ raise DiskFileError(
+ "Error listing directory %s: %s" % (self._datadir, err))
+
+ # gather info about the valid files to use to open the DiskFile
+ file_info = self._get_ondisk_files(files)
+
+ self._data_file = file_info.get('data_file')
+ if not self._data_file:
+ raise self._construct_exception_from_ts_file(**file_info)
+ self._vfr = self._construct_from_data_file(
+ current_time=current_time, modernize=modernize, **file_info)
+ # This method must populate the internal _metadata attribute.
+ self._metadata = self._metadata or {}
+ return self
+
+ def _verify_data_file(self, data_file, data_vfr, current_time):
+ """
+ Verify the metadata's name value matches what we think the object is
+ named.
+
+        :param data_file: data file name being considered, used when
+            quarantines occur
+        :param data_vfr: opened VFileReader for the data file, used to
+            verify the on-disk size against the Content-Length metadata
+            value
+ :param current_time: Unix time used in checking expiration
+ :raises DiskFileCollision: if the metadata stored name does not match
+ the referenced name of the file
+ :raises DiskFileExpired: if the object has expired
+ :raises DiskFileQuarantined: if data inconsistencies were detected
+ between the metadata and the file-system
+ metadata
+ """
+ try:
+ mname = self._metadata['name']
+ except KeyError:
+ raise self._quarantine(data_file, "missing name metadata")
+ else:
+ if mname != self._name:
+ self._logger.error(
+ _('Client path %(client)s does not match '
+ 'path stored in object metadata %(meta)s'),
+ {'client': self._name, 'meta': mname})
+ raise DiskFileCollision('Client path does not match path '
+ 'stored in object metadata')
+ try:
+ x_delete_at = int(self._metadata['X-Delete-At'])
+ except KeyError:
+ pass
+ except ValueError:
+ # Quarantine, the x-delete-at key is present but not an
+ # integer.
+ raise self._quarantine(
+ data_file, "bad metadata x-delete-at value %s" % (
+ self._metadata['X-Delete-At']))
+ else:
+ if current_time is None:
+ current_time = time.time()
+ if x_delete_at <= current_time and not self._open_expired:
+ raise DiskFileExpired(metadata=self._metadata)
+ try:
+ metadata_size = int(self._metadata['Content-Length'])
+ except KeyError:
+ raise self._quarantine(
+ data_file, "missing content-length in metadata")
+ except ValueError:
+ # Quarantine, the content-length key is present but not an
+ # integer.
+ raise self._quarantine(
+ data_file, "bad metadata content-length value %s" % (
+ self._metadata['Content-Length']))
+
+ obj_size = data_vfr.data_size
+ if obj_size != metadata_size:
+ raise self._quarantine(
+ data_file, "metadata content-length %s does"
+ " not match actual object size %s" % (
+ metadata_size, obj_size))
+ self._content_length = obj_size
+ return obj_size
+
+ def _failsafe_read_metadata(self, source, quarantine_filename=None,
+ add_missing_checksum=False):
+ """
+ Read metadata from source object file. In case of failure, quarantine
+ the file.
+
+ Takes source and filename separately so we can read from an open
+ file if we have one.
+
+ :param source: file descriptor or filename to load the metadata from
+        :param quarantine_filename: full path of the file, used if it has
+            to be quarantined
+ :param add_missing_checksum: ignored
+ """
+ try:
+ vfr = vfile.VFileReader.get_vfile(source, self._logger)
+ vfr_metadata = vfr.metadata
+ vfr.close()
+ return vfr_metadata
+ except (DiskFileXattrNotSupported, DiskFileNotExist):
+ raise
+ except DiskFileBadMetadataChecksum as err:
+ raise self._quarantine(quarantine_filename, str(err))
+ except Exception as err:
+ raise self._quarantine(
+ quarantine_filename,
+ "Exception reading metadata: %s" % err)
+
+ # This could be left unchanged now that _failsafe_read_metadata() is
+ # patched. Test it.
+ def _merge_content_type_metadata(self, ctype_file):
+ """
+ When a second .meta file is providing the most recent Content-Type
+ metadata then merge it into the metafile_metadata.
+
+ :param ctype_file: An on-disk .meta file
+ """
+ try:
+ ctype_vfr = vfile.VFileReader.get_vfile(ctype_file, self._logger)
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ raise DiskFileNotExist()
+ raise
+ ctypefile_metadata = ctype_vfr.metadata
+ ctype_vfr.close()
+ if ('Content-Type' in ctypefile_metadata
+ and (ctypefile_metadata.get('Content-Type-Timestamp') >
+ self._metafile_metadata.get('Content-Type-Timestamp'))
+ and (ctypefile_metadata.get('Content-Type-Timestamp') >
+ self.data_timestamp)):
+ self._metafile_metadata['Content-Type'] = \
+ ctypefile_metadata['Content-Type']
+ self._metafile_metadata['Content-Type-Timestamp'] = \
+ ctypefile_metadata.get('Content-Type-Timestamp')
+
+ def _construct_from_data_file(self, data_file, meta_file, ctype_file,
+ current_time, modernize=False,
+ **kwargs):
+ """
+ Open the `.data` file to fetch its metadata, and fetch the metadata
+ from fast-POST `.meta` files as well if any exist, merging them
+ properly.
+
+ :param data_file: on-disk `.data` file being considered
+ :param meta_file: on-disk fast-POST `.meta` file being considered
+ :param ctype_file: on-disk fast-POST `.meta` file being considered that
+ contains content-type and content-type timestamp
+ :param current_time: Unix time used in checking expiration
+ :param modernize: ignored
+        :returns: an opened VFileReader for the `.data` file
+ :raises DiskFileError: various exceptions from
+ :func:`swift.obj.diskfile.DiskFile._verify_data_file`
+ """
+ # TODO: need to catch exception, check if ENOENT (see in diskfile)
+ try:
+ data_vfr = vfile.VFileReader.get_vfile(data_file, self._logger)
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ raise DiskFileNotExist()
+ raise
+
+ self._datafile_metadata = data_vfr.metadata
+
+ self._metadata = {}
+ if meta_file:
+ self._metafile_metadata = self._failsafe_read_metadata(
+ meta_file, meta_file)
+
+ if ctype_file and ctype_file != meta_file:
+ self._merge_content_type_metadata(ctype_file)
+ sys_metadata = dict(
+ [(key, val) for key, val in self._datafile_metadata.items()
+ if key.lower() in (RESERVED_DATAFILE_META |
+ DATAFILE_SYSTEM_META)
+ or is_sys_meta('object', key)])
+ self._metadata.update(self._metafile_metadata)
+ self._metadata.update(sys_metadata)
+ # diskfile writer added 'name' to metafile, so remove it here
+ self._metafile_metadata.pop('name', None)
+ # TODO: the check for Content-Type is only here for tests that
+ # create .data files without Content-Type
+ if ('Content-Type' in self._datafile_metadata and
+ (self.data_timestamp >
+ self._metafile_metadata.get('Content-Type-Timestamp'))):
+ self._metadata['Content-Type'] = \
+ self._datafile_metadata['Content-Type']
+ self._metadata.pop('Content-Type-Timestamp', None)
+ else:
+ self._metadata.update(self._datafile_metadata)
+ if self._name is None:
+ # If we don't know our name, we were just given a hash dir at
+ # instantiation, so we'd better validate that the name hashes back
+ # to us
+ self._name = self._metadata['name']
+ self._verify_name_matches_hash(data_file)
+ self._verify_data_file(data_file, data_vfr, current_time)
+ return data_vfr
+
+ def reader(self, keep_cache=False,
+ _quarantine_hook=lambda m: None):
+ """
+ Return a :class:`swift.common.swob.Response` class compatible
+ "`app_iter`" object as defined by
+ :class:`swift.obj.diskfile.DiskFileReader`.
+
+ For this implementation, the responsibility of closing the open file
+ is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.
+
+ :param keep_cache: caller's preference for keeping data read in the
+ OS buffer cache
+ :param _quarantine_hook: 1-arg callable called when obj quarantined;
+ the arg is the reason for quarantine.
+ Default is to ignore it.
+ Not needed by the REST layer.
+ :returns: a :class:`swift.obj.diskfile.DiskFileReader` object
+ """
+ dr = self.reader_cls(
+ self._vfr, self._data_file, int(self._metadata['Content-Length']),
+ self._metadata['ETag'], self._disk_chunk_size,
+ self._manager.keep_cache_size, self._device_path, self._logger,
+ use_splice=self._use_splice, quarantine_hook=_quarantine_hook,
+ pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache)
+ # At this point the reader object is now responsible for closing
+ # the file pointer.
+ # self._fp = None
+ self._vfr = None
+ return dr
+
+ def writer(self, size=None):
+ return self.writer_cls(self._manager, self._name, self._datadir, size,
+ self._bytes_per_sync, self, self._logger)
+
+ def _get_tempfile(self):
+ raise Exception("_get_tempfile called, shouldn't happen")
+
+ @contextmanager
+ def create(self, size=None, extension=None):
+ """
+ Context manager to create a vfile.
+ It could create separate volumes based on the extension.
+
+ Currently no caller will set the extension. The easiest would be to
+ patch server.py, in DELETE(), add an extension=".ts" parameter to the
+ self.get_diskfile() call, as kwargs is passed all the way down to here.
+
+ It's also possible to have the writer_cls handle the volume creation
+ later: at the first write() call, if any, assume it's not a tombstone,
+ and in put(), check self._extension.
+
+ :param size: optional initial size of file to explicitly allocate on
+ disk
+ :param extension: file extension, with dot ('.ts')
+ :raises DiskFileNoSpace: if a size is specified and allocation fails
+ """
+ dfw = self.writer(size)
+ try:
+ yield dfw.open()
+ finally:
+ dfw.close()
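+
+    # Hypothetical sketch of the server.py patch suggested in create()'s
+    # docstring; 'extension' is not passed by any current caller and the
+    # call below is illustrative only:
+    #
+    #     disk_file = self.get_diskfile(device, partition, account,
+    #                                   container, obj, policy=policy,
+    #                                   extension='.ts')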
+
+
+class BaseKVFileReader(BaseDiskFileReader):
+ def __init__(self, vfr, data_file, obj_size, etag,
+ disk_chunk_size, keep_cache_size, device_path, logger,
+ quarantine_hook, use_splice, pipe_size, diskfile,
+ keep_cache=False):
+ # Parameter tracking
+ self._vfr = vfr
+ self._data_file = data_file
+ self._obj_size = obj_size
+ self._etag = etag
+ self._diskfile = diskfile
+ self._disk_chunk_size = disk_chunk_size
+ self._device_path = device_path
+ self._logger = logger
+ self._quarantine_hook = quarantine_hook
+ self._use_splice = use_splice
+ self._pipe_size = pipe_size
+ if keep_cache:
+ # Caller suggests we keep this in cache, only do it if the
+ # object's size is less than the maximum.
+ self._keep_cache = obj_size < keep_cache_size
+ else:
+ self._keep_cache = False
+
+ # Internal Attributes
+ self._iter_etag = None
+ self._bytes_read = 0
+ self._started_at_0 = False
+ self._read_to_eof = False
+ self._md5_of_sent_bytes = None
+ self._suppress_file_closing = False
+ self._quarantined_dir = None
+
+ def _init_checks(self):
+ if self._vfr.tell() == 0:
+ self._started_at_0 = True
+ self._iter_etag = md5()
+
+ def __iter__(self):
+ """Returns an iterator over the data file."""
+ try:
+ self._bytes_read = 0
+ self._started_at_0 = False
+ self._read_to_eof = False
+ self._init_checks()
+ while True:
+ chunk = self._vfr.read(self._disk_chunk_size)
+ if chunk:
+ self._update_checks(chunk)
+ self._bytes_read += len(chunk)
+ yield chunk
+ else:
+ self._read_to_eof = True
+ break
+ finally:
+ if not self._suppress_file_closing:
+ self.close()
+
+ def can_zero_copy_send(self):
+ return False
+
+ def app_iter_range(self, start, stop):
+ """
+        Returns an iterator over the data file for range (start, stop).
+        """
+ if start or start == 0:
+ self._vfr.seek(start)
+ if stop is not None:
+ length = stop - start
+ else:
+ length = None
+ try:
+ for chunk in self:
+ if length is not None:
+ length -= len(chunk)
+ if length < 0:
+ # Chop off the extra:
+ yield chunk[:length]
+ break
+ yield chunk
+ finally:
+ if not self._suppress_file_closing:
+ self.close()
+
+ def close(self):
+ """
+ Close the open file handle if present.
+
+ For this specific implementation, this method will handle quarantining
+ the file if necessary.
+ """
+ if self._vfr:
+ try:
+ if self._started_at_0 and self._read_to_eof:
+ self._handle_close_quarantine()
+ except DiskFileQuarantined:
+ raise
+ except (Exception, Timeout) as e:
+ self._logger.error(_(
+ 'ERROR DiskFile %(data_file)s'
+ ' close failure: %(exc)s : %(stack)s'),
+ {'exc': e, 'stack': ''.join(traceback.format_stack()),
+ 'data_file': self._data_file})
+ finally:
+ vfr, self._vfr = self._vfr, None
+ vfr.close()
+
+
+class BaseKVFileWriter(BaseDiskFileWriter):
+ def __init__(self, mgr, name, datadir, size, bytes_per_sync, diskfile,
+ logger):
+ # Parameter tracking
+ self._manager = mgr
+ self._name = name
+ self._datadir = datadir
+ self._vfile_writer = None
+ self._tmppath = None
+ self._size = size
+ self._chunks_etag = md5()
+ self._bytes_per_sync = bytes_per_sync
+ self._diskfile = diskfile
+ self._logger = logger
+
+ # Internal attributes
+ self._upload_size = 0
+ self._last_sync = 0
+ self._extension = '.data'
+ self._put_succeeded = False
+
+ def open(self):
+ if self._vfile_writer is not None:
+ raise ValueError('DiskFileWriter is already open')
+
+ try:
+ # TODO: support extension
+ self._vfile_writer = vfile.VFileWriter.create(
+ self._datadir, self._size, self._manager.vfile_conf,
+ self._logger, extension=None)
+ except OSError as err:
+ if err.errno in (errno.ENOSPC, errno.EDQUOT):
+ # No more inodes in filesystem
+ raise DiskFileNoSpace(err.strerror)
+ raise
+ return self
+
+ def close(self):
+ if self._vfile_writer:
+ try:
+ os.close(self._vfile_writer.fd)
+ os.close(self._vfile_writer.lock_fd)
+ except OSError:
+ pass
+ self._vfile_writer = None
+
+ def write(self, chunk):
+ """
+ Write a chunk of data to disk. All invocations of this method must
+ come before invoking the :func:
+
+ :param chunk: the chunk of data to write as a string object
+
+ :returns: the total number of bytes written to an object
+ """
+
+ if not self._vfile_writer:
+ raise ValueError('Writer is not open')
+ self._chunks_etag.update(chunk)
+ while chunk:
+ written = os.write(self._vfile_writer.fd, chunk)
+ self._upload_size += written
+ chunk = chunk[written:]
+
+ # For large files sync every 512MB (by default) written
+ diff = self._upload_size - self._last_sync
+ if diff >= self._bytes_per_sync:
+ tpool.execute(fdatasync, self._vfile_writer.fd)
+ # drop_buffer_cache(self._vfile_writer.fd, self._last_sync, diff)
+ self._last_sync = self._upload_size
+
+ return self._upload_size
+
+ def _finalize_put(self, metadata, target_path, cleanup):
+ filename = basename(target_path)
+ # write metadata and sync
+ self._vfile_writer.commit(filename, _encode_metadata(metadata))
+ self._put_succeeded = True
+ if cleanup:
+ try:
+ self.manager.cleanup_ondisk_files(self._datadir)['files']
+ except OSError:
+ logging.exception(_('Problem cleaning up %s'), self._datadir)
+
+
+class KVFileReader(BaseKVFileReader, DiskFileReader):
+ pass
+
+
+class KVFileWriter(BaseKVFileWriter, DiskFileWriter):
+ def put(self, metadata):
+ """
+ Finalize writing the file on disk.
+
+ :param metadata: dictionary of metadata to be associated with the
+ object
+ """
+ super(KVFileWriter, self)._put(metadata, True)
+
+
+class KVFile(BaseKVFile, DiskFile):
+ reader_cls = KVFileReader
+ writer_cls = KVFileWriter
+
+
+class BaseKVFileManager(BaseDiskFileManager):
+ diskfile_cls = None # must be set by subclasses
+
+ invalidate_hash = strip_self(invalidate_hash)
+ consolidate_hashes = strip_self(consolidate_hashes)
+ quarantine_renamer = strip_self(quarantine_vrenamer)
+
+ def __init__(self, conf, logger):
+ self.logger = logger
+ self.devices = conf.get('devices', '/srv/node')
+ self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
+ self.keep_cache_size = int(conf.get('keep_cache_size', 5242880))
+ self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
+ # vfile specific config
+ self.vfile_conf = {}
+ self.vfile_conf['volume_alloc_chunk_size'] = int(
+ conf.get('volume_alloc_chunk_size', 16 * 1024))
+ self.vfile_conf['volume_low_free_space'] = int(
+ conf.get('volume_low_free_space', 8 * 1024))
+ self.vfile_conf['metadata_reserve'] = int(
+ conf.get('metadata_reserve', 500))
+ self.vfile_conf['max_volume_count'] = int(
+ conf.get('max_volume_count', 1000))
+ self.vfile_conf['max_volume_size'] = int(
+ conf.get('max_volume_size', 10 * 1024 * 1024 * 1024))
+ # end vfile specific config
+ self.mount_check = config_true_value(conf.get('mount_check', 'true'))
+
+ self.reclaim_age = int(conf.get('reclaim_age', DEFAULT_RECLAIM_AGE))
+ replication_concurrency_per_device = conf.get(
+ 'replication_concurrency_per_device')
+ replication_one_per_device = conf.get('replication_one_per_device')
+ if replication_concurrency_per_device is None \
+ and replication_one_per_device is not None:
+ self.logger.warning('Option replication_one_per_device is '
+ 'deprecated and will be removed in a future '
+ 'version. Update your configuration to use '
+ 'option replication_concurrency_per_device.')
+ if config_true_value(replication_one_per_device):
+ replication_concurrency_per_device = 1
+ else:
+ replication_concurrency_per_device = 0
+ elif replication_one_per_device is not None:
+ self.logger.warning('Option replication_one_per_device ignored as '
+ 'replication_concurrency_per_device is '
+ 'defined.')
+ if replication_concurrency_per_device is None:
+ self.replication_concurrency_per_device = 1
+ else:
+ self.replication_concurrency_per_device = int(
+ replication_concurrency_per_device)
+ self.replication_lock_timeout = int(conf.get(
+ 'replication_lock_timeout', 15))
+
+ self.use_splice = False
+ self.pipe_size = None
+
+ def cleanup_ondisk_files(self, hsh_path, **kwargs):
+ """
+ Clean up on-disk files that are obsolete and gather the set of valid
+ on-disk files for an object.
+
+ :param hsh_path: object hash path
+ :param frag_index: if set, search for a specific fragment index .data
+ file, otherwise accept the first valid .data file
+ :returns: a dict that may contain: valid on disk files keyed by their
+ filename extension; a list of obsolete files stored under the
+ key 'obsolete'; a list of files remaining in the directory,
+ reverse sorted, stored under the key 'files'.
+ """
+
+ def is_reclaimable(timestamp):
+ return (time.time() - float(timestamp)) > self.reclaim_age
+
+ files = vfile.listdir(hsh_path)
+
+ files.sort(reverse=True)
+ results = self.get_ondisk_files(
+ files, hsh_path, verify=False, **kwargs)
+ if 'ts_info' in results and is_reclaimable(
+ results['ts_info']['timestamp']):
+ remove_vfile(join(hsh_path, results['ts_info']['filename']))
+ files.remove(results.pop('ts_info')['filename'])
+ for file_info in results.get('possible_reclaim', []):
+ # stray files are not deleted until reclaim-age
+ if is_reclaimable(file_info['timestamp']):
+ results.setdefault('obsolete', []).append(file_info)
+ for file_info in results.get('obsolete', []):
+ remove_vfile(join(hsh_path, file_info['filename']))
+ files.remove(file_info['filename'])
+ results['files'] = files
+
+ return results
+
+ def object_audit_location_generator(self, policy, device_dirs=None,
+ auditor_type="ALL"):
+ """
+ Yield an AuditLocation for all objects stored under device_dirs.
+
+ :param policy: the StoragePolicy instance
+ :param device_dirs: directory of target device
+ :param auditor_type: either ALL or ZBF
+ """
+ datadir = get_data_dir(policy)
+ return object_audit_location_generator(self.devices, datadir,
+ self.mount_check,
+ self.logger, device_dirs,
+ auditor_type)
+
+ def _hash_suffix_dir(self, path, policy):
+ """
+
+ :param path: full path to directory
+ :param policy: storage policy used
+ """
+ if six.PY2:
+ hashes = defaultdict(md5)
+ else:
+ class shim(object):
+ def __init__(self):
+ self.md5 = md5()
+
+ def update(self, s):
+ if isinstance(s, str):
+ self.md5.update(s.encode('utf-8'))
+ else:
+ self.md5.update(s)
+
+ def hexdigest(self):
+ return self.md5.hexdigest()
+ hashes = defaultdict(shim)
+ try:
+ path_contents = sorted(vfile.listdir(path))
+ except OSError as err:
+ if err.errno in (errno.ENOTDIR, errno.ENOENT):
+ raise PathNotDir()
+ raise
+ for hsh in path_contents:
+ hsh_path = os.path.join(path, hsh)
+ try:
+ ondisk_info = self.cleanup_ondisk_files(
+ hsh_path, policy=policy)
+ except OSError as err:
+ if err.errno == errno.ENOTDIR:
+ partition_path = os.path.dirname(path)
+ objects_path = os.path.dirname(partition_path)
+ device_path = os.path.dirname(objects_path)
+ # The made-up filename is so that the eventual dirpath()
+ # will result in this object directory that we care about.
+ # Some failures will result in an object directory
+ # becoming a file, thus causing the parent directory to
+                    # be quarantined.
+ quar_path = quarantine_vrenamer(
+ device_path, os.path.join(
+ hsh_path, "made-up-filename"))
+ logging.exception(
+ _('Quarantined %(hsh_path)s to %(quar_path)s because '
+ 'it is not a directory'), {'hsh_path': hsh_path,
+ 'quar_path': quar_path})
+ continue
+ raise
+ if not ondisk_info['files']:
+ continue
+
+ # ondisk_info has info dicts containing timestamps for those
+ # files that could determine the state of the diskfile if it were
+ # to be opened. We update the suffix hash with the concatenation of
+ # each file's timestamp and extension. The extension is added to
+ # guarantee distinct hash values from two object dirs that have
+ # different file types at the same timestamp(s).
+ #
+ # Files that may be in the object dir but would have no effect on
+ # the state of the diskfile are not used to update the hash.
+ for key in (k for k in ('meta_info', 'ts_info')
+ if k in ondisk_info):
+ info = ondisk_info[key]
+ hashes[None].update(info['timestamp'].internal + info['ext'])
+
+ # delegate to subclass for data file related updates...
+ self._update_suffix_hashes(hashes, ondisk_info)
+
+ if 'ctype_info' in ondisk_info:
+ # We have a distinct content-type timestamp so update the
+ # hash. As a precaution, append '_ctype' to differentiate this
+                # value from any other timestamp value that might be included
+                # in the hash in future. There is no .ctype file so use
+                # _ctype to avoid any confusion.
+ info = ondisk_info['ctype_info']
+ hashes[None].update(info['ctype_timestamp'].internal
+ + '_ctype')
+
+ return hashes
+
+ def _get_hashes(self, *args, **kwargs):
+ hashed, hashes = self.__get_hashes(*args, **kwargs)
+ hashes.pop('updated', None)
+ hashes.pop('valid', None)
+ return hashed, hashes
+
+ def __get_hashes(self, device, partition, policy, recalculate=None,
+ do_listdir=False):
+ """
+ Get hashes for each suffix dir in a partition. do_listdir causes it to
+ mistrust the hash cache for suffix existence at the (unexpectedly high)
+ cost of a listdir.
+
+ :param device: name of target device
+ :param partition: partition on the device in which the object lives
+ :param policy: the StoragePolicy instance
+ :param recalculate: list of suffixes which should be recalculated when
+ got
+ :param do_listdir: force existence check for all hashes in the
+ partition
+
+ :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
+ """
+ hashed = 0
+ dev_path = self.get_dev_path(device)
+ partition_path = get_part_path(dev_path, policy, partition)
+ hashes_file = os.path.join(partition_path, HASH_FILE)
+ modified = False
+ orig_hashes = {'valid': False}
+
+ if recalculate is None:
+ recalculate = []
+
+ try:
+ orig_hashes = self.consolidate_hashes(partition_path)
+ except Exception:
+ self.logger.warning('Unable to read %r', hashes_file,
+ exc_info=True)
+
+ if not orig_hashes['valid']:
+ # This is the only path to a valid hashes from invalid read (e.g.
+ # does not exist, corrupt, etc.). Moreover, in order to write this
+ # valid hashes we must read *the exact same* invalid state or we'll
+ # trigger race detection.
+ do_listdir = True
+ hashes = {'valid': True}
+ # If the exception handling around consolidate_hashes fired we're
+ # going to do a full rehash regardless; but we need to avoid
+ # needless recursion if the on-disk hashes.pkl is actually readable
+ # (worst case is consolidate_hashes keeps raising exceptions and we
+ # eventually run out of stack).
+            # N.B. orig_hashes invalid only affects new parts and error/edge
+ # conditions - so try not to get overly caught up trying to
+ # optimize it out unless you manage to convince yourself there's a
+ # bad behavior.
+ orig_hashes = read_hashes(partition_path)
+ else:
+ hashes = copy.deepcopy(orig_hashes)
+
+ if do_listdir:
+ for suff in vfile.listdir(partition_path):
+ if len(suff) == 3:
+ hashes.setdefault(suff, None)
+ modified = True
+ hashes.update((suffix, None) for suffix in recalculate)
+ for suffix, hash_ in list(hashes.items()):
+ if not hash_:
+ suffix_dir = os.path.join(partition_path, suffix)
+ try:
+ hashes[suffix] = self._hash_suffix(
+ suffix_dir, policy=policy)
+ hashed += 1
+ except PathNotDir:
+ del hashes[suffix]
+ except OSError:
+ logging.exception(_('Error hashing suffix'))
+ modified = True
+ if modified:
+ with lock_path(partition_path):
+ if read_hashes(partition_path) == orig_hashes:
+ write_hashes(partition_path, hashes)
+ return hashed, hashes
+ return self.__get_hashes(device, partition, policy,
+ recalculate=recalculate,
+ do_listdir=do_listdir)
+ else:
+ return hashed, hashes
+
+ def get_diskfile_from_hash(self, device, partition, object_hash,
+ policy, **kwargs):
+ """
+ Returns a DiskFile instance for an object at the given
+ object_hash. Just in case someone thinks of refactoring, be
+ sure DiskFileDeleted is *not* raised, but the DiskFile
+ instance representing the tombstoned object is returned
+ instead.
+
+ :param device: name of target device
+ :param partition: partition on the device in which the object lives
+ :param object_hash: the hash of an object path
+ :param policy: the StoragePolicy instance
+ :raises DiskFileNotExist: if the object does not exist
+ :returns: an instance of BaseDiskFile
+ """
+ dev_path = self.get_dev_path(device)
+ if not dev_path:
+ raise DiskFileDeviceUnavailable()
+ object_path = join(
+ dev_path, get_data_dir(policy), str(partition), object_hash[-3:],
+ object_hash)
+ try:
+ filenames = self.cleanup_ondisk_files(object_path)['files']
+ except OSError as err:
+ if err.errno == errno.ENOTDIR:
+ quar_path = self.quarantine_renamer(dev_path, object_path)
+ logging.exception(
+ _('Quarantined %(object_path)s to %(quar_path)s because '
+ 'it is not a directory'), {'object_path': object_path,
+ 'quar_path': quar_path})
+ raise DiskFileNotExist()
+ if err.errno != errno.ENOENT:
+ raise
+ raise DiskFileNotExist()
+ if not filenames:
+ raise DiskFileNotExist()
+
+ try:
+ vf = vfile.VFileReader.get_vfile(join(object_path, filenames[-1]),
+ self.logger)
+ metadata = vf.metadata
+ vf.close()
+ except EOFError:
+ raise DiskFileNotExist()
+ try:
+ account, container, obj = split_path(
+ metadata.get('name', ''), 3, 3, True)
+ except ValueError:
+ raise DiskFileNotExist()
+ return self.diskfile_cls(self, dev_path,
+ partition, account, container, obj,
+ policy=policy, **kwargs)
+
+ def _listdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ try:
+ return vfile.listdir(path)
+ except OSError as err:
+ if err.errno != errno.ENOENT:
+ self.logger.error(
+ 'ERROR: Skipping %r due to error with listdir attempt: %s',
+ path, err)
+ return []
+
+ def exists(self, path):
+ """
+ :param path: full path to directory
+ """
+ return vfile.exists(path)
+
+ def mkdirs(self, path):
+ """
+ :param path: full path to directory
+ """
+ return vfile.mkdirs(path)
+
+ def listdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ return vfile.listdir(path)
+
+ def rmtree(self, path, ignore_errors=False):
+ vfile.rmtree(path)
+
+ def remove_file(self, path):
+ """
+        Similar to utils.remove_file.
+        :param path: full path to file
+ """
+ try:
+ return vfile.delete_vfile_from_path(path)
+ except (OSError, vfile.VFileException):
+ pass
+
+ def remove(self, path):
+ """
+        :param path: full path to file
+ """
+ return vfile.delete_vfile_from_path(path)
+
+ def isdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ return vfile.isdir(path)
+
+ def isfile(self, path):
+ """
+        :param path: full path to check
+ """
+ return vfile.isfile(path)
+
+ def rmdir(self, path):
+ """
+ :param path: full path to directory
+ """
+ pass
+
+
+class KVFileManager(BaseKVFileManager, DiskFileManager):
+ diskfile_cls = KVFile
+ policy_type = REPL_POLICY
+
+
+class ECKVFileReader(BaseKVFileReader, ECDiskFileReader):
+ def __init__(self, vfr, data_file, obj_size, etag,
+ disk_chunk_size, keep_cache_size, device_path, logger,
+ quarantine_hook, use_splice, pipe_size, diskfile,
+ keep_cache=False):
+ super(ECKVFileReader, self).__init__(
+ vfr, data_file, obj_size, etag,
+ disk_chunk_size, keep_cache_size, device_path, logger,
+ quarantine_hook, use_splice, pipe_size, diskfile, keep_cache)
+ self.frag_buf = None
+ self.frag_offset = 0
+ self.frag_size = self._diskfile.policy.fragment_size
+
+ def _init_checks(self):
+ super(ECKVFileReader, self)._init_checks()
+ # for a multi-range GET this will be called at the start of each range;
+ # only initialise the frag_buf for reads starting at 0.
+ # TODO: reset frag buf to '' if tell() shows that start is on a frag
+ # boundary so that we check frags selected by a range not starting at 0
+ # ECDOING - check _started_at_0 is defined correctly
+ if self._started_at_0:
+ self.frag_buf = ''
+ else:
+ self.frag_buf = None
+
+ def _update_checks(self, chunk):
+ # super(ECKVFileReader, self)._update_checks(chunk)
+
+        # Because of python's MRO, calling super() here would invoke
+        # ECDiskFileReader._update_checks and blow up. Rather than mess
+        # with the class's mro() function, explicitly call the one we want.
+ BaseDiskFileReader._update_checks(self, chunk)
+ if self.frag_buf is not None:
+ self.frag_buf += chunk
+ cursor = 0
+ while len(self.frag_buf) >= cursor + self.frag_size:
+ self._check_frag(self.frag_buf[cursor:cursor + self.frag_size])
+ cursor += self.frag_size
+ self.frag_offset += self.frag_size
+ if cursor:
+ self.frag_buf = self.frag_buf[cursor:]
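+        # Worked example (made-up sizes): with frag_size=4, a 6-byte chunk
+        # checks bytes [0:4] and buffers 2; a following 3-byte chunk makes
+        # 5 buffered bytes, of which [0:4] are checked, leaving 1 byte for
+        # _handle_close_quarantine() at close.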
+
+ def _handle_close_quarantine(self):
+ # super(ECKVFileReader, self)._handle_close_quarantine()
+ BaseDiskFileReader._handle_close_quarantine(self)
+ self._check_frag(self.frag_buf)
+
+
+class ECKVFileWriter(BaseKVFileWriter, ECDiskFileWriter):
+ # TODO: this needs to be updated wrt. next_part_power, and other changes
+ # in diskfile.py
+ def _finalize_durable(self, data_file_path, durable_data_file_path,
+ timestamp):
+ exc = None
+ try:
+ try:
+ vfile.rename_vfile(data_file_path, durable_data_file_path,
+ self._diskfile._logger)
+ except (OSError, IOError) as err:
+ if err.errno == errno.ENOENT:
+ files = vfile.listdir(self._datadir)
+ results = self.manager.get_ondisk_files(
+ files, self._datadir,
+ frag_index=self._diskfile._frag_index,
+ policy=self._diskfile.policy)
+ # We "succeeded" if another writer cleaned up our data
+ ts_info = results.get('ts_info')
+ durables = results.get('durable_frag_set', [])
+ if ts_info and ts_info['timestamp'] > timestamp:
+ return
+ elif any(frag_set['timestamp'] > timestamp
+ for frag_set in durables):
+ return
+
+ if err.errno not in (errno.ENOSPC, errno.EDQUOT):
+ # re-raise to catch all handler
+ raise
+ params = {'file': durable_data_file_path, 'err': err}
+ self.manager.logger.exception(
+ _('No space left on device for %(file)s (%(err)s)'),
+ params)
+ exc = DiskFileNoSpace(
+ 'No space left on device for %(file)s (%(err)s)' % params)
+ else:
+ try:
+ self.manager.cleanup_ondisk_files(self._datadir)['files']
+ except OSError as os_err:
+ self.manager.logger.exception(
+ _('Problem cleaning up %(datadir)s (%(err)s)'),
+ {'datadir': self._datadir, 'err': os_err})
+ except Exception as err:
+ params = {'file': durable_data_file_path, 'err': err}
+ self.manager.logger.exception(
+ _('Problem making data file durable %(file)s (%(err)s)'),
+ params)
+ exc = DiskFileError(
+ 'Problem making data file durable %(file)s (%(err)s)' % params)
+ if exc:
+ raise exc
+
+ def put(self, metadata):
+ """
+ The only difference between this method and the replication policy
+ DiskFileWriter method is adding the frag index to the metadata.
+
+ :param metadata: dictionary of metadata to be associated with object
+ """
+ fi = None
+ cleanup = True
+ if self._extension == '.data':
+ # generally we treat the fragment index provided in metadata as
+ # canon, but if it's unavailable (e.g. tests) it's reasonable to
+ # use the frag_index provided at instantiation. Either way make
+ # sure that the fragment index is included in object sysmeta.
+ fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
+ self._diskfile._frag_index)
+ fi = self.manager.validate_fragment_index(fi)
+ self._diskfile._frag_index = fi
+            # defer cleanup until commit() makes the diskfile durable
+ cleanup = False
+ super(ECKVFileWriter, self)._put(metadata, cleanup, frag_index=fi)
+
+
+class ECKVFile(BaseKVFile, ECDiskFile):
+
+ reader_cls = ECKVFileReader
+ writer_cls = ECKVFileWriter
+
+ def purge(self, timestamp, frag_index):
+ """
+ Remove a tombstone file matching the specified timestamp or
+ datafile matching the specified timestamp and fragment index
+ from the object directory.
+
+ This provides the EC reconstructor/ssync process with a way to
+ remove a tombstone or fragment from a handoff node after
+ reverting it to its primary node.
+
+ The hash will be invalidated, and if empty or invalid the
+ hsh_path will be removed on next cleanup_ondisk_files.
+
+ :param timestamp: the object timestamp, an instance of
+ :class:`~swift.common.utils.Timestamp`
+ :param frag_index: fragment archive index, must be
+ a whole number or None.
+ """
+ purge_file = self.manager.make_on_disk_filename(
+ timestamp, ext='.ts')
+ remove_vfile(os.path.join(self._datadir, purge_file))
+ if frag_index is not None:
+ # data file may or may not be durable so try removing both filename
+ # possibilities
+ purge_file = self.manager.make_on_disk_filename(
+ timestamp, ext='.data', frag_index=frag_index)
+ remove_vfile(os.path.join(self._datadir, purge_file))
+ purge_file = self.manager.make_on_disk_filename(
+ timestamp, ext='.data', frag_index=frag_index, durable=True)
+ remove_vfile(os.path.join(self._datadir, purge_file))
+ # we don't use hashes.pkl files
+ # self.manager.invalidate_hash(dirname(self._datadir))
+
+
+class ECKVFileManager(BaseKVFileManager, ECDiskFileManager):
+ diskfile_cls = ECKVFile
+ policy_type = EC_POLICY
+
+
+def remove_vfile(filepath):
+ try:
+ vfile.delete_vfile_from_path(filepath)
+ except OSError:
+ pass
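
A minimal usage sketch of the KV diskfile classes above (assuming they ship
as swift/obj/kvfile.py, per the diffstat; device, conf and metadata values
are made up):

    import time
    from hashlib import md5
    from swift.common.storage_policy import POLICIES
    from swift.common.utils import Timestamp, get_logger
    from swift.obj.kvfile import KVFileManager

    mgr = KVFileManager({'devices': '/srv/node'}, get_logger({}))
    df = mgr.get_diskfile('sda1', '0', 'AUTH_test', 'cont', 'obj',
                          policy=POLICIES.legacy)
    body = b'hello'
    with df.create() as writer:
        writer.write(body)
        writer.put({'X-Timestamp': Timestamp(time.time()).internal,
                    'Content-Length': str(len(body)),
                    'ETag': md5(body).hexdigest()})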
diff --git a/swift/obj/meta.proto b/swift/obj/meta.proto
new file mode 100644
index 000000000..55d8492f3
--- /dev/null
+++ b/swift/obj/meta.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+
+package meta;
+
+// Generate module with: protoc -I. --python_out=. meta.proto
+
+message Attr {
+ bytes key = 1;
+ bytes value = 2;
+}
+
+message Metadata {
+ repeated Attr attrs = 1;
+}
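
As an illustration of the schema (a sketch, not part of the patch), a Swift
metadata dict round-trips through Metadata as a flat list of key/value byte
pairs:

    from swift.obj import meta_pb2

    meta = meta_pb2.Metadata()
    for key, value in ((b'name', b'/AUTH_test/cont/obj'),
                       (b'X-Timestamp', b'1550000000.00000')):
        attr = meta.attrs.add()          # repeated Attr field
        attr.key, attr.value = key, value
    blob = meta.SerializeToString()      # wire bytes stored in the volume

    decoded = meta_pb2.Metadata()
    decoded.ParseFromString(blob)
    assert decoded.attrs[0].key == b'name'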
diff --git a/swift/obj/meta_pb2.py b/swift/obj/meta_pb2.py
new file mode 100644
index 000000000..a0cadec66
--- /dev/null
+++ b/swift/obj/meta_pb2.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: meta.proto
+
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='meta.proto',
+ package='meta',
+ syntax='proto3',
+ serialized_options=None,
+ serialized_pb=b'\n\nmeta.proto\x12\x04meta\"\"\n\x04\x41ttr\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\"%\n\x08Metadata\x12\x19\n\x05\x61ttrs\x18\x01 \x03(\x0b\x32\n.meta.Attrb\x06proto3'
+)
+
+
+
+
+_ATTR = _descriptor.Descriptor(
+ name='Attr',
+ full_name='meta.Attr',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='key', full_name='meta.Attr.key', index=0,
+ number=1, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='value', full_name='meta.Attr.value', index=1,
+ number=2, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=b"",
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=20,
+ serialized_end=54,
+)
+
+
+_METADATA = _descriptor.Descriptor(
+ name='Metadata',
+ full_name='meta.Metadata',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='attrs', full_name='meta.Metadata.attrs', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=56,
+ serialized_end=93,
+)
+
+_METADATA.fields_by_name['attrs'].message_type = _ATTR
+DESCRIPTOR.message_types_by_name['Attr'] = _ATTR
+DESCRIPTOR.message_types_by_name['Metadata'] = _METADATA
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+Attr = _reflection.GeneratedProtocolMessageType('Attr', (_message.Message,), {
+ 'DESCRIPTOR' : _ATTR,
+ '__module__' : 'meta_pb2'
+ # @@protoc_insertion_point(class_scope:meta.Attr)
+ })
+_sym_db.RegisterMessage(Attr)
+
+Metadata = _reflection.GeneratedProtocolMessageType('Metadata', (_message.Message,), {
+ 'DESCRIPTOR' : _METADATA,
+ '__module__' : 'meta_pb2'
+ # @@protoc_insertion_point(class_scope:meta.Metadata)
+ })
+_sym_db.RegisterMessage(Metadata)
+
+
+# @@protoc_insertion_point(module_scope)
diff --git a/swift/obj/objectrpcmanager.py b/swift/obj/objectrpcmanager.py
new file mode 100644
index 000000000..a64dfb49d
--- /dev/null
+++ b/swift/obj/objectrpcmanager.py
@@ -0,0 +1,157 @@
+# Copyright (c) 2010-2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import subprocess
+import six
+from eventlet import sleep
+
+from swift.common.daemon import Daemon
+from swift.common.ring.utils import is_local_device
+from swift.common.storage_policy import POLICIES, get_policy_string
+from swift.common.utils import PrefixLoggerAdapter, get_logger, \
+ config_true_value, whataremyips
+from swift.obj import rpc_http as rpc
+
+
+class ObjectRpcManager(Daemon):
+ def __init__(self, conf, logger=None):
+ self.conf = conf
+ self.logger = PrefixLoggerAdapter(
+ logger or get_logger(conf, log_route='object-rpcmanager'), {})
+ self.devices_dir = conf.get('devices', '/srv/node')
+ self.mount_check = config_true_value(conf.get('mount_check', 'true'))
+ # use native golang leveldb implementation
+ self.use_go_leveldb = config_true_value(
+ conf.get('use_go_leveldb', 'false'))
+ self.swift_dir = conf.get('swift_dir', '/etc/swift')
+ self.bind_ip = conf.get('bind_ip', '0.0.0.0')
+ self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
+ self.port = None if self.servers_per_port else \
+ int(conf.get('bind_port', 6200))
+ self.volcheck = conf.get('volcheck', '')
+ self.losf_bin = conf.get('losf_bin', '')
+ self.healthcheck_interval = int(conf.get('healthcheck_interval', 10))
+
+ # check if the path to LOSF binary and volume checker exist
+ if not os.path.exists(self.volcheck):
+ raise AttributeError(
+ "Invalid or missing volcheck in your config file")
+ if not os.path.exists(self.losf_bin):
+ raise AttributeError(
+ "Invalid or missing losf_bin in your config file")
+
+ # this should select only kv enabled policies
+ # (requires loading policies?)
+ self.policies = POLICIES
+ self.ring_check_interval = int(conf.get('ring_check_interval', 15))
+
+ # add RPC state check interval
+ self.kv_disks = self.get_kv_disks()
+
+ def load_object_ring(self, policy):
+ """
+ Make sure the policy's rings are loaded.
+
+ :param policy: the StoragePolicy instance
+ :returns: appropriate ring object
+ """
+ policy.load_ring(self.swift_dir)
+ return policy.object_ring
+
+ # add filter for KV only
+ def get_policy2devices(self):
+ ips = whataremyips(self.bind_ip)
+ policy2devices = {}
+ for policy in self.policies:
+ self.load_object_ring(policy)
+ local_devices = list(six.moves.filter(
+ lambda dev: dev and is_local_device(
+ ips, self.port,
+ dev['replication_ip'], dev['replication_port']),
+ policy.object_ring.devs))
+ policy2devices[policy] = local_devices
+ return policy2devices
+
+ def get_kv_disks(self):
+ """
+        Returns a dict mapping each KV-backed policy index to the list of
+        local devices for that policy.
+        :return: dict
+ """
+ policy2devices = self.get_policy2devices()
+ kv_disks = {}
+ for policy, devs in policy2devices.items():
+ if policy.diskfile_module.endswith(('.kv', '.hybrid')):
+ kv_disks[policy.idx] = [d['device'] for d in devs]
+
+ return kv_disks
+
+ def get_worker_args(self, once=False, **kwargs):
+ """
+ Take the set of all local devices for this node from all the KV
+ backed policies rings.
+
+ :param once: False if the worker(s) will be daemonized, True if the
+ worker(s) will be run once
+ :param kwargs: optional overrides from the command line
+ """
+
+        # Note that this gets re-used in is_healthy
+ self.kv_disks = self.get_kv_disks()
+
+        # TODO: what to do in this case?
+ if not self.kv_disks:
+ # we only need a single worker to do nothing until a ring change
+ yield dict(multiprocess_worker_index=0)
+ return
+
+        for policy_idx, devs in self.kv_disks.items():
+ for dev in devs:
+ disk_path = os.path.join(self.devices_dir, dev)
+ losf_dir = get_policy_string('losf', policy_idx)
+ socket_path = os.path.join(disk_path, losf_dir, 'rpc.socket')
+ yield dict(policy_idx=policy_idx, disk_path=disk_path)
+ yield dict(policy_idx=policy_idx, disk_path=disk_path,
+ socket_path=socket_path, statecheck=True)
+
+ def is_healthy(self):
+ return self.get_kv_disks() == self.kv_disks
+
+ def run_forever(self, policy_idx=None, disk_path=None, socket_path=None,
+ statecheck=None, *args, **kwargs):
+
+ if statecheck:
+ volcheck_args = [self.volcheck, '--disk_path', str(disk_path),
+ '--policy_idx', str(policy_idx),
+ '--keepuser', '--repair', '--no_prompt']
+ # sleep a bit to let the RPC server start. Otherwise it will
+            # time out and take longer to get the checks started.
+ sleep(2)
+ while True:
+ try:
+ state = rpc.get_kv_state(socket_path)
+ if not state.isClean:
+ self.logger.debug(volcheck_args)
+ subprocess.call(volcheck_args)
+ except Exception:
+ self.logger.exception("state check failed, continue")
+ sleep(10)
+ else:
+ losf_args = ['swift-losf-rpc', '-diskPath', str(disk_path),
+ '-debug', 'info',
+ '-policyIdx', str(policy_idx),
+ '-waitForMount={}'.format(str(self.mount_check))]
+ if self.use_go_leveldb:
+ losf_args.append('-useGoLevelDB')
+ os.execv(self.losf_bin, losf_args)
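
Note the fan-out in get_worker_args() above: each KV-backed disk yields two
worker dicts, one that exec()s the losf index server and one that polls its
state over RPC. Sketched output for one disk (paths hypothetical):

    {'policy_idx': 1, 'disk_path': '/srv/node/sda1'}
    {'policy_idx': 1, 'disk_path': '/srv/node/sda1',
     'socket_path': '/srv/node/sda1/losf-1/rpc.socket', 'statecheck': True}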
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 135f9d5f9..ede7ecad7 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -22,7 +22,6 @@ import time
from collections import defaultdict
import six
import six.moves.cPickle as pickle
-import shutil
from eventlet import (GreenPile, GreenPool, Timeout, sleep, tpool, spawn)
from eventlet.support.greenlets import GreenletExit
@@ -30,10 +29,10 @@ from eventlet.support.greenlets import GreenletExit
from swift import gettext_ as _
from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
- dump_recon_cache, mkdirs, config_true_value,
- GreenAsyncPile, Timestamp, remove_file,
- load_recon_cache, parse_override_options, distribute_evenly,
- PrefixLoggerAdapter, remove_directory)
+ dump_recon_cache, config_true_value,
+ GreenAsyncPile, Timestamp, load_recon_cache,
+ parse_override_options, distribute_evenly, PrefixLoggerAdapter,
+ remove_directory)
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@@ -1113,15 +1112,16 @@ class ObjectReconstructor(Daemon):
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
unlink_older_than(tmp_path, time.time() -
df_mgr.reclaim_age)
- if not os.path.exists(obj_path):
+
+ if not df_mgr.exists(obj_path):
try:
- mkdirs(obj_path)
+ df_mgr.mkdirs(obj_path)
except Exception:
self.logger.exception(
'Unable to create %s' % obj_path)
continue
try:
- partitions = os.listdir(obj_path)
+ partitions = df_mgr.listdir(obj_path)
except OSError:
self.logger.exception(
'Unable to list partitions in %r' % obj_path)
@@ -1137,7 +1137,7 @@ class ObjectReconstructor(Daemon):
if not partition.isdigit():
self.logger.warning(
'Unexpected entity in data dir: %r' % part_path)
- self.delete_partition(part_path)
+ self.delete_partition(df_mgr, part_path)
self.reconstruction_part_count += 1
continue
partition = int(partition)
@@ -1194,13 +1194,13 @@ class ObjectReconstructor(Daemon):
self.last_reconstruction_count = -1
self.handoffs_remaining = 0
- def delete_partition(self, path):
- def kill_it(path):
- shutil.rmtree(path, ignore_errors=True)
- remove_file(path)
+ def delete_partition(self, df_mgr, path):
+ def kill_it(df_mgr, path):
+ df_mgr.rmtree(path, ignore_errors=True)
+ df_mgr.remove_file(path)
self.logger.info(_("Removing partition: %s"), path)
- tpool.execute(kill_it, path)
+ tpool.execute(kill_it, df_mgr, path)
def reconstruct(self, **kwargs):
"""Run a reconstruction pass"""
@@ -1230,6 +1230,7 @@ class ObjectReconstructor(Daemon):
# Therefore we know this part a) doesn't belong on
# this node and b) doesn't have any suffixes in it.
self.run_pool.spawn(self.delete_partition,
+ self._df_router[part_info['policy']],
part_info['part_path'])
for job in jobs:
self.run_pool.spawn(self.process_job, job)
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index e3634bb8f..e9d19975a 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -16,9 +16,8 @@
from collections import defaultdict
import os
import errno
-from os.path import isdir, isfile, join, dirname
+from os.path import join, dirname
import random
-import shutil
import time
import itertools
from six import viewkeys
@@ -33,7 +32,7 @@ from swift.common.constraints import check_drive
from swift.common.ring.utils import is_local_device
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
- rsync_module_interpolation, mkdirs, config_true_value, \
+ rsync_module_interpolation, config_true_value, \
config_auto_int_value, storage_directory, \
load_recon_cache, PrefixLoggerAdapter, parse_override_options, \
distribute_evenly
@@ -485,9 +484,11 @@ class ObjectReplicator(Daemon):
:param job: a dict containing info about the partition to be replicated
"""
+ df_mgr = self._df_router[job['policy']]
+
def tpool_get_suffixes(path):
- return [suff for suff in os.listdir(path)
- if len(suff) == 3 and isdir(join(path, suff))]
+ return [suff for suff in df_mgr.listdir(path)
+ if len(suff) == 3 and df_mgr.isdir(join(path, suff))]
stats = self.stats_for_dev[job['device']]
stats.attempted += 1
@@ -562,10 +563,10 @@ class ObjectReplicator(Daemon):
failure_dev['device'])
for failure_dev in job['nodes']])
else:
- self.delete_partition(job['path'])
+ self.delete_partition(df_mgr, job['path'])
handoff_partition_deleted = True
elif not suffixes:
- self.delete_partition(job['path'])
+ self.delete_partition(df_mgr, job['path'])
handoff_partition_deleted = True
except (Exception, Timeout):
self.logger.exception(_("Error syncing handoff partition"))
@@ -580,25 +581,26 @@ class ObjectReplicator(Daemon):
self.partition_times.append(time.time() - begin)
self.logger.timing_since('partition.delete.timing', begin)
- def delete_partition(self, path):
+ def delete_partition(self, df_mgr, path):
self.logger.info(_("Removing partition: %s"), path)
try:
- tpool.execute(shutil.rmtree, path)
+ tpool.execute(df_mgr.rmtree, path)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
# If there was a race to create or delete, don't worry
raise
def delete_handoff_objs(self, job, delete_objs):
+ df_mgr = self._df_router[job['policy']]
success_paths = []
error_paths = []
for object_hash in delete_objs:
object_path = storage_directory(job['obj_path'], job['partition'],
object_hash)
- tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
+ tpool.execute(df_mgr.rmtree, object_path, ignore_errors=True)
suffix_dir = dirname(object_path)
try:
- os.rmdir(suffix_dir)
+ df_mgr.rmdir(suffix_dir)
success_paths.append(object_path)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
@@ -816,13 +818,14 @@ class ObjectReplicator(Daemon):
tmp_path = join(dev_path, get_tmp_dir(policy))
unlink_older_than(tmp_path, time.time() -
df_mgr.reclaim_age)
- if not os.path.exists(obj_path):
+ if not df_mgr.exists(obj_path):
try:
- mkdirs(obj_path)
+ df_mgr.mkdirs(obj_path)
except Exception:
self.logger.exception('ERROR creating %s' % obj_path)
continue
- for partition in os.listdir(obj_path):
+
+ for partition in df_mgr.listdir(obj_path):
if (override_partitions is not None and partition.isdigit()
and int(partition) not in override_partitions):
continue
@@ -937,6 +940,7 @@ class ObjectReplicator(Daemon):
override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs:
+ df_mgr = self._df_router[job['policy']]
dev_stats = self.stats_for_dev[job['device']]
num_jobs += 1
current_nodes = job['nodes']
@@ -964,13 +968,13 @@ class ObjectReplicator(Daemon):
return
try:
- if isfile(job['path']):
+ if df_mgr.isfile(job['path']):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job['path'])
- os.remove(job['path'])
+ df_mgr.remove(job['path'])
continue
except OSError:
continue
diff --git a/swift/obj/rpc_http.py b/swift/obj/rpc_http.py
new file mode 100644
index 000000000..df3ff6e94
--- /dev/null
+++ b/swift/obj/rpc_http.py
@@ -0,0 +1,370 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Implements RPC: protobuf over a UNIX domain socket
+
+import socket
+from eventlet.green import httplib
+from swift.obj import fmgr_pb2 as pb
+
+
+class UnixHTTPConnection(httplib.HTTPConnection):
+ """Support for unix domain socket with httplib"""
+
+ def __init__(self, path, host='localhost', port=None, strict=None,
+ timeout=None):
+ httplib.HTTPConnection.__init__(self, host, port=port, strict=strict,
+ timeout=timeout)
+ self.path = path
+
+ def connect(self):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(self.path)
+ self.sock = sock
+
+
+class StatusCode(object):
+ Ok = 200
+ Cancelled = 299
+ InvalidArgument = 400
+ NotFound = 404
+ AlreadyExists = 409
+ PermissionDenied = 403
+ FailedPrecondition = 412
+ Unimplemented = 501
+ Internal = 500
+ Unavailable = 503
+
+
+class RpcError(Exception):
+ def __init__(self, message, code):
+ self.code = code
+ super(RpcError, self).__init__(message)
+
+
+def get_rpc_reply(conn, pb_type):
+ """
+ Read the response from the index server over HTTP. If the status is 200,
+ deserialize the body as a protobuf object and return it.
+ If the status is not 200, raise an RpcError exception.
+ :param conn: HTTP connection to the index server
+ :param pb_type: protobuf type we expect in return
+ :return: protobuf object, or raise an exception if HTTP status is not 200
+ """
+ # if buffering is not set, httplib will call recvfrom() for every char
+ http_response = conn.getresponse(buffering=True)
+ if http_response.status != StatusCode.Ok:
+ raise RpcError(http_response.read(), http_response.status)
+
+ pb_obj = pb_type()
+ pb_obj.ParseFromString(http_response.read())
+ return pb_obj
+
+
+def get_next_offset(socket_path, volume_index, repair_tool=False):
+ """
+ Returns the next offset to use in the volume
+ """
+ volume = pb.GetNextOffsetRequest(volume_index=int(volume_index),
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/get_next_offset', volume.SerializeToString())
+ response = get_rpc_reply(conn, pb.GetNextOffsetReply)
+ return response.offset
+
+
+def register_volume(socket_path, partition, volume_type, volume_index,
+ first_obj_offset, state, repair_tool=False):
+ volume = pb.RegisterVolumeRequest(partition=int(partition),
+ type=int(volume_type),
+ volume_index=int(volume_index),
+ offset=first_obj_offset, state=state,
+ repair_tool=repair_tool)
+
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/register_volume', volume.SerializeToString())
+ response = get_rpc_reply(conn, pb.RegisterVolumeReply)
+ return response
+
+
+def unregister_volume(socket_path, volume_index):
+ index = pb.UnregisterVolumeRequest(index=volume_index)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/unregister_volume', index.SerializeToString())
+ response = get_rpc_reply(conn, pb.UnregisterVolumeReply)
+ return response
+
+
+def update_volume_state(socket_path, volume_index, new_state,
+ repair_tool=False):
+ state_update = pb.UpdateVolumeStateRequest(volume_index=int(volume_index),
+ state=new_state,
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/update_volume_state',
+ state_update.SerializeToString())
+ response = get_rpc_reply(conn, pb.UpdateVolumeStateReply)
+ return response
+
+
+def register_object(socket_path, name, volume_index, offset, next_offset,
+ repair_tool=False):
+ """
+ register a vfile
+ """
+ obj = pb.RegisterObjectRequest(name=str(name),
+ volume_index=int(volume_index),
+ offset=int(offset),
+ next_offset=int(next_offset),
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/register_object', obj.SerializeToString())
+ response = get_rpc_reply(conn, pb.RegisterObjectReply)
+ return response
+
+
+def unregister_object(socket_path, name, repair_tool=False):
+ obj = pb.UnregisterObjectRequest(name=str(name), repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/unregister_object', obj.SerializeToString())
+ response = get_rpc_reply(conn, pb.UnregisterObjectReply)
+ return response
+
+
+def rename_object(socket_path, name, new_name, repair_tool=False):
+ rename_req = pb.RenameObjectRequest(name=str(name), new_name=str(new_name),
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/rename_object', rename_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.RenameObjectReply)
+ return response
+
+
+def quarantine_object(socket_path, name, repair_tool=False):
+ objname = pb.QuarantineObjectRequest(name=str(name),
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/quarantine_object', objname.SerializeToString())
+ response = get_rpc_reply(conn, pb.QuarantineObjectReply)
+ return response
+
+
+def unquarantine_object(socket_path, name, repair_tool=False):
+ objname = pb.UnquarantineObjectRequest(name=str(name),
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/unquarantine_object', objname.SerializeToString())
+ response = get_rpc_reply(conn, pb.UnquarantineObjectReply)
+ return response
+
+
+def _list_quarantined_ohashes(socket_path, page_token, page_size):
+ """
+ Returns quarantined hashes, with pagination (as with the regular diskfile,
+ they are not below partition/suffix directories)
+ :param socket_path: socket_path for index-server
+ :param page_token: where to start for pagination
+ :param page_size: maximum number of results to be returned
+ :return: A list of quarantined object hashes
+ """
+ req_args = pb.ListQuarantinedOHashesRequest(page_token=str(page_token),
+ page_size=page_size)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/list_quarantined_ohashes',
+ req_args.SerializeToString())
+ response = get_rpc_reply(conn, pb.ListQuarantinedOHashesReply)
+ return response
+
+
+def list_quarantined_ohashes(socket_path, page_size=10000):
+ """
+ Returns all quarantined hashes, wraps _list_quarantined_ohashes so caller
+ does not have to deal with pagination
+ :param socket_path: socket_path
+ :param page_size: page_size to pass to wrapped function
+ :return: an iterator for all quarantined objects
+ """
+ page_token = ""
+ while True:
+ response = _list_quarantined_ohashes(socket_path, page_token,
+ page_size)
+ for r in response.objects:
+            yield r
+ page_token = response.next_page_token
+ if not page_token:
+ break
+
+
+def _list_objects_by_volume(socket_path, volume_index, quarantined, page_token,
+ page_size, repair_tool=False):
+ """
+ Returns objects within the volume, either quarantined or not, with
+ pagination.
+ :param socket_path: socket_path for index-server
+ :param volume_index: index of the volume for which to list objects
+ :param quarantined: if true, returns quarantined objects. if false, returns
+ non-quarantined objects.
+ :param page_token: where to start for pagination
+ :param page_size: maximum number of results to be returned
+ :param repair_tool: set to true if caller is a repair tool
+ :return: A list of objects for the volume
+ """
+ req_args = pb.LoadObjectsByVolumeRequest(index=volume_index,
+ quarantined=quarantined,
+ page_token=page_token,
+ page_size=page_size,
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/load_objects_by_volume',
+ req_args.SerializeToString())
+ response = get_rpc_reply(conn, pb.LoadObjectsByVolumeReply)
+ return response
+
+
+def list_objects_by_volume(socket_path, volume_index, quarantined=False,
+ page_size=10000, repair_tool=False):
+ page_token = ""
+ while True:
+ response = _list_objects_by_volume(socket_path, volume_index,
+ quarantined, page_token, page_size,
+ repair_tool)
+ for r in response.objects:
+            yield r
+ page_token = response.next_page_token
+ if not page_token:
+ break
+
+
+def list_quarantined_ohash(socket_path, prefix, repair_tool=False):
+ len_prefix = len(prefix)
+    req = pb.ListQuarantinedOHashRequest(prefix=str(prefix),
+                                         repair_tool=repair_tool)
+    conn = UnixHTTPConnection(socket_path)
+    conn.request('POST', '/list_quarantined_ohash', req.SerializeToString())
+ response = get_rpc_reply(conn, pb.ListQuarantinedOHashReply)
+
+ # Caller expects object names without the prefix, similar
+ # to os.listdir, not actual objects.
+ objnames = []
+ for obj in response.objects:
+ objnames.append(obj.name[len_prefix:])
+
+ return objnames
+
+
+# listdir like function for the KV
+def list_prefix(socket_path, prefix, repair_tool=False):
+    len_prefix = len(prefix)
+    req = pb.LoadObjectsByPrefixRequest(prefix=str(prefix),
+                                        repair_tool=repair_tool)
+    conn = UnixHTTPConnection(socket_path)
+    conn.request('POST', '/load_objects_by_prefix', req.SerializeToString())
+    response = get_rpc_reply(conn, pb.LoadObjectsByPrefixReply)
+    # response.objects is an iterable.
+    # TBD: the caller expects object names without the prefix, similar
+    # to os.listdir, not actual objects. Fix this in the RPC server.
+    # return response.objects
+ objnames = []
+ for obj in response.objects:
+ objnames.append(obj.name[len_prefix:])
+
+ return objnames
+
+
+def get_object(socket_path, name, is_quarantined=False, repair_tool=False):
+ """
+ returns an object given its whole key
+ """
+ object_name = pb.LoadObjectRequest(name=str(name),
+ is_quarantined=is_quarantined,
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/load_object', object_name.SerializeToString())
+ response = get_rpc_reply(conn, pb.LoadObjectReply)
+ return response
+
+
+def list_partitions(socket_path, partition_bits):
+ list_partitions_req = pb.ListPartitionsRequest(
+ partition_bits=partition_bits)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/list_partitions',
+ list_partitions_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.DirEntries)
+ return response.entry
+
+
+def list_partition(socket_path, partition, partition_bits):
+ list_partition_req = pb.ListPartitionRequest(partition=partition,
+ partition_bits=partition_bits)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/list_partition',
+ list_partition_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.DirEntries)
+ return response.entry
+
+
+def list_suffix(socket_path, partition, suffix, partition_bits):
+ suffix = str(suffix)
+ list_suffix_req = pb.ListSuffixRequest(partition=partition,
+ suffix=suffix,
+ partition_bits=partition_bits)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/list_suffix', list_suffix_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.DirEntries)
+ return response.entry
+
+
+def list_volumes(socket_path, partition, type, repair_tool=False):
+ list_req = pb.ListVolumesRequest(partition=int(partition), type=type,
+ repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/list_volumes', list_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.ListVolumesReply)
+ return response.volumes
+
+
+def get_volume(socket_path, index, repair_tool=False):
+ volume_idx = pb.GetVolumeRequest(index=index, repair_tool=repair_tool)
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/get_volume', volume_idx.SerializeToString())
+ response = get_rpc_reply(conn, pb.Volume)
+ return response
+
+
+def get_stats(socket_path):
+ stats_req = pb.GetStatsInfo()
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/get_stats', stats_req.SerializeToString())
+ response = get_rpc_reply(conn, pb.GetStatsReply)
+ return response
+
+
+def get_kv_state(socket_path):
+ pb_out = pb.GetKvStateRequest()
+ conn = UnixHTTPConnection(socket_path)
+ conn.request('POST', '/get_kv_state', pb_out.SerializeToString())
+ response = get_rpc_reply(conn, pb.KvState)
+ return response
+
+
+def set_kv_state(socket_path, isClean):
+    conn = UnixHTTPConnection(socket_path)
+    new_kv_state = pb.KvState(isClean=isClean)
+    conn.request('POST', '/set_kv_state', new_kv_state.SerializeToString())
+    response = get_rpc_reply(conn, pb.SetKvStateReply)
+    return response
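+
+
+# Example (sketch): how a caller might drive the read-side helpers above.
+# The socket path is hypothetical; the volume type constant is assumed to be
+# exposed by the fmgr protobuf module (pb).
+#
+#   sock = '/srv/node/sda/losf/rpc.socket'
+#   for vol in list_volumes(sock, 9, pb.VOLUME_DEFAULT):
+#       print(vol.volume_index, vol.next_offset)
+#   obj = get_object(sock, name)  # name is object hash + filename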
diff --git a/swift/obj/vfile.py b/swift/obj/vfile.py
new file mode 100644
index 000000000..57bacd558
--- /dev/null
+++ b/swift/obj/vfile.py
@@ -0,0 +1,1201 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+A "vfile" is a virtual file stored in a "volume".
+A volume is an actual file where vfiles are stored.
+vfile names and metadata (xattr) are also stored in the volume.
+"""
+
+import errno
+import fcntl
+import six
+import hashlib
+import re
+from eventlet.green import os
+from swift.obj.header import ObjectHeader, VolumeHeader, ALIGNMENT, \
+ read_volume_header, HeaderException, STATE_OBJ_QUARANTINED, \
+ STATE_OBJ_FILE, write_object_header, \
+ read_object_header, OBJECT_HEADER_VERSION, write_volume_header, \
+ erase_object_header, MAX_OBJECT_HEADER_LEN
+from swift.common.exceptions import DiskFileNoSpace, \
+ DiskFileBadMetadataChecksum
+from swift.common.storage_policy import POLICIES
+from swift.common.utils import fsync, fdatasync, fsync_dir, \
+ fallocate
+from swift.obj import rpc_http as rpc
+from swift.obj.rpc_http import RpcError, StatusCode
+from swift.obj.fmgr_pb2 import STATE_RW
+from swift.obj.meta_pb2 import Metadata
+from swift.obj.diskfile import _encode_metadata
+from swift.common import utils
+from swift.obj.vfile_utils import SwiftPathInfo, get_volume_index, \
+ get_volume_type, next_aligned_offset, SwiftQuarantinedPathInfo, VOSError, \
+ VIOError, VFileException
+
+VCREATION_LOCK_NAME = "volume_creation.lock"
+
+PICKLE_PROTOCOL = 2
+METADATA_RESERVE = 500
+VOL_AND_LOCKS_RE = re.compile(r'v\d{7}(\.writelock)?$')
+
+
+def increment(logger, counter, count=1):
+ if logger is not None:
+ try:
+ logger.update_stats(counter, count)
+ except Exception:
+ pass
+
+
+class VFileReader(object):
+ """
+ Represents a vfile stored in a volume.
+ """
+ def __init__(self, fp, name, offset, header, metadata, logger):
+ self.fp = fp
+ self.name = name
+ self.offset = offset
+ self._header = header
+ self.metadata = metadata
+ self.logger = logger
+
+ @property
+ def data_size(self):
+ return self._header.data_size
+
+ @classmethod
+ def get_vfile(cls, filepath, logger):
+ """
+ Returns a VFileReader instance from the path expected by swift
+ :param filepath: full path to file
+ :param logger: a logger object
+ """
+ si = SwiftPathInfo.from_path(filepath)
+ if si.type != "file":
+ err_msg = "Not a path to a swift file ({})".format(filepath)
+ raise VIOError(errno.EINVAL, err_msg)
+
+ full_name = si.ohash + si.filename
+ return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger)
+
+ @classmethod
+ def get_quarantined_vfile(cls, filepath, logger):
+ si = SwiftQuarantinedPathInfo.from_path(filepath)
+
+ if si.type != "file":
+ err_msg = "Not a path to a swift file ({})".format(filepath)
+ raise VIOError(errno.EINVAL, err_msg)
+
+ full_name = si.ohash + si.filename
+ return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger,
+ is_quarantined=True)
+
+ @classmethod
+ def _get_vfile(cls, name, volume_dir, socket_path, logger,
+ is_quarantined=False, repair_tool=False):
+ """
+ Returns a VFileReader instance
+ :param name: full name: object hash+filename
+ :param volume_dir: directory where the volume is stored
+ :param socket_path: full path to KV socket
+ :param logger: logger object
+ :param is_quarantined: object is quarantined
+        :param repair_tool: True if the request comes from a repair tool
+ """
+ # get the object
+ try:
+ obj = rpc.get_object(socket_path, name,
+ is_quarantined=is_quarantined,
+ repair_tool=repair_tool)
+ except RpcError as e:
+ if e.code == StatusCode.NotFound:
+ raise VIOError(errno.ENOENT,
+ "No such file or directory: {}".format(name))
+            # May need to handle more cases?
+            raise
+
+ # get the volume file name from the object
+ volume_filename = get_volume_name(obj.volume_index)
+ volume_filepath = os.path.join(volume_dir, volume_filename)
+
+ fp = open(volume_filepath, 'rb')
+ fp.seek(obj.offset)
+ try:
+ header = read_object_header(fp)
+ except HeaderException:
+ fp.seek(obj.offset)
+ data = fp.read(512)
+            if data == b'\x00' * len(data):
+                # unregister the object here
+                rpc.unregister_object(socket_path, name)
+                msg = ("Zeroed header found for {} at offset {} in "
+                       "volume {}".format(name, obj.offset, volume_filepath))
+                increment(logger, 'vfile.already_punched')
+                raise VFileException(msg)
+            msg = ("Failed to read header for {} at offset {} in "
+                   "volume {}".format(name, obj.offset, volume_filepath))
+            raise VIOError(errno.EIO, msg)
+
+ # check that we have the object we were expecting
+ header_fullname = "{}{}".format(header.ohash, header.filename)
+ if header_fullname != name:
+ # until we journal the renames, after a crash we may not have the
+ # rename in the KV. Handle this here for now
+ non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname)
+ if non_durable_name == name:
+ increment(logger, 'vfile.already_renamed')
+ rpc.rename_object(socket_path, name, header_fullname)
+ else:
+ increment(logger, 'vfile.wrong_object_header_name')
+                raise VIOError(
+                    errno.EIO,
+                    "Wrong object header name. Header: {} "
+                    "Expected: {}".format(header_fullname, name))
+
+ metadata = read_metadata(fp, obj.offset, header)
+
+ # seek to beginning of data
+ fp.seek(obj.offset + header.data_offset)
+
+ return cls(fp, obj.name, obj.offset, header, metadata, logger)
+
+ def read(self, size=None):
+ """
+ Wraps read to prevent reading beyond the vfile content.
+ """
+ curpos = self.fp.tell()
+ data_size = self._header.data_size
+ data_start_offset = self.offset + self._header.data_offset
+ data_end_offset = data_start_offset + data_size
+
+ if curpos >= data_end_offset or size == 0:
+            return b''
+ if size:
+ if size > data_end_offset - curpos:
+ size = data_end_offset - curpos
+ else:
+ size = data_end_offset - curpos
+
+ buf = self.fp.read(size)
+ return buf
+
+ def seek(self, pos):
+ """
+        Wraps seek. pos is relative to the start of the vfile data, and is
+        bound between the data start and end.
+ """
+ real_data_offset = self.offset + self._header.data_offset
+ real_new_pos = real_data_offset + pos
+ if (real_new_pos < real_data_offset or
+ real_new_pos > real_data_offset + self._header.data_size):
+ raise VIOError(errno.EINVAL, "Invalid seek")
+ self.fp.seek(real_new_pos)
+
+ def tell(self):
+ curpos = self.fp.tell()
+ vpos = curpos - (self.offset + self._header.data_offset)
+ return vpos
+
+ def close(self):
+ self.fp.close()
+
+
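+# Example usage (sketch, with a hypothetical object path): a vfile is read
+# much like a regular file. get_vfile() resolves the KV entry, checks the
+# object header, and leaves the file positioned at the start of the data.
+#
+#   vf = VFileReader.get_vfile(
+#       '/srv/node/sda/objects/9/abc/<object hash>/1537000000.00000.data',
+#       logger)
+#   metadata = vf.metadata   # dict, decoded from the volume
+#   body = vf.read()         # bounded to the vfile's data_size
+#   vf.close()
+
+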
+def _may_grow_volume(volume_fd, volume_offset, obj_size, conf, logger):
+ """
+ Grows a volume if needed.
+ if free_space < obj_size + object header len, allocate obj_size padded to
+ volume_alloc_chunk_size
+
+ """
+ volume_alloc_chunk_size = conf['volume_alloc_chunk_size']
+
+ if obj_size is None:
+ obj_size = 0
+
+ volume_size = os.lseek(volume_fd, 0, os.SEEK_END)
+ free_space = volume_size - volume_offset
+
+ obj_header_len = len(ObjectHeader(version=OBJECT_HEADER_VERSION))
+ required_space = obj_header_len + obj_size
+
+ if free_space < required_space:
+ _allocate_volume_space(volume_fd, volume_offset, required_space,
+ volume_alloc_chunk_size, logger)
+
+
+class VFileWriter(object):
+ def __init__(self, datadir, fd, lock_fd, volume_dir,
+ volume_index, header, offset, logger):
+ si = SwiftPathInfo.from_path(datadir)
+
+ self.fd = fd
+ self.lock_fd = lock_fd
+ self.volume_dir = volume_dir
+ self.volume_index = volume_index
+ self.header = header
+ self.offset = offset
+ self.socket_path = si.socket_path
+ self.partition = si.partition
+ # may be used for statsd. Do not use it to log or it will hang the
+ # object-server process. (eventlet)
+ self.logger = logger
+
+ @classmethod
+ def create(cls, datadir, obj_size, conf, logger, extension=None):
+ # parse datadir
+ si = SwiftPathInfo.from_path(datadir)
+
+ if si.type != "ohash":
+ raise VOSError("not a valid object hash path")
+
+        if obj_size is not None and obj_size < 0:
+            raise VOSError("obj size may not be negative")
+
+ socket_path = os.path.normpath(si.socket_path)
+ volume_dir = os.path.normpath(si.volume_dir)
+
+ # get a writable volume
+ # TODO : check that we fallocate enough if obj_size > volume
+ # chunk alloc size
+ volume_file, lock_file, volume_path = open_or_create_volume(
+ socket_path, si.partition, extension, volume_dir,
+ conf, logger, size=obj_size)
+ volume_index = get_volume_index(volume_path)
+
+ # create object header
+ header = ObjectHeader(version=OBJECT_HEADER_VERSION)
+        header.ohash = si.ohash
+        # TODO: policy_idx is unused, always set to zero.
+        header.policy_idx = 0
+ header.data_offset = len(header) + conf['metadata_reserve']
+ header.data_size = 0
+ # requires header v3
+ header.state = STATE_OBJ_FILE
+
+ try:
+ # get offset at which to start writing
+ offset = rpc.get_next_offset(socket_path, volume_index)
+
+ # pre-allocate space if needed
+ _may_grow_volume(volume_file, offset, obj_size, conf, logger)
+
+ # seek to absolute object offset + relative data offset
+ # (we leave space for the header and some metadata)
+ os.lseek(volume_file, offset + header.data_offset,
+ os.SEEK_SET)
+ except Exception:
+ os.close(volume_file)
+ os.close(lock_file)
+ raise
+
+ return cls(datadir, volume_file, lock_file, volume_dir,
+ volume_index, header, offset, logger)
+
+ def commit(self, filename, metadata):
+ """
+ Write the header, metadata, sync, and register vfile in KV.
+ """
+ if self.fd < 0:
+ raise VIOError(errno.EBADF, "Bad file descriptor")
+
+ if not filename:
+ raise VIOError("filename cannot be empty")
+
+        # how much data has been written?
+ # header.data_offset is relative to the object's offset
+ data_offset = self.offset + self.header.data_offset
+ data_end = os.lseek(self.fd, 0, os.SEEK_CUR)
+ self.header.data_size = data_end - data_offset
+
+ self.header.filename = filename
+ # create and populate protobuf object
+ meta = Metadata()
+ enc_metadata = _encode_metadata(metadata)
+ for k, v in enc_metadata.items():
+ meta.attrs.add(key=k, value=v)
+
+ metastr = meta.SerializeToString()
+ metastr_md5 = hashlib.md5(metastr).hexdigest().encode('ascii')
+
+ self.header.metadata_size = len(metastr)
+ self.header.metadata_offset = len(self.header)
+ self.header.metastr_md5 = metastr_md5
+
+ # calculate the end object offset (this includes the padding, if any)
+ # start from data_end, and add: metadata remainder if any, footer,
+ # padding
+
+        # how much reserved metadata space do we have?
+ # Should be equal to "metadata_reserve"
+ metadata_available_space = (self.header.data_offset -
+ self.header.metadata_offset)
+ metadata_remainder = max(0, self.header.metadata_size -
+ metadata_available_space)
+
+ object_end = data_end + metadata_remainder
+
+ object_end = next_aligned_offset(object_end, ALIGNMENT)
+
+ self.header.total_size = object_end - self.offset
+
+ # write header
+ os.lseek(self.fd, self.offset, os.SEEK_SET)
+ os.write(self.fd, self.header.pack())
+
+ # write metadata, and footer
+ metadata_offset = self.offset + self.header.metadata_offset
+ if self.header.metadata_size > metadata_available_space:
+ os.lseek(self.fd, metadata_offset, os.SEEK_SET)
+ os.write(self.fd, metastr[:metadata_available_space])
+ # metadata does not fit in reserved space,
+ # write the remainder after the data
+ os.lseek(self.fd, data_end, os.SEEK_SET)
+ os.write(self.fd, metastr[metadata_available_space:])
+ else:
+ os.lseek(self.fd, metadata_offset, os.SEEK_SET)
+ os.write(self.fd, metastr)
+
+ # Sanity check, we should not go beyond object_end
+ curpos = os.lseek(self.fd, 0, os.SEEK_CUR)
+ if curpos > object_end:
+ errtxt = "BUG: wrote past object_end! curpos: {} object_end: {}"
+ raise Exception(errtxt.format(curpos, object_end))
+
+ # sync data. fdatasync() is enough, if the volume was just created,
+ # it has been fsync()'ed previously, along with its parent directory.
+ fdatasync(self.fd)
+
+ # register object
+ full_name = "{}{}".format(self.header.ohash, filename)
+ try:
+ rpc.register_object(self.socket_path, full_name, self.volume_index,
+ self.offset, object_end)
+ except RpcError:
+ # If we failed to register the object, erase the header so that it
+ # will not be picked up by the volume checker if there is a crash
+ # or power failure before it gets overwritten by another object.
+ erase_object_header(self.fd, self.offset)
+ raise
+
+ increment(self.logger, 'vfile.vfile_creation')
+ increment(self.logger, 'vfile.total_space_used',
+ self.header.total_size)
+
+
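+# Example usage (sketch): the write path as a caller such as diskfile.py
+# might drive it. Names are illustrative; closing the volume and lock file
+# descriptors is left to the caller.
+#
+#   writer = VFileWriter.create(datadir, obj_size, conf, logger, '.data')
+#   os.write(writer.fd, chunk)   # fd is positioned past header + reserve
+#   writer.commit('1537000000.00000.data', metadata)
+#   os.close(writer.fd)
+#   os.close(writer.lock_fd)
+
+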
+def open_or_create_volume(socket_path, partition, extension, volume_dir,
+ conf, logger, size=0):
+ """
+ Tries to open or create a volume for writing. If a volume cannot be
+ opened or created, a VOSError exception is raised.
+ :return: volume file descriptor, lock file descriptor, absolute path
+ to volume.
+ """
+ volume_file, lock_file, volume_path = open_writable_volume(socket_path,
+ partition,
+ extension,
+ volume_dir,
+ conf,
+ logger)
+ if not volume_file:
+ # attempt to create new volume for partition
+ try:
+ volume_file, lock_file, volume_path = create_writable_volume(
+ socket_path, partition, extension, volume_dir,
+ conf, logger, size=size)
+ except Exception as err:
+ error_msg = "Failed to open or create a volume for writing: "
+ error_msg += getattr(err, "strerror", "Unknown error")
+ raise VOSError(errno.ENOSPC, error_msg)
+
+ return volume_file, lock_file, volume_path
+
+
+def _create_new_lock_file(volume_dir, logger):
+ creation_lock_path = os.path.join(volume_dir, VCREATION_LOCK_NAME)
+ with open(creation_lock_path, 'w') as creation_lock_file:
+ # this may block
+ fcntl.flock(creation_lock_file, fcntl.LOCK_EX)
+
+ index = get_next_volume_index(volume_dir)
+ next_lock_name = get_lock_file_name(index)
+ next_lock_path = os.path.join(volume_dir, next_lock_name)
+
+ try:
+ lock_file = os.open(next_lock_path,
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+ except OSError:
+ increment(logger, 'vfile.volume_creation.fail_other')
+ raise
+
+ try:
+ fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ increment(logger, 'vfile.volume_creation.fail_other')
+ os.close(lock_file)
+ os.unlink(next_lock_path)
+ raise
+
+ return index, next_lock_path, lock_file
+
+
+# create a new volume
+def create_writable_volume(socket_path, partition, extension, volume_dir,
+ conf, logger, state=STATE_RW, size=0):
+ """
+ Creates a new writable volume, and associated lock file.
+    Returns a volume file descriptor, a lock file descriptor, and the path
+    of the volume that has been created.
+ If the extension is specified, a specific volume may be used.
+ (Currently, .ts files go to separate volumes as they are short-lived, in
+ order to limit fragmentation)
+ state can be STATE_RW (new RW volume) or STATE_COMPACTION_TARGET (new empty
+ volume which will be used for compaction, and to which new objects cannot
+ be written).
+ size is the space that should be allocated to the volume (in addition to
+ the volume header)
+ """
+
+ if size is None:
+ size = 0
+
+    # Check if we have exceeded the allowed volume count for this partition.
+    # TODO: move this check below, with the lock held? (as is, we may
+    # create a few extra volumes)
+ volume_type = get_volume_type(extension)
+ volumes = rpc.list_volumes(socket_path, partition, volume_type)
+ max_volume_count = conf['max_volume_count']
+ if len(volumes) >= max_volume_count:
+ err_txt = ("Maximum count of volumes reached for partition:"
+ " {} type: {}".format(partition, volume_type))
+ increment(logger, 'vfile.volume_creation.fail_count_exceeded')
+ raise VOSError(errno.EDQUOT, err_txt)
+
+ try:
+ os.makedirs(volume_dir)
+ except OSError as err:
+ if err.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+
+ index, next_lock_path, lock_file = _create_new_lock_file(
+ volume_dir, logger)
+
+ # create the volume
+ next_volume_name = get_volume_name(index)
+ next_volume_path = os.path.join(volume_dir, next_volume_name)
+
+ vol_header = VolumeHeader()
+ vol_header.volume_idx = index
+ vol_header.type = volume_type
+ vol_header.partition = int(partition)
+ # first object alignment
+ vol_header.first_obj_offset = len(vol_header) + (
+ ALIGNMENT - len(vol_header) % ALIGNMENT)
+ vol_header.state = state
+
+    # How much space is needed for the object? (assuming metadata fits in the
+ # reserved space, but we cannot know this in advance)
+ alloc_size = vol_header.first_obj_offset + size
+ volume_alloc_chunk_size = conf['volume_alloc_chunk_size']
+
+    volume_file = None
+    try:
+ volume_file = os.open(next_volume_path,
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+ _allocate_volume_space(volume_file, 0, alloc_size,
+ volume_alloc_chunk_size, logger)
+
+ # Write volume header
+ write_volume_header(vol_header, volume_file)
+
+ # If the uploader is slow to send data to the object server, a crash
+ # may occur before the object is received and a call to fsync() is
+ # issued. We end up with volumes without a header.
+ # Issue a fsync() here, at the cost of performance early on. As
+ # partitions get volumes we switch to open_writable_volume, avoiding
+ # the fsync.
+ fsync(volume_file)
+ fsync_dir(volume_dir)
+
+ # Register volume
+ rpc.register_volume(socket_path, partition, vol_header.type, index,
+ vol_header.first_obj_offset, vol_header.state)
+    except Exception:
+        os.close(lock_file)
+        os.unlink(next_lock_path)
+        # volume_file may not be open if os.open() itself failed
+        if volume_file is not None:
+            os.close(volume_file)
+            os.unlink(next_volume_path)
+        increment(logger, 'vfile.volume_creation.fail_other')
+        raise
+
+ increment(logger, 'vfile.volume_creation.ok')
+ return volume_file, lock_file, next_volume_path
+
+
+def _allocate_volume_space(volume_fd, offset, length, volume_alloc_chunk_size,
+ logger, ignore_error=False):
+ """
+ Will pre-allocate space for the volume given the offset and length,
+ aligned to volume_alloc_chunk_size.
+    May ignore an OSError if ignore_error is True.
+ :param volume_fd: file descriptor of the volume
+ :param offset: offset from which to grow the volume
+ :param length: length to grow, relative to offset
+ :param volume_alloc_chunk_size: pad length to align to this chunk size
+ :param ignore_error: ignore OSError
+ """
+ try:
+ alloc_size = next_aligned_offset(length, volume_alloc_chunk_size)
+ fallocate(volume_fd, alloc_size, offset)
+ increment(logger, 'vfile.volume_alloc_space',
+ alloc_size)
+ except OSError as err:
+ if not ignore_error:
+ if err.errno in (errno.ENOSPC, errno.EDQUOT):
+ raise DiskFileNoSpace()
+ raise
+
+
+def delete_volume(socket_path, volume_path, logger):
+ """
+ Deletes a volume from disk and removes entry in the KV
+ """
+ index = get_volume_index(volume_path)
+ volume_lock_path = "{}.writelock".format(volume_path)
+
+ # Remove KV entry
+ rpc.unregister_volume(socket_path, index)
+
+ # Remove volume and lock
+ os.unlink(volume_path)
+ os.unlink(volume_lock_path)
+
+
+def open_writable_volume(socket_path, partition, extension, volume_dir, conf,
+ logger):
+ """
+ Opens a volume available for writing.
+ returns a volume file, a lock_file, and the volume path
+ :param socket_path: full path to KV socket
+ :param partition: partition name
+ :param extension: file extension
+ """
+ volume_type = get_volume_type(extension)
+ volume_file = None
+ lock_file = None
+ volume_file_path = None
+ # query the KV for available volumes given the partition and type
+ volumes = rpc.list_volumes(socket_path, partition, volume_type)
+
+ # writable candidates are volumes which are in RW state and not too large
+ volumes = [vol for vol in volumes if vol.volume_state == STATE_RW and
+ vol.next_offset < conf['max_volume_size']]
+ volume_files = [get_volume_name(volume.volume_index) for volume in
+ volumes]
+
+ for volume_file_name in volume_files:
+ volume_file_path = os.path.join(volume_dir, volume_file_name)
+ volume_file, lock_file = open_volume(volume_file_path)
+ if volume_file:
+ break
+
+ return volume_file, lock_file, volume_file_path
+
+
+def open_volume(volume_path):
+ """Locks the volume, and returns a fd to the volume and a fd to its lock
+ file. Returns None, None, if it cannot be locked. Raises for any other
+ error.
+ :param volume_path: full path to volume
+ :return: (volume fd, lock fd)
+ """
+ lock_file_path = "{}.writelock".format(volume_path)
+
+ try:
+ lock_file = os.open(lock_file_path, os.O_WRONLY)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+        # if the volume lock file has been removed, create it
+ lock_file = os.open(lock_file_path,
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+
+ try:
+ fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError as err:
+ if err.errno in (errno.EACCES, errno.EAGAIN):
+ # volume is locked
+ os.close(lock_file)
+ return None, None
+ else:
+ try:
+ os.close(lock_file)
+ except Exception:
+ pass
+ raise
+ except Exception:
+ try:
+ os.close(lock_file)
+ except Exception:
+ pass
+ raise
+
+ volume_file = os.open(volume_path, os.O_WRONLY)
+ return volume_file, lock_file
+
+
+def change_volume_state(volume_file_path, state, compaction_target=None):
+ """
+    Changes the volume's state. The caller must hold the volume lock.
+    TODO: should this handle the RPC as well? (currently done by the caller)
+ TODO: take an optional size parameter so we can fallocate() the file
+ :param volume_file_path: path of volume to modify
+ :param state: new state
+ :param compaction_target: ID of the volume compaction target, if applicable
+ """
+    with open(volume_file_path, "rb+") as volume_file:
+        h = read_volume_header(volume_file)
+        h.state = state
+        if compaction_target:
+            h.compaction_target = compaction_target
+        volume_file.seek(0)
+        volume_file.write(h.pack())
+
+
+def get_next_volume_index(volume_dir):
+ """
+ Returns the next volume index to use for the given dir.
+ Caller must hold the volume creation lock.
+ :param volume_dir: volume directory
+ :return: the next volume index to use
+ """
+ dir_entries = os.listdir(volume_dir)
+ # Get all volumes and their lock: a volume should always have a lock,
+ # but a fsck may have removed either. If we find such a case, skip the
+ # index.
+ volumes_and_locks_idxs = set([name[1:8] for name in dir_entries if
+ VOL_AND_LOCKS_RE.match(name)])
+    if not volumes_and_locks_idxs:
+        return 1
+
+    # This is about 30% faster than calling int() in the set comprehension
+ # above.
+ idxs = sorted(int(i) for i in volumes_and_locks_idxs)
+
+ # find the first "hole" in the indexes
+ for pos, idx in enumerate(idxs, start=1):
+ if pos != idx:
+ return pos
+
+ # no hole found
+ return idx + 1
+
+
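+# Example (sketch): with existing indexes {1, 2, 4}, the loop above sees
+# pos=3 != idx=4 and returns 3 (the first hole); with {1, 2, 3}, no hole is
+# found and it returns 4.
+
+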
+def get_lock_file_name(index):
+ if index <= 0 or index > 9999999:
+ raise VFileException("invalid lock file index")
+ lock_file_name = "v{0:07d}.writelock".format(index)
+ return lock_file_name
+
+
+def get_volume_name(index):
+ if index <= 0 or index > 9999999:
+ raise VFileException("invalid volume file index")
+ volume_file_name = "v{0:07d}".format(index)
+ return volume_file_name
+
+
+def listdir(path):
+ type_to_func = {
+ 'ohash': _list_ohash,
+ 'suffix': _list_suffix,
+ 'partition': _list_partition,
+ 'partitions': _list_partitions
+ }
+
+ path = os.path.normpath(path)
+ si = SwiftPathInfo.from_path(path)
+
+    # get part power from the ring (needed as we generate "directories"
+    # based on it)
+ if not POLICIES[si.policy_idx].object_ring:
+ POLICIES[si.policy_idx].load_ring('/etc/swift')
+ part_power = 32 - POLICIES[si.policy_idx].object_ring._part_shift
+
+ ret = type_to_func[si.type](si, part_power)
+ return [str(e) for e in ret]
+
+
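+# Example (sketch): listdir() mirrors os.listdir at each level of a
+# hypothetical objects tree:
+#
+#   listdir('/srv/node/sda/objects/9')             -> suffixes
+#   listdir('/srv/node/sda/objects/9/abc')         -> object hashes
+#   listdir('/srv/node/sda/objects/9/abc/<ohash>') -> filenames, e.g.
+#                                                     ['1537000000.00000.ts']
+
+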
+def exists(path):
+ """
+ Similar to os.path.exists
+ LOSF manages files below the "objects" directory. if the query is about
+ that directory, use os.path.exists, otherwise check in the KV.
+ It does not really make sense in LOSF context as callers will then issue a
+ "mkdir", which is a noop. But having this means we touch less of the
+ existing code. (diskfile, reconstructor, replicator)
+ :param path: full path to directory
+ :return: True is path exists, False otherwise
+ """
+ si = SwiftPathInfo.from_path(path)
+ if si.type == 'partitions':
+ return os.path.exists(path)
+
+ # if a directory is empty, it does not exist
+ if listdir(path):
+ return True
+ else:
+ return False
+
+ # Does not handle "file"
+
+
+def isdir(path):
+ """
+ Similar to os.path.isdir
+ :param path: full path to directory
+    :return: True if path behaves like a directory, False otherwise
+ """
+ si = SwiftPathInfo.from_path(path)
+ if si.type == 'partitions':
+ return os.path.isdir(path)
+ if si.type == 'file':
+ return False
+    return bool(listdir(path))
+
+
+def isfile(path):
+ """
+ Similar to os.path.isfile
+    :param path: full path to file
+    :return: True if path behaves like a file, False otherwise
+ """
+ si = SwiftPathInfo.from_path(path)
+ if si.type == 'partitions':
+ return os.path.isfile(path)
+ if si.type == 'file':
+ return True
+ return False
+
+
+def mkdirs(path):
+ """
+ Similar to utils.mkdirs
+ Noop, except if the directory is the "objects" directory
+ :param path: full path to directory
+ """
+ si = SwiftPathInfo.from_path(path)
+ if si.type == 'partitions':
+ return utils.mkdirs(path)
+
+
+def list_quarantine(quarantine_path):
+ """
+ Lists all quarantined object hashes for the disk/policy
+ :param quarantine_path: quarantined path
+ :return: a list of quarantined object hashes
+ """
+ si = SwiftQuarantinedPathInfo.from_path(quarantine_path)
+ if si.type != "ohashes":
+ err_msg = "Not a path to a quarantined file ({})".format(
+ quarantine_path)
+ raise VIOError(errno.EINVAL, err_msg)
+ return rpc.list_quarantined_ohashes(si.socket_path)
+
+
+def list_quarantined_ohash(quarantined_ohash_path):
+ si = SwiftQuarantinedPathInfo.from_path(quarantined_ohash_path)
+ if si.type != "ohash":
+ err_msg = "Not a path to a quarantined file ({})".format(
+ quarantined_ohash_path)
+ raise VIOError(errno.EINVAL, err_msg)
+ return rpc.list_quarantined_ohash(si.socket_path, si.ohash)
+
+
+def _list_ohash(si, part_power):
+ """
+ :param si: SwiftPathInfo object
+ :param part_power:
+ :return: list of files within the object directory
+ """
+ return rpc.list_prefix(si.socket_path, si.ohash)
+
+
+def _list_suffix(si, part_power):
+ """
+ :param si: SwiftPathInfo object
+ :param part_power:
+ :return: list of object hashes directory within the suffix directory
+ """
+ return rpc.list_suffix(si.socket_path, int(si.partition),
+ si.suffix, part_power)
+
+
+def _list_partition(si, part_power):
+ """
+ :param si: SwiftPathInfo object
+ :param part_power:
+ :return: list of suffixes within the partition
+ """
+ return rpc.list_partition(si.socket_path, int(si.partition),
+ part_power)
+
+
+def _list_partitions(si, part_power):
+ """
+ :param si: SwiftPathInfo object
+ :param part_power:
+ :return: list of partitions
+ """
+ return rpc.list_partitions(si.socket_path, part_power)
+
+
+def set_header_state(socket_path, name, quarantine):
+ """
+ Set a vfile header state (quarantined or not)
+    :param socket_path: socket path
+    :param name: full name
+    :param quarantine: True to quarantine, False to unquarantine
+ """
+ try:
+ obj = rpc.get_object(socket_path, name, is_quarantined=not quarantine,
+ repair_tool=False)
+ except RpcError as e:
+        if e.code == StatusCode.NotFound:
+            raise IOError(errno.ENOENT,
+                          "No such file or directory: {}".format(name))
+        raise
+
+ volume_filename = get_volume_name(obj.volume_index)
+ volume_dir = socket_path.replace("rpc.socket", "volumes")
+ volume_filepath = os.path.join(volume_dir, volume_filename)
+ with open(volume_filepath, 'r+b') as fp:
+ fp.seek(obj.offset)
+ try:
+ header = read_object_header(fp)
+ except HeaderException:
+ # until we journal the deletes, after a crash we may have an entry
+ # for an object that has been "punched" from the volume.
+ # if we find a hole instead of the header, remove entry from
+ # kv and return.
+ fp.seek(obj.offset)
+ data = fp.read(MAX_OBJECT_HEADER_LEN)
+            if data == b'\x00' * len(data):
+                # unregister the object here
+                rpc.unregister_object(socket_path, name)
+                return
+            msg = ("Failed to read header for {} at offset {} in "
+                   "volume {}".format(name, obj.offset, volume_filepath))
+            raise VFileException(msg)
+ if quarantine:
+ header.state = STATE_OBJ_QUARANTINED
+ else:
+ header.state = STATE_OBJ_FILE
+ fp.seek(obj.offset)
+ write_object_header(header, fp)
+
+
+def quarantine_ohash(dirpath, policy):
+ """
+ Quarantine the object (all files below the object hash directory)
+ :param dirpath: path to object directory
+ :param policy: policy
+ """
+ si = SwiftPathInfo.from_path(dirpath)
+ if si.type != 'ohash':
+ raise VFileException("dirpath not an object dir: {}".format(dirpath))
+
+ if policy.policy_type == 'erasure_coding':
+ sort_f = lambda x: utils.Timestamp(x.split('#')[0])
+ else:
+ sort_f = lambda x: utils.Timestamp(os.path.splitext(x)[0])
+
+ final = []
+ vfiles = listdir(dirpath)
+
+ try:
+ for ext in ['.data', '.meta', '.ts']:
+ partial = [v for v in vfiles if os.path.splitext(v)[1] == ext]
+ partial.sort(key=sort_f)
+ final.extend(partial)
+ except Exception:
+ final = vfiles
+
+ for vfile in final:
+ vfilepath = os.path.join(dirpath, vfile)
+ sif = SwiftPathInfo.from_path(vfilepath)
+ full_name = sif.ohash + sif.filename
+ # update header
+ set_header_state(sif.socket_path, full_name, quarantine=True)
+ try:
+ # update KV
+ rpc.quarantine_object(si.socket_path, full_name)
+ except RpcError as e:
+ if e.code == StatusCode.NotFound:
+ errmsg = "No such file or directory: '{}'"
+                raise OSError(errno.ENOENT, errmsg.format(vfilepath))
+            raise
+
+
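+# Example (sketch): quarantining a hypothetical object hash directory. Every
+# vfile below it gets its header flagged and its KV entry moved to the
+# quarantined namespace.
+#
+#   quarantine_ohash('/srv/node/sda/objects/9/abc/<object hash>',
+#                    POLICIES[0])
+
+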
+def unquarantine_ohash(socket_path, ohash):
+ """
+ Unquarantine the object (all files below the object hash directory).
+ Is this needed? Used for tests but currently not called from anywhere
+ :param socket_path: path to KV socket
+ :param ohash: object hash
+ """
+ for objname in rpc.list_quarantined_ohash(socket_path, ohash):
+ full_name = "{}{}".format(ohash, objname)
+ set_header_state(socket_path, full_name, quarantine=False)
+ try:
+ rpc.unquarantine_object(socket_path, full_name)
+ except RpcError as e:
+ if e.code == StatusCode.NotFound:
+ errmsg = "No such file or directory: '{}'"
+                raise OSError(errno.ENOENT, errmsg.format(full_name))
+            raise
+
+
+def rmtree(path):
+ """
+    Delete a directory recursively. (Actually, it only deletes objects, as
+    directories do not exist)
+    :param path: path to the "directory" to remove
+ """
+ type_to_func = {
+ 'ohash': _rmtree_ohash,
+ 'suffix': _rmtree_suffix,
+ 'partition': _rmtree_partition
+ # 'partitions': _rmtree_partitions
+ }
+
+ path = os.path.normpath(path)
+ si = SwiftPathInfo.from_path(path)
+ type_to_func[si.type](path)
+
+
+def _rmtree_ohash(path):
+ files = listdir(path)
+ for name in files:
+ filepath = os.path.join(path, name)
+ delete_vfile_from_path(filepath)
+
+
+def _rmtree_suffix(path):
+ ohashes = listdir(path)
+ for ohash in ohashes:
+ ohashpath = os.path.join(path, ohash)
+ _rmtree_ohash(ohashpath)
+
+
+def _rmtree_partition(path):
+ suffixes = listdir(path)
+ for suffix in suffixes:
+ suffixpath = os.path.join(path, suffix)
+ _rmtree_suffix(suffixpath)
+
+
+def delete_vfile_from_path(filepath):
+ si = SwiftPathInfo.from_path(filepath)
+ full_name = si.ohash + si.filename
+
+ def _unregister_object(socket_path, name, volume_index, offset, size):
+ try:
+ rpc.unregister_object(socket_path, name)
+ except RpcError as e:
+ if e.code == StatusCode.NotFound:
+                raise VOSError(errno.ENOENT,
+                               "No such file or directory: "
+                               "'{}'".format(filepath))
+            raise
+
+    try:
+        obj = rpc.get_object(si.socket_path, full_name)
+    except RpcError as e:
+        if e.code == StatusCode.NotFound:
+            raise VOSError(errno.ENOENT,
+                           "No such file or directory: "
+                           "'{}'".format(filepath))
+        raise
+ volume_filename = get_volume_name(obj.volume_index)
+ volume_filepath = os.path.join(si.volume_dir, volume_filename)
+
+ with open(volume_filepath, 'r+b') as fp:
+ # get object length
+ fp.seek(obj.offset)
+ try:
+ header = read_object_header(fp)
+ except HeaderException:
+ # until we journal the deletes, after a crash we may have an entry
+ # for an object that has been "punched" from the volume.
+ # if we find a hole instead of the header, remove entry from
+ # kv and return.
+ fp.seek(obj.offset)
+ data = fp.read(MAX_OBJECT_HEADER_LEN)
+            if data == b'\x00' * len(data):
+                # unregister the object here
+                _unregister_object(si.socket_path, full_name,
+                                   obj.volume_index, obj.offset, 0)
+                return
+
+            msg = ("Failed to read header for {} at offset {} in "
+                   "volume {}".format(full_name, obj.offset, volume_filepath))
+ raise VFileException(msg)
+
+ # check that we have the object we were expecting
+ header_fullname = "{}{}".format(header.ohash, header.filename)
+ if header_fullname != full_name:
+ # until we journal the renames, after a crash we may not have the
+ # rename in the KV. If that's the case, continue.
+ non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname)
+ if non_durable_name != full_name:
+ raise VFileException(
+ "Wrong header name. Header: {} Expected: {}".format(
+ header_fullname, full_name))
+ utils.punch_hole(fp.fileno(), obj.offset, header.total_size)
+
+ _unregister_object(si.socket_path, full_name, obj.volume_index,
+ obj.offset, header.total_size)
+
+
+# delete an object hash directory: a noop in LOSF, as "directories" do not
+# actually exist. Kept so that callers do not need to special-case LOSF.
+def rmtree_ohash(path):
+    pass
+
+
+def read_metadata(fp, offset, header):
+ """
+ Reads vfile metadata
+ :param fp: opened file
+ :param offset: absolute offset to the beginning of the vfile
+ :param header: vfile header
+ :return: metadata dict
+ """
+ metadata_offset = offset + header.metadata_offset
+ metadata_size = header.metadata_size
+ data_offset = offset + header.data_offset
+ data_end = offset + header.data_offset + header.data_size
+ metadata_available_space = data_offset - metadata_offset
+
+ fp.seek(metadata_offset)
+ if metadata_size > metadata_available_space:
+ metastr = fp.read(metadata_available_space)
+ fp.seek(data_end)
+ metastr += fp.read(metadata_size - metadata_available_space)
+ else:
+ metastr = fp.read(metadata_size)
+
+ # Verify checksum, if any
+ if hasattr(header, 'metastr_md5'):
+ metadata_checksum = header.metastr_md5
+ computed_checksum = hashlib.md5(metastr).hexdigest().encode('ascii')
+ if metadata_checksum != computed_checksum:
+ raise DiskFileBadMetadataChecksum(
+ "Metadata checksum mismatch for %s: "
+ "stored checksum='%s', computed='%s'" % (
+ header.filename, metadata_checksum, computed_checksum))
+ else:
+ # we don't support updating from the older format for now
+ pass
+
+ meta = Metadata()
+ meta.ParseFromString(metastr)
+ metadata = {}
+ for attr in meta.attrs:
+ if attr.key:
+ if six.PY2:
+ metadata[attr.key] = attr.value
+ else:
+ metadata[attr.key.decode('utf8', 'surrogateescape')] = \
+ attr.value.decode('utf8', 'surrogateescape')
+
+ return metadata
+
+
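+# On-disk vfile layout (sketch), as written by VFileWriter.commit() and read
+# back by read_metadata():
+#
+#   offset                                                        object_end
+#   | header | metadata (reserved space) | data | metadata remainder | pad |
+#
+# metadata_offset == len(header), data_offset == len(header) +
+# metadata_reserve; the remainder is only present when the serialized
+# metadata does not fit in the reserved space, and the end is padded to
+# ALIGNMENT (total_size = object_end - offset).
+
+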
+def rename_vfile(filepath, newfilepath, logger):
+ """
+ Renames a vfile. All writes to the KV are asynchronous. If we were to make
+ a synchronous WriteBatch call, all previous writes would also be synced,
+ killing performance. See:
+ https://github.com/google/leveldb/blob/master/doc/index.md
+    Currently:
+    - update the header in place, synchronously
+    - update the KV asynchronously (delete, put)
+
+    A file can only be renamed within a KV.
+    This is currently only used by the erasure code diskfile manager.
+ """
+ # Get current file info
+ si = SwiftPathInfo.from_path(filepath)
+ full_name = si.ohash + si.filename
+
+ # Get new file info
+ si_new = SwiftPathInfo.from_path(newfilepath)
+ new_full_name = si_new.ohash + si_new.filename
+
+ # Sanity check, same KV
+ if si.socket_path != si_new.socket_path:
+ raise VFileException("attempted to rename a file to a different KV")
+
+ # rename file in place in the header
+ vf_reader = VFileReader._get_vfile(full_name, si.volume_dir,
+ si.socket_path, logger)
+ vf_offset = vf_reader.offset
+ header = vf_reader._header
+ volume_path = vf_reader.fp.name
+ vf_reader.close()
+
+ header.filename = si_new.filename
+
+ vol_fd = os.open(volume_path, os.O_WRONLY)
+ os.lseek(vol_fd, vf_offset, os.SEEK_SET)
+ os.write(vol_fd, header.pack())
+ fdatasync(vol_fd)
+ os.close(vol_fd)
+
+ # Update the KV (async)
+ try:
+ rpc.rename_object(si.socket_path, full_name, new_full_name,
+ si.partition)
+ except RpcError as e:
+ if e.code == StatusCode.NotFound:
+ raise VIOError(errno.ENOENT,
+ "No such file or directory: {}".format(full_name))
+ else:
+ raise
diff --git a/swift/obj/vfile_utils.py b/swift/obj/vfile_utils.py
new file mode 100644
index 000000000..8b9036b6c
--- /dev/null
+++ b/swift/obj/vfile_utils.py
@@ -0,0 +1,228 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import os.path
+import pwd
+import re
+
+from swift.common.storage_policy import split_policy_string
+from swift.obj.fmgr_pb2 import VOLUME_DEFAULT, VOLUME_TOMBSTONE
+
+# regex to extract policy from path (one KV per policy)
+# TODO: use split_policy_string or similar, not re
+policy_re = re.compile(r"^objects(-\d+)?$")
+volume_name_re = re.compile(r"^v\d{7}$")
+losf_name_re = re.compile(r"^losf(-\d+)?$")
+
+
+class VIOError(IOError):
+ """
+ Exceptions are part of the interface, subclass IOError to make it easier
+ to interface with diskfile.py
+ """
+
+
+class VOSError(OSError):
+ """
+ Exceptions are part of the interface, subclass OSError to make it easier
+ to interface with diskfile.py
+ """
+
+
+class VFileException(Exception):
+ pass
+
+
+def get_volume_type(extension):
+ ext_map = {
+ ".ts": VOLUME_TOMBSTONE
+ }
+
+ return ext_map.get(extension, VOLUME_DEFAULT)
+
+
+def valid_volume_name(name):
+    """Returns True if name is a valid volume name, False otherwise"""
+    return bool(volume_name_re.match(name))
+
+
+def valid_losf_name(name):
+    """Returns True if name is a valid losf dir name, False otherwise"""
+    return bool(losf_name_re.match(name))
+
+
+# used by "fsck" to get the socket path from the volume path
+def get_socket_path_from_volume_path(volume_path):
+ volume_path = os.path.normpath(volume_path)
+ volume_dir_path, volume_name = os.path.split(volume_path)
+ losf_path, volume_dir = os.path.split(volume_dir_path)
+ mount_path, losf_dir = os.path.split(losf_path)
+ if volume_dir != "volumes" or not valid_volume_name(volume_name) or \
+ not valid_losf_name(losf_dir):
+ raise ValueError("Invalid volume path")
+
+ socket_path = os.path.join(losf_path, "rpc.socket")
+ return socket_path
+
+
+def get_mountpoint_from_volume_path(volume_path):
+ volume_path = os.path.normpath(volume_path)
+ volume_dir_path, volume_name = os.path.split(volume_path)
+ losf_path, volume_dir = os.path.split(volume_dir_path)
+ mount_path, losf_dir = os.path.split(losf_path)
+ if volume_dir != "volumes" or not valid_volume_name(volume_name) or \
+ not valid_losf_name(losf_dir):
+ raise ValueError("Invalid volume path")
+ return mount_path
+
+
+class SwiftPathInfo(object):
+ def __init__(self, type, socket_path=None, volume_dir=None,
+ policy_idx=None, partition=None, suffix=None, ohash=None,
+ filename=None):
+ self.type = type
+ self.socket_path = socket_path
+ self.volume_dir = volume_dir
+ self.policy_idx = policy_idx
+ self.partition = partition
+ self.suffix = suffix
+ self.ohash = ohash
+ self.filename = filename
+
+ # parses a swift path, returns a SwiftPathInfo instance
+ @classmethod
+ def from_path(cls, path):
+ count_to_type = {
+ 4: "file",
+ 3: "ohash",
+ 2: "suffix",
+ 1: "partition",
+ 0: "partitions" # "objects" directory
+ }
+
+ clean_path = os.path.normpath(path)
+ ldir = clean_path.split(os.sep)
+
+ try:
+ obj_idx = [i for i, elem in enumerate(ldir)
+ if elem.startswith("objects")][0]
+ except IndexError:
+ raise VOSError("cannot parse object directory")
+
+ elements = ldir[(obj_idx + 1):]
+ count = len(elements)
+
+ if count > 4:
+ raise VOSError("cannot parse swift file path")
+
+ _, policy = split_policy_string(ldir[obj_idx])
+ policy_idx = policy.idx
+
+ prefix = os.path.join("/", *ldir[0:obj_idx])
+ m = policy_re.match(ldir[obj_idx])
+ if not m:
+ raise VOSError(
+ "cannot parse object element of directory")
+ if m.group(1):
+ sofsdir = "losf{}".format(m.group(1))
+ else:
+ sofsdir = "losf"
+ socket_path = os.path.join(prefix, sofsdir, "rpc.socket")
+ volume_dir = os.path.join(prefix, sofsdir, "volumes")
+
+ type = count_to_type[count]
+ return cls(type, socket_path, volume_dir, policy_idx, *elements)
+
+
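+# Example (sketch): parsing a hypothetical object file path, assuming a
+# policy with index 1 is configured.
+#
+#   si = SwiftPathInfo.from_path(
+#       '/srv/node/sda/objects-1/9/abc/'
+#       'd41d8cd98f00b204e9800998ecf8427e/1537000000.00000.data')
+#   si.type         -> 'file'
+#   si.policy_idx   -> 1
+#   si.socket_path  -> '/srv/node/sda/losf-1/rpc.socket'
+#   si.volume_dir   -> '/srv/node/sda/losf-1/volumes'
+#   si.partition    -> '9'
+#   si.suffix       -> 'abc'
+
+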
+class SwiftQuarantinedPathInfo(object):
+ def __init__(self, type, socket_path=None, volume_dir=None,
+ policy_idx=None, ohash=None, filename=None):
+ self.type = type
+ self.socket_path = socket_path
+ self.volume_dir = volume_dir
+ self.policy_idx = policy_idx
+ self.ohash = ohash
+ self.filename = filename
+
+ # parses a quarantined path (<device>/quarantined/objects-X or below),
+ # returns a SwiftQuarantinedPathInfo instance
+ @classmethod
+ def from_path(cls, path):
+ count_to_type = {
+ 3: "file",
+ 2: "ohash",
+ 1: "ohashes",
+ }
+
+ clean_path = os.path.normpath(path)
+ ldir = clean_path.split(os.sep)
+
+ try:
+ quar_idx = ldir.index("quarantined")
+ except ValueError:
+ raise VOSError("cannot parse quarantined path %s" %
+ path)
+
+ elements = ldir[(quar_idx + 1):]
+ count = len(elements)
+
+ if count < 1 or count > 3 or "objects" not in elements[0]:
+ raise VOSError("cannot parse quarantined path %s" %
+ path)
+
+ _, policy = split_policy_string(elements[0])
+ policy_idx = policy.idx
+
+ prefix = os.path.join("/", *ldir[:quar_idx])
+ prefix = os.path.join(prefix, elements[0].replace("objects", "losf"))
+ socket_path = os.path.join(prefix, "rpc.socket")
+ volume_dir = os.path.join(prefix, "volumes")
+
+ type = count_to_type[count]
+ return cls(type, socket_path, volume_dir, policy_idx, *elements[1:])
+
+
+def get_volume_index(volume_path):
+ """
+ returns the volume index, either from its basename, or full path
+ """
+ name = os.path.split(volume_path)[1]
+
+ if not valid_volume_name(name):
+ raise ValueError("Invalid volume name")
+
+ index = int(name[1:8])
+ return index
+
+
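+# Example (sketch): get_volume_index('/srv/node/sda/losf/volumes/v0000042')
+# and get_volume_index('v0000042') both return 42.
+
+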
+# given an offset and alignment, returns the next aligned offset
+def next_aligned_offset(offset, alignment):
+ if offset % alignment != 0:
+ return (offset + (alignment - offset % alignment))
+ else:
+ return offset
+
+
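+# Example (sketch): next_aligned_offset(4096, 4096) -> 4096 (already
+# aligned); next_aligned_offset(4097, 4096) -> 8192.
+
+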
+def change_user(username):
+ pw = pwd.getpwnam(username)
+ uid = pw.pw_uid
+ os.setuid(uid)