diff options
author | Eric Avdey <eiri@eiri.ca> | 2019-02-28 16:54:13 -0400 |
---|---|---|
committer | Eric Avdey <eiri@eiri.ca> | 2019-02-28 16:54:13 -0400 |
commit | 7cd1026d4de113e8c018f7190aca709b27b2e53d (patch) | |
tree | 6d81eb748ad018050fde6c16da7ea9470c38810c | |
parent | ca70537c308a8ef7144d653fadd436f853708942 (diff) | |
download | couchdb-7cd1026d4de113e8c018f7190aca709b27b2e53d.tar.gz |
Add tests for shard's split changes feedshard-split-changes-feed-test
-rw-r--r-- | src/mem3/test/mem3_reshard_changes_feed_test.erl | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/src/mem3/test/mem3_reshard_changes_feed_test.erl b/src/mem3/test/mem3_reshard_changes_feed_test.erl new file mode 100644 index 000000000..0cbf33ba2 --- /dev/null +++ b/src/mem3/test/mem3_reshard_changes_feed_test.erl @@ -0,0 +1,447 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_changes_feed_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/src/mem3_reshard.hrl"). + + +-define(assertChanges(Expected, Received), + begin + ((fun() -> + ExpectedIDs = lists:sort([I || #{id := I} <- Expected]), + ReceivedIDs = lists:sort([I || #{id := I} <- Received]), + ?assertEqual(ExpectedIDs, ReceivedIDs) + end)()) + end). + + +setup() -> + Db1 = ?tempdb(), + create_db(Db1, [{q, 1}, {n, 1}]), + #{db1 => Db1}. + + +teardown(#{} = Dbs) -> + mem3_reshard:reset_state(), + maps:map(fun(_, Db) -> delete_db(Db) end, Dbs). + + +start_couch() -> + test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). + + +mem3_reshard_changes_feed_test_() -> + { + "mem3 shard split changes feed tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun normal_feed_should_work_after_split/1, + fun continuous_feed_should_work_during_split/1 + ] + } + } + }. + + +normal_feed_should_work_after_split(#{db1 := Db}) -> + ?_test(begin + DocSpec = #{ + docs => [1, 10], + delete => [5, 6] + }, + add_test_docs(Db, DocSpec), + + % gather pre-shard changes + BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0}, + {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs), + + % Split the shard + split_and_wait(Db), + + % verify changes list consistent for all the old seqs + lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) -> + Args = BaseArgs#changes_args{since = Seq}, + {ok, Changes, _EndSeq} = get_changes_feed(Db, Args), + ?assertChanges(ExpectedChanges, Changes), + [C | ExpectedChanges] + end, [], OldChanges), + + % confirm that old LastSeq respected + Args1 = BaseArgs#changes_args{since = OldEndSeq}, + {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1), + ?assertChanges([], Changes1), + + % confirm that new LastSeq also respected + Args2 = BaseArgs#changes_args{since = EndSeq1}, + {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2), + ?assertChanges([], Changes2), + ?assertEqual(EndSeq2, EndSeq1), + + % confirm we didn't lost any changes and have consistent last seq + {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs), + ?assertChanges(OldChanges, Changes3), + %% FIXME. last_seq should be consistent + % ?assertEqual(EndSeq3, EndSeq2), + + % add some docs + add_test_docs(Db, #{docs => [11, 15]}), + Args4 = BaseArgs#changes_args{since = EndSeq3}, + {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4), + AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])], + ?assertChanges(AddedChanges, Changes4), + + % confirm include_docs and deleted works + Args5 = BaseArgs#changes_args{include_docs = true}, + {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5), + ?assertEqual(EndSeq4, EndSeq5), + [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>], + ?assertMatch(#{deleted := true}, SampleChange), + ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange), + + % update and delete some pre and post split docs + AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5], + UpdateDocs = lists:filtermap(fun + (#doc{id = <<"00002">>}) -> true; + (#doc{id = <<"00012">>}) -> true; + (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}}; + (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}}; + (_) -> false + end, AllDocs), + update_docs(Db, UpdateDocs), + + Args6 = BaseArgs#changes_args{since = EndSeq5}, + {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6), + UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs], + ?assertChanges(UpdatedChanges, Changes6), + [#{seq := Seq6} | _] = Changes6, + ?assertEqual(EndSeq6, Seq6), + + Args7 = Args6#changes_args{dir = rev, limit = 4}, + {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7), + %% FIXME - I believe rev is just conceptually broken in Couch 2.x + % ?assertChanges(lists:reverse(Changes6), Changes7), + ?assertEqual(4, length(Changes7)), + [#{seq := Seq7} | _] = Changes7, + ?assertEqual(EndSeq7, Seq7) + end). + + +continuous_feed_should_work_during_split(#{db1 := Db}) -> + ?_test(begin + {UpdaterPid, UpdaterRef} = spawn_monitor(fun() -> + Updater = fun U({State, I}) -> + receive + {get_state, {Pid, Ref}} -> + Pid ! {state, Ref, State}, + U({State, I}); + add -> + DocSpec = #{docs => [I, I]}, + add_test_docs(Db, DocSpec), + U({State, I + 1}); + split -> + spawn_monitor(fun() -> split_and_wait(Db) end), + U({"in_process", I}); + stop -> + ok; + {'DOWN', _, process, _, normal} -> + U({"after", I}) + end + end, + Updater({"before", 1}) + end), + + Callback = fun + (start, Acc) -> + {ok, Acc}; + (waiting_for_updates, Acc) -> + Ref = make_ref(), + UpdaterPid ! {get_state, {self(), Ref}}, + receive {state, Ref, State} -> ok end, + case {State, length(Acc)} of + {"before", N} when N < 5 -> + UpdaterPid ! add; + {"before", _} -> + UpdaterPid ! split; + {"in_process", N} when N < 10 -> + UpdaterPid ! add; + {"in_process", _} -> + wait; + {"after", N} when N < 15 -> + UpdaterPid ! add; + {"after", _} -> + UpdaterPid ! stop, + %% nfc why simple return of {stop, Acc} doesn't work + exit({with_proc_res, {ok, Acc}}) + end, + {ok, Acc}; + (timeout, Acc) -> + {ok, Acc}; + ({change, {Change}}, Acc) -> + CM = maps:from_list(Change), + {ok, [CM | Acc]}; + ({stop, EndSeq, _Pending}, _Acc) -> + UpdaterPid ! stop, + Error = {assertion_fail, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, {last_seq, EndSeq}}, + {reason, "Changes feed stopped on shard split"} + ]}, + exit({with_proc_res, {error, Error}}) + end, + BaseArgs = #changes_args{ + feed = "continuous", + heartbeat = 100, + timeout = 1000 + }, + Result = get_changes_feed(Db, BaseArgs, Callback), + + receive + {'DOWN', UpdaterRef, process, UpdaterPid, normal} -> + ok; + {'DOWN', UpdaterRef, process, UpdaterPid, Error} -> + erlang:error({test_context_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, Error}, + {reason, "Updater died"}]}) + end, + + case Result of + {ok, Changes} -> + Expected = [#{id => ID} || #doc{id = ID} <- docs([1, 15])], + ?assertChanges(Expected, Changes); + {error, Err} -> + erlang:error(Err) + end + end). + + +split_and_wait(Db) -> + [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + wait_state(JobId, completed), + ResultShards = lists:sort(mem3:local_shards(Db)), + ?assertEqual(2, length(ResultShards)). + + +wait_state(JobId, State) -> + test_util:wait(fun() -> + case mem3_reshard:job(JobId) of + {ok, {Props}} -> + case couch_util:get_value(job_state, Props) of + State -> ok; + _ -> timer:sleep(100), wait + end; + {error, not_found} -> timer:sleep(100), wait + end + end, 30000). + + +get_changes_feed(Db, Args) -> + get_changes_feed(Db, Args, fun changes_callback/2). + +get_changes_feed(Db, Args, Callback) -> + with_proc(fun() -> + fabric:changes(Db, Callback, [], Args) + end). + + +changes_callback(start, Acc) -> + {ok, Acc}; +changes_callback({change, {Change}}, Acc) -> + CM = maps:from_list(Change), + {ok, [CM | Acc]}; +changes_callback({stop, EndSeq, _Pending}, Acc) -> + {ok, Acc, EndSeq}. + + +%% common helpers from here + + +create_db(DbName, Opts) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). + + +delete_db(DbName) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). + + +with_proc(Fun) -> + with_proc(Fun, undefined, 30000). + + +with_proc(Fun, GroupLeader) -> + with_proc(Fun, GroupLeader, 30000). + + +with_proc(Fun, GroupLeader, Timeout) -> + {Pid, Ref} = spawn_monitor(fun() -> + case GroupLeader of + undefined -> ok; + _ -> erlang:group_leader(GroupLeader, self()) + end, + exit({with_proc_res, Fun()}) + end), + receive + {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> + Res; + {'DOWN', Ref, process, Pid, Error} -> + error(Error) + after Timeout -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + error({with_proc_timeout, Fun, Timeout}) + end. + + +add_test_docs(DbName, #{} = DocSpec) -> + Docs = docs(maps:get(docs, DocSpec, [])) + ++ ddocs(mrview, maps:get(mrview, DocSpec, [])) + ++ ddocs(search, maps:get(search, DocSpec, [])) + ++ ddocs(geo, maps:get(geo, DocSpec, [])) + ++ ldocs(maps:get(local, DocSpec, [])), + Res = update_docs(DbName, Docs), + Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> + Doc#doc{revs = {RevPos, [Rev]}} + end, lists:zip(Docs, Res)), + case delete_docs(maps:get(delete, DocSpec, []), Docs1) of + [] -> ok; + [_ | _] = Deleted -> update_docs(DbName, Deleted) + end, + ok. + + +update_docs(DbName, Docs) -> + with_proc(fun() -> + case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of + {accepted, Res} -> Res; + {ok, Res} -> Res + end + end). + + +delete_docs([S, E], Docs) when E >= S -> + ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], + lists:filtermap(fun(#doc{id = Id} = Doc) -> + case lists:member(Id, ToDelete) of + true -> {true, Doc#doc{deleted = true}}; + false -> false + end + end, Docs); +delete_docs(_, _) -> + []. + + +docs([S, E]) when E >= S -> + [doc(<<"">>, I) || I <- lists:seq(S, E)]; +docs(_) -> + []. + + +ddocs(Type, [S, E]) when E >= S -> + Body = ddprop(Type), + BType = atom_to_binary(Type, utf8), + [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)]; +ddocs(_, _) -> + []. + + +ldocs([S, E]) when E >= S -> + [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)]; +ldocs(_) -> + []. + + + +doc(Pref, Id) -> + Body = bodyprops(), + doc(Pref, Id, Body, 42). + + +doc(Pref, Id, BodyProps, AttSize) -> + #doc{ + id = doc_id(Pref, Id), + body = {BodyProps}, + atts = atts(AttSize) + }. + + +doc_id(Pref, Id) -> + IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), + <<Pref/binary, IdBin/binary>>. + + +ddprop(mrview) -> + [ + {<<"views">>, {[ + {<<"v1">>, {[ + {<<"map">>, <<"function(d){emit(d);}">>} + ]}} + ]}} + ]; + +ddprop(geo) -> + [ + {<<"st_indexes">>, {[ + {<<"area">>, {[ + {<<"analyzer">>, <<"standard">>}, + {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> } + ]}} + ]}} + ]; + +ddprop(search) -> + [ + {<<"indexes">>, {[ + {<<"types">>, {[ + {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>} + ]}} + ]}} + ]. + + +bodyprops() -> + [ + {<<"g">>, {[ + {<<"type">>, <<"Polygon">>}, + {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]} + ]}} + ]. + + +atts(0) -> + []; + +atts(Size) when is_integer(Size), Size >= 1 -> + Data = << <<"x">> || _ <- lists:seq(1, Size) >>, + [couch_att:new([ + {name, <<"att">>}, + {type, <<"app/binary">>}, + {att_len, Size}, + {data, Data} + ])]. |