Diffstat (limited to 'src/storage/local/LocalStorage.cpp')
-rw-r--r--  src/storage/local/LocalStorage.cpp  421
1 file changed, 420 insertions, 1 deletion
diff --git a/src/storage/local/LocalStorage.cpp b/src/storage/local/LocalStorage.cpp
index c774c738..4c04e860 100644
--- a/src/storage/local/LocalStorage.cpp
+++ b/src/storage/local/LocalStorage.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2021-2022 Joel Rosdahl and other contributors
+// Copyright (C) 2021-2023 Joel Rosdahl and other contributors
//
// See doc/AUTHORS.adoc for a complete list of contributors.
//
@@ -20,18 +20,40 @@
#include <AtomicFile.hpp>
#include <Config.hpp>
+#include <Context.hpp>
+#include <File.hpp>
#include <Logging.hpp>
#include <MiniTrace.hpp>
+#include <TemporaryFile.hpp>
+#include <ThreadPool.hpp>
#include <Util.hpp>
#include <assertions.hpp>
+#include <core/CacheEntry.hpp>
+#include <core/FileRecompressor.hpp>
+#include <core/Manifest.hpp>
+#include <core/Result.hpp>
+#include <core/Statistics.hpp>
#include <core/exceptions.hpp>
#include <core/wincompat.hpp>
#include <fmtmacros.hpp>
#include <storage/local/StatsFile.hpp>
+#include <storage/local/util.hpp>
#include <util/Duration.hpp>
+#include <util/expected.hpp>
#include <util/file.hpp>
#include <util/string.hpp>
+#include <third_party/fmt/core.h>
+
+#ifdef INODE_CACHE_SUPPORTED
+# include <InodeCache.hpp>
+#endif
+
+#include <algorithm>
+#include <atomic>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif
@@ -369,6 +391,403 @@ LocalStorage::increment_statistics(const core::StatisticsCounters& statistics)
m_result_counter_updates.increment(statistics);
}
+// Zero all statistics counters except those tracking cache size and number of
+// files in the cache.
+void
+LocalStorage::zero_all_statistics()
+{
+ const auto now = util::TimePoint::now();
+ const auto zeroable_fields = core::Statistics::get_zeroable_fields();
+
+ for_each_level_1_and_2_stats_file(
+ m_config.cache_dir(), [=](const std::string& path) {
+ StatsFile(path).update([=](auto& cs) {
+ for (const auto statistic : zeroable_fields) {
+ cs.set(statistic, 0);
+ }
+ cs.set(core::Statistic::stats_zeroed_timestamp, now.sec());
+ });
+ });
+}
+
+// Get statistics and last time of update for the whole local storage cache.
+std::pair<core::StatisticsCounters, util::TimePoint>
+LocalStorage::get_all_statistics() const
+{
+ core::StatisticsCounters counters;
+ uint64_t zero_timestamp = 0;
+ util::TimePoint last_updated;
+
+  // Add up the stats in each directory. The zeroed timestamp should be the
+  // most recent value rather than a sum, so it is cleared before each
+  // increment and tracked separately via max().
+ for_each_level_1_and_2_stats_file(
+ m_config.cache_dir(), [&](const auto& path) {
+ counters.set(core::Statistic::stats_zeroed_timestamp, 0); // Don't add
+ counters.increment(StatsFile(path).read());
+ zero_timestamp = std::max(
+ counters.get(core::Statistic::stats_zeroed_timestamp), zero_timestamp);
+ last_updated = std::max(last_updated, Stat::stat(path).mtime());
+ });
+
+ counters.set(core::Statistic::stats_zeroed_timestamp, zero_timestamp);
+ return std::make_pair(counters, last_updated);
+}
+
+static void
+delete_file(const std::string& path,
+ const uint64_t size,
+ uint64_t* cache_size,
+ uint64_t* files_in_cache)
+{
+ const bool deleted = Util::unlink_safe(path, Util::UnlinkLog::ignore_failure);
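+  // A missing file (ENOENT/ESTALE) just means that some other process removed
+  // it first; only log other failures.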
+ if (!deleted && errno != ENOENT && errno != ESTALE) {
+ LOG("Failed to unlink {} ({})", path, strerror(errno));
+ } else if (cache_size && files_in_cache) {
+ // The counters are intentionally subtracted even if there was no file to
+ // delete since the final cache size calculation will be incorrect if they
+ // aren't. (This can happen when there are several parallel ongoing
+ // cleanups of the same directory.)
+ *cache_size -= size;
+ --*files_in_cache;
+ }
+}
+
+static void
+update_counters(const std::string& dir,
+ const uint64_t files_in_cache,
+ const uint64_t cache_size,
+ const bool cleanup_performed)
+{
+ const std::string stats_file = dir + "/stats";
+ StatsFile(stats_file).update([=](auto& cs) {
+ if (cleanup_performed) {
+ cs.increment(Statistic::cleanups_performed);
+ }
+ cs.set(Statistic::files_in_cache, files_in_cache);
+ cs.set(Statistic::cache_size_kibibyte, cache_size / 1024);
+ });
+}
+
+void
+LocalStorage::evict(const ProgressReceiver& progress_receiver,
+ std::optional<uint64_t> max_age,
+ std::optional<std::string> namespace_)
+{
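+  // A max_size/max_files of 0 means "no limit", so only the age and namespace
+  // criteria can trigger eviction here.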
+ for_each_level_1_subdir(
+ m_config.cache_dir(),
+ [&](const std::string& subdir,
+ const ProgressReceiver& sub_progress_receiver) {
+ clean_dir(subdir, 0, 0, max_age, namespace_, sub_progress_receiver);
+ },
+ progress_receiver);
+}
+
+// Clean up one cache subdirectory.
+void
+LocalStorage::clean_dir(const std::string& subdir,
+ const uint64_t max_size,
+ const uint64_t max_files,
+ const std::optional<uint64_t> max_age,
+ const std::optional<std::string> namespace_,
+ const ProgressReceiver& progress_receiver)
+{
+ LOG("Cleaning up cache directory {}", subdir);
+
+ auto files = get_level_1_files(
+ subdir, [&](double progress) { progress_receiver(progress / 3); });
+
+ uint64_t cache_size = 0;
+ uint64_t files_in_cache = 0;
+ auto current_time = util::TimePoint::now();
+ std::unordered_map<std::string /*result_file*/,
+ std::vector<std::string> /*associated_raw_files*/>
+ raw_files_map;
+
+ for (size_t i = 0; i < files.size();
+ ++i, progress_receiver(1.0 / 3 + 1.0 * i / files.size() / 3)) {
+ const auto& file = files[i];
+
+ if (!file.is_regular()) {
+ // Not a file or missing file.
+ continue;
+ }
+
+ // Delete any tmp files older than 1 hour right away.
+ if (file.mtime() + util::Duration(3600) < current_time
+ && TemporaryFile::is_tmp_file(file.path())) {
+ Util::unlink_tmp(file.path());
+ continue;
+ }
+
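+    // Raw result files carry no cache entry header, so remember which result
+    // file they belong to; when cleaning by namespace they are evicted
+    // together with that result below.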
+ if (namespace_ && file_type_from_path(file.path()) == FileType::raw) {
+ const auto result_filename =
+ FMT("{}R", file.path().substr(0, file.path().length() - 2));
+ raw_files_map[result_filename].push_back(file.path());
+ }
+
+ cache_size += file.size_on_disk();
+ files_in_cache += 1;
+ }
+
+ // Sort according to modification time, oldest first.
+ std::sort(files.begin(), files.end(), [](const auto& f1, const auto& f2) {
+ return f1.mtime() < f2.mtime();
+ });
+
+ LOG("Before cleanup: {:.0f} KiB, {:.0f} files",
+ static_cast<double>(cache_size) / 1024,
+ static_cast<double>(files_in_cache));
+
+ bool cleaned = false;
+ for (size_t i = 0; i < files.size();
+ ++i, progress_receiver(2.0 / 3 + 1.0 * i / files.size() / 3)) {
+ const auto& file = files[i];
+
+ if (!file || file.is_directory()) {
+ continue;
+ }
+
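+    // Stop when the size and file count are within limits and no age-based
+    // eviction remains. When evicting by namespace without a max age, every
+    // remaining file must still be inspected, hence the last condition.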
+ if ((max_size == 0 || cache_size <= max_size)
+ && (max_files == 0 || files_in_cache <= max_files)
+ && (!max_age
+ || file.mtime() > (current_time - util::Duration(*max_age)))
+ && (!namespace_ || max_age)) {
+ break;
+ }
+
+ if (namespace_) {
+ try {
+ core::CacheEntry::Header header(file.path());
+ if (header.namespace_ != *namespace_) {
+ continue;
+ }
+ } catch (core::Error&) {
+ // Failed to read header: ignore.
+ continue;
+ }
+
+ // For namespace eviction we need to remove raw files based on result
+ // filename since they don't have a header.
+ if (file_type_from_path(file.path()) == FileType::result) {
+ const auto entry = raw_files_map.find(file.path());
+ if (entry != raw_files_map.end()) {
+ for (const auto& raw_file : entry->second) {
+ delete_file(raw_file,
+ Stat::lstat(raw_file).size_on_disk(),
+ &cache_size,
+ &files_in_cache);
+ }
+ }
+ }
+ }
+
+ delete_file(file.path(), file.size_on_disk(), &cache_size, &files_in_cache);
+ cleaned = true;
+ }
+
+ LOG("After cleanup: {:.0f} KiB, {:.0f} files",
+ static_cast<double>(cache_size) / 1024,
+ static_cast<double>(files_in_cache));
+
+ if (cleaned) {
+ LOG("Cleaned up cache directory {}", subdir);
+ }
+
+ update_counters(subdir, files_in_cache, cache_size, cleaned);
+}
+
+// Clean up all cache subdirectories.
+void
+LocalStorage::clean_all(const ProgressReceiver& progress_receiver)
+{
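+  // Each of the 16 level-1 subdirectories gets an equal share of the
+  // configured limits.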
+ for_each_level_1_subdir(
+ m_config.cache_dir(),
+ [&](const std::string& subdir,
+ const ProgressReceiver& sub_progress_receiver) {
+ clean_dir(subdir,
+ m_config.max_size() / 16,
+ m_config.max_files() / 16,
+ std::nullopt,
+ std::nullopt,
+ sub_progress_receiver);
+ },
+ progress_receiver);
+}
+
+// Wipe one cache subdirectory.
+static void
+wipe_dir(const std::string& subdir, const ProgressReceiver& progress_receiver)
+{
+ LOG("Clearing out cache directory {}", subdir);
+
+ const auto files = get_level_1_files(
+ subdir, [&](double progress) { progress_receiver(progress / 2); });
+
+ for (size_t i = 0; i < files.size(); ++i) {
+ Util::unlink_safe(files[i].path());
+ progress_receiver(0.5 + 0.5 * i / files.size());
+ }
+
+ const bool cleared = !files.empty();
+ if (cleared) {
+ LOG("Cleared out cache directory {}", subdir);
+ }
+ update_counters(subdir, 0, 0, cleared);
+}
+
+// Wipe all cached files in all subdirectories.
+void
+LocalStorage::wipe_all(const ProgressReceiver& progress_receiver)
+{
+ for_each_level_1_subdir(m_config.cache_dir(), wipe_dir, progress_receiver);
+}
+
+CompressionStatistics
+LocalStorage::get_compression_statistics(
+ const ProgressReceiver& progress_receiver) const
+{
+ CompressionStatistics cs{};
+
+ for_each_level_1_subdir(
+ m_config.cache_dir(),
+ [&](const auto& subdir, const auto& sub_progress_receiver) {
+ const auto files = get_level_1_files(
+ subdir, [&](double progress) { sub_progress_receiver(progress / 2); });
+
+ for (size_t i = 0; i < files.size(); ++i) {
+ const auto& cache_file = files[i];
+ cs.on_disk_size += cache_file.size_on_disk();
+
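+        // Files with a readable cache entry header contribute to the
+        // compressed size and to the content size recorded in their header;
+        // files without a valid header are counted as incompressible.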
+ try {
+ core::CacheEntry::Header header(cache_file.path());
+ cs.compr_size += cache_file.size();
+ cs.content_size += header.entry_size;
+ } catch (core::Error&) {
+ cs.incompr_size += cache_file.size();
+ }
+
+ sub_progress_receiver(1.0 / 2 + 1.0 * i / files.size() / 2);
+ }
+ },
+ progress_receiver);
+
+ return cs;
+}
+
+void
+LocalStorage::recompress(const std::optional<int8_t> level,
+ const uint32_t threads,
+ const ProgressReceiver& progress_receiver)
+{
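+  // Let the task queue run ahead of the worker threads: at least 10 queued
+  // entries, or twice the number of threads if that is larger.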
+ const size_t read_ahead =
+ std::max(static_cast<size_t>(10), 2 * static_cast<size_t>(threads));
+ ThreadPool thread_pool(threads, read_ahead);
+ core::FileRecompressor recompressor;
+
+ std::atomic<uint64_t> incompressible_size = 0;
+
+ for_each_level_1_subdir(
+ m_config.cache_dir(),
+ [&](const auto& subdir, const auto& sub_progress_receiver) {
+ auto files = get_level_1_files(subdir, [&](double progress) {
+ sub_progress_receiver(0.1 * progress);
+ });
+
+ auto stats_file = subdir + "/stats";
+
+ for (size_t i = 0; i < files.size(); ++i) {
+ const auto& file = files[i];
+
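+        // The worker lambda below captures file and stats_file by value since
+        // this loop (and its locals) moves on while queued tasks are still
+        // being processed.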
+ if (file_type_from_path(file.path()) != FileType::unknown) {
+ thread_pool.enqueue(
+ [&recompressor, &incompressible_size, level, stats_file, file] {
+ try {
+ Stat new_stat = recompressor.recompress(
+ file, level, core::FileRecompressor::KeepAtime::no);
+ auto size_change_kibibyte =
+ Util::size_change_kibibyte(file, new_stat);
+ if (size_change_kibibyte != 0) {
+ StatsFile(stats_file).update([=](auto& cs) {
+ cs.increment(core::Statistic::cache_size_kibibyte,
+ size_change_kibibyte);
+ });
+ }
+ } catch (core::Error&) {
+ // Ignore for now.
+ incompressible_size += file.size_on_disk();
+ }
+ });
+ } else if (!TemporaryFile::is_tmp_file(file.path())) {
+ incompressible_size += file.size_on_disk();
+ }
+
+ sub_progress_receiver(0.1 + 0.9 * i / files.size());
+ }
+
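+      // The level-1 subdirectories are 0-9 and a-f, so "f" is the last one
+      // to be visited.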
+ if (util::ends_with(subdir, "f")) {
+ // Wait here instead of after for_each_level_1_subdir to avoid
+ // updating the progress bar to 100% before all work is done.
+ thread_pool.shut_down();
+ }
+ },
+ progress_receiver);
+
+ // In case there was no f subdir, shut down the thread pool now.
+ thread_pool.shut_down();
+
+ if (isatty(STDOUT_FILENO)) {
+ PRINT_RAW(stdout, "\n\n");
+ }
+
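+  // Ratio is content size divided by compressed size, so the space savings in
+  // percent is 100 - 100 / ratio.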
+ const double old_ratio = recompressor.old_size() > 0
+ ? static_cast<double>(recompressor.content_size())
+ / recompressor.old_size()
+ : 0.0;
+ const double old_savings =
+ old_ratio > 0.0 ? 100.0 - (100.0 / old_ratio) : 0.0;
+ const double new_ratio = recompressor.new_size() > 0
+ ? static_cast<double>(recompressor.content_size())
+ / recompressor.new_size()
+ : 0.0;
+ const double new_savings =
+ new_ratio > 0.0 ? 100.0 - (100.0 / new_ratio) : 0.0;
+ const int64_t size_difference =
+ static_cast<int64_t>(recompressor.new_size())
+ - static_cast<int64_t>(recompressor.old_size());
+
+ const std::string old_compr_size_str =
+ Util::format_human_readable_size(recompressor.old_size());
+ const std::string new_compr_size_str =
+ Util::format_human_readable_size(recompressor.new_size());
+ const std::string content_size_str =
+ Util::format_human_readable_size(recompressor.content_size());
+ const std::string incompr_size_str =
+ Util::format_human_readable_size(incompressible_size);
+ const std::string size_difference_str =
+ FMT("{}{}",
+ size_difference < 0 ? "-" : (size_difference > 0 ? "+" : " "),
+ Util::format_human_readable_size(
+ size_difference < 0 ? -size_difference : size_difference));
+
+ PRINT(stdout, "Original data: {:>8s}\n", content_size_str);
+ PRINT(stdout,
+ "Old compressed data: {:>8s} ({:.1f}% of original size)\n",
+ old_compr_size_str,
+ 100.0 - old_savings);
+ PRINT(stdout,
+ " - Compression ratio: {:>5.3f} x ({:.1f}% space savings)\n",
+ old_ratio,
+ old_savings);
+ PRINT(stdout,
+ "New compressed data: {:>8s} ({:.1f}% of original size)\n",
+ new_compr_size_str,
+ 100.0 - new_savings);
+ PRINT(stdout,
+ " - Compression ratio: {:>5.3f} x ({:.1f}% space savings)\n",
+ new_ratio,
+ new_savings);
+ PRINT(stdout, "Size change: {:>9s}\n", size_difference_str);
+}
+
// Private methods
LocalStorage::LookUpCacheFileResult