author    Paul J. Davis <paul.joseph.davis@gmail.com>  2019-06-05 13:29:33 -0500
committer Paul J. Davis <paul.joseph.davis@gmail.com>  2019-06-05 13:53:37 -0500
commit    ee2e4c8cd70f833f04c4504f6664e241fad5061f (patch)
tree      e9fac9739d67e05ccf4a1ac2c3d34c2526c1a996
parent    d7b015c006eed95d3ad80a2c1daadd94503dfd2d (diff)
download  couchdb-ee2e4c8cd70f833f04c4504f6664e241fad5061f.tar.gz
Initial fabric2 implementation on FoundationDB
This provides a base implementation of a fabric API backed by
FoundationDB. While a lot of functionality is provided, there are a
number of places that still require work. An incomplete list includes:

1. Document bodies are currently a single key/value
2. Attachments are stored as a range of key/value pairs
3. There is no support for indexing
4. Request size limits are not enforced directly
5. Auth is still backed by a legacy CouchDB database
6. No support for before_doc_update/after_doc_read
7. Various implementation shortcuts need to be expanded for full API
   support.
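For orientation, a minimal sketch of driving the new fabric2_db API end to end, based on the exports added in this commit. The database name, document contents, and empty option lists are invented for illustration; error handling is omitted and #doc{} comes from couch/include/couch_db.hrl.

    %% Hedged smoke test of the new API surface (names and options invented).
    {ok, Db} = fabric2_db:create(<<"demo">>, []),
    Doc0 = #doc{id = <<"doc1">>, body = {[{<<"value">>, 1}]}},
    {ok, {_Pos, _Rev}} = fabric2_db:update_doc(Db, Doc0, []),
    {ok, _Doc1} = fabric2_db:open_doc(Db, <<"doc1">>, []),
    _Seq = fabric2_db:get_update_seq(Db),
    _ = fabric2_db:delete(<<"demo">>, []).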
-rw-r--r--  FDB_NOTES.md                          57
-rw-r--r--  src/couch/src/couch_att.erl          661
-rw-r--r--  src/couch/src/couch_doc.erl           11
-rw-r--r--  src/fabric/src/fabric.app.src          8
-rw-r--r--  src/fabric/src/fabric2.hrl            66
-rw-r--r--  src/fabric/src/fabric2_app.erl        32
-rw-r--r--  src/fabric/src/fabric2_db.erl       1299
-rw-r--r--  src/fabric/src/fabric2_events.erl     84
-rw-r--r--  src/fabric/src/fabric2_fdb.erl      1187
-rw-r--r--  src/fabric/src/fabric2_server.erl    104
-rw-r--r--  src/fabric/src/fabric2_sup.erl        47
-rw-r--r--  src/fabric/src/fabric2_txids.erl     144
-rw-r--r--  src/fabric/src/fabric2_util.erl      203
13 files changed, 3502 insertions, 401 deletions
diff --git a/FDB_NOTES.md b/FDB_NOTES.md
new file mode 100644
index 000000000..c0cdc8cc2
--- /dev/null
+++ b/FDB_NOTES.md
@@ -0,0 +1,57 @@
+Things of Note
+===
+
+
+1. If a replication sends us two revisions A and B where one is an
+ ancestor of the other, we likely have divergent behavior. However,
+ this should never happen In Theory.
+
+2. For multiple updates to the same document in a _bulk_docs request (or
+   if they just happen to be in the same update batch in non-fdb CouchDB)
+   we likely have subtly different behavior.
+
+3. I'm relying on repeated reads in an fdb transaction to be "cheap"
+   in that the reads would be cached in the fdb_transaction object.
+   This needs to be verified, but that appeared to be how things
+   behaved in testing (see the sketch after these notes).
+
+4. When attempting to create a doc from scratch in an interactive_edit
+ update, with revisions specified *and* attachment stubs, the reported
+ error is now a conflict. Previously the missing_stubs error was
+ raised earlier.
+
+5. There may be a difference in behavior if there are no VDU functions
+   set on a db and no design documents in a batch. This is because in
+   this situation we don't run the prep_and_validate code on pre-fdb
+   CouchDB. The new code always checks stubs before merging revision trees.
+   I'm sure the old way would fail somehow, but it would fail further on,
+   which means we may have failed with a different reason (conflict, etc.)
+   before we got to the next place we check for missing stubs.
+
+6. For multi-doc updates we'll need to investigate user versions on
+   versionstamps within a transaction. Also, this likely prevents the
+   ability to have multiple updates to the same doc in a single
+   _bulk_docs transaction.
+
+7. Document body storage needs to be implemented beyond the single
+ key/value approach.
+
+8. We'll want to look at how we currently apply open options to individual
+ elements of an open_revs call. Might turn out that we have to grab a
+ full FDI even if we could look up a rev directly. (i.e., revs_info
+   would require us to have the entire FDI; however, it'd be wasteful to
+   return all of that in an open_revs call, but bug compatibility ftw!)
+
+9. Is it possible that a server_admin can delete a db without being able
+   to open it? If so, that's probably changed behavior.
+
+10. _all_docs on large active databases might be an issue for getting the
+    doc count. If we allow range requests up to 5s, and we continue to
+    return the total doc count, we may have to play games with snapshot
+    reads on the doc count key or else it'll whack any _all_docs range
+    requests (see the sketch after these notes).
+
+11. Revision infos need to track their size. If we want to maintain a
+    database size counter, we'll want to store the size of a given doc
+    body for each revision so that we don't have to read the old body
+    when updating the tree.
+
+12. Update sequences do not yet include an incarnation value.
\ No newline at end of file
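To make notes 3 and 10 concrete, here is a sketch of the two read patterns against erlfdb directly. The key name is invented and this assumes erlfdb's transactional/get/wait/snapshot API; it is an illustration, not code from this commit.

    Db = erlfdb:open(),
    erlfdb:transactional(Db, fun(Tx) ->
        %% Note 3: the second get of the same key should be served from
        %% the transaction's internal read cache rather than the cluster.
        V = erlfdb:wait(erlfdb:get(Tx, <<"doc_count">>)),
        V = erlfdb:wait(erlfdb:get(Tx, <<"doc_count">>)),
        %% Note 10: a snapshot read avoids adding the key to the
        %% transaction's conflict range, so a concurrent doc count update
        %% would not invalidate a long _all_docs range read.
        _ = erlfdb:wait(erlfdb:get(erlfdb:snapshot(Tx), <<"doc_count">>)),
        ok
    end).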
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index a24de21d6..0dc5fa56b 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -29,7 +29,7 @@
-export([
size_info/1,
to_disk_term/1,
- from_disk_term/2
+ from_disk_term/3
]).
-export([
@@ -38,7 +38,7 @@
]).
-export([
- flush/2,
+ flush/3,
foldl/3,
range_foldl/5,
foldl_decode/3,
@@ -46,11 +46,6 @@
]).
-export([
- upgrade/1,
- downgrade/1
-]).
-
--export([
max_attachment_size/0,
validate_attachment_size/3
]).
@@ -58,137 +53,61 @@
-compile(nowarn_deprecated_type).
-export_type([att/0]).
--include_lib("couch/include/couch_db.hrl").
-
-
-%% Legacy attachment record. This is going to be phased out by the new proplist
-%% based structure. It's needed for now to allow code to perform lazy upgrades
-%% while the patch is rolled out to the cluster. Attachments passed as records
-%% will remain so until they are required to be represented as property lists.
-%% Once this has been widely deployed, this record will be removed entirely and
-%% property lists will be the main format.
--record(att, {
- name :: binary(),
- type :: binary(),
- att_len :: non_neg_integer(),
-
- %% length of the attachment in its identity form
- %% (that is, without a content encoding applied to it)
- %% differs from att_len when encoding /= identity
- disk_len :: non_neg_integer(),
-
- md5 = <<>> :: binary(),
- revpos = 0 :: non_neg_integer(),
- data :: stub | follows | binary() | {any(), any()} |
- {follows, pid(), reference()} | fun(() -> binary()),
-
- %% Encoding of the attachment
- %% currently supported values are:
- %% identity, gzip
- %% additional values to support in the future:
- %% deflate, compress
- encoding = identity :: identity | gzip
-}).
-
-
-%% Extensible Attachment Type
-%%
-%% The following types describe the known properties for attachment fields
-%% encoded as property lists to allow easier upgrades. Values not in this list
-%% should be accepted at runtime but should be treated as opaque data as might
-%% be used by upgraded code. If you plan on operating on new data, please add
-%% an entry here as documentation.
-
-
-%% The name of the attachment is also used as the mime-part name for file
-%% downloads. These must be unique per document.
--type name_prop() :: {name, binary()}.
-
-
-%% The mime type of the attachment. This does affect compression of certain
-%% attachments if the type is found to be configured as a compressable type.
-%% This is commonly reserved for text/* types but could include other custom
-%% cases as well. See definition and use of couch_util:compressable_att_type/1.
--type type_prop() :: {type, binary()}.
-
-
-%% The attachment length is similar to disk-length but ignores additional
-%% encoding that may have occurred.
--type att_len_prop() :: {att_len, non_neg_integer()}.
-
-
-%% The size of the attachment as stored in a disk stream.
--type disk_len_prop() :: {disk_len, non_neg_integer()}.
-
-
-%% This is a digest of the original attachment data as uploaded by the client.
-%% it's useful for checking validity of contents against other attachment data
-%% as well as quick digest computation of the enclosing document.
--type md5_prop() :: {md5, binary()}.
-
--type revpos_prop() :: {revpos, 0}.
+-include_lib("couch/include/couch_db.hrl").
-%% This field is currently overloaded with just about everything. The
-%% {any(), any()} type is just there until I have time to check the actual
-%% values expected. Over time this should be split into more than one property
-%% to allow simpler handling.
--type data_prop() :: {
- data, stub | follows | binary() | {any(), any()} |
- {follows, pid(), reference()} | fun(() -> binary())
-}.
+-define(CURRENT_ATT_FORMAT, 0).
-%% We will occasionally compress our data. See type_prop() for more information
-%% on when this happens.
--type encoding_prop() :: {encoding, identity | gzip}.
+-type prop_name() ::
+ name |
+ type |
+ att_len |
+ disk_len |
+ md5 |
+ revpos |
+ data |
+ encoding.
--type attachment() :: [
- name_prop() | type_prop() |
- att_len_prop() | disk_len_prop() |
- md5_prop() | revpos_prop() |
- data_prop() | encoding_prop()
-].
+-type data_prop_type() ::
+ {loc, #{}, binary(), binary()} |
+ stub |
+ follows |
+ binary() |
+ {follows, pid(), reference()} |
+ fun(() -> binary()).
--type disk_att_v1() :: {
- Name :: binary(),
- Type :: binary(),
- Sp :: any(),
- AttLen :: non_neg_integer(),
- RevPos :: non_neg_integer(),
- Md5 :: binary()
-}.
--type disk_att_v2() :: {
- Name :: binary(),
- Type :: binary(),
- Sp :: any(),
- AttLen :: non_neg_integer(),
- DiskLen :: non_neg_integer(),
- RevPos :: non_neg_integer(),
- Md5 :: binary(),
- Enc :: identity | gzip
+-type att() :: #{
+ name := binary(),
+ type := binary(),
+ att_len := non_neg_integer() | undefined,
+ disk_len := non_neg_integer() | undefined,
+ md5 := binary() | undefined,
+ revpos := non_neg_integer(),
+ data := data_prop_type(),
+ encoding := identity | gzip | undefined,
+ headers := [{binary(), binary()}] | undefined
}.
--type disk_att_v3() :: {Base :: tuple(), Extended :: list()}.
-
--type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3().
-
--type att() :: #att{} | attachment() | disk_att().
new() ->
- %% We construct a record by default for compatability. This will be
- %% upgraded on demand. A subtle effect this has on all attachments
- %% constructed via new is that it will pick up the proper defaults
- %% from the #att record definition given above. Newer properties do
- %% not support special default values and will all be treated as
- %% undefined.
- #att{}.
+ #{
+ name => <<>>,
+ type => <<>>,
+ att_len => undefined,
+ disk_len => undefined,
+ md5 => undefined,
+ revpos => 0,
+ data => undefined,
+ encoding => undefined,
+ headers => undefined
+ }.
--spec new([{atom(), any()}]) -> att().
+-spec new([{prop_name(), any()}]) -> att().
new(Props) ->
store(Props, new()).
@@ -197,71 +116,28 @@ new(Props) ->
(atom(), att()) -> any().
fetch(Fields, Att) when is_list(Fields) ->
[fetch(Field, Att) || Field <- Fields];
-fetch(Field, Att) when is_list(Att) ->
- case lists:keyfind(Field, 1, Att) of
- {Field, Value} -> Value;
- false -> undefined
- end;
-fetch(name, #att{name = Name}) ->
- Name;
-fetch(type, #att{type = Type}) ->
- Type;
-fetch(att_len, #att{att_len = AttLen}) ->
- AttLen;
-fetch(disk_len, #att{disk_len = DiskLen}) ->
- DiskLen;
-fetch(md5, #att{md5 = Digest}) ->
- Digest;
-fetch(revpos, #att{revpos = RevPos}) ->
- RevPos;
-fetch(data, #att{data = Data}) ->
- Data;
-fetch(encoding, #att{encoding = Encoding}) ->
- Encoding;
-fetch(_, _) ->
- undefined.
+fetch(Field, Att) ->
+ maps:get(Field, Att).
-spec store([{atom(), any()}], att()) -> att().
store(Props, Att0) ->
lists:foldl(fun({Field, Value}, Att) ->
- store(Field, Value, Att)
+ maps:update(Field, Value, Att)
end, Att0, Props).
--spec store(atom(), any(), att()) -> att().
-store(Field, undefined, Att) when is_list(Att) ->
- lists:keydelete(Field, 1, Att);
-store(Field, Value, Att) when is_list(Att) ->
- lists:keystore(Field, 1, Att, {Field, Value});
-store(name, Name, Att) ->
- Att#att{name = Name};
-store(type, Type, Att) ->
- Att#att{type = Type};
-store(att_len, AttLen, Att) ->
- Att#att{att_len = AttLen};
-store(disk_len, DiskLen, Att) ->
- Att#att{disk_len = DiskLen};
-store(md5, Digest, Att) ->
- Att#att{md5 = Digest};
-store(revpos, RevPos, Att) ->
- Att#att{revpos = RevPos};
-store(data, Data, Att) ->
- Att#att{data = Data};
-store(encoding, Encoding, Att) ->
- Att#att{encoding = Encoding};
store(Field, Value, Att) ->
- store(Field, Value, upgrade(Att)).
+ maps:update(Field, Value, Att).
-spec transform(atom(), fun(), att()) -> att().
transform(Field, Fun, Att) ->
- NewValue = Fun(fetch(Field, Att)),
- store(Field, NewValue, Att).
+ maps:update_with(Field, Fun, Att).
-is_stub(Att) ->
- stub == fetch(data, Att).
+is_stub(#{data := stub}) -> true;
+is_stub(#{}) -> false.
%% merge_stubs takes all stub attachments and replaces them with on disk
@@ -275,8 +151,7 @@ merge_stubs(MemAtts, DiskAtts) ->
merge_stubs(MemAtts, OnDisk, []).
-%% restore spec when R14 support is dropped
-%% -spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
+-spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
merge_stubs([Att | Rest], OnDisk, Merged) ->
case fetch(data, Att) of
stub ->
@@ -308,14 +183,8 @@ size_info([]) ->
{ok, []};
size_info(Atts) ->
Info = lists:map(fun(Att) ->
- AttLen = fetch(att_len, Att),
- case fetch(data, Att) of
- {stream, StreamEngine} ->
- {ok, SPos} = couch_stream:to_disk_term(StreamEngine),
- {SPos, AttLen};
- {_, SPos} ->
- {SPos, AttLen}
- end
+ [{loc, _Db, _DocId, AttId}, AttLen] = fetch([data, att_len], Att),
+ {AttId, AttLen}
end, Atts),
{ok, lists:usort(Info)}.
@@ -324,89 +193,44 @@ size_info(Atts) ->
%% old format when possible. This should help make the attachment lazy upgrade
%% as safe as possible, avoiding the need for complicated disk versioning
%% schemes.
-to_disk_term(#att{} = Att) ->
- {stream, StreamEngine} = fetch(data, Att),
- {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
- {
+to_disk_term(Att) ->
+ {loc, #{}, _DocId, AttId} = fetch(data, Att),
+ {?CURRENT_ATT_FORMAT, {
fetch(name, Att),
fetch(type, Att),
- Sp,
+ AttId,
fetch(att_len, Att),
fetch(disk_len, Att),
fetch(revpos, Att),
fetch(md5, Att),
- fetch(encoding, Att)
- };
-to_disk_term(Att) ->
- BaseProps = [name, type, data, att_len, disk_len, revpos, md5, encoding],
- {Extended, Base} = lists:foldl(
- fun
- (data, {Props, Values}) ->
- case lists:keytake(data, 1, Props) of
- {value, {_, {stream, StreamEngine}}, Other} ->
- {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
- {Other, [Sp | Values]};
- {value, {_, Value}, Other} ->
- {Other, [Value | Values]};
- false ->
- {Props, [undefined | Values]}
- end;
- (Key, {Props, Values}) ->
- case lists:keytake(Key, 1, Props) of
- {value, {_, Value}, Other} -> {Other, [Value | Values]};
- false -> {Props, [undefined | Values]}
- end
- end,
- {Att, []},
- BaseProps
- ),
- {list_to_tuple(lists:reverse(Base)), Extended}.
-
-
-%% The new disk term format is a simple wrapper around the legacy format. Base
-%% properties will remain in a tuple while the new fields and possibly data from
-%% future extensions will be stored in a list of atom/value pairs. While this is
-%% slightly less efficient, future work should be able to make use of
-%% compression to remove these sorts of common bits (block level compression
-%% with something like a shared dictionary that is checkpointed every now and
-%% then).
-from_disk_term(StreamSrc, {Base, Extended})
- when is_tuple(Base), is_list(Extended) ->
- store(Extended, from_disk_term(StreamSrc, Base));
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=DiskLen,
- md5=Md5,
- revpos=RevPos,
- data={stream, Stream},
- encoding=upgrade_encoding(Enc)
- };
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5=Md5,
- revpos=RevPos,
- data={stream, Stream}
- };
-from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5= <<>>,
- revpos=0,
- data={stream, Stream}
- }.
+ fetch(encoding, Att),
+ fetch(headers, Att)
+ }}.
+
+
+from_disk_term(#{} = Db, DocId, {?CURRENT_ATT_FORMAT, Props}) ->
+ {
+ Name,
+ Type,
+ AttId,
+ AttLen,
+ DiskLen,
+ RevPos,
+ Md5,
+ Encoding,
+ Headers
+ } = Props,
+ new([
+ {name, Name},
+ {type, Type},
+ {data, {loc, Db#{tx := undefined}, DocId, AttId}},
+ {att_len, AttLen},
+ {disk_len, DiskLen},
+ {revpos, RevPos},
+ {md5, Md5},
+ {encoding, Encoding},
+ {headers, Headers}
+ ]).
%% from_json reads in embedded JSON attachments and creates usable attachment
@@ -433,8 +257,12 @@ stub_from_json(Att, Props) ->
%% json object. See merge_stubs/3 for the stub check.
RevPos = couch_util:get_value(<<"revpos">>, Props),
store([
- {md5, Digest}, {revpos, RevPos}, {data, stub}, {disk_len, DiskLen},
- {att_len, EncodedLen}, {encoding, Encoding}
+ {data, stub},
+ {disk_len, DiskLen},
+ {att_len, EncodedLen},
+ {revpos, RevPos},
+ {md5, Digest},
+ {encoding, Encoding}
], Att).
@@ -443,8 +271,12 @@ follow_from_json(Att, Props) ->
Digest = digest_from_json(Props),
RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
store([
- {md5, Digest}, {revpos, RevPos}, {data, follows}, {disk_len, DiskLen},
- {att_len, EncodedLen}, {encoding, Encoding}
+ {data, follows},
+ {disk_len, DiskLen},
+ {att_len, EncodedLen},
+ {revpos, RevPos},
+ {md5, Digest},
+ {encoding, Encoding}
], Att).
@@ -455,8 +287,10 @@ inline_from_json(Att, Props) ->
Length = size(Data),
RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
store([
- {data, Data}, {revpos, RevPos}, {disk_len, Length},
- {att_len, Length}
+ {data, Data},
+ {disk_len, Length},
+ {att_len, Length},
+ {revpos, RevPos}
], Att)
catch
_:_ ->
@@ -466,7 +300,6 @@ inline_from_json(Att, Props) ->
end.
-
encoded_lengths_from_json(Props) ->
Len = couch_util:get_value(<<"length">>, Props),
case couch_util:get_value(<<"encoding">>, Props) of
@@ -488,9 +321,17 @@ digest_from_json(Props) ->
to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
- [Name, Data, DiskLen, AttLen, Enc, Type, RevPos, Md5] = fetch(
- [name, data, disk_len, att_len, encoding, type, revpos, md5], Att
- ),
+ #{
+ name := Name,
+ type := Type,
+ data := Data,
+ disk_len := DiskLen,
+ att_len := AttLen,
+ revpos := RevPos,
+ md5 := Md5,
+ encoding := Encoding,
+ headers := Headers
+ } = Att,
Props = [
{<<"content_type">>, Type},
{<<"revpos">>, RevPos}
@@ -505,71 +346,74 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
DataToFollow ->
[{<<"length">>, DiskLen}, {<<"follows">>, true}];
true ->
- AttData = case Enc of
+ AttData = case Encoding of
gzip -> zlib:gunzip(to_binary(Att));
identity -> to_binary(Att)
end,
[{<<"data">>, base64:encode(AttData)}]
end,
EncodingProps = if
- ShowEncoding andalso Enc /= identity ->
+ ShowEncoding andalso Encoding /= identity ->
[
- {<<"encoding">>, couch_util:to_binary(Enc)},
+ {<<"encoding">>, couch_util:to_binary(Encoding)},
{<<"encoded_length">>, AttLen}
];
true ->
[]
end,
- HeadersProp = case fetch(headers, Att) of
+ HeadersProp = case Headers of
undefined -> [];
Headers -> [{<<"headers">>, Headers}]
end,
{Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
-flush(Db, Att) ->
- flush_data(Db, fetch(data, Att), Att).
+flush(Db, DocId, Att1) ->
+ Att2 = read_data(fetch(data, Att1), Att1),
+ [
+ Data,
+ AttLen,
+ DiskLen,
+ ReqMd5,
+ Encoding
+ ] = fetch([data, att_len, disk_len, md5, encoding], Att2),
+
+ % Eventually, we'll check if we can compress this
+ % attachment here and do so if possible.
+
+ % If we were sent a gzip'ed attachment with no
+ % length data, we have to set it here.
+ Att3 = case AttLen of
+ undefined -> store(att_len, DiskLen, Att2);
+ _ -> Att2
+ end,
+
+ % If no encoding has been set, default to
+ % identity
+ Att4 = case Encoding of
+ undefined -> store(encoding, identity, Att3);
+ _ -> Att3
+ end,
+
+ case Data of
+ {loc, _, _, _} ->
+ % Already flushed
+ Att1;
+ _ when is_binary(Data) ->
+ IdentMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+ if ReqMd5 == undefined -> ok; true ->
+ couch_util:check_md5(IdentMd5, ReqMd5)
+ end,
+ Att5 = store(md5, IdentMd5, Att4),
+ fabric2_db:write_attachment(Db, DocId, Att5)
+ end.
-flush_data(Db, Data, Att) when is_binary(Data) ->
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- couch_stream:write(OutputStream, Data)
- end);
-flush_data(Db, Fun, Att) when is_function(Fun) ->
- AttName = fetch(name, Att),
- MaxAttSize = max_attachment_size(),
- case fetch(att_len, Att) of
- undefined ->
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- % Fun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment,
- Fun(4096,
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun({0, Footers}, _Total) ->
- F = mochiweb_headers:from_binary(Footers),
- case mochiweb_headers:get_value("Content-MD5", F) of
- undefined ->
- ok;
- Md5 ->
- {md5, base64:decode(Md5)}
- end;
- ({Length, Chunk}, Total0) ->
- Total = Total0 + Length,
- validate_attachment_size(AttName, Total, MaxAttSize),
- couch_stream:write(OutputStream, Chunk),
- Total
- end, 0)
- end);
- AttLen ->
- validate_attachment_size(AttName, AttLen, MaxAttSize),
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- write_streamed_attachment(OutputStream, Fun, AttLen)
- end)
- end;
-flush_data(Db, {follows, Parser, Ref}, Att) ->
+read_data({loc, #{}, _DocId, _AttId}, Att) ->
+ % Attachment already written to fdb
+ Att;
+
+read_data({follows, Parser, Ref}, Att) ->
ParserRef = erlang:monitor(process, Parser),
Fun = fun() ->
Parser ! {get_bytes, Ref, self()},
@@ -583,41 +427,72 @@ flush_data(Db, {follows, Parser, Ref}, Att) ->
end
end,
try
- flush_data(Db, Fun, store(data, Fun, Att))
+ read_data(Fun, store(data, Fun, Att))
after
erlang:demonitor(ParserRef, [flush])
end;
-flush_data(Db, {stream, StreamEngine}, Att) ->
- case couch_db:is_active_stream(Db, StreamEngine) of
- true ->
- % Already written
- Att;
- false ->
- NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
- couch_stream:copy(StreamEngine, OutputStream)
- end),
- InMd5 = fetch(md5, Att),
- OutMd5 = fetch(md5, NewAtt),
- couch_util:check_md5(OutMd5, InMd5),
- NewAtt
+
+read_data(Data, Att) when is_binary(Data) ->
+ Att;
+
+read_data(Fun, Att) when is_function(Fun) ->
+ [AttName, AttLen, InMd5] = fetch([name, att_len, md5], Att),
+ MaxAttSize = max_attachment_size(),
+ case AttLen of
+ undefined ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
+ % once for each chunk of the attachment,
+ WriterFun = fun
+ ({0, Footers}, {Len, Acc}) ->
+ F = mochiweb_headers:from_binary(Footers),
+ Md5 = case mochiweb_headers:get_value("Content-MD5", F) of
+ undefined -> undefined;
+ Value -> base64:decode(Value)
+ end,
+ Props0 = [
+ {data, iolist_to_binary(lists:reverse(Acc))},
+ {disk_len, Len}
+ ],
+ Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
+ [{md5, Md5} | Props0]
+ end,
+ store(Props1, Att);
+ ({ChunkLen, Chunk}, {Len, Acc}) ->
+ NewLen = Len + ChunkLen,
+ validate_attachment_size(AttName, NewLen, MaxAttSize),
+ {NewLen, [Chunk | Acc]}
+ end,
+ Fun(8192, WriterFun, {0, []});
+ AttLen ->
+ validate_attachment_size(AttName, AttLen, MaxAttSize),
+ read_streamed_attachment(Att, Fun, AttLen, [])
end.
-write_streamed_attachment(_Stream, _F, 0) ->
- ok;
-write_streamed_attachment(_Stream, _F, LenLeft) when LenLeft < 0 ->
+read_streamed_attachment(Att, _F, 0, Acc) ->
+ Bin = iolist_to_binary(lists:reverse(Acc)),
+ store([
+ {data, Bin},
+ {disk_len, size(Bin)}
+ ], Att);
+
+read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
throw({bad_request, <<"attachment longer than expected">>});
-write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
- Bin = try read_next_chunk(F, LenLeft)
+
+read_streamed_attachment(Att, F, LenLeft, Acc) when LenLeft > 0 ->
+ Bin = try
+ read_next_chunk(F, LenLeft)
catch
{mp_parser_died, normal} ->
throw({bad_request, <<"attachment shorter than expected">>})
end,
- ok = couch_stream:write(Stream, Bin),
- write_streamed_attachment(Stream, F, LenLeft - iolist_size(Bin)).
+ Size = iolist_size(Bin),
+ read_streamed_attachment(Att, F, LenLeft - Size, [Bin | Acc]).
+
read_next_chunk(F, _) when is_function(F, 0) ->
F();
+
read_next_chunk(F, LenLeft) when is_function(F, 1) ->
F(lists:min([LenLeft, 16#2000])).
@@ -626,14 +501,17 @@ foldl(Att, Fun, Acc) ->
foldl(fetch(data, Att), Att, Fun, Acc).
+foldl({loc, Db, DocId, AttId}, _Att, Fun, Acc) ->
+ Bin = fabric2_db:read_attachment(Db#{tx := undefined}, DocId, AttId),
+ Fun(Bin, Acc);
+
foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-foldl({stream, StreamEngine}, Att, Fun, Acc) ->
- Md5 = fetch(md5, Att),
- couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
+
foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
Len = fetch(att_len, Att),
fold_streamed_data(DataFun, Len, Fun, Acc);
+
foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
ParserRef = erlang:monitor(process, Parser),
DataFun = fun() ->
@@ -654,19 +532,26 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
end.
+range_foldl(Bin1, From, To, Fun, Acc) when is_binary(Bin1) ->
+ ReadLen = To - From,
+ Bin2 = case Bin1 of
+ _ when size(Bin1) < From -> <<>>;
+        <<_:From/binary, B2/binary>> -> B2
+ end,
+ Bin3 = case Bin2 of
+ _ when size(Bin2) < ReadLen -> Bin2;
+ <<B3:ReadLen/binary, _/binary>> -> B3
+ end,
+ Fun(Bin3, Acc);
+
range_foldl(Att, From, To, Fun, Acc) ->
- {stream, StreamEngine} = fetch(data, Att),
- couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
+ {loc, Db, DocId, AttId} = fetch(data, Att),
+ Bin = fabric2_db:read_attachment(Db, DocId, AttId),
+ range_foldl(Bin, From, To, Fun, Acc).
-foldl_decode(Att, Fun, Acc) ->
- case fetch([data, encoding], Att) of
- [{stream, StreamEngine}, Enc] ->
- couch_stream:foldl_decode(
- StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
- [Fun2, identity] ->
- fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
- end.
+foldl_decode(_Att, _Fun, _Acc) ->
+ erlang:error(not_supported).
to_binary(Att) ->
@@ -677,10 +562,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
Bin;
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
-to_binary({stream, _StreamEngine}, Att) ->
- iolist_to_binary(
- lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
- );
+to_binary({loc, Db, DocId, AttId}, _Att) ->
+    fabric2_db:read_attachment(Db, DocId, AttId);
to_binary(DataFun, Att) when is_function(DataFun)->
Len = fetch(att_len, Att),
iolist_to_binary(
@@ -695,46 +578,22 @@ to_binary(DataFun, Att) when is_function(DataFun)->
fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
Acc;
+
fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
Bin = RcvFun(),
ResultAcc = Fun(Bin, Acc),
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-%% Upgrade an attachment record to a property list on demand. This is a one-way
-%% operation as downgrading potentially truncates fields with important data.
--spec upgrade(#att{}) -> attachment().
-upgrade(#att{} = Att) ->
- Map = lists:zip(
- record_info(fields, att),
- lists:seq(2, record_info(size, att))
- ),
- %% Don't store undefined elements since that is default
- [{F, element(I, Att)} || {F, I} <- Map, element(I, Att) /= undefined];
-upgrade(Att) ->
- Att.
-
-
-%% Downgrade is exposed for interactive convenience. In practice, unless done
-%% manually, upgrades are always one-way.
-downgrade(#att{} = Att) ->
- Att;
-downgrade(Att) ->
- #att{
- name = fetch(name, Att),
- type = fetch(type, Att),
- att_len = fetch(att_len, Att),
- disk_len = fetch(disk_len, Att),
- md5 = fetch(md5, Att),
- revpos = fetch(revpos, Att),
- data = fetch(data, Att),
- encoding = fetch(encoding, Att)
- }.
-
-
-upgrade_encoding(true) -> gzip;
-upgrade_encoding(false) -> identity;
-upgrade_encoding(Encoding) -> Encoding.
+get_identity_md5(Bin, gzip) ->
+ Z = zlib:open(),
+ ok = zlib:inflateInit(Z, 16 + 15),
+ Inflated = zlib:inflate(Z, Bin),
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z),
+ couch_hash:md5_hash(Inflated);
+get_identity_md5(Bin, _) ->
+ couch_hash:md5_hash(Bin).
max_attachment_size() ->
@@ -753,18 +612,22 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
ok.
-open_stream(StreamSrc, Data) ->
- case couch_db:is_db(StreamSrc) of
- true ->
- couch_db:open_read_stream(StreamSrc, Data);
- false ->
- case is_function(StreamSrc, 1) of
- true ->
- StreamSrc(Data);
- false ->
- erlang:error({invalid_stream_source, StreamSrc})
- end
- end.
+%% is_compressible(Type) when is_binary(Type) ->
+%% is_compressible(binary_to_list(Type));
+%% is_compressible(Type) ->
+%% TypeExpList = re:split(
+%% config:get("attachments", "compressible_types", ""),
+%% "\\s*,\\s*",
+%% [{return, list}]
+%% ),
+%% lists:any(
+%% fun(TypeExp) ->
+%% Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+%% "(?:\\s*;.*?)?\\s*", $$],
+%% re:run(Type, Regexp, [caseless]) =/= nomatch
+%% end,
+%% [T || T <- TypeExpList, T /= []]
+%% ).
-ifdef(TEST).
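For reference, a hedged sketch of driving the reworked map-based couch_att API after this change. The attachment values are invented, Db stands for a fabric2 database map, and this assumes the flush/3 and fetch/2 behavior shown above.

    Att0 = couch_att:new([
        {name, <<"readme.txt">>},
        {type, <<"text/plain">>},
        {data, <<"hello">>},
        {att_len, 5},
        {disk_len, 5},
        {revpos, 1}
    ]),
    false = couch_att:is_stub(Att0),
    %% flush/3 now writes through fabric2_db and returns the attachment
    %% with its data replaced by a {loc, Db, DocId, AttId} pointer.
    Att1 = couch_att:flush(Db, <<"doc1">>, Att0),
    {loc, _, <<"doc1">>, _AttId} = couch_att:fetch(data, Att1).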
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 4a49372c7..d33325eb1 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -374,6 +374,17 @@ rev_info({#doc{} = Doc, {Pos, [RevId | _]}}) ->
body_sp = undefined,
seq = undefined,
rev = {Pos, RevId}
+ };
+rev_info({#{} = RevInfo, {Pos, [RevId | _]}}) ->
+ #{
+ deleted := Deleted,
+ sequence := Sequence
+ } = RevInfo,
+ #rev_info{
+ deleted = Deleted,
+ body_sp = undefined,
+ seq = Sequence,
+ rev = {Pos, RevId}
}.
is_deleted(#full_doc_info{rev_tree=Tree}) ->
diff --git a/src/fabric/src/fabric.app.src b/src/fabric/src/fabric.app.src
index d7686ca1a..20fbb1e2a 100644
--- a/src/fabric/src/fabric.app.src
+++ b/src/fabric/src/fabric.app.src
@@ -13,7 +13,10 @@
{application, fabric, [
{description, "Routing and proxying layer for CouchDB cluster"},
{vsn, git},
- {registered, []},
+ {mod, {fabric2_app, []}},
+ {registered, [
+ fabric_server
+ ]},
{applications, [
kernel,
stdlib,
@@ -22,6 +25,7 @@
rexi,
mem3,
couch_log,
- couch_stats
+ couch_stats,
+ erlfdb
]}
]}.
diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl
new file mode 100644
index 000000000..de1d3d177
--- /dev/null
+++ b/src/fabric/src/fabric2.hrl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+
+% This will eventually be the `\xFFmetadataVersion` key that is
+% currently only available in FoundationDB master.
+%
+% https://forums.foundationdb.org/t/a-new-tool-for-managing-layer-metadata/1191
+%
+% Until then we'll fake the same behavior using a randomish
+% key for tracking metadata changse. Once we get to the
+% new feature this will be more performant by updating
+% this define.
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+
+% Prefix Definitions
+
+% Layer Level: (LayerPrefix, X, ...)
+
+-define(CLUSTER_CONFIG, 0).
+-define(ALL_DBS, 1).
+-define(DBS, 15).
+-define(TX_IDS, 255).
+
+% Database Level: (LayerPrefix, ?DBS, DbPrefix, X, ...)
+
+-define(DB_VERSION, 0).
+-define(DB_CONFIG, 16).
+-define(DB_STATS, 17).
+-define(DB_ALL_DOCS, 18).
+-define(DB_CHANGES, 19).
+-define(DB_REVS, 20).
+-define(DB_DOCS, 21).
+-define(DB_LOCAL_DOCS, 22).
+-define(DB_ATTS, 23).
+
+
+% Versions
+
+-define(CURR_REV_FORMAT, 0).
+
+
+% Misc constants
+
+-define(PDICT_DB_KEY, '$fabric_db_handle').
+-define(PDICT_LAYER_CACHE, '$fabric_layer_id').
+-define(PDICT_CHECKED_DB_IS_CURRENT, '$fabric_checked_db_is_current').
+-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
+-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
+-define(COMMIT_UNKNOWN_RESULT, 1021).
+
+
+-define(ATTACHMENT_CHUNK_SIZE, 100000).
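For intuition, a sketch of how these prefix constants might compose into FDB keys via erlfdb's tuple layer. The actual key layout lives in fabric2_fdb.erl; the variable names here (LayerPrefix, DbPrefix, DbName, DocId) are placeholders.

    %% Layer level: (LayerPrefix, ?ALL_DBS, DbName), an assumed shape.
    AllDbsKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    %% Database level: (LayerPrefix, ?DBS, DbPrefix, ?DB_DOCS, DocId),
    %% where DbPrefix already embeds the layer prefix and ?DBS.
    DocKey = erlfdb_tuple:pack({?DB_DOCS, DocId}, DbPrefix),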
diff --git a/src/fabric/src/fabric2_app.erl b/src/fabric/src/fabric2_app.erl
new file mode 100644
index 000000000..da95acb53
--- /dev/null
+++ b/src/fabric/src/fabric2_app.erl
@@ -0,0 +1,32 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_app).
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_Type, StartArgs) ->
+ fabric2_sup:start_link(StartArgs).
+
+
+stop(_State) ->
+ case application:get_env(erlfdb, test_cluster_pid) of
+ {ok, Pid} -> Pid ! close;
+ _ -> ok
+ end,
+ ok.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
new file mode 100644
index 000000000..02a18fa23
--- /dev/null
+++ b/src/fabric/src/fabric2_db.erl
@@ -0,0 +1,1299 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_db).
+
+
+-export([
+ create/2,
+ open/2,
+ delete/2,
+
+ list_dbs/0,
+ list_dbs/1,
+
+ is_admin/1,
+ check_is_admin/1,
+ check_is_member/1,
+
+ name/1,
+ get_after_doc_read_fun/1,
+ get_before_doc_update_fun/1,
+ get_committed_update_seq/1,
+ get_compacted_seq/1,
+ get_compactor_pid/1,
+ get_db_info/1,
+ %% get_partition_info/2,
+ get_del_doc_count/1,
+ get_doc_count/1,
+ get_doc_count/2,
+ %% get_epochs/1,
+ %% get_filepath/1,
+ get_instance_start_time/1,
+ get_pid/1,
+ get_revs_limit/1,
+ get_security/1,
+ get_update_seq/1,
+ get_user_ctx/1,
+ get_uuid/1,
+ %% get_purge_seq/1,
+ %% get_oldest_purge_seq/1,
+ %% get_purge_infos_limit/1,
+
+ is_clustered/1,
+ is_db/1,
+ is_partitioned/1,
+ is_system_db/1,
+ is_system_db_name/1,
+
+ set_revs_limit/2,
+ %% set_purge_infos_limit/2,
+ set_security/2,
+ set_user_ctx/2,
+
+ ensure_full_commit/1,
+ ensure_full_commit/2,
+
+ %% load_validation_funs/1,
+ %% reload_validation_funs/1,
+
+ open_doc/2,
+ open_doc/3,
+ open_doc_revs/4,
+ %% open_doc_int/3,
+ get_doc_info/2,
+ get_full_doc_info/2,
+ get_full_doc_infos/2,
+ get_missing_revs/2,
+ %% get_design_doc/2,
+ %% get_design_docs/1,
+ %% get_design_doc_count/1,
+ %% get_purge_infos/2,
+
+ %% get_minimum_purge_seq/1,
+ %% purge_client_exists/3,
+
+ %% validate_docid/2,
+ %% doc_from_json_obj_validate/2,
+
+ update_doc/2,
+ update_doc/3,
+ update_docs/2,
+ update_docs/3,
+ %% delete_doc/3,
+
+ %% purge_docs/2,
+ %% purge_docs/3,
+
+ read_attachment/3,
+ write_attachment/3,
+
+ fold_docs/3,
+ fold_docs/4,
+ %% fold_local_docs/4,
+ %% fold_design_docs/4,
+ fold_changes/4,
+ fold_changes/5,
+ %% count_changes_since/2,
+ %% fold_purge_infos/4,
+ %% fold_purge_infos/5,
+
+ %% calculate_start_seq/3,
+ %% owner_of/2,
+
+ %% start_compact/1,
+ %% cancel_compact/1,
+ %% wait_for_compaction/1,
+ %% wait_for_compaction/2,
+
+ %% dbname_suffix/1,
+ %% normalize_dbname/1,
+ %% validate_dbname/1,
+
+ %% make_doc/5,
+ new_revid/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("fabric2.hrl").
+
+
+-define(DBNAME_REGEX,
+ "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex
+ "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
+).
+
+
+-define(RETURN(Term), throw({?MODULE, Term})).
+
+
+create(DbName, Options) ->
+ Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
+ case fabric2_fdb:exists(TxDb) of
+ true ->
+ {error, file_exists};
+ false ->
+ fabric2_fdb:create(TxDb, Options)
+ end
+ end),
+ % We cache outside of the transaction so that we're sure
+ % that the transaction was committed.
+ case Result of
+ #{} = Db ->
+ ok = fabric2_server:store(Db),
+ {ok, Db#{tx := undefined}};
+ Error ->
+ Error
+ end.
+
+
+open(DbName, Options) ->
+ case fabric2_server:fetch(DbName) of
+ #{} = Db ->
+ {ok, maybe_set_user_ctx(Db, Options)};
+ undefined ->
+ Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
+ fabric2_fdb:open(TxDb, Options)
+ end),
+ % Cache outside the transaction retry loop
+ case Result of
+ #{} = Db ->
+ ok = fabric2_server:store(Db),
+ {ok, Db#{tx := undefined}};
+ Error ->
+ Error
+ end
+ end.
+
+
+delete(DbName, Options) ->
+ % This will throw if the db does not exist
+ {ok, Db} = open(DbName, Options),
+ Resp = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:delete(TxDb)
+ end),
+ if Resp /= ok -> Resp; true ->
+ fabric2_server:remove(DbName)
+ end.
+
+
+list_dbs() ->
+ list_dbs([]).
+
+
+list_dbs(Options) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ fabric2_fdb:list_dbs(Tx, Options)
+ end).
+
+
+is_admin(Db) ->
+ % TODO: Need to re-consider couch_db_plugin:check_is_admin/1
+ {SecProps} = get_security(Db),
+ UserCtx = get_user_ctx(Db),
+ {Admins} = get_admins(SecProps),
+ is_authorized(Admins, UserCtx).
+
+
+check_is_admin(Db) ->
+ case is_admin(Db) of
+ true ->
+ ok;
+ false ->
+ UserCtx = get_user_ctx(Db),
+ Reason = <<"You are not a db or server admin.">>,
+ throw_security_error(UserCtx, Reason)
+ end.
+
+
+check_is_member(Db) ->
+ case is_member(Db) of
+ true ->
+ ok;
+ false ->
+ UserCtx = get_user_ctx(Db),
+ throw_security_error(UserCtx)
+ end.
+
+
+name(#{name := DbName}) ->
+ DbName.
+
+
+get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
+ AfterDocRead.
+
+
+get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
+ BeforeDocUpdate.
+
+get_committed_update_seq(#{} = Db) ->
+ get_update_seq(Db).
+
+
+get_compacted_seq(#{} = Db) ->
+ get_update_seq(Db).
+
+
+get_compactor_pid(#{} = _Db) ->
+ nil.
+
+
+get_db_info(#{} = Db) ->
+ DbProps = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:get_info(TxDb)
+ end),
+
+ BaseProps = [
+ {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
+ {compact_running, false},
+ {data_size, 0},
+ {db_name, name(Db)},
+ {disk_format_version, 0},
+ {disk_size, 0},
+ {instance_start_time, <<"0">>},
+ {purge_seq, 0}
+ ],
+
+ {ok, lists:foldl(fun({Key, Val}, Acc) ->
+ lists:keystore(Key, 1, Acc, {Key, Val})
+ end, BaseProps, DbProps)}.
+
+
+get_del_doc_count(#{} = Db) ->
+ get_doc_count(Db, <<"doc_del_count">>).
+
+
+get_doc_count(Db) ->
+ get_doc_count(Db, <<"doc_count">>).
+
+
+get_doc_count(Db, <<"_all_docs">>) ->
+ get_doc_count(Db, <<"doc_count">>);
+
+get_doc_count(DbName, <<"_design">>) ->
+ get_doc_count(DbName, <<"doc_design_count">>);
+
+get_doc_count(DbName, <<"_local">>) ->
+ get_doc_count(DbName, <<"doc_local_count">>);
+
+get_doc_count(Db, Key) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:get_stat(TxDb, Key)
+ end).
+
+
+get_instance_start_time(#{}) ->
+ 0.
+
+
+get_pid(#{}) ->
+ nil.
+
+
+get_revs_limit(#{revs_limit := RevsLimit}) ->
+ RevsLimit.
+
+
+get_security(#{security_doc := SecurityDoc}) ->
+ SecurityDoc.
+
+
+get_update_seq(#{} = Db) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:get_last_change(TxDb)
+ end).
+
+
+get_user_ctx(#{user_ctx := UserCtx}) ->
+ UserCtx.
+
+
+get_uuid(#{uuid := UUID}) ->
+ UUID.
+
+
+is_clustered(#{}) ->
+ false.
+
+
+is_db(#{name := _}) ->
+ true;
+is_db(_) ->
+ false.
+
+
+is_partitioned(#{}) ->
+ false.
+
+
+is_system_db(#{name := DbName}) ->
+ is_system_db_name(DbName).
+
+
+is_system_db_name(DbName) when is_list(DbName) ->
+ is_system_db_name(?l2b(DbName));
+is_system_db_name(DbName) when is_binary(DbName) ->
+ Suffix = filename:basename(DbName),
+ case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of
+ {<<".">>, Result} -> Result;
+ {_Prefix, false} -> false;
+ {Prefix, true} ->
+ ReOpts = [{capture,none}, dollar_endonly],
+ re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match
+ end.
+
+
+set_revs_limit(#{} = Db, RevsLimit) ->
+ check_is_admin(Db),
+ RevsLimBin = ?uint2bin(RevsLimit),
+ Resp = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:set_config(TxDb, <<"revs_limit">>, RevsLimBin)
+ end),
+ if Resp /= ok -> Resp; true ->
+ fabric2_server:store(Db#{revs_limit := RevsLimit})
+ end.
+
+
+set_security(#{} = Db, Security) ->
+ check_is_admin(Db),
+ ok = fabric2_util:validate_security_object(Security),
+ SecBin = ?JSON_ENCODE(Security),
+ Resp = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:set_config(TxDb, <<"security_doc">>, SecBin)
+ end),
+ if Resp /= ok -> Resp; true ->
+ fabric2_server:store(Db#{security_doc := Security})
+ end.
+
+
+set_user_ctx(#{} = Db, UserCtx) ->
+ Db#{user_ctx := UserCtx}.
+
+
+ensure_full_commit(#{}) ->
+ {ok, 0}.
+
+
+ensure_full_commit(#{}, _Timeout) ->
+ {ok, 0}.
+
+
+open_doc(#{} = Db, DocId) ->
+ open_doc(Db, DocId, []).
+
+
+open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ case fabric2_fdb:get_local_doc(TxDb, DocId) of
+ #doc{} = Doc -> {ok, Doc};
+ Else -> Else
+ end
+ end);
+
+open_doc(#{} = Db, DocId, Options) ->
+ NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
+ NeedsTree = (Options -- NeedsTreeOpts /= Options),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Revs = case NeedsTree of
+ true -> fabric2_fdb:get_all_revs(TxDb, DocId);
+ false -> fabric2_fdb:get_winning_revs(TxDb, DocId, 1)
+ end,
+ if Revs == [] -> {not_found, missing}; true ->
+ #{winner := true} = RI = lists:last(Revs),
+ case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of
+ #doc{} = Doc ->
+ apply_open_doc_opts(Doc, Revs, Options);
+ Else ->
+ Else
+ end
+ end
+ end).
+
+
+open_doc_revs(Db, DocId, Revs, Options) ->
+ Latest = lists:member(latest, Options),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ AllRevInfos = fabric2_fdb:get_all_revs(TxDb, DocId),
+ RevTree = lists:foldl(fun(RI, TreeAcc) ->
+ RIPath = fabric2_util:revinfo_to_path(RI),
+ {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+ Merged
+ end, [], AllRevInfos),
+ {Found, Missing} = case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ when Latest ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ _ ->
+ couch_key_tree:get(RevTree, Revs)
+ end,
+ Docs = lists:map(fun({Value, {Pos, [Rev | RevPath]}}) ->
+ case Value of
+ ?REV_MISSING ->
+ % We have the rev in our list but know nothing about it
+ {{not_found, missing}, {Pos, Rev}};
+ _ ->
+ RevInfo = #{
+ rev_id => {Pos, Rev},
+ rev_path => RevPath
+ },
+ case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of
+ #doc{} = Doc -> {ok, Doc};
+ Else -> {Else, {Pos, Rev}}
+ end
+ end
+ end, Found),
+ MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing],
+ {ok, Docs ++ MissingDocs}
+ end).
+
+
+get_doc_info(Db, DocId) ->
+ case get_full_doc_info(Db, DocId) of
+ not_found -> not_found;
+ FDI -> couch_doc:to_doc_info(FDI)
+ end.
+
+
+get_full_doc_info(Db, DocId) ->
+ RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:get_all_revs(TxDb, DocId)
+ end),
+ if RevInfos == [] -> not_found; true ->
+ #{winner := true} = Winner = lists:last(RevInfos),
+ RevTree = lists:foldl(fun(RI, TreeAcc) ->
+ RIPath = fabric2_util:revinfo_to_path(RI),
+ {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+ Merged
+ end, [], RevInfos),
+ #full_doc_info{
+ id = DocId,
+ update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
+ deleted = maps:get(deleted, Winner),
+ rev_tree = RevTree
+ }
+ end.
+
+
+get_full_doc_infos(Db, DocIds) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:map(fun(DocId) ->
+ get_full_doc_info(TxDb, DocId)
+ end, DocIds)
+ end).
+
+
+get_missing_revs(Db, JsonIdRevs) ->
+ IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
+ AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foldl(fun({Id, _Revs}, Acc) ->
+ case maps:is_key(Id, Acc) of
+ true ->
+ Acc;
+ false ->
+ RevInfos = fabric2_fdb:get_all_revs(TxDb, Id),
+ Acc#{Id => RevInfos}
+ end
+ end, #{}, IdRevs)
+ end),
+ AllMissing = lists:flatmap(fun({Id, Revs}) ->
+ #{Id := RevInfos} = AllRevInfos,
+ Missing = try
+ lists:foldl(fun(RevInfo, RevAcc) ->
+ if RevAcc /= [] -> ok; true ->
+ throw(all_found)
+ end,
+ filter_found_revs(RevInfo, RevAcc)
+ end, Revs, RevInfos)
+ catch throw:all_found ->
+ []
+ end,
+ if Missing == [] -> []; true ->
+ PossibleAncestors = find_possible_ancestors(RevInfos, Missing),
+ [{Id, Missing, PossibleAncestors}]
+ end
+ end, IdRevs),
+ {ok, AllMissing}.
+
+
+update_doc(Db, Doc) ->
+ update_doc(Db, Doc, []).
+
+
+update_doc(Db, Doc, Options) ->
+ case update_docs(Db, [Doc], Options) of
+ {ok, [{ok, NewRev}]} ->
+ {ok, NewRev};
+ {ok, [{{_Id, _Rev}, Error}]} ->
+ throw(Error);
+ {error, [{{_Id, _Rev}, Error}]} ->
+ throw(Error);
+ {error, [Error]} ->
+ throw(Error);
+ {ok, []} ->
+ % replication success
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {ok, {Pos, RevId}}
+ end.
+
+
+update_docs(Db, Docs) ->
+ update_docs(Db, Docs, []).
+
+
+update_docs(Db, Docs, Options) ->
+ Resps0 = case lists:member(replicated_changes, Options) of
+ false ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ update_docs_interactive(TxDb, Docs, Options)
+ end);
+ true ->
+ lists:map(fun(Doc) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ update_doc_int(TxDb, Doc, Options)
+ end)
+ end, Docs)
+ end,
+ % Convert errors
+ Resps1 = lists:map(fun(Resp) ->
+ case Resp of
+ {#doc{} = Doc, Error} ->
+ #doc{
+ id = DocId,
+ revs = Revs
+ } = Doc,
+ RevId = case Revs of
+ {RevPos, [Rev | _]} -> {RevPos, Rev};
+ {0, []} -> {0, <<>>}
+ end,
+ {{DocId, RevId}, Error};
+ Else ->
+ Else
+ end
+ end, Resps0),
+ case lists:member(replicated_changes, Options) of
+ true ->
+ {ok, [R || R <- Resps1, R /= {ok, []}]};
+ false ->
+ Status = lists:foldl(fun(Resp, Acc) ->
+ case Resp of
+ {ok, _} -> Acc;
+ _ -> error
+ end
+ end, ok, Resps1),
+ {Status, Resps1}
+ end.
+
+
+read_attachment(Db, DocId, AttId) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:read_attachment(TxDb, DocId, AttId)
+ end).
+
+
+write_attachment(Db, DocId, Att) ->
+ Data = couch_att:fetch(data, Att),
+ {ok, AttId} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:write_attachment(TxDb, DocId, Data)
+ end),
+ couch_att:store(data, {loc, Db, DocId, AttId}, Att).
+
+
+fold_docs(Db, UserFun, UserAcc) ->
+ fold_docs(Db, UserFun, UserAcc, []).
+
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
+ end).
+
+
+fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
+ fold_changes(Db, SinceSeq, UserFun, UserAcc, []).
+
+
+fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
+ end).
+
+
+new_revid(Doc) ->
+ #doc{
+ body = Body,
+ revs = {OldStart, OldRevs},
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+
+ DigestedAtts = lists:foldl(fun(Att, Acc) ->
+ [N, T, M] = couch_att:fetch([name, type, md5], Att),
+ case M == <<>> of
+ true -> Acc;
+ false -> [{N, T, M} | Acc]
+ end
+ end, [], Atts),
+
+ Rev = case DigestedAtts of
+ Atts2 when length(Atts) =/= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ list_to_binary(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
+ SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
+ couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
+ end,
+
+ Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
+
+
+maybe_set_user_ctx(Db, Options) ->
+ case fabric2_util:get_value(user_ctx, Options) of
+ #user_ctx{} = UserCtx ->
+ set_user_ctx(Db, UserCtx);
+ undefined ->
+ Db
+ end.
+
+
+is_member(Db) ->
+ {SecProps} = get_security(Db),
+ case is_admin(Db) of
+ true ->
+ true;
+ false ->
+ case is_public_db(SecProps) of
+ true ->
+ true;
+ false ->
+ {Members} = get_members(SecProps),
+ UserCtx = get_user_ctx(Db),
+ is_authorized(Members, UserCtx)
+ end
+ end.
+
+
+is_authorized(Group, UserCtx) ->
+ #user_ctx{
+ name = UserName,
+ roles = UserRoles
+ } = UserCtx,
+ Names = fabric2_util:get_value(<<"names">>, Group, []),
+ Roles = fabric2_util:get_value(<<"roles">>, Group, []),
+ case check_security(roles, UserRoles, [<<"_admin">> | Roles]) of
+ true ->
+ true;
+ false ->
+ check_security(names, UserName, Names)
+ end.
+
+
+check_security(roles, [], _) ->
+ false;
+check_security(roles, UserRoles, Roles) ->
+ UserRolesSet = ordsets:from_list(UserRoles),
+ RolesSet = ordsets:from_list(Roles),
+ not ordsets:is_disjoint(UserRolesSet, RolesSet);
+check_security(names, _, []) ->
+ false;
+check_security(names, null, _) ->
+ false;
+check_security(names, UserName, Names) ->
+ lists:member(UserName, Names).
+
+
+throw_security_error(#user_ctx{name = null} = UserCtx) ->
+ Reason = <<"You are not authorized to access this db.">>,
+ throw_security_error(UserCtx, Reason);
+throw_security_error(#user_ctx{name = _} = UserCtx) ->
+ Reason = <<"You are not allowed to access this db.">>,
+ throw_security_error(UserCtx, Reason).
+
+
+throw_security_error(#user_ctx{} = UserCtx, Reason) ->
+ Error = security_error_type(UserCtx),
+ throw({Error, Reason}).
+
+
+security_error_type(#user_ctx{name = null}) ->
+ unauthorized;
+security_error_type(#user_ctx{name = _}) ->
+ forbidden.
+
+
+is_public_db(SecProps) ->
+ {Members} = get_members(SecProps),
+ Names = fabric2_util:get_value(<<"names">>, Members, []),
+ Roles = fabric2_util:get_value(<<"roles">>, Members, []),
+ Names =:= [] andalso Roles =:= [].
+
+
+get_admins(SecProps) ->
+ fabric2_util:get_value(<<"admins">>, SecProps, {[]}).
+
+
+get_members(SecProps) ->
+ % we fallback to readers here for backwards compatibility
+ case fabric2_util:get_value(<<"members">>, SecProps) of
+ undefined ->
+ fabric2_util:get_value(<<"readers">>, SecProps, {[]});
+ Members ->
+ Members
+ end.
+
+
+apply_open_doc_opts(Doc, Revs, Options) ->
+ IncludeRevsInfo = lists:member(revs_info, Options),
+ IncludeConflicts = lists:member(conflicts, Options),
+ IncludeDelConflicts = lists:member(deleted_conflicts, Options),
+ IncludeLocalSeq = lists:member(local_seq, Options),
+ ReturnDeleted = lists:member(deleted, Options),
+
+ % This revs_info becomes fairly useless now that we're
+ % not keeping old document bodies around...
+ Meta1 = if not IncludeRevsInfo -> []; true ->
+ {Pos, [Rev | RevPath]} = Doc#doc.revs,
+ RevPathMissing = lists:map(fun(R) -> {R, missing} end, RevPath),
+ [{revs_info, Pos, [{Rev, available} | RevPathMissing]}]
+ end,
+
+ Meta2 = if not IncludeConflicts -> []; true ->
+ Conflicts = [RI || RI = #{winner := false, deleted := false} <- Revs],
+ if Conflicts == [] -> []; true ->
+ ConflictRevs = [maps:get(rev_id, RI) || RI <- Conflicts],
+ [{conflicts, ConflictRevs}]
+ end
+ end,
+
+ Meta3 = if not IncludeDelConflicts -> []; true ->
+ DelConflicts = [RI || RI = #{winner := false, deleted := true} <- Revs],
+ if DelConflicts == [] -> []; true ->
+ DelConflictRevs = [maps:get(rev_id, RI) || RI <- DelConflicts],
+ [{deleted_conflicts, DelConflictRevs}]
+ end
+ end,
+
+ Meta4 = if not IncludeLocalSeq -> []; true ->
+ #{winner := true, sequence := SeqVS} = lists:last(Revs),
+ [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
+ end,
+
+ case Doc#doc.deleted and not ReturnDeleted of
+ true ->
+ {not_found, deleted};
+ false ->
+ {ok, Doc#doc{
+ meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4
+ }}
+ end.
+
+
+filter_found_revs(RevInfo, Revs) ->
+ #{
+ rev_id := {Pos, Rev},
+ rev_path := RevPath
+ } = RevInfo,
+ FullRevPath = [Rev | RevPath],
+ lists:flatmap(fun({FindPos, FindRev} = RevIdToFind) ->
+ if FindPos > Pos -> [RevIdToFind]; true ->
+ % Add 1 because lists:nth is 1 based
+ Idx = Pos - FindPos + 1,
+ case Idx > length(FullRevPath) of
+ true ->
+ [RevIdToFind];
+ false ->
+ case lists:nth(Idx, FullRevPath) == FindRev of
+ true -> [];
+ false -> [RevIdToFind]
+ end
+ end
+ end
+ end, Revs).
+
+
+find_possible_ancestors(RevInfos, MissingRevs) ->
+ % Find any revinfos that are possible ancestors
+ % of the missing revs. A possible ancestor is
+ % any rev that has a start position less than
+ % any missing revision. Stated alternatively,
+ % find any revinfo that could theoretically
+    % be extended to be one or more of the missing
+ % revisions.
+ %
+ % Since we are looking at any missing revision
+ % we can just compare against the maximum missing
+ % start position.
+ MaxMissingPos = case MissingRevs of
+ [] -> 0;
+ [_ | _] -> lists:max([Start || {Start, _Rev} <- MissingRevs])
+ end,
+ lists:flatmap(fun(RevInfo) ->
+ #{rev_id := {RevPos, _} = RevId} = RevInfo,
+ case RevPos < MaxMissingPos of
+ true -> [RevId];
+ false -> []
+ end
+ end, RevInfos).
+
+
+update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
+ IsLocal = case Doc#doc.id of
+ <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
+ _ -> false
+ end,
+ IsReplicated = lists:member(replicated_changes, Options),
+ try
+ case {IsLocal, IsReplicated} of
+ {false, false} -> update_doc_interactive(Db, Doc, Options);
+ {false, true} -> update_doc_replicated(Db, Doc, Options);
+ {true, _} -> update_local_doc(Db, Doc, Options)
+ end
+ catch throw:{?MODULE, Return} ->
+ Return
+ end.
+
+
+update_docs_interactive(Db, Docs0, Options) ->
+ Docs = tag_docs(Docs0),
+ Futures = get_winning_rev_futures(Db, Docs),
+ {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
+ try
+ update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
+ catch throw:{?MODULE, Return} ->
+ {Return, SeenIds}
+ end
+ end, [], Docs),
+ Result.
+
+
+update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
+ Options, _Futures, SeenIds) ->
+ {update_local_doc(Db, Doc, Options), SeenIds};
+
+update_docs_interactive(Db, Doc, Options, Futures, SeenIds) ->
+ case lists:member(Doc#doc.id, SeenIds) of
+ true ->
+ {{error, conflict}, SeenIds};
+ false ->
+ Future = maps:get(doc_tag(Doc), Futures),
+ case update_doc_interactive(Db, Doc, Future, Options) of
+ {ok, _} = Resp ->
+ {Resp, [Doc#doc.id | SeenIds]};
+ _ = Resp ->
+ {Resp, SeenIds}
+ end
+ end.
+
+
+update_doc_interactive(Db, Doc0, Options) ->
+ % Get the current winning revision. This is needed
+ % regardless of which branch we're updating. The extra
+ % revision we're grabbing is an optimization to
+ % save us a round trip if we end up deleting
+ % the winning revision branch.
+ NumRevs = if Doc0#doc.deleted -> 2; true -> 1 end,
+ Future = fabric2_fdb:get_winning_revs_future(Db, Doc0#doc.id, NumRevs),
+ update_doc_interactive(Db, Doc0, Future, Options).
+
+
+update_doc_interactive(Db, Doc0, Future, _Options) ->
+ RevInfos = fabric2_fdb:get_winning_revs_wait(Db, Future),
+ {Winner, SecondPlace} = case RevInfos of
+ [] -> {not_found, not_found};
+ [WRI] -> {WRI, not_found};
+ [WRI, SPRI] -> {WRI, SPRI}
+ end,
+ WinnerRevId = case Winner of
+ not_found ->
+ {0, <<>>};
+ _ ->
+ case maps:get(deleted, Winner) of
+ true -> {0, <<>>};
+ false -> maps:get(rev_id, Winner)
+ end
+ end,
+
+ % Check that a revision was specified if required
+ Doc0RevId = doc_to_revid(Doc0),
+ if Doc0RevId /= {0, <<>>} orelse WinnerRevId == {0, <<>>} -> ok; true ->
+ ?RETURN({error, conflict})
+ end,
+
+ % Check that we're not trying to create a deleted doc
+ if Doc0RevId /= {0, <<>>} orelse not Doc0#doc.deleted -> ok; true ->
+ ?RETURN({error, conflict})
+ end,
+
+ % Get the target revision to update
+ Target = case Doc0RevId == WinnerRevId of
+ true ->
+ Winner;
+ false ->
+ case fabric2_fdb:get_non_deleted_rev(Db, Doc0#doc.id, Doc0RevId) of
+ #{deleted := false} = Target0 ->
+ Target0;
+ not_found ->
+ % Either a missing revision or a deleted
+ % revision. Either way a conflict. Note
+ % that we get not_found for a deleted revision
+ % because we only check for the non-deleted
+ % key in fdb
+ ?RETURN({error, conflict})
+ end
+ end,
+
+ % When recreating a deleted document we want to extend
+ % the winning revision branch rather than create a
+ % new branch. If we did not do this we could be
+ % recreating into a state that previously existed.
+ Doc1 = case Winner of
+ #{deleted := true} when not Doc0#doc.deleted ->
+ {WinnerRevPos, WinnerRev} = maps:get(rev_id, Winner),
+ WinnerRevPath = maps:get(rev_path, Winner),
+ Doc0#doc{revs = {WinnerRevPos, [WinnerRev | WinnerRevPath]}};
+ _ ->
+ Doc0
+ end,
+
+ % Validate the doc update and create the
+ % new revinfo map
+ Doc2 = prep_and_validate(Db, Doc1, Target),
+ #doc{
+ deleted = NewDeleted,
+ revs = {NewRevPos, [NewRev | NewRevPath]}
+ } = Doc3 = new_revid(Doc2),
+
+ Doc4 = update_attachment_revpos(Doc3),
+
+ NewRevInfo = #{
+ winner => undefined,
+ deleted => NewDeleted,
+ rev_id => {NewRevPos, NewRev},
+ rev_path => NewRevPath,
+ sequence => undefined,
+ branch_count => undefined
+ },
+
+    % Gather the list of possible winning revisions
+ Possible = case Target == Winner of
+ true when not Doc4#doc.deleted ->
+ [NewRevInfo];
+ true when Doc4#doc.deleted ->
+ case SecondPlace of
+ #{} -> [NewRevInfo, SecondPlace];
+ not_found -> [NewRevInfo]
+ end;
+ false ->
+ [NewRevInfo, Winner]
+ end,
+
+ % Sort the rev infos such that the winner is first
+ {NewWinner0, NonWinner} = case fabric2_util:sort_revinfos(Possible) of
+ [W] -> {W, not_found};
+ [W, NW] -> {W, NW}
+ end,
+
+ BranchCount = case Winner of
+ not_found -> 1;
+ #{branch_count := BC} -> BC
+ end,
+ NewWinner = NewWinner0#{branch_count := BranchCount},
+ ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
+ ToRemove = if Target == not_found -> []; true -> [Target] end,
+
+ ok = fabric2_fdb:write_doc(
+ Db,
+ Doc4,
+ NewWinner,
+ Winner,
+ ToUpdate,
+ ToRemove
+ ),
+
+ {ok, {NewRevPos, NewRev}}.
+
+
+update_doc_replicated(Db, Doc0, _Options) ->
+ #doc{
+ id = DocId,
+ deleted = Deleted,
+ revs = {RevPos, [Rev | RevPath]}
+ } = Doc0,
+
+ DocRevInfo0 = #{
+ winner => undefined,
+ deleted => Deleted,
+ rev_id => {RevPos, Rev},
+ rev_path => RevPath,
+ sequence => undefined,
+ branch_count => undefined
+ },
+
+ AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId),
+
+ RevTree = lists:foldl(fun(RI, TreeAcc) ->
+ RIPath = fabric2_util:revinfo_to_path(RI),
+ {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+ Merged
+ end, [], AllRevInfos),
+
+ DocRevPath = fabric2_util:revinfo_to_path(DocRevInfo0),
+ {NewTree, Status} = couch_key_tree:merge(RevTree, DocRevPath),
+ if Status /= internal_node -> ok; true ->
+ % We already know this revision so nothing
+ % left to do.
+ ?RETURN({ok, []})
+ end,
+
+    % It's possible for a replication to send fewer than revs_limit
+    % revisions when extending an existing branch. To avoid losing
+    % revision history we extract the new node from the tree and use
+    % the combined path after stemming.
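+    % E.g. (illustrative): a branch 3-c <- 2-b <- 1-a extended by a
+    % replicated doc carrying only revs {4, [d, c]} merges to the full
+    % path [d, c, b, a] before being stemmed back down to revs_limit.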
+ {[{_, {RevPos, UnstemmedRevs}}], []}
+ = couch_key_tree:get(NewTree, [{RevPos, Rev}]),
+ RevsLimit = fabric2_db:get_revs_limit(Db),
+ Doc1 = Doc0#doc{
+ revs = {RevPos, lists:sublist(UnstemmedRevs, RevsLimit)}
+ },
+ {RevPos, [Rev | NewRevPath]} = Doc1#doc.revs,
+ DocRevInfo1 = DocRevInfo0#{rev_path := NewRevPath},
+
+ % Find any previous revision we knew about for
+ % validation and attachment handling.
+ AllLeafsFull = couch_key_tree:get_all_leafs_full(NewTree),
+ LeafPath = get_leaf_path(RevPos, Rev, AllLeafsFull),
+ PrevRevInfo = find_prev_revinfo(RevPos, LeafPath),
+ Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo),
+
+ % Possible winners are the previous winner and
+ % the new DocRevInfo
+ Winner = case fabric2_util:sort_revinfos(AllRevInfos) of
+ [#{winner := true} = WRI | _] -> WRI;
+ [] -> not_found
+ end,
+ {NewWinner0, NonWinner} = case Winner == PrevRevInfo of
+ true ->
+ {DocRevInfo1, not_found};
+ false ->
+ [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo1]),
+ {W, NW}
+ end,
+
+ NewWinner = NewWinner0#{branch_count := length(AllLeafsFull)},
+ ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
+ ToRemove = if PrevRevInfo == not_found -> []; true -> [PrevRevInfo] end,
+
+ ok = fabric2_fdb:write_doc(
+ Db,
+ Doc2,
+ NewWinner,
+ Winner,
+ ToUpdate,
+ ToRemove
+ ),
+
+ {ok, []}.
+
+
+update_local_doc(Db, Doc0, _Options) ->
+ Doc1 = case increment_local_doc_rev(Doc0) of
+ {ok, Updated} -> Updated;
+ {error, _} = Error -> ?RETURN(Error)
+ end,
+
+ ok = fabric2_fdb:write_local_doc(Db, Doc1),
+
+ #doc{revs = {0, [Rev]}} = Doc1,
+ {ok, {0, integer_to_binary(Rev)}}.
+
+
+update_attachment_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) ->
+ Atts = lists:map(fun(Att) ->
+ case couch_att:fetch(data, Att) of
+ {loc, _Db, _DocId, _AttId} ->
+ % Attachment was already on disk
+ Att;
+ _ ->
+ % We will write this attachment with this update
+ % so mark it with the RevPos that will be written
+ couch_att:store(revpos, RevPos, Att)
+ end
+ end, Atts0),
+ Doc#doc{atts = Atts}.
+
+
+get_winning_rev_futures(Db, Docs) ->
+ lists:foldl(fun(Doc, Acc) ->
+ #doc{
+ id = DocId,
+ deleted = Deleted
+ } = Doc,
+ IsLocal = case DocId of
+ <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
+ _ -> false
+ end,
+ if IsLocal -> Acc; true ->
+ NumRevs = if Deleted -> 2; true -> 1 end,
+ Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs),
+ DocTag = doc_tag(Doc),
+ Acc#{DocTag => Future}
+ end
+ end, #{}, Docs).
+
+
+prep_and_validate(Db, NewDoc, PrevRevInfo) ->
+ HasStubs = couch_doc:has_stubs(NewDoc),
+ HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
+ IsDDoc = case NewDoc#doc.id of
+ <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
+ _ -> false
+ end,
+
+ PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
+ true when PrevRevInfo /= not_found ->
+ case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
+ #doc{} = PDoc -> PDoc;
+ {not_found, _} -> nil
+ end;
+ _ ->
+ nil
+ end,
+
+ MergedDoc = if not HasStubs -> NewDoc; true ->
+ % This will throw an error if we have any
+ % attachment stubs missing data
+ couch_doc:merge_stubs(NewDoc, PrevDoc)
+ end,
+ check_duplicate_attachments(MergedDoc),
+ validate_doc_update(Db, MergedDoc, PrevDoc),
+ MergedDoc.
+
+
+validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
+ case catch check_is_admin(Db) of
+ ok -> validate_ddoc(Db, Doc);
+ Error -> ?RETURN({Doc, Error})
+ end;
+validate_doc_update(Db, Doc, PrevDoc) ->
+ #{
+ security_doc := Security,
+ validate_doc_update_funs := VDUs
+ } = Db,
+ Fun = fun() ->
+ JsonCtx = fabric2_util:user_ctx_to_json(Db),
+ lists:map(fun(VDU) ->
+ try
+ case VDU(Doc, PrevDoc, JsonCtx, Security) of
+ ok -> ok;
+ Error1 -> throw(Error1)
+ end
+ catch throw:Error2 ->
+ ?RETURN({Doc, Error2})
+ end
+ end, VDUs)
+ end,
+ Stat = [couchdb, query_server, vdu_process_time],
+ if VDUs == [] -> ok; true ->
+ couch_stats:update_histogram(Stat, Fun)
+ end.
+
+
+validate_ddoc(Db, DDoc) ->
+ try
+ ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc))
+ catch
+ throw:{invalid_design_doc, Reason} ->
+ throw({bad_request, invalid_design_doc, Reason});
+ throw:{compilation_error, Reason} ->
+ throw({bad_request, compilation_error, Reason});
+ throw:Error ->
+ ?RETURN({DDoc, Error})
+ end.
+
+
+check_duplicate_attachments(#doc{atts = Atts}) ->
+ lists:foldl(fun(Att, Names) ->
+ Name = couch_att:fetch(name, Att),
+ case ordsets:is_element(Name, Names) of
+ true -> throw({bad_request, <<"Duplicate attachments">>});
+ false -> ordsets:add_element(Name, Names)
+ end
+ end, ordsets:new(), Atts).
+
+
+get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
+ LeafPath;
+get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
+ get_leaf_path(Pos, Rev, RestLeafs).
+
+
+find_prev_revinfo(_Pos, []) ->
+ not_found;
+find_prev_revinfo(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
+ find_prev_revinfo(Pos - 1, RestPath);
+find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) ->
+ RevInfo.
+
+
+increment_local_doc_rev(#doc{deleted = true} = Doc) ->
+ {ok, Doc#doc{revs = {0, [0]}}};
+increment_local_doc_rev(#doc{revs = {0, []}} = Doc) ->
+ {ok, Doc#doc{revs = {0, [1]}}};
+increment_local_doc_rev(#doc{revs = {0, [RevStr | _]}} = Doc) ->
+ try
+ PrevRev = binary_to_integer(RevStr),
+ {ok, Doc#doc{revs = {0, [PrevRev + 1]}}}
+ catch error:badarg ->
+ {error, <<"Invalid rev format">>}
+ end;
+increment_local_doc_rev(#doc{}) ->
+ {error, <<"Invalid rev format">>}.
+
+
+doc_to_revid(#doc{revs = Revs}) ->
+ case Revs of
+ {0, []} -> {0, <<>>};
+ {RevPos, [Rev | _]} -> {RevPos, Rev}
+ end.
+
+
+tag_docs([]) ->
+ [];
+tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
+ NewDoc = Doc#doc{
+ meta = [{ref, make_ref()} | Meta]
+ },
+ [NewDoc | tag_docs(Rest)].
+
+
+doc_tag(#doc{meta = Meta}) ->
+ fabric2_util:get_value(ref, Meta).
+
+
+idrevs({Id, Revs}) when is_list(Revs) ->
+ {docid(Id), [rev(R) || R <- Revs]}.
+
+
+docid(DocId) when is_list(DocId) ->
+ list_to_binary(DocId);
+docid(DocId) ->
+ DocId.
+
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+ couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+ Rev.
+
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
new file mode 100644
index 000000000..a5717147f
--- /dev/null
+++ b/src/fabric/src/fabric2_events.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_events).
+
+
+-export([
+ link_listener/4,
+ stop_listener/1
+]).
+
+-export([
+ init/5,
+ poll/5
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
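+% Usage sketch (illustrative; handle_event/3 is a hypothetical callback
+% with the shape this module expects):
+%
+%     handle_event(_DbName, updated, St) -> {ok, St};
+%     handle_event(_DbName, deleted, St) -> {stop, St}.
+%
+%     {ok, Pid} = fabric2_events:link_listener(
+%             ?MODULE, handle_event, St0, [{dbname, DbName}]
+%         ),
+%     ...
+%     fabric2_events:stop_listener(Pid).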
+link_listener(Mod, Fun, St, Options) ->
+ DbName = fabric2_util:get_value(dbname, Options),
+ Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+ receive
+ {Pid, initialized} -> ok
+ end,
+ {ok, Pid}.
+
+
+stop_listener(Pid) ->
+ Pid ! stop_listening.
+
+
+init(Parent, DbName, Mod, Fun, St) ->
+    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    Since = fabric2_db:get_update_seq(Db),
+    erlang:monitor(process, Parent),
+    Parent ! {self(), initialized},
+    poll(DbName, Since, Mod, Fun, St).
+
+
+poll(DbName, Since, Mod, Fun, St) ->
+    {Resp, NewSince} = try
+        case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+            {ok, Db} ->
+                case fabric2_db:get_update_seq(Db) of
+                    Since ->
+                        {{ok, St}, Since};
+                    Other ->
+                        {Mod:Fun(DbName, updated, St), Other}
+                end;
+            Error ->
+                exit(Error)
+        end
+    catch error:database_does_not_exist ->
+        {Mod:Fun(DbName, deleted, St), Since}
+    end,
+ receive
+ stop_listening ->
+ ok;
+ {'DOWN', _, _, _, _} ->
+ ok
+ after 0 ->
+ case Resp of
+ {ok, NewSt} ->
+ timer:sleep(1000),
+ ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+ {stop, _} ->
+ ok
+ end
+ end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
new file mode 100644
index 000000000..0a4f2981b
--- /dev/null
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -0,0 +1,1187 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_fdb).
+
+
+-export([
+ transactional/1,
+ transactional/3,
+ transactional/2,
+
+ create/2,
+ open/2,
+ reopen/1,
+ delete/1,
+ exists/1,
+
+ list_dbs/2,
+
+ get_info/1,
+ get_config/1,
+ set_config/3,
+
+ get_stat/2,
+ incr_stat/3,
+
+ get_all_revs/2,
+ get_winning_revs/3,
+ get_winning_revs_future/3,
+ get_winning_revs_wait/2,
+ get_non_deleted_rev/3,
+
+ get_doc_body/3,
+ get_doc_body_future/3,
+ get_doc_body_wait/4,
+ get_local_doc/2,
+
+ write_doc/6,
+ write_local_doc/2,
+
+ read_attachment/3,
+ write_attachment/3,
+
+ fold_docs/4,
+ fold_changes/5,
+ get_last_change/1,
+
+ vs_to_seq/1,
+
+ debug_cluster/0,
+ debug_cluster/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("fabric2.hrl").
+
+
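+% Usage sketch (illustrative; update_body/1 is a hypothetical helper).
+% All fabric2 operations run inside one of these wrappers, and nesting
+% is safe because a Db that already carries a live transaction is
+% passed through unchanged:
+%
+%     fabric2_fdb:transactional(DbName, Opts, fun(TxDb) ->
+%         Doc = fabric2_fdb:get_local_doc(TxDb, LDocId),
+%         fabric2_fdb:write_local_doc(TxDb, update_body(Doc))
+%     end).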
+transactional(Fun) ->
+ do_transaction(Fun, undefined).
+
+
+transactional(DbName, Options, Fun) when is_binary(DbName) ->
+ transactional(fun(Tx) ->
+ Fun(init_db(Tx, DbName, Options))
+ end).
+
+
+transactional(#{tx := undefined} = Db, Fun) ->
+ #{layer_prefix := LayerPrefix} = Db,
+ do_transaction(fun(Tx) ->
+ Fun(Db#{tx => Tx})
+ end, LayerPrefix);
+
+transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
+ Fun(Db).
+
+
+do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
+ Db = get_db_handle(),
+ try
+ erlfdb:transactional(Db, fun(Tx) ->
+ case get(erlfdb_trace) of
+ Name when is_binary(Name) ->
+ erlfdb:set_option(Tx, transaction_logging_enable, Name);
+ _ ->
+ ok
+ end,
+ case is_transaction_applied(Tx) of
+ true ->
+ get_previous_transaction_result();
+ false ->
+ execute_transaction(Tx, Fun, LayerPrefix)
+ end
+ end)
+ after
+ clear_transaction()
+ end.
+
+
+create(#{} = Db0, Options) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db = ensure_current(Db0, false),
+
+    % Eventually DbPrefix will be allocated via the directory layer's
+    % high-contention allocator (HCA). For now we just use the DbName
+    % so that debugging is easier.
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+ erlfdb:set(Tx, DbKey, DbPrefix),
+
+    % This key tells us when something in the cached database map
+    % (i.e., fabric2_server's ets table) has changed and requires
+    % re-loading. This currently covers revs_limit and the
+    % validate_doc_update functions. There's no ordering to these
+    % versions; it's just an opaque value that changes, which the
+    % ensure_current check compares against its cached copy.
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = fabric2_util:uuid(),
+ erlfdb:set(Tx, DbVersionKey, DbVersion),
+
+ UUID = fabric2_util:uuid(),
+
+ Defaults = [
+ {?DB_CONFIG, <<"uuid">>, UUID},
+ {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)},
+ {?DB_CONFIG, <<"security_doc">>, <<"{}">>},
+ {?DB_STATS, <<"doc_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"size">>, ?uint2bin(2)}
+ ],
+ lists:foreach(fun({P, K, V}) ->
+ Key = erlfdb_tuple:pack({P, K}, DbPrefix),
+ erlfdb:set(Tx, Key, V)
+ end, Defaults),
+
+ UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
+
+ Db#{
+ uuid => UUID,
+ db_prefix => DbPrefix,
+ db_version => DbVersion,
+
+ revs_limit => 1000,
+ security_doc => {[]},
+ user_ctx => UserCtx,
+
+ validate_doc_update_funs => [],
+ before_doc_update => undefined,
+ after_doc_read => undefined,
+        % More db fields will be added here as features land.
+
+ db_options => Options
+ }.
+
+
+open(#{} = Db0, Options) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db1 = ensure_current(Db0, false),
+
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> Bin;
+ not_found -> erlang:error(database_does_not_exist)
+ end,
+
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)),
+
+ UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
+
+ Db2 = Db1#{
+ db_prefix => DbPrefix,
+ db_version => DbVersion,
+
+ revs_limit => 1000,
+ security_doc => {[]},
+ user_ctx => UserCtx,
+
+        % Placeholders until these features are
+        % implemented.
+ validate_doc_update_funs => [],
+ before_doc_update => undefined,
+ after_doc_read => undefined,
+
+ db_options => Options
+ },
+
+ Db3 = lists:foldl(fun({Key, Val}, DbAcc) ->
+ case Key of
+ <<"uuid">> ->
+ DbAcc#{uuid => Val};
+ <<"revs_limit">> ->
+ DbAcc#{revs_limit => ?bin2uint(Val)};
+ <<"security_doc">> ->
+ DbAcc#{security_doc => ?JSON_DECODE(Val)}
+ end
+ end, Db2, get_config(Db2)),
+
+ load_validate_doc_funs(Db3).
+
+
+reopen(#{} = OldDb) ->
+ require_transaction(OldDb),
+ #{
+ tx := Tx,
+ name := DbName,
+ db_options := Options
+ } = OldDb,
+ open(init_db(Tx, DbName, Options), Options).
+
+
+delete(#{} = Db) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ erlfdb:clear(Tx, DbKey),
+ erlfdb:clear_range_startswith(Tx, DbPrefix),
+ bump_metadata_version(Tx),
+ ok.
+
+
+exists(#{name := DbName} = Db) when is_binary(DbName) ->
+ #{
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = ensure_current(Db, false),
+
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> true;
+ not_found -> false
+ end.
+
+
+list_dbs(Tx, _Options) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ LayerPrefix = erlfdb_directory:get_name(CouchDB),
+ {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
+ Future = erlfdb:get_range(Tx, Start, End),
+ lists:map(fun({K, _V}) ->
+ {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
+ DbName
+ end, erlfdb:wait(Future)).
+
+
+get_info(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ {CStart, CEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
+ ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
+ {streaming_mode, exact},
+ {limit, 1},
+ {reverse, true}
+ ]),
+
+ StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
+ MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
+
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{SeqKey, _}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
+ vs_to_seq(SeqVS)
+ end,
+ CProp = {update_seq, RawSeq},
+
+ MProps = lists:flatmap(fun({K, V}) ->
+ case erlfdb_tuple:unpack(K, DbPrefix) of
+ {?DB_STATS, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)}];
+ {?DB_STATS, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)}];
+ {?DB_STATS, <<"size">>} ->
+ Val = ?bin2uint(V),
+ [
+ {other, {[{data_size, Val}]}},
+ {sizes, {[
+ {active, 0},
+ {external, Val},
+ {file, 0}
+ ]}}
+ ];
+ {?DB_STATS, _} ->
+ []
+ end
+ end, erlfdb:wait(MetaFuture)),
+
+ [CProp | MProps].
+
+
+get_config(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db),
+
+ {Start, End} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
+ Future = erlfdb:get_range(Tx, Start, End),
+
+ lists:map(fun({K, V}) ->
+ {?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix),
+ {Key, V}
+ end, erlfdb:wait(Future)).
+
+
+set_config(#{} = Db, ConfigKey, ConfigVal) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ Key = erlfdb_tuple:pack({?DB_CONFIG, ConfigKey}, DbPrefix),
+ erlfdb:set(Tx, Key, ConfigVal),
+ bump_metadata_version(Tx).
+
+
+get_stat(#{} = Db, StatKey) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+
+    % We may eventually need some sort of type system
+    % here. For now stats are all unsigned integers
+    % because they are updated with atomic adds.
+ ?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))).
+
+
+incr_stat(_Db, _StatKey, 0) ->
+ ok;
+
+incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+ erlfdb:add(Tx, Key, Increment).
+
+
+get_all_revs(#{} = Db, DocId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ Prefix = erlfdb_tuple:pack({?DB_REVS, DocId}, DbPrefix),
+ Options = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range_startswith(Tx, Prefix, Options),
+ lists:map(fun({K, V}) ->
+ Key = erlfdb_tuple:unpack(K, DbPrefix),
+ Val = erlfdb_tuple:unpack(V),
+ fdb_to_revinfo(Key, Val)
+ end, erlfdb:wait(Future)).
+
+
+get_winning_revs(Db, DocId, NumRevs) ->
+ Future = get_winning_revs_future(Db, DocId, NumRevs),
+ get_winning_revs_wait(Db, Future).
+
+
+get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ {StartKey, EndKey} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
+ Options = [{reverse, true}, {limit, NumRevs}],
+ erlfdb:get_range_raw(Tx, StartKey, EndKey, Options).
+
+
+get_winning_revs_wait(#{} = Db, Future) ->
+ #{
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ {Rows, _, _} = erlfdb:wait(Future),
+ lists:map(fun({K, V}) ->
+ Key = erlfdb_tuple:unpack(K, DbPrefix),
+ Val = erlfdb_tuple:unpack(V),
+ fdb_to_revinfo(Key, Val)
+ end, Rows).
+
+
+get_non_deleted_rev(#{} = Db, DocId, RevId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ {RevPos, Rev} = RevId,
+
+ BaseKey = {?DB_REVS, DocId, true, RevPos, Rev},
+ Key = erlfdb_tuple:pack(BaseKey, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ fdb_to_revinfo(BaseKey, erlfdb_tuple:unpack(Val))
+ end.
+
+
+get_doc_body(Db, DocId, RevInfo) ->
+ Future = get_doc_body_future(Db, DocId, RevInfo),
+ get_doc_body_wait(Db, DocId, RevInfo, Future).
+
+
+get_doc_body_future(#{} = Db, DocId, RevInfo) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ #{
+ rev_id := {RevPos, Rev}
+ } = RevInfo,
+
+ Key = erlfdb_tuple:pack({?DB_DOCS, DocId, RevPos, Rev}, DbPrefix),
+ erlfdb:get(Tx, Key).
+
+
+get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
+ Db = ensure_current(Db0),
+
+ #{
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath
+ } = RevInfo,
+
+ Val = erlfdb:wait(Future),
+ fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], Val).
+
+
+get_local_doc(#{} = Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db0),
+
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
+ Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+ fdb_to_local_doc(Db, DocId, Val).
+
+
+write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db0),
+
+ #doc{
+ id = DocId,
+ deleted = Deleted
+ } = Doc,
+
+ % Revision tree
+
+ NewWinner = NewWinner0#{winner := true},
+ NewRevId = maps:get(rev_id, NewWinner),
+
+ {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
+ ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),
+
+ lists:foreach(fun(RI0) ->
+ RI = RI0#{winner := false},
+ {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
+ ok = erlfdb:set(Tx, K, V)
+ end, ToUpdate),
+
+ lists:foreach(fun(RI0) ->
+ RI = RI0#{winner := false},
+ {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
+ ok = erlfdb:clear(Tx, K)
+ end, ToRemove),
+
+ % _all_docs
+
+ UpdateStatus = case {OldWinner, NewWinner} of
+ {not_found, #{deleted := false}} ->
+ created;
+ {#{deleted := true}, #{deleted := false}} ->
+ recreated;
+ {#{deleted := false}, #{deleted := false}} ->
+ updated;
+ {#{deleted := false}, #{deleted := true}} ->
+ deleted
+ end,
+
+ case UpdateStatus of
+ Status when Status == created orelse Status == recreated ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ADVal = erlfdb_tuple:pack(NewRevId),
+ ok = erlfdb:set(Tx, ADKey, ADVal);
+ deleted ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ok = erlfdb:clear(Tx, ADKey);
+ updated ->
+ ok
+ end,
+
+ % _changes
+
+ if OldWinner == not_found -> ok; true ->
+ OldSeq = maps:get(sequence, OldWinner),
+ OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
+ erlfdb:clear(Tx, OldSeqKey)
+ end,
+
+ NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
+ NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
+ erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),
+
+    % Document body, db version, and stats
+
+ ok = write_doc_body(Db, Doc),
+
+ IsDDoc = case Doc#doc.id of
+ <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
+ _ -> false
+ end,
+
+ if not IsDDoc -> ok; true ->
+ bump_db_version(Db)
+ end,
+
+ case UpdateStatus of
+ created ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, 1)
+ end,
+ incr_stat(Db, <<"doc_count">>, 1);
+ recreated ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, 1)
+ end,
+ incr_stat(Db, <<"doc_count">>, 1),
+ incr_stat(Db, <<"doc_del_count">>, -1);
+ deleted ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, -1)
+ end,
+ incr_stat(Db, <<"doc_count">>, -1),
+ incr_stat(Db, <<"doc_del_count">>, 1);
+ updated ->
+ ok
+ end,
+
+ ok.
+
+
+write_local_doc(#{} = Db0, Doc) ->
+ #{
+ tx := Tx
+ } = Db = ensure_current(Db0),
+
+ {LDocKey, LDocVal} = local_doc_to_fdb(Db, Doc),
+
+ WasDeleted = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
+ <<_/binary>> -> false;
+ not_found -> true
+ end,
+
+ case Doc#doc.deleted of
+ true -> erlfdb:clear(Tx, LDocKey);
+ false -> erlfdb:set(Tx, LDocKey, LDocVal)
+ end,
+
+ case {WasDeleted, Doc#doc.deleted} of
+ {true, false} ->
+ incr_stat(Db, <<"doc_local_count">>, 1);
+ {false, true} ->
+ incr_stat(Db, <<"doc_local_count">>, -1);
+ _ ->
+ ok
+ end,
+
+ ok.
+
+
+read_attachment(#{} = Db, DocId, AttId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+    case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
+        [] ->
+            throw({not_found, missing});
+        KVs ->
+            Vs = [V || {_K, V} <- KVs],
+            iolist_to_binary(Vs)
+    end.
+
+
+write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ AttId = fabric2_util:uuid(),
+ Chunks = chunkify_attachment(Data),
+
+ lists:foldl(fun(Chunk, ChunkId) ->
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
+ ok = erlfdb:set(Tx, AttKey, Chunk),
+ ChunkId + 1
+ end, 0, Chunks),
+ {ok, AttId}.
+
+
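+% Fold sketch (illustrative): UserFun sees {meta, _}, {row, _} and
+% complete events and threads an accumulator, returning {ok, Acc} to
+% continue or {stop, Acc} to halt early:
+%
+%     {ok, Ids} = fold_docs(Db, fun
+%         ({row, Row}, Acc) ->
+%             {ok, [fabric2_util:get_value(id, Row) | Acc]};
+%         (_, Acc) ->
+%             {ok, Acc}
+%     end, [], []).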
+fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
+
+ DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
+ DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
+
+ try
+ UserAcc1 = maybe_stop(UserFun({meta, [
+ {total, ?bin2uint(DocCountBin)},
+ {offset, null}
+ ]}, UserAcc0)),
+
+ UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
+ {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
+ RevId = erlfdb_tuple:unpack(V),
+ maybe_stop(UserFun({row, [
+ {id, DocId},
+ {key, DocId},
+ {value, couch_doc:rev_to_str(RevId)}
+ ]}, UserAccIn))
+ end, UserAcc1, [{reverse, Reverse}] ++ Options),
+
+ {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end.
+
+
+fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ SinceSeq1 = get_since_seq(SinceSeq0),
+
+ Reverse = case fabric2_util:get_value(dir, Options, fwd) of
+ fwd -> false;
+ rev -> true
+ end,
+
+ {Start0, End0} = case Reverse of
+ false -> {SinceSeq1, fabric2_util:seq_max_vs()};
+ true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
+ end,
+
+ Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
+ End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
+
+ {Start, End} = case Reverse of
+ false -> {erlfdb_key:first_greater_than(Start1), End1};
+ true -> {Start1, erlfdb_key:first_greater_than(End1)}
+ end,
+
+ try
+ {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
+ {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
+
+ Change = #{
+ id => DocId,
+ sequence => vs_to_seq(SeqVS),
+ rev_id => RevId,
+ deleted => Deleted
+ },
+
+ maybe_stop(UserFun(Change, UserAccIn))
+ end, UserAcc0, [{reverse, Reverse}] ++ Options)}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end.
+
+
+get_last_change(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ {Start, End} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
+ Options = [{limit, 1}, {reverse, true}],
+ case erlfdb:get_range(Tx, Start, End, Options) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{K, _V}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
+ vs_to_seq(SeqVS)
+ end.
+
+
+maybe_stop({ok, Acc}) ->
+ Acc;
+maybe_stop({stop, Acc}) ->
+ throw({stop, Acc}).
+
+
+vs_to_seq(VS) ->
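+    % 51 (16#33) is the tuple layer's type tag for a 96-bit
+    % versionstamp; the remaining 12 bytes are the stamp itself.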
+ <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
+ fabric2_util:to_hex(SeqBin).
+
+
+debug_cluster() ->
+ debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
+
+
+debug_cluster(Start, End) ->
+ transactional(fun(Tx) ->
+ lists:foreach(fun({Key, Val}) ->
+ io:format("~s => ~s~n", [
+ string:pad(erlfdb_util:repr(Key), 60),
+ erlfdb_util:repr(Val)
+ ])
+ end, erlfdb:get_range(Tx, Start, End))
+ end).
+
+
+init_db(Tx, DbName, Options) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+ #{
+ name => DbName,
+ tx => Tx,
+ layer_prefix => Prefix,
+ md_version => Version,
+
+ db_options => Options
+ }.
+
+
+load_validate_doc_funs(#{} = Db) ->
+ FoldFun = fun
+ ({row, Row}, Acc) ->
+ DDocInfo = #{id => fabric2_util:get_value(id, Row)},
+ {ok, [DDocInfo | Acc]};
+ (_, Acc) ->
+ {ok, Acc}
+ end,
+
+ Options = [
+ {start_key, <<"_design/">>},
+ {end_key, <<"_design0">>}
+ ],
+
+ {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+
+ Infos2 = lists:map(fun(Info) ->
+ #{
+ id := DDocId = <<"_design/", _/binary>>
+ } = Info,
+ Info#{
+ rev_info => get_winning_revs_future(Db, DDocId, 1)
+ }
+ end, Infos1),
+
+ Infos3 = lists:flatmap(fun(Info) ->
+ #{
+ id := DDocId,
+ rev_info := RevInfoFuture
+ } = Info,
+ [RevInfo] = get_winning_revs_wait(Db, RevInfoFuture),
+ #{deleted := Deleted} = RevInfo,
+ if Deleted -> []; true ->
+ [Info#{
+ rev_info := RevInfo,
+ body => get_doc_body_future(Db, DDocId, RevInfo)
+ }]
+ end
+ end, Infos2),
+
+ VDUs = lists:flatmap(fun(Info) ->
+ #{
+ id := DDocId,
+ rev_info := RevInfo,
+ body := BodyFuture
+ } = Info,
+ #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
+ case couch_doc:get_validate_doc_fun(Doc) of
+ nil -> [];
+ Fun -> [Fun]
+ end
+ end, Infos3),
+
+ Db#{
+ validate_doc_update_funs := VDUs
+ }.
+
+
+bump_metadata_version(Tx) ->
+    % The 14 zero bytes are pulled from the PR that added the
+    % metadata version key. It's unclear why 14 bytes when
+    % versionstamps are only 80 bits, but we match it for now.
+ erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
+
+
+bump_db_version(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = fabric2_util:uuid(),
+ ok = erlfdb:set(Tx, DbVersionKey, DbVersion).
+
+
+write_doc_body(#{} = Db0, #doc{} = Doc) ->
+ #{
+ tx := Tx
+ } = Db = ensure_current(Db0),
+
+ {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
+ erlfdb:set(Tx, NewDocKey, NewDocVal).
+
+
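+% Key/value shape sketch (illustrative values): the winning, non-deleted
+% rev of doc <<"d">> at {2, RevB} with rev_path [RevA] is stored as
+%
+%     {?DB_REVS, <<"d">>, true, 2, RevB} =>
+%         {?CURR_REV_FORMAT, Versionstamp, BranchCount, {RevA}}
+%
+% while non-winner revisions drop the versionstamp and branch count.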
+revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
+ #{
+ deleted := Deleted,
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath,
+ branch_count := BranchCount
+ } = RevId,
+ VS = new_versionstamp(Tx),
+ Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
+ Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)},
+ KBin = erlfdb_tuple:pack(Key, DbPrefix),
+ VBin = erlfdb_tuple:pack_vs(Val),
+ {KBin, VBin, VS};
+
+revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) ->
+ #{
+ deleted := Deleted,
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath
+ } = RevId,
+ Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
+ Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)},
+ KBin = erlfdb_tuple:pack(Key, DbPrefix),
+ VBin = erlfdb_tuple:pack(Val),
+ {KBin, VBin, undefined}.
+
+
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) ->
+ {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
+ {_RevFormat, Sequence, BranchCount, RevPath} = Val,
+ #{
+ winner => true,
+ deleted => not NotDeleted,
+ rev_id => {RevPos, Rev},
+ rev_path => tuple_to_list(RevPath),
+ sequence => Sequence,
+ branch_count => BranchCount
+ };
+
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) ->
+ {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
+ {_RevFormat, RevPath} = Val,
+ #{
+ winner => false,
+ deleted => not NotDeleted,
+ rev_id => {RevPos, Rev},
+ rev_path => tuple_to_list(RevPath),
+ sequence => undefined,
+ branch_count => undefined
+ }.
+
+
+doc_to_fdb(Db, #doc{} = Doc) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ #doc{
+ id = Id,
+ revs = {Start, [Rev | _]},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ } = doc_flush_atts(Db, Doc),
+
+ Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix),
+ Val = {Body, Atts, Deleted},
+ {Key, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) ->
+ {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {Pos, Path},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ };
+fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) ->
+ {not_found, missing}.
+
+
+local_doc_to_fdb(Db, #doc{} = Doc) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ #doc{
+ id = Id,
+ revs = {0, [Rev]},
+ body = Body
+ } = Doc,
+
+ StoreRev = case Rev of
+ _ when is_integer(Rev) -> integer_to_binary(Rev);
+ _ when is_binary(Rev) -> Rev
+ end,
+
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+ Val = {StoreRev, Body},
+ {Key, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_local_doc(_Db, DocId, Bin) when is_binary(Bin) ->
+ {Rev, Body} = binary_to_term(Bin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {0, [Rev]},
+ deleted = false,
+ body = Body
+ };
+fdb_to_local_doc(_Db, _DocId, not_found) ->
+ {not_found, missing}.
+
+
+doc_flush_atts(Db, Doc) ->
+ Atts = lists:map(fun(Att) ->
+ couch_att:flush(Db, Doc#doc.id, Att)
+ end, Doc#doc.atts),
+ Doc#doc{atts = Atts}.
+
+
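+% Splits attachment data into ?ATTACHMENT_CHUNK_SIZE pieces, e.g.
+% (illustrative, assuming a chunk size of 3):
+%
+%     chunkify_attachment(<<"abcdefg">>) -> [<<"abc">>, <<"def">>, <<"g">>]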
+chunkify_attachment(Data) ->
+ case Data of
+ <<>> ->
+ [];
+ <<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> ->
+ [Head | chunkify_attachment(Rest)];
+        <<_/binary>> when byte_size(Data) < ?ATTACHMENT_CHUNK_SIZE ->
+ [Data]
+ end.
+
+
+get_dir_and_bounds(DbPrefix, Options) ->
+ Reverse = case fabric2_util:get_value(dir, Options, fwd) of
+ fwd -> false;
+ rev -> true
+ end,
+ StartKey0 = fabric2_util:get_value(start_key, Options),
+ EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
+ EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
+ InclusiveEnd = EndKeyGt == undefined,
+
+ % CouchDB swaps the key meanings based on the direction
+ % of the fold. FoundationDB does not so we have to
+ % swap back here.
+ {StartKey1, EndKey1} = case Reverse of
+ false -> {StartKey0, EndKey0};
+ true -> {EndKey0, StartKey0}
+ end,
+
+ % Set the maximum bounds for the start and endkey
+ StartKey2 = case StartKey1 of
+ undefined -> {?DB_ALL_DOCS};
+ SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+ end,
+
+ EndKey2 = case EndKey1 of
+ undefined -> {?DB_ALL_DOCS, <<16#FF>>};
+ EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+ end,
+
+ StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
+ EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+
+ % FoundationDB ranges are applied as SK <= key < EK
+ % By default, CouchDB is SK <= key <= EK with the
+ % optional inclusive_end=false option changing that
+ % to SK <= key < EK. Also, remember that CouchDB
+ % swaps the meaning of SK and EK based on direction.
+ %
+ % Thus we have this wonderful bit of logic to account
+ % for all of those combinations.
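+    %
+    % Summary of the adjustments (illustrative; the fwd/inclusive end
+    % bump only applies when an end key was actually supplied):
+    %
+    %     direction  inclusive_end  start bound         end bound
+    %     fwd        true           as-is               first_greater_than
+    %     fwd        false          as-is               as-is
+    %     rev        true           as-is               first_greater_than
+    %     rev        false          first_greater_than  first_greater_than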
+
+ StartKey4 = case {Reverse, InclusiveEnd} of
+ {true, false} ->
+ erlfdb_key:first_greater_than(StartKey3);
+ _ ->
+ StartKey3
+ end,
+
+ EndKey4 = case {Reverse, InclusiveEnd} of
+ {false, true} when EndKey0 /= undefined ->
+ erlfdb_key:first_greater_than(EndKey3);
+ {true, _} ->
+ erlfdb_key:first_greater_than(EndKey3);
+ _ ->
+ EndKey3
+ end,
+
+ {Reverse, StartKey4, EndKey4}.
+
+
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0 ->
+ fabric2_util:seq_zero_vs();
+
+get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
+ fabric2_util:seq_max_vs();
+
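+% A 24-hex-character since seq round-trips with vs_to_seq/1, i.e.
+% (illustrative) get_since_seq(vs_to_seq(SeqVS)) =:= SeqVS.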
+get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
+ Seq1 = fabric2_util:from_hex(Seq),
+ Seq2 = <<51:8, Seq1/binary>>,
+ {SeqVS} = erlfdb_tuple:unpack(Seq2),
+ SeqVS;
+
+get_since_seq(List) when is_list(List) ->
+ get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+ erlang:error({invalid_since_seq, Seq}).
+
+
+get_db_handle() ->
+ case get(?PDICT_DB_KEY) of
+ undefined ->
+ {ok, Db} = application:get_env(fabric, db),
+ put(?PDICT_DB_KEY, Db),
+ Db;
+ Db ->
+ Db
+ end.
+
+
+require_transaction(#{tx := {erlfdb_transaction, _}} = _Db) ->
+ ok;
+require_transaction(#{} = _Db) ->
+ erlang:error(transaction_required).
+
+
+ensure_current(Db) ->
+ ensure_current(Db, true).
+
+
+ensure_current(#{} = Db0, CheckDbVersion) ->
+    require_transaction(Db0),
+
+    #{
+        tx := Tx,
+        md_version := MetaDataVersion
+    } = Db0,
+
+    Db = case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+        MetaDataVersion -> Db0;
+        _NewVersion -> reopen(Db0)
+    end,
+
+    AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT),
+    if not CheckDbVersion orelse AlreadyChecked == true -> Db; true ->
+        #{
+            db_prefix := DbPrefix,
+            db_version := DbVersion
+        } = Db,
+
+        DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+
+        case erlfdb:wait(erlfdb:get(Tx, DbVersionKey)) of
+            DbVersion ->
+                put(?PDICT_CHECKED_DB_IS_CURRENT, true),
+                Db;
+            _NewDBVersion ->
+                fabric2_server:remove(maps:get(name, Db)),
+                reopen(Db)
+        end
+    end.
+
+
+is_transaction_applied(Tx) ->
+ is_commit_unknown_result()
+ andalso has_transaction_id()
+ andalso transaction_id_exists(Tx).
+
+
+get_previous_transaction_result() ->
+ get(?PDICT_TX_RES_KEY).
+
+
+execute_transaction(Tx, Fun, LayerPrefix) ->
+ put(?PDICT_CHECKED_DB_IS_CURRENT, false),
+ Result = Fun(Tx),
+ case erlfdb:is_read_only(Tx) of
+ true ->
+ ok;
+ false ->
+ erlfdb:set(Tx, get_transaction_id(Tx, LayerPrefix), <<>>),
+ put(?PDICT_TX_RES_KEY, Result)
+ end,
+ Result.
+
+
+clear_transaction() ->
+ fabric2_txids:remove(get(?PDICT_TX_ID_KEY)),
+ erase(?PDICT_CHECKED_DB_IS_CURRENT),
+ erase(?PDICT_TX_ID_KEY),
+ erase(?PDICT_TX_RES_KEY).
+
+
+is_commit_unknown_result() ->
+ erlfdb:get_last_error() == ?COMMIT_UNKNOWN_RESULT.
+
+
+has_transaction_id() ->
+ is_binary(get(?PDICT_TX_ID_KEY)).
+
+
+transaction_id_exists(Tx) ->
+ erlfdb:wait(erlfdb:get(Tx, get(?PDICT_TX_ID_KEY))) == <<>>.
+
+
+get_transaction_id(Tx, LayerPrefix) ->
+ case get(?PDICT_TX_ID_KEY) of
+ undefined ->
+ TxId = fabric2_txids:create(Tx, LayerPrefix),
+ put(?PDICT_TX_ID_KEY, TxId),
+ TxId;
+ TxId when is_binary(TxId) ->
+ TxId
+ end.
+
+
+new_versionstamp(Tx) ->
+ TxId = erlfdb:get_next_tx_id(Tx),
+ {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+
diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl
new file mode 100644
index 000000000..5b826cd14
--- /dev/null
+++ b/src/fabric/src/fabric2_server.erl
@@ -0,0 +1,104 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_server).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([
+ start_link/0,
+ fetch/1,
+ store/1,
+ remove/1
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(CLUSTER_FILE, "/usr/local/etc/foundationdb/fdb.cluster").
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
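+% Cache usage sketch (illustrative): opened db handles are stored with
+% their transaction and user context stripped, then re-armed inside a
+% new transaction on each use:
+%
+%     ok = fabric2_server:store(Db),
+%     Db1 = fabric2_server:fetch(DbName),
+%     fabric2_fdb:transactional(Db1, fun(TxDb) -> ... end).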
+fetch(DbName) when is_binary(DbName) ->
+ case ets:lookup(?MODULE, DbName) of
+ [{DbName, #{} = Db}] -> Db;
+ [] -> undefined
+ end.
+
+
+store(#{name := DbName} = Db0) when is_binary(DbName) ->
+ Db1 = Db0#{
+ tx := undefined,
+ user_ctx := #user_ctx{}
+ },
+ true = ets:insert(?MODULE, {DbName, Db1}),
+ ok.
+
+
+remove(DbName) when is_binary(DbName) ->
+ true = ets:delete(?MODULE, DbName),
+ ok.
+
+
+init(_) ->
+ ets:new(?MODULE, [
+ public,
+ named_table,
+ {read_concurrency, true},
+ {write_concurrency, true}
+ ]),
+
+ Db = case application:get_env(fabric, eunit_run) of
+ {ok, true} ->
+ erlfdb_util:get_test_db([empty]);
+ undefined ->
+ ClusterStr = config:get("erlfdb", "cluster_file", ?CLUSTER_FILE),
+ erlfdb:open(iolist_to_binary(ClusterStr))
+ end,
+ application:set_env(fabric, db, Db),
+
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl
new file mode 100644
index 000000000..73c6c1f4d
--- /dev/null
+++ b/src/fabric/src/fabric2_sup.erl
@@ -0,0 +1,47 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_sup).
+-behaviour(supervisor).
+-vsn(1).
+
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link(Args) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
+
+
+init([]) ->
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 1,
+ period => 5
+ },
+ Children = [
+ #{
+ id => fabric2_server,
+ start => {fabric2_server, start_link, []}
+ },
+ #{
+ id => fabric2_txids,
+ start => {fabric2_txids, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/fabric/src/fabric2_txids.erl b/src/fabric/src/fabric2_txids.erl
new file mode 100644
index 000000000..bbb8bdf57
--- /dev/null
+++ b/src/fabric/src/fabric2_txids.erl
@@ -0,0 +1,144 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_txids).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([
+ start_link/0,
+ create/2,
+ remove/1
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("fabric2.hrl").
+
+
+-define(ONE_HOUR, 3600000000).
+-define(MAX_TX_IDS, 1000).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
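+% Transaction ids are written by fabric2_fdb:execute_transaction/3 so
+% that a retried commit with an unknown result can detect whether it
+% already applied. Sketch (illustrative):
+%
+%     TxId = fabric2_txids:create(Tx, LayerPrefix),
+%     erlfdb:set(Tx, TxId, <<>>),
+%     % ... once the transaction is known to have finished:
+%     fabric2_txids:remove(TxId).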
+create(Tx, undefined) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ create(Tx, Prefix);
+
+create(_Tx, LayerPrefix) ->
+ {Mega, Secs, Micro} = os:timestamp(),
+ Key = {?TX_IDS, Mega, Secs, Micro, fabric2_util:uuid()},
+ erlfdb_tuple:pack(Key, LayerPrefix).
+
+
+remove(TxId) when is_binary(TxId) ->
+ gen_server:cast(?MODULE, {remove, TxId});
+
+remove(undefined) ->
+ ok.
+
+
+init(_) ->
+ {ok, #{
+ last_sweep => os:timestamp(),
+ txids => []
+ }}.
+
+
+terminate(_, #{txids := TxIds}) ->
+ if TxIds == [] -> ok; true ->
+ fabric2_fdb:transactional(fun(Tx) ->
+            lists:foreach(fun(TxId) ->
+                erlfdb:clear(Tx, TxId)
+            end, TxIds)
+ end)
+ end,
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast({remove, TxId}, St) ->
+ #{
+ last_sweep := LastSweep,
+ txids := TxIds
+ } = St,
+
+ NewTxIds = [TxId | TxIds],
+ NewSt = St#{txids := NewTxIds},
+
+ NeedsSweep = timer:now_diff(os:timestamp(), LastSweep) > ?ONE_HOUR,
+
+ case NeedsSweep orelse length(NewTxIds) >= ?MAX_TX_IDS of
+ true ->
+ {noreply, clean(NewSt, NeedsSweep)};
+ false ->
+ {noreply, NewSt}
+ end.
+
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+clean(St, NeedsSweep) ->
+ #{
+ last_sweep := LastSweep,
+ txids := TxIds
+ } = St,
+ fabric2_fdb:transactional(fun(Tx) ->
+ lists:foreach(fun(TxId) ->
+ erlfdb:clear(Tx, TxId)
+ end, TxIds),
+ case NeedsSweep of
+ true ->
+ sweep(Tx, LastSweep),
+ St#{
+ last_sweep := os:timestamp(),
+ txids := []
+ };
+ false ->
+ St#{txids := []}
+ end
+ end).
+
+
+sweep(Tx, {Mega, Secs, Micro}) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ StartKey = erlfdb_tuple:pack({?TX_IDS}, Prefix),
+ EndKey = erlfdb_tuple:pack({?TX_IDS, Mega, Secs, Micro}, Prefix),
+ erlfdb:set_option(Tx, next_write_no_write_conflict_range),
+ erlfdb:clear_range(Tx, StartKey, EndKey).
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
new file mode 100644
index 000000000..6e2df67c2
--- /dev/null
+++ b/src/fabric/src/fabric2_util.erl
@@ -0,0 +1,203 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_util).
+
+
+-export([
+ revinfo_to_path/1,
+ sort_revinfos/1,
+
+ seq_zero_vs/0,
+ seq_max_vs/0,
+
+ user_ctx_to_json/1,
+
+ validate_security_object/1,
+
+ get_value/2,
+ get_value/3,
+ to_hex/1,
+ from_hex/1,
+ uuid/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
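+% Converts a flat rev info map into a couch_key_tree path. Illustrative
+% example: rev_id {3, c} with rev_path [b, a] becomes
+%
+%     {1, {a, ?REV_MISSING, [{b, ?REV_MISSING, [{c, RevInfo, []}]}]}}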
+revinfo_to_path(RevInfo) ->
+ #{
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath
+ } = RevInfo,
+ Revs = lists:reverse(RevPath, [Rev]),
+ Path = revinfo_to_path(RevInfo, Revs),
+ {RevPos - length(Revs) + 1, Path}.
+
+
+revinfo_to_path(RevInfo, [Rev]) ->
+ {Rev, RevInfo, []};
+
+revinfo_to_path(RevInfo, [Rev | Rest]) ->
+ {Rev, ?REV_MISSING, [revinfo_to_path(RevInfo, Rest)]}.
+
+
+sort_revinfos(RevInfos) ->
+ CmpFun = fun(A, B) -> rev_sort_key(A) > rev_sort_key(B) end,
+ lists:sort(CmpFun, RevInfos).
+
+
+rev_sort_key(#{} = RevInfo) ->
+ #{
+ deleted := Deleted,
+ rev_id := {RevPos, Rev}
+ } = RevInfo,
+ {not Deleted, RevPos, Rev}.
+
+
+seq_zero_vs() ->
+ {versionstamp, 0, 0, 0}.
+
+
+seq_max_vs() ->
+ {versionstamp, 18446744073709551615, 65535, 65535}.
+
+
+user_ctx_to_json(Db) ->
+ UserCtx = fabric2_db:get_user_ctx(Db),
+ {[
+ {<<"db">>, fabric2_db:name(Db)},
+ {<<"name">>, UserCtx#user_ctx.name},
+ {<<"roles">>, UserCtx#user_ctx.roles}
+ ]}.
+
+
+validate_security_object({SecProps}) ->
+ Admins = get_value(<<"admins">>, SecProps, {[]}),
+ ok = validate_names_and_roles(Admins),
+
+    % We fall back to readers here for backwards compatibility.
+ Readers = get_value(<<"readers">>, SecProps, {[]}),
+ Members = get_value(<<"members">>, SecProps, Readers),
+ ok = validate_names_and_roles(Members).
+
+
+validate_names_and_roles({Props}) when is_list(Props) ->
+ validate_json_list_of_strings(<<"names">>, Props),
+ validate_json_list_of_strings(<<"roles">>, Props);
+validate_names_and_roles(_) ->
+ throw("admins or members must be a JSON list of strings").
+
+
+validate_json_list_of_strings(Member, Props) ->
+ case get_value(Member, Props, []) of
+ Values when is_list(Values) ->
+ NonBinary = lists:filter(fun(V) -> not is_binary(V) end, Values),
+ if NonBinary == [] -> ok; true ->
+ MemberStr = binary_to_list(Member),
+ throw(MemberStr ++ " must be a JSON list of strings")
+ end;
+ _ ->
+ MemberStr = binary_to_list(Member),
+ throw(MemberStr ++ " must be a JSON list of strings")
+ end.
+
+
+get_value(Key, List) ->
+ get_value(Key, List, undefined).
+
+
+get_value(Key, List, Default) ->
+ case lists:keysearch(Key, 1, List) of
+ {value, {Key,Value}} ->
+ Value;
+ false ->
+ Default
+ end.
+
+
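+% Hex helpers round-trip binaries, e.g. (illustrative):
+%
+%     to_hex(<<255, 0>>) -> <<"ff00">>
+%     from_hex(<<"ff00">>) -> <<255, 0>>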
+to_hex(Bin) ->
+ list_to_binary(to_hex_int(Bin)).
+
+
+to_hex_int(<<>>) ->
+ [];
+to_hex_int(<<Hi:4, Lo:4, Rest/binary>>) ->
+    [nibble_to_hex(Hi), nibble_to_hex(Lo) | to_hex_int(Rest)].
+
+
+nibble_to_hex(I) ->
+ case I of
+ 0 -> $0;
+ 1 -> $1;
+ 2 -> $2;
+ 3 -> $3;
+ 4 -> $4;
+ 5 -> $5;
+ 6 -> $6;
+ 7 -> $7;
+ 8 -> $8;
+ 9 -> $9;
+ 10 -> $a;
+ 11 -> $b;
+ 12 -> $c;
+ 13 -> $d;
+ 14 -> $e;
+ 15 -> $f
+ end.
+
+
+from_hex(Bin) ->
+ iolist_to_binary(from_hex_int(Bin)).
+
+
+from_hex_int(<<>>) ->
+ [];
+from_hex_int(<<Hi:8, Lo:8, RestBinary/binary>>) ->
+ HiNib = hex_to_nibble(Hi),
+ LoNib = hex_to_nibble(Lo),
+ [<<HiNib:4, LoNib:4>> | from_hex_int(RestBinary)];
+from_hex_int(<<BadHex/binary>>) ->
+ erlang:error({invalid_hex, BadHex}).
+
+
+hex_to_nibble(N) ->
+ case N of
+ $0 -> 0;
+ $1 -> 1;
+ $2 -> 2;
+ $3 -> 3;
+ $4 -> 4;
+ $5 -> 5;
+ $6 -> 6;
+ $7 -> 7;
+ $8 -> 8;
+ $9 -> 9;
+ $a -> 10;
+ $A -> 10;
+ $b -> 11;
+ $B -> 11;
+ $c -> 12;
+ $C -> 12;
+ $d -> 13;
+ $D -> 13;
+ $e -> 14;
+ $E -> 14;
+ $f -> 15;
+ $F -> 15;
+ _ -> erlang:error({invalid_hex, N})
+ end.
+
+
+uuid() ->
+ to_hex(crypto:strong_rand_bytes(16)).