diff options
Diffstat (limited to 'src/storage/local/LocalStorage.cpp')
-rw-r--r-- | src/storage/local/LocalStorage.cpp | 421 |
1 files changed, 420 insertions, 1 deletions
diff --git a/src/storage/local/LocalStorage.cpp b/src/storage/local/LocalStorage.cpp index c774c738..4c04e860 100644 --- a/src/storage/local/LocalStorage.cpp +++ b/src/storage/local/LocalStorage.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2021-2022 Joel Rosdahl and other contributors +// Copyright (C) 2021-2023 Joel Rosdahl and other contributors // // See doc/AUTHORS.adoc for a complete list of contributors. // @@ -20,18 +20,40 @@ #include <AtomicFile.hpp> #include <Config.hpp> +#include <Context.hpp> +#include <File.hpp> #include <Logging.hpp> #include <MiniTrace.hpp> +#include <TemporaryFile.hpp> +#include <ThreadPool.hpp> #include <Util.hpp> #include <assertions.hpp> +#include <core/CacheEntry.hpp> +#include <core/FileRecompressor.hpp> +#include <core/Manifest.hpp> +#include <core/Result.hpp> +#include <core/Statistics.hpp> #include <core/exceptions.hpp> #include <core/wincompat.hpp> #include <fmtmacros.hpp> #include <storage/local/StatsFile.hpp> +#include <storage/local/util.hpp> #include <util/Duration.hpp> +#include <util/expected.hpp> #include <util/file.hpp> #include <util/string.hpp> +#include <third_party/fmt/core.h> + +#ifdef INODE_CACHE_SUPPORTED +# include <InodeCache.hpp> +#endif + +#include <algorithm> +#include <atomic> +#include <memory> +#include <string> + #ifdef HAVE_UNISTD_H # include <unistd.h> #endif @@ -369,6 +391,403 @@ LocalStorage::increment_statistics(const core::StatisticsCounters& statistics) m_result_counter_updates.increment(statistics); } +// Zero all statistics counters except those tracking cache size and number of +// files in the cache. +void +LocalStorage::zero_all_statistics() +{ + const auto now = util::TimePoint::now(); + const auto zeroable_fields = core::Statistics::get_zeroable_fields(); + + for_each_level_1_and_2_stats_file( + m_config.cache_dir(), [=](const std::string& path) { + StatsFile(path).update([=](auto& cs) { + for (const auto statistic : zeroable_fields) { + cs.set(statistic, 0); + } + cs.set(core::Statistic::stats_zeroed_timestamp, now.sec()); + }); + }); +} + +// Get statistics and last time of update for the whole local storage cache. +std::pair<core::StatisticsCounters, util::TimePoint> +LocalStorage::get_all_statistics() const +{ + core::StatisticsCounters counters; + uint64_t zero_timestamp = 0; + util::TimePoint last_updated; + + // Add up the stats in each directory. + for_each_level_1_and_2_stats_file( + m_config.cache_dir(), [&](const auto& path) { + counters.set(core::Statistic::stats_zeroed_timestamp, 0); // Don't add + counters.increment(StatsFile(path).read()); + zero_timestamp = std::max( + counters.get(core::Statistic::stats_zeroed_timestamp), zero_timestamp); + last_updated = std::max(last_updated, Stat::stat(path).mtime()); + }); + + counters.set(core::Statistic::stats_zeroed_timestamp, zero_timestamp); + return std::make_pair(counters, last_updated); +} + +static void +delete_file(const std::string& path, + const uint64_t size, + uint64_t* cache_size, + uint64_t* files_in_cache) +{ + const bool deleted = Util::unlink_safe(path, Util::UnlinkLog::ignore_failure); + if (!deleted && errno != ENOENT && errno != ESTALE) { + LOG("Failed to unlink {} ({})", path, strerror(errno)); + } else if (cache_size && files_in_cache) { + // The counters are intentionally subtracted even if there was no file to + // delete since the final cache size calculation will be incorrect if they + // aren't. (This can happen when there are several parallel ongoing + // cleanups of the same directory.) + *cache_size -= size; + --*files_in_cache; + } +} + +static void +update_counters(const std::string& dir, + const uint64_t files_in_cache, + const uint64_t cache_size, + const bool cleanup_performed) +{ + const std::string stats_file = dir + "/stats"; + StatsFile(stats_file).update([=](auto& cs) { + if (cleanup_performed) { + cs.increment(Statistic::cleanups_performed); + } + cs.set(Statistic::files_in_cache, files_in_cache); + cs.set(Statistic::cache_size_kibibyte, cache_size / 1024); + }); +} + +void +LocalStorage::evict(const ProgressReceiver& progress_receiver, + std::optional<uint64_t> max_age, + std::optional<std::string> namespace_) +{ + for_each_level_1_subdir( + m_config.cache_dir(), + [&](const std::string& subdir, + const ProgressReceiver& sub_progress_receiver) { + clean_dir(subdir, 0, 0, max_age, namespace_, sub_progress_receiver); + }, + progress_receiver); +} + +// Clean up one cache subdirectory. +void +LocalStorage::clean_dir(const std::string& subdir, + const uint64_t max_size, + const uint64_t max_files, + const std::optional<uint64_t> max_age, + const std::optional<std::string> namespace_, + const ProgressReceiver& progress_receiver) +{ + LOG("Cleaning up cache directory {}", subdir); + + auto files = get_level_1_files( + subdir, [&](double progress) { progress_receiver(progress / 3); }); + + uint64_t cache_size = 0; + uint64_t files_in_cache = 0; + auto current_time = util::TimePoint::now(); + std::unordered_map<std::string /*result_file*/, + std::vector<std::string> /*associated_raw_files*/> + raw_files_map; + + for (size_t i = 0; i < files.size(); + ++i, progress_receiver(1.0 / 3 + 1.0 * i / files.size() / 3)) { + const auto& file = files[i]; + + if (!file.is_regular()) { + // Not a file or missing file. + continue; + } + + // Delete any tmp files older than 1 hour right away. + if (file.mtime() + util::Duration(3600) < current_time + && TemporaryFile::is_tmp_file(file.path())) { + Util::unlink_tmp(file.path()); + continue; + } + + if (namespace_ && file_type_from_path(file.path()) == FileType::raw) { + const auto result_filename = + FMT("{}R", file.path().substr(0, file.path().length() - 2)); + raw_files_map[result_filename].push_back(file.path()); + } + + cache_size += file.size_on_disk(); + files_in_cache += 1; + } + + // Sort according to modification time, oldest first. + std::sort(files.begin(), files.end(), [](const auto& f1, const auto& f2) { + return f1.mtime() < f2.mtime(); + }); + + LOG("Before cleanup: {:.0f} KiB, {:.0f} files", + static_cast<double>(cache_size) / 1024, + static_cast<double>(files_in_cache)); + + bool cleaned = false; + for (size_t i = 0; i < files.size(); + ++i, progress_receiver(2.0 / 3 + 1.0 * i / files.size() / 3)) { + const auto& file = files[i]; + + if (!file || file.is_directory()) { + continue; + } + + if ((max_size == 0 || cache_size <= max_size) + && (max_files == 0 || files_in_cache <= max_files) + && (!max_age + || file.mtime() > (current_time - util::Duration(*max_age))) + && (!namespace_ || max_age)) { + break; + } + + if (namespace_) { + try { + core::CacheEntry::Header header(file.path()); + if (header.namespace_ != *namespace_) { + continue; + } + } catch (core::Error&) { + // Failed to read header: ignore. + continue; + } + + // For namespace eviction we need to remove raw files based on result + // filename since they don't have a header. + if (file_type_from_path(file.path()) == FileType::result) { + const auto entry = raw_files_map.find(file.path()); + if (entry != raw_files_map.end()) { + for (const auto& raw_file : entry->second) { + delete_file(raw_file, + Stat::lstat(raw_file).size_on_disk(), + &cache_size, + &files_in_cache); + } + } + } + } + + delete_file(file.path(), file.size_on_disk(), &cache_size, &files_in_cache); + cleaned = true; + } + + LOG("After cleanup: {:.0f} KiB, {:.0f} files", + static_cast<double>(cache_size) / 1024, + static_cast<double>(files_in_cache)); + + if (cleaned) { + LOG("Cleaned up cache directory {}", subdir); + } + + update_counters(subdir, files_in_cache, cache_size, cleaned); +} + +// Clean up all cache subdirectories. +void +LocalStorage::clean_all(const ProgressReceiver& progress_receiver) +{ + for_each_level_1_subdir( + m_config.cache_dir(), + [&](const std::string& subdir, + const ProgressReceiver& sub_progress_receiver) { + clean_dir(subdir, + m_config.max_size() / 16, + m_config.max_files() / 16, + std::nullopt, + std::nullopt, + sub_progress_receiver); + }, + progress_receiver); +} + +// Wipe one cache subdirectory. +static void +wipe_dir(const std::string& subdir, const ProgressReceiver& progress_receiver) +{ + LOG("Clearing out cache directory {}", subdir); + + const auto files = get_level_1_files( + subdir, [&](double progress) { progress_receiver(progress / 2); }); + + for (size_t i = 0; i < files.size(); ++i) { + Util::unlink_safe(files[i].path()); + progress_receiver(0.5 + 0.5 * i / files.size()); + } + + const bool cleared = !files.empty(); + if (cleared) { + LOG("Cleared out cache directory {}", subdir); + } + update_counters(subdir, 0, 0, cleared); +} + +// Wipe all cached files in all subdirectories. +void +LocalStorage::wipe_all(const ProgressReceiver& progress_receiver) +{ + for_each_level_1_subdir(m_config.cache_dir(), wipe_dir, progress_receiver); +} + +CompressionStatistics +LocalStorage::get_compression_statistics( + const ProgressReceiver& progress_receiver) const +{ + CompressionStatistics cs{}; + + for_each_level_1_subdir( + m_config.cache_dir(), + [&](const auto& subdir, const auto& sub_progress_receiver) { + const auto files = get_level_1_files( + subdir, [&](double progress) { sub_progress_receiver(progress / 2); }); + + for (size_t i = 0; i < files.size(); ++i) { + const auto& cache_file = files[i]; + cs.on_disk_size += cache_file.size_on_disk(); + + try { + core::CacheEntry::Header header(cache_file.path()); + cs.compr_size += cache_file.size(); + cs.content_size += header.entry_size; + } catch (core::Error&) { + cs.incompr_size += cache_file.size(); + } + + sub_progress_receiver(1.0 / 2 + 1.0 * i / files.size() / 2); + } + }, + progress_receiver); + + return cs; +} + +void +LocalStorage::recompress(const std::optional<int8_t> level, + const uint32_t threads, + const ProgressReceiver& progress_receiver) +{ + const size_t read_ahead = + std::max(static_cast<size_t>(10), 2 * static_cast<size_t>(threads)); + ThreadPool thread_pool(threads, read_ahead); + core::FileRecompressor recompressor; + + std::atomic<uint64_t> incompressible_size = 0; + + for_each_level_1_subdir( + m_config.cache_dir(), + [&](const auto& subdir, const auto& sub_progress_receiver) { + auto files = get_level_1_files(subdir, [&](double progress) { + sub_progress_receiver(0.1 * progress); + }); + + auto stats_file = subdir + "/stats"; + + for (size_t i = 0; i < files.size(); ++i) { + const auto& file = files[i]; + + if (file_type_from_path(file.path()) != FileType::unknown) { + thread_pool.enqueue( + [&recompressor, &incompressible_size, level, stats_file, file] { + try { + Stat new_stat = recompressor.recompress( + file, level, core::FileRecompressor::KeepAtime::no); + auto size_change_kibibyte = + Util::size_change_kibibyte(file, new_stat); + if (size_change_kibibyte != 0) { + StatsFile(stats_file).update([=](auto& cs) { + cs.increment(core::Statistic::cache_size_kibibyte, + size_change_kibibyte); + }); + } + } catch (core::Error&) { + // Ignore for now. + incompressible_size += file.size_on_disk(); + } + }); + } else if (!TemporaryFile::is_tmp_file(file.path())) { + incompressible_size += file.size_on_disk(); + } + + sub_progress_receiver(0.1 + 0.9 * i / files.size()); + } + + if (util::ends_with(subdir, "f")) { + // Wait here instead of after for_each_level_1_subdir to avoid + // updating the progress bar to 100% before all work is done. + thread_pool.shut_down(); + } + }, + progress_receiver); + + // In case there was no f subdir, shut down the thread pool now. + thread_pool.shut_down(); + + if (isatty(STDOUT_FILENO)) { + PRINT_RAW(stdout, "\n\n"); + } + + const double old_ratio = recompressor.old_size() > 0 + ? static_cast<double>(recompressor.content_size()) + / recompressor.old_size() + : 0.0; + const double old_savings = + old_ratio > 0.0 ? 100.0 - (100.0 / old_ratio) : 0.0; + const double new_ratio = recompressor.new_size() > 0 + ? static_cast<double>(recompressor.content_size()) + / recompressor.new_size() + : 0.0; + const double new_savings = + new_ratio > 0.0 ? 100.0 - (100.0 / new_ratio) : 0.0; + const int64_t size_difference = + static_cast<int64_t>(recompressor.new_size()) + - static_cast<int64_t>(recompressor.old_size()); + + const std::string old_compr_size_str = + Util::format_human_readable_size(recompressor.old_size()); + const std::string new_compr_size_str = + Util::format_human_readable_size(recompressor.new_size()); + const std::string content_size_str = + Util::format_human_readable_size(recompressor.content_size()); + const std::string incompr_size_str = + Util::format_human_readable_size(incompressible_size); + const std::string size_difference_str = + FMT("{}{}", + size_difference < 0 ? "-" : (size_difference > 0 ? "+" : " "), + Util::format_human_readable_size( + size_difference < 0 ? -size_difference : size_difference)); + + PRINT(stdout, "Original data: {:>8s}\n", content_size_str); + PRINT(stdout, + "Old compressed data: {:>8s} ({:.1f}% of original size)\n", + old_compr_size_str, + 100.0 - old_savings); + PRINT(stdout, + " - Compression ratio: {:>5.3f} x ({:.1f}% space savings)\n", + old_ratio, + old_savings); + PRINT(stdout, + "New compressed data: {:>8s} ({:.1f}% of original size)\n", + new_compr_size_str, + 100.0 - new_savings); + PRINT(stdout, + " - Compression ratio: {:>5.3f} x ({:.1f}% space savings)\n", + new_ratio, + new_savings); + PRINT(stdout, "Size change: {:>9s}\n", size_difference_str); +} + // Private methods LocalStorage::LookUpCacheFileResult |