diff options
Diffstat (limited to 'swift/obj')
-rw-r--r-- | swift/obj/diskfile.py | 136 | ||||
-rw-r--r-- | swift/obj/fmgr.proto | 225 | ||||
-rw-r--r-- | swift/obj/fmgr_pb2.py | 2119 | ||||
-rw-r--r-- | swift/obj/header.py | 394 | ||||
-rw-r--r-- | swift/obj/kvfile.py | 1260 | ||||
-rw-r--r-- | swift/obj/meta.proto | 14 | ||||
-rw-r--r-- | swift/obj/meta_pb2.py | 115 | ||||
-rw-r--r-- | swift/obj/objectrpcmanager.py | 157 | ||||
-rw-r--r-- | swift/obj/reconstructor.py | 29 | ||||
-rw-r--r-- | swift/obj/replicator.py | 36 | ||||
-rw-r--r-- | swift/obj/rpc_http.py | 370 | ||||
-rw-r--r-- | swift/obj/vfile.py | 1201 | ||||
-rw-r--r-- | swift/obj/vfile_utils.py | 228 |
13 files changed, 6215 insertions, 69 deletions
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 68f23b3e4..9e5fdd93f 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -37,6 +37,7 @@ import errno import fcntl import json import os +import shutil import re import time import uuid @@ -44,7 +45,6 @@ from hashlib import md5 import logging import traceback import xattr -from os.path import basename, dirname, exists, join, splitext from random import shuffle from tempfile import mkstemp from contextlib import contextmanager @@ -323,11 +323,11 @@ def quarantine_renamer(device_path, corrupted_file_path): if policy is None: # TODO: support a quarantine-unknown location policy = POLICIES.legacy - from_dir = dirname(corrupted_file_path) - to_dir = join(device_path, 'quarantined', - get_data_dir(policy), - basename(from_dir)) - invalidate_hash(dirname(from_dir)) + from_dir = os.path.dirname(corrupted_file_path) + to_dir = os.path.join(device_path, 'quarantined', + get_data_dir(policy), + os.path.basename(from_dir)) + invalidate_hash(os.path.dirname(from_dir)) try: renamer(from_dir, to_dir, fsync=False) except OSError as e: @@ -345,7 +345,7 @@ def read_hashes(partition_dir): :returns: a dict, the suffix hashes (if any), the key 'valid' will be False if hashes.pkl is corrupt, cannot be read or does not exist """ - hashes_file = join(partition_dir, HASH_FILE) + hashes_file = os.path.join(partition_dir, HASH_FILE) hashes = {'valid': False} try: with open(hashes_file, 'rb') as hashes_fp: @@ -378,7 +378,7 @@ def write_hashes(partition_dir, hashes): The updated key is added to hashes before it is written. """ - hashes_file = join(partition_dir, HASH_FILE) + hashes_file = os.path.join(partition_dir, HASH_FILE) # 'valid' key should always be set by the caller; however, if there's a bug # setting invalid is most safe hashes.setdefault('valid', False) @@ -397,7 +397,7 @@ def consolidate_hashes(partition_dir): :returns: a dict, the suffix hashes (if any), the key 'valid' will be False if hashes.pkl is corrupt, cannot be read or does not exist """ - invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE) + invalidations_file = os.path.join(partition_dir, HASH_INVALIDATIONS_FILE) with lock_path(partition_dir): hashes = read_hashes(partition_dir) @@ -431,9 +431,9 @@ def invalidate_hash(suffix_dir): invalidating """ - suffix = basename(suffix_dir) - partition_dir = dirname(suffix_dir) - invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE) + suffix = os.path.basename(suffix_dir) + partition_dir = os.path.dirname(suffix_dir) + invalidations_file = os.path.join(partition_dir, HASH_INVALIDATIONS_FILE) if not isinstance(suffix, bytes): suffix = suffix.encode('utf-8') with lock_path(partition_dir), open(invalidations_file, 'ab') as inv_fh: @@ -803,7 +803,7 @@ class BaseDiskFileManager(object): validated. """ ts_ctype = None - fname, ext = splitext(filename) + fname, ext = os.path.splitext(filename) try: if ext == '.meta': timestamp, ts_ctype = decode_timestamps( @@ -1032,7 +1032,8 @@ class BaseDiskFileManager(object): for info_key in ('data_info', 'meta_info', 'ts_info', 'ctype_info'): info = results.get(info_key) key = info_key[:-5] + '_file' - results[key] = join(datadir, info['filename']) if info else None + results[key] = os.path.join( + datadir, info['filename']) if info else None if verify: assert self._verify_ondisk_files( @@ -1074,14 +1075,14 @@ class BaseDiskFileManager(object): files, hsh_path, verify=False, **kwargs) if 'ts_info' in results and is_reclaimable( results['ts_info']['timestamp']): - remove_file(join(hsh_path, results['ts_info']['filename'])) + remove_file(os.path.join(hsh_path, results['ts_info']['filename'])) files.remove(results.pop('ts_info')['filename']) for file_info in results.get('possible_reclaim', []): # stray files are not deleted until reclaim-age if is_reclaimable(file_info['timestamp']): results.setdefault('obsolete', []).append(file_info) for file_info in results.get('obsolete', []): - remove_file(join(hsh_path, file_info['filename'])) + remove_file(os.path.join(hsh_path, file_info['filename'])) files.remove(file_info['filename']) results['files'] = files if not files: # everything got unlinked @@ -1135,23 +1136,23 @@ class BaseDiskFileManager(object): raise PathNotDir() raise for hsh in path_contents: - hsh_path = join(path, hsh) + hsh_path = os.path.join(path, hsh) try: ondisk_info = self.cleanup_ondisk_files( hsh_path, policy=policy) except OSError as err: if err.errno == errno.ENOTDIR: - partition_path = dirname(path) - objects_path = dirname(partition_path) - device_path = dirname(objects_path) + partition_path = os.path.dirname(path) + objects_path = os.path.dirname(partition_path) + device_path = os.path.dirname(objects_path) # The made-up filename is so that the eventual dirpath() # will result in this object directory that we care about. # Some failures will result in an object directory # becoming a file, thus causing the parent directory to # be qarantined. - quar_path = quarantine_renamer(device_path, - join(hsh_path, - "made-up-filename")) + quar_path = quarantine_renamer( + device_path, os.path.join( + hsh_path, "made-up-filename")) logging.exception( _('Quarantined %(hsh_path)s to %(quar_path)s because ' 'it is not a directory'), {'hsh_path': hsh_path, @@ -1236,7 +1237,7 @@ class BaseDiskFileManager(object): hashed = 0 dev_path = self.get_dev_path(device) partition_path = get_part_path(dev_path, policy, partition) - hashes_file = join(partition_path, HASH_FILE) + hashes_file = os.path.join(partition_path, HASH_FILE) modified = False orig_hashes = {'valid': False} @@ -1278,7 +1279,7 @@ class BaseDiskFileManager(object): hashes.update((suffix, None) for suffix in recalculate) for suffix, hash_ in list(hashes.items()): if not hash_: - suffix_dir = join(partition_path, suffix) + suffix_dir = os.path.join(partition_path, suffix) try: hashes[suffix] = self._hash_suffix( suffix_dir, policy=policy) @@ -1322,7 +1323,7 @@ class BaseDiskFileManager(object): """ if mount_check is False: # explicitly forbidden from syscall, just return path - return join(self.devices, device) + return os.path.join(self.devices, device) # we'll do some kind of check if not explicitly forbidden try: return check_drive(self.devices, device, @@ -1471,9 +1472,9 @@ class BaseDiskFileManager(object): # Some failures will result in an object directory # becoming a file, thus causing the parent directory to # be qarantined. - quar_path = self.quarantine_renamer(dev_path, - join(object_path, - "made-up-filename")) + quar_path = self.quarantine_renamer( + dev_path, os.path.join( + object_path, "made-up-filename")) logging.exception( _('Quarantined %(object_path)s to %(quar_path)s because ' 'it is not a directory'), {'object_path': object_path, @@ -1529,6 +1530,63 @@ class BaseDiskFileManager(object): path, err) return [] + def exists(self, path): + """ + :param path: full path to directory + """ + return os.path.exists(path) + + def mkdirs(self, path): + """ + :param path: full path to directory + """ + return mkdirs(path) + + def listdir(self, path): + """ + :param path: full path to directory + """ + return os.listdir(path) + + def rmtree(self, path, ignore_errors=False): + """ + :param path: full path to directory + :param ignore_errors: if True, ignore errors from failed removals, + else, raise an exception. + """ + return shutil.rmtree(path, ignore_errors) + + def remove_file(self, path): + """ + quiet wrapper around os.unlink. can be merged with remove? + :param path: full path to directory + """ + return remove_file(path) + + def remove(self, path): + """ + :param path: full path to directory + """ + return os.remove(path) + + def isdir(self, path): + """ + :param path: full path to directory + """ + return os.path.isdir(path) + + def isfile(self, path): + """ + :param path: full path to directory + """ + return os.path.isfile(path) + + def rmdir(self, path): + """ + :param path: full path to directory + """ + return os.rmdir(path) + def yield_suffixes(self, device, partition, policy): """ Yields tuples of (full_path, suffix_only) for suffixes stored @@ -1701,9 +1759,9 @@ class BaseDiskFileWriter(object): else: raise if not self.manager.use_linkat: - tmpdir = join(self._diskfile._device_path, - get_tmp_dir(self._diskfile.policy)) - if not exists(tmpdir): + tmpdir = os.path.join(self._diskfile._device_path, + get_tmp_dir(self._diskfile.policy)) + if not os.path.exists(tmpdir): mkdirs(tmpdir) fd, tmppath = mkstemp(dir=tmpdir) return fd, tmppath @@ -1792,7 +1850,7 @@ class BaseDiskFileWriter(object): # drop_cache() after fsync() to avoid redundant work (pages all # clean). drop_buffer_cache(self._fd, 0, self._upload_size) - self.manager.invalidate_hash(dirname(self._datadir)) + self.manager.invalidate_hash(os.path.dirname(self._datadir)) # After the rename/linkat completes, this object will be available for # requests to reference. if self._tmppath: @@ -1852,7 +1910,7 @@ class BaseDiskFileWriter(object): timestamp, self._extension, ctype_timestamp=ctype_timestamp, *a, **kw) metadata['name'] = self._name - target_path = join(self._datadir, filename) + target_path = os.path.join(self._datadir, filename) tpool.execute(self._finalize_put, metadata, target_path, cleanup) @@ -2294,7 +2352,7 @@ class BaseDiskFile(object): self._account = None self._container = None self._obj = None - self._tmpdir = join(device_path, get_tmp_dir(policy)) + self._tmpdir = os.path.join(device_path, get_tmp_dir(policy)) self._ondisk_info = None self._metadata = None self._datafile_metadata = None @@ -2307,7 +2365,7 @@ class BaseDiskFile(object): self._datadir = _datadir else: name_hash = hash_path(account, container, obj) - self._datadir = join( + self._datadir = os.path.join( device_path, storage_directory(get_data_dir(policy), partition, name_hash)) @@ -3111,7 +3169,7 @@ class ECDiskFileWriter(BaseDiskFileWriter): :raises DiskFileError: if the diskfile frag_index has not been set (either during initialisation or a call to put()) """ - data_file_path = join( + data_file_path = os.path.join( self._datadir, self.manager.make_on_disk_filename( timestamp, '.data', self._diskfile._frag_index)) durable_data_file_path = os.path.join( @@ -3270,7 +3328,7 @@ class ECDiskFile(BaseDiskFile): timestamp, ext='.data', frag_index=frag_index, durable=True) remove_file(os.path.join(self._datadir, purge_file)) remove_directory(self._datadir) - self.manager.invalidate_hash(dirname(self._datadir)) + self.manager.invalidate_hash(os.path.dirname(self._datadir)) class ECDiskFileManager(BaseDiskFileManager): @@ -3354,7 +3412,7 @@ class ECDiskFileManager(BaseDiskFileManager): validated. """ frag_index = None - float_frag, ext = splitext(filename) + float_frag, ext = os.path.splitext(filename) if ext == '.data': parts = float_frag.split('#') try: diff --git a/swift/obj/fmgr.proto b/swift/obj/fmgr.proto new file mode 100644 index 000000000..2604305bb --- /dev/null +++ b/swift/obj/fmgr.proto @@ -0,0 +1,225 @@ +syntax = "proto3"; + +package filemgr; + +// Python: protoc -I. --python_out=. fmgr.proto +// Golang : protoc -I proto proto/fmgr.proto --go_out=proto + +message RegisterVolumeRequest { + uint32 partition = 1; // Swift partition + VolumeType type = 2; + uint32 volume_index = 3; + uint64 offset = 4; // Next available offset to use in the volume. + VolumeState state = 5; + bool repair_tool = 6; // Request is coming from a repair tool +} + +message RegisterVolumeReply {} + +message UnregisterVolumeRequest { + uint32 index = 1; + bool repair_tool = 2; +} + +message UnregisterVolumeReply {} + +message UpdateVolumeStateRequest { + uint32 volume_index = 1; + VolumeState state = 2; + bool repair_tool = 3; +} + +message UpdateVolumeStateReply {} + +message GetVolumeRequest { + uint32 index = 1; + bool repair_tool = 2; +} + +message GetVolumeReply { + uint32 volume_index = 1; + VolumeType volume_type = 2; + uint32 volume_state = 3; + uint32 partition = 4; + uint64 next_offset = 5; +} + +message ListVolumesRequest { + uint32 partition = 1; + VolumeType type = 2; + bool repair_tool = 3; +} + +message ListVolumesReply { + repeated Volume volumes = 1; +} + +message RegisterObjectRequest { + bytes name = 1; + uint32 volume_index = 2; + uint64 offset = 3; // Object offset within volume + uint64 next_offset = 4; // Next offset to start from in the volume + bool repair_tool = 5; +} + +message RegisterObjectReply {} + +message UnregisterObjectRequest { + bytes name = 1; + bool repair_tool = 2; +} + +message UnregisterObjectReply {} + +message RenameObjectRequest { + bytes name = 1; + bytes new_name = 2; + bool repair_tool = 3; +} + +message RenameObjectReply {} + +message LoadObjectRequest { + bytes name = 1; + bool is_quarantined = 2; + bool repair_tool = 3; +} + +message LoadObjectReply { + bytes name = 1; + uint32 volume_index = 2; + uint64 offset = 3; +} + +message QuarantineObjectRequest { + bytes name = 1; + bool repair_tool = 2; +} + +message QuarantineObjectReply {} + +message UnquarantineObjectRequest { + bytes name = 1; + bool repair_tool = 2; +} + +message UnquarantineObjectReply {} + +message LoadObjectsByPrefixRequest { + bytes prefix = 1; + bool repair_tool = 2; +} + +message LoadObjectsByPrefixReply { + repeated Object objects = 1; +} + +message LoadObjectsByVolumeRequest { + uint32 index = 1; + bool quarantined = 2; // List only quarantined files, if true + bytes page_token = 3; + uint32 page_size = 4; + bool repair_tool = 5; +} + +message LoadObjectsByVolumeReply { + repeated Object objects = 1; + bytes next_page_token = 2; +} + +message ListPartitionsRequest { + uint32 partition_bits = 1; +} + +message ListPartitionRequest { + uint32 partition = 1; + uint32 partition_bits = 2; +} + +message ListSuffixRequest { + uint32 partition = 1; + bytes suffix = 2; + uint32 partition_bits = 3; +} + +message ListQuarantinedOHashesRequest { + bytes page_token = 1; + uint32 page_size = 2; +} + +message ListQuarantinedOHashesReply { + repeated QuarantinedObjectName objects = 1; + bytes next_page_token = 2; +} + +message ListQuarantinedOHashRequest { + bytes prefix = 1; + bool repair_tool = 2; +} + +message ListQuarantinedOHashReply { + repeated Object objects = 1; +} + +message GetNextOffsetRequest { + uint32 volume_index = 1; + bool repair_tool = 2; +} + +message GetNextOffsetReply { + uint64 offset = 1; +} + +message GetStatsRequest {} + +message GetStatsReply { + map<string, uint64> stats = 1; +} + +message SetKvStateReply {} + +message GetKvStateRequest {} + +message KvState { + bool isClean = 1; +} + +// Generic messages +message Volume { + uint32 volume_index = 1; + VolumeType volume_type = 2; + uint32 volume_state = 3; + uint32 partition = 4; + uint64 next_offset = 5; +} + +message Object { + bytes name = 1; + uint32 volume_index = 2; + uint64 offset = 3; +} + +message QuarantinedObjectName { + bytes name = 1; +} + +// For listdir() like functions +message DirEntries { + repeated string entry = 1; +} + +// Enums +enum VolumeType { + VOLUME_DEFAULT = 0; + VOLUME_TOMBSTONE = 1; + VOLUME_X_DELETE_AT = 2; +} + +enum VolumeState { + // Default state, volume can be read from and written to + STATE_RW = 0; + // Volume is being compacted (source). New objects cannot be appended + STATE_COMPACTION_SRC = 1; + // Volume is a compaction target. New objects cannot be appended + STATE_COMPACTION_TARGET = 2; +} diff --git a/swift/obj/fmgr_pb2.py b/swift/obj/fmgr_pb2.py new file mode 100644 index 000000000..161aa7693 --- /dev/null +++ b/swift/obj/fmgr_pb2.py @@ -0,0 +1,2119 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: fmgr.proto + +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='fmgr.proto', + package='filemgr', + syntax='proto3', + serialized_options=None, + serialized_pb=b'\n\nfmgr.proto\x12\x07\x66ilemgr\"\xad\x01\n\x15RegisterVolumeRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_index\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\x04\x12#\n\x05state\x18\x05 \x01(\x0e\x32\x14.filemgr.VolumeState\x12\x13\n\x0brepair_tool\x18\x06 \x01(\x08\"\x15\n\x13RegisterVolumeReply\"=\n\x17UnregisterVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15UnregisterVolumeReply\"j\n\x18UpdateVolumeStateRequest\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12#\n\x05state\x18\x02 \x01(\x0e\x32\x14.filemgr.VolumeState\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"\x18\n\x16UpdateVolumeStateReply\"6\n\x10GetVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x8e\x01\n\x0eGetVolumeReply\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12(\n\x0bvolume_type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_state\x18\x03 \x01(\r\x12\x11\n\tpartition\x18\x04 \x01(\r\x12\x13\n\x0bnext_offset\x18\x05 \x01(\x04\"_\n\x12ListVolumesRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"4\n\x10ListVolumesReply\x12 \n\x07volumes\x18\x01 \x03(\x0b\x32\x0f.filemgr.Volume\"u\n\x15RegisterObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\x12\x13\n\x0bnext_offset\x18\x04 \x01(\x04\x12\x13\n\x0brepair_tool\x18\x05 \x01(\x08\"\x15\n\x13RegisterObjectReply\"<\n\x17UnregisterObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15UnregisterObjectReply\"J\n\x13RenameObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x10\n\x08new_name\x18\x02 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"\x13\n\x11RenameObjectReply\"N\n\x11LoadObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x16\n\x0eis_quarantined\x18\x02 \x01(\x08\x12\x13\n\x0brepair_tool\x18\x03 \x01(\x08\"E\n\x0fLoadObjectReply\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"<\n\x17QuarantineObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x17\n\x15QuarantineObjectReply\">\n\x19UnquarantineObjectRequest\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"\x19\n\x17UnquarantineObjectReply\"A\n\x1aLoadObjectsByPrefixRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"<\n\x18LoadObjectsByPrefixReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\"|\n\x1aLoadObjectsByVolumeRequest\x12\r\n\x05index\x18\x01 \x01(\r\x12\x13\n\x0bquarantined\x18\x02 \x01(\x08\x12\x12\n\npage_token\x18\x03 \x01(\x0c\x12\x11\n\tpage_size\x18\x04 \x01(\r\x12\x13\n\x0brepair_tool\x18\x05 \x01(\x08\"U\n\x18LoadObjectsByVolumeReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\x0c\"/\n\x15ListPartitionsRequest\x12\x16\n\x0epartition_bits\x18\x01 \x01(\r\"A\n\x14ListPartitionRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12\x16\n\x0epartition_bits\x18\x02 \x01(\r\"N\n\x11ListSuffixRequest\x12\x11\n\tpartition\x18\x01 \x01(\r\x12\x0e\n\x06suffix\x18\x02 \x01(\x0c\x12\x16\n\x0epartition_bits\x18\x03 \x01(\r\"F\n\x1dListQuarantinedOHashesRequest\x12\x12\n\npage_token\x18\x01 \x01(\x0c\x12\x11\n\tpage_size\x18\x02 \x01(\r\"g\n\x1bListQuarantinedOHashesReply\x12/\n\x07objects\x18\x01 \x03(\x0b\x32\x1e.filemgr.QuarantinedObjectName\x12\x17\n\x0fnext_page_token\x18\x02 \x01(\x0c\"%\n\x15QuarantinedObjectName\x12\x0c\n\x04name\x18\x01 \x01(\x0c\"B\n\x1bListQuarantinedOHashRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\x0c\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"=\n\x19ListQuarantinedOHashReply\x12 \n\x07objects\x18\x01 \x03(\x0b\x32\x0f.filemgr.Object\"A\n\x14GetNextOffsetRequest\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12\x13\n\x0brepair_tool\x18\x02 \x01(\x08\"$\n\x12GetNextOffsetReply\x12\x0e\n\x06offset\x18\x01 \x01(\x04\"\x11\n\x0fGetStatsRequest\"o\n\rGetStatsReply\x12\x30\n\x05stats\x18\x01 \x03(\x0b\x32!.filemgr.GetStatsReply.StatsEntry\x1a,\n\nStatsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x04:\x02\x38\x01\"\x11\n\x0fSetKvStateReply\"\x13\n\x11GetKvStateRequest\"\x1a\n\x07KvState\x12\x0f\n\x07isClean\x18\x01 \x01(\x08\"\x86\x01\n\x06Volume\x12\x14\n\x0cvolume_index\x18\x01 \x01(\r\x12(\n\x0bvolume_type\x18\x02 \x01(\x0e\x32\x13.filemgr.VolumeType\x12\x14\n\x0cvolume_state\x18\x03 \x01(\r\x12\x11\n\tpartition\x18\x04 \x01(\r\x12\x13\n\x0bnext_offset\x18\x05 \x01(\x04\"<\n\x06Object\x12\x0c\n\x04name\x18\x01 \x01(\x0c\x12\x14\n\x0cvolume_index\x18\x02 \x01(\r\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"\x1b\n\nDirEntries\x12\r\n\x05\x65ntry\x18\x01 \x03(\t*N\n\nVolumeType\x12\x12\n\x0eVOLUME_DEFAULT\x10\x00\x12\x14\n\x10VOLUME_TOMBSTONE\x10\x01\x12\x16\n\x12VOLUME_X_DELETE_AT\x10\x02*R\n\x0bVolumeState\x12\x0c\n\x08STATE_RW\x10\x00\x12\x18\n\x14STATE_COMPACTION_SRC\x10\x01\x12\x1b\n\x17STATE_COMPACTION_TARGET\x10\x02\x62\x06proto3' +) + +_VOLUMETYPE = _descriptor.EnumDescriptor( + name='VolumeType', + full_name='filemgr.VolumeType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='VOLUME_DEFAULT', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='VOLUME_TOMBSTONE', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='VOLUME_X_DELETE_AT', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=2869, + serialized_end=2947, +) +_sym_db.RegisterEnumDescriptor(_VOLUMETYPE) + +VolumeType = enum_type_wrapper.EnumTypeWrapper(_VOLUMETYPE) +_VOLUMESTATE = _descriptor.EnumDescriptor( + name='VolumeState', + full_name='filemgr.VolumeState', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='STATE_RW', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='STATE_COMPACTION_SRC', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='STATE_COMPACTION_TARGET', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=2949, + serialized_end=3031, +) +_sym_db.RegisterEnumDescriptor(_VOLUMESTATE) + +VolumeState = enum_type_wrapper.EnumTypeWrapper(_VOLUMESTATE) +VOLUME_DEFAULT = 0 +VOLUME_TOMBSTONE = 1 +VOLUME_X_DELETE_AT = 2 +STATE_RW = 0 +STATE_COMPACTION_SRC = 1 +STATE_COMPACTION_TARGET = 2 + + + +_REGISTERVOLUMEREQUEST = _descriptor.Descriptor( + name='RegisterVolumeRequest', + full_name='filemgr.RegisterVolumeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.RegisterVolumeRequest.partition', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='type', full_name='filemgr.RegisterVolumeRequest.type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.RegisterVolumeRequest.volume_index', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='offset', full_name='filemgr.RegisterVolumeRequest.offset', index=3, + number=4, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='state', full_name='filemgr.RegisterVolumeRequest.state', index=4, + number=5, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.RegisterVolumeRequest.repair_tool', index=5, + number=6, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=24, + serialized_end=197, +) + + +_REGISTERVOLUMEREPLY = _descriptor.Descriptor( + name='RegisterVolumeReply', + full_name='filemgr.RegisterVolumeReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=199, + serialized_end=220, +) + + +_UNREGISTERVOLUMEREQUEST = _descriptor.Descriptor( + name='UnregisterVolumeRequest', + full_name='filemgr.UnregisterVolumeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='index', full_name='filemgr.UnregisterVolumeRequest.index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.UnregisterVolumeRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=222, + serialized_end=283, +) + + +_UNREGISTERVOLUMEREPLY = _descriptor.Descriptor( + name='UnregisterVolumeReply', + full_name='filemgr.UnregisterVolumeReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=285, + serialized_end=308, +) + + +_UPDATEVOLUMESTATEREQUEST = _descriptor.Descriptor( + name='UpdateVolumeStateRequest', + full_name='filemgr.UpdateVolumeStateRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.UpdateVolumeStateRequest.volume_index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='state', full_name='filemgr.UpdateVolumeStateRequest.state', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.UpdateVolumeStateRequest.repair_tool', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=310, + serialized_end=416, +) + + +_UPDATEVOLUMESTATEREPLY = _descriptor.Descriptor( + name='UpdateVolumeStateReply', + full_name='filemgr.UpdateVolumeStateReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=418, + serialized_end=442, +) + + +_GETVOLUMEREQUEST = _descriptor.Descriptor( + name='GetVolumeRequest', + full_name='filemgr.GetVolumeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='index', full_name='filemgr.GetVolumeRequest.index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.GetVolumeRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=444, + serialized_end=498, +) + + +_GETVOLUMEREPLY = _descriptor.Descriptor( + name='GetVolumeReply', + full_name='filemgr.GetVolumeReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.GetVolumeReply.volume_index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_type', full_name='filemgr.GetVolumeReply.volume_type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_state', full_name='filemgr.GetVolumeReply.volume_state', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.GetVolumeReply.partition', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='next_offset', full_name='filemgr.GetVolumeReply.next_offset', index=4, + number=5, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=501, + serialized_end=643, +) + + +_LISTVOLUMESREQUEST = _descriptor.Descriptor( + name='ListVolumesRequest', + full_name='filemgr.ListVolumesRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.ListVolumesRequest.partition', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='type', full_name='filemgr.ListVolumesRequest.type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.ListVolumesRequest.repair_tool', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=645, + serialized_end=740, +) + + +_LISTVOLUMESREPLY = _descriptor.Descriptor( + name='ListVolumesReply', + full_name='filemgr.ListVolumesReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='volumes', full_name='filemgr.ListVolumesReply.volumes', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=742, + serialized_end=794, +) + + +_REGISTEROBJECTREQUEST = _descriptor.Descriptor( + name='RegisterObjectRequest', + full_name='filemgr.RegisterObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.RegisterObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.RegisterObjectRequest.volume_index', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='offset', full_name='filemgr.RegisterObjectRequest.offset', index=2, + number=3, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='next_offset', full_name='filemgr.RegisterObjectRequest.next_offset', index=3, + number=4, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.RegisterObjectRequest.repair_tool', index=4, + number=5, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=796, + serialized_end=913, +) + + +_REGISTEROBJECTREPLY = _descriptor.Descriptor( + name='RegisterObjectReply', + full_name='filemgr.RegisterObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=915, + serialized_end=936, +) + + +_UNREGISTEROBJECTREQUEST = _descriptor.Descriptor( + name='UnregisterObjectRequest', + full_name='filemgr.UnregisterObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.UnregisterObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.UnregisterObjectRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=938, + serialized_end=998, +) + + +_UNREGISTEROBJECTREPLY = _descriptor.Descriptor( + name='UnregisterObjectReply', + full_name='filemgr.UnregisterObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1000, + serialized_end=1023, +) + + +_RENAMEOBJECTREQUEST = _descriptor.Descriptor( + name='RenameObjectRequest', + full_name='filemgr.RenameObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.RenameObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='new_name', full_name='filemgr.RenameObjectRequest.new_name', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.RenameObjectRequest.repair_tool', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1025, + serialized_end=1099, +) + + +_RENAMEOBJECTREPLY = _descriptor.Descriptor( + name='RenameObjectReply', + full_name='filemgr.RenameObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1101, + serialized_end=1120, +) + + +_LOADOBJECTREQUEST = _descriptor.Descriptor( + name='LoadObjectRequest', + full_name='filemgr.LoadObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.LoadObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='is_quarantined', full_name='filemgr.LoadObjectRequest.is_quarantined', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.LoadObjectRequest.repair_tool', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1122, + serialized_end=1200, +) + + +_LOADOBJECTREPLY = _descriptor.Descriptor( + name='LoadObjectReply', + full_name='filemgr.LoadObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.LoadObjectReply.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.LoadObjectReply.volume_index', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='offset', full_name='filemgr.LoadObjectReply.offset', index=2, + number=3, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1202, + serialized_end=1271, +) + + +_QUARANTINEOBJECTREQUEST = _descriptor.Descriptor( + name='QuarantineObjectRequest', + full_name='filemgr.QuarantineObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.QuarantineObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.QuarantineObjectRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1273, + serialized_end=1333, +) + + +_QUARANTINEOBJECTREPLY = _descriptor.Descriptor( + name='QuarantineObjectReply', + full_name='filemgr.QuarantineObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1335, + serialized_end=1358, +) + + +_UNQUARANTINEOBJECTREQUEST = _descriptor.Descriptor( + name='UnquarantineObjectRequest', + full_name='filemgr.UnquarantineObjectRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.UnquarantineObjectRequest.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.UnquarantineObjectRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1360, + serialized_end=1422, +) + + +_UNQUARANTINEOBJECTREPLY = _descriptor.Descriptor( + name='UnquarantineObjectReply', + full_name='filemgr.UnquarantineObjectReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1424, + serialized_end=1449, +) + + +_LOADOBJECTSBYPREFIXREQUEST = _descriptor.Descriptor( + name='LoadObjectsByPrefixRequest', + full_name='filemgr.LoadObjectsByPrefixRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='prefix', full_name='filemgr.LoadObjectsByPrefixRequest.prefix', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.LoadObjectsByPrefixRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1451, + serialized_end=1516, +) + + +_LOADOBJECTSBYPREFIXREPLY = _descriptor.Descriptor( + name='LoadObjectsByPrefixReply', + full_name='filemgr.LoadObjectsByPrefixReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='objects', full_name='filemgr.LoadObjectsByPrefixReply.objects', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1518, + serialized_end=1578, +) + + +_LOADOBJECTSBYVOLUMEREQUEST = _descriptor.Descriptor( + name='LoadObjectsByVolumeRequest', + full_name='filemgr.LoadObjectsByVolumeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='index', full_name='filemgr.LoadObjectsByVolumeRequest.index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='quarantined', full_name='filemgr.LoadObjectsByVolumeRequest.quarantined', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='page_token', full_name='filemgr.LoadObjectsByVolumeRequest.page_token', index=2, + number=3, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='page_size', full_name='filemgr.LoadObjectsByVolumeRequest.page_size', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.LoadObjectsByVolumeRequest.repair_tool', index=4, + number=5, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1580, + serialized_end=1704, +) + + +_LOADOBJECTSBYVOLUMEREPLY = _descriptor.Descriptor( + name='LoadObjectsByVolumeReply', + full_name='filemgr.LoadObjectsByVolumeReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='objects', full_name='filemgr.LoadObjectsByVolumeReply.objects', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='next_page_token', full_name='filemgr.LoadObjectsByVolumeReply.next_page_token', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1706, + serialized_end=1791, +) + + +_LISTPARTITIONSREQUEST = _descriptor.Descriptor( + name='ListPartitionsRequest', + full_name='filemgr.ListPartitionsRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='partition_bits', full_name='filemgr.ListPartitionsRequest.partition_bits', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1793, + serialized_end=1840, +) + + +_LISTPARTITIONREQUEST = _descriptor.Descriptor( + name='ListPartitionRequest', + full_name='filemgr.ListPartitionRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.ListPartitionRequest.partition', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='partition_bits', full_name='filemgr.ListPartitionRequest.partition_bits', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1842, + serialized_end=1907, +) + + +_LISTSUFFIXREQUEST = _descriptor.Descriptor( + name='ListSuffixRequest', + full_name='filemgr.ListSuffixRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.ListSuffixRequest.partition', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='suffix', full_name='filemgr.ListSuffixRequest.suffix', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='partition_bits', full_name='filemgr.ListSuffixRequest.partition_bits', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1909, + serialized_end=1987, +) + + +_LISTQUARANTINEDOHASHESREQUEST = _descriptor.Descriptor( + name='ListQuarantinedOHashesRequest', + full_name='filemgr.ListQuarantinedOHashesRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='page_token', full_name='filemgr.ListQuarantinedOHashesRequest.page_token', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='page_size', full_name='filemgr.ListQuarantinedOHashesRequest.page_size', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1989, + serialized_end=2059, +) + + +_LISTQUARANTINEDOHASHESREPLY = _descriptor.Descriptor( + name='ListQuarantinedOHashesReply', + full_name='filemgr.ListQuarantinedOHashesReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='objects', full_name='filemgr.ListQuarantinedOHashesReply.objects', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='next_page_token', full_name='filemgr.ListQuarantinedOHashesReply.next_page_token', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2061, + serialized_end=2164, +) + + +_QUARANTINEDOBJECTNAME = _descriptor.Descriptor( + name='QuarantinedObjectName', + full_name='filemgr.QuarantinedObjectName', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.QuarantinedObjectName.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2166, + serialized_end=2203, +) + + +_LISTQUARANTINEDOHASHREQUEST = _descriptor.Descriptor( + name='ListQuarantinedOHashRequest', + full_name='filemgr.ListQuarantinedOHashRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='prefix', full_name='filemgr.ListQuarantinedOHashRequest.prefix', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.ListQuarantinedOHashRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2205, + serialized_end=2271, +) + + +_LISTQUARANTINEDOHASHREPLY = _descriptor.Descriptor( + name='ListQuarantinedOHashReply', + full_name='filemgr.ListQuarantinedOHashReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='objects', full_name='filemgr.ListQuarantinedOHashReply.objects', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2273, + serialized_end=2334, +) + + +_GETNEXTOFFSETREQUEST = _descriptor.Descriptor( + name='GetNextOffsetRequest', + full_name='filemgr.GetNextOffsetRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.GetNextOffsetRequest.volume_index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='repair_tool', full_name='filemgr.GetNextOffsetRequest.repair_tool', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2336, + serialized_end=2401, +) + + +_GETNEXTOFFSETREPLY = _descriptor.Descriptor( + name='GetNextOffsetReply', + full_name='filemgr.GetNextOffsetReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='offset', full_name='filemgr.GetNextOffsetReply.offset', index=0, + number=1, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2403, + serialized_end=2439, +) + + +_GETSTATSREQUEST = _descriptor.Descriptor( + name='GetStatsRequest', + full_name='filemgr.GetStatsRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2441, + serialized_end=2458, +) + + +_GETSTATSREPLY_STATSENTRY = _descriptor.Descriptor( + name='StatsEntry', + full_name='filemgr.GetStatsReply.StatsEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='filemgr.GetStatsReply.StatsEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='filemgr.GetStatsReply.StatsEntry.value', index=1, + number=2, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=b'8\001', + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2527, + serialized_end=2571, +) + +_GETSTATSREPLY = _descriptor.Descriptor( + name='GetStatsReply', + full_name='filemgr.GetStatsReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='stats', full_name='filemgr.GetStatsReply.stats', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_GETSTATSREPLY_STATSENTRY, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2460, + serialized_end=2571, +) + + +_SETKVSTATEREPLY = _descriptor.Descriptor( + name='SetKvStateReply', + full_name='filemgr.SetKvStateReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2573, + serialized_end=2590, +) + + +_GETKVSTATEREQUEST = _descriptor.Descriptor( + name='GetKvStateRequest', + full_name='filemgr.GetKvStateRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2592, + serialized_end=2611, +) + + +_KVSTATE = _descriptor.Descriptor( + name='KvState', + full_name='filemgr.KvState', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='isClean', full_name='filemgr.KvState.isClean', index=0, + number=1, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2613, + serialized_end=2639, +) + + +_VOLUME = _descriptor.Descriptor( + name='Volume', + full_name='filemgr.Volume', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.Volume.volume_index', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_type', full_name='filemgr.Volume.volume_type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_state', full_name='filemgr.Volume.volume_state', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='partition', full_name='filemgr.Volume.partition', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='next_offset', full_name='filemgr.Volume.next_offset', index=4, + number=5, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2642, + serialized_end=2776, +) + + +_OBJECT = _descriptor.Descriptor( + name='Object', + full_name='filemgr.Object', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='filemgr.Object.name', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='volume_index', full_name='filemgr.Object.volume_index', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='offset', full_name='filemgr.Object.offset', index=2, + number=3, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2778, + serialized_end=2838, +) + + +_DIRENTRIES = _descriptor.Descriptor( + name='DirEntries', + full_name='filemgr.DirEntries', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='entry', full_name='filemgr.DirEntries.entry', index=0, + number=1, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=2840, + serialized_end=2867, +) + +_REGISTERVOLUMEREQUEST.fields_by_name['type'].enum_type = _VOLUMETYPE +_REGISTERVOLUMEREQUEST.fields_by_name['state'].enum_type = _VOLUMESTATE +_UPDATEVOLUMESTATEREQUEST.fields_by_name['state'].enum_type = _VOLUMESTATE +_GETVOLUMEREPLY.fields_by_name['volume_type'].enum_type = _VOLUMETYPE +_LISTVOLUMESREQUEST.fields_by_name['type'].enum_type = _VOLUMETYPE +_LISTVOLUMESREPLY.fields_by_name['volumes'].message_type = _VOLUME +_LOADOBJECTSBYPREFIXREPLY.fields_by_name['objects'].message_type = _OBJECT +_LOADOBJECTSBYVOLUMEREPLY.fields_by_name['objects'].message_type = _OBJECT +_LISTQUARANTINEDOHASHESREPLY.fields_by_name['objects'].message_type = _QUARANTINEDOBJECTNAME +_LISTQUARANTINEDOHASHREPLY.fields_by_name['objects'].message_type = _OBJECT +_GETSTATSREPLY_STATSENTRY.containing_type = _GETSTATSREPLY +_GETSTATSREPLY.fields_by_name['stats'].message_type = _GETSTATSREPLY_STATSENTRY +_VOLUME.fields_by_name['volume_type'].enum_type = _VOLUMETYPE +DESCRIPTOR.message_types_by_name['RegisterVolumeRequest'] = _REGISTERVOLUMEREQUEST +DESCRIPTOR.message_types_by_name['RegisterVolumeReply'] = _REGISTERVOLUMEREPLY +DESCRIPTOR.message_types_by_name['UnregisterVolumeRequest'] = _UNREGISTERVOLUMEREQUEST +DESCRIPTOR.message_types_by_name['UnregisterVolumeReply'] = _UNREGISTERVOLUMEREPLY +DESCRIPTOR.message_types_by_name['UpdateVolumeStateRequest'] = _UPDATEVOLUMESTATEREQUEST +DESCRIPTOR.message_types_by_name['UpdateVolumeStateReply'] = _UPDATEVOLUMESTATEREPLY +DESCRIPTOR.message_types_by_name['GetVolumeRequest'] = _GETVOLUMEREQUEST +DESCRIPTOR.message_types_by_name['GetVolumeReply'] = _GETVOLUMEREPLY +DESCRIPTOR.message_types_by_name['ListVolumesRequest'] = _LISTVOLUMESREQUEST +DESCRIPTOR.message_types_by_name['ListVolumesReply'] = _LISTVOLUMESREPLY +DESCRIPTOR.message_types_by_name['RegisterObjectRequest'] = _REGISTEROBJECTREQUEST +DESCRIPTOR.message_types_by_name['RegisterObjectReply'] = _REGISTEROBJECTREPLY +DESCRIPTOR.message_types_by_name['UnregisterObjectRequest'] = _UNREGISTEROBJECTREQUEST +DESCRIPTOR.message_types_by_name['UnregisterObjectReply'] = _UNREGISTEROBJECTREPLY +DESCRIPTOR.message_types_by_name['RenameObjectRequest'] = _RENAMEOBJECTREQUEST +DESCRIPTOR.message_types_by_name['RenameObjectReply'] = _RENAMEOBJECTREPLY +DESCRIPTOR.message_types_by_name['LoadObjectRequest'] = _LOADOBJECTREQUEST +DESCRIPTOR.message_types_by_name['LoadObjectReply'] = _LOADOBJECTREPLY +DESCRIPTOR.message_types_by_name['QuarantineObjectRequest'] = _QUARANTINEOBJECTREQUEST +DESCRIPTOR.message_types_by_name['QuarantineObjectReply'] = _QUARANTINEOBJECTREPLY +DESCRIPTOR.message_types_by_name['UnquarantineObjectRequest'] = _UNQUARANTINEOBJECTREQUEST +DESCRIPTOR.message_types_by_name['UnquarantineObjectReply'] = _UNQUARANTINEOBJECTREPLY +DESCRIPTOR.message_types_by_name['LoadObjectsByPrefixRequest'] = _LOADOBJECTSBYPREFIXREQUEST +DESCRIPTOR.message_types_by_name['LoadObjectsByPrefixReply'] = _LOADOBJECTSBYPREFIXREPLY +DESCRIPTOR.message_types_by_name['LoadObjectsByVolumeRequest'] = _LOADOBJECTSBYVOLUMEREQUEST +DESCRIPTOR.message_types_by_name['LoadObjectsByVolumeReply'] = _LOADOBJECTSBYVOLUMEREPLY +DESCRIPTOR.message_types_by_name['ListPartitionsRequest'] = _LISTPARTITIONSREQUEST +DESCRIPTOR.message_types_by_name['ListPartitionRequest'] = _LISTPARTITIONREQUEST +DESCRIPTOR.message_types_by_name['ListSuffixRequest'] = _LISTSUFFIXREQUEST +DESCRIPTOR.message_types_by_name['ListQuarantinedOHashesRequest'] = _LISTQUARANTINEDOHASHESREQUEST +DESCRIPTOR.message_types_by_name['ListQuarantinedOHashesReply'] = _LISTQUARANTINEDOHASHESREPLY +DESCRIPTOR.message_types_by_name['QuarantinedObjectName'] = _QUARANTINEDOBJECTNAME +DESCRIPTOR.message_types_by_name['ListQuarantinedOHashRequest'] = _LISTQUARANTINEDOHASHREQUEST +DESCRIPTOR.message_types_by_name['ListQuarantinedOHashReply'] = _LISTQUARANTINEDOHASHREPLY +DESCRIPTOR.message_types_by_name['GetNextOffsetRequest'] = _GETNEXTOFFSETREQUEST +DESCRIPTOR.message_types_by_name['GetNextOffsetReply'] = _GETNEXTOFFSETREPLY +DESCRIPTOR.message_types_by_name['GetStatsRequest'] = _GETSTATSREQUEST +DESCRIPTOR.message_types_by_name['GetStatsReply'] = _GETSTATSREPLY +DESCRIPTOR.message_types_by_name['SetKvStateReply'] = _SETKVSTATEREPLY +DESCRIPTOR.message_types_by_name['GetKvStateRequest'] = _GETKVSTATEREQUEST +DESCRIPTOR.message_types_by_name['KvState'] = _KVSTATE +DESCRIPTOR.message_types_by_name['Volume'] = _VOLUME +DESCRIPTOR.message_types_by_name['Object'] = _OBJECT +DESCRIPTOR.message_types_by_name['DirEntries'] = _DIRENTRIES +DESCRIPTOR.enum_types_by_name['VolumeType'] = _VOLUMETYPE +DESCRIPTOR.enum_types_by_name['VolumeState'] = _VOLUMESTATE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +RegisterVolumeRequest = _reflection.GeneratedProtocolMessageType('RegisterVolumeRequest', (_message.Message,), { + 'DESCRIPTOR' : _REGISTERVOLUMEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RegisterVolumeRequest) + }) +_sym_db.RegisterMessage(RegisterVolumeRequest) + +RegisterVolumeReply = _reflection.GeneratedProtocolMessageType('RegisterVolumeReply', (_message.Message,), { + 'DESCRIPTOR' : _REGISTERVOLUMEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RegisterVolumeReply) + }) +_sym_db.RegisterMessage(RegisterVolumeReply) + +UnregisterVolumeRequest = _reflection.GeneratedProtocolMessageType('UnregisterVolumeRequest', (_message.Message,), { + 'DESCRIPTOR' : _UNREGISTERVOLUMEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnregisterVolumeRequest) + }) +_sym_db.RegisterMessage(UnregisterVolumeRequest) + +UnregisterVolumeReply = _reflection.GeneratedProtocolMessageType('UnregisterVolumeReply', (_message.Message,), { + 'DESCRIPTOR' : _UNREGISTERVOLUMEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnregisterVolumeReply) + }) +_sym_db.RegisterMessage(UnregisterVolumeReply) + +UpdateVolumeStateRequest = _reflection.GeneratedProtocolMessageType('UpdateVolumeStateRequest', (_message.Message,), { + 'DESCRIPTOR' : _UPDATEVOLUMESTATEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UpdateVolumeStateRequest) + }) +_sym_db.RegisterMessage(UpdateVolumeStateRequest) + +UpdateVolumeStateReply = _reflection.GeneratedProtocolMessageType('UpdateVolumeStateReply', (_message.Message,), { + 'DESCRIPTOR' : _UPDATEVOLUMESTATEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UpdateVolumeStateReply) + }) +_sym_db.RegisterMessage(UpdateVolumeStateReply) + +GetVolumeRequest = _reflection.GeneratedProtocolMessageType('GetVolumeRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETVOLUMEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetVolumeRequest) + }) +_sym_db.RegisterMessage(GetVolumeRequest) + +GetVolumeReply = _reflection.GeneratedProtocolMessageType('GetVolumeReply', (_message.Message,), { + 'DESCRIPTOR' : _GETVOLUMEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetVolumeReply) + }) +_sym_db.RegisterMessage(GetVolumeReply) + +ListVolumesRequest = _reflection.GeneratedProtocolMessageType('ListVolumesRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTVOLUMESREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListVolumesRequest) + }) +_sym_db.RegisterMessage(ListVolumesRequest) + +ListVolumesReply = _reflection.GeneratedProtocolMessageType('ListVolumesReply', (_message.Message,), { + 'DESCRIPTOR' : _LISTVOLUMESREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListVolumesReply) + }) +_sym_db.RegisterMessage(ListVolumesReply) + +RegisterObjectRequest = _reflection.GeneratedProtocolMessageType('RegisterObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _REGISTEROBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RegisterObjectRequest) + }) +_sym_db.RegisterMessage(RegisterObjectRequest) + +RegisterObjectReply = _reflection.GeneratedProtocolMessageType('RegisterObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _REGISTEROBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RegisterObjectReply) + }) +_sym_db.RegisterMessage(RegisterObjectReply) + +UnregisterObjectRequest = _reflection.GeneratedProtocolMessageType('UnregisterObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _UNREGISTEROBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnregisterObjectRequest) + }) +_sym_db.RegisterMessage(UnregisterObjectRequest) + +UnregisterObjectReply = _reflection.GeneratedProtocolMessageType('UnregisterObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _UNREGISTEROBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnregisterObjectReply) + }) +_sym_db.RegisterMessage(UnregisterObjectReply) + +RenameObjectRequest = _reflection.GeneratedProtocolMessageType('RenameObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _RENAMEOBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RenameObjectRequest) + }) +_sym_db.RegisterMessage(RenameObjectRequest) + +RenameObjectReply = _reflection.GeneratedProtocolMessageType('RenameObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _RENAMEOBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.RenameObjectReply) + }) +_sym_db.RegisterMessage(RenameObjectReply) + +LoadObjectRequest = _reflection.GeneratedProtocolMessageType('LoadObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectRequest) + }) +_sym_db.RegisterMessage(LoadObjectRequest) + +LoadObjectReply = _reflection.GeneratedProtocolMessageType('LoadObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectReply) + }) +_sym_db.RegisterMessage(LoadObjectReply) + +QuarantineObjectRequest = _reflection.GeneratedProtocolMessageType('QuarantineObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _QUARANTINEOBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.QuarantineObjectRequest) + }) +_sym_db.RegisterMessage(QuarantineObjectRequest) + +QuarantineObjectReply = _reflection.GeneratedProtocolMessageType('QuarantineObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _QUARANTINEOBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.QuarantineObjectReply) + }) +_sym_db.RegisterMessage(QuarantineObjectReply) + +UnquarantineObjectRequest = _reflection.GeneratedProtocolMessageType('UnquarantineObjectRequest', (_message.Message,), { + 'DESCRIPTOR' : _UNQUARANTINEOBJECTREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnquarantineObjectRequest) + }) +_sym_db.RegisterMessage(UnquarantineObjectRequest) + +UnquarantineObjectReply = _reflection.GeneratedProtocolMessageType('UnquarantineObjectReply', (_message.Message,), { + 'DESCRIPTOR' : _UNQUARANTINEOBJECTREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.UnquarantineObjectReply) + }) +_sym_db.RegisterMessage(UnquarantineObjectReply) + +LoadObjectsByPrefixRequest = _reflection.GeneratedProtocolMessageType('LoadObjectsByPrefixRequest', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTSBYPREFIXREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByPrefixRequest) + }) +_sym_db.RegisterMessage(LoadObjectsByPrefixRequest) + +LoadObjectsByPrefixReply = _reflection.GeneratedProtocolMessageType('LoadObjectsByPrefixReply', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTSBYPREFIXREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByPrefixReply) + }) +_sym_db.RegisterMessage(LoadObjectsByPrefixReply) + +LoadObjectsByVolumeRequest = _reflection.GeneratedProtocolMessageType('LoadObjectsByVolumeRequest', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTSBYVOLUMEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByVolumeRequest) + }) +_sym_db.RegisterMessage(LoadObjectsByVolumeRequest) + +LoadObjectsByVolumeReply = _reflection.GeneratedProtocolMessageType('LoadObjectsByVolumeReply', (_message.Message,), { + 'DESCRIPTOR' : _LOADOBJECTSBYVOLUMEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.LoadObjectsByVolumeReply) + }) +_sym_db.RegisterMessage(LoadObjectsByVolumeReply) + +ListPartitionsRequest = _reflection.GeneratedProtocolMessageType('ListPartitionsRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTPARTITIONSREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListPartitionsRequest) + }) +_sym_db.RegisterMessage(ListPartitionsRequest) + +ListPartitionRequest = _reflection.GeneratedProtocolMessageType('ListPartitionRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTPARTITIONREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListPartitionRequest) + }) +_sym_db.RegisterMessage(ListPartitionRequest) + +ListSuffixRequest = _reflection.GeneratedProtocolMessageType('ListSuffixRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTSUFFIXREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListSuffixRequest) + }) +_sym_db.RegisterMessage(ListSuffixRequest) + +ListQuarantinedOHashesRequest = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashesRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTQUARANTINEDOHASHESREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashesRequest) + }) +_sym_db.RegisterMessage(ListQuarantinedOHashesRequest) + +ListQuarantinedOHashesReply = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashesReply', (_message.Message,), { + 'DESCRIPTOR' : _LISTQUARANTINEDOHASHESREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashesReply) + }) +_sym_db.RegisterMessage(ListQuarantinedOHashesReply) + +QuarantinedObjectName = _reflection.GeneratedProtocolMessageType('QuarantinedObjectName', (_message.Message,), { + 'DESCRIPTOR' : _QUARANTINEDOBJECTNAME, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.QuarantinedObjectName) + }) +_sym_db.RegisterMessage(QuarantinedObjectName) + +ListQuarantinedOHashRequest = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashRequest', (_message.Message,), { + 'DESCRIPTOR' : _LISTQUARANTINEDOHASHREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashRequest) + }) +_sym_db.RegisterMessage(ListQuarantinedOHashRequest) + +ListQuarantinedOHashReply = _reflection.GeneratedProtocolMessageType('ListQuarantinedOHashReply', (_message.Message,), { + 'DESCRIPTOR' : _LISTQUARANTINEDOHASHREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.ListQuarantinedOHashReply) + }) +_sym_db.RegisterMessage(ListQuarantinedOHashReply) + +GetNextOffsetRequest = _reflection.GeneratedProtocolMessageType('GetNextOffsetRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETNEXTOFFSETREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetNextOffsetRequest) + }) +_sym_db.RegisterMessage(GetNextOffsetRequest) + +GetNextOffsetReply = _reflection.GeneratedProtocolMessageType('GetNextOffsetReply', (_message.Message,), { + 'DESCRIPTOR' : _GETNEXTOFFSETREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetNextOffsetReply) + }) +_sym_db.RegisterMessage(GetNextOffsetReply) + +GetStatsRequest = _reflection.GeneratedProtocolMessageType('GetStatsRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETSTATSREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetStatsRequest) + }) +_sym_db.RegisterMessage(GetStatsRequest) + +GetStatsReply = _reflection.GeneratedProtocolMessageType('GetStatsReply', (_message.Message,), { + + 'StatsEntry' : _reflection.GeneratedProtocolMessageType('StatsEntry', (_message.Message,), { + 'DESCRIPTOR' : _GETSTATSREPLY_STATSENTRY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetStatsReply.StatsEntry) + }) + , + 'DESCRIPTOR' : _GETSTATSREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetStatsReply) + }) +_sym_db.RegisterMessage(GetStatsReply) +_sym_db.RegisterMessage(GetStatsReply.StatsEntry) + +SetKvStateReply = _reflection.GeneratedProtocolMessageType('SetKvStateReply', (_message.Message,), { + 'DESCRIPTOR' : _SETKVSTATEREPLY, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.SetKvStateReply) + }) +_sym_db.RegisterMessage(SetKvStateReply) + +GetKvStateRequest = _reflection.GeneratedProtocolMessageType('GetKvStateRequest', (_message.Message,), { + 'DESCRIPTOR' : _GETKVSTATEREQUEST, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.GetKvStateRequest) + }) +_sym_db.RegisterMessage(GetKvStateRequest) + +KvState = _reflection.GeneratedProtocolMessageType('KvState', (_message.Message,), { + 'DESCRIPTOR' : _KVSTATE, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.KvState) + }) +_sym_db.RegisterMessage(KvState) + +Volume = _reflection.GeneratedProtocolMessageType('Volume', (_message.Message,), { + 'DESCRIPTOR' : _VOLUME, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.Volume) + }) +_sym_db.RegisterMessage(Volume) + +Object = _reflection.GeneratedProtocolMessageType('Object', (_message.Message,), { + 'DESCRIPTOR' : _OBJECT, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.Object) + }) +_sym_db.RegisterMessage(Object) + +DirEntries = _reflection.GeneratedProtocolMessageType('DirEntries', (_message.Message,), { + 'DESCRIPTOR' : _DIRENTRIES, + '__module__' : 'fmgr_pb2' + # @@protoc_insertion_point(class_scope:filemgr.DirEntries) + }) +_sym_db.RegisterMessage(DirEntries) + + +_GETSTATSREPLY_STATSENTRY._options = None +# @@protoc_insertion_point(module_scope) diff --git a/swift/obj/header.py b/swift/obj/header.py new file mode 100644 index 000000000..fd442ea83 --- /dev/null +++ b/swift/obj/header.py @@ -0,0 +1,394 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import six +import os +import struct + +from swift.common.utils import fdatasync + +PICKLE_PROTOCOL = 2 + +# header version to use for new objects +OBJECT_HEADER_VERSION = 4 +VOLUME_HEADER_VERSION = 1 + +# maximum serialized header length +MAX_OBJECT_HEADER_LEN = 512 +MAX_VOLUME_HEADER_LEN = 128 + +OBJECT_START_MARKER = b"SWIFTOBJ" +VOLUME_START_MARKER = b"SWIFTVOL" + +# object alignment within a volume. +# this is needed so that FALLOC_FL_PUNCH_HOLE can actually return space back +# to the filesystem (tested on XFS and ext4) +# we may not need to align files in volumes dedicated to short-lived files, +# such as tombstones (.ts extension), +# but currently we do align for all volume types. +ALIGNMENT = 4096 + +# constants +STATE_OBJ_FILE = 0 +STATE_OBJ_QUARANTINED = 1 + + +class HeaderException(IOError): + def __init__(self, message): + self.message = message + super(HeaderException, self).__init__(message) + + +object_header_formats = { + 1: '8sBQQQ30sQQQQQ', + 2: '8sBQQQ64sQQQQQ', # 64 characters for the filename + 3: '8sBQQQ64sQQQQQB', # add state field + 4: '8sBQQQ64sQQQQQB32s' # add metadata checksum +} + + +class ObjectHeader(object): + """ + Version 1: + Magic string (8 bytes) + Header version (1 byte) + Policy index (8 bytes) + Object hash (16 bytes) (__init__) + Filename (30 chars) + Metadata offset (8 bytes) + Metadata size (8 bytes) + Data offset (8 bytes) + Data size (8 bytes) + Total object size (8 bytes) + + Version 2: similar but 64 chars for the filename + Version 3: Adds a "state" field (unsigned char) + """ + + def __init__(self, version=OBJECT_HEADER_VERSION): + if version not in object_header_formats.keys(): + raise HeaderException('Unsupported object header version') + self.magic_string = OBJECT_START_MARKER + self.version = version + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __len__(self): + try: + fmt = object_header_formats[self.version] + except KeyError: + raise HeaderException('Unsupported header version') + return struct.calcsize(fmt) + + def pack(self): + version_to_pack = { + 1: self.__pack_v1, + 2: self.__pack_v2, + 3: self.__pack_v3, + 4: self.__pack_v4 + } + return version_to_pack[self.version]() + + def __pack_v1(self): + fmt = object_header_formats[1] + ohash_h = int(self.ohash, 16) >> 64 + ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff + + args = (self.magic_string, self.version, + self.policy_idx, ohash_h, ohash_l, + str(self.filename).encode('ascii'), + self.metadata_offset, self.metadata_size, + self.data_offset, self.data_size, self.total_size) + + return struct.pack(fmt, *args) + + def __pack_v2(self): + fmt = object_header_formats[2] + ohash_h = int(self.ohash, 16) >> 64 + ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff + + args = (self.magic_string, self.version, + self.policy_idx, ohash_h, ohash_l, + str(self.filename).encode('ascii'), + self.metadata_offset, self.metadata_size, + self.data_offset, self.data_size, self.total_size) + + return struct.pack(fmt, *args) + + def __pack_v3(self): + fmt = object_header_formats[3] + ohash_h = int(self.ohash, 16) >> 64 + ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff + + args = (self.magic_string, self.version, + self.policy_idx, ohash_h, ohash_l, + str(self.filename).encode('ascii'), + self.metadata_offset, self.metadata_size, + self.data_offset, self.data_size, self.total_size, self.state) + + return struct.pack(fmt, *args) + + def __pack_v4(self): + fmt = object_header_formats[4] + ohash_h = int(self.ohash, 16) >> 64 + ohash_l = int(self.ohash, 16) & 0x0000000000000000ffffffffffffffff + + args = (self.magic_string, self.version, + self.policy_idx, ohash_h, ohash_l, + str(self.filename).encode('ascii'), + self.metadata_offset, self.metadata_size, + self.data_offset, self.data_size, self.total_size, self.state, + self.metastr_md5) + + return struct.pack(fmt, *args) + + @classmethod + def unpack(cls, buf): + version_to_unpack = { + 1: cls.__unpack_v1, + 2: cls.__unpack_v2, + 3: cls.__unpack_v3, + 4: cls.__unpack_v4 + } + + if buf[0:8] != OBJECT_START_MARKER: + raise HeaderException('Not a header') + version = struct.unpack('<B', buf[8:9])[0] + if version not in object_header_formats.keys(): + raise HeaderException('Unsupported header version') + + return version_to_unpack[version](buf) + + @classmethod + def __unpack_v1(cls, buf): + fmt = object_header_formats[1] + raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)]) + header = cls() + header.magic_string = raw_header[0] + header.version = raw_header[1] + header.policy_idx = raw_header[2] + header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4]) + if six.PY2: + header.filename = raw_header[5].rstrip(b'\0') + else: + header.filename = raw_header[5].rstrip(b'\0').decode('ascii') + header.metadata_offset = raw_header[6] + header.metadata_size = raw_header[7] + header.data_offset = raw_header[8] + header.data_size = raw_header[9] + # currently, total_size gets padded to the next 4k boundary, so that + # fallocate can reclaim the block when hole punching. + header.total_size = raw_header[10] + + return header + + @classmethod + def __unpack_v2(cls, buf): + fmt = object_header_formats[2] + raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)]) + header = cls() + header.magic_string = raw_header[0] + header.version = raw_header[1] + header.policy_idx = raw_header[2] + header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4]) + if six.PY2: + header.filename = raw_header[5].rstrip(b'\0') + else: + header.filename = raw_header[5].rstrip(b'\0').decode('ascii') + header.metadata_offset = raw_header[6] + header.metadata_size = raw_header[7] + header.data_offset = raw_header[8] + header.data_size = raw_header[9] + # currently, total_size gets padded to the next 4k boundary, so that + # fallocate can reclaim the block when hole punching. + header.total_size = raw_header[10] + + return header + + @classmethod + def __unpack_v3(cls, buf): + fmt = object_header_formats[3] + raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)]) + header = cls() + header.magic_string = raw_header[0] + header.version = raw_header[1] + header.policy_idx = raw_header[2] + header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4]) + if six.PY2: + header.filename = raw_header[5].rstrip(b'\0') + else: + header.filename = raw_header[5].rstrip(b'\0').decode('ascii') + header.metadata_offset = raw_header[6] + header.metadata_size = raw_header[7] + header.data_offset = raw_header[8] + header.data_size = raw_header[9] + # currently, total_size gets padded to the next 4k boundary, so that + # fallocate can reclaim the block when hole punching. + header.total_size = raw_header[10] + header.state = raw_header[11] + + return header + + @classmethod + def __unpack_v4(cls, buf): + fmt = object_header_formats[4] + raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)]) + header = cls() + header.magic_string = raw_header[0] + header.version = raw_header[1] + header.policy_idx = raw_header[2] + header.ohash = "{:032x}".format((raw_header[3] << 64) + raw_header[4]) + if six.PY2: + header.filename = raw_header[5].rstrip(b'\0') + else: + header.filename = raw_header[5].rstrip(b'\0').decode('ascii') + header.metadata_offset = raw_header[6] + header.metadata_size = raw_header[7] + header.data_offset = raw_header[8] + header.data_size = raw_header[9] + # currently, total_size gets padded to the next 4k boundary, so that + # fallocate can reclaim the block when hole punching. + header.total_size = raw_header[10] + header.state = raw_header[11] + header.metastr_md5 = raw_header[12] + + return header + + +volume_header_formats = { + 1: '8sBQQQQLQ' +} + + +class VolumeHeader(object): + """ + Version 1: + Magic string (8 bytes) + Header version (1 byte) + Volume index (8 bytes) + Partition index (8 bytes) + Volume type (8 bytes) + First object offset (8 bytes) + Volume state (4 bytes) (enum from fmgr.proto) + Volume compaction target (8 bytes) + (only valid if state is STATE_COMPACTION_SRC) + """ + def __init__(self, version=VOLUME_HEADER_VERSION): + self.magic_string = VOLUME_START_MARKER + self.version = version + self.state = 0 + self.compaction_target = 0 + + def __str__(self): + prop_list = ['volume_idx', 'partition', 'type', + 'state', 'compaction_target'] + h_str = "" + for prop in prop_list: + h_str += "{}: {}\n".format(prop, getattr(self, prop)) + return h_str[:-1] + + def __len__(self): + try: + fmt = volume_header_formats[self.version] + except KeyError: + raise HeaderException('Unsupported header version') + return struct.calcsize(fmt) + + def pack(self): + version_to_pack = { + 1: self.__pack_v1, + } + return version_to_pack[self.version]() + + def __pack_v1(self): + fmt = volume_header_formats[1] + + args = (self.magic_string, self.version, + self.volume_idx, self.partition, self.type, + self.first_obj_offset, self.state, + self.compaction_target) + + return struct.pack(fmt, *args) + + @classmethod + def unpack(cls, buf): + version_to_unpack = { + 1: cls.__unpack_v1 + } + if buf[0:8] != VOLUME_START_MARKER: + raise HeaderException('Not a header') + version = struct.unpack('<B', buf[8:9])[0] + if version not in volume_header_formats.keys(): + raise HeaderException('Unsupported header version') + + return version_to_unpack[version](buf) + + @classmethod + def __unpack_v1(cls, buf): + fmt = volume_header_formats[1] + raw_header = struct.unpack(fmt, buf[0:struct.calcsize(fmt)]) + header = cls() + header.magic_string = raw_header[0] + header.version = raw_header[1] + header.volume_idx = raw_header[2] + header.partition = raw_header[3] + header.type = raw_header[4] + header.first_obj_offset = raw_header[5] + header.state = raw_header[6] + header.compaction_target = raw_header[7] + + return header + + +# Read volume header. Expects fp to be positionned at header offset +def read_volume_header(fp): + buf = fp.read(MAX_VOLUME_HEADER_LEN) + header = VolumeHeader.unpack(buf) + return header + + +def write_volume_header(header, fd): + os.write(fd, header.pack()) + + +def read_object_header(fp): + """ + Read object header + :param fp: opened file, positioned at header start + :return: an ObjectHeader + """ + buf = fp.read(MAX_OBJECT_HEADER_LEN) + header = ObjectHeader.unpack(buf) + return header + + +def write_object_header(header, fp): + """ + Rewrites header in open file + :param header: header to write + :param fp: opened volume + """ + fp.write(header.pack()) + fdatasync(fp.fileno()) + + +def erase_object_header(fd, offset): + """ + Erase an object header by writing null bytes over it + :param fd: volume file descriptor + :param offset: absolute header offset + """ + os.lseek(fd, offset, os.SEEK_SET) + os.write(fd, b"\x00" * MAX_OBJECT_HEADER_LEN) diff --git a/swift/obj/kvfile.py b/swift/obj/kvfile.py new file mode 100644 index 000000000..9a0347329 --- /dev/null +++ b/swift/obj/kvfile.py @@ -0,0 +1,1260 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import errno +import os +import time +import json +from hashlib import md5 +import logging +import traceback +from os.path import basename, dirname, join +from random import shuffle +from contextlib import contextmanager +from collections import defaultdict + +from eventlet import Timeout, tpool +import six + +from swift import gettext_ as _ +from swift.common.constraints import check_drive +from swift.common.request_helpers import is_sys_meta +from swift.common.utils import fdatasync, \ + config_true_value, listdir, split_path, lock_path +from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ + DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ + DiskFileError, PathNotDir, \ + DiskFileExpired, DiskFileXattrNotSupported, \ + DiskFileBadMetadataChecksum +from swift.common.storage_policy import ( + split_policy_string, POLICIES, + REPL_POLICY, EC_POLICY) + +from swift.obj import vfile +from swift.obj.diskfile import BaseDiskFileManager, DiskFileManager, \ + ECDiskFileManager, BaseDiskFile, DiskFile, DiskFileReader, DiskFileWriter,\ + BaseDiskFileReader, BaseDiskFileWriter, ECDiskFile, ECDiskFileReader, \ + ECDiskFileWriter, AuditLocation, RESERVED_DATAFILE_META, \ + DATAFILE_SYSTEM_META, strip_self, DEFAULT_RECLAIM_AGE, _encode_metadata, \ + get_part_path, get_data_dir, update_auditor_status, extract_policy, \ + HASH_FILE, read_hashes, write_hashes, consolidate_hashes, invalidate_hash + + +def quarantine_vrenamer(device_path, corrupted_file_path): + """ + In the case that a file is corrupted, move it to a quarantined + area to allow replication to fix it. + + :params device_path: The path to the device the corrupted file is on. + :params corrupted_file_path: The path to the file you want quarantined. + + :returns: path (str) of directory the file was moved to + :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY + exceptions from rename + """ + policy = extract_policy(corrupted_file_path) + if policy is None: + # TODO: support a quarantine-unknown location + policy = POLICIES.legacy + + # rename key in KV + return vfile.quarantine_ohash(dirname(corrupted_file_path), policy) + + +def object_audit_location_generator(devices, datadir, mount_check=True, + logger=None, device_dirs=None, + auditor_type="ALL"): + """ + Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all + objects stored under that directory for the given datadir (policy), + if device_dirs isn't set. If device_dirs is set, only yield AuditLocation + for the objects under the entries in device_dirs. The AuditLocation only + knows the path to the hash directory, not to the .data file therein + (if any). This is to avoid a double listdir(hash_dir); the DiskFile object + will always do one, so we don't. + + :param devices: parent directory of the devices to be audited + :param datadir: objects directory + :param mount_check: flag to check if a mount check should be performed + on devices + :param logger: a logger object + :param device_dirs: a list of directories under devices to traverse + :param auditor_type: either ALL or ZBF + """ + if not device_dirs: + device_dirs = listdir(devices) + else: + # remove bogus devices and duplicates from device_dirs + device_dirs = list( + set(listdir(devices)).intersection(set(device_dirs))) + # randomize devices in case of process restart before sweep completed + shuffle(device_dirs) + + base, policy = split_policy_string(datadir) + for device in device_dirs: + if not check_drive(devices, device, mount_check): + if logger: + logger.debug( + 'Skipping %s as it is not %s', device, + 'mounted' if mount_check else 'a dir') + continue + + datadir_path = os.path.join(devices, device, datadir) + if not os.path.exists(datadir_path): + continue + + partitions = get_auditor_status(datadir_path, logger, auditor_type) + + for pos, partition in enumerate(partitions): + update_auditor_status(datadir_path, logger, + partitions[pos:], auditor_type) + part_path = os.path.join(datadir_path, partition) + try: + suffixes = vfile.listdir(part_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + for asuffix in suffixes: + suff_path = os.path.join(part_path, asuffix) + try: + hashes = vfile.listdir(suff_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + for hsh in hashes: + hsh_path = os.path.join(suff_path, hsh) + yield AuditLocation(hsh_path, device, partition, + policy) + + update_auditor_status(datadir_path, logger, [], auditor_type) + + +def get_auditor_status(datadir_path, logger, auditor_type): + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + status = {} + try: + if six.PY3: + statusfile = open(auditor_status, encoding='utf8') + else: + statusfile = open(auditor_status, 'rb') + with statusfile: + status = statusfile.read() + except (OSError, IOError) as e: + if e.errno != errno.ENOENT and logger: + logger.warning(_('Cannot read %(auditor_status)s (%(err)s)') % + {'auditor_status': auditor_status, 'err': e}) + return vfile.listdir(datadir_path) + try: + status = json.loads(status) + except ValueError as e: + logger.warning(_('Loading JSON from %(auditor_status)s failed' + ' (%(err)s)') % + {'auditor_status': auditor_status, 'err': e}) + return vfile.listdir(datadir_path) + return status['partitions'] + + +class BaseKVFile(BaseDiskFile): + # Todo: we may want a separate __init__ to define KV specific attribute + def open(self, modernize=False, current_time=None): + """ + Open the object. + + This implementation opens the data file representing the object, reads + the associated metadata in the extended attributes, additionally + combining metadata from fast-POST `.meta` files. + + :param modernize: if set, update this diskfile to the latest format. + Currently, this means adding metadata checksums if none are + present. + :param current_time: Unix time used in checking expiration. If not + present, the current time will be used. + + .. note:: + + An implementation is allowed to raise any of the following + exceptions, but is only required to raise `DiskFileNotExist` when + the object representation does not exist. + + :raises DiskFileCollision: on name mis-match with metadata + :raises DiskFileNotExist: if the object does not exist + :raises DiskFileDeleted: if the object was previously deleted + :raises DiskFileQuarantined: if while reading metadata of the file + some data did pass cross checks + :returns: itself for use as a context manager + """ + try: + files = vfile.listdir(self._datadir) + except (OSError, vfile.VFileException) as err: + raise DiskFileError( + "Error listing directory %s: %s" % (self._datadir, err)) + + # gather info about the valid files to use to open the DiskFile + file_info = self._get_ondisk_files(files) + + self._data_file = file_info.get('data_file') + if not self._data_file: + raise self._construct_exception_from_ts_file(**file_info) + self._vfr = self._construct_from_data_file( + current_time=current_time, modernize=modernize, **file_info) + # This method must populate the internal _metadata attribute. + self._metadata = self._metadata or {} + return self + + def _verify_data_file(self, data_file, data_vfr, current_time): + """ + Verify the metadata's name value matches what we think the object is + named. + + :param data_file: data file name being consider, used when quarantines + occur + :param fp: open file pointer so that we can `fstat()` the file to + verify the on-disk size with Content-Length metadata value + :param current_time: Unix time used in checking expiration + :raises DiskFileCollision: if the metadata stored name does not match + the referenced name of the file + :raises DiskFileExpired: if the object has expired + :raises DiskFileQuarantined: if data inconsistencies were detected + between the metadata and the file-system + metadata + """ + try: + mname = self._metadata['name'] + except KeyError: + raise self._quarantine(data_file, "missing name metadata") + else: + if mname != self._name: + self._logger.error( + _('Client path %(client)s does not match ' + 'path stored in object metadata %(meta)s'), + {'client': self._name, 'meta': mname}) + raise DiskFileCollision('Client path does not match path ' + 'stored in object metadata') + try: + x_delete_at = int(self._metadata['X-Delete-At']) + except KeyError: + pass + except ValueError: + # Quarantine, the x-delete-at key is present but not an + # integer. + raise self._quarantine( + data_file, "bad metadata x-delete-at value %s" % ( + self._metadata['X-Delete-At'])) + else: + if current_time is None: + current_time = time.time() + if x_delete_at <= current_time and not self._open_expired: + raise DiskFileExpired(metadata=self._metadata) + try: + metadata_size = int(self._metadata['Content-Length']) + except KeyError: + raise self._quarantine( + data_file, "missing content-length in metadata") + except ValueError: + # Quarantine, the content-length key is present but not an + # integer. + raise self._quarantine( + data_file, "bad metadata content-length value %s" % ( + self._metadata['Content-Length'])) + + obj_size = data_vfr.data_size + if obj_size != metadata_size: + raise self._quarantine( + data_file, "metadata content-length %s does" + " not match actual object size %s" % ( + metadata_size, obj_size)) + self._content_length = obj_size + return obj_size + + def _failsafe_read_metadata(self, source, quarantine_filename=None, + add_missing_checksum=False): + """ + Read metadata from source object file. In case of failure, quarantine + the file. + + Takes source and filename separately so we can read from an open + file if we have one. + + :param source: file descriptor or filename to load the metadata from + :param quarantine_filename: full path of file to load the metadata from + :param add_missing_checksum: ignored + """ + try: + vfr = vfile.VFileReader.get_vfile(source, self._logger) + vfr_metadata = vfr.metadata + vfr.close() + return vfr_metadata + except (DiskFileXattrNotSupported, DiskFileNotExist): + raise + except DiskFileBadMetadataChecksum as err: + raise self._quarantine(quarantine_filename, str(err)) + except Exception as err: + raise self._quarantine( + quarantine_filename, + "Exception reading metadata: %s" % err) + + # This could be left unchanged now that _failsafe_read_metadata() is + # patched. Test it. + def _merge_content_type_metadata(self, ctype_file): + """ + When a second .meta file is providing the most recent Content-Type + metadata then merge it into the metafile_metadata. + + :param ctype_file: An on-disk .meta file + """ + try: + ctype_vfr = vfile.VFileReader.get_vfile(ctype_file, self._logger) + except IOError as e: + if e.errno == errno.ENOENT: + raise DiskFileNotExist() + raise + ctypefile_metadata = ctype_vfr.metadata + ctype_vfr.close() + if ('Content-Type' in ctypefile_metadata + and (ctypefile_metadata.get('Content-Type-Timestamp') > + self._metafile_metadata.get('Content-Type-Timestamp')) + and (ctypefile_metadata.get('Content-Type-Timestamp') > + self.data_timestamp)): + self._metafile_metadata['Content-Type'] = \ + ctypefile_metadata['Content-Type'] + self._metafile_metadata['Content-Type-Timestamp'] = \ + ctypefile_metadata.get('Content-Type-Timestamp') + + def _construct_from_data_file(self, data_file, meta_file, ctype_file, + current_time, modernize=False, + **kwargs): + """ + Open the `.data` file to fetch its metadata, and fetch the metadata + from fast-POST `.meta` files as well if any exist, merging them + properly. + + :param data_file: on-disk `.data` file being considered + :param meta_file: on-disk fast-POST `.meta` file being considered + :param ctype_file: on-disk fast-POST `.meta` file being considered that + contains content-type and content-type timestamp + :param current_time: Unix time used in checking expiration + :param modernize: ignored + :returns: an opened data file pointer + :raises DiskFileError: various exceptions from + :func:`swift.obj.diskfile.DiskFile._verify_data_file` + """ + # TODO: need to catch exception, check if ENOENT (see in diskfile) + try: + data_vfr = vfile.VFileReader.get_vfile(data_file, self._logger) + except IOError as e: + if e.errno == errno.ENOENT: + raise DiskFileNotExist() + raise + + self._datafile_metadata = data_vfr.metadata + + self._metadata = {} + if meta_file: + self._metafile_metadata = self._failsafe_read_metadata( + meta_file, meta_file) + + if ctype_file and ctype_file != meta_file: + self._merge_content_type_metadata(ctype_file) + sys_metadata = dict( + [(key, val) for key, val in self._datafile_metadata.items() + if key.lower() in (RESERVED_DATAFILE_META | + DATAFILE_SYSTEM_META) + or is_sys_meta('object', key)]) + self._metadata.update(self._metafile_metadata) + self._metadata.update(sys_metadata) + # diskfile writer added 'name' to metafile, so remove it here + self._metafile_metadata.pop('name', None) + # TODO: the check for Content-Type is only here for tests that + # create .data files without Content-Type + if ('Content-Type' in self._datafile_metadata and + (self.data_timestamp > + self._metafile_metadata.get('Content-Type-Timestamp'))): + self._metadata['Content-Type'] = \ + self._datafile_metadata['Content-Type'] + self._metadata.pop('Content-Type-Timestamp', None) + else: + self._metadata.update(self._datafile_metadata) + if self._name is None: + # If we don't know our name, we were just given a hash dir at + # instantiation, so we'd better validate that the name hashes back + # to us + self._name = self._metadata['name'] + self._verify_name_matches_hash(data_file) + self._verify_data_file(data_file, data_vfr, current_time) + return data_vfr + + def reader(self, keep_cache=False, + _quarantine_hook=lambda m: None): + """ + Return a :class:`swift.common.swob.Response` class compatible + "`app_iter`" object as defined by + :class:`swift.obj.diskfile.DiskFileReader`. + + For this implementation, the responsibility of closing the open file + is passed to the :class:`swift.obj.diskfile.DiskFileReader` object. + + :param keep_cache: caller's preference for keeping data read in the + OS buffer cache + :param _quarantine_hook: 1-arg callable called when obj quarantined; + the arg is the reason for quarantine. + Default is to ignore it. + Not needed by the REST layer. + :returns: a :class:`swift.obj.diskfile.DiskFileReader` object + """ + dr = self.reader_cls( + self._vfr, self._data_file, int(self._metadata['Content-Length']), + self._metadata['ETag'], self._disk_chunk_size, + self._manager.keep_cache_size, self._device_path, self._logger, + use_splice=self._use_splice, quarantine_hook=_quarantine_hook, + pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache) + # At this point the reader object is now responsible for closing + # the file pointer. + # self._fp = None + self._vfr = None + return dr + + def writer(self, size=None): + return self.writer_cls(self._manager, self._name, self._datadir, size, + self._bytes_per_sync, self, self._logger) + + def _get_tempfile(self): + raise Exception("_get_tempfile called, shouldn't happen") + + @contextmanager + def create(self, size=None, extension=None): + """ + Context manager to create a vfile. + It could create separate volumes based on the extension. + + Currently no caller will set the extension. The easiest would be to + patch server.py, in DELETE(), add an extension=".ts" parameter to the + self.get_diskfile() call, as kwargs is passed all the way down to here. + + It's also possible to have the writer_cls handle the volume creation + later: at the first write() call, if any, assume it's not a tombstone, + and in put(), check self._extension. + + :param size: optional initial size of file to explicitly allocate on + disk + :param extension: file extension, with dot ('.ts') + :raises DiskFileNoSpace: if a size is specified and allocation fails + """ + dfw = self.writer(size) + try: + yield dfw.open() + finally: + dfw.close() + + +class BaseKVFileReader(BaseDiskFileReader): + def __init__(self, vfr, data_file, obj_size, etag, + disk_chunk_size, keep_cache_size, device_path, logger, + quarantine_hook, use_splice, pipe_size, diskfile, + keep_cache=False): + # Parameter tracking + self._vfr = vfr + self._data_file = data_file + self._obj_size = obj_size + self._etag = etag + self._diskfile = diskfile + self._disk_chunk_size = disk_chunk_size + self._device_path = device_path + self._logger = logger + self._quarantine_hook = quarantine_hook + self._use_splice = use_splice + self._pipe_size = pipe_size + if keep_cache: + # Caller suggests we keep this in cache, only do it if the + # object's size is less than the maximum. + self._keep_cache = obj_size < keep_cache_size + else: + self._keep_cache = False + + # Internal Attributes + self._iter_etag = None + self._bytes_read = 0 + self._started_at_0 = False + self._read_to_eof = False + self._md5_of_sent_bytes = None + self._suppress_file_closing = False + self._quarantined_dir = None + + def _init_checks(self): + if self._vfr.tell() == 0: + self._started_at_0 = True + self._iter_etag = md5() + + def __iter__(self): + """Returns an iterator over the data file.""" + try: + self._bytes_read = 0 + self._started_at_0 = False + self._read_to_eof = False + self._init_checks() + while True: + chunk = self._vfr.read(self._disk_chunk_size) + if chunk: + self._update_checks(chunk) + self._bytes_read += len(chunk) + yield chunk + else: + self._read_to_eof = True + break + finally: + if not self._suppress_file_closing: + self.close() + + def can_zero_copy_send(self): + return False + + def app_iter_range(self, start, stop): + """ + Returns an iterator over the data file for range (start, stop) + + """ + if start or start == 0: + self._vfr.seek(start) + if stop is not None: + length = stop - start + else: + length = None + try: + for chunk in self: + if length is not None: + length -= len(chunk) + if length < 0: + # Chop off the extra: + yield chunk[:length] + break + yield chunk + finally: + if not self._suppress_file_closing: + self.close() + + def close(self): + """ + Close the open file handle if present. + + For this specific implementation, this method will handle quarantining + the file if necessary. + """ + if self._vfr: + try: + if self._started_at_0 and self._read_to_eof: + self._handle_close_quarantine() + except DiskFileQuarantined: + raise + except (Exception, Timeout) as e: + self._logger.error(_( + 'ERROR DiskFile %(data_file)s' + ' close failure: %(exc)s : %(stack)s'), + {'exc': e, 'stack': ''.join(traceback.format_stack()), + 'data_file': self._data_file}) + finally: + vfr, self._vfr = self._vfr, None + vfr.close() + + +class BaseKVFileWriter(BaseDiskFileWriter): + def __init__(self, mgr, name, datadir, size, bytes_per_sync, diskfile, + logger): + # Parameter tracking + self._manager = mgr + self._name = name + self._datadir = datadir + self._vfile_writer = None + self._tmppath = None + self._size = size + self._chunks_etag = md5() + self._bytes_per_sync = bytes_per_sync + self._diskfile = diskfile + self._logger = logger + + # Internal attributes + self._upload_size = 0 + self._last_sync = 0 + self._extension = '.data' + self._put_succeeded = False + + def open(self): + if self._vfile_writer is not None: + raise ValueError('DiskFileWriter is already open') + + try: + # TODO: support extension + self._vfile_writer = vfile.VFileWriter.create( + self._datadir, self._size, self._manager.vfile_conf, + self._logger, extension=None) + except OSError as err: + if err.errno in (errno.ENOSPC, errno.EDQUOT): + # No more inodes in filesystem + raise DiskFileNoSpace(err.strerror) + raise + return self + + def close(self): + if self._vfile_writer: + try: + os.close(self._vfile_writer.fd) + os.close(self._vfile_writer.lock_fd) + except OSError: + pass + self._vfile_writer = None + + def write(self, chunk): + """ + Write a chunk of data to disk. All invocations of this method must + come before invoking the :func: + + :param chunk: the chunk of data to write as a string object + + :returns: the total number of bytes written to an object + """ + + if not self._vfile_writer: + raise ValueError('Writer is not open') + self._chunks_etag.update(chunk) + while chunk: + written = os.write(self._vfile_writer.fd, chunk) + self._upload_size += written + chunk = chunk[written:] + + # For large files sync every 512MB (by default) written + diff = self._upload_size - self._last_sync + if diff >= self._bytes_per_sync: + tpool.execute(fdatasync, self._vfile_writer.fd) + # drop_buffer_cache(self._vfile_writer.fd, self._last_sync, diff) + self._last_sync = self._upload_size + + return self._upload_size + + def _finalize_put(self, metadata, target_path, cleanup): + filename = basename(target_path) + # write metadata and sync + self._vfile_writer.commit(filename, _encode_metadata(metadata)) + self._put_succeeded = True + if cleanup: + try: + self.manager.cleanup_ondisk_files(self._datadir)['files'] + except OSError: + logging.exception(_('Problem cleaning up %s'), self._datadir) + + +class KVFileReader(BaseKVFileReader, DiskFileReader): + pass + + +class KVFileWriter(BaseKVFileWriter, DiskFileWriter): + def put(self, metadata): + """ + Finalize writing the file on disk. + + :param metadata: dictionary of metadata to be associated with the + object + """ + super(KVFileWriter, self)._put(metadata, True) + + +class KVFile(BaseKVFile, DiskFile): + reader_cls = KVFileReader + writer_cls = KVFileWriter + + +class BaseKVFileManager(BaseDiskFileManager): + diskfile_cls = None # must be set by subclasses + + invalidate_hash = strip_self(invalidate_hash) + consolidate_hashes = strip_self(consolidate_hashes) + quarantine_renamer = strip_self(quarantine_vrenamer) + + def __init__(self, conf, logger): + self.logger = logger + self.devices = conf.get('devices', '/srv/node') + self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536)) + self.keep_cache_size = int(conf.get('keep_cache_size', 5242880)) + self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024 + # vfile specific config + self.vfile_conf = {} + self.vfile_conf['volume_alloc_chunk_size'] = int( + conf.get('volume_alloc_chunk_size', 16 * 1024)) + self.vfile_conf['volume_low_free_space'] = int( + conf.get('volume_low_free_space', 8 * 1024)) + self.vfile_conf['metadata_reserve'] = int( + conf.get('metadata_reserve', 500)) + self.vfile_conf['max_volume_count'] = int( + conf.get('max_volume_count', 1000)) + self.vfile_conf['max_volume_size'] = int( + conf.get('max_volume_size', 10 * 1024 * 1024 * 1024)) + # end vfile specific config + self.mount_check = config_true_value(conf.get('mount_check', 'true')) + + self.reclaim_age = int(conf.get('reclaim_age', DEFAULT_RECLAIM_AGE)) + replication_concurrency_per_device = conf.get( + 'replication_concurrency_per_device') + replication_one_per_device = conf.get('replication_one_per_device') + if replication_concurrency_per_device is None \ + and replication_one_per_device is not None: + self.logger.warning('Option replication_one_per_device is ' + 'deprecated and will be removed in a future ' + 'version. Update your configuration to use ' + 'option replication_concurrency_per_device.') + if config_true_value(replication_one_per_device): + replication_concurrency_per_device = 1 + else: + replication_concurrency_per_device = 0 + elif replication_one_per_device is not None: + self.logger.warning('Option replication_one_per_device ignored as ' + 'replication_concurrency_per_device is ' + 'defined.') + if replication_concurrency_per_device is None: + self.replication_concurrency_per_device = 1 + else: + self.replication_concurrency_per_device = int( + replication_concurrency_per_device) + self.replication_lock_timeout = int(conf.get( + 'replication_lock_timeout', 15)) + + self.use_splice = False + self.pipe_size = None + + def cleanup_ondisk_files(self, hsh_path, **kwargs): + """ + Clean up on-disk files that are obsolete and gather the set of valid + on-disk files for an object. + + :param hsh_path: object hash path + :param frag_index: if set, search for a specific fragment index .data + file, otherwise accept the first valid .data file + :returns: a dict that may contain: valid on disk files keyed by their + filename extension; a list of obsolete files stored under the + key 'obsolete'; a list of files remaining in the directory, + reverse sorted, stored under the key 'files'. + """ + + def is_reclaimable(timestamp): + return (time.time() - float(timestamp)) > self.reclaim_age + + files = vfile.listdir(hsh_path) + + files.sort(reverse=True) + results = self.get_ondisk_files( + files, hsh_path, verify=False, **kwargs) + if 'ts_info' in results and is_reclaimable( + results['ts_info']['timestamp']): + remove_vfile(join(hsh_path, results['ts_info']['filename'])) + files.remove(results.pop('ts_info')['filename']) + for file_info in results.get('possible_reclaim', []): + # stray files are not deleted until reclaim-age + if is_reclaimable(file_info['timestamp']): + results.setdefault('obsolete', []).append(file_info) + for file_info in results.get('obsolete', []): + remove_vfile(join(hsh_path, file_info['filename'])) + files.remove(file_info['filename']) + results['files'] = files + + return results + + def object_audit_location_generator(self, policy, device_dirs=None, + auditor_type="ALL"): + """ + Yield an AuditLocation for all objects stored under device_dirs. + + :param policy: the StoragePolicy instance + :param device_dirs: directory of target device + :param auditor_type: either ALL or ZBF + """ + datadir = get_data_dir(policy) + return object_audit_location_generator(self.devices, datadir, + self.mount_check, + self.logger, device_dirs, + auditor_type) + + def _hash_suffix_dir(self, path, policy): + """ + + :param path: full path to directory + :param policy: storage policy used + """ + if six.PY2: + hashes = defaultdict(md5) + else: + class shim(object): + def __init__(self): + self.md5 = md5() + + def update(self, s): + if isinstance(s, str): + self.md5.update(s.encode('utf-8')) + else: + self.md5.update(s) + + def hexdigest(self): + return self.md5.hexdigest() + hashes = defaultdict(shim) + try: + path_contents = sorted(vfile.listdir(path)) + except OSError as err: + if err.errno in (errno.ENOTDIR, errno.ENOENT): + raise PathNotDir() + raise + for hsh in path_contents: + hsh_path = os.path.join(path, hsh) + try: + ondisk_info = self.cleanup_ondisk_files( + hsh_path, policy=policy) + except OSError as err: + if err.errno == errno.ENOTDIR: + partition_path = os.path.dirname(path) + objects_path = os.path.dirname(partition_path) + device_path = os.path.dirname(objects_path) + # The made-up filename is so that the eventual dirpath() + # will result in this object directory that we care about. + # Some failures will result in an object directory + # becoming a file, thus causing the parent directory to + # be qarantined. + quar_path = quarantine_vrenamer( + device_path, os.path.join( + hsh_path, "made-up-filename")) + logging.exception( + _('Quarantined %(hsh_path)s to %(quar_path)s because ' + 'it is not a directory'), {'hsh_path': hsh_path, + 'quar_path': quar_path}) + continue + raise + if not ondisk_info['files']: + continue + + # ondisk_info has info dicts containing timestamps for those + # files that could determine the state of the diskfile if it were + # to be opened. We update the suffix hash with the concatenation of + # each file's timestamp and extension. The extension is added to + # guarantee distinct hash values from two object dirs that have + # different file types at the same timestamp(s). + # + # Files that may be in the object dir but would have no effect on + # the state of the diskfile are not used to update the hash. + for key in (k for k in ('meta_info', 'ts_info') + if k in ondisk_info): + info = ondisk_info[key] + hashes[None].update(info['timestamp'].internal + info['ext']) + + # delegate to subclass for data file related updates... + self._update_suffix_hashes(hashes, ondisk_info) + + if 'ctype_info' in ondisk_info: + # We have a distinct content-type timestamp so update the + # hash. As a precaution, append '_ctype' to differentiate this + # value from any other timestamp value that might included in + # the hash in future. There is no .ctype file so use _ctype to + # avoid any confusion. + info = ondisk_info['ctype_info'] + hashes[None].update(info['ctype_timestamp'].internal + + '_ctype') + + return hashes + + def _get_hashes(self, *args, **kwargs): + hashed, hashes = self.__get_hashes(*args, **kwargs) + hashes.pop('updated', None) + hashes.pop('valid', None) + return hashed, hashes + + def __get_hashes(self, device, partition, policy, recalculate=None, + do_listdir=False): + """ + Get hashes for each suffix dir in a partition. do_listdir causes it to + mistrust the hash cache for suffix existence at the (unexpectedly high) + cost of a listdir. + + :param device: name of target device + :param partition: partition on the device in which the object lives + :param policy: the StoragePolicy instance + :param recalculate: list of suffixes which should be recalculated when + got + :param do_listdir: force existence check for all hashes in the + partition + + :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) + """ + hashed = 0 + dev_path = self.get_dev_path(device) + partition_path = get_part_path(dev_path, policy, partition) + hashes_file = os.path.join(partition_path, HASH_FILE) + modified = False + orig_hashes = {'valid': False} + + if recalculate is None: + recalculate = [] + + try: + orig_hashes = self.consolidate_hashes(partition_path) + except Exception: + self.logger.warning('Unable to read %r', hashes_file, + exc_info=True) + + if not orig_hashes['valid']: + # This is the only path to a valid hashes from invalid read (e.g. + # does not exist, corrupt, etc.). Moreover, in order to write this + # valid hashes we must read *the exact same* invalid state or we'll + # trigger race detection. + do_listdir = True + hashes = {'valid': True} + # If the exception handling around consolidate_hashes fired we're + # going to do a full rehash regardless; but we need to avoid + # needless recursion if the on-disk hashes.pkl is actually readable + # (worst case is consolidate_hashes keeps raising exceptions and we + # eventually run out of stack). + # N.B. orig_hashes invalid only effects new parts and error/edge + # conditions - so try not to get overly caught up trying to + # optimize it out unless you manage to convince yourself there's a + # bad behavior. + orig_hashes = read_hashes(partition_path) + else: + hashes = copy.deepcopy(orig_hashes) + + if do_listdir: + for suff in vfile.listdir(partition_path): + if len(suff) == 3: + hashes.setdefault(suff, None) + modified = True + hashes.update((suffix, None) for suffix in recalculate) + for suffix, hash_ in list(hashes.items()): + if not hash_: + suffix_dir = os.path.join(partition_path, suffix) + try: + hashes[suffix] = self._hash_suffix( + suffix_dir, policy=policy) + hashed += 1 + except PathNotDir: + del hashes[suffix] + except OSError: + logging.exception(_('Error hashing suffix')) + modified = True + if modified: + with lock_path(partition_path): + if read_hashes(partition_path) == orig_hashes: + write_hashes(partition_path, hashes) + return hashed, hashes + return self.__get_hashes(device, partition, policy, + recalculate=recalculate, + do_listdir=do_listdir) + else: + return hashed, hashes + + def get_diskfile_from_hash(self, device, partition, object_hash, + policy, **kwargs): + """ + Returns a DiskFile instance for an object at the given + object_hash. Just in case someone thinks of refactoring, be + sure DiskFileDeleted is *not* raised, but the DiskFile + instance representing the tombstoned object is returned + instead. + + :param device: name of target device + :param partition: partition on the device in which the object lives + :param object_hash: the hash of an object path + :param policy: the StoragePolicy instance + :raises DiskFileNotExist: if the object does not exist + :returns: an instance of BaseDiskFile + """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + object_path = join( + dev_path, get_data_dir(policy), str(partition), object_hash[-3:], + object_hash) + try: + filenames = self.cleanup_ondisk_files(object_path)['files'] + except OSError as err: + if err.errno == errno.ENOTDIR: + quar_path = self.quarantine_renamer(dev_path, object_path) + logging.exception( + _('Quarantined %(object_path)s to %(quar_path)s because ' + 'it is not a directory'), {'object_path': object_path, + 'quar_path': quar_path}) + raise DiskFileNotExist() + if err.errno != errno.ENOENT: + raise + raise DiskFileNotExist() + if not filenames: + raise DiskFileNotExist() + + try: + vf = vfile.VFileReader.get_vfile(join(object_path, filenames[-1]), + self.logger) + metadata = vf.metadata + vf.close() + except EOFError: + raise DiskFileNotExist() + try: + account, container, obj = split_path( + metadata.get('name', ''), 3, 3, True) + except ValueError: + raise DiskFileNotExist() + return self.diskfile_cls(self, dev_path, + partition, account, container, obj, + policy=policy, **kwargs) + + def _listdir(self, path): + """ + :param path: full path to directory + """ + try: + return vfile.listdir(path) + except OSError as err: + if err.errno != errno.ENOENT: + self.logger.error( + 'ERROR: Skipping %r due to error with listdir attempt: %s', + path, err) + return [] + + def exists(self, path): + """ + :param path: full path to directory + """ + return vfile.exists(path) + + def mkdirs(self, path): + """ + :param path: full path to directory + """ + return vfile.mkdirs(path) + + def listdir(self, path): + """ + :param path: full path to directory + """ + return vfile.listdir(path) + + def rmtree(self, path, ignore_errors=False): + vfile.rmtree(path) + + def remove_file(self, path): + """ + similar to utils.remove_file + :param path: full path to directory + """ + try: + return vfile.delete_vfile_from_path(path) + except (OSError, vfile.VFileException): + pass + + def remove(self, path): + """ + :param path: full path to directory + """ + return vfile.delete_vfile_from_path(path) + + def isdir(self, path): + """ + :param path: full path to directory + """ + return vfile.isdir(path) + + def isfile(self, path): + """ + :param path: full path to directory + """ + return vfile.isfile(path) + + def rmdir(self, path): + """ + :param path: full path to directory + """ + pass + + +class KVFileManager(BaseKVFileManager, DiskFileManager): + diskfile_cls = KVFile + policy_type = REPL_POLICY + + +class ECKVFileReader(BaseKVFileReader, ECDiskFileReader): + def __init__(self, vfr, data_file, obj_size, etag, + disk_chunk_size, keep_cache_size, device_path, logger, + quarantine_hook, use_splice, pipe_size, diskfile, + keep_cache=False): + super(ECKVFileReader, self).__init__( + vfr, data_file, obj_size, etag, + disk_chunk_size, keep_cache_size, device_path, logger, + quarantine_hook, use_splice, pipe_size, diskfile, keep_cache) + self.frag_buf = None + self.frag_offset = 0 + self.frag_size = self._diskfile.policy.fragment_size + + def _init_checks(self): + super(ECKVFileReader, self)._init_checks() + # for a multi-range GET this will be called at the start of each range; + # only initialise the frag_buf for reads starting at 0. + # TODO: reset frag buf to '' if tell() shows that start is on a frag + # boundary so that we check frags selected by a range not starting at 0 + # ECDOING - check _started_at_0 is defined correctly + if self._started_at_0: + self.frag_buf = '' + else: + self.frag_buf = None + + def _update_checks(self, chunk): + # super(ECKVFileReader, self)._update_checks(chunk) + + # Because of python's MRO, this will call + # ECDiskFileReader._update_checks, and blow up. + # rather than mess with the class's mro() function, explicitely call + # the one we want. + BaseDiskFileReader._update_checks(self, chunk) + if self.frag_buf is not None: + self.frag_buf += chunk + cursor = 0 + while len(self.frag_buf) >= cursor + self.frag_size: + self._check_frag(self.frag_buf[cursor:cursor + self.frag_size]) + cursor += self.frag_size + self.frag_offset += self.frag_size + if cursor: + self.frag_buf = self.frag_buf[cursor:] + + def _handle_close_quarantine(self): + # super(ECKVFileReader, self)._handle_close_quarantine() + BaseDiskFileReader._handle_close_quarantine(self) + self._check_frag(self.frag_buf) + + +class ECKVFileWriter(BaseKVFileWriter, ECDiskFileWriter): + # TODO: this needs to be updated wrt. next_part_power, and other changes + # in diskfile.py + def _finalize_durable(self, data_file_path, durable_data_file_path, + timestamp): + exc = None + try: + try: + vfile.rename_vfile(data_file_path, durable_data_file_path, + self._diskfile._logger) + except (OSError, IOError) as err: + if err.errno == errno.ENOENT: + files = vfile.listdir(self._datadir) + results = self.manager.get_ondisk_files( + files, self._datadir, + frag_index=self._diskfile._frag_index, + policy=self._diskfile.policy) + # We "succeeded" if another writer cleaned up our data + ts_info = results.get('ts_info') + durables = results.get('durable_frag_set', []) + if ts_info and ts_info['timestamp'] > timestamp: + return + elif any(frag_set['timestamp'] > timestamp + for frag_set in durables): + return + + if err.errno not in (errno.ENOSPC, errno.EDQUOT): + # re-raise to catch all handler + raise + params = {'file': durable_data_file_path, 'err': err} + self.manager.logger.exception( + _('No space left on device for %(file)s (%(err)s)'), + params) + exc = DiskFileNoSpace( + 'No space left on device for %(file)s (%(err)s)' % params) + else: + try: + self.manager.cleanup_ondisk_files(self._datadir)['files'] + except OSError as os_err: + self.manager.logger.exception( + _('Problem cleaning up %(datadir)s (%(err)s)'), + {'datadir': self._datadir, 'err': os_err}) + except Exception as err: + params = {'file': durable_data_file_path, 'err': err} + self.manager.logger.exception( + _('Problem making data file durable %(file)s (%(err)s)'), + params) + exc = DiskFileError( + 'Problem making data file durable %(file)s (%(err)s)' % params) + if exc: + raise exc + + def put(self, metadata): + """ + The only difference between this method and the replication policy + DiskFileWriter method is adding the frag index to the metadata. + + :param metadata: dictionary of metadata to be associated with object + """ + fi = None + cleanup = True + if self._extension == '.data': + # generally we treat the fragment index provided in metadata as + # canon, but if it's unavailable (e.g. tests) it's reasonable to + # use the frag_index provided at instantiation. Either way make + # sure that the fragment index is included in object sysmeta. + fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index', + self._diskfile._frag_index) + fi = self.manager.validate_fragment_index(fi) + self._diskfile._frag_index = fi + # defer cleanup until commit() writes makes diskfile durable + cleanup = False + super(ECKVFileWriter, self)._put(metadata, cleanup, frag_index=fi) + + +class ECKVFile(BaseKVFile, ECDiskFile): + + reader_cls = ECKVFileReader + writer_cls = ECKVFileWriter + + def purge(self, timestamp, frag_index): + """ + Remove a tombstone file matching the specified timestamp or + datafile matching the specified timestamp and fragment index + from the object directory. + + This provides the EC reconstructor/ssync process with a way to + remove a tombstone or fragment from a handoff node after + reverting it to its primary node. + + The hash will be invalidated, and if empty or invalid the + hsh_path will be removed on next cleanup_ondisk_files. + + :param timestamp: the object timestamp, an instance of + :class:`~swift.common.utils.Timestamp` + :param frag_index: fragment archive index, must be + a whole number or None. + """ + purge_file = self.manager.make_on_disk_filename( + timestamp, ext='.ts') + remove_vfile(os.path.join(self._datadir, purge_file)) + if frag_index is not None: + # data file may or may not be durable so try removing both filename + # possibilities + purge_file = self.manager.make_on_disk_filename( + timestamp, ext='.data', frag_index=frag_index) + remove_vfile(os.path.join(self._datadir, purge_file)) + purge_file = self.manager.make_on_disk_filename( + timestamp, ext='.data', frag_index=frag_index, durable=True) + remove_vfile(os.path.join(self._datadir, purge_file)) + # we don't use hashes.pkl files + # self.manager.invalidate_hash(dirname(self._datadir)) + + +class ECKVFileManager(BaseKVFileManager, ECDiskFileManager): + diskfile_cls = ECKVFile + policy_type = EC_POLICY + + +def remove_vfile(filepath): + try: + vfile.delete_vfile_from_path(filepath) + except OSError: + pass diff --git a/swift/obj/meta.proto b/swift/obj/meta.proto new file mode 100644 index 000000000..55d8492f3 --- /dev/null +++ b/swift/obj/meta.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package meta; + +// Generate module with: protoc -I. --python_out=. meta.proto + +message Attr { + bytes key = 1; + bytes value = 2; +} + +message Metadata { + repeated Attr attrs = 1; +} diff --git a/swift/obj/meta_pb2.py b/swift/obj/meta_pb2.py new file mode 100644 index 000000000..a0cadec66 --- /dev/null +++ b/swift/obj/meta_pb2.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: meta.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='meta.proto', + package='meta', + syntax='proto3', + serialized_options=None, + serialized_pb=b'\n\nmeta.proto\x12\x04meta\"\"\n\x04\x41ttr\x12\x0b\n\x03key\x18\x01 \x01(\x0c\x12\r\n\x05value\x18\x02 \x01(\x0c\"%\n\x08Metadata\x12\x19\n\x05\x61ttrs\x18\x01 \x03(\x0b\x32\n.meta.Attrb\x06proto3' +) + + + + +_ATTR = _descriptor.Descriptor( + name='Attr', + full_name='meta.Attr', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='meta.Attr.key', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='meta.Attr.value', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=b"", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=20, + serialized_end=54, +) + + +_METADATA = _descriptor.Descriptor( + name='Metadata', + full_name='meta.Metadata', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='attrs', full_name='meta.Metadata.attrs', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=56, + serialized_end=93, +) + +_METADATA.fields_by_name['attrs'].message_type = _ATTR +DESCRIPTOR.message_types_by_name['Attr'] = _ATTR +DESCRIPTOR.message_types_by_name['Metadata'] = _METADATA +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Attr = _reflection.GeneratedProtocolMessageType('Attr', (_message.Message,), { + 'DESCRIPTOR' : _ATTR, + '__module__' : 'meta_pb2' + # @@protoc_insertion_point(class_scope:meta.Attr) + }) +_sym_db.RegisterMessage(Attr) + +Metadata = _reflection.GeneratedProtocolMessageType('Metadata', (_message.Message,), { + 'DESCRIPTOR' : _METADATA, + '__module__' : 'meta_pb2' + # @@protoc_insertion_point(class_scope:meta.Metadata) + }) +_sym_db.RegisterMessage(Metadata) + + +# @@protoc_insertion_point(module_scope) diff --git a/swift/obj/objectrpcmanager.py b/swift/obj/objectrpcmanager.py new file mode 100644 index 000000000..a64dfb49d --- /dev/null +++ b/swift/obj/objectrpcmanager.py @@ -0,0 +1,157 @@ +# Copyright (c) 2010-2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import subprocess +import six +from eventlet import sleep + +from swift.common.daemon import Daemon +from swift.common.ring.utils import is_local_device +from swift.common.storage_policy import POLICIES, get_policy_string +from swift.common.utils import PrefixLoggerAdapter, get_logger, \ + config_true_value, whataremyips +from swift.obj import rpc_http as rpc + + +class ObjectRpcManager(Daemon): + def __init__(self, conf, logger=None): + self.conf = conf + self.logger = PrefixLoggerAdapter( + logger or get_logger(conf, log_route='object-rpcmanager'), {}) + self.devices_dir = conf.get('devices', '/srv/node') + self.mount_check = config_true_value(conf.get('mount_check', 'true')) + # use native golang leveldb implementation + self.use_go_leveldb = config_true_value( + conf.get('use_go_leveldb', 'false')) + self.swift_dir = conf.get('swift_dir', '/etc/swift') + self.bind_ip = conf.get('bind_ip', '0.0.0.0') + self.servers_per_port = int(conf.get('servers_per_port', '0') or 0) + self.port = None if self.servers_per_port else \ + int(conf.get('bind_port', 6200)) + self.volcheck = conf.get('volcheck', '') + self.losf_bin = conf.get('losf_bin', '') + self.healthcheck_interval = int(conf.get('healthcheck_interval', 10)) + + # check if the path to LOSF binary and volume checker exist + if not os.path.exists(self.volcheck): + raise AttributeError( + "Invalid or missing volcheck in your config file") + if not os.path.exists(self.losf_bin): + raise AttributeError( + "Invalid or missing losf_bin in your config file") + + # this should select only kv enabled policies + # (requires loading policies?) + self.policies = POLICIES + self.ring_check_interval = int(conf.get('ring_check_interval', 15)) + + # add RPC state check interval + self.kv_disks = self.get_kv_disks() + + def load_object_ring(self, policy): + """ + Make sure the policy's rings are loaded. + + :param policy: the StoragePolicy instance + :returns: appropriate ring object + """ + policy.load_ring(self.swift_dir) + return policy.object_ring + + # add filter for KV only + def get_policy2devices(self): + ips = whataremyips(self.bind_ip) + policy2devices = {} + for policy in self.policies: + self.load_object_ring(policy) + local_devices = list(six.moves.filter( + lambda dev: dev and is_local_device( + ips, self.port, + dev['replication_ip'], dev['replication_port']), + policy.object_ring.devs)) + policy2devices[policy] = local_devices + return policy2devices + + def get_kv_disks(self): + """ + Returns a dict of KV backed policies to list of devices + :return: dict + """ + policy2devices = self.get_policy2devices() + kv_disks = {} + for policy, devs in policy2devices.items(): + if policy.diskfile_module.endswith(('.kv', '.hybrid')): + kv_disks[policy.idx] = [d['device'] for d in devs] + + return kv_disks + + def get_worker_args(self, once=False, **kwargs): + """ + Take the set of all local devices for this node from all the KV + backed policies rings. + + :param once: False if the worker(s) will be daemonized, True if the + worker(s) will be run once + :param kwargs: optional overrides from the command line + """ + + # Note that this get re-used in is_healthy + self.kv_disks = self.get_kv_disks() + + # TODO: what to do in this case ? + if not self.kv_disks: + # we only need a single worker to do nothing until a ring change + yield dict(multiprocess_worker_index=0) + return + + for policy_idx, devs in self.kv_disks.iteritems(): + for dev in devs: + disk_path = os.path.join(self.devices_dir, dev) + losf_dir = get_policy_string('losf', policy_idx) + socket_path = os.path.join(disk_path, losf_dir, 'rpc.socket') + yield dict(policy_idx=policy_idx, disk_path=disk_path) + yield dict(policy_idx=policy_idx, disk_path=disk_path, + socket_path=socket_path, statecheck=True) + + def is_healthy(self): + return self.get_kv_disks() == self.kv_disks + + def run_forever(self, policy_idx=None, disk_path=None, socket_path=None, + statecheck=None, *args, **kwargs): + + if statecheck: + volcheck_args = [self.volcheck, '--disk_path', str(disk_path), + '--policy_idx', str(policy_idx), + '--keepuser', '--repair', '--no_prompt'] + # sleep a bit to let the RPC server start. Otherwise it will + # timeout and take longer to get the checks started. + sleep(2) + while True: + try: + state = rpc.get_kv_state(socket_path) + if not state.isClean: + self.logger.debug(volcheck_args) + subprocess.call(volcheck_args) + except Exception: + self.logger.exception("state check failed, continue") + sleep(10) + else: + losf_args = ['swift-losf-rpc', '-diskPath', str(disk_path), + '-debug', 'info', + '-policyIdx', str(policy_idx), + '-waitForMount={}'.format(str(self.mount_check))] + if self.use_go_leveldb: + losf_args.append('-useGoLevelDB') + os.execv(self.losf_bin, losf_args) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 135f9d5f9..ede7ecad7 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -22,7 +22,6 @@ import time from collections import defaultdict import six import six.moves.cPickle as pickle -import shutil from eventlet import (GreenPile, GreenPool, Timeout, sleep, tpool, spawn) from eventlet.support.greenlets import GreenletExit @@ -30,10 +29,10 @@ from eventlet.support.greenlets import GreenletExit from swift import gettext_ as _ from swift.common.utils import ( whataremyips, unlink_older_than, compute_eta, get_logger, - dump_recon_cache, mkdirs, config_true_value, - GreenAsyncPile, Timestamp, remove_file, - load_recon_cache, parse_override_options, distribute_evenly, - PrefixLoggerAdapter, remove_directory) + dump_recon_cache, config_true_value, + GreenAsyncPile, Timestamp, load_recon_cache, + parse_override_options, distribute_evenly, PrefixLoggerAdapter, + remove_directory) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -1113,15 +1112,16 @@ class ObjectReconstructor(Daemon): tmp_path = join(dev_path, get_tmp_dir(int(policy))) unlink_older_than(tmp_path, time.time() - df_mgr.reclaim_age) - if not os.path.exists(obj_path): + + if not df_mgr.exists(obj_path): try: - mkdirs(obj_path) + df_mgr.mkdirs(obj_path) except Exception: self.logger.exception( 'Unable to create %s' % obj_path) continue try: - partitions = os.listdir(obj_path) + partitions = df_mgr.listdir(obj_path) except OSError: self.logger.exception( 'Unable to list partitions in %r' % obj_path) @@ -1137,7 +1137,7 @@ class ObjectReconstructor(Daemon): if not partition.isdigit(): self.logger.warning( 'Unexpected entity in data dir: %r' % part_path) - self.delete_partition(part_path) + self.delete_partition(df_mgr, part_path) self.reconstruction_part_count += 1 continue partition = int(partition) @@ -1194,13 +1194,13 @@ class ObjectReconstructor(Daemon): self.last_reconstruction_count = -1 self.handoffs_remaining = 0 - def delete_partition(self, path): - def kill_it(path): - shutil.rmtree(path, ignore_errors=True) - remove_file(path) + def delete_partition(self, df_mgr, path): + def kill_it(df_mgr, path): + df_mgr.rmtree(path, ignore_errors=True) + df_mgr.remove_file(path) self.logger.info(_("Removing partition: %s"), path) - tpool.execute(kill_it, path) + tpool.execute(kill_it, df_mgr, path) def reconstruct(self, **kwargs): """Run a reconstruction pass""" @@ -1230,6 +1230,7 @@ class ObjectReconstructor(Daemon): # Therefore we know this part a) doesn't belong on # this node and b) doesn't have any suffixes in it. self.run_pool.spawn(self.delete_partition, + self._df_router[part_info['policy']], part_info['part_path']) for job in jobs: self.run_pool.spawn(self.process_job, job) diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index e3634bb8f..e9d19975a 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -16,9 +16,8 @@ from collections import defaultdict import os import errno -from os.path import isdir, isfile, join, dirname +from os.path import join, dirname import random -import shutil import time import itertools from six import viewkeys @@ -33,7 +32,7 @@ from swift.common.constraints import check_drive from swift.common.ring.utils import is_local_device from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ - rsync_module_interpolation, mkdirs, config_true_value, \ + rsync_module_interpolation, config_true_value, \ config_auto_int_value, storage_directory, \ load_recon_cache, PrefixLoggerAdapter, parse_override_options, \ distribute_evenly @@ -485,9 +484,11 @@ class ObjectReplicator(Daemon): :param job: a dict containing info about the partition to be replicated """ + df_mgr = self._df_router[job['policy']] + def tpool_get_suffixes(path): - return [suff for suff in os.listdir(path) - if len(suff) == 3 and isdir(join(path, suff))] + return [suff for suff in df_mgr.listdir(path) + if len(suff) == 3 and df_mgr.isdir(join(path, suff))] stats = self.stats_for_dev[job['device']] stats.attempted += 1 @@ -562,10 +563,10 @@ class ObjectReplicator(Daemon): failure_dev['device']) for failure_dev in job['nodes']]) else: - self.delete_partition(job['path']) + self.delete_partition(df_mgr, job['path']) handoff_partition_deleted = True elif not suffixes: - self.delete_partition(job['path']) + self.delete_partition(df_mgr, job['path']) handoff_partition_deleted = True except (Exception, Timeout): self.logger.exception(_("Error syncing handoff partition")) @@ -580,25 +581,26 @@ class ObjectReplicator(Daemon): self.partition_times.append(time.time() - begin) self.logger.timing_since('partition.delete.timing', begin) - def delete_partition(self, path): + def delete_partition(self, df_mgr, path): self.logger.info(_("Removing partition: %s"), path) try: - tpool.execute(shutil.rmtree, path) + tpool.execute(df_mgr.rmtree, path) except OSError as e: if e.errno not in (errno.ENOENT, errno.ENOTEMPTY): # If there was a race to create or delete, don't worry raise def delete_handoff_objs(self, job, delete_objs): + df_mgr = self._df_router[job['policy']] success_paths = [] error_paths = [] for object_hash in delete_objs: object_path = storage_directory(job['obj_path'], job['partition'], object_hash) - tpool.execute(shutil.rmtree, object_path, ignore_errors=True) + tpool.execute(df_mgr.rmtree, object_path, ignore_errors=True) suffix_dir = dirname(object_path) try: - os.rmdir(suffix_dir) + df_mgr.rmdir(suffix_dir) success_paths.append(object_path) except OSError as e: if e.errno not in (errno.ENOENT, errno.ENOTEMPTY): @@ -816,13 +818,14 @@ class ObjectReplicator(Daemon): tmp_path = join(dev_path, get_tmp_dir(policy)) unlink_older_than(tmp_path, time.time() - df_mgr.reclaim_age) - if not os.path.exists(obj_path): + if not df_mgr.exists(obj_path): try: - mkdirs(obj_path) + df_mgr.mkdirs(obj_path) except Exception: self.logger.exception('ERROR creating %s' % obj_path) continue - for partition in os.listdir(obj_path): + + for partition in df_mgr.listdir(obj_path): if (override_partitions is not None and partition.isdigit() and int(partition) not in override_partitions): continue @@ -937,6 +940,7 @@ class ObjectReplicator(Daemon): override_partitions=override_partitions, override_policies=override_policies) for job in jobs: + df_mgr = self._df_router[job['policy']] dev_stats = self.stats_for_dev[job['device']] num_jobs += 1 current_nodes = job['nodes'] @@ -964,13 +968,13 @@ class ObjectReplicator(Daemon): return try: - if isfile(job['path']): + if df_mgr.isfile(job['path']): # Clean up any (probably zero-byte) files where a # partition should be. self.logger.warning( 'Removing partition directory ' 'which was a file: %s', job['path']) - os.remove(job['path']) + df_mgr.remove(job['path']) continue except OSError: continue diff --git a/swift/obj/rpc_http.py b/swift/obj/rpc_http.py new file mode 100644 index 000000000..df3ff6e94 --- /dev/null +++ b/swift/obj/rpc_http.py @@ -0,0 +1,370 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Implements RPC: protobuf over a UNIX domain socket + +import socket +from eventlet.green import httplib +from swift.obj import fmgr_pb2 as pb + + +class UnixHTTPConnection(httplib.HTTPConnection): + """Support for unix domain socket with httplib""" + + def __init__(self, path, host='localhost', port=None, strict=None, + timeout=None): + httplib.HTTPConnection.__init__(self, host, port=port, strict=strict, + timeout=timeout) + self.path = path + + def connect(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self.path) + self.sock = sock + + +class StatusCode(object): + Ok = 200 + Cancelled = 299 + InvalidArgument = 400 + NotFound = 404 + AlreadyExists = 409 + PermissionDenied = 403 + FailedPrecondition = 412 + Unimplemented = 501 + Internal = 500 + Unavailable = 503 + + +class RpcError(Exception): + def __init__(self, message, code): + self.code = code + super(RpcError, self).__init__(message) + + +def get_rpc_reply(conn, pb_type): + """ + Read the response from the index server over HTTP. If the status is 200, + deserialize the body as a protobuf object and return it. + If the status is not 200, raise an RpcError exception. + :param conn: HTTP connection to the index server + :param pb_type: protobuf type we expect in return + :return: protobuf object, or raise an exception if HTTP status is not 200 + """ + # if buffering is not set, httplib will call recvfrom() for every char + http_response = conn.getresponse(buffering=True) + if http_response.status != StatusCode.Ok: + raise RpcError(http_response.read(), http_response.status) + + pb_obj = pb_type() + pb_obj.ParseFromString(http_response.read()) + return pb_obj + + +def get_next_offset(socket_path, volume_index, repair_tool=False): + """ + Returns the next offset to use in the volume + """ + volume = pb.GetNextOffsetRequest(volume_index=int(volume_index), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/get_next_offset', volume.SerializeToString()) + response = get_rpc_reply(conn, pb.GetNextOffsetReply) + return response.offset + + +def register_volume(socket_path, partition, volume_type, volume_index, + first_obj_offset, state, repair_tool=False): + volume = pb.RegisterVolumeRequest(partition=int(partition), + type=int(volume_type), + volume_index=int(volume_index), + offset=first_obj_offset, state=state, + repair_tool=repair_tool) + + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/register_volume', volume.SerializeToString()) + response = get_rpc_reply(conn, pb.RegisterVolumeReply) + return response + + +def unregister_volume(socket_path, volume_index): + index = pb.UnregisterVolumeRequest(index=volume_index) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/unregister_volume', index.SerializeToString()) + response = get_rpc_reply(conn, pb.UnregisterVolumeReply) + return response + + +def update_volume_state(socket_path, volume_index, new_state, + repair_tool=False): + state_update = pb.UpdateVolumeStateRequest(volume_index=int(volume_index), + state=new_state, + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/update_volume_state', + state_update.SerializeToString()) + response = get_rpc_reply(conn, pb.UpdateVolumeStateReply) + return response + + +def register_object(socket_path, name, volume_index, offset, next_offset, + repair_tool=False): + """ + register a vfile + """ + obj = pb.RegisterObjectRequest(name=str(name), + volume_index=int(volume_index), + offset=int(offset), + next_offset=int(next_offset), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/register_object', obj.SerializeToString()) + response = get_rpc_reply(conn, pb.RegisterObjectReply) + return response + + +def unregister_object(socket_path, name, repair_tool=False): + obj = pb.UnregisterObjectRequest(name=str(name), repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/unregister_object', obj.SerializeToString()) + response = get_rpc_reply(conn, pb.UnregisterObjectReply) + return response + + +def rename_object(socket_path, name, new_name, repair_tool=False): + rename_req = pb.RenameObjectRequest(name=str(name), new_name=str(new_name), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/rename_object', rename_req.SerializeToString()) + response = get_rpc_reply(conn, pb.RenameObjectReply) + return response + + +def quarantine_object(socket_path, name, repair_tool=False): + objname = pb.QuarantineObjectRequest(name=str(name), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/quarantine_object', objname.SerializeToString()) + response = get_rpc_reply(conn, pb.QuarantineObjectReply) + return response + + +def unquarantine_object(socket_path, name, repair_tool=False): + objname = pb.UnquarantineObjectRequest(name=str(name), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/unquarantine_object', objname.SerializeToString()) + response = get_rpc_reply(conn, pb.UnquarantineObjectReply) + return response + + +def _list_quarantined_ohashes(socket_path, page_token, page_size): + """ + Returns quarantined hashes, with pagination (as with the regular diskfile, + they are not below partition/suffix directories) + :param socket_path: socket_path for index-server + :param page_token: where to start for pagination + :param page_size: maximum number of results to be returned + :return: A list of quarantined object hashes + """ + req_args = pb.ListQuarantinedOHashesRequest(page_token=str(page_token), + page_size=page_size) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_quarantined_ohashes', + req_args.SerializeToString()) + response = get_rpc_reply(conn, pb.ListQuarantinedOHashesReply) + return response + + +def list_quarantined_ohashes(socket_path, page_size=10000): + """ + Returns all quarantined hashes, wraps _list_quarantined_ohashes so caller + does not have to deal with pagination + :param socket_path: socket_path + :param page_size: page_size to pass to wrapped function + :return: an iterator for all quarantined objects + """ + page_token = "" + while True: + response = _list_quarantined_ohashes(socket_path, page_token, + page_size) + for r in response.objects: + yield (r) + page_token = response.next_page_token + if not page_token: + break + + +def _list_objects_by_volume(socket_path, volume_index, quarantined, page_token, + page_size, repair_tool=False): + """ + Returns objects within the volume, either quarantined or not, with + pagination. + :param socket_path: socket_path for index-server + :param volume_index: index of the volume for which to list objects + :param quarantined: if true, returns quarantined objects. if false, returns + non-quarantined objects. + :param page_token: where to start for pagination + :param page_size: maximum number of results to be returned + :param repair_tool: set to true if caller is a repair tool + :return: A list of objects for the volume + """ + req_args = pb.LoadObjectsByVolumeRequest(index=volume_index, + quarantined=quarantined, + page_token=page_token, + page_size=page_size, + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/load_objects_by_volume', + req_args.SerializeToString()) + response = get_rpc_reply(conn, pb.LoadObjectsByVolumeReply) + return response + + +def list_objects_by_volume(socket_path, volume_index, quarantined=False, + page_size=10000, repair_tool=False): + page_token = "" + while True: + response = _list_objects_by_volume(socket_path, volume_index, + quarantined, page_token, page_size, + repair_tool) + for r in response.objects: + yield (r) + page_token = response.next_page_token + if not page_token: + break + + +def list_quarantined_ohash(socket_path, prefix, repair_tool=False): + len_prefix = len(prefix) + prefix = pb.ListQuarantinedOHashRequest(prefix=str(prefix), + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_quarantined_ohash', prefix.SerializeToString()) + response = get_rpc_reply(conn, pb.ListQuarantinedOHashReply) + + # Caller expects object names without the prefix, similar + # to os.listdir, not actual objects. + objnames = [] + for obj in response.objects: + objnames.append(obj.name[len_prefix:]) + + return objnames + + +# listdir like function for the KV +def list_prefix(socket_path, prefix, repair_tool=False): + len_prefix = len(prefix) + prefix = str(prefix) + prefix = pb.LoadObjectsByPrefixRequest(prefix=prefix, + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/load_objects_by_prefix', prefix.SerializeToString()) + response = get_rpc_reply(conn, pb.LoadObjectsByPrefixReply) + # response.objets is an iterable + # TBD, caller expects object names without the prefix, similar + # to os.listdir, not actual objects. + # Fix this in the rpc server + # return response.objects + objnames = [] + for obj in response.objects: + objnames.append(obj.name[len_prefix:]) + + return objnames + + +def get_object(socket_path, name, is_quarantined=False, repair_tool=False): + """ + returns an object given its whole key + """ + object_name = pb.LoadObjectRequest(name=str(name), + is_quarantined=is_quarantined, + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/load_object', object_name.SerializeToString()) + response = get_rpc_reply(conn, pb.LoadObjectReply) + return response + + +def list_partitions(socket_path, partition_bits): + list_partitions_req = pb.ListPartitionsRequest( + partition_bits=partition_bits) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_partitions', + list_partitions_req.SerializeToString()) + response = get_rpc_reply(conn, pb.DirEntries) + return response.entry + + +def list_partition(socket_path, partition, partition_bits): + list_partition_req = pb.ListPartitionRequest(partition=partition, + partition_bits=partition_bits) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_partition', + list_partition_req.SerializeToString()) + response = get_rpc_reply(conn, pb.DirEntries) + return response.entry + + +def list_suffix(socket_path, partition, suffix, partition_bits): + suffix = str(suffix) + list_suffix_req = pb.ListSuffixRequest(partition=partition, + suffix=suffix, + partition_bits=partition_bits) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_suffix', list_suffix_req.SerializeToString()) + response = get_rpc_reply(conn, pb.DirEntries) + return response.entry + + +def list_volumes(socket_path, partition, type, repair_tool=False): + list_req = pb.ListVolumesRequest(partition=int(partition), type=type, + repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/list_volumes', list_req.SerializeToString()) + response = get_rpc_reply(conn, pb.ListVolumesReply) + return response.volumes + + +def get_volume(socket_path, index, repair_tool=False): + volume_idx = pb.GetVolumeRequest(index=index, repair_tool=repair_tool) + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/get_volume', volume_idx.SerializeToString()) + response = get_rpc_reply(conn, pb.Volume) + return response + + +def get_stats(socket_path): + stats_req = pb.GetStatsInfo() + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/get_stats', stats_req.SerializeToString()) + response = get_rpc_reply(conn, pb.GetStatsReply) + return response + + +def get_kv_state(socket_path): + pb_out = pb.GetKvStateRequest() + conn = UnixHTTPConnection(socket_path) + conn.request('POST', '/get_kv_state', pb_out.SerializeToString()) + response = get_rpc_reply(conn, pb.KvState) + return response + + +def set_kv_state(socket_path, isClean): + conn = UnixHTTPConnection(socket_path) + newKvState = pb.KvState(isClean=isClean) + conn.request('POST', '/set_kv_state', newKvState.SerializeToString()) + response = get_rpc_reply(conn, pb.SetKvStateReply) + return response diff --git a/swift/obj/vfile.py b/swift/obj/vfile.py new file mode 100644 index 000000000..57bacd558 --- /dev/null +++ b/swift/obj/vfile.py @@ -0,0 +1,1201 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +A "vfile" is a virtual file stored in a "volume". +A volume is an actual file where vfiles are stored. +vfile names and metadata (xattr) are also stored in the volume. +""" + +import errno +import fcntl +import six +import hashlib +import re +from eventlet.green import os +from swift.obj.header import ObjectHeader, VolumeHeader, ALIGNMENT, \ + read_volume_header, HeaderException, STATE_OBJ_QUARANTINED, \ + STATE_OBJ_FILE, write_object_header, \ + read_object_header, OBJECT_HEADER_VERSION, write_volume_header, \ + erase_object_header, MAX_OBJECT_HEADER_LEN +from swift.common.exceptions import DiskFileNoSpace, \ + DiskFileBadMetadataChecksum +from swift.common.storage_policy import POLICIES +from swift.common.utils import fsync, fdatasync, fsync_dir, \ + fallocate +from swift.obj import rpc_http as rpc +from swift.obj.rpc_http import RpcError, StatusCode +from swift.obj.fmgr_pb2 import STATE_RW +from swift.obj.meta_pb2 import Metadata +from swift.obj.diskfile import _encode_metadata +from swift.common import utils +from swift.obj.vfile_utils import SwiftPathInfo, get_volume_index, \ + get_volume_type, next_aligned_offset, SwiftQuarantinedPathInfo, VOSError, \ + VIOError, VFileException + +VCREATION_LOCK_NAME = "volume_creation.lock" + +PICKLE_PROTOCOL = 2 +METADATA_RESERVE = 500 +VOL_AND_LOCKS_RE = re.compile(r'v\d{7}(.writelock)?') + + +def increment(logger, counter, count=1): + if logger is not None: + try: + logger.update_stats(counter, count) + except Exception: + pass + + +class VFileReader(object): + """ + Represents a vfile stored in a volume. + """ + def __init__(self, fp, name, offset, header, metadata, logger): + self.fp = fp + self.name = name + self.offset = offset + self._header = header + self.metadata = metadata + self.logger = logger + + @property + def data_size(self): + return self._header.data_size + + @classmethod + def get_vfile(cls, filepath, logger): + """ + Returns a VFileReader instance from the path expected by swift + :param filepath: full path to file + :param logger: a logger object + """ + si = SwiftPathInfo.from_path(filepath) + if si.type != "file": + err_msg = "Not a path to a swift file ({})".format(filepath) + raise VIOError(errno.EINVAL, err_msg) + + full_name = si.ohash + si.filename + return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger) + + @classmethod + def get_quarantined_vfile(cls, filepath, logger): + si = SwiftQuarantinedPathInfo.from_path(filepath) + + if si.type != "file": + err_msg = "Not a path to a swift file ({})".format(filepath) + raise VIOError(errno.EINVAL, err_msg) + + full_name = si.ohash + si.filename + return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger, + is_quarantined=True) + + @classmethod + def _get_vfile(cls, name, volume_dir, socket_path, logger, + is_quarantined=False, repair_tool=False): + """ + Returns a VFileReader instance + :param name: full name: object hash+filename + :param volume_dir: directory where the volume is stored + :param socket_path: full path to KV socket + :param logger: logger object + :param is_quarantined: object is quarantined + :param repair_tool: True if requests comes from a repair tool + """ + # get the object + try: + obj = rpc.get_object(socket_path, name, + is_quarantined=is_quarantined, + repair_tool=repair_tool) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VIOError(errno.ENOENT, + "No such file or directory: {}".format(name)) + # May need to handle more cases ? + raise (e) + + # get the volume file name from the object + volume_filename = get_volume_name(obj.volume_index) + volume_filepath = os.path.join(volume_dir, volume_filename) + + fp = open(volume_filepath, 'rb') + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + fp.seek(obj.offset) + data = fp.read(512) + if all(c == '\x00' for c in data): + # unregister the object here + rpc.unregister_object(socket_path, name) + msg = "Zeroed header found for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + increment(logger, 'vfile.already_punched') + raise VFileException(msg) + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + raise VIOError(errno.EIO, msg) + + # check that we have the object we were expecting + header_fullname = "{}{}".format(header.ohash, header.filename) + if header_fullname != name: + # until we journal the renames, after a crash we may not have the + # rename in the KV. Handle this here for now + non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname) + if non_durable_name == name: + increment(logger, 'vfile.already_renamed') + rpc.rename_object(socket_path, name, header_fullname) + else: + increment(logger, 'vfile.wrong_object_header_name') + raise VIOError(errno.EIO, + "Wrong object header name. Header: {} Expected:\ + {}".format(header_fullname, name)) + + metadata = read_metadata(fp, obj.offset, header) + + # seek to beginning of data + fp.seek(obj.offset + header.data_offset) + + return cls(fp, obj.name, obj.offset, header, metadata, logger) + + def read(self, size=None): + """ + Wraps read to prevent reading beyond the vfile content. + """ + curpos = self.fp.tell() + data_size = self._header.data_size + data_start_offset = self.offset + self._header.data_offset + data_end_offset = data_start_offset + data_size + + if curpos >= data_end_offset or size == 0: + return '' + if size: + if size > data_end_offset - curpos: + size = data_end_offset - curpos + else: + size = data_end_offset - curpos + + buf = self.fp.read(size) + return buf + + def seek(self, pos): + """ + Wraps seek to bind offset from the vfile start to its end. + """ + real_data_offset = self.offset + self._header.data_offset + real_new_pos = real_data_offset + pos + if (real_new_pos < real_data_offset or + real_new_pos > real_data_offset + self._header.data_size): + raise VIOError(errno.EINVAL, "Invalid seek") + self.fp.seek(real_new_pos) + + def tell(self): + curpos = self.fp.tell() + vpos = curpos - (self.offset + self._header.data_offset) + return vpos + + def close(self): + self.fp.close() + + +def _may_grow_volume(volume_fd, volume_offset, obj_size, conf, logger): + """ + Grows a volume if needed. + if free_space < obj_size + object header len, allocate obj_size padded to + volume_alloc_chunk_size + + """ + volume_alloc_chunk_size = conf['volume_alloc_chunk_size'] + + if obj_size is None: + obj_size = 0 + + volume_size = os.lseek(volume_fd, 0, os.SEEK_END) + free_space = volume_size - volume_offset + + obj_header_len = len(ObjectHeader(version=OBJECT_HEADER_VERSION)) + required_space = obj_header_len + obj_size + + if free_space < required_space: + _allocate_volume_space(volume_fd, volume_offset, required_space, + volume_alloc_chunk_size, logger) + + +class VFileWriter(object): + def __init__(self, datadir, fd, lock_fd, volume_dir, + volume_index, header, offset, logger): + si = SwiftPathInfo.from_path(datadir) + + self.fd = fd + self.lock_fd = lock_fd + self.volume_dir = volume_dir + self.volume_index = volume_index + self.header = header + self.offset = offset + self.socket_path = si.socket_path + self.partition = si.partition + # may be used for statsd. Do not use it to log or it will hang the + # object-server process. (eventlet) + self.logger = logger + + @classmethod + def create(cls, datadir, obj_size, conf, logger, extension=None): + # parse datadir + si = SwiftPathInfo.from_path(datadir) + + if si.type != "ohash": + raise VOSError("not a valid object hash path") + + if obj_size is not None: + if obj_size < 0: + raise VOSError("obj size may not be negative") + + socket_path = os.path.normpath(si.socket_path) + volume_dir = os.path.normpath(si.volume_dir) + + # get a writable volume + # TODO : check that we fallocate enough if obj_size > volume + # chunk alloc size + volume_file, lock_file, volume_path = open_or_create_volume( + socket_path, si.partition, extension, volume_dir, + conf, logger, size=obj_size) + volume_index = get_volume_index(volume_path) + + # create object header + header = ObjectHeader(version=OBJECT_HEADER_VERSION) + # TODO: this is unused, always set to zero. + header.ohash = si.ohash + header.policy_idx = 0 + header.data_offset = len(header) + conf['metadata_reserve'] + header.data_size = 0 + # requires header v3 + header.state = STATE_OBJ_FILE + + try: + # get offset at which to start writing + offset = rpc.get_next_offset(socket_path, volume_index) + + # pre-allocate space if needed + _may_grow_volume(volume_file, offset, obj_size, conf, logger) + + # seek to absolute object offset + relative data offset + # (we leave space for the header and some metadata) + os.lseek(volume_file, offset + header.data_offset, + os.SEEK_SET) + except Exception: + os.close(volume_file) + os.close(lock_file) + raise + + return cls(datadir, volume_file, lock_file, volume_dir, + volume_index, header, offset, logger) + + def commit(self, filename, metadata): + """ + Write the header, metadata, sync, and register vfile in KV. + """ + if self.fd < 0: + raise VIOError(errno.EBADF, "Bad file descriptor") + + if not filename: + raise VIOError("filename cannot be empty") + + # how much data has been written ? + # header.data_offset is relative to the object's offset + data_offset = self.offset + self.header.data_offset + data_end = os.lseek(self.fd, 0, os.SEEK_CUR) + self.header.data_size = data_end - data_offset + # FIXME: this is unused message, please fix as expected + # txt = "commit: {} data_end {} data_offset: {}" + + self.header.filename = filename + # metastr = pickle.dumps(self.metadata, PICKLE_PROTOCOL) + # create and populate protobuf object + meta = Metadata() + enc_metadata = _encode_metadata(metadata) + for k, v in enc_metadata.items(): + meta.attrs.add(key=k, value=v) + + metastr = meta.SerializeToString() + metastr_md5 = hashlib.md5(metastr).hexdigest().encode('ascii') + + self.header.metadata_size = len(metastr) + self.header.metadata_offset = len(self.header) + self.header.metastr_md5 = metastr_md5 + + # calculate the end object offset (this includes the padding, if any) + # start from data_end, and add: metadata remainder if any, footer, + # padding + + # how much reserved metadata space do we have ? + # Should be equal to "metadata_reserve" + metadata_available_space = (self.header.data_offset - + self.header.metadata_offset) + metadata_remainder = max(0, self.header.metadata_size - + metadata_available_space) + + object_end = data_end + metadata_remainder + + object_end = next_aligned_offset(object_end, ALIGNMENT) + + self.header.total_size = object_end - self.offset + + # write header + os.lseek(self.fd, self.offset, os.SEEK_SET) + os.write(self.fd, self.header.pack()) + + # write metadata, and footer + metadata_offset = self.offset + self.header.metadata_offset + if self.header.metadata_size > metadata_available_space: + os.lseek(self.fd, metadata_offset, os.SEEK_SET) + os.write(self.fd, metastr[:metadata_available_space]) + # metadata does not fit in reserved space, + # write the remainder after the data + os.lseek(self.fd, data_end, os.SEEK_SET) + os.write(self.fd, metastr[metadata_available_space:]) + else: + os.lseek(self.fd, metadata_offset, os.SEEK_SET) + os.write(self.fd, metastr) + + # Sanity check, we should not go beyond object_end + curpos = os.lseek(self.fd, 0, os.SEEK_CUR) + if curpos > object_end: + errtxt = "BUG: wrote past object_end! curpos: {} object_end: {}" + raise Exception(errtxt.format(curpos, object_end)) + + # sync data. fdatasync() is enough, if the volume was just created, + # it has been fsync()'ed previously, along with its parent directory. + fdatasync(self.fd) + + # register object + full_name = "{}{}".format(self.header.ohash, filename) + try: + rpc.register_object(self.socket_path, full_name, self.volume_index, + self.offset, object_end) + except RpcError: + # If we failed to register the object, erase the header so that it + # will not be picked up by the volume checker if there is a crash + # or power failure before it gets overwritten by another object. + erase_object_header(self.fd, self.offset) + raise + + increment(self.logger, 'vfile.vfile_creation') + increment(self.logger, 'vfile.total_space_used', + self.header.total_size) + + +def open_or_create_volume(socket_path, partition, extension, volume_dir, + conf, logger, size=0): + """ + Tries to open or create a volume for writing. If a volume cannot be + opened or created, a VOSError exception is raised. + :return: volume file descriptor, lock file descriptor, absolute path + to volume. + """ + volume_file, lock_file, volume_path = open_writable_volume(socket_path, + partition, + extension, + volume_dir, + conf, + logger) + if not volume_file: + # attempt to create new volume for partition + try: + volume_file, lock_file, volume_path = create_writable_volume( + socket_path, partition, extension, volume_dir, + conf, logger, size=size) + except Exception as err: + error_msg = "Failed to open or create a volume for writing: " + error_msg += getattr(err, "strerror", "Unknown error") + raise VOSError(errno.ENOSPC, error_msg) + + return volume_file, lock_file, volume_path + + +def _create_new_lock_file(volume_dir, logger): + creation_lock_path = os.path.join(volume_dir, VCREATION_LOCK_NAME) + with open(creation_lock_path, 'w') as creation_lock_file: + # this may block + fcntl.flock(creation_lock_file, fcntl.LOCK_EX) + + index = get_next_volume_index(volume_dir) + next_lock_name = get_lock_file_name(index) + next_lock_path = os.path.join(volume_dir, next_lock_name) + + try: + lock_file = os.open(next_lock_path, + os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + except OSError: + increment(logger, 'vfile.volume_creation.fail_other') + raise + + try: + fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + increment(logger, 'vfile.volume_creation.fail_other') + os.close(lock_file) + os.unlink(next_lock_path) + raise + + return index, next_lock_path, lock_file + + +# create a new volume +def create_writable_volume(socket_path, partition, extension, volume_dir, + conf, logger, state=STATE_RW, size=0): + """ + Creates a new writable volume, and associated lock file. + returns a volume_file, a lock_file, and the index of the volume that has + been created. + If the extension is specified, a specific volume may be used. + (Currently, .ts files go to separate volumes as they are short-lived, in + order to limit fragmentation) + state can be STATE_RW (new RW volume) or STATE_COMPACTION_TARGET (new empty + volume which will be used for compaction, and to which new objects cannot + be written). + size is the space that should be allocated to the volume (in addition to + the volume header) + """ + + if size is None: + size = 0 + + # Check if we have exceeded the allowed volume count for this partition + # Move this check below with the lock held ? (now, we may have + # a few extra volumes) + volume_type = get_volume_type(extension) + volumes = rpc.list_volumes(socket_path, partition, volume_type) + max_volume_count = conf['max_volume_count'] + if len(volumes) >= max_volume_count: + err_txt = ("Maximum count of volumes reached for partition:" + " {} type: {}".format(partition, volume_type)) + increment(logger, 'vfile.volume_creation.fail_count_exceeded') + raise VOSError(errno.EDQUOT, err_txt) + + try: + os.makedirs(volume_dir) + except OSError as err: + if err.errno == errno.EEXIST: + pass + else: + raise + + index, next_lock_path, lock_file = _create_new_lock_file( + volume_dir, logger) + + # create the volume + next_volume_name = get_volume_name(index) + next_volume_path = os.path.join(volume_dir, next_volume_name) + + vol_header = VolumeHeader() + vol_header.volume_idx = index + vol_header.type = volume_type + vol_header.partition = int(partition) + # first object alignment + vol_header.first_obj_offset = len(vol_header) + ( + ALIGNMENT - len(vol_header) % ALIGNMENT) + vol_header.state = state + + # How much space is needed for the object ? (assuming metadata fits in the + # reserved space, but we cannot know this in advance) + alloc_size = vol_header.first_obj_offset + size + volume_alloc_chunk_size = conf['volume_alloc_chunk_size'] + + try: + volume_file = os.open(next_volume_path, + os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + _allocate_volume_space(volume_file, 0, alloc_size, + volume_alloc_chunk_size, logger) + + # Write volume header + write_volume_header(vol_header, volume_file) + + # If the uploader is slow to send data to the object server, a crash + # may occur before the object is received and a call to fsync() is + # issued. We end up with volumes without a header. + # Issue a fsync() here, at the cost of performance early on. As + # partitions get volumes we switch to open_writable_volume, avoiding + # the fsync. + fsync(volume_file) + fsync_dir(volume_dir) + + # Register volume + rpc.register_volume(socket_path, partition, vol_header.type, index, + vol_header.first_obj_offset, vol_header.state) + except Exception: + os.close(lock_file) + os.close(volume_file) + os.unlink(next_lock_path) + os.unlink(next_volume_path) + increment(logger, 'vfile.volume_creation.fail_other') + raise + + increment(logger, 'vfile.volume_creation.ok') + return volume_file, lock_file, next_volume_path + + +def _allocate_volume_space(volume_fd, offset, length, volume_alloc_chunk_size, + logger, ignore_error=False): + """ + Will pre-allocate space for the volume given the offset and length, + aligned to volume_alloc_chunk_size. + May ignore an OSError + :param volume_fd: file descriptor of the volume + :param offset: offset from which to grow the volume + :param length: length to grow, relative to offset + :param volume_alloc_chunk_size: pad length to align to this chunk size + :param ignore_error: ignore OSError + :return: + """ + try: + alloc_size = next_aligned_offset(length, volume_alloc_chunk_size) + fallocate(volume_fd, alloc_size, offset) + increment(logger, 'vfile.volume_alloc_space', + alloc_size) + except OSError as err: + if not ignore_error: + if err.errno in (errno.ENOSPC, errno.EDQUOT): + raise DiskFileNoSpace() + raise + + +def delete_volume(socket_path, volume_path, logger): + """ + Deletes a volume from disk and removes entry in the KV + """ + index = get_volume_index(volume_path) + volume_lock_path = "{}.writelock".format(volume_path) + + # Remove KV entry + rpc.unregister_volume(socket_path, index) + + # Remove volume and lock + os.unlink(volume_path) + os.unlink(volume_lock_path) + + +def open_writable_volume(socket_path, partition, extension, volume_dir, conf, + logger): + """ + Opens a volume available for writing. + returns a volume file, a lock_file, and the volume path + :param socket_path: full path to KV socket + :param partition: partition name + :param extension: file extension + """ + volume_type = get_volume_type(extension) + volume_file = None + lock_file = None + volume_file_path = None + # query the KV for available volumes given the partition and type + volumes = rpc.list_volumes(socket_path, partition, volume_type) + + # writable candidates are volumes which are in RW state and not too large + volumes = [vol for vol in volumes if vol.volume_state == STATE_RW and + vol.next_offset < conf['max_volume_size']] + volume_files = [get_volume_name(volume.volume_index) for volume in + volumes] + + for volume_file_name in volume_files: + volume_file_path = os.path.join(volume_dir, volume_file_name) + volume_file, lock_file = open_volume(volume_file_path) + if volume_file: + break + + return volume_file, lock_file, volume_file_path + + +def open_volume(volume_path): + """Locks the volume, and returns a fd to the volume and a fd to its lock + file. Returns None, None, if it cannot be locked. Raises for any other + error. + :param volume_path: full path to volume + :return: (volume fd, lock fd) + """ + lock_file_path = "{}.writelock".format(volume_path) + + try: + lock_file = os.open(lock_file_path, os.O_WRONLY) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # if the volume lock file as been removed, create it + lock_file = os.open(lock_file_path, + os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + + try: + fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as err: + if err.errno in (errno.EACCES, errno.EAGAIN): + # volume is locked + os.close(lock_file) + return None, None + else: + try: + os.close(lock_file) + except Exception: + pass + raise + except Exception: + try: + os.close(lock_file) + except Exception: + pass + raise + + volume_file = os.open(volume_path, os.O_WRONLY) + return volume_file, lock_file + + +def change_volume_state(volume_file_path, state, compaction_target=None): + """ + Changes the volumes state. caller locks the volume + TODO: should this handle RPC as well ? (currently done by caller) + TODO: take an optional size parameter so we can fallocate() the file + :param volume_file_path: path of volume to modify + :param state: new state + :param compaction_target: ID of the volume compaction target, if applicable + """ + volume_file = open(volume_file_path, "rb+") + + h = read_volume_header(volume_file) + h.state = state + if compaction_target: + h.compaction_target = compaction_target + volume_file.seek(0) + volume_file.write(h.pack()) + + +def get_next_volume_index(volume_dir): + """ + Returns the next volume index to use for the given dir. + Caller must hold the volume creation lock. + :param volume_dir: volume directory + :return: the next volume index to use + """ + dir_entries = os.listdir(volume_dir) + # Get all volumes and their lock: a volume should always have a lock, + # but a fsck may have removed either. If we find such a case, skip the + # index. + volumes_and_locks_idxs = set([name[1:8] for name in dir_entries if + VOL_AND_LOCKS_RE.match(name)]) + if len(volumes_and_locks_idxs) < 1: + return 1 + + # This is about 30% faster than calling int() in the list comprehension + # above. + idxs = sorted(int(i) for i in volumes_and_locks_idxs) + + # find the first "hole" in the indexes + for pos, idx in enumerate(idxs, start=1): + if pos != idx: + return pos + + # no hole found + return idx + 1 + + +def get_lock_file_name(index): + if index <= 0 or index > 9999999: + raise VFileException("invalid lock file index") + lock_file_name = "v{0:07d}.writelock".format(index) + return lock_file_name + + +def get_volume_name(index): + if index <= 0 or index > 9999999: + raise VFileException("invalid volume file index") + volume_file_name = "v{0:07d}".format(index) + return volume_file_name + + +def listdir(path): + type_to_func = { + 'ohash': _list_ohash, + 'suffix': _list_suffix, + 'partition': _list_partition, + 'partitions': _list_partitions + } + + path = os.path.normpath(path) + si = SwiftPathInfo.from_path(path) + + # get part power from the ring (needed as we generate "directories" based + # on it. + if not POLICIES[si.policy_idx].object_ring: + POLICIES[si.policy_idx].load_ring('/etc/swift') + part_power = 32 - POLICIES[si.policy_idx].object_ring._part_shift + + ret = type_to_func[si.type](si, part_power) + return [str(e) for e in ret] + + +def exists(path): + """ + Similar to os.path.exists + LOSF manages files below the "objects" directory. if the query is about + that directory, use os.path.exists, otherwise check in the KV. + It does not really make sense in LOSF context as callers will then issue a + "mkdir", which is a noop. But having this means we touch less of the + existing code. (diskfile, reconstructor, replicator) + :param path: full path to directory + :return: True is path exists, False otherwise + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.exists(path) + + # if a directory is empty, it does not exist + if listdir(path): + return True + else: + return False + + # Does not handle "file" + + +def isdir(path): + """ + Similar to os.path.isdir + :param path: full path to directory + :return: + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.isdir(path) + if si.type == 'file': + return False + if listdir(path): + return True + else: + return False + + +def isfile(path): + """ + Similar to os.path.isfile + :param path: full path to directory + :return: + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.isfile(path) + if si.type == 'file': + return True + return False + + +def mkdirs(path): + """ + Similar to utils.mkdirs + Noop, except if the directory is the "objects" directory + :param path: full path to directory + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return utils.mkdirs(path) + + +def list_quarantine(quarantine_path): + """ + Lists all quarantined object hashes for the disk/policy + :param quarantine_path: quarantined path + :return: a list of quarantined object hashes + """ + si = SwiftQuarantinedPathInfo.from_path(quarantine_path) + if si.type != "ohashes": + err_msg = "Not a path to a quarantined file ({})".format( + quarantine_path) + raise VIOError(errno.EINVAL, err_msg) + return rpc.list_quarantined_ohashes(si.socket_path) + + +def list_quarantined_ohash(quarantined_ohash_path): + si = SwiftQuarantinedPathInfo.from_path(quarantined_ohash_path) + if si.type != "ohash": + err_msg = "Not a path to a quarantined file ({})".format( + quarantined_ohash_path) + raise VIOError(errno.EINVAL, err_msg) + return rpc.list_quarantined_ohash(si.socket_path, si.ohash) + + +def _list_ohash(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of files within the object directory + """ + return rpc.list_prefix(si.socket_path, si.ohash) + + +def _list_suffix(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of object hashes directory within the suffix directory + """ + return rpc.list_suffix(si.socket_path, int(si.partition), + si.suffix, part_power) + + +def _list_partition(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of suffixes within the partition + """ + return rpc.list_partition(si.socket_path, int(si.partition), + part_power) + + +def _list_partitions(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of partitions + """ + return rpc.list_partitions(si.socket_path, part_power) + + +def set_header_state(socket_path, name, quarantine): + """ + Set a vfile header state (quarantined or not) + :param name: full name + :param socket_path: socket path + :param quarantine: True to quarantine, False to unquarantine + :return: + """ + try: + obj = rpc.get_object(socket_path, name, is_quarantined=not quarantine, + repair_tool=False) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise IOError("No such file or directory: {}".format(name)) + raise (e) + + volume_filename = get_volume_name(obj.volume_index) + volume_dir = socket_path.replace("rpc.socket", "volumes") + volume_filepath = os.path.join(volume_dir, volume_filename) + with open(volume_filepath, 'r+b') as fp: + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + # until we journal the deletes, after a crash we may have an entry + # for an object that has been "punched" from the volume. + # if we find a hole instead of the header, remove entry from + # kv and return. + fp.seek(obj.offset) + data = fp.read(MAX_OBJECT_HEADER_LEN) + if all(c == '\x00' for c in data): + # unregister the object here + rpc.unregister_object(socket_path, name) + return + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + raise VFileException(msg) + if quarantine: + header.state = STATE_OBJ_QUARANTINED + else: + header.state = STATE_OBJ_FILE + fp.seek(obj.offset) + write_object_header(header, fp) + + +def quarantine_ohash(dirpath, policy): + """ + Quarantine the object (all files below the object hash directory) + :param dirpath: path to object directory + :param policy: policy + :return: + """ + si = SwiftPathInfo.from_path(dirpath) + if si.type != 'ohash': + raise VFileException("dirpath not an object dir: {}".format(dirpath)) + + if policy.policy_type == 'erasure_coding': + sort_f = lambda x: utils.Timestamp(x.split('#')[0]) + else: + sort_f = lambda x: utils.Timestamp(os.path.splitext(x)[0]) + + final = [] + vfiles = listdir(dirpath) + + try: + for ext in ['.data', '.meta', '.ts']: + partial = [v for v in vfiles if os.path.splitext(v)[1] == ext] + partial.sort(key=sort_f) + final.extend(partial) + except Exception: + final = vfiles + + for vfile in final: + vfilepath = os.path.join(dirpath, vfile) + sif = SwiftPathInfo.from_path(vfilepath) + full_name = sif.ohash + sif.filename + # update header + set_header_state(sif.socket_path, full_name, quarantine=True) + try: + # update KV + rpc.quarantine_object(si.socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + errmsg = "No such file or directory: '{}'" + raise OSError(2, errmsg.format(vfilepath)) + raise(e) + + +def unquarantine_ohash(socket_path, ohash): + """ + Unquarantine the object (all files below the object hash directory). + Is this needed? Used for tests but currently not called from anywhere + :param socket_path: path to KV socket + :param ohash: object hash + """ + for objname in rpc.list_quarantined_ohash(socket_path, ohash): + full_name = "{}{}".format(ohash, objname) + set_header_state(socket_path, full_name, quarantine=False) + try: + rpc.unquarantine_object(socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + errmsg = "No such file or directory: '{}'" + raise OSError(2, errmsg.format(full_name)) + raise(e) + + +# def exists(path): +# """ +# :param filepath: path to vfile +# :return: True if file exists, False otherwise +# """ +# si = SwiftPathInfo.from_path(path) +# if si.type == 'partitions': +# os.path.exists +# try: +# VFileReader.get_vfile(path, None) +# return True +# except RpcError as e: +# if e.code == StatusCode.NotFound: +# return False +# raise (e) + + +def rmtree(path): + """ + Delete a directory recursively. (Actually, it only delete objects, as + directories do not exist) + :param path: path to the "directory" to remove + :param logger: + """ + type_to_func = { + 'ohash': _rmtree_ohash, + 'suffix': _rmtree_suffix, + 'partition': _rmtree_partition + # 'partitions': _rmtree_partitions + } + + path = os.path.normpath(path) + si = SwiftPathInfo.from_path(path) + type_to_func[si.type](path) + + +def _rmtree_ohash(path): + files = listdir(path) + for name in files: + filepath = os.path.join(path, name) + delete_vfile_from_path(filepath) + + +def _rmtree_suffix(path): + ohashes = listdir(path) + for ohash in ohashes: + ohashpath = os.path.join(path, ohash) + _rmtree_ohash(ohashpath) + + +def _rmtree_partition(path): + suffixes = listdir(path) + for suffix in suffixes: + suffixpath = os.path.join(path, suffix) + _rmtree_suffix(suffixpath) + + +def delete_vfile_from_path(filepath): + si = SwiftPathInfo.from_path(filepath) + full_name = si.ohash + si.filename + + def _unregister_object(socket_path, name, volume_index, offset, size): + try: + rpc.unregister_object(socket_path, name) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VOSError(errno.ENOENT, "No such file or directory:\ + '{}'".format(filepath)) + raise(e) + + try: + obj = rpc.get_object(si.socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VOSError(errno.ENOENT, "No such file or directory:\ + '{}'".format(filepath)) + volume_filename = get_volume_name(obj.volume_index) + volume_filepath = os.path.join(si.volume_dir, volume_filename) + + with open(volume_filepath, 'r+b') as fp: + # get object length + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + # until we journal the deletes, after a crash we may have an entry + # for an object that has been "punched" from the volume. + # if we find a hole instead of the header, remove entry from + # kv and return. + fp.seek(obj.offset) + data = fp.read(MAX_OBJECT_HEADER_LEN) + if all(c == '\x00' for c in data): + # unregister the object here + _unregister_object(si.socket_path, full_name, + obj.volume_index, obj.offset, 0) + return + + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(full_name, obj.offset, volume_filepath) + raise VFileException(msg) + + # check that we have the object we were expecting + header_fullname = "{}{}".format(header.ohash, header.filename) + if header_fullname != full_name: + # until we journal the renames, after a crash we may not have the + # rename in the KV. If that's the case, continue. + non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname) + if non_durable_name != full_name: + raise VFileException( + "Wrong header name. Header: {} Expected: {}".format( + header_fullname, full_name)) + utils.punch_hole(fp.fileno(), obj.offset, header.total_size) + + _unregister_object(si.socket_path, full_name, obj.volume_index, + obj.offset, header.total_size) + + +# delete an object hash directory +def rmtree_ohash(path): + pass + + +def read_metadata(fp, offset, header): + """ + Reads vfile metadata + :param fp: opened file + :param offset: absolute offset to the beginning of the vfile + :param header: vfile header + :return: metadata dict + """ + metadata_offset = offset + header.metadata_offset + metadata_size = header.metadata_size + data_offset = offset + header.data_offset + data_end = offset + header.data_offset + header.data_size + metadata_available_space = data_offset - metadata_offset + + fp.seek(metadata_offset) + if metadata_size > metadata_available_space: + metastr = fp.read(metadata_available_space) + fp.seek(data_end) + metastr += fp.read(metadata_size - metadata_available_space) + else: + metastr = fp.read(metadata_size) + + # Verify checksum, if any + if hasattr(header, 'metastr_md5'): + metadata_checksum = header.metastr_md5 + computed_checksum = hashlib.md5(metastr).hexdigest().encode('ascii') + if metadata_checksum != computed_checksum: + raise DiskFileBadMetadataChecksum( + "Metadata checksum mismatch for %s: " + "stored checksum='%s', computed='%s'" % ( + header.filename, metadata_checksum, computed_checksum)) + else: + # we don't support updating from the older format for now + pass + + meta = Metadata() + meta.ParseFromString(metastr) + metadata = {} + for attr in meta.attrs: + if attr.key: + if six.PY2: + metadata[attr.key] = attr.value + else: + metadata[attr.key.decode('utf8', 'surrogateescape')] = \ + attr.value.decode('utf8', 'surrogateescape') + + return metadata + + +def rename_vfile(filepath, newfilepath, logger): + """ + Renames a vfile. All writes to the KV are asynchronous. If we were to make + a synchronous WriteBatch call, all previous writes would also be synced, + killing performance. See: + https://github.com/google/leveldb/blob/master/doc/index.md + Currently : + - update the header in place , synchronously + - update the KV asynchronously (delete, put) + + A file can only be renamed within a KV + This is currently only used by the erasure code diskfile manager + """ + # Get current file info + si = SwiftPathInfo.from_path(filepath) + full_name = si.ohash + si.filename + + # Get new file info + si_new = SwiftPathInfo.from_path(newfilepath) + new_full_name = si_new.ohash + si_new.filename + + # Sanity check, same KV + if si.socket_path != si_new.socket_path: + raise VFileException("attempted to rename a file to a different KV") + + # rename file in place in the header + vf_reader = VFileReader._get_vfile(full_name, si.volume_dir, + si.socket_path, logger) + vf_offset = vf_reader.offset + header = vf_reader._header + volume_path = vf_reader.fp.name + vf_reader.close() + + header.filename = si_new.filename + + vol_fd = os.open(volume_path, os.O_WRONLY) + os.lseek(vol_fd, vf_offset, os.SEEK_SET) + os.write(vol_fd, header.pack()) + fdatasync(vol_fd) + os.close(vol_fd) + + # Update the KV (async) + try: + rpc.rename_object(si.socket_path, full_name, new_full_name, + si.partition) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VIOError(errno.ENOENT, + "No such file or directory: {}".format(full_name)) + else: + raise diff --git a/swift/obj/vfile_utils.py b/swift/obj/vfile_utils.py new file mode 100644 index 000000000..8b9036b6c --- /dev/null +++ b/swift/obj/vfile_utils.py @@ -0,0 +1,228 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import os.path +import pwd +import re + +from swift.common.storage_policy import split_policy_string +from swift.obj.fmgr_pb2 import VOLUME_DEFAULT, VOLUME_TOMBSTONE + +# regex to extract policy from path (one KV per policy) +# TODO: use split_policy_string or similar, not re +policy_re = re.compile(r"^objects(-\d+)?$") +volume_name_re = re.compile(r"^v\d{7}$") +losf_name_re = re.compile(r"^losf(-\d+)?$") + + +class VIOError(IOError): + """ + Exceptions are part of the interface, subclass IOError to make it easier + to interface with diskfile.py + """ + + +class VOSError(OSError): + """ + Exceptions are part of the interface, subclass OSError to make it easier + to interface with diskfile.py + """ + + +class VFileException(Exception): + pass + + +def get_volume_type(extension): + ext_map = { + ".ts": VOLUME_TOMBSTONE + } + + return ext_map.get(extension, VOLUME_DEFAULT) + + +def valid_volume_name(name): + """Returns True if name is a valid volume name, False otherwise""" + if volume_name_re.match(name): + return True + else: + return False + + +def valid_losf_name(name): + """Returns True if name is a valid losf dir name, False otherwise""" + if losf_name_re.match(name): + return True + else: + return False + + +# used by "fsck" to get the socket path from the volume path +def get_socket_path_from_volume_path(volume_path): + volume_path = os.path.normpath(volume_path) + volume_dir_path, volume_name = os.path.split(volume_path) + losf_path, volume_dir = os.path.split(volume_dir_path) + mount_path, losf_dir = os.path.split(losf_path) + if volume_dir != "volumes" or not valid_volume_name(volume_name) or \ + not valid_losf_name(losf_dir): + raise ValueError("Invalid volume path") + + socket_path = os.path.join(losf_path, "rpc.socket") + return socket_path + + +def get_mountpoint_from_volume_path(volume_path): + volume_path = os.path.normpath(volume_path) + volume_dir_path, volume_name = os.path.split(volume_path) + losf_path, volume_dir = os.path.split(volume_dir_path) + mount_path, losf_dir = os.path.split(losf_path) + if volume_dir != "volumes" or not valid_volume_name(volume_name) or \ + not valid_losf_name(losf_dir): + raise ValueError("Invalid volume path") + return mount_path + + +class SwiftPathInfo(object): + def __init__(self, type, socket_path=None, volume_dir=None, + policy_idx=None, partition=None, suffix=None, ohash=None, + filename=None): + self.type = type + self.socket_path = socket_path + self.volume_dir = volume_dir + self.policy_idx = policy_idx + self.partition = partition + self.suffix = suffix + self.ohash = ohash + self.filename = filename + + # parses a swift path, returns a SwiftPathInfo instance + @classmethod + def from_path(cls, path): + count_to_type = { + 4: "file", + 3: "ohash", + 2: "suffix", + 1: "partition", + 0: "partitions" # "objects" directory + } + + clean_path = os.path.normpath(path) + ldir = clean_path.split(os.sep) + + try: + obj_idx = [i for i, elem in enumerate(ldir) + if elem.startswith("objects")][0] + except IndexError: + raise VOSError("cannot parse object directory") + + elements = ldir[(obj_idx + 1):] + count = len(elements) + + if count > 4: + raise VOSError("cannot parse swift file path") + + _, policy = split_policy_string(ldir[obj_idx]) + policy_idx = policy.idx + + prefix = os.path.join("/", *ldir[0:obj_idx]) + m = policy_re.match(ldir[obj_idx]) + if not m: + raise VOSError( + "cannot parse object element of directory") + if m.group(1): + sofsdir = "losf{}".format(m.group(1)) + else: + sofsdir = "losf" + socket_path = os.path.join(prefix, sofsdir, "rpc.socket") + volume_dir = os.path.join(prefix, sofsdir, "volumes") + + type = count_to_type[count] + return cls(type, socket_path, volume_dir, policy_idx, *elements) + + +class SwiftQuarantinedPathInfo(object): + def __init__(self, type, socket_path=None, volume_dir=None, + policy_idx=None, ohash=None, filename=None): + self.type = type + self.socket_path = socket_path + self.volume_dir = volume_dir + self.policy_idx = policy_idx + self.ohash = ohash + self.filename = filename + + # parses a quarantined path (<device>/quarantined/objects-X or below), + # returns a SwiftQuarantinedPathInfo instance + @classmethod + def from_path(cls, path): + count_to_type = { + 3: "file", + 2: "ohash", + 1: "ohashes", + } + + clean_path = os.path.normpath(path) + ldir = clean_path.split(os.sep) + + try: + quar_idx = ldir.index("quarantined") + except ValueError: + raise VOSError("cannot parse quarantined path %s" % + path) + + elements = ldir[(quar_idx + 1):] + count = len(elements) + + if count < 1 or count > 3 or "objects" not in elements[0]: + raise VOSError("cannot parse quarantined path %s" % + path) + + _, policy = split_policy_string(elements[0]) + policy_idx = policy.idx + + prefix = os.path.join("/", *ldir[:quar_idx]) + prefix = os.path.join(prefix, elements[0].replace("objects", "losf")) + socket_path = os.path.join(prefix, "rpc.socket") + volume_dir = os.path.join(prefix, "volumes") + + type = count_to_type[count] + return cls(type, socket_path, volume_dir, policy_idx, *elements[1:]) + + +def get_volume_index(volume_path): + """ + returns the volume index, either from its basename, or full path + """ + name = os.path.split(volume_path)[1] + + if not valid_volume_name(name): + raise ValueError("Invalid volume name") + + index = int(name[1:8]) + return index + + +# given an offset and alignement, returns the next aligned offset +def next_aligned_offset(offset, alignment): + if offset % alignment != 0: + return (offset + (alignment - offset % alignment)) + else: + return offset + + +def change_user(username): + pw = pwd.getpwnam(username) + uid = pw.pw_uid + os.setuid(uid) |