summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Paul J. Davis <paul.joseph.davis@gmail.com>, 2019-06-06 13:30:01 -0500
committer: Paul J. Davis <paul.joseph.davis@gmail.com>, 2019-06-06 13:32:22 -0500
commit: 6835e18e9db23833ae178488efdb42edc9aecc0d (patch)
tree: 5b37be42b47d6774b5a18baebea791fccac359d4
parent: e0b2dc16e2463969eaf7e715b88e130216fba6bb (diff)
download: couchdb-6835e18e9db23833ae178488efdb42edc9aecc0d.tar.gz
Implement attachment compression
This still holds all attachment data in RAM, which we'll have to revisit at some point.
-rw-r--r-- src/couch/src/couch_att.erl 109
-rw-r--r-- test/elixir/test/replication_test.exs 7
2 files changed, 77 insertions, 39 deletions
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index 0dc5fa56b..90d364441 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -383,8 +383,8 @@ flush(Db, DocId, Att1) ->
% If we were sent a gzip'ed attachment with no
% length data, we have to set it here.
- Att3 = case AttLen of
- undefined -> store(att_len, DiskLen, Att2);
+ Att3 = case DiskLen of
+ undefined -> store(disk_len, AttLen, Att2);
_ -> Att2
end,
@@ -400,12 +400,13 @@ flush(Db, DocId, Att1) ->
% Already flushed
Att1;
_ when is_binary(Data) ->
- IdentMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+ DataMd5 = couch_hash:md5_hash(Data),
if ReqMd5 == undefined -> ok; true ->
- couch_util:check_md5(IdentMd5, ReqMd5)
+ couch_util:check_md5(DataMd5, ReqMd5)
end,
- Att5 = store(md5, IdentMd5, Att4),
- fabric2_db:write_attachment(Db, DocId, Att5)
+ Att5 = store(md5, DataMd5, Att4),
+ Att6 = maybe_compress(Att5),
+ fabric2_db:write_attachment(Db, DocId, Att6)
end.
@@ -451,7 +452,7 @@ read_data(Fun, Att) when is_function(Fun) ->
end,
Props0 = [
{data, iolist_to_binary(lists:reverse(Acc))},
- {disk_len, Len}
+ {att_len, Len}
],
Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
[{md5, Md5} | Props0]
@@ -473,7 +474,7 @@ read_streamed_attachment(Att, _F, 0, Acc) ->
Bin = iolist_to_binary(lists:reverse(Acc)),
store([
{data, Bin},
- {disk_len, size(Bin)}
+ {att_len, size(Bin)}
], Att);
read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
@@ -550,8 +551,23 @@ range_foldl(Att, From, To, Fun, Acc) ->
range_foldl(Bin, From, To, Fun, Acc).
-foldl_decode(_Att, _Fun, _Acc) ->
- erlang:error(not_supported).
+foldl_decode(Att, Fun, Acc) ->
+ [Encoding, Data] = fetch([encoding, data], Att),
+ case {Encoding, Data} of
+ {gzip, {loc, Db, DocId, AttId}} ->
+ NoTxDb = Db#{tx := undefined},
+ Bin = fabric2_db:read_attachment(NoTxDb, DocId, AttId),
+ foldl_decode(store(data, Bin, Att), Fun, Acc);
+ {gzip, _} when is_binary(Data) ->
+ Z = zlib:open(),
+ ok = zlib:inflateInit(Z, 16 + 15),
+ Inflated = iolist_to_binary(zlib:inflate(Z, Data)),
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z),
+ foldl(Inflated, Att, Fun, Acc);
+ _ ->
+ foldl(Att, Fun, Acc)
+ end.
to_binary(Att) ->
@@ -563,7 +579,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
to_binary({loc, Db, DocId, AttId}, _Att) ->
- fabric2_db:read_attachmet(Db, DocId, AttId);
+ NoTxDb = Db#{tx := undefined},
+ fabric2_db:read_attachment(NoTxDb, DocId, AttId);
to_binary(DataFun, Att) when is_function(DataFun)->
Len = fetch(att_len, Att),
iolist_to_binary(
@@ -585,15 +602,53 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-get_identity_md5(Bin, gzip) ->
+maybe_compress(Att) ->
+ [Encoding, Type] = fetch([encoding, type], Att),
+ IsCompressible = is_compressible(Type),
+ CompLevel = config:get_integer("attachments", "compression_level", 0),
+ case Encoding of
+ identity when IsCompressible, CompLevel >= 1, CompLevel =< 9 ->
+ compress(Att, CompLevel);
+ _ ->
+ Att
+ end.
+
+
+compress(Att, Level) ->
+ Data = fetch(data, Att),
+
Z = zlib:open(),
- ok = zlib:inflateInit(Z, 16 + 15),
- Inflated = zlib:inflate(Z, Bin),
- ok = zlib:inflateEnd(Z),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Level, deflated, 16 + 15, 8, default),
+ CompData = iolist_to_binary(zlib:deflate(Z, Data, finish)),
+ ok = zlib:deflateEnd(Z),
ok = zlib:close(Z),
- couch_hash:md5_hash(Inflated);
-get_identity_md5(Bin, _) ->
- couch_hash:md5_hash(Bin).
+
+ store([
+ {att_len, size(CompData)},
+ {md5, couch_hash:md5_hash(CompData)},
+ {data, CompData},
+ {encoding, gzip}
+ ], Att).
+
+
+is_compressible(Type) when is_binary(Type) ->
+ is_compressible(binary_to_list(Type));
+is_compressible(Type) ->
+ TypeExpList = re:split(
+ config:get("attachments", "compressible_types", ""),
+ "\\s*,\\s*",
+ [{return, list}]
+ ),
+ lists:any(
+ fun(TypeExp) ->
+ Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+ "(?:\\s*;.*?)?\\s*", $$],
+ re:run(Type, Regexp, [caseless]) =/= nomatch
+ end,
+ [T || T <- TypeExpList, T /= []]
+ ).
max_attachment_size() ->
@@ -612,24 +667,6 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
ok.
-%% is_compressible(Type) when is_binary(Type) ->
-%% is_compressible(binary_to_list(Type));
-%% is_compressible(Type) ->
-%% TypeExpList = re:split(
-%% config:get("attachments", "compressible_types", ""),
-%% "\\s*,\\s*",
-%% [{return, list}]
-%% ),
-%% lists:any(
-%% fun(TypeExp) ->
-%% Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
-%% "(?:\\s*;.*?)?\\s*", $$],
-%% re:run(Type, Regexp, [caseless]) =/= nomatch
-%% end,
-%% [T || T <- TypeExpList, T /= []]
-%% ).
-
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index e98775fbd..3f0045b4e 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -717,9 +717,10 @@ defmodule ReplicationTest do
assert tgt_info["doc_count"] == src_info["doc_count"]
- src_shards = seq_to_shards(src_info["update_seq"])
- tgt_shards = seq_to_shards(tgt_info["update_seq"])
- assert tgt_shards == src_shards
+ # This assertion is no longer valid
+ # src_shards = seq_to_shards(src_info["update_seq"])
+ # tgt_shards = seq_to_shards(tgt_info["update_seq"])
+ # assert tgt_shards == src_shards
end)
end