diff options
Diffstat (limited to 'lib/gitlab/sidekiq_middleware')
4 files changed, 89 insertions, 27 deletions
diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb index ddd1b91410b..bb0c18735bb 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/client.rb @@ -5,9 +5,6 @@ module Gitlab module DuplicateJobs class Client def call(worker_class, job, queue, _redis_pool, &block) - # We don't try to deduplicate jobs that are scheduled in the future - return yield if job['at'] - DuplicateJob.new(job, queue).schedule(&block) end end diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb index fa742d07af2..0dc53c61e84 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb @@ -18,13 +18,13 @@ module Gitlab # When new jobs can be scheduled again, the strategy calls `#delete`. class DuplicateJob DUPLICATE_KEY_TTL = 6.hours + DEFAULT_STRATEGY = :until_executing attr_reader :existing_jid - def initialize(job, queue_name, strategy: :until_executing) + def initialize(job, queue_name) @job = job @queue_name = queue_name - @strategy = strategy end # This will continue the middleware chain if the job should be scheduled @@ -41,12 +41,12 @@ module Gitlab end # This method will return the jid that was set in redis - def check! + def check!(expiry = DUPLICATE_KEY_TTL) read_jid = nil Sidekiq.redis do |redis| redis.multi do |multi| - redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true) + redis.set(idempotency_key, jid, ex: expiry, nx: true) read_jid = redis.get(idempotency_key) end end @@ -60,6 +60,10 @@ module Gitlab end end + def scheduled? + scheduled_at.present? + end + def duplicate? raise "Call `#check!` first to check for existing duplicates" unless existing_jid @@ -67,14 +71,36 @@ module Gitlab end def droppable? - idempotent? && duplicate? && ::Feature.disabled?("disable_#{queue_name}_deduplication") + idempotent? && ::Feature.disabled?("disable_#{queue_name}_deduplication") + end + + def scheduled_at + job['at'] + end + + def options + return {} unless worker_klass + return {} unless worker_klass.respond_to?(:get_deduplication_options) + + worker_klass.get_deduplication_options end private - attr_reader :queue_name, :strategy, :job + attr_reader :queue_name, :job attr_writer :existing_jid + def worker_klass + @worker_klass ||= worker_class_name.to_s.safe_constantize + end + + def strategy + return DEFAULT_STRATEGY unless worker_klass + return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) + + worker_klass.get_deduplicate_strategy + end + def worker_class_name job['class'] end @@ -104,11 +130,10 @@ module Gitlab end def idempotent? - worker_class = worker_class_name.to_s.safe_constantize - return false unless worker_class - return false unless worker_class.respond_to?(:idempotent?) + return false unless worker_klass + return false unless worker_klass.respond_to?(:idempotent?) - worker_class.idempotent? + worker_klass.idempotent? end end end diff --git a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb index 674e436b714..0ed4912c4cc 100644 --- a/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb +++ b/lib/gitlab/sidekiq_middleware/duplicate_jobs/strategies/until_executing.rb @@ -13,13 +13,13 @@ module Gitlab end def schedule(job) - if duplicate_job.check! && duplicate_job.duplicate? + if deduplicatable_job? && check! && duplicate_job.duplicate? job['duplicate-of'] = duplicate_job.existing_jid - end - if duplicate_job.droppable? - Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing") - return false + if duplicate_job.droppable? + Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing") + return false + end end yield @@ -34,6 +34,22 @@ module Gitlab private attr_reader :duplicate_job + + def deduplicatable_job? + !duplicate_job.scheduled? || duplicate_job.options[:including_scheduled] + end + + def check! + duplicate_job.check!(expiry) + end + + def expiry + return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled? + + time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i + + time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL + end end end end diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb index 61ed2fe1a06..6a942a6ce06 100644 --- a/lib/gitlab/sidekiq_middleware/server_metrics.rb +++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb @@ -47,6 +47,10 @@ module Gitlab @metrics[:sidekiq_jobs_completion_seconds].observe(labels, monotonic_time) @metrics[:sidekiq_jobs_db_seconds].observe(labels, ActiveRecord::LogSubscriber.runtime / 1000) @metrics[:sidekiq_jobs_gitaly_seconds].observe(labels, get_gitaly_time(job)) + @metrics[:sidekiq_redis_requests_total].increment(labels, get_redis_calls(job)) + @metrics[:sidekiq_redis_requests_duration_seconds].observe(labels, get_redis_time(job)) + @metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(job)) + @metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(job)) end end @@ -54,15 +58,19 @@ module Gitlab def init_metrics { - sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), - sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), - sidekiq_jobs_db_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_db_seconds, 'Seconds of database time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), - sidekiq_jobs_gitaly_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_gitaly_seconds, 'Seconds of Gitaly time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), - sidekiq_jobs_queue_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_queue_duration_seconds, 'Duration in seconds that a Sidekiq job was queued before being executed', {}, SIDEKIQ_LATENCY_BUCKETS), - sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'), - sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'), - sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all), - sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all) + sidekiq_jobs_cpu_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_cpu_seconds, 'Seconds of cpu time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_jobs_completion_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_completion_seconds, 'Seconds to complete Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_jobs_db_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_db_seconds, 'Seconds of database time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_jobs_gitaly_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_gitaly_seconds, 'Seconds of Gitaly time to run Sidekiq job', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_jobs_queue_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_jobs_queue_duration_seconds, 'Duration in seconds that a Sidekiq job was queued before being executed', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_redis_requests_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_redis_requests_duration_seconds, 'Duration in seconds that a Sidekiq job spent requests a Redis server', {}, Gitlab::Instrumentation::Redis::QUERY_TIME_BUCKETS), + sidekiq_elasticsearch_requests_duration_seconds: ::Gitlab::Metrics.histogram(:sidekiq_elasticsearch_requests_duration_seconds, 'Duration in seconds that a Sidekiq job spent in requests to an Elasticsearch server', {}, SIDEKIQ_LATENCY_BUCKETS), + sidekiq_jobs_failed_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_failed_total, 'Sidekiq jobs failed'), + sidekiq_jobs_retried_total: ::Gitlab::Metrics.counter(:sidekiq_jobs_retried_total, 'Sidekiq jobs retried'), + sidekiq_redis_requests_total: ::Gitlab::Metrics.counter(:sidekiq_redis_requests_total, 'Redis requests during a Sidekiq job execution'), + sidekiq_elasticsearch_requests_total: ::Gitlab::Metrics.counter(:sidekiq_elasticsearch_requests_total, 'Elasticsearch requests during a Sidekiq job execution'), + sidekiq_running_jobs: ::Gitlab::Metrics.gauge(:sidekiq_running_jobs, 'Number of Sidekiq jobs running', {}, :all), + sidekiq_concurrency: ::Gitlab::Metrics.gauge(:sidekiq_concurrency, 'Maximum number of Sidekiq jobs', {}, :all) } end @@ -70,6 +78,22 @@ module Gitlab defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0 end + def get_redis_time(job) + job.fetch(:redis_duration_s, 0) + end + + def get_redis_calls(job) + job.fetch(:redis_calls, 0) + end + + def get_elasticsearch_time(job) + job.fetch(:elasticsearch_duration_s, 0) + end + + def get_elasticsearch_calls(job) + job.fetch(:elasticsearch_calls, 0) + end + def get_gitaly_time(job) job.fetch(:gitaly_duration_s, 0) end |