summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorShinya Maeda <shinya@gitlab.com>2018-04-02 04:20:44 +0900
committerShinya Maeda <shinya@gitlab.com>2018-04-05 14:14:54 +0900
commit0971f301fb6e8bfb17e15e6853420ffc008af302 (patch)
treea2e21eb3c7b4fc746507de892ea507d26f995bd2 /lib
parentd1632da8c30b69ff915e78a86a661282b8ef24e6 (diff)
downloadgitlab-ce-0971f301fb6e8bfb17e15e6853420ffc008af302.tar.gz
Add new concerns
Diffstat (limited to 'lib')
-rw-r--r--lib/gitlab/ci/trace.rb6
-rw-r--r--lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb8
-rw-r--r--lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb9
-rw-r--r--lib/gitlab/ci/trace/chunked_file/chunked_io.rb150
-rw-r--r--lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb38
-rw-r--r--lib/gitlab/ci/trace/chunked_file/concerns/errors.rb18
-rw-r--r--lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb63
-rw-r--r--lib/gitlab/ci/trace/chunked_file/concerns/opener.rb23
-rw-r--r--lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb65
-rw-r--r--lib/gitlab/ci/trace/chunked_file/live_trace.rb44
10 files changed, 308 insertions, 116 deletions
diff --git a/lib/gitlab/ci/trace.rb b/lib/gitlab/ci/trace.rb
index 9db65961a52..83ed58f4845 100644
--- a/lib/gitlab/ci/trace.rb
+++ b/lib/gitlab/ci/trace.rb
@@ -61,7 +61,7 @@ module Gitlab
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
- elsif Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id)
+ elsif ChunkedFile::LiveTrace.exist?(job.id)
ChunkedFile::LiveTrace.new(job.id, "rb")
elsif current_path
File.open(current_path, "rb")
@@ -109,10 +109,10 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
- if Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id)
+ if ChunkedFile::LiveTrace.exist?(job.id)
ChunkedFile::LiveTrace.open(job.id, "wb") do |stream|
archive_stream!(stream)
- stream.truncate(0)
+ stream.delete
end
elsif current_path
File.open(current_path) do |stream|
diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb
index 45bf5053775..a7db214f428 100644
--- a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb
+++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb
@@ -29,6 +29,10 @@ module Gitlab
::Ci::JobTraceChunk.where(job_id: job_id).pluck('data')
.inject(0) { |sum, data| sum + data.length }
end
+
+ def delete_all(job_id)
+ ::Ci::JobTraceChunk.destroy_all(job_id: job_id)
+ end
end
attr_reader :job_trace_chunk
@@ -67,9 +71,7 @@ module Gitlab
end
def truncate!(offset)
- raise NotImplementedError, 'Partial truncate is not supported' unless offset == 0
-
- delete!
+ raise NotImplementedError
end
def delete!
diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb
index d77a6847a71..cb45cd5fba5 100644
--- a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb
+++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb
@@ -38,6 +38,14 @@ module Gitlab
end
end
+ def delete_all(job_id)
+ Gitlab::Redis::Cache.with do |redis|
+ redis.scan_each(:match => buffer_key(job_id, '?')) do |key|
+ redis.del(key)
+ end
+ end
+ end
+
def buffer_key(job_id, chunk_index)
"live_trace_buffer:#{job_id}:#{chunk_index}"
end
@@ -87,7 +95,6 @@ module Gitlab
puts "#{self.class.name} - #{__callee__}: offset: #{offset.inspect} params[:chunk_index]: #{params[:chunk_index]}"
Gitlab::Redis::Cache.with do |redis|
return 0 unless redis.exists(buffer_key)
- return delete! if offset == 0
truncated_data = redis.getrange(buffer_key, 0, offset)
redis.set(buffer_key, truncated_data)
diff --git a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb
index 3adfc43769b..d23fe2a47d5 100644
--- a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb
+++ b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb
@@ -8,43 +8,30 @@ module Gitlab
class Trace
module ChunkedFile
class ChunkedIO
- class << self
- def open(*args)
- stream = self.new(*args)
-
- yield stream
- ensure
- stream&.close
- end
- end
-
- WriteError = Class.new(StandardError)
- FailedToGetChunkError = Class.new(StandardError)
+ extend ChunkedFile::Concerns::Opener
+ include ChunkedFile::Concerns::Errors
+ include ChunkedFile::Concerns::Hooks
+ include ChunkedFile::Concerns::Callbacks
+ prepend ChunkedFile::Concerns::Permissions
attr_reader :size
attr_reader :tell
attr_reader :chunk, :chunk_range
- attr_reader :write_lock_uuid
attr_reader :job_id
+ attr_reader :mode
alias_method :pos, :tell
- def initialize(job_id, size, mode)
+ def initialize(job_id, size, mode = 'rb')
@size = size
@tell = 0
@job_id = job_id
+ @mode = mode
- if /(w|a)/ =~ mode
- @write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain
-
- raise WriteError, 'Already opened by another process' unless write_lock_uuid
-
- seek(0, IO::SEEK_END) if /a/ =~ mode
- end
+ raise NotImplementedError, "Mode 'w' is not supported" if mode.include?('w')
end
def close
- Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid) if write_lock_uuid
end
def binmode
@@ -55,20 +42,20 @@ module Gitlab
true
end
- def seek(pos, where = IO::SEEK_SET)
+ def seek(amount, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
- size + pos
+ size + amount
when IO::SEEK_SET
- pos
+ amount
when IO::SEEK_CUR
- tell + pos
+ tell + amount
else
-1
end
- raise 'new position is outside of file' if new_pos < 0 || new_pos > size
+ raise ArgumentError, 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
@@ -122,42 +109,18 @@ module Gitlab
out
end
- def write(data, &block)
- raise WriteError, 'Could not write without lock' unless write_lock_uuid
- raise WriteError, 'Could not write empty data' unless data.present?
-
- _data = data.dup
- prev_tell = tell
-
- until _data.empty?
- writable_space = buffer_size - chunk_offset
- writing_size = [writable_space, _data.length].min
- written_size = write_chunk!(_data.slice!(0...writing_size), &block)
+ def write(data)
+ raise ArgumentError, 'Could not write empty data' unless data.present?
- @tell += written_size
- @size = [tell, size].max
+ if mode.include?('w')
+ write_as_overwrite(data)
+ elsif mode.include?('a')
+ write_as_append(data)
end
-
- tell - prev_tell
end
- def truncate(offset, &block)
- raise WriteError, 'Could not write without lock' unless write_lock_uuid
- raise WriteError, 'Offset is out of bound' if offset > size || offset < 0
-
- @tell = size - 1
-
- until size == offset
- truncatable_space = size - chunk_start
- _chunk_offset = (offset <= chunk_start) ? 0 : offset % buffer_size
- removed_size = truncate_chunk!(_chunk_offset, &block)
-
- @tell -= removed_size
- @size -= removed_size
- end
-
- @tell = [tell, 0].max
- @size = [size, 0].max
+ def truncate(offset)
+ raise NotImplementedError
end
def flush
@@ -178,9 +141,6 @@ module Gitlab
unless in_range?
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
@chunk = store.get
-
- raise FailedToGetChunkError unless chunk && chunk.length > 0
-
@chunk_range = (chunk_start...(chunk_start + chunk.length))
end
end
@@ -188,30 +148,54 @@ module Gitlab
@chunk[chunk_offset..buffer_size]
end
- def write_chunk!(data, &block)
- chunk_store.open(job_id, chunk_index, params_for_store) do |store|
- written_size = if buffer_size == data.length
- store.write!(data)
- else
- store.append!(data)
- end
+ def write_as_overwrite(data)
+ raise NotImplementedError, "Overwrite is not supported"
+ end
+
+ def write_as_append(data)
+ @tell = size
- raise WriteError, 'Written size mismatch' unless data.length == written_size
+ data_size = data.size
+ new_tell = tell + data_size
+ data_offset = 0
- block.call(store) if block_given?
+ until tell == new_tell
+ writable_size = buffer_size - chunk_offset
+ writable_data = data[data_offset...(data_offset + writable_size)]
+ written_size = write_chunk(writable_data)
- written_size
+ data_offset += written_size
+ @tell += written_size
+ @size = [tell, size].max
end
+
+ data_size
end
- def truncate_chunk!(offset, &block)
+ def write_chunk(data)
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
- removed_size = store.size - offset
- store.truncate!(offset)
+ with_callbacks(:write_chunk, store) do
+ written_size = if buffer_size == data.length
+ store.write!(data)
+ else
+ store.append!(data)
+ end
- block.call(store) if block_given?
+ raise WriteError, 'Written size mismatch' unless data.length == written_size
- removed_size
+ written_size
+ end
+ end
+ end
+
+ def truncate_chunk(offset)
+ chunk_store.open(job_id, chunk_index, params_for_store) do |store|
+ with_callbacks(:truncate_chunk, store) do
+ removed_size = store.size - offset
+ store.truncate!(offset)
+
+ removed_size
+ end
end
end
@@ -240,19 +224,15 @@ module Gitlab
end
def chunks_count
- (size / buffer_size) + (has_extra? ? 1 : 0)
+ (size / buffer_size)
end
- def has_extra?
- (size % buffer_size) > 0
+ def first_chunk?
+ chunk_index == 0
end
def last_chunk?
- chunks_count == 0 || chunk_index == (chunks_count - 1) || chunk_index == chunks_count
- end
-
- def write_lock_key
- "live_trace:operation:write:#{job_id}"
+ chunks_count == 0 || chunk_index == (chunks_count - 1)
end
def chunk_store
diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb b/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb
new file mode 100644
index 00000000000..0a49ac4dbbf
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_file/concerns/callbacks.rb
@@ -0,0 +1,38 @@
+module Gitlab
+ module Ci
+ class Trace
+ module ChunkedFile
+ module Concerns
+ module Callbacks
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :_before_callbacks, :_after_callbacks,
+ :instance_writer => false
+ self._before_callbacks = Hash.new []
+ self._after_callbacks = Hash.new []
+ end
+
+ def with_callbacks(kind, *args)
+ self.class._before_callbacks[kind].each { |c| send c, *args }
+ yield
+ self.class._after_callbacks[kind].each { |c| send c, *args }
+ end
+
+ module ClassMethods
+ def before_callback(kind, callback)
+ self._before_callbacks = self._before_callbacks.
+ merge kind => _before_callbacks[kind] + [callback]
+ end
+
+ def after_callback(kind, callback)
+ self._after_callbacks = self._after_callbacks.
+ merge kind => _after_callbacks[kind] + [callback]
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb b/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb
new file mode 100644
index 00000000000..ccdb17005e2
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_file/concerns/errors.rb
@@ -0,0 +1,18 @@
+module Gitlab
+ module Ci
+ class Trace
+ module ChunkedFile
+ module Concerns
+ module Errors
+ extend ActiveSupport::Concern
+
+ included do
+ WriteError = Class.new(StandardError)
+ FailedToGetChunkError = Class.new(StandardError)
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb b/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb
new file mode 100644
index 00000000000..290a3a15805
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_file/concerns/hooks.rb
@@ -0,0 +1,63 @@
+module Gitlab
+ module Ci
+ class Trace
+ module ChunkedFile
+ module Concerns
+ module Hooks
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :_before_methods, :_after_methods,
+ :instance_writer => false
+ self._before_methods = Hash.new []
+ self._after_methods = Hash.new []
+ end
+
+ class_methods do
+ def before_method(kind, callback)
+ self._before_methods = self._before_methods.
+ merge kind => _before_methods[kind] + [callback]
+ end
+
+ def after_method(kind, callback)
+ self._after_methods = self._after_methods.
+ merge kind => _after_methods[kind] + [callback]
+ end
+ end
+
+ def method_added(method_name)
+ return if self.class._before_methods.values.include?(method_name)
+ return if self.class._after_methods.values.include?(method_name)
+ return if hooked_methods.include?(method_name)
+
+ add_hooks_to(method_name)
+ end
+
+ private
+
+ def hooked_methods
+ @hooked_methods ||= []
+ end
+
+ def add_hooks_to(method_name)
+ hooked_methods << method_name
+
+ original_method = instance_method(method_name)
+
+ # re-define the method, but notice how we reference the original
+ # method definition
+ define_method(method_name) do |*args, &block|
+ self.class._before_methods[method_name].each { |hook| method(hook).call }
+
+ # now invoke the original method
+ original_method.bind(self).call(*args, &block).tap do
+ self.class._after_methods[method_name].each { |hook| method(hook).call }
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb b/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb
new file mode 100644
index 00000000000..9f1f6eefcbc
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_file/concerns/opener.rb
@@ -0,0 +1,23 @@
+module Gitlab
+ module Ci
+ class Trace
+ module ChunkedFile
+ module Concerns
+ module Opener
+ extend ActiveSupport::Concern
+
+ class_methods do
+ def open(*args)
+ stream = self.new(*args)
+
+ yield stream
+ ensure
+ stream&.close
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb b/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb
new file mode 100644
index 00000000000..f8703970466
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_file/concerns/permissions.rb
@@ -0,0 +1,65 @@
+module Gitlab
+ module Ci
+ class Trace
+ module ChunkedFile
+ module Concerns
+ module Permissions
+ extend ActiveSupport::Concern
+
+ included do
+ PermissionError = Class.new(StandardError)
+
+ attr_reader :write_lock_uuid
+
+ # mode checks
+ before_method :read, :can_read!
+ before_method :readline, :can_read!
+ before_method :each_line, :can_read!
+ before_method :write, :can_write!
+ before_method :truncate, :can_write!
+
+ # write_lock
+ before_method :write, :check_lock!
+ before_method :truncate, :check_lock!
+ end
+
+ def initialize(job_id, size, mode = 'rb')
+ if /(w|a)/ =~ mode
+ @write_lock_uuid = Gitlab::ExclusiveLease
+ .new(write_lock_key, timeout: 1.hour.to_i).try_obtain
+
+ raise PermissionError, 'Already opened by another process' unless write_lock_uuid
+ end
+
+ super
+ end
+
+ def close
+ if write_lock_uuid
+ Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid)
+ end
+
+ super
+ end
+
+ def check_lock!
+ raise PermissionError, 'Could not write without lock' unless write_lock_uuid
+ end
+
+ def can_read!
+ raise IOError, 'not opened for reading' unless /(r|+)/ =~ mode
+ end
+
+ def can_write!
+ raise IOError, 'not opened for writing' unless /(w|a)/ =~ mode
+ end
+
+ def write_lock_key
+ "live_trace:operation:write:#{job_id}"
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/ci/trace/chunked_file/live_trace.rb b/lib/gitlab/ci/trace/chunked_file/live_trace.rb
index 264bb98ef6c..5502a6a5236 100644
--- a/lib/gitlab/ci/trace/chunked_file/live_trace.rb
+++ b/lib/gitlab/ci/trace/chunked_file/live_trace.rb
@@ -10,42 +10,38 @@ module Gitlab
end
end
+ after_callback :write_chunk, :stash_to_database
+
def initialize(job_id, mode)
super(job_id, calculate_size(job_id), mode)
end
- def write(data)
- raise NotImplementedError, 'Overwrite is not supported' unless tell == size
-
- super(data) do |store|
- if store.filled?
- # Once data is filled into redis, move the data to database
- ChunkStore::Database.open(job_id, chunk_index, params_for_store) do |to_store|
- to_store.write!(store.get)
- store.delete!
- end
+ def stash_to_database(store)
+ # Once data is filled into redis, move the data to database
+ if store.filled? &&
+ ChunkStore::Database.open(job_id, chunk_index, params_for_store) do |to_store|
+ to_store.write!(store.get)
+ store.delete!
end
end
end
+ # Efficient process than iterating each
def truncate(offset)
- super(offset) do |store|
- next if chunk_index == 0
-
- prev_chunk_index = chunk_index - 1
-
- if ChunkStore::Database.exist?(job_id, prev_chunk_index)
- # Swap data from Database to Redis to truncate any size than buffer_size
- ChunkStore::Database.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |from_store|
- ChunkStore::Redis.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |to_store|
- to_store.write!(from_store.get)
- from_store.delete!
- end
- end
- end
+ if truncate == 0
+ self.delete_all(job_id)
+ elsif offset == size
+ # no-op
+ else
+ raise NotImplementedError, 'Unexpected operation'
end
end
+ def delete
+ ChunkStores::Redis.delete_all(job_id)
+ ChunkStores::Database.delete_all(job_id)
+ end
+
private
def calculate_size(job_id)