summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMicaël Bergeron <mbergeron@gitlab.com>2018-03-01 11:18:37 -0500
committerMicaël Bergeron <mbergeron@gitlab.com>2018-03-01 11:18:37 -0500
commit0e732fa466c4d1c48163472b268dded18cc39407 (patch)
tree240c9aca3d1a92813f1eb6620dbb8e67bd9db8ac
parent508938873cb00b8eed53d6aacdd0e0940d8425d9 (diff)
downloadgitlab-ce-0e732fa466c4d1c48163472b268dded18cc39407.tar.gz
removed EE specific code from the port
-rw-r--r--ee/app/controllers/ee/projects/jobs_controller.rb32
-rw-r--r--ee/app/models/ee/ci/job_artifact.rb25
-rw-r--r--ee/app/models/ee/lfs_object.rb23
-rw-r--r--ee/app/models/geo/fdw/ci/job_artifact.rb11
-rw-r--r--ee/app/models/geo/fdw/lfs_object.rb9
-rw-r--r--ee/app/services/geo/files_expire_service.rb77
-rw-r--r--ee/app/services/geo/hashed_storage_attachments_migration_service.rb55
-rw-r--r--ee/app/services/geo/job_artifact_deleted_event_store.rb48
-rw-r--r--ee/app/services/geo/lfs_object_deleted_event_store.rb49
-rw-r--r--ee/app/uploaders/ee/job_artifact_uploader.rb13
-rw-r--r--ee/app/uploaders/object_storage.rb316
-rw-r--r--ee/lib/gitlab/geo/file_transfer.rb24
-rw-r--r--ee/lib/gitlab/geo/log_cursor/daemon.rb266
-rw-r--r--lib/gitlab/ci/trace/http_io.rb (renamed from ee/lib/gitlab/ci/trace/http_io.rb)0
-rw-r--r--spec/lib/gitlab/ci/trace/http_io_spec.rb (renamed from spec/ee/spec/lib/gitlab/ci/trace/http_io_spec.rb)0
15 files changed, 0 insertions, 948 deletions
diff --git a/ee/app/controllers/ee/projects/jobs_controller.rb b/ee/app/controllers/ee/projects/jobs_controller.rb
deleted file mode 100644
index 03e67d3b549..00000000000
--- a/ee/app/controllers/ee/projects/jobs_controller.rb
+++ /dev/null
@@ -1,32 +0,0 @@
-module EE
- module Projects
- module JobsController
- extend ActiveSupport::Concern
- include SendFileUpload
-
- def raw
- if trace_artifact_file
- send_upload(trace_artifact_file,
- send_params: raw_send_params,
- redirect_params: raw_redirect_params)
- else
- super
- end
- end
-
- private
-
- def raw_send_params
- { type: 'text/plain; charset=utf-8', disposition: 'inline' }
- end
-
- def raw_redirect_params
- { query: { 'response-content-type' => 'text/plain; charset=utf-8', 'response-content-disposition' => 'inline' } }
- end
-
- def trace_artifact_file
- @trace_artifact_file ||= build.job_artifacts_trace&.file
- end
- end
- end
-end
diff --git a/ee/app/models/ee/ci/job_artifact.rb b/ee/app/models/ee/ci/job_artifact.rb
deleted file mode 100644
index 7dd5925bfc9..00000000000
--- a/ee/app/models/ee/ci/job_artifact.rb
+++ /dev/null
@@ -1,25 +0,0 @@
-module EE
- # CI::JobArtifact EE mixin
- #
- # This module is intended to encapsulate EE-specific model logic
- # and be prepended in the `Ci::JobArtifact` model
- module Ci::JobArtifact
- extend ActiveSupport::Concern
-
- prepended do
- after_destroy :log_geo_event
-
- scope :with_files_stored_locally, -> { where(file_store: [nil, ::JobArtifactUploader::Store::LOCAL]) }
- end
-
- def local_store?
- [nil, ::JobArtifactUploader::Store::LOCAL].include?(self.file_store)
- end
-
- private
-
- def log_geo_event
- ::Geo::JobArtifactDeletedEventStore.new(self).create
- end
- end
-end
diff --git a/ee/app/models/ee/lfs_object.rb b/ee/app/models/ee/lfs_object.rb
deleted file mode 100644
index 6962c2bea4f..00000000000
--- a/ee/app/models/ee/lfs_object.rb
+++ /dev/null
@@ -1,23 +0,0 @@
-module EE
- # LFS Object EE mixin
- #
- # This module is intended to encapsulate EE-specific model logic
- # and be prepended in the `LfsObject` model
- module LfsObject
- extend ActiveSupport::Concern
-
- prepended do
- after_destroy :log_geo_event
- end
-
- def local_store?
- [nil, LfsObjectUploader::Store::LOCAL].include?(self.file_store)
- end
-
- private
-
- def log_geo_event
- ::Geo::LfsObjectDeletedEventStore.new(self).create
- end
- end
-end
diff --git a/ee/app/models/geo/fdw/ci/job_artifact.rb b/ee/app/models/geo/fdw/ci/job_artifact.rb
deleted file mode 100644
index eaca84b332e..00000000000
--- a/ee/app/models/geo/fdw/ci/job_artifact.rb
+++ /dev/null
@@ -1,11 +0,0 @@
-module Geo
- module Fdw
- module Ci
- class JobArtifact < ::Geo::BaseFdw
- self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts')
-
- scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) }
- end
- end
- end
-end
diff --git a/ee/app/models/geo/fdw/lfs_object.rb b/ee/app/models/geo/fdw/lfs_object.rb
deleted file mode 100644
index 18aae28518d..00000000000
--- a/ee/app/models/geo/fdw/lfs_object.rb
+++ /dev/null
@@ -1,9 +0,0 @@
-module Geo
- module Fdw
- class LfsObject < ::Geo::BaseFdw
- self.table_name = Gitlab::Geo.fdw_table('lfs_objects')
-
- scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
- end
- end
-end
diff --git a/ee/app/services/geo/files_expire_service.rb b/ee/app/services/geo/files_expire_service.rb
deleted file mode 100644
index e3604674d85..00000000000
--- a/ee/app/services/geo/files_expire_service.rb
+++ /dev/null
@@ -1,77 +0,0 @@
-module Geo
- class FilesExpireService
- include ::Gitlab::Geo::LogHelpers
-
- BATCH_SIZE = 500
-
- attr_reader :project, :old_full_path
-
- def initialize(project, old_full_path)
- @project = project
- @old_full_path = old_full_path
- end
-
- # Expire already replicated uploads
- #
- # This is a fallback solution to support projects that haven't rolled out to hashed-storage yet.
- #
- # Note: Unless we add some locking mechanism, this will be best effort only
- # as if there are files that are being replicated during this execution, they will not
- # be expired.
- #
- # The long-term solution is to use hashed storage.
- def execute
- return unless Gitlab::Geo.secondary?
-
- uploads = finder.find_project_uploads(project)
- log_info("Expiring replicated attachments after project rename", count: uploads.count)
-
- schedule_file_removal(uploads)
- mark_for_resync!
- end
-
- # Project's base directory for attachments storage
- #
- # @return base directory where all uploads for the project are stored
- def base_dir
- @base_dir ||= File.join(FileUploader.root, old_full_path)
- end
-
- private
-
- def schedule_file_removal(uploads)
- paths_to_remove = uploads.find_each(batch_size: BATCH_SIZE).each_with_object([]) do |upload, to_remove|
- file_path = File.join(base_dir, upload.path)
-
- if File.exist?(file_path)
- to_remove << [file_path]
-
- log_info("Scheduled to remove file", file_path: file_path)
- end
- end
-
- Geo::FileRemovalWorker.bulk_perform_async(paths_to_remove)
- end
-
- def mark_for_resync!
- finder.find_file_registries_uploads(project).delete_all
- end
-
- def finder
- @finder ||= ::Geo::ExpireUploadsFinder.new
- end
-
- # This is called by LogHelpers to build json log with context info
- #
- # @see ::Gitlab::Geo::LogHelpers
- def base_log_data(message)
- {
- class: self.class.name,
- project_id: project.id,
- project_path: project.full_path,
- project_old_path: old_full_path,
- message: message
- }
- end
- end
-end
diff --git a/ee/app/services/geo/hashed_storage_attachments_migration_service.rb b/ee/app/services/geo/hashed_storage_attachments_migration_service.rb
deleted file mode 100644
index d967d8f6d5e..00000000000
--- a/ee/app/services/geo/hashed_storage_attachments_migration_service.rb
+++ /dev/null
@@ -1,55 +0,0 @@
-module Geo
- AttachmentMigrationError = Class.new(StandardError)
-
- class HashedStorageAttachmentsMigrationService
- include ::Gitlab::Geo::LogHelpers
-
- attr_reader :project_id, :old_attachments_path, :new_attachments_path
-
- def initialize(project_id, old_attachments_path:, new_attachments_path:)
- @project_id = project_id
- @old_attachments_path = old_attachments_path
- @new_attachments_path = new_attachments_path
- end
-
- def async_execute
- Geo::HashedStorageAttachmentsMigrationWorker.perform_async(
- project_id,
- old_attachments_path,
- new_attachments_path
- )
- end
-
- def execute
- origin = File.join(FileUploader.root, old_attachments_path)
- target = File.join(FileUploader.root, new_attachments_path)
- move_folder!(origin, target)
- end
-
- private
-
- def project
- @project ||= Project.find(project_id)
- end
-
- def move_folder!(old_path, new_path)
- unless File.directory?(old_path)
- log_info("Skipped attachments migration to Hashed Storage, source path doesn't exist or is not a directory", project_id: project.id, source: old_path, target: new_path)
- return
- end
-
- if File.exist?(new_path)
- log_error("Cannot migrate attachments to Hashed Storage, target path already exist", project_id: project.id, source: old_path, target: new_path)
- raise AttachmentMigrationError, "Target path '#{new_path}' already exist"
- end
-
- # Create hashed storage base path folder
- FileUtils.mkdir_p(File.dirname(new_path))
-
- FileUtils.mv(old_path, new_path)
- log_info("Migrated project attachments to Hashed Storage", project_id: project.id, source: old_path, target: new_path)
-
- true
- end
- end
-end
diff --git a/ee/app/services/geo/job_artifact_deleted_event_store.rb b/ee/app/services/geo/job_artifact_deleted_event_store.rb
deleted file mode 100644
index 7455773985c..00000000000
--- a/ee/app/services/geo/job_artifact_deleted_event_store.rb
+++ /dev/null
@@ -1,48 +0,0 @@
-module Geo
- class JobArtifactDeletedEventStore < EventStore
- self.event_type = :job_artifact_deleted_event
-
- attr_reader :job_artifact
-
- def initialize(job_artifact)
- @job_artifact = job_artifact
- end
-
- def create
- return unless job_artifact.local_store?
-
- super
- end
-
- private
-
- def build_event
- Geo::JobArtifactDeletedEvent.new(
- job_artifact: job_artifact,
- file_path: relative_file_path
- )
- end
-
- def local_store_path
- Pathname.new(JobArtifactUploader.root)
- end
-
- def relative_file_path
- return unless job_artifact.file.present?
-
- Pathname.new(job_artifact.file.path).relative_path_from(local_store_path)
- end
-
- # This is called by ProjectLogHelpers to build json log with context info
- #
- # @see ::Gitlab::Geo::ProjectLogHelpers
- def base_log_data(message)
- {
- class: self.class.name,
- job_artifact_id: job_artifact.id,
- file_path: job_artifact.file.path,
- message: message
- }
- end
- end
-end
diff --git a/ee/app/services/geo/lfs_object_deleted_event_store.rb b/ee/app/services/geo/lfs_object_deleted_event_store.rb
deleted file mode 100644
index 9eb47f91472..00000000000
--- a/ee/app/services/geo/lfs_object_deleted_event_store.rb
+++ /dev/null
@@ -1,49 +0,0 @@
-module Geo
- class LfsObjectDeletedEventStore < EventStore
- self.event_type = :lfs_object_deleted_event
-
- attr_reader :lfs_object
-
- def initialize(lfs_object)
- @lfs_object = lfs_object
- end
-
- def create
- return unless lfs_object.local_store?
-
- super
- end
-
- private
-
- def build_event
- Geo::LfsObjectDeletedEvent.new(
- lfs_object: lfs_object,
- oid: lfs_object.oid,
- file_path: relative_file_path
- )
- end
-
- def local_store_path
- Pathname.new(LfsObjectUploader.root)
- end
-
- def relative_file_path
- return unless lfs_object.file.present?
-
- Pathname.new(lfs_object.file.path).relative_path_from(local_store_path)
- end
-
- # This is called by ProjectLogHelpers to build json log with context info
- #
- # @see ::Gitlab::Geo::ProjectLogHelpers
- def base_log_data(message)
- {
- class: self.class.name,
- lfs_object_id: lfs_object.id,
- file_path: lfs_object.file.path,
- message: message
- }
- end
- end
-end
diff --git a/ee/app/uploaders/ee/job_artifact_uploader.rb b/ee/app/uploaders/ee/job_artifact_uploader.rb
deleted file mode 100644
index e54419fe683..00000000000
--- a/ee/app/uploaders/ee/job_artifact_uploader.rb
+++ /dev/null
@@ -1,13 +0,0 @@
-module EE
- module JobArtifactUploader
- extend ActiveSupport::Concern
-
- def open
- if file_storage?
- super
- else
- ::Gitlab::Ci::Trace::HttpIO.new(url, size) if url
- end
- end
- end
-end
diff --git a/ee/app/uploaders/object_storage.rb b/ee/app/uploaders/object_storage.rb
deleted file mode 100644
index 23013f99d32..00000000000
--- a/ee/app/uploaders/object_storage.rb
+++ /dev/null
@@ -1,316 +0,0 @@
-require 'fog/aws'
-require 'carrierwave/storage/fog'
-
-#
-# This concern should add object storage support
-# to the GitlabUploader class
-#
-module ObjectStorage
- RemoteStoreError = Class.new(StandardError)
- UnknownStoreError = Class.new(StandardError)
- ObjectStoreUnavailable = Class.new(StandardError)
-
- module Store
- LOCAL = 1
- REMOTE = 2
- end
-
- module Extension
- # this extension is the glue between the ObjectStorage::Concern and RecordsUploads::Concern
- module RecordsUploads
- extend ActiveSupport::Concern
-
- prepended do |base|
- raise ObjectStoreUnavailable, "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
-
- base.include(::RecordsUploads::Concern)
- end
-
- def retrieve_from_store!(identifier)
- paths = store_dirs.map { |store, path| File.join(path, identifier) }
-
- unless current_upload_satisfies?(paths, model)
- # the upload we already have isn't right, find the correct one
- self.upload = uploads.find_by(model: model, path: paths)
- end
-
- super
- end
-
- def build_upload_from_uploader(uploader)
- super.tap { |upload| upload.store = object_store }
- end
-
- def upload=(upload)
- return unless upload
-
- self.object_store = upload.store
- super
- end
-
- private
-
- def current_upload_satisfies?(paths, model)
- return false unless upload
- return false unless model
-
- paths.include?(upload.path) &&
- upload.model_id == model.id &&
- upload.model_type == model.class.base_class.sti_name
- end
- end
- end
-
- # Add support for automatic background uploading after the file is stored.
- #
- module BackgroundMove
- extend ActiveSupport::Concern
-
- def background_upload(mount_points = [])
- return unless mount_points.any?
-
- run_after_commit do
- mount_points.each { |mount| send(mount).schedule_background_upload } # rubocop:disable GitlabSecurity/PublicSend
- end
- end
-
- def changed_mounts
- self.class.uploaders.select do |mount, uploader_class|
- mounted_as = uploader_class.serialization_column(self.class, mount)
- uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend
-
- next unless uploader
- next unless uploader.exists?
- next unless send(:"#{mounted_as}_changed?") # rubocop:disable GitlabSecurity/PublicSend
-
- mount
- end.keys
- end
-
- included do
- after_save on: [:create, :update] do
- background_upload(changed_mounts)
- end
- end
- end
-
- module Concern
- extend ActiveSupport::Concern
-
- included do |base|
- base.include(ObjectStorage)
-
- before :store, :verify_license!
- after :migrate, :delete_migrated_file
- end
-
- class_methods do
- def object_store_options
- options.object_store
- end
-
- def object_store_enabled?
- object_store_options.enabled
- end
-
- def background_upload_enabled?
- object_store_options.background_upload
- end
-
- def object_store_credentials
- object_store_options.connection.to_hash.deep_symbolize_keys
- end
-
- def remote_store_path
- object_store_options.remote_directory
- end
-
- def licensed?
- License.feature_available?(:object_storage)
- end
- end
-
- def file_storage?
- storage.is_a?(CarrierWave::Storage::File)
- end
-
- def file_cache_storage?
- cache_storage.is_a?(CarrierWave::Storage::File)
- end
-
- def object_store
- @object_store ||= model.try(store_serialization_column) || Store::LOCAL
- end
-
- # rubocop:disable Gitlab/ModuleWithInstanceVariables
- def object_store=(value)
- @object_store = value || Store::LOCAL
- @storage = storage_for(object_store)
- end
- # rubocop:enable Gitlab/ModuleWithInstanceVariables
-
- # Return true if the current file is part or the model (i.e. is mounted in the model)
- #
- def persist_object_store?
- model.respond_to?(:"#{store_serialization_column}=")
- end
-
- # Save the current @object_store to the model <mounted_as>_store column
- def persist_object_store!
- return unless persist_object_store?
-
- updated = model.update_column(store_serialization_column, object_store)
- raise 'Failed to update object store' unless updated
- end
-
- def use_file
- if file_storage?
- return yield path
- end
-
- begin
- cache_stored_file!
- yield cache_path
- ensure
- cache_storage.delete_dir!(cache_path(nil))
- end
- end
-
- def filename
- super || file&.filename
- end
-
- #
- # Move the file to another store
- #
- # new_store: Enum (Store::LOCAL, Store::REMOTE)
- #
- def migrate!(new_store)
- uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
- raise 'Already running' unless uuid
-
- unsafe_migrate!(new_store)
- ensure
- Gitlab::ExclusiveLease.cancel(exclusive_lease_key, uuid)
- end
-
- def schedule_migration_to_object_storage(*args)
- return unless self.class.object_store_enabled?
- return unless self.class.background_upload_enabled?
- return unless self.class.licensed?
- return unless self.file_storage?
-
- ObjectStorageUploadWorker.perform_async(self.class.name, model.class.name, mounted_as, model.id)
- end
-
- def fog_directory
- self.class.remote_store_path
- end
-
- def fog_credentials
- self.class.object_store_credentials
- end
-
- def fog_public
- false
- end
-
- def delete_migrated_file(migrated_file)
- migrated_file.delete if exists?
- end
-
- def verify_license!(_file)
- return if file_storage?
-
- raise 'Object Storage feature is missing' unless self.class.licensed?
- end
-
- def exists?
- file.present?
- end
-
- def store_dir(store = nil)
- store_dirs[store || object_store]
- end
-
- def store_dirs
- {
- Store::LOCAL => File.join(base_dir, dynamic_segment),
- Store::REMOTE => File.join(dynamic_segment)
- }
- end
-
- private
-
- # this is a hack around CarrierWave. The #migrate method needs to be
- # able to force the current file to the migrated file upon success.
- def file=(file)
- @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
- end
-
- def serialization_column
- model.class.uploader_options.dig(mounted_as, :mount_on) || mounted_as
- end
-
- # Returns the column where the 'store' is saved
- # defaults to 'store'
- def store_serialization_column
- [serialization_column, 'store'].compact.join('_').to_sym
- end
-
- def storage
- @storage ||= storage_for(object_store)
- end
-
- def storage_for(store)
- case store
- when Store::REMOTE
- raise 'Object Storage is not enabled' unless self.class.object_store_enabled?
-
- CarrierWave::Storage::Fog.new(self)
- when Store::LOCAL
- CarrierWave::Storage::File.new(self)
- else
- raise UnknownStoreError
- end
- end
-
- def exclusive_lease_key
- "object_storage_migrate:#{model.class}:#{model.id}"
- end
-
- #
- # Move the file to another store
- #
- # new_store: Enum (Store::LOCAL, Store::REMOTE)
- #
- def unsafe_migrate!(new_store)
- return unless object_store != new_store
- return unless file
-
- new_file = nil
- file_to_delete = file
- from_object_store = object_store
- self.object_store = new_store # changes the storage and file
-
- cache_stored_file! if file_storage?
-
- with_callbacks(:migrate, file_to_delete) do
- with_callbacks(:store, file_to_delete) do # for #store_versions!
- new_file = storage.store!(file)
- persist_object_store!
- self.file = new_file
- end
- end
-
- file
- rescue => e
- # in case of failure delete new file
- new_file.delete unless new_file.nil?
- # revert back to the old file
- self.object_store = from_object_store
- self.file = file_to_delete
- raise e
- end
- end
-end
diff --git a/ee/lib/gitlab/geo/file_transfer.rb b/ee/lib/gitlab/geo/file_transfer.rb
deleted file mode 100644
index 16db6f2d448..00000000000
--- a/ee/lib/gitlab/geo/file_transfer.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-module Gitlab
- module Geo
- class FileTransfer < Transfer
- def initialize(file_type, upload)
- @file_type = file_type
- @file_id = upload.id
- @filename = upload.absolute_path
- @request_data = build_request_data(upload)
- rescue ObjectStorage::RemoteStoreError
- Rails.logger.warn "Cannot transfer a remote object."
- end
-
- private
-
- def build_request_data(upload)
- {
- id: upload.model_id,
- type: upload.model_type,
- checksum: upload.checksum
- }
- end
- end
- end
-end
diff --git a/ee/lib/gitlab/geo/log_cursor/daemon.rb b/ee/lib/gitlab/geo/log_cursor/daemon.rb
deleted file mode 100644
index d4596286641..00000000000
--- a/ee/lib/gitlab/geo/log_cursor/daemon.rb
+++ /dev/null
@@ -1,266 +0,0 @@
-module Gitlab
- module Geo
- module LogCursor
- class Daemon
- VERSION = '0.2.0'.freeze
- BATCH_SIZE = 250
- SECONDARY_CHECK_INTERVAL = 1.minute
-
- attr_reader :options
-
- def initialize(options = {})
- @options = options
- @exit = false
- logger.geo_logger.build.level = options[:debug] ? :debug : Rails.logger.level
- end
-
- def run!
- trap_signals
-
- until exit?
- # Prevent the node from processing events unless it's a secondary
- unless Geo.secondary?
- sleep(SECONDARY_CHECK_INTERVAL)
- next
- end
-
- lease = Lease.try_obtain_with_ttl { run_once! }
-
- return if exit?
-
- # When no new event is found sleep for a few moments
- arbitrary_sleep(lease[:ttl])
- end
- end
-
- def run_once!
- LogCursor::Events.fetch_in_batches { |batch| handle_events(batch) }
- end
-
- def handle_events(batch)
- batch.each do |event_log|
- next unless can_replay?(event_log)
-
- begin
- event = event_log.event
- handler = "handle_#{event.class.name.demodulize.underscore}"
-
- __send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend
- rescue NoMethodError => e
- logger.error(e.message)
- raise e
- end
- end
- end
-
- private
-
- def trap_signals
- trap(:TERM) do
- quit!
- end
- trap(:INT) do
- quit!
- end
- end
-
- # Safe shutdown
- def quit!
- $stdout.puts 'Exiting...'
-
- @exit = true
- end
-
- def exit?
- @exit
- end
-
- def can_replay?(event_log)
- return true if event_log.project_id.nil?
-
- Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
- end
-
- def handle_repository_created_event(event, created_at)
- registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
-
- logger.event_info(
- created_at,
- message: 'Repository created',
- project_id: event.project_id,
- repo_path: event.repo_path,
- wiki_path: event.wiki_path,
- resync_repository: registry.resync_repository,
- resync_wiki: registry.resync_wiki)
-
- registry.save!
-
- ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
- end
-
- def handle_repository_updated_event(event, created_at)
- registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
-
- logger.event_info(
- created_at,
- message: 'Repository update',
- project_id: event.project_id,
- source: event.source,
- resync_repository: registry.resync_repository,
- resync_wiki: registry.resync_wiki)
-
- registry.save!
-
- ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
- end
-
- def handle_repository_deleted_event(event, created_at)
- job_id = ::Geo::RepositoryDestroyService
- .new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name)
- .async_execute
-
- logger.event_info(
- created_at,
- message: 'Deleted project',
- project_id: event.project_id,
- repository_storage_name: event.repository_storage_name,
- disk_path: event.deleted_path,
- job_id: job_id)
-
- # No need to create a project entry if it doesn't exist
- ::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
- end
-
- def handle_repositories_changed_event(event, created_at)
- return unless Gitlab::Geo.current_node.id == event.geo_node_id
-
- job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
-
- if job_id
- logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
- else
- logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
- end
- end
-
- def handle_repository_renamed_event(event, created_at)
- return unless event.project_id
-
- old_path = event.old_path_with_namespace
- new_path = event.new_path_with_namespace
-
- job_id = ::Geo::RenameRepositoryService
- .new(event.project_id, old_path, new_path)
- .async_execute
-
- logger.event_info(
- created_at,
- message: 'Renaming project',
- project_id: event.project_id,
- old_path: old_path,
- new_path: new_path,
- job_id: job_id)
- end
-
- def handle_hashed_storage_migrated_event(event, created_at)
- return unless event.project_id
-
- job_id = ::Geo::HashedStorageMigrationService.new(
- event.project_id,
- old_disk_path: event.old_disk_path,
- new_disk_path: event.new_disk_path,
- old_storage_version: event.old_storage_version
- ).async_execute
-
- logger.event_info(
- created_at,
- message: 'Migrating project to hashed storage',
- project_id: event.project_id,
- old_storage_version: event.old_storage_version,
- new_storage_version: event.new_storage_version,
- old_disk_path: event.old_disk_path,
- new_disk_path: event.new_disk_path,
- job_id: job_id)
- end
-
- def handle_hashed_storage_attachments_event(event, created_at)
- job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
- event.project_id,
- old_attachments_path: event.old_attachments_path,
- new_attachments_path: event.new_attachments_path
- ).async_execute
-
- logger.event_info(
- created_at,
- message: 'Migrating attachments to hashed storage',
- project_id: event.project_id,
- old_attachments_path: event.old_attachments_path,
- new_attachments_path: event.new_attachments_path,
- job_id: job_id
- )
- end
-
- def handle_lfs_object_deleted_event(event, created_at)
- file_path = File.join(LfsObjectUploader.root, event.file_path)
-
- job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
-
- logger.event_info(
- created_at,
- message: 'Deleted LFS object',
- oid: event.oid,
- file_id: event.lfs_object_id,
- file_path: file_path,
- job_id: job_id)
-
- ::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
- end
-
- def handle_job_artifact_deleted_event(event, created_at)
- file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id)
- return unless file_registry_job_artifacts.any? # avoid race condition
-
- file_path = File.join(::JobArtifactUploader.root, event.file_path)
-
- if File.file?(file_path)
- deleted = delete_file(file_path) # delete synchronously to ensure consistency
- return unless deleted # do not delete file from registry if deletion failed
- end
-
- logger.event_info(
- created_at,
- message: 'Deleted job artifact',
- file_id: event.job_artifact_id,
- file_path: file_path)
-
- file_registry_job_artifacts.delete_all
- end
-
- def find_or_initialize_registry(project_id, attrs)
- registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
- registry.assign_attributes(attrs)
- registry
- end
-
- def delete_file(path)
- File.delete(path)
- rescue => ex
- logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
- false
- end
-
- # Sleeps for the expired TTL that remains on the lease plus some random seconds.
- #
- # This allows multiple GeoLogCursors to randomly process a batch of events,
- # without favouring the shortest path (or latency).
- def arbitrary_sleep(delay)
- sleep(delay + rand(1..20) * 0.1)
- end
-
- def logger
- Gitlab::Geo::LogCursor::Logger
- end
- end
- end
- end
-end
diff --git a/ee/lib/gitlab/ci/trace/http_io.rb b/lib/gitlab/ci/trace/http_io.rb
index 5256f7999c1..5256f7999c1 100644
--- a/ee/lib/gitlab/ci/trace/http_io.rb
+++ b/lib/gitlab/ci/trace/http_io.rb
diff --git a/spec/ee/spec/lib/gitlab/ci/trace/http_io_spec.rb b/spec/lib/gitlab/ci/trace/http_io_spec.rb
index b839ef7ce36..b839ef7ce36 100644
--- a/spec/ee/spec/lib/gitlab/ci/trace/http_io_spec.rb
+++ b/spec/lib/gitlab/ci/trace/http_io_spec.rb