diff options
7 files changed, 189 insertions, 7 deletions
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb index e85e221d353..45ce49bb5c0 100644 --- a/app/workers/background_migration_worker.rb +++ b/app/workers/background_migration_worker.rb @@ -2,18 +2,34 @@ class BackgroundMigrationWorker include Sidekiq::Worker include DedicatedSidekiqQueue - # Schedules a number of jobs in bulk + # Enqueues a number of jobs in bulk. # # The `jobs` argument should be an Array of Arrays, each sub-array must be in # the form: # # [migration-class, [arg1, arg2, ...]] - def self.perform_bulk(*jobs) + def self.perform_bulk(jobs) Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => jobs) end + # Schedules multiple jobs in bulk, with a delay. + # + def self.perform_bulk_in(delay, jobs) + now = Time.now.to_i + schedule = now + delay.to_i + + if schedule <= now + raise ArgumentError, 'The schedule time must be in the future!' + end + + Sidekiq::Client.push_bulk('class' => self, + 'queue' => sidekiq_options['queue'], + 'args' => jobs, + 'at' => schedule) + end + # Performs the background migration. # # See Gitlab::BackgroundMigration.perform for more information. diff --git a/db/post_migrate/20170628080858_migrate_stage_id_reference_in_background.rb b/db/post_migrate/20170628080858_migrate_stage_id_reference_in_background.rb new file mode 100644 index 00000000000..f31015d77a3 --- /dev/null +++ b/db/post_migrate/20170628080858_migrate_stage_id_reference_in_background.rb @@ -0,0 +1,33 @@ +class MigrateStageIdReferenceInBackground < ActiveRecord::Migration + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + BATCH_SIZE = 10000 + RANGE_SIZE = 1000 + MIGRATION = 'MigrateBuildStageIdReference'.freeze + + disable_ddl_transaction! + + class Build < ActiveRecord::Base + self.table_name = 'ci_builds' + include ::EachBatch + end + + ## + # It will take around 3 days to process 20M ci_builds. + # + def up + Build.where(stage_id: nil).each_batch(of: BATCH_SIZE) do |relation, index| + relation.each_batch(of: RANGE_SIZE) do |relation| + range = relation.pluck('MIN(id)', 'MAX(id)').first + + BackgroundMigrationWorker + .perform_in(index * 2.minutes, MIGRATION, range) + end + end + end + + def down + # noop + end +end diff --git a/doc/development/background_migrations.md b/doc/development/background_migrations.md index 0239e6b3163..72a34aa7de9 100644 --- a/doc/development/background_migrations.md +++ b/doc/development/background_migrations.md @@ -50,14 +50,13 @@ your migration: BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, arg2, ...]) ``` -Usually it's better to schedule jobs in bulk, for this you can use +Usually it's better to enqueue jobs in bulk, for this you can use `BackgroundMigrationWorker.perform_bulk`: ```ruby BackgroundMigrationWorker.perform_bulk( - ['BackgroundMigrationClassName', [1]], - ['BackgroundMigrationClassName', [2]], - ... + [['BackgroundMigrationClassName', [1]], + ['BackgroundMigrationClassName', [2]]] ) ``` @@ -68,6 +67,16 @@ consuming migrations it's best to schedule a background job using an updates. Removals in turn can be handled by simply defining foreign keys with cascading deletes. +If you would like to schedule jobs in bulk with a delay, you can use +`BackgroundMigrationWorker.perform_bulk_in`: + +```ruby +jobs = [['BackgroundMigrationClassName', [1]], + ['BackgroundMigrationClassName', [2]]] + +BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs) +``` + ## Cleaning Up Because background migrations can take a long time you can't immediately clean diff --git a/lib/gitlab/background_migration/migrate_build_stage_id_reference.rb b/lib/gitlab/background_migration/migrate_build_stage_id_reference.rb new file mode 100644 index 00000000000..91540127ea9 --- /dev/null +++ b/lib/gitlab/background_migration/migrate_build_stage_id_reference.rb @@ -0,0 +1,19 @@ +module Gitlab + module BackgroundMigration + class MigrateBuildStageIdReference + def perform(start_id, stop_id) + sql = <<-SQL.strip_heredoc + UPDATE ci_builds + SET stage_id = + (SELECT id FROM ci_stages + WHERE ci_stages.pipeline_id = ci_builds.commit_id + AND ci_stages.name = ci_builds.stage) + WHERE ci_builds.id BETWEEN #{start_id.to_i} AND #{stop_id.to_i} + AND ci_builds.stage_id IS NULL + SQL + + ActiveRecord::Base.connection.execute(sql) + end + end + end +end diff --git a/spec/migrations/migrate_stage_id_reference_in_background_spec.rb b/spec/migrations/migrate_stage_id_reference_in_background_spec.rb new file mode 100644 index 00000000000..260378adaa7 --- /dev/null +++ b/spec/migrations/migrate_stage_id_reference_in_background_spec.rb @@ -0,0 +1,68 @@ +require 'spec_helper' +require Rails.root.join('db', 'post_migrate', '20170628080858_migrate_stage_id_reference_in_background') + +describe MigrateStageIdReferenceInBackground, :migration, :sidekiq do + matcher :be_scheduled_migration do |delay, *expected| + match do |migration| + BackgroundMigrationWorker.jobs.any? do |job| + job['args'] == [migration, expected] && + job['at'].to_i == (delay.to_i + Time.now.to_i) + end + end + + failure_message do |migration| + "Migration `#{migration}` with args `#{expected.inspect}` not scheduled!" + end + end + + let(:jobs) { table(:ci_builds) } + let(:stages) { table(:ci_stages) } + let(:pipelines) { table(:ci_pipelines) } + let(:projects) { table(:projects) } + + before do + stub_const("#{described_class.name}::BATCH_SIZE", 3) + stub_const("#{described_class.name}::RANGE_SIZE", 2) + + projects.create!(id: 123, name: 'gitlab1', path: 'gitlab1') + projects.create!(id: 345, name: 'gitlab2', path: 'gitlab2') + + pipelines.create!(id: 1, project_id: 123, ref: 'master', sha: 'adf43c3a') + pipelines.create!(id: 2, project_id: 345, ref: 'feature', sha: 'cdf43c3c') + + jobs.create!(id: 1, commit_id: 1, project_id: 123, stage_idx: 2, stage: 'build') + jobs.create!(id: 2, commit_id: 1, project_id: 123, stage_idx: 2, stage: 'build') + jobs.create!(id: 3, commit_id: 1, project_id: 123, stage_idx: 1, stage: 'test') + jobs.create!(id: 4, commit_id: 1, project_id: 123, stage_idx: 3, stage: 'deploy') + jobs.create!(id: 5, commit_id: 2, project_id: 345, stage_idx: 1, stage: 'test') + + stages.create(id: 101, pipeline_id: 1, project_id: 123, name: 'test') + stages.create(id: 102, pipeline_id: 1, project_id: 123, name: 'build') + stages.create(id: 103, pipeline_id: 1, project_id: 123, name: 'deploy') + + jobs.create!(id: 6, commit_id: 2, project_id: 345, stage_id: 101, stage_idx: 1, stage: 'test') + end + + it 'correctly schedules background migrations' do + Sidekiq::Testing.fake! do + Timecop.freeze do + migrate! + + expect(described_class::MIGRATION).to be_scheduled_migration(2.minutes, 1, 2) + expect(described_class::MIGRATION).to be_scheduled_migration(2.minutes, 3, 3) + expect(described_class::MIGRATION).to be_scheduled_migration(4.minutes, 4, 5) + expect(BackgroundMigrationWorker.jobs.size).to eq 3 + end + end + end + + it 'schedules background migrations' do + Sidekiq::Testing.inline! do + expect(jobs.where(stage_id: nil).count).to eq 5 + + migrate! + + expect(jobs.where(stage_id: nil).count).to eq 1 + end + end +end diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb index 575d3451150..5478fea4e64 100644 --- a/spec/support/sidekiq.rb +++ b/spec/support/sidekiq.rb @@ -3,3 +3,9 @@ require 'sidekiq/testing/inline' Sidekiq::Testing.server_middleware do |chain| chain.add Gitlab::SidekiqStatus::ServerMiddleware end + +RSpec.configure do |config| + config.after(:each, :sidekiq) do + Sidekiq::Worker.clear_all + end +end diff --git a/spec/workers/background_migration_worker_spec.rb b/spec/workers/background_migration_worker_spec.rb index 85939429feb..4f6e3474634 100644 --- a/spec/workers/background_migration_worker_spec.rb +++ b/spec/workers/background_migration_worker_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -describe BackgroundMigrationWorker do +describe BackgroundMigrationWorker, :sidekiq do describe '.perform' do it 'performs a background migration' do expect(Gitlab::BackgroundMigration) @@ -10,4 +10,35 @@ describe BackgroundMigrationWorker do described_class.new.perform('Foo', [10, 20]) end end + + describe '.perform_bulk' do + it 'enqueues background migrations in bulk' do + Sidekiq::Testing.fake! do + described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]]) + + expect(described_class.jobs.count).to eq 2 + expect(described_class.jobs).to all(include('enqueued_at')) + end + end + end + + describe '.perform_bulk_in' do + context 'when delay is valid' do + it 'correctly schedules background migrations' do + Sidekiq::Testing.fake! do + described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]]) + + expect(described_class.jobs.count).to eq 2 + expect(described_class.jobs).to all(include('at')) + end + end + end + + context 'when delay is invalid' do + it 'raises an ArgumentError exception' do + expect { described_class.perform_bulk_in(-60, [['Foo']]) } + .to raise_error(ArgumentError) + end + end + end end |