1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
# frozen_string_literal: true
##
# A mixin for ActiveRecord models that enables callers to insert instances of the
# target class into the database en bloc via the [bulk_insert!] method.
#
# Upon inclusion in the target class, the mixin will perform a number of checks to
# ensure that the target is eligible for bulk insertions. For instance, it must not
# use ActiveRecord callbacks that fire between [save]s, since these would not run
# properly when instances are inserted in bulk.
#
# The mixin uses ActiveRecord 6's [InsertAll] type internally for bulk insertions.
# Unlike [InsertAll], however, it requires you to pass instances of the target type
# rather than row hashes, since it will run validations prior to insertion.
#
# @example
#
# class MyRecord < ApplicationRecord
# include BulkInsertSafe # must be included _last_ i.e. after any other concerns
# end
#
# # simple
# MyRecord.bulk_insert!(items)
#
# # with custom batch size
# MyRecord.bulk_insert!(items, batch_size: 100)
#
# # without validations
# MyRecord.bulk_insert!(items, validate: false)
#
# # with attribute hash modification
# MyRecord.bulk_insert!(items) { |item_attrs| item_attrs['col'] = 42 }
#
#
module BulkInsertSafe
  extend ActiveSupport::Concern

  # These are the callbacks we think safe when used on models that are
  # written to the database in bulk
  ALLOWED_CALLBACKS = Set[
    :initialize,
    :validate,
    :validation,
    :find,
    :destroy
  ].freeze

  # Batch size used by [bulk_insert!] and [bulk_upsert!] unless the caller overrides it.
  DEFAULT_BATCH_SIZE = 500

  # Raised by [set_callback] when a callback outside of the allow-list is registered.
  MethodNotAllowedError = Class.new(StandardError)

  # Raised when an item handed to a bulk operation already carries a primary key value.
  PrimaryKeySetError = Class.new(StandardError)

  class_methods do
    # Guards callback registration on the including model: only callbacks listed in
    # [ALLOWED_CALLBACKS], or the before-hooks recognized by
    # [_bulk_insert_saved_from_belongs_to?], are passed on to the original implementation.
    #
    # @param [Symbol] name  The callback kind being registered
    # @param [Array]  args  Remaining arguments, forwarded unchanged to `super`
    #
    # Throws [MethodNotAllowedError] for any other callback.
    def set_callback(name, *args)
      unless _bulk_insert_callback_allowed?(name, args)
        # The trailing space keeps the two sentences apart once the literals are joined.
        raise MethodNotAllowedError,
          "Not allowed to call `set_callback(#{name}, #{args})` when model extends `BulkInsertSafe`. " \
          "Callbacks that fire per each record being inserted do not work with bulk-inserts."
      end

      super
    end

    # Inserts the given ActiveRecord [items] to the table mapped to this class.
    # Items will be inserted in batches of a given size, where insertion semantics are
    # "atomic across all batches" (all batches run inside a single transaction).
    #
    # @param [Boolean] validate          Whether validations should run on [items]
    # @param [Integer] batch_size        How many items should at most be inserted at once
    # @param [Boolean] skip_duplicates   Marks duplicates as allowed, and skips inserting them
    # @param [Symbol]  returns           Pass :ids to return an array with the primary key values
    #                                    for all inserted records or nil to omit the underlying
    #                                    RETURNING SQL clause entirely.
    # @param [Proc]    handle_attributes Block that will receive each item attribute hash
    #                                    prior to insertion for further processing
    #
    # Note that this method will throw on the following occasions:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    # - [ArgumentError]                 when [returns] is neither :ids nor nil
    #                                   (only checked when [items] is not empty)
    #
    # @return [Array] the primary key values plucked from the result of every batch,
    #                 concatenated across batches; an empty array when [items] is empty.
    #                 NOTE(review): with `returns: nil` no RETURNING clause is requested, so
    #                 the array is presumably empty as well - confirm against the adapter.
    #
    def bulk_insert!(items, validate: true, skip_duplicates: false, returns: nil, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: skip_duplicates ? :skip : :raise,
        returns: returns,
        unique_by: nil,
        batch_size: batch_size,
        &handle_attributes)
    end

    # Upserts the given ActiveRecord [items] to the table mapped to this class.
    # Items will be inserted or updated in batches of a given size,
    # where insertion semantics are "atomic across all batches"
    # (all batches run inside a single transaction).
    #
    # @param [Boolean]      validate          Whether validations should run on [items]
    # @param [Integer]      batch_size        How many items should at most be inserted at once
    # @param [Symbol/Array] unique_by         Defines index or columns to use to consider item duplicate
    # @param [Symbol]       returns           Pass :ids to return an array with the primary key values
    #                                         for all inserted or updated records or nil to omit the
    #                                         underlying RETURNING SQL clause entirely.
    # @param [Proc]         handle_attributes Block that will receive each item attribute hash
    #                                         prior to insertion for further processing
    #
    # Unique indexes can be identified by columns or name:
    #  - unique_by: :isbn
    #  - unique_by: %i[ author_id name ]
    #  - unique_by: :index_books_on_isbn
    #
    # Note that this method will throw on the following occasions:
    # - [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # - [ActiveRecord::RecordInvalid]   on entity validation failures
    # - [ActiveRecord::RecordNotUnique] on duplicate key errors
    # - [ArgumentError]                 when [returns] is neither :ids nor nil
    #                                   (only checked when [items] is not empty)
    #
    # @return [Array] the primary key values plucked from the result of every batch,
    #                 concatenated across batches; an empty array when [items] is empty.
    #                 NOTE(review): with `returns: nil` no RETURNING clause is requested, so
    #                 the array is presumably empty as well - confirm against the adapter.
    #
    def bulk_upsert!(items, unique_by:, returns: nil, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: :update,
        returns: returns,
        unique_by: unique_by,
        batch_size: batch_size,
        &handle_attributes)
    end

    private

    # Shared implementation behind [bulk_insert!] and [bulk_upsert!]: slices [items] into
    # batches, converts every batch to attribute hashes and hands them to
    # [ActiveRecord::InsertAll] inside one transaction.
    #
    # Returns the primary key values plucked from every batch result as one flat array.
    def _bulk_insert_all!(items, on_duplicate:, returns:, unique_by:, validate:, batch_size:, &handle_attributes)
      return [] if items.empty?

      # Translate the public `returns` option into the `returning` argument of InsertAll.
      returning =
        case returns
        when :ids
          [primary_key]
        when nil
          false
        else
          raise ArgumentError, "returns needs to be :ids or nil"
        end

      # A single transaction spans all batches, so a failure in any batch raises out of it.
      transaction do
        items.each_slice(batch_size).flat_map do |item_batch|
          attributes = _bulk_insert_item_attributes(item_batch, validate, &handle_attributes)

          ActiveRecord::InsertAll
            .new(self, attributes, on_duplicate: on_duplicate, returning: returning, unique_by: unique_by)
            .execute
            .pluck(primary_key)
        end
      end
    end

    # Maps every item to a hash of its column values (keyed by column name, primary key
    # removed). Runs `validate!` on each item first when [validate_items] is truthy and
    # yields each hash to the optional block so that callers can modify it in place.
    def _bulk_insert_item_attributes(items, validate_items)
      items.map do |item|
        item.validate! if validate_items

        attributes = column_names.each_with_object({}) do |name, row|
          row[name] = item.read_attribute(name)
        end

        _bulk_insert_reject_primary_key!(attributes, item.class.primary_key)

        yield attributes if block_given?

        attributes
      end
    end

    # Removes the [primary_key] entry from [attributes] and throws [PrimaryKeySetError]
    # if it held a value, since only rows without a PK can be bulk-inserted.
    def _bulk_insert_reject_primary_key!(attributes, primary_key)
      existing_pk = attributes.delete(primary_key)
      return unless existing_pk

      raise PrimaryKeySetError, "Primary key set: #{primary_key}:#{existing_pk}\n" \
        "Bulk-inserts are only supported for rows that don't already have PK set"
    end

    # True when the callback [name] is on the allow-list or is the autosave hook
    # detected by [_bulk_insert_saved_from_belongs_to?].
    def _bulk_insert_callback_allowed?(name, args)
      ALLOWED_CALLBACKS.include?(name) || _bulk_insert_saved_from_belongs_to?(name, args)
    end

    # belongs_to associations will install a before_save hook during class loading
    def _bulk_insert_saved_from_belongs_to?(name, args)
      args.first == :before && args.second.to_s.start_with?('autosave_associated_records_for_')
    end
  end
end
|