diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-05 13:43:20 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-07-31 11:55:30 -0500 |
commit | 0cf5f463d0b73427656cea4465582f5102508627 (patch) | |
tree | 1906369a0695f6f0d093aa403563f9f7ad1fcdde | |
parent | 9083da6c46dedc0c63d597cd12c8ea13bd9eb304 (diff) | |
download | couchdb-0cf5f463d0b73427656cea4465582f5102508627.tar.gz |
Start switching chttpd HTTP endpoints to fabric2
This is not an exhaustive port of the entire chttpd API. However, this
is enough to support basic CRUD operations far enough that replication
works.
-rw-r--r-- | src/chttpd/src/chttpd.erl | 11 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_auth_request.erl | 7 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_changes.erl | 973 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 328 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_external.erl | 35 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_misc.erl | 62 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_show.erl | 5 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview.erl | 16 | ||||
-rw-r--r-- | test/elixir/test/basics_test.exs | 2 |
9 files changed, 1183 insertions, 256 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 1e1d638be..4d32c03c5 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -25,7 +25,7 @@ error_info/1, parse_form/1, json_body/1, json_body_obj/1, body/1, doc_etag/1, make_etag/1, etag_respond/3, etag_match/2, partition/1, serve_file/3, serve_file/4, - server_header/0, start_chunked_response/3,send_chunk/2, + server_header/0, start_chunked_response/3,send_chunk/2,last_chunk/1, start_response_length/4, send/2, start_json_response/2, start_json_response/3, end_json_response/1, send_response/4, send_response_no_cors/4, @@ -743,7 +743,14 @@ start_chunked_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers0) -> {ok, Resp}. send_chunk(Resp, Data) -> - Resp:write_chunk(Data), + case iolist_size(Data) of + 0 -> ok; % do nothing + _ -> Resp:write_chunk(Data) + end, + {ok, Resp}. + +last_chunk(Resp) -> + Resp:write_chunk([]), {ok, Resp}. send_response(Req, Code, Headers0, Body) -> diff --git a/src/chttpd/src/chttpd_auth_request.erl b/src/chttpd/src/chttpd_auth_request.erl index 96dbf980c..7210905f0 100644 --- a/src/chttpd/src/chttpd_auth_request.erl +++ b/src/chttpd/src/chttpd_auth_request.erl @@ -103,7 +103,8 @@ server_authorization_check(#httpd{path_parts=[<<"_", _/binary>>|_]}=Req) -> require_admin(Req). db_authorization_check(#httpd{path_parts=[DbName|_],user_ctx=Ctx}=Req) -> - {_} = fabric:get_security(DbName, [{user_ctx, Ctx}]), + {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]), + fabric2_db:check_is_member(Db), Req. require_admin(Req) -> @@ -111,8 +112,8 @@ require_admin(Req) -> Req. require_db_admin(#httpd{path_parts=[DbName|_],user_ctx=Ctx}=Req) -> - Sec = fabric:get_security(DbName, [{user_ctx, Ctx}]), - + {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]), + Sec = fabric2_db:get_security(Db), case is_db_admin(Ctx,Sec) of true -> Req; false -> throw({unauthorized, <<"You are not a server or db admin.">>}) diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl new file mode 100644 index 000000000..30caab2a0 --- /dev/null +++ b/src/chttpd/src/chttpd_changes.erl @@ -0,0 +1,973 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(chttpd_changes). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +-export([ + handle_db_changes/3, + handle_changes/4, + get_changes_timeout/2, + wait_updated/3, + get_rest_updated/1, + configure_filter/4, + filter/3, + handle_db_event/3, + handle_view_event/3, + view_filter/3, + send_changes_doc_ids/6, + send_changes_design_docs/6 +]). + +-export([changes_enumerator/2]). + +%% export so we can use fully qualified call to facilitate hot-code upgrade +-export([ + keep_sending_changes/3 +]). + +-record(changes_acc, { + db, + view_name, + ddoc_name, + view, + seq, + prepend, + filter, + callback, + user_acc, + resp_type, + limit, + include_docs, + doc_options, + conflicts, + timeout, + timeout_fun, + aggregation_kvs, + aggregation_results +}). + +handle_db_changes(Args, Req, Db) -> + handle_changes(Args, Req, Db, db). + +handle_changes(Args1, Req, Db, Type) -> + ReqPid = chttpd:header_value(Req, "XKCD", "<unknown>"), + #changes_args{ + style = Style, + filter = FilterName, + feed = Feed, + dir = Dir, + since = Since + } = Args1, + couch_log:error("XKCD: STARTING CHANGES FEED ~p for ~s : ~p", [self(), ReqPid, Since]), + Filter = configure_filter(FilterName, Style, Req, Db), + Args = Args1#changes_args{filter_fun = Filter}, + % The type of changes feed depends on the supplied filter. If the query is + % for an optimized view-filtered db changes, we need to use the view + % sequence tree. + {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of + {{view, DDocName0, ViewName0}, _} -> + {true, DDocName0, ViewName0}; + {_, {fast_view, _, DDoc, ViewName0}} -> + {true, DDoc#doc.id, ViewName0}; + _ -> + {false, undefined, undefined} + end, + DbName = fabric2_db:name(Db), + {StartListenerFun, View} = if UseViewChanges -> + {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( + DbName, DDocName, ViewName, #mrargs{}), + case View0#mrview.seq_btree of + #btree{} -> + ok; + _ -> + throw({bad_request, "view changes not enabled"}) + end, + SNFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}] + ) + end, + {SNFun, View0}; + true -> + SNFun = fun() -> + fabric2_events:link_listener( + ?MODULE, handle_db_event, self(), [{dbname, DbName}] + ) + end, + {SNFun, undefined} + end, + Start = fun() -> + StartSeq = case Dir of + rev -> + fabric2_fdb:get_update_seq(Db); + fwd -> + Since + end, + View2 = if UseViewChanges -> + {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( + DbName, DDocName, ViewName, #mrargs{}), + View1; + true -> + undefined + end, + {Db, View2, StartSeq} + end, + % begin timer to deal with heartbeat when filter function fails + case Args#changes_args.heartbeat of + undefined -> + erlang:erase(last_changes_heartbeat); + Val when is_integer(Val); Val =:= true -> + put(last_changes_heartbeat, os:timestamp()) + end, + + case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of + true -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + {ok, Listener} = StartListenerFun(), + + {Db, View, StartSeq} = Start(), + UserAcc2 = start_sending_changes(Callback, UserAcc), + {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, + <<"">>, Timeout, TimeoutFun, DDocName, ViewName, + View), + try + keep_sending_changes( + Args#changes_args{dir=fwd}, + Acc0, + true) + after + fabric2_events:stop_listener(Listener), + get_rest_updated(ok) % clean out any remaining update messages + end + end; + false -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + UserAcc2 = start_sending_changes(Callback, UserAcc), + {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + {Db, View, StartSeq} = Start(), + Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, + UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun, + DDocName, ViewName, View), + {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = + send_changes( + Acc0, + Dir, + true), + end_sending_changes(Callback, UserAcc3, LastSeq) + end + end. + + +handle_db_event(_DbName, updated, Parent) -> + Parent ! updated, + {ok, Parent}; +handle_db_event(_DbName, deleted, Parent) -> + Parent ! deleted, + {ok, Parent}; +handle_db_event(_DbName, _Event, Parent) -> + {ok, Parent}. + + +handle_view_event(_DbName, Msg, {Parent, DDocId}) -> + case Msg of + {index_commit, DDocId} -> + Parent ! updated; + {index_delete, DDocId} -> + Parent ! deleted; + _ -> + ok + end, + {ok, {Parent, DDocId}}. + +get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 2) -> + Pair; +get_callback_acc(Callback) when is_function(Callback, 1) -> + {fun(Ev, _) -> Callback(Ev) end, ok}. + + +configure_filter("_doc_ids", Style, Req, _Db) -> + {doc_ids, Style, get_doc_ids(Req)}; +configure_filter("_selector", Style, Req, _Db) -> + {selector, Style, get_selector_and_fields(Req)}; +configure_filter("_design", Style, _Req, _Db) -> + {design_docs, Style}; +configure_filter("_view", Style, Req, Db) -> + ViewName = get_view_qs(Req), + if ViewName /= "" -> ok; true -> + throw({bad_request, "`view` filter parameter is not provided."}) + end, + ViewNameParts = string:tokens(ViewName, "/"), + case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of + [DName, VName] -> + {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), + check_member_exists(DDoc, [<<"views">>, VName]), + FilterType = try + true = couch_util:get_nested_json_value( + DDoc#doc.body, + [<<"options">>, <<"seq_indexed">>] + ), + fast_view + catch _:_ -> + view + end, + case fabric2_db:is_clustered(Db) of + true -> + DIR = fabric_util:doc_id_and_rev(DDoc), + {fetch, FilterType, Style, DIR, VName}; + false -> + {FilterType, Style, DDoc, VName} + end; + [] -> + Msg = "`view` must be of the form `designname/viewname`", + throw({bad_request, Msg}) + end; +configure_filter([$_ | _], _Style, _Req, _Db) -> + throw({bad_request, "unknown builtin filter name"}); +configure_filter("", main_only, _Req, _Db) -> + {default, main_only}; +configure_filter("", all_docs, _Req, _Db) -> + {default, all_docs}; +configure_filter(FilterName, Style, Req, Db) -> + FilterNameParts = string:tokens(FilterName, "/"), + case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of + [DName, FName] -> + {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), + check_member_exists(DDoc, [<<"filters">>, FName]), + {custom, Style, Req, DDoc, FName}; + [] -> + {default, Style}; + _Else -> + Msg = "`filter` must be of the form `designname/filtername`", + throw({bad_request, Msg}) + end. + + +filter(Db, Change, {default, Style}) -> + apply_style(Db, Change, Style); +filter(Db, Change, {doc_ids, Style, DocIds}) -> + case lists:member(maps:get(id, Change), DocIds) of + true -> + apply_style(Db, Change, Style); + false -> + [] + end; +filter(Db, Change, {selector, Style, {Selector, _Fields}}) -> + Docs = open_revs(Db, Change, Style), + Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) + || Doc <- Docs], + filter_revs(Passes, Docs); +filter(Db, Change, {design_docs, Style}) -> + case maps:get(id, Change) of + <<"_design", _/binary>> -> + apply_style(Db, Change, Style); + _ -> + [] + end; +filter(Db, Change, {FilterType, Style, DDoc, VName}) + when FilterType == view; FilterType == fast_view -> + Docs = open_revs(Db, Change, Style), + {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), + filter_revs(Passes, Docs); +filter(Db, Change, {custom, Style, Req0, DDoc, FName}) -> + Req = case Req0 of + {json_req, _} -> Req0; + #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)} + end, + Docs = open_revs(Db, Change, Style), + {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), + filter_revs(Passes, Docs); +filter(A, B, C) -> + erlang:error({filter_error, A, B, C}). + +fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) -> + case fabric2_db:get_doc_info(Db, ID) of + {ok, #doc_info{high_seq=Seq}=DocInfo} -> + Docs = open_revs(Db, DocInfo, Style), + Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) -> + RevStr = couch_doc:rev_to_str({RevPos, RevId}), + {[{<<"rev">>, RevStr}]} + end, Docs), + {DocInfo, Changes}; + {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq -> + % If the view seq tree is out of date (or if the view seq tree + % was opened before the db) seqs may come by from the seq tree + % which correspond to the not-most-current revision of a document. + % The proper thing to do is to not send this old revision, but wait + % until we reopen the up-to-date view seq tree and continue the + % fold. + % I left the Seq > HighSeq guard in so if (for some godforsaken + % reason) the seq in the view is more current than the database, + % we'll throw an error. + {undefined, []}; + {error, not_found} -> + {undefined, []} + end. + + + +view_filter(Db, KV, {default, Style}) -> + apply_view_style(Db, KV, Style). + + +get_view_qs({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props, {[]}), + binary_to_list(couch_util:get_value(<<"view">>, Query, "")); +get_view_qs(Req) -> + couch_httpd:qs_value(Req, "view", ""). + +get_doc_ids({json_req, {Props}}) -> + check_docids(couch_util:get_value(<<"doc_ids">>, Props)); +get_doc_ids(#httpd{method='POST'}=Req) -> + couch_httpd:validate_ctype(Req, "application/json"), + {Props} = couch_httpd:json_body_obj(Req), + check_docids(couch_util:get_value(<<"doc_ids">>, Props)); +get_doc_ids(#httpd{method='GET'}=Req) -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + check_docids(DocIds); +get_doc_ids(_) -> + throw({bad_request, no_doc_ids_provided}). + + +get_selector_and_fields({json_req, {Props}}) -> + Selector = check_selector(couch_util:get_value(<<"selector">>, Props)), + Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)), + {Selector, Fields}; +get_selector_and_fields(#httpd{method='POST'}=Req) -> + couch_httpd:validate_ctype(Req, "application/json"), + get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)}); +get_selector_and_fields(_) -> + throw({bad_request, "Selector must be specified in POST payload"}). + + +check_docids(DocIds) when is_list(DocIds) -> + lists:foreach(fun + (DocId) when not is_binary(DocId) -> + Msg = "`doc_ids` filter parameter is not a list of doc ids.", + throw({bad_request, Msg}); + (_) -> ok + end, DocIds), + DocIds; +check_docids(_) -> + Msg = "`doc_ids` filter parameter is not a list of doc ids.", + throw({bad_request, Msg}). + + +check_selector(Selector={_}) -> + try + mango_selector:normalize(Selector) + catch + {mango_error, Mod, Reason0} -> + {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0), + throw({bad_request, Reason}) + end; +check_selector(_Selector) -> + throw({bad_request, "Selector error: expected a JSON object"}). + + +check_fields(nil) -> + nil; +check_fields(Fields) when is_list(Fields) -> + try + {ok, Fields1} = mango_fields:new(Fields), + Fields1 + catch + {mango_error, Mod, Reason0} -> + {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0), + throw({bad_request, Reason}) + end; +check_fields(_Fields) -> + throw({bad_request, "Selector error: fields must be JSON array"}). + + +open_ddoc(Db, DDocId) -> + case ddoc_cache:open_doc(Db, DDocId) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) + end. + + +check_member_exists(#doc{body={Props}}, Path) -> + couch_util:get_nested_json_value({Props}, Path). + + +apply_style(_Db, Change, main_only) -> + #{rev_id := RevId} = Change, + [{[{<<"rev">>, couch_doc:rev_to_str(RevId)}]}]; +apply_style(Db, Change, all_docs) -> + % We have to fetch all revs for this row + #{id := DocId} = Change, + {ok, Resps} = fabric2_db:open_doc_revs(Db, DocId, all, [deleted]), + lists:flatmap(fun(Resp) -> + case Resp of + {ok, #doc{revs = {Pos, [Rev | _]}}} -> + [{[{<<"rev">>, couch_doc:rev_to_str({Pos, Rev})}]}]; + _ -> + [] + end + end, Resps); +apply_style(A, B, C) -> + erlang:error({changes_apply_style, A, B, C}). + +apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; +apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) -> + case couch_db:get_doc_info(Db, ID) of + {ok, DocInfo} -> + apply_style(Db, DocInfo, all_docs); + {error, not_found} -> + [] + end. + + +open_revs(Db, Change, Style) -> + #{id := DocId} = Change, + Options = [deleted, conflicts], + try + case Style of + main_only -> + {ok, Doc} = fabric2_db:open_doc(Db, DocId, Options), + [Doc]; + all_docs -> + {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, all, Options), + [Doc || {ok, Doc} <- Docs] + end + catch _:_ -> + % We didn't log this before, should we now? + [] + end. + + +filter_revs(Passes, Docs) -> + lists:flatmap(fun + ({true, #doc{revs={RevPos, [RevId | _]}}}) -> + RevStr = couch_doc:rev_to_str({RevPos, RevId}), + Change = {[{<<"rev">>, RevStr}]}, + [Change]; + (_) -> + [] + end, lists:zip(Passes, Docs)). + + +get_changes_timeout(Args, Callback) -> + #changes_args{ + heartbeat = Heartbeat, + timeout = Timeout, + feed = ResponseType + } = Args, + DefaultTimeout = list_to_integer( + config:get("httpd", "changes_timeout", "60000") + ), + case Heartbeat of + undefined -> + case Timeout of + undefined -> + {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; + infinity -> + {infinity, fun(UserAcc) -> {stop, UserAcc} end}; + _ -> + {lists:min([DefaultTimeout, Timeout]), + fun(UserAcc) -> {stop, UserAcc} end} + end; + true -> + {DefaultTimeout, + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; + _ -> + {lists:min([DefaultTimeout, Heartbeat]), + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} + end. + +start_sending_changes(Callback, UserAcc) -> + {_, NewUserAcc} = Callback(start, UserAcc), + NewUserAcc. + +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) -> + #changes_args{ + include_docs = IncludeDocs, + doc_options = DocOpts, + conflicts = Conflicts, + limit = Limit, + feed = ResponseType, + filter_fun = Filter + } = Args, + #changes_acc{ + db = Db, + seq = StartSeq, + prepend = Prepend, + filter = Filter, + callback = Callback, + user_acc = UserAcc, + resp_type = ResponseType, + limit = Limit, + include_docs = IncludeDocs, + doc_options = DocOpts, + conflicts = Conflicts, + timeout = Timeout, + timeout_fun = TimeoutFun, + ddoc_name = DDocName, + view_name = ViewName, + view = View, + aggregation_results=[], + aggregation_kvs=[] + }. + +send_changes(Acc, Dir, FirstRound) -> + #changes_acc{ + db = Db, + seq = StartSeq, + filter = Filter, + view = View + } = Acc, + DbEnumFun = fun changes_enumerator/2, + case can_optimize(FirstRound, Filter) of + {true, Fun} -> + Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter); + _ -> + case {View, Filter} of + {#mrview{}, {fast_view, _, _, _}} -> + couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc); + {undefined, _} -> + Opts = [{dir, Dir}], + fabric2_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts); + {#mrview{}, _} -> + ViewEnumFun = fun view_changes_enumerator/2, + {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc), + case Acc0 of + #changes_acc{aggregation_results=[]} -> + {Go, Acc0}; + _ -> + #changes_acc{ + aggregation_results = AggResults, + aggregation_kvs = AggKVs, + user_acc = UserAcc, + callback = Callback, + resp_type = ResponseType, + prepend = Prepend + } = Acc0, + ChangesRow = view_changes_row(AggResults, AggKVs, Acc0), + UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc0#changes_acc{user_acc=UserAcc0}} + end + end + end. + + +can_optimize(true, {doc_ids, _Style, DocIds}) -> + MaxDocIds = config:get_integer("couchdb", + "changes_doc_ids_optimization_threshold", 100), + if length(DocIds) =< MaxDocIds -> + {true, fun send_changes_doc_ids/6}; + true -> + false + end; +can_optimize(true, {design_docs, _Style}) -> + {true, fun send_changes_design_docs/6}; +can_optimize(_, _) -> + false. + + +send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> + Results = fabric2_db:get_full_doc_infos(Db, DocIds), + FullInfos = lists:foldl(fun + (#full_doc_info{}=FDI, Acc) -> [FDI | Acc]; + (not_found, Acc) -> Acc + end, [], Results), + send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). + + +send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> + FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, + Opts = [ + include_deleted, + {start_key, <<"_design/">>}, + {end_key_gt, <<"_design0">>} + ], + {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts), + send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). + + +send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> + FoldFun = case Dir of + fwd -> fun lists:foldl/3; + rev -> fun lists:foldr/3 + end, + GreaterFun = case Dir of + fwd -> fun(A, B) -> A > B end; + rev -> fun(A, B) -> A =< B end + end, + DocInfos = lists:foldl(fun(FDI, Acc) -> + DI = couch_doc:to_doc_info(FDI), + case GreaterFun(DI#doc_info.high_seq, StartSeq) of + true -> [DI | Acc]; + false -> Acc + end + end, [], FullDocInfos), + SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos), + FinalAcc = try + FoldFun(fun(DocInfo, Acc) -> + % Kinda gross that we're munging this back to a map + % that will then have to re-read and rebuild the FDI + % for all_docs style. But c'est la vie. + #doc_info{ + id = DocId, + high_seq = Seq, + revs = [#rev_info{rev = Rev, deleted = Deleted} | _] + } = DocInfo, + Change = #{ + id => DocId, + sequence => Seq, + rev_id => Rev, + deleted => Deleted + }, + case Fun(Change, Acc) of + {ok, NewAcc} -> + NewAcc; + {stop, NewAcc} -> + throw({stop, NewAcc}) + end + end, Acc0, SortedDocInfos) + catch + {stop, Acc} -> Acc + end, + case Dir of + fwd -> + FinalAcc0 = case element(1, FinalAcc) of + changes_acc -> % we came here via couch_http or internal call + FinalAcc#changes_acc{seq = fabric2_db:get_update_seq(Db)}; + fabric_changes_acc -> % we came here via chttpd / fabric / rexi + FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)} + end, + {ok, FinalAcc0}; + rev -> {ok, FinalAcc} + end. + + +keep_sending_changes(Args, Acc0, FirstRound) -> + #changes_args{ + feed = ResponseType, + limit = Limit, + db_open_options = DbOptions + } = Args, + + {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound), + + #changes_acc{ + db = Db, callback = Callback, + timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, + prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit, + ddoc_name = DDocName, view_name = ViewName + } = ChangesAcc, + + if Limit > NewLimit, ResponseType == "longpoll" -> + end_sending_changes(Callback, UserAcc2, EndSeq); + true -> + {Go, UserAcc3} = notify_waiting_for_updates(Callback, UserAcc2), + if Go /= ok -> end_sending_changes(Callback, UserAcc3, EndSeq); true -> + case wait_updated(Timeout, TimeoutFun, UserAcc3) of + {updated, UserAcc4} -> + UserCtx = fabric2_db:get_user_ctx(Db), + DbOptions1 = [{user_ctx, UserCtx} | DbOptions], + case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of + {ok, Db2} -> + ?MODULE:keep_sending_changes( + Args#changes_args{limit=NewLimit}, + ChangesAcc#changes_acc{ + db = Db2, + view = maybe_refresh_view(Db2, DDocName, ViewName), + user_acc = UserAcc4, + seq = EndSeq, + prepend = Prepend2, + timeout = Timeout, + timeout_fun = TimeoutFun}, + false); + _Else -> + end_sending_changes(Callback, UserAcc3, EndSeq) + end; + {stop, UserAcc4} -> + end_sending_changes(Callback, UserAcc4, EndSeq) + end + end + end. + +maybe_refresh_view(_, undefined, undefined) -> + undefined; +maybe_refresh_view(Db, DDocName, ViewName) -> + DbName = couch_db:name(Db), + {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}), + View. + +notify_waiting_for_updates(Callback, UserAcc) -> + Callback(waiting_for_updates, UserAcc). + +end_sending_changes(Callback, UserAcc, EndSeq) -> + Callback({stop, EndSeq, null}, UserAcc). + +view_changes_enumerator(Value, Acc) -> + #changes_acc{ + filter = Filter, callback = Callback, prepend = Prepend, + user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, + timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq, + aggregation_kvs=AggKVs, aggregation_results=AggResults + } = Acc, + + Results0 = view_filter(Db, Value, Filter), + Results = [Result || Result <- Results0, Result /= null], + {{Seq, _}, _} = Value, + + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, + + if CurrentSeq =:= Seq -> + NewAggKVs = case Results of + [] -> AggKVs; + _ -> [Value|AggKVs] + end, + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + Acc0 = Acc#changes_acc{ + seq = Seq, + user_acc = UserAcc2, + aggregation_kvs=NewAggKVs + }, + case Done of + stop -> {stop, Acc0}; + ok -> {Go, Acc0} + end; + AggResults =/= [] -> + {NewAggKVs, NewAggResults} = case Results of + [] -> {[], []}; + _ -> {[Value], Results} + end, + if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> + ChangesRow = view_changes_row(AggResults, AggKVs, Acc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, user_acc = UserAcc2, limit = Limit - 1, + aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}}; + true -> + ChangesRow = view_changes_row(AggResults, AggKVs, Acc), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2, + limit = Limit - 1, aggregation_kvs=[Value], + aggregation_results=Results}} + end; + true -> + {NewAggKVs, NewAggResults} = case Results of + [] -> {[], []}; + _ -> {[Value], Results} + end, + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + Acc0 = Acc#changes_acc{ + seq = Seq, + user_acc = UserAcc2, + aggregation_kvs=NewAggKVs, + aggregation_results=NewAggResults + }, + case Done of + stop -> {stop, Acc0}; + ok -> {Go, Acc0} + end + end. + +changes_enumerator(Change0, Acc) -> + #changes_acc{ + filter = Filter, + callback = Callback, + user_acc = UserAcc, + limit = Limit, + db = Db, + timeout = Timeout, + timeout_fun = TimeoutFun + } = Acc, + {Change1, Results0} = case Filter of + {fast_view, _, _, _} -> + fast_view_filter(Db, Change0, Filter); + _ -> + {Change0, filter(Db, Change0, Filter)} + end, + Results = [Result || Result <- Results0, Result /= null], + Seq = maps:get(sequence, Change1), + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, + case Results of + [] -> + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + case Done of + stop -> + {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; + ok -> + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} + end; + _ -> + ChangesRow = changes_row(Results, Change1, Acc), + {UserGo, UserAcc2} = Callback({change, ChangesRow}, UserAcc), + RealGo = case UserGo of + ok -> Go; + stop -> stop + end, + reset_heartbeat(), + couch_log:error("XKCD: CHANGE SEQ: ~p", [Seq]), + {RealGo, Acc#changes_acc{ + seq = Seq, + user_acc = UserAcc2, + limit = Limit - 1 + }} + end. + + + +view_changes_row(Results, KVs, Acc) -> + {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) -> + {{_Seq, Key}, {_Id, Value, _Rev}} = Row, + case Value of + removed -> + {AddAcc, [Key|RemAcc]}; + {dups, DupValues} -> + AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) -> + [[Key, DupValue]|AddAcc0] + end, AddAcc, DupValues), + {AddAcc1, RemAcc}; + _ -> + {[[Key, Value]|AddAcc], RemAcc} + end + end, {[], []}, KVs), + + % Seq, Id, and Rev should be the same for all KVs, since we're aggregating + % by seq. + [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs, + + {[ + {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add}, + {<<"remove">>, Remove}, {<<"changes">>, Results} + ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}. + + +changes_row(Results, Change, Acc) -> + #{ + id := Id, + sequence := Seq, + deleted := Del + } = Change, + {[ + {<<"seq">>, Seq}, + {<<"id">>, Id}, + {<<"changes">>, Results} + ] ++ deleted_item(Del) ++ maybe_get_changes_doc(Change, Acc)}. + +maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) -> + #changes_acc{ + db = Db, + doc_options = DocOpts, + conflicts = Conflicts, + filter = Filter + } = Acc, + Opts = case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, + load_doc(Db, Value, Opts, DocOpts, Filter); + +maybe_get_changes_doc(_Value, _Acc) -> + []. + + +load_doc(Db, Value, Opts, DocOpts, Filter) -> + case load_doc(Db, Value, Opts) of + null -> + [{doc, null}]; + Doc -> + [{doc, doc_to_json(Doc, DocOpts, Filter)}] + end. + + +load_doc(Db, Change, Opts) -> + #{ + id := Id, + rev_id := RevId + } = Change, + case fabric2_db:open_doc_revs(Db, Id, [RevId], Opts) of + {ok, [{ok, Doc}]} -> + Doc; + _ -> + null + end. + + +doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}}) + when Fields =/= nil -> + mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields); +doc_to_json(Doc, DocOpts, _Filter) -> + couch_doc:to_json_obj(Doc, DocOpts). + + +deleted_item(true) -> [{<<"deleted">>, true}]; +deleted_item(_) -> []. + +% waits for a updated msg, if there are multiple msgs, collects them. +wait_updated(Timeout, TimeoutFun, UserAcc) -> + couch_log:error("XKCD: WAITING FOR UPDATE", []), + receive + updated -> + couch_log:error("XKCD: GOT UPDATED", []), + get_rest_updated(UserAcc); + deleted -> + couch_log:error("XKCD: DB DELETED", []), + {stop, UserAcc} + after Timeout -> + {Go, UserAcc2} = TimeoutFun(UserAcc), + case Go of + ok -> + couch_log:error("XKCD: WAIT UPDATED TIMEOUT, RETRY", []), + ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2); + stop -> + couch_log:error("XKCD: WAIT UPDATED TIMEOUT STOP", []), + {stop, UserAcc2} + end + end. + +get_rest_updated(UserAcc) -> + receive + updated -> + get_rest_updated(UserAcc) + after 0 -> + {updated, UserAcc} + end. + +reset_heartbeat() -> + case get(last_changes_heartbeat) of + undefined -> + ok; + _ -> + put(last_changes_heartbeat, os:timestamp()) + end. + +maybe_heartbeat(Timeout, TimeoutFun, Acc) -> + Before = get(last_changes_heartbeat), + case Before of + undefined -> + {ok, Acc}; + _ -> + Now = os:timestamp(), + case timer:now_diff(Now, Before) div 1000 >= Timeout of + true -> + Acc2 = TimeoutFun(Acc), + put(last_changes_heartbeat, Now), + Acc2; + false -> + {ok, Acc} + end + end. diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index c6404b04d..40c1a1e38 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -93,18 +93,13 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> handle_changes_req1(#httpd{}=Req, Db) -> #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req), ChangesArgs = Args0#changes_args{ - filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db), - db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}] + db_open_options = [{user_ctx, fabric2_db:get_user_ctx(Db)}] }, + ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db), Max = chttpd:chunked_response_buffer_size(), case ChangesArgs#changes_args.feed of "normal" -> - T0 = os:timestamp(), - {ok, Info} = fabric:get_db_info(Db), - Suffix = mem3:shard_suffix(Db), - Etag = chttpd:make_etag({Info, Suffix}), - DeltaT = timer:now_diff(os:timestamp(), T0) / 1000, - couch_stats:update_histogram([couchdb, dbinfo], DeltaT), + Etag = <<"foo">>, chttpd:etag_respond(Req, Etag, fun() -> Acc0 = #cacc{ feed = normal, @@ -112,7 +107,7 @@ handle_changes_req1(#httpd{}=Req, Db) -> mochi = Req, threshold = Max }, - fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) + ChangesFun({fun changes_callback/2, Acc0}) end); Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" -> couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]), @@ -122,7 +117,7 @@ handle_changes_req1(#httpd{}=Req, Db) -> threshold = Max }, try - fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) + ChangesFun({fun changes_callback/2, Acc0}) after couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes]) end; @@ -337,7 +332,7 @@ update_partition_stats(PathParts) -> handle_design_req(#httpd{ path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest] }=Req, Db) -> - DbName = mem3:dbname(couch_db:name(Db)), + DbName = fabric2_db:name(Db), case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of {ok, DDoc} -> Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3), @@ -365,56 +360,33 @@ handle_design_info_req(Req, _Db, _DDoc) -> create_db_req(#httpd{}=Req, DbName) -> couch_httpd:verify_is_server_admin(Req), - N = chttpd:qs_value(Req, "n", config:get("cluster", "n", "3")), - Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")), - P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")), - EngineOpt = parse_engine_opt(Req), - DbProps = parse_partitioned_opt(Req), - Options = [ - {n, N}, - {q, Q}, - {placement, P}, - {props, DbProps} - ] ++ EngineOpt, DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), - case fabric:create_db(DbName, Options) of - ok -> - send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]}); - accepted -> - send_json(Req, 202, [{"Location", DocUrl}], {[{ok, true}]}); - {error, file_exists} -> - chttpd:send_error(Req, file_exists); - Error -> - throw(Error) + case fabric2_db:create(DbName, []) of + {ok, _} -> + send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]}); + {error, file_exists} -> + chttpd:send_error(Req, file_exists); + Error -> + throw(Error) end. delete_db_req(#httpd{}=Req, DbName) -> couch_httpd:verify_is_server_admin(Req), - case fabric:delete_db(DbName, []) of - ok -> - send_json(Req, 200, {[{ok, true}]}); - accepted -> - send_json(Req, 202, {[{ok, true}]}); - Error -> - throw(Error) + case fabric2_db:delete(DbName, []) of + ok -> + send_json(Req, 200, {[{ok, true}]}); + Error -> + throw(Error) end. do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) -> - Shard = hd(mem3:shards(DbName)), - Props = couch_util:get_value(props, Shard#shard.opts, []), - Opts = case Ctx of - undefined -> - [{props, Props}]; - #user_ctx{} -> - [{user_ctx, Ctx}, {props, Props}] - end, - {ok, Db} = couch_db:clustered_db(DbName, Opts), + {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]), Fun(Req, Db). -db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> +db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) -> % measure the time required to generate the etag, see if it's worth it T0 = os:timestamp(), - {ok, DbInfo} = fabric:get_db_info(DbName), + {ok, DbInfo} = fabric2_db:get_db_info(Db), DeltaT = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, dbinfo], DeltaT), send_json(Req, {DbInfo}); @@ -422,22 +394,22 @@ db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> chttpd:validate_ctype(Req, "application/json"), - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - Options = [{user_ctx,Ctx}, {w,W}], + Options = [{user_ctx,Ctx}], - Doc = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)), - Doc2 = case Doc#doc.id of + Doc0 = chttpd:json_body(Req), + Doc1 = couch_doc:from_json_obj_validate(Doc0, fabric2_db:name(Db)), + Doc2 = case Doc1#doc.id of <<"">> -> - Doc#doc{id=couch_uuids:new(), revs={0, []}}; + Doc1#doc{id=couch_uuids:new(), revs={0, []}}; _ -> - Doc + Doc1 end, DocId = Doc2#doc.id, case chttpd:qs_value(Req, "batch") of "ok" -> % async_batching spawn(fun() -> - case catch(fabric:update_doc(Db, Doc2, Options)) of + case catch(fabric2_db:update_doc(Db, Doc2, Options)) of {ok, _} -> chttpd_stats:incr_writes(), ok; @@ -457,7 +429,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> % normal DocUrl = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), - case fabric:update_doc(Db, Doc2, Options) of + case fabric2_db:update_doc(Db, Doc2, Options) of {ok, NewRev} -> chttpd_stats:incr_writes(), HttpCode = 201; @@ -475,13 +447,10 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST"); -db_req(#httpd{method='POST', path_parts=[DbName, <<"_ensure_full_commit">>], - user_ctx=Ctx}=Req, _Db) -> +db_req(#httpd{method='POST', path_parts=[_DbName, <<"_ensure_full_commit">>], + user_ctx=Ctx}=Req, Db) -> chttpd:validate_ctype(Req, "application/json"), - %% use fabric call to trigger a database_does_not_exist exception - %% for missing databases that'd return error 404 from chttpd - %% get_security used to prefer shards on the same node over other nodes - fabric:get_security(DbName, [{user_ctx, Ctx}]), + #{db_prefix := <<_/binary>>} = Db, send_json(Req, 201, {[ {ok, true}, {instance_start_time, <<"0">>} @@ -503,22 +472,17 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, DocsArray0 end, couch_stats:update_histogram([couchdb, httpd, bulk_docs], length(DocsArray)), - W = case couch_util:get_value(<<"w">>, JsonProps) of - Value when is_integer(Value) -> - integer_to_list(Value); - _ -> - chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))) - end, case chttpd:header_value(Req, "X-Couch-Full-Commit") of "true" -> - Options = [full_commit, {user_ctx,Ctx}, {w,W}]; + Options = [full_commit, {user_ctx,Ctx}]; "false" -> - Options = [delay_commit, {user_ctx,Ctx}, {w,W}]; + Options = [delay_commit, {user_ctx,Ctx}]; _ -> - Options = [{user_ctx,Ctx}, {w,W}] + Options = [{user_ctx,Ctx}] end, + DbName = fabric2_db:name(Db), Docs = lists:map(fun(JsonObj) -> - Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj), + Doc = couch_doc:from_json_obj_validate(JsonObj, DbName), validate_attachment_names(Doc), case Doc#doc.id of <<>> -> Doc#doc{id = couch_uuids:new()}; @@ -532,7 +496,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, true -> [all_or_nothing|Options]; _ -> Options end, - case fabric:update_docs(Db, Docs, Options2) of + case fabric2_db:update_docs(Db, Docs, Options2) of {ok, Results} -> % output the results chttpd_stats:incr_writes(length(Results)), @@ -551,7 +515,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, send_json(Req, 417, ErrorsJson) end; false -> - case fabric:update_docs(Db, Docs, [replicated_changes|Options]) of + case fabric2_db:update_docs(Db, Docs, [replicated_changes|Options]) of {ok, Errors} -> chttpd_stats:incr_writes(length(Docs)), ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors), @@ -647,8 +611,7 @@ db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) -> db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> couch_stats:increment_counter([couchdb, httpd, purge_requests]), chttpd:validate_ctype(Req, "application/json"), - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}], + Options = [{user_ctx, Req#httpd.user_ctx}], {IdsRevs} = chttpd:json_body_obj(Req), IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], MaxIds = config:get_integer("purge", "max_document_id_number", 100), @@ -723,7 +686,7 @@ db_req(#httpd{path_parts=[_,OP]}=Req, _Db) when ?IS_ALL_DOCS(OP) -> db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) -> chttpd:validate_ctype(Req, "application/json"), {JsonDocIdRevs} = chttpd:json_body_obj(Req), - case fabric:get_missing_revs(Db, JsonDocIdRevs) of + case fabric2_db:get_missing_revs(Db, JsonDocIdRevs) of {error, Reason} -> chttpd:send_error(Req, Reason); {ok, Results} -> @@ -740,7 +703,7 @@ db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) -> db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) -> chttpd:validate_ctype(Req, "application/json"), {JsonDocIdRevs} = chttpd:json_body_obj(Req), - case fabric:get_missing_revs(Db, JsonDocIdRevs) of + case fabric2_db:get_missing_revs(Db, JsonDocIdRevs) of {error, Reason} -> chttpd:send_error(Req, Reason); {ok, Results} -> @@ -856,22 +819,22 @@ multi_all_docs_view(Req, Db, OP, Queries) -> 200, [], FirstChunk), VAcc1 = VAcc0#vacc{resp=Resp0}, VAcc2 = lists:foldl(fun(Args, Acc0) -> - {ok, Acc1} = fabric:all_docs(Db, Options, + {ok, Acc1} = fabric2_db:fold_docs(Db, Options, fun view_cb/2, Acc0, Args), Acc1 end, VAcc1, ArgQueries), {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"), chttpd:end_delayed_json_response(Resp1). -all_docs_view(Req, Db, Keys, OP) -> - Args0 = couch_mrview_http:parse_params(Req, Keys), - Args1 = Args0#mrargs{view_type=map}, - Args2 = fabric_util:validate_all_docs_args(Db, Args1), - Args3 = set_namespace(OP, Args2), +all_docs_view(Req, Db, _Keys, _OP) -> + % Args0 = couch_mrview_http:parse_params(Req, Keys), + % Args1 = Args0#mrargs{view_type=map}, + % Args2 = fabric_util:validate_all_docs_args(Db, Args1), + % Args3 = set_namespace(OP, Args2), Options = [{user_ctx, Req#httpd.user_ctx}], Max = chttpd:chunked_response_buffer_size(), VAcc = #vacc{db=Db, req=Req, threshold=Max}, - {ok, Resp} = fabric:all_docs(Db, Options, fun view_cb/2, VAcc, Args3), + {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options), {ok, Resp#vacc.resp}. view_cb({row, Row} = Msg, Acc) -> @@ -915,7 +878,7 @@ db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) -> Doc = couch_doc_open(Db, DocId, Rev, Options2), send_doc(Req, Doc, Options2); _ -> - case fabric:open_revs(Db, DocId, Revs, Options) of + case fabric2_db:open_doc_revs(Db, DocId, Revs, Options) of {ok, []} when Revs == all -> chttpd:send_error(Req, {not_found, missing}); {ok, Results} -> @@ -956,8 +919,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> couch_db:validate_docid(Db, DocId), chttpd:validate_ctype(Req, "multipart/form-data"), - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - Options = [{user_ctx,Ctx}, {w,W}], + Options = [{user_ctx,Ctx}], Form = couch_httpd:parse_form(Req), case proplists:is_defined("_doc", Form) of @@ -966,7 +928,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> Doc = couch_doc_from_req(Req, Db, DocId, Json); false -> Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))), - Doc = case fabric:open_revs(Db, DocId, [Rev], []) of + Doc = case fabric2_db:open_doc_revs(Db, DocId, [Rev], []) of {ok, [{ok, Doc0}]} -> chttpd_stats:incr_reads(), Doc0; @@ -995,7 +957,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> NewDoc = Doc#doc{ atts = UpdatedAtts ++ OldAtts2 }, - case fabric:update_doc(Db, NewDoc, Options) of + case fabric2_db:update_doc(Db, NewDoc, Options) of {ok, NewRev} -> chttpd_stats:incr_writes(), HttpCode = 201; @@ -1013,11 +975,10 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> #doc_query_args{ update_type = UpdateType } = parse_doc_query(Req), - DbName = couch_db:name(Db), - couch_db:validate_docid(Db, DocId), + DbName = fabric2_db:name(Db), + couch_doc:validate_docid(DocId), - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - Options = [{user_ctx,Ctx}, {w,W}], + Options = [{user_ctx, Ctx}], Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), @@ -1025,7 +986,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> couch_httpd:check_max_request_length(Req), - couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)), + couch_httpd_multipart:num_mp_writers(1), {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, Db, DocId, Doc0), @@ -1045,7 +1006,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)), spawn(fun() -> - case catch(fabric:update_doc(Db, Doc, Options)) of + case catch(fabric2_db:update_doc(Db, Doc, Options)) of {ok, _} -> chttpd_stats:incr_writes(), ok; @@ -1079,7 +1040,7 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) -> % open old doc Doc = couch_doc_open(Db, SourceDocId, SourceRev, []), % save new doc - case fabric:update_doc(Db, + case fabric2_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, [{user_ctx,Ctx}]) of {ok, NewTargetRev} -> chttpd_stats:incr_writes(), @@ -1180,7 +1141,7 @@ send_docs_multipart(Req, Results, Options1) -> CType = {"Content-Type", "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""}, {ok, Resp} = start_chunked_response(Req, 200, [CType]), - couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>), + chttpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>), lists:foreach( fun({ok, #doc{atts=Atts}=Doc}) -> Refs = monitor_attachments(Doc#doc.atts), @@ -1188,25 +1149,25 @@ send_docs_multipart(Req, Results, Options1) -> JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)), {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream( InnerBoundary, JsonBytes, Atts, true), - couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ", + chttpd:send_chunk(Resp, <<"\r\nContent-Type: ", ContentType/binary, "\r\n\r\n">>), couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts, - fun(Data) -> couch_httpd:send_chunk(Resp, Data) + fun(Data) -> chttpd:send_chunk(Resp, Data) end, true), - couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>) + chttpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>) after demonitor_refs(Refs) end; ({{not_found, missing}, RevId}) -> RevStr = couch_doc:rev_to_str(RevId), Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}), - couch_httpd:send_chunk(Resp, + chttpd:send_chunk(Resp, [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>, Json, <<"\r\n--", OuterBoundary/binary>>]) end, Results), - couch_httpd:send_chunk(Resp, <<"--">>), - couch_httpd:last_chunk(Resp). + chttpd:send_chunk(Resp, <<"--">>), + chttpd:last_chunk(Resp). bulk_get_multipart_headers({0, []}, Id, Boundary) -> [ @@ -1276,15 +1237,14 @@ send_updated_doc(Req, Db, DocId, Doc, Headers) -> send_updated_doc(#httpd{user_ctx=Ctx} = Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers, UpdateType) -> - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of "true" -> - [full_commit, UpdateType, {user_ctx,Ctx}, {w,W}]; + [full_commit, UpdateType, {user_ctx,Ctx}]; "false" -> - [delay_commit, UpdateType, {user_ctx,Ctx}, {w,W}]; + [delay_commit, UpdateType, {user_ctx,Ctx}]; _ -> - [UpdateType, {user_ctx,Ctx}, {w,W}] + [UpdateType, {user_ctx,Ctx}] end, {Status, {etag, Etag}, Body} = update_doc(Db, DocId, #doc{deleted=Deleted}=Doc, Options), @@ -1303,31 +1263,7 @@ http_code_from_status(Status) -> end. update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) -> - {_, Ref} = spawn_monitor(fun() -> - try fabric:update_doc(Db, Doc, Options) of - Resp -> - exit({exit_ok, Resp}) - catch - throw:Reason -> - exit({exit_throw, Reason}); - error:Reason -> - exit({exit_error, Reason}); - exit:Reason -> - exit({exit_exit, Reason}) - end - end), - Result = receive - {'DOWN', Ref, _, _, {exit_ok, Ret}} -> - Ret; - {'DOWN', Ref, _, _, {exit_throw, Reason}} -> - throw(Reason); - {'DOWN', Ref, _, _, {exit_error, Reason}} -> - erlang:error(Reason); - {'DOWN', Ref, _, _, {exit_exit, Reason}} -> - erlang:exit(Reason) - end, - - case Result of + case fabric2_db:update_doc(Db, Doc, Options) of {ok, NewRev} -> Accepted = false; {accepted, NewRev} -> @@ -1374,7 +1310,7 @@ couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) -> end, Doc#doc{id=DocId, revs=Revs2}; couch_doc_from_req(Req, Db, DocId, Json) -> - Doc = couch_db:doc_from_json_obj_validate(Db, Json), + Doc = couch_doc:from_json_obj_validate(Json, fabric2_db:name(Db)), couch_doc_from_req(Req, Db, DocId, Doc). @@ -1382,11 +1318,10 @@ couch_doc_from_req(Req, Db, DocId, Json) -> % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, nil, []). -couch_doc_open(Db, DocId, Rev, Options0) -> - Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0], +couch_doc_open(Db, DocId, Rev, Options) -> case Rev of nil -> % open most recent rev - case fabric:open_doc(Db, DocId, Options) of + case fabric2_db:open_doc(Db, DocId, Options) of {ok, Doc} -> chttpd_stats:incr_reads(), Doc; @@ -1394,7 +1329,7 @@ couch_doc_open(Db, DocId, Rev, Options0) -> throw(Error) end; _ -> % open a specific rev (deletions come back as stubs) - case fabric:open_revs(Db, DocId, [Rev], Options) of + case fabric2_db:open_doc_revs(Db, DocId, [Rev], Options) of {ok, [{ok, Doc}]} -> chttpd_stats:incr_reads(), Doc; @@ -1515,8 +1450,12 @@ db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNa end; -db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNameParts) +db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) when (Method == 'PUT') or (Method == 'DELETE') -> + #httpd{ + user_ctx = Ctx, + mochi_req = MochiReq + } = Req, FileName = validate_attachment_name( mochiweb_util:join( lists:map(fun binary_to_list/1, @@ -1526,16 +1465,45 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa 'DELETE' -> []; _ -> - MimeType = case couch_httpd:header_value(Req,"Content-Type") of + MimeType = case chttpd:header_value(Req,"Content-Type") of % We could throw an error here or guess by the FileName. % Currently, just giving it a default. undefined -> <<"application/octet-stream">>; CType -> list_to_binary(CType) end, - Data = fabric:att_receiver(Req, chttpd:body_length(Req)), + Data = case chttpd:body_length(Req) of + undefined -> + <<"">>; + {unknown_transfer_encoding, Unknown} -> + exit({unknown_transfer_encoding, Unknown}); + chunked -> + fun(MaxChunkSize, ChunkFun, InitState) -> + chttpd:recv_chunked( + Req, MaxChunkSize, ChunkFun, InitState + ) + end; + 0 -> + <<"">>; + Length when is_integer(Length) -> + Expect = case chttpd:header_value(Req, "expect") of + undefined -> + undefined; + Value when is_list(Value) -> + string:to_lower(Value) + end, + case Expect of + "100-continue" -> + MochiReq:start_raw_response({100, gb_trees:empty()}); + _Else -> + ok + end, + fun() -> chttpd:recv(Req, 0) end; + Length -> + exit({length_not_integer, Length}) + end, ContentLen = case couch_httpd:header_value(Req,"Content-Length") of undefined -> undefined; - Length -> list_to_integer(Length) + CL -> list_to_integer(CL) end, ContentEnc = string:to_lower(string:strip( couch_httpd:header_value(Req, "Content-Encoding", "identity") @@ -1570,7 +1538,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa couch_db:validate_docid(Db, DocId), #doc{id=DocId}; Rev -> - case fabric:open_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of + case fabric2_db:open_doc_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of {ok, [{ok, Doc0}]} -> chttpd_stats:incr_reads(), Doc0; @@ -1585,8 +1553,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa DocEdited = Doc#doc{ atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName] }, - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - case fabric:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of + case fabric2_db:update_doc(Db, DocEdited, [{user_ctx,Ctx}]) of {ok, UpdatedRev} -> chttpd_stats:incr_writes(), HttpCode = 201; @@ -1595,7 +1562,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa HttpCode = 202 end, erlang:put(mochiweb_request_recv, true), - DbName = couch_db:name(Db), + DbName = fabric2_db:name(Db), {Status, Headers} = case Method of 'DELETE' -> @@ -1682,46 +1649,6 @@ get_md5_header(Req) -> parse_doc_query(Req) -> lists:foldl(fun parse_doc_query/2, #doc_query_args{}, chttpd:qs(Req)). -parse_engine_opt(Req) -> - case chttpd:qs_value(Req, "engine") of - undefined -> - []; - Extension -> - Available = couch_server:get_engine_extensions(), - case lists:member(Extension, Available) of - true -> - [{engine, iolist_to_binary(Extension)}]; - false -> - throw({bad_request, invalid_engine_extension}) - end - end. - - -parse_partitioned_opt(Req) -> - case chttpd:qs_value(Req, "partitioned") of - undefined -> - []; - "false" -> - []; - "true" -> - ok = validate_partitioned_db_enabled(Req), - [ - {partitioned, true}, - {hash, [couch_partition, hash, []]} - ]; - _ -> - throw({bad_request, <<"Invalid `partitioned` parameter">>}) - end. - - -validate_partitioned_db_enabled(Req) -> - case couch_flags:is_enabled(partitioned, Req) of - true -> - ok; - false -> - throw({bad_request, <<"Partitioned feature is not enabled.">>}) - end. - parse_doc_query({Key, Value}, Args) -> case {Key, Value} of @@ -1791,7 +1718,7 @@ parse_changes_query(Req) -> {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", _} -> - Args#changes_args{since=Value}; + Args#changes_args{since=parse_since_seq(Value)}; {"last-event-id", _} -> Args#changes_args{since=Value}; {"limit", _} -> @@ -1845,6 +1772,27 @@ parse_changes_query(Req) -> ChangesArgs end. + +parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 30 -> + throw({bad_request, url_encoded_since_seq}); + +parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 2 -> + % We have implicitly allowed the since seq to either be + % JSON encoded or a "raw" string. Here we just remove the + % surrounding quotes if they exist and are paired. + SeqSize = size(Seq) - 2, + case Seq of + <<"\"", S:SeqSize/binary, "\"">> -> S; + S -> S + end; + +parse_since_seq(Seq) when is_binary(Seq) -> + Seq; + +parse_since_seq(Seq) when is_list(Seq) -> + parse_since_seq(iolist_to_binary(Seq)). + + extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)-> extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev)); extract_header_rev(Req, ExplicitRev) -> @@ -1885,6 +1833,8 @@ monitor_attachments(Atts) when is_list(Atts) -> case couch_att:fetch(data, Att) of {Fd, _} -> [monitor(process, Fd) | Monitors]; + {loc, _, _, _} -> + Monitors; stub -> Monitors; Else -> @@ -1982,7 +1932,7 @@ bulk_get_open_doc_revs1(Db, Props, Options, {DocId, Revs}) -> bulk_get_open_doc_revs1(Db, Props, Options, {DocId, Revs, Options1}) end; bulk_get_open_doc_revs1(Db, Props, _, {DocId, Revs, Options}) -> - case fabric:open_revs(Db, DocId, Revs, Options) of + case fabric2_db:open_doc_revs(Db, DocId, Revs, Options) of {ok, []} -> RevStr = couch_util:get_value(<<"rev">>, Props), Error = {RevStr, <<"not_found">>, <<"missing">>}, diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl index fa35c6ba2..3e59ffe4e 100644 --- a/src/chttpd/src/chttpd_external.erl +++ b/src/chttpd/src/chttpd_external.erl @@ -74,7 +74,7 @@ json_req_obj_fields() -> <<"peer">>, <<"form">>, <<"cookie">>, <<"userCtx">>, <<"secObj">>]. json_req_obj_field(<<"info">>, #httpd{}, Db, _DocId) -> - {ok, Info} = get_db_info(Db), + {ok, Info} = fabric2_db:get_db_info(Db), {Info}; json_req_obj_field(<<"uuid">>, #httpd{}, _Db, _DocId) -> couch_uuids:new(); @@ -117,27 +117,18 @@ json_req_obj_field(<<"form">>, #httpd{mochi_req=Req, method=Method}=HttpReq, Db, json_req_obj_field(<<"cookie">>, #httpd{mochi_req=Req}, _Db, _DocId) -> to_json_terms(Req:parse_cookie()); json_req_obj_field(<<"userCtx">>, #httpd{}, Db, _DocId) -> - couch_util:json_user_ctx(Db); -json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) -> - get_db_security(Db, UserCtx). - - -get_db_info(Db) -> - case couch_db:is_clustered(Db) of - true -> - fabric:get_db_info(Db); - false -> - couch_db:get_db_info(Db) - end. - - -get_db_security(Db, #user_ctx{}) -> - case couch_db:is_clustered(Db) of - true -> - fabric:get_security(Db); - false -> - couch_db:get_security(Db) - end. + json_user_ctx(Db); +json_req_obj_field(<<"secObj">>, #httpd{user_ctx = #user_ctx{}}, Db, _DocId) -> + fabric2_db:get_security(Db). + + +json_user_ctx(Db) -> + Ctx = fabric2_db:get_user_ctx(Db), + {[ + {<<"db">>, fabric2_db:name(Db)}, + {<<"name">>, Ctx#user_ctx.name}, + {<<"roles">>, Ctx#user_ctx.roles} + ]}. to_json_terms(Data) -> diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 819d7820e..b244e84f6 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -108,43 +108,39 @@ maybe_add_csp_headers(Headers, _) -> Headers. handle_all_dbs_req(#httpd{method='GET'}=Req) -> - Args = couch_mrview_http:parse_params(Req, undefined), - ShardDbName = config:get("mem3", "shards_db", "_dbs"), - %% shard_db is not sharded but mem3:shards treats it as an edge case - %% so it can be pushed thru fabric - {ok, Info} = fabric:get_db_info(ShardDbName), - Etag = couch_httpd:make_etag({Info}), - Options = [{user_ctx, Req#httpd.user_ctx}], + % TODO: Support args and options properly, transform + % this back into a fold call similar to the old + % version. + %% Args = couch_mrview_http:parse_params(Req, undefined), + % Eventually the Etag for this request will be derived + % from the \xFFmetadataVersion key in fdb + Etag = <<"foo">>, + %% Options = [{user_ctx, Req#httpd.user_ctx}], {ok, Resp} = chttpd:etag_respond(Req, Etag, fun() -> - {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]), - VAcc = #vacc{req=Req,resp=Resp}, - fabric:all_docs(ShardDbName, Options, fun all_dbs_callback/2, VAcc, Args) - end), - case is_record(Resp, vacc) of - true -> {ok, Resp#vacc.resp}; - _ -> {ok, Resp} - end; + AllDbs = fabric2_db:list_dbs(), + chttpd:send_json(Req, AllDbs) + end); handle_all_dbs_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). -all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) -> - {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["), - {ok, Acc#vacc{resp=Resp1}}; -all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) -> - Prepend = couch_mrview_http:prepend_val(Acc), - case couch_util:get_value(id, Row) of <<"_design", _/binary>> -> - {ok, Acc}; - DbName -> - {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]), - {ok, Acc#vacc{prepend=",", resp=Resp1}} - end; -all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) -> - {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"), - {ok, Resp2} = chttpd:end_delayed_json_response(Resp1), - {ok, Acc#vacc{resp=Resp2}}; -all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) -> - {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason), - {ok, Acc#vacc{resp=Resp1}}. +%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) -> +%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["), +%% {ok, Acc#vacc{resp=Resp1}}; +%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) -> +%% Prepend = couch_mrview_http:prepend_val(Acc), +%% case couch_util:get_value(id, Row) of <<"_design", _/binary>> -> +%% {ok, Acc}; +%% DbName -> +%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]), +%% {ok, Acc#vacc{prepend=",", resp=Resp1}} +%% end; +%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) -> +%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"), +%% {ok, Resp2} = chttpd:end_delayed_json_response(Resp1), +%% {ok, Acc#vacc{resp=Resp2}}; +%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) -> +%% {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason), +%% {ok, Acc#vacc{resp=Resp1}}. handle_dbs_info_req(#httpd{method='POST'}=Req) -> chttpd:validate_ctype(Req, "application/json"), diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl index c3bf11929..2eb6dc3f8 100644 --- a/src/chttpd/src/chttpd_show.erl +++ b/src/chttpd/src/chttpd_show.erl @@ -123,15 +123,14 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> JsonReq = chttpd_external:json_req_obj(Req, Db, DocId), JsonDoc = couch_query_servers:json_doc(Doc), Cmd = [<<"updates">>, UpdateName], - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]), JsonResp = case UpdateResp of [<<"up">>, {NewJsonDoc}, {JsonResp0}] -> case chttpd:header_value(Req, "X-Couch-Full-Commit", "false") of "true" -> - Options = [full_commit, {user_ctx, Req#httpd.user_ctx}, {w, W}]; + Options = [full_commit, {user_ctx, Req#httpd.user_ctx}]; _ -> - Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}] + Options = [{user_ctx, Req#httpd.user_ctx}] end, NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}), couch_doc:validate_docid(NewDoc#doc.id), diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index ae1d8d6f5..cf6f27fde 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -173,8 +173,18 @@ join([H|[]], _, Acc) -> join([H|T], Sep, Acc) -> join(T, Sep, [Sep, H | Acc]). +validate(#{} = Db, DDoc) -> + DbName = fabric2_db:name(Db), + IsPartitioned = fabric2_db:is_partitioned(Db), + validate(DbName, IsPartitioned, DDoc); -validate(Db, DDoc) -> +validate(Db, DDoc) -> + DbName = couch_db:name(Db), + IsPartitioned = couch_db:is_partitioned(Db), + validate(DbName, IsPartitioned, DDoc). + + +validate(DbName, IsDbPartitioned, DDoc) -> ok = validate_ddoc_fields(DDoc#doc.body), GetName = fun (#mrview{map_names = [Name | _]}) -> Name; @@ -203,9 +213,9 @@ validate(Db, DDoc) -> language = Lang, views = Views, partitioned = Partitioned - }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), + }} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc), - case {couch_db:is_partitioned(Db), Partitioned} of + case {IsDbPartitioned, Partitioned} of {false, true} -> throw({invalid_design_doc, <<"partitioned option cannot be true in a " diff --git a/test/elixir/test/basics_test.exs b/test/elixir/test/basics_test.exs index 3491ef5a3..c28c78c81 100644 --- a/test/elixir/test/basics_test.exs +++ b/test/elixir/test/basics_test.exs @@ -100,7 +100,7 @@ defmodule BasicsTest do db_name = context[:db_name] {:ok, _} = create_doc(db_name, sample_doc_foo()) resp = Couch.get("/#{db_name}/foo", query: %{:local_seq => true}) - assert resp.body["_local_seq"] == 1, "Local seq value == 1" + assert is_binary(resp.body["_local_seq"]), "Local seq value is a binary" end @tag :with_db |