diff options
author | Shinya Maeda <shinya@gitlab.com> | 2018-04-02 04:20:44 +0900 |
---|---|---|
committer | Shinya Maeda <shinya@gitlab.com> | 2018-04-05 14:14:54 +0900 |
commit | 0971f301fb6e8bfb17e15e6853420ffc008af302 (patch) | |
tree | a2e21eb3c7b4fc746507de892ea507d26f995bd2 /lib | |
parent | d1632da8c30b69ff915e78a86a661282b8ef24e6 (diff) | |
download | gitlab-ce-0971f301fb6e8bfb17e15e6853420ffc008af302.tar.gz |
Add new concerns
Diffstat (limited to 'lib')
-rw-r--r-- | lib/gitlab/ci/trace.rb | 6 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb | 8 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb | 9 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/chunked_io.rb | 150 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb | 38 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/concerns/errors.rb | 18 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb | 63 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/concerns/opener.rb | 23 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb | 65 | ||||
-rw-r--r-- | lib/gitlab/ci/trace/chunked_file/live_trace.rb | 44 |
10 files changed, 308 insertions, 116 deletions
diff --git a/lib/gitlab/ci/trace.rb b/lib/gitlab/ci/trace.rb index 9db65961a52..83ed58f4845 100644 --- a/lib/gitlab/ci/trace.rb +++ b/lib/gitlab/ci/trace.rb @@ -61,7 +61,7 @@ module Gitlab stream = Gitlab::Ci::Trace::Stream.new do if trace_artifact trace_artifact.open - elsif Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id) + elsif ChunkedFile::LiveTrace.exist?(job.id) ChunkedFile::LiveTrace.new(job.id, "rb") elsif current_path File.open(current_path, "rb") @@ -109,10 +109,10 @@ module Gitlab raise ArchiveError, 'Already archived' if trace_artifact raise ArchiveError, 'Job is not finished yet' unless job.complete? - if Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id) + if ChunkedFile::LiveTrace.exist?(job.id) ChunkedFile::LiveTrace.open(job.id, "wb") do |stream| archive_stream!(stream) - stream.truncate(0) + stream.delete end elsif current_path File.open(current_path) do |stream| diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb index 45bf5053775..a7db214f428 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb @@ -29,6 +29,10 @@ module Gitlab ::Ci::JobTraceChunk.where(job_id: job_id).pluck('data') .inject(0) { |sum, data| sum + data.length } end + + def delete_all(job_id) + ::Ci::JobTraceChunk.destroy_all(job_id: job_id) + end end attr_reader :job_trace_chunk @@ -67,9 +71,7 @@ module Gitlab end def truncate!(offset) - raise NotImplementedError, 'Partial truncate is not supported' unless offset == 0 - - delete! + raise NotImplementedError end def delete! diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb index d77a6847a71..cb45cd5fba5 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb @@ -38,6 +38,14 @@ module Gitlab end end + def delete_all(job_id) + Gitlab::Redis::Cache.with do |redis| + redis.scan_each(:match => buffer_key(job_id, '?')) do |key| + redis.del(key) + end + end + end + def buffer_key(job_id, chunk_index) "live_trace_buffer:#{job_id}:#{chunk_index}" end @@ -87,7 +95,6 @@ module Gitlab puts "#{self.class.name} - #{__callee__}: offset: #{offset.inspect} params[:chunk_index]: #{params[:chunk_index]}" Gitlab::Redis::Cache.with do |redis| return 0 unless redis.exists(buffer_key) - return delete! if offset == 0 truncated_data = redis.getrange(buffer_key, 0, offset) redis.set(buffer_key, truncated_data) diff --git a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb index 3adfc43769b..d23fe2a47d5 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb @@ -8,43 +8,30 @@ module Gitlab class Trace module ChunkedFile class ChunkedIO - class << self - def open(*args) - stream = self.new(*args) - - yield stream - ensure - stream&.close - end - end - - WriteError = Class.new(StandardError) - FailedToGetChunkError = Class.new(StandardError) + extend ChunkedFile::Concerns::Opener + include ChunkedFile::Concerns::Errors + include ChunkedFile::Concerns::Hooks + include ChunkedFile::Concerns::Callbacks + prepend ChunkedFile::Concerns::Permissions attr_reader :size attr_reader :tell attr_reader :chunk, :chunk_range - attr_reader :write_lock_uuid attr_reader :job_id + attr_reader :mode alias_method :pos, :tell - def initialize(job_id, size, mode) + def initialize(job_id, size, mode = 'rb') @size = size @tell = 0 @job_id = job_id + @mode = mode - if /(w|a)/ =~ mode - @write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain - - raise WriteError, 'Already opened by another process' unless write_lock_uuid - - seek(0, IO::SEEK_END) if /a/ =~ mode - end + raise NotImplementedError, "Mode 'w' is not supported" if mode.include?('w') end def close - Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid) if write_lock_uuid end def binmode @@ -55,20 +42,20 @@ module Gitlab true end - def seek(pos, where = IO::SEEK_SET) + def seek(amount, where = IO::SEEK_SET) new_pos = case where when IO::SEEK_END - size + pos + size + amount when IO::SEEK_SET - pos + amount when IO::SEEK_CUR - tell + pos + tell + amount else -1 end - raise 'new position is outside of file' if new_pos < 0 || new_pos > size + raise ArgumentError, 'new position is outside of file' if new_pos < 0 || new_pos > size @tell = new_pos end @@ -122,42 +109,18 @@ module Gitlab out end - def write(data, &block) - raise WriteError, 'Could not write without lock' unless write_lock_uuid - raise WriteError, 'Could not write empty data' unless data.present? - - _data = data.dup - prev_tell = tell - - until _data.empty? - writable_space = buffer_size - chunk_offset - writing_size = [writable_space, _data.length].min - written_size = write_chunk!(_data.slice!(0...writing_size), &block) + def write(data) + raise ArgumentError, 'Could not write empty data' unless data.present? - @tell += written_size - @size = [tell, size].max + if mode.include?('w') + write_as_overwrite(data) + elsif mode.include?('a') + write_as_append(data) end - - tell - prev_tell end - def truncate(offset, &block) - raise WriteError, 'Could not write without lock' unless write_lock_uuid - raise WriteError, 'Offset is out of bound' if offset > size || offset < 0 - - @tell = size - 1 - - until size == offset - truncatable_space = size - chunk_start - _chunk_offset = (offset <= chunk_start) ? 0 : offset % buffer_size - removed_size = truncate_chunk!(_chunk_offset, &block) - - @tell -= removed_size - @size -= removed_size - end - - @tell = [tell, 0].max - @size = [size, 0].max + def truncate(offset) + raise NotImplementedError end def flush @@ -178,9 +141,6 @@ module Gitlab unless in_range? chunk_store.open(job_id, chunk_index, params_for_store) do |store| @chunk = store.get - - raise FailedToGetChunkError unless chunk && chunk.length > 0 - @chunk_range = (chunk_start...(chunk_start + chunk.length)) end end @@ -188,30 +148,54 @@ module Gitlab @chunk[chunk_offset..buffer_size] end - def write_chunk!(data, &block) - chunk_store.open(job_id, chunk_index, params_for_store) do |store| - written_size = if buffer_size == data.length - store.write!(data) - else - store.append!(data) - end + def write_as_overwrite(data) + raise NotImplementedError, "Overwrite is not supported" + end + + def write_as_append(data) + @tell = size - raise WriteError, 'Written size mismatch' unless data.length == written_size + data_size = data.size + new_tell = tell + data_size + data_offset = 0 - block.call(store) if block_given? + until tell == new_tell + writable_size = buffer_size - chunk_offset + writable_data = data[data_offset...(data_offset + writable_size)] + written_size = write_chunk(writable_data) - written_size + data_offset += written_size + @tell += written_size + @size = [tell, size].max end + + data_size end - def truncate_chunk!(offset, &block) + def write_chunk(data) chunk_store.open(job_id, chunk_index, params_for_store) do |store| - removed_size = store.size - offset - store.truncate!(offset) + with_callbacks(:write_chunk, store) do + written_size = if buffer_size == data.length + store.write!(data) + else + store.append!(data) + end - block.call(store) if block_given? + raise WriteError, 'Written size mismatch' unless data.length == written_size - removed_size + written_size + end + end + end + + def truncate_chunk(offset) + chunk_store.open(job_id, chunk_index, params_for_store) do |store| + with_callbacks(:truncate_chunk, store) do + removed_size = store.size - offset + store.truncate!(offset) + + removed_size + end end end @@ -240,19 +224,15 @@ module Gitlab end def chunks_count - (size / buffer_size) + (has_extra? ? 1 : 0) + (size / buffer_size) end - def has_extra? - (size % buffer_size) > 0 + def first_chunk? + chunk_index == 0 end def last_chunk? - chunks_count == 0 || chunk_index == (chunks_count - 1) || chunk_index == chunks_count - end - - def write_lock_key - "live_trace:operation:write:#{job_id}" + chunks_count == 0 || chunk_index == (chunks_count - 1) end def chunk_store diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb b/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb new file mode 100644 index 00000000000..0a49ac4dbbf --- /dev/null +++ b/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb @@ -0,0 +1,38 @@ +module Gitlab + module Ci + class Trace + module ChunkedFile + module Concerns + module Callbacks + extend ActiveSupport::Concern + + included do + class_attribute :_before_callbacks, :_after_callbacks, + :instance_writer => false + self._before_callbacks = Hash.new [] + self._after_callbacks = Hash.new [] + end + + def with_callbacks(kind, *args) + self.class._before_callbacks[kind].each { |c| send c, *args } + yield + self.class._after_callbacks[kind].each { |c| send c, *args } + end + + module ClassMethods + def before_callback(kind, callback) + self._before_callbacks = self._before_callbacks. + merge kind => _before_callbacks[kind] + [callback] + end + + def after_callback(kind, callback) + self._after_callbacks = self._after_callbacks. + merge kind => _after_callbacks[kind] + [callback] + end + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb b/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb new file mode 100644 index 00000000000..ccdb17005e2 --- /dev/null +++ b/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb @@ -0,0 +1,18 @@ +module Gitlab + module Ci + class Trace + module ChunkedFile + module Concerns + module Errors + extend ActiveSupport::Concern + + included do + WriteError = Class.new(StandardError) + FailedToGetChunkError = Class.new(StandardError) + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb b/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb new file mode 100644 index 00000000000..290a3a15805 --- /dev/null +++ b/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb @@ -0,0 +1,63 @@ +module Gitlab + module Ci + class Trace + module ChunkedFile + module Concerns + module Hooks + extend ActiveSupport::Concern + + included do + class_attribute :_before_methods, :_after_methods, + :instance_writer => false + self._before_methods = Hash.new [] + self._after_methods = Hash.new [] + end + + class_methods do + def before_method(kind, callback) + self._before_methods = self._before_methods. + merge kind => _before_methods[kind] + [callback] + end + + def after_method(kind, callback) + self._after_methods = self._after_methods. + merge kind => _after_methods[kind] + [callback] + end + end + + def method_added(method_name) + return if self.class._before_methods.values.include?(method_name) + return if self.class._after_methods.values.include?(method_name) + return if hooked_methods.include?(method_name) + + add_hooks_to(method_name) + end + + private + + def hooked_methods + @hooked_methods ||= [] + end + + def add_hooks_to(method_name) + hooked_methods << method_name + + original_method = instance_method(method_name) + + # re-define the method, but notice how we reference the original + # method definition + define_method(method_name) do |*args, &block| + self.class._before_methods[method_name].each { |hook| method(hook).call } + + # now invoke the original method + original_method.bind(self).call(*args, &block).tap do + self.class._after_methods[method_name].each { |hook| method(hook).call } + end + end + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb b/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb new file mode 100644 index 00000000000..9f1f6eefcbc --- /dev/null +++ b/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb @@ -0,0 +1,23 @@ +module Gitlab + module Ci + class Trace + module ChunkedFile + module Concerns + module Opener + extend ActiveSupport::Concern + + class_methods do + def open(*args) + stream = self.new(*args) + + yield stream + ensure + stream&.close + end + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb b/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb new file mode 100644 index 00000000000..f8703970466 --- /dev/null +++ b/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb @@ -0,0 +1,65 @@ +module Gitlab + module Ci + class Trace + module ChunkedFile + module Concerns + module Permissions + extend ActiveSupport::Concern + + included do + PermissionError = Class.new(StandardError) + + attr_reader :write_lock_uuid + + # mode checks + before_method :read, :can_read! + before_method :readline, :can_read! + before_method :each_line, :can_read! + before_method :write, :can_write! + before_method :truncate, :can_write! + + # write_lock + before_method :write, :check_lock! + before_method :truncate, :check_lock! + end + + def initialize(job_id, size, mode = 'rb') + if /(w|a)/ =~ mode + @write_lock_uuid = Gitlab::ExclusiveLease + .new(write_lock_key, timeout: 1.hour.to_i).try_obtain + + raise PermissionError, 'Already opened by another process' unless write_lock_uuid + end + + super + end + + def close + if write_lock_uuid + Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid) + end + + super + end + + def check_lock! + raise PermissionError, 'Could not write without lock' unless write_lock_uuid + end + + def can_read! + raise IOError, 'not opened for reading' unless /(r|+)/ =~ mode + end + + def can_write! + raise IOError, 'not opened for writing' unless /(w|a)/ =~ mode + end + + def write_lock_key + "live_trace:operation:write:#{job_id}" + end + end + end + end + end + end +end diff --git a/lib/gitlab/ci/trace/chunked_file/live_trace.rb b/lib/gitlab/ci/trace/chunked_file/live_trace.rb index 264bb98ef6c..5502a6a5236 100644 --- a/lib/gitlab/ci/trace/chunked_file/live_trace.rb +++ b/lib/gitlab/ci/trace/chunked_file/live_trace.rb @@ -10,42 +10,38 @@ module Gitlab end end + after_callback :write_chunk, :stash_to_database + def initialize(job_id, mode) super(job_id, calculate_size(job_id), mode) end - def write(data) - raise NotImplementedError, 'Overwrite is not supported' unless tell == size - - super(data) do |store| - if store.filled? - # Once data is filled into redis, move the data to database - ChunkStore::Database.open(job_id, chunk_index, params_for_store) do |to_store| - to_store.write!(store.get) - store.delete! - end + def stash_to_database(store) + # Once data is filled into redis, move the data to database + if store.filled? && + ChunkStore::Database.open(job_id, chunk_index, params_for_store) do |to_store| + to_store.write!(store.get) + store.delete! end end end + # Efficient process than iterating each def truncate(offset) - super(offset) do |store| - next if chunk_index == 0 - - prev_chunk_index = chunk_index - 1 - - if ChunkStore::Database.exist?(job_id, prev_chunk_index) - # Swap data from Database to Redis to truncate any size than buffer_size - ChunkStore::Database.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |from_store| - ChunkStore::Redis.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |to_store| - to_store.write!(from_store.get) - from_store.delete! - end - end - end + if truncate == 0 + self.delete_all(job_id) + elsif offset == size + # no-op + else + raise NotImplementedError, 'Unexpected operation' end end + def delete + ChunkStores::Redis.delete_all(job_id) + ChunkStores::Database.delete_all(job_id) + end + private def calculate_size(job_id) |