summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-06-05 13:43:20 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-07-31 11:55:30 -0500
commit0cf5f463d0b73427656cea4465582f5102508627 (patch)
tree1906369a0695f6f0d093aa403563f9f7ad1fcdde
parent9083da6c46dedc0c63d597cd12c8ea13bd9eb304 (diff)
downloadcouchdb-0cf5f463d0b73427656cea4465582f5102508627.tar.gz
Start switching chttpd HTTP endpoints to fabric2
This is not an exhaustive port of the entire chttpd API. However, this is enough to support basic CRUD operations far enough that replication works.
-rw-r--r--src/chttpd/src/chttpd.erl11
-rw-r--r--src/chttpd/src/chttpd_auth_request.erl7
-rw-r--r--src/chttpd/src/chttpd_changes.erl973
-rw-r--r--src/chttpd/src/chttpd_db.erl328
-rw-r--r--src/chttpd/src/chttpd_external.erl35
-rw-r--r--src/chttpd/src/chttpd_misc.erl62
-rw-r--r--src/chttpd/src/chttpd_show.erl5
-rw-r--r--src/couch_mrview/src/couch_mrview.erl16
-rw-r--r--test/elixir/test/basics_test.exs2
9 files changed, 1183 insertions, 256 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 1e1d638be..4d32c03c5 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -25,7 +25,7 @@
error_info/1, parse_form/1, json_body/1, json_body_obj/1, body/1,
doc_etag/1, make_etag/1, etag_respond/3, etag_match/2,
partition/1, serve_file/3, serve_file/4,
- server_header/0, start_chunked_response/3,send_chunk/2,
+ server_header/0, start_chunked_response/3,send_chunk/2,last_chunk/1,
start_response_length/4, send/2, start_json_response/2,
start_json_response/3, end_json_response/1, send_response/4,
send_response_no_cors/4,
@@ -743,7 +743,14 @@ start_chunked_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers0) ->
{ok, Resp}.
send_chunk(Resp, Data) ->
- Resp:write_chunk(Data),
+ case iolist_size(Data) of
+ 0 -> ok; % do nothing
+ _ -> Resp:write_chunk(Data)
+ end,
+ {ok, Resp}.
+
+last_chunk(Resp) ->
+ Resp:write_chunk([]),
{ok, Resp}.
send_response(Req, Code, Headers0, Body) ->
diff --git a/src/chttpd/src/chttpd_auth_request.erl b/src/chttpd/src/chttpd_auth_request.erl
index 96dbf980c..7210905f0 100644
--- a/src/chttpd/src/chttpd_auth_request.erl
+++ b/src/chttpd/src/chttpd_auth_request.erl
@@ -103,7 +103,8 @@ server_authorization_check(#httpd{path_parts=[<<"_", _/binary>>|_]}=Req) ->
require_admin(Req).
db_authorization_check(#httpd{path_parts=[DbName|_],user_ctx=Ctx}=Req) ->
- {_} = fabric:get_security(DbName, [{user_ctx, Ctx}]),
+ {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]),
+ fabric2_db:check_is_member(Db),
Req.
require_admin(Req) ->
@@ -111,8 +112,8 @@ require_admin(Req) ->
Req.
require_db_admin(#httpd{path_parts=[DbName|_],user_ctx=Ctx}=Req) ->
- Sec = fabric:get_security(DbName, [{user_ctx, Ctx}]),
-
+ {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]),
+ Sec = fabric2_db:get_security(Db),
case is_db_admin(Ctx,Sec) of
true -> Req;
false -> throw({unauthorized, <<"You are not a server or db admin.">>})
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
new file mode 100644
index 000000000..30caab2a0
--- /dev/null
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -0,0 +1,973 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(chttpd_changes).
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ handle_db_changes/3,
+ handle_changes/4,
+ get_changes_timeout/2,
+ wait_updated/3,
+ get_rest_updated/1,
+ configure_filter/4,
+ filter/3,
+ handle_db_event/3,
+ handle_view_event/3,
+ view_filter/3,
+ send_changes_doc_ids/6,
+ send_changes_design_docs/6
+]).
+
+-export([changes_enumerator/2]).
+
+%% export so we can use fully qualified call to facilitate hot-code upgrade
+-export([
+ keep_sending_changes/3
+]).
+
% Accumulator threaded through the changes fold; one instance per feed
% invocation (built by build_acc/11, updated by the enumerator funs).
-record(changes_acc, {
    db,                  % fabric2 db handle the feed folds over
    view_name,           % view name for view-filtered feeds, else undefined
    ddoc_name,           % design doc name for view-filtered feeds, else undefined
    view,                % #mrview{} for view-filtered feeds, else undefined
    seq,                 % last sequence processed
    prepend,             % separator emitted before the next row (<<>> or <<",\n">>)
    filter,              % filter descriptor from configure_filter/4
    callback,            % user callback fun
    user_acc,            % user callback accumulator
    resp_type,           % feed type: "normal" | "continuous" | "longpoll" | "eventsource"
    limit,               % remaining number of rows to emit
    include_docs,        % boolean: attach doc bodies to rows
    doc_options,         % options for couch_doc:to_json_obj/2
    conflicts,           % boolean: include conflict revs when loading docs
    timeout,             % heartbeat/timeout interval in ms (or infinity)
    timeout_fun,         % fun run when the timeout fires; returns {ok|stop, UserAcc}
    aggregation_kvs,     % view feeds: KVs aggregated for the current seq
    aggregation_results  % view feeds: filter results for the current seq
}).
+
% Entry point for a plain database changes feed; delegates to
% handle_changes/4 with feed type `db` (as opposed to a view feed).
handle_db_changes(Args, Req, Db) ->
    handle_changes(Args, Req, Db, db).
