Diffstat (limited to 'swift/obj/vfile.py')
-rw-r--r-- | swift/obj/vfile.py | 1201 |
1 file changed, 1201 insertions, 0 deletions
diff --git a/swift/obj/vfile.py b/swift/obj/vfile.py
new file mode 100644
index 000000000..57bacd558
--- /dev/null
+++ b/swift/obj/vfile.py
@@ -0,0 +1,1201 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+A "vfile" is a virtual file stored in a "volume".
+A volume is an actual file where vfiles are stored.
+vfile names and metadata (xattr) are also stored in the volume.
+"""
+
+import errno
+import fcntl
+import six
+import hashlib
+import re
+from eventlet.green import os
+from swift.obj.header import ObjectHeader, VolumeHeader, ALIGNMENT, \
+    read_volume_header, HeaderException, STATE_OBJ_QUARANTINED, \
+    STATE_OBJ_FILE, write_object_header, \
+    read_object_header, OBJECT_HEADER_VERSION, write_volume_header, \
+    erase_object_header, MAX_OBJECT_HEADER_LEN
+from swift.common.exceptions import DiskFileNoSpace, \
+    DiskFileBadMetadataChecksum
+from swift.common.storage_policy import POLICIES
+from swift.common.utils import fsync, fdatasync, fsync_dir, \
+    fallocate
+from swift.obj import rpc_http as rpc
+from swift.obj.rpc_http import RpcError, StatusCode
+from swift.obj.fmgr_pb2 import STATE_RW
+from swift.obj.meta_pb2 import Metadata
+from swift.obj.diskfile import _encode_metadata
+from swift.common import utils
+from swift.obj.vfile_utils import SwiftPathInfo, get_volume_index, \
+    get_volume_type, next_aligned_offset, SwiftQuarantinedPathInfo, VOSError, \
+    VIOError, VFileException
+
+VCREATION_LOCK_NAME = "volume_creation.lock"
+
+PICKLE_PROTOCOL = 2
+METADATA_RESERVE = 500
+VOL_AND_LOCKS_RE = re.compile(r'v\d{7}(\.writelock)?')
+
+
+def increment(logger, counter, count=1):
+    if logger is not None:
+        try:
+            logger.update_stats(counter, count)
+        except Exception:
+            pass
+
+
+class VFileReader(object):
+    """
+    Represents a vfile stored in a volume.
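+
+    Minimal read-path sketch (the path and logger are hypothetical, and
+    a running volume/KV RPC service is assumed, since get_vfile resolves
+    the file through it):
+
+        vf = VFileReader.get_vfile(
+            '/srv/node/sda/objects/1024/a0b/<object-hash>/<ts>.data',
+            logger)
+        try:
+            metadata = vf.metadata
+            body = vf.read()
+        finally:
+            vf.close()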
+ """ + def __init__(self, fp, name, offset, header, metadata, logger): + self.fp = fp + self.name = name + self.offset = offset + self._header = header + self.metadata = metadata + self.logger = logger + + @property + def data_size(self): + return self._header.data_size + + @classmethod + def get_vfile(cls, filepath, logger): + """ + Returns a VFileReader instance from the path expected by swift + :param filepath: full path to file + :param logger: a logger object + """ + si = SwiftPathInfo.from_path(filepath) + if si.type != "file": + err_msg = "Not a path to a swift file ({})".format(filepath) + raise VIOError(errno.EINVAL, err_msg) + + full_name = si.ohash + si.filename + return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger) + + @classmethod + def get_quarantined_vfile(cls, filepath, logger): + si = SwiftQuarantinedPathInfo.from_path(filepath) + + if si.type != "file": + err_msg = "Not a path to a swift file ({})".format(filepath) + raise VIOError(errno.EINVAL, err_msg) + + full_name = si.ohash + si.filename + return cls._get_vfile(full_name, si.volume_dir, si.socket_path, logger, + is_quarantined=True) + + @classmethod + def _get_vfile(cls, name, volume_dir, socket_path, logger, + is_quarantined=False, repair_tool=False): + """ + Returns a VFileReader instance + :param name: full name: object hash+filename + :param volume_dir: directory where the volume is stored + :param socket_path: full path to KV socket + :param logger: logger object + :param is_quarantined: object is quarantined + :param repair_tool: True if requests comes from a repair tool + """ + # get the object + try: + obj = rpc.get_object(socket_path, name, + is_quarantined=is_quarantined, + repair_tool=repair_tool) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VIOError(errno.ENOENT, + "No such file or directory: {}".format(name)) + # May need to handle more cases ? + raise (e) + + # get the volume file name from the object + volume_filename = get_volume_name(obj.volume_index) + volume_filepath = os.path.join(volume_dir, volume_filename) + + fp = open(volume_filepath, 'rb') + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + fp.seek(obj.offset) + data = fp.read(512) + if all(c == '\x00' for c in data): + # unregister the object here + rpc.unregister_object(socket_path, name) + msg = "Zeroed header found for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + increment(logger, 'vfile.already_punched') + raise VFileException(msg) + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + raise VIOError(errno.EIO, msg) + + # check that we have the object we were expecting + header_fullname = "{}{}".format(header.ohash, header.filename) + if header_fullname != name: + # until we journal the renames, after a crash we may not have the + # rename in the KV. Handle this here for now + non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname) + if non_durable_name == name: + increment(logger, 'vfile.already_renamed') + rpc.rename_object(socket_path, name, header_fullname) + else: + increment(logger, 'vfile.wrong_object_header_name') + raise VIOError(errno.EIO, + "Wrong object header name. 
Header: {} Expected:\ + {}".format(header_fullname, name)) + + metadata = read_metadata(fp, obj.offset, header) + + # seek to beginning of data + fp.seek(obj.offset + header.data_offset) + + return cls(fp, obj.name, obj.offset, header, metadata, logger) + + def read(self, size=None): + """ + Wraps read to prevent reading beyond the vfile content. + """ + curpos = self.fp.tell() + data_size = self._header.data_size + data_start_offset = self.offset + self._header.data_offset + data_end_offset = data_start_offset + data_size + + if curpos >= data_end_offset or size == 0: + return '' + if size: + if size > data_end_offset - curpos: + size = data_end_offset - curpos + else: + size = data_end_offset - curpos + + buf = self.fp.read(size) + return buf + + def seek(self, pos): + """ + Wraps seek to bind offset from the vfile start to its end. + """ + real_data_offset = self.offset + self._header.data_offset + real_new_pos = real_data_offset + pos + if (real_new_pos < real_data_offset or + real_new_pos > real_data_offset + self._header.data_size): + raise VIOError(errno.EINVAL, "Invalid seek") + self.fp.seek(real_new_pos) + + def tell(self): + curpos = self.fp.tell() + vpos = curpos - (self.offset + self._header.data_offset) + return vpos + + def close(self): + self.fp.close() + + +def _may_grow_volume(volume_fd, volume_offset, obj_size, conf, logger): + """ + Grows a volume if needed. + if free_space < obj_size + object header len, allocate obj_size padded to + volume_alloc_chunk_size + + """ + volume_alloc_chunk_size = conf['volume_alloc_chunk_size'] + + if obj_size is None: + obj_size = 0 + + volume_size = os.lseek(volume_fd, 0, os.SEEK_END) + free_space = volume_size - volume_offset + + obj_header_len = len(ObjectHeader(version=OBJECT_HEADER_VERSION)) + required_space = obj_header_len + obj_size + + if free_space < required_space: + _allocate_volume_space(volume_fd, volume_offset, required_space, + volume_alloc_chunk_size, logger) + + +class VFileWriter(object): + def __init__(self, datadir, fd, lock_fd, volume_dir, + volume_index, header, offset, logger): + si = SwiftPathInfo.from_path(datadir) + + self.fd = fd + self.lock_fd = lock_fd + self.volume_dir = volume_dir + self.volume_index = volume_index + self.header = header + self.offset = offset + self.socket_path = si.socket_path + self.partition = si.partition + # may be used for statsd. Do not use it to log or it will hang the + # object-server process. (eventlet) + self.logger = logger + + @classmethod + def create(cls, datadir, obj_size, conf, logger, extension=None): + # parse datadir + si = SwiftPathInfo.from_path(datadir) + + if si.type != "ohash": + raise VOSError("not a valid object hash path") + + if obj_size is not None: + if obj_size < 0: + raise VOSError("obj size may not be negative") + + socket_path = os.path.normpath(si.socket_path) + volume_dir = os.path.normpath(si.volume_dir) + + # get a writable volume + # TODO : check that we fallocate enough if obj_size > volume + # chunk alloc size + volume_file, lock_file, volume_path = open_or_create_volume( + socket_path, si.partition, extension, volume_dir, + conf, logger, size=obj_size) + volume_index = get_volume_index(volume_path) + + # create object header + header = ObjectHeader(version=OBJECT_HEADER_VERSION) + # TODO: this is unused, always set to zero. 
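+        # On-volume layout of a vfile, as written by commit() below
+        # (sketch; offsets are relative to the object's start offset):
+        #
+        #   [object header][reserved metadata space (metadata_reserve)]
+        #   [data][metadata overflow, only if it exceeds the reserve]
+        #   [padding up to the next ALIGNMENT boundary]
+        #
+        # The assignments below fill in the header fields accordingly.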
+        header.ohash = si.ohash
+        header.policy_idx = 0
+        header.data_offset = len(header) + conf['metadata_reserve']
+        header.data_size = 0
+        # requires header v3
+        header.state = STATE_OBJ_FILE
+
+        try:
+            # get offset at which to start writing
+            offset = rpc.get_next_offset(socket_path, volume_index)
+
+            # pre-allocate space if needed
+            _may_grow_volume(volume_file, offset, obj_size, conf, logger)
+
+            # seek to absolute object offset + relative data offset
+            # (we leave space for the header and some metadata)
+            os.lseek(volume_file, offset + header.data_offset,
+                     os.SEEK_SET)
+        except Exception:
+            os.close(volume_file)
+            os.close(lock_file)
+            raise
+
+        return cls(datadir, volume_file, lock_file, volume_dir,
+                   volume_index, header, offset, logger)
+
+    def commit(self, filename, metadata):
+        """
+        Write the header and metadata, sync, and register the vfile in
+        the KV.
+        """
+        if self.fd < 0:
+            raise VIOError(errno.EBADF, "Bad file descriptor")
+
+        if not filename:
+            raise VIOError(errno.EINVAL, "filename cannot be empty")
+
+        # how much data has been written?
+        # (header.data_offset is relative to the object's offset)
+        data_offset = self.offset + self.header.data_offset
+        data_end = os.lseek(self.fd, 0, os.SEEK_CUR)
+        self.header.data_size = data_end - data_offset
+        # leftover debug format string, currently unused:
+        # txt = "commit: {} data_end {} data_offset: {}"
+
+        self.header.filename = filename
+        # metastr = pickle.dumps(self.metadata, PICKLE_PROTOCOL)
+        # create and populate protobuf object
+        meta = Metadata()
+        enc_metadata = _encode_metadata(metadata)
+        for k, v in enc_metadata.items():
+            meta.attrs.add(key=k, value=v)
+
+        metastr = meta.SerializeToString()
+        metastr_md5 = hashlib.md5(metastr).hexdigest().encode('ascii')
+
+        self.header.metadata_size = len(metastr)
+        self.header.metadata_offset = len(self.header)
+        self.header.metastr_md5 = metastr_md5
+
+        # calculate the end object offset (this includes the padding, if
+        # any): start from data_end, and add the metadata remainder, if
+        # any, plus padding
+
+        # how much reserved metadata space do we have?
+        # Should be equal to "metadata_reserve"
+        metadata_available_space = (self.header.data_offset -
+                                    self.header.metadata_offset)
+        metadata_remainder = max(0, self.header.metadata_size -
+                                 metadata_available_space)
+
+        object_end = data_end + metadata_remainder
+
+        object_end = next_aligned_offset(object_end, ALIGNMENT)
+
+        self.header.total_size = object_end - self.offset
+
+        # write header
+        os.lseek(self.fd, self.offset, os.SEEK_SET)
+        os.write(self.fd, self.header.pack())
+
+        # write metadata
+        metadata_offset = self.offset + self.header.metadata_offset
+        if self.header.metadata_size > metadata_available_space:
+            os.lseek(self.fd, metadata_offset, os.SEEK_SET)
+            os.write(self.fd, metastr[:metadata_available_space])
+            # metadata does not fit in the reserved space,
+            # write the remainder after the data
+            os.lseek(self.fd, data_end, os.SEEK_SET)
+            os.write(self.fd, metastr[metadata_available_space:])
+        else:
+            os.lseek(self.fd, metadata_offset, os.SEEK_SET)
+            os.write(self.fd, metastr)
+
+        # Sanity check: we should not have written past object_end
+        curpos = os.lseek(self.fd, 0, os.SEEK_CUR)
+        if curpos > object_end:
+            errtxt = "BUG: wrote past object_end! curpos: {} object_end: {}"
+            raise Exception(errtxt.format(curpos, object_end))
+
+        # sync data. fdatasync() is enough: if the volume was just
+        # created, it has been fsync()'ed previously, along with its
+        # parent directory.
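+        # (fdatasync() flushes the file's data without the inode
+        # metadata update that fsync() forces; the volume size is
+        # already stable here since space was fallocate()'d up front.)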
+ fdatasync(self.fd) + + # register object + full_name = "{}{}".format(self.header.ohash, filename) + try: + rpc.register_object(self.socket_path, full_name, self.volume_index, + self.offset, object_end) + except RpcError: + # If we failed to register the object, erase the header so that it + # will not be picked up by the volume checker if there is a crash + # or power failure before it gets overwritten by another object. + erase_object_header(self.fd, self.offset) + raise + + increment(self.logger, 'vfile.vfile_creation') + increment(self.logger, 'vfile.total_space_used', + self.header.total_size) + + +def open_or_create_volume(socket_path, partition, extension, volume_dir, + conf, logger, size=0): + """ + Tries to open or create a volume for writing. If a volume cannot be + opened or created, a VOSError exception is raised. + :return: volume file descriptor, lock file descriptor, absolute path + to volume. + """ + volume_file, lock_file, volume_path = open_writable_volume(socket_path, + partition, + extension, + volume_dir, + conf, + logger) + if not volume_file: + # attempt to create new volume for partition + try: + volume_file, lock_file, volume_path = create_writable_volume( + socket_path, partition, extension, volume_dir, + conf, logger, size=size) + except Exception as err: + error_msg = "Failed to open or create a volume for writing: " + error_msg += getattr(err, "strerror", "Unknown error") + raise VOSError(errno.ENOSPC, error_msg) + + return volume_file, lock_file, volume_path + + +def _create_new_lock_file(volume_dir, logger): + creation_lock_path = os.path.join(volume_dir, VCREATION_LOCK_NAME) + with open(creation_lock_path, 'w') as creation_lock_file: + # this may block + fcntl.flock(creation_lock_file, fcntl.LOCK_EX) + + index = get_next_volume_index(volume_dir) + next_lock_name = get_lock_file_name(index) + next_lock_path = os.path.join(volume_dir, next_lock_name) + + try: + lock_file = os.open(next_lock_path, + os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + except OSError: + increment(logger, 'vfile.volume_creation.fail_other') + raise + + try: + fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + increment(logger, 'vfile.volume_creation.fail_other') + os.close(lock_file) + os.unlink(next_lock_path) + raise + + return index, next_lock_path, lock_file + + +# create a new volume +def create_writable_volume(socket_path, partition, extension, volume_dir, + conf, logger, state=STATE_RW, size=0): + """ + Creates a new writable volume, and associated lock file. + returns a volume_file, a lock_file, and the index of the volume that has + been created. + If the extension is specified, a specific volume may be used. + (Currently, .ts files go to separate volumes as they are short-lived, in + order to limit fragmentation) + state can be STATE_RW (new RW volume) or STATE_COMPACTION_TARGET (new empty + volume which will be used for compaction, and to which new objects cannot + be written). + size is the space that should be allocated to the volume (in addition to + the volume header) + """ + + if size is None: + size = 0 + + # Check if we have exceeded the allowed volume count for this partition + # Move this check below with the lock held ? 
(now, we may have + # a few extra volumes) + volume_type = get_volume_type(extension) + volumes = rpc.list_volumes(socket_path, partition, volume_type) + max_volume_count = conf['max_volume_count'] + if len(volumes) >= max_volume_count: + err_txt = ("Maximum count of volumes reached for partition:" + " {} type: {}".format(partition, volume_type)) + increment(logger, 'vfile.volume_creation.fail_count_exceeded') + raise VOSError(errno.EDQUOT, err_txt) + + try: + os.makedirs(volume_dir) + except OSError as err: + if err.errno == errno.EEXIST: + pass + else: + raise + + index, next_lock_path, lock_file = _create_new_lock_file( + volume_dir, logger) + + # create the volume + next_volume_name = get_volume_name(index) + next_volume_path = os.path.join(volume_dir, next_volume_name) + + vol_header = VolumeHeader() + vol_header.volume_idx = index + vol_header.type = volume_type + vol_header.partition = int(partition) + # first object alignment + vol_header.first_obj_offset = len(vol_header) + ( + ALIGNMENT - len(vol_header) % ALIGNMENT) + vol_header.state = state + + # How much space is needed for the object ? (assuming metadata fits in the + # reserved space, but we cannot know this in advance) + alloc_size = vol_header.first_obj_offset + size + volume_alloc_chunk_size = conf['volume_alloc_chunk_size'] + + try: + volume_file = os.open(next_volume_path, + os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + _allocate_volume_space(volume_file, 0, alloc_size, + volume_alloc_chunk_size, logger) + + # Write volume header + write_volume_header(vol_header, volume_file) + + # If the uploader is slow to send data to the object server, a crash + # may occur before the object is received and a call to fsync() is + # issued. We end up with volumes without a header. + # Issue a fsync() here, at the cost of performance early on. As + # partitions get volumes we switch to open_writable_volume, avoiding + # the fsync. + fsync(volume_file) + fsync_dir(volume_dir) + + # Register volume + rpc.register_volume(socket_path, partition, vol_header.type, index, + vol_header.first_obj_offset, vol_header.state) + except Exception: + os.close(lock_file) + os.close(volume_file) + os.unlink(next_lock_path) + os.unlink(next_volume_path) + increment(logger, 'vfile.volume_creation.fail_other') + raise + + increment(logger, 'vfile.volume_creation.ok') + return volume_file, lock_file, next_volume_path + + +def _allocate_volume_space(volume_fd, offset, length, volume_alloc_chunk_size, + logger, ignore_error=False): + """ + Will pre-allocate space for the volume given the offset and length, + aligned to volume_alloc_chunk_size. 
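+    For example, with an illustrative chunk size of 16 MiB, a request
+    to grow by 5 MiB triggers a single 16 MiB fallocate() at the given
+    offset, the length being padded up to the next chunk boundary.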
+    May ignore an OSError.
+    :param volume_fd: file descriptor of the volume
+    :param offset: offset from which to grow the volume
+    :param length: length to grow, relative to offset
+    :param volume_alloc_chunk_size: pad length to align to this chunk size
+    :param logger: logger object
+    :param ignore_error: ignore OSError
+    :return:
+    """
+    try:
+        alloc_size = next_aligned_offset(length, volume_alloc_chunk_size)
+        fallocate(volume_fd, alloc_size, offset)
+        increment(logger, 'vfile.volume_alloc_space', alloc_size)
+    except OSError as err:
+        if not ignore_error:
+            if err.errno in (errno.ENOSPC, errno.EDQUOT):
+                raise DiskFileNoSpace()
+            raise
+
+
+def delete_volume(socket_path, volume_path, logger):
+    """
+    Deletes a volume from disk and removes its entry from the KV.
+    """
+    index = get_volume_index(volume_path)
+    volume_lock_path = "{}.writelock".format(volume_path)
+
+    # Remove KV entry
+    rpc.unregister_volume(socket_path, index)
+
+    # Remove volume and lock
+    os.unlink(volume_path)
+    os.unlink(volume_lock_path)
+
+
+def open_writable_volume(socket_path, partition, extension, volume_dir, conf,
+                         logger):
+    """
+    Opens a volume available for writing.
+    Returns a volume file, a lock_file, and the volume path.
+    :param socket_path: full path to KV socket
+    :param partition: partition name
+    :param extension: file extension
+    """
+    volume_type = get_volume_type(extension)
+    volume_file = None
+    lock_file = None
+    volume_file_path = None
+    # query the KV for available volumes given the partition and type
+    volumes = rpc.list_volumes(socket_path, partition, volume_type)
+
+    # writable candidates are volumes which are in RW state and not too large
+    volumes = [vol for vol in volumes if vol.volume_state == STATE_RW and
+               vol.next_offset < conf['max_volume_size']]
+    volume_files = [get_volume_name(volume.volume_index) for volume in
+                    volumes]
+
+    for volume_file_name in volume_files:
+        volume_file_path = os.path.join(volume_dir, volume_file_name)
+        volume_file, lock_file = open_volume(volume_file_path)
+        if volume_file:
+            break
+
+    return volume_file, lock_file, volume_file_path
+
+
+def open_volume(volume_path):
+    """Locks the volume, and returns a fd to the volume and a fd to its lock
+    file. Returns None, None if it cannot be locked. Raises for any other
+    error.
+    :param volume_path: full path to volume
+    :return: (volume fd, lock fd)
+    """
+    lock_file_path = "{}.writelock".format(volume_path)
+
+    try:
+        lock_file = os.open(lock_file_path, os.O_WRONLY)
+    except OSError as e:
+        if e.errno != errno.ENOENT:
+            raise
+        # if the volume lock file has been removed, create it
+        lock_file = os.open(lock_file_path,
+                            os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
+
+    try:
+        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+    except IOError as err:
+        if err.errno in (errno.EACCES, errno.EAGAIN):
+            # volume is locked
+            os.close(lock_file)
+            return None, None
+        else:
+            try:
+                os.close(lock_file)
+            except Exception:
+                pass
+            raise
+    except Exception:
+        try:
+            os.close(lock_file)
+        except Exception:
+            pass
+        raise
+
+    volume_file = os.open(volume_path, os.O_WRONLY)
+    return volume_file, lock_file
+
+
+def change_volume_state(volume_file_path, state, compaction_target=None):
+    """
+    Changes the volume's state. The caller locks the volume.
+    TODO: should this handle RPC as well?
(currently done by caller) + TODO: take an optional size parameter so we can fallocate() the file + :param volume_file_path: path of volume to modify + :param state: new state + :param compaction_target: ID of the volume compaction target, if applicable + """ + volume_file = open(volume_file_path, "rb+") + + h = read_volume_header(volume_file) + h.state = state + if compaction_target: + h.compaction_target = compaction_target + volume_file.seek(0) + volume_file.write(h.pack()) + + +def get_next_volume_index(volume_dir): + """ + Returns the next volume index to use for the given dir. + Caller must hold the volume creation lock. + :param volume_dir: volume directory + :return: the next volume index to use + """ + dir_entries = os.listdir(volume_dir) + # Get all volumes and their lock: a volume should always have a lock, + # but a fsck may have removed either. If we find such a case, skip the + # index. + volumes_and_locks_idxs = set([name[1:8] for name in dir_entries if + VOL_AND_LOCKS_RE.match(name)]) + if len(volumes_and_locks_idxs) < 1: + return 1 + + # This is about 30% faster than calling int() in the list comprehension + # above. + idxs = sorted(int(i) for i in volumes_and_locks_idxs) + + # find the first "hole" in the indexes + for pos, idx in enumerate(idxs, start=1): + if pos != idx: + return pos + + # no hole found + return idx + 1 + + +def get_lock_file_name(index): + if index <= 0 or index > 9999999: + raise VFileException("invalid lock file index") + lock_file_name = "v{0:07d}.writelock".format(index) + return lock_file_name + + +def get_volume_name(index): + if index <= 0 or index > 9999999: + raise VFileException("invalid volume file index") + volume_file_name = "v{0:07d}".format(index) + return volume_file_name + + +def listdir(path): + type_to_func = { + 'ohash': _list_ohash, + 'suffix': _list_suffix, + 'partition': _list_partition, + 'partitions': _list_partitions + } + + path = os.path.normpath(path) + si = SwiftPathInfo.from_path(path) + + # get part power from the ring (needed as we generate "directories" based + # on it. + if not POLICIES[si.policy_idx].object_ring: + POLICIES[si.policy_idx].load_ring('/etc/swift') + part_power = 32 - POLICIES[si.policy_idx].object_ring._part_shift + + ret = type_to_func[si.type](si, part_power) + return [str(e) for e in ret] + + +def exists(path): + """ + Similar to os.path.exists + LOSF manages files below the "objects" directory. if the query is about + that directory, use os.path.exists, otherwise check in the KV. + It does not really make sense in LOSF context as callers will then issue a + "mkdir", which is a noop. But having this means we touch less of the + existing code. 
(diskfile, reconstructor, replicator) + :param path: full path to directory + :return: True is path exists, False otherwise + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.exists(path) + + # if a directory is empty, it does not exist + if listdir(path): + return True + else: + return False + + # Does not handle "file" + + +def isdir(path): + """ + Similar to os.path.isdir + :param path: full path to directory + :return: + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.isdir(path) + if si.type == 'file': + return False + if listdir(path): + return True + else: + return False + + +def isfile(path): + """ + Similar to os.path.isfile + :param path: full path to directory + :return: + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return os.path.isfile(path) + if si.type == 'file': + return True + return False + + +def mkdirs(path): + """ + Similar to utils.mkdirs + Noop, except if the directory is the "objects" directory + :param path: full path to directory + """ + si = SwiftPathInfo.from_path(path) + if si.type == 'partitions': + return utils.mkdirs(path) + + +def list_quarantine(quarantine_path): + """ + Lists all quarantined object hashes for the disk/policy + :param quarantine_path: quarantined path + :return: a list of quarantined object hashes + """ + si = SwiftQuarantinedPathInfo.from_path(quarantine_path) + if si.type != "ohashes": + err_msg = "Not a path to a quarantined file ({})".format( + quarantine_path) + raise VIOError(errno.EINVAL, err_msg) + return rpc.list_quarantined_ohashes(si.socket_path) + + +def list_quarantined_ohash(quarantined_ohash_path): + si = SwiftQuarantinedPathInfo.from_path(quarantined_ohash_path) + if si.type != "ohash": + err_msg = "Not a path to a quarantined file ({})".format( + quarantined_ohash_path) + raise VIOError(errno.EINVAL, err_msg) + return rpc.list_quarantined_ohash(si.socket_path, si.ohash) + + +def _list_ohash(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of files within the object directory + """ + return rpc.list_prefix(si.socket_path, si.ohash) + + +def _list_suffix(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of object hashes directory within the suffix directory + """ + return rpc.list_suffix(si.socket_path, int(si.partition), + si.suffix, part_power) + + +def _list_partition(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of suffixes within the partition + """ + return rpc.list_partition(si.socket_path, int(si.partition), + part_power) + + +def _list_partitions(si, part_power): + """ + :param si: SwiftPathInfo object + :param part_power: + :return: list of partitions + """ + return rpc.list_partitions(si.socket_path, part_power) + + +def set_header_state(socket_path, name, quarantine): + """ + Set a vfile header state (quarantined or not) + :param name: full name + :param socket_path: socket path + :param quarantine: True to quarantine, False to unquarantine + :return: + """ + try: + obj = rpc.get_object(socket_path, name, is_quarantined=not quarantine, + repair_tool=False) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise IOError("No such file or directory: {}".format(name)) + raise (e) + + volume_filename = get_volume_name(obj.volume_index) + volume_dir = socket_path.replace("rpc.socket", "volumes") + volume_filepath = os.path.join(volume_dir, volume_filename) + with 
open(volume_filepath, 'r+b') as fp: + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + # until we journal the deletes, after a crash we may have an entry + # for an object that has been "punched" from the volume. + # if we find a hole instead of the header, remove entry from + # kv and return. + fp.seek(obj.offset) + data = fp.read(MAX_OBJECT_HEADER_LEN) + if all(c == '\x00' for c in data): + # unregister the object here + rpc.unregister_object(socket_path, name) + return + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(name, obj.offset, volume_filepath) + raise VFileException(msg) + if quarantine: + header.state = STATE_OBJ_QUARANTINED + else: + header.state = STATE_OBJ_FILE + fp.seek(obj.offset) + write_object_header(header, fp) + + +def quarantine_ohash(dirpath, policy): + """ + Quarantine the object (all files below the object hash directory) + :param dirpath: path to object directory + :param policy: policy + :return: + """ + si = SwiftPathInfo.from_path(dirpath) + if si.type != 'ohash': + raise VFileException("dirpath not an object dir: {}".format(dirpath)) + + if policy.policy_type == 'erasure_coding': + sort_f = lambda x: utils.Timestamp(x.split('#')[0]) + else: + sort_f = lambda x: utils.Timestamp(os.path.splitext(x)[0]) + + final = [] + vfiles = listdir(dirpath) + + try: + for ext in ['.data', '.meta', '.ts']: + partial = [v for v in vfiles if os.path.splitext(v)[1] == ext] + partial.sort(key=sort_f) + final.extend(partial) + except Exception: + final = vfiles + + for vfile in final: + vfilepath = os.path.join(dirpath, vfile) + sif = SwiftPathInfo.from_path(vfilepath) + full_name = sif.ohash + sif.filename + # update header + set_header_state(sif.socket_path, full_name, quarantine=True) + try: + # update KV + rpc.quarantine_object(si.socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + errmsg = "No such file or directory: '{}'" + raise OSError(2, errmsg.format(vfilepath)) + raise(e) + + +def unquarantine_ohash(socket_path, ohash): + """ + Unquarantine the object (all files below the object hash directory). + Is this needed? Used for tests but currently not called from anywhere + :param socket_path: path to KV socket + :param ohash: object hash + """ + for objname in rpc.list_quarantined_ohash(socket_path, ohash): + full_name = "{}{}".format(ohash, objname) + set_header_state(socket_path, full_name, quarantine=False) + try: + rpc.unquarantine_object(socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + errmsg = "No such file or directory: '{}'" + raise OSError(2, errmsg.format(full_name)) + raise(e) + + +# def exists(path): +# """ +# :param filepath: path to vfile +# :return: True if file exists, False otherwise +# """ +# si = SwiftPathInfo.from_path(path) +# if si.type == 'partitions': +# os.path.exists +# try: +# VFileReader.get_vfile(path, None) +# return True +# except RpcError as e: +# if e.code == StatusCode.NotFound: +# return False +# raise (e) + + +def rmtree(path): + """ + Delete a directory recursively. 
(Actually, it only delete objects, as + directories do not exist) + :param path: path to the "directory" to remove + :param logger: + """ + type_to_func = { + 'ohash': _rmtree_ohash, + 'suffix': _rmtree_suffix, + 'partition': _rmtree_partition + # 'partitions': _rmtree_partitions + } + + path = os.path.normpath(path) + si = SwiftPathInfo.from_path(path) + type_to_func[si.type](path) + + +def _rmtree_ohash(path): + files = listdir(path) + for name in files: + filepath = os.path.join(path, name) + delete_vfile_from_path(filepath) + + +def _rmtree_suffix(path): + ohashes = listdir(path) + for ohash in ohashes: + ohashpath = os.path.join(path, ohash) + _rmtree_ohash(ohashpath) + + +def _rmtree_partition(path): + suffixes = listdir(path) + for suffix in suffixes: + suffixpath = os.path.join(path, suffix) + _rmtree_suffix(suffixpath) + + +def delete_vfile_from_path(filepath): + si = SwiftPathInfo.from_path(filepath) + full_name = si.ohash + si.filename + + def _unregister_object(socket_path, name, volume_index, offset, size): + try: + rpc.unregister_object(socket_path, name) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VOSError(errno.ENOENT, "No such file or directory:\ + '{}'".format(filepath)) + raise(e) + + try: + obj = rpc.get_object(si.socket_path, full_name) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VOSError(errno.ENOENT, "No such file or directory:\ + '{}'".format(filepath)) + volume_filename = get_volume_name(obj.volume_index) + volume_filepath = os.path.join(si.volume_dir, volume_filename) + + with open(volume_filepath, 'r+b') as fp: + # get object length + fp.seek(obj.offset) + try: + header = read_object_header(fp) + except HeaderException: + # until we journal the deletes, after a crash we may have an entry + # for an object that has been "punched" from the volume. + # if we find a hole instead of the header, remove entry from + # kv and return. + fp.seek(obj.offset) + data = fp.read(MAX_OBJECT_HEADER_LEN) + if all(c == '\x00' for c in data): + # unregister the object here + _unregister_object(si.socket_path, full_name, + obj.volume_index, obj.offset, 0) + return + + msg = "Failed to read header for {} at offset {} in volume\ + {}".format(full_name, obj.offset, volume_filepath) + raise VFileException(msg) + + # check that we have the object we were expecting + header_fullname = "{}{}".format(header.ohash, header.filename) + if header_fullname != full_name: + # until we journal the renames, after a crash we may not have the + # rename in the KV. If that's the case, continue. + non_durable_name = re.sub(r'(#\d+)#d.', r'\1.', header_fullname) + if non_durable_name != full_name: + raise VFileException( + "Wrong header name. 
Header: {} Expected: {}".format( + header_fullname, full_name)) + utils.punch_hole(fp.fileno(), obj.offset, header.total_size) + + _unregister_object(si.socket_path, full_name, obj.volume_index, + obj.offset, header.total_size) + + +# delete an object hash directory +def rmtree_ohash(path): + pass + + +def read_metadata(fp, offset, header): + """ + Reads vfile metadata + :param fp: opened file + :param offset: absolute offset to the beginning of the vfile + :param header: vfile header + :return: metadata dict + """ + metadata_offset = offset + header.metadata_offset + metadata_size = header.metadata_size + data_offset = offset + header.data_offset + data_end = offset + header.data_offset + header.data_size + metadata_available_space = data_offset - metadata_offset + + fp.seek(metadata_offset) + if metadata_size > metadata_available_space: + metastr = fp.read(metadata_available_space) + fp.seek(data_end) + metastr += fp.read(metadata_size - metadata_available_space) + else: + metastr = fp.read(metadata_size) + + # Verify checksum, if any + if hasattr(header, 'metastr_md5'): + metadata_checksum = header.metastr_md5 + computed_checksum = hashlib.md5(metastr).hexdigest().encode('ascii') + if metadata_checksum != computed_checksum: + raise DiskFileBadMetadataChecksum( + "Metadata checksum mismatch for %s: " + "stored checksum='%s', computed='%s'" % ( + header.filename, metadata_checksum, computed_checksum)) + else: + # we don't support updating from the older format for now + pass + + meta = Metadata() + meta.ParseFromString(metastr) + metadata = {} + for attr in meta.attrs: + if attr.key: + if six.PY2: + metadata[attr.key] = attr.value + else: + metadata[attr.key.decode('utf8', 'surrogateescape')] = \ + attr.value.decode('utf8', 'surrogateescape') + + return metadata + + +def rename_vfile(filepath, newfilepath, logger): + """ + Renames a vfile. All writes to the KV are asynchronous. If we were to make + a synchronous WriteBatch call, all previous writes would also be synced, + killing performance. See: + https://github.com/google/leveldb/blob/master/doc/index.md + Currently : + - update the header in place , synchronously + - update the KV asynchronously (delete, put) + + A file can only be renamed within a KV + This is currently only used by the erasure code diskfile manager + """ + # Get current file info + si = SwiftPathInfo.from_path(filepath) + full_name = si.ohash + si.filename + + # Get new file info + si_new = SwiftPathInfo.from_path(newfilepath) + new_full_name = si_new.ohash + si_new.filename + + # Sanity check, same KV + if si.socket_path != si_new.socket_path: + raise VFileException("attempted to rename a file to a different KV") + + # rename file in place in the header + vf_reader = VFileReader._get_vfile(full_name, si.volume_dir, + si.socket_path, logger) + vf_offset = vf_reader.offset + header = vf_reader._header + volume_path = vf_reader.fp.name + vf_reader.close() + + header.filename = si_new.filename + + vol_fd = os.open(volume_path, os.O_WRONLY) + os.lseek(vol_fd, vf_offset, os.SEEK_SET) + os.write(vol_fd, header.pack()) + fdatasync(vol_fd) + os.close(vol_fd) + + # Update the KV (async) + try: + rpc.rename_object(si.socket_path, full_name, new_full_name, + si.partition) + except RpcError as e: + if e.code == StatusCode.NotFound: + raise VIOError(errno.ENOENT, + "No such file or directory: {}".format(full_name)) + else: + raise |
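+
+
+# Write-path counterpart to the read sketch in the VFileReader
+# docstring (hypothetical datadir, body and conf; conf must provide the
+# keys used above, such as 'metadata_reserve', 'volume_alloc_chunk_size',
+# 'max_volume_count' and 'max_volume_size'):
+#
+#     writer = VFileWriter.create(
+#         '/srv/node/sda/objects/1024/a0b/<object-hash>', len(body),
+#         conf, logger, extension='.data')
+#     os.write(writer.fd, body)
+#     writer.commit('<timestamp>.data', metadata)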