summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil Trzciński <ayufan@ayufan.eu>2019-09-10 16:16:11 +0000
committerKamil Trzciński <ayufan@ayufan.eu>2019-09-10 16:16:11 +0000
commit0078ea44c292cd0e5eb7f4ae52358087c8ee34db (patch)
tree97cecf00526d3548d0a8d5074f502f2f4bd62596
parent7e8453fe926325ccffbadb509c59b8a5e1867886 (diff)
parenteeeaebe60813393940751ea9bf81e720b65bf95a (diff)
downloadgitlab-ce-0078ea44c292cd0e5eb7f4ae52358087c8ee34db.tar.gz
Merge branch 'sidekiq-monitor-namespace-change' into 'master'
Sidekiq monitor namespace change See merge request gitlab-org/gitlab-ce!32878
-rw-r--r--config/initializers/sidekiq.rb2
-rw-r--r--doc/administration/troubleshooting/sidekiq.md2
-rw-r--r--lib/gitlab/sidekiq_daemon/monitor.rb184
-rw-r--r--lib/gitlab/sidekiq_middleware/monitor.rb4
-rw-r--r--lib/gitlab/sidekiq_monitor.rb182
-rw-r--r--spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb (renamed from spec/lib/gitlab/sidekiq_monitor_spec.rb)4
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb6
7 files changed, 193 insertions, 191 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb
index 9f3e104bc2b..20f31ff6810 100644
--- a/config/initializers/sidekiq.rb
+++ b/config/initializers/sidekiq.rb
@@ -60,7 +60,7 @@ Sidekiq.configure_server do |config|
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
- Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor
+ Gitlab::SidekiqDaemon::Monitor.instance.start if enable_sidekiq_monitor
end
if enable_reliable_fetch?
diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md
index c41edb5dbfc..fdafac8420e 100644
--- a/doc/administration/troubleshooting/sidekiq.md
+++ b/doc/administration/troubleshooting/sidekiq.md
@@ -270,7 +270,7 @@ is interrupted mid-execution and it is not guaranteed
that proper rollback of transactions is implemented.
```ruby
-Gitlab::SidekiqMonitor.cancel_job('job-id')
+Gitlab::SidekiqDaemon::Monitor.cancel_job('job-id')
```
> This requires Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1`
diff --git a/lib/gitlab/sidekiq_daemon/monitor.rb b/lib/gitlab/sidekiq_daemon/monitor.rb
new file mode 100644
index 00000000000..bbfca130425
--- /dev/null
+++ b/lib/gitlab/sidekiq_daemon/monitor.rb
@@ -0,0 +1,184 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module SidekiqDaemon
+ class Monitor < Daemon
+ include ::Gitlab::Utils::StrongMemoize
+
+ NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
+ CANCEL_DEADLINE = 24.hours.seconds
+ RECONNECT_TIME = 3.seconds
+
+ # We use exception derived from `Exception`
+ # to consider this as an very low-level exception
+ # that should not be caught by application
+ CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
+
+ attr_reader :jobs_thread
+ attr_reader :jobs_mutex
+
+ def initialize
+ super
+
+ @jobs_thread = {}
+ @jobs_mutex = Mutex.new
+ end
+
+ def within_job(jid, queue)
+ jobs_mutex.synchronize do
+ jobs_thread[jid] = Thread.current
+ end
+
+ if cancelled?(jid)
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'run',
+ queue: queue,
+ jid: jid,
+ canceled: true
+ )
+ raise CancelledError
+ end
+
+ yield
+ ensure
+ jobs_mutex.synchronize do
+ jobs_thread.delete(jid)
+ end
+ end
+
+ def self.cancel_job(jid)
+ payload = {
+ action: 'cancel',
+ jid: jid
+ }.to_json
+
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
+ redis.publish(NOTIFICATION_CHANNEL, payload)
+ end
+ end
+
+ private
+
+ def start_working
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ action: 'start',
+ message: 'Starting Monitor Daemon'
+ )
+
+ while enabled?
+ process_messages
+ sleep(RECONNECT_TIME)
+ end
+
+ ensure
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'stop',
+ message: 'Stopping Monitor Daemon'
+ )
+ end
+
+ def stop_working
+ thread.raise(Interrupt) if thread.alive?
+ end
+
+ def process_messages
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.subscribe(NOTIFICATION_CHANNEL) do |on|
+ on.message do |channel, message|
+ process_message(message)
+ end
+ end
+ end
+ rescue Exception => e # rubocop:disable Lint/RescueException
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'exception',
+ message: e.message
+ )
+
+ # we re-raise system exceptions
+ raise e unless e.is_a?(StandardError)
+ end
+
+ def process_message(message)
+ Sidekiq.logger.info(
+ class: self.class.to_s,
+ channel: NOTIFICATION_CHANNEL,
+ message: 'Received payload on channel',
+ payload: message
+ )
+
+ message = safe_parse(message)
+ return unless message
+
+ case message['action']
+ when 'cancel'
+ process_job_cancel(message['jid'])
+ else
+ # unknown message
+ end
+ end
+
+ def safe_parse(message)
+ JSON.parse(message)
+ rescue JSON::ParserError
+ end
+
+ def process_job_cancel(jid)
+ return unless jid
+
+ # try to find thread without lock
+ return unless find_thread_unsafe(jid)
+
+ Thread.new do
+ # try to find a thread, but with guaranteed
+ # that handle for thread corresponds to actually
+ # running job
+ find_thread_with_lock(jid) do |thread|
+ Sidekiq.logger.warn(
+ class: self.class.to_s,
+ action: 'cancel',
+ message: 'Canceling thread with CancelledError',
+ jid: jid,
+ thread_id: thread.object_id
+ )
+
+ thread&.raise(CancelledError)
+ end
+ end
+ end
+
+ # This method needs to be thread-safe
+ # This is why it passes thread in block,
+ # to ensure that we do process this thread
+ def find_thread_unsafe(jid)
+ jobs_thread[jid]
+ end
+
+ def find_thread_with_lock(jid)
+ # don't try to lock if we cannot find the thread
+ return unless find_thread_unsafe(jid)
+
+ jobs_mutex.synchronize do
+ find_thread_unsafe(jid).tap do |thread|
+ yield(thread) if thread
+ end
+ end
+ end
+
+ def cancelled?(jid)
+ ::Gitlab::Redis::SharedState.with do |redis|
+ redis.exists(self.class.cancel_job_key(jid))
+ end
+ end
+
+ def self.cancel_job_key(jid)
+ "sidekiq:cancel:#{jid}"
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb
index 53a6132edac..00965bf5506 100644
--- a/lib/gitlab/sidekiq_middleware/monitor.rb
+++ b/lib/gitlab/sidekiq_middleware/monitor.rb
@@ -4,10 +4,10 @@ module Gitlab
module SidekiqMiddleware
class Monitor
def call(worker, job, queue)
- Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
+ Gitlab::SidekiqDaemon::Monitor.instance.within_job(job['jid'], queue) do
yield
end
- rescue Gitlab::SidekiqMonitor::CancelledError
+ rescue Gitlab::SidekiqDaemon::Monitor::CancelledError
# push job to DeadSet
payload = ::Sidekiq.dump_json(job)
::Sidekiq::DeadSet.new.kill(payload, notify_failure: false)
diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb
deleted file mode 100644
index a58b33534bf..00000000000
--- a/lib/gitlab/sidekiq_monitor.rb
+++ /dev/null
@@ -1,182 +0,0 @@
-# frozen_string_literal: true
-
-module Gitlab
- class SidekiqMonitor < Daemon
- include ::Gitlab::Utils::StrongMemoize
-
- NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
- CANCEL_DEADLINE = 24.hours.seconds
- RECONNECT_TIME = 3.seconds
-
- # We use exception derived from `Exception`
- # to consider this as an very low-level exception
- # that should not be caught by application
- CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
-
- attr_reader :jobs_thread
- attr_reader :jobs_mutex
-
- def initialize
- super
-
- @jobs_thread = {}
- @jobs_mutex = Mutex.new
- end
-
- def within_job(jid, queue)
- jobs_mutex.synchronize do
- jobs_thread[jid] = Thread.current
- end
-
- if cancelled?(jid)
- Sidekiq.logger.warn(
- class: self.class.to_s,
- action: 'run',
- queue: queue,
- jid: jid,
- canceled: true
- )
- raise CancelledError
- end
-
- yield
- ensure
- jobs_mutex.synchronize do
- jobs_thread.delete(jid)
- end
- end
-
- def self.cancel_job(jid)
- payload = {
- action: 'cancel',
- jid: jid
- }.to_json
-
- ::Gitlab::Redis::SharedState.with do |redis|
- redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
- redis.publish(NOTIFICATION_CHANNEL, payload)
- end
- end
-
- private
-
- def start_working
- Sidekiq.logger.info(
- class: self.class.to_s,
- action: 'start',
- message: 'Starting Monitor Daemon'
- )
-
- while enabled?
- process_messages
- sleep(RECONNECT_TIME)
- end
-
- ensure
- Sidekiq.logger.warn(
- class: self.class.to_s,
- action: 'stop',
- message: 'Stopping Monitor Daemon'
- )
- end
-
- def stop_working
- thread.raise(Interrupt) if thread.alive?
- end
-
- def process_messages
- ::Gitlab::Redis::SharedState.with do |redis|
- redis.subscribe(NOTIFICATION_CHANNEL) do |on|
- on.message do |channel, message|
- process_message(message)
- end
- end
- end
- rescue Exception => e # rubocop:disable Lint/RescueException
- Sidekiq.logger.warn(
- class: self.class.to_s,
- action: 'exception',
- message: e.message
- )
-
- # we re-raise system exceptions
- raise e unless e.is_a?(StandardError)
- end
-
- def process_message(message)
- Sidekiq.logger.info(
- class: self.class.to_s,
- channel: NOTIFICATION_CHANNEL,
- message: 'Received payload on channel',
- payload: message
- )
-
- message = safe_parse(message)
- return unless message
-
- case message['action']
- when 'cancel'
- process_job_cancel(message['jid'])
- else
- # unknown message
- end
- end
-
- def safe_parse(message)
- JSON.parse(message)
- rescue JSON::ParserError
- end
-
- def process_job_cancel(jid)
- return unless jid
-
- # try to find thread without lock
- return unless find_thread_unsafe(jid)
-
- Thread.new do
- # try to find a thread, but with guaranteed
- # that handle for thread corresponds to actually
- # running job
- find_thread_with_lock(jid) do |thread|
- Sidekiq.logger.warn(
- class: self.class.to_s,
- action: 'cancel',
- message: 'Canceling thread with CancelledError',
- jid: jid,
- thread_id: thread.object_id
- )
-
- thread&.raise(CancelledError)
- end
- end
- end
-
- # This method needs to be thread-safe
- # This is why it passes thread in block,
- # to ensure that we do process this thread
- def find_thread_unsafe(jid)
- jobs_thread[jid]
- end
-
- def find_thread_with_lock(jid)
- # don't try to lock if we cannot find the thread
- return unless find_thread_unsafe(jid)
-
- jobs_mutex.synchronize do
- find_thread_unsafe(jid).tap do |thread|
- yield(thread) if thread
- end
- end
- end
-
- def cancelled?(jid)
- ::Gitlab::Redis::SharedState.with do |redis|
- redis.exists(self.class.cancel_job_key(jid))
- end
- end
-
- def self.cancel_job_key(jid)
- "sidekiq:cancel:#{jid}"
- end
- end
-end
diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb
index bbd7bf90217..acbb09e3542 100644
--- a/spec/lib/gitlab/sidekiq_monitor_spec.rb
+++ b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb
@@ -2,7 +2,7 @@
require 'spec_helper'
-describe Gitlab::SidekiqMonitor do
+describe Gitlab::SidekiqDaemon::Monitor do
let(:monitor) { described_class.new }
describe '#within_job' do
@@ -43,7 +43,7 @@ describe Gitlab::SidekiqMonitor do
before do
# we want to run at most one cycle
# we toggle `enabled?` flag after the first call
- stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0)
+ stub_const('Gitlab::SidekiqDaemon::Monitor::RECONNECT_TIME', 0)
allow(monitor).to receive(:enabled?).and_return(true, false)
allow(Sidekiq.logger).to receive(:info)
diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
index 7319cdc2399..023df1a6391 100644
--- a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb
@@ -10,8 +10,8 @@ describe Gitlab::SidekiqMiddleware::Monitor do
let(:job) { { 'jid' => 'job-id' } }
let(:queue) { 'my-queue' }
- it 'calls SidekiqMonitor' do
- expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
+ it 'calls Gitlab::SidekiqDaemon::Monitor' do
+ expect(Gitlab::SidekiqDaemon::Monitor.instance).to receive(:within_job)
.with('job-id', 'my-queue')
.and_call_original
@@ -29,7 +29,7 @@ describe Gitlab::SidekiqMiddleware::Monitor do
context 'when cancel happens' do
subject do
monitor.call(worker, job, queue) do
- raise Gitlab::SidekiqMonitor::CancelledError
+ raise Gitlab::SidekiqDaemon::Monitor::CancelledError
end
end