From d7256329b5a16bcbaaaf9492d1835aa8a6ed1f9c Mon Sep 17 00:00:00 2001 From: Andreas Brandl Date: Wed, 10 Jul 2019 14:17:38 +0200 Subject: WIP: Scratchpad of SKIP LOCKED scheduler --- app/models/job.rb | 3 ++ app/models/job_log.rb | 13 ++++++ app/services/job_scheduler.rb | 50 ++++++++++++++++++++++++ app/workers/job_worker.rb | 12 ++++++ db/migrate/20190710103736_create_queue_tables.rb | 44 +++++++++++++++++++++ db/schema.rb | 15 ++++++- db/seeds.rb | 4 ++ run_job_scheduler.sh | 6 +++ 8 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 app/models/job.rb create mode 100644 app/models/job_log.rb create mode 100644 app/services/job_scheduler.rb create mode 100644 app/workers/job_worker.rb create mode 100644 db/migrate/20190710103736_create_queue_tables.rb create mode 100755 run_job_scheduler.sh diff --git a/app/models/job.rb b/app/models/job.rb new file mode 100644 index 00000000000..4e7e098d854 --- /dev/null +++ b/app/models/job.rb @@ -0,0 +1,3 @@ +class Job < ApplicationRecord + has_many :job_logs +end diff --git a/app/models/job_log.rb b/app/models/job_log.rb new file mode 100644 index 00000000000..5296f429320 --- /dev/null +++ b/app/models/job_log.rb @@ -0,0 +1,13 @@ +class JobLog < ApplicationRecord + belongs_to :job + + after_initialize do |log| + log.started_at = Time.now() + end + + before_create do + if self.finished_at.blank? + self.finished_at = Time.now() + end + end +end diff --git a/app/services/job_scheduler.rb b/app/services/job_scheduler.rb new file mode 100644 index 00000000000..01c9bdbcc84 --- /dev/null +++ b/app/services/job_scheduler.rb @@ -0,0 +1,50 @@ +class JobScheduler + + attr_reader :ident + def initialize(ident = nil) + @ident = ident + end + + def start_work! + while run_next_job + end + end + + private + def run_next_job + ActiveRecord::Base.transaction do + job = dequeue_job! + + unless job + Rails.logger.info "Nothing to do." + return false + end + + log = JobLog.new(job: job, scheduler: ident) + + # Note that we may want to trade full consistency over + # making sure the database transactions are short. + # + # We could reschedule the job here and commit + # and only then execute the job. + + begin + JobWorker.new(job).perform + ensure + log.save! + reschedule(job) + end + + return true + end + end + + def dequeue_job! + Job.lock('FOR UPDATE SKIP LOCKED').where('scheduled_for <= NOW()').order(:scheduled_for, :id).limit(1).first + end + + def reschedule(job) + job.scheduled_for = Time.now() + 10.seconds + job.save! + end +end diff --git a/app/workers/job_worker.rb b/app/workers/job_worker.rb new file mode 100644 index 00000000000..19b3d015096 --- /dev/null +++ b/app/workers/job_worker.rb @@ -0,0 +1,12 @@ +class JobWorker + + attr_reader :job + + def initialize(job) + @job = job + end + + def perform + # Do nothing + end +end diff --git a/db/migrate/20190710103736_create_queue_tables.rb b/db/migrate/20190710103736_create_queue_tables.rb new file mode 100644 index 00000000000..9dd36753851 --- /dev/null +++ b/db/migrate/20190710103736_create_queue_tables.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +# See http://doc.gitlab.com/ce/development/migration_style_guide.html +# for more information on how to write migrations for GitLab. + +class CreateQueueTables < ActiveRecord::Migration[5.1] + include Gitlab::Database::MigrationHelpers + + # Set this constant to true if this migration requires downtime. + DOWNTIME = false + + # When a migration requires downtime you **must** uncomment the following + # constant and define a short and easy to understand explanation as to why the + # migration requires downtime. + # DOWNTIME_REASON = '' + + # When using the methods "add_concurrent_index", "remove_concurrent_index" or + # "add_column_with_default" you must disable the use of transactions + # as these methods can not run in an existing transaction. + # When using "add_concurrent_index" or "remove_concurrent_index" methods make sure + # that either of them is the _only_ method called in the migration, + # any other changes should go in a separate migration. + # This ensures that upon failure _only_ the index creation or removing fails + # and can be retried or reverted easily. + # + # To disable transactions uncomment the following line and remove these + # comments: + # disable_ddl_transaction! + + def change + create_table :jobs do |t| + t.datetime :scheduled_for, null: false + end + + add_index :jobs, [:scheduled_for, :id] + + create_table :job_logs do |t| + t.references :job, index: true, foreign_key: { on_delete: :cascade }, null: false + t.datetime :started_at, null: false + t.datetime :finished_at, null: false + t.string :scheduler + end + end +end diff --git a/db/schema.rb b/db/schema.rb index 9a8b64689bd..b9c75992ecc 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20190703130053) do +ActiveRecord::Schema.define(version: 20190710103736) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -1722,6 +1722,18 @@ ActiveRecord::Schema.define(version: 20190703130053) do t.index ["service_id"], name: "index_jira_tracker_data_on_service_id", using: :btree end + create_table "job_logs", force: :cascade do |t| + t.bigint "job_id", null: false + t.datetime "started_at", null: false + t.datetime "finished_at", null: false + t.index ["job_id"], name: "index_job_logs_on_job_id", using: :btree + end + + create_table "jobs", force: :cascade do |t| + t.datetime "scheduled_for", null: false + t.index ["scheduled_for"], name: "index_jobs_on_scheduled_for", using: :btree + end + create_table "keys", id: :serial, force: :cascade do |t| t.integer "user_id" t.datetime "created_at" @@ -3748,6 +3760,7 @@ ActiveRecord::Schema.define(version: 20190703130053) do add_foreign_key "jira_connect_subscriptions", "jira_connect_installations", name: "fk_f1d617343f", on_delete: :cascade add_foreign_key "jira_connect_subscriptions", "namespaces", name: "fk_a3c10bcf7d", on_delete: :cascade add_foreign_key "jira_tracker_data", "services", on_delete: :cascade + add_foreign_key "job_logs", "jobs", on_delete: :cascade add_foreign_key "label_links", "labels", name: "fk_d97dd08678", on_delete: :cascade add_foreign_key "label_priorities", "labels", on_delete: :cascade add_foreign_key "label_priorities", "projects", on_delete: :cascade diff --git a/db/seeds.rb b/db/seeds.rb index e69de29bb2d..3cad3458733 100644 --- a/db/seeds.rb +++ b/db/seeds.rb @@ -0,0 +1,4 @@ + +ActiveRecord::Base.connection.execute <<~SQL + insert into jobs (scheduled_for) SELECT now() FROM generate_series(1,10000); +SQL diff --git a/run_job_scheduler.sh b/run_job_scheduler.sh new file mode 100755 index 00000000000..160f69c623d --- /dev/null +++ b/run_job_scheduler.sh @@ -0,0 +1,6 @@ +#!/bin/bash +for i in {1..10}; do + bin/rails runner "JobScheduler.new('sched-${i}').start_work!" & +done + +sleep 60 -- cgit v1.2.1