diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-05 13:29:33 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-05 13:53:37 -0500 |
commit | ee2e4c8cd70f833f04c4504f6664e241fad5061f (patch) | |
tree | e9fac9739d67e05ccf4a1ac2c3d34c2526c1a996 | |
parent | d7b015c006eed95d3ad80a2c1daadd94503dfd2d (diff) | |
download | couchdb-ee2e4c8cd70f833f04c4504f6664e241fad5061f.tar.gz |
Initial fabric2 implementation on FoundationDB
This provides a base implementation of a fabric API backed by
FoundationDB. While a lot of functionality is provided there are a
number of places that still require work. An incomplete list includes:
1. Document bodies are currently a single key/value
2. Attachments are stored as a range of key/value pairs
3. There is no support for indexing
4. Request size limits are not enforced directly
5. Auth is still backed by a legacy CouchDB database
6. No support for before_doc_update/after_doc_read
7. Various implementation shortcuts need to be expanded for full API
support.
-rw-r--r-- | FDB_NOTES.md | 57 | ||||
-rw-r--r-- | src/couch/src/couch_att.erl | 661 | ||||
-rw-r--r-- | src/couch/src/couch_doc.erl | 11 | ||||
-rw-r--r-- | src/fabric/src/fabric.app.src | 8 | ||||
-rw-r--r-- | src/fabric/src/fabric2.hrl | 66 | ||||
-rw-r--r-- | src/fabric/src/fabric2_app.erl | 32 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 1299 | ||||
-rw-r--r-- | src/fabric/src/fabric2_events.erl | 84 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 1187 | ||||
-rw-r--r-- | src/fabric/src/fabric2_server.erl | 104 | ||||
-rw-r--r-- | src/fabric/src/fabric2_sup.erl | 47 | ||||
-rw-r--r-- | src/fabric/src/fabric2_txids.erl | 144 | ||||
-rw-r--r-- | src/fabric/src/fabric2_util.erl | 203 |
13 files changed, 3502 insertions, 401 deletions
diff --git a/FDB_NOTES.md b/FDB_NOTES.md new file mode 100644 index 000000000..c0cdc8cc2 --- /dev/null +++ b/FDB_NOTES.md @@ -0,0 +1,57 @@ +Things of Note +=== + + +1. If a replication sends us two revisions A and B where one is an + ancestor of the other, we likely have divergent behavior. However, + this should never happen In Theory. + +2. Multiple updates to the same document in a _bulk_docs (or if they + just happen to be in the same update batch in non-fdb CouchDB) + we likely have subtly different behavior. + +3. I'm relying on repeated reads in an fdb transaction to be "cheap" + in that the reads would be cached in the fdb_transaction object. + This needs to be checked for certainty but that appeared to + be how things behaved in testing. + +4. When attempting to create a doc from scratch in an interacitve_edit + update, with revisions specified *and* attachment stubs, the reported + error is now a conflict. Previously the missing_stubs error was + raised earlier. + +5. There may be a difference in behavior if a) there are no VDU functions + set on a db and no design documents in a batch. This is because in + this situation we don't run the prep_and_validate code on pre-fdb + CouchDB. The new code always checks stubs before merging revision trees. + I'm sure the old way would fail somehow, but it would fail further on + which means we may have failed with a different reason (conflict, etc) + before we got to the next place we check for missing stubs. + +6. For multi-doc updates we'll need to investigate user versions on + versionstamps within a transaction. Also this likely prevents the + ability to have multiple updates to the same doc in a single + _bulk_docs transaction + +7. Document body storage needs to be implemented beyond the single + key/value approach. + +8. We'll want to look at how we currently apply open options to individual + elements of an open_revs call. Might turn out that we have to grab a + full FDI even if we could look up a rev directly. (i.e., revs_info + would require us having the entire FDI, however it'd be wasteful to return + all of that in an open_revs call, but bug compatibility ftw!) + +9. Is it possible that a server_admin can delete a db without being able + to open it? If so that's probably changed behavior. + +10. All docs on large active databases might be a thing getting the doc + count. If we allow range requests up to 5s, and we continue to return + the doc count total we may have to play games with snapshot reads on + the doc count key or else it'll whack any _all_docs range requests + +11. Revision infos need to track their size f we want to maintain a database + size counter we'll want to store the size of a given doc body for each + revision so that we don't have to read the old body when updating the tree. + +12. Update sequences do not yet include an incarnation value.
\ No newline at end of file diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index a24de21d6..0dc5fa56b 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -29,7 +29,7 @@ -export([ size_info/1, to_disk_term/1, - from_disk_term/2 + from_disk_term/3 ]). -export([ @@ -38,7 +38,7 @@ ]). -export([ - flush/2, + flush/3, foldl/3, range_foldl/5, foldl_decode/3, @@ -46,11 +46,6 @@ ]). -export([ - upgrade/1, - downgrade/1 -]). - --export([ max_attachment_size/0, validate_attachment_size/3 ]). @@ -58,137 +53,61 @@ -compile(nowarn_deprecated_type). -export_type([att/0]). --include_lib("couch/include/couch_db.hrl"). - - -%% Legacy attachment record. This is going to be phased out by the new proplist -%% based structure. It's needed for now to allow code to perform lazy upgrades -%% while the patch is rolled out to the cluster. Attachments passed as records -%% will remain so until they are required to be represented as property lists. -%% Once this has been widely deployed, this record will be removed entirely and -%% property lists will be the main format. --record(att, { - name :: binary(), - type :: binary(), - att_len :: non_neg_integer(), - - %% length of the attachment in its identity form - %% (that is, without a content encoding applied to it) - %% differs from att_len when encoding /= identity - disk_len :: non_neg_integer(), - - md5 = <<>> :: binary(), - revpos = 0 :: non_neg_integer(), - data :: stub | follows | binary() | {any(), any()} | - {follows, pid(), reference()} | fun(() -> binary()), - - %% Encoding of the attachment - %% currently supported values are: - %% identity, gzip - %% additional values to support in the future: - %% deflate, compress - encoding = identity :: identity | gzip -}). - - -%% Extensible Attachment Type -%% -%% The following types describe the known properties for attachment fields -%% encoded as property lists to allow easier upgrades. Values not in this list -%% should be accepted at runtime but should be treated as opaque data as might -%% be used by upgraded code. If you plan on operating on new data, please add -%% an entry here as documentation. - - -%% The name of the attachment is also used as the mime-part name for file -%% downloads. These must be unique per document. --type name_prop() :: {name, binary()}. - - -%% The mime type of the attachment. This does affect compression of certain -%% attachments if the type is found to be configured as a compressable type. -%% This is commonly reserved for text/* types but could include other custom -%% cases as well. See definition and use of couch_util:compressable_att_type/1. --type type_prop() :: {type, binary()}. - - -%% The attachment length is similar to disk-length but ignores additional -%% encoding that may have occurred. --type att_len_prop() :: {att_len, non_neg_integer()}. - - -%% The size of the attachment as stored in a disk stream. --type disk_len_prop() :: {disk_len, non_neg_integer()}. - - -%% This is a digest of the original attachment data as uploaded by the client. -%% it's useful for checking validity of contents against other attachment data -%% as well as quick digest computation of the enclosing document. --type md5_prop() :: {md5, binary()}. - --type revpos_prop() :: {revpos, 0}. +-include_lib("couch/include/couch_db.hrl"). -%% This field is currently overloaded with just about everything. The -%% {any(), any()} type is just there until I have time to check the actual -%% values expected. Over time this should be split into more than one property -%% to allow simpler handling. --type data_prop() :: { - data, stub | follows | binary() | {any(), any()} | - {follows, pid(), reference()} | fun(() -> binary()) -}. +-define(CURRENT_ATT_FORMAT, 0). -%% We will occasionally compress our data. See type_prop() for more information -%% on when this happens. --type encoding_prop() :: {encoding, identity | gzip}. +-type prop_name() :: + name | + type | + att_len | + disk_len | + md5 | + revpos | + data | + encoding. --type attachment() :: [ - name_prop() | type_prop() | - att_len_prop() | disk_len_prop() | - md5_prop() | revpos_prop() | - data_prop() | encoding_prop() -]. +-type data_prop_type() :: + {loc, #{}, binary(), binary()} | + stub | + follows | + binary() | + {follows, pid(), reference()} | + fun(() -> binary()). --type disk_att_v1() :: { - Name :: binary(), - Type :: binary(), - Sp :: any(), - AttLen :: non_neg_integer(), - RevPos :: non_neg_integer(), - Md5 :: binary() -}. --type disk_att_v2() :: { - Name :: binary(), - Type :: binary(), - Sp :: any(), - AttLen :: non_neg_integer(), - DiskLen :: non_neg_integer(), - RevPos :: non_neg_integer(), - Md5 :: binary(), - Enc :: identity | gzip +-type att() :: #{ + name := binary(), + type := binary(), + att_len := non_neg_integer() | undefined, + disk_len := non_neg_integer() | undefined, + md5 := binary() | undefined, + revpos := non_neg_integer(), + data := data_prop_type(), + encoding := identity | gzip | undefined, + headers := [{binary(), binary()}] | undefined }. --type disk_att_v3() :: {Base :: tuple(), Extended :: list()}. - --type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3(). - --type att() :: #att{} | attachment() | disk_att(). new() -> - %% We construct a record by default for compatability. This will be - %% upgraded on demand. A subtle effect this has on all attachments - %% constructed via new is that it will pick up the proper defaults - %% from the #att record definition given above. Newer properties do - %% not support special default values and will all be treated as - %% undefined. - #att{}. + #{ + name => <<>>, + type => <<>>, + att_len => undefined, + disk_len => undefined, + md5 => undefined, + revpos => 0, + data => undefined, + encoding => undefined, + headers => undefined + }. --spec new([{atom(), any()}]) -> att(). +-spec new([{prop_name(), any()}]) -> att(). new(Props) -> store(Props, new()). @@ -197,71 +116,28 @@ new(Props) -> (atom(), att()) -> any(). fetch(Fields, Att) when is_list(Fields) -> [fetch(Field, Att) || Field <- Fields]; -fetch(Field, Att) when is_list(Att) -> - case lists:keyfind(Field, 1, Att) of - {Field, Value} -> Value; - false -> undefined - end; -fetch(name, #att{name = Name}) -> - Name; -fetch(type, #att{type = Type}) -> - Type; -fetch(att_len, #att{att_len = AttLen}) -> - AttLen; -fetch(disk_len, #att{disk_len = DiskLen}) -> - DiskLen; -fetch(md5, #att{md5 = Digest}) -> - Digest; -fetch(revpos, #att{revpos = RevPos}) -> - RevPos; -fetch(data, #att{data = Data}) -> - Data; -fetch(encoding, #att{encoding = Encoding}) -> - Encoding; -fetch(_, _) -> - undefined. +fetch(Field, Att) -> + maps:get(Field, Att). -spec store([{atom(), any()}], att()) -> att(). store(Props, Att0) -> lists:foldl(fun({Field, Value}, Att) -> - store(Field, Value, Att) + maps:update(Field, Value, Att) end, Att0, Props). --spec store(atom(), any(), att()) -> att(). -store(Field, undefined, Att) when is_list(Att) -> - lists:keydelete(Field, 1, Att); -store(Field, Value, Att) when is_list(Att) -> - lists:keystore(Field, 1, Att, {Field, Value}); -store(name, Name, Att) -> - Att#att{name = Name}; -store(type, Type, Att) -> - Att#att{type = Type}; -store(att_len, AttLen, Att) -> - Att#att{att_len = AttLen}; -store(disk_len, DiskLen, Att) -> - Att#att{disk_len = DiskLen}; -store(md5, Digest, Att) -> - Att#att{md5 = Digest}; -store(revpos, RevPos, Att) -> - Att#att{revpos = RevPos}; -store(data, Data, Att) -> - Att#att{data = Data}; -store(encoding, Encoding, Att) -> - Att#att{encoding = Encoding}; store(Field, Value, Att) -> - store(Field, Value, upgrade(Att)). + maps:update(Field, Value, Att). -spec transform(atom(), fun(), att()) -> att(). transform(Field, Fun, Att) -> - NewValue = Fun(fetch(Field, Att)), - store(Field, NewValue, Att). + maps:update_with(Field, Fun, Att). -is_stub(Att) -> - stub == fetch(data, Att). +is_stub(#{data := stub}) -> true; +is_stub(#{}) -> false. %% merge_stubs takes all stub attachments and replaces them with on disk @@ -275,8 +151,7 @@ merge_stubs(MemAtts, DiskAtts) -> merge_stubs(MemAtts, OnDisk, []). -%% restore spec when R14 support is dropped -%% -spec merge_stubs([att()], dict:dict(), [att()]) -> [att()]. +-spec merge_stubs([att()], dict:dict(), [att()]) -> [att()]. merge_stubs([Att | Rest], OnDisk, Merged) -> case fetch(data, Att) of stub -> @@ -308,14 +183,8 @@ size_info([]) -> {ok, []}; size_info(Atts) -> Info = lists:map(fun(Att) -> - AttLen = fetch(att_len, Att), - case fetch(data, Att) of - {stream, StreamEngine} -> - {ok, SPos} = couch_stream:to_disk_term(StreamEngine), - {SPos, AttLen}; - {_, SPos} -> - {SPos, AttLen} - end + [{loc, _Db, _DocId, AttId}, AttLen] = fetch([data, att_len], Att), + {AttId, AttLen} end, Atts), {ok, lists:usort(Info)}. @@ -324,89 +193,44 @@ size_info(Atts) -> %% old format when possible. This should help make the attachment lazy upgrade %% as safe as possible, avoiding the need for complicated disk versioning %% schemes. -to_disk_term(#att{} = Att) -> - {stream, StreamEngine} = fetch(data, Att), - {ok, Sp} = couch_stream:to_disk_term(StreamEngine), - { +to_disk_term(Att) -> + {loc, #{}, _DocId, AttId} = fetch(data, Att), + {?CURRENT_ATT_FORMAT, { fetch(name, Att), fetch(type, Att), - Sp, + AttId, fetch(att_len, Att), fetch(disk_len, Att), fetch(revpos, Att), fetch(md5, Att), - fetch(encoding, Att) - }; -to_disk_term(Att) -> - BaseProps = [name, type, data, att_len, disk_len, revpos, md5, encoding], - {Extended, Base} = lists:foldl( - fun - (data, {Props, Values}) -> - case lists:keytake(data, 1, Props) of - {value, {_, {stream, StreamEngine}}, Other} -> - {ok, Sp} = couch_stream:to_disk_term(StreamEngine), - {Other, [Sp | Values]}; - {value, {_, Value}, Other} -> - {Other, [Value | Values]}; - false -> - {Props, [undefined | Values]} - end; - (Key, {Props, Values}) -> - case lists:keytake(Key, 1, Props) of - {value, {_, Value}, Other} -> {Other, [Value | Values]}; - false -> {Props, [undefined | Values]} - end - end, - {Att, []}, - BaseProps - ), - {list_to_tuple(lists:reverse(Base)), Extended}. - - -%% The new disk term format is a simple wrapper around the legacy format. Base -%% properties will remain in a tuple while the new fields and possibly data from -%% future extensions will be stored in a list of atom/value pairs. While this is -%% slightly less efficient, future work should be able to make use of -%% compression to remove these sorts of common bits (block level compression -%% with something like a shared dictionary that is checkpointed every now and -%% then). -from_disk_term(StreamSrc, {Base, Extended}) - when is_tuple(Base), is_list(Extended) -> - store(Extended, from_disk_term(StreamSrc, Base)); -from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> - {ok, Stream} = open_stream(StreamSrc, Sp), - #att{ - name=Name, - type=Type, - att_len=AttLen, - disk_len=DiskLen, - md5=Md5, - revpos=RevPos, - data={stream, Stream}, - encoding=upgrade_encoding(Enc) - }; -from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) -> - {ok, Stream} = open_stream(StreamSrc, Sp), - #att{ - name=Name, - type=Type, - att_len=AttLen, - disk_len=AttLen, - md5=Md5, - revpos=RevPos, - data={stream, Stream} - }; -from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) -> - {ok, Stream} = open_stream(StreamSrc, Sp), - #att{ - name=Name, - type=Type, - att_len=AttLen, - disk_len=AttLen, - md5= <<>>, - revpos=0, - data={stream, Stream} - }. + fetch(encoding, Att), + fetch(headers, Att) + }}. + + +from_disk_term(#{} = Db, DocId, {?CURRENT_ATT_FORMAT, Props}) -> + { + Name, + Type, + AttId, + AttLen, + DiskLen, + RevPos, + Md5, + Encoding, + Headers + } = Props, + new([ + {name, Name}, + {type, Type}, + {data, {loc, Db#{tx := undefined}, DocId, AttId}}, + {att_len, AttLen}, + {disk_len, DiskLen}, + {revpos, RevPos}, + {md5, Md5}, + {encoding, Encoding}, + {headers, Headers} + ]). %% from_json reads in embedded JSON attachments and creates usable attachment @@ -433,8 +257,12 @@ stub_from_json(Att, Props) -> %% json object. See merge_stubs/3 for the stub check. RevPos = couch_util:get_value(<<"revpos">>, Props), store([ - {md5, Digest}, {revpos, RevPos}, {data, stub}, {disk_len, DiskLen}, - {att_len, EncodedLen}, {encoding, Encoding} + {data, stub}, + {disk_len, DiskLen}, + {att_len, EncodedLen}, + {revpos, RevPos}, + {md5, Digest}, + {encoding, Encoding} ], Att). @@ -443,8 +271,12 @@ follow_from_json(Att, Props) -> Digest = digest_from_json(Props), RevPos = couch_util:get_value(<<"revpos">>, Props, 0), store([ - {md5, Digest}, {revpos, RevPos}, {data, follows}, {disk_len, DiskLen}, - {att_len, EncodedLen}, {encoding, Encoding} + {data, follows}, + {disk_len, DiskLen}, + {att_len, EncodedLen}, + {revpos, RevPos}, + {md5, Digest}, + {encoding, Encoding} ], Att). @@ -455,8 +287,10 @@ inline_from_json(Att, Props) -> Length = size(Data), RevPos = couch_util:get_value(<<"revpos">>, Props, 0), store([ - {data, Data}, {revpos, RevPos}, {disk_len, Length}, - {att_len, Length} + {data, Data}, + {disk_len, Length}, + {att_len, Length}, + {revpos, RevPos} ], Att) catch _:_ -> @@ -466,7 +300,6 @@ inline_from_json(Att, Props) -> end. - encoded_lengths_from_json(Props) -> Len = couch_util:get_value(<<"length">>, Props), case couch_util:get_value(<<"encoding">>, Props) of @@ -488,9 +321,17 @@ digest_from_json(Props) -> to_json(Att, OutputData, DataToFollow, ShowEncoding) -> - [Name, Data, DiskLen, AttLen, Enc, Type, RevPos, Md5] = fetch( - [name, data, disk_len, att_len, encoding, type, revpos, md5], Att - ), + #{ + name := Name, + type := Type, + data := Data, + disk_len := DiskLen, + att_len := AttLen, + revpos := RevPos, + md5 := Md5, + encoding := Encoding, + headers := Headers + } = Att, Props = [ {<<"content_type">>, Type}, {<<"revpos">>, RevPos} @@ -505,71 +346,74 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) -> DataToFollow -> [{<<"length">>, DiskLen}, {<<"follows">>, true}]; true -> - AttData = case Enc of + AttData = case Encoding of gzip -> zlib:gunzip(to_binary(Att)); identity -> to_binary(Att) end, [{<<"data">>, base64:encode(AttData)}] end, EncodingProps = if - ShowEncoding andalso Enc /= identity -> + ShowEncoding andalso Encoding /= identity -> [ - {<<"encoding">>, couch_util:to_binary(Enc)}, + {<<"encoding">>, couch_util:to_binary(Encoding)}, {<<"encoded_length">>, AttLen} ]; true -> [] end, - HeadersProp = case fetch(headers, Att) of + HeadersProp = case Headers of undefined -> []; Headers -> [{<<"headers">>, Headers}] end, {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}. -flush(Db, Att) -> - flush_data(Db, fetch(data, Att), Att). +flush(Db, DocId, Att1) -> + Att2 = read_data(fetch(data, Att1), Att1), + [ + Data, + AttLen, + DiskLen, + ReqMd5, + Encoding + ] = fetch([data, att_len, disk_len, md5, encoding], Att2), + + % Eventually, we'll check if we can compress this + % attachment here and do so if possible. + + % If we were sent a gzip'ed attachment with no + % length data, we have to set it here. + Att3 = case AttLen of + undefined -> store(att_len, DiskLen, Att2); + _ -> Att2 + end, + + % If no encoding has been set, default to + % identity + Att4 = case Encoding of + undefined -> store(encoding, identity, Att3); + _ -> Att3 + end, + + case Data of + {loc, _, _, _} -> + % Already flushed + Att1; + _ when is_binary(Data) -> + IdentMd5 = get_identity_md5(Data, fetch(encoding, Att4)), + if ReqMd5 == undefined -> ok; true -> + couch_util:check_md5(IdentMd5, ReqMd5) + end, + Att5 = store(md5, IdentMd5, Att4), + fabric2_db:write_attachment(Db, DocId, Att5) + end. -flush_data(Db, Data, Att) when is_binary(Data) -> - couch_db:with_stream(Db, Att, fun(OutputStream) -> - couch_stream:write(OutputStream, Data) - end); -flush_data(Db, Fun, Att) when is_function(Fun) -> - AttName = fetch(name, Att), - MaxAttSize = max_attachment_size(), - case fetch(att_len, Att) of - undefined -> - couch_db:with_stream(Db, Att, fun(OutputStream) -> - % Fun(MaxChunkSize, WriterFun) must call WriterFun - % once for each chunk of the attachment, - Fun(4096, - % WriterFun({Length, Binary}, State) - % WriterFun({0, _Footers}, State) - % Called with Length == 0 on the last time. - % WriterFun returns NewState. - fun({0, Footers}, _Total) -> - F = mochiweb_headers:from_binary(Footers), - case mochiweb_headers:get_value("Content-MD5", F) of - undefined -> - ok; - Md5 -> - {md5, base64:decode(Md5)} - end; - ({Length, Chunk}, Total0) -> - Total = Total0 + Length, - validate_attachment_size(AttName, Total, MaxAttSize), - couch_stream:write(OutputStream, Chunk), - Total - end, 0) - end); - AttLen -> - validate_attachment_size(AttName, AttLen, MaxAttSize), - couch_db:with_stream(Db, Att, fun(OutputStream) -> - write_streamed_attachment(OutputStream, Fun, AttLen) - end) - end; -flush_data(Db, {follows, Parser, Ref}, Att) -> +read_data({loc, #{}, _DocId, _AttId}, Att) -> + % Attachment already written to fdb + Att; + +read_data({follows, Parser, Ref}, Att) -> ParserRef = erlang:monitor(process, Parser), Fun = fun() -> Parser ! {get_bytes, Ref, self()}, @@ -583,41 +427,72 @@ flush_data(Db, {follows, Parser, Ref}, Att) -> end end, try - flush_data(Db, Fun, store(data, Fun, Att)) + read_data(Fun, store(data, Fun, Att)) after erlang:demonitor(ParserRef, [flush]) end; -flush_data(Db, {stream, StreamEngine}, Att) -> - case couch_db:is_active_stream(Db, StreamEngine) of - true -> - % Already written - Att; - false -> - NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) -> - couch_stream:copy(StreamEngine, OutputStream) - end), - InMd5 = fetch(md5, Att), - OutMd5 = fetch(md5, NewAtt), - couch_util:check_md5(OutMd5, InMd5), - NewAtt + +read_data(Data, Att) when is_binary(Data) -> + Att; + +read_data(Fun, Att) when is_function(Fun) -> + [AttName, AttLen, InMd5] = fetch([name, att_len, md5], Att), + MaxAttSize = max_attachment_size(), + case AttLen of + undefined -> + % Fun(MaxChunkSize, WriterFun) must call WriterFun + % once for each chunk of the attachment, + WriterFun = fun + ({0, Footers}, {Len, Acc}) -> + F = mochiweb_headers:from_binary(Footers), + Md5 = case mochiweb_headers:get_value("Content-MD5", F) of + undefined -> undefined; + Value -> base64:decode(Value) + end, + Props0 = [ + {data, iolist_to_binary(lists:reverse(Acc))}, + {disk_len, Len} + ], + Props1 = if InMd5 /= md5_in_footer -> Props0; true -> + [{md5, Md5} | Props0] + end, + store(Props1, Att); + ({ChunkLen, Chunk}, {Len, Acc}) -> + NewLen = Len + ChunkLen, + validate_attachment_size(AttName, NewLen, MaxAttSize), + {NewLen, [Chunk | Acc]} + end, + Fun(8192, WriterFun, {0, []}); + AttLen -> + validate_attachment_size(AttName, AttLen, MaxAttSize), + read_streamed_attachment(Att, Fun, AttLen, []) end. -write_streamed_attachment(_Stream, _F, 0) -> - ok; -write_streamed_attachment(_Stream, _F, LenLeft) when LenLeft < 0 -> +read_streamed_attachment(Att, _F, 0, Acc) -> + Bin = iolist_to_binary(lists:reverse(Acc)), + store([ + {data, Bin}, + {disk_len, size(Bin)} + ], Att); + +read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 -> throw({bad_request, <<"attachment longer than expected">>}); -write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 -> - Bin = try read_next_chunk(F, LenLeft) + +read_streamed_attachment(Att, F, LenLeft, Acc) when LenLeft > 0 -> + Bin = try + read_next_chunk(F, LenLeft) catch {mp_parser_died, normal} -> throw({bad_request, <<"attachment shorter than expected">>}) end, - ok = couch_stream:write(Stream, Bin), - write_streamed_attachment(Stream, F, LenLeft - iolist_size(Bin)). + Size = iolist_size(Bin), + read_streamed_attachment(Att, F, LenLeft - Size, [Bin | Acc]). + read_next_chunk(F, _) when is_function(F, 0) -> F(); + read_next_chunk(F, LenLeft) when is_function(F, 1) -> F(lists:min([LenLeft, 16#2000])). @@ -626,14 +501,17 @@ foldl(Att, Fun, Acc) -> foldl(fetch(data, Att), Att, Fun, Acc). +foldl({loc, Db, DocId, AttId}, _Att, Fun, Acc) -> + Bin = fabric2_db:read_attachment(Db#{tx := undefined}, DocId, AttId), + Fun(Bin, Acc); + foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) -> Fun(Bin, Acc); -foldl({stream, StreamEngine}, Att, Fun, Acc) -> - Md5 = fetch(md5, Att), - couch_stream:foldl(StreamEngine, Md5, Fun, Acc); + foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) -> Len = fetch(att_len, Att), fold_streamed_data(DataFun, Len, Fun, Acc); + foldl({follows, Parser, Ref}, Att, Fun, Acc) -> ParserRef = erlang:monitor(process, Parser), DataFun = fun() -> @@ -654,19 +532,26 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) -> end. +range_foldl(Bin1, From, To, Fun, Acc) when is_binary(Bin1) -> + ReadLen = To - From, + Bin2 = case Bin1 of + _ when size(Bin1) < From -> <<>>; + <<_:From/binary, B2>> -> B2 + end, + Bin3 = case Bin2 of + _ when size(Bin2) < ReadLen -> Bin2; + <<B3:ReadLen/binary, _/binary>> -> B3 + end, + Fun(Bin3, Acc); + range_foldl(Att, From, To, Fun, Acc) -> - {stream, StreamEngine} = fetch(data, Att), - couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc). + {loc, Db, DocId, AttId} = fetch(data, Att), + Bin = fabric2_db:read_attachment(Db, DocId, AttId), + range_foldl(Bin, From, To, Fun, Acc). -foldl_decode(Att, Fun, Acc) -> - case fetch([data, encoding], Att) of - [{stream, StreamEngine}, Enc] -> - couch_stream:foldl_decode( - StreamEngine, fetch(md5, Att), Enc, Fun, Acc); - [Fun2, identity] -> - fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc) - end. +foldl_decode(_Att, _Fun, _Acc) -> + erlang:error(not_supported). to_binary(Att) -> @@ -677,10 +562,8 @@ to_binary(Bin, _Att) when is_binary(Bin) -> Bin; to_binary(Iolist, _Att) when is_list(Iolist) -> iolist_to_binary(Iolist); -to_binary({stream, _StreamEngine}, Att) -> - iolist_to_binary( - lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, [])) - ); +to_binary({loc, Db, DocId, AttId}, _Att) -> + fabric2_db:read_attachmet(Db, DocId, AttId); to_binary(DataFun, Att) when is_function(DataFun)-> Len = fetch(att_len, Att), iolist_to_binary( @@ -695,46 +578,22 @@ to_binary(DataFun, Att) when is_function(DataFun)-> fold_streamed_data(_RcvFun, 0, _Fun, Acc) -> Acc; + fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> Bin = RcvFun(), ResultAcc = Fun(Bin, Acc), fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). -%% Upgrade an attachment record to a property list on demand. This is a one-way -%% operation as downgrading potentially truncates fields with important data. --spec upgrade(#att{}) -> attachment(). -upgrade(#att{} = Att) -> - Map = lists:zip( - record_info(fields, att), - lists:seq(2, record_info(size, att)) - ), - %% Don't store undefined elements since that is default - [{F, element(I, Att)} || {F, I} <- Map, element(I, Att) /= undefined]; -upgrade(Att) -> - Att. - - -%% Downgrade is exposed for interactive convenience. In practice, unless done -%% manually, upgrades are always one-way. -downgrade(#att{} = Att) -> - Att; -downgrade(Att) -> - #att{ - name = fetch(name, Att), - type = fetch(type, Att), - att_len = fetch(att_len, Att), - disk_len = fetch(disk_len, Att), - md5 = fetch(md5, Att), - revpos = fetch(revpos, Att), - data = fetch(data, Att), - encoding = fetch(encoding, Att) - }. - - -upgrade_encoding(true) -> gzip; -upgrade_encoding(false) -> identity; -upgrade_encoding(Encoding) -> Encoding. +get_identity_md5(Bin, gzip) -> + Z = zlib:open(), + ok = zlib:inflateInit(Z, 16 + 15), + Inflated = zlib:inflate(Z, Bin), + ok = zlib:inflateEnd(Z), + ok = zlib:close(Z), + couch_hash:md5_hash(Inflated); +get_identity_md5(Bin, _) -> + couch_hash:md5_hash(Bin). max_attachment_size() -> @@ -753,18 +612,22 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) -> ok. -open_stream(StreamSrc, Data) -> - case couch_db:is_db(StreamSrc) of - true -> - couch_db:open_read_stream(StreamSrc, Data); - false -> - case is_function(StreamSrc, 1) of - true -> - StreamSrc(Data); - false -> - erlang:error({invalid_stream_source, StreamSrc}) - end - end. +%% is_compressible(Type) when is_binary(Type) -> +%% is_compressible(binary_to_list(Type)); +%% is_compressible(Type) -> +%% TypeExpList = re:split( +%% config:get("attachments", "compressible_types", ""), +%% "\\s*,\\s*", +%% [{return, list}] +%% ), +%% lists:any( +%% fun(TypeExp) -> +%% Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"), +%% "(?:\\s*;.*?)?\\s*", $$], +%% re:run(Type, Regexp, [caseless]) =/= nomatch +%% end, +%% [T || T <- TypeExpList, T /= []] +%% ). -ifdef(TEST). diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl index 4a49372c7..d33325eb1 100644 --- a/src/couch/src/couch_doc.erl +++ b/src/couch/src/couch_doc.erl @@ -374,6 +374,17 @@ rev_info({#doc{} = Doc, {Pos, [RevId | _]}}) -> body_sp = undefined, seq = undefined, rev = {Pos, RevId} + }; +rev_info({#{} = RevInfo, {Pos, [RevId | _]}}) -> + #{ + deleted := Deleted, + sequence := Sequence + } = RevInfo, + #rev_info{ + deleted = Deleted, + body_sp = undefined, + seq = Sequence, + rev = {Pos, RevId} }. is_deleted(#full_doc_info{rev_tree=Tree}) -> diff --git a/src/fabric/src/fabric.app.src b/src/fabric/src/fabric.app.src index d7686ca1a..20fbb1e2a 100644 --- a/src/fabric/src/fabric.app.src +++ b/src/fabric/src/fabric.app.src @@ -13,7 +13,10 @@ {application, fabric, [ {description, "Routing and proxying layer for CouchDB cluster"}, {vsn, git}, - {registered, []}, + {mod, {fabric2_app, []}}, + {registered, [ + fabric_server + ]}, {applications, [ kernel, stdlib, @@ -22,6 +25,7 @@ rexi, mem3, couch_log, - couch_stats + couch_stats, + erlfdb ]} ]}. diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl new file mode 100644 index 000000000..de1d3d177 --- /dev/null +++ b/src/fabric/src/fabric2.hrl @@ -0,0 +1,66 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-define(uint2bin(I), binary:encode_unsigned(I, little)). +-define(bin2uint(I), binary:decode_unsigned(I, little)). + +% This will eventually be the `\xFFmetadataVersion` key that is +% currently only available in FoundationDB master. +% +% https://forums.foundationdb.org/t/a-new-tool-for-managing-layer-metadata/1191 +% +% Until then we'll fake the same behavior using a randomish +% key for tracking metadata changse. Once we get to the +% new feature this will be more performant by updating +% this define. +-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>). + + +% Prefix Definitions + +% Layer Level: (LayerPrefix, X, ...) + +-define(CLUSTER_CONFIG, 0). +-define(ALL_DBS, 1). +-define(DBS, 15). +-define(TX_IDS, 255). + +% Database Level: (LayerPrefix, ?DBS, DbPrefix, X, ...) + +-define(DB_VERSION, 0). +-define(DB_CONFIG, 16). +-define(DB_STATS, 17). +-define(DB_ALL_DOCS, 18). +-define(DB_CHANGES, 19). +-define(DB_REVS, 20). +-define(DB_DOCS, 21). +-define(DB_LOCAL_DOCS, 22). +-define(DB_ATTS, 23). + + +% Versions + +-define(CURR_REV_FORMAT, 0). + + +% Misc constants + +-define(PDICT_DB_KEY, '$fabric_db_handle'). +-define(PDICT_LAYER_CACHE, '$fabric_layer_id'). +-define(PDICT_CHECKED_DB_IS_CURRENT, '$fabric_checked_db_is_current'). +-define(PDICT_TX_ID_KEY, '$fabric_tx_id'). +-define(PDICT_TX_RES_KEY, '$fabric_tx_result'). +-define(COMMIT_UNKNOWN_RESULT, 1021). + + +-define(ATTACHMENT_CHUNK_SIZE, 100000). diff --git a/src/fabric/src/fabric2_app.erl b/src/fabric/src/fabric2_app.erl new file mode 100644 index 000000000..da95acb53 --- /dev/null +++ b/src/fabric/src/fabric2_app.erl @@ -0,0 +1,32 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_app). +-behaviour(application). + + +-export([ + start/2, + stop/1 +]). + + +start(_Type, StartArgs) -> + fabric2_sup:start_link(StartArgs). + + +stop(_State) -> + case application:get_env(erlfdb, test_cluster_pid) of + {ok, Pid} -> Pid ! close; + _ -> ok + end, + ok. diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl new file mode 100644 index 000000000..02a18fa23 --- /dev/null +++ b/src/fabric/src/fabric2_db.erl @@ -0,0 +1,1299 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_db). + + +-export([ + create/2, + open/2, + delete/2, + + list_dbs/0, + list_dbs/1, + + is_admin/1, + check_is_admin/1, + check_is_member/1, + + name/1, + get_after_doc_read_fun/1, + get_before_doc_update_fun/1, + get_committed_update_seq/1, + get_compacted_seq/1, + get_compactor_pid/1, + get_db_info/1, + %% get_partition_info/2, + get_del_doc_count/1, + get_doc_count/1, + get_doc_count/2, + %% get_epochs/1, + %% get_filepath/1, + get_instance_start_time/1, + get_pid/1, + get_revs_limit/1, + get_security/1, + get_update_seq/1, + get_user_ctx/1, + get_uuid/1, + %% get_purge_seq/1, + %% get_oldest_purge_seq/1, + %% get_purge_infos_limit/1, + + is_clustered/1, + is_db/1, + is_partitioned/1, + is_system_db/1, + is_system_db_name/1, + + set_revs_limit/2, + %% set_purge_infos_limit/2, + set_security/2, + set_user_ctx/2, + + ensure_full_commit/1, + ensure_full_commit/2, + + %% load_validation_funs/1, + %% reload_validation_funs/1, + + open_doc/2, + open_doc/3, + open_doc_revs/4, + %% open_doc_int/3, + get_doc_info/2, + get_full_doc_info/2, + get_full_doc_infos/2, + get_missing_revs/2, + %% get_design_doc/2, + %% get_design_docs/1, + %% get_design_doc_count/1, + %% get_purge_infos/2, + + %% get_minimum_purge_seq/1, + %% purge_client_exists/3, + + %% validate_docid/2, + %% doc_from_json_obj_validate/2, + + update_doc/2, + update_doc/3, + update_docs/2, + update_docs/3, + %% delete_doc/3, + + %% purge_docs/2, + %% purge_docs/3, + + read_attachment/3, + write_attachment/3, + + fold_docs/3, + fold_docs/4, + %% fold_local_docs/4, + %% fold_design_docs/4, + fold_changes/4, + fold_changes/5, + %% count_changes_since/2, + %% fold_purge_infos/4, + %% fold_purge_infos/5, + + %% calculate_start_seq/3, + %% owner_of/2, + + %% start_compact/1, + %% cancel_compact/1, + %% wait_for_compaction/1, + %% wait_for_compaction/2, + + %% dbname_suffix/1, + %% normalize_dbname/1, + %% validate_dbname/1, + + %% make_doc/5, + new_revid/1 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("fabric2.hrl"). + + +-define(DBNAME_REGEX, + "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex + "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end +). + + +-define(RETURN(Term), throw({?MODULE, Term})). + + +create(DbName, Options) -> + Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> + case fabric2_fdb:exists(TxDb) of + true -> + {error, file_exists}; + false -> + fabric2_fdb:create(TxDb, Options) + end + end), + % We cache outside of the transaction so that we're sure + % that the transaction was committed. + case Result of + #{} = Db -> + ok = fabric2_server:store(Db), + {ok, Db#{tx := undefined}}; + Error -> + Error + end. + + +open(DbName, Options) -> + case fabric2_server:fetch(DbName) of + #{} = Db -> + {ok, maybe_set_user_ctx(Db, Options)}; + undefined -> + Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> + fabric2_fdb:open(TxDb, Options) + end), + % Cache outside the transaction retry loop + case Result of + #{} = Db -> + ok = fabric2_server:store(Db), + {ok, Db#{tx := undefined}}; + Error -> + Error + end + end. + + +delete(DbName, Options) -> + % This will throw if the db does not exist + {ok, Db} = open(DbName, Options), + Resp = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:delete(TxDb) + end), + if Resp /= ok -> Resp; true -> + fabric2_server:remove(DbName) + end. + + +list_dbs() -> + list_dbs([]). + + +list_dbs(Options) -> + fabric2_fdb:transactional(fun(Tx) -> + fabric2_fdb:list_dbs(Tx, Options) + end). + + +is_admin(Db) -> + % TODO: Need to re-consider couch_db_plugin:check_is_admin/1 + {SecProps} = get_security(Db), + UserCtx = get_user_ctx(Db), + {Admins} = get_admins(SecProps), + is_authorized(Admins, UserCtx). + + +check_is_admin(Db) -> + case is_admin(Db) of + true -> + ok; + false -> + UserCtx = get_user_ctx(Db), + Reason = <<"You are not a db or server admin.">>, + throw_security_error(UserCtx, Reason) + end. + + +check_is_member(Db) -> + case is_member(Db) of + true -> + ok; + false -> + UserCtx = get_user_ctx(Db), + throw_security_error(UserCtx) + end. + + +name(#{name := DbName}) -> + DbName. + + +get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) -> + AfterDocRead. + + +get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) -> + BeforeDocUpdate. + +get_committed_update_seq(#{} = Db) -> + get_update_seq(Db). + + +get_compacted_seq(#{} = Db) -> + get_update_seq(Db). + + +get_compactor_pid(#{} = _Db) -> + nil. + + +get_db_info(#{} = Db) -> + DbProps = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_info(TxDb) + end), + + BaseProps = [ + {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}}, + {compact_running, false}, + {data_size, 0}, + {db_name, name(Db)}, + {disk_format_version, 0}, + {disk_size, 0}, + {instance_start_time, <<"0">>}, + {purge_seq, 0} + ], + + {ok, lists:foldl(fun({Key, Val}, Acc) -> + lists:keystore(Key, 1, Acc, {Key, Val}) + end, BaseProps, DbProps)}. + + +get_del_doc_count(#{} = Db) -> + get_doc_count(Db, <<"doc_del_count">>). + + +get_doc_count(Db) -> + get_doc_count(Db, <<"doc_count">>). + + +get_doc_count(Db, <<"_all_docs">>) -> + get_doc_count(Db, <<"doc_count">>); + +get_doc_count(DbName, <<"_design">>) -> + get_doc_count(DbName, <<"doc_design_count">>); + +get_doc_count(DbName, <<"_local">>) -> + get_doc_count(DbName, <<"doc_local_count">>); + +get_doc_count(Db, Key) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_stat(TxDb, Key) + end). + + +get_instance_start_time(#{}) -> + 0. + + +get_pid(#{}) -> + nil. + + +get_revs_limit(#{revs_limit := RevsLimit}) -> + RevsLimit. + + +get_security(#{security_doc := SecurityDoc}) -> + SecurityDoc. + + +get_update_seq(#{} = Db) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_last_change(TxDb) + end). + + +get_user_ctx(#{user_ctx := UserCtx}) -> + UserCtx. + + +get_uuid(#{uuid := UUID}) -> + UUID. + + +is_clustered(#{}) -> + false. + + +is_db(#{name := _}) -> + true; +is_db(_) -> + false. + + +is_partitioned(#{}) -> + false. + + +is_system_db(#{name := DbName}) -> + is_system_db_name(DbName). + + +is_system_db_name(DbName) when is_list(DbName) -> + is_system_db_name(?l2b(DbName)); +is_system_db_name(DbName) when is_binary(DbName) -> + Suffix = filename:basename(DbName), + case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of + {<<".">>, Result} -> Result; + {_Prefix, false} -> false; + {Prefix, true} -> + ReOpts = [{capture,none}, dollar_endonly], + re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match + end. + + +set_revs_limit(#{} = Db, RevsLimit) -> + check_is_admin(Db), + RevsLimBin = ?uint2bin(RevsLimit), + Resp = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:set_config(TxDb, <<"revs_limit">>, RevsLimBin) + end), + if Resp /= ok -> Resp; true -> + fabric2_server:store(Db#{revs_limit := RevsLimit}) + end. + + +set_security(#{} = Db, Security) -> + check_is_admin(Db), + ok = fabric2_util:validate_security_object(Security), + SecBin = ?JSON_ENCODE(Security), + Resp = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:set_config(TxDb, <<"security_doc">>, SecBin) + end), + if Resp /= ok -> Resp; true -> + fabric2_server:store(Db#{security_doc := Security}) + end. + + +set_user_ctx(#{} = Db, UserCtx) -> + Db#{user_ctx := UserCtx}. + + +ensure_full_commit(#{}) -> + {ok, 0}. + + +ensure_full_commit(#{}, _Timeout) -> + {ok, 0}. + + +open_doc(#{} = Db, DocId) -> + open_doc(Db, DocId, []). + + +open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + case fabric2_fdb:get_local_doc(TxDb, DocId) of + #doc{} = Doc -> {ok, Doc}; + Else -> Else + end + end); + +open_doc(#{} = Db, DocId, Options) -> + NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts], + NeedsTree = (Options -- NeedsTreeOpts /= Options), + fabric2_fdb:transactional(Db, fun(TxDb) -> + Revs = case NeedsTree of + true -> fabric2_fdb:get_all_revs(TxDb, DocId); + false -> fabric2_fdb:get_winning_revs(TxDb, DocId, 1) + end, + if Revs == [] -> {not_found, missing}; true -> + #{winner := true} = RI = lists:last(Revs), + case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of + #doc{} = Doc -> + apply_open_doc_opts(Doc, Revs, Options); + Else -> + Else + end + end + end). + + +open_doc_revs(Db, DocId, Revs, Options) -> + Latest = lists:member(latest, Options), + fabric2_fdb:transactional(Db, fun(TxDb) -> + AllRevInfos = fabric2_fdb:get_all_revs(TxDb, DocId), + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], AllRevInfos), + {Found, Missing} = case Revs of + all -> + {couch_key_tree:get_all_leafs(RevTree), []}; + _ when Latest -> + couch_key_tree:get_key_leafs(RevTree, Revs); + _ -> + couch_key_tree:get(RevTree, Revs) + end, + Docs = lists:map(fun({Value, {Pos, [Rev | RevPath]}}) -> + case Value of + ?REV_MISSING -> + % We have the rev in our list but know nothing about it + {{not_found, missing}, {Pos, Rev}}; + _ -> + RevInfo = #{ + rev_id => {Pos, Rev}, + rev_path => RevPath + }, + case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of + #doc{} = Doc -> {ok, Doc}; + Else -> {Else, {Pos, Rev}} + end + end + end, Found), + MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing], + {ok, Docs ++ MissingDocs} + end). + + +get_doc_info(Db, DocId) -> + case get_full_doc_info(Db, DocId) of + not_found -> not_found; + FDI -> couch_doc:to_doc_info(FDI) + end. + + +get_full_doc_info(Db, DocId) -> + RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_all_revs(TxDb, DocId) + end), + if RevInfos == [] -> not_found; true -> + #{winner := true} = Winner = lists:last(RevInfos), + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], RevInfos), + #full_doc_info{ + id = DocId, + update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)), + deleted = maps:get(deleted, Winner), + rev_tree = RevTree + } + end. + + +get_full_doc_infos(Db, DocIds) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + lists:map(fun(DocId) -> + get_full_doc_info(TxDb, DocId) + end, DocIds) + end). + + +get_missing_revs(Db, JsonIdRevs) -> + IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs], + AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) -> + lists:foldl(fun({Id, _Revs}, Acc) -> + case maps:is_key(Id, Acc) of + true -> + Acc; + false -> + RevInfos = fabric2_fdb:get_all_revs(TxDb, Id), + Acc#{Id => RevInfos} + end + end, #{}, IdRevs) + end), + AllMissing = lists:flatmap(fun({Id, Revs}) -> + #{Id := RevInfos} = AllRevInfos, + Missing = try + lists:foldl(fun(RevInfo, RevAcc) -> + if RevAcc /= [] -> ok; true -> + throw(all_found) + end, + filter_found_revs(RevInfo, RevAcc) + end, Revs, RevInfos) + catch throw:all_found -> + [] + end, + if Missing == [] -> []; true -> + PossibleAncestors = find_possible_ancestors(RevInfos, Missing), + [{Id, Missing, PossibleAncestors}] + end + end, IdRevs), + {ok, AllMissing}. + + +update_doc(Db, Doc) -> + update_doc(Db, Doc, []). + + +update_doc(Db, Doc, Options) -> + case update_docs(Db, [Doc], Options) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {ok, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {error, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {error, [Error]} -> + throw(Error); + {ok, []} -> + % replication success + {Pos, [RevId | _]} = Doc#doc.revs, + {ok, {Pos, RevId}} + end. + + +update_docs(Db, Docs) -> + update_docs(Db, Docs, []). + + +update_docs(Db, Docs, Options) -> + Resps0 = case lists:member(replicated_changes, Options) of + false -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + update_docs_interactive(TxDb, Docs, Options) + end); + true -> + lists:map(fun(Doc) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + update_doc_int(TxDb, Doc, Options) + end) + end, Docs) + end, + % Convert errors + Resps1 = lists:map(fun(Resp) -> + case Resp of + {#doc{} = Doc, Error} -> + #doc{ + id = DocId, + revs = Revs + } = Doc, + RevId = case Revs of + {RevPos, [Rev | _]} -> {RevPos, Rev}; + {0, []} -> {0, <<>>} + end, + {{DocId, RevId}, Error}; + Else -> + Else + end + end, Resps0), + case lists:member(replicated_changes, Options) of + true -> + {ok, [R || R <- Resps1, R /= {ok, []}]}; + false -> + Status = lists:foldl(fun(Resp, Acc) -> + case Resp of + {ok, _} -> Acc; + _ -> error + end + end, ok, Resps1), + {Status, Resps1} + end. + + +read_attachment(Db, DocId, AttId) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:read_attachment(TxDb, DocId, AttId) + end). + + +write_attachment(Db, DocId, Att) -> + Data = couch_att:fetch(data, Att), + {ok, AttId} = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:write_attachment(TxDb, DocId, Data) + end), + couch_att:store(data, {loc, Db, DocId, AttId}, Att). + + +fold_docs(Db, UserFun, UserAcc) -> + fold_docs(Db, UserFun, UserAcc, []). + + +fold_docs(Db, UserFun, UserAcc, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options) + end). + + +fold_changes(Db, SinceSeq, UserFun, UserAcc) -> + fold_changes(Db, SinceSeq, UserFun, UserAcc, []). + + +fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options) + end). + + +new_revid(Doc) -> + #doc{ + body = Body, + revs = {OldStart, OldRevs}, + atts = Atts, + deleted = Deleted + } = Doc, + + DigestedAtts = lists:foldl(fun(Att, Acc) -> + [N, T, M] = couch_att:fetch([name, type, md5], Att), + case M == <<>> of + true -> Acc; + false -> [{N, T, M} | Acc] + end + end, [], Atts), + + Rev = case DigestedAtts of + Atts2 when length(Atts) =/= length(Atts2) -> + % We must have old style non-md5 attachments + list_to_binary(integer_to_list(couch_util:rand32())); + Atts2 -> + OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end, + SigTerm = [Deleted, OldStart, OldRev, Body, Atts2], + couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}])) + end, + + Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}. + + +maybe_set_user_ctx(Db, Options) -> + case fabric2_util:get_value(user_ctx, Options) of + #user_ctx{} = UserCtx -> + set_user_ctx(Db, UserCtx); + undefined -> + Db + end. + + +is_member(Db) -> + {SecProps} = get_security(Db), + case is_admin(Db) of + true -> + true; + false -> + case is_public_db(SecProps) of + true -> + true; + false -> + {Members} = get_members(SecProps), + UserCtx = get_user_ctx(Db), + is_authorized(Members, UserCtx) + end + end. + + +is_authorized(Group, UserCtx) -> + #user_ctx{ + name = UserName, + roles = UserRoles + } = UserCtx, + Names = fabric2_util:get_value(<<"names">>, Group, []), + Roles = fabric2_util:get_value(<<"roles">>, Group, []), + case check_security(roles, UserRoles, [<<"_admin">> | Roles]) of + true -> + true; + false -> + check_security(names, UserName, Names) + end. + + +check_security(roles, [], _) -> + false; +check_security(roles, UserRoles, Roles) -> + UserRolesSet = ordsets:from_list(UserRoles), + RolesSet = ordsets:from_list(Roles), + not ordsets:is_disjoint(UserRolesSet, RolesSet); +check_security(names, _, []) -> + false; +check_security(names, null, _) -> + false; +check_security(names, UserName, Names) -> + lists:member(UserName, Names). + + +throw_security_error(#user_ctx{name = null} = UserCtx) -> + Reason = <<"You are not authorized to access this db.">>, + throw_security_error(UserCtx, Reason); +throw_security_error(#user_ctx{name = _} = UserCtx) -> + Reason = <<"You are not allowed to access this db.">>, + throw_security_error(UserCtx, Reason). + + +throw_security_error(#user_ctx{} = UserCtx, Reason) -> + Error = security_error_type(UserCtx), + throw({Error, Reason}). + + +security_error_type(#user_ctx{name = null}) -> + unauthorized; +security_error_type(#user_ctx{name = _}) -> + forbidden. + + +is_public_db(SecProps) -> + {Members} = get_members(SecProps), + Names = fabric2_util:get_value(<<"names">>, Members, []), + Roles = fabric2_util:get_value(<<"roles">>, Members, []), + Names =:= [] andalso Roles =:= []. + + +get_admins(SecProps) -> + fabric2_util:get_value(<<"admins">>, SecProps, {[]}). + + +get_members(SecProps) -> + % we fallback to readers here for backwards compatibility + case fabric2_util:get_value(<<"members">>, SecProps) of + undefined -> + fabric2_util:get_value(<<"readers">>, SecProps, {[]}); + Members -> + Members + end. + + +apply_open_doc_opts(Doc, Revs, Options) -> + IncludeRevsInfo = lists:member(revs_info, Options), + IncludeConflicts = lists:member(conflicts, Options), + IncludeDelConflicts = lists:member(deleted_conflicts, Options), + IncludeLocalSeq = lists:member(local_seq, Options), + ReturnDeleted = lists:member(deleted, Options), + + % This revs_info becomes fairly useless now that we're + % not keeping old document bodies around... + Meta1 = if not IncludeRevsInfo -> []; true -> + {Pos, [Rev | RevPath]} = Doc#doc.revs, + RevPathMissing = lists:map(fun(R) -> {R, missing} end, RevPath), + [{revs_info, Pos, [{Rev, available} | RevPathMissing]}] + end, + + Meta2 = if not IncludeConflicts -> []; true -> + Conflicts = [RI || RI = #{winner := false, deleted := false} <- Revs], + if Conflicts == [] -> []; true -> + ConflictRevs = [maps:get(rev_id, RI) || RI <- Conflicts], + [{conflicts, ConflictRevs}] + end + end, + + Meta3 = if not IncludeDelConflicts -> []; true -> + DelConflicts = [RI || RI = #{winner := false, deleted := true} <- Revs], + if DelConflicts == [] -> []; true -> + DelConflictRevs = [maps:get(rev_id, RI) || RI <- DelConflicts], + [{deleted_conflicts, DelConflictRevs}] + end + end, + + Meta4 = if not IncludeLocalSeq -> []; true -> + #{winner := true, sequence := SeqVS} = lists:last(Revs), + [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}] + end, + + case Doc#doc.deleted and not ReturnDeleted of + true -> + {not_found, deleted}; + false -> + {ok, Doc#doc{ + meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4 + }} + end. + + +filter_found_revs(RevInfo, Revs) -> + #{ + rev_id := {Pos, Rev}, + rev_path := RevPath + } = RevInfo, + FullRevPath = [Rev | RevPath], + lists:flatmap(fun({FindPos, FindRev} = RevIdToFind) -> + if FindPos > Pos -> [RevIdToFind]; true -> + % Add 1 because lists:nth is 1 based + Idx = Pos - FindPos + 1, + case Idx > length(FullRevPath) of + true -> + [RevIdToFind]; + false -> + case lists:nth(Idx, FullRevPath) == FindRev of + true -> []; + false -> [RevIdToFind] + end + end + end + end, Revs). + + +find_possible_ancestors(RevInfos, MissingRevs) -> + % Find any revinfos that are possible ancestors + % of the missing revs. A possible ancestor is + % any rev that has a start position less than + % any missing revision. Stated alternatively, + % find any revinfo that could theoretically + % extended to be one or more of the missing + % revisions. + % + % Since we are looking at any missing revision + % we can just compare against the maximum missing + % start position. + MaxMissingPos = case MissingRevs of + [] -> 0; + [_ | _] -> lists:max([Start || {Start, _Rev} <- MissingRevs]) + end, + lists:flatmap(fun(RevInfo) -> + #{rev_id := {RevPos, _} = RevId} = RevInfo, + case RevPos < MaxMissingPos of + true -> [RevId]; + false -> [] + end + end, RevInfos). + + +update_doc_int(#{} = Db, #doc{} = Doc, Options) -> + IsLocal = case Doc#doc.id of + <<?LOCAL_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + IsReplicated = lists:member(replicated_changes, Options), + try + case {IsLocal, IsReplicated} of + {false, false} -> update_doc_interactive(Db, Doc, Options); + {false, true} -> update_doc_replicated(Db, Doc, Options); + {true, _} -> update_local_doc(Db, Doc, Options) + end + catch throw:{?MODULE, Return} -> + Return + end. + + +update_docs_interactive(Db, Docs0, Options) -> + Docs = tag_docs(Docs0), + Futures = get_winning_rev_futures(Db, Docs), + {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) -> + try + update_docs_interactive(Db, Doc, Options, Futures, SeenIds) + catch throw:{?MODULE, Return} -> + {Return, SeenIds} + end + end, [], Docs), + Result. + + +update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc, + Options, _Futures, SeenIds) -> + {update_local_doc(Db, Doc, Options), SeenIds}; + +update_docs_interactive(Db, Doc, Options, Futures, SeenIds) -> + case lists:member(Doc#doc.id, SeenIds) of + true -> + {{error, conflict}, SeenIds}; + false -> + Future = maps:get(doc_tag(Doc), Futures), + case update_doc_interactive(Db, Doc, Future, Options) of + {ok, _} = Resp -> + {Resp, [Doc#doc.id | SeenIds]}; + _ = Resp -> + {Resp, SeenIds} + end + end. + + +update_doc_interactive(Db, Doc0, Options) -> + % Get the current winning revision. This is needed + % regardless of which branch we're updating. The extra + % revision we're grabbing is an optimization to + % save us a round trip if we end up deleting + % the winning revision branch. + NumRevs = if Doc0#doc.deleted -> 2; true -> 1 end, + Future = fabric2_fdb:get_winning_revs_future(Db, Doc0#doc.id, NumRevs), + update_doc_interactive(Db, Doc0, Future, Options). + + +update_doc_interactive(Db, Doc0, Future, _Options) -> + RevInfos = fabric2_fdb:get_winning_revs_wait(Db, Future), + {Winner, SecondPlace} = case RevInfos of + [] -> {not_found, not_found}; + [WRI] -> {WRI, not_found}; + [WRI, SPRI] -> {WRI, SPRI} + end, + WinnerRevId = case Winner of + not_found -> + {0, <<>>}; + _ -> + case maps:get(deleted, Winner) of + true -> {0, <<>>}; + false -> maps:get(rev_id, Winner) + end + end, + + % Check that a revision was specified if required + Doc0RevId = doc_to_revid(Doc0), + if Doc0RevId /= {0, <<>>} orelse WinnerRevId == {0, <<>>} -> ok; true -> + ?RETURN({error, conflict}) + end, + + % Check that we're not trying to create a deleted doc + if Doc0RevId /= {0, <<>>} orelse not Doc0#doc.deleted -> ok; true -> + ?RETURN({error, conflict}) + end, + + % Get the target revision to update + Target = case Doc0RevId == WinnerRevId of + true -> + Winner; + false -> + case fabric2_fdb:get_non_deleted_rev(Db, Doc0#doc.id, Doc0RevId) of + #{deleted := false} = Target0 -> + Target0; + not_found -> + % Either a missing revision or a deleted + % revision. Either way a conflict. Note + % that we get not_found for a deleted revision + % because we only check for the non-deleted + % key in fdb + ?RETURN({error, conflict}) + end + end, + + % When recreating a deleted document we want to extend + % the winning revision branch rather than create a + % new branch. If we did not do this we could be + % recreating into a state that previously existed. + Doc1 = case Winner of + #{deleted := true} when not Doc0#doc.deleted -> + {WinnerRevPos, WinnerRev} = maps:get(rev_id, Winner), + WinnerRevPath = maps:get(rev_path, Winner), + Doc0#doc{revs = {WinnerRevPos, [WinnerRev | WinnerRevPath]}}; + _ -> + Doc0 + end, + + % Validate the doc update and create the + % new revinfo map + Doc2 = prep_and_validate(Db, Doc1, Target), + #doc{ + deleted = NewDeleted, + revs = {NewRevPos, [NewRev | NewRevPath]} + } = Doc3 = new_revid(Doc2), + + Doc4 = update_attachment_revpos(Doc3), + + NewRevInfo = #{ + winner => undefined, + deleted => NewDeleted, + rev_id => {NewRevPos, NewRev}, + rev_path => NewRevPath, + sequence => undefined, + branch_count => undefined + }, + + % Gather the list of possible winnig revisions + Possible = case Target == Winner of + true when not Doc4#doc.deleted -> + [NewRevInfo]; + true when Doc4#doc.deleted -> + case SecondPlace of + #{} -> [NewRevInfo, SecondPlace]; + not_found -> [NewRevInfo] + end; + false -> + [NewRevInfo, Winner] + end, + + % Sort the rev infos such that the winner is first + {NewWinner0, NonWinner} = case fabric2_util:sort_revinfos(Possible) of + [W] -> {W, not_found}; + [W, NW] -> {W, NW} + end, + + BranchCount = case Winner of + not_found -> 1; + #{branch_count := BC} -> BC + end, + NewWinner = NewWinner0#{branch_count := BranchCount}, + ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end, + ToRemove = if Target == not_found -> []; true -> [Target] end, + + ok = fabric2_fdb:write_doc( + Db, + Doc4, + NewWinner, + Winner, + ToUpdate, + ToRemove + ), + + {ok, {NewRevPos, NewRev}}. + + +update_doc_replicated(Db, Doc0, _Options) -> + #doc{ + id = DocId, + deleted = Deleted, + revs = {RevPos, [Rev | RevPath]} + } = Doc0, + + DocRevInfo0 = #{ + winner => undefined, + deleted => Deleted, + rev_id => {RevPos, Rev}, + rev_path => RevPath, + sequence => undefined, + branch_count => undefined + }, + + AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId), + + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], AllRevInfos), + + DocRevPath = fabric2_util:revinfo_to_path(DocRevInfo0), + {NewTree, Status} = couch_key_tree:merge(RevTree, DocRevPath), + if Status /= internal_node -> ok; true -> + % We already know this revision so nothing + % left to do. + ?RETURN({ok, []}) + end, + + % Its possible to have a replication with fewer than $revs_limit + % revisions which extends an existing branch. To avoid + % losing revision history we extract the new node from the + % tree and use the combined path after stemming. + {[{_, {RevPos, UnstemmedRevs}}], []} + = couch_key_tree:get(NewTree, [{RevPos, Rev}]), + RevsLimit = fabric2_db:get_revs_limit(Db), + Doc1 = Doc0#doc{ + revs = {RevPos, lists:sublist(UnstemmedRevs, RevsLimit)} + }, + {RevPos, [Rev | NewRevPath]} = Doc1#doc.revs, + DocRevInfo1 = DocRevInfo0#{rev_path := NewRevPath}, + + % Find any previous revision we knew about for + % validation and attachment handling. + AllLeafsFull = couch_key_tree:get_all_leafs_full(NewTree), + LeafPath = get_leaf_path(RevPos, Rev, AllLeafsFull), + PrevRevInfo = find_prev_revinfo(RevPos, LeafPath), + Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo), + + % Possible winners are the previous winner and + % the new DocRevInfo + Winner = case fabric2_util:sort_revinfos(AllRevInfos) of + [#{winner := true} = WRI | _] -> WRI; + [] -> not_found + end, + {NewWinner0, NonWinner} = case Winner == PrevRevInfo of + true -> + {DocRevInfo1, not_found}; + false -> + [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo1]), + {W, NW} + end, + + NewWinner = NewWinner0#{branch_count := length(AllLeafsFull)}, + ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end, + ToRemove = if PrevRevInfo == not_found -> []; true -> [PrevRevInfo] end, + + ok = fabric2_fdb:write_doc( + Db, + Doc2, + NewWinner, + Winner, + ToUpdate, + ToRemove + ), + + {ok, []}. + + +update_local_doc(Db, Doc0, _Options) -> + Doc1 = case increment_local_doc_rev(Doc0) of + {ok, Updated} -> Updated; + {error, _} = Error -> ?RETURN(Error) + end, + + ok = fabric2_fdb:write_local_doc(Db, Doc1), + + #doc{revs = {0, [Rev]}} = Doc1, + {ok, {0, integer_to_binary(Rev)}}. + + +update_attachment_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) -> + Atts = lists:map(fun(Att) -> + case couch_att:fetch(data, Att) of + {loc, _Db, _DocId, _AttId} -> + % Attachment was already on disk + Att; + _ -> + % We will write this attachment with this update + % so mark it with the RevPos that will be written + couch_att:store(revpos, RevPos, Att) + end + end, Atts0), + Doc#doc{atts = Atts}. + + +get_winning_rev_futures(Db, Docs) -> + lists:foldl(fun(Doc, Acc) -> + #doc{ + id = DocId, + deleted = Deleted + } = Doc, + IsLocal = case DocId of + <<?LOCAL_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + if IsLocal -> Acc; true -> + NumRevs = if Deleted -> 2; true -> 1 end, + Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs), + DocTag = doc_tag(Doc), + Acc#{DocTag => Future} + end + end, #{}, Docs). + + +prep_and_validate(Db, NewDoc, PrevRevInfo) -> + HasStubs = couch_doc:has_stubs(NewDoc), + HasVDUs = [] /= maps:get(validate_doc_update_funs, Db), + IsDDoc = case NewDoc#doc.id of + <<?DESIGN_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + + PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of + true when PrevRevInfo /= not_found -> + case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of + #doc{} = PDoc -> PDoc; + {not_found, _} -> nil + end; + _ -> + nil + end, + + MergedDoc = if not HasStubs -> NewDoc; true -> + % This will throw an error if we have any + % attachment stubs missing data + couch_doc:merge_stubs(NewDoc, PrevDoc) + end, + check_duplicate_attachments(MergedDoc), + validate_doc_update(Db, MergedDoc, PrevDoc), + MergedDoc. + + +validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) -> + case catch check_is_admin(Db) of + ok -> validate_ddoc(Db, Doc); + Error -> ?RETURN({Doc, Error}) + end; +validate_doc_update(Db, Doc, PrevDoc) -> + #{ + security_doc := Security, + validate_doc_update_funs := VDUs + } = Db, + Fun = fun() -> + JsonCtx = fabric2_util:user_ctx_to_json(Db), + lists:map(fun(VDU) -> + try + case VDU(Doc, PrevDoc, JsonCtx, Security) of + ok -> ok; + Error1 -> throw(Error1) + end + catch throw:Error2 -> + ?RETURN({Doc, Error2}) + end + end, VDUs) + end, + Stat = [couchdb, query_server, vdu_process_time], + if VDUs == [] -> ok; true -> + couch_stats:update_histogram(Stat, Fun) + end. + + +validate_ddoc(Db, DDoc) -> + try + ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc)) + catch + throw:{invalid_design_doc, Reason} -> + throw({bad_request, invalid_design_doc, Reason}); + throw:{compilation_error, Reason} -> + throw({bad_request, compilation_error, Reason}); + throw:Error -> + ?RETURN({DDoc, Error}) + end. + + +check_duplicate_attachments(#doc{atts = Atts}) -> + lists:foldl(fun(Att, Names) -> + Name = couch_att:fetch(name, Att), + case ordsets:is_element(Name, Names) of + true -> throw({bad_request, <<"Duplicate attachments">>}); + false -> ordsets:add_element(Name, Names) + end + end, ordsets:new(), Atts). + + +get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) -> + LeafPath; +get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) -> + get_leaf_path(Pos, Rev, RestLeafs). + + +find_prev_revinfo(_Pos, []) -> + not_found; +find_prev_revinfo(Pos, [{_Rev, ?REV_MISSING} | RestPath]) -> + find_prev_revinfo(Pos - 1, RestPath); +find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) -> + RevInfo. + + +increment_local_doc_rev(#doc{deleted = true} = Doc) -> + {ok, Doc#doc{revs = {0, [0]}}}; +increment_local_doc_rev(#doc{revs = {0, []}} = Doc) -> + {ok, Doc#doc{revs = {0, [1]}}}; +increment_local_doc_rev(#doc{revs = {0, [RevStr | _]}} = Doc) -> + try + PrevRev = binary_to_integer(RevStr), + {ok, Doc#doc{revs = {0, [PrevRev + 1]}}} + catch error:badarg -> + {error, <<"Invalid rev format">>} + end; +increment_local_doc_rev(#doc{}) -> + {error, <<"Invalid rev format">>}. + + +doc_to_revid(#doc{revs = Revs}) -> + case Revs of + {0, []} -> {0, <<>>}; + {RevPos, [Rev | _]} -> {RevPos, Rev} + end. + + +tag_docs([]) -> + []; +tag_docs([#doc{meta = Meta} = Doc | Rest]) -> + NewDoc = Doc#doc{ + meta = [{ref, make_ref()} | Meta] + }, + [NewDoc | tag_docs(Rest)]. + + +doc_tag(#doc{meta = Meta}) -> + fabric2_util:get_value(ref, Meta). + + +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + + +docid(DocId) when is_list(DocId) -> + list_to_binary(DocId); +docid(DocId) -> + DocId. + + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl new file mode 100644 index 000000000..a5717147f --- /dev/null +++ b/src/fabric/src/fabric2_events.erl @@ -0,0 +1,84 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_events). + + +-export([ + link_listener/4, + stop_listener/1 +]). + +-export([ + init/5, + poll/5 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +link_listener(Mod, Fun, St, Options) -> + DbName = fabric2_util:get_value(dbname, Options), + Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]), + receive + {Pid, initialized} -> ok + end, + {ok, Pid}. + + +stop_listener(Pid) -> + Pid ! stop_listening. + + +init(Parent, DbName, Mod, Fun, St) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + Since = fabric2_db:get_update_seq(Db), + couch_log:error("XKCD: START LISTENER: ~s : ~p for ~p", [DbName, Since, Parent]), + erlang:monitor(process, Parent), + Parent ! {self(), initialized}, + poll(DbName, Since, Mod, Fun, St), + couch_log:error("XKCD: STOP LISTENER for ~p", [Parent]). + + +poll(DbName, Since, Mod, Fun, St) -> + {Resp, NewSince} = try + case fabric2_db:open(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + case fabric2_db:get_update_seq(Db) of + Since -> + couch_log:error("XKCD: NO UPDATE: ~s :: ~p", [DbName, Since]), + {{ok, St}, Since}; + Other -> + couch_log:error("XKCD: UPDATED: ~s :: ~p -> ~p", [DbName, Since, Other]), + {Mod:Fun(DbName, updated, St), Other} + end; + Error -> + exit(Error) + end + catch error:database_does_not_exist -> + Mod:Fun(DbName, deleted, St) + end, + receive + stop_listening -> + ok; + {'DOWN', _, _, _, _} -> + ok + after 0 -> + case Resp of + {ok, NewSt} -> + timer:sleep(1000), + ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt); + {stop, _} -> + ok + end + end. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl new file mode 100644 index 000000000..0a4f2981b --- /dev/null +++ b/src/fabric/src/fabric2_fdb.erl @@ -0,0 +1,1187 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_fdb). + + +-export([ + transactional/1, + transactional/3, + transactional/2, + + create/2, + open/2, + reopen/1, + delete/1, + exists/1, + + list_dbs/2, + + get_info/1, + get_config/1, + set_config/3, + + get_stat/2, + incr_stat/3, + + get_all_revs/2, + get_winning_revs/3, + get_winning_revs_future/3, + get_winning_revs_wait/2, + get_non_deleted_rev/3, + + get_doc_body/3, + get_doc_body_future/3, + get_doc_body_wait/4, + get_local_doc/2, + + write_doc/6, + write_local_doc/2, + + read_attachment/3, + write_attachment/3, + + fold_docs/4, + fold_changes/5, + get_last_change/1, + + vs_to_seq/1, + + debug_cluster/0, + debug_cluster/2 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("fabric2.hrl"). + + +transactional(Fun) -> + do_transaction(Fun, undefined). + + +transactional(DbName, Options, Fun) when is_binary(DbName) -> + transactional(fun(Tx) -> + Fun(init_db(Tx, DbName, Options)) + end). + + +transactional(#{tx := undefined} = Db, Fun) -> + #{layer_prefix := LayerPrefix} = Db, + do_transaction(fun(Tx) -> + Fun(Db#{tx => Tx}) + end, LayerPrefix); + +transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> + Fun(Db). + + +do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) -> + Db = get_db_handle(), + try + erlfdb:transactional(Db, fun(Tx) -> + case get(erlfdb_trace) of + Name when is_binary(Name) -> + erlfdb:set_option(Tx, transaction_logging_enable, Name); + _ -> + ok + end, + case is_transaction_applied(Tx) of + true -> + get_previous_transaction_result(); + false -> + execute_transaction(Tx, Fun, LayerPrefix) + end + end) + after + clear_transaction() + end. + + +create(#{} = Db0, Options) -> + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix + } = Db = ensure_current(Db0, false), + + % Eventually DbPrefix will be HCA allocated. For now + % we're just using the DbName so that debugging is easier. + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + erlfdb:set(Tx, DbKey, DbPrefix), + + % This key is responsible for telling us when something in + % the database cache (i.e., fabric2_server's ets table) has + % changed and requires re-loading. This currently includes + % revs_limit and validate_doc_update functions. There's + % no order to versioning here. Its just a value that changes + % that is used in the ensure_current check. + DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix), + DbVersion = fabric2_util:uuid(), + erlfdb:set(Tx, DbVersionKey, DbVersion), + + UUID = fabric2_util:uuid(), + + Defaults = [ + {?DB_CONFIG, <<"uuid">>, UUID}, + {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)}, + {?DB_CONFIG, <<"security_doc">>, <<"{}">>}, + {?DB_STATS, <<"doc_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"size">>, ?uint2bin(2)} + ], + lists:foreach(fun({P, K, V}) -> + Key = erlfdb_tuple:pack({P, K}, DbPrefix), + erlfdb:set(Tx, Key, V) + end, Defaults), + + UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}), + + Db#{ + uuid => UUID, + db_prefix => DbPrefix, + db_version => DbVersion, + + revs_limit => 1000, + security_doc => {[]}, + user_ctx => UserCtx, + + validate_doc_update_funs => [], + before_doc_update => undefined, + after_doc_read => undefined, + % All other db things as we add features, + + db_options => Options + }. + + +open(#{} = Db0, Options) -> + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix + } = Db1 = ensure_current(Db0, false), + + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of + Bin when is_binary(Bin) -> Bin; + not_found -> erlang:error(database_does_not_exist) + end, + + DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix), + DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)), + + UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}), + + Db2 = Db1#{ + db_prefix => DbPrefix, + db_version => DbVersion, + + revs_limit => 1000, + security_doc => {[]}, + user_ctx => UserCtx, + + % Place holders until we implement these + % bits. + validate_doc_update_funs => [], + before_doc_update => undefined, + after_doc_read => undefined, + + db_options => Options + }, + + Db3 = lists:foldl(fun({Key, Val}, DbAcc) -> + case Key of + <<"uuid">> -> + DbAcc#{uuid => Val}; + <<"revs_limit">> -> + DbAcc#{revs_limit => ?bin2uint(Val)}; + <<"security_doc">> -> + DbAcc#{security_doc => ?JSON_DECODE(Val)} + end + end, Db2, get_config(Db2)), + + load_validate_doc_funs(Db3). + + +reopen(#{} = OldDb) -> + require_transaction(OldDb), + #{ + tx := Tx, + name := DbName, + db_options := Options + } = OldDb, + open(init_db(Tx, DbName, Options), Options). + + +delete(#{} = Db) -> + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix, + db_prefix := DbPrefix + } = ensure_current(Db), + + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + erlfdb:clear(Tx, DbKey), + erlfdb:clear_range_startswith(Tx, DbPrefix), + bump_metadata_version(Tx), + ok. + + +exists(#{name := DbName} = Db) when is_binary(DbName) -> + #{ + tx := Tx, + layer_prefix := LayerPrefix + } = ensure_current(Db, false), + + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + case erlfdb:wait(erlfdb:get(Tx, DbKey)) of + Bin when is_binary(Bin) -> true; + not_found -> false + end. + + +list_dbs(Tx, _Options) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + LayerPrefix = erlfdb_directory:get_name(CouchDB), + {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix), + Future = erlfdb:get_range(Tx, Start, End), + lists:map(fun({K, _V}) -> + {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix), + DbName + end, erlfdb:wait(Future)). + + +get_info(#{} = Db) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + {CStart, CEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix), + ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [ + {streaming_mode, exact}, + {limit, 1}, + {reverse, true} + ]), + + StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix), + MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix), + + RawSeq = case erlfdb:wait(ChangesFuture) of + [] -> + vs_to_seq(fabric2_util:seq_zero_vs()); + [{SeqKey, _}] -> + {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix), + vs_to_seq(SeqVS) + end, + CProp = {update_seq, RawSeq}, + + MProps = lists:flatmap(fun({K, V}) -> + case erlfdb_tuple:unpack(K, DbPrefix) of + {?DB_STATS, <<"doc_count">>} -> + [{doc_count, ?bin2uint(V)}]; + {?DB_STATS, <<"doc_del_count">>} -> + [{doc_del_count, ?bin2uint(V)}]; + {?DB_STATS, <<"size">>} -> + Val = ?bin2uint(V), + [ + {other, {[{data_size, Val}]}}, + {sizes, {[ + {active, 0}, + {external, Val}, + {file, 0} + ]}} + ]; + {?DB_STATS, _} -> + [] + end + end, erlfdb:wait(MetaFuture)), + + [CProp | MProps]. + + +get_config(#{} = Db) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db = ensure_current(Db), + + {Start, End} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix), + Future = erlfdb:get_range(Tx, Start, End), + + lists:map(fun({K, V}) -> + {?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix), + {Key, V} + end, erlfdb:wait(Future)). + + +set_config(#{} = Db, ConfigKey, ConfigVal) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + Key = erlfdb_tuple:pack({?DB_CONFIG, ConfigKey}, DbPrefix), + erlfdb:set(Tx, Key, ConfigVal), + bump_metadata_version(Tx). + + +get_stat(#{} = Db, StatKey) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix), + + % Might need to figure out some sort of type + % system here. Uints are because stats are all + % atomic op adds for the moment. + ?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))). + + +incr_stat(_Db, _StatKey, 0) -> + ok; + +incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix), + erlfdb:add(Tx, Key, Increment). + + +get_all_revs(#{} = Db, DocId) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + Prefix = erlfdb_tuple:pack({?DB_REVS, DocId}, DbPrefix), + Options = [{streaming_mode, want_all}], + Future = erlfdb:get_range_startswith(Tx, Prefix, Options), + lists:map(fun({K, V}) -> + Key = erlfdb_tuple:unpack(K, DbPrefix), + Val = erlfdb_tuple:unpack(V), + fdb_to_revinfo(Key, Val) + end, erlfdb:wait(Future)). + + +get_winning_revs(Db, DocId, NumRevs) -> + Future = get_winning_revs_future(Db, DocId, NumRevs), + get_winning_revs_wait(Db, Future). + + +get_winning_revs_future(#{} = Db, DocId, NumRevs) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + {StartKey, EndKey} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix), + Options = [{reverse, true}, {limit, NumRevs}], + erlfdb:get_range_raw(Tx, StartKey, EndKey, Options). + + +get_winning_revs_wait(#{} = Db, Future) -> + #{ + db_prefix := DbPrefix + } = ensure_current(Db), + {Rows, _, _} = erlfdb:wait(Future), + lists:map(fun({K, V}) -> + Key = erlfdb_tuple:unpack(K, DbPrefix), + Val = erlfdb_tuple:unpack(V), + fdb_to_revinfo(Key, Val) + end, Rows). + + +get_non_deleted_rev(#{} = Db, DocId, RevId) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + {RevPos, Rev} = RevId, + + BaseKey = {?DB_REVS, DocId, true, RevPos, Rev}, + Key = erlfdb_tuple:pack(BaseKey, DbPrefix), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + not_found -> + not_found; + Val -> + fdb_to_revinfo(BaseKey, erlfdb_tuple:unpack(Val)) + end. + + +get_doc_body(Db, DocId, RevInfo) -> + Future = get_doc_body_future(Db, DocId, RevInfo), + get_doc_body_wait(Db, DocId, RevInfo, Future). + + +get_doc_body_future(#{} = Db, DocId, RevInfo) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + #{ + rev_id := {RevPos, Rev} + } = RevInfo, + + Key = erlfdb_tuple:pack({?DB_DOCS, DocId, RevPos, Rev}, DbPrefix), + erlfdb:get(Tx, Key). + + +get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) -> + Db = ensure_current(Db0), + + #{ + rev_id := {RevPos, Rev}, + rev_path := RevPath + } = RevInfo, + + Val = erlfdb:wait(Future), + fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], Val). + + +get_local_doc(#{} = Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db = ensure_current(Db0), + + Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix), + Val = erlfdb:wait(erlfdb:get(Tx, Key)), + fdb_to_local_doc(Db, DocId, Val). + + +write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db = ensure_current(Db0), + + #doc{ + id = DocId, + deleted = Deleted + } = Doc, + + % Revision tree + + NewWinner = NewWinner0#{winner := true}, + NewRevId = maps:get(rev_id, NewWinner), + + {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner), + ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal), + + lists:foreach(fun(RI0) -> + RI = RI0#{winner := false}, + {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI), + ok = erlfdb:set(Tx, K, V) + end, ToUpdate), + + lists:foreach(fun(RI0) -> + RI = RI0#{winner := false}, + {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI), + ok = erlfdb:clear(Tx, K) + end, ToRemove), + + % _all_docs + + UpdateStatus = case {OldWinner, NewWinner} of + {not_found, #{deleted := false}} -> + created; + {#{deleted := true}, #{deleted := false}} -> + recreated; + {#{deleted := false}, #{deleted := false}} -> + updated; + {#{deleted := false}, #{deleted := true}} -> + deleted + end, + + case UpdateStatus of + Status when Status == created orelse Status == recreated -> + ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), + ADVal = erlfdb_tuple:pack(NewRevId), + ok = erlfdb:set(Tx, ADKey, ADVal); + deleted -> + ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), + ok = erlfdb:clear(Tx, ADKey); + updated -> + ok + end, + + % _changes + + if OldWinner == not_found -> ok; true -> + OldSeq = maps:get(sequence, OldWinner), + OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix), + erlfdb:clear(Tx, OldSeqKey) + end, + + NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix), + NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}), + erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal), + + % And all the rest... + + ok = write_doc_body(Db, Doc), + + IsDDoc = case Doc#doc.id of + <<?DESIGN_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + + if not IsDDoc -> ok; true -> + bump_db_version(Db) + end, + + case UpdateStatus of + created -> + if not IsDDoc -> ok; true -> + incr_stat(Db, <<"doc_design_count">>, 1) + end, + incr_stat(Db, <<"doc_count">>, 1); + recreated -> + if not IsDDoc -> ok; true -> + incr_stat(Db, <<"doc_design_count">>, 1) + end, + incr_stat(Db, <<"doc_count">>, 1), + incr_stat(Db, <<"doc_del_count">>, -1); + deleted -> + if not IsDDoc -> ok; true -> + incr_stat(Db, <<"doc_design_count">>, -1) + end, + incr_stat(Db, <<"doc_count">>, -1), + incr_stat(Db, <<"doc_del_count">>, 1); + updated -> + ok + end, + + ok. + + +write_local_doc(#{} = Db0, Doc) -> + #{ + tx := Tx + } = Db = ensure_current(Db0), + + {LDocKey, LDocVal} = local_doc_to_fdb(Db, Doc), + + WasDeleted = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of + <<_/binary>> -> false; + not_found -> true + end, + + case Doc#doc.deleted of + true -> erlfdb:clear(Tx, LDocKey); + false -> erlfdb:set(Tx, LDocKey, LDocVal) + end, + + case {WasDeleted, Doc#doc.deleted} of + {true, false} -> + incr_stat(Db, <<"doc_local_count">>, 1); + {false, true} -> + incr_stat(Db, <<"doc_local_count">>, -1); + _ -> + ok + end, + + ok. + + +read_attachment(#{} = Db, DocId, AttId) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix), + case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of + not_found -> + throw({not_found, missing}); + KVs -> + Vs = [V || {_K, V} <- KVs], + iolist_to_binary(Vs) + end. + + +write_attachment(#{} = Db, DocId, Data) when is_binary(Data) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + AttId = fabric2_util:uuid(), + Chunks = chunkify_attachment(Data), + + lists:foldl(fun(Chunk, ChunkId) -> + AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix), + ok = erlfdb:set(Tx, AttKey, Chunk), + ChunkId + 1 + end, 0, Chunks), + {ok, AttId}. + + +fold_docs(#{} = Db, UserFun, UserAcc0, Options) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options), + + DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix), + DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)), + + try + UserAcc1 = maybe_stop(UserFun({meta, [ + {total, ?bin2uint(DocCountBin)}, + {offset, null} + ]}, UserAcc0)), + + UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> + {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix), + RevId = erlfdb_tuple:unpack(V), + maybe_stop(UserFun({row, [ + {id, DocId}, + {key, DocId}, + {value, couch_doc:rev_to_str(RevId)} + ]}, UserAccIn)) + end, UserAcc1, [{reverse, Reverse}] ++ Options), + + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end. + + +fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + SinceSeq1 = get_since_seq(SinceSeq0), + + Reverse = case fabric2_util:get_value(dir, Options, fwd) of + fwd -> false; + rev -> true + end, + + {Start0, End0} = case Reverse of + false -> {SinceSeq1, fabric2_util:seq_max_vs()}; + true -> {fabric2_util:seq_zero_vs(), SinceSeq1} + end, + + Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix), + End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix), + + {Start, End} = case Reverse of + false -> {erlfdb_key:first_greater_than(Start1), End1}; + true -> {Start1, erlfdb_key:first_greater_than(End1)} + end, + + try + {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> + {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix), + {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V), + + Change = #{ + id => DocId, + sequence => vs_to_seq(SeqVS), + rev_id => RevId, + deleted => Deleted + }, + + maybe_stop(UserFun(Change, UserAccIn)) + end, UserAcc0, [{reverse, Reverse}] ++ Options)} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end. + + +get_last_change(#{} = Db) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = ensure_current(Db), + + {Start, End} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix), + Options = [{limit, 1}, {reverse, true}], + case erlfdb:get_range(Tx, Start, End, Options) of + [] -> + vs_to_seq(fabric2_util:seq_zero_vs()); + [{K, _V}] -> + {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix), + vs_to_seq(SeqVS) + end. + + +maybe_stop({ok, Acc}) -> + Acc; +maybe_stop({stop, Acc}) -> + throw({stop, Acc}). + + +vs_to_seq(VS) -> + <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}), + fabric2_util:to_hex(SeqBin). + + +debug_cluster() -> + debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). + + +debug_cluster(Start, End) -> + transactional(fun(Tx) -> + lists:foreach(fun({Key, Val}) -> + io:format("~s => ~s~n", [ + string:pad(erlfdb_util:repr(Key), 60), + erlfdb_util:repr(Val) + ]) + end, erlfdb:get_range(Tx, Start, End)) + end). + + +init_db(Tx, DbName, Options) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)), + #{ + name => DbName, + tx => Tx, + layer_prefix => Prefix, + md_version => Version, + + db_options => Options + }. + + +load_validate_doc_funs(#{} = Db) -> + FoldFun = fun + ({row, Row}, Acc) -> + DDocInfo = #{id => fabric2_util:get_value(id, Row)}, + {ok, [DDocInfo | Acc]}; + (_, Acc) -> + {ok, Acc} + end, + + Options = [ + {start_key, <<"_design/">>}, + {end_key, <<"_design0">>} + ], + + {ok, Infos1} = fold_docs(Db, FoldFun, [], Options), + + Infos2 = lists:map(fun(Info) -> + #{ + id := DDocId = <<"_design/", _/binary>> + } = Info, + Info#{ + rev_info => get_winning_revs_future(Db, DDocId, 1) + } + end, Infos1), + + Infos3 = lists:flatmap(fun(Info) -> + #{ + id := DDocId, + rev_info := RevInfoFuture + } = Info, + [RevInfo] = get_winning_revs_wait(Db, RevInfoFuture), + #{deleted := Deleted} = RevInfo, + if Deleted -> []; true -> + [Info#{ + rev_info := RevInfo, + body => get_doc_body_future(Db, DDocId, RevInfo) + }] + end + end, Infos2), + + VDUs = lists:flatmap(fun(Info) -> + #{ + id := DDocId, + rev_info := RevInfo, + body := BodyFuture + } = Info, + #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture), + case couch_doc:get_validate_doc_fun(Doc) of + nil -> []; + Fun -> [Fun] + end + end, Infos3), + + Db#{ + validate_doc_update_funs := VDUs + }. + + +bump_metadata_version(Tx) -> + % The 14 zero bytes is pulled from the PR for adding the + % metadata version key. Not sure why 14 bytes when version + % stamps are only 80, but whatever for now. + erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>). + + +bump_db_version(#{} = Db) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix), + DbVersion = fabric2_util:uuid(), + ok = erlfdb:set(Tx, DbVersionKey, DbVersion). + + +write_doc_body(#{} = Db0, #doc{} = Doc) -> + #{ + tx := Tx + } = Db = ensure_current(Db0), + + {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc), + erlfdb:set(Tx, NewDocKey, NewDocVal). + + +revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) -> + #{ + deleted := Deleted, + rev_id := {RevPos, Rev}, + rev_path := RevPath, + branch_count := BranchCount + } = RevId, + VS = new_versionstamp(Tx), + Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, + Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)}, + KBin = erlfdb_tuple:pack(Key, DbPrefix), + VBin = erlfdb_tuple:pack_vs(Val), + {KBin, VBin, VS}; + +revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) -> + #{ + deleted := Deleted, + rev_id := {RevPos, Rev}, + rev_path := RevPath + } = RevId, + Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, + Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)}, + KBin = erlfdb_tuple:pack(Key, DbPrefix), + VBin = erlfdb_tuple:pack(Val), + {KBin, VBin, undefined}. + + +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) -> + {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, + {_RevFormat, Sequence, BranchCount, RevPath} = Val, + #{ + winner => true, + deleted => not NotDeleted, + rev_id => {RevPos, Rev}, + rev_path => tuple_to_list(RevPath), + sequence => Sequence, + branch_count => BranchCount + }; + +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) -> + {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, + {_RevFormat, RevPath} = Val, + #{ + winner => false, + deleted => not NotDeleted, + rev_id => {RevPos, Rev}, + rev_path => tuple_to_list(RevPath), + sequence => undefined, + branch_count => undefined + }. + + +doc_to_fdb(Db, #doc{} = Doc) -> + #{ + db_prefix := DbPrefix + } = Db, + + #doc{ + id = Id, + revs = {Start, [Rev | _]}, + body = Body, + atts = Atts, + deleted = Deleted + } = doc_flush_atts(Db, Doc), + + Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix), + Val = {Body, Atts, Deleted}, + {Key, term_to_binary(Val, [{minor_version, 1}])}. + + +fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) -> + {Body, Atts, Deleted} = binary_to_term(Bin, [safe]), + #doc{ + id = DocId, + revs = {Pos, Path}, + body = Body, + atts = Atts, + deleted = Deleted + }; +fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) -> + {not_found, missing}. + + +local_doc_to_fdb(Db, #doc{} = Doc) -> + #{ + db_prefix := DbPrefix + } = Db, + + #doc{ + id = Id, + revs = {0, [Rev]}, + body = Body + } = Doc, + + StoreRev = case Rev of + _ when is_integer(Rev) -> integer_to_binary(Rev); + _ when is_binary(Rev) -> Rev + end, + + Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix), + Val = {StoreRev, Body}, + {Key, term_to_binary(Val, [{minor_version, 1}])}. + + +fdb_to_local_doc(_Db, DocId, Bin) when is_binary(Bin) -> + {Rev, Body} = binary_to_term(Bin, [safe]), + #doc{ + id = DocId, + revs = {0, [Rev]}, + deleted = false, + body = Body + }; +fdb_to_local_doc(_Db, _DocId, not_found) -> + {not_found, missing}. + + +doc_flush_atts(Db, Doc) -> + Atts = lists:map(fun(Att) -> + couch_att:flush(Db, Doc#doc.id, Att) + end, Doc#doc.atts), + Doc#doc{atts = Atts}. + + +chunkify_attachment(Data) -> + case Data of + <<>> -> + []; + <<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> -> + [Head | chunkify_attachment(Rest)]; + <<_/binary>> when size(Data) < ?ATTACHMENT_CHUNK_SIZE -> + [Data] + end. + + +get_dir_and_bounds(DbPrefix, Options) -> + Reverse = case fabric2_util:get_value(dir, Options, fwd) of + fwd -> false; + rev -> true + end, + StartKey0 = fabric2_util:get_value(start_key, Options), + EndKeyGt = fabric2_util:get_value(end_key_gt, Options), + EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt), + InclusiveEnd = EndKeyGt == undefined, + + % CouchDB swaps the key meanings based on the direction + % of the fold. FoundationDB does not so we have to + % swap back here. + {StartKey1, EndKey1} = case Reverse of + false -> {StartKey0, EndKey0}; + true -> {EndKey0, StartKey0} + end, + + % Set the maximum bounds for the start and endkey + StartKey2 = case StartKey1 of + undefined -> {?DB_ALL_DOCS}; + SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2} + end, + + EndKey2 = case EndKey1 of + undefined -> {?DB_ALL_DOCS, <<16#FF>>}; + EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2} + end, + + StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix), + EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix), + + % FoundationDB ranges are applied as SK <= key < EK + % By default, CouchDB is SK <= key <= EK with the + % optional inclusive_end=false option changing that + % to SK <= key < EK. Also, remember that CouchDB + % swaps the meaning of SK and EK based on direction. + % + % Thus we have this wonderful bit of logic to account + % for all of those combinations. + + StartKey4 = case {Reverse, InclusiveEnd} of + {true, false} -> + erlfdb_key:first_greater_than(StartKey3); + _ -> + StartKey3 + end, + + EndKey4 = case {Reverse, InclusiveEnd} of + {false, true} when EndKey0 /= undefined -> + erlfdb_key:first_greater_than(EndKey3); + {true, _} -> + erlfdb_key:first_greater_than(EndKey3); + _ -> + EndKey3 + end, + + {Reverse, StartKey4, EndKey4}. + + +get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0-> + fabric2_util:seq_zero_vs(); + +get_since_seq(Seq) when Seq == now; Seq == <<"now">> -> + fabric2_util:seq_max_vs(); + +get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 -> + Seq1 = fabric2_util:from_hex(Seq), + Seq2 = <<51:8, Seq1/binary>>, + {SeqVS} = erlfdb_tuple:unpack(Seq2), + SeqVS; + +get_since_seq(List) when is_list(List) -> + get_since_seq(list_to_binary(List)); + +get_since_seq(Seq) -> + erlang:error({invalid_since_seq, Seq}). + + +get_db_handle() -> + case get(?PDICT_DB_KEY) of + undefined -> + {ok, Db} = application:get_env(fabric, db), + put(?PDICT_DB_KEY, Db), + Db; + Db -> + Db + end. + + +require_transaction(#{tx := {erlfdb_transaction, _}} = _Db) -> + ok; +require_transaction(#{} = _Db) -> + erlang:error(transaction_required). + + +ensure_current(Db) -> + ensure_current(Db, true). + + +ensure_current(#{} = Db, CheckDbVersion) -> + require_transaction(Db), + + #{ + tx := Tx, + md_version := MetaDataVersion + } = Db, + + case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of + MetaDataVersion -> Db; + _NewVersion -> reopen(Db) + end, + + AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT), + if not CheckDbVersion orelse AlreadyChecked == true -> Db; true -> + #{ + db_prefix := DbPrefix, + db_version := DbVersion + } = Db, + + DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix), + + case erlfdb:wait(erlfdb:get(Tx, DbVersionKey)) of + DbVersion -> + put(?PDICT_CHECKED_DB_IS_CURRENT, true), + Db; + _NewDBVersion -> + fabric2_server:remove(maps:get(name, Db)), + reopen(Db) + end + end. + + +is_transaction_applied(Tx) -> + is_commit_unknown_result() + andalso has_transaction_id() + andalso transaction_id_exists(Tx). + + +get_previous_transaction_result() -> + get(?PDICT_TX_RES_KEY). + + +execute_transaction(Tx, Fun, LayerPrefix) -> + put(?PDICT_CHECKED_DB_IS_CURRENT, false), + Result = Fun(Tx), + case erlfdb:is_read_only(Tx) of + true -> + ok; + false -> + erlfdb:set(Tx, get_transaction_id(Tx, LayerPrefix), <<>>), + put(?PDICT_TX_RES_KEY, Result) + end, + Result. + + +clear_transaction() -> + fabric2_txids:remove(get(?PDICT_TX_ID_KEY)), + erase(?PDICT_CHECKED_DB_IS_CURRENT), + erase(?PDICT_TX_ID_KEY), + erase(?PDICT_TX_RES_KEY). + + +is_commit_unknown_result() -> + erlfdb:get_last_error() == ?COMMIT_UNKNOWN_RESULT. + + +has_transaction_id() -> + is_binary(get(?PDICT_TX_ID_KEY)). + + +transaction_id_exists(Tx) -> + erlfdb:wait(erlfdb:get(Tx, get(?PDICT_TX_ID_KEY))) == <<>>. + + +get_transaction_id(Tx, LayerPrefix) -> + case get(?PDICT_TX_ID_KEY) of + undefined -> + TxId = fabric2_txids:create(Tx, LayerPrefix), + put(?PDICT_TX_ID_KEY, TxId), + TxId; + TxId when is_binary(TxId) -> + TxId + end. + + +new_versionstamp(Tx) -> + TxId = erlfdb:get_next_tx_id(Tx), + {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. + diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl new file mode 100644 index 000000000..5b826cd14 --- /dev/null +++ b/src/fabric/src/fabric2_server.erl @@ -0,0 +1,104 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_server). +-behaviour(gen_server). +-vsn(1). + + +-export([ + start_link/0, + fetch/1, + store/1, + remove/1 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +-define(CLUSTER_FILE, "/usr/local/etc/foundationdb/fdb.cluster"). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +fetch(DbName) when is_binary(DbName) -> + case ets:lookup(?MODULE, DbName) of + [{DbName, #{} = Db}] -> Db; + [] -> undefined + end. + + +store(#{name := DbName} = Db0) when is_binary(DbName) -> + Db1 = Db0#{ + tx := undefined, + user_ctx := #user_ctx{} + }, + true = ets:insert(?MODULE, {DbName, Db1}), + ok. + + +remove(DbName) when is_binary(DbName) -> + true = ets:delete(?MODULE, DbName), + ok. + + +init(_) -> + ets:new(?MODULE, [ + public, + named_table, + {read_concurrency, true}, + {write_concurrency, true} + ]), + + Db = case application:get_env(fabric, eunit_run) of + {ok, true} -> + erlfdb_util:get_test_db([empty]); + undefined -> + ClusterStr = config:get("erlfdb", "cluster_file", ?CLUSTER_FILE), + erlfdb:open(iolist_to_binary(ClusterStr)) + end, + application:set_env(fabric, db, Db), + + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl new file mode 100644 index 000000000..73c6c1f4d --- /dev/null +++ b/src/fabric/src/fabric2_sup.erl @@ -0,0 +1,47 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_sup). +-behaviour(supervisor). +-vsn(1). + + +-export([ + start_link/1 +]). + +-export([ + init/1 +]). + + +start_link(Args) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, Args). + + +init([]) -> + Flags = #{ + strategy => one_for_one, + intensity => 1, + period => 5 + }, + Children = [ + #{ + id => fabric2_server, + start => {fabric2_server, start_link, []} + }, + #{ + id => fabric2_txids, + start => {fabric2_txids, start_link, []} + } + ], + {ok, {Flags, Children}}. diff --git a/src/fabric/src/fabric2_txids.erl b/src/fabric/src/fabric2_txids.erl new file mode 100644 index 000000000..bbb8bdf57 --- /dev/null +++ b/src/fabric/src/fabric2_txids.erl @@ -0,0 +1,144 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_txids). +-behaviour(gen_server). +-vsn(1). + + +-export([ + start_link/0, + create/2, + remove/1 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-include("fabric2.hrl"). + + +-define(ONE_HOUR, 3600000000). +-define(MAX_TX_IDS, 1000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +create(Tx, undefined) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + create(Tx, Prefix); + +create(_Tx, LayerPrefix) -> + {Mega, Secs, Micro} = os:timestamp(), + Key = {?TX_IDS, Mega, Secs, Micro, fabric2_util:uuid()}, + erlfdb_tuple:pack(Key, LayerPrefix). + + +remove(TxId) when is_binary(TxId) -> + gen_server:cast(?MODULE, {remove, TxId}); + +remove(undefined) -> + ok. + + + +init(_) -> + {ok, #{ + last_sweep => os:timestamp(), + txids => [] + }}. + + +terminate(_, #{txids := TxIds}) -> + if TxIds == [] -> ok; true -> + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end) + end) + end, + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast({remove, TxId}, St) -> + #{ + last_sweep := LastSweep, + txids := TxIds + } = St, + + NewTxIds = [TxId | TxIds], + NewSt = St#{txids := NewTxIds}, + + NeedsSweep = timer:now_diff(os:timestamp(), LastSweep) > ?ONE_HOUR, + + case NeedsSweep orelse length(NewTxIds) >= ?MAX_TX_IDS of + true -> + {noreply, clean(NewSt, NeedsSweep)}; + false -> + {noreply, NewSt} + end. + + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +clean(St, NeedsSweep) -> + #{ + last_sweep := LastSweep, + txids := TxIds + } = St, + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end, TxIds), + case NeedsSweep of + true -> + sweep(Tx, LastSweep), + St#{ + last_sweep := os:timestamp(), + txids := [] + }; + false -> + St#{txids := []} + end + end). + + +sweep(Tx, {Mega, Secs, Micro}) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + StartKey = erlfdb_tuple:pack({?TX_IDS}, Prefix), + EndKey = erlfdb_tuple:pack({?TX_IDS, Mega, Secs, Micro}, Prefix), + erlfdb:set_option(Tx, next_write_no_write_conflict_range), + erlfdb:clear_range(Tx, StartKey, EndKey). diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl new file mode 100644 index 000000000..6e2df67c2 --- /dev/null +++ b/src/fabric/src/fabric2_util.erl @@ -0,0 +1,203 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_util). + + +-export([ + revinfo_to_path/1, + sort_revinfos/1, + + seq_zero_vs/0, + seq_max_vs/0, + + user_ctx_to_json/1, + + validate_security_object/1, + + get_value/2, + get_value/3, + to_hex/1, + from_hex/1, + uuid/0 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +revinfo_to_path(RevInfo) -> + #{ + rev_id := {RevPos, Rev}, + rev_path := RevPath + } = RevInfo, + Revs = lists:reverse(RevPath, [Rev]), + Path = revinfo_to_path(RevInfo, Revs), + {RevPos - length(Revs) + 1, Path}. + + +revinfo_to_path(RevInfo, [Rev]) -> + {Rev, RevInfo, []}; + +revinfo_to_path(RevInfo, [Rev | Rest]) -> + {Rev, ?REV_MISSING, [revinfo_to_path(RevInfo, Rest)]}. + + +sort_revinfos(RevInfos) -> + CmpFun = fun(A, B) -> rev_sort_key(A) > rev_sort_key(B) end, + lists:sort(CmpFun, RevInfos). + + +rev_sort_key(#{} = RevInfo) -> + #{ + deleted := Deleted, + rev_id := {RevPos, Rev} + } = RevInfo, + {not Deleted, RevPos, Rev}. + + +seq_zero_vs() -> + {versionstamp, 0, 0, 0}. + + +seq_max_vs() -> + {versionstamp, 18446744073709551615, 65535, 65535}. + + +user_ctx_to_json(Db) -> + UserCtx = fabric2_db:get_user_ctx(Db), + {[ + {<<"db">>, fabric2_db:name(Db)}, + {<<"name">>, UserCtx#user_ctx.name}, + {<<"roles">>, UserCtx#user_ctx.roles} + ]}. + + +validate_security_object({SecProps}) -> + Admins = get_value(<<"admins">>, SecProps, {[]}), + ok = validate_names_and_roles(Admins), + + % we fallback to readers here for backwards compatibility + Readers = get_value(<<"readers">>, SecProps, {[]}), + Members = get_value(<<"members">>, SecProps, Readers), + ok = validate_names_and_roles(Members). + + +validate_names_and_roles({Props}) when is_list(Props) -> + validate_json_list_of_strings(<<"names">>, Props), + validate_json_list_of_strings(<<"roles">>, Props); +validate_names_and_roles(_) -> + throw("admins or members must be a JSON list of strings"). + + +validate_json_list_of_strings(Member, Props) -> + case get_value(Member, Props, []) of + Values when is_list(Values) -> + NonBinary = lists:filter(fun(V) -> not is_binary(V) end, Values), + if NonBinary == [] -> ok; true -> + MemberStr = binary_to_list(Member), + throw(MemberStr ++ " must be a JSON list of strings") + end; + _ -> + MemberStr = binary_to_list(Member), + throw(MemberStr ++ " must be a JSON list of strings") + end. + + +get_value(Key, List) -> + get_value(Key, List, undefined). + + +get_value(Key, List, Default) -> + case lists:keysearch(Key, 1, List) of + {value, {Key,Value}} -> + Value; + false -> + Default + end. + + +to_hex(Bin) -> + list_to_binary(to_hex_int(Bin)). + + +to_hex_int(<<>>) -> + []; +to_hex_int(<<Hi:4, Lo:4, Rest/binary>>) -> + [nibble_to_hex(Hi), nibble_to_hex(Lo) | to_hex(Rest)]. + + +nibble_to_hex(I) -> + case I of + 0 -> $0; + 1 -> $1; + 2 -> $2; + 3 -> $3; + 4 -> $4; + 5 -> $5; + 6 -> $6; + 7 -> $7; + 8 -> $8; + 9 -> $9; + 10 -> $a; + 11 -> $b; + 12 -> $c; + 13 -> $d; + 14 -> $e; + 15 -> $f + end. + + +from_hex(Bin) -> + iolist_to_binary(from_hex_int(Bin)). + + +from_hex_int(<<>>) -> + []; +from_hex_int(<<Hi:8, Lo:8, RestBinary/binary>>) -> + HiNib = hex_to_nibble(Hi), + LoNib = hex_to_nibble(Lo), + [<<HiNib:4, LoNib:4>> | from_hex_int(RestBinary)]; +from_hex_int(<<BadHex/binary>>) -> + erlang:error({invalid_hex, BadHex}). + + +hex_to_nibble(N) -> + case N of + $0 -> 0; + $1 -> 1; + $2 -> 2; + $3 -> 3; + $4 -> 4; + $5 -> 5; + $6 -> 6; + $7 -> 7; + $8 -> 8; + $9 -> 9; + $a -> 10; + $A -> 10; + $b -> 11; + $B -> 11; + $c -> 12; + $C -> 12; + $d -> 13; + $D -> 13; + $e -> 14; + $E -> 14; + $f -> 15; + $F -> 15; + _ -> erlang:error({invalid_hex, N}) + end. + + +uuid() -> + to_hex(crypto:strong_rand_bytes(16)). |