diff options
Diffstat (limited to 'ee')
-rw-r--r-- | ee/app/models/ee/ci/job_artifact.rb | 25 | ||||
-rw-r--r-- | ee/app/models/ee/lfs_object.rb | 23 | ||||
-rw-r--r-- | ee/app/models/geo/fdw/ci/job_artifact.rb | 11 | ||||
-rw-r--r-- | ee/app/models/geo/fdw/lfs_object.rb | 9 | ||||
-rw-r--r-- | ee/app/services/geo/files_expire_service.rb | 77 | ||||
-rw-r--r-- | ee/app/services/geo/hashed_storage_attachments_migration_service.rb | 55 | ||||
-rw-r--r-- | ee/app/services/geo/job_artifact_deleted_event_store.rb | 48 | ||||
-rw-r--r-- | ee/app/services/geo/lfs_object_deleted_event_store.rb | 49 | ||||
-rw-r--r-- | ee/app/uploaders/object_storage.rb | 265 | ||||
-rw-r--r-- | ee/lib/gitlab/geo/file_transfer.rb | 24 | ||||
-rw-r--r-- | ee/lib/gitlab/geo/log_cursor/daemon.rb | 266 |
11 files changed, 852 insertions, 0 deletions
diff --git a/ee/app/models/ee/ci/job_artifact.rb b/ee/app/models/ee/ci/job_artifact.rb new file mode 100644 index 00000000000..02c6715f447 --- /dev/null +++ b/ee/app/models/ee/ci/job_artifact.rb @@ -0,0 +1,25 @@ +module EE + # CI::JobArtifact EE mixin + # + # This module is intended to encapsulate EE-specific model logic + # and be prepended in the `Ci::JobArtifact` model + module Ci::JobArtifact + extend ActiveSupport::Concern + + prepended do + after_destroy :log_geo_event + + scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) } + end + + def local_store? + [nil, JobArtifactUploader::Store::LOCAL].include?(self.file_store) + end + + private + + def log_geo_event + ::Geo::JobArtifactDeletedEventStore.new(self).create + end + end +end diff --git a/ee/app/models/ee/lfs_object.rb b/ee/app/models/ee/lfs_object.rb new file mode 100644 index 00000000000..6962c2bea4f --- /dev/null +++ b/ee/app/models/ee/lfs_object.rb @@ -0,0 +1,23 @@ +module EE + # LFS Object EE mixin + # + # This module is intended to encapsulate EE-specific model logic + # and be prepended in the `LfsObject` model + module LfsObject + extend ActiveSupport::Concern + + prepended do + after_destroy :log_geo_event + end + + def local_store? + [nil, LfsObjectUploader::Store::LOCAL].include?(self.file_store) + end + + private + + def log_geo_event + ::Geo::LfsObjectDeletedEventStore.new(self).create + end + end +end diff --git a/ee/app/models/geo/fdw/ci/job_artifact.rb b/ee/app/models/geo/fdw/ci/job_artifact.rb new file mode 100644 index 00000000000..eaca84b332e --- /dev/null +++ b/ee/app/models/geo/fdw/ci/job_artifact.rb @@ -0,0 +1,11 @@ +module Geo + module Fdw + module Ci + class JobArtifact < ::Geo::BaseFdw + self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts') + + scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) } + end + end + end +end diff --git a/ee/app/models/geo/fdw/lfs_object.rb b/ee/app/models/geo/fdw/lfs_object.rb new file mode 100644 index 00000000000..18aae28518d --- /dev/null +++ b/ee/app/models/geo/fdw/lfs_object.rb @@ -0,0 +1,9 @@ +module Geo + module Fdw + class LfsObject < ::Geo::BaseFdw + self.table_name = Gitlab::Geo.fdw_table('lfs_objects') + + scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) } + end + end +end diff --git a/ee/app/services/geo/files_expire_service.rb b/ee/app/services/geo/files_expire_service.rb new file mode 100644 index 00000000000..e3604674d85 --- /dev/null +++ b/ee/app/services/geo/files_expire_service.rb @@ -0,0 +1,77 @@ +module Geo + class FilesExpireService + include ::Gitlab::Geo::LogHelpers + + BATCH_SIZE = 500 + + attr_reader :project, :old_full_path + + def initialize(project, old_full_path) + @project = project + @old_full_path = old_full_path + end + + # Expire already replicated uploads + # + # This is a fallback solution to support projects that haven't rolled out to hashed-storage yet. + # + # Note: Unless we add some locking mechanism, this will be best effort only + # as if there are files that are being replicated during this execution, they will not + # be expired. + # + # The long-term solution is to use hashed storage. + def execute + return unless Gitlab::Geo.secondary? + + uploads = finder.find_project_uploads(project) + log_info("Expiring replicated attachments after project rename", count: uploads.count) + + schedule_file_removal(uploads) + mark_for_resync! + end + + # Project's base directory for attachments storage + # + # @return base directory where all uploads for the project are stored + def base_dir + @base_dir ||= File.join(FileUploader.root, old_full_path) + end + + private + + def schedule_file_removal(uploads) + paths_to_remove = uploads.find_each(batch_size: BATCH_SIZE).each_with_object([]) do |upload, to_remove| + file_path = File.join(base_dir, upload.path) + + if File.exist?(file_path) + to_remove << [file_path] + + log_info("Scheduled to remove file", file_path: file_path) + end + end + + Geo::FileRemovalWorker.bulk_perform_async(paths_to_remove) + end + + def mark_for_resync! + finder.find_file_registries_uploads(project).delete_all + end + + def finder + @finder ||= ::Geo::ExpireUploadsFinder.new + end + + # This is called by LogHelpers to build json log with context info + # + # @see ::Gitlab::Geo::LogHelpers + def base_log_data(message) + { + class: self.class.name, + project_id: project.id, + project_path: project.full_path, + project_old_path: old_full_path, + message: message + } + end + end +end diff --git a/ee/app/services/geo/hashed_storage_attachments_migration_service.rb b/ee/app/services/geo/hashed_storage_attachments_migration_service.rb new file mode 100644 index 00000000000..d967d8f6d5e --- /dev/null +++ b/ee/app/services/geo/hashed_storage_attachments_migration_service.rb @@ -0,0 +1,55 @@ +module Geo + AttachmentMigrationError = Class.new(StandardError) + + class HashedStorageAttachmentsMigrationService + include ::Gitlab::Geo::LogHelpers + + attr_reader :project_id, :old_attachments_path, :new_attachments_path + + def initialize(project_id, old_attachments_path:, new_attachments_path:) + @project_id = project_id + @old_attachments_path = old_attachments_path + @new_attachments_path = new_attachments_path + end + + def async_execute + Geo::HashedStorageAttachmentsMigrationWorker.perform_async( + project_id, + old_attachments_path, + new_attachments_path + ) + end + + def execute + origin = File.join(FileUploader.root, old_attachments_path) + target = File.join(FileUploader.root, new_attachments_path) + move_folder!(origin, target) + end + + private + + def project + @project ||= Project.find(project_id) + end + + def move_folder!(old_path, new_path) + unless File.directory?(old_path) + log_info("Skipped attachments migration to Hashed Storage, source path doesn't exist or is not a directory", project_id: project.id, source: old_path, target: new_path) + return + end + + if File.exist?(new_path) + log_error("Cannot migrate attachments to Hashed Storage, target path already exist", project_id: project.id, source: old_path, target: new_path) + raise AttachmentMigrationError, "Target path '#{new_path}' already exist" + end + + # Create hashed storage base path folder + FileUtils.mkdir_p(File.dirname(new_path)) + + FileUtils.mv(old_path, new_path) + log_info("Migrated project attachments to Hashed Storage", project_id: project.id, source: old_path, target: new_path) + + true + end + end +end diff --git a/ee/app/services/geo/job_artifact_deleted_event_store.rb b/ee/app/services/geo/job_artifact_deleted_event_store.rb new file mode 100644 index 00000000000..7455773985c --- /dev/null +++ b/ee/app/services/geo/job_artifact_deleted_event_store.rb @@ -0,0 +1,48 @@ +module Geo + class JobArtifactDeletedEventStore < EventStore + self.event_type = :job_artifact_deleted_event + + attr_reader :job_artifact + + def initialize(job_artifact) + @job_artifact = job_artifact + end + + def create + return unless job_artifact.local_store? + + super + end + + private + + def build_event + Geo::JobArtifactDeletedEvent.new( + job_artifact: job_artifact, + file_path: relative_file_path + ) + end + + def local_store_path + Pathname.new(JobArtifactUploader.root) + end + + def relative_file_path + return unless job_artifact.file.present? + + Pathname.new(job_artifact.file.path).relative_path_from(local_store_path) + end + + # This is called by ProjectLogHelpers to build json log with context info + # + # @see ::Gitlab::Geo::ProjectLogHelpers + def base_log_data(message) + { + class: self.class.name, + job_artifact_id: job_artifact.id, + file_path: job_artifact.file.path, + message: message + } + end + end +end diff --git a/ee/app/services/geo/lfs_object_deleted_event_store.rb b/ee/app/services/geo/lfs_object_deleted_event_store.rb new file mode 100644 index 00000000000..9eb47f91472 --- /dev/null +++ b/ee/app/services/geo/lfs_object_deleted_event_store.rb @@ -0,0 +1,49 @@ +module Geo + class LfsObjectDeletedEventStore < EventStore + self.event_type = :lfs_object_deleted_event + + attr_reader :lfs_object + + def initialize(lfs_object) + @lfs_object = lfs_object + end + + def create + return unless lfs_object.local_store? + + super + end + + private + + def build_event + Geo::LfsObjectDeletedEvent.new( + lfs_object: lfs_object, + oid: lfs_object.oid, + file_path: relative_file_path + ) + end + + def local_store_path + Pathname.new(LfsObjectUploader.root) + end + + def relative_file_path + return unless lfs_object.file.present? + + Pathname.new(lfs_object.file.path).relative_path_from(local_store_path) + end + + # This is called by ProjectLogHelpers to build json log with context info + # + # @see ::Gitlab::Geo::ProjectLogHelpers + def base_log_data(message) + { + class: self.class.name, + lfs_object_id: lfs_object.id, + file_path: lfs_object.file.path, + message: message + } + end + end +end diff --git a/ee/app/uploaders/object_storage.rb b/ee/app/uploaders/object_storage.rb new file mode 100644 index 00000000000..e5b087524f5 --- /dev/null +++ b/ee/app/uploaders/object_storage.rb @@ -0,0 +1,265 @@ +require 'fog/aws' +require 'carrierwave/storage/fog' + +# +# This concern should add object storage support +# to the GitlabUploader class +# +module ObjectStorage + RemoteStoreError = Class.new(StandardError) + UnknownStoreError = Class.new(StandardError) + ObjectStoreUnavailable = Class.new(StandardError) + + module Store + LOCAL = 1 + REMOTE = 2 + end + + module Extension + # this extension is the glue between the ObjectStorage::Concern and RecordsUploads::Concern + module RecordsUploads + extend ActiveSupport::Concern + + prepended do |base| + raise ObjectStoreUnavailable, "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern + + base.include(::RecordsUploads::Concern) + end + + def retrieve_from_store!(identifier) + paths = store_dirs.map { |store, path| File.join(path, identifier) } + + unless current_upload_satisfies?(paths, model) + # the upload we already have isn't right, find the correct one + self.upload = uploads.find_by(model: model, path: paths) + end + + super + end + + def build_upload_from_uploader(uploader) + super.tap { |upload| upload.store = object_store } + end + + def upload=(upload) + return unless upload + + self.object_store = upload.store + super + end + + private + + def current_upload_satisfies?(paths, model) + return false unless upload + return false unless model + + paths.include?(upload.path) && + upload.model_id == model.id && + upload.model_type == model.class.base_class.sti_name + end + end + end + + module Concern + extend ActiveSupport::Concern + + included do |base| + base.include(ObjectStorage) + + before :store, :verify_license! + after :migrate, :delete_migrated_file + end + + class_methods do + def object_store_options + options.object_store + end + + def object_store_enabled? + object_store_options.enabled + end + + def background_upload_enabled? + object_store_options.background_upload + end + + def object_store_credentials + object_store_options.connection.to_hash.deep_symbolize_keys + end + + def remote_store_path + object_store_options.remote_directory + end + + def licensed? + License.feature_available?(:object_storage) + end + end + + def file_storage? + storage.is_a?(CarrierWave::Storage::File) + end + + def file_cache_storage? + cache_storage.is_a?(CarrierWave::Storage::File) + end + + def object_store + @object_store ||= model.try(store_serialization_column) || Store::LOCAL + end + + # rubocop:disable Gitlab/ModuleWithInstanceVariables + def object_store=(value) + @object_store = value || Store::LOCAL + @storage = storage_for(object_store) + end + # rubocop:enable Gitlab/ModuleWithInstanceVariables + + # Return true if the current file is part or the model (i.e. is mounted in the model) + # + def persist_object_store? + model.respond_to?(:"#{store_serialization_column}=") + end + + # Save the current @object_store to the model <mounted_as>_store column + def persist_object_store! + return unless persist_object_store? + + updated = model.update_column(store_serialization_column, object_store) + raise ActiveRecordError unless updated + end + + def use_file + if file_storage? + return yield path + end + + begin + cache_stored_file! + yield cache_path + ensure + cache_storage.delete_dir!(cache_path(nil)) + end + end + + def filename + super || file&.filename + end + + # + # Move the file to another store + # + # new_store: Enum (Store::LOCAL, Store::REMOTE) + # + def migrate!(new_store) + return unless object_store != new_store + return unless file + + new_file = nil + file_to_delete = file + from_object_store = object_store + self.object_store = new_store # changes the storage and file + + cache_stored_file! if file_storage? + + with_callbacks(:migrate, file_to_delete) do + with_callbacks(:store, file_to_delete) do # for #store_versions! + new_file = storage.store!(file) + persist_object_store! + self.file = new_file + end + end + + file + rescue => e + # in case of failure delete new file + new_file.delete unless new_file.nil? + # revert back to the old file + self.object_store = from_object_store + self.file = file_to_delete + raise e + end + + def schedule_migration_to_object_storage(*args) + return unless self.class.object_store_enabled? + return unless self.class.background_upload_enabled? + return unless self.class.licensed? + return unless self.file_storage? + + ObjectStorageUploadWorker.perform_async(self.class.name, model.class.name, mounted_as, model.id) + end + + def fog_directory + self.class.remote_store_path + end + + def fog_credentials + self.class.object_store_credentials + end + + def fog_public + false + end + + def delete_migrated_file(migrated_file) + migrated_file.delete if exists? + end + + def verify_license!(_file) + return if file_storage? + + raise 'Object Storage feature is missing' unless self.class.licensed? + end + + def exists? + file.present? + end + + def store_dir(store = nil) + store_dirs[store || object_store] + end + + def store_dirs + { + Store::LOCAL => File.join(base_dir, dynamic_segment), + Store::REMOTE => File.join(dynamic_segment) + } + end + + private + + # this is a hack around CarrierWave. The #migrate method needs to be + # able to force the current file to the migrated file upon success. + def file=(file) + @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables + end + + def serialization_column + model.class.uploader_options.dig(mounted_as, :mount_on) || mounted_as + end + + # Returns the column where the 'store' is saved + # defaults to 'store' + def store_serialization_column + [serialization_column, 'store'].compact.join('_').to_sym + end + + def storage + @storage ||= storage_for(object_store) + end + + def storage_for(store) + case store + when Store::REMOTE + raise 'Object Storage is not enabled' unless self.class.object_store_enabled? + + CarrierWave::Storage::Fog.new(self) + when Store::LOCAL + CarrierWave::Storage::File.new(self) + else + raise UnknownStoreError + end + end + end +end diff --git a/ee/lib/gitlab/geo/file_transfer.rb b/ee/lib/gitlab/geo/file_transfer.rb new file mode 100644 index 00000000000..16db6f2d448 --- /dev/null +++ b/ee/lib/gitlab/geo/file_transfer.rb @@ -0,0 +1,24 @@ +module Gitlab + module Geo + class FileTransfer < Transfer + def initialize(file_type, upload) + @file_type = file_type + @file_id = upload.id + @filename = upload.absolute_path + @request_data = build_request_data(upload) + rescue ObjectStorage::RemoteStoreError + Rails.logger.warn "Cannot transfer a remote object." + end + + private + + def build_request_data(upload) + { + id: upload.model_id, + type: upload.model_type, + checksum: upload.checksum + } + end + end + end +end diff --git a/ee/lib/gitlab/geo/log_cursor/daemon.rb b/ee/lib/gitlab/geo/log_cursor/daemon.rb new file mode 100644 index 00000000000..d4596286641 --- /dev/null +++ b/ee/lib/gitlab/geo/log_cursor/daemon.rb @@ -0,0 +1,266 @@ +module Gitlab + module Geo + module LogCursor + class Daemon + VERSION = '0.2.0'.freeze + BATCH_SIZE = 250 + SECONDARY_CHECK_INTERVAL = 1.minute + + attr_reader :options + + def initialize(options = {}) + @options = options + @exit = false + logger.geo_logger.build.level = options[:debug] ? :debug : Rails.logger.level + end + + def run! + trap_signals + + until exit? + # Prevent the node from processing events unless it's a secondary + unless Geo.secondary? + sleep(SECONDARY_CHECK_INTERVAL) + next + end + + lease = Lease.try_obtain_with_ttl { run_once! } + + return if exit? + + # When no new event is found sleep for a few moments + arbitrary_sleep(lease[:ttl]) + end + end + + def run_once! + LogCursor::Events.fetch_in_batches { |batch| handle_events(batch) } + end + + def handle_events(batch) + batch.each do |event_log| + next unless can_replay?(event_log) + + begin + event = event_log.event + handler = "handle_#{event.class.name.demodulize.underscore}" + + __send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend + rescue NoMethodError => e + logger.error(e.message) + raise e + end + end + end + + private + + def trap_signals + trap(:TERM) do + quit! + end + trap(:INT) do + quit! + end + end + + # Safe shutdown + def quit! + $stdout.puts 'Exiting...' + + @exit = true + end + + def exit? + @exit + end + + def can_replay?(event_log) + return true if event_log.project_id.nil? + + Gitlab::Geo.current_node&.projects_include?(event_log.project_id) + end + + def handle_repository_created_event(event, created_at) + registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?) + + logger.event_info( + created_at, + message: 'Repository created', + project_id: event.project_id, + repo_path: event.repo_path, + wiki_path: event.wiki_path, + resync_repository: registry.resync_repository, + resync_wiki: registry.resync_wiki) + + registry.save! + + ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now) + end + + def handle_repository_updated_event(event, created_at) + registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true) + + logger.event_info( + created_at, + message: 'Repository update', + project_id: event.project_id, + source: event.source, + resync_repository: registry.resync_repository, + resync_wiki: registry.resync_wiki) + + registry.save! + + ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now) + end + + def handle_repository_deleted_event(event, created_at) + job_id = ::Geo::RepositoryDestroyService + .new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name) + .async_execute + + logger.event_info( + created_at, + message: 'Deleted project', + project_id: event.project_id, + repository_storage_name: event.repository_storage_name, + disk_path: event.deleted_path, + job_id: job_id) + + # No need to create a project entry if it doesn't exist + ::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all + end + + def handle_repositories_changed_event(event, created_at) + return unless Gitlab::Geo.current_node.id == event.geo_node_id + + job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id) + + if job_id + logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id) + else + logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id) + end + end + + def handle_repository_renamed_event(event, created_at) + return unless event.project_id + + old_path = event.old_path_with_namespace + new_path = event.new_path_with_namespace + + job_id = ::Geo::RenameRepositoryService + .new(event.project_id, old_path, new_path) + .async_execute + + logger.event_info( + created_at, + message: 'Renaming project', + project_id: event.project_id, + old_path: old_path, + new_path: new_path, + job_id: job_id) + end + + def handle_hashed_storage_migrated_event(event, created_at) + return unless event.project_id + + job_id = ::Geo::HashedStorageMigrationService.new( + event.project_id, + old_disk_path: event.old_disk_path, + new_disk_path: event.new_disk_path, + old_storage_version: event.old_storage_version + ).async_execute + + logger.event_info( + created_at, + message: 'Migrating project to hashed storage', + project_id: event.project_id, + old_storage_version: event.old_storage_version, + new_storage_version: event.new_storage_version, + old_disk_path: event.old_disk_path, + new_disk_path: event.new_disk_path, + job_id: job_id) + end + + def handle_hashed_storage_attachments_event(event, created_at) + job_id = ::Geo::HashedStorageAttachmentsMigrationService.new( + event.project_id, + old_attachments_path: event.old_attachments_path, + new_attachments_path: event.new_attachments_path + ).async_execute + + logger.event_info( + created_at, + message: 'Migrating attachments to hashed storage', + project_id: event.project_id, + old_attachments_path: event.old_attachments_path, + new_attachments_path: event.new_attachments_path, + job_id: job_id + ) + end + + def handle_lfs_object_deleted_event(event, created_at) + file_path = File.join(LfsObjectUploader.root, event.file_path) + + job_id = ::Geo::FileRemovalWorker.perform_async(file_path) + + logger.event_info( + created_at, + message: 'Deleted LFS object', + oid: event.oid, + file_id: event.lfs_object_id, + file_path: file_path, + job_id: job_id) + + ::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all + end + + def handle_job_artifact_deleted_event(event, created_at) + file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id) + return unless file_registry_job_artifacts.any? # avoid race condition + + file_path = File.join(::JobArtifactUploader.root, event.file_path) + + if File.file?(file_path) + deleted = delete_file(file_path) # delete synchronously to ensure consistency + return unless deleted # do not delete file from registry if deletion failed + end + + logger.event_info( + created_at, + message: 'Deleted job artifact', + file_id: event.job_artifact_id, + file_path: file_path) + + file_registry_job_artifacts.delete_all + end + + def find_or_initialize_registry(project_id, attrs) + registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id) + registry.assign_attributes(attrs) + registry + end + + def delete_file(path) + File.delete(path) + rescue => ex + logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path) + false + end + + # Sleeps for the expired TTL that remains on the lease plus some random seconds. + # + # This allows multiple GeoLogCursors to randomly process a batch of events, + # without favouring the shortest path (or latency). + def arbitrary_sleep(delay) + sleep(delay + rand(1..20) * 0.1) + end + + def logger + Gitlab::Geo::LogCursor::Logger + end + end + end + end +end |