# frozen_string_literal: true

##
# A mixin for ActiveRecord models that enables callers to insert instances of the
# target class into the database en bloc via the [bulk_insert!] and [bulk_upsert!] methods.
#
# Upon inclusion in the target class, the mixin will perform a number of checks to
# ensure that the target is eligible for bulk insertions. For instance, it must not
# use ActiveRecord callbacks that fire between [save]s, since these would not run
# properly when instances are inserted in bulk.
#
# The mixin uses ActiveRecord 6's [InsertAll] type internally for bulk insertions.
# Unlike [InsertAll], however, it requires you to pass instances of the target type
# rather than row hashes, since it will run validations prior to insertion.
#
# @example
#
#   class MyRecord < ApplicationRecord
#     include BulkInsertSafe # must be included _last_ i.e. after any other concerns
#   end
#
#   # simple
#   MyRecord.bulk_insert!(items)
#
#   # with custom batch size
#   MyRecord.bulk_insert!(items, batch_size: 100)
#
#   # without validations
#   MyRecord.bulk_insert!(items, validate: false)
#
#   # with attribute hash modification
#   MyRecord.bulk_insert!(items) { |item_attrs| item_attrs['col'] = 42 }
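#
#   # upsert variant (illustrative: assumes a unique index on an isbn column)
#   MyRecord.bulk_upsert!(items, unique_by: :isbn)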
#
#
module BulkInsertSafe
  extend ActiveSupport::Concern

  # These are the callbacks we consider safe when used on models that are
  # written to the database in bulk.
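  #
  # For instance (an illustrative callback, not taken from an actual model), declaring
  # after_save :update_counters on a class that includes this concern raises
  # [MethodNotAllowedError], since that callback would never fire for records written
  # via [bulk_insert!].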
  ALLOWED_CALLBACKS = Set[
    :initialize,
    :validate,
    :validation,
    :find,
    :destroy
  ].freeze

  DEFAULT_BATCH_SIZE = 500

  MethodNotAllowedError = Class.new(StandardError)
  PrimaryKeySetError = Class.new(StandardError)

  class_methods do
    def set_callback(name, *args)
      unless _bulk_insert_callback_allowed?(name, args)
        raise MethodNotAllowedError,
          "Not allowed to call `set_callback(#{name}, #{args})` when model extends `BulkInsertSafe`." \
            "Callbacks that fire per each record being inserted do not work with bulk-inserts."
      end

      super
    end

    # Inserts the given ActiveRecord [items] into the table mapped to this class.
    # Items will be inserted in batches of a given size, where insertion semantics are
    # "atomic across all batches".
    #
    # @param [Boolean] validate          Whether validations should run on [items]
    # @param [Integer] batch_size        How many items should at most be inserted at once
    # @param [Boolean] skip_duplicates   Marks duplicates as allowed, and skips inserting them
    # @param [Symbol]  returns           Pass :ids to return an array with the primary key values
    #                                    for all inserted records or nil to omit the underlying
    #                                    RETURNING SQL clause entirely.
    # @param [Proc]    handle_attributes Block that will receive each item attribute hash
    #                                    prior to insertion for further processing
    #
    # Note that this method will raise in the following cases:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    #
    # @return an Array with the primary key values of all inserted records when
    #         [returns] is :ids, an empty Array otherwise; raises on failure.
    #
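    # @example (illustrative; [items] is any collection of unsaved model instances and
    #           MyRecord is the sample class from the module docs above)
    #
    #   # insert and collect the generated primary key values
    #   ids = MyRecord.bulk_insert!(items, returns: :ids)
    #
    #   # skip rows that would violate a unique constraint instead of raising
    #   MyRecord.bulk_insert!(items, skip_duplicates: true)
    #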
    def bulk_insert!(items, validate: true, skip_duplicates: false, returns: nil, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: skip_duplicates ? :skip : :raise,
        returns: returns,
        unique_by: nil,
        batch_size: batch_size,
        &handle_attributes)
    end

    # Upserts the given ActiveRecord [items] into the table mapped to this class.
    # Items will be inserted or updated in batches of a given size,
    # where insertion semantics are "atomic across all batches".
    #
    # @param [Boolean] validate          Whether validations should run on [items]
    # @param [Integer] batch_size        How many items should at most be inserted at once
    # @param [Symbol/Array] unique_by    Defines the index or columns used to detect duplicate items
    # @param [Symbol]  returns           Pass :ids to return an array with the primary key values
    #                                    for all inserted or updated records or nil to omit the
    #                                    underlying RETURNING SQL clause entirely.
    # @param [Proc]    handle_attributes Block that will receive each item attribute hash
    #                                    prior to insertion for further processing
    #
    # Unique indexes can be identified by columns or name:
    #  - unique_by: :isbn
    #  - unique_by: %i[ author_id name ]
    #  - unique_by: :index_books_on_isbn
    #
    # Note that this method will raise in the following cases:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    #
    # @return an Array with the primary key values of all inserted or updated records
    #         when [returns] is :ids, an empty Array otherwise; raises on failure.
    #
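    # @example (illustrative; Book, isbn, author_id and name mirror the sample
    #           unique-index descriptions above and are not real models or columns)
    #
    #   # update rows that collide on the isbn unique index, insert the rest
    #   Book.bulk_upsert!(books, unique_by: :isbn)
    #
    #   # collect the primary key values of all inserted or updated rows
    #   ids = Book.bulk_upsert!(books, unique_by: %i[author_id name], returns: :ids)
    #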
    def bulk_upsert!(items, unique_by:, returns: nil, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: :update,
        returns: returns,
        unique_by: unique_by,
        batch_size: batch_size,
        &handle_attributes)
    end

    private

    def _bulk_insert_all!(items, on_duplicate:, returns:, unique_by:, validate:, batch_size:, &handle_attributes)
      return [] if items.empty?

      returning =
        case returns
        when :ids
          [primary_key]
        when nil
          false
        else
          raise ArgumentError, "returns needs to be :ids or nil"
        end

      # Handle insertions for tables with a composite primary key
      primary_keys = connection.schema_cache.primary_keys(table_name)
      if unique_by.blank? && primary_key != primary_keys
        unique_by = primary_keys
      end

      transaction do
        items.each_slice(batch_size).flat_map do |item_batch|
          attributes = _bulk_insert_item_attributes(
            item_batch, validate, &handle_attributes)

          ActiveRecord::InsertAll
              .new(self, attributes, on_duplicate: on_duplicate, returning: returning, unique_by: unique_by)
              .execute
              .pluck(primary_key)
        end
      end
    end

    def _bulk_insert_item_attributes(items, validate_items)
      items.map do |item|
        item.validate! if validate_items

        attributes = {}
        column_names.each do |name|
          attributes[name] = item.read_attribute(name)
        end

        _bulk_insert_reject_primary_key!(attributes, item.class.primary_key)

        yield attributes if block_given?

        attributes
      end
    end

    def _bulk_insert_reject_primary_key!(attributes, primary_key)
      if existing_pk = attributes.delete(primary_key)
        raise PrimaryKeySetError, "Primary key set: #{primary_key}:#{existing_pk}\n" \
          "Bulk-inserts are only supported for rows that don't already have PK set"
      end
    end

    def _bulk_insert_callback_allowed?(name, args)
      ALLOWED_CALLBACKS.include?(name) || _bulk_insert_saved_from_belongs_to?(name, args)
    end

    # belongs_to associations will install a before_save hook during class loading
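    # (for an illustrative association such as belongs_to :project, Rails names that
    # hook autosave_associated_records_for_project), so we exempt it from the
    # set_callback check above.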
    def _bulk_insert_saved_from_belongs_to?(name, args)
      args.first == :before && args.second.to_s.start_with?('autosave_associated_records_for_')
    end
  end
end