summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Avdey <eiri@eiri.ca>2019-02-28 16:54:13 -0400
committerEric Avdey <eiri@eiri.ca>2019-02-28 16:54:13 -0400
commit7cd1026d4de113e8c018f7190aca709b27b2e53d (patch)
tree6d81eb748ad018050fde6c16da7ea9470c38810c
parentca70537c308a8ef7144d653fadd436f853708942 (diff)
downloadcouchdb-7cd1026d4de113e8c018f7190aca709b27b2e53d.tar.gz
Add tests for shard's split changes feedshard-split-changes-feed-test
-rw-r--r--src/mem3/test/mem3_reshard_changes_feed_test.erl447
1 files changed, 447 insertions, 0 deletions
diff --git a/src/mem3/test/mem3_reshard_changes_feed_test.erl b/src/mem3/test/mem3_reshard_changes_feed_test.erl
new file mode 100644
index 000000000..0cbf33ba2
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_changes_feed_test.erl
@@ -0,0 +1,447 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_changes_feed_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+
+
+-define(assertChanges(Expected, Received),
+ begin
+ ((fun() ->
+ ExpectedIDs = lists:sort([I || #{id := I} <- Expected]),
+ ReceivedIDs = lists:sort([I || #{id := I} <- Received]),
+ ?assertEqual(ExpectedIDs, ReceivedIDs)
+ end)())
+ end).
+
+
+setup() ->
+ Db1 = ?tempdb(),
+ create_db(Db1, [{q, 1}, {n, 1}]),
+ #{db1 => Db1}.
+
+
+teardown(#{} = Dbs) ->
+ mem3_reshard:reset_state(),
+ maps:map(fun(_, Db) -> delete_db(Db) end, Dbs).
+
+
+start_couch() ->
+ test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_changes_feed_test_() ->
+ {
+ "mem3 shard split changes feed tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun normal_feed_should_work_after_split/1,
+ fun continuous_feed_should_work_during_split/1
+ ]
+ }
+ }
+ }.
+
+
+normal_feed_should_work_after_split(#{db1 := Db}) ->
+ ?_test(begin
+ DocSpec = #{
+ docs => [1, 10],
+ delete => [5, 6]
+ },
+ add_test_docs(Db, DocSpec),
+
+ % gather pre-shard changes
+ BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0},
+ {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs),
+
+ % Split the shard
+ split_and_wait(Db),
+
+ % verify changes list consistent for all the old seqs
+ lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) ->
+ Args = BaseArgs#changes_args{since = Seq},
+ {ok, Changes, _EndSeq} = get_changes_feed(Db, Args),
+ ?assertChanges(ExpectedChanges, Changes),
+ [C | ExpectedChanges]
+ end, [], OldChanges),
+
+ % confirm that old LastSeq respected
+ Args1 = BaseArgs#changes_args{since = OldEndSeq},
+ {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1),
+ ?assertChanges([], Changes1),
+
+ % confirm that new LastSeq also respected
+ Args2 = BaseArgs#changes_args{since = EndSeq1},
+ {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2),
+ ?assertChanges([], Changes2),
+ ?assertEqual(EndSeq2, EndSeq1),
+
+ % confirm we didn't lost any changes and have consistent last seq
+ {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs),
+ ?assertChanges(OldChanges, Changes3),
+ %% FIXME. last_seq should be consistent
+ % ?assertEqual(EndSeq3, EndSeq2),
+
+ % add some docs
+ add_test_docs(Db, #{docs => [11, 15]}),
+ Args4 = BaseArgs#changes_args{since = EndSeq3},
+ {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4),
+ AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])],
+ ?assertChanges(AddedChanges, Changes4),
+
+ % confirm include_docs and deleted works
+ Args5 = BaseArgs#changes_args{include_docs = true},
+ {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5),
+ ?assertEqual(EndSeq4, EndSeq5),
+ [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>],
+ ?assertMatch(#{deleted := true}, SampleChange),
+ ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange),
+
+ % update and delete some pre and post split docs
+ AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5],
+ UpdateDocs = lists:filtermap(fun
+ (#doc{id = <<"00002">>}) -> true;
+ (#doc{id = <<"00012">>}) -> true;
+ (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}};
+ (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}};
+ (_) -> false
+ end, AllDocs),
+ update_docs(Db, UpdateDocs),
+
+ Args6 = BaseArgs#changes_args{since = EndSeq5},
+ {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6),
+ UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs],
+ ?assertChanges(UpdatedChanges, Changes6),
+ [#{seq := Seq6} | _] = Changes6,
+ ?assertEqual(EndSeq6, Seq6),
+
+ Args7 = Args6#changes_args{dir = rev, limit = 4},
+ {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7),
+ %% FIXME - I believe rev is just conceptually broken in Couch 2.x
+ % ?assertChanges(lists:reverse(Changes6), Changes7),
+ ?assertEqual(4, length(Changes7)),
+ [#{seq := Seq7} | _] = Changes7,
+ ?assertEqual(EndSeq7, Seq7)
+ end).
+
+
+continuous_feed_should_work_during_split(#{db1 := Db}) ->
+ ?_test(begin
+ {UpdaterPid, UpdaterRef} = spawn_monitor(fun() ->
+ Updater = fun U({State, I}) ->
+ receive
+ {get_state, {Pid, Ref}} ->
+ Pid ! {state, Ref, State},
+ U({State, I});
+ add ->
+ DocSpec = #{docs => [I, I]},
+ add_test_docs(Db, DocSpec),
+ U({State, I + 1});
+ split ->
+ spawn_monitor(fun() -> split_and_wait(Db) end),
+ U({"in_process", I});
+ stop ->
+ ok;
+ {'DOWN', _, process, _, normal} ->
+ U({"after", I})
+ end
+ end,
+ Updater({"before", 1})
+ end),
+
+ Callback = fun
+ (start, Acc) ->
+ {ok, Acc};
+ (waiting_for_updates, Acc) ->
+ Ref = make_ref(),
+ UpdaterPid ! {get_state, {self(), Ref}},
+ receive {state, Ref, State} -> ok end,
+ case {State, length(Acc)} of
+ {"before", N} when N < 5 ->
+ UpdaterPid ! add;
+ {"before", _} ->
+ UpdaterPid ! split;
+ {"in_process", N} when N < 10 ->
+ UpdaterPid ! add;
+ {"in_process", _} ->
+ wait;
+ {"after", N} when N < 15 ->
+ UpdaterPid ! add;
+ {"after", _} ->
+ UpdaterPid ! stop,
+ %% nfc why simple return of {stop, Acc} doesn't work
+ exit({with_proc_res, {ok, Acc}})
+ end,
+ {ok, Acc};
+ (timeout, Acc) ->
+ {ok, Acc};
+ ({change, {Change}}, Acc) ->
+ CM = maps:from_list(Change),
+ {ok, [CM | Acc]};
+ ({stop, EndSeq, _Pending}, _Acc) ->
+ UpdaterPid ! stop,
+ Error = {assertion_fail, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, {last_seq, EndSeq}},
+ {reason, "Changes feed stopped on shard split"}
+ ]},
+ exit({with_proc_res, {error, Error}})
+ end,
+ BaseArgs = #changes_args{
+ feed = "continuous",
+ heartbeat = 100,
+ timeout = 1000
+ },
+ Result = get_changes_feed(Db, BaseArgs, Callback),
+
+ receive
+ {'DOWN', UpdaterRef, process, UpdaterPid, normal} ->
+ ok;
+ {'DOWN', UpdaterRef, process, UpdaterPid, Error} ->
+ erlang:error({test_context_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, Error},
+ {reason, "Updater died"}]})
+ end,
+
+ case Result of
+ {ok, Changes} ->
+ Expected = [#{id => ID} || #doc{id = ID} <- docs([1, 15])],
+ ?assertChanges(Expected, Changes);
+ {error, Err} ->
+ erlang:error(Err)
+ end
+ end).
+
+
+split_and_wait(Db) ->
+ [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+ ResultShards = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(ResultShards)).
+
+
+wait_state(JobId, State) ->
+ test_util:wait(fun() ->
+ case mem3_reshard:job(JobId) of
+ {ok, {Props}} ->
+ case couch_util:get_value(job_state, Props) of
+ State -> ok;
+ _ -> timer:sleep(100), wait
+ end;
+ {error, not_found} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+get_changes_feed(Db, Args) ->
+ get_changes_feed(Db, Args, fun changes_callback/2).
+
+get_changes_feed(Db, Args, Callback) ->
+ with_proc(fun() ->
+ fabric:changes(Db, Callback, [], Args)
+ end).
+
+
+changes_callback(start, Acc) ->
+ {ok, Acc};
+changes_callback({change, {Change}}, Acc) ->
+ CM = maps:from_list(Change),
+ {ok, [CM | Acc]};
+changes_callback({stop, EndSeq, _Pending}, Acc) ->
+ {ok, Acc, EndSeq}.
+
+
+%% common helpers from here
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+
+
+add_test_docs(DbName, #{} = DocSpec) ->
+ Docs = docs(maps:get(docs, DocSpec, []))
+ ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+ ++ ddocs(search, maps:get(search, DocSpec, []))
+ ++ ddocs(geo, maps:get(geo, DocSpec, []))
+ ++ ldocs(maps:get(local, DocSpec, [])),
+ Res = update_docs(DbName, Docs),
+ Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+ Doc#doc{revs = {RevPos, [Rev]}}
+ end, lists:zip(Docs, Res)),
+ case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+ [] -> ok;
+ [_ | _] = Deleted -> update_docs(DbName, Deleted)
+ end,
+ ok.
+
+
+update_docs(DbName, Docs) ->
+ with_proc(fun() ->
+ case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+ {accepted, Res} -> Res;
+ {ok, Res} -> Res
+ end
+ end).
+
+
+delete_docs([S, E], Docs) when E >= S ->
+ ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+ lists:filtermap(fun(#doc{id = Id} = Doc) ->
+ case lists:member(Id, ToDelete) of
+ true -> {true, Doc#doc{deleted = true}};
+ false -> false
+ end
+ end, Docs);
+delete_docs(_, _) ->
+ [].
+
+
+docs([S, E]) when E >= S ->
+ [doc(<<"">>, I) || I <- lists:seq(S, E)];
+docs(_) ->
+ [].
+
+
+ddocs(Type, [S, E]) when E >= S ->
+ Body = ddprop(Type),
+ BType = atom_to_binary(Type, utf8),
+ [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
+ddocs(_, _) ->
+ [].
+
+
+ldocs([S, E]) when E >= S ->
+ [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
+ldocs(_) ->
+ [].
+
+
+
+doc(Pref, Id) ->
+ Body = bodyprops(),
+ doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+ #doc{
+ id = doc_id(Pref, Id),
+ body = {BodyProps},
+ atts = atts(AttSize)
+ }.
+
+
+doc_id(Pref, Id) ->
+ IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+ <<Pref/binary, IdBin/binary>>.
+
+
+ddprop(mrview) ->
+ [
+ {<<"views">>, {[
+ {<<"v1">>, {[
+ {<<"map">>, <<"function(d){emit(d);}">>}
+ ]}}
+ ]}}
+ ];
+
+ddprop(geo) ->
+ [
+ {<<"st_indexes">>, {[
+ {<<"area">>, {[
+ {<<"analyzer">>, <<"standard">>},
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
+ ]}}
+ ]}}
+ ];
+
+ddprop(search) ->
+ [
+ {<<"indexes">>, {[
+ {<<"types">>, {[
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
+ ]}}
+ ]}}
+ ].
+
+
+bodyprops() ->
+ [
+ {<<"g">>, {[
+ {<<"type">>, <<"Polygon">>},
+ {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+ ]}}
+ ].
+
+
+atts(0) ->
+ [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+ Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+ [couch_att:new([
+ {name, <<"att">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, Data}
+ ])].