Diffstat (limited to 'ee')
 ee/app/models/ee/ci/job_artifact.rb                                 |  25 +
 ee/app/models/ee/lfs_object.rb                                      |  23 +
 ee/app/models/geo/fdw/ci/job_artifact.rb                            |  11 +
 ee/app/models/geo/fdw/lfs_object.rb                                 |   9 +
 ee/app/services/geo/files_expire_service.rb                         |  77 +
 ee/app/services/geo/hashed_storage_attachments_migration_service.rb |  55 +
 ee/app/services/geo/job_artifact_deleted_event_store.rb             |  48 +
 ee/app/services/geo/lfs_object_deleted_event_store.rb               |  49 +
 ee/app/uploaders/object_storage.rb                                  | 265 +
 ee/lib/gitlab/geo/file_transfer.rb                                  |  24 +
 ee/lib/gitlab/geo/log_cursor/daemon.rb                              | 266 +
 11 files changed, 852 insertions(+), 0 deletions(-)
diff --git a/ee/app/models/ee/ci/job_artifact.rb b/ee/app/models/ee/ci/job_artifact.rb
new file mode 100644
index 00000000000..02c6715f447
--- /dev/null
+++ b/ee/app/models/ee/ci/job_artifact.rb
@@ -0,0 +1,25 @@
+module EE
+ # CI::JobArtifact EE mixin
+ #
+ # This module is intended to encapsulate EE-specific model logic
+ # and be prepended in the `Ci::JobArtifact` model
+ module Ci::JobArtifact
+ extend ActiveSupport::Concern
+
+ prepended do
+ after_destroy :log_geo_event
+
+ scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) }
+ end
+
+ def local_store?
+ [nil, JobArtifactUploader::Store::LOCAL].include?(self.file_store)
+ end
+
+ private
+
+ def log_geo_event
+ ::Geo::JobArtifactDeletedEventStore.new(self).create
+ end
+ end
+end
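
The EE mixin above follows GitLab's standard pattern of prepending EE modules onto CE models. A minimal sketch of how it is typically wired up and what it adds; the prepend site is an assumption for illustration and is not part of this diff:

    # Illustrative only: the actual prepend call lives in the CE model file.
    module Ci
      class JobArtifact < ActiveRecord::Base
        prepend EE::Ci::JobArtifact
      end
    end

    # Destroying a locally stored artifact then also records a Geo event:
    artifact.local_store? # => true when file_store is nil or Store::LOCAL
    artifact.destroy      # after_destroy fires log_geo_event
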
diff --git a/ee/app/models/ee/lfs_object.rb b/ee/app/models/ee/lfs_object.rb
new file mode 100644
index 00000000000..6962c2bea4f
--- /dev/null
+++ b/ee/app/models/ee/lfs_object.rb
@@ -0,0 +1,23 @@
+module EE
+ # LFS Object EE mixin
+ #
+ # This module is intended to encapsulate EE-specific model logic
+ # and be prepended in the `LfsObject` model
+ module LfsObject
+ extend ActiveSupport::Concern
+
+ prepended do
+ after_destroy :log_geo_event
+ end
+
+ def local_store?
+ [nil, LfsObjectUploader::Store::LOCAL].include?(self.file_store)
+ end
+
+ private
+
+ def log_geo_event
+ ::Geo::LfsObjectDeletedEventStore.new(self).create
+ end
+ end
+end
diff --git a/ee/app/models/geo/fdw/ci/job_artifact.rb b/ee/app/models/geo/fdw/ci/job_artifact.rb
new file mode 100644
index 00000000000..eaca84b332e
--- /dev/null
+++ b/ee/app/models/geo/fdw/ci/job_artifact.rb
@@ -0,0 +1,11 @@
+module Geo
+ module Fdw
+ module Ci
+ class JobArtifact < ::Geo::BaseFdw
+ self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts')
+
+ scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) }
+ end
+ end
+ end
+end
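
Geo::BaseFdw models read the primary's replicated tables on a secondary through PostgreSQL's foreign data wrapper, so the scope above can be queried without touching the secondary's own tracking database. A hedged usage sketch:

    # Hypothetical query on a Geo secondary: count replicated job artifacts
    # whose files are kept on local storage, via the FDW-mapped table.
    Geo::Fdw::Ci::JobArtifact.with_files_stored_locally.count
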
diff --git a/ee/app/models/geo/fdw/lfs_object.rb b/ee/app/models/geo/fdw/lfs_object.rb
new file mode 100644
index 00000000000..18aae28518d
--- /dev/null
+++ b/ee/app/models/geo/fdw/lfs_object.rb
@@ -0,0 +1,9 @@
+module Geo
+ module Fdw
+ class LfsObject < ::Geo::BaseFdw
+ self.table_name = Gitlab::Geo.fdw_table('lfs_objects')
+
+ scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
+ end
+ end
+end
diff --git a/ee/app/services/geo/files_expire_service.rb b/ee/app/services/geo/files_expire_service.rb
new file mode 100644
index 00000000000..e3604674d85
--- /dev/null
+++ b/ee/app/services/geo/files_expire_service.rb
@@ -0,0 +1,77 @@
+module Geo
+ class FilesExpireService
+ include ::Gitlab::Geo::LogHelpers
+
+ BATCH_SIZE = 500
+
+ attr_reader :project, :old_full_path
+
+ def initialize(project, old_full_path)
+ @project = project
+ @old_full_path = old_full_path
+ end
+
+ # Expire already replicated uploads
+ #
+    # This is a fallback to support projects that haven't been migrated to hashed storage yet.
+    #
+    # Note: without a locking mechanism this is best effort only:
+    # files that are being replicated while this runs will not
+    # be expired.
+ #
+ # The long-term solution is to use hashed storage.
+ def execute
+ return unless Gitlab::Geo.secondary?
+
+ uploads = finder.find_project_uploads(project)
+ log_info("Expiring replicated attachments after project rename", count: uploads.count)
+
+ schedule_file_removal(uploads)
+ mark_for_resync!
+ end
+
+ # Project's base directory for attachments storage
+ #
+ # @return base directory where all uploads for the project are stored
+ def base_dir
+ @base_dir ||= File.join(FileUploader.root, old_full_path)
+ end
+
+ private
+
+ def schedule_file_removal(uploads)
+ paths_to_remove = uploads.find_each(batch_size: BATCH_SIZE).each_with_object([]) do |upload, to_remove|
+ file_path = File.join(base_dir, upload.path)
+
+ if File.exist?(file_path)
+          to_remove << [file_path] # each entry is an argument list for bulk_perform_async
+
+ log_info("Scheduled to remove file", file_path: file_path)
+ end
+ end
+
+ Geo::FileRemovalWorker.bulk_perform_async(paths_to_remove)
+ end
+
+ def mark_for_resync!
+ finder.find_file_registries_uploads(project).delete_all
+ end
+
+ def finder
+ @finder ||= ::Geo::ExpireUploadsFinder.new
+ end
+
+    # This is called by LogHelpers to build the JSON log with context info
+ #
+ # @see ::Gitlab::Geo::LogHelpers
+ def base_log_data(message)
+ {
+ class: self.class.name,
+ project_id: project.id,
+ project_path: project.full_path,
+ project_old_path: old_full_path,
+ message: message
+ }
+ end
+ end
+end
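
A sketch of how this service would be invoked once a project rename has replicated to a secondary; `project` and the old path are assumptions for the example:

    # Expire attachments replicated under the project's pre-rename path.
    Geo::FilesExpireService.new(project, 'some-group/old-project-name').execute
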
diff --git a/ee/app/services/geo/hashed_storage_attachments_migration_service.rb b/ee/app/services/geo/hashed_storage_attachments_migration_service.rb
new file mode 100644
index 00000000000..d967d8f6d5e
--- /dev/null
+++ b/ee/app/services/geo/hashed_storage_attachments_migration_service.rb
@@ -0,0 +1,55 @@
+module Geo
+ AttachmentMigrationError = Class.new(StandardError)
+
+ class HashedStorageAttachmentsMigrationService
+ include ::Gitlab::Geo::LogHelpers
+
+ attr_reader :project_id, :old_attachments_path, :new_attachments_path
+
+ def initialize(project_id, old_attachments_path:, new_attachments_path:)
+ @project_id = project_id
+ @old_attachments_path = old_attachments_path
+ @new_attachments_path = new_attachments_path
+ end
+
+ def async_execute
+ Geo::HashedStorageAttachmentsMigrationWorker.perform_async(
+ project_id,
+ old_attachments_path,
+ new_attachments_path
+ )
+ end
+
+ def execute
+ origin = File.join(FileUploader.root, old_attachments_path)
+ target = File.join(FileUploader.root, new_attachments_path)
+ move_folder!(origin, target)
+ end
+
+ private
+
+ def project
+ @project ||= Project.find(project_id)
+ end
+
+ def move_folder!(old_path, new_path)
+ unless File.directory?(old_path)
+ log_info("Skipped attachments migration to Hashed Storage, source path doesn't exist or is not a directory", project_id: project.id, source: old_path, target: new_path)
+ return
+ end
+
+ if File.exist?(new_path)
+        log_error("Cannot migrate attachments to Hashed Storage, target path already exists", project_id: project.id, source: old_path, target: new_path)
+        raise AttachmentMigrationError, "Target path '#{new_path}' already exists"
+ end
+
+ # Create hashed storage base path folder
+ FileUtils.mkdir_p(File.dirname(new_path))
+
+ FileUtils.mv(old_path, new_path)
+ log_info("Migrated project attachments to Hashed Storage", project_id: project.id, source: old_path, target: new_path)
+
+ true
+ end
+ end
+end
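
A hedged example of both entry points; the paths are illustrative, not taken from this diff:

    service = Geo::HashedStorageAttachmentsMigrationService.new(
      project.id,
      old_attachments_path: 'some-group/some-project',
      new_attachments_path: '@hashed/4e/07/4e0740...' # truncated, illustrative
    )
    service.async_execute # enqueue Geo::HashedStorageAttachmentsMigrationWorker
    # or service.execute to move the folder inline
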
diff --git a/ee/app/services/geo/job_artifact_deleted_event_store.rb b/ee/app/services/geo/job_artifact_deleted_event_store.rb
new file mode 100644
index 00000000000..7455773985c
--- /dev/null
+++ b/ee/app/services/geo/job_artifact_deleted_event_store.rb
@@ -0,0 +1,48 @@
+module Geo
+ class JobArtifactDeletedEventStore < EventStore
+ self.event_type = :job_artifact_deleted_event
+
+ attr_reader :job_artifact
+
+ def initialize(job_artifact)
+ @job_artifact = job_artifact
+ end
+
+ def create
+ return unless job_artifact.local_store?
+
+ super
+ end
+
+ private
+
+ def build_event
+ Geo::JobArtifactDeletedEvent.new(
+ job_artifact: job_artifact,
+ file_path: relative_file_path
+ )
+ end
+
+ def local_store_path
+ Pathname.new(JobArtifactUploader.root)
+ end
+
+ def relative_file_path
+ return unless job_artifact.file.present?
+
+ Pathname.new(job_artifact.file.path).relative_path_from(local_store_path)
+ end
+
+    # This is called by LogHelpers to build the JSON log with context info
+    #
+    # @see ::Gitlab::Geo::LogHelpers
+ def base_log_data(message)
+ {
+ class: self.class.name,
+ job_artifact_id: job_artifact.id,
+ file_path: job_artifact.file.path,
+ message: message
+ }
+ end
+ end
+end
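
The event stores only the path relative to the uploader root, so each node can resolve it against its own storage root. A worked example of relative_file_path, assuming an illustrative JobArtifactUploader.root:

    require 'pathname'

    root = Pathname.new('/shared/artifacts') # assumed root, for illustration
    file = Pathname.new('/shared/artifacts/ab/cd/42/job.log')
    file.relative_path_from(root).to_s # => "ab/cd/42/job.log"
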
diff --git a/ee/app/services/geo/lfs_object_deleted_event_store.rb b/ee/app/services/geo/lfs_object_deleted_event_store.rb
new file mode 100644
index 00000000000..9eb47f91472
--- /dev/null
+++ b/ee/app/services/geo/lfs_object_deleted_event_store.rb
@@ -0,0 +1,49 @@
+module Geo
+ class LfsObjectDeletedEventStore < EventStore
+ self.event_type = :lfs_object_deleted_event
+
+ attr_reader :lfs_object
+
+ def initialize(lfs_object)
+ @lfs_object = lfs_object
+ end
+
+ def create
+ return unless lfs_object.local_store?
+
+ super
+ end
+
+ private
+
+ def build_event
+ Geo::LfsObjectDeletedEvent.new(
+ lfs_object: lfs_object,
+ oid: lfs_object.oid,
+ file_path: relative_file_path
+ )
+ end
+
+ def local_store_path
+ Pathname.new(LfsObjectUploader.root)
+ end
+
+ def relative_file_path
+ return unless lfs_object.file.present?
+
+ Pathname.new(lfs_object.file.path).relative_path_from(local_store_path)
+ end
+
+    # This is called by LogHelpers to build the JSON log with context info
+    #
+    # @see ::Gitlab::Geo::LogHelpers
+ def base_log_data(message)
+ {
+ class: self.class.name,
+ lfs_object_id: lfs_object.id,
+ file_path: lfs_object.file.path,
+ message: message
+ }
+ end
+ end
+end
diff --git a/ee/app/uploaders/object_storage.rb b/ee/app/uploaders/object_storage.rb
new file mode 100644
index 00000000000..e5b087524f5
--- /dev/null
+++ b/ee/app/uploaders/object_storage.rb
@@ -0,0 +1,265 @@
+require 'fog/aws'
+require 'carrierwave/storage/fog'
+
+#
+# This concern adds object storage support
+# to uploaders derived from GitlabUploader
+#
+module ObjectStorage
+ RemoteStoreError = Class.new(StandardError)
+ UnknownStoreError = Class.new(StandardError)
+ ObjectStoreUnavailable = Class.new(StandardError)
+
+ module Store
+ LOCAL = 1
+ REMOTE = 2
+ end
+
+ module Extension
+    # This extension is the glue between ObjectStorage::Concern and RecordsUploads::Concern
+ module RecordsUploads
+ extend ActiveSupport::Concern
+
+ prepended do |base|
+ raise ObjectStoreUnavailable, "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
+
+ base.include(::RecordsUploads::Concern)
+ end
+
+ def retrieve_from_store!(identifier)
+        paths = store_dirs.map { |_store, path| File.join(path, identifier) }
+
+ unless current_upload_satisfies?(paths, model)
+ # the upload we already have isn't right, find the correct one
+ self.upload = uploads.find_by(model: model, path: paths)
+ end
+
+ super
+ end
+
+ def build_upload_from_uploader(uploader)
+ super.tap { |upload| upload.store = object_store }
+ end
+
+ def upload=(upload)
+ return unless upload
+
+ self.object_store = upload.store
+ super
+ end
+
+ private
+
+ def current_upload_satisfies?(paths, model)
+ return false unless upload
+ return false unless model
+
+ paths.include?(upload.path) &&
+ upload.model_id == model.id &&
+ upload.model_type == model.class.base_class.sti_name
+ end
+ end
+ end
+
+ module Concern
+ extend ActiveSupport::Concern
+
+ included do |base|
+ base.include(ObjectStorage)
+
+ before :store, :verify_license!
+ after :migrate, :delete_migrated_file
+ end
+
+ class_methods do
+ def object_store_options
+ options.object_store
+ end
+
+ def object_store_enabled?
+ object_store_options.enabled
+ end
+
+ def background_upload_enabled?
+ object_store_options.background_upload
+ end
+
+ def object_store_credentials
+ object_store_options.connection.to_hash.deep_symbolize_keys
+ end
+
+ def remote_store_path
+ object_store_options.remote_directory
+ end
+
+ def licensed?
+ License.feature_available?(:object_storage)
+ end
+ end
+
+ def file_storage?
+ storage.is_a?(CarrierWave::Storage::File)
+ end
+
+ def file_cache_storage?
+ cache_storage.is_a?(CarrierWave::Storage::File)
+ end
+
+ def object_store
+ @object_store ||= model.try(store_serialization_column) || Store::LOCAL
+ end
+
+ # rubocop:disable Gitlab/ModuleWithInstanceVariables
+ def object_store=(value)
+ @object_store = value || Store::LOCAL
+ @storage = storage_for(object_store)
+ end
+ # rubocop:enable Gitlab/ModuleWithInstanceVariables
+
+    # Returns true if the current file is part of the model (i.e. is mounted in the model)
+ #
+ def persist_object_store?
+ model.respond_to?(:"#{store_serialization_column}=")
+ end
+
+ # Save the current @object_store to the model <mounted_as>_store column
+ def persist_object_store!
+ return unless persist_object_store?
+
+ updated = model.update_column(store_serialization_column, object_store)
+      raise ActiveRecord::ActiveRecordError unless updated
+ end
+
+ def use_file
+ if file_storage?
+ return yield path
+ end
+
+ begin
+ cache_stored_file!
+ yield cache_path
+ ensure
+ cache_storage.delete_dir!(cache_path(nil))
+ end
+ end
+
+ def filename
+ super || file&.filename
+ end
+
+ #
+ # Move the file to another store
+ #
+ # new_store: Enum (Store::LOCAL, Store::REMOTE)
+ #
+ def migrate!(new_store)
+      return if object_store == new_store
+ return unless file
+
+ new_file = nil
+ file_to_delete = file
+ from_object_store = object_store
+ self.object_store = new_store # changes the storage and file
+
+ cache_stored_file! if file_storage?
+
+ with_callbacks(:migrate, file_to_delete) do
+ with_callbacks(:store, file_to_delete) do # for #store_versions!
+ new_file = storage.store!(file)
+ persist_object_store!
+ self.file = new_file
+ end
+ end
+
+ file
+ rescue => e
+ # in case of failure delete new file
+ new_file.delete unless new_file.nil?
+ # revert back to the old file
+ self.object_store = from_object_store
+ self.file = file_to_delete
+ raise e
+ end
+
+ def schedule_migration_to_object_storage(*args)
+ return unless self.class.object_store_enabled?
+ return unless self.class.background_upload_enabled?
+ return unless self.class.licensed?
+ return unless self.file_storage?
+
+ ObjectStorageUploadWorker.perform_async(self.class.name, model.class.name, mounted_as, model.id)
+ end
+
+ def fog_directory
+ self.class.remote_store_path
+ end
+
+ def fog_credentials
+ self.class.object_store_credentials
+ end
+
+ def fog_public
+ false
+ end
+
+ def delete_migrated_file(migrated_file)
+ migrated_file.delete if exists?
+ end
+
+ def verify_license!(_file)
+ return if file_storage?
+
+ raise 'Object Storage feature is missing' unless self.class.licensed?
+ end
+
+ def exists?
+ file.present?
+ end
+
+ def store_dir(store = nil)
+ store_dirs[store || object_store]
+ end
+
+ def store_dirs
+ {
+ Store::LOCAL => File.join(base_dir, dynamic_segment),
+ Store::REMOTE => File.join(dynamic_segment)
+ }
+ end
+
+ private
+
+    # This is a hack around CarrierWave: the #migrate method needs to be
+    # able to force the current file to the migrated file upon success.
+ def file=(file)
+ @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
+ end
+
+ def serialization_column
+ model.class.uploader_options.dig(mounted_as, :mount_on) || mounted_as
+ end
+
+    # Returns the column where the 'store' is saved,
+    # e.g. 'file_store' for a mount named 'file'
+ def store_serialization_column
+ [serialization_column, 'store'].compact.join('_').to_sym
+ end
+
+ def storage
+ @storage ||= storage_for(object_store)
+ end
+
+ def storage_for(store)
+ case store
+ when Store::REMOTE
+ raise 'Object Storage is not enabled' unless self.class.object_store_enabled?
+
+ CarrierWave::Storage::Fog.new(self)
+ when Store::LOCAL
+ CarrierWave::Storage::File.new(self)
+ else
+ raise UnknownStoreError
+ end
+ end
+ end
+end
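
A hedged sketch of the concern's main entry points, assuming an uploader that includes ObjectStorage::Concern (as JobArtifactUploader would after this change):

    uploader = artifact.file                  # a mounted uploader instance
    uploader.object_store                     # => ObjectStorage::Store::LOCAL
    uploader.migrate!(ObjectStorage::Store::REMOTE)
    # on success the store column is persisted and the old local copy is
    # removed by the after-:migrate callback (delete_migrated_file)

    uploader.use_file do |path|
      # remote files are cached locally for the duration of the block
    end
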
diff --git a/ee/lib/gitlab/geo/file_transfer.rb b/ee/lib/gitlab/geo/file_transfer.rb
new file mode 100644
index 00000000000..16db6f2d448
--- /dev/null
+++ b/ee/lib/gitlab/geo/file_transfer.rb
@@ -0,0 +1,24 @@
+module Gitlab
+ module Geo
+ class FileTransfer < Transfer
+ def initialize(file_type, upload)
+ @file_type = file_type
+ @file_id = upload.id
+ @filename = upload.absolute_path
+ @request_data = build_request_data(upload)
+ rescue ObjectStorage::RemoteStoreError
+ Rails.logger.warn "Cannot transfer a remote object."
+ end
+
+ private
+
+ def build_request_data(upload)
+ {
+ id: upload.model_id,
+ type: upload.model_type,
+ checksum: upload.checksum
+ }
+ end
+ end
+ end
+end
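
Illustrative construction on a secondary; `upload` is assumed to be an Upload record whose file is on local storage. For remote objects, Upload#absolute_path raises ObjectStorage::RemoteStoreError, which the constructor rescues with a warning, leaving the transfer without a filename or request data:

    transfer = Gitlab::Geo::FileTransfer.new(:attachment, upload)
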
diff --git a/ee/lib/gitlab/geo/log_cursor/daemon.rb b/ee/lib/gitlab/geo/log_cursor/daemon.rb
new file mode 100644
index 00000000000..d4596286641
--- /dev/null
+++ b/ee/lib/gitlab/geo/log_cursor/daemon.rb
@@ -0,0 +1,266 @@
+module Gitlab
+ module Geo
+ module LogCursor
+ class Daemon
+ VERSION = '0.2.0'.freeze
+ BATCH_SIZE = 250
+ SECONDARY_CHECK_INTERVAL = 1.minute
+
+ attr_reader :options
+
+ def initialize(options = {})
+ @options = options
+ @exit = false
+ logger.geo_logger.build.level = options[:debug] ? :debug : Rails.logger.level
+ end
+
+ def run!
+ trap_signals
+
+ until exit?
+ # Prevent the node from processing events unless it's a secondary
+ unless Geo.secondary?
+ sleep(SECONDARY_CHECK_INTERVAL)
+ next
+ end
+
+ lease = Lease.try_obtain_with_ttl { run_once! }
+
+ return if exit?
+
+            # When no new events are found, sleep for a few moments
+ arbitrary_sleep(lease[:ttl])
+ end
+ end
+
+ def run_once!
+ LogCursor::Events.fetch_in_batches { |batch| handle_events(batch) }
+ end
+
+ def handle_events(batch)
+ batch.each do |event_log|
+ next unless can_replay?(event_log)
+
+ begin
+ event = event_log.event
+ handler = "handle_#{event.class.name.demodulize.underscore}"
+
+ __send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend
+ rescue NoMethodError => e
+ logger.error(e.message)
+ raise e
+ end
+ end
+ end
+
+ private
+
+ def trap_signals
+ trap(:TERM) do
+ quit!
+ end
+ trap(:INT) do
+ quit!
+ end
+ end
+
+ # Safe shutdown
+ def quit!
+ $stdout.puts 'Exiting...'
+
+ @exit = true
+ end
+
+ def exit?
+ @exit
+ end
+
+ def can_replay?(event_log)
+ return true if event_log.project_id.nil?
+
+ Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
+ end
+
+ def handle_repository_created_event(event, created_at)
+ registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
+
+ logger.event_info(
+ created_at,
+ message: 'Repository created',
+ project_id: event.project_id,
+ repo_path: event.repo_path,
+ wiki_path: event.wiki_path,
+ resync_repository: registry.resync_repository,
+ resync_wiki: registry.resync_wiki)
+
+ registry.save!
+
+ ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
+ end
+
+ def handle_repository_updated_event(event, created_at)
+ registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
+
+ logger.event_info(
+ created_at,
+ message: 'Repository update',
+ project_id: event.project_id,
+ source: event.source,
+ resync_repository: registry.resync_repository,
+ resync_wiki: registry.resync_wiki)
+
+ registry.save!
+
+ ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
+ end
+
+ def handle_repository_deleted_event(event, created_at)
+ job_id = ::Geo::RepositoryDestroyService
+ .new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name)
+ .async_execute
+
+ logger.event_info(
+ created_at,
+ message: 'Deleted project',
+ project_id: event.project_id,
+ repository_storage_name: event.repository_storage_name,
+ disk_path: event.deleted_path,
+ job_id: job_id)
+
+ # No need to create a project entry if it doesn't exist
+ ::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
+ end
+
+ def handle_repositories_changed_event(event, created_at)
+ return unless Gitlab::Geo.current_node.id == event.geo_node_id
+
+ job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
+
+ if job_id
+ logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
+ else
+ logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
+ end
+ end
+
+ def handle_repository_renamed_event(event, created_at)
+ return unless event.project_id
+
+ old_path = event.old_path_with_namespace
+ new_path = event.new_path_with_namespace
+
+ job_id = ::Geo::RenameRepositoryService
+ .new(event.project_id, old_path, new_path)
+ .async_execute
+
+ logger.event_info(
+ created_at,
+ message: 'Renaming project',
+ project_id: event.project_id,
+ old_path: old_path,
+ new_path: new_path,
+ job_id: job_id)
+ end
+
+ def handle_hashed_storage_migrated_event(event, created_at)
+ return unless event.project_id
+
+ job_id = ::Geo::HashedStorageMigrationService.new(
+ event.project_id,
+ old_disk_path: event.old_disk_path,
+ new_disk_path: event.new_disk_path,
+ old_storage_version: event.old_storage_version
+ ).async_execute
+
+ logger.event_info(
+ created_at,
+ message: 'Migrating project to hashed storage',
+ project_id: event.project_id,
+ old_storage_version: event.old_storage_version,
+ new_storage_version: event.new_storage_version,
+ old_disk_path: event.old_disk_path,
+ new_disk_path: event.new_disk_path,
+ job_id: job_id)
+ end
+
+ def handle_hashed_storage_attachments_event(event, created_at)
+ job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
+ event.project_id,
+ old_attachments_path: event.old_attachments_path,
+ new_attachments_path: event.new_attachments_path
+ ).async_execute
+
+ logger.event_info(
+ created_at,
+ message: 'Migrating attachments to hashed storage',
+ project_id: event.project_id,
+ old_attachments_path: event.old_attachments_path,
+ new_attachments_path: event.new_attachments_path,
+ job_id: job_id
+ )
+ end
+
+ def handle_lfs_object_deleted_event(event, created_at)
+ file_path = File.join(LfsObjectUploader.root, event.file_path)
+
+ job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
+
+ logger.event_info(
+ created_at,
+ message: 'Deleted LFS object',
+ oid: event.oid,
+ file_id: event.lfs_object_id,
+ file_path: file_path,
+ job_id: job_id)
+
+ ::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
+ end
+
+ def handle_job_artifact_deleted_event(event, created_at)
+ file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id)
+ return unless file_registry_job_artifacts.any? # avoid race condition
+
+ file_path = File.join(::JobArtifactUploader.root, event.file_path)
+
+ if File.file?(file_path)
+ deleted = delete_file(file_path) # delete synchronously to ensure consistency
+ return unless deleted # do not delete file from registry if deletion failed
+ end
+
+ logger.event_info(
+ created_at,
+ message: 'Deleted job artifact',
+ file_id: event.job_artifact_id,
+ file_path: file_path)
+
+ file_registry_job_artifacts.delete_all
+ end
+
+ def find_or_initialize_registry(project_id, attrs)
+ registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
+ registry.assign_attributes(attrs)
+ registry
+ end
+
+ def delete_file(path)
+ File.delete(path)
+ rescue => ex
+ logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
+ false
+ end
+
+        # Sleeps for the TTL remaining on the lease plus a random 0.1 to 2.0 seconds.
+        #
+        # This allows multiple GeoLogCursors to compete for batches of events
+        # without always favouring the node with the lowest latency.
+ def arbitrary_sleep(delay)
+ sleep(delay + rand(1..20) * 0.1)
+ end
+
+ def logger
+ Gitlab::Geo::LogCursor::Logger
+ end
+ end
+ end
+ end
+end
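
A minimal sketch of how the daemon would be started, for example from a dedicated runner process; the invocation is an assumption, not part of this diff:

    # Runs until SIGTERM/SIGINT; only processes events while the node
    # is a Geo secondary and it holds the log-cursor lease.
    Gitlab::Geo::LogCursor::Daemon.new(debug: true).run!
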