diff options
author | Micaël Bergeron <mbergeron@gitlab.com> | 2018-02-21 16:09:53 -0500 |
---|---|---|
committer | Micaël Bergeron <mbergeron@gitlab.com> | 2018-03-01 10:36:24 -0500 |
commit | a8df653faeec9147725a391d6de49d1b9fe4528e (patch) | |
tree | dd7537bd9be2adf3280b985600c0d88784ea6967 /app/workers | |
parent | 0f1d348d683fdef6c36c3b244c85e59f582ff886 (diff) | |
download | gitlab-ce-a8df653faeec9147725a391d6de49d1b9fe4528e.tar.gz |
another round of fixes
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/concerns/object_storage_queue.rb | 8 | ||||
-rw-r--r-- | app/workers/object_storage/background_move_worker.rb | 30 | ||||
-rw-r--r-- | app/workers/object_storage/migrate_uploads_worker.rb | 202 | ||||
-rw-r--r-- | app/workers/object_storage_upload_worker.rb | 9 |
4 files changed, 244 insertions, 5 deletions
diff --git a/app/workers/concerns/object_storage_queue.rb b/app/workers/concerns/object_storage_queue.rb new file mode 100644 index 00000000000..a80f473a6d4 --- /dev/null +++ b/app/workers/concerns/object_storage_queue.rb @@ -0,0 +1,8 @@ +# Concern for setting Sidekiq settings for the various GitLab ObjectStorage workers. +module ObjectStorageQueue + extend ActiveSupport::Concern + + included do + queue_namespace :object_storage + end +end diff --git a/app/workers/object_storage/background_move_worker.rb b/app/workers/object_storage/background_move_worker.rb new file mode 100644 index 00000000000..3d0a2109b1d --- /dev/null +++ b/app/workers/object_storage/background_move_worker.rb @@ -0,0 +1,30 @@ +module ObjectStorage + class BackgroundMoveWorker + include ApplicationWorker + include ObjectStorageQueue + + sidekiq_options retry: 5 + + def perform(uploader_class_name, subject_class_name, file_field, subject_id) + uploader_class = uploader_class_name.constantize + subject_class = subject_class_name.constantize + + return unless uploader_class < ObjectStorage::Concern + return unless uploader_class.object_store_enabled? + return unless uploader_class.licensed? + return unless uploader_class.background_upload_enabled? + + subject = subject_class.find(subject_id) + uploader = build_uploader(subject, file_field&.to_sym) + uploader.migrate!(ObjectStorage::Store::REMOTE) + end + + def build_uploader(subject, mount_point) + case subject + when Upload then subject.build_uploader(mount_point) + else + subject.send(mount_point) # rubocop:disable GitlabSecurity/PublicSend + end + end + end +end diff --git a/app/workers/object_storage/migrate_uploads_worker.rb b/app/workers/object_storage/migrate_uploads_worker.rb new file mode 100644 index 00000000000..01ed123e6c8 --- /dev/null +++ b/app/workers/object_storage/migrate_uploads_worker.rb @@ -0,0 +1,202 @@ +# frozen_string_literal: true +# rubocop:disable Metrics/LineLength +# rubocop:disable Style/Documentation + +module ObjectStorage + class MigrateUploadsWorker + include ApplicationWorker + include ObjectStorageQueue + + SanityCheckError = Class.new(StandardError) + + class Upload < ActiveRecord::Base + # Upper limit for foreground checksum processing + CHECKSUM_THRESHOLD = 100.megabytes + + belongs_to :model, polymorphic: true # rubocop:disable Cop/PolymorphicAssociations + + validates :size, presence: true + validates :path, presence: true + validates :model, presence: true + validates :uploader, presence: true + + before_save :calculate_checksum!, if: :foreground_checksummable? + after_commit :schedule_checksum, if: :checksummable? + + scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) } + scope :stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) } + + def self.hexdigest(path) + Digest::SHA256.file(path).hexdigest + end + + def absolute_path + raise ObjectStorage::RemoteStoreError, "Remote object has no absolute path." unless local? + return path unless relative_path? + + uploader_class.absolute_path(self) + end + + def calculate_checksum! + self.checksum = nil + return unless checksummable? + + self.checksum = self.class.hexdigest(absolute_path) + end + + def build_uploader(mounted_as = nil) + uploader_class.new(model, mounted_as).tap do |uploader| + uploader.upload = self + uploader.retrieve_from_store!(identifier) + end + end + + def exist? + File.exist?(absolute_path) + end + + def local? + return true if store.nil? + + store == ObjectStorage::Store::LOCAL + end + + private + + def checksummable? + checksum.nil? && local? && exist? + end + + def foreground_checksummable? + checksummable? && size <= CHECKSUM_THRESHOLD + end + + def schedule_checksum + UploadChecksumWorker.perform_async(id) + end + + def relative_path? + !path.start_with?('/') + end + + def identifier + File.basename(path) + end + + def uploader_class + Object.const_get(uploader) + end + end + + class MigrationResult + attr_reader :upload + attr_accessor :error + + def initialize(upload, error = nil) + @upload, @error = upload, error + end + + def success? + error.nil? + end + + def to_s + success? ? "Migration successful." : "Error while migrating #{upload.id}: #{error.message}" + end + end + + module Report + class MigrationFailures < StandardError + attr_reader :errors + + def initialize(errors) + @errors = errors + end + + def message + errors.map(&:message).join("\n") + end + end + + def report!(results) + success, failures = results.partition(&:success?) + + Rails.logger.info header(success, failures) + Rails.logger.warn failures(failures) + + raise MigrationFailures.new(failures.map(&:error)) if failures.any? + end + + def header(success, failures) + "Migrated #{success.count}/#{success.count + failures.count} files." + end + + def failures(failures) + failures.map { |f| "\t#{f}" }.join('\n') + end + end + + include Report + + def self.enqueue!(uploads, mounted_as, to_store) + sanity_check!(uploads, mounted_as) + + perform_async(uploads.ids, mounted_as, to_store) + end + + # We need to be sure all the uploads are for the same uploader and model type + # and that the mount point exists if provided. + # + def self.sanity_check!(uploads, mounted_as) + upload = uploads.first + + uploader_class = upload.uploader.constantize + model_class = uploads.first.model_type.constantize + + uploader_types = uploads.map(&:uploader).uniq + model_types = uploads.map(&:model_type).uniq + model_has_mount = mounted_as.nil? || model_class.uploaders[mounted_as] == uploader_class + + raise(SanityCheckError, "Multiple uploaders found: #{uploader_types}") unless uploader_types.count == 1 + raise(SanityCheckError, "Multiple model types found: #{model_types}") unless model_types.count == 1 + raise(SanityCheckError, "Mount point #{mounted_as} not found in #{model_class}.") unless model_has_mount + end + + def perform(ids, mounted_as, to_store) + @mounted_as = mounted_as&.to_sym + @to_store = to_store + + uploads = Upload.preload(:model).where(id: ids) + + sanity_check!(uploads) + results = migrate(uploads) + + report!(results) + rescue SanityCheckError => e + # do not retry: the job is insane + Rails.logger.warn "#{self.class}: Sanity check error (#{e.message})" + end + + def sanity_check!(uploads) + self.class.sanity_check!(uploads, @mounted_as) + end + + def build_uploaders(uploads) + uploads.map { |upload| upload.build_uploader(@mounted_as) } + end + + def migrate(uploads) + build_uploaders(uploads).map(&method(:process_uploader)) + end + + def process_uploader(uploader) + MigrationResult.new(uploader.upload).tap do |result| + begin + uploader.migrate!(@to_store) + rescue => e + result.error = e + end + end + end + end +end diff --git a/app/workers/object_storage_upload_worker.rb b/app/workers/object_storage_upload_worker.rb index e087261770f..50d7cc82faa 100644 --- a/app/workers/object_storage_upload_worker.rb +++ b/app/workers/object_storage_upload_worker.rb @@ -1,6 +1,8 @@ +# @Deprecated - remove once the `object_storage_upload` queue is empty +# The queue has been renamed `object_storage:object_storage_background_upload` +# class ObjectStorageUploadWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker sidekiq_options retry: 5 @@ -16,8 +18,5 @@ class ObjectStorageUploadWorker subject = subject_class.find(subject_id) uploader = subject.public_send(file_field) # rubocop:disable GitlabSecurity/PublicSend uploader.migrate!(ObjectStorage::Store::REMOTE) - rescue RecordNotFound - # does not retry when the record do not exists - Rails.logger.warn("Cannot find subject #{subject_class} with id=#{subject_id}.") end end |