author     Paul J. Davis <paul.joseph.davis@gmail.com>    2019-05-23 21:58:28 -0500
committer  Paul J. Davis <paul.joseph.davis@gmail.com>    2019-05-23 21:58:33 -0500
commit     819c9e6037e064a9e7d53b2f57bc86694374e6ce (patch)
tree       fb16e242b6e92014ed5d572e56805b4cbc5b802a
parent     a11ac081bc97cd1e44138a32feb38088d31f18e4 (diff)
download   couchdb-819c9e6037e064a9e7d53b2f57bc86694374e6ce.tar.gz
BLARGH YE MATEY
Storm rolling through. Pushing to the cloud in case my laptop gets
fried.
-rw-r--r--  src/chttpd/src/chttpd.erl                                      |  17
-rw-r--r--  src/chttpd/src/chttpd_changes.erl                              | 308
-rw-r--r--  src/chttpd/src/chttpd_db.erl                                   | 148
-rw-r--r--  src/chttpd/src/chttpd_external.erl                             |  35
-rw-r--r--  src/couch/src/couch_att.erl                                    | 658
-rw-r--r--  src/couch/src/couch_doc.erl                                    |  11
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl         |   7
-rw-r--r--  src/couch_replicator/src/couch_replicator_changes_reader.erl   |   1
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl    |   1
-rw-r--r--  src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl                 |   2
-rw-r--r--  src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl             |   2
-rw-r--r--  src/fabric/src/fabric2.hrl                                     |   4
-rw-r--r--  src/fabric/src/fabric2_db.erl                                  | 133
-rw-r--r--  src/fabric/src/fabric2_events.erl                              |  84
-rw-r--r--  src/fabric/src/fabric2_fdb.erl                                 | 130
-rw-r--r--  src/fabric/src/fabric2_util.erl                                |   5
-rw-r--r--  src/fabric/test/fabric2_doc_crud_tests.erl                     |  15
17 files changed, 891 insertions(+), 670 deletions(-)
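The core of this patch is that chttpd_changes now drives a two-arity callback; the old couch_changes interface threaded the response type through as a third argument. A minimal sketch of the protocol, with event shapes and return conventions read off the diff below rather than from any published API:

```erlang
% Hypothetical callback illustrating the two-arity protocol used below.
% Inferred from this diff: start / waiting_for_updates / {change, Row}
% return {Go, Acc} where Go is ok | stop; {stop, EndSeq, Pending} ends
% the feed and its return value becomes the feed's result.
demo_callback(start, Acc) ->
    {ok, Acc};
demo_callback(waiting_for_updates, Acc) ->
    {ok, Acc};
demo_callback({change, Row}, Acc) ->
    % Returning stop here would halt the fold early.
    {ok, [Row | Acc]};
demo_callback(timeout, Acc) ->
    {ok, Acc};
demo_callback({stop, EndSeq, _Pending}, Acc) ->
    {EndSeq, lists:reverse(Acc)}.
```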
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 631eb77c9..c548e1818 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -25,7 +25,7 @@
    error_info/1, parse_form/1, json_body/1, json_body_obj/1, body/1,
    doc_etag/1, make_etag/1, etag_respond/3, etag_match/2,
    partition/1, serve_file/3, serve_file/4,
-    server_header/0, start_chunked_response/3,send_chunk/2,
+    server_header/0, start_chunked_response/3,send_chunk/2,last_chunk/1,
    start_response_length/4, send/2, start_json_response/2,
    start_json_response/3, end_json_response/1, send_response/4,
    send_response_no_cors/4,
@@ -381,7 +381,7 @@ update_stats(#httpd{begin_ts = BeginTime}, #httpd_resp{} = Res) ->
    end,
    Res.

-maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
+maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = _} = HttpResp) ->
    #httpd{
        mochi_req = MochiReq,
        begin_ts = BeginTime,
@@ -397,9 +397,9 @@ maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
    Host = MochiReq:get_header_value("Host"),
    RawUri = MochiReq:get(raw_path),
    RequestTime = timer:now_diff(EndTime, BeginTime) / 1000,
-    couch_log:notice("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
+    couch_log:error("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
        Method, RawUri, Code, Status, round(RequestTime)]);
-maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
+maybe_log(_HttpReq, _) ->
    ok.

@@ -737,7 +737,14 @@ start_chunked_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers0) ->
    {ok, Resp}.

 send_chunk(Resp, Data) ->
-    Resp:write_chunk(Data),
+    case iolist_size(Data) of
+        0 -> ok; % do nothing
+        _ -> Resp:write_chunk(Data)
+    end,
    {ok, Resp}.

+last_chunk(Resp) ->
+    Resp:write_chunk([]),
+    {ok, Resp}.
+
 send_response(Req, Code, Headers0, Body) ->
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 2fe824cec..30caab2a0 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -60,7 +60,8 @@ handle_db_changes(Args, Req, Db) ->
    handle_changes(Args, Req, Db, db).

-handle_changes(Args1, Req, Db0, Type) ->
+handle_changes(Args1, Req, Db, Type) ->
+    ReqPid = chttpd:header_value(Req, "XKCD", "<unknown>"),
    #changes_args{
        style = Style,
        filter = FilterName,
@@ -68,7 +69,8 @@
        dir = Dir,
        since = Since
    } = Args1,
-    Filter = configure_filter(FilterName, Style, Req, Db0),
+    couch_log:error("XKCD: STARTING CHANGES FEED ~p for ~s : ~p", [self(), ReqPid, Since]),
+    Filter = configure_filter(FilterName, Style, Req, Db),
    Args = Args1#changes_args{filter_fun = Filter},
    % The type of changes feed depends on the supplied filter. If the query is
    % for an optimized view-filtered db changes, we need to use the view
@@ -81,7 +83,7 @@ handle_changes(Args1, Req, Db0, Type) ->
        _ ->
            {false, undefined, undefined}
    end,
-    DbName = couch_db:name(Db0),
+    DbName = fabric2_db:name(Db),
    {StartListenerFun, View} = if UseViewChanges ->
        {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
                DbName, DDocName, ViewName, #mrargs{}),
@@ -99,17 +101,16 @@
            {SNFun, View0};
        true ->
            SNFun = fun() ->
-                couch_event:link_listener(
-                     ?MODULE, handle_db_event, self(), [{dbname, DbName}]
-                )
+                fabric2_events:link_listener(
+                     ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+                )
            end,
            {SNFun, undefined}
    end,
    Start = fun() ->
-        {ok, Db} = couch_db:reopen(Db0),
        StartSeq = case Dir of
        rev ->
-            couch_db:get_update_seq(Db);
+            fabric2_fdb:get_update_seq(Db);
        fwd ->
            Since
        end,
@@ -137,7 +138,7 @@ handle_changes(Args1, Req, Db0, Type) ->
            {ok, Listener} = StartListenerFun(),
            {Db, View, StartSeq} = Start(),
-            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc),
            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
            Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
                <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
@@ -148,14 +149,14 @@
                    Acc0,
                    true)
            after
-                couch_event:stop_listener(Listener),
+                fabric2_events:stop_listener(Listener),
                get_rest_updated(ok) % clean out any remaining update messages
            end
        end;
    false ->
        fun(CallbackAcc) ->
            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
-            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc),
            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
            {Db, View, StartSeq} = Start(),
            Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
@@ -166,7 +167,7 @@
                Acc0,
                Dir,
                true),
-            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+            end_sending_changes(Callback, UserAcc3, LastSeq)
        end
    end.

@@ -192,10 +193,10 @@ handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
    end,
    {ok, {Parent, DDocId}}.

-get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 2) ->
    Pair;
-get_callback_acc(Callback) when is_function(Callback, 2) ->
-    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+get_callback_acc(Callback) when is_function(Callback, 1) ->
+    {fun(Ev, _) -> Callback(Ev) end, ok}.

 configure_filter("_doc_ids", Style, Req, _Db) ->
@@ -223,7 +224,7 @@ configure_filter("_view", Style, Req, Db) ->
        catch _:_ ->
            view
        end,
-        case couch_db:is_clustered(Db) of
+        case fabric2_db:is_clustered(Db) of
        true ->
            DIR = fabric_util:doc_id_and_rev(DDoc),
            {fetch, FilterType, Style, DIR, VName};
@@ -246,8 +247,7 @@ configure_filter(FilterName, Style, Req, Db) ->
        [DName, FName] ->
            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
            check_member_exists(DDoc, [<<"filters">>, FName]),
-            DIR = fabric_util:doc_id_and_rev(DDoc),
-            {fetch, custom, Style, Req, DIR, FName};
+            {custom, Style, Req, DDoc, FName};
        [] ->
            {default, Style};
        _Else ->
@@ -256,45 +256,45 @@ configure_filter(FilterName, Style, Req, Db) ->
    end.
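From here down, each change flows through filter/3 as a plain map rather than a #doc_info{} record. A sketch of the shape the rest of this module pattern-matches on, assuming the keys built by send_lookup_changes further down this diff:

```erlang
% Shape assumed by filter/3, apply_style/3 and changes_row/3 below;
% the rev id shown is an illustrative placeholder, not real data.
Change = #{
    id => <<"mydoc">>,                 % document id
    sequence => Seq,                   % update sequence for this row
    rev_id => {1, <<"967a00dff5e02add41819138abb3284d">>},
    deleted => false
},
% e.g. the _doc_ids filter (internal, not exported) just tests the id key:
filter(Db, Change, {doc_ids, main_only, [<<"mydoc">>]}).
```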
-filter(Db, #full_doc_info{}=FDI, Filter) ->
-    filter(Db, couch_doc:to_doc_info(FDI), Filter);
-filter(_Db, DocInfo, {default, Style}) ->
-    apply_style(DocInfo, Style);
-filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
-    case lists:member(DocInfo#doc_info.id, DocIds) of
+filter(Db, Change, {default, Style}) ->
+    apply_style(Db, Change, Style);
+filter(Db, Change, {doc_ids, Style, DocIds}) ->
+    case lists:member(maps:get(id, Change), DocIds) of
        true ->
-            apply_style(DocInfo, Style);
+            apply_style(Db, Change, Style);
        false ->
            []
    end;
-filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
-    Docs = open_revs(Db, DocInfo, Style),
+filter(Db, Change, {selector, Style, {Selector, _Fields}}) ->
+    Docs = open_revs(Db, Change, Style),
    Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
        || Doc <- Docs],
    filter_revs(Passes, Docs);
-filter(_Db, DocInfo, {design_docs, Style}) ->
-    case DocInfo#doc_info.id of
+filter(Db, Change, {design_docs, Style}) ->
+    case maps:get(id, Change) of
        <<"_design", _/binary>> ->
-            apply_style(DocInfo, Style);
+            apply_style(Db, Change, Style);
        _ ->
            []
    end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
+filter(Db, Change, {FilterType, Style, DDoc, VName})
        when FilterType == view; FilterType == fast_view ->
-    Docs = open_revs(Db, DocInfo, Style),
+    Docs = open_revs(Db, Change, Style),
    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
    filter_revs(Passes, Docs);
-filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+filter(Db, Change, {custom, Style, Req0, DDoc, FName}) ->
    Req = case Req0 of
        {json_req, _} -> Req0;
-        #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+        #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
    end,
-    Docs = open_revs(Db, DocInfo, Style),
+    Docs = open_revs(Db, Change, Style),
    {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
-    filter_revs(Passes, Docs).
+    filter_revs(Passes, Docs);
+filter(A, B, C) ->
+    erlang:error({filter_error, A, B, C}).

 fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
-    case couch_db:get_doc_info(Db, ID) of
+    case fabric2_db:get_doc_info(Db, ID) of
        {ok, #doc_info{high_seq=Seq}=DocInfo} ->
            Docs = open_revs(Db, DocInfo, Style),
            Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
@@ -404,32 +404,51 @@ check_member_exists(#doc{body={Props}}, Path) ->
    couch_util:get_nested_json_value({Props}, Path).

-apply_style(#doc_info{revs=Revs}, main_only) ->
-    [#rev_info{rev=Rev} | _] = Revs,
-    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-apply_style(#doc_info{revs=Revs}, all_docs) ->
-    [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+apply_style(_Db, Change, main_only) ->
+    #{rev_id := RevId} = Change,
+    [{[{<<"rev">>, couch_doc:rev_to_str(RevId)}]}];
+apply_style(Db, Change, all_docs) ->
+    % We have to fetch all revs for this row
+    #{id := DocId} = Change,
+    {ok, Resps} = fabric2_db:open_doc_revs(Db, DocId, all, [deleted]),
+    lists:flatmap(fun(Resp) ->
+        case Resp of
+            {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+                [{[{<<"rev">>, couch_doc:rev_to_str({Pos, Rev})}]}];
+            _ ->
+                []
+        end
+    end, Resps);
+apply_style(A, B, C) ->
+    erlang:error({changes_apply_style, A, B, C}).
 apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
 apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
    case couch_db:get_doc_info(Db, ID) of
        {ok, DocInfo} ->
-            apply_style(DocInfo, all_docs);
+            apply_style(Db, DocInfo, all_docs);
        {error, not_found} ->
            []
    end.

-open_revs(Db, DocInfo, Style) ->
-    DocInfos = case Style of
-        main_only -> [DocInfo];
-        all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
-    end,
-    OpenOpts = [deleted, conflicts],
-    % Relying on list comprehensions to silence errors
-    OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
-    [Doc || {ok, Doc} <- OpenResults].
+open_revs(Db, Change, Style) ->
+    #{id := DocId} = Change,
+    Options = [deleted, conflicts],
+    try
+        case Style of
+            main_only ->
+                {ok, Doc} = fabric2_db:open_doc(Db, DocId, Options),
+                [Doc];
+            all_docs ->
+                {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, all, Options),
+                [Doc || {ok, Doc} <- Docs]
+        end
+    catch _:_ ->
+        % We didn't log this before, should we now?
+        []
+    end.

 filter_revs(Passes, Docs) ->
@@ -471,12 +490,9 @@ get_changes_timeout(Args, Callback) ->
            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
    end.

-start_sending_changes(_Callback, UserAcc, ResponseType)
-        when ResponseType =:= "continuous"
-        orelse ResponseType =:= "eventsource" ->
-    UserAcc;
-start_sending_changes(Callback, UserAcc, ResponseType) ->
-    Callback(start, ResponseType, UserAcc).
+start_sending_changes(Callback, UserAcc) ->
+    {_, NewUserAcc} = Callback(start, UserAcc),
+    NewUserAcc.

 build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
    #changes_args{
@@ -525,7 +541,7 @@ send_changes(Acc, Dir, FirstRound) ->
            couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
        {undefined, _} ->
            Opts = [{dir, Dir}],
-            couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
+            fabric2_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
        {#mrview{}, _} ->
            ViewEnumFun = fun view_changes_enumerator/2,
            {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -565,7 +581,7 @@ can_optimize(_, _) ->

 send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
-    Results = couch_db:get_full_doc_infos(Db, DocIds),
+    Results = fabric2_db:get_full_doc_infos(Db, DocIds),
    FullInfos = lists:foldl(fun
        (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
        (not_found, Acc) -> Acc
@@ -603,7 +619,21 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
    SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
    FinalAcc = try
        FoldFun(fun(DocInfo, Acc) ->
-            case Fun(DocInfo, Acc) of
+            % Kinda gross that we're munging this back to a map
+            % that will then have to re-read and rebuild the FDI
+            % for all_docs style. But c'est la vie.
+            #doc_info{
+                id = DocId,
+                high_seq = Seq,
+                revs = [#rev_info{rev = Rev, deleted = Deleted} | _]
+            } = DocInfo,
+            Change = #{
+                id => DocId,
+                sequence => Seq,
+                rev_id => Rev,
+                deleted => Deleted
+            },
+            case Fun(Change, Acc) of
                {ok, NewAcc} ->
                    NewAcc;
                {stop, NewAcc} ->
@@ -617,7 +647,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
        fwd ->
            FinalAcc0 = case element(1, FinalAcc) of
                changes_acc -> % we came here via couch_http or internal call
-                    FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+                    FinalAcc#changes_acc{seq = fabric2_db:get_update_seq(Db)};
                fabric_changes_acc -> % we came here via chttpd / fabric / rexi
                    FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
            end,
@@ -642,31 +672,34 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
        ddoc_name = DDocName,
        view_name = ViewName
    } = ChangesAcc,
-    couch_db:close(Db),
    if Limit > NewLimit, ResponseType == "longpoll" ->
-        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+        end_sending_changes(Callback, UserAcc2, EndSeq);
    true ->
-        case wait_updated(Timeout, TimeoutFun, UserAcc2) of
-        {updated, UserAcc4} ->
-            DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
-            case couch_db:open(couch_db:name(Db), DbOptions1) of
-            {ok, Db2} ->
-                ?MODULE:keep_sending_changes(
-                  Args#changes_args{limit=NewLimit},
-                  ChangesAcc#changes_acc{
-                    db = Db2,
-                    view = maybe_refresh_view(Db2, DDocName, ViewName),
-                    user_acc = UserAcc4,
-                    seq = EndSeq,
-                    prepend = Prepend2,
-                    timeout = Timeout,
-                    timeout_fun = TimeoutFun},
-                  false);
-            _Else ->
-                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
-            end;
-        {stop, UserAcc4} ->
-            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+        {Go, UserAcc3} = notify_waiting_for_updates(Callback, UserAcc2),
+        if Go /= ok -> end_sending_changes(Callback, UserAcc3, EndSeq); true ->
+            case wait_updated(Timeout, TimeoutFun, UserAcc3) of
+            {updated, UserAcc4} ->
+                UserCtx = fabric2_db:get_user_ctx(Db),
+                DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
+                case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
+                {ok, Db2} ->
+                    ?MODULE:keep_sending_changes(
+                      Args#changes_args{limit=NewLimit},
+                      ChangesAcc#changes_acc{
+                        db = Db2,
+                        view = maybe_refresh_view(Db2, DDocName, ViewName),
+                        user_acc = UserAcc4,
+                        seq = EndSeq,
+                        prepend = Prepend2,
+                        timeout = Timeout,
+                        timeout_fun = TimeoutFun},
+                      false);
+                _Else ->
+                    end_sending_changes(Callback, UserAcc3, EndSeq)
+                end;
+            {stop, UserAcc4} ->
+                end_sending_changes(Callback, UserAcc4, EndSeq)
+            end
        end
    end.

@@ -677,8 +710,11 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
    {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
    View.

-end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
-    Callback({stop, EndSeq}, ResponseType, UserAcc).
+notify_waiting_for_updates(Callback, UserAcc) ->
+    Callback(waiting_for_updates, UserAcc).
+
+end_sending_changes(Callback, UserAcc, EndSeq) ->
+    Callback({stop, EndSeq, null}, UserAcc).

 view_changes_enumerator(Value, Acc) ->
    #changes_acc{
@@ -748,27 +784,24 @@ view_changes_enumerator(Value, Acc) ->
        end
    end.
-changes_enumerator(Value0, Acc) ->
+changes_enumerator(Change0, Acc) ->
    #changes_acc{
-        filter = Filter, callback = Callback, prepend = Prepend,
-        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
-        timeout = Timeout, timeout_fun = TimeoutFun
+        filter = Filter,
+        callback = Callback,
+        user_acc = UserAcc,
+        limit = Limit,
+        db = Db,
+        timeout = Timeout,
+        timeout_fun = TimeoutFun
    } = Acc,
-    {Value, Results0} = case Filter of
+    {Change1, Results0} = case Filter of
        {fast_view, _, _, _} ->
-            fast_view_filter(Db, Value0, Filter);
+            fast_view_filter(Db, Change0, Filter);
        _ ->
-            {Value0, filter(Db, Value0, Filter)}
+            {Change0, filter(Db, Change0, Filter)}
    end,
    Results = [Result || Result <- Results0, Result /= null],
-    Seq = case Value of
-        #full_doc_info{} ->
-            Value#full_doc_info.update_seq;
-        #doc_info{} ->
-            Value#doc_info.high_seq;
-        {{Seq0, _}, _} ->
-            Seq0
-    end,
+    Seq = maps:get(sequence, Change1),
    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
    case Results of
    [] ->
@@ -780,19 +813,19 @@
        {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
        end;
    _ ->
-        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
-            ChangesRow = changes_row(Results, Value, Acc),
-            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
-        true ->
-            ChangesRow = changes_row(Results, Value, Acc),
-            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{
-                seq = Seq, prepend = <<",\n">>,
-                user_acc = UserAcc2, limit = Limit - 1}}
-        end
+        ChangesRow = changes_row(Results, Change1, Acc),
+        {UserGo, UserAcc2} = Callback({change, ChangesRow}, UserAcc),
+        RealGo = case UserGo of
+            ok -> Go;
+            stop -> stop
+        end,
+        reset_heartbeat(),
+        couch_log:error("XKCD: CHANGE SEQ: ~p", [Seq]),
+        {RealGo, Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            limit = Limit - 1
+        }}
    end.

@@ -823,14 +856,17 @@ view_changes_row(Results, KVs, Acc) ->
    ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.

-changes_row(Results, #full_doc_info{} = FDI, Acc) ->
-    changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
-changes_row(Results, DocInfo, Acc) ->
-    #doc_info{
-        id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
-    } = DocInfo,
-    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
-        deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+changes_row(Results, Change, Acc) ->
+    #{
+        id := Id,
+        sequence := Seq,
+        deleted := Del
+    } = Change,
+    {[
+        {<<"seq">>, Seq},
+        {<<"id">>, Id},
+        {<<"changes">>, Results}
+    ] ++ deleted_item(Del) ++ maybe_get_changes_doc(Change, Acc)}.

 maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
    #changes_acc{
@@ -840,9 +876,9 @@ maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
        filter = Filter
    } = Acc,
    Opts = case Conflicts of
-            true -> [deleted, conflicts];
-            false -> [deleted]
-          end,
+        true -> [deleted, conflicts];
+        false -> [deleted]
+    end,
    load_doc(Db, Value, Opts, DocOpts, Filter);

 maybe_get_changes_doc(_Value, _Acc) ->
@@ -850,7 +886,7 @@

 load_doc(Db, Value, Opts, DocOpts, Filter) ->
-    case couch_index_util:load_doc(Db, Value, Opts) of
+    case load_doc(Db, Value, Opts) of
        null ->
            [{doc, null}];
        Doc ->
@@ -858,6 +894,19 @@ load_doc(Db, Value, Opts, DocOpts, Filter) ->
    end.
+load_doc(Db, Change, Opts) ->
+    #{
+        id := Id,
+        rev_id := RevId
+    } = Change,
+    case fabric2_db:open_doc_revs(Db, Id, [RevId], Opts) of
+        {ok, [{ok, Doc}]} ->
+            Doc;
+        _ ->
+            null
+    end.
+
+
 doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
        when Fields =/= nil ->
    mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
@@ -870,17 +919,22 @@ deleted_item(_) -> [].

 % waits for a updated msg, if there are multiple msgs, collects them.
 wait_updated(Timeout, TimeoutFun, UserAcc) ->
+    couch_log:error("XKCD: WAITING FOR UPDATE", []),
    receive
    updated ->
+        couch_log:error("XKCD: GOT UPDATED", []),
        get_rest_updated(UserAcc);
    deleted ->
+        couch_log:error("XKCD: DB DELETED", []),
        {stop, UserAcc}
    after Timeout ->
        {Go, UserAcc2} = TimeoutFun(UserAcc),
        case Go of
        ok ->
+            couch_log:error("XKCD: WAIT UPDATED TIMEOUT, RETRY", []),
            ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
        stop ->
+            couch_log:error("XKCD: WAIT UPDATED TIMEOUT STOP", []),
            {stop, UserAcc2}
        end
    end.
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 27c1a8a81..76b333ef7 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -93,9 +93,9 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->

 handle_changes_req1(#httpd{}=Req, Db) ->
    #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
    ChangesArgs = Args0#changes_args{
-        filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
        db_open_options = [{user_ctx, fabric2_db:get_user_ctx(Db)}]
    },
+    ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db),
    Max = chttpd:chunked_response_buffer_size(),
    case ChangesArgs#changes_args.feed of
    "normal" ->
@@ -107,7 +107,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
                mochi = Req,
                threshold = Max
            },
-            fabric2_db:fold_changes(Db, <<>>, fun changes_callback/3, Acc0)
+            ChangesFun({fun changes_callback/2, Acc0})
        end);
    Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
        couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
@@ -117,7 +117,7 @@
            threshold = Max
        },
        try
-            fabric:changes(Db, fun changes_callback/3, Acc0, ChangesArgs)
+            ChangesFun({fun changes_callback/2, Acc0})
        after
            couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
        end;
@@ -127,15 +127,15 @@
    end.
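For orientation, this is the end-to-end wiring the hunk above establishes, condensed from handle_changes_req1: the args are built once, handle_db_changes/3 closes over them, and the returned fun is handed the two-arity callback with its initial accumulator.

```erlang
% Condensed from handle_changes_req1 above; unrelated #cacc{} fields elided.
ChangesArgs = parse_changes_query(Req),
ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db),
Acc0 = #cacc{feed = normal, mochi = Req, threshold = Max},
ChangesFun({fun changes_callback/2, Acc0}).
```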
 % callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(_TxDb, start, #cacc{feed = continuous} = Acc) ->
+changes_callback(start, #cacc{feed = continuous} = Acc) ->
    {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, #cacc{feed = continuous} = Acc) ->
+changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
    chttpd_stats:incr_rows(),
    Data = [?JSON_ENCODE(Change) | "\n"],
    Len = iolist_size(Data),
    maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
+changes_callback({stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
    #cacc{mochi = Resp, buffer = Buf} = Acc,
    Row = {[
        {<<"last_seq">>, EndSeq},
@@ -146,7 +146,7 @@
    chttpd:end_delayed_json_response(Resp1);

 % callbacks for eventsource feed (newline-delimited eventsource Objects)
-changes_callback(_TxDb, start, #cacc{feed = eventsource} = Acc) ->
+changes_callback(start, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Req} = Acc,
    Headers = [
        {"Content-Type", "text/event-stream"},
@@ -154,7 +154,7 @@
    ],
    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
    chttpd_stats:incr_rows(),
    Seq = proplists:get_value(seq, ChangeProp),
    Chunk = [
@@ -164,34 +164,34 @@
    ],
    Len = iolist_size(Chunk),
    maybe_flush_changes_feed(Acc, Chunk, Len);
-changes_callback(_TxDb, timeout, #cacc{feed = eventsource} = Acc) ->
+changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Resp} = Acc,
    Chunk = "event: heartbeat\ndata: \n\n",
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
    {ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Resp, buffer = Buf} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
    chttpd:end_delayed_json_response(Resp1);

 % callbacks for longpoll and normal (single JSON Object)
-changes_callback(_TxDb, start, #cacc{feed = normal} = Acc) ->
+changes_callback(start, #cacc{feed = normal} = Acc) ->
    #cacc{etag = Etag, mochi = Req} = Acc,
    FirstChunk = "{\"results\":[\n",
    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}], FirstChunk),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, start, Acc) ->
+changes_callback(start, Acc) ->
    #cacc{mochi = Req} = Acc,
    FirstChunk = "{\"results\":[\n",
    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, Acc) ->
+changes_callback({change, Change}, Acc) ->
    chttpd_stats:incr_rows(),
    Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
    Len = iolist_size(Data),
    maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
+changes_callback({stop, EndSeq, Pending}, Acc) ->
    #cacc{buffer = Buf, mochi = Resp, threshold = Max} = Acc,
    Terminator = [
        "\n],\n\"last_seq\":",
@@ -203,23 +203,26 @@ changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, Terminator, Max),
    chttpd:end_delayed_json_response(Resp1);

-changes_callback(_TxDb, waiting_for_updates, #cacc{buffer = []} = Acc) ->
+changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
    {ok, Acc};
-changes_callback(_TxDb, waiting_for_updates, Acc) ->
+changes_callback(waiting_for_updates, Acc) ->
    #cacc{buffer = Buf, mochi = Resp} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
    {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
-changes_callback(_TxDb, timeout, Acc) ->
+changes_callback(timeout, Acc) ->
    {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
    {ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
+changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
    #cacc{mochi = Req} = Acc,
    chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
+changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
    #cacc{mochi = Req} = Acc,
    chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, Acc) ->
-    chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
+changes_callback({error, Reason}, Acc) ->
+    chttpd:send_delayed_error(Acc#cacc.mochi, Reason);
+
+changes_callback(A, B) ->
+    erlang:error({changes_error, A, B}).

 maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
        when Size > 0 andalso (Size + Len) > Max ->
@@ -432,13 +435,10 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->

 db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");

-db_req(#httpd{method='POST', path_parts=[DbName, <<"_ensure_full_commit">>],
-        user_ctx=Ctx}=Req, _Db) ->
+db_req(#httpd{method='POST', path_parts=[_DbName, <<"_ensure_full_commit">>],
+        user_ctx=Ctx}=Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
-    %% use fabric call to trigger a database_does_not_exist exception
-    %% for missing databases that'd return error 404 from chttpd
-    %% get_security used to prefer shards on the same node over other nodes
-    fabric:get_security(DbName, [{user_ctx, Ctx}]),
+    #{db_prefix := <<_/binary>>} = Db,
    send_json(Req, 201, {[
        {ok, true},
        {instance_start_time, <<"0">>}
@@ -807,8 +807,8 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
        200, [], FirstChunk),
    VAcc1 = VAcc0#vacc{resp=Resp0},
    VAcc2 = lists:foldl(fun(Args, Acc0) ->
-        {ok, Acc1} = fabric:all_docs(Db, Options,
-            fun view_cb/2, Acc0, Args),
+        {ok, Acc1} = fabric2_db:fold_docs(Db, Options,
+            fun view_cb/3, Acc0, Args),
        Acc1
    end, VAcc1, ArgQueries),
    {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
@@ -822,10 +822,10 @@ all_docs_view(Req, Db, _Keys, _OP) ->
    Options = [{user_ctx, Req#httpd.user_ctx}],
    Max = chttpd:chunked_response_buffer_size(),
    VAcc = #vacc{db=Db, req=Req, threshold=Max},
-    {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
+    {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/3, VAcc, Options),
    {ok, Resp#vacc.resp}.

-view_cb({row, Row} = Msg, Acc) ->
+view_cb(_TxDb, {row, Row} = Msg, Acc) ->
    case lists:keymember(doc, 1, Row) of
        true -> chttpd_stats:incr_reads();
        false -> ok
@@ -833,7 +833,7 @@
    chttpd_stats:incr_rows(),
    couch_mrview_http:view_cb(Msg, Acc);

-view_cb(Msg, Acc) ->
+view_cb(_TxDb, Msg, Acc) ->
    couch_mrview_http:view_cb(Msg, Acc).
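The couch_httpd to chttpd send_chunk switch in the next hunk pairs with the chttpd.erl change at the top of this commit: send_chunk/2 now drops empty iolists because, in HTTP chunked encoding, a zero-length chunk terminates the response, and the new last_chunk/1 is the one place that deliberately writes that terminating chunk. A sketch of the assumed usage pattern:

```erlang
% Assumed usage of the new pair; an empty iolist passed to send_chunk/2
% is now a no-op instead of accidentally ending the chunked response.
{ok, Resp} = chttpd:start_chunked_response(Req, 200, Headers),
{ok, Resp} = chttpd:send_chunk(Resp, <<"first chunk">>),
{ok, Resp} = chttpd:send_chunk(Resp, []),  % ignored, response stays open
{ok, Resp} = chttpd:last_chunk(Resp).      % writes the final zero-length chunk
```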
 db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
@@ -974,7 +974,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
    case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
    ("multipart/related;" ++ _) = ContentType ->
        couch_httpd:check_max_request_length(Req),
-        couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
+        couch_httpd_multipart:num_mp_writers(1),
        {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
                fun() -> receive_request_data(Req) end),
        Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
@@ -1129,7 +1129,7 @@ send_docs_multipart(Req, Results, Options1) ->
    CType = {"Content-Type", "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
    {ok, Resp} = start_chunked_response(Req, 200, [CType]),
-    couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
+    chttpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
    lists:foreach(
        fun({ok, #doc{atts=Atts}=Doc}) ->
            Refs = monitor_attachments(Doc#doc.atts),
@@ -1137,25 +1137,25 @@
            JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
            {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
                    InnerBoundary, JsonBytes, Atts, true),
-            couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
+            chttpd:send_chunk(Resp, <<"\r\nContent-Type: ",
                    ContentType/binary, "\r\n\r\n">>),
            couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
-                    fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+                    fun(Data) -> chttpd:send_chunk(Resp, Data)
                    end, true),
-            couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
+            chttpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
            after
                demonitor_refs(Refs)
            end;
        ({{not_found, missing}, RevId}) ->
            RevStr = couch_doc:rev_to_str(RevId),
            Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
-            couch_httpd:send_chunk(Resp,
+            chttpd:send_chunk(Resp,
                [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                Json,
                <<"\r\n--", OuterBoundary/binary>>])
        end, Results),
-    couch_httpd:send_chunk(Resp, <<"--">>),
-    couch_httpd:last_chunk(Resp).
+    chttpd:send_chunk(Resp, <<"--">>),
+    chttpd:last_chunk(Resp).

 bulk_get_multipart_headers({0, []}, Id, Boundary) ->
    [
@@ -1438,8 +1438,12 @@ db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNa
    end;

-db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNameParts)
+db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
        when (Method == 'PUT') or (Method == 'DELETE') ->
+    #httpd{
+        user_ctx = Ctx,
+        mochi_req = MochiReq
+    } = Req,
    FileName = validate_attachment_name(
                    mochiweb_util:join(
                        lists:map(fun binary_to_list/1,
@@ -1449,16 +1453,45 @@
    'DELETE' ->
        [];
    _ ->
-        MimeType = case couch_httpd:header_value(Req,"Content-Type") of
+        MimeType = case chttpd:header_value(Req,"Content-Type") of
            % We could throw an error here or guess by the FileName.
            % Currently, just giving it a default.
            undefined -> <<"application/octet-stream">>;
            CType -> list_to_binary(CType)
        end,
-        Data = fabric:att_receiver(Req, chttpd:body_length(Req)),
+        Data = case chttpd:body_length(Req) of
+            undefined ->
+                <<"">>;
+            {unknown_transfer_encoding, Unknown} ->
+                exit({unknown_transfer_encoding, Unknown});
+            chunked ->
+                fun(MaxChunkSize, ChunkFun, InitState) ->
+                    chttpd:recv_chunked(
+                        Req, MaxChunkSize, ChunkFun, InitState
+                    )
+                end;
+            0 ->
+                <<"">>;
+            Length when is_integer(Length) ->
+                Expect = case chttpd:header_value(Req, "expect") of
+                    undefined ->
+                        undefined;
+                    Value when is_list(Value) ->
+                        string:to_lower(Value)
+                end,
+                case Expect of
+                    "100-continue" ->
+                        MochiReq:start_raw_response({100, gb_trees:empty()});
+                    _Else ->
+                        ok
+                end,
+                fun() -> chttpd:recv(Req, 0) end;
+            Length ->
+                exit({length_not_integer, Length})
+        end,
        ContentLen = case couch_httpd:header_value(Req,"Content-Length") of
            undefined -> undefined;
-            Length -> list_to_integer(Length)
+            CL -> list_to_integer(CL)
        end,
        ContentEnc = string:to_lower(string:strip(
            couch_httpd:header_value(Req, "Content-Encoding", "identity")
@@ -1517,7 +1550,7 @@
            HttpCode = 202
        end,
    erlang:put(mochiweb_request_recv, true),
-    DbName = couch_db:name(Db),
+    DbName = fabric2_db:name(Db),

    {Status, Headers} = case Method of
        'DELETE' ->
@@ -1673,7 +1706,7 @@ parse_changes_query(Req) ->
        {"descending", "true"} ->
            Args#changes_args{dir=rev};
        {"since", _} ->
-            Args#changes_args{since=Value};
+            Args#changes_args{since=parse_since_seq(Value)};
        {"last-event-id", _} ->
            Args#changes_args{since=Value};
        {"limit", _} ->
@@ -1727,6 +1760,27 @@ parse_changes_query(Req) ->
        ChangesArgs
    end.

+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 30 ->
+    throw({bad_request, url_encoded_since_seq});
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 2 ->
+    % We have implicitly allowed the since seq to either be
+    % JSON encoded or a "raw" string. Here we just remove the
+    % surrounding quotes if they exist and are paired.
+    SeqSize = size(Seq) - 2,
+    case Seq of
+        <<"\"", S:SeqSize/binary, "\"">> -> S;
+        S -> S
+    end;
+
+parse_since_seq(Seq) when is_binary(Seq) ->
+    Seq;
+
+parse_since_seq(Seq) when is_list(Seq) ->
+    parse_since_seq(iolist_to_binary(Seq)).
+
+
 extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
    extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
 extract_header_rev(Req, ExplicitRev) ->
@@ -1767,6 +1821,8 @@ monitor_attachments(Atts) when is_list(Atts) ->
        case couch_att:fetch(data, Att) of
            {Fd, _} ->
                [monitor(process, Fd) | Monitors];
+            {loc, _, _, _} ->
+                Monitors;
            stub ->
                Monitors;
            Else ->
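The new parse_since_seq/1 above normalizes the ?since= query value: it unwraps paired JSON quotes, passes short raw strings through untouched, and rejects anything over 30 bytes (an fdb-era sequence is a short versionstamp-style string, so an old long clustered seq is a bad request). Some illustrative expectations, assuming the clauses exactly as written:

```erlang
% Expected behaviour of parse_since_seq/1 as defined above:
<<"012345">> = parse_since_seq(<<"\"012345\"">>),  % paired quotes stripped
<<"012345">> = parse_since_seq(<<"012345">>),      % raw value passed through
<<"0">>      = parse_since_seq("0"),               % lists converted to binary first
% anything longer than 30 bytes:
%   throw({bad_request, url_encoded_since_seq})
```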
diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl
index fa35c6ba2..3e59ffe4e 100644
--- a/src/chttpd/src/chttpd_external.erl
+++ b/src/chttpd/src/chttpd_external.erl
@@ -74,7 +74,7 @@ json_req_obj_fields() ->
        <<"peer">>, <<"form">>, <<"cookie">>, <<"userCtx">>, <<"secObj">>].

 json_req_obj_field(<<"info">>, #httpd{}, Db, _DocId) ->
-    {ok, Info} = get_db_info(Db),
+    {ok, Info} = fabric2_db:get_db_info(Db),
    {Info};
 json_req_obj_field(<<"uuid">>, #httpd{}, _Db, _DocId) ->
    couch_uuids:new();
@@ -117,27 +117,18 @@ json_req_obj_field(<<"form">>, #httpd{mochi_req=Req, method=Method}=HttpReq, Db, DocId) ->
 json_req_obj_field(<<"cookie">>, #httpd{mochi_req=Req}, _Db, _DocId) ->
    to_json_terms(Req:parse_cookie());
 json_req_obj_field(<<"userCtx">>, #httpd{}, Db, _DocId) ->
-    couch_util:json_user_ctx(Db);
-json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) ->
-    get_db_security(Db, UserCtx).
-
-
-get_db_info(Db) ->
-    case couch_db:is_clustered(Db) of
-        true ->
-            fabric:get_db_info(Db);
-        false ->
-            couch_db:get_db_info(Db)
-    end.
-
-
-get_db_security(Db, #user_ctx{}) ->
-    case couch_db:is_clustered(Db) of
-        true ->
-            fabric:get_security(Db);
-        false ->
-            couch_db:get_security(Db)
-    end.
+    json_user_ctx(Db);
+json_req_obj_field(<<"secObj">>, #httpd{user_ctx = #user_ctx{}}, Db, _DocId) ->
+    fabric2_db:get_security(Db).
+
+
+json_user_ctx(Db) ->
+    Ctx = fabric2_db:get_user_ctx(Db),
+    {[
+        {<<"db">>, fabric2_db:name(Db)},
+        {<<"name">>, Ctx#user_ctx.name},
+        {<<"roles">>, Ctx#user_ctx.roles}
+    ]}.

 to_json_terms(Data) ->
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index a24de21d6..924b58da4 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -29,7 +29,7 @@
 -export([
    size_info/1,
    to_disk_term/1,
-    from_disk_term/2
+    from_disk_term/3
 ]).

 -export([
@@ -38,7 +38,7 @@
 ]).

 -export([
-    flush/2,
+    flush/3,
    foldl/3,
    range_foldl/5,
    foldl_decode/3,
@@ -46,11 +46,6 @@
 ]).

 -export([
-    upgrade/1,
-    downgrade/1
-]).
-
--export([
    max_attachment_size/0,
    validate_attachment_size/3
 ]).
@@ -58,137 +53,61 @@
 -compile(nowarn_deprecated_type).
 -export_type([att/0]).

--include_lib("couch/include/couch_db.hrl").
-
-
-%% Legacy attachment record. This is going to be phased out by the new proplist
-%% based structure. It's needed for now to allow code to perform lazy upgrades
-%% while the patch is rolled out to the cluster. Attachments passed as records
-%% will remain so until they are required to be represented as property lists.
-%% Once this has been widely deployed, this record will be removed entirely and
-%% property lists will be the main format.
--record(att, {
-    name :: binary(),
-    type :: binary(),
-    att_len :: non_neg_integer(),
-
-    %% length of the attachment in its identity form
-    %% (that is, without a content encoding applied to it)
-    %% differs from att_len when encoding /= identity
-    disk_len :: non_neg_integer(),
-
-    md5 = <<>> :: binary(),
-    revpos = 0 :: non_neg_integer(),
-    data :: stub | follows | binary() | {any(), any()} |
-        {follows, pid(), reference()} | fun(() -> binary()),
-
-    %% Encoding of the attachment
-    %% currently supported values are:
-    %%     identity, gzip
-    %% additional values to support in the future:
-    %%     deflate, compress
-    encoding = identity :: identity | gzip
-}).
-
-
-%% Extensible Attachment Type
-%%
-%% The following types describe the known properties for attachment fields
-%% encoded as property lists to allow easier upgrades. Values not in this list
-%% should be accepted at runtime but should be treated as opaque data as might
-%% be used by upgraded code. If you plan on operating on new data, please add
-%% an entry here as documentation.
-
-
-%% The name of the attachment is also used as the mime-part name for file
-%% downloads. These must be unique per document.
--type name_prop() :: {name, binary()}.
-
-
-%% The mime type of the attachment. This does affect compression of certain
-%% attachments if the type is found to be configured as a compressable type.
-%% This is commonly reserved for text/* types but could include other custom
-%% cases as well. See definition and use of couch_util:compressable_att_type/1.
--type type_prop() :: {type, binary()}.
-
-
-%% The attachment length is similar to disk-length but ignores additional
-%% encoding that may have occurred.
--type att_len_prop() :: {att_len, non_neg_integer()}.
-
-
-%% The size of the attachment as stored in a disk stream.
--type disk_len_prop() :: {disk_len, non_neg_integer()}.
-
-
-%% This is a digest of the original attachment data as uploaded by the client.
-%% it's useful for checking validity of contents against other attachment data
-%% as well as quick digest computation of the enclosing document.
--type md5_prop() :: {md5, binary()}.
-
--type revpos_prop() :: {revpos, 0}.
+-include_lib("couch/include/couch_db.hrl").

-%% This field is currently overloaded with just about everything. The
-%% {any(), any()} type is just there until I have time to check the actual
-%% values expected. Over time this should be split into more than one property
-%% to allow simpler handling.
--type data_prop() :: {
-    data, stub | follows | binary() | {any(), any()} |
-        {follows, pid(), reference()} | fun(() -> binary())
-}.
+-define(CURRENT_ATT_FORMAT, 0).

-%% We will occasionally compress our data. See type_prop() for more information
-%% on when this happens.
--type encoding_prop() :: {encoding, identity | gzip}.
+-type prop_name() ::
+    name |
+    type |
+    att_len |
+    disk_len |
+    md5 |
+    revpos |
+    data |
+    encoding.

--type attachment() :: [
-    name_prop() | type_prop() |
-    att_len_prop() | disk_len_prop() |
-    md5_prop() | revpos_prop() |
-    data_prop() | encoding_prop()
-].
+-type data_prop_type() ::
+    {loc, #{}, binary(), binary()} |
+    stub |
+    follows |
+    binary() |
+    {follows, pid(), reference()} |
+    fun(() -> binary()).

--type disk_att_v1() :: {
-    Name :: binary(),
-    Type :: binary(),
-    Sp :: any(),
-    AttLen :: non_neg_integer(),
-    RevPos :: non_neg_integer(),
-    Md5 :: binary()
-}.

--type disk_att_v2() :: {
-    Name :: binary(),
-    Type :: binary(),
-    Sp :: any(),
-    AttLen :: non_neg_integer(),
-    DiskLen :: non_neg_integer(),
-    RevPos :: non_neg_integer(),
-    Md5 :: binary(),
-    Enc :: identity | gzip
+-type att() :: #{
+    name := binary(),
+    type := binary(),
+    att_len := non_neg_integer() | undefined,
+    disk_len := non_neg_integer() | undefined,
+    md5 := binary() | undefined,
+    revpos := non_neg_integer(),
+    data := data_prop_type(),
+    encoding := identity | gzip | undefined,
+    headers := [{binary(), binary()}] | undefined
 }.

--type disk_att_v3() :: {Base :: tuple(), Extended :: list()}.
-
--type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3().
-
--type att() :: #att{} | attachment() | disk_att().

 new() ->
-    %% We construct a record by default for compatability. This will be
-    %% upgraded on demand. A subtle effect this has on all attachments
-    %% constructed via new is that it will pick up the proper defaults
-    %% from the #att record definition given above. Newer properties do
-    %% not support special default values and will all be treated as
-    %% undefined.
-    #att{}.
+    #{
+        name => <<>>,
+        type => <<>>,
+        att_len => undefined,
+        disk_len => undefined,
+        md5 => undefined,
+        revpos => 0,
+        data => undefined,
+        encoding => undefined,
+        headers => undefined
+    }.
--spec new([{atom(), any()}]) -> att().
+-spec new([{prop_name(), any()}]) -> att().
 new(Props) ->
    store(Props, new()).

@@ -197,71 +116,28 @@ new(Props) ->
        (atom(), att()) -> any().
 fetch(Fields, Att) when is_list(Fields) ->
    [fetch(Field, Att) || Field <- Fields];
-fetch(Field, Att) when is_list(Att) ->
-    case lists:keyfind(Field, 1, Att) of
-        {Field, Value} -> Value;
-        false -> undefined
-    end;
-fetch(name, #att{name = Name}) ->
-    Name;
-fetch(type, #att{type = Type}) ->
-    Type;
-fetch(att_len, #att{att_len = AttLen}) ->
-    AttLen;
-fetch(disk_len, #att{disk_len = DiskLen}) ->
-    DiskLen;
-fetch(md5, #att{md5 = Digest}) ->
-    Digest;
-fetch(revpos, #att{revpos = RevPos}) ->
-    RevPos;
-fetch(data, #att{data = Data}) ->
-    Data;
-fetch(encoding, #att{encoding = Encoding}) ->
-    Encoding;
-fetch(_, _) ->
-    undefined.
+fetch(Field, Att) ->
+    maps:get(Field, Att).

 -spec store([{atom(), any()}], att()) -> att().
 store(Props, Att0) ->
    lists:foldl(fun({Field, Value}, Att) ->
-        store(Field, Value, Att)
+        maps:update(Field, Value, Att)
    end, Att0, Props).

--spec store(atom(), any(), att()) -> att().
-store(Field, undefined, Att) when is_list(Att) ->
-    lists:keydelete(Field, 1, Att);
-store(Field, Value, Att) when is_list(Att) ->
-    lists:keystore(Field, 1, Att, {Field, Value});
-store(name, Name, Att) ->
-    Att#att{name = Name};
-store(type, Type, Att) ->
-    Att#att{type = Type};
-store(att_len, AttLen, Att) ->
-    Att#att{att_len = AttLen};
-store(disk_len, DiskLen, Att) ->
-    Att#att{disk_len = DiskLen};
-store(md5, Digest, Att) ->
-    Att#att{md5 = Digest};
-store(revpos, RevPos, Att) ->
-    Att#att{revpos = RevPos};
-store(data, Data, Att) ->
-    Att#att{data = Data};
-store(encoding, Encoding, Att) ->
-    Att#att{encoding = Encoding};
 store(Field, Value, Att) ->
-    store(Field, Value, upgrade(Att)).
+    maps:update(Field, Value, Att).

 -spec transform(atom(), fun(), att()) -> att().
 transform(Field, Fun, Att) ->
-    NewValue = Fun(fetch(Field, Att)),
-    store(Field, NewValue, Att).
+    maps:update_with(Field, Fun, Att).

-is_stub(Att) ->
-    stub == fetch(data, Att).
+is_stub(#{data := stub}) -> true;
+is_stub(#{}) -> false.

 %% merge_stubs takes all stub attachments and replaces them with on disk
@@ -275,8 +151,7 @@ merge_stubs(MemAtts, DiskAtts) ->
    merge_stubs(MemAtts, OnDisk, []).

-%% restore spec when R14 support is dropped
-%% -spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
+-spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
 merge_stubs([Att | Rest], OnDisk, Merged) ->
    case fetch(data, Att) of
        stub ->
@@ -308,14 +183,8 @@ size_info([]) ->
    {ok, []};
 size_info(Atts) ->
    Info = lists:map(fun(Att) ->
-        AttLen = fetch(att_len, Att),
-        case fetch(data, Att) of
-            {stream, StreamEngine} ->
-                {ok, SPos} = couch_stream:to_disk_term(StreamEngine),
-                {SPos, AttLen};
-            {_, SPos} ->
-                {SPos, AttLen}
-        end
+        [{loc, _Db, _DocId, AttId}, AttLen] = fetch([data, att_len], Att),
+        {AttId, AttLen}
    end, Atts),
    {ok, lists:usort(Info)}.

@@ -324,89 +193,44 @@ size_info(Atts) ->
 %% old format when possible. This should help make the attachment lazy upgrade
 %% as safe as possible, avoiding the need for complicated disk versioning
 %% schemes.
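With the #att{} record and the lazy proplist upgrades gone, an attachment is now a plain map with a fixed key set, and fetch/store are thin wrappers over the maps module; maps:update/3 crashes on unknown keys, which replaces the old accept-anything proplist behavior. A small sketch of the resulting API:

```erlang
% Minimal sketch of the map-based couch_att API defined above.
Att0 = couch_att:new([
    {name, <<"logo.png">>},
    {type, <<"image/png">>},
    {data, <<1, 2, 3>>}
]),
false = couch_att:is_stub(Att0),
[<<"logo.png">>, <<1, 2, 3>>] = couch_att:fetch([name, data], Att0).
% Unknown keys are now rejected rather than silently stored:
% couch_att:store([{bogus, 1}], Att0) raises error:{badkey, bogus}.
```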
-to_disk_term(#att{} = Att) ->
-    {stream, StreamEngine} = fetch(data, Att),
-    {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
-    {
+to_disk_term(Att) ->
+    {loc, #{}, _DocId, AttId} = fetch(data, Att),
+    {?CURRENT_ATT_FORMAT, {
        fetch(name, Att),
        fetch(type, Att),
-        Sp,
+        AttId,
        fetch(att_len, Att),
        fetch(disk_len, Att),
        fetch(revpos, Att),
        fetch(md5, Att),
-        fetch(encoding, Att)
-    };
-to_disk_term(Att) ->
-    BaseProps = [name, type, data, att_len, disk_len, revpos, md5, encoding],
-    {Extended, Base} = lists:foldl(
-        fun
-            (data, {Props, Values}) ->
-                case lists:keytake(data, 1, Props) of
-                    {value, {_, {stream, StreamEngine}}, Other} ->
-                        {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
-                        {Other, [Sp | Values]};
-                    {value, {_, Value}, Other} ->
-                        {Other, [Value | Values]};
-                    false ->
-                        {Props, [undefined | Values]}
-                end;
-            (Key, {Props, Values}) ->
-                case lists:keytake(Key, 1, Props) of
-                    {value, {_, Value}, Other} -> {Other, [Value | Values]};
-                    false -> {Props, [undefined | Values]}
-                end
-        end,
-        {Att, []},
-        BaseProps
-    ),
-    {list_to_tuple(lists:reverse(Base)), Extended}.
-
-
-%% The new disk term format is a simple wrapper around the legacy format. Base
-%% properties will remain in a tuple while the new fields and possibly data from
-%% future extensions will be stored in a list of atom/value pairs. While this is
-%% slightly less efficient, future work should be able to make use of
-%% compression to remove these sorts of common bits (block level compression
-%% with something like a shared dictionary that is checkpointed every now and
-%% then).
-from_disk_term(StreamSrc, {Base, Extended})
-        when is_tuple(Base), is_list(Extended) ->
-    store(Extended, from_disk_term(StreamSrc, Base));
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=DiskLen,
-        md5=Md5,
-        revpos=RevPos,
-        data={stream, Stream},
-        encoding=upgrade_encoding(Enc)
-    };
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=AttLen,
-        md5=Md5,
-        revpos=RevPos,
-        data={stream, Stream}
-    };
-from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=AttLen,
-        md5= <<>>,
-        revpos=0,
-        data={stream, Stream}
-    }.
+        fetch(encoding, Att),
+        fetch(headers, Att)
+    }}.
+
+
+from_disk_term(#{} = Db, DocId, {?CURRENT_ATT_FORMAT, Props}) ->
+    {
+        Name,
+        Type,
+        AttId,
+        AttLen,
+        DiskLen,
+        RevPos,
+        Md5,
+        Encoding,
+        Headers
+    } = Props,
+    new([
+        {name, Name},
+        {type, Type},
+        {data, {loc, Db#{tx := undefined}, DocId, AttId}},
+        {att_len, AttLen},
+        {disk_len, DiskLen},
+        {revpos, RevPos},
+        {md5, Md5},
+        {encoding, Encoding},
+        {headers, Headers}
+    ]).

 %% from_json reads in embedded JSON attachments and creates usable attachment
@@ -433,8 +257,12 @@ stub_from_json(Att, Props) ->
    %% json object. See merge_stubs/3 for the stub check.
    RevPos = couch_util:get_value(<<"revpos">>, Props),
    store([
-        {md5, Digest}, {revpos, RevPos}, {data, stub}, {disk_len, DiskLen},
-        {att_len, EncodedLen}, {encoding, Encoding}
+        {data, stub},
+        {disk_len, DiskLen},
+        {att_len, EncodedLen},
+        {revpos, RevPos},
+        {md5, Digest},
+        {encoding, Encoding}
    ], Att).
@@ -443,8 +271,12 @@ follow_from_json(Att, Props) ->
    Digest = digest_from_json(Props),
    RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
    store([
-        {md5, Digest}, {revpos, RevPos}, {data, follows}, {disk_len, DiskLen},
-        {att_len, EncodedLen}, {encoding, Encoding}
+        {data, follows},
+        {disk_len, DiskLen},
+        {att_len, EncodedLen},
+        {revpos, RevPos},
+        {md5, Digest},
+        {encoding, Encoding}
    ], Att).

@@ -455,8 +287,10 @@ inline_from_json(Att, Props) ->
        Length = size(Data),
        RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
        store([
-            {data, Data}, {revpos, RevPos}, {disk_len, Length},
-            {att_len, Length}
+            {data, Data},
+            {disk_len, Length},
+            {att_len, Length},
+            {revpos, RevPos}
        ], Att)
    catch
        _:_ ->
@@ -466,7 +300,6 @@ inline_from_json(Att, Props) ->
    end.

-
 encoded_lengths_from_json(Props) ->
    Len = couch_util:get_value(<<"length">>, Props),
    case couch_util:get_value(<<"encoding">>, Props) of
@@ -488,9 +321,17 @@ digest_from_json(Props) ->

 to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
-    [Name, Data, DiskLen, AttLen, Enc, Type, RevPos, Md5] = fetch(
-        [name, data, disk_len, att_len, encoding, type, revpos, md5], Att
-    ),
+    #{
+        name := Name,
+        type := Type,
+        data := Data,
+        disk_len := DiskLen,
+        att_len := AttLen,
+        revpos := RevPos,
+        md5 := Md5,
+        encoding := Encoding,
+        headers := Headers
+    } = Att,
    Props = [
        {<<"content_type">>, Type},
        {<<"revpos">>, RevPos}
@@ -505,71 +346,71 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
        DataToFollow ->
            [{<<"length">>, DiskLen}, {<<"follows">>, true}];
        true ->
-            AttData = case Enc of
+            AttData = case Encoding of
                gzip -> zlib:gunzip(to_binary(Att));
                identity -> to_binary(Att)
            end,
            [{<<"data">>, base64:encode(AttData)}]
    end,
    EncodingProps = if
-        ShowEncoding andalso Enc /= identity ->
+        ShowEncoding andalso Encoding /= identity ->
            [
-                {<<"encoding">>, couch_util:to_binary(Enc)},
+                {<<"encoding">>, couch_util:to_binary(Encoding)},
                {<<"encoded_length">>, AttLen}
            ];
        true ->
            []
    end,
-    HeadersProp = case fetch(headers, Att) of
+    HeadersProp = case Headers of
        undefined -> [];
        Headers -> [{<<"headers">>, Headers}]
    end,
    {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.

-flush(Db, Att) ->
-    flush_data(Db, fetch(data, Att), Att).
+flush(Db, DocId, Att1) ->
+    Att2 = read_data(fetch(data, Att1), Att1),
+    [
+        Data,
+        AttLen,
+        DiskLen,
+        ReqMd5,
+        Encoding
+    ] = fetch([data, att_len, disk_len, md5, encoding], Att2),
+
+    % Eventually, we'll check if we can compress this
+    % attachment here and do so if possible.
+
+    % If we were sent a gzip'ed attachment with no
+    % length data, we have to set it here.
+    Att3 = case AttLen of
+        undefined -> store(att_len, DiskLen, Att2);
+        _ -> Att2
+    end,
+
+    % If no encoding has been set, default to
+    % identity
+    Att4 = case Encoding of
+        undefined -> store(encoding, identity, Att3);
+        _ -> Att3
+    end,

-flush_data(Db, Data, Att) when is_binary(Data) ->
-    couch_db:with_stream(Db, Att, fun(OutputStream) ->
-        couch_stream:write(OutputStream, Data)
-    end);
-flush_data(Db, Fun, Att) when is_function(Fun) ->
-    AttName = fetch(name, Att),
-    MaxAttSize = max_attachment_size(),
-    case fetch(att_len, Att) of
-        undefined ->
-            couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                % Fun(MaxChunkSize, WriterFun) must call WriterFun
-                % once for each chunk of the attachment,
-                Fun(4096,
-                    % WriterFun({Length, Binary}, State)
-                    % WriterFun({0, _Footers}, State)
-                    % Called with Length == 0 on the last time.
-                    % WriterFun returns NewState.
-                    fun({0, Footers}, _Total) ->
-                        F = mochiweb_headers:from_binary(Footers),
-                        case mochiweb_headers:get_value("Content-MD5", F) of
-                        undefined ->
-                            ok;
-                        Md5 ->
-                            {md5, base64:decode(Md5)}
-                        end;
-                    ({Length, Chunk}, Total0) ->
-                        Total = Total0 + Length,
-                        validate_attachment_size(AttName, Total, MaxAttSize),
-                        couch_stream:write(OutputStream, Chunk),
-                        Total
-                    end, 0)
-            end);
-        AttLen ->
-            validate_attachment_size(AttName, AttLen, MaxAttSize),
-            couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                write_streamed_attachment(OutputStream, Fun, AttLen)
-            end)
-    end;
-flush_data(Db, {follows, Parser, Ref}, Att) ->
+    case Data of
+        {loc, _, _, _} ->
+            % Already flushed
+            Att1;
+        _ when is_binary(Data) ->
+            IdentityMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+            couch_util:check_md5(IdentityMd5, ReqMd5),
+            fabric2_db:write_attachment(Db, DocId, Att4)
+    end.
+
+
+read_data({loc, #{}, _DocId, _AttId}, Att) ->
+    % Attachment already written to fdb
+    Att;
+
+read_data({follows, Parser, Ref}, Att) ->
    ParserRef = erlang:monitor(process, Parser),
    Fun = fun() ->
        Parser ! {get_bytes, Ref, self()},
@@ -583,41 +424,72 @@
        end
    end,
    try
-        flush_data(Db, Fun, store(data, Fun, Att))
+        read_data(Fun, store(data, Fun, Att))
    after
        erlang:demonitor(ParserRef, [flush])
    end;
-flush_data(Db, {stream, StreamEngine}, Att) ->
-    case couch_db:is_active_stream(Db, StreamEngine) of
-        true ->
-            % Already written
-            Att;
-        false ->
-            NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                couch_stream:copy(StreamEngine, OutputStream)
-            end),
-            InMd5 = fetch(md5, Att),
-            OutMd5 = fetch(md5, NewAtt),
-            couch_util:check_md5(OutMd5, InMd5),
-            NewAtt
+
+read_data(Data, Att) when is_binary(Data) ->
+    Att;
+
+read_data(Fun, Att) when is_function(Fun) ->
+    [AttName, AttLen, InMd5] = fetch([name, att_len, md5], Att),
+    MaxAttSize = max_attachment_size(),
+    case AttLen of
+        undefined ->
+            % Fun(MaxChunkSize, WriterFun) must call WriterFun
+            % once for each chunk of the attachment,
+            WriterFun = fun
+                ({0, Footers}, {Len, Acc}) ->
+                    F = mochiweb_headers:from_binary(Footers),
+                    Md5 = case mochiweb_headers:get_value("Content-MD5", F) of
+                        undefined -> undefined;
+                        Value -> base64:decode(Value)
+                    end,
+                    Props0 = [
+                        {data, iolist_to_binary(lists:reverse(Acc))},
+                        {disk_len, Len}
+                    ],
+                    Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
+                        [{md5, Md5} | Props0]
+                    end,
+                    store(Props1, Att);
+                ({ChunkLen, Chunk}, {Len, Acc}) ->
+                    NewLen = Len + ChunkLen,
+                    validate_attachment_size(AttName, NewLen, MaxAttSize),
+                    {NewLen, [Chunk | Acc]}
+            end,
+            Fun(8192, WriterFun, {0, []});
+        AttLen ->
+            validate_attachment_size(AttName, AttLen, MaxAttSize),
+            read_streamed_attachment(Att, Fun, AttLen, [])
    end.
-write_streamed_attachment(_Stream, _F, 0) ->
-    ok;
-write_streamed_attachment(_Stream, _F, LenLeft) when LenLeft < 0 ->
+read_streamed_attachment(Att, _F, 0, Acc) ->
+    Bin = iolist_to_binary(lists:reverse(Acc)),
+    store([
+        {data, Bin},
+        {disk_len, size(Bin)}
+    ], Att);
+
+read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
    throw({bad_request, <<"attachment longer than expected">>});
-write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
-    Bin = try read_next_chunk(F, LenLeft)
+
+read_streamed_attachment(Att, F, LenLeft, Acc) when LenLeft > 0 ->
+    Bin = try
        read_next_chunk(F, LenLeft)
    catch
        {mp_parser_died, normal} ->
            throw({bad_request, <<"attachment shorter than expected">>})
    end,
-    ok = couch_stream:write(Stream, Bin),
-    write_streamed_attachment(Stream, F, LenLeft - iolist_size(Bin)).
+    Size = iolist_size(Bin),
+    read_streamed_attachment(Att, F, LenLeft - Size, [Bin | Acc]).

 read_next_chunk(F, _) when is_function(F, 0) ->
    F();

 read_next_chunk(F, LenLeft) when is_function(F, 1) ->
    F(lists:min([LenLeft, 16#2000])).

@@ -626,14 +498,17 @@ foldl(Att, Fun, Acc) ->
    foldl(fetch(data, Att), Att, Fun, Acc).

+foldl({loc, Db, DocId, AttId}, _Att, Fun, Acc) ->
+    Bin = fabric2_db:read_attachment(Db#{tx := undefined}, DocId, AttId),
+    Fun(Bin, Acc);
+
 foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
    Fun(Bin, Acc);
-foldl({stream, StreamEngine}, Att, Fun, Acc) ->
-    Md5 = fetch(md5, Att),
-    couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
+
 foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
    Len = fetch(att_len, Att),
    fold_streamed_data(DataFun, Len, Fun, Acc);
+
 foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
    ParserRef = erlang:monitor(process, Parser),
    DataFun = fun() ->
@@ -654,19 +529,26 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
    end.

+range_foldl(Bin1, From, To, Fun, Acc) when is_binary(Bin1) ->
+    ReadLen = To - From,
+    Bin2 = case Bin1 of
+        _ when size(Bin1) < From -> <<>>;
+        <<_:From/binary, B2/binary>> -> B2
+    end,
+    Bin3 = case Bin2 of
+        _ when size(Bin2) < ReadLen -> Bin2;
+        <<B3:ReadLen/binary, _/binary>> -> B3
+    end,
+    Fun(Bin3, Acc);
+
 range_foldl(Att, From, To, Fun, Acc) ->
-    {stream, StreamEngine} = fetch(data, Att),
-    couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
+    {loc, Db, DocId, AttId} = fetch(data, Att),
+    Bin = fabric2_db:read_attachment(Db, DocId, AttId),
+    range_foldl(Bin, From, To, Fun, Acc).

-foldl_decode(Att, Fun, Acc) ->
-    case fetch([data, encoding], Att) of
-        [{stream, StreamEngine}, Enc] ->
-            couch_stream:foldl_decode(
-                StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
-        [Fun2, identity] ->
-            fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
-    end.
+foldl_decode(_Att, _Fun, _Acc) ->
+    erlang:error(not_supported).

 to_binary(Att) ->
@@ -677,10 +559,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
    Bin;
 to_binary(Iolist, _Att) when is_list(Iolist) ->
    iolist_to_binary(Iolist);
-to_binary({stream, _StreamEngine}, Att) ->
-    iolist_to_binary(
-        lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
-    );
+to_binary({loc, Db, DocId, AttId}, _Att) ->
+    fabric2_db:read_attachment(Db, DocId, AttId);
 to_binary(DataFun, Att) when is_function(DataFun)->
    Len = fetch(att_len, Att),
    iolist_to_binary(
@@ -695,46 +575,22 @@ to_binary(DataFun, Att) when is_function(DataFun)->

 fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
    Acc;
+
 fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
    Bin = RcvFun(),
    ResultAcc = Fun(Bin, Acc),
    fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-%% Upgrade an attachment record to a property list on demand. This is a one-way
-%% operation as downgrading potentially truncates fields with important data.
--spec upgrade(#att{}) -> attachment().
-upgrade(#att{} = Att) ->
-    Map = lists:zip(
-        record_info(fields, att),
-        lists:seq(2, record_info(size, att))
-    ),
-    %% Don't store undefined elements since that is default
-    [{F, element(I, Att)} || {F, I} <- Map, element(I, Att) /= undefined];
-upgrade(Att) ->
-    Att.
-
-
-%% Downgrade is exposed for interactive convenience. In practice, unless done
-%% manually, upgrades are always one-way.
-downgrade(#att{} = Att) ->
-    Att;
-downgrade(Att) ->
-    #att{
-        name = fetch(name, Att),
-        type = fetch(type, Att),
-        att_len = fetch(att_len, Att),
-        disk_len = fetch(disk_len, Att),
-        md5 = fetch(md5, Att),
-        revpos = fetch(revpos, Att),
-        data = fetch(data, Att),
-        encoding = fetch(encoding, Att)
-    }.
-
-
-upgrade_encoding(true) -> gzip;
-upgrade_encoding(false) -> identity;
-upgrade_encoding(Encoding) -> Encoding.
+get_identity_md5(Bin, gzip) ->
+    Z = zlib:open(),
+    ok = zlib:inflateInit(Z, 16 + 15),
+    Inflated = zlib:inflate(Z, Bin),
+    ok = zlib:inflateEnd(Z),
+    ok = zlib:close(Z),
+    couch_hash:md5_hash(Inflated);
+get_identity_md5(Bin, _) ->
+    couch_hash:md5_hash(Bin).


 max_attachment_size() ->
@@ -753,18 +609,22 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
     ok.


-open_stream(StreamSrc, Data) ->
-    case couch_db:is_db(StreamSrc) of
-        true ->
-            couch_db:open_read_stream(StreamSrc, Data);
-        false ->
-            case is_function(StreamSrc, 1) of
-                true ->
-                    StreamSrc(Data);
-                false ->
-                    erlang:error({invalid_stream_source, StreamSrc})
-            end
-    end.
+%% is_compressible(Type) when is_binary(Type) ->
+%%     is_compressible(binary_to_list(Type));
+%% is_compressible(Type) ->
+%%     TypeExpList = re:split(
+%%         config:get("attachments", "compressible_types", ""),
+%%         "\\s*,\\s*",
+%%         [{return, list}]
+%%     ),
+%%     lists:any(
+%%         fun(TypeExp) ->
+%%             Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+%%                 "(?:\\s*;.*?)?\\s*", $$],
+%%             re:run(Type, Regexp, [caseless]) =/= nomatch
+%%         end,
+%%         [T || T <- TypeExpList, T /= []]
+%%     ).

 -ifdef(TEST).
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 4a49372c7..d33325eb1 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -374,6 +374,17 @@ rev_info({#doc{} = Doc, {Pos, [RevId | _]}}) ->
         body_sp = undefined,
         seq = undefined,
         rev = {Pos, RevId}
+    };
+rev_info({#{} = RevInfo, {Pos, [RevId | _]}}) ->
+    #{
+        deleted := Deleted,
+        sequence := Sequence
+    } = RevInfo,
+    #rev_info{
+        deleted = Deleted,
+        body_sp = undefined,
+        seq = Sequence,
+        rev = {Pos, RevId}
     }.
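
%% A round-trip check sketch, not part of this commit, for get_identity_md5/2
%% in couch_att above; the 16 + 15 window bits select gzip decoding, so the
%% identity MD5 of a gzip-encoded attachment equals the MD5 of the original
%% bytes:
%%
%%     identity_md5_roundtrip() ->
%%         Body = <<"hello attachment">>,
%%         Gz = zlib:gzip(Body),
%%         true = couch_hash:md5_hash(Body) =:= get_identity_md5(Gz, gzip).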
 is_deleted(#full_doc_info{rev_tree=Tree}) ->
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 44c290d33..f73141d9a 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -99,13 +99,13 @@ db_open(#httpdb{} = Db1, _Options, Create, CreateParams) ->
             _ ->
                 {ok, Db}
         end;
-    (200, _, _Body) ->
+    (200, _H, _Body) ->
         throw({db_not_found, ?l2b(db_uri(Db))});
     (401, _, _) ->
         throw({unauthorized, ?l2b(db_uri(Db))});
     (403, _, _) ->
         throw({forbidden, ?l2b(db_uri(Db))});
-    (_, _, _) ->
+    (_A, _B, _C) ->
         throw({db_not_found, ?l2b(db_uri(Db))})
     end)
     catch
@@ -501,11 +501,12 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
     end,
+    Me = lists:flatten(io_lib:format("~p", [self()])),
     try
         send_req(
             HttpDb,
             [{method, Method}, {path, "_changes"}, {qs, QArgs},
-                {headers, Headers}, {body, Body},
+                {headers, Headers ++ [{"XKCD", Me}]}, {body, Body},
                 {ibrowse_options, [{stream_to, {self(), once}}]}],
             fun(200, _, DataStreamFun) ->
                 parse_changes_feed(Options, UserFun, DataStreamFun);
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 2e4df5365..9911f4834 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -98,6 +98,7 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
         Stats = couch_replicator_stats:new([{doc_write_failures, 1}]),
         ok = gen_server:call(Parent, {add_stats, Stats}, infinity);
     false ->
+        couch_log:error("XKCD: REPL CHANGE: ~p", [DocInfo#doc_info.high_seq]),
         ok = couch_work_queue:queue(ChangesQueue, DocInfo),
         put(last_seq, DocInfo#doc_info.high_seq)
     end;
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 412ff7d05..7786a7d48 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -977,6 +977,7 @@ update_task(State) ->
         current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
+    couch_log:error("XKCD: UPDATE REPL TASK: ~p : ~p", [ThroughSeq, HighestSeq]),
     update_scheduler_job_stats(State),
     couch_task_status:update(
         rep_stats(State) ++ [
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
index 5248469fb..7c3dc6787 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
@@ -33,7 +33,7 @@ ddocid({_, DDocId}) ->

 recover({DbName, DDocId}) ->
-    fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
+    fabric2_db:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
 insert({DbName, DDocId}, {ok, #doc{revs = Revs} = DDoc}) ->
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
index 868fa7789..38445af96 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
@@ -34,7 +34,7 @@ ddocid({_, DDocId, _}) ->

 recover({DbName, DDocId, Rev}) ->
     Opts = [ejson_body, ?ADMIN_CTX],
-    {ok, [Resp]} = fabric:open_revs(DbName, DDocId, [Rev], Opts),
+    {ok, [Resp]} = fabric2_db:open_doc_revs(DbName, DDocId, [Rev], Opts),
     Resp.
diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl
index e8d0b13c9..7ea0577f0 100644
--- a/src/fabric/src/fabric2.hrl
+++ b/src/fabric/src/fabric2.hrl
@@ -40,6 +40,7 @@
 -define(DB_REVS, 20).
 -define(DB_DOCS, 21).
 -define(DB_LOCAL_DOCS, 22).
+-define(DB_ATTS, 23).

 % Versions
@@ -54,3 +55,6 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(COMMIT_UNKNOWN_RESULT, 1021).
+
+
+-define(ATTACHMENT_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index a9c17c992..230cec0d3 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -70,9 +70,9 @@
     open_doc/3,
     open_doc_revs/4,
     %% open_doc_int/3,
-    %% get_doc_info/2,
-    %% get_full_doc_info/2,
-    %% get_full_doc_infos/2,
+    get_doc_info/2,
+    get_full_doc_info/2,
+    get_full_doc_infos/2,
     get_missing_revs/2,
     %% get_design_doc/2,
     %% get_design_docs/1,
@@ -94,10 +94,8 @@
     %% purge_docs/2,
     %% purge_docs/3,

-    %% with_stream/3,
-    %% open_write_stream/2,
-    %% open_read_stream/2,
-    %% is_active_stream/2,
+    read_attachment/3,
+    write_attachment/3,

     fold_docs/3,
     fold_docs/4,
@@ -461,7 +459,43 @@ open_doc_revs(Db, DocId, Revs, Options) ->
     end).


-get_missing_revs(Db, IdRevs) ->
+get_doc_info(Db, DocId) ->
+    case get_full_doc_info(Db, DocId) of
+        not_found -> not_found;
+        FDI -> couch_doc:to_doc_info(FDI)
+    end.
+
+
+get_full_doc_info(Db, DocId) ->
+    RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:get_all_revs(TxDb, DocId)
+    end),
+    if RevInfos == [] -> not_found; true ->
+        #{winner := true} = Winner = lists:last(RevInfos),
+        RevTree = lists:foldl(fun(RI, TreeAcc) ->
+            RIPath = fabric2_util:revinfo_to_path(RI),
+            {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+            Merged
+        end, [], RevInfos),
+        #full_doc_info{
+            id = DocId,
+            update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
+            deleted = maps:get(deleted, Winner),
+            rev_tree = RevTree
+        }
+    end.
+
+
+get_full_doc_infos(Db, DocIds) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:map(fun(DocId) ->
+            get_full_doc_info(TxDb, DocId)
+        end, DocIds)
+    end).
+
+
+get_missing_revs(Db, JsonIdRevs) ->
+    IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
     AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
         lists:foldl(fun({Id, _Revs}, Acc) ->
             case maps:is_key(Id, Acc) of
@@ -542,6 +576,20 @@ update_docs(Db, Docs, Options) ->
     {Status, Resps1}.


+read_attachment(Db, DocId, AttId) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:read_attachment(TxDb, DocId, AttId)
+    end).
+
+
+write_attachment(Db, DocId, Att) ->
+    Data = couch_att:fetch(data, Att),
+    {ok, AttId} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:write_attachment(TxDb, DocId, Data)
+    end),
+    couch_att:store(data, {loc, Db, DocId, AttId}, Att).
+
+
 fold_docs(Db, UserFun, UserAcc) ->
     fold_docs(Db, UserFun, UserAcc, []).
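
%% A minimal round-trip sketch, not part of this commit, for the new
%% read_attachment/3 and write_attachment/3 API above. attachment_roundtrip/3
%% is hypothetical; the {loc, Db, DocId, AttId} data shape matches what
%% write_attachment/3 stores:
%%
%%     attachment_roundtrip(Db, DocId, Bin) ->
%%         Att0 = couch_att:new([
%%             {name, <<"file.bin">>},
%%             {type, <<"application/octet-stream">>},
%%             {data, Bin}
%%         ]),
%%         Att1 = fabric2_db:write_attachment(Db, DocId, Att0),
%%         {loc, _, DocId, AttId} = couch_att:fetch(data, Att1),
%%         Bin = fabric2_db:read_attachment(Db, DocId, AttId).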
@@ -718,8 +766,8 @@ apply_open_doc_opts(Doc, Revs, Options) ->
     end,

     Meta4 = if not IncludeLocalSeq -> []; true ->
-        #{winner := true, sequence := Seq} = lists:last(Revs),
-        [{local_seq, erlfdb_tuple:pack({Seq})}]
+        #{winner := true, sequence := SeqVS} = lists:last(Revs),
+        [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
     end,

     case Doc#doc.deleted and not ReturnDeleted of
@@ -907,6 +955,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
         revs = {NewRevPos, [NewRev | NewRevPath]}
     } = Doc3 = new_revid(Doc2),

+    Doc4 = update_attachment_revpos(Doc3),
+
     NewRevInfo = #{
         winner => undefined,
         deleted => NewDeleted,
@@ -918,9 +968,9 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->

     % Gather the list of possible winning revisions
     Possible = case Target == Winner of
-        true when not Doc3#doc.deleted ->
+        true when not Doc4#doc.deleted ->
             [NewRevInfo];
-        true when Doc3#doc.deleted ->
+        true when Doc4#doc.deleted ->
             case SecondPlace of
                 #{} -> [NewRevInfo, SecondPlace];
                 not_found -> [NewRevInfo]
@@ -945,7 +995,7 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->

     ok = fabric2_fdb:write_doc(
         Db,
-        Doc3,
+        Doc4,
         NewWinner,
         Winner,
         ToUpdate,
@@ -1049,6 +1099,21 @@ update_local_doc(Db, Doc0, _Options) ->
     {ok, {0, integer_to_binary(Rev)}}.


+update_attachment_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) ->
+    Atts = lists:map(fun(Att) ->
+        case couch_att:fetch(data, Att) of
+            {loc, _Db, _DocId, _AttId} ->
+                % Attachment was already on disk
+                Att;
+            _ ->
+                % We will write this attachment with this update
+                % so mark it with the RevPos that will be written
+                couch_att:store(revpos, RevPos, Att)
+        end
+    end, Atts0),
+    Doc#doc{atts = Atts}.
+
+
 get_winning_rev_futures(Db, Docs) ->
     lists:foldl(fun(Doc, Acc) ->
         #doc{
@@ -1068,29 +1133,30 @@ get_winning_rev_futures(Db, Docs) ->
     end, #{}, Docs).


-prep_and_validate(Db, Doc, PrevRevInfo) ->
-    HasStubs = couch_doc:has_stubs(Doc),
+prep_and_validate(Db, NewDoc, PrevRevInfo) ->
+    HasStubs = couch_doc:has_stubs(NewDoc),
     HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
-    IsDDoc = case Doc#doc.id of
+    IsDDoc = case NewDoc#doc.id of
         <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
         _ -> false
     end,

     PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
         true when PrevRevInfo /= not_found ->
-            case fabric2_fdb:get_doc_body(Db, Doc#doc.id, PrevRevInfo) of
-                #doc{} = Doc -> Doc;
+            case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
+                #doc{} = PDoc -> PDoc;
                 {not_found, _} -> nil
             end;
         _ ->
             nil
     end,

-    MergedDoc = if not HasStubs -> Doc; true ->
+    MergedDoc = if not HasStubs -> NewDoc; true ->
         % This will throw an error if we have any
         % attachment stubs missing data
-        couch_doc:merge_stubs(Doc, PrevDoc)
+        couch_doc:merge_stubs(NewDoc, PrevDoc)
     end,
+    check_duplicate_attachments(MergedDoc),
     validate_doc_update(Db, MergedDoc, PrevDoc),
     MergedDoc.
@@ -1140,6 +1206,16 @@ validate_ddoc(Db, DDoc) ->
     end.


+check_duplicate_attachments(#doc{atts = Atts}) ->
+    lists:foldl(fun(Att, Names) ->
+        Name = couch_att:fetch(name, Att),
+        case ordsets:is_element(Name, Names) of
+            true -> throw({bad_request, <<"Duplicate attachments">>});
+            false -> ordsets:add_element(Name, Names)
+        end
+    end, ordsets:new(), Atts).
+
+
 get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
     LeafPath;
 get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1187,3 +1263,20 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->

 doc_tag(#doc{meta = Meta}) ->
     fabric2_util:get_value(ref, Meta).
+
+
+idrevs({Id, Revs}) when is_list(Revs) ->
+    {docid(Id), [rev(R) || R <- Revs]}.
+
+
+docid(DocId) when is_list(DocId) ->
+    list_to_binary(DocId);
+docid(DocId) ->
+    DocId.
+
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+    couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+    Rev.
+
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
new file mode 100644
index 000000000..a5717147f
--- /dev/null
+++ b/src/fabric/src/fabric2_events.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_events).
+
+
+-export([
+    link_listener/4,
+    stop_listener/1
+]).
+
+-export([
+    init/5,
+    poll/5
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+link_listener(Mod, Fun, St, Options) ->
+    DbName = fabric2_util:get_value(dbname, Options),
+    Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+    receive
+        {Pid, initialized} -> ok
+    end,
+    {ok, Pid}.
+
+
+stop_listener(Pid) ->
+    Pid ! stop_listening.
+
+
+init(Parent, DbName, Mod, Fun, St) ->
+    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    Since = fabric2_db:get_update_seq(Db),
+    couch_log:error("XKCD: START LISTENER: ~s : ~p for ~p", [DbName, Since, Parent]),
+    erlang:monitor(process, Parent),
+    Parent ! {self(), initialized},
+    poll(DbName, Since, Mod, Fun, St),
+    couch_log:error("XKCD: STOP LISTENER for ~p", [Parent]).
+
+
+poll(DbName, Since, Mod, Fun, St) ->
+    {Resp, NewSince} = try
+        case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+            {ok, Db} ->
+                case fabric2_db:get_update_seq(Db) of
+                    Since ->
+                        couch_log:error("XKCD: NO UPDATE: ~s :: ~p", [DbName, Since]),
+                        {{ok, St}, Since};
+                    Other ->
+                        couch_log:error("XKCD: UPDATED: ~s :: ~p -> ~p", [DbName, Since, Other]),
+                        {Mod:Fun(DbName, updated, St), Other}
+                end;
+            Error ->
+                exit(Error)
+        end
+    catch error:database_does_not_exist ->
+        {Mod:Fun(DbName, deleted, St), Since}
+    end,
+    receive
+        stop_listening ->
+            ok;
+        {'DOWN', _, _, _, _} ->
+            ok
+    after 0 ->
+        case Resp of
+            {ok, NewSt} ->
+                timer:sleep(1000),
+                ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+            {stop, _} ->
+                ok
+        end
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index d57ad3ecd..666a20b32 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -45,10 +45,15 @@
     write_doc/6,
     write_local_doc/2,

+    read_attachment/3,
+    write_attachment/3,
+
     fold_docs/4,
     fold_changes/5,
     get_last_change/1,

+    vs_to_seq/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -260,13 +265,12 @@ get_info(#{} = Db) ->

     RawSeq = case erlfdb:wait(ChangesFuture) of
         [] ->
-            fabric2_util:seq_zero();
+            vs_to_seq(fabric2_util:seq_zero_vs());
         [{SeqKey, _}] ->
             {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
-            SeqBin
+            vs_to_seq(SeqVS)
     end,
-    CProp = {update_seq, fabric2_util:to_hex(RawSeq)},
+    CProp = {update_seq, RawSeq},

     MProps = lists:flatmap(fun({K, V}) ->
         case erlfdb_tuple:unpack(K, DbPrefix) of
@@ -565,13 +569,37 @@ write_local_doc(#{} = Db0, Doc) ->
     ok.
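
%% A minimal subscriber sketch, not part of this commit, for the
%% fabric2_events listener above. The callback contract read from poll/5:
%% Mod:Fun(DbName, updated | deleted, St) returns {ok, NewSt} to keep
%% listening or {stop, St} to quit.
%%
%%     -module(update_counter).  % hypothetical module
%%     -export([start/1, handle_event/3]).
%%
%%     start(DbName) ->
%%         fabric2_events:link_listener(
%%             ?MODULE, handle_event, 0, [{dbname, DbName}]
%%         ).
%%
%%     handle_event(_DbName, updated, Count) ->
%%         {ok, Count + 1};
%%     handle_event(_DbName, deleted, Count) ->
%%         {stop, Count}.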
-write_doc_body(#{} = Db0, #doc{} = Doc) ->
+read_attachment(#{} = Db, DocId, AttId) ->
     #{
-        tx := Tx
-    } = Db = ensure_current(Db0),
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = ensure_current(Db),

-    {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
-    erlfdb:set(Tx, NewDocKey, NewDocVal).
+    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+    case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
+        not_found ->
+            throw({not_found, missing});
+        KVs ->
+            Vs = [V || {_K, V} <- KVs],
+            iolist_to_binary(Vs)
+    end.
+
+
+write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = ensure_current(Db),
+
+    AttId = fabric2_util:uuid(),
+    Chunks = chunkify_attachment(Data),
+
+    lists:foldl(fun(Chunk, ChunkId) ->
+        AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
+        ok = erlfdb:set(Tx, AttKey, Chunk),
+        ChunkId + 1
+    end, 0, Chunks),
+    {ok, AttId}.


 fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
@@ -634,38 +662,21 @@ fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
     end,

     try
-        % We have to track this to return last_seq
-        <<51:8, FirstSeq:12/binary>> = erlfdb_tuple:pack({SinceSeq1}),
-        put('$last_changes_seq', fabric2_util:to_hex(FirstSeq)),
-
-        UserAcc1 = maybe_stop(UserFun(Db, start, UserAcc0)),
-
-        UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_CHANGES, UpdateSeq} = erlfdb_tuple:unpack(K, DbPrefix),
+        {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
+            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
             {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),

-            % This comes back as a versionstamp so we have
-            % to pack it to get a binary.
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({UpdateSeq}),
-            SeqHex = fabric2_util:to_hex(SeqBin),
-            put('$last_changes_seq', SeqHex),
+            Change = #{
+                id => DocId,
+                sequence => vs_to_seq(SeqVS),
+                rev_id => RevId,
+                deleted => Deleted
+            },

-            DelMember = if not Deleted -> []; true ->
-                [{deleted, true}]
-            end,
-
-            maybe_stop(UserFun(Db, {change, {[
-                {seq, SeqHex},
-                {id, DocId},
-                {changes, [{[{rev, couch_doc:rev_to_str(RevId)}]}]}
-            ] ++ DelMember}}, UserAccIn))
-        end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
-        UserFun(Db, {stop, get('$last_changes_seq'), null}, UserAcc2)
+            maybe_stop(UserFun(Change, UserAccIn))
+        end, UserAcc0, [{reverse, Reverse}] ++ Options)}
     catch
         throw:{stop, FinalUserAcc} ->
             {ok, FinalUserAcc}
-    after
-        erase('$last_changes_seq')
     end.


@@ -682,8 +693,7 @@ get_last_change(#{} = Db) ->
-            fabric2_util:to_hex(fabric2_util:seq_zero());
+            vs_to_seq(fabric2_util:seq_zero_vs());
         [{K, _V}] ->
             {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
-            fabric2_util:to_hex(SeqBin)
+            vs_to_seq(SeqVS)
     end.


@@ -693,6 +703,11 @@ maybe_stop({stop, Acc}) ->
     throw({stop, Acc}).


+vs_to_seq(VS) ->
+    <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
+    fabric2_util:to_hex(SeqBin).
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).

@@ -730,6 +745,15 @@ bump_metadata_version(Tx) ->
     erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).


+write_doc_body(#{} = Db0, #doc{} = Doc) ->
+    #{
+        tx := Tx
+    } = Db = ensure_current(Db0),
+
+    {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
+    erlfdb:set(Tx, NewDocKey, NewDocVal).
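
%% Not part of this commit: an illustration of the chunked layout that
%% write_attachment/3 above produces and read_attachment/3 consumes with a
%% single range read. With ?ATTACHMENT_CHUNK_SIZE = 100000, a 250 kB value
%% is stored under three keys (tuples shown unpacked, before prefixing):
%%
%%     {?DB_ATTS, DocId, AttId, 0} => 100000-byte chunk
%%     {?DB_ATTS, DocId, AttId, 1} => 100000-byte chunk
%%     {?DB_ATTS, DocId, AttId, 2} => 50000-byte chunk
%%
%%     expected_chunks(Size, ChunkSize) when ChunkSize > 0 ->
%%         % 0 for an empty attachment, which stores no chunk keys at all
%%         (Size + ChunkSize - 1) div ChunkSize.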
+
+
 revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
     #{
         deleted := Deleted,
@@ -793,7 +817,7 @@ doc_to_fdb(Db, #doc{} = Doc) ->
         body = Body,
         atts = Atts,
         deleted = Deleted
-    } = Doc,
+    } = doc_flush_atts(Db, Doc),

     Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix),
     Val = {Body, Atts, Deleted},
@@ -846,6 +870,24 @@ fdb_to_local_doc(_Db, _DocId, not_found) ->
     {not_found, missing}.


+doc_flush_atts(Db, Doc) ->
+    Atts = lists:map(fun(Att) ->
+        couch_att:flush(Db, Doc#doc.id, Att)
+    end, Doc#doc.atts),
+    Doc#doc{atts = Atts}.
+
+
+chunkify_attachment(Data) ->
+    case Data of
+        <<>> ->
+            [];
+        <<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> ->
+            [Head | chunkify_attachment(Rest)];
+        <<_/binary>> when size(Data) < ?ATTACHMENT_CHUNK_SIZE ->
+            [Data]
+    end.
+
+
 get_dir_and_bounds(DbPrefix, Options) ->
     Reverse = case fabric2_util:get_value(dir, Options, fwd) of
         fwd -> false;
@@ -906,7 +948,7 @@ get_dir_and_bounds(DbPrefix, Options) ->
     {Reverse, StartKey4, EndKey4}.


-get_since_seq(Seq) when Seq == 0; Seq == <<"0">>; Seq == <<>> ->
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0 ->
     fabric2_util:seq_zero_vs();

 get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
@@ -916,7 +958,13 @@ get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
     Seq1 = fabric2_util:from_hex(Seq),
     Seq2 = <<51:8, Seq1/binary>>,
     {SeqVS} = erlfdb_tuple:unpack(Seq2),
-    SeqVS.
+    SeqVS;
+
+get_since_seq(List) when is_list(List) ->
+    get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+    erlang:error({invalid_since_seq, Seq}).


 get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 1696f06a6..6e2df67c2 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -17,7 +17,6 @@
     revinfo_to_path/1,
     sort_revinfos/1,

-    seq_zero/0,
     seq_zero_vs/0,
     seq_max_vs/0,

@@ -66,10 +65,6 @@ rev_sort_key(#{} = RevInfo) ->
     {not Deleted, RevPos, Rev}.


-seq_zero() ->
-    <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>.
-
-
 seq_zero_vs() ->
     {versionstamp, 0, 0, 0}.

diff --git a/src/fabric/test/fabric2_doc_crud_tests.erl b/src/fabric/test/fabric2_doc_crud_tests.erl
index 17e8c3689..85b276679 100644
--- a/src/fabric/test/fabric2_doc_crud_tests.erl
+++ b/src/fabric/test/fabric2_doc_crud_tests.erl
@@ -53,6 +53,7 @@ doc_crud_test_() ->
             fun open_doc_revs_all/1,
             fun open_doc_revs_latest/1,
             fun get_missing_revs_basic/1,
+            fun get_missing_revs_on_missing_doc/1,
             fun open_missing_local_doc/1,
             fun create_local_doc_basic/1,
             fun update_local_doc_basic/1,
@@ -615,6 +616,20 @@ get_missing_revs_basic({Db, _}) ->
     ).


+get_missing_revs_on_missing_doc({Db, _}) ->
+    Revs = lists:sort([
+        couch_doc:rev_to_str({1, fabric2_util:uuid()}),
+        couch_doc:rev_to_str({2, fabric2_util:uuid()}),
+        couch_doc:rev_to_str({800, fabric2_util:uuid()})
+    ]),
+    DocId = fabric2_util:uuid(),
+    {ok, Resp} = fabric2_db:get_missing_revs(Db, [{DocId, Revs}]),
+    ?assertMatch([{DocId, [_ | _], []}], Resp),
+    [{DocId, Missing, _}] = Resp,
+    MissingStrs = [couch_doc:rev_to_str(Rev) || Rev <- Missing],
+    ?assertEqual(Revs, lists:sort(MissingStrs)).
+
+
 open_missing_local_doc({Db, _}) ->
     ?assertEqual(
         {not_found, missing},