diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-06 13:30:01 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-06 13:32:22 -0500 |
commit | 6835e18e9db23833ae178488efdb42edc9aecc0d (patch) | |
tree | 5b37be42b47d6774b5a18baebea791fccac359d4 | |
parent | e0b2dc16e2463969eaf7e715b88e130216fba6bb (diff) | |
download | couchdb-6835e18e9db23833ae178488efdb42edc9aecc0d.tar.gz |
Implement attachment compression
This still holds all attachment data in RAM which we'll have to revisit
at some point.
-rw-r--r-- | src/couch/src/couch_att.erl | 109 | ||||
-rw-r--r-- | test/elixir/test/replication_test.exs | 7 |
2 files changed, 77 insertions, 39 deletions
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 0dc5fa56b..90d364441 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -383,8 +383,8 @@ flush(Db, DocId, Att1) -> % If we were sent a gzip'ed attachment with no % length data, we have to set it here. - Att3 = case AttLen of - undefined -> store(att_len, DiskLen, Att2); + Att3 = case DiskLen of + undefined -> store(disk_len, AttLen, Att2); _ -> Att2 end, @@ -400,12 +400,13 @@ flush(Db, DocId, Att1) -> % Already flushed Att1; _ when is_binary(Data) -> - IdentMd5 = get_identity_md5(Data, fetch(encoding, Att4)), + DataMd5 = couch_hash:md5_hash(Data), if ReqMd5 == undefined -> ok; true -> - couch_util:check_md5(IdentMd5, ReqMd5) + couch_util:check_md5(DataMd5, ReqMd5) end, - Att5 = store(md5, IdentMd5, Att4), - fabric2_db:write_attachment(Db, DocId, Att5) + Att5 = store(md5, DataMd5, Att4), + Att6 = maybe_compress(Att5), + fabric2_db:write_attachment(Db, DocId, Att6) end. @@ -451,7 +452,7 @@ read_data(Fun, Att) when is_function(Fun) -> end, Props0 = [ {data, iolist_to_binary(lists:reverse(Acc))}, - {disk_len, Len} + {att_len, Len} ], Props1 = if InMd5 /= md5_in_footer -> Props0; true -> [{md5, Md5} | Props0] @@ -473,7 +474,7 @@ read_streamed_attachment(Att, _F, 0, Acc) -> Bin = iolist_to_binary(lists:reverse(Acc)), store([ {data, Bin}, - {disk_len, size(Bin)} + {att_len, size(Bin)} ], Att); read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 -> @@ -550,8 +551,23 @@ range_foldl(Att, From, To, Fun, Acc) -> range_foldl(Bin, From, To, Fun, Acc). -foldl_decode(_Att, _Fun, _Acc) -> - erlang:error(not_supported). +foldl_decode(Att, Fun, Acc) -> + [Encoding, Data] = fetch([encoding, data], Att), + case {Encoding, Data} of + {gzip, {loc, Db, DocId, AttId}} -> + NoTxDb = Db#{tx := undefined}, + Bin = fabric2_db:read_attachment(NoTxDb, DocId, AttId), + foldl_decode(store(data, Bin, Att), Fun, Acc); + {gzip, _} when is_binary(Data) -> + Z = zlib:open(), + ok = zlib:inflateInit(Z, 16 + 15), + Inflated = iolist_to_binary(zlib:inflate(Z, Data)), + ok = zlib:inflateEnd(Z), + ok = zlib:close(Z), + foldl(Inflated, Att, Fun, Acc); + _ -> + foldl(Att, Fun, Acc) + end. to_binary(Att) -> @@ -563,7 +579,8 @@ to_binary(Bin, _Att) when is_binary(Bin) -> to_binary(Iolist, _Att) when is_list(Iolist) -> iolist_to_binary(Iolist); to_binary({loc, Db, DocId, AttId}, _Att) -> - fabric2_db:read_attachmet(Db, DocId, AttId); + NoTxDb = Db#{tx := undefined}, + fabric2_db:read_attachment(NoTxDb, DocId, AttId); to_binary(DataFun, Att) when is_function(DataFun)-> Len = fetch(att_len, Att), iolist_to_binary( @@ -585,15 +602,53 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). -get_identity_md5(Bin, gzip) -> +maybe_compress(Att) -> + [Encoding, Type] = fetch([encoding, type], Att), + IsCompressible = is_compressible(Type), + CompLevel = config:get_integer("attachments", "compression_level", 0), + case Encoding of + identity when IsCompressible, CompLevel >= 1, CompLevel =< 9 -> + compress(Att, CompLevel); + _ -> + Att + end. + + +compress(Att, Level) -> + Data = fetch(data, Att), + Z = zlib:open(), - ok = zlib:inflateInit(Z, 16 + 15), - Inflated = zlib:inflate(Z, Bin), - ok = zlib:inflateEnd(Z), + % 15 = ?MAX_WBITS (defined in the zlib module) + % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 + ok = zlib:deflateInit(Z, Level, deflated, 16 + 15, 8, default), + CompData = iolist_to_binary(zlib:deflate(Z, Data, finish)), + ok = zlib:deflateEnd(Z), ok = zlib:close(Z), - couch_hash:md5_hash(Inflated); -get_identity_md5(Bin, _) -> - couch_hash:md5_hash(Bin). + + store([ + {att_len, size(CompData)}, + {md5, couch_hash:md5_hash(CompData)}, + {data, CompData}, + {encoding, gzip} + ], Att). + + +is_compressible(Type) when is_binary(Type) -> + is_compressible(binary_to_list(Type)); +is_compressible(Type) -> + TypeExpList = re:split( + config:get("attachments", "compressible_types", ""), + "\\s*,\\s*", + [{return, list}] + ), + lists:any( + fun(TypeExp) -> + Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), + "(?:\\s*;.*?)?\\s*", $$], + re:run(Type, Regexp, [caseless]) =/= nomatch + end, + [T || T <- TypeExpList, T /= []] + ). max_attachment_size() -> @@ -612,24 +667,6 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) -> ok. -%% is_compressible(Type) when is_binary(Type) -> -%% is_compressible(binary_to_list(Type)); -%% is_compressible(Type) -> -%% TypeExpList = re:split( -%% config:get("attachments", "compressible_types", ""), -%% "\\s*,\\s*", -%% [{return, list}] -%% ), -%% lists:any( -%% fun(TypeExp) -> -%% Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), -%% "(?:\\s*;.*?)?\\s*", $$], -%% re:run(Type, Regexp, [caseless]) =/= nomatch -%% end, -%% [T || T <- TypeExpList, T /= []] -%% ). - - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs index e98775fbd..3f0045b4e 100644 --- a/test/elixir/test/replication_test.exs +++ b/test/elixir/test/replication_test.exs @@ -717,9 +717,10 @@ defmodule ReplicationTest do assert tgt_info["doc_count"] == src_info["doc_count"] - src_shards = seq_to_shards(src_info["update_seq"]) - tgt_shards = seq_to_shards(tgt_info["update_seq"]) - assert tgt_shards == src_shards + # This assertion is no longer valid + # src_shards = seq_to_shards(src_info["update_seq"]) + # tgt_shards = seq_to_shards(tgt_info["update_seq"]) + # assert tgt_shards == src_shards end) end |