+
% Build a changes-feed runner for the given request.
%
% Returns a fun/1 that accepts either a {Callback/2, UserAcc} pair or a
% bare Callback/1 (see get_callback_acc/1), runs the feed, and streams
% rows through the callback. Continuous/longpoll/eventsource feeds
% subscribe to db (or view index) update events and keep folding until
% stopped; "normal" feeds perform a single fold over the change sequence.
%
% Fix: removed leftover "XKCD" debug instrumentation (an XKCD request
% header read used only to feed a couch_log:error/2 debug line).
handle_changes(Args1, Req, Db, Type) ->
    #changes_args{
        style = Style,
        filter = FilterName,
        feed = Feed,
        dir = Dir,
        since = Since
    } = Args1,
    Filter = configure_filter(FilterName, Style, Req, Db),
    Args = Args1#changes_args{filter_fun = Filter},
    % The type of changes feed depends on the supplied filter. If the query is
    % for an optimized view-filtered db changes, we need to use the view
    % sequence tree.
    {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
        {{view, DDocName0, ViewName0}, _} ->
            {true, DDocName0, ViewName0};
        {_, {fast_view, _, DDoc, ViewName0}} ->
            {true, DDoc#doc.id, ViewName0};
        _ ->
            {false, undefined, undefined}
    end,
    DbName = fabric2_db:name(Db),
    {StartListenerFun, View} = if UseViewChanges ->
        % View feeds require a seq-indexed view; refuse otherwise.
        {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
            DbName, DDocName, ViewName, #mrargs{}),
        case View0#mrview.seq_btree of
            #btree{} ->
                ok;
            _ ->
                throw({bad_request, "view changes not enabled"})
        end,
        SNFun = fun() ->
            couch_event:link_listener(
                ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
            )
        end,
        {SNFun, View0};
    true ->
        SNFun = fun() ->
            fabric2_events:link_listener(
                ?MODULE, handle_db_event, self(), [{dbname, DbName}]
            )
        end,
        {SNFun, undefined}
    end,
    % Deferred so descending feeds pick up the update seq at start time.
    Start = fun() ->
        StartSeq = case Dir of
            rev ->
                fabric2_fdb:get_update_seq(Db);
            fwd ->
                Since
        end,
        View2 = if UseViewChanges ->
            {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
                DbName, DDocName, ViewName, #mrargs{}),
            View1;
        true ->
            undefined
        end,
        {Db, View2, StartSeq}
    end,
    % begin timer to deal with heartbeat when filter function fails
    case Args#changes_args.heartbeat of
        undefined ->
            erlang:erase(last_changes_heartbeat);
        Val when is_integer(Val); Val =:= true ->
            put(last_changes_heartbeat, os:timestamp())
    end,

    case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
        true ->
            fun(CallbackAcc) ->
                {Callback, UserAcc} = get_callback_acc(CallbackAcc),
                {ok, Listener} = StartListenerFun(),

                {Db, View, StartSeq} = Start(),
                UserAcc2 = start_sending_changes(Callback, UserAcc),
                {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
                Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
                    <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
                    View),
                try
                    keep_sending_changes(
                        Args#changes_args{dir=fwd},
                        Acc0,
                        true)
                after
                    fabric2_events:stop_listener(Listener),
                    get_rest_updated(ok) % clean out any remaining update messages
                end
            end;
        false ->
            fun(CallbackAcc) ->
                {Callback, UserAcc} = get_callback_acc(CallbackAcc),
                UserAcc2 = start_sending_changes(Callback, UserAcc),
                {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
                {Db, View, StartSeq} = Start(),
                Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
                    UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
                    DDocName, ViewName, View),
                {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
                    send_changes(
                        Acc0,
                        Dir,
                        true),
                end_sending_changes(Callback, UserAcc3, LastSeq)
            end
    end.
+
+
% fabric2_events listener callback for db-level events. Forwards the
% `updated` / `deleted` notification to the waiting feed process
% (Parent) and ignores anything else; the listener state is unchanged.
handle_db_event(_DbName, Event, Parent) when Event =:= updated; Event =:= deleted ->
    Parent ! Event,
    {ok, Parent};
handle_db_event(_DbName, _Event, Parent) ->
    {ok, Parent}.
+
+
% couch_event listener callback for view index events. Only events for
% the design doc this feed watches are forwarded: index commits become
% `updated`, index deletions become `deleted`. State is unchanged.
handle_view_event(_DbName, Msg, {Parent, DDocId} = St) ->
    case Msg of
        {index_commit, DDocId} -> Parent ! updated;
        {index_delete, DDocId} -> Parent ! deleted;
        _ -> ok
    end,
    {ok, St}.
+
% Normalize the caller-supplied callback. A {Callback/2, UserAcc} pair
% passes through; a bare Callback/1 is wrapped in an arity-2 adapter
% that discards the accumulator.
get_callback_acc({Callback, _UserAcc} = CallbackAcc) when is_function(Callback, 2) ->
    CallbackAcc;
get_callback_acc(Callback) when is_function(Callback, 1) ->
    Wrapped = fun(Event, _Ignored) -> Callback(Event) end,
    {Wrapped, ok}.
+
+
% Translate the ?filter= query parameter (plus style and request) into
% the internal filter descriptor tuple consumed by filter/3 and
% fast_view_filter/3. Throws bad_request for unknown builtin filters or
% malformed designname/filtername values.
configure_filter("_doc_ids", Style, Req, _Db) ->
    % Explicit list of doc ids supplied via query string or POST body.
    {doc_ids, Style, get_doc_ids(Req)};
configure_filter("_selector", Style, Req, _Db) ->
    % Mango selector (plus optional fields projection) from the body.
    {selector, Style, get_selector_and_fields(Req)};
configure_filter("_design", Style, _Req, _Db) ->
    % Only _design/* documents pass.
    {design_docs, Style};
configure_filter("_view", Style, Req, Db) ->
    ViewName = get_view_qs(Req),
    if ViewName /= "" -> ok; true ->
        throw({bad_request, "`view` filter parameter is not provided."})
    end,
    ViewNameParts = string:tokens(ViewName, "/"),
    case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
        [DName, VName] ->
            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
            check_member_exists(DDoc, [<<"views">>, VName]),
            % A ddoc with options.seq_indexed == true takes the optimized
            % view-seq-tree path; anything else (including the option being
            % absent) falls back to the regular view filter.
            FilterType = try
                true = couch_util:get_nested_json_value(
                        DDoc#doc.body,
                        [<<"options">>, <<"seq_indexed">>]
                ),
                fast_view
            catch _:_ ->
                view
            end,
            case fabric2_db:is_clustered(Db) of
                true ->
                    % Clustered: defer ddoc resolution to the workers.
                    DIR = fabric_util:doc_id_and_rev(DDoc),
                    {fetch, FilterType, Style, DIR, VName};
                false ->
                    {FilterType, Style, DDoc, VName}
            end;
        [] ->
            Msg = "`view` must be of the form `designname/viewname`",
            throw({bad_request, Msg})
    end;
configure_filter([$_ | _], _Style, _Req, _Db) ->
    % The "_" prefix is reserved for builtin filters.
    throw({bad_request, "unknown builtin filter name"});
configure_filter("", main_only, _Req, _Db) ->
    {default, main_only};
configure_filter("", all_docs, _Req, _Db) ->
    {default, all_docs};
configure_filter(FilterName, Style, Req, Db) ->
    % User-defined filter function named as designname/filtername.
    FilterNameParts = string:tokens(FilterName, "/"),
    case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
        [DName, FName] ->
            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
            check_member_exists(DDoc, [<<"filters">>, FName]),
            {custom, Style, Req, DDoc, FName};
        [] ->
            {default, Style};
        _Else ->
            Msg = "`filter` must be of the form `designname/filtername`",
            throw({bad_request, Msg})
    end.
+
+
% Apply the configured filter descriptor to one change (a map carrying
% at least id, rev_id, sequence, deleted). Returns the list of
% {[{<<"rev">>, RevStr}]} entries that belong in the change row; [] means
% the change is filtered out entirely.
filter(Db, Change, {default, Style}) ->
    apply_style(Db, Change, Style);
filter(Db, Change, {doc_ids, Style, DocIds}) ->
    case lists:member(maps:get(id, Change), DocIds) of
        true ->
            apply_style(Db, Change, Style);
        false ->
            []
    end;
filter(Db, Change, {selector, Style, {Selector, _Fields}}) ->
    % Fields projection is applied later when rendering the doc body,
    % not when matching.
    Docs = open_revs(Db, Change, Style),
    Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
        || Doc <- Docs],
    filter_revs(Passes, Docs);
filter(Db, Change, {design_docs, Style}) ->
    case maps:get(id, Change) of
        <<"_design", _/binary>> ->
            apply_style(Db, Change, Style);
        _ ->
            []
    end;
filter(Db, Change, {FilterType, Style, DDoc, VName})
        when FilterType == view; FilterType == fast_view ->
    Docs = open_revs(Db, Change, Style),
    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
    filter_revs(Passes, Docs);
filter(Db, Change, {custom, Style, Req0, DDoc, FName}) ->
    % Custom JS filters receive a JSONified request object.
    Req = case Req0 of
        {json_req, _} -> Req0;
        #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
    end,
    Docs = open_revs(Db, Change, Style),
    {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
    filter_revs(Passes, Docs);
filter(A, B, C) ->
    % Deliberate crash: an unrecognized filter descriptor is a bug.
    erlang:error({filter_error, A, B, C}).
+
% Filter for the optimized view-seq-tree feed. The row comes from the
% view's sequence index ({{Seq, _}, {Id, _, _}}); we only emit it when
% the doc's current high_seq matches, i.e. the view row still refers to
% the latest revision. Returns {DocInfo | undefined, ChangeEntries}.
fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
    case fabric2_db:get_doc_info(Db, ID) of
        {ok, #doc_info{high_seq=Seq}=DocInfo} ->
            Docs = open_revs(Db, DocInfo, Style),
            Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
                RevStr = couch_doc:rev_to_str({RevPos, RevId}),
                {[{<<"rev">>, RevStr}]}
            end, Docs),
            {DocInfo, Changes};
        {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq ->
            % If the view seq tree is out of date (or if the view seq tree
            % was opened before the db) seqs may come by from the seq tree
            % which correspond to the not-most-current revision of a document.
            % The proper thing to do is to not send this old revision, but wait
            % until we reopen the up-to-date view seq tree and continue the
            % fold.
            % I left the Seq > HighSeq guard in so if (for some godforsaken
            % reason) the seq in the view is more current than the database,
            % we'll throw an error.
            {undefined, []};
        {error, not_found} ->
            {undefined, []}
    end.
+
+
+
% Filter for view-shaped rows ({{Seq, Key}, {Id, Value, Rev}}); only the
% default (no-op) filter is supported on this path.
view_filter(Db, KV, {default, Style}) ->
    apply_view_style(Db, KV, Style).
+
+
% Extract the `view` filter parameter, either from an already-JSONified
% request (as handed to custom filters) or from the live query string.
% Returns "" (a list) when the parameter is absent.
get_view_qs({json_req, {Props}}) ->
    {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
    % Fix: the default must be a binary — binary_to_list/1 badargs on the
    % previous list default ("") when the `view` key is missing.
    binary_to_list(couch_util:get_value(<<"view">>, Query, <<>>));
get_view_qs(Req) ->
    couch_httpd:qs_value(Req, "view", "").
+
% Pull the doc_ids list for the _doc_ids filter from the request:
% JSONified request body, POST body, or GET ?doc_ids= query parameter.
% Always validated via check_docids/1; throws bad_request otherwise.
get_doc_ids({json_req, {Props}}) ->
    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
get_doc_ids(#httpd{method='POST'}=Req) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {Props} = couch_httpd:json_body_obj(Req),
    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
get_doc_ids(#httpd{method='GET'}=Req) ->
    % Query string carries a JSON-encoded array; "null" when absent.
    DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
    check_docids(DocIds);
get_doc_ids(_) ->
    throw({bad_request, no_doc_ids_provided}).
+
+
% Pull (and validate) the Mango selector and optional fields list for
% the _selector filter from the POST body. Only POST (or an already
% JSONified request) is accepted.
get_selector_and_fields({json_req, {Props}}) ->
    Selector = check_selector(couch_util:get_value(<<"selector">>, Props)),
    Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)),
    {Selector, Fields};
get_selector_and_fields(#httpd{method='POST'}=Req) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)});
get_selector_and_fields(_) ->
    throw({bad_request, "Selector must be specified in POST payload"}).
+
+
% Validate the `doc_ids` filter payload: it must be a list of binaries.
% Returns the list unchanged, or throws a bad_request error.
check_docids(DocIds) when is_list(DocIds) ->
    case lists:all(fun erlang:is_binary/1, DocIds) of
        true ->
            DocIds;
        false ->
            throw({bad_request,
                "`doc_ids` filter parameter is not a list of doc ids."})
    end;
check_docids(_) ->
    throw({bad_request,
        "`doc_ids` filter parameter is not a list of doc ids."}).
+
+
% Validate and normalize a Mango selector; must be a JSON object
% ({Props} tuple). Mango errors are converted into bad_request throws.
check_selector(Selector={_}) ->
    try
        mango_selector:normalize(Selector)
    catch
        {mango_error, Mod, Reason0} ->
            {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
            throw({bad_request, Reason})
    end;
check_selector(_Selector) ->
    throw({bad_request, "Selector error: expected a JSON object"}).
+
+
% Validate the optional `fields` projection list; nil means "no
% projection". Mango errors are converted into bad_request throws.
check_fields(nil) ->
    nil;
check_fields(Fields) when is_list(Fields) ->
    try
        {ok, Fields1} = mango_fields:new(Fields),
        Fields1
    catch
        {mango_error, Mod, Reason0} ->
            {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
            throw({bad_request, Reason})
    end;
check_fields(_Fields) ->
    throw({bad_request, "Selector error: fields must be JSON array"}).
+
+
% Open a design doc through the ddoc cache; any failure (e.g.
% {not_found, _}) is thrown so the HTTP layer renders it as an error.
open_ddoc(Db, DDocId) ->
    case ddoc_cache:open_doc(Db, DDocId) of
        {ok, _} = Resp -> Resp;
        Else -> throw(Else)
    end.
+
+
% Assert a nested member (e.g. [<<"views">>, Name]) exists in the ddoc
% body; couch_util:get_nested_json_value/2 throws when it does not.
check_member_exists(#doc{body={Props}}, Path) ->
    couch_util:get_nested_json_value({Props}, Path).
+
+
% Render the rev entries for a change map according to the feed style:
% main_only emits just the winning rev carried on the change; all_docs
% re-reads every leaf rev (including deleted) from the db.
apply_style(_Db, Change, main_only) ->
    #{rev_id := RevId} = Change,
    [{[{<<"rev">>, couch_doc:rev_to_str(RevId)}]}];
apply_style(Db, Change, all_docs) ->
    % We have to fetch all revs for this row
    #{id := DocId} = Change,
    {ok, Resps} = fabric2_db:open_doc_revs(Db, DocId, all, [deleted]),
    lists:flatmap(fun(Resp) ->
        case Resp of
            {ok, #doc{revs = {Pos, [Rev | _]}}} ->
                [{[{<<"rev">>, couch_doc:rev_to_str({Pos, Rev})}]}];
            _ ->
                []
        end
    end, Resps);
apply_style(A, B, C) ->
    % Deliberate crash on anything else: signals a malformed change/style.
    erlang:error({changes_apply_style, A, B, C}).
+
% Same as apply_style/3 but for view-shaped rows. For all_docs style the
% doc info is looked up and delegated to apply_style/3.
% NOTE(review): this passes a #doc_info{} record into apply_style/3,
% which pattern-matches a map (#{id := _}) — looks like this path would
% badmatch; confirm whether the fast-view all_docs style is reachable.
apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
    % NOTE(review): uses couch_db here while the rest of the module uses
    % fabric2_db — presumably a leftover from the couch_changes original.
    case couch_db:get_doc_info(Db, ID) of
        {ok, DocInfo} ->
            apply_style(Db, DocInfo, all_docs);
        {error, not_found} ->
            []
    end.
+
+
% Load the doc revision(s) a filter should run against: the winning rev
% for main_only, every leaf rev for all_docs. Errors are deliberately
% swallowed and yield [] (the change is then filtered out) — a
% best-effort choice inherited from couch_changes.
open_revs(Db, Change, Style) ->
    #{id := DocId} = Change,
    Options = [deleted, conflicts],
    try
        case Style of
            main_only ->
                {ok, Doc} = fabric2_db:open_doc(Db, DocId, Options),
                [Doc];
            all_docs ->
                {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, all, Options),
                [Doc || {ok, Doc} <- Docs]
        end
    catch _:_ ->
        % We didn't log this before, should we now?
        []
    end.
+
+
% Zip per-doc filter verdicts with their docs and keep only the docs
% that passed, rendering each winning rev as a {[{<<"rev">>, RevStr}]}
% change entry.
filter_revs(Passes, Docs) ->
    lists:flatmap(fun
        ({true, #doc{revs = {RevPos, [RevId | _]}}}) ->
            [{[{<<"rev">>, couch_doc:rev_to_str({RevPos, RevId})}]}];
        ({_Failed, _Doc}) ->
            []
    end, lists:zip(Passes, Docs)).
+
+
% Compute the receive-timeout and the fun to run when it fires.
% With no heartbeat configured the timeout stops the feed; with a
% heartbeat it emits a `timeout` event through the callback and keeps
% going. The configured httpd/changes_timeout caps both cases.
get_changes_timeout(Args, Callback) ->
    #changes_args{
        heartbeat = Heartbeat,
        timeout = Timeout,
        feed = ResponseType
    } = Args,
    DefaultTimeout = list_to_integer(
        config:get("httpd", "changes_timeout", "60000")
    ),
    case Heartbeat of
    undefined ->
        case Timeout of
        undefined ->
            {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
        infinity ->
            {infinity, fun(UserAcc) -> {stop, UserAcc} end};
        _ ->
            {lists:min([DefaultTimeout, Timeout]),
                fun(UserAcc) -> {stop, UserAcc} end}
        end;
    true ->
        % heartbeat=true: use the default interval.
        {DefaultTimeout,
            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
    _ ->
        % heartbeat=N: use the smaller of N and the default.
        {lists:min([DefaultTimeout, Heartbeat]),
            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
    end.
+
% Emit the `start` event through the user callback and return the
% updated user accumulator (the go/stop flag is ignored at this point).
start_sending_changes(Callback, UserAcc) ->
    {_Go, NextUserAcc} = Callback(start, UserAcc),
    NextUserAcc.
+
% Assemble the initial #changes_acc{} for a feed run from the parsed
% args plus the per-run state (db handle, start seq, callback, timers,
% and the optional view identity for view-filtered feeds).
build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
    #changes_args{
        include_docs = IncludeDocs,
        doc_options = DocOpts,
        conflicts = Conflicts,
        limit = Limit,
        feed = ResponseType,
        filter_fun = Filter
    } = Args,
    #changes_acc{
        db = Db,
        seq = StartSeq,
        prepend = Prepend,
        filter = Filter,
        callback = Callback,
        user_acc = UserAcc,
        resp_type = ResponseType,
        limit = Limit,
        include_docs = IncludeDocs,
        doc_options = DocOpts,
        conflicts = Conflicts,
        timeout = Timeout,
        timeout_fun = TimeoutFun,
        ddoc_name = DDocName,
        view_name = ViewName,
        view = View,
        aggregation_results=[],
        aggregation_kvs=[]
    }.
+
% Run one fold over the change sequence, picking the cheapest strategy:
% a direct lookup for small _doc_ids / _design filters (first round
% only), the view seq tree for view feeds, or a plain db changes fold.
% Returns {ok, #changes_acc{}} with the final seq and user accumulator.
send_changes(Acc, Dir, FirstRound) ->
    #changes_acc{
        db = Db,
        seq = StartSeq,
        filter = Filter,
        view = View
    } = Acc,
    DbEnumFun = fun changes_enumerator/2,
    case can_optimize(FirstRound, Filter) of
        {true, Fun} ->
            Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
        _ ->
            case {View, Filter} of
                {#mrview{}, {fast_view, _, _, _}} ->
                    couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
                {undefined, _} ->
                    Opts = [{dir, Dir}],
                    fabric2_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
                {#mrview{}, _} ->
                    ViewEnumFun = fun view_changes_enumerator/2,
                    {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
                    % The view enumerator aggregates rows per seq; flush any
                    % aggregation still pending after the fold finishes.
                    case Acc0 of
                        #changes_acc{aggregation_results=[]} ->
                            {Go, Acc0};
                        _ ->
                            #changes_acc{
                                aggregation_results = AggResults,
                                aggregation_kvs = AggKVs,
                                user_acc = UserAcc,
                                callback = Callback,
                                resp_type = ResponseType,
                                prepend = Prepend
                            } = Acc0,
                            ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
                            UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
                            reset_heartbeat(),
                            {Go, Acc0#changes_acc{user_acc=UserAcc0}}
                    end
            end
    end.
+
+
% Decide whether the first fold round can bypass the changes feed and
% look documents up directly: _doc_ids filters with few ids (below the
% configured threshold) and _design filters qualify. Returns
% {true, SenderFun/6} or false.
can_optimize(true, {doc_ids, _Style, DocIds}) ->
    MaxDocIds = config:get_integer("couchdb",
        "changes_doc_ids_optimization_threshold", 100),
    if length(DocIds) =< MaxDocIds ->
        {true, fun send_changes_doc_ids/6};
    true ->
        false
    end;
can_optimize(true, {design_docs, _Style}) ->
    {true, fun send_changes_design_docs/6};
can_optimize(_, _) ->
    false.
+
+
% Optimized _doc_ids path: look up each requested doc directly (skipping
% ids that don't exist) and replay them through send_lookup_changes/6.
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
    Results = fabric2_db:get_full_doc_infos(Db, DocIds),
    FullInfos = lists:foldl(fun
        (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
        (not_found, Acc) -> Acc
    end, [], Results),
    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
% Optimized _design path: fold just the _design/* id range (deleted docs
% included) and replay the results through send_lookup_changes/6.
% NOTE(review): uses couch_db:fold_docs/4 while the _doc_ids sibling uses
% fabric2_db — presumably not yet ported; verify against fabric2's API.
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
    FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
    Opts = [
        include_deleted,
        {start_key, <<"_design/">>},
        {end_key_gt, <<"_design0">>}
    ],
    {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
% Replay a pre-fetched set of full doc infos as if they came from the
% changes feed: drop entries at or before StartSeq (after for rev),
% sort by seq, and feed each as a change map to the enumerator Fun.
% A {stop, Acc} from Fun aborts the fold via throw.
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
    FoldFun = case Dir of
        fwd -> fun lists:foldl/3;
        rev -> fun lists:foldr/3
    end,
    GreaterFun = case Dir of
        fwd -> fun(A, B) -> A > B end;
        rev -> fun(A, B) -> A =< B end
    end,
    DocInfos = lists:foldl(fun(FDI, Acc) ->
        DI = couch_doc:to_doc_info(FDI),
        case GreaterFun(DI#doc_info.high_seq, StartSeq) of
            true -> [DI | Acc];
            false -> Acc
        end
    end, [], FullDocInfos),
    SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
    FinalAcc = try
        FoldFun(fun(DocInfo, Acc) ->
            % Kinda gross that we're munging this back to a map
            % that will then have to re-read and rebuild the FDI
            % for all_docs style. But c'est la vie.
            #doc_info{
                id = DocId,
                high_seq = Seq,
                revs = [#rev_info{rev = Rev, deleted = Deleted} | _]
            } = DocInfo,
            Change = #{
                id => DocId,
                sequence => Seq,
                rev_id => Rev,
                deleted => Deleted
            },
            case Fun(Change, Acc) of
                {ok, NewAcc} ->
                    NewAcc;
                {stop, NewAcc} ->
                    throw({stop, NewAcc})
            end
        end, Acc0, SortedDocInfos)
    catch
        {stop, Acc} -> Acc
    end,
    case Dir of
        fwd ->
            % For forward feeds report the db's current update seq so the
            % client can resume from "now", regardless of which docs matched.
            FinalAcc0 = case element(1, FinalAcc) of
                changes_acc -> % we came here via couch_http or internal call
                    FinalAcc#changes_acc{seq = fabric2_db:get_update_seq(Db)};
                fabric_changes_acc -> % we came here via chttpd / fabric / rexi
                    FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
            end,
            {ok, FinalAcc0};
        rev -> {ok, FinalAcc}
    end.
+
+
% Main loop for continuous/longpoll/eventsource feeds: run one fold,
% then either finish (limit reached on longpoll, callback said stop, db
% deleted/unopenable) or wait for an `updated` event and loop with a
% freshly opened db handle. Exported and called fully qualified to pick
% up new code on hot upgrade.
keep_sending_changes(Args, Acc0, FirstRound) ->
    #changes_args{
        feed = ResponseType,
        limit = Limit,
        db_open_options = DbOptions
    } = Args,

    {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound),

    #changes_acc{
        db = Db, callback = Callback,
        timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
        prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
        ddoc_name = DDocName, view_name = ViewName
    } = ChangesAcc,

    % Limit > NewLimit means at least one row went out this round, which
    % is all a longpoll client waits for.
    if Limit > NewLimit, ResponseType == "longpoll" ->
        end_sending_changes(Callback, UserAcc2, EndSeq);
    true ->
        {Go, UserAcc3} = notify_waiting_for_updates(Callback, UserAcc2),
        if Go /= ok -> end_sending_changes(Callback, UserAcc3, EndSeq); true ->
            case wait_updated(Timeout, TimeoutFun, UserAcc3) of
            {updated, UserAcc4} ->
                % Reopen the db (and view) so the next fold sees new data.
                UserCtx = fabric2_db:get_user_ctx(Db),
                DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
                case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
                {ok, Db2} ->
                    ?MODULE:keep_sending_changes(
                      Args#changes_args{limit=NewLimit},
                      ChangesAcc#changes_acc{
                        db = Db2,
                        view = maybe_refresh_view(Db2, DDocName, ViewName),
                        user_acc = UserAcc4,
                        seq = EndSeq,
                        prepend = Prepend2,
                        timeout = Timeout,
                        timeout_fun = TimeoutFun},
                      false);
                _Else ->
                    end_sending_changes(Callback, UserAcc3, EndSeq)
                end;
            {stop, UserAcc4} ->
                end_sending_changes(Callback, UserAcc4, EndSeq)
            end
        end
    end.
+
% Re-fetch the view after the db handle has been reopened so the next
% fold round sees newly committed index data. No-op for plain db feeds
% (no ddoc/view name configured).
maybe_refresh_view(_, undefined, undefined) ->
    undefined;
maybe_refresh_view(Db, DDocName, ViewName) ->
    % Fix: this module operates on fabric2 db handles (see the callers in
    % keep_sending_changes/3), so use fabric2_db:name/1 — the original
    % couch_db:name/1 call was a leftover from the couch_changes source.
    DbName = fabric2_db:name(Db),
    {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
    View.
+
% Tell the callback the feed is now idle and waiting for db updates;
% returns the callback's {Go, UserAcc} so the caller can stop if asked.
notify_waiting_for_updates(Callback, UserAcc) ->
    Callback(waiting_for_updates, UserAcc).
+
% Emit the terminal event carrying the last seq processed; null is a
% placeholder (pending_count is not computed on this code path).
end_sending_changes(Callback, UserAcc, EndSeq) ->
    Callback({stop, EndSeq, null}, UserAcc).
+
% Fold function for view-seq-tree feeds. Because several view KVs can
% share one sequence number, rows are aggregated per seq in the acc
% (aggregation_kvs/aggregation_results) and only flushed as a single
% change row when the fold moves on to a new seq. The final pending
% aggregation is flushed by send_changes/3 after the fold returns.
view_changes_enumerator(Value, Acc) ->
    #changes_acc{
        filter = Filter, callback = Callback, prepend = Prepend,
        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
        timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
        aggregation_kvs=AggKVs, aggregation_results=AggResults
    } = Acc,

    Results0 = view_filter(Db, Value, Filter),
    Results = [Result || Result <- Results0, Result /= null],
    {{Seq, _}, _} = Value,

    % Stop after this row once the limit is exhausted.
    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,

    if CurrentSeq =:= Seq ->
        % Same seq as the row(s) already aggregated: keep accumulating.
        NewAggKVs = case Results of
            [] -> AggKVs;
            _ -> [Value|AggKVs]
        end,
        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
        Acc0 = Acc#changes_acc{
            seq = Seq,
            user_acc = UserAcc2,
            aggregation_kvs=NewAggKVs
        },
        case Done of
            stop -> {stop, Acc0};
            ok -> {Go, Acc0}
        end;
    AggResults =/= [] ->
        % New seq with a pending aggregation: flush the previous seq's
        % row, then start aggregating for this seq.
        {NewAggKVs, NewAggResults} = case Results of
            [] -> {[], []};
            _ -> {[Value], Results}
        end,
        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
            reset_heartbeat(),
            {Go, Acc#changes_acc{
                seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
                aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
        true ->
            % Normal/longpoll feeds use a "," separator between rows.
            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
            reset_heartbeat(),
            {Go, Acc#changes_acc{
                seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
                limit = Limit - 1, aggregation_kvs=[Value],
                aggregation_results=Results}}
        end;
    true ->
        % First row seen (nothing aggregated yet): just start aggregating.
        {NewAggKVs, NewAggResults} = case Results of
            [] -> {[], []};
            _ -> {[Value], Results}
        end,
        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
        Acc0 = Acc#changes_acc{
            seq = Seq,
            user_acc = UserAcc2,
            aggregation_kvs=NewAggKVs,
            aggregation_results=NewAggResults
        },
        case Done of
            stop -> {stop, Acc0};
            ok -> {Go, Acc0}
        end
    end.
+
% Fold function for fabric2_db:fold_changes/5. Applies the configured
% filter to each change, emits a row through the user callback when the
% filter passes, and sends heartbeats while rows are being skipped.
% Fix: removed the leftover "XKCD" debug log (couch_log:error/2 in the
% hot per-change path).
changes_enumerator(Change0, Acc) ->
    #changes_acc{
        filter = Filter,
        callback = Callback,
        user_acc = UserAcc,
        limit = Limit,
        db = Db,
        timeout = Timeout,
        timeout_fun = TimeoutFun
    } = Acc,
    {Change1, Results0} = case Filter of
        {fast_view, _, _, _} ->
            % fast_view may rewrite the change (doc info lookup).
            fast_view_filter(Db, Change0, Filter);
        _ ->
            {Change0, filter(Db, Change0, Filter)}
    end,
    Results = [Result || Result <- Results0, Result /= null],
    Seq = maps:get(sequence, Change1),
    % Stop after this row once the limit is exhausted.
    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
    case Results of
        [] ->
            % Filtered out: maybe emit a heartbeat so idle clients stay open.
            {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
            case Done of
                stop ->
                    {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
                ok ->
                    {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
            end;
        _ ->
            ChangesRow = changes_row(Results, Change1, Acc),
            {UserGo, UserAcc2} = Callback({change, ChangesRow}, UserAcc),
            % The callback may request a stop independently of the limit.
            RealGo = case UserGo of
                ok -> Go;
                stop -> stop
            end,
            reset_heartbeat(),
            {RealGo, Acc#changes_acc{
                seq = Seq,
                user_acc = UserAcc2,
                limit = Limit - 1
            }}
    end.
+
+
+
% Build the JSON row for a view-feed change: the KVs aggregated for one
% seq are split into added entries ([Key, Value] pairs, expanding dups)
% and removed keys, plus the rev entries and (optionally) the doc body.
view_changes_row(Results, KVs, Acc) ->
    {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
        {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
        case Value of
            removed ->
                {AddAcc, [Key|RemAcc]};
            {dups, DupValues} ->
                % A key emitted multiple times for the same doc.
                AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
                    [[Key, DupValue]|AddAcc0]
                end, AddAcc, DupValues),
                {AddAcc1, RemAcc};
            _ ->
                {[[Key, Value]|AddAcc], RemAcc}
        end
    end, {[], []}, KVs),

    % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
    % by seq.
    [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,

    {[
        {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
        {<<"remove">>, Remove}, {<<"changes">>, Results}
    ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+
+
% Build the JSON row for a db-feed change: seq, id, the filtered rev
% entries, a "deleted" flag when applicable, and (when include_docs is
% set) the doc body itself.
changes_row(Results, Change, Acc) ->
    #{
        id := Id,
        sequence := Seq,
        deleted := Del
    } = Change,
    {[
        {<<"seq">>, Seq},
        {<<"id">>, Id},
        {<<"changes">>, Results}
    ] ++ deleted_item(Del) ++ maybe_get_changes_doc(Change, Acc)}.
+
% When include_docs=true, load the doc for this change and return it as
% a [{doc, Json}] fragment to append to the row; otherwise [].
maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
    #changes_acc{
        db = Db,
        doc_options = DocOpts,
        conflicts = Conflicts,
        filter = Filter
    } = Acc,
    % conflicts=true additionally attaches conflict revisions.
    Opts = case Conflicts of
        true -> [deleted, conflicts];
        false -> [deleted]
    end,
    load_doc(Db, Value, Opts, DocOpts, Filter);

maybe_get_changes_doc(_Value, _Acc) ->
    [].
+
+
% Load and render the doc for a change row; an unreadable doc becomes an
% explicit {doc, null} member rather than being dropped.
load_doc(Db, Value, Opts, DocOpts, Filter) ->
    case load_doc(Db, Value, Opts) of
        null ->
            [{doc, null}];
        Doc ->
            [{doc, doc_to_json(Doc, DocOpts, Filter)}]
    end.
+
+
% Open the specific revision named by the change; any failure (missing
% rev, error) collapses to null for the caller to render.
load_doc(Db, Change, Opts) ->
    #{
        id := Id,
        rev_id := RevId
    } = Change,
    case fabric2_db:open_doc_revs(Db, Id, [RevId], Opts) of
        {ok, [{ok, Doc}]} ->
            Doc;
        _ ->
            null
    end.
+
+
% Render a doc for a change row. _selector filters with a fields list
% apply the Mango projection; every other filter emits the full doc.
doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
        when Fields =/= nil ->
    mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
doc_to_json(Doc, DocOpts, _Filter) ->
    couch_doc:to_json_obj(Doc, DocOpts).
+
+
% Optional "deleted" member for a change row: present only when the
% change is a deletion, absent otherwise.
deleted_item(true) ->
    [{<<"deleted">>, true}];
deleted_item(_NotDeleted) ->
    [].
+
% Waits for an `updated` msg; if multiple have queued up, collapses them
% into one via get_rest_updated/1. A `deleted` msg stops the feed, and a
% timeout runs TimeoutFun, which decides whether to keep waiting.
% Fix: removed the leftover "XKCD" debug logs (five couch_log:error/2
% calls firing on every wait/timeout iteration).
wait_updated(Timeout, TimeoutFun, UserAcc) ->
    receive
        updated ->
            get_rest_updated(UserAcc);
        deleted ->
            {stop, UserAcc}
    after Timeout ->
        {Go, UserAcc2} = TimeoutFun(UserAcc),
        case Go of
            ok ->
                % Fully qualified so long-lived feeds pick up hot upgrades.
                ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
            stop ->
                {stop, UserAcc2}
        end
    end.
+
% Drain any additional queued `updated` messages so a burst of db update
% events triggers only a single feed iteration; returns immediately
% (after 0) once the mailbox has no more of them.
get_rest_updated(UserAcc) ->
    receive
        updated ->
            get_rest_updated(UserAcc)
    after 0 ->
        {updated, UserAcc}
    end.
+
% Refresh the heartbeat timestamp in the process dictionary — but only
% when heartbeats are active, i.e. the key was initialised at feed start
% (see handle_changes/4); a no-op otherwise.
reset_heartbeat() ->
    case get(last_changes_heartbeat) of
        undefined -> ok;
        _LastBeat -> put(last_changes_heartbeat, os:timestamp())
    end.
+
% If heartbeats are active and at least Timeout ms have elapsed since
% the last one, run TimeoutFun (which emits the heartbeat and returns
% {ok|stop, Acc}) and reset the clock; otherwise pass Acc through as
% {ok, Acc}.
maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
    Before = get(last_changes_heartbeat),
    case Before of
    undefined ->
        % Heartbeats disabled for this feed.
        {ok, Acc};
    _ ->
        Now = os:timestamp(),
        % timer:now_diff/2 is in microseconds; convert to ms.
        case timer:now_diff(Now, Before) div 1000 >= Timeout of
        true ->
            Acc2 = TimeoutFun(Acc),
            put(last_changes_heartbeat, Now),
            Acc2;
        false ->
            {ok, Acc}
        end
    end.
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c6404b04d..40c1a1e38 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -93,18 +93,13 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
handle_changes_req1(#httpd{}=Req, Db) ->
#changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
ChangesArgs = Args0#changes_args{
- filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
- db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
+ db_open_options = [{user_ctx, fabric2_db:get_user_ctx(Db)}]
},
+ ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db),
Max = chttpd:chunked_response_buffer_size(),
case ChangesArgs#changes_args.feed of
"normal" ->
- T0 = os:timestamp(),
- {ok, Info} = fabric:get_db_info(Db),
- Suffix = mem3:shard_suffix(Db),
- Etag = chttpd:make_etag({Info, Suffix}),
- DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
- couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
+ Etag = <<"foo">>,
chttpd:etag_respond(Req, Etag, fun() ->
Acc0 = #cacc{
feed = normal,
@@ -112,7 +107,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
mochi = Req,
threshold = Max
},
- fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
+ ChangesFun({fun changes_callback/2, Acc0})
end);
Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
@@ -122,7 +117,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
threshold = Max
},
try
- fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
+ ChangesFun({fun changes_callback/2, Acc0})
after
couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
end;
@@ -337,7 +332,7 @@ update_partition_stats(PathParts) ->
handle_design_req(#httpd{
path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest]
}=Req, Db) ->
- DbName = mem3:dbname(couch_db:name(Db)),
+ DbName = fabric2_db:name(Db),
case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of
{ok, DDoc} ->
Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3),
@@ -365,56 +360,33 @@ handle_design_info_req(Req, _Db, _DDoc) ->
create_db_req(#httpd{}=Req, DbName) ->
couch_httpd:verify_is_server_admin(Req),
- N = chttpd:qs_value(Req, "n", config:get("cluster", "n", "3")),
- Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
- P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
- EngineOpt = parse_engine_opt(Req),
- DbProps = parse_partitioned_opt(Req),
- Options = [
- {n, N},
- {q, Q},
- {placement, P},
- {props, DbProps}
- ] ++ EngineOpt,
DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
- case fabric:create_db(DbName, Options) of
- ok ->
- send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]});
- accepted ->
- send_json(Req, 202, [{"Location", DocUrl}], {[{ok, true}]});
- {error, file_exists} ->
- chttpd:send_error(Req, file_exists);
- Error ->
- throw(Error)
+ case fabric2_db:create(DbName, []) of
+ {ok, _} ->
+ send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]});
+ {error, file_exists} ->
+ chttpd:send_error(Req, file_exists);
+ Error ->
+ throw(Error)
end.
delete_db_req(#httpd{}=Req, DbName) ->
couch_httpd:verify_is_server_admin(Req),
- case fabric:delete_db(DbName, []) of
- ok ->
- send_json(Req, 200, {[{ok, true}]});
- accepted ->
- send_json(Req, 202, {[{ok, true}]});
- Error ->
- throw(Error)
+ case fabric2_db:delete(DbName, []) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ Error ->
+ throw(Error)
end.
do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
- Shard = hd(mem3:shards(DbName)),
- Props = couch_util:get_value(props, Shard#shard.opts, []),
- Opts = case Ctx of
- undefined ->
- [{props, Props}];
- #user_ctx{} ->
- [{user_ctx, Ctx}, {props, Props}]
- end,
- {ok, Db} = couch_db:clustered_db(DbName, Opts),
+ {ok, Db} = fabric2_db:open(DbName, [{user_ctx, Ctx}]),
Fun(Req, Db).
-db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
+db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) ->
% measure the time required to generate the etag, see if it's worth it
T0 = os:timestamp(),
- {ok, DbInfo} = fabric:get_db_info(DbName),
+ {ok, DbInfo} = fabric2_db:get_db_info(Db),
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
send_json(Req, {DbInfo});
@@ -422,22 +394,22 @@ db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- Options = [{user_ctx,Ctx}, {w,W}],
+ Options = [{user_ctx,Ctx}],
- Doc = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)),
- Doc2 = case Doc#doc.id of
+ Doc0 = chttpd:json_body(Req),
+ Doc1 = couch_doc:from_json_obj_validate(Doc0, fabric2_db:name(Db)),
+ Doc2 = case Doc1#doc.id of
<<"">> ->
- Doc#doc{id=couch_uuids:new(), revs={0, []}};
+ Doc1#doc{id=couch_uuids:new(), revs={0, []}};
_ ->
- Doc
+ Doc1
end,
DocId = Doc2#doc.id,
case chttpd:qs_value(Req, "batch") of
"ok" ->
% async_batching
spawn(fun() ->
- case catch(fabric:update_doc(Db, Doc2, Options)) of
+ case catch(fabric2_db:update_doc(Db, Doc2, Options)) of
{ok, _} ->
chttpd_stats:incr_writes(),
ok;
@@ -457,7 +429,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
% normal
DocUrl = absolute_uri(Req, [$/, couch_util:url_encode(DbName),
$/, couch_util:url_encode(DocId)]),
- case fabric:update_doc(Db, Doc2, Options) of
+ case fabric2_db:update_doc(Db, Doc2, Options) of
{ok, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
@@ -475,13 +447,10 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
-db_req(#httpd{method='POST', path_parts=[DbName, <<"_ensure_full_commit">>],
- user_ctx=Ctx}=Req, _Db) ->
+db_req(#httpd{method='POST', path_parts=[_DbName, <<"_ensure_full_commit">>],
+ user_ctx=Ctx}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),
- %% use fabric call to trigger a database_does_not_exist exception
- %% for missing databases that'd return error 404 from chttpd
- %% get_security used to prefer shards on the same node over other nodes
- fabric:get_security(DbName, [{user_ctx, Ctx}]),
+ #{db_prefix := <<_/binary>>} = Db,
send_json(Req, 201, {[
{ok, true},
{instance_start_time, <<"0">>}
@@ -503,22 +472,17 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
DocsArray0
end,
couch_stats:update_histogram([couchdb, httpd, bulk_docs], length(DocsArray)),
- W = case couch_util:get_value(<<"w">>, JsonProps) of
- Value when is_integer(Value) ->
- integer_to_list(Value);
- _ ->
- chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db)))
- end,
case chttpd:header_value(Req, "X-Couch-Full-Commit") of
"true" ->
- Options = [full_commit, {user_ctx,Ctx}, {w,W}];
+ Options = [full_commit, {user_ctx,Ctx}];
"false" ->
- Options = [delay_commit, {user_ctx,Ctx}, {w,W}];
+ Options = [delay_commit, {user_ctx,Ctx}];
_ ->
- Options = [{user_ctx,Ctx}, {w,W}]
+ Options = [{user_ctx,Ctx}]
end,
+ DbName = fabric2_db:name(Db),
Docs = lists:map(fun(JsonObj) ->
- Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
+ Doc = couch_doc:from_json_obj_validate(JsonObj, DbName),
validate_attachment_names(Doc),
case Doc#doc.id of
<<>> -> Doc#doc{id = couch_uuids:new()};
@@ -532,7 +496,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
true -> [all_or_nothing|Options];
_ -> Options
end,
- case fabric:update_docs(Db, Docs, Options2) of
+ case fabric2_db:update_docs(Db, Docs, Options2) of
{ok, Results} ->
% output the results
chttpd_stats:incr_writes(length(Results)),
@@ -551,7 +515,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
send_json(Req, 417, ErrorsJson)
end;
false ->
- case fabric:update_docs(Db, Docs, [replicated_changes|Options]) of
+ case fabric2_db:update_docs(Db, Docs, [replicated_changes|Options]) of
{ok, Errors} ->
chttpd_stats:incr_writes(length(Docs)),
ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
@@ -647,8 +611,7 @@ db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) ->
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
couch_stats:increment_counter([couchdb, httpd, purge_requests]),
chttpd:validate_ctype(Req, "application/json"),
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}],
+ Options = [{user_ctx, Req#httpd.user_ctx}],
{IdsRevs} = chttpd:json_body_obj(Req),
IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
MaxIds = config:get_integer("purge", "max_document_id_number", 100),
@@ -723,7 +686,7 @@ db_req(#httpd{path_parts=[_,OP]}=Req, _Db) when ?IS_ALL_DOCS(OP) ->
db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),
{JsonDocIdRevs} = chttpd:json_body_obj(Req),
- case fabric:get_missing_revs(Db, JsonDocIdRevs) of
+ case fabric2_db:get_missing_revs(Db, JsonDocIdRevs) of
{error, Reason} ->
chttpd:send_error(Req, Reason);
{ok, Results} ->
@@ -740,7 +703,7 @@ db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),
{JsonDocIdRevs} = chttpd:json_body_obj(Req),
- case fabric:get_missing_revs(Db, JsonDocIdRevs) of
+ case fabric2_db:get_missing_revs(Db, JsonDocIdRevs) of
{error, Reason} ->
chttpd:send_error(Req, Reason);
{ok, Results} ->
@@ -856,22 +819,22 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
200, [], FirstChunk),
VAcc1 = VAcc0#vacc{resp=Resp0},
VAcc2 = lists:foldl(fun(Args, Acc0) ->
- {ok, Acc1} = fabric:all_docs(Db, Options,
+ {ok, Acc1} = fabric2_db:fold_docs(Db, Options,
fun view_cb/2, Acc0, Args),
Acc1
end, VAcc1, ArgQueries),
{ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
chttpd:end_delayed_json_response(Resp1).
-all_docs_view(Req, Db, Keys, OP) ->
- Args0 = couch_mrview_http:parse_params(Req, Keys),
- Args1 = Args0#mrargs{view_type=map},
- Args2 = fabric_util:validate_all_docs_args(Db, Args1),
- Args3 = set_namespace(OP, Args2),
+all_docs_view(Req, Db, _Keys, _OP) ->
+ % Args0 = couch_mrview_http:parse_params(Req, Keys),
+ % Args1 = Args0#mrargs{view_type=map},
+ % Args2 = fabric_util:validate_all_docs_args(Db, Args1),
+ % Args3 = set_namespace(OP, Args2),
Options = [{user_ctx, Req#httpd.user_ctx}],
Max = chttpd:chunked_response_buffer_size(),
VAcc = #vacc{db=Db, req=Req, threshold=Max},
- {ok, Resp} = fabric:all_docs(Db, Options, fun view_cb/2, VAcc, Args3),
+ {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
{ok, Resp#vacc.resp}.
view_cb({row, Row} = Msg, Acc) ->
@@ -915,7 +878,7 @@ db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) ->
Doc = couch_doc_open(Db, DocId, Rev, Options2),
send_doc(Req, Doc, Options2);
_ ->
- case fabric:open_revs(Db, DocId, Revs, Options) of
+ case fabric2_db:open_doc_revs(Db, DocId, Revs, Options) of
{ok, []} when Revs == all ->
chttpd:send_error(Req, {not_found, missing});
{ok, Results} ->
@@ -956,8 +919,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
couch_db:validate_docid(Db, DocId),
chttpd:validate_ctype(Req, "multipart/form-data"),
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- Options = [{user_ctx,Ctx}, {w,W}],
+ Options = [{user_ctx,Ctx}],
Form = couch_httpd:parse_form(Req),
case proplists:is_defined("_doc", Form) of
@@ -966,7 +928,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
Doc = couch_doc_from_req(Req, Db, DocId, Json);
false ->
Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))),
- Doc = case fabric:open_revs(Db, DocId, [Rev], []) of
+ Doc = case fabric2_db:open_doc_revs(Db, DocId, [Rev], []) of
{ok, [{ok, Doc0}]} ->
chttpd_stats:incr_reads(),
Doc0;
@@ -995,7 +957,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
NewDoc = Doc#doc{
atts = UpdatedAtts ++ OldAtts2
},
- case fabric:update_doc(Db, NewDoc, Options) of
+ case fabric2_db:update_doc(Db, NewDoc, Options) of
{ok, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
@@ -1013,11 +975,10 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
#doc_query_args{
update_type = UpdateType
} = parse_doc_query(Req),
- DbName = couch_db:name(Db),
- couch_db:validate_docid(Db, DocId),
+ DbName = fabric2_db:name(Db),
+ couch_doc:validate_docid(DocId),
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- Options = [{user_ctx,Ctx}, {w,W}],
+ Options = [{user_ctx, Ctx}],
Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName),
$/, couch_util:url_encode(DocId)]),
@@ -1025,7 +986,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
couch_httpd:check_max_request_length(Req),
- couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
+ couch_httpd_multipart:num_mp_writers(1),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
fun() -> receive_request_data(Req) end),
Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
@@ -1045,7 +1006,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)),
spawn(fun() ->
- case catch(fabric:update_doc(Db, Doc, Options)) of
+ case catch(fabric2_db:update_doc(Db, Doc, Options)) of
{ok, _} ->
chttpd_stats:incr_writes(),
ok;
@@ -1079,7 +1040,7 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) ->
% open old doc
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
% save new doc
- case fabric:update_doc(Db,
+ case fabric2_db:update_doc(Db,
Doc#doc{id=TargetDocId, revs=TargetRevs}, [{user_ctx,Ctx}]) of
{ok, NewTargetRev} ->
chttpd_stats:incr_writes(),
@@ -1180,7 +1141,7 @@ send_docs_multipart(Req, Results, Options1) ->
CType = {"Content-Type",
"multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
{ok, Resp} = start_chunked_response(Req, 200, [CType]),
- couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
+ chttpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
lists:foreach(
fun({ok, #doc{atts=Atts}=Doc}) ->
Refs = monitor_attachments(Doc#doc.atts),
@@ -1188,25 +1149,25 @@ send_docs_multipart(Req, Results, Options1) ->
JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
{ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
InnerBoundary, JsonBytes, Atts, true),
- couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
+ chttpd:send_chunk(Resp, <<"\r\nContent-Type: ",
ContentType/binary, "\r\n\r\n">>),
couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
- fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+ fun(Data) -> chttpd:send_chunk(Resp, Data)
end, true),
- couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
+ chttpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
after
demonitor_refs(Refs)
end;
({{not_found, missing}, RevId}) ->
RevStr = couch_doc:rev_to_str(RevId),
Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
- couch_httpd:send_chunk(Resp,
+ chttpd:send_chunk(Resp,
[<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
Json,
<<"\r\n--", OuterBoundary/binary>>])
end, Results),
- couch_httpd:send_chunk(Resp, <<"--">>),
- couch_httpd:last_chunk(Resp).
+ chttpd:send_chunk(Resp, <<"--">>),
+ chttpd:last_chunk(Resp).
bulk_get_multipart_headers({0, []}, Id, Boundary) ->
[
@@ -1276,15 +1237,14 @@ send_updated_doc(Req, Db, DocId, Doc, Headers) ->
send_updated_doc(#httpd{user_ctx=Ctx} = Req, Db, DocId, #doc{deleted=Deleted}=Doc,
Headers, UpdateType) ->
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options =
case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
"true" ->
- [full_commit, UpdateType, {user_ctx,Ctx}, {w,W}];
+ [full_commit, UpdateType, {user_ctx,Ctx}];
"false" ->
- [delay_commit, UpdateType, {user_ctx,Ctx}, {w,W}];
+ [delay_commit, UpdateType, {user_ctx,Ctx}];
_ ->
- [UpdateType, {user_ctx,Ctx}, {w,W}]
+ [UpdateType, {user_ctx,Ctx}]
end,
{Status, {etag, Etag}, Body} = update_doc(Db, DocId,
#doc{deleted=Deleted}=Doc, Options),
@@ -1303,31 +1263,7 @@ http_code_from_status(Status) ->
end.
update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) ->
- {_, Ref} = spawn_monitor(fun() ->
- try fabric:update_doc(Db, Doc, Options) of
- Resp ->
- exit({exit_ok, Resp})
- catch
- throw:Reason ->
- exit({exit_throw, Reason});
- error:Reason ->
- exit({exit_error, Reason});
- exit:Reason ->
- exit({exit_exit, Reason})
- end
- end),
- Result = receive
- {'DOWN', Ref, _, _, {exit_ok, Ret}} ->
- Ret;
- {'DOWN', Ref, _, _, {exit_throw, Reason}} ->
- throw(Reason);
- {'DOWN', Ref, _, _, {exit_error, Reason}} ->
- erlang:error(Reason);
- {'DOWN', Ref, _, _, {exit_exit, Reason}} ->
- erlang:exit(Reason)
- end,
-
- case Result of
+ case fabric2_db:update_doc(Db, Doc, Options) of
{ok, NewRev} ->
Accepted = false;
{accepted, NewRev} ->
@@ -1374,7 +1310,7 @@ couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) ->
end,
Doc#doc{id=DocId, revs=Revs2};
couch_doc_from_req(Req, Db, DocId, Json) ->
- Doc = couch_db:doc_from_json_obj_validate(Db, Json),
+ Doc = couch_doc:from_json_obj_validate(Json, fabric2_db:name(Db)),
couch_doc_from_req(Req, Db, DocId, Doc).
@@ -1382,11 +1318,10 @@ couch_doc_from_req(Req, Db, DocId, Json) ->
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, nil, []).
-couch_doc_open(Db, DocId, Rev, Options0) ->
- Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0],
+couch_doc_open(Db, DocId, Rev, Options) ->
case Rev of
nil -> % open most recent rev
- case fabric:open_doc(Db, DocId, Options) of
+ case fabric2_db:open_doc(Db, DocId, Options) of
{ok, Doc} ->
chttpd_stats:incr_reads(),
Doc;
@@ -1394,7 +1329,7 @@ couch_doc_open(Db, DocId, Rev, Options0) ->
throw(Error)
end;
_ -> % open a specific rev (deletions come back as stubs)
- case fabric:open_revs(Db, DocId, [Rev], Options) of
+ case fabric2_db:open_doc_revs(Db, DocId, [Rev], Options) of
{ok, [{ok, Doc}]} ->
chttpd_stats:incr_reads(),
Doc;
@@ -1515,8 +1450,12 @@ db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNa
end;
-db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNameParts)
+db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
when (Method == 'PUT') or (Method == 'DELETE') ->
+ #httpd{
+ user_ctx = Ctx,
+ mochi_req = MochiReq
+ } = Req,
FileName = validate_attachment_name(
mochiweb_util:join(
lists:map(fun binary_to_list/1,
@@ -1526,16 +1465,45 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
'DELETE' ->
[];
_ ->
- MimeType = case couch_httpd:header_value(Req,"Content-Type") of
+ MimeType = case chttpd:header_value(Req,"Content-Type") of
% We could throw an error here or guess by the FileName.
% Currently, just giving it a default.
undefined -> <<"application/octet-stream">>;
CType -> list_to_binary(CType)
end,
- Data = fabric:att_receiver(Req, chttpd:body_length(Req)),
+ Data = case chttpd:body_length(Req) of
+ undefined ->
+ <<"">>;
+ {unknown_transfer_encoding, Unknown} ->
+ exit({unknown_transfer_encoding, Unknown});
+ chunked ->
+ fun(MaxChunkSize, ChunkFun, InitState) ->
+ chttpd:recv_chunked(
+ Req, MaxChunkSize, ChunkFun, InitState
+ )
+ end;
+ 0 ->
+ <<"">>;
+ Length when is_integer(Length) ->
+ Expect = case chttpd:header_value(Req, "expect") of
+ undefined ->
+ undefined;
+ Value when is_list(Value) ->
+ string:to_lower(Value)
+ end,
+ case Expect of
+ "100-continue" ->
+ MochiReq:start_raw_response({100, gb_trees:empty()});
+ _Else ->
+ ok
+ end,
+ fun() -> chttpd:recv(Req, 0) end;
+ Length ->
+ exit({length_not_integer, Length})
+ end,
ContentLen = case couch_httpd:header_value(Req,"Content-Length") of
undefined -> undefined;
- Length -> list_to_integer(Length)
+ CL -> list_to_integer(CL)
end,
ContentEnc = string:to_lower(string:strip(
couch_httpd:header_value(Req, "Content-Encoding", "identity")
@@ -1570,7 +1538,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
couch_db:validate_docid(Db, DocId),
#doc{id=DocId};
Rev ->
- case fabric:open_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of
+ case fabric2_db:open_doc_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of
{ok, [{ok, Doc0}]} ->
chttpd_stats:incr_reads(),
Doc0;
@@ -1585,8 +1553,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
DocEdited = Doc#doc{
atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName]
},
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- case fabric:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of
+ case fabric2_db:update_doc(Db, DocEdited, [{user_ctx,Ctx}]) of
{ok, UpdatedRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
@@ -1595,7 +1562,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
HttpCode = 202
end,
erlang:put(mochiweb_request_recv, true),
- DbName = couch_db:name(Db),
+ DbName = fabric2_db:name(Db),
{Status, Headers} = case Method of
'DELETE' ->
@@ -1682,46 +1649,6 @@ get_md5_header(Req) ->
parse_doc_query(Req) ->
lists:foldl(fun parse_doc_query/2, #doc_query_args{}, chttpd:qs(Req)).
-parse_engine_opt(Req) ->
- case chttpd:qs_value(Req, "engine") of
- undefined ->
- [];
- Extension ->
- Available = couch_server:get_engine_extensions(),
- case lists:member(Extension, Available) of
- true ->
- [{engine, iolist_to_binary(Extension)}];
- false ->
- throw({bad_request, invalid_engine_extension})
- end
- end.
-
-
-parse_partitioned_opt(Req) ->
- case chttpd:qs_value(Req, "partitioned") of
- undefined ->
- [];
- "false" ->
- [];
- "true" ->
- ok = validate_partitioned_db_enabled(Req),
- [
- {partitioned, true},
- {hash, [couch_partition, hash, []]}
- ];
- _ ->
- throw({bad_request, <<"Invalid `partitioned` parameter">>})
- end.
-
-
-validate_partitioned_db_enabled(Req) ->
- case couch_flags:is_enabled(partitioned, Req) of
- true ->
- ok;
- false ->
- throw({bad_request, <<"Partitioned feature is not enabled.">>})
- end.
-
parse_doc_query({Key, Value}, Args) ->
case {Key, Value} of
@@ -1791,7 +1718,7 @@ parse_changes_query(Req) ->
{"descending", "true"} ->
Args#changes_args{dir=rev};
{"since", _} ->
- Args#changes_args{since=Value};
+ Args#changes_args{since=parse_since_seq(Value)};
{"last-event-id", _} ->
Args#changes_args{since=Value};
{"limit", _} ->
@@ -1845,6 +1772,27 @@ parse_changes_query(Req) ->
ChangesArgs
end.
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 30 ->
+ throw({bad_request, url_encoded_since_seq});
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 2 ->
+ % We have implicitly allowed the since seq to either be
+ % JSON encoded or a "raw" string. Here we just remove the
+ % surrounding quotes if they exist and are paired.
+ SeqSize = size(Seq) - 2,
+ case Seq of
+ <<"\"", S:SeqSize/binary, "\"">> -> S;
+ S -> S
+ end;
+
+parse_since_seq(Seq) when is_binary(Seq) ->
+ Seq;
+
+parse_since_seq(Seq) when is_list(Seq) ->
+ parse_since_seq(iolist_to_binary(Seq)).
+
+
extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
@@ -1885,6 +1833,8 @@ monitor_attachments(Atts) when is_list(Atts) ->
case couch_att:fetch(data, Att) of
{Fd, _} ->
[monitor(process, Fd) | Monitors];
+ {loc, _, _, _} ->
+ Monitors;
stub ->
Monitors;
Else ->
@@ -1982,7 +1932,7 @@ bulk_get_open_doc_revs1(Db, Props, Options, {DocId, Revs}) ->
bulk_get_open_doc_revs1(Db, Props, Options, {DocId, Revs, Options1})
end;
bulk_get_open_doc_revs1(Db, Props, _, {DocId, Revs, Options}) ->
- case fabric:open_revs(Db, DocId, Revs, Options) of
+ case fabric2_db:open_doc_revs(Db, DocId, Revs, Options) of
{ok, []} ->
RevStr = couch_util:get_value(<<"rev">>, Props),
Error = {RevStr, <<"not_found">>, <<"missing">>},
diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl
index fa35c6ba2..3e59ffe4e 100644
--- a/src/chttpd/src/chttpd_external.erl
+++ b/src/chttpd/src/chttpd_external.erl
@@ -74,7 +74,7 @@ json_req_obj_fields() ->
<<"peer">>, <<"form">>, <<"cookie">>, <<"userCtx">>, <<"secObj">>].
json_req_obj_field(<<"info">>, #httpd{}, Db, _DocId) ->
- {ok, Info} = get_db_info(Db),
+ {ok, Info} = fabric2_db:get_db_info(Db),
{Info};
json_req_obj_field(<<"uuid">>, #httpd{}, _Db, _DocId) ->
couch_uuids:new();
@@ -117,27 +117,18 @@ json_req_obj_field(<<"form">>, #httpd{mochi_req=Req, method=Method}=HttpReq, Db,
json_req_obj_field(<<"cookie">>, #httpd{mochi_req=Req}, _Db, _DocId) ->
to_json_terms(Req:parse_cookie());
json_req_obj_field(<<"userCtx">>, #httpd{}, Db, _DocId) ->
- couch_util:json_user_ctx(Db);
-json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) ->
- get_db_security(Db, UserCtx).
-
-
-get_db_info(Db) ->
- case couch_db:is_clustered(Db) of
- true ->
- fabric:get_db_info(Db);
- false ->
- couch_db:get_db_info(Db)
- end.
-
-
-get_db_security(Db, #user_ctx{}) ->
- case couch_db:is_clustered(Db) of
- true ->
- fabric:get_security(Db);
- false ->
- couch_db:get_security(Db)
- end.
+ json_user_ctx(Db);
+json_req_obj_field(<<"secObj">>, #httpd{user_ctx = #user_ctx{}}, Db, _DocId) ->
+ fabric2_db:get_security(Db).
+
+
+json_user_ctx(Db) ->
+ Ctx = fabric2_db:get_user_ctx(Db),
+ {[
+ {<<"db">>, fabric2_db:name(Db)},
+ {<<"name">>, Ctx#user_ctx.name},
+ {<<"roles">>, Ctx#user_ctx.roles}
+ ]}.
to_json_terms(Data) ->
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 819d7820e..b244e84f6 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -108,43 +108,39 @@ maybe_add_csp_headers(Headers, _) ->
Headers.
handle_all_dbs_req(#httpd{method='GET'}=Req) ->
- Args = couch_mrview_http:parse_params(Req, undefined),
- ShardDbName = config:get("mem3", "shards_db", "_dbs"),
- %% shard_db is not sharded but mem3:shards treats it as an edge case
- %% so it can be pushed thru fabric
- {ok, Info} = fabric:get_db_info(ShardDbName),
- Etag = couch_httpd:make_etag({Info}),
- Options = [{user_ctx, Req#httpd.user_ctx}],
+ % TODO: Support args and options properly, transform
+ % this back into a fold call similar to the old
+ % version.
+ %% Args = couch_mrview_http:parse_params(Req, undefined),
+ % Eventually the Etag for this request will be derived
+ % from the \xFFmetadataVersion key in fdb
+ Etag = <<"foo">>,
+ %% Options = [{user_ctx, Req#httpd.user_ctx}],
{ok, Resp} = chttpd:etag_respond(Req, Etag, fun() ->
- {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]),
- VAcc = #vacc{req=Req,resp=Resp},
- fabric:all_docs(ShardDbName, Options, fun all_dbs_callback/2, VAcc, Args)
- end),
- case is_record(Resp, vacc) of
- true -> {ok, Resp#vacc.resp};
- _ -> {ok, Resp}
- end;
+ AllDbs = fabric2_db:list_dbs(),
+ chttpd:send_json(Req, AllDbs)
+ end);
handle_all_dbs_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
- {ok, Acc#vacc{resp=Resp1}};
-all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
- Prepend = couch_mrview_http:prepend_val(Acc),
- case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
- {ok, Acc};
- DbName ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
- {ok, Acc#vacc{prepend=",", resp=Resp1}}
- end;
-all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
- {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
- {ok, Acc#vacc{resp=Resp2}};
-all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
- {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
- {ok, Acc#vacc{resp=Resp1}}.
+%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
+%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
+%% {ok, Acc#vacc{resp=Resp1}};
+%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
+%% Prepend = couch_mrview_http:prepend_val(Acc),
+%% case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
+%% {ok, Acc};
+%% DbName ->
+%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
+%% {ok, Acc#vacc{prepend=",", resp=Resp1}}
+%% end;
+%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
+%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
+%% {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+%% {ok, Acc#vacc{resp=Resp2}};
+%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
+%% {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
+%% {ok, Acc#vacc{resp=Resp1}}.
handle_dbs_info_req(#httpd{method='POST'}=Req) ->
chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index c3bf11929..2eb6dc3f8 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -123,15 +123,14 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
JsonDoc = couch_query_servers:json_doc(Doc),
Cmd = [<<"updates">>, UpdateName],
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]),
JsonResp = case UpdateResp of
[<<"up">>, {NewJsonDoc}, {JsonResp0}] ->
case chttpd:header_value(Req, "X-Couch-Full-Commit", "false") of
"true" ->
- Options = [full_commit, {user_ctx, Req#httpd.user_ctx}, {w, W}];
+ Options = [full_commit, {user_ctx, Req#httpd.user_ctx}];
_ ->
- Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}]
+ Options = [{user_ctx, Req#httpd.user_ctx}]
end,
NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}),
couch_doc:validate_docid(NewDoc#doc.id),
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index ae1d8d6f5..cf6f27fde 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -173,8 +173,18 @@ join([H|[]], _, Acc) ->
join([H|T], Sep, Acc) ->
join(T, Sep, [Sep, H | Acc]).
+validate(#{} = Db, DDoc) ->
+ DbName = fabric2_db:name(Db),
+ IsPartitioned = fabric2_db:is_partitioned(Db),
+ validate(DbName, IsPartitioned, DDoc);
-validate(Db, DDoc) ->
+validate(Db, DDoc) ->
+ DbName = couch_db:name(Db),
+ IsPartitioned = couch_db:is_partitioned(Db),
+ validate(DbName, IsPartitioned, DDoc).
+
+
+validate(DbName, IsDbPartitioned, DDoc) ->
ok = validate_ddoc_fields(DDoc#doc.body),
GetName = fun
(#mrview{map_names = [Name | _]}) -> Name;
@@ -203,9 +213,9 @@ validate(Db, DDoc) ->
language = Lang,
views = Views,
partitioned = Partitioned
- }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+ }} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
- case {couch_db:is_partitioned(Db), Partitioned} of
+ case {IsDbPartitioned, Partitioned} of
{false, true} ->
throw({invalid_design_doc,
<<"partitioned option cannot be true in a "
diff --git a/test/elixir/test/basics_test.exs b/test/elixir/test/basics_test.exs
index 3491ef5a3..c28c78c81 100644
--- a/test/elixir/test/basics_test.exs
+++ b/test/elixir/test/basics_test.exs
@@ -100,7 +100,7 @@ defmodule BasicsTest do
db_name = context[:db_name]
{:ok, _} = create_doc(db_name, sample_doc_foo())
resp = Couch.get("/#{db_name}/foo", query: %{:local_seq => true})
- assert resp.body["_local_seq"] == 1, "Local seq value == 1"
+ assert is_binary(resp.body["_local_seq"]), "Local seq value is a binary"
end
@tag :with_db