require 'json' # See http://doc.gitlab.com/ce/development/migration_style_guide.html # for more information on how to write migrations for GitLab. class MigrateSidekiqQueuesFromDefault < ActiveRecord::Migration include Gitlab::Database::MigrationHelpers DOWNTIME = true DOWNTIME_REASON = <<-EOF Moving Sidekiq jobs from queues requires Sidekiq to be stopped. Not stopping Sidekiq will result in the loss of jobs that are scheduled after this migration completes. EOF disable_ddl_transaction! # Jobs for which the queue names have been changed (e.g. multiple workers # using the same non-default queue). # # The keys are the old queue names, the values the jobs to move and their new # queue names. RENAMED_QUEUES = { gitlab_shell: { 'GitGarbageCollectorWorker' => :git_garbage_collector, 'ProjectExportWorker' => :project_export, 'RepositoryForkWorker' => :repository_fork, 'RepositoryImportWorker' => :repository_import }, project_web_hook: { 'ProjectServiceWorker' => :project_service }, incoming_email: { 'EmailReceiverWorker' => :email_receiver }, mailers: { 'EmailsOnPushWorker' => :emails_on_push }, default: { 'AdminEmailWorker' => :cronjob, 'BuildCoverageWorker' => :build, 'BuildEmailWorker' => :build, 'BuildFinishedWorker' => :build, 'BuildHooksWorker' => :build, 'BuildSuccessWorker' => :build, 'ClearDatabaseCacheWorker' => :clear_database_cache, 'DeleteUserWorker' => :delete_user, 'ExpireBuildArtifactsWorker' => :cronjob, 'ExpireBuildInstanceArtifactsWorker' => :expire_build_instance_artifacts, 'GroupDestroyWorker' => :group_destroy, 'ImportExportProjectCleanupWorker' => :cronjob, 'IrkerWorker' => :irker, 'MergeWorker' => :merge, 'NewNoteWorker' => :new_note, 'PipelineHooksWorker' => :pipeline, 'PipelineMetricsWorker' => :pipeline, 'PipelineProcessWorker' => :pipeline, 'PipelineSuccessWorker' => :pipeline, 'PipelineUpdateWorker' => :pipeline, 'ProjectCacheWorker' => :project_cache, 'ProjectDestroyWorker' => :project_destroy, 'PruneOldEventsWorker' => :cronjob, 'RemoveExpiredGroupLinksWorker' => :cronjob, 'RemoveExpiredMembersWorker' => :cronjob, 'RepositoryArchiveCacheWorker' => :cronjob, 'RepositoryCheck::BatchWorker' => :cronjob, 'RepositoryCheck::ClearWorker' => :repository_check, 'RepositoryCheck::SingleRepositoryWorker' => :repository_check, 'RequestsProfilesWorker' => :cronjob, 'StuckCiBuildsWorker' => :cronjob, 'UpdateMergeRequestsWorker' => :update_merge_requests } }.freeze def up Sidekiq.redis do |redis| RENAMED_QUEUES.each do |queue, jobs| migrate_from_queue(redis, queue, jobs) end end end def down Sidekiq.redis do |redis| RENAMED_QUEUES.each do |dest_queue, jobs| jobs.each do |worker, from_queue| migrate_from_queue(redis, from_queue, worker => dest_queue) end end end end def migrate_from_queue(redis, queue, job_mapping) while job = redis.lpop("queue:#{queue}") payload = JSON.parse(job) new_queue = job_mapping[payload['class']] # If we have no target queue to migrate to we're probably dealing with # some ancient job for which the worker no longer exists. In that case # there's no sane option we can take, other than just dropping the job. next unless new_queue payload['queue'] = new_queue redis.lpush("queue:#{new_queue}", JSON.dump(payload)) end end end