diff options
author | benoitc <bchesneau@gmail.com> | 2014-02-07 15:38:34 +0100 |
---|---|---|
committer | benoitc <bchesneau@gmail.com> | 2014-02-07 15:38:34 +0100 |
commit | 38a18abbc4109ffbb7734a261c521b544e9a1a32 (patch) | |
tree | 10b911faa7124a9efbef45939f520a68657df610 | |
parent | c73144aaef7f86d169afd685dae121463efea658 (diff) | |
download | couchdb-38a18abbc4109ffbb7734a261c521b544e9a1a32.tar.gz |
add supports of view changes in the _changes API
Now when the option `seq_indexed=true` is set in the design doc, the
view filter in _changes will use it to retrieve the results. Compared to
the current way, using a view index will be faster to retrieve changes.
It also gives the possibility to filter changes by key or get changes in
a key range. All the view options can be used.
Note 1: if someone is trying to filter a changes with view options when
the views are not indexed by sequence, a 400 error will be returned.
Note 2: The changes will only be returned when the view is updated if
seq_indexed=true
-rw-r--r-- | apps/couch/src/couch_util.erl | 7 | ||||
-rw-r--r-- | apps/couch_httpd/src/couch_httpd_changes.erl | 250 | ||||
-rw-r--r-- | apps/couch_index/src/couch_index.erl | 13 | ||||
-rw-r--r-- | apps/couch_index/src/couch_index_server.erl | 7 | ||||
-rw-r--r-- | apps/couch_mrview/src/couch_mrview_changes.erl | 6 | ||||
-rw-r--r-- | apps/couch_mrview/src/couch_mrview_http.erl | 12 | ||||
-rw-r--r-- | apps/couch_mrview/src/couch_mrview_index.erl | 21 | ||||
-rw-r--r-- | apps/couch_mrview/test/09-index-events.t | 21 | ||||
-rw-r--r-- | share/www/script/test/changes.js | 39 |
9 files changed, 349 insertions, 27 deletions
diff --git a/apps/couch/src/couch_util.erl b/apps/couch/src/couch_util.erl index 40401e2a3..2a3bba7f4 100644 --- a/apps/couch/src/couch_util.erl +++ b/apps/couch/src/couch_util.erl @@ -32,6 +32,7 @@ -export([with_db/2]). -export([rfc1123_date/0, rfc1123_date/1]). -export([find_in_binary/2]). +-export([to_atom/1]). -include("couch_db.hrl"). @@ -352,6 +353,12 @@ to_list(V) when is_atom(V) -> to_list(V) -> lists:flatten(io_lib:format("~p", [V])). + +to_atom(V) when is_binary(V) -> + list_to_atom(binary_to_list(V)); +to_atom(V) when is_list(V) -> + list_to_atom(V). + url_encode(Bin) when is_binary(Bin) -> url_encode(binary_to_list(Bin)); url_encode([H|T]) -> diff --git a/apps/couch_httpd/src/couch_httpd_changes.erl b/apps/couch_httpd/src/couch_httpd_changes.erl index 1e431e9c7..56ce5597a 100644 --- a/apps/couch_httpd/src/couch_httpd_changes.erl +++ b/apps/couch_httpd/src/couch_httpd_changes.erl @@ -12,7 +12,9 @@ -module(couch_httpd_changes). --export([handle_changes_req/2]). +-export([handle_changes_req/2, + handle_changes/3, + handle_view_changes/3]). -include_lib("couch/include/couch_db.hrl"). @@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> % on other databases, _changes is free for all. ok end, - handle_changes_req2(Req, Db). -handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> fun({change, {ChangeProp}=Change, _}, "eventsource") -> Seq = proplists:get_value(<<"seq">>, ChangeProp), @@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) -> end end, ChangesArgs = parse_changes_query(Req, Db), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + ChangesFun = handle_changes(ChangesArgs, Req, Db), WrapperFun = case ChangesArgs#changes_args.feed of "normal" -> {ok, Info} = couch_db:get_db_info(Db), @@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) -> ) end. + +handle_changes(ChangesArgs, Req, Db) -> + case ChangesArgs#changes_args.filter of + "_view" -> + handle_view_changes(ChangesArgs, Req, Db); + _ -> + couch_changes:handle_changes(ChangesArgs, Req, Db) + end. + +%% wrapper around couch_mrview_changes. +%% This wrapper mimic couch_changes:handle_changes/3 and return a +%% Changefun that can be used by the handle_changes_req function. Also +%% while couch_mrview_changes:handle_changes/6 is returning tha view +%% changes this function return docs corresponding to the changes +%% instead so it can be used to replace the _view filter. +handle_view_changes(ChangesArgs, Req, Db) -> + %% parse view parameter + {DDocId, VName} = parse_view_param(Req), + + %% get view options + Query = case Req of + {json_req, {Props}} -> + {Q} = couch_util:get_value(<<"query">>, Props, {[]}), + Q; + _ -> + couch_httpd:qs(Req) + end, + ViewOptions = parse_view_options(Query, []), + + {ok, Infos} = couch_mrview:get_info(Db, DDocId), + case lists:member(<<"seq_indexed">>, + proplists:get_value(update_options, Infos, [])) of + true -> + handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs, + Req); + false when ViewOptions /= [] -> + ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]), + throw({bad_request, seqs_not_indexed}); + false -> + %% old method we are getting changes using the btree instead + %% which is not efficient, log it + ?LOG_WARN("Get view changes with seq_indexed=false.~n", []), + couch_changes:handle_changes(ChangesArgs, Req, Db) + end. + +handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions, + ChangesArgs, Req) -> + #changes_args{ + feed = ResponseType, + since = Since, + db_open_options = DbOptions} = ChangesArgs, + + Options0 = [{since, Since}, + {view_options, ViewOptions}], + Options = case ResponseType of + "continuous" -> [stream | Options0]; + "eventsource" -> [stream | Options0]; + "longpoll" -> [{stream, once} | Options0]; + _ -> Options0 + end, + + %% reopen the db with the db options given to the changes args + couch_db:close(Db0), + DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions], + {ok, Db} = couch_db:open(DbName, DbOptions1), + + + %% initialise the changes fun + ChangesFun = fun(Callback) -> + Callback(start, ResponseType), + + Acc0 = {"", 0, Db, Callback, ChangesArgs}, + couch_mrview_changes:handle_changes(DbName, DDocId, VName, + fun view_changes_cb/2, + Acc0, Options) + end, + ChangesFun. + + +view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) -> + Callback({stop, LastSeq}, Args#changes_args.feed); + +view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) -> + Callback(timeout, Args#changes_args.feed), + {ok, Acc}; +view_changes_cb({{Seq, _Key, DocId}, _VAl}, + {Prepend, OldLimit, Db0, Callback, Args}=Acc) -> + + #changes_args{ + feed = ResponseType, + limit = Limit} = Args, + + %% if the doc sequence is > to the one in the db record, reopen the + %% database since it means we don't have the latest db value. + Db = case Db0#db.update_seq >= Seq of + true -> Db0; + false -> + {ok, Db1} = couch_db:reopen_db(Db0), + Db1 + end, + + case couch_db:get_doc_info(Db, DocId) of + {ok, DocInfo} -> + %% get change row + ChangeRow = view_change_row(Db, DocInfo, Args), + %% emit change row + Callback({change, ChangeRow, Prepend}, ResponseType), + + %% if we achieved the limit, stop here, else continue. + NewLimit = OldLimit + 1, + if Limit > NewLimit -> + {ok, {<<",\n">>, Db, NewLimit, Callback, Args}}; + true -> + {stop, {<<"">>, Db, NewLimit, Callback, Args}} + end; + {error, not_found} -> + %% doc not found, continue + {ok, Acc}; + Error -> + throw(Error) + end. + + +view_change_row(Db, DocInfo, Args) -> + #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo, + [#rev_info{rev=Rev, deleted=Del} | _] = Revs, + + #changes_args{style=Style, + include_docs=InDoc, + doc_options = DocOpts, + conflicts=Conflicts}=Args, + + Changes = case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end, + + {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++ + deleted_item(Del) ++ case InDoc of + true -> + Opts = case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, + Doc = couch_index_util:load_doc(Db, DocInfo, Opts), + case Doc of + null -> + [{doc, null}]; + _ -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end; + false -> + [] + end}. + parse_changes_query(Req, Db) -> ChangesArgs = lists:foldl(fun({Key, Value}, Args) -> case {string:to_lower(Key), Value} of @@ -172,3 +330,87 @@ parse_changes_query(Req, Db) -> _ -> ChangesArgs end. + +parse_view_param({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props), + parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>)); +parse_view_param(Req) -> + parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))). + +parse_view_param1(ViewParam) -> + case re:split(ViewParam, <<"/">>) of + [DName, ViewName] -> + {<< "_design/", DName/binary >>, ViewName}; + _ -> + throw({bad_request, "Invalid `view` parameter."}) + end. + +parse_view_options([], Acc) -> + Acc; +parse_view_options([{K, V} | Rest], Acc) -> + Acc1 = case couch_util:to_binary(K) of + <<"reduce">> -> + [{reduce, couch_mrview_http:parse_boolean(V)}]; + <<"key">> -> + V1 = parse_json(V), + [{start_key, V1}, {end_key, V1} | Acc]; + <<"keys">> -> + [{keys, parse_json(V)} | Acc]; + <<"startkey">> -> + [{start_key, parse_json(V)} | Acc]; + <<"start_key">> -> + [{start_key, parse_json(V)} | Acc]; + <<"startkey_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"start_key_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"endkey">> -> + [{end_key, parse_json(V)} | Acc]; + <<"end_key">> -> + [{end_key, parse_json(V)} | Acc]; + <<"endkey_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"end_key_docid">> -> + [{start_key_docid, couch_util:to_binary(V)} | Acc]; + <<"limit">> -> + [{limit, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"count">> -> + throw({query_parse_error, <<"QS param `count` is not `limit`">>}); + <<"stale">> when V =:= <<"ok">> orelse V =:= "ok" -> + [{stale, ok} | Acc]; + <<"stale">> when V =:= <<"update_after">> orelse V =:= "update_after" -> + [{stale, update_after} | Acc]; + <<"stale">> -> + throw({query_parse_error, <<"Invalid value for `stale`.">>}); + <<"descending">> -> + case couch_mrview_http:parse_boolean(V) of + true -> + [{direction, rev} | Acc]; + _ -> + [{direction, fwd} | Acc] + end; + <<"skip">> -> + [{skip, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"group">> -> + case couch_mrview_http:parse_booolean(V) of + true -> + [{group_level, exact} | Acc]; + _ -> + [{group_level, 0} | Acc] + end; + <<"group_level">> -> + [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc]; + <<"inclusive_end">> -> + [{inclusive_end, couch_mrview_http:parse_boolean(V)}]; + _ -> + Acc + end, + parse_view_options(Rest, Acc1). + +parse_json(V) when is_list(V) -> + ?JSON_DECODE(V); +parse_json(V) -> + V. + +deleted_item(true) -> [{<<"deleted">>, true}]; +deleted_item(_) -> []. diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl index c48c06674..b9ae567d0 100644 --- a/apps/couch_index/src/couch_index.erl +++ b/apps/couch_index/src/couch_index.erl @@ -254,12 +254,12 @@ handle_cast(delete, State) -> DbName = Mod:get(db_name, IdxState), DDocId = Mod:get(idx_name, IdxState), - ok = Mod:delete(IdxState), - %% notify about the index deletion couch_index_event:notify({index_delete, {DbName, DDocId, Mod}}), + ok = Mod:delete(IdxState), + {stop, normal, State}; handle_cast(ddoc_updated, State) -> #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, @@ -319,7 +319,14 @@ handle_info(commit, State) -> {noreply, State} end; handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) -> - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], + DbName = Mod:get(db_name, IdxState), + DDocId = Mod:get(idx_name, IdxState), + + %% notify to event listeners that the index has been + %% updated + couch_index_event:notify({index_delete, {DbName, DDocId, Mod}}), + + Args = [DbName, DDocId], ?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args), catch send_all(State#st.waiters, shutdown), {stop, normal, State#st{waiters=[]}}. diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl index facde1403..86791db5d 100644 --- a/apps/couch_index/src/couch_index_server.erl +++ b/apps/couch_index/src/couch_index_server.erl @@ -159,8 +159,6 @@ reset_indexes(DbName, Root) -> MRef = erlang:monitor(process, Pid), gen_server:cast(Pid, delete), receive {'DOWN', MRef, _, _, _} -> ok end, - couch_index_event:notify({index_delete, - {DbName, DDocId, couch_mrview_index}}), rem_from_ets(DbName, Sig, DDocId, Pid) end, lists:foreach(Fun, ets:lookup(?BY_DB, DbName)), @@ -195,11 +193,6 @@ update_notify({ddoc_updated, {DbName, DDocId}}) -> fun({_DbName, {_DDocId, Sig}}) -> case ets:lookup(?BY_SIG, {DbName, Sig}) of [{_, IndexPid}] -> - %% notify to event listeners that the index has been - %% updated - couch_index_event:notify({index_update, - {DbName, DDocId, - couch_mrview_index}}), (catch gen_server:cast(IndexPid, ddoc_updated)); [] -> ok diff --git a/apps/couch_mrview/src/couch_mrview_changes.erl b/apps/couch_mrview/src/couch_mrview_changes.erl index 2b8f91044..a0e528147 100644 --- a/apps/couch_mrview/src/couch_mrview_changes.erl +++ b/apps/couch_mrview/src/couch_mrview_changes.erl @@ -141,10 +141,8 @@ view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View, true -> OldSeq end, - case Callback(KV, Acc2) of - {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}}; - {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}} - end + {Go, Acc3} = Callback(KV, Acc2), + {Go, {Go, Acc3, LastSeq}} end, Acc0 = {ok, UserAcc, Since}, diff --git a/apps/couch_mrview/src/couch_mrview_http.erl b/apps/couch_mrview/src/couch_mrview_http.erl index 22e0dc3ec..4801e27ed 100644 --- a/apps/couch_mrview/src/couch_mrview_http.erl +++ b/apps/couch_mrview/src/couch_mrview_http.erl @@ -22,6 +22,10 @@ parse_qs/2 ]). +-export([parse_boolean/1, + parse_int/1, + parse_pos_int/1]). + -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). @@ -378,6 +382,10 @@ parse_qs(Key, Val, Args) -> end. +parse_boolean(true) -> + true; +parse_boolean(false) -> + false; parse_boolean(Val) -> case string:to_lower(Val) of "true" -> true; @@ -387,7 +395,8 @@ parse_boolean(Val) -> throw({query_parse_error, ?l2b(Msg)}) end. - +parse_int(Val) when is_integer(Val) -> + Val; parse_int(Val) -> case (catch list_to_integer(Val)) of IntVal when is_integer(IntVal) -> @@ -397,7 +406,6 @@ parse_int(Val) -> throw({query_parse_error, ?l2b(Msg)}) end. - parse_pos_int(Val) -> case parse_int(Val) of IntVal when IntVal >= 0 -> diff --git a/apps/couch_mrview/src/couch_mrview_index.erl b/apps/couch_mrview/src/couch_mrview_index.erl index 0835f446d..5f5ae8846 100644 --- a/apps/couch_mrview/src/couch_mrview_index.erl +++ b/apps/couch_mrview/src/couch_mrview_index.erl @@ -38,8 +38,10 @@ get(Property, State) -> Opts = State#mrst.design_opts, IncDesign = couch_util:get_value(<<"include_design">>, Opts, false), LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), + SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false), if IncDesign -> [include_design]; true -> [] end - ++ if LocalSeq -> [local_seq]; true -> [] end; + ++ if LocalSeq -> [local_seq]; true -> [] end + ++ if SeqIndexed -> [seq_indexed]; true -> [] end; info -> #mrst{ fd = Fd, @@ -49,19 +51,32 @@ get(Property, State) -> language = Lang, update_seq = UpdateSeq, purge_seq = PurgeSeq, - views = Views + views = Views, + design_opts = Opts } = State, {ok, Size} = couch_file:bytes(Fd), {ok, DataSize} = couch_mrview_util:calculate_data_size(IdBtree, LogBtree, Views), + + + IncDesign = couch_util:get_value(<<"include_design">>, Opts, false), + LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), + SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false), + UpdateOptions = + if IncDesign -> [<<"include_design">>]; true -> [] end + ++ if LocalSeq -> [<<"local_seq">>]; true -> [] end + ++ if SeqIndexed -> [<<"seq_indexed">>]; true -> [] end, + + {ok, [ {signature, list_to_binary(couch_index_util:hexsig(Sig))}, {language, Lang}, {disk_size, Size}, {data_size, DataSize}, {update_seq, UpdateSeq}, - {purge_seq, PurgeSeq} + {purge_seq, PurgeSeq}, + {update_options, UpdateOptions} ]}; Other -> throw({unknown_index_property, Other}) diff --git a/apps/couch_mrview/test/09-index-events.t b/apps/couch_mrview/test/09-index-events.t index 1489e4e9a..6cc1e9c9f 100644 --- a/apps/couch_mrview/test/09-index-events.t +++ b/apps/couch_mrview/test/09-index-events.t @@ -15,7 +15,7 @@ % the License. main(_) -> - etap:plan(4), + etap:plan(5), case (catch test()) of ok -> etap:end_tests(); @@ -29,11 +29,18 @@ main(_) -> test() -> test_util:start_couch(), {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes), + test_info(Db), test_update_event(Db), test_delete_event(Db), test_util:stop_couch(), ok. +test_info(Db) -> + {ok, Info} = couch_mrview:get_info(Db, <<"_design/bar">>), + etap:is(getval(update_options, Info), [<<"seq_indexed">>], + "update options OK"), + ok. + test_update_event(Db) -> {ok, Pid} = couch_index_event:start_link(self()), etap:ok(is_pid(Pid), "event handler added"), @@ -47,11 +54,13 @@ test_update_event(Db) -> couch_index_event:stop(Pid). test_delete_event(Db) -> - ok = couch_mrview:refresh(Db, <<"_design/bar">>), + ok = couch_mrview:refresh(Db, <<"_design/bar">>), + timer:sleep(300), {ok, Pid} = couch_index_event:start_link(self()), + etap:ok(is_pid(Pid), "delete event handler added"), - etap:ok(is_pid(Pid), "event handler added"), - couch_mrview_test_util:delete_db(<<"foo">>), + + catch couch_mrview_test_util:delete_db(<<"foo">>), Expect = {index_delete, {<<"foo">>, <<"_design/bar">>, couch_mrview_index}}, receive @@ -59,3 +68,7 @@ test_delete_event(Db) -> etap:is(Event, Expect, "index delete events OK") end, couch_index_event:stop(Pid). + +getval(Key, PL) -> + {value, {Key, Val}} = lists:keysearch(Key, 1, PL), + Val. diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js index 0fba9f9fa..5c543f1ef 100644 --- a/share/www/script/test/changes.js +++ b/share/www/script/test/changes.js @@ -283,7 +283,28 @@ couchTests.changes = function(debug) { } }; + var ddoc1 = { + _id : "_design/changes_seq_indexed", + options : { + local_seq : true, + seq_indexed : true + }, + views : { + local_seq : { + map : "function(doc) {emit(doc._local_seq, null)}" + }, + blah: { + map : 'function(doc) {' + + ' if (doc._id == "blah") {' + + ' emit("test", null);' + + ' }' + + '}' + } + } + }; + db.save(ddoc); + db.save(ddoc1); var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/bop"); var resp = JSON.parse(req.responseText); @@ -404,6 +425,24 @@ couchTests.changes = function(debug) { T(resp.results.length === 1); T(resp.results[0].id === "blah"); + var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_filter/blah&key="test"'); + TEquals(400, req.status, "should return 400 for when seq_indexed=false"); + + // test filter on view function (map) with seq_indexed = true + // + + var resp = db.view('changes_seq_indexed/blah', {limit: 1}); + T(resp.rows.length == 1); + + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah"); + var resp = JSON.parse(req.responseText); + T(resp.results.length === 1); + T(resp.results[0].id === "blah"); + + var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah&key="test"'); + var resp = JSON.parse(req.responseText); + T(resp.results.length === 1); + T(resp.results[0].id === "blah"); // test for userCtx run_on_modified_server( |