summary | refs | log | tree | commit | diff
path: root/app/models/loose_foreign_keys/deleted_record.rb
blob: 94444f4b6d3756cdd62250148a3de67681f42f6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# frozen_string_literal: true

# Rows awaiting loose-foreign-key cleanup. Each row names a table through
# `fully_qualified_table_name` and moves from the `pending` to the `processed`
# status once handled.
#
# Rows are addressed by (partition, id) and stored in a list-partitioned table
# managed with the :sliding_list strategy:
# - a new partition is started once the oldest row of the active partition is
#   older than PARTITION_DURATION;
# - a partition may be detached once it holds no pending rows.
class LooseForeignKeys::DeletedRecord < Gitlab::Database::SharedModel
  PARTITION_DURATION = 1.day

  include PartitionedTable

  self.primary_key = :id
  # The partition column is hidden from regular attribute access; the batch
  # loader re-selects it under an alias when it is needed.
  self.ignored_columns = %i[partition]

  partitioned_by(
    :partition,
    strategy: :sliding_list,
    # Rotate when the lowest-id row of the active partition was created more
    # than PARTITION_DURATION ago. An empty partition never rotates.
    next_partition_if: ->(active_partition) {
      oldest = LooseForeignKeys::DeletedRecord
        .for_partition(active_partition)
        .select(:id, :created_at)
        .order(:id)
        .limit(1)
        .take

      oldest.present? && oldest.created_at < PARTITION_DURATION.ago
    },
    # Detach only when no pending rows remain in the partition.
    detach_partition_if: ->(partition) {
      has_pending = LooseForeignKeys::DeletedRecord
        .status_pending
        .for_partition(partition)
        .exists?

      !has_pending
    }
  )

  scope :for_table, ->(table_name) { where(fully_qualified_table_name: table_name) }
  scope :for_partition, ->(partition_number) { where(partition: partition_number) }
  scope :consume_order, -> { order(:partition, :consume_after, :id) }

  enum status: { pending: 1, processed: 2 }, _prefix: :status

  class << self
    # Loads up to `batch_size` pending rows for `table`, ordered by
    # (partition, consume_after, id).
    #
    # @param table value(s) matched against `fully_qualified_table_name`
    # @param batch_size [Integer] maximum number of rows returned
    # @return [Array<LooseForeignKeys::DeletedRecord>] loaded records, each also
    #   exposing a `partition_number` attribute
    def load_batch_for_table(table, batch_size)
      # `partition` is an ignored column, so it is selected again under an
      # alias that update_by_partition can group on.
      partition_alias = arel_table[:partition].as('partition_number')

      for_table(table)
        .status_pending
        .consume_order
        .select(arel_table[Arel.star], partition_alias)
        .limit(batch_size)
        .to_a
    end

    # Flags the given rows (only those still pending) as processed.
    #
    # @param records [Array] records returned by load_batch_for_table
    # @return [Integer] total number of rows updated
    def mark_records_processed(records)
      update_by_partition(records) { |scope| scope.update_all(status: :processed) }
    end

    # Sets `consume_after` on the given pending rows and resets their
    # `cleanup_attempts` counter to 0.
    #
    # @param records [Array] records returned by load_batch_for_table
    # @param consume_after value written to the `consume_after` column
    # @return [Integer] total number of rows updated
    def reschedule(records, consume_after)
      update_by_partition(records) do |scope|
        scope.update_all(consume_after: consume_after, cleanup_attempts: 0)
      end
    end

    # Bumps `cleanup_attempts` by one on the given pending rows.
    #
    # @param records [Array] records returned by load_batch_for_table
    # @return [Integer] total number of rows updated
    def increment_attempts(records)
      update_by_partition(records) do |scope|
        # A naive in-SQL increment is good enough for this counter.
        scope.update_all('cleanup_attempts = cleanup_attempts + 1')
      end
    end

    private

    # Issues one update per partition so each row lookup can use the
    # (partition, id) primary key.
    #
    # @param records [Array] records that respond to `partition_number` and `id`
    # @yieldparam scope [ActiveRecord::Relation] pending rows of one partition,
    #   restricted to the given ids
    # @yieldreturn [Integer] number of rows the block updated
    # @return [Integer] sum of the block's return values (0 for no records)
    def update_by_partition(records)
      records.group_by(&:partition_number).sum(0) do |partition_number, partition_records|
        scope = status_pending
          .for_partition(partition_number)
          .where(id: partition_records.map(&:id))

        yield(scope)
      end
    end
  end
end