summaryrefslogtreecommitdiff
path: root/src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl')
-rw-r--r--src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl389
1 files changed, 0 insertions, 389 deletions
diff --git a/src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl b/src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl
deleted file mode 100644
index 4b9e2a34a..000000000
--- a/src/mem3/test/eunit/mem3_reshard_changes_feed_test.erl
+++ /dev/null
@@ -1,389 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_reshard_changes_feed_test).
-
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/src/mem3_reshard.hrl").
-
--define(TIMEOUT, 60). % seconds
-
--define(assertChanges(Expected, Received),
- begin
- ((fun() ->
- ExpectedIDs = lists:sort([I || #{id := I} <- Expected]),
- ReceivedIDs = lists:sort([I || #{id := I} <- Received]),
- ?assertEqual(ExpectedIDs, ReceivedIDs)
- end)())
- end).
-
-
-setup() ->
- Db1 = ?tempdb(),
- create_db(Db1, [{q, 1}, {n, 1}]),
- #{db1 => Db1}.
-
-
-teardown(#{} = Dbs) ->
- mem3_reshard:reset_state(),
- maps:map(fun(_, Db) -> delete_db(Db) end, Dbs).
-
-
-start_couch() ->
- test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
-
-
-stop_couch(Ctx) ->
- test_util:stop_couch(Ctx).
-
-
-mem3_reshard_changes_feed_test_() ->
- {
- "mem3 shard split changes feed tests",
- {
- setup,
- fun start_couch/0, fun stop_couch/1,
- {
- foreach,
- fun setup/0, fun teardown/1,
- [
- fun normal_feed_should_work_after_split/1,
- fun continuous_feed_should_work_during_split/1
- ]
- }
- }
- }.
-
-
-normal_feed_should_work_after_split(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- DocSpec = #{
- docs => [1, 10],
- delete => [5, 6]
- },
- add_test_docs(Db, DocSpec),
-
- % gather pre-shard changes
- BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0},
- {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs),
-
- % Split the shard
- split_and_wait(Db),
-
- % verify changes list consistent for all the old seqs
- lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) ->
- Args = BaseArgs#changes_args{since = Seq},
- {ok, Changes, _EndSeq} = get_changes_feed(Db, Args),
- ?assertChanges(ExpectedChanges, Changes),
- [C | ExpectedChanges]
- end, [], OldChanges),
-
- % confirm that old LastSeq respected
- Args1 = BaseArgs#changes_args{since = OldEndSeq},
- {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1),
- ?assertChanges([], Changes1),
-
- % confirm that new LastSeq also respected
- Args2 = BaseArgs#changes_args{since = EndSeq1},
- {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2),
- ?assertChanges([], Changes2),
- ?assertEqual(EndSeq2, EndSeq1),
-
- % confirm we didn't lost any changes and have consistent last seq
- {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs),
- ?assertChanges(OldChanges, Changes3),
-
- % add some docs
- add_test_docs(Db, #{docs => [11, 15]}),
- Args4 = BaseArgs#changes_args{since = EndSeq3},
- {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4),
- AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])],
- ?assertChanges(AddedChanges, Changes4),
-
- % confirm include_docs and deleted works
- Args5 = BaseArgs#changes_args{include_docs = true},
- {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5),
- ?assertEqual(EndSeq4, EndSeq5),
- [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>],
- ?assertMatch(#{deleted := true}, SampleChange),
- ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange),
-
- % update and delete some pre and post split docs
- AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5],
- UpdateDocs = lists:filtermap(fun
- (#doc{id = <<"00002">>}) -> true;
- (#doc{id = <<"00012">>}) -> true;
- (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}};
- (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}};
- (_) -> false
- end, AllDocs),
- update_docs(Db, UpdateDocs),
-
- Args6 = BaseArgs#changes_args{since = EndSeq5},
- {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6),
- UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs],
- ?assertChanges(UpdatedChanges, Changes6),
- [#{seq := Seq6} | _] = Changes6,
- ?assertEqual(EndSeq6, Seq6),
-
- Args7 = Args6#changes_args{dir = rev, limit = 4},
- {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7),
- ?assertEqual(4, length(Changes7)),
- [#{seq := Seq7} | _] = Changes7,
- ?assertEqual(EndSeq7, Seq7)
- end)}.
-
-
-continuous_feed_should_work_during_split(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- {UpdaterPid, UpdaterRef} = spawn_monitor(fun() ->
- Updater = fun U({State, I}) ->
- receive
- {get_state, {Pid, Ref}} ->
- Pid ! {state, Ref, {State, I}},
- U({State, I});
- add ->
- DocSpec = #{docs => [I, I]},
- add_test_docs(Db, DocSpec),
- U({State, I + 1});
- split ->
- spawn_monitor(fun() -> split_and_wait(Db) end),
- U({"in_process", I});
- stop ->
- receive {'DOWN', _, process, _, _} -> ok end,
- ok
- end
- end,
- Updater({"before", 1})
- end),
-
- Callback = fun
- (start, Acc) ->
- {ok, Acc};
- (waiting_for_updates, Acc) ->
- Ref = make_ref(),
- UpdaterPid ! {get_state, {self(), Ref}},
- receive {state, Ref, {State, _}} -> ok end,
- case {State, length(Acc)} of
- {"before", N} when N < 5 ->
- UpdaterPid ! add,
- {ok, Acc};
- {"before", _} ->
- UpdaterPid ! split,
- {ok, Acc};
- {"in_process", N} when N < 10 ->
- UpdaterPid ! add,
- {ok, Acc};
- {"in_process", _} ->
- {ok, Acc}
- end;
- (timeout, Acc) ->
- {ok, Acc};
- ({change, {Change}}, Acc) ->
- CM = maps:from_list(Change),
- {ok, [CM | Acc]};
- ({stop, EndSeq, _Pending}, Acc) ->
- % Notice updater is still running
- {stop, EndSeq, Acc}
- end,
-
- BaseArgs = #changes_args{
- feed = "continuous",
- heartbeat = 100,
- timeout = 1000
- },
- StopResult = get_changes_feed(Db, BaseArgs, Callback),
-
- % Changes feed stopped when source shard was deleted
- ?assertMatch({stop, _, _}, StopResult),
- {stop, StopEndSeq, StopChanges} = StopResult,
-
- % Add 5 extra docs to the db right after changes feed was stopped
- [UpdaterPid ! add || _ <- lists:seq(1, 5)],
-
- % The the number of documents that updater had added
- Ref = make_ref(),
- UpdaterPid ! {get_state, {self(), Ref}},
- DocCount = receive {state, Ref, {_, I}} -> I - 1 end,
-
- UpdaterPid ! stop,
- receive
- {'DOWN', UpdaterRef, process, UpdaterPid, normal} ->
- ok;
- {'DOWN', UpdaterRef, process, UpdaterPid, Error} ->
- erlang:error({test_context_failed, [
- {module, ?MODULE},
- {line, ?LINE},
- {value, Error},
- {reason, "Updater died"}]})
- end,
-
- AfterArgs = #changes_args{feed = "normal", since = StopEndSeq},
- {ok, AfterChanges, _} = get_changes_feed(Db, AfterArgs),
- DocIDs = [Id || #{id := Id} <- StopChanges ++ AfterChanges],
- ExpectedDocIDs = [doc_id(<<>>, N) || N <- lists:seq(1, DocCount)],
- ?assertEqual(ExpectedDocIDs, lists:usort(DocIDs))
- end)}.
-
-
-split_and_wait(Db) ->
- [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, completed),
- ResultShards = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(ResultShards)).
-
-
-wait_state(JobId, State) ->
- test_util:wait(fun() ->
- case mem3_reshard:job(JobId) of
- {ok, {Props}} ->
- case couch_util:get_value(job_state, Props) of
- State -> ok;
- _ -> timer:sleep(100), wait
- end;
- {error, not_found} -> timer:sleep(100), wait
- end
- end, 30000).
-
-
-get_changes_feed(Db, Args) ->
- get_changes_feed(Db, Args, fun changes_callback/2).
-
-
-get_changes_feed(Db, Args, Callback) ->
- with_proc(fun() ->
- fabric:changes(Db, Callback, [], Args)
- end).
-
-
-changes_callback(start, Acc) ->
- {ok, Acc};
-changes_callback({change, {Change}}, Acc) ->
- CM = maps:from_list(Change),
- {ok, [CM | Acc]};
-changes_callback({stop, EndSeq, _Pending}, Acc) ->
- {ok, Acc, EndSeq}.
-
-
-%% common helpers from here
-
-
-create_db(DbName, Opts) ->
- GL = erlang:group_leader(),
- with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
-
-
-delete_db(DbName) ->
- GL = erlang:group_leader(),
- with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
-
-
-with_proc(Fun) ->
- with_proc(Fun, undefined, 30000).
-
-
-with_proc(Fun, GroupLeader) ->
- with_proc(Fun, GroupLeader, 30000).
-
-
-with_proc(Fun, GroupLeader, Timeout) ->
- {Pid, Ref} = spawn_monitor(fun() ->
- case GroupLeader of
- undefined -> ok;
- _ -> erlang:group_leader(GroupLeader, self())
- end,
- exit({with_proc_res, Fun()})
- end),
- receive
- {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
- Res;
- {'DOWN', Ref, process, Pid, Error} ->
- error(Error)
- after Timeout ->
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- error({with_proc_timeout, Fun, Timeout})
- end.
-
-
-add_test_docs(DbName, #{} = DocSpec) ->
- Docs = docs(maps:get(docs, DocSpec, [])),
- Res = update_docs(DbName, Docs),
- Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
- Doc#doc{revs = {RevPos, [Rev]}}
- end, lists:zip(Docs, Res)),
- case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
- [] -> ok;
- [_ | _] = Deleted -> update_docs(DbName, Deleted)
- end,
- ok.
-
-
-update_docs(DbName, Docs) ->
- with_proc(fun() ->
- case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
- {accepted, Res} -> Res;
- {ok, Res} -> Res
- end
- end).
-
-
-delete_docs([S, E], Docs) when E >= S ->
- ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
- lists:filtermap(fun(#doc{id = Id} = Doc) ->
- case lists:member(Id, ToDelete) of
- true -> {true, Doc#doc{deleted = true}};
- false -> false
- end
- end, Docs);
-delete_docs(_, _) ->
- [].
-
-
-docs([S, E]) when E >= S ->
- [doc(<<"">>, I) || I <- lists:seq(S, E)];
-docs(_) ->
- [].
-
-
-doc(Pref, Id) ->
- Body = [{<<"a">>, <<"b">>}],
- doc(Pref, Id, Body, 42).
-
-
-doc(Pref, Id, BodyProps, AttSize) ->
- #doc{
- id = doc_id(Pref, Id),
- body = {BodyProps},
- atts = atts(AttSize)
- }.
-
-
-doc_id(Pref, Id) ->
- IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
- <<Pref/binary, IdBin/binary>>.
-
-
-atts(0) ->
- [];
-
-atts(Size) when is_integer(Size), Size >= 1 ->
- Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
- [couch_att:new([
- {name, <<"att">>},
- {type, <<"app/binary">>},
- {att_len, Size},
- {data, Data}
- ])].