summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2016-04-21 02:57:43 -0400
committerNick Vatamaniuc <vatamane@apache.org>2017-04-28 17:35:50 -0400
commit4d2969d9a3e4899554a96d6711bf1d571277766a (patch)
treef3a1cbd89c4920b99a8770ebf7fd3688a6a286c7
parent9895a734f1054eba2fe0cf582574adf28fdec369 (diff)
downloadcouchdb-4d2969d9a3e4899554a96d6711bf1d571277766a.tar.gz
Implement multi-db shard change monitoring
Monitor shards which match a suffix for creation, deletion, and doc updates. To use implement `couch_multidb_changes` behavior. Call `start_link` with DbSuffix, with an option to skip design docs (`skip_ddocs`). Behavior callback functions will be called when shards are created, deleted, found and updated. Jira: COUCHDB-3324
-rw-r--r--src/couch/src/couch_multidb_changes.erl860
1 files changed, 860 insertions, 0 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
new file mode 100644
index 000000000..b9cddd18c
--- /dev/null
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -0,0 +1,860 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_multidb_changes).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/4
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
+]).
+
+-export([
+ changes_reader/3,
+ changes_reader_cb/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+
+-define(AVG_DELAY_MSEC, 100).
+-define(MAX_DELAY_MSEC, 60000).
+
+-record(state, {
+ tid :: ets:tid(),
+ mod :: atom(),
+ ctx :: term(),
+ suffix :: binary(),
+ event_server :: reference(),
+ scanner :: nil | pid(),
+ pids :: [{binary(), pid()}],
+ skip_ddocs :: boolean()
+}).
+
+% Behavior API
+
+% For each db shard with a matching suffix, report created,
+% deleted, found (discovered) and change events.
+
+-callback db_created(DbName :: binary(), Context :: term()) ->
+ Context :: term().
+
+-callback db_deleted(DbName :: binary(), Context :: term()) ->
+ Context :: term().
+
+-callback db_found(DbName :: binary(), Context :: term()) ->
+ Context :: term().
+
+-callback db_change(DbName :: binary(), Change :: term(), Context :: term()) ->
+ Context :: term().
+
+
+% External API
+
+
+% Opts list can contain:
+% - `skip_ddocs` : Skip design docs
+
+-spec start_link(binary(), module(), term(), list()) ->
+ {ok, pid()} | ignore | {error, term()}.
+start_link(DbSuffix, Module, Context, Opts) when
+ is_binary(DbSuffix), is_atom(Module), is_list(Opts) ->
+ gen_server:start_link(?MODULE, [DbSuffix, Module, Context, Opts], []).
+
+
+% gen_server callbacks
+
+init([DbSuffix, Module, Context, Opts]) ->
+ process_flag(trap_exit, true),
+ Server = self(),
+ {ok, #state{
+ tid = ets:new(?MODULE, [set, protected]),
+ mod = Module,
+ ctx = Context,
+ suffix = DbSuffix,
+ event_server = register_with_event_server(Server),
+ scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end),
+ pids = [],
+ skip_ddocs = proplists:is_defined(skip_ddocs, Opts)
+ }}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_call({change, DbName, Change}, _From,
+ #state{skip_ddocs=SkipDDocs, mod=Mod, ctx=Ctx} = State) ->
+ case {SkipDDocs, is_design_doc(Change)} of
+ {true, true} ->
+ {reply, ok, State};
+ {_, _} ->
+ {reply, ok, State#state{ctx=Mod:db_change(DbName, Change, Ctx)}}
+ end;
+
+handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid=Ets} = State) ->
+ case ets:lookup(Ets, DbName) of
+ [] ->
+ true = ets:insert(Ets, {DbName, EndSeq, false});
+ [{DbName, _OldSeq, Rescan}] ->
+ true = ets:insert(Ets, {DbName, EndSeq, Rescan})
+ end,
+ {reply, ok, State}.
+
+
+handle_cast({resume_scan, DbName}, State) ->
+ {noreply, resume_scan(DbName, State)}.
+
+
+handle_info({'$couch_event', DbName, Event}, #state{suffix = Suf} = State) ->
+ case Suf =:= couch_db:dbname_suffix(DbName) of
+ true ->
+ {noreply, db_callback(Event, DbName, State)};
+ _ ->
+ {noreply, State}
+ end;
+
+handle_info({'DOWN', Ref, _, _, Info}, #state{event_server = Ref} = State) ->
+ {stop, {couch_event_server_died, Info}, State};
+
+handle_info({'EXIT', From, normal}, #state{scanner = From} = State) ->
+ {noreply, State#state{scanner=nil}};
+
+handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->
+ {stop, {scanner_died, Reason}, State};
+
+handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
+ couch_log:debug("~p change feed exited ~p", [State#state.suffix, From]),
+ case lists:keytake(From, 2, Pids) of
+ {value, {DbName, From}, NewPids} ->
+ if Reason == normal -> ok; true ->
+ Fmt = "~s : Known change feed ~w died :: ~w",
+ couch_log:error(Fmt, [?MODULE, From, Reason])
+ end,
+ NewState = State#state{pids = NewPids},
+ case ets:lookup(State#state.tid, DbName) of
+ [{DbName, _EndSeq, true}] ->
+ {noreply, resume_scan(DbName, NewState)};
+ _ ->
+ {noreply, NewState}
+ end;
+ false when Reason == normal ->
+ {noreply, State};
+ false ->
+ Fmt = "~s(~p) : Unknown pid ~w died :: ~w",
+ couch_log:error(Fmt, [?MODULE, State#state.suffix, From, Reason]),
+ {stop, {unexpected_exit, From, Reason}, State}
+ end;
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Private functions
+
+-spec register_with_event_server(pid()) -> reference().
+register_with_event_server(Server) ->
+ Ref = erlang:monitor(process, couch_event_server),
+ couch_event:register_all(Server),
+ Ref.
+
+
+-spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}.
+db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
+ State#state{ctx = Mod:db_created(DbName, Ctx)};
+db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
+ State#state{ctx = Mod:db_deleted(DbName, Ctx)};
+db_callback(updated, DbName, State) ->
+ resume_scan(DbName, State);
+db_callback(_Other, _DbName, State) ->
+ State.
+
+
+-spec resume_scan(binary(), #state{}) -> #state{}.
+resume_scan(DbName, #state{pids=Pids, tid=Ets} = State) ->
+ case {lists:keyfind(DbName, 1, Pids), ets:lookup(Ets, DbName)} of
+ {{DbName, _}, []} ->
+ % Found existing change feed, but not entry in ETS
+ % Flag a need to rescan from begining
+ true = ets:insert(Ets, {DbName, 0, true}),
+ State;
+ {{DbName, _}, [{DbName, EndSeq, _}]} ->
+ % Found existing change feed and entry in ETS
+ % Flag a need to rescan from last ETS checkpoint
+ true = ets:insert(Ets, {DbName, EndSeq, true}),
+ State;
+ {false, []} ->
+ % No existing change feed running. No entry in ETS.
+ % This is first time seeing this db shard.
+ % Notify user with a found callback. Insert checkpoint
+ % entry in ETS to start from 0. And start a change feed.
+ true = ets:insert(Ets, {DbName, 0, false}),
+ Mod = State#state.mod,
+ Ctx = Mod:db_found(DbName, State#state.ctx),
+ Pid = start_changes_reader(DbName, 0),
+ State#state{ctx=Ctx, pids=[{DbName, Pid} | Pids]};
+ {false, [{DbName, EndSeq, _}]} ->
+ % No existing change feed running. Found existing checkpoint.
+ % Start a new change reader from last checkpoint.
+ true = ets:insert(Ets, {DbName, EndSeq, false}),
+ Pid = start_changes_reader(DbName, EndSeq),
+ State#state{pids=[{DbName, Pid} | Pids]}
+ end.
+
+
+start_changes_reader(DbName, Since) ->
+ spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+
+
+changes_reader(Server, DbName, Since) ->
+ {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
+ ChangesArgs = #changes_args{
+ include_docs = true,
+ since = Since,
+ feed = "normal",
+ timeout = infinity
+ },
+ ChFun = couch_changes:handle_db_changes(ChangesArgs, {json_req, null}, Db),
+ ChFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
+
+
+changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
+ ok = gen_server:call(Server, {change, DbName, Change}, infinity),
+ {Server, DbName};
+changes_reader_cb({stop, EndSeq}, _, {Server, DbName}) ->
+ ok = gen_server:call(Server, {checkpoint, DbName, EndSeq}, infinity),
+ {Server, DbName};
+changes_reader_cb(_, _, Acc) ->
+ Acc.
+
+
+scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
+ ok = scan_local_db(Server, DbSuffix),
+ {ok, Db} = mem3_util:ensure_exists(
+ config:get("mem3", "shards_db", "_dbs")),
+ ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+ ChangesFun(fun({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of
+ <<"_design/", _/binary>> ->
+ ok;
+ _Else ->
+ case couch_replicator_utils:is_deleted(Change) of
+ true ->
+ ok;
+ false ->
+ [gen_server:cast(Server, {resume_scan, ShardName})
+ || ShardName <- filter_shards(DbName, DbSuffix)],
+ ok
+ end
+ end;
+ (_, _) -> ok
+ end),
+ couch_db:close(Db).
+
+
+filter_shards(DbName, DbSuffix) ->
+ case DbSuffix =:= couch_db:dbname_suffix(DbName) of
+ false ->
+ [];
+ true ->
+ try
+ [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+ catch
+ error:database_does_not_exist ->
+ []
+ end
+ end.
+
+
+scan_local_db(Server, DbSuffix) when is_pid(Server) ->
+ case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
+ {ok, Db} ->
+ gen_server:cast(Server, {resume_scan, DbSuffix}),
+ ok = couch_db:close(Db);
+ _Error ->
+ ok
+ end.
+
+
+is_design_doc({Change}) ->
+ case lists:keyfind(<<"id">>, 1, Change) of
+ false ->
+ false;
+ {_, Id} ->
+ is_design_doc_id(Id)
+ end.
+
+
+is_design_doc_id(<<?DESIGN_DOC_PREFIX, _/binary>>) ->
+ true;
+is_design_doc_id(_) ->
+ false.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(MOD, multidb_test_module).
+-define(SUFFIX, <<"suff">>).
+-define(DBNAME, <<"shards/40000000-5fffffff/acct/suff.0123456789">>).
+
+couch_multidb_changes_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_handle_call_change(),
+ t_handle_call_change_filter_design_docs(),
+ t_handle_call_checkpoint_new(),
+ t_handle_call_checkpoint_existing(),
+ t_handle_info_created(),
+ t_handle_info_deleted(),
+ t_handle_info_updated(),
+ t_handle_info_other_event(),
+ t_handle_info_created_other_db(),
+ t_handle_info_scanner_exit_normal(),
+ t_handle_info_scanner_crashed(),
+ t_handle_info_event_server_exited(),
+ t_handle_info_unknown_pid_exited(),
+ t_handle_info_change_feed_exited(),
+ t_handle_info_change_feed_exited_and_need_rescan(),
+ t_spawn_changes_reader(),
+ t_changes_reader_cb_change(),
+ t_changes_reader_cb_stop(),
+ t_changes_reader_cb_other(),
+ t_handle_call_resume_scan_no_chfeed_no_ets_entry(),
+ t_handle_call_resume_scan_chfeed_no_ets_entry(),
+ t_handle_call_resume_scan_chfeed_ets_entry(),
+ t_handle_call_resume_scan_no_chfeed_ets_entry(),
+ t_start_link(),
+ t_start_link_no_ddocs(),
+ t_misc_gen_server_callbacks()
+ ]
+ }.
+
+
+setup() ->
+ mock_logs(),
+ mock_callback_mod(),
+ meck:expect(couch_event, register_all, 1, ok),
+ meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+ meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
+ ChangesFun = meck:val(fun(_) -> ok end),
+ meck:expect(couch_changes, handle_changes, 4, ChangesFun),
+ meck:expect(couch_db, open_int,
+ fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
+ (_, _) -> {not_found, no_db_file}
+ end),
+ meck:expect(couch_db, close, 1, ok),
+ mock_changes_reader(),
+ % create process to stand in for couch_event_server
+ % mocking erlang:monitor doesn't work, so give it real process to monitor
+ EvtPid = spawn_link(fun() -> receive looper -> ok end end),
+ true = register(couch_event_server, EvtPid),
+ EvtPid.
+
+
+teardown(EvtPid) ->
+ unlink(EvtPid),
+ exit(EvtPid, kill),
+ meck:unload().
+
+
+t_handle_call_change() ->
+ ?_test(begin
+ State = mock_state(),
+ Change = change_row(<<"blah">>),
+ handle_call_ok({change, ?DBNAME, Change}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
+ end).
+
+
+t_handle_call_change_filter_design_docs() ->
+ ?_test(begin
+ State0 = mock_state(),
+ State = State0#state{skip_ddocs = true},
+ Change = change_row(<<"_design/blah">>),
+ handle_call_ok({change, ?DBNAME, Change}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
+ end).
+
+
+t_handle_call_checkpoint_new() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ handle_call_ok({checkpoint, ?DBNAME, 1}, State),
+ ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
+ ets:delete(Tid)
+ end).
+
+
+t_handle_call_checkpoint_existing() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ true = ets:insert(Tid, {?DBNAME, 1, true}),
+ handle_call_ok({checkpoint, ?DBNAME, 2}, State),
+ ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
+ ets:delete(Tid)
+ end).
+
+
+t_handle_info_created() ->
+ ?_test(begin
+ State = mock_state(),
+ handle_info_check({'$couch_event', ?DBNAME, created}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
+ end).
+
+
+t_handle_info_deleted() ->
+ ?_test(begin
+ State = mock_state(),
+ handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
+ end).
+
+
+t_handle_info_updated() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ handle_info_check({'$couch_event', ?DBNAME, updated}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
+ end).
+
+
+t_handle_info_other_event() ->
+ ?_test(begin
+ State = mock_state(),
+ handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
+ ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
+ ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
+ ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse]))
+ end).
+
+
+t_handle_info_created_other_db() ->
+ ?_test(begin
+ State = mock_state(),
+ handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
+ ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
+ end).
+
+
+t_handle_info_scanner_exit_normal() ->
+ ?_test(begin
+ Res = handle_info({'EXIT', spid, normal}, mock_state()),
+ ?assertMatch({noreply, _}, Res),
+ {noreply, RState} = Res,
+ ?assertEqual(nil, RState#state.scanner)
+ end).
+
+
+t_handle_info_scanner_crashed() ->
+ ?_test(begin
+ Res = handle_info({'EXIT', spid, oops}, mock_state()),
+ ?assertMatch({stop, {scanner_died, oops}, _State}, Res)
+ end).
+
+
+t_handle_info_event_server_exited() ->
+ ?_test(begin
+ Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
+ ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
+ end).
+
+
+t_handle_info_unknown_pid_exited() ->
+ ?_test(begin
+ State0 = mock_state(),
+ Res0 = handle_info({'EXIT', somepid, normal}, State0),
+ ?assertMatch({noreply, State0}, Res0),
+ State1 = mock_state(),
+ Res1 = handle_info({'EXIT', somepid, oops}, State1),
+ ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
+ end).
+
+
+t_handle_info_change_feed_exited() ->
+ ?_test(begin
+ Tid0 = mock_ets(),
+ State0 = mock_state(Tid0, cpid),
+ Res0 = handle_info({'EXIT', cpid, normal}, State0),
+ ?assertMatch({noreply, _}, Res0),
+ {noreply, RState0} = Res0,
+ ?assertEqual([], RState0#state.pids),
+ ets:delete(Tid0),
+ Tid1 = mock_ets(),
+ State1 = mock_state(Tid1, cpid),
+ Res1 = handle_info({'EXIT', cpid, oops}, State1),
+ ?assertMatch({noreply, _}, Res1),
+ {noreply, RState1} = Res1,
+ ?assertEqual([], RState1#state.pids),
+ ets:delete(Tid1)
+ end).
+
+
+t_handle_info_change_feed_exited_and_need_rescan() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ true = ets:insert(Tid, {?DBNAME, 1, true}),
+ State = mock_state(Tid, cpid),
+ Res = handle_info({'EXIT', cpid, normal}, State),
+ ?assertMatch({noreply, _}, Res),
+ {noreply, RState} = Res,
+ % rescan flag should have been reset to false
+ ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
+ % a mock change feed process should be running
+ [{?DBNAME, Pid}] = RState#state.pids,
+ ?assert(is_pid(Pid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ets:delete(Tid)
+ end).
+
+
+t_spawn_changes_reader() ->
+ ?_test(begin
+ Pid = start_changes_reader(?DBNAME, 3),
+ ?assert(erlang:is_process_alive(Pid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(meck:validate(couch_db)),
+ ?assert(meck:validate(couch_changes)),
+ ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
+ ?assert(meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 3,
+ feed = "normal",
+ timeout = infinity
+ }, {json_req, null}, db]))
+ end).
+
+
+t_changes_reader_cb_change() ->
+ ?_test(begin
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
+ Change = change_row(<<"blah">>),
+ ChArg = {change, Change, ignore},
+ {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
+ ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_changes_reader_cb_stop() ->
+ ?_test(begin
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
+ ChArg = {stop, 11},
+ {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
+ % We checkpoint on stop, check if checkpointed at correct sequence
+ #state{tid = Tid} = sys:get_state(Pid),
+ ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Tid)),
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_changes_reader_cb_other() ->
+ ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
+
+
+t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ RState = resume_scan(?DBNAME, State),
+ % Check if inserted checkpoint entry in ets starting at 0
+ ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Tid)),
+ % Check if called db_found callback
+ ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
+ % Check if started a change reader
+ [{?DBNAME, Pid}] = RState#state.pids,
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 0,
+ feed = "normal",
+ timeout = infinity
+ }, {json_req, null}, db])),
+ ets:delete(Tid)
+ end).
+
+
+t_handle_call_resume_scan_chfeed_no_ets_entry() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ Pid = start_changes_reader(?DBNAME, 0),
+ State = mock_state(Tid, Pid),
+ resume_scan(?DBNAME, State),
+ % Check ets checkpoint is set to 0 and rescan = true
+ ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Tid)),
+ ets:delete(Tid),
+ kill_mock_changes_reader_and_get_its_args(Pid)
+ end).
+
+
+t_handle_call_resume_scan_chfeed_ets_entry() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ true = ets:insert(Tid, [{?DBNAME, 2, false}]),
+ Pid = start_changes_reader(?DBNAME, 1),
+ State = mock_state(Tid, Pid),
+ resume_scan(?DBNAME, State),
+ % Check ets checkpoint is set to same endseq but rescan = true
+ ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
+ ets:delete(Tid),
+ kill_mock_changes_reader_and_get_its_args(Pid)
+ end).
+
+
+t_handle_call_resume_scan_no_chfeed_ets_entry() ->
+ ?_test(begin
+ Tid = mock_ets(),
+ true = ets:insert(Tid, [{?DBNAME, 1, true}]),
+ State = mock_state(Tid),
+ RState = resume_scan(?DBNAME, State),
+ % Check if reset rescan to false but kept same endseq
+ ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
+ % Check if started a change reader
+ [{?DBNAME, Pid}] = RState#state.pids,
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 1,
+ feed = "normal",
+ timeout = infinity
+ }, {json_req, null}, db])),
+ ets:delete(Tid)
+ end).
+
+
+t_start_link() ->
+ ?_test(begin
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
+ ?assert(is_pid(Pid)),
+ ?assertMatch(#state{
+ mod = ?MOD,
+ suffix = ?SUFFIX,
+ ctx = nil,
+ pids = [],
+ skip_ddocs = false
+ }, sys:get_state(Pid)),
+ unlink(Pid),
+ exit(Pid, kill),
+ ?assert(meck:called(couch_event, register_all, [Pid]))
+ end).
+
+
+t_start_link_no_ddocs() ->
+ ?_test(begin
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
+ ?assert(is_pid(Pid)),
+ ?assertMatch(#state{
+ mod = ?MOD,
+ suffix = ?SUFFIX,
+ ctx = nil,
+ pids = [],
+ skip_ddocs = true
+ }, sys:get_state(Pid)),
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_misc_gen_server_callbacks() ->
+ ?_test(begin
+ ?assertEqual(ok, terminate(reason, state)),
+ ?assertEqual({ok, state}, code_change(old, state, extra))
+ end).
+
+
+scan_dbs_test_() ->
+{
+ foreach,
+ fun() -> test_util:start_couch([mem3, fabric]) end,
+ fun(Ctx) -> test_util:stop_couch(Ctx) end,
+ [
+ t_pass_shard(),
+ t_fail_shard(),
+ t_pass_local(),
+ t_fail_local()
+ ]
+}.
+
+
+t_pass_shard() ->
+ ?_test(begin
+ DbName0 = ?tempdb(),
+ DbSuffix = <<"_replicator">>,
+ DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+ ok = fabric:create_db(DbName, [?CTX]),
+ ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+ fabric:delete_db(DbName, [?CTX])
+ end).
+
+
+t_fail_shard() ->
+ ?_test(begin
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?CTX]),
+ ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
+ fabric:delete_db(DbName, [?CTX])
+ end).
+
+
+t_pass_local() ->
+ ?_test(begin
+ LocalDb = ?tempdb(),
+ {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+ ok = couch_db:close(Db),
+ scan_local_db(self(), LocalDb),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(false)
+ end
+ end).
+
+
+t_fail_local() ->
+ ?_test(begin
+ LocalDb = ?tempdb(),
+ {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+ ok = couch_db:close(Db),
+ scan_local_db(self(), <<"some_other_db">>),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertNotEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(true)
+ end
+ end).
+
+
+% Test helper functions
+
+mock_logs() ->
+ meck:expect(couch_log, error, 2, ok),
+ meck:expect(couch_log, notice, 2, ok),
+ meck:expect(couch_log, info, 2, ok),
+ meck:expect(couch_log, debug, 2, ok).
+
+
+mock_callback_mod() ->
+ meck:new(?MOD, [non_strict]),
+ meck:expect(?MOD, db_created, fun(_DbName, Ctx) -> Ctx end),
+ meck:expect(?MOD, db_deleted, fun(_DbName, Ctx) -> Ctx end),
+ meck:expect(?MOD, db_found, fun(_DbName, Ctx) -> Ctx end),
+ meck:expect(?MOD, db_change, fun(_DbName, _Change, Ctx) -> Ctx end).
+
+
+mock_changes_reader_loop({_CbFun, {Server, DbName}}) ->
+ receive
+ die ->
+ exit({Server, DbName})
+ end.
+
+kill_mock_changes_reader_and_get_its_args(Pid) ->
+ Ref = monitor(process, Pid),
+ unlink(Pid),
+ Pid ! die,
+ receive
+ {'DOWN', Ref, _, Pid, {Server, DbName}} ->
+ {Server, DbName}
+ after 1000 ->
+ erlang:error(spawn_change_reader_timeout)
+ end.
+
+
+mock_changes_reader() ->
+ meck:expect(couch_changes, handle_db_changes,
+ fun(_ChArgs, _Req, db) ->
+ fun mock_changes_reader_loop/1
+ end).
+
+
+mock_ets() ->
+ ets:new(multidb_test_ets, [set, public]).
+
+
+mock_state() ->
+ #state{
+ mod = ?MOD,
+ ctx = zig,
+ suffix = ?SUFFIX,
+ event_server = esref,
+ scanner = spid,
+ pids = []}.
+
+
+mock_state(Ets) ->
+ State = mock_state(),
+ State#state{tid = Ets}.
+
+
+mock_state(Ets, Pid) ->
+ State = mock_state(Ets),
+ State#state{pids = [{?DBNAME, Pid}]}.
+
+
+change_row(Id) when is_binary(Id) ->
+ {[
+ {<<"seq">>, 1},
+ {<<"id">>, Id},
+ {<<"changes">>, [{[{<<"rev">>, <<"1-f00">>}]}]},
+ {doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
+ ]}.
+
+
+handle_call_ok(Msg, State) ->
+ ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
+
+
+handle_info_check(Msg, State) ->
+ ?assertMatch({noreply, _}, handle_info(Msg, State)).
+
+
+-endif.