summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_multidb_changes.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_multidb_changes.erl')
-rw-r--r--src/couch/src/couch_multidb_changes.erl903
1 files changed, 0 insertions, 903 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
deleted file mode 100644
index 09278656e..000000000
--- a/src/couch/src/couch_multidb_changes.erl
+++ /dev/null
@@ -1,903 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_multidb_changes).
-
--behaviour(gen_server).
-
--export([
- start_link/4
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3,
- format_status/2
-]).
-
--export([
- changes_reader/3,
- changes_reader_cb/3
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
-
--define(AVG_DELAY_MSEC, 10).
--define(MAX_DELAY_MSEC, 120000).
-
--record(state, {
- tid :: ets:tid(),
- mod :: atom(),
- ctx :: term(),
- suffix :: binary(),
- event_server :: reference(),
- scanner :: nil | pid(),
- pids :: [{binary(), pid()}],
- skip_ddocs :: boolean()
-}).
-
-% Behavior API
-
-% For each db shard with a matching suffix, report created,
-% deleted, found (discovered) and change events.
-
--callback db_created(DbName :: binary(), Context :: term()) ->
- Context :: term().
-
--callback db_deleted(DbName :: binary(), Context :: term()) ->
- Context :: term().
-
--callback db_found(DbName :: binary(), Context :: term()) ->
- Context :: term().
-
--callback db_change(DbName :: binary(), Change :: term(), Context :: term()) ->
- Context :: term().
-
-
-% External API
-
-
-% Opts list can contain:
-% - `skip_ddocs` : Skip design docs
-
--spec start_link(binary(), module(), term(), list()) ->
- {ok, pid()} | ignore | {error, term()}.
-start_link(DbSuffix, Module, Context, Opts) when
- is_binary(DbSuffix), is_atom(Module), is_list(Opts) ->
- gen_server:start_link(?MODULE, [DbSuffix, Module, Context, Opts], []).
-
-
-% gen_server callbacks
-
-init([DbSuffix, Module, Context, Opts]) ->
- process_flag(trap_exit, true),
- Server = self(),
- {ok, #state{
- tid = ets:new(?MODULE, [set, protected]),
- mod = Module,
- ctx = Context,
- suffix = DbSuffix,
- event_server = register_with_event_server(Server),
- scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end),
- pids = [],
- skip_ddocs = proplists:is_defined(skip_ddocs, Opts)
- }}.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-handle_call({change, DbName, Change}, _From,
- #state{skip_ddocs=SkipDDocs, mod=Mod, ctx=Ctx} = State) ->
- case {SkipDDocs, is_design_doc(Change)} of
- {true, true} ->
- {reply, ok, State};
- {_, _} ->
- {reply, ok, State#state{ctx=Mod:db_change(DbName, Change, Ctx)}}
- end;
-
-handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid=Ets} = State) ->
- case ets:lookup(Ets, DbName) of
- [] ->
- true = ets:insert(Ets, {DbName, EndSeq, false});
- [{DbName, _OldSeq, Rescan}] ->
- true = ets:insert(Ets, {DbName, EndSeq, Rescan})
- end,
- {reply, ok, State}.
-
-
-handle_cast({resume_scan, DbName}, State) ->
- {noreply, resume_scan(DbName, State)}.
-
-
-handle_info({'$couch_event', DbName, Event}, #state{suffix = Suf} = State) ->
- case Suf =:= couch_db:dbname_suffix(DbName) of
- true ->
- {noreply, db_callback(Event, DbName, State)};
- _ ->
- {noreply, State}
- end;
-
-handle_info({'DOWN', Ref, _, _, Info}, #state{event_server = Ref} = State) ->
- {stop, {couch_event_server_died, Info}, State};
-
-handle_info({'EXIT', From, normal}, #state{scanner = From} = State) ->
- {noreply, State#state{scanner=nil}};
-
-handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->
- {stop, {scanner_died, Reason}, State};
-
-handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
- couch_log:debug("~p change feed exited ~p", [State#state.suffix, From]),
- case lists:keytake(From, 2, Pids) of
- {value, {DbName, From}, NewPids} ->
- if Reason == normal -> ok; true ->
- Fmt = "~s : Known change feed ~w died :: ~w",
- couch_log:error(Fmt, [?MODULE, From, Reason])
- end,
- NewState = State#state{pids = NewPids},
- case ets:lookup(State#state.tid, DbName) of
- [{DbName, _EndSeq, true}] ->
- {noreply, resume_scan(DbName, NewState)};
- _ ->
- {noreply, NewState}
- end;
- false when Reason == normal ->
- {noreply, State};
- false ->
- Fmt = "~s(~p) : Unknown pid ~w died :: ~w",
- couch_log:error(Fmt, [?MODULE, State#state.suffix, From, Reason]),
- {stop, {unexpected_exit, From, Reason}, State}
- end;
-
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-format_status(_Opt, [_PDict, State]) ->
- #state{
- pids=Pids
- } = State,
- Scrubbed = State#state{
- pids={length, length(Pids)}
- },
- [{data, [{"State",
- ?record_to_keyval(state, Scrubbed)
- }]}].
-
-% Private functions
-
--spec register_with_event_server(pid()) -> reference().
-register_with_event_server(Server) ->
- Ref = erlang:monitor(process, couch_event_server),
- couch_event:register_all(Server),
- Ref.
-
-
--spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}.
-db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
- NewState = State#state{ctx = Mod:db_created(DbName, Ctx)},
- resume_scan(DbName, NewState);
-db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
- State#state{ctx = Mod:db_deleted(DbName, Ctx)};
-db_callback(updated, DbName, State) ->
- resume_scan(DbName, State);
-db_callback(_Other, _DbName, State) ->
- State.
-
-
--spec resume_scan(binary(), #state{}) -> #state{}.
-resume_scan(DbName, #state{pids=Pids, tid=Ets} = State) ->
- case {lists:keyfind(DbName, 1, Pids), ets:lookup(Ets, DbName)} of
- {{DbName, _}, []} ->
- % Found existing change feed, but not entry in ETS
- % Flag a need to rescan from begining
- true = ets:insert(Ets, {DbName, 0, true}),
- State;
- {{DbName, _}, [{DbName, EndSeq, _}]} ->
- % Found existing change feed and entry in ETS
- % Flag a need to rescan from last ETS checkpoint
- true = ets:insert(Ets, {DbName, EndSeq, true}),
- State;
- {false, []} ->
- % No existing change feed running. No entry in ETS.
- % This is first time seeing this db shard.
- % Notify user with a found callback. Insert checkpoint
- % entry in ETS to start from 0. And start a change feed.
- true = ets:insert(Ets, {DbName, 0, false}),
- Mod = State#state.mod,
- Ctx = Mod:db_found(DbName, State#state.ctx),
- Pid = start_changes_reader(DbName, 0),
- State#state{ctx=Ctx, pids=[{DbName, Pid} | Pids]};
- {false, [{DbName, EndSeq, _}]} ->
- % No existing change feed running. Found existing checkpoint.
- % Start a new change reader from last checkpoint.
- true = ets:insert(Ets, {DbName, EndSeq, false}),
- Pid = start_changes_reader(DbName, EndSeq),
- State#state{pids=[{DbName, Pid} | Pids]}
- end.
-
-
-start_changes_reader(DbName, Since) ->
- spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
-
-
-changes_reader(Server, DbName, Since) ->
- {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
- ChangesArgs = #changes_args{
- include_docs = true,
- since = Since,
- feed = "normal",
- timeout = infinity
- },
- ChFun = couch_changes:handle_db_changes(ChangesArgs, {json_req, null}, Db),
- ChFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
-
-
-changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
- ok = gen_server:call(Server, {change, DbName, Change}, infinity),
- {Server, DbName};
-changes_reader_cb({stop, EndSeq}, _, {Server, DbName}) ->
- ok = gen_server:call(Server, {checkpoint, DbName, EndSeq}, infinity),
- {Server, DbName};
-changes_reader_cb(_, _, Acc) ->
- Acc.
-
-
-scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
- ok = scan_local_db(Server, DbSuffix),
- {ok, Db} = mem3_util:ensure_exists(
- config:get("mem3", "shards_db", "_dbs")),
- ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db),
- ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
- couch_db:close(Db).
-
-
-scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->
- DbName = couch_util:get_value(<<"id">>, Change),
- case DbName of <<"_design/", _/binary>> -> Acc; _Else ->
- NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),
- case {NameMatch, couch_replicator_utils:is_deleted(Change)} of
- {false, _} ->
- Acc;
- {true, true} ->
- Acc;
- {true, false} ->
- Shards = local_shards(DbName),
- lists:foldl(fun notify_fold/2, Acc, Shards)
- end
- end;
-scan_changes_cb(_, _, Acc) ->
- Acc.
-
-
-local_shards(DbName) ->
- try
- [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
- catch
- error:database_does_not_exist ->
- []
- end.
-
-
-notify_fold(DbName, {Server, DbSuffix, Count}) ->
- Jitter = jitter(Count),
- spawn_link(fun() ->
- timer:sleep(Jitter),
- gen_server:cast(Server, {resume_scan, DbName})
- end),
- {Server, DbSuffix, Count + 1}.
-
-
-% Jitter is proportional to the number of shards found so far. This is done to
-% avoid a stampede and notifying the callback function with potentially a large
-% number of shards back to back during startup.
-jitter(N) ->
- Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
- couch_rand:uniform(Range).
-
-
-scan_local_db(Server, DbSuffix) when is_pid(Server) ->
- case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
- {ok, Db} ->
- gen_server:cast(Server, {resume_scan, DbSuffix}),
- ok = couch_db:close(Db);
- _Error ->
- ok
- end.
-
-
-is_design_doc({Change}) ->
- case lists:keyfind(<<"id">>, 1, Change) of
- false ->
- false;
- {_, Id} ->
- is_design_doc_id(Id)
- end.
-
-
-is_design_doc_id(<<?DESIGN_DOC_PREFIX, _/binary>>) ->
- true;
-is_design_doc_id(_) ->
- false.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("couch/include/couch_eunit.hrl").
-
--define(MOD, multidb_test_module).
--define(SUFFIX, <<"suff">>).
--define(DBNAME, <<"shards/40000000-5fffffff/acct/suff.0123456789">>).
-
-couch_multidb_changes_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_handle_call_change(),
- t_handle_call_change_filter_design_docs(),
- t_handle_call_checkpoint_new(),
- t_handle_call_checkpoint_existing(),
- t_handle_info_created(),
- t_handle_info_deleted(),
- t_handle_info_updated(),
- t_handle_info_other_event(),
- t_handle_info_created_other_db(),
- t_handle_info_scanner_exit_normal(),
- t_handle_info_scanner_crashed(),
- t_handle_info_event_server_exited(),
- t_handle_info_unknown_pid_exited(),
- t_handle_info_change_feed_exited(),
- t_handle_info_change_feed_exited_and_need_rescan(),
- t_spawn_changes_reader(),
- t_changes_reader_cb_change(),
- t_changes_reader_cb_stop(),
- t_changes_reader_cb_other(),
- t_handle_call_resume_scan_no_chfeed_no_ets_entry(),
- t_handle_call_resume_scan_chfeed_no_ets_entry(),
- t_handle_call_resume_scan_chfeed_ets_entry(),
- t_handle_call_resume_scan_no_chfeed_ets_entry(),
- t_start_link(),
- t_start_link_no_ddocs(),
- t_misc_gen_server_callbacks()
- ]
- }
- }.
-
-
-setup_all() ->
- mock_logs(),
- mock_callback_mod(),
- meck:expect(couch_event, register_all, 1, ok),
- meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
- meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
- ChangesFun = meck:val(fun(_) -> ok end),
- meck:expect(couch_changes, handle_db_changes, 3, ChangesFun),
- meck:expect(couch_db, open_int,
- fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
- (_, _) -> {not_found, no_db_file}
- end),
- meck:expect(couch_db, close, 1, ok),
- mock_changes_reader(),
- % create process to stand in for couch_event_server
- % mocking erlang:monitor doesn't work, so give it real process to monitor
- EvtPid = spawn_link(fun() -> receive looper -> ok end end),
- true = register(couch_event_server, EvtPid),
- EvtPid.
-
-
-teardown_all(EvtPid) ->
- unlink(EvtPid),
- exit(EvtPid, kill),
- meck:unload().
-
-
-setup() ->
- meck:reset([
- ?MOD,
- couch_changes,
- couch_db,
- couch_event,
- couch_log
- ]).
-
-
-teardown(_) ->
- ok.
-
-
-t_handle_call_change() ->
- ?_test(begin
- State = mock_state(),
- Change = change_row(<<"blah">>),
- handle_call_ok({change, ?DBNAME, Change}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
- end).
-
-
-t_handle_call_change_filter_design_docs() ->
- ?_test(begin
- State0 = mock_state(),
- State = State0#state{skip_ddocs = true},
- Change = change_row(<<"_design/blah">>),
- handle_call_ok({change, ?DBNAME, Change}, State),
- ?assert(meck:validate(?MOD)),
- ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
- end).
-
-
-t_handle_call_checkpoint_new() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_call_ok({checkpoint, ?DBNAME, 1}, State),
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- ets:delete(Tid)
- end).
-
-
-t_handle_call_checkpoint_existing() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- true = ets:insert(Tid, {?DBNAME, 1, true}),
- handle_call_ok({checkpoint, ?DBNAME, 2}, State),
- ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
- ets:delete(Tid)
- end).
-
-
-t_handle_info_created() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_info_check({'$couch_event', ?DBNAME, created}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
- end).
-
-
-t_handle_info_deleted() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
- end).
-
-
-t_handle_info_updated() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_info_check({'$couch_event', ?DBNAME, updated}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
- end).
-
-
-t_handle_info_other_event() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
- ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
- ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
- ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse]))
- end).
-
-
-t_handle_info_created_other_db() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
- ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
- end).
-
-
-t_handle_info_scanner_exit_normal() ->
- ?_test(begin
- Res = handle_info({'EXIT', spid, normal}, mock_state()),
- ?assertMatch({noreply, _}, Res),
- {noreply, RState} = Res,
- ?assertEqual(nil, RState#state.scanner)
- end).
-
-
-t_handle_info_scanner_crashed() ->
- ?_test(begin
- Res = handle_info({'EXIT', spid, oops}, mock_state()),
- ?assertMatch({stop, {scanner_died, oops}, _State}, Res)
- end).
-
-
-t_handle_info_event_server_exited() ->
- ?_test(begin
- Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
- ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
- end).
-
-
-t_handle_info_unknown_pid_exited() ->
- ?_test(begin
- State0 = mock_state(),
- Res0 = handle_info({'EXIT', somepid, normal}, State0),
- ?assertMatch({noreply, State0}, Res0),
- State1 = mock_state(),
- Res1 = handle_info({'EXIT', somepid, oops}, State1),
- ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
- end).
-
-
-t_handle_info_change_feed_exited() ->
- ?_test(begin
- Tid0 = mock_ets(),
- State0 = mock_state(Tid0, cpid),
- Res0 = handle_info({'EXIT', cpid, normal}, State0),
- ?assertMatch({noreply, _}, Res0),
- {noreply, RState0} = Res0,
- ?assertEqual([], RState0#state.pids),
- ets:delete(Tid0),
- Tid1 = mock_ets(),
- State1 = mock_state(Tid1, cpid),
- Res1 = handle_info({'EXIT', cpid, oops}, State1),
- ?assertMatch({noreply, _}, Res1),
- {noreply, RState1} = Res1,
- ?assertEqual([], RState1#state.pids),
- ets:delete(Tid1)
- end).
-
-
-t_handle_info_change_feed_exited_and_need_rescan() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, {?DBNAME, 1, true}),
- State = mock_state(Tid, cpid),
- Res = handle_info({'EXIT', cpid, normal}, State),
- ?assertMatch({noreply, _}, Res),
- {noreply, RState} = Res,
- % rescan flag should have been reset to false
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- % a mock change feed process should be running
- [{?DBNAME, Pid}] = RState#state.pids,
- ?assert(is_pid(Pid)),
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ets:delete(Tid)
- end).
-
-
-t_spawn_changes_reader() ->
- ?_test(begin
- Pid = start_changes_reader(?DBNAME, 3),
- ?assert(erlang:is_process_alive(Pid)),
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(meck:validate(couch_db)),
- ?assert(meck:validate(couch_changes)),
- ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
- ?assert(meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 3,
- feed = "normal",
- timeout = infinity
- }, {json_req, null}, db]))
- end).
-
-
-t_changes_reader_cb_change() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
- Change = change_row(<<"blah">>),
- ChArg = {change, Change, ignore},
- {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
- ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
-
-t_changes_reader_cb_stop() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
- ChArg = {stop, 11},
- {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
- % We checkpoint on stop, check if checkpointed at correct sequence
- #state{tid = Tid} = sys:get_state(Pid),
- ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Tid)),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
-
-t_changes_reader_cb_other() ->
- ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
-
-
-t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- RState = resume_scan(?DBNAME, State),
- % Check if inserted checkpoint entry in ets starting at 0
- ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Tid)),
- % Check if called db_found callback
- ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
- % Check if started a change reader
- [{?DBNAME, Pid}] = RState#state.pids,
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 0,
- feed = "normal",
- timeout = infinity
- }, {json_req, null}, db])),
- ets:delete(Tid)
- end).
-
-
-t_handle_call_resume_scan_chfeed_no_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- Pid = start_changes_reader(?DBNAME, 0),
- State = mock_state(Tid, Pid),
- resume_scan(?DBNAME, State),
- % Check ets checkpoint is set to 0 and rescan = true
- ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Tid)),
- ets:delete(Tid),
- kill_mock_changes_reader_and_get_its_args(Pid)
- end).
-
-
-t_handle_call_resume_scan_chfeed_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, [{?DBNAME, 2, false}]),
- Pid = start_changes_reader(?DBNAME, 1),
- State = mock_state(Tid, Pid),
- resume_scan(?DBNAME, State),
- % Check ets checkpoint is set to same endseq but rescan = true
- ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
- ets:delete(Tid),
- kill_mock_changes_reader_and_get_its_args(Pid)
- end).
-
-
-t_handle_call_resume_scan_no_chfeed_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, [{?DBNAME, 1, true}]),
- State = mock_state(Tid),
- RState = resume_scan(?DBNAME, State),
- % Check if reset rescan to false but kept same endseq
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- % Check if started a change reader
- [{?DBNAME, Pid}] = RState#state.pids,
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 1,
- feed = "normal",
- timeout = infinity
- }, {json_req, null}, db])),
- ets:delete(Tid)
- end).
-
-
-t_start_link() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
- ?assert(is_pid(Pid)),
- ?assertMatch(#state{
- mod = ?MOD,
- suffix = ?SUFFIX,
- ctx = nil,
- pids = [],
- skip_ddocs = false
- }, sys:get_state(Pid)),
- unlink(Pid),
- exit(Pid, kill),
- ?assert(meck:called(couch_event, register_all, [Pid]))
- end).
-
-
-t_start_link_no_ddocs() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
- ?assert(is_pid(Pid)),
- ?assertMatch(#state{
- mod = ?MOD,
- suffix = ?SUFFIX,
- ctx = nil,
- pids = [],
- skip_ddocs = true
- }, sys:get_state(Pid)),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
-
-t_misc_gen_server_callbacks() ->
- ?_test(begin
- ?assertEqual(ok, terminate(reason, state)),
- ?assertEqual({ok, state}, code_change(old, state, extra))
- end).
-
-
-scan_dbs_test_() ->
-{
- setup,
- fun() ->
- Ctx = test_util:start_couch([mem3, fabric]),
- GlobalDb = ?tempdb(),
- ok = fabric:create_db(GlobalDb, [?CTX]),
- #shard{name = LocalDb} = hd(mem3:local_shards(GlobalDb)),
- {Ctx, GlobalDb, LocalDb}
- end,
- fun({Ctx, GlobalDb, _LocalDb}) ->
- fabric:delete_db(GlobalDb, [?CTX]),
- test_util:stop_couch(Ctx)
- end,
- {with, [
- fun t_find_shard/1,
- fun t_shard_not_found/1,
- fun t_pass_local/1,
- fun t_fail_local/1
- ]}
-}.
-
-
-t_find_shard({_, DbName, _}) ->
- ?_test(begin
- ?assertEqual(2, length(local_shards(DbName)))
- end).
-
-
-t_shard_not_found(_) ->
- ?_test(begin
- ?assertEqual([], local_shards(?tempdb()))
- end).
-
-
-t_pass_local({_, _, LocalDb}) ->
- ?_test(begin
- scan_local_db(self(), LocalDb),
- receive
- {'$gen_cast', Msg} ->
- ?assertEqual(Msg, {resume_scan, LocalDb})
- after 0 ->
- ?assert(false)
- end
- end).
-
-
-t_fail_local({_, _, LocalDb}) ->
- ?_test(begin
- scan_local_db(self(), <<"some_other_db">>),
- receive
- {'$gen_cast', Msg} ->
- ?assertNotEqual(Msg, {resume_scan, LocalDb})
- after 0 ->
- ?assert(true)
- end
- end).
-
-
-% Test helper functions
-
-mock_logs() ->
- meck:expect(couch_log, error, 2, ok),
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(couch_log, info, 2, ok),
- meck:expect(couch_log, debug, 2, ok).
-
-
-mock_callback_mod() ->
- meck:new(?MOD, [non_strict]),
- meck:expect(?MOD, db_created, fun(_DbName, Ctx) -> Ctx end),
- meck:expect(?MOD, db_deleted, fun(_DbName, Ctx) -> Ctx end),
- meck:expect(?MOD, db_found, fun(_DbName, Ctx) -> Ctx end),
- meck:expect(?MOD, db_change, fun(_DbName, _Change, Ctx) -> Ctx end).
-
-
-mock_changes_reader_loop({_CbFun, {Server, DbName}}) ->
- receive
- die ->
- exit({Server, DbName})
- end.
-
-kill_mock_changes_reader_and_get_its_args(Pid) ->
- Ref = monitor(process, Pid),
- unlink(Pid),
- Pid ! die,
- receive
- {'DOWN', Ref, _, Pid, {Server, DbName}} ->
- {Server, DbName}
- after 1000 ->
- erlang:error(spawn_change_reader_timeout)
- end.
-
-
-mock_changes_reader() ->
- meck:expect(couch_changes, handle_db_changes,
- fun
- (_ChArgs, _Req, db) -> fun mock_changes_reader_loop/1;
- (_ChArgs, _Req, dbs) -> fun(_) -> ok end
- end).
-
-
-mock_ets() ->
- ets:new(multidb_test_ets, [set, public]).
-
-
-mock_state() ->
- #state{
- mod = ?MOD,
- ctx = zig,
- suffix = ?SUFFIX,
- event_server = esref,
- scanner = spid,
- pids = []}.
-
-
-mock_state(Ets) ->
- State = mock_state(),
- State#state{tid = Ets}.
-
-
-mock_state(Ets, Pid) ->
- State = mock_state(Ets),
- State#state{pids = [{?DBNAME, Pid}]}.
-
-
-change_row(Id) when is_binary(Id) ->
- {[
- {<<"seq">>, 1},
- {<<"id">>, Id},
- {<<"changes">>, [{[{<<"rev">>, <<"1-f00">>}]}]},
- {doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
- ]}.
-
-
-handle_call_ok(Msg, State) ->
- ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
-
-
-handle_info_check(Msg, State) ->
- ?assertMatch({noreply, _}, handle_info(Msg, State)).
-
-
--endif.