authorbenoitc <>2014-02-07 15:38:34 +0100
committerbenoitc <>2014-02-07 15:38:34 +0100
commit38a18abbc4109ffbb7734a261c521b544e9a1a32 (patch)
parentc73144aaef7f86d169afd685dae121463efea658 (diff)
add supports of view changes in the _changes API
Now when the option `seq_indexed=true` is set in the design doc, the view filter in _changes will use it to retrieve the results. Compared to the current way, using a view index will be faster to retrieve changes. It also gives the possibility to filter changes by key or get changes in a key range. All the view options can be used. Note 1: if someone is trying to filter a changes with view options when the views are not indexed by sequence, a 400 error will be returned. Note 2: The changes will only be returned when the view is updated if seq_indexed=true
diff --git a/apps/couch/src/couch_util.erl b/apps/couch/src/couch_util.erl
index 40401e2a3..2a3bba7f4 100644
--- a/apps/couch/src/couch_util.erl
+++ b/apps/couch/src/couch_util.erl
@@ -32,6 +32,7 @@
-export([rfc1123_date/0, rfc1123_date/1]).
@@ -352,6 +353,12 @@ to_list(V) when is_atom(V) ->
to_list(V) ->
lists:flatten(io_lib:format("~p", [V])).
+to_atom(V) when is_binary(V) ->
+ list_to_atom(binary_to_list(V));
+to_atom(V) when is_list(V) ->
+ list_to_atom(V).
url_encode(Bin) when is_binary(Bin) ->
url_encode([H|T]) ->
diff --git a/apps/couch_httpd/src/couch_httpd_changes.erl b/apps/couch_httpd/src/couch_httpd_changes.erl
index 1e431e9c7..56ce5597a 100644
--- a/apps/couch_httpd/src/couch_httpd_changes.erl
+++ b/apps/couch_httpd/src/couch_httpd_changes.erl
@@ -12,7 +12,9 @@
+ handle_changes/3,
+ handle_view_changes/3]).
@@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) ->
% on other databases, _changes is free for all.
- handle_changes_req2(Req, Db).
-handle_changes_req2(Req, Db) ->
MakeCallback = fun(Resp) ->
fun({change, {ChangeProp}=Change, _}, "eventsource") ->
Seq = proplists:get_value(<<"seq">>, ChangeProp),
@@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) ->
ChangesArgs = parse_changes_query(Req, Db),
- ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ ChangesFun = handle_changes(ChangesArgs, Req, Db),
WrapperFun = case ChangesArgs#changes_args.feed of
"normal" ->
{ok, Info} = couch_db:get_db_info(Db),
@@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) ->
+handle_changes(ChangesArgs, Req, Db) ->
+ case ChangesArgs#changes_args.filter of
+ "_view" ->
+ handle_view_changes(ChangesArgs, Req, Db);
+ _ ->
+ couch_changes:handle_changes(ChangesArgs, Req, Db)
+ end.
+%% wrapper around couch_mrview_changes.
+%% This wrapper mimic couch_changes:handle_changes/3 and return a
+%% Changefun that can be used by the handle_changes_req function. Also
+%% while couch_mrview_changes:handle_changes/6 is returning tha view
+%% changes this function return docs corresponding to the changes
+%% instead so it can be used to replace the _view filter.
+handle_view_changes(ChangesArgs, Req, Db) ->
+ %% parse view parameter
+ {DDocId, VName} = parse_view_param(Req),
+ %% get view options
+ Query = case Req of
+ {json_req, {Props}} ->
+ {Q} = couch_util:get_value(<<"query">>, Props, {[]}),
+ Q;
+ _ ->
+ couch_httpd:qs(Req)
+ end,
+ ViewOptions = parse_view_options(Query, []),
+ {ok, Infos} = couch_mrview:get_info(Db, DDocId),
+ case lists:member(<<"seq_indexed">>,
+ proplists:get_value(update_options, Infos, [])) of
+ true ->
+ handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs,
+ Req);
+ false when ViewOptions /= [] ->
+ ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
+ throw({bad_request, seqs_not_indexed});
+ false ->
+ %% old method we are getting changes using the btree instead
+ %% which is not efficient, log it
+ ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
+ couch_changes:handle_changes(ChangesArgs, Req, Db)
+ end.
+handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
+ ChangesArgs, Req) ->
+ #changes_args{
+ feed = ResponseType,
+ since = Since,
+ db_open_options = DbOptions} = ChangesArgs,
+ Options0 = [{since, Since},
+ {view_options, ViewOptions}],
+ Options = case ResponseType of
+ "continuous" -> [stream | Options0];
+ "eventsource" -> [stream | Options0];
+ "longpoll" -> [{stream, once} | Options0];
+ _ -> Options0
+ end,
+ %% reopen the db with the db options given to the changes args
+ couch_db:close(Db0),
+ DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions],
+ {ok, Db} = couch_db:open(DbName, DbOptions1),
+ %% initialise the changes fun
+ ChangesFun = fun(Callback) ->
+ Callback(start, ResponseType),
+ Acc0 = {"", 0, Db, Callback, ChangesArgs},
+ couch_mrview_changes:handle_changes(DbName, DDocId, VName,
+ fun view_changes_cb/2,
+ Acc0, Options)
+ end,
+ ChangesFun.
+view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
+ Callback({stop, LastSeq}, Args#changes_args.feed);
+view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
+ Callback(timeout, Args#changes_args.feed),
+ {ok, Acc};
+view_changes_cb({{Seq, _Key, DocId}, _VAl},
+ {Prepend, OldLimit, Db0, Callback, Args}=Acc) ->
+ #changes_args{
+ feed = ResponseType,
+ limit = Limit} = Args,
+ %% if the doc sequence is > to the one in the db record, reopen the
+ %% database since it means we don't have the latest db value.
+ Db = case Db0#db.update_seq >= Seq of
+ true -> Db0;
+ false ->
+ {ok, Db1} = couch_db:reopen_db(Db0),
+ Db1
+ end,
+ case couch_db:get_doc_info(Db, DocId) of
+ {ok, DocInfo} ->
+ %% get change row
+ ChangeRow = view_change_row(Db, DocInfo, Args),
+ %% emit change row
+ Callback({change, ChangeRow, Prepend}, ResponseType),
+ %% if we achieved the limit, stop here, else continue.
+ NewLimit = OldLimit + 1,
+ if Limit > NewLimit ->
+ {ok, {<<",\n">>, Db, NewLimit, Callback, Args}};
+ true ->
+ {stop, {<<"">>, Db, NewLimit, Callback, Args}}
+ end;
+ {error, not_found} ->
+ %% doc not found, continue
+ {ok, Acc};
+ Error ->
+ throw(Error)
+ end.
+view_change_row(Db, DocInfo, Args) ->
+ #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo,
+ [#rev_info{rev=Rev, deleted=Del} | _] = Revs,
+ #changes_args{style=Style,
+ include_docs=InDoc,
+ doc_options = DocOpts,
+ conflicts=Conflicts}=Args,
+ Changes = case Style of
+ main_only ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+ all_docs ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
+ || #rev_info{rev=R} <- Revs]
+ end,
+ {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++
+ deleted_item(Del) ++ case InDoc of
+ true ->
+ Opts = case Conflicts of
+ true -> [deleted, conflicts];
+ false -> [deleted]
+ end,
+ Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
+ case Doc of
+ null ->
+ [{doc, null}];
+ _ ->
+ [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+ end;
+ false ->
+ []
+ end}.
parse_changes_query(Req, Db) ->
ChangesArgs = lists:foldl(fun({Key, Value}, Args) ->
case {string:to_lower(Key), Value} of
@@ -172,3 +330,87 @@ parse_changes_query(Req, Db) ->
_ ->
+parse_view_param({json_req, {Props}}) ->
+ {Query} = couch_util:get_value(<<"query">>, Props),
+ parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>));
+parse_view_param(Req) ->
+ parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))).
+parse_view_param1(ViewParam) ->
+ case re:split(ViewParam, <<"/">>) of
+ [DName, ViewName] ->
+ {<< "_design/", DName/binary >>, ViewName};
+ _ ->
+ throw({bad_request, "Invalid `view` parameter."})
+ end.
+parse_view_options([], Acc) ->
+ Acc;
+parse_view_options([{K, V} | Rest], Acc) ->
+ Acc1 = case couch_util:to_binary(K) of
+ <<"reduce">> ->
+ [{reduce, couch_mrview_http:parse_boolean(V)}];
+ <<"key">> ->
+ V1 = parse_json(V),
+ [{start_key, V1}, {end_key, V1} | Acc];
+ <<"keys">> ->
+ [{keys, parse_json(V)} | Acc];
+ <<"startkey">> ->
+ [{start_key, parse_json(V)} | Acc];
+ <<"start_key">> ->
+ [{start_key, parse_json(V)} | Acc];
+ <<"startkey_docid">> ->
+ [{start_key_docid, couch_util:to_binary(V)} | Acc];
+ <<"start_key_docid">> ->
+ [{start_key_docid, couch_util:to_binary(V)} | Acc];
+ <<"endkey">> ->
+ [{end_key, parse_json(V)} | Acc];
+ <<"end_key">> ->
+ [{end_key, parse_json(V)} | Acc];
+ <<"endkey_docid">> ->
+ [{start_key_docid, couch_util:to_binary(V)} | Acc];
+ <<"end_key_docid">> ->
+ [{start_key_docid, couch_util:to_binary(V)} | Acc];
+ <<"limit">> ->
+ [{limit, couch_mrview_http:parse_pos_int(V)} | Acc];
+ <<"count">> ->
+ throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+ <<"stale">> when V =:= <<"ok">> orelse V =:= "ok" ->
+ [{stale, ok} | Acc];
+ <<"stale">> when V =:= <<"update_after">> orelse V =:= "update_after" ->
+ [{stale, update_after} | Acc];
+ <<"stale">> ->
+ throw({query_parse_error, <<"Invalid value for `stale`.">>});
+ <<"descending">> ->
+ case couch_mrview_http:parse_boolean(V) of
+ true ->
+ [{direction, rev} | Acc];
+ _ ->
+ [{direction, fwd} | Acc]
+ end;
+ <<"skip">> ->
+ [{skip, couch_mrview_http:parse_pos_int(V)} | Acc];
+ <<"group">> ->
+ case couch_mrview_http:parse_booolean(V) of
+ true ->
+ [{group_level, exact} | Acc];
+ _ ->
+ [{group_level, 0} | Acc]
+ end;
+ <<"group_level">> ->
+ [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc];
+ <<"inclusive_end">> ->
+ [{inclusive_end, couch_mrview_http:parse_boolean(V)}];
+ _ ->
+ Acc
+ end,
+ parse_view_options(Rest, Acc1).
+parse_json(V) when is_list(V) ->
+parse_json(V) ->
+ V.
+deleted_item(true) -> [{<<"deleted">>, true}];
+deleted_item(_) -> [].
diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl
index c48c06674..b9ae567d0 100644
--- a/apps/couch_index/src/couch_index.erl
+++ b/apps/couch_index/src/couch_index.erl
@@ -254,12 +254,12 @@ handle_cast(delete, State) ->
DbName = Mod:get(db_name, IdxState),
DDocId = Mod:get(idx_name, IdxState),
- ok = Mod:delete(IdxState),
%% notify about the index deletion
{DbName, DDocId, Mod}}),
+ ok = Mod:delete(IdxState),
{stop, normal, State};
handle_cast(ddoc_updated, State) ->
#st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
@@ -319,7 +319,14 @@ handle_info(commit, State) ->
{noreply, State}
handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
- Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
+ DbName = Mod:get(db_name, IdxState),
+ DDocId = Mod:get(idx_name, IdxState),
+ %% notify to event listeners that the index has been
+ %% updated
+ couch_index_event:notify({index_delete, {DbName, DDocId, Mod}}),
+ Args = [DbName, DDocId],
?LOG_INFO("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
catch send_all(State#st.waiters, shutdown),
{stop, normal, State#st{waiters=[]}}.
diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl
index facde1403..86791db5d 100644
--- a/apps/couch_index/src/couch_index_server.erl
+++ b/apps/couch_index/src/couch_index_server.erl
@@ -159,8 +159,6 @@ reset_indexes(DbName, Root) ->
MRef = erlang:monitor(process, Pid),
gen_server:cast(Pid, delete),
receive {'DOWN', MRef, _, _, _} -> ok end,
- couch_index_event:notify({index_delete,
- {DbName, DDocId, couch_mrview_index}}),
rem_from_ets(DbName, Sig, DDocId, Pid)
lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
@@ -195,11 +193,6 @@ update_notify({ddoc_updated, {DbName, DDocId}}) ->
fun({_DbName, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[{_, IndexPid}] ->
- %% notify to event listeners that the index has been
- %% updated
- couch_index_event:notify({index_update,
- {DbName, DDocId,
- couch_mrview_index}}),
(catch gen_server:cast(IndexPid, ddoc_updated));
[] ->
diff --git a/apps/couch_mrview/src/couch_mrview_changes.erl b/apps/couch_mrview/src/couch_mrview_changes.erl
index 2b8f91044..a0e528147 100644
--- a/apps/couch_mrview/src/couch_mrview_changes.erl
+++ b/apps/couch_mrview/src/couch_mrview_changes.erl
@@ -141,10 +141,8 @@ view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View,
true -> OldSeq
- case Callback(KV, Acc2) of
- {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}};
- {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}}
- end
+ {Go, Acc3} = Callback(KV, Acc2),
+ {Go, {Go, Acc3, LastSeq}}
Acc0 = {ok, UserAcc, Since},
diff --git a/apps/couch_mrview/src/couch_mrview_http.erl b/apps/couch_mrview/src/couch_mrview_http.erl
index 22e0dc3ec..4801e27ed 100644
--- a/apps/couch_mrview/src/couch_mrview_http.erl
+++ b/apps/couch_mrview/src/couch_mrview_http.erl
@@ -22,6 +22,10 @@
+ parse_int/1,
+ parse_pos_int/1]).
@@ -378,6 +382,10 @@ parse_qs(Key, Val, Args) ->
+parse_boolean(true) ->
+ true;
+parse_boolean(false) ->
+ false;
parse_boolean(Val) ->
case string:to_lower(Val) of
"true" -> true;
@@ -387,7 +395,8 @@ parse_boolean(Val) ->
throw({query_parse_error, ?l2b(Msg)})
+parse_int(Val) when is_integer(Val) ->
+ Val;
parse_int(Val) ->
case (catch list_to_integer(Val)) of
IntVal when is_integer(IntVal) ->
@@ -397,7 +406,6 @@ parse_int(Val) ->
throw({query_parse_error, ?l2b(Msg)})
parse_pos_int(Val) ->
case parse_int(Val) of
IntVal when IntVal >= 0 ->
diff --git a/apps/couch_mrview/src/couch_mrview_index.erl b/apps/couch_mrview/src/couch_mrview_index.erl
index 0835f446d..5f5ae8846 100644
--- a/apps/couch_mrview/src/couch_mrview_index.erl
+++ b/apps/couch_mrview/src/couch_mrview_index.erl
@@ -38,8 +38,10 @@ get(Property, State) ->
Opts = State#mrst.design_opts,
IncDesign = couch_util:get_value(<<"include_design">>, Opts, false),
LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
+ SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false),
if IncDesign -> [include_design]; true -> [] end
- ++ if LocalSeq -> [local_seq]; true -> [] end;
+ ++ if LocalSeq -> [local_seq]; true -> [] end
+ ++ if SeqIndexed -> [seq_indexed]; true -> [] end;
info ->
fd = Fd,
@@ -49,19 +51,32 @@ get(Property, State) ->
language = Lang,
update_seq = UpdateSeq,
purge_seq = PurgeSeq,
- views = Views
+ views = Views,
+ design_opts = Opts
} = State,
{ok, Size} = couch_file:bytes(Fd),
{ok, DataSize} = couch_mrview_util:calculate_data_size(IdBtree,
+ IncDesign = couch_util:get_value(<<"include_design">>, Opts, false),
+ LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
+ SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false),
+ UpdateOptions =
+ if IncDesign -> [<<"include_design">>]; true -> [] end
+ ++ if LocalSeq -> [<<"local_seq">>]; true -> [] end
+ ++ if SeqIndexed -> [<<"seq_indexed">>]; true -> [] end,
{ok, [
{signature, list_to_binary(couch_index_util:hexsig(Sig))},
{language, Lang},
{disk_size, Size},
{data_size, DataSize},
{update_seq, UpdateSeq},
- {purge_seq, PurgeSeq}
+ {purge_seq, PurgeSeq},
+ {update_options, UpdateOptions}
Other ->
throw({unknown_index_property, Other})
diff --git a/apps/couch_mrview/test/09-index-events.t b/apps/couch_mrview/test/09-index-events.t
index 1489e4e9a..6cc1e9c9f 100644
--- a/apps/couch_mrview/test/09-index-events.t
+++ b/apps/couch_mrview/test/09-index-events.t
@@ -15,7 +15,7 @@
% the License.
main(_) ->
- etap:plan(4),
+ etap:plan(5),
case (catch test()) of
ok ->
@@ -29,11 +29,18 @@ main(_) ->
test() ->
{ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes),
+ test_info(Db),
+test_info(Db) ->
+ {ok, Info} = couch_mrview:get_info(Db, <<"_design/bar">>),
+ etap:is(getval(update_options, Info), [<<"seq_indexed">>],
+ "update options OK"),
+ ok.
test_update_event(Db) ->
{ok, Pid} = couch_index_event:start_link(self()),
etap:ok(is_pid(Pid), "event handler added"),
@@ -47,11 +54,13 @@ test_update_event(Db) ->
test_delete_event(Db) ->
- ok = couch_mrview:refresh(Db, <<"_design/bar">>),
+ ok = couch_mrview:refresh(Db, <<"_design/bar">>),
+ timer:sleep(300),
{ok, Pid} = couch_index_event:start_link(self()),
+ etap:ok(is_pid(Pid), "delete event handler added"),
- etap:ok(is_pid(Pid), "event handler added"),
- couch_mrview_test_util:delete_db(<<"foo">>),
+ catch couch_mrview_test_util:delete_db(<<"foo">>),
Expect = {index_delete, {<<"foo">>, <<"_design/bar">>,
@@ -59,3 +68,7 @@ test_delete_event(Db) ->
etap:is(Event, Expect, "index delete events OK")
+getval(Key, PL) ->
+ {value, {Key, Val}} = lists:keysearch(Key, 1, PL),
+ Val.
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index 0fba9f9fa..5c543f1ef 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -283,7 +283,28 @@ couchTests.changes = function(debug) {
+ var ddoc1 = {
+ _id : "_design/changes_seq_indexed",
+ options : {
+ local_seq : true,
+ seq_indexed : true
+ },
+ views : {
+ local_seq : {
+ map : "function(doc) {emit(doc._local_seq, null)}"
+ },
+ blah: {
+ map : 'function(doc) {' +
+ ' if (doc._id == "blah") {' +
+ ' emit("test", null);' +
+ ' }' +
+ '}'
+ }
+ }
+ };
var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/bop");
var resp = JSON.parse(req.responseText);
@@ -404,6 +425,24 @@ couchTests.changes = function(debug) {
T(resp.results.length === 1);
T(resp.results[0].id === "blah");
+ var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_filter/blah&key="test"');
+ TEquals(400, req.status, "should return 400 for when seq_indexed=false");
+ // test filter on view function (map) with seq_indexed = true
+ //
+ var resp = db.view('changes_seq_indexed/blah', {limit: 1});
+ T(resp.rows.length == 1);
+ var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah");
+ var resp = JSON.parse(req.responseText);
+ T(resp.results.length === 1);
+ T(resp.results[0].id === "blah");
+ var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah&key="test"');
+ var resp = JSON.parse(req.responseText);
+ T(resp.results.length === 1);
+ T(resp.results[0].id === "blah");
// test for userCtx