diff options
Diffstat (limited to 'app/models/ci/build_trace_chunk.rb')
-rw-r--r-- | app/models/ci/build_trace_chunk.rb | 104 |
1 files changed, 89 insertions, 15 deletions
diff --git a/app/models/ci/build_trace_chunk.rb b/app/models/ci/build_trace_chunk.rb index 444742062d9..6926ccd9438 100644 --- a/app/models/ci/build_trace_chunk.rb +++ b/app/models/ci/build_trace_chunk.rb @@ -3,9 +3,11 @@ module Ci class BuildTraceChunk < ApplicationRecord extend ::Gitlab::Ci::Model + include ::Comparable include ::FastDestroyAll include ::Checksummable include ::Gitlab::ExclusiveLeaseHelpers + include ::Gitlab::OptimisticLocking belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id @@ -29,6 +31,7 @@ module Ci } scope :live, -> { redis } + scope :persisted, -> { not_redis.order(:chunk_index) } class << self def all_stores @@ -63,12 +66,24 @@ module Ci get_store_class(store).delete_keys(value) end end + + ## + # Sometimes we do not want to read raw data. This method makes it easier + # to find attributes that are just metadata excluding raw data. + # + def metadata_attributes + attribute_names - %w[raw_data] + end end def data @data ||= get_data.to_s end + def crc32 + checksum.to_i + end + def truncate(offset = 0) raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 return if offset == size # Skip the following process as it doesn't affect anything @@ -102,22 +117,47 @@ module Ci (start_offset...end_offset) end - def persist_data! - in_lock(*lock_params) { unsafe_persist_data! } - end - def schedule_to_persist! - return if persisted? + return if flushed? Ci::BuildTraceChunkFlushWorker.perform_async(id) end - def persisted? - !redis? - end + ## + # It is possible that we run into two concurrent migrations. It might + # happen that a chunk gets migrated after being loaded by another worker + # but before the worker acquires a lock to perform the migration. + # + # We are using Redis locking to ensure that we perform this operation + # inside an exclusive lock, but this does not prevent us from running into + # race conditions related to updating a model representation in the + # database. Optimistic locking is another mechanism that help here. + # + # We are using optimistic locking combined with Redis locking to ensure + # that a chunk gets migrated properly. + # + # We are catching an exception related to an exclusive lock not being + # acquired because it is creating a lot of noise, and is a result of + # duplicated workers running in parallel for the same build trace chunk. + # + def persist_data! + in_lock(*lock_params) do # exclusive Redis lock is acquired first + raise FailedToPersistDataError, 'Modifed build trace chunk detected' if has_changes_to_save? - def live? - redis? + self.reset.then do |chunk| # we ensure having latest lock_version + chunk.unsafe_persist_data! # we migrate the data and update data store + end + end + rescue FailedToObtainLockError + metrics.increment_trace_operation(operation: :stalled) + rescue ActiveRecord::StaleObjectError + raise FailedToPersistDataError, <<~MSG + Data migration race condition detected + + store: #{data_store} + build: #{build.id} + index: #{chunk_index} + MSG end ## @@ -126,11 +166,28 @@ module Ci # no chunk with higher index in the database. # def final? - build.pending_state.present? && - build.trace_chunks.maximum(:chunk_index).to_i == chunk_index + build.pending_state.present? && chunks_max_index == chunk_index end - private + def flushed? + !redis? + end + + def migrated? + flushed? + end + + def live? + redis? + end + + def <=>(other) + return unless self.build_id == other.build_id + + self.chunk_index <=> other.chunk_index + end + + protected def get_data # Redis / database return UTF-8 encoded string by default @@ -145,12 +202,19 @@ module Ci current_size = current_data&.bytesize.to_i unless current_size == CHUNK_SIZE || final? - raise FailedToPersistDataError, 'Data is not fulfilled in a bucket' + raise FailedToPersistDataError, <<~MSG + data is not fulfilled in a bucket + + size: #{current_size} + state: #{pending_state?} + max: #{chunks_max_index} + index: #{chunk_index} + MSG end self.raw_data = nil self.data_store = new_store - self.checksum = crc32(current_data) + self.checksum = self.class.crc32(current_data) ## # We need to so persist data then save a new store identifier before we @@ -199,10 +263,20 @@ module Ci size == CHUNK_SIZE end + private + + def pending_state? + build.pending_state.present? + end + def current_store self.class.get_store_class(data_store) end + def chunks_max_index + build.trace_chunks.maximum(:chunk_index).to_i + end + def lock_params ["trace_write:#{build_id}:chunks:#{chunk_index}", { ttl: WRITE_LOCK_TTL, |