diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-08-28 04:31:04 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-09-15 16:13:46 -0400 |
commit | 4fc9a536ec85456ab60085f020548a08dd19ca36 (patch) | |
tree | 2f2f0f619ce2e24086243310bfc1d33a1a24552b | |
parent | d2c9dffa3aca3b3e2faed49526b0065ebb845fad (diff) | |
download | couchdb-4fc9a536ec85456ab60085f020548a08dd19ca36.tar.gz |
Delete old 2.x-3.x replicator modules
These modules are not used by the new replicator.
16 files changed, 0 insertions, 5936 deletions
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl deleted file mode 100644 index 18de1e825..000000000 --- a/src/couch_replicator/src/couch_replicator_clustering.erl +++ /dev/null @@ -1,279 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - - -% Maintain cluster membership and stability notifications for replications. -% On changes to cluster membership, broadcast events to `replication` gen_event. -% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events. -% -% Cluster stability is defined as "there have been no nodes added or removed in -% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a -% speedier startup, during initialization there is a shorter StartupPeriod -% in effect (also configurable). -% -% This module is also in charge of calculating ownership of replications based -% on where their _replicator db documents shards live. - - --module(couch_replicator_clustering). - --behaviour(gen_server). --behaviour(config_listener). --behaviour(mem3_cluster). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 -]). - --export([ - owner/2, - is_stable/0, - link_cluster_event_listener/3 -]). - -% config_listener callbacks --export([ - handle_config_change/5, - handle_config_terminate/3 -]). 
- -% mem3_cluster callbacks --export([ - cluster_stable/1, - cluster_unstable/1 -]). - --include_lib("couch/include/couch_db.hrl"). --include_lib("mem3/include/mem3.hrl"). - --define(DEFAULT_QUIET_PERIOD, 60). % seconds --define(DEFAULT_START_PERIOD, 5). % seconds --define(RELISTEN_DELAY, 5000). - --record(state, { - mem3_cluster_pid :: pid(), - cluster_stable :: boolean() -}). - - --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - -% owner/2 function computes ownership for a {DbName, DocId} tuple -% `unstable` if cluster is considered to be unstable i.e. it has changed -% recently, or returns node() which of the owner. -% --spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable. -owner(<<"shards/", _/binary>> = DbName, DocId) -> - case is_stable() of - false -> - unstable; - true -> - owner_int(DbName, DocId) - end; -owner(_DbName, _DocId) -> - node(). - - --spec is_stable() -> true | false. -is_stable() -> - gen_server:call(?MODULE, is_stable). - - --spec link_cluster_event_listener(atom(), atom(), list()) -> pid(). -link_cluster_event_listener(Mod, Fun, Args) - when is_atom(Mod), is_atom(Fun), is_list(Args) -> - CallbackFun = - fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]); - (_) -> ok - end, - {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun), - Pid. - - -% Mem3 cluster callbacks - -cluster_unstable(Server) -> - ok = gen_server:call(Server, set_unstable), - couch_replicator_notifier:notify({cluster, unstable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - couch_log:notice("~s : cluster unstable", [?MODULE]), - Server. - -cluster_stable(Server) -> - ok = gen_server:call(Server, set_stable), - couch_replicator_notifier:notify({cluster, stable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1), - couch_log:notice("~s : cluster stable", [?MODULE]), - Server. 
- - -% gen_server callbacks - -init([]) -> - ok = config:listen_for_changes(?MODULE, nil), - Period = abs(config:get_integer("replicator", "cluster_quiet_period", - ?DEFAULT_QUIET_PERIOD)), - StartPeriod = abs(config:get_integer("replicator", "cluster_start_period", - ?DEFAULT_START_PERIOD)), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod, - Period), - {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}. - - -terminate(_Reason, _State) -> - ok. - - -handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) -> - {reply, IsStable, State}; - -handle_call(set_stable, _From, State) -> - {reply, ok, State#state{cluster_stable = true}}; - -handle_call(set_unstable, _From, State) -> - {reply, ok, State#state{cluster_stable = false}}. - - -handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) -> - ok = mem3_cluster:set_period(Pid, Period), - {noreply, State}. - - -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%% Internal functions - - -handle_config_change("replicator", "cluster_quiet_period", V, _, S) -> - ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}), - {ok, S}; -handle_config_change(_, _, _, _, S) -> - {ok, S}. - - -handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_S, _R, _St) -> - Pid = whereis(?MODULE), - erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). - - --spec owner_int(binary(), binary()) -> node(). -owner_int(ShardName, DocId) -> - DbName = mem3:dbname(ShardName), - Live = [node() | nodes()], - Shards = mem3:shards(DbName, DocId), - Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)], - mem3:owner(DbName, DocId, Nodes). - - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). 
- - -replicator_clustering_test_() -> - { - setup, - fun setup_all/0, - fun teardown_all/1, - { - foreach, - fun setup/0, - fun teardown/1, - [ - t_stable_callback(), - t_unstable_callback() - ] - } - }. - - -t_stable_callback() -> - ?_test(begin - ?assertEqual(false, is_stable()), - cluster_stable(whereis(?MODULE)), - ?assertEqual(true, is_stable()) - end). - - -t_unstable_callback() -> - ?_test(begin - cluster_stable(whereis(?MODULE)), - ?assertEqual(true, is_stable()), - cluster_unstable(whereis(?MODULE)), - ?assertEqual(false, is_stable()) - end). - - -setup_all() -> - meck:expect(couch_log, notice, 2, ok), - meck:expect(config, get, fun(_, _, Default) -> Default end), - meck:expect(config, listen_for_changes, 2, ok), - meck:expect(couch_stats, update_gauge, 2, ok), - meck:expect(couch_replicator_notifier, notify, 1, ok). - - -teardown_all(_) -> - meck:unload(). - - -setup() -> - meck:reset([ - config, - couch_log, - couch_stats, - couch_replicator_notifier - ]), - stop_clustering_process(), - {ok, Pid} = start_link(), - Pid. - - -teardown(Pid) -> - stop_clustering_process(Pid). - - -stop_clustering_process() -> - stop_clustering_process(whereis(?MODULE)). - - -stop_clustering_process(undefined) -> - ok; - -stop_clustering_process(Pid) when is_pid(Pid) -> - Ref = erlang:monitor(process, Pid), - unlink(Pid), - exit(Pid, kill), - receive {'DOWN', Ref, _, _, _} -> ok end. - --endif. diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl deleted file mode 100644 index 92b0222c4..000000000 --- a/src/couch_replicator/src/couch_replicator_db_changes.erl +++ /dev/null @@ -1,108 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. 
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_db_changes). - --behaviour(gen_server). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 -]). - --export([ - notify_cluster_event/2 -]). - --record(state, { - event_listener :: pid(), - mdb_changes :: pid() | nil -}). - - --spec notify_cluster_event(pid(), {cluster, any()}) -> ok. -notify_cluster_event(Server, {cluster, _} = Event) -> - gen_server:cast(Server, Event). - - --spec start_link() -> - {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link(?MODULE, [], []). - - -init([]) -> - EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE, - notify_cluster_event, [self()]), - State = #state{event_listener = EvtPid, mdb_changes = nil}, - case couch_replicator_clustering:is_stable() of - true -> - {ok, restart_mdb_changes(State)}; - false -> - {ok, State} - end. - - -terminate(_Reason, _State) -> - ok. - - -handle_call(_Msg, _From, State) -> - {reply, {error, invalid_call}, State}. - - -handle_cast({cluster, unstable}, State) -> - {noreply, stop_mdb_changes(State)}; - -handle_cast({cluster, stable}, State) -> - {noreply, restart_mdb_changes(State)}. - - -handle_info(_Msg, State) -> - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - --spec restart_mdb_changes(#state{}) -> #state{}. 
-restart_mdb_changes(#state{mdb_changes = nil} = State) -> - Suffix = <<"_replicator">>, - CallbackMod = couch_replicator_doc_processor, - Options = [skip_ddocs], - {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil, - Options), - couch_stats:increment_counter([couch_replicator, db_scans]), - couch_log:notice("Started replicator db changes listener ~p", [Pid]), - State#state{mdb_changes = Pid}; - -restart_mdb_changes(#state{mdb_changes = _Pid} = State) -> - restart_mdb_changes(stop_mdb_changes(State)). - - --spec stop_mdb_changes(#state{}) -> #state{}. -stop_mdb_changes(#state{mdb_changes = nil} = State) -> - State; -stop_mdb_changes(#state{mdb_changes = Pid} = State) -> - couch_log:notice("Stopping replicator db changes listener ~p", [Pid]), - unlink(Pid), - exit(Pid, kill), - State#state{mdb_changes = nil}. diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl deleted file mode 100644 index 6778d537d..000000000 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ /dev/null @@ -1,962 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_doc_processor). - --behaviour(gen_server). --behaviour(couch_multidb_changes). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 -]). 
- --export([ - db_created/2, - db_deleted/2, - db_found/2, - db_change/3 -]). - --export([ - docs/1, - doc/2, - doc_lookup/3, - update_docs/0, - get_worker_ref/1, - notify_cluster_event/2 -]). - --include_lib("couch/include/couch_db.hrl"). --include("couch_replicator.hrl"). --include_lib("mem3/include/mem3.hrl"). - --import(couch_replicator_utils, [ - get_json_value/2, - get_json_value/3 -]). - --define(DEFAULT_UPDATE_DOCS, false). --define(ERROR_MAX_BACKOFF_EXPONENT, 12). % ~ 1 day on average --define(TS_DAY_SEC, 86400). --define(INITIAL_BACKOFF_EXPONENT, 64). --define(MIN_FILTER_DELAY_SEC, 60). - --type filter_type() :: nil | view | user | docids | mango. --type repstate() :: initializing | error | scheduled. - - --record(rdoc, { - id :: db_doc_id() | '_' | {any(), '_'}, - state :: repstate() | '_', - rep :: #rep{} | nil | '_', - rid :: rep_id() | nil | '_', - filter :: filter_type() | '_', - info :: binary() | nil | '_', - errcnt :: non_neg_integer() | '_', - worker :: reference() | nil | '_', - last_updated :: erlang:timestamp() | '_' -}). - - -% couch_multidb_changes API callbacks - -db_created(DbName, Server) -> - couch_stats:increment_counter([couch_replicator, docs, dbs_created]), - couch_replicator_docs:ensure_rep_ddoc_exists(DbName), - Server. - - -db_deleted(DbName, Server) -> - couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), - ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity), - Server. - - -db_found(DbName, Server) -> - couch_stats:increment_counter([couch_replicator, docs, dbs_found]), - couch_replicator_docs:ensure_rep_ddoc_exists(DbName), - Server. 
- - -db_change(DbName, {ChangeProps} = Change, Server) -> - couch_stats:increment_counter([couch_replicator, docs, db_changes]), - try - ok = process_change(DbName, Change) - catch - exit:{Error, {gen_server, call, [?MODULE, _, _]}} -> - ErrMsg = "~p exited ~p while processing change from db ~p", - couch_log:error(ErrMsg, [?MODULE, Error, DbName]); - _Tag:Error -> - {RepProps} = get_json_value(doc, ChangeProps), - DocId = get_json_value(<<"_id">>, RepProps), - couch_replicator_docs:update_failed(DbName, DocId, Error) - end, - Server. - - --spec get_worker_ref(db_doc_id()) -> reference() | nil. -get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> - case ets:lookup(?MODULE, {DbName, DocId}) of - [#rdoc{worker = WRef}] when is_reference(WRef) -> - WRef; - [#rdoc{worker = nil}] -> - nil; - [] -> - nil - end. - - -% Cluster membership change notification callback --spec notify_cluster_event(pid(), {cluster, any()}) -> ok. -notify_cluster_event(Server, {cluster, _} = Event) -> - gen_server:cast(Server, Event). 
- - -process_change(DbName, {Change}) -> - {RepProps} = JsonRepDoc = get_json_value(doc, Change), - DocId = get_json_value(<<"_id">>, RepProps), - Owner = couch_replicator_clustering:owner(DbName, DocId), - Id = {DbName, DocId}, - case {Owner, get_json_value(deleted, Change, false)} of - {_, true} -> - ok = gen_server:call(?MODULE, {removed, Id}, infinity); - {unstable, false} -> - couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]); - {ThisNode, false} when ThisNode =:= node() -> - case get_json_value(<<"_replication_state">>, RepProps) of - undefined -> - ok = process_updated(Id, JsonRepDoc); - <<"triggered">> -> - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"completed">> -> - ok = gen_server:call(?MODULE, {completed, Id}, infinity); - <<"error">> -> - % Handle replications started from older versions of replicator - % which wrote transient errors to replication docs - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"failed">> -> - ok - end; - {Owner, false} -> - ok - end, - ok. - - -maybe_remove_state_fields(DbName, DocId) -> - case update_docs() of - true -> - ok; - false -> - couch_replicator_docs:remove_state_fields(DbName, DocId) - end. - - -process_updated({DbName, _DocId} = Id, JsonRepDoc) -> - % Parsing replication doc (but not calculating the id) could throw an - % exception which would indicate this document is malformed. This exception - % should propagate to db_change function and will be recorded as permanent - % failure in the document. User will have to update the documet to fix the - % problem. 
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc), - Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, - Filter = case couch_replicator_filters:parse(Rep#rep.options) of - {ok, nil} -> - nil; - {ok, {user, _FName, _QP}} -> - user; - {ok, {view, _FName, _QP}} -> - view; - {ok, {docids, _DocIds}} -> - docids; - {ok, {mango, _Selector}} -> - mango; - {error, FilterError} -> - throw(FilterError) - end, - gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity). - - -% Doc processor gen_server API and callbacks - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - -init([]) -> - ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id}, - {read_concurrency, true}, {write_concurrency, true}]), - couch_replicator_clustering:link_cluster_event_listener(?MODULE, - notify_cluster_event, [self()]), - {ok, nil}. - - -terminate(_Reason, _State) -> - ok. - - -handle_call({updated, Id, Rep, Filter}, _From, State) -> - ok = updated_doc(Id, Rep, Filter), - {reply, ok, State}; - -handle_call({removed, Id}, _From, State) -> - ok = removed_doc(Id), - {reply, ok, State}; - -handle_call({completed, Id}, _From, State) -> - true = ets:delete(?MODULE, Id), - {reply, ok, State}; - -handle_call({clean_up_replications, DbName}, _From, State) -> - ok = removed_db(DbName), - {reply, ok, State}. - -handle_cast({cluster, unstable}, State) -> - % Ignoring unstable state transition - {noreply, State}; - -handle_cast({cluster, stable}, State) -> - % Membership changed recheck all the replication document ownership - nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE), - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {error, unexpected_message, Msg}, State}. - - -handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref, - result = Res}}, State) -> - ok = worker_returned(Ref, Id, Res), - {noreply, State}; - -handle_info(_Msg, State) -> - {noreply, State}. 
- - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -% Doc processor gen_server private helper functions - -% Handle doc update -- add to ets, then start a worker to try to turn it into -% a replication job. In most cases it will succeed quickly but for filtered -% replications or if there are duplicates, it could take longer -% (theoretically indefinitely) until a replication could be started. Before -% adding replication job, make sure to delete all old jobs associated with -% same document. --spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok. -updated_doc(Id, Rep, Filter) -> - NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)), - NormNewRep = couch_replicator_utils:normalize_rep(Rep), - case NormCurRep == NormNewRep of - false -> - removed_doc(Id), - Row = #rdoc{ - id = Id, - state = initializing, - rep = Rep, - rid = nil, - filter = Filter, - info = nil, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, Row), - ok = maybe_start_worker(Id); - true -> - ok - end. - - -% Return current #rep{} record if any. If replication hasn't been submitted -% to the scheduler yet, #rep{} record will be in the document processor's -% ETS table, otherwise query scheduler for the #rep{} record. --spec current_rep({binary(), binary()}) -> #rep{} | nil. -current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> - case ets:lookup(?MODULE, {DbName, DocId}) of - [] -> - nil; - [#rdoc{state = scheduled, rep = nil, rid = JobId}] -> - % When replication is scheduled, #rep{} record which can be quite - % large compared to other bits in #rdoc is removed in order to avoid - % having to keep 2 copies of it. So have to fetch it from the - % scheduler. - couch_replicator_scheduler:rep_state(JobId); - [#rdoc{rep = Rep}] -> - Rep - end. - - --spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. 
-worker_returned(Ref, Id, {ok, RepId}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref} = Row] -> - Row0 = Row#rdoc{ - state = scheduled, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - NewRow = case Row0 of - #rdoc{rid = RepId, filter = user} -> - % Filtered replication id didn't change. - Row0; - #rdoc{rid = nil, filter = user} -> - % Calculated new replication id for a filtered replication. Make - % sure to schedule another check as filter code could change. - % Replication starts could have been failing, so also clear - % error count. - Row0#rdoc{rid = RepId}; - #rdoc{rid = OldRepId, filter = user} -> - % Replication id of existing replication job with filter has - % changed. Remove old replication job from scheduler and - % schedule check to check for future changes. - ok = couch_replicator_scheduler:remove_job(OldRepId), - Msg = io_lib:format("Replication id changed: ~p -> ~p", [ - OldRepId, RepId]), - Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; - #rdoc{rid = nil} -> - % Calculated new replication id for non-filtered replication. - % Remove replication doc body, after this we won't need it - % anymore. 
- Row0#rdoc{rep=nil, rid=RepId, info=nil} - end, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; - -worker_returned(_Ref, _Id, ignore) -> - ok; - -worker_returned(Ref, Id, {temporary_error, Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] -> - NewRow = Row#rdoc{ - rid = nil, - state = error, - info = Reason, - errcnt = ErrCnt + 1, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok; - -worker_returned(Ref, Id, {permanent_failure, _Reason}) -> - case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref}] -> - true = ets:delete(?MODULE, Id); - _ -> - ok % doc could have been deleted, ignore - end, - ok. - - --spec maybe_update_doc_error(#rep{}, any()) -> ok. -maybe_update_doc_error(Rep, Reason) -> - case update_docs() of - true -> - couch_replicator_docs:update_error(Rep, Reason); - false -> - ok - end. - - --spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok. -maybe_update_doc_triggered(Rep, RepId) -> - case update_docs() of - true -> - couch_replicator_docs:update_triggered(Rep, RepId); - false -> - ok - end. - - --spec error_backoff(non_neg_integer()) -> seconds(). -error_backoff(ErrCnt) -> - Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT), - % ErrCnt is the exponent here. The reason 64 is used is to start at - % 64 (about a minute) max range. Then first backoff would be 30 sec - % on average. Then 1 minute and so on. - couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp). - - --spec filter_backoff() -> seconds(). -filter_backoff() -> - Total = ets:info(?MODULE, size), - % This value scaled by the number of replications. 
If the are a lot of them - % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just - % few, wait is shorter, starting at about 30 seconds. `2 *` is used since - % the expected wait would then be 0.5 * Range so it is easier to see the - % average wait. `1 +` is used because couch_rand:uniform only - % accepts >= 1 values and crashes otherwise. - Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC), - ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)). - - -% Document removed from db -- clear ets table and remove all scheduled jobs --spec removed_doc(db_doc_id()) -> ok. -removed_doc({DbName, DocId} = Id) -> - ets:delete(?MODULE, Id), - RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Whole db shard is gone -- remove all its ets rows and stop jobs --spec removed_db(binary()) -> ok. -removed_db(DbName) -> - EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'}, - ets:match_delete(?MODULE, EtsPat), - RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName), - lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - - -% Spawn a worker process which will attempt to calculate a replication id, then -% start a replication. Returns a process monitor reference. The worker is -% guaranteed to exit with rep_start_result() type only. --spec maybe_start_worker(db_doc_id()) -> ok. -maybe_start_worker(Id) -> - case ets:lookup(?MODULE, Id) of - [] -> - ok; - [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user -> - ok; - [#rdoc{rep = Rep} = Doc] -> - % For any replication with a user created filter function, periodically - % (every `filter_backoff/0` seconds) to try to see if the user filter - % has changed by using a worker to check for changes. When the worker - % returns check if replication ID has changed. If it hasn't keep - % checking (spawn another worker and so on). If it has stop the job - % with the old ID and continue checking. 
- Wait = get_worker_wait(Doc), - Ref = make_ref(), - true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), - couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), - ok - end. - - --spec get_worker_wait(#rdoc{}) -> seconds(). -get_worker_wait(#rdoc{state = scheduled, filter = user}) -> - filter_backoff(); -get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) -> - error_backoff(ErrCnt); -get_worker_wait(#rdoc{state = initializing}) -> - 0. - - --spec update_docs() -> boolean(). -update_docs() -> - config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS). - - -% _scheduler/docs HTTP endpoint helpers - --spec docs([atom()]) -> [{[_]}] | []. -docs(States) -> - HealthThreshold = couch_replicator_scheduler:health_threshold(), - ets:foldl(fun(RDoc, Acc) -> - case ejson_doc(RDoc, HealthThreshold) of - nil -> - Acc; % Could have been deleted if job just completed - {Props} = EJson -> - {state, DocState} = lists:keyfind(state, 1, Props), - case ejson_doc_state_filter(DocState, States) of - true -> - [EJson | Acc]; - false -> - Acc - end - end - end, [], ?MODULE). - - --spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}. -doc(Db, DocId) -> - HealthThreshold = couch_replicator_scheduler:health_threshold(), - Res = (catch ets:foldl(fun(RDoc, nil) -> - {Shard, RDocId} = RDoc#rdoc.id, - case {mem3:dbname(Shard), RDocId} of - {Db, DocId} -> - throw({found, ejson_doc(RDoc, HealthThreshold)}); - {_OtherDb, _OtherDocId} -> - nil - end - end, nil, ?MODULE)), - case Res of - {found, DocInfo} -> - {ok, DocInfo}; - nil -> - {error, not_found} - end. - - --spec doc_lookup(binary(), binary(), integer()) -> - {ok, {[_]}} | {error, not_found}. -doc_lookup(Db, DocId, HealthThreshold) -> - case ets:lookup(?MODULE, {Db, DocId}) of - [#rdoc{} = RDoc] -> - {ok, ejson_doc(RDoc, HealthThreshold)}; - [] -> - {error, not_found} - end. - - --spec ejson_rep_id(rep_id() | nil) -> binary() | null. 
-ejson_rep_id(nil) -> - null; -ejson_rep_id({BaseId, Ext}) -> - iolist_to_binary([BaseId, Ext]). - - --spec ejson_doc(#rdoc{}, non_neg_integer()) -> {[_]} | nil. -ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) -> - #rdoc{id = {DbName, DocId}, rid = RepId} = RDoc, - JobProps = couch_replicator_scheduler:job_summary(RepId, HealthThreshold), - case JobProps of - nil -> - nil; - [{_, _} | _] -> - {[ - {doc_id, DocId}, - {database, DbName}, - {id, ejson_rep_id(RepId)}, - {node, node()} | JobProps - ]} - end; - -ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) -> - #rdoc{ - id = {DbName, DocId}, - info = StateInfo, - rid = RepId, - errcnt = ErrorCount, - last_updated = StateTime, - rep = Rep - } = RDoc, - {[ - {doc_id, DocId}, - {database, DbName}, - {id, ejson_rep_id(RepId)}, - {state, RepState}, - {info, couch_replicator_utils:ejson_state_info(StateInfo)}, - {error_count, ErrorCount}, - {node, node()}, - {last_updated, couch_replicator_utils:iso8601(StateTime)}, - {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)} - ]}. - - --spec ejson_doc_state_filter(atom(), [atom()]) -> boolean(). -ejson_doc_state_filter(_DocState, []) -> - true; -ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> - lists:member(State, States). - - --spec cluster_membership_foldl(#rdoc{}, nil) -> nil. -cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> - case couch_replicator_clustering:owner(DbName, DocId) of - unstable -> - nil; - ThisNode when ThisNode =:= node() -> - nil; - OtherNode -> - Msg = "Replication doc ~p:~p with id ~p usurped by node ~p", - couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]), - removed_doc(Id), - nil - end. - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - --define(DB, <<"db">>). --define(EXIT_DB, <<"exit_db">>). --define(DOC1, <<"doc1">>). --define(DOC2, <<"doc2">>). --define(R1, {"1", ""}). --define(R2, {"2", ""}). 
- - -doc_processor_test_() -> - { - setup, - fun setup_all/0, - fun teardown_all/1, - { - foreach, - fun setup/0, - fun teardown/1, - [ - t_bad_change(), - t_regular_change(), - t_change_with_doc_processor_crash(), - t_change_with_existing_job(), - t_deleted_change(), - t_triggered_change(), - t_completed_change(), - t_active_replication_completed(), - t_error_change(), - t_failed_change(), - t_change_for_different_node(), - t_change_when_cluster_unstable(), - t_ejson_docs(), - t_cluster_membership_foldl() - ] - } - }. - - -% Can't parse replication doc, so should write failure state to document. -t_bad_change() -> - ?_test(begin - ?assertEqual(acc, db_change(?DB, bad_change(), acc)), - ?assert(updated_doc_with_failed_state()) - end). - - -% Regular change, parse to a #rep{} and then add job. -t_regular_change() -> - ?_test(begin - mock_existing_jobs_lookup([]), - ?assertEqual(ok, process_change(?DB, change())), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(started_worker({?DB, ?DOC1})) - end). - - -% Handle cases where doc processor exits or crashes while processing a change -t_change_with_doc_processor_crash() -> - ?_test(begin - mock_existing_jobs_lookup([]), - ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)), - ?assert(failed_state_not_updated()) - end). - - -% Regular change, parse to a #rep{} and then add job but there is already -% a running job with same Id found. -t_change_with_existing_job() -> - ?_test(begin - mock_existing_jobs_lookup([test_rep(?R2)]), - ?assertEqual(ok, process_change(?DB, change())), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(started_worker({?DB, ?DOC1})) - end). - - -% Change is a deletion, and job is running, so remove job. -t_deleted_change() -> - ?_test(begin - mock_existing_jobs_lookup([test_rep(?R2)]), - ?assertEqual(ok, process_change(?DB, deleted_change())), - ?assert(removed_job(?R2)) - end). - - -% Change is in `triggered` state. Remove legacy state and add job. 
-t_triggered_change() -> - ?_test(begin - mock_existing_jobs_lookup([]), - ?assertEqual(ok, process_change(?DB, change(<<"triggered">>))), - ?assert(removed_state_fields()), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(started_worker({?DB, ?DOC1})) - end). - - -% Change is in `completed` state, so skip over it. -t_completed_change() -> - ?_test(begin - ?assertEqual(ok, process_change(?DB, change(<<"completed">>))), - ?assert(did_not_remove_state_fields()), - ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(did_not_spawn_worker()) - end). - - -% Completed change comes for what used to be an active job. In this case -% remove entry from doc_processor's ets (because there is no linkage or -% callback mechanism for scheduler to tell doc_processsor a replication just -% completed). -t_active_replication_completed() -> - ?_test(begin - mock_existing_jobs_lookup([]), - ?assertEqual(ok, process_change(?DB, change())), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - ?assertEqual(ok, process_change(?DB, change(<<"completed">>))), - ?assert(did_not_remove_state_fields()), - ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})) - end). - - -% Change is in `error` state. Remove legacy state and retry -% running the job. This state was used for transient erorrs which are not -% written to the document anymore. -t_error_change() -> - ?_test(begin - mock_existing_jobs_lookup([]), - ?assertEqual(ok, process_change(?DB, change(<<"error">>))), - ?assert(removed_state_fields()), - ?assert(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(started_worker({?DB, ?DOC1})) - end). - - -% Change is in `failed` state. This is a terminal state and it will not -% be tried again, so skip over it. -t_failed_change() -> - ?_test(begin - ?assertEqual(ok, process_change(?DB, change(<<"failed">>))), - ?assert(did_not_remove_state_fields()), - ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})), - ?assert(did_not_spawn_worker()) - end). 
%% Ordinary change, but the (mocked) cluster ownership check says the
%% replication belongs to another node, so this node must not spawn a worker.
t_change_for_different_node() ->
    ?_test(begin
        meck:expect(couch_replicator_clustering, owner, 2, different_node),
        Result = process_change(?DB, change()),
        ?assertEqual(ok, Result),
        ?assert(did_not_spawn_worker())
    end).


%% Change handled while the cluster is unstable (nodes being added or
%% removed): no job is added. A rescan will be triggered soon and the change
%% will be evaluated again.
t_change_when_cluster_unstable() ->
    ?_test(begin
        meck:expect(couch_replicator_clustering, owner, 2, unstable),
        Result = process_change(?DB, change()),
        ?assertEqual(ok, Result),
        ?assert(did_not_spawn_worker())
    end).


%% docs/1 must produce the expected ejson once a job has been added. The two
%% timestamp properties are only checked for being binaries; everything else
%% is compared exactly.
t_ejson_docs() ->
    ?_test(begin
        mock_existing_jobs_lookup([]),
        ?assertEqual(ok, process_change(?DB, change())),
        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
        Docs = docs([]),
        ?assertMatch([{[_ | _]}], Docs),
        [{Props0}] = Docs,
        {value, LastUpdated, Props1} =
            lists:keytake(last_updated, 1, Props0),
        ?assertMatch({last_updated, Bin1} when is_binary(Bin1), LastUpdated),
        {value, StartTime, Props2} = lists:keytake(start_time, 1, Props1),
        ?assertMatch({start_time, Bin2} when is_binary(Bin2), StartTime),
        Expected = [
            {database, ?DB},
            {doc_id, ?DOC1},
            {error_count, 0},
            {id, null},
            {info, null},
            {node, node()},
            {state, initializing}
        ],
        ?assertEqual(Expected, lists:usort(Props2))
    end).
%% When cluster membership changes, records owned by another node must be
%% removed from both the doc processor table and the job scheduler.
t_cluster_membership_foldl() ->
    ?_test(begin
        mock_existing_jobs_lookup([test_rep(?R1)]),
        ?assertEqual(ok, process_change(?DB, change())),
        % From here on the doc is reported as owned by a different node.
        meck:expect(couch_replicator_clustering, owner, 2, different_node),
        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
        gen_server:cast(?MODULE, {cluster, stable}),
        % Wait (up to 5 seconds) for the second find_jobs_by_doc/2 call.
        meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
        ?assert(removed_job(?R1))
    end).


%% get_worker_ref/1 returns nil for a missing row or a row without a worker,
%% and the stored reference otherwise.
get_worker_ref_test_() ->
    Setup = fun() ->
        ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}])
    end,
    Cleanup = fun(_) -> ets:delete(?MODULE) end,
    Test = ?_test(begin
        Id = {<<"db">>, <<"doc">>},
        ?assertEqual(nil, get_worker_ref(Id)),
        ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
        ?assertEqual(nil, get_worker_ref(Id)),
        WorkerRef = make_ref(),
        ets:insert(?MODULE, #rdoc{id = Id, worker = WorkerRef}),
        ?assertEqual(WorkerRef, get_worker_ref(Id))
    end),
    {setup, Setup, Cleanup, Test}.


% Test helper functions


%% Suite-wide fixture: stub out logging, config, clustering, the worker
%% spawner, the scheduler and the document writers.
setup_all() ->
    lists:foreach(fun(Level) ->
        meck:expect(couch_log, Level, 2, ok)
    end, [info, notice, warning, error]),
    meck:expect(config, get, fun(_, _, Default) -> Default end),
    meck:expect(config, listen_for_changes, 2, ok),
    meck:expect(couch_replicator_clustering, owner, 2, node()),
    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
        ok),
    % Spawning a worker for ?EXIT_DB simulates a crash.
    SpawnWorker = fun
        ({?EXIT_DB, _}, _, _, _) -> exit(kapow);
        (_, _, _, _) -> pid
    end,
    meck:expect(couch_replicator_doc_processor_worker, spawn_worker,
        SpawnWorker),
    meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
    meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
    meck:expect(couch_replicator_docs, update_failed, 3, ok).


%% Suite-wide cleanup: drop every mock.
teardown_all(_) ->
    meck:unload().
%% Per-test fixture: clear mock call history, restore the default owner/2
%% expectation and start an unlinked doc processor. Returns its pid.
setup() ->
    Mocked = [
        config,
        couch_log,
        couch_replicator_clustering,
        couch_replicator_doc_processor_worker,
        couch_replicator_docs,
        couch_replicator_scheduler
    ],
    meck:reset(Mocked),
    % Some tests override this expectation, so put the default back.
    meck:expect(couch_replicator_clustering, owner, 2, node()),
    {ok, Pid} = start_link(),
    unlink(Pid),
    Pid.


%% Per-test cleanup: kill the doc processor started by setup/0.
teardown(Pid) ->
    exit(Pid, kill).


%% Predicates over the recorded meck call history.

removed_state_fields() ->
    meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).


started_worker(_Id) ->
    Calls = meck:num_calls(couch_replicator_doc_processor_worker,
        spawn_worker, 4),
    1 == Calls.


removed_job(Id) ->
    meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]).


did_not_remove_state_fields() ->
    Calls = meck:num_calls(couch_replicator_docs, remove_state_fields, '_'),
    0 == Calls.


did_not_spawn_worker() ->
    Calls = meck:num_calls(couch_replicator_doc_processor_worker,
        spawn_worker, '_'),
    0 == Calls.


updated_doc_with_failed_state() ->
    Calls = meck:num_calls(couch_replicator_docs, update_failed, '_'),
    1 == Calls.


failed_state_not_updated() ->
    Calls = meck:num_calls(couch_replicator_docs, update_failed, '_'),
    0 == Calls.


%% Make the scheduler report ExistingJobs for {?DB, ?DOC1} and no jobs for
%% {?EXIT_DB, ?DOC1}.
mock_existing_jobs_lookup(ExistingJobs) ->
    Lookup = fun
        (?EXIT_DB, ?DOC1) -> [];
        (?DB, ?DOC1) -> ExistingJobs
    end,
    meck:expect(couch_replicator_scheduler, find_jobs_by_doc, Lookup).


%% Minimal #rep{} with the given id and a zero start time.
test_rep(Id) ->
    #rep{id = Id, start_time = {0, 0, 0}}.


%% Change row for ?DOC1 without a replication state.
change() ->
    change_with_doc_props([]).


%% Change row for ?DOC1 whose doc carries `_replication_state` = State.
change(State) ->
    change_with_doc_props([{<<"_replication_state">>, State}]).


%% Build a change row for ?DOC1; ExtraProps are appended to the doc body.
change_with_doc_props(ExtraProps) ->
    DocProps = [
        {<<"_id">>, ?DOC1},
        {<<"source">>, <<"http://srchost.local/src">>},
        {<<"target">>, <<"http://tgthost.local/tgt">>}
        | ExtraProps
    ],
    {[
        {<<"id">>, ?DOC1},
        {doc, {DocProps}}
    ]}.
%% Change row for ?DOC1 flagged as deleted.
deleted_change() ->
    DocProps = [
        {<<"_id">>, ?DOC1},
        {<<"source">>, <<"http://srchost.local/src">>},
        {<<"target">>, <<"http://tgthost.local/tgt">>}
    ],
    {[
        {<<"id">>, ?DOC1},
        {<<"deleted">>, true},
        {doc, {DocProps}}
    ]}.


%% Change row for ?DOC2 whose doc has a source but no target.
bad_change() ->
    DocProps = [
        {<<"_id">>, ?DOC2},
        {<<"source">>, <<"src">>}
    ],
    {[
        {<<"id">>, ?DOC2},
        {doc, {DocProps}}
    ]}.

-endif.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
deleted file mode 100644
index a4c829323..000000000
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ /dev/null
@@ -1,284 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_doc_processor_worker).

-export([
    spawn_worker/4
]).

-include("couch_replicator.hrl").

-import(couch_replicator_utils, [
    pp_rep_id/1
]).

% The timeout is 61 seconds because requests usually have 10, 15 or 30 second
% timeouts. The worker should get a chance to make a few requests (maybe one
% failing request and a retry) and then fail with its own, more specific and
% informative error (timeout, network error) before it is simply killed by
% the timeout here. If all else fails and the worker really is blocked, 61
% seconds is the safety net that brutally kills it so it does not hang
% forever.
-define(WORKER_TIMEOUT_MSEC, 61000).
% Spawn a worker which attempts to calculate the replication id and then add
% a replication job to the scheduler. The worker is spawned with a monitor
% and exits with a #doc_worker_result{} record within the
% ?WORKER_TIMEOUT_MSEC timeout period, so the result is delivered as the
% `Reason` of the {'DOWN', ...} message. A timeout is reported as a
% `temporary_error`.
-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
spawn_worker(Id, Rep, WaitSec, WRef) ->
    Worker = fun() -> worker_fun(Id, Rep, WaitSec, WRef) end,
    {Pid, _MonitorRef} = spawn_monitor(Worker),
    Pid.


% Private functions

% Sleep for WaitSec seconds, run the start attempt in a monitored child
% process, then exit with its result wrapped in a #doc_worker_result{}.
-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
worker_fun(Id, Rep, WaitSec, WRef) ->
    timer:sleep(WaitSec * 1000),
    Attempt = fun() -> start_replication_and_exit(Id, Rep, WRef) end,
    {Pid, Ref} = spawn_monitor(Attempt),
    Result = receive
        {'DOWN', Ref, _, Pid, Reason} ->
            Reason
    after ?WORKER_TIMEOUT_MSEC ->
        % The attempt is stuck: stop watching it, kill it and report a
        % temporary error instead.
        erlang:demonitor(Ref, [flush]),
        exit(Pid, kill),
        start_timeout_error(Id)
    end,
    exit(#doc_worker_result{id = Id, wref = WRef, result = Result}).


% Body of the monitored child process: always terminates via exit/1, with
% either the start result or a {temporary_error, Reason} tuple.
start_replication_and_exit(Id, Rep, WRef) ->
    try maybe_start_replication(Id, Rep, WRef) of
        StartResult ->
            exit(StartResult)
    catch
        throw:{filter_fetch_error, Reason} ->
            exit({temporary_error, Reason});
        _Tag:Reason ->
            exit({temporary_error, Reason})
    end.


% Temporary error reported when the start attempt did not finish in time.
start_timeout_error({DbName, DocId}) ->
    TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
    Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
        "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
    {temporary_error, couch_util:to_binary(Msg)}.


% Try to start a replication. Used by a worker. This function should return
% rep_start_result(); it also throws {filter_fetch_error, Reason} if it
% cannot fetch the filter. It can also block for an indeterminate amount of
% time while fetching the filter.
maybe_start_replication(Id, RepWithoutId, WRef) ->
    Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
    Outcome = maybe_add_job_to_scheduler(Id, Rep, WRef),
    case Outcome of
        ignore ->
            Outcome;
        {ok, _RepId} ->
            Outcome;
        {temporary_error, _Reason} ->
            Outcome;
        {permanent_failure, Reason} ->
            % Only permanent failures are written back to the document.
            {DbName, DocId} = Id,
            couch_replicator_docs:update_failed(DbName, DocId, Reason),
            Outcome
    end.


% Ask the scheduler about the replication id and decide what to do:
%   - not known            -> add the job if this worker is still current
%   - known, same doc id   -> already running, nothing to do
%   - known, doc id null   -> running as a transient replication
%   - known, another doc   -> duplicate, permanent failure
-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
    rep_start_result().
maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
    RepId = Rep#rep.id,
    case couch_replicator_scheduler:rep_state(RepId) of
        nil ->
            add_job_if_current_worker({DbName, DocId}, Rep, WRef);
        #rep{doc_id = DocId} ->
            {ok, RepId};
        #rep{doc_id = null} ->
            Msg = io_lib:format("Replication `~s` specified by document `~s`"
                " already running as a transient replication, started via"
                " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
            {temporary_error, couch_util:to_binary(Msg)};
        #rep{db_name = OtherDb, doc_id = OtherDocId} ->
            Msg = io_lib:format("Replication `~s` specified by document `~s`"
                " already started, triggered by document `~s` from db `~s`",
                [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
            {permanent_failure, couch_util:to_binary(Msg)}
    end.


% Before adding a job, check that this worker is still the current worker for
% the document. This handles a race where a worker which was sleeping and then
% checking a replication filter could inadvertently re-add a replication that
% was already deleted.
add_job_if_current_worker(Id, Rep, WRef) ->
    case couch_replicator_doc_processor:get_worker_ref(Id) of
        WRef ->
            ok = couch_replicator_scheduler:add_job(Rep),
            {ok, Rep#rep.id};
        _NilOrOtherWRef ->
            ignore
    end.


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

-define(DB, <<"db">>).
-define(DOC1, <<"doc1">>).
-define(R1, {"ad08e05057046eabe898a2572bbfb573", ""}).
%% EUnit generator for the worker tests. setup/0 installs the mocks and
%% teardown/1 unloads them around every test.
doc_processor_worker_test_() ->
    {
        foreach,
        fun setup/0,
        fun teardown/1,
        [
            t_should_add_job(),
            t_already_running_same_docid(),
            t_already_running_transient(),
            t_already_running_other_db_other_doc(),
            t_spawn_worker(),
            t_ignore_if_doc_deleted(),
            t_ignore_if_worker_ref_does_not_match()
        ]
    }.


% The scheduler does not know the replication yet and the worker reference is
% current, so the job should be added.
t_should_add_job() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(added_job())
    end).


% Replication is already running, with same doc id. Ignore change.
t_already_running_same_docid() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        mock_already_running(?DB, ?DOC1),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job())
    end).


% There is a transient replication with same replication id running. Ignore.
t_already_running_transient() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        mock_already_running(null, null),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep,
            nil)),
        ?assert(did_not_add_job())
    end).


% There is a duplicate replication potentially from a different db and doc.
% Write permanent failure to doc.
t_already_running_other_db_other_doc() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        mock_already_running(<<"otherdb">>, <<"otherdoc">>),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep,
            nil)),
        ?assert(did_not_add_job()),
        % The failure must have been written to the document exactly once.
        % This comparison used to be a bare expression whose result was
        % discarded, so it never checked anything.
        ?assertEqual(1, meck:num_calls(couch_replicator_docs, update_failed,
            '_'))
    end).
% A spawned worker should exit with a #doc_worker_result{} carrying the
% replication id, and the job should have been added.
t_spawn_worker() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        WRef = make_ref(),
        meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
        Pid = spawn_worker(Id, Rep, 0, WRef),
        % spawn_worker/4 monitors the worker from this process, so the result
        % arrives as the exit reason of the 'DOWN' message.
        Actual = receive
            {'DOWN', _Ref, process, Pid, Reason} -> Reason
        after 1000 ->
            timeout
        end,
        Expected = #doc_worker_result{
            id = Id,
            wref = WRef,
            result = {ok, ?R1}
        },
        ?assertEqual(Expected, Actual),
        ?assert(added_job())
    end).


% Should not add a job if, by the time the worker got to fetching the filter
% and getting a replication id, the replication doc was deleted.
t_ignore_if_doc_deleted() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
        Result = maybe_start_replication(Id, Rep, make_ref()),
        ?assertEqual(ignore, Result),
        ?assertNot(added_job())
    end).


% Should not add a job if, by the time the worker got to fetching the filter
% and building a replication id, another worker was spawned.
t_ignore_if_worker_ref_does_not_match() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
        OtherWRef = make_ref(),
        meck:expect(couch_replicator_doc_processor, get_worker_ref, 1,
            OtherWRef),
        Result = maybe_start_replication(Id, Rep, make_ref()),
        ?assertEqual(ignore, Result),
        ?assertNot(added_job())
    end).


% Test helper functions

% Per-test fixture: by default the scheduler knows no replication (rep_state
% returns nil) and no worker reference is registered.
setup() ->
    meck:expect(couch_replicator_scheduler, add_job, 1, ok),
    meck:expect(config, get, fun(_, _, Default) -> Default end),
    meck:expect(couch_server, get_uuid, 0, this_is_snek),
    meck:expect(couch_replicator_docs, update_failed, 3, ok),
    meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
    meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
    ok.


% Per-test cleanup: drop every mock.
teardown(_) ->
    meck:unload().
% Make the scheduler report a running replication that belongs to the given
% db and doc, whatever replication id is asked for.
mock_already_running(DbName, DocId) ->
    RepState = fun(RepId) ->
        #rep{id = RepId, doc_id = DocId, db_name = DbName}
    end,
    meck:expect(couch_replicator_scheduler, rep_state, RepState).


% True when exactly one job was added to the scheduler.
added_job() ->
    Calls = meck:num_calls(couch_replicator_scheduler, add_job, '_'),
    1 == Calls.


% True when no job was added to the scheduler.
did_not_add_job() ->
    Calls = meck:num_calls(couch_replicator_scheduler, add_job, '_'),
    0 == Calls.


% Replication document body used by the tests.
change() ->
    Props = [
        {<<"_id">>, ?DOC1},
        {<<"source">>, <<"http://srchost.local/src">>},
        {<<"target">>, <<"http://tgthost.local/tgt">>}
    ],
    {Props}.

-endif.
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
deleted file mode 100644
index 1650105b5..000000000
--- a/src/couch_replicator/src/couch_replicator_fabric.erl
+++ /dev/null
@@ -1,155 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_fabric).

-export([
    docs/5
]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
%% Submit a couch_replicator_fabric_rpc:docs job to every shard of DbName and
%% feed the merged stream to Callback. The rexi monitors are always stopped,
%% whichever way the call ends.
docs(DbName, Options, QueryArgs, Callback, Acc) ->
    Shards = mem3:shards(DbName),
    RpcArgs = [Options, QueryArgs],
    Workers0 = fabric_util:submit_jobs(
        Shards, couch_replicator_fabric_rpc, docs, RpcArgs),
    RexiMon = fabric_util:create_monitors(Workers0),
    try
        stream_docs(DbName, Workers0, QueryArgs, Callback, Acc)
    after
        rexi_monitor:stop(RexiMon)
    end.


%% Start the worker streams and, once they are up, collect the rows. Start-up
%% timeouts and errors are reported through Callback.
stream_docs(DbName, Workers0, QueryArgs, Callback, Acc) ->
    case fabric_streams:start(Workers0, #shard.ref) of
        {ok, Workers} ->
            try
                docs_int(DbName, Workers, QueryArgs, Callback, Acc)
            after
                fabric_streams:cleanup(Workers)
            end;
        {timeout, StreamState} ->
            Defunct = fabric_util:remove_done_workers(
                StreamState#stream_acc.workers, waiting
            ),
            fabric_util:log_timeout(
                Defunct,
                "replicator docs"
            ),
            Callback({error, timeout}, Acc);
        {error, Error} ->
            Callback({error, Error}, Acc)
    end.


%% Receive loop over the started workers; returns the user accumulator.
docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
    Collector0 = #collector{
        db_name = DbName,
        query_args = QueryArgs,
        callback = Callback,
        counters = fabric_dict:init(Workers, 0),
        skip = Skip,
        limit = Limit,
        user_acc = Acc0,
        update_seq = nil
    },
    Received = rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
        Collector0, infinity, 5000),
    case Received of
        {ok, Collector} ->
            {ok, Collector#collector.user_acc};
        {timeout, Collector} ->
            Callback({error, timeout}, Collector#collector.user_acc);
        {error, Resp} ->
            {ok, Resp}
    end.


%% rexi message handler used by docs_int/5.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
    fabric_view:check_down_shards(State, NodeRef);

handle_message({rexi_EXIT, Reason}, Worker, State) ->
    fabric_view:handle_worker_exit(State, Worker, Reason);

handle_message({meta, Meta0}, {Worker, From}, State) ->
    WorkerTotal = couch_util:get_value(total, Meta0, 0),
    WorkerOffset = couch_util:get_value(offset, Meta0, 0),
    #collector{
        callback = Callback,
        counters = Counters0,
        total_rows = Total0,
        offset = Offset0,
        user_acc = AccIn
    } = State,
    % Assert that we don't have other messages from this
    % worker when the total_and_offset message arrives.
    0 = fabric_dict:lookup_element(Worker, Counters0),
    rexi:stream_ack(From),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    Total = Total0 + WorkerTotal,
    Offset = Offset0 + WorkerOffset,
    StillWaiting = fabric_dict:any(0, Counters1),
    case StillWaiting of
        true ->
            % Some worker has not sent its meta yet: only accumulate.
            {ok, State#collector{
                counters = Counters1,
                total_rows = Total,
                offset = Offset
            }};
        false ->
            % Every worker reported: emit the combined meta to the callback.
            FinalOffset = erlang:min(Total, Offset + State#collector.skip),
            Meta = [{total, Total}, {offset, FinalOffset}],
            {Go, Acc} = Callback({meta, Meta}, AccIn),
            {Go, State#collector{
                counters = fabric_dict:decrement_all(Counters1),
                total_rows = Total,
                offset = FinalOffset,
                user_acc = Acc
            }}
    end;

handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
    #collector{
        query_args = Args,
        counters = Counters0,
        rows = Rows0
    } = State,
    case maybe_fetch_and_filter_doc(Id, Doc, State) of
        {[_ | _]} = NewDoc ->
            Row1 = Row0#view_row{doc = NewDoc},
            Row2 = Row1#view_row{worker = {Worker, From}},
            Rows = merge_row(Args#mrargs.direction, Row2, Rows0),
            Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
            State1 = State#collector{rows = Rows, counters = Counters1},
            fabric_view:maybe_send_row(State1);
        skip ->
            % Filtered out: acknowledge and keep the collector unchanged.
            rexi:stream_ack(From),
            {ok, State}
    end;

handle_message(complete, Worker, State) ->
    Counters0 = State#collector.counters,
    Counters = fabric_dict:update_counter(Worker, 1, Counters0),
    fabric_view:maybe_send_row(State#collector{counters = Counters}).


%% Merge a single row into the rows collected so far, keyed on the row id.
merge_row(fwd, Row, Rows) ->
    lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, Row, Rows) ->
    lists:rkeymerge(#view_row.id, [Row], Rows).
%% A row whose doc is `undecided` is looked up through
%% couch_replicator:active_doc/2 and filtered by the requested states; any
%% other doc value is passed through unchanged.
maybe_fetch_and_filter_doc(Id, undecided, State) ->
    #collector{db_name = DbName, query_args = QueryArgs} = State,
    #mrargs{extra = Extra} = QueryArgs,
    FilterStates = proplists:get_value(filter_states, Extra),
    case couch_replicator:active_doc(DbName, Id) of
        {ok, {Props} = DocInfo} ->
            DocState = couch_util:get_value(state, Props),
            couch_replicator_utils:filter_state(DocState, FilterStates,
                DocInfo);
        {error, not_found} ->
            % could have been deleted
            skip
    end;
maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
    Doc.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
deleted file mode 100644
index d67f87548..000000000
--- a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
+++ /dev/null
@@ -1,97 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_fabric_rpc).

-export([
    docs/3
]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
%% Shard-side entry point: stream the rows of DbName to the coordinator.
%% Skip is folded into the limit here and the query itself runs with skip = 0.
docs(DbName, Options, Args0) ->
    set_io_priority(DbName, Options),
    #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
    FilterStates = proplists:get_value(filter_states, Extra),
    Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
    HealthThreshold = couch_replicator_scheduler:health_threshold(),
    {ok, Db} = couch_db:open_int(DbName, Options),
    CbAcc = {DbName, FilterStates, HealthThreshold},
    couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, CbAcc).


%% Callback for couch_mrview:query_all_docs/4: forwards meta and completion
%% over rexi, and every row that rep_doc_state/5 does not skip.
docs_cb({meta, Meta}, Acc) ->
    ok = rexi:stream2({meta, Meta}),
    {ok, Acc};
docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
    Id = couch_util:get_value(id, Row),
    Doc = couch_util:get_value(doc, Row),
    ViewRow = #view_row{
        id = Id,
        key = couch_util:get_value(key, Row),
        value = couch_util:get_value(value, Row)
    },
    DocState = rep_doc_state(DbName, Id, Doc, States, HealthThreshold),
    case DocState of
        skip ->
            ok;
        _ ->
            ok = rexi:stream2(ViewRow#view_row{doc = DocState})
    end,
    {ok, Acc};
docs_cb(complete, Acc) ->
    ok = rexi:stream_last(complete),
    {ok, Acc}.


%% Store the io priority in the process dictionary: the value given in
%% Options, or {interactive, DbName} when none was given.
set_io_priority(DbName, Options) ->
    Priority = case lists:keyfind(io_priority, 1, Options) of
        {io_priority, Pri} -> Pri;
        false -> {interactive, DbName}
    end,
    erlang:put(io_priority, Priority).


%% Get the state of the replication document. If it is found and has a terminal
%% state then it can be filtered and either included in the results or skipped.
%% If it is not in a terminal state, look it up in the local doc processor ETS
%% table. If it is there then filter by state. If it is not found there either
%% then mark it as `undecided` and let the coordinator try to fetch it. The
%% idea is to do as much work as possible locally and leave the minimum
%% amount of work for the coordinator.
rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
    % Design documents are never reported.
    skip;
rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
    DbName = mem3:dbname(Shard),
    DocInfo = couch_replicator:info_from_doc(DbName, Doc),
    case get_doc_state(DocInfo) of
        null ->
            local_doc_state(Shard, Id, States, HealthThreshold);
        DocState when is_atom(DocState) ->
            couch_replicator_utils:filter_state(DocState, States, DocInfo)
    end.


%% Fetch from the local doc processor. If the doc is there, filter by state.
%% If not, mark it as undecided and let the coordinator figure it out.
local_doc_state(Shard, Id, States, HealthThreshold) ->
    Lookup = couch_replicator_doc_processor:doc_lookup(Shard, Id,
        HealthThreshold),
    case Lookup of
        {ok, EtsInfo} ->
            State = get_doc_state(EtsInfo),
            couch_replicator_utils:filter_state(State, States, EtsInfo);
        {error, not_found} ->
            undecided
    end.


%% Extract the `state` property from an ejson doc info object.
get_doc_state({Props}) ->
    couch_util:get_value(state, Props).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
deleted file mode 100644
index 624eddd2f..000000000
--- a/src/couch_replicator/src/couch_replicator_httpd_util.erl
+++ /dev/null
@@ -1,201 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_httpd_util).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

-export([
    validate_rep_props/1,
    parse_int_param/5,
    parse_replication_state_filter/1,
    update_db_name/1,
    docs_acc_new/3,
    docs_acc_response/1,
    docs_cb/2
]).
-import(couch_httpd, [
    send_json/2,
    send_json/3,
    send_method_not_allowed/2
]).

-import(couch_util, [
    to_binary/1
]).


%% Parse the comma-separated `states` query value into a list of state atoms.
%% `undefined` yields [], the wildcard filter. Throws
%% {query_parse_error, Msg} for names that are not existing atoms or are not
%% replication states.
parse_replication_state_filter(undefined) ->
    []; % This is the default (wildcard) filter
parse_replication_state_filter(States) when is_list(States) ->
    AllStates = couch_replicator:replication_states(),
    Names = lists:map(fun string:to_lower/1, string:tokens(States, ",")),
    AtomStates = try
        % Only existing atoms are accepted, so request input can never
        % create new atoms.
        lists:map(fun erlang:list_to_existing_atom/1, Names)
    catch error:badarg ->
        Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
        throw({query_parse_error, ?l2b(Msg1)})
    end,
    Known = sets:from_list(AllStates),
    Requested = sets:from_list(AtomStates),
    case sets:to_list(sets:subtract(Requested, Known)) of
        [] ->
            AtomStates;
        Unknown ->
            Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w",
                [Unknown, AllStates]),
            throw({query_parse_error, ?l2b(Msg2)})
    end.


%% Read query parameter Param as an integer within [Min, Max], using Default
%% when it is absent. Throws {query_parse_error, Msg} otherwise.
parse_int_param(Req, Param, Default, Min, Max) ->
    IntVal = try
        Raw = chttpd:qs_value(Req, Param, integer_to_list(Default)),
        list_to_integer(Raw)
    catch error:badarg ->
        Msg1 = io_lib:format("~s must be an integer", [Param]),
        throw({query_parse_error, ?l2b(Msg1)})
    end,
    InRange = IntVal >= Min andalso IntVal =< Max,
    case InRange of
        true ->
            IntVal;
        false ->
            Msg2 = io_lib:format("~s not in range of [~w,~w]",
                [Param, Min, Max]),
            throw({query_parse_error, ?l2b(Msg2)})
    end.


%% Every value under `query_params` must be a binary string; all other
%% properties are accepted as they are.
validate_rep_props([]) ->
    ok;
validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
    CheckParam = fun
        ({_, V}) when is_binary(V) ->
            ok;
        ({K, _}) ->
            throw({bad_request, <<K/binary," value must be a string.">>})
    end,
    lists:foreach(CheckParam, Params),
    validate_rep_props(Rest);
validate_rep_props([_|Rest]) ->
    validate_rep_props(Rest).


%% Separator to emit before the next chunk; empty until one has been set.
prepend_val(#vacc{prepend = undefined}) ->
    "";
prepend_val(#vacc{prepend = Prepend}) ->
    Prepend.
%% Buffer Data, first sending what is already buffered as a delayed chunk when
%% the buffer is non-empty and adding Len would exceed the threshold.
maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
        when Size > 0 andalso (Size + Len) > Max ->
    #vacc{buffer = Buffered, resp = Resp0} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, Buffered),
    {ok, Acc#vacc{
        prepend = ",\r\n",
        buffer = Data,
        bufsize = Len,
        resp = Resp1
    }};
maybe_flush_response(Acc0, Data, Len) ->
    #vacc{buffer = Buffered, bufsize = Size} = Acc0,
    {ok, Acc0#vacc{
        prepend = ",\r\n",
        buffer = [Buffered | Data],
        bufsize = Size + Len
    }}.

%% Fresh accumulator for a docs response.
docs_acc_new(Req, Db, Threshold) ->
    #vacc{db=Db, req=Req, threshold=Threshold}.

%% The response handle held by the accumulator.
docs_acc_response(#vacc{resp = Resp}) ->
    Resp.

%% Callback that renders the docs listing as a delayed JSON response.
docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
    % Nothing was sent yet, so a regular error response can be used.
    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
    {ok, Acc#vacc{resp=Resp}};

docs_cb(complete, #vacc{resp=undefined}=Acc) ->
    % Nothing in view
    % NOTE(review): this body uses the key `rows` while the streamed
    % responses below use `docs`; confirm that this is intended.
    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
    {ok, Acc#vacc{resp=Resp}};

docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
    %% Start response
    Headers = [],
    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
    docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});

docs_cb({error, Reason}, #vacc{resp=Resp0}=Acc) ->
    {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
    {ok, Acc#vacc{resp=Resp1}};

docs_cb(complete, #vacc{resp=Resp0, buffer=Buf, threshold=Max}=Acc) ->
    % Finish view output and possibly end the response
    {ok, Resp1} = chttpd:close_delayed_json_object(Resp0, Buf, "\r\n]}", Max),
    case Acc#vacc.should_close of
        true ->
            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
            {ok, Acc#vacc{resp=Resp2}};
        _ ->
            {ok, Acc#vacc{
                resp = Resp1,
                meta_sent = false,
                row_sent = false,
                prepend = ",\r\n",
                buffer = [],
                bufsize = 0
            }}
    end;

docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
    % Sending metadata as we've not sent it or any row yet
    TotalPart = case couch_util:get_value(total, Meta) of
        undefined ->
            [];
        Total ->
            [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
    end,
    OffsetPart = case couch_util:get_value(offset, Meta) of
        undefined ->
            [];
        Offset ->
            [io_lib:format("\"offset\":~p", [Offset])]
    end,
    Parts = TotalPart ++ OffsetPart ++ ["\"docs\":["],
    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
    {ok, AccOut#vacc{prepend="", meta_sent=true}};


docs_cb({meta, _Meta}, #vacc{}=Acc) ->
    %% ignore metadata
    {ok, Acc};

docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
    %% sorted=false and row arrived before meta
    % The object has not been opened yet, so open it together with the row.
    Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
    Acc1 = Acc#vacc{meta_sent=true, row_sent=true},
    maybe_flush_response(Acc1, Chunk, iolist_size(Chunk));

docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
    % Adding another row
    Chunk = [prepend_val(Acc), row_to_json(Row)],
    Acc1 = Acc#vacc{row_sent=true},
    maybe_flush_response(Acc1, Chunk, iolist_size(Chunk)).


%% Move the `database` property to the front of the object, with its name
%% normalized.
update_db_name({Props}) ->
    {value, {database, DbName}, Rest} = lists:keytake(database, 1, Props),
    {[{database, normalize_db_name(DbName)} | Rest]}.

%% Names starting with "shards/" go through mem3:dbname/1; any other name is
%% returned as it is.
normalize_db_name(<<"shards/", _/binary>> = DbName) ->
    mem3:dbname(DbName);
normalize_db_name(DbName) ->
    DbName.

%% JSON-encode the `doc` of a row after normalizing its database name.
row_to_json(Row) ->
    Doc = couch_util:get_value(doc, Row),
    ?JSON_ENCODE(update_db_name(Doc)).


%% Adjust Total as there is an automatically created validation design doc
adjust_total(Total) when is_integer(Total), Total > 0 ->
    Total - 1;
adjust_total(Total) when is_integer(Total) ->
    0.
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
deleted file mode 100644
index 9ea65e85f..000000000
--- a/src/couch_replicator/src/couch_replicator_job_sup.erl
+++ /dev/null
@@ -1,34 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_job_sup).

-behaviour(supervisor).

-export([
    init/1,
    start_link/0
]).

%% Start the supervisor, registered locally under the module name.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%%=============================================================================
%% supervisor callbacks
%%=============================================================================

%% one_for_one supervisor with no static children, allowing at most 3
%% restarts within 10 seconds.
init([]) ->
    SupFlags = {one_for_one, 3, 10},
    ChildSpecs = [],
    {ok, {SupFlags, ChildSpecs}}.

%%=============================================================================
%% internal functions
%%=============================================================================
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index d41043309..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,177 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
- --define(REP_DB_DOC_VALIDATE_FUN, <<" - function(newDoc, oldDoc, userCtx) { - function reportError(error_msg) { - log('Error writing document `' + newDoc._id + - '\\' to the replicator database: ' + error_msg); - throw({forbidden: error_msg}); - } - - function validateEndpoint(endpoint, fieldName) { - if ((typeof endpoint !== 'string') && - ((typeof endpoint !== 'object') || (endpoint === null))) { - - reportError('The `' + fieldName + '\\' property must exist' + - ' and be either a string or an object.'); - } - - if (typeof endpoint === 'object') { - if ((typeof endpoint.url !== 'string') || !endpoint.url) { - reportError('The url property must exist in the `' + - fieldName + '\\' field and must be a non-empty string.'); - } - - if ((typeof endpoint.auth !== 'undefined') && - ((typeof endpoint.auth !== 'object') || - endpoint.auth === null)) { - - reportError('`' + fieldName + - '.auth\\' must be a non-null object.'); - } - - if ((typeof endpoint.headers !== 'undefined') && - ((typeof endpoint.headers !== 'object') || - endpoint.headers === null)) { - - reportError('`' + fieldName + - '.headers\\' must be a non-null object.'); - } - } - } - - var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0); - var isAdmin = (userCtx.roles.indexOf('_admin') >= 0); - - if (isReplicator) { - // Always let replicator update the replication document - return; - } - - if (newDoc._replication_state === 'failed') { - // Skip validation in case when we update the document with the - // failed state. In this case it might be malformed. However, - // replicator will not pay attention to failed documents so this - // is safe. 
- return; - } - - if (!newDoc._deleted) { - validateEndpoint(newDoc.source, 'source'); - validateEndpoint(newDoc.target, 'target'); - - if ((typeof newDoc.create_target !== 'undefined') && - (typeof newDoc.create_target !== 'boolean')) { - - reportError('The `create_target\\' field must be a boolean.'); - } - - if ((typeof newDoc.continuous !== 'undefined') && - (typeof newDoc.continuous !== 'boolean')) { - - reportError('The `continuous\\' field must be a boolean.'); - } - - if ((typeof newDoc.doc_ids !== 'undefined') && - !isArray(newDoc.doc_ids)) { - - reportError('The `doc_ids\\' field must be an array of strings.'); - } - - if ((typeof newDoc.selector !== 'undefined') && - (typeof newDoc.selector !== 'object')) { - - reportError('The `selector\\' field must be an object.'); - } - - if ((typeof newDoc.filter !== 'undefined') && - ((typeof newDoc.filter !== 'string') || !newDoc.filter)) { - - reportError('The `filter\\' field must be a non-empty string.'); - } - - if ((typeof newDoc.doc_ids !== 'undefined') && - (typeof newDoc.selector !== 'undefined')) { - - reportError('`doc_ids\\' field is incompatible with `selector\\'.'); - } - - if ( ((typeof newDoc.doc_ids !== 'undefined') || - (typeof newDoc.selector !== 'undefined')) && - (typeof newDoc.filter !== 'undefined') ) { - - reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.'); - } - - if ((typeof newDoc.query_params !== 'undefined') && - ((typeof newDoc.query_params !== 'object') || - newDoc.query_params === null)) { - - reportError('The `query_params\\' field must be an object.'); - } - - if (newDoc.user_ctx) { - var user_ctx = newDoc.user_ctx; - - if ((typeof user_ctx !== 'object') || (user_ctx === null)) { - reportError('The `user_ctx\\' property must be a ' + - 'non-null object.'); - } - - if (!(user_ctx.name === null || - (typeof user_ctx.name === 'undefined') || - ((typeof user_ctx.name === 'string') && - user_ctx.name.length > 0))) { - - reportError('The `user_ctx.name\\' 
property must be a ' + - 'non-empty string or null.'); - } - - if (!isAdmin && (user_ctx.name !== userCtx.name)) { - reportError('The given `user_ctx.name\\' is not valid'); - } - - if (user_ctx.roles && !isArray(user_ctx.roles)) { - reportError('The `user_ctx.roles\\' property must be ' + - 'an array of strings.'); - } - - if (!isAdmin && user_ctx.roles) { - for (var i = 0; i < user_ctx.roles.length; i++) { - var role = user_ctx.roles[i]; - - if (typeof role !== 'string' || role.length === 0) { - reportError('Roles must be non-empty strings.'); - } - if (userCtx.roles.indexOf(role) === -1) { - reportError('Invalid role (`' + role + - '\\') in the `user_ctx\\''); - } - } - } - } else { - if (!isAdmin) { - reportError('The `user_ctx\\' property is missing (it is ' + - 'optional for admins only).'); - } - } - } else { - if (!isAdmin) { - if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) { - reportError('Replication documents can only be deleted by ' + - 'admins or by the users who created them.'); - } - } - } - } -">>). diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl deleted file mode 100644 index f7640a349..000000000 --- a/src/couch_replicator/src/couch_replicator_notifier.erl +++ /dev/null @@ -1,58 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_notifier). - --behaviour(gen_event). --vsn(1). 
- -% public API --export([start_link/1, stop/1, notify/1]). - -% gen_event callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_event/2, handle_call/2, handle_info/2]). - --include_lib("couch/include/couch_db.hrl"). - -start_link(FunAcc) -> - couch_event_sup:start_link(couch_replication, - {couch_replicator_notifier, make_ref()}, FunAcc). - -notify(Event) -> - gen_event:notify(couch_replication, Event). - -stop(Pid) -> - couch_event_sup:stop(Pid). - - -init(FunAcc) -> - {ok, FunAcc}. - -terminate(_Reason, _State) -> - ok. - -handle_event(Event, Fun) when is_function(Fun, 1) -> - Fun(Event), - {ok, Fun}; -handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) -> - Acc2 = Fun(Event, Acc), - {ok, {Fun, Acc2}}. - -handle_call(_Msg, State) -> - {ok, ok, State}. - -handle_info(_Msg, State) -> - {ok, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl deleted file mode 100644 index 00a352bee..000000000 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ /dev/null @@ -1,1688 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_scheduler). - --behaviour(gen_server). --behaviour(config_listener). - --export([ - start_link/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3, - format_status/2 -]). 
- --export([ - add_job/1, - remove_job/1, - reschedule/0, - rep_state/1, - find_jobs_by_dbname/1, - find_jobs_by_doc/2, - job_summary/2, - health_threshold/0, - jobs/0, - job/1, - restart_job/1, - update_job_stats/2 -]). - -%% config_listener callbacks --export([ - handle_config_change/5, - handle_config_terminate/3 -]). - -%% for status updater process to allow hot code loading --export([ - stats_updater_loop/1 -]). - --include("couch_replicator_scheduler.hrl"). --include("couch_replicator.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). --include_lib("couch/include/couch_db.hrl"). - -%% types --type event_type() :: added | started | stopped | {crashed, any()}. --type event() :: {Type:: event_type(), When :: erlang:timestamp()}. --type history() :: nonempty_list(event()). - -%% definitions --define(MAX_BACKOFF_EXPONENT, 10). --define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000). --define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60). --define(RELISTEN_DELAY, 5000). --define(STATS_UPDATE_WAIT, 5000). - --define(DEFAULT_MAX_JOBS, 500). --define(DEFAULT_MAX_CHURN, 20). --define(DEFAULT_MAX_HISTORY, 20). --define(DEFAULT_SCHEDULER_INTERVAL, 60000). - - --record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}). --record(job, { - id :: job_id() | '$1' | '_', - rep :: #rep{} | '_', - pid :: undefined | pid() | '$1' | '_', - monitor :: undefined | reference() | '_', - history :: history() | '_' -}). - --record(stats_acc, { - pending_n = 0 :: non_neg_integer(), - running_n = 0 :: non_neg_integer(), - crashed_n = 0 :: non_neg_integer() -}). - - -%% public functions - --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - --spec add_job(#rep{}) -> ok. 
% Register a replication with the scheduler. If existing_replication/1 reports
% that the replication is already known, nothing is done. Otherwise a job with
% a single `added` history event is handed to the scheduler process.
add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
    case existing_replication(Rep) of
        false ->
            Job = #job{
                id = Rep#rep.id,
                rep = Rep,
                history = [{added, os:timestamp()}]
            },
            gen_server:call(?MODULE, {add_job, Job}, infinity);
        true ->
            ok
    end.


% Ask the scheduler process to stop (if running) and forget the job.
-spec remove_job(job_id()) -> ok.
remove_job(Id) ->
    gen_server:call(?MODULE, {remove_job, Id}, infinity).


-spec reschedule() -> ok.
% Trigger a manual reschedule. Used for testing and/or ops.
reschedule() ->
    gen_server:call(?MODULE, reschedule, infinity).


% Read the #rep{} stored for a job directly from the jobs table, without going
% through the scheduler process. Returns `nil` when there is no such job.
-spec rep_state(rep_id()) -> #rep{} | nil.
rep_state(RepId) ->
    % ets:lookup_element/3 raises badarg when the key (or the table itself)
    % does not exist. Only that error is translated to `nil`; anything else
    % is allowed to propagate instead of being swallowed by a bare `catch`.
    try
        ets:lookup_element(?MODULE, RepId, #job.rep)
    catch
        error:badarg ->
            nil
    end.


% Build a property list summarising a job: endpoints (passwords stripped),
% state, state info, consecutive error count, last update time, start time
% and proxy urls. Returns `nil` if the job is not in the table.
-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
job_summary(JobId, HealthThreshold) ->
    case job_by_id(JobId) of
        {ok, #job{pid = Pid, history = History, rep = Rep}} ->
            ErrorCount = consecutive_crashes(History, HealthThreshold),
            {State, Info} = job_summary_state(Pid, ErrorCount, History, Rep),
            [
                {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                {target, iolist_to_binary(ejson_url(Rep#rep.target))},
                {state, State},
                {info, couch_replicator_utils:ejson_state_info(Info)},
                {error_count, ErrorCount},
                {last_updated, last_updated(History)},
                {start_time,
                    couch_replicator_utils:iso8601(Rep#rep.start_time)},
                {source_proxy, job_proxy_url(Rep#rep.source)},
                {target_proxy, job_proxy_url(Rep#rep.target)}
            ];
        {error, not_found} ->
            nil  % Job might have just completed
    end.


% Decide the {State, Info} pair reported by job_summary/2.
%  - no pid and the most recent event is a crash -> crashing, with the reason
%  - no pid, no recorded consecutive crashes     -> pending, with job stats
%  - a live pid                                  -> running, with job stats
% Any other combination is unexpected and fails with function_clause.
job_summary_state(undefined, ErrorCount, [{{crashed, Error}, _When} | _], _Rep)
        when ErrorCount >= 0 ->
    {crashing, crash_reason_json(Error)};
job_summary_state(undefined, 0, [_ | _], Rep) ->
    {pending, Rep#rep.stats};
job_summary_state(Pid, _ErrorCount, _History, Rep) when is_pid(Pid) ->
    {running, Rep#rep.stats}.
% Proxy url of an endpoint as a binary with the password stripped, or `null`
% when the endpoint is not an #httpdb{} with a string proxy url.
job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
    list_to_binary(couch_util:url_strip_password(ProxyUrl));
job_proxy_url(_Endpoint) ->
    null.


% Health threshold is the minimum amount of time (in seconds) a previously
% crashing job has to keep running before it is considered to be healthy
% again. HealthThreshold should not be 0, as jobs could start and immediately
% crash, and it shouldn't be infinity, since then consecutive crashes would
% accumulate forever even if the job is back to normal.
-spec health_threshold() -> non_neg_integer().
health_threshold() ->
    config:get_integer("replicator", "health_threshold",
        ?DEFAULT_HEALTH_THRESHOLD_SEC).


% Ids of all jobs whose replication belongs to the given replicator db.
% The match spec binds '$1' to #job.id, so job ids (not #rep{} records)
% are returned.
-spec find_jobs_by_dbname(binary()) -> [job_id()].
find_jobs_by_dbname(DbName) ->
    Rep = #rep{db_name = DbName, _ = '_'},
    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].


% Ids of all jobs created from the given replicator db document.
% As with find_jobs_by_dbname/1, the result is a list of job ids.
-spec find_jobs_by_doc(binary(), binary()) -> [job_id()].
find_jobs_by_doc(DbName, DocId) ->
    Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].


% Remove a job and add it back with the same #rep{}, which resets its history
% to a single `added` event. Returns the job's ejson, or {error, not_found}
% if the job is unknown (or disappears before it can be read back).
-spec restart_job(binary() | list() | rep_id()) ->
    {ok, {[_]}} | {error, not_found}.
restart_job(JobId) ->
    case rep_state(JobId) of
        nil ->
            {error, not_found};
        #rep{} = Rep ->
            ok = remove_job(JobId),
            ok = add_job(Rep),
            job(JobId)
    end.


% Asynchronously replace the stats stored in the job's #rep{} record.
-spec update_job_stats(job_id(), term()) -> ok.
update_job_stats(JobId, Stats) ->
    gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
%% gen_server functions

% Set up the scheduler: create the named jobs table keyed on #job.id,
% subscribe to config changes, read the scheduler settings (falling back to
% the ?DEFAULT_* values), arm the first `reschedule` timer and start the
% stats updater process.
init(_) ->
    % Temporarily disable on FDB, as it's not fully implemented yet
    % config:enable_feature('scheduler'),
    TableOpts = [
        named_table,
        {keypos, #job.id},
        {read_concurrency, true},
        {write_concurrency, true}
    ],
    ?MODULE = ets:new(?MODULE, TableOpts),
    ok = config:listen_for_changes(?MODULE, nil),
    IntervalMSec = config:get_integer("replicator", "interval",
        ?DEFAULT_SCHEDULER_INTERVAL),
    JobsLimit = config:get_integer("replicator", "max_jobs",
        ?DEFAULT_MAX_JOBS),
    ChurnLimit = config:get_integer("replicator", "max_churn",
        ?DEFAULT_MAX_CHURN),
    HistoryLimit = config:get_integer("replicator", "max_history",
        ?DEFAULT_MAX_HISTORY),
    TimerRef = erlang:send_after(IntervalMSec, self(), reschedule),
    StatsPid = start_stats_updater(),
    {ok, #state{
        interval = IntervalMSec,
        max_jobs = JobsLimit,
        max_churn = ChurnLimit,
        max_history = HistoryLimit,
        timer = TimerRef,
        stats_pid = StatsPid
    }}.


% Synchronous requests:
%   {add_job, Job}   - replace any job with the same id, insert the new one,
%                      try to start it and refresh the add/total metrics
%   {remove_job, Id} - stop and delete the job if it exists
%   reschedule       - run a scheduling cycle immediately
% Any other request is ignored without a reply, so the caller is left
% waiting until its own call timeout (if any) expires.
handle_call({add_job, Job}, _From, State) ->
    ok = maybe_remove_job_int(Job#job.id, State),
    true = add_job_int(Job),
    ok = maybe_start_newly_added_job(Job, State),
    couch_stats:increment_counter([couch_replicator, jobs, adds]),
    JobCount = ets:info(?MODULE, size),
    couch_stats:update_gauge([couch_replicator, jobs, total], JobCount),
    {reply, ok, State};

handle_call({remove_job, JobId}, _From, State) ->
    ok = maybe_remove_job_int(JobId, State),
    {reply, ok, State};

handle_call(reschedule, _From, State) ->
    ok = reschedule(State),
    {reply, ok, State};

handle_call(_Unknown, _From, State) ->
    {noreply, State}.
% Asynchronous requests. The set_* casts are sent by handle_config_change/5
% when a scheduler setting changes; a value that fails its guard falls
% through to the last clause and is logged as an unexpected cast.
% update_job_stats stores fresh stats in the job's #rep{} if the job still
% exists.
handle_cast({set_max_jobs, Limit}, State)
        when is_integer(Limit), Limit >= 0 ->
    couch_log:notice("~p: max_jobs set to ~B", [?MODULE, Limit]),
    {noreply, State#state{max_jobs = Limit}};

handle_cast({set_max_churn, Limit}, State)
        when is_integer(Limit), Limit > 0 ->
    couch_log:notice("~p: max_churn set to ~B", [?MODULE, Limit]),
    {noreply, State#state{max_churn = Limit}};

handle_cast({set_max_history, Limit}, State)
        when is_integer(Limit), Limit > 0 ->
    couch_log:notice("~p: max_history set to ~B", [?MODULE, Limit]),
    {noreply, State#state{max_history = Limit}};

handle_cast({set_interval, MSec}, State)
        when is_integer(MSec), MSec > 0 ->
    % Only the stored value changes here; the timer that is already armed
    % is left alone.
    couch_log:notice("~p: interval set to ~B", [?MODULE, MSec]),
    {noreply, State#state{interval = MSec}};

handle_cast({update_job_stats, JobId, Stats}, State) ->
    case rep_state(JobId) of
        #rep{} = Rep ->
            Updated = Rep#rep{stats = Stats},
            true = ets:update_element(?MODULE, JobId, {#job.rep, Updated});
        nil ->
            % The job is already gone, nothing to update
            ok
    end,
    {noreply, State};

handle_cast(Unexpected, State) ->
    couch_log:error("~p: received un-expected cast ~p", [?MODULE, Unexpected]),
    {noreply, State}.
% Out-of-band messages:
%   reschedule               - periodic scheduling cycle; re-arms the timer
%   'DOWN' with `normal`     - a monitored job finished; remove it
%   'DOWN' with other reason - a monitored job crashed; a {shutdown, Reason}
%                              wrapper is removed before handling the crash
%   restart_config_listener  - re-subscribe to config changes
% Everything else is dropped.
handle_info(reschedule, State) ->
    ok = reschedule(State),
    #state{timer = OldTimer, interval = Interval} = State,
    erlang:cancel_timer(OldTimer),
    NewTimer = erlang:send_after(Interval, self(), reschedule),
    {noreply, State#state{timer = NewTimer}};

handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
    {ok, #job{id = JobId} = Job} = job_by_pid(Pid),
    couch_log:notice("~p: Job ~p completed normally", [?MODULE, JobId]),
    remove_job_int(Job),
    update_running_jobs_stats(State#state.stats_pid),
    {noreply, State};

handle_info({'DOWN', _Ref, process, Pid, {shutdown, Reason}}, State) ->
    {ok, Job} = job_by_pid(Pid),
    ok = handle_crashed_job(Job, Reason, State),
    {noreply, State};

handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
    {ok, Job} = job_by_pid(Pid),
    ok = handle_crashed_job(Job, Reason, State),
    {noreply, State};

handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, State};

handle_info(_Ignored, State) ->
    {noreply, State}.


% No state transformation is needed on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.


% Report only the job limit and the current running/pending counts instead
% of the full server state.
format_status(_Opt, [_PDict, State]) ->
    MaxJobs = State#state.max_jobs,
    Running = running_job_count(),
    Pending = pending_job_count(),
    [
        {max_jobs, MaxJobs},
        {running_jobs, Running},
        {pending_jobs, Pending}
    ].


%% config listener functions

% Forward changes of the scheduler settings in the "replicator" section to
% the scheduler process as casts. The new value is parsed with
% list_to_integer/1, which raises badarg for anything that is not a string
% holding an integer. All other keys and sections are ignored.
handle_config_change("replicator", Key, Value, _Persist, ListenerState) ->
    case Key of
        "max_jobs" ->
            ok = gen_server:cast(?MODULE,
                {set_max_jobs, list_to_integer(Value)});
        "max_churn" ->
            ok = gen_server:cast(?MODULE,
                {set_max_churn, list_to_integer(Value)});
        "interval" ->
            ok = gen_server:cast(?MODULE,
                {set_interval, list_to_integer(Value)});
        "max_history" ->
            ok = gen_server:cast(?MODULE,
                {set_max_history, list_to_integer(Value)});
        _Other ->
            ok
    end,
    {ok, ListenerState};

handle_config_change(_Section, _Key, _Value, _Persist, ListenerState) ->
    {ok, ListenerState}.
- - -handle_config_terminate(_, stop, _) -> - ok; - -handle_config_terminate(_, _, _) -> - Pid = whereis(?MODULE), - erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). - - -%% Private functions - -% Handle crashed jobs. Handling differs between transient and permanent jobs. -% Transient jobs are those posted to the _replicate endpoint. They don't have a -% db associated with them. When those jobs crash, they are not restarted. That -% is also consistent with behavior when the node they run on, crashed and they -% do not migrate to other nodes. Permanent jobs are those created from -% replicator documents. Those jobs, once they pass basic validation and end up -% in the scheduler will be retried indefinitely (with appropriate exponential -% backoffs). --spec handle_crashed_job(#job{}, any(), #state{}) -> ok. -handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) -> - Msg = "~p : Transient job ~p failed, removing. Error: ~p", - ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason), - couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]), - remove_job_int(Job), - update_running_jobs_stats(State#state.stats_pid), - ok; - -handle_crashed_job(Job, Reason, State) -> - ok = update_state_crashed(Job, Reason, State), - case couch_replicator_doc_processor:update_docs() of - true -> - couch_replicator_docs:update_error(Job#job.rep, Reason); - false -> - ok - end, - case ets:info(?MODULE, size) < State#state.max_jobs of - true -> - % Starting pending jobs is an O(TotalJobsCount) operation. Only do - % it if there is a relatively small number of jobs. Otherwise - % scheduler could be blocked if there is a cascade of lots failing - % jobs in a row. - start_pending_jobs(State), - update_running_jobs_stats(State#state.stats_pid), - ok; - false -> - ok - end. - - -% Attempt to start a newly added job. 
First quickly check if total jobs -% already exceed max jobs, then do a more expensive check which runs a -% select (an O(n) operation) to check pending jobs specifically. --spec maybe_start_newly_added_job(#job{}, #state{}) -> ok. -maybe_start_newly_added_job(Job, State) -> - MaxJobs = State#state.max_jobs, - TotalJobs = ets:info(?MODULE, size), - case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of - true -> - start_job_int(Job, State), - update_running_jobs_stats(State#state.stats_pid), - ok; - false -> - ok - end. - - -% Return up to a given number of oldest, not recently crashed jobs. Try to be -% memory efficient and use ets:foldl to accumulate jobs. --spec pending_jobs(non_neg_integer()) -> [#job{}]. -pending_jobs(0) -> - % Handle this case as user could set max_churn to 0. If this is passed to - % other function clause it will crash as gb_sets:largest assumes set is not - % empty. - []; - -pending_jobs(Count) when is_integer(Count), Count > 0 -> - Set0 = gb_sets:new(), % [{LastStart, Job},...] - Now = os:timestamp(), - Acc0 = {Set0, Now, Count, health_threshold()}, - {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), - [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. - - -pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> - Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), - gb_sets:size(Set) >= Count} of - {true, true} -> - % Job is healthy but already reached accumulated limit, so might - % have to replace one of the accumulated jobs - pending_maybe_replace(Job, Set); - {true, false} -> - % Job is healthy and we haven't reached the limit, so add job - % to accumulator - gb_sets:add_element({last_started(Job), Job}, Set); - {false, _} -> - % This job is not healthy (has crashed too recently), so skip it. - Set - end, - {Set1, Now, Count, HealthThreshold}. - - -% Replace Job in the accumulator if it is older than youngest job there. -% "oldest" here means one which has been waiting to run the longest. 
"youngest" -% means the one with most recent activity. The goal is to keep up to Count -% oldest jobs during iteration. For example if there are jobs with these times -% accumulated so far [5, 7, 11], and start time of current job is 6. Then -% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in -% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example. -pending_maybe_replace(Job, Set) -> - Started = last_started(Job), - {Youngest, YoungestJob} = gb_sets:largest(Set), - case Started < Youngest of - true -> - Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), - gb_sets:add_element({Started, Job}, Set1); - false -> - Set - end. - - -start_jobs(Count, State) -> - [start_job_int(Job, State) || Job <- pending_jobs(Count)], - ok. - - --spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer(). -stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 -> - 0; - -stop_jobs(Count, IsContinuous, State) when is_integer(Count) -> - Running0 = running_jobs(), - ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, - Running1 = lists:filter(ContinuousPred, Running0), - Running2 = lists:sort(fun longest_running/2, Running1), - Running3 = lists:sublist(Running2, Count), - length([stop_job_int(Job, State) || Job <- Running3]). - - -longest_running(#job{} = A, #job{} = B) -> - last_started(A) =< last_started(B). - - -not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> - case History of - [{added, _When}] -> - true; - [{stopped, _When} | _] -> - true; - _ -> - LatestCrashT = latest_crash_timestamp(History), - CrashCount = consecutive_crashes(History, HealthThreshold), - timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount) - end. - - -% Count consecutive crashes. A crash happens when there is a `crashed` event -% within a short period of time (configurable) after any other event. 
It could -% be `crashed, started` for jobs crashing quickly after starting, `crashed, -% crashed`, `crashed, stopped` if job repeatedly failed to start -% being stopped. Or it could be `crashed, added` if it crashed immediately after -% being added during start. -% -% A streak of "consecutive crashes" ends when a crashed event is seen starting -% and running successfully without crashing for a period of time. That period -% of time is the HealthThreshold. -% - --spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer(). -consecutive_crashes(History, HealthThreshold) when is_list(History) -> - consecutive_crashes(History, HealthThreshold, 0). - - --spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) -> - non_neg_integer(). -consecutive_crashes([], _HealthThreashold, Count) -> - Count; - -consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest], - HealthThreshold, Count) -> - case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of - true -> - Count; - false -> - consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1) - end; - -consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold, - Count) -> - Count; - -consecutive_crashes([_ | Rest], HealthThreshold, Count) -> - consecutive_crashes(Rest, HealthThreshold, Count). - - --spec latest_crash_timestamp(history()) -> erlang:timestamp(). -latest_crash_timestamp([]) -> - {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff - -latest_crash_timestamp([{{crashed, _Reason}, When} | _]) -> - When; - -latest_crash_timestamp([_Event | Rest]) -> - latest_crash_timestamp(Rest). - - --spec backoff_micros(non_neg_integer()) -> non_neg_integer(). 
-backoff_micros(CrashCount) -> - % When calculating the backoff interval treat consecutive crash count as the - % exponent in Base * 2 ^ CrashCount to achieve an exponential backoff - % doubling every consecutive failure, starting with the base value of - % ?BACKOFF_INTERVAL_MICROS. - BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT), - (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS. - - --spec add_job_int(#job{}) -> boolean(). -add_job_int(#job{} = Job) -> - ets:insert_new(?MODULE, Job). - - --spec maybe_remove_job_int(job_id(), #state{}) -> ok. -maybe_remove_job_int(JobId, State) -> - case job_by_id(JobId) of - {ok, Job} -> - ok = stop_job_int(Job, State), - true = remove_job_int(Job), - couch_stats:increment_counter([couch_replicator, jobs, removes]), - TotalJobs = ets:info(?MODULE, size), - couch_stats:update_gauge([couch_replicator, jobs, total], - TotalJobs), - update_running_jobs_stats(State#state.stats_pid), - ok; - {error, not_found} -> - ok - end. - - -start_job_int(#job{pid = Pid}, _State) when Pid /= undefined -> - ok; - -start_job_int(#job{} = Job0, State) -> - Job = maybe_optimize_job_for_rate_limiting(Job0), - case couch_replicator_scheduler_sup:start_child(Job#job.rep) of - {ok, Child} -> - Ref = monitor(process, Child), - ok = update_state_started(Job, Child, Ref, State), - couch_log:notice("~p: Job ~p started as ~p", - [?MODULE, Job#job.id, Child]); - {error, {already_started, OtherPid}} when node(OtherPid) =:= node() -> - Ref = monitor(process, OtherPid), - ok = update_state_started(Job, OtherPid, Ref, State), - couch_log:notice("~p: Job ~p already running as ~p. Most likely" - " because replicator scheduler was restarted", - [?MODULE, Job#job.id, OtherPid]); - {error, {already_started, OtherPid}} when node(OtherPid) =/= node() -> - CrashMsg = "Duplicate replication running on another node", - couch_log:notice("~p: Job ~p already running as ~p. 
Most likely" - " because a duplicate replication is running on another node", - [?MODULE, Job#job.id, OtherPid]), - ok = update_state_crashed(Job, CrashMsg, State); - {error, Reason} -> - couch_log:notice("~p: Job ~p failed to start for reason ~p", - [?MODULE, Job, Reason]), - ok = update_state_crashed(Job, Reason, State) - end. - - --spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}. -stop_job_int(#job{pid = undefined}, _State) -> - ok; - -stop_job_int(#job{} = Job, State) -> - ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid), - demonitor(Job#job.monitor, [flush]), - ok = update_state_stopped(Job, State), - couch_log:notice("~p: Job ~p stopped as ~p", - [?MODULE, Job#job.id, Job#job.pid]). - - --spec remove_job_int(#job{}) -> true. -remove_job_int(#job{} = Job) -> - ets:delete(?MODULE, Job#job.id). - - --spec running_job_count() -> non_neg_integer(). -running_job_count() -> - ets:info(?MODULE, size) - pending_job_count(). - - --spec running_jobs() -> [#job{}]. -running_jobs() -> - ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]). - - --spec pending_job_count() -> non_neg_integer(). -pending_job_count() -> - ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]). - - --spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}. -job_by_pid(Pid) when is_pid(Pid) -> - case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of - [] -> - {error, not_found}; - [#job{}=Job] -> - {ok, Job} - end. - - --spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}. -job_by_id(Id) -> - case ets:lookup(?MODULE, Id) of - [] -> - {error, not_found}; - [#job{}=Job] -> - {ok, Job} - end. - - --spec update_state_stopped(#job{}, #state{}) -> ok. -update_state_stopped(Job, State) -> - Job1 = reset_job_process(Job), - Job2 = update_history(Job1, stopped, os:timestamp(), State), - true = ets:insert(?MODULE, Job2), - couch_stats:increment_counter([couch_replicator, jobs, stops]), - ok. 
- - --spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok. -update_state_started(Job, Pid, Ref, State) -> - Job1 = set_job_process(Job, Pid, Ref), - Job2 = update_history(Job1, started, os:timestamp(), State), - true = ets:insert(?MODULE, Job2), - couch_stats:increment_counter([couch_replicator, jobs, starts]), - ok. - - --spec update_state_crashed(#job{}, any(), #state{}) -> ok. -update_state_crashed(Job, Reason, State) -> - Job1 = reset_job_process(Job), - Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State), - true = ets:insert(?MODULE, Job2), - couch_stats:increment_counter([couch_replicator, jobs, crashes]), - ok. - - --spec set_job_process(#job{}, pid(), reference()) -> #job{}. -set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) -> - Job#job{pid = Pid, monitor = Ref}. - - --spec reset_job_process(#job{}) -> #job{}. -reset_job_process(#job{} = Job) -> - Job#job{pid = undefined, monitor = undefined}. - - --spec reschedule(#state{}) -> ok. -reschedule(State) -> - StopCount = stop_excess_jobs(State, running_job_count()), - rotate_jobs(State, StopCount), - update_running_jobs_stats(State#state.stats_pid). - - --spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer(). -stop_excess_jobs(State, Running) -> - #state{max_jobs=MaxJobs} = State, - StopCount = max(0, Running - MaxJobs), - Stopped = stop_jobs(StopCount, true, State), - OneshotLeft = StopCount - Stopped, - stop_jobs(OneshotLeft, false, State), - StopCount. - - -start_pending_jobs(State) -> - #state{max_jobs=MaxJobs} = State, - Running = running_job_count(), - Pending = pending_job_count(), - if Running < MaxJobs, Pending > 0 -> - start_jobs(MaxJobs - Running, State); - true -> - ok - end. - - --spec rotate_jobs(#state{}, non_neg_integer()) -> ok. 
-rotate_jobs(State, ChurnSoFar) -> - #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State, - Running = running_job_count(), - Pending = pending_job_count(), - % Reduce MaxChurn by the number of already stopped jobs in the - % current rescheduling cycle. - Churn = max(0, MaxChurn - ChurnSoFar), - SlotsAvailable = MaxJobs - Running, - if SlotsAvailable >= 0 -> - % If there is are enough SlotsAvailable reduce StopCount to avoid - % unnesessarily stopping jobs. `stop_jobs/3` ignores 0 or negative - % values so we don't worry about that here. - StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]), - stop_jobs(StopCount, true, State), - StartCount = max(0, MaxJobs - running_job_count()), - start_jobs(StartCount, State); - true -> - ok - end. - - --spec last_started(#job{}) -> erlang:timestamp(). -last_started(#job{} = Job) -> - case lists:keyfind(started, 1, Job#job.history) of - false -> - {0, 0, 0}; - {started, When} -> - When - end. - - --spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) -> - #job{}. -update_history(Job, Type, When, State) -> - History0 = [{Type, When} | Job#job.history], - History1 = lists:sublist(History0, State#state.max_history), - Job#job{history = History1}. - - --spec ejson_url(#httpdb{} | binary()) -> binary(). -ejson_url(#httpdb{}=Httpdb) -> - couch_util:url_strip_password(Httpdb#httpdb.url); -ejson_url(DbName) when is_binary(DbName) -> - DbName. - - --spec job_ejson(#job{}) -> {[_ | _]}. 
% Render a job as an EJSON object: id, pid, source/target (passwords
% stripped by ejson_url/1), owning database and document, user, stats info,
% event history, node and start time.
job_ejson(Job) ->
    Rep = Job#job.rep,
    Source = ejson_url(Rep#rep.source),
    Target = ejson_url(Rep#rep.target),
    History = lists:map(fun({Type, When}) ->
        EventProps = case Type of
            {crashed, Reason} ->
                [{type, crashed}, {reason, crash_reason_json(Reason)}];
            _ ->
                % Any non-crash event is reported by its type alone.
                [{type, Type}]
        end,
        {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
    end, Job#job.history),
    {BaseID, Ext} = Job#job.id,
    % A job without a process is reported with a JSON null pid.
    Pid = case Job#job.pid of
        undefined ->
            null;
        P when is_pid(P) ->
            ?l2b(pid_to_list(P))
    end,
    {[
        {id, iolist_to_binary([BaseID, Ext])},
        {pid, Pid},
        {source, iolist_to_binary(Source)},
        {target, iolist_to_binary(Target)},
        {database, Rep#rep.db_name},
        {user, (Rep#rep.user_ctx)#user_ctx.name},
        {doc_id, Rep#rep.doc_id},
        {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
        {history, History},
        {node, node()},
        {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
    ]}.


% EJSON objects for every job in the ?MODULE ETS table, in no particular
% order (the list is built by folding over the table).
-spec jobs() -> [{[_ | _]}].
jobs() ->
    ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).


% EJSON object for a single job, or whatever error job_by_id/1 returned.
-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
job(JobId) ->
    case job_by_id(JobId) of
        {ok, Job} ->
            {ok, job_ejson(Job)};
        Error ->
            Error
    end.


% Turn a crash reason into something that fits in the EJSON history: binaries
% (bare or as the second element of a 2-tuple) are used as is, anything else
% goes through couch_replicator_utils:rep_error_to_binary/1.
crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
    Info;
crash_reason_json(Reason) when is_binary(Reason) ->
    Reason;
crash_reason_json(Error) ->
    couch_replicator_utils:rep_error_to_binary(Error).


% ISO 8601 timestamp of the first (newest) event of a non-empty history.
-spec last_updated([_]) -> binary().
last_updated([{_Type, When} | _]) ->
    couch_replicator_utils:iso8601(When).


% Value of the `continuous` replication option, `false` when not set.
-spec is_continuous(#job{}) -> boolean().
is_continuous(#job{rep = Rep}) ->
    couch_util:get_value(continuous, Rep#rep.options, false).


% If job crashed last time because it was rate limited, try to
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
maybe_optimize_job_for_rate_limiting(#job{history = [Latest | _]} = Job)
        when element(1, Latest) =:= {crashed, max_backoff},
             tuple_size(Latest) =:= 2 ->
    % Upper bounds applied to the replication options after a max_backoff
    % crash; see optimize_int_option/2 for how each one is applied.
    Caps = [
        {checkpoint_interval, 5000},
        {worker_processes, 2},
        {worker_batch_size, 100},
        {http_connections, 5}
    ],
    Job#job{rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Caps)};
maybe_optimize_job_for_rate_limiting(Job) ->
    Job.


% Lower the integer option Key to Val when its current value is an integer
% greater than Val, logging a warning. Options that are unset, not integers
% or already small enough are left alone.
-spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
    Current = couch_util:get_value(Key, Options),
    case is_integer(Current) andalso Current > Val of
        true ->
            Msg = "~p replication ~p : setting ~p = ~p due to rate limiting",
            couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]),
            Rep#rep{options = lists:keyreplace(Key, 1, Options, {Key, Val})};
        false ->
            Rep
    end.


% The stats updater is a separate process. It receives `update_stats`
% messages and updates scheduler stats from the scheduler jobs table. Updates
% are performed no more frequently than once per ?STATS_UPDATE_WAIT
% milliseconds.

update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
    StatsPid ! update_stats,
    ok.


start_stats_updater() ->
    erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).


% Timer is `undefined` when no refresh is scheduled, or the reference of the
% pending `refresh_stats` timer. Any unexpected message terminates the
% process. The loop re-enters through a fully qualified call.
stats_updater_loop(Timer) ->
    NextTimer = receive
        update_stats when Timer == undefined ->
            erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats);
        update_stats when is_reference(Timer) ->
            % A refresh is already scheduled; coalesce this request into it.
            Timer;
        refresh_stats ->
            ok = stats_updater_refresh(),
            undefined;
        Else ->
            erlang:exit({stats_updater_bad_msg, Else})
    end,
    ?MODULE:stats_updater_loop(NextTimer).


-spec stats_updater_refresh() -> ok.
% Count pending, running and crashed jobs in the ?MODULE ETS table and
% publish the three totals as couch_stats gauges.
stats_updater_refresh() ->
    Totals = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
    #stats_acc{
        pending_n = Pending,
        running_n = Running,
        crashed_n = Crashed
    } = Totals,
    couch_stats:update_gauge([couch_replicator, jobs, pending], Pending),
    couch_stats:update_gauge([couch_replicator, jobs, running], Running),
    couch_stats:update_gauge([couch_replicator, jobs, crashed], Crashed),
    ok.


% Classify one job by its pid and newest history event:
%   no pid, history is exactly [added]  -> pending
%   no pid, newest event is stopped     -> pending
%   no pid, newest event is a crash     -> crashed
%   has pid, newest event is started    -> running
-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
    #stats_acc{pending_n = N} = Acc,
    Acc#stats_acc{pending_n = N + 1};
stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
    #stats_acc{pending_n = N} = Acc,
    Acc#stats_acc{pending_n = N + 1};
stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
    #stats_acc{crashed_n = N} = Acc,
    Acc#stats_acc{crashed_n = N + 1};
stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
    #stats_acc{running_n = N} = Acc,
    Acc#stats_acc{running_n = N + 1}.


% True when a job with the same id exists and its replication record equals
% NewRep after both are passed through normalize_rep/1.
-spec existing_replication(#rep{}) -> boolean().
existing_replication(#rep{id = RepId} = NewRep) ->
    case job_by_id(RepId) of
        {error, not_found} ->
            false;
        {ok, #job{rep = CurRep}} ->
            Current = couch_replicator_utils:normalize_rep(CurRep),
            Proposed = couch_replicator_utils:normalize_rep(NewRep),
            Current == Proposed
    end.


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").


% backoff_micros/1 is expected to return the base interval times the listed
% factor; the factor doubles per step and is the same (1024) for 11 and 12.
backoff_micros_test_() ->
    Base = ?BACKOFF_INTERVAL_MICROS,
    Cases = [
        {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
        {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
    ],
    [?_assertEqual(Factor * Base, backoff_micros(Arg)) || {Factor, Arg} <- Cases].
% Table of {ExpectedCount, History} pairs for consecutive_crashes/2 with the
% default health threshold. Histories are written newest event first.
consecutive_crashes_test_() ->
    Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
    [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
        {0, []},
        {0, [added()]},
        {0, [stopped()]},
        {0, [crashed()]},
        {1, [crashed(), added()]},
        {1, [crashed(), crashed()]},
        {1, [crashed(), stopped()]},
        {3, [crashed(), crashed(), crashed(), added()]},
        {2, [crashed(), crashed(), stopped()]},
        {1, [crashed(), started(), added()]},
        {2, [crashed(3), started(2), crashed(1), started(0)]},
        {0, [stopped(3), started(2), crashed(1), started(0)]},
        {1, [crashed(3), started(2), stopped(1), started(0)]},
        {0, [crashed(999), started(0)]},
        {1, [crashed(999), started(998), crashed(997), started(0)]}
    ]].


% Same check with an explicit threshold T of 10: a crash 11 seconds after the
% start is not counted, a crash 10 seconds after the start is.
consecutive_crashes_non_default_threshold_test_() ->
    [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
        {0, [crashed(11), started(0)], 10},
        {1, [crashed(10), started(0)], 10}
    ]].


% latest_crash_timestamp/1 is expected to return the timestamp of the newest
% crash event, or {0, 0, 0} when the history has no crash.
latest_crash_timestamp_test_() ->
    [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
         {0, [added()]},
         {1, [crashed(1)]},
         {3, [crashed(3), started(2), crashed(1), started(0)]},
         {1, [started(3), stopped(2), crashed(1), started(0)]}
    ]].


% last_started/1 is expected to return the timestamp of the newest `started`
% event, or {0, 0, 0} when the job was never started.
last_started_test_() ->
    [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
         {0, [added()]},
         {0, [crashed(1)]},
         {1, [started(1)]},
         {1, [added(), started(1)]},
         {2, [started(2), started(1)]},
         {2, [crashed(3), started(2), started(1)]}
    ]].


% Sorting with longest_running/2 is expected to put the never-started job
% (J0) first, followed by jobs in ascending order of their start time.
longest_running_test() ->
    J0 = testjob([crashed()]),
    J1 = testjob([started(1)]),
    J2 = testjob([started(2)]),
    Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end,
    ?assertEqual([], Sort([])),
    ?assertEqual([J1], Sort([J1])),
    ?assertEqual([J1, J2], Sort([J2, J1])),
    ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
% EUnit fixture for the scheduler tests: setup_all/teardown_all wrap the whole
% group once, setup/teardown wrap every individual test in the list.
scheduler_test_() ->
    {
        setup,
        fun setup_all/0,
        fun teardown_all/1,
        {
            foreach,
            fun setup/0,
            fun teardown/1,
            [
                t_pending_jobs_simple(),
                t_pending_jobs_skip_crashed(),
                t_one_job_starts(),
                t_no_jobs_start_if_max_is_0(),
                t_one_job_starts_if_max_is_1(),
                t_max_churn_does_not_throttle_initial_start(),
                t_excess_oneshot_only_jobs(),
                t_excess_continuous_only_jobs(),
                t_excess_prefer_continuous_first(),
                t_stop_oldest_first(),
                t_start_oldest_first(),
                t_jobs_churn_even_if_not_all_max_jobs_are_running(),
                t_jobs_dont_churn_if_there_are_available_running_slots(),
                t_start_only_pending_jobs_do_not_churn_existing_ones(),
                t_dont_stop_if_nothing_pending(),
                t_max_churn_limits_number_of_rotated_jobs(),
                t_existing_jobs(),
                t_if_pending_less_than_running_start_all_pending(),
                t_running_less_than_pending_swap_all_running(),
                t_oneshot_dont_get_rotated(),
                t_rotate_continuous_only_if_mixed(),
                t_oneshot_dont_get_starting_priority(),
                t_oneshot_will_hog_the_scheduler(),
                t_if_excess_is_trimmed_rotation_still_happens(),
                t_if_transient_job_crashes_it_gets_removed(),
                t_if_permanent_job_crashes_it_stays_in_ets(),
                t_job_summary_running(),
                t_job_summary_pending(),
                t_job_summary_crashing_once(),
                t_job_summary_crashing_many_times(),
                t_job_summary_proxy_fields()
             ]
        }
    }.


% pending_jobs(N) is expected to return at most N pending jobs, with job 1
% ahead of job 2 regardless of insertion order.
t_pending_jobs_simple() ->
   ?_test(begin
        Job1 = oneshot(1),
        Job2 = oneshot(2),
        setup_jobs([Job2, Job1]),
        ?assertEqual([], pending_jobs(0)),
        ?assertEqual([Job1], pending_jobs(1)),
        ?assertEqual([Job1, Job2], pending_jobs(2)),
        ?assertEqual([Job1, Job2], pending_jobs(3))
    end).
% A job whose newest event is a crash stamped with the current time is
% expected to be left out of pending_jobs/1.
t_pending_jobs_skip_crashed() ->
   ?_test(begin
        Job = oneshot(1),
        Ts = os:timestamp(),
        History = [crashed(Ts), started(Ts) | Job#job.history],
        Job1 = Job#job{history = History},
        Job2 = oneshot(2),
        Job3 = oneshot(3),
        setup_jobs([Job2, Job1, Job3]),
        ?assertEqual([Job2], pending_jobs(1)),
        ?assertEqual([Job2, Job3], pending_jobs(2)),
        ?assertEqual([Job2, Job3], pending_jobs(3))
    end).


% A single stopped job is started by one reschedule with default max_jobs.
t_one_job_starts() ->
    ?_test(begin
        setup_jobs([oneshot(1)]),
        ?assertEqual({0, 1}, run_stop_count()),
        reschedule(mock_state(?DEFAULT_MAX_JOBS)),
        ?assertEqual({1, 0}, run_stop_count())
    end).


% With max_jobs = 0 nothing is started.
t_no_jobs_start_if_max_is_0() ->
    ?_test(begin
        setup_jobs([oneshot(1)]),
        reschedule(mock_state(0)),
        ?assertEqual({0, 1}, run_stop_count())
    end).


% With max_jobs = 1 only one of two stopped jobs is started.
t_one_job_starts_if_max_is_1() ->
    ?_test(begin
        setup_jobs([oneshot(1), oneshot(2)]),
        reschedule(mock_state(1)),
        ?assertEqual({1, 1}, run_stop_count())
    end).


% max_churn = 0 does not prevent stopped jobs from being started when there
% are free slots.
t_max_churn_does_not_throttle_initial_start() ->
    ?_test(begin
        setup_jobs([oneshot(1), oneshot(2)]),
        reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
        ?assertEqual({2, 0}, run_stop_count())
    end).


% Running one-shot jobs in excess of max_jobs are stopped as max_jobs is
% lowered to 1 and then 0.
t_excess_oneshot_only_jobs() ->
    ?_test(begin
        setup_jobs([oneshot_running(1), oneshot_running(2)]),
        ?assertEqual({2, 0}, run_stop_count()),
        reschedule(mock_state(1)),
        ?assertEqual({1, 1}, run_stop_count()),
        reschedule(mock_state(0)),
        ?assertEqual({0, 2}, run_stop_count())
    end).


% Same as above, with continuous jobs only.
t_excess_continuous_only_jobs() ->
    ?_test(begin
        setup_jobs([continuous_running(1), continuous_running(2)]),
        ?assertEqual({2, 0}, run_stop_count()),
        reschedule(mock_state(1)),
        ?assertEqual({1, 1}, run_stop_count()),
        reschedule(mock_state(0)),
        ?assertEqual({0, 2}, run_stop_count())
    end).
% When trimming excess jobs from a mix, the one-shot job keeps running until
% max_jobs reaches 0; the continuous jobs are stopped before it.
t_excess_prefer_continuous_first() ->
    ?_test(begin
        Jobs = [
            continuous_running(1),
            oneshot_running(2),
            continuous_running(3)
        ],
        setup_jobs(Jobs),
        ?assertEqual({3, 0}, run_stop_count()),
        ?assertEqual({1, 0}, oneshot_run_stop_count()),
        reschedule(mock_state(2)),
        ?assertEqual({2, 1}, run_stop_count()),
        ?assertEqual({1, 0}, oneshot_run_stop_count()),
        reschedule(mock_state(1)),
        ?assertEqual({1, 0}, oneshot_run_stop_count()),
        reschedule(mock_state(0)),
        ?assertEqual({0, 1}, oneshot_run_stop_count())
    end).


% Of three running jobs started at times 7, 4 and 5, the one started first
% (4) is stopped first; after a second reschedule only 7 is left running.
t_stop_oldest_first() ->
    ?_test(begin
        Jobs = [
            continuous_running(7),
            continuous_running(4),
            continuous_running(5)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(2, 1)),
        ?assertEqual({2, 1}, run_stop_count()),
        ?assertEqual([4], jobs_stopped()),
        reschedule(mock_state(1, 1)),
        ?assertEqual([7], jobs_running())
    end).


% Of three stopped jobs, the one with the oldest start time (2) is started
% first.
t_start_oldest_first() ->
    ?_test(begin
        setup_jobs([continuous(7), continuous(2), continuous(5)]),
        reschedule(mock_state(1)),
        ?assertEqual({1, 2}, run_stop_count()),
        ?assertEqual([2], jobs_running()),
        reschedule(mock_state(2)),
        ?assertEqual({2, 1}, run_stop_count()),
        % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
        % be running.
        ?assertEqual([2], jobs_stopped())
    end).


% With one running and two pending jobs and max_jobs = 2, the running job (7)
% is still rotated out in favour of the two pending ones.
t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
    ?_test(begin
        setup_jobs([
            continuous_running(7),
            continuous(2),
            continuous(5)
        ]),
        reschedule(mock_state(2, 2)),
        ?assertEqual({2, 1}, run_stop_count()),
        ?assertEqual([7], jobs_stopped())
    end).


% With nothing pending, running jobs are neither stopped nor restarted:
% start_child/1 is never called.
t_jobs_dont_churn_if_there_are_available_running_slots() ->
     ?_test(begin
        setup_jobs([
            continuous_running(1),
            continuous_running(2)
        ]),
        reschedule(mock_state(2, 2)),
        ?assertEqual({2, 0}, run_stop_count()),
        ?assertEqual([], jobs_stopped()),
        ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
    end).
% With a free slot available, the pending job is started (exactly one
% start_child/1 call) and the running job is not stopped.
t_start_only_pending_jobs_do_not_churn_existing_ones() ->
     ?_test(begin
        setup_jobs([
            continuous(1),
            continuous_running(2)
        ]),
        reschedule(mock_state(2, 2)),
        ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)),
        ?assertEqual([], jobs_stopped()),
        ?assertEqual({2, 0}, run_stop_count())
    end).


% Two running jobs with max_jobs = 2 and nothing pending stay running.
t_dont_stop_if_nothing_pending() ->
    ?_test(begin
        setup_jobs([continuous_running(1), continuous_running(2)]),
        reschedule(mock_state(2)),
        ?assertEqual({2, 0}, run_stop_count())
    end).


% With max_churn = 1 only one running job is swapped out: after the
% reschedule jobs 2 and 3 are stopped (so 1 and 4 are running).
t_max_churn_limits_number_of_rotated_jobs() ->
    ?_test(begin
        Jobs = [
            continuous(1),
            continuous_running(2),
            continuous(3),
            continuous_running(4)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(2, 1)),
        ?assertEqual([2, 3], jobs_stopped())
    end).


% Two pending and three running jobs with max_jobs = 3: jobs 1, 2 and 5 are
% expected to be running afterwards.
t_if_pending_less_than_running_start_all_pending() ->
    ?_test(begin
        Jobs = [
            continuous(1),
            continuous_running(2),
            continuous(3),
            continuous_running(4),
            continuous_running(5)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(3)),
        ?assertEqual([1, 2, 5], jobs_running())
    end).


% Three pending and two running jobs with max_jobs = 2: both running jobs are
% swapped out, leaving 3, 4 and 5 stopped.
t_running_less_than_pending_swap_all_running() ->
    ?_test(begin
        Jobs = [
            continuous(1),
            continuous(2),
            continuous(3),
            continuous_running(4),
            continuous_running(5)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(2)),
        ?assertEqual([3, 4, 5], jobs_stopped())
    end).


% A running one-shot job is not stopped to make room for a pending
% continuous job.
t_oneshot_dont_get_rotated() ->
    ?_test(begin
        setup_jobs([oneshot_running(1), continuous(2)]),
        reschedule(mock_state(1)),
        ?assertEqual([1], jobs_running())
    end).


% In a mix of running jobs only the continuous one (3) is rotated out; the
% one-shot job (2) keeps running and pending job 1 is started.
t_rotate_continuous_only_if_mixed() ->
    ?_test(begin
        setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
        reschedule(mock_state(2)),
        ?assertEqual([1, 2], jobs_running())
    end).


% A pending one-shot job (2) is not started ahead of the pending continuous
% job (1).
t_oneshot_dont_get_starting_priority() ->
    ?_test(begin
        setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
        reschedule(mock_state(1)),
        ?assertEqual([1], jobs_running())
    end).
% This is tested in other test cases; it is here mainly to make explicit a
% property of one-shot replications -- they can starve other jobs if they
% "take control" of all the available scheduler slots.
t_oneshot_will_hog_the_scheduler() ->
    ?_test(begin
        Jobs = [
            oneshot_running(1),
            oneshot_running(2),
            oneshot(3),
            continuous(4)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(2)),
        ?assertEqual([1, 2], jobs_running())
    end).


% After the excess running jobs are trimmed down to max_jobs = 1, rotation
% still takes place: the pending job (1) ends up as the one running.
t_if_excess_is_trimmed_rotation_still_happens() ->
    ?_test(begin
        Jobs = [
            continuous(1),
            continuous_running(2),
            continuous_running(3)
        ],
        setup_jobs(Jobs),
        reschedule(mock_state(1)),
        ?assertEqual([1], jobs_running())
    end).


% A job whose rep has db_name = null is deleted from the ETS table when its
% process goes down with reason `failed`.
t_if_transient_job_crashes_it_gets_removed() ->
    ?_test(begin
        Pid = mock_pid(),
        Job =  #job{
            id = job1,
            pid = Pid,
            history = [added()],
            rep = #rep{db_name = null, options = [{continuous, true}]}
        },
        setup_jobs([Job]),
        ?assertEqual(1, ets:info(?MODULE, size)),
        State = #state{max_history = 3, stats_pid = self()},
        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
            State),
        ?assertEqual(0, ets:info(?MODULE, size))
   end).


% A job whose rep has a database name stays in the ETS table when its process
% goes down, and `{crashed, failed}` becomes the newest history event.
t_if_permanent_job_crashes_it_stays_in_ets() ->
    ?_test(begin
        Pid = mock_pid(),
        Job =  #job{
            id = job1,
            pid = Pid,
            history = [added()],
            rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
        },
        setup_jobs([Job]),
        ?assertEqual(1, ets:info(?MODULE, size)),
        State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
            State),
        ?assertEqual(1, ets:info(?MODULE, size)),
        [Job1] = ets:lookup(?MODULE, job1),
        [Latest | _] = Job1#job.history,
        ?assertMatch({{crashed, failed}, _}, Latest)
   end).
% existing_replication/1 is true for an identical rep with the same id, and
% false as soon as source, target or options differ.
t_existing_jobs() ->
    ?_test(begin
        Rep = #rep{
            id = job1,
            db_name = <<"db">>,
            source = <<"s">>,
            target = <<"t">>,
            options = [{continuous, true}]
        },
        setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
        NewRep = #rep{
            id = Rep#rep.id,
            db_name = <<"db">>,
            source = <<"s">>,
            target = <<"t">>,
            options = [{continuous, true}]
        },
        ?assert(existing_replication(NewRep)),
        ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
        ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
        ?assertNot(existing_replication(NewRep#rep{options = []}))
    end).


% A job with a pid is summarized as `running` with null info and no errors;
% after an update_job_stats cast the stats show up as the summary info.
t_job_summary_running() ->
    ?_test(begin
        Job =  #job{
            id = job1,
            pid = mock_pid(),
            history = [added()],
            rep = #rep{
                db_name = <<"db1">>,
                source = <<"s">>,
                target = <<"t">>
            }
        },
        setup_jobs([Job]),
        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual(running, proplists:get_value(state, Summary)),
        ?assertEqual(null, proplists:get_value(info, Summary)),
        ?assertEqual(0, proplists:get_value(error_count, Summary)),

        Stats = [{source_seq, <<"1-abc">>}],
        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
    end).


% A job without a pid whose newest event is `stopped` is summarized as
% `pending`; stats updates are reflected in the summary info as well.
t_job_summary_pending() ->
    ?_test(begin
        Job =  #job{
            id = job1,
            pid = undefined,
            history = [stopped(20), started(10), added()],
            rep = #rep{source = <<"s">>, target = <<"t">>}
        },
        setup_jobs([Job]),
        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual(pending, proplists:get_value(state, Summary)),
        ?assertEqual(null, proplists:get_value(info, Summary)),
        ?assertEqual(0, proplists:get_value(error_count, Summary)),

        Stats = [{doc_write_failures, 1}],
        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
    end).
% A job that crashed more than the health threshold after it started is
% summarized as `crashing` with the crash reason as info, but with an
% error_count of 0.
t_job_summary_crashing_once() ->
    ?_test(begin
        Job =  #job{
            id = job1,
            history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
            rep = #rep{source = <<"s">>, target = <<"t">>}
        },
        setup_jobs([Job]),
        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual(crashing, proplists:get_value(state, Summary)),
        Info = proplists:get_value(info, Summary),
        ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
        ?assertEqual(0, proplists:get_value(error_count, Summary))
    end).


% Two crashes shortly after their respective starts give an error_count of 2.
t_job_summary_crashing_many_times() ->
    ?_test(begin
        Job =  #job{
            id = job1,
            history = [crashed(4), started(3), crashed(2), started(1)],
            rep = #rep{source = <<"s">>, target = <<"t">>}
        },
        setup_jobs([Job]),
        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual(crashing, proplists:get_value(state, Summary)),
        Info = proplists:get_value(info, Summary),
        ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
        ?assertEqual(2, proplists:get_value(error_count, Summary))
    end).


% Proxy URLs of source and target are included in the summary with the
% password replaced by "*****".
t_job_summary_proxy_fields() ->
    ?_test(begin
        Job =  #job{
            id = job1,
            history = [started(10), added()],
            rep = #rep{
                source = #httpdb{
                    url = "https://s",
                    proxy_url = "http://u:p@sproxy:12"
                },
                target = #httpdb{
                    url = "http://t",
                    proxy_url = "socks5://u:p@tproxy:34"
                }
            }
        },
        setup_jobs([Job]),
        Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
        ?assertEqual(<<"http://u:*****@sproxy:12">>,
            proplists:get_value(source_proxy, Summary)),
        ?assertEqual(<<"socks5://u:*****@tproxy:34">>,
            proplists:get_value(target_proxy, Summary))
    end).
% Test helper functions

% Group-level setup: drop any leftover jobs table and stub out logging, stats
% and the scheduler supervisor with meck. start_child/1 always returns
% {ok, mock_pid()}.
setup_all() ->
    catch ets:delete(?MODULE),
    meck:expect(couch_log, notice, 2, ok),
    meck:expect(couch_log, warning, 2, ok),
    meck:expect(couch_log, error, 2, ok),
    meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
    meck:expect(couch_stats, increment_counter, 1, ok),
    meck:expect(couch_stats, update_gauge, 2, ok),
    Pid = mock_pid(),
    meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).


% Group-level teardown: drop the jobs table and unload all mocks.
teardown_all(_) ->
    catch ets:delete(?MODULE),
    meck:unload().


% Per-test setup: reset the call history of the mocked modules.
setup() ->
    meck:reset([
        couch_log,
        couch_replicator_scheduler_sup,
        couch_stats
    ]).


teardown(_) ->
    ok.


% Create the named jobs table (keyed on #job.id) and insert Jobs into it.
setup_jobs(Jobs) when is_list(Jobs) ->
    ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
    ets:insert(?MODULE, Jobs).


% All jobs in the table, sorted and de-duplicated.
all_jobs() ->
    lists:usort(ets:tab2list(?MODULE)).


% Ids of jobs without a pid.
jobs_stopped() ->
    [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].


% Ids of jobs with a pid.
jobs_running() ->
    [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].


% {NumberRunning, NumberStopped} over all jobs.
run_stop_count() ->
    {length(jobs_running()), length(jobs_stopped())}.


% {NumberRunning, NumberStopped} over non-continuous jobs only.
oneshot_run_stop_count() ->
    Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
        not is_continuous(Job)],
    Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
        not is_continuous(Job)],
    {length(Running), length(Stopped)}.


% Scheduler state with the given max_jobs and default churn/history limits.
% The test process itself stands in for the stats updater.
mock_state(MaxJobs) ->
    #state{
        max_jobs = MaxJobs,
        max_churn = ?DEFAULT_MAX_CHURN,
        max_history = ?DEFAULT_MAX_HISTORY,
        stats_pid = self()
    }.

% Same as mock_state/1 with an explicit max_churn.
mock_state(MaxJobs, MaxChurn) ->
    #state{
        max_jobs = MaxJobs,
        max_churn = MaxChurn,
        max_history = ?DEFAULT_MAX_HISTORY,
        stats_pid = self()
    }.


% Stopped continuous job. Id doubles as the start time in seconds; the job
% was stopped one second later.
continuous(Id) when is_integer(Id) ->
    Started = Id,
    Hist = [stopped(Started+1), started(Started), added()],
    #job{
        id = Id,
        history = Hist,
        rep = #rep{options = [{continuous, true}]}
    }.
% Running continuous job. Id doubles as the start time in seconds; the pid is
% the mock pid and is monitored by the calling process.
continuous_running(Id) when is_integer(Id) ->
    Started = Id,
    Pid = mock_pid(),
    #job{
        id = Id,
        history = [started(Started), added()],
        rep = #rep{options = [{continuous, true}]},
        pid = Pid,
        monitor = monitor(process, Pid)
    }.


% Stopped one-shot job (no `continuous` option), started at Id seconds and
% stopped one second later.
oneshot(Id) when is_integer(Id) ->
    Started = Id,
    Hist = [stopped(Started + 1), started(Started), added()],
    #job{id = Id, history = Hist, rep = #rep{options = []}}.


% Running one-shot job, started at Id seconds, backed by the mock pid.
oneshot_running(Id) when is_integer(Id) ->
    Started = Id,
    Pid = mock_pid(),
    #job{
        id = Id,
        history = [started(Started), added()],
        rep = #rep{options = []},
        pid = Pid,
        monitor = monitor(process, Pid)
    }.


% Bare job record carrying only the given history.
testjob(Hist) when is_list(Hist) ->
    #job{history = Hist}.


% Fixed pid value used wherever a test needs a job process.
mock_pid() ->
   list_to_pid("<0.999.999>").

% History event constructors. The integer forms place the event at WhenSec
% seconds, i.e. timestamp {0, WhenSec, 0}; crashed/1 and started/1 also accept
% a full timestamp tuple. All crashes carry the reason `some_reason`.

crashed() ->
    crashed(0).


crashed(WhenSec) when is_integer(WhenSec)->
    {{crashed, some_reason}, {0, WhenSec, 0}};
crashed({MSec, Sec, USec}) ->
    {{crashed, some_reason}, {MSec, Sec, USec}}.


started() ->
    started(0).


started(WhenSec) when is_integer(WhenSec)->
    {started, {0, WhenSec, 0}};

started({MSec, Sec, USec}) ->
    {started, {MSec, Sec, USec}}.


stopped() ->
    stopped(0).


stopped(WhenSec) ->
    {stopped, {0, WhenSec, 0}}.


added() ->
    {added, {0, 0, 0}}.

-endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl deleted file mode 100644 index 5203b0caa..000000000 --- a/src/couch_replicator/src/couch_replicator_scheduler.hrl +++ /dev/null @@ -1,15 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the -% License for the specific language governing permissions and limitations under -% the License. - - --type job_id() :: term(). --type job_args() :: term(). diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl deleted file mode 100644 index 0b33419e1..000000000 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ /dev/null @@ -1,1090 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_scheduler_job). - --behaviour(gen_server). - --export([ - start_link/1 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3, - format_status/2 -]). - --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). --include("couch_replicator_scheduler.hrl"). --include("couch_replicator.hrl"). - --import(couch_util, [ - get_value/2, - get_value/3, - to_binary/1 -]). - --import(couch_replicator_utils, [ - pp_rep_id/1 -]). - - --define(LOWEST_SEQ, 0). --define(DEFAULT_CHECKPOINT_INTERVAL, 30000). --define(STARTUP_JITTER_DEFAULT, 5000). 
% State of a running replication job process. Built by init_state/1 in
% do_init/1 and then extended with the changes queue, manager, reader and
% worker pids.
-record(rep_state, {
    rep_details,
    source_name,
    target_name,
    source,
    target,
    history,
    checkpoint_history,
    start_seq,
    committed_seq,
    current_through_seq,
    seqs_in_progress = [],
    highest_seq_done = {0, ?LOWEST_SEQ},
    source_log,
    target_log,
    rep_starttime,
    src_starttime,
    tgt_starttime,
    timer, % checkpoint timer
    changes_queue,
    changes_manager,
    changes_reader,
    workers,
    stats = couch_replicator_stats:new(),
    session_id,
    source_seq = nil,
    use_checkpoints = true,
    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
    type = db,
    view = nil
}).


% Start the replication job as a gen_server registered globally under
% {?MODULE, RepId}. A start failure is logged with the replication id and the
% source and target URIs, and the error is returned to the caller.
start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
    RepChildId = BaseId ++ Ext,
    Source = couch_replicator_api_wrap:db_uri(Src),
    Target = couch_replicator_api_wrap:db_uri(Tgt),
    ServerName = {global, {?MODULE, Rep#rep.id}},

    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
        {ok, Pid} ->
            {ok, Pid};
        {error, Reason} ->
            couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
                           [RepChildId, Source, Target]),
            {error, Reason}
    end.


% Return immediately with the init args as a temporary state and a timeout of
% 0, so that the actual initialization is deferred to a `timeout` message.
init(InitArgs) ->
    {ok, InitArgs, 0}.


% The real initialization: sleep for a startup jitter, build the replication
% state, create the changes queue, start the changes reader, the changes
% manager and the workers, register the task status, and return the state
% with the started processes filled in.
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
    process_flag(trap_exit, true),

    timer:sleep(startup_jitter()),

    #rep_state{
        source = Source,
        target = Target,
        source_name = SourceName,
        target_name = TargetName,
        start_seq = {_Ts, StartSeq},
        highest_seq_done = {_, HighestSeq},
        checkpoint_interval = CheckpointInterval
    } = State = init_state(Rep),

    NumWorkers = get_value(worker_processes, Options),
    BatchSize = get_value(worker_batch_size, Options),
    % The queue limits scale with the number of workers and the batch size.
    {ok, ChangesQueue} = couch_work_queue:new([
        {max_items, BatchSize * NumWorkers * 2},
        {max_size, 100 * 1024 * NumWorkers}
    ]),
    % This starts the _changes reader process. It adds the changes from
    % the source db to the ChangesQueue.
    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
        StartSeq, Source, ChangesQueue, Options
    ),
    % Changes manager - responsible for dequeuing batches from the changes queue
    % and delivering them to the worker processes.
    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
    % This starts the worker processes. They ask the changes queue manager for
    % a batch of _changes rows to process -> check which revs are missing in the
    % target, and for the missing ones, it copies them from the source to the target.
    MaxConns = get_value(http_connections, Options),
    Workers = lists:map(
        fun(_) ->
            couch_stats:increment_counter([couch_replicator, workers_started]),
            {ok, Pid} = couch_replicator_worker:start_link(
                self(), Source, Target, ChangesManager, MaxConns),
            Pid
        end,
        lists:seq(1, NumWorkers)),

    couch_task_status:add_task([
        {type, replication},
        {user, UserCtx#user_ctx.name},
        {replication_id, ?l2b(BaseId ++ Ext)},
        {database, Rep#rep.db_name},
        {doc_id, Rep#rep.doc_id},
        {source, ?l2b(SourceName)},
        {target, ?l2b(TargetName)},
        {continuous, get_value(continuous, Options, false)},
        {source_seq, HighestSeq},
        {checkpoint_interval, CheckpointInterval}
    ] ++ rep_stats(State)),
    couch_task_status:set_update_frequency(1000),

    % Until OTP R14B03:
    %
    % Restarting a temporary supervised child implies that the original arguments
    % (#rep{} record) specified in the MFA component of the supervisor
    % child spec will always be used whenever the child is restarted.
    % This implies the same replication performance tuning parameters will
    % always be used. The solution is to delete the child spec (see
    % cancel_replication/1) and then start the replication again, but this is
    % unfortunately not immune to race conditions.

    log_replication_start(State),
    couch_log:debug("Worker pids are: ~p", [Workers]),

    doc_update_triggered(Rep),

    {ok, State#rep_state{
            changes_queue = ChangesQueue,
            changes_manager = ChangesManager,
            changes_reader = ChangesReader,
            workers = Workers
        }
    }.


% add_stats: reply right away, then merge the reported stats into the state.
handle_call({add_stats, Stats}, From, State) ->
    gen_server:reply(From, ok),
    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
    {noreply, State#rep_state{stats = NewStats}};

% report_seq_done: reply right away, then remove Seq from the in-progress set,
% advance current_through_seq and highest_seq_done, merge the stats increment
% and update the task status.
handle_call({report_seq_done, Seq, StatsInc}, From,
    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
        current_through_seq = ThroughSeq, stats = Stats} = State) ->
    gen_server:reply(From, ok),
    % The through-seq only moves to Seq when Seq is the first in-progress seq
    % (or nothing is in progress); otherwise it stays where it was.
    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
    [] ->
        {Seq, []};
    [Seq | Rest] ->
        {Seq, Rest};
    [_ | _] ->
        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
    end,
    NewHighestDone = lists:max([HighestDone, Seq]),
    % Once nothing is in progress, everything up to the highest done seq has
    % been processed.
    NewThroughSeq = case NewSeqsInProgress of
    [] ->
        lists:max([NewThroughSeq0, NewHighestDone]);
    _ ->
        NewThroughSeq0
    end,
    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
        "new through seq is ~p, highest seq done was ~p, "
        "new highest seq done is ~p~n"
        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
    NewState = State#rep_state{
        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
        current_through_seq = NewThroughSeq,
        seqs_in_progress = NewSeqsInProgress,
        highest_seq_done = NewHighestDone
    },
    update_task(NewState),
    {noreply, NewState}.
- - -handle_cast(checkpoint, State) -> - case do_checkpoint(State) of - {ok, NewState} -> - couch_stats:increment_counter([couch_replicator, checkpoints, success]), - {noreply, NewState#rep_state{timer = start_timer(State)}}; - Error -> - couch_stats:increment_counter([couch_replicator, checkpoints, failure]), - {stop, Error, State} - end; - -handle_cast({report_seq, Seq}, - #rep_state{seqs_in_progress = SeqsInProgress} = State) -> - NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress), - {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}. - - -handle_info(shutdown, St) -> - {stop, shutdown, St}; - -handle_info({'EXIT', Pid, max_backoff}, State) -> - couch_log:error("Max backoff reached child process ~p", [Pid]), - {stop, {shutdown, max_backoff}, State}; - -handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> - couch_log:error("Max backoff reached child process ~p", [Pid]), - {stop, {shutdown, max_backoff}, State}; - -handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> - {noreply, State}; - -handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> - couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), - Reason = case Reason0 of - {changes_req_failed, _, _} = HttpFail -> - HttpFail; - {http_request_failed, _, _, {error, {code, Code}}} -> - {changes_req_failed, Code}; - {http_request_failed, _, _, {error, Err}} -> - {changes_req_failed, Err}; - Other -> - {changes_reader_died, Other} - end, - couch_log:error("ChangesReader process died with reason: ~p", [Reason]), - {stop, {shutdown, Reason}, cancel_timer(State)}; - -handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> - {noreply, State}; - -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> - couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), - couch_log:error("ChangesManager process died with reason: ~p", [Reason]), - {stop, {shutdown, 
{changes_manager_died, Reason}}, cancel_timer(State)}; - -handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> - {noreply, State}; - -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> - couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), - couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), - {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)}; - -handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> - case Workers -- [Pid] of - Workers -> - couch_log:error("unknown pid bit the dust ~p ~n",[Pid]), - {noreply, State#rep_state{workers = Workers}}; - %% not clear why a stop was here before - %%{stop, {unknown_process_died, Pid, normal}, State}; - [] -> - catch unlink(State#rep_state.changes_manager), - catch exit(State#rep_state.changes_manager, kill), - do_last_checkpoint(State); - Workers2 -> - {noreply, State#rep_state{workers = Workers2}} - end; - -handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> - State2 = cancel_timer(State), - case lists:member(Pid, Workers) of - false -> - {stop, {unknown_process_died, Pid, Reason}, State2}; - true -> - couch_stats:increment_counter([couch_replicator, worker_deaths]), - StopReason = case Reason of - {shutdown, _} = Err -> - Err; - Other -> - couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), - {worker_died, Pid, Other} - end, - {stop, StopReason, State2} - end; - -handle_info(timeout, InitArgs) -> - try do_init(InitArgs) of {ok, State} -> - {noreply, State} - catch - exit:{http_request_failed, _, _, max_backoff} -> - {stop, {shutdown, max_backoff}, {error, InitArgs}}; - Class:Error -> - ShutdownReason = {error, replication_start_error(Error)}, - StackTop2 = lists:sublist(erlang:get_stacktrace(), 2), - % Shutdown state is a hack as it is not really the state of the - % gen_server (it failed to initialize, so it doesn't have one). 
- % Shutdown state is used to pass extra info about why start failed. - ShutdownState = {error, Class, StackTop2, InitArgs}, - {stop, {shutdown, ShutdownReason}, ShutdownState} - end. - - -terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep, - checkpoint_history = CheckpointHistory} = State) -> - terminate_cleanup(State), - couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}), - doc_update_completed(Rep, rep_stats(State)); - -terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> - % Replication stopped via _scheduler_sup:terminate_child/1, which can be - % occur during regular scheduler operation or when job is removed from - % the scheduler. - State1 = case do_checkpoint(State) of - {ok, NewState} -> - NewState; - Error -> - LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p", - couch_log:error(LogMsg, [?MODULE, RepId, Error]), - State - end, - couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}), - terminate_cleanup(State1); - -terminate({shutdown, max_backoff}, {error, InitArgs}) -> - #rep{id = {BaseId, Ext} = RepId} = InitArgs, - couch_stats:increment_counter([couch_replicator, failed_starts]), - couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]), - couch_replicator_notifier:notify({error, RepId, max_backoff}); - -terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) -> - #rep{ - id = {BaseId, Ext} = RepId, - source = Source0, - target = Target0, - doc_id = DocId, - db_name = DbName - } = InitArgs, - Source = couch_replicator_api_wrap:db_uri(Source0), - Target = couch_replicator_api_wrap:db_uri(Target0), - RepIdStr = BaseId ++ Ext, - Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p", - couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName, - DocId, Stack]), - couch_stats:increment_counter([couch_replicator, failed_starts]), - couch_replicator_notifier:notify({error, RepId, Error}); - -terminate({shutdown, 
max_backoff}, State) -> - #rep_state{ - source_name = Source, - target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId} - } = State, - couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff", - [BaseId ++ Ext, Source, Target]), - terminate_cleanup(State), - couch_replicator_notifier:notify({error, RepId, max_backoff}); - -terminate({shutdown, Reason}, State) -> - % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple - % wrapped around the message - terminate(Reason, State); - -terminate(Reason, State) -> -#rep_state{ - source_name = Source, - target_name = Target, - rep_details = #rep{id = {BaseId, Ext} = RepId} - } = State, - couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s", - [BaseId ++ Ext, Source, Target, to_binary(Reason)]), - terminate_cleanup(State), - couch_replicator_notifier:notify({error, RepId, Reason}). - -terminate_cleanup(State) -> - update_task(State), - couch_replicator_api_wrap:db_close(State#rep_state.source), - couch_replicator_api_wrap:db_close(State#rep_state.target). - - -code_change(_OldVsn, #rep_state{}=State, _Extra) -> - {ok, State}. - - -format_status(_Opt, [_PDict, State]) -> - #rep_state{ - source = Source, - target = Target, - rep_details = RepDetails, - start_seq = StartSeq, - source_seq = SourceSeq, - committed_seq = CommitedSeq, - current_through_seq = ThroughSeq, - highest_seq_done = HighestSeqDone, - session_id = SessionId - } = state_strip_creds(State), - #rep{ - id = RepId, - options = Options, - doc_id = DocId, - db_name = DbName - } = RepDetails, - [ - {rep_id, RepId}, - {source, couch_replicator_api_wrap:db_uri(Source)}, - {target, couch_replicator_api_wrap:db_uri(Target)}, - {db_name, DbName}, - {doc_id, DocId}, - {options, Options}, - {session_id, SessionId}, - {start_seq, StartSeq}, - {source_seq, SourceSeq}, - {committed_seq, CommitedSeq}, - {current_through_seq, ThroughSeq}, - {highest_seq_done, HighestSeqDone} - ]. 
- - -startup_jitter() -> - Jitter = config:get_integer("replicator", "startup_jitter", - ?STARTUP_JITTER_DEFAULT), - couch_rand:uniform(erlang:max(1, Jitter)). - - -headers_strip_creds([], Acc) -> - lists:reverse(Acc); -headers_strip_creds([{Key, Value0} | Rest], Acc) -> - Value = case string:to_lower(Key) of - "authorization" -> - "****"; - _ -> - Value0 - end, - headers_strip_creds(Rest, [{Key, Value} | Acc]). - - -httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) -> - HttpDb#httpdb{ - url = couch_util:url_strip_password(Url), - headers = headers_strip_creds(Headers, []) - }; -httpdb_strip_creds(LocalDb) -> - LocalDb. - - -rep_strip_creds(#rep{source = Source, target = Target} = Rep) -> - Rep#rep{ - source = httpdb_strip_creds(Source), - target = httpdb_strip_creds(Target) - }. - - -state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) -> - % #rep_state contains the source and target at the top level and also - % in the nested #rep_details record - State#rep_state{ - rep_details = rep_strip_creds(Rep), - source = httpdb_strip_creds(Source), - target = httpdb_strip_creds(Target) - }. - - -adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) -> - Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p", - couch_log:notice(Msg, [RepId]), - Src#httpdb{http_connections = 2}; -adjust_maxconn(Src, _RepId) -> - Src. - - --spec doc_update_triggered(#rep{}) -> ok. -doc_update_triggered(#rep{db_name = null}) -> - ok; -doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) -> - case couch_replicator_doc_processor:update_docs() of - true -> - couch_replicator_docs:update_triggered(Rep, RepId); - false -> - ok - end, - couch_log:notice("Document `~s` triggered replication `~s`", - [DocId, pp_rep_id(RepId)]), - ok. - - --spec doc_update_completed(#rep{}, list()) -> ok. 
-doc_update_completed(#rep{db_name = null}, _Stats) -> - ok; -doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName, - start_time = StartTime}, Stats0) -> - Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}], - couch_replicator_docs:update_doc_completed(DbName, DocId, Stats), - couch_log:notice("Replication `~s` completed (triggered by `~s`)", - [pp_rep_id(RepId), DocId]), - ok. - - -do_last_checkpoint(#rep_state{seqs_in_progress = [], - highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) -> - {stop, normal, cancel_timer(State)}; -do_last_checkpoint(#rep_state{seqs_in_progress = [], - highest_seq_done = Seq} = State) -> - case do_checkpoint(State#rep_state{current_through_seq = Seq}) of - {ok, NewState} -> - couch_stats:increment_counter([couch_replicator, checkpoints, success]), - {stop, normal, cancel_timer(NewState)}; - Error -> - couch_stats:increment_counter([couch_replicator, checkpoints, failure]), - {stop, Error, State} - end. - - -start_timer(State) -> - After = State#rep_state.checkpoint_interval, - case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of - {ok, Ref} -> - Ref; - Error -> - couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]), - nil - end. - - -cancel_timer(#rep_state{timer = nil} = State) -> - State; -cancel_timer(#rep_state{timer = Timer} = State) -> - {ok, cancel} = timer:cancel(Timer), - State#rep_state{timer = nil}. 
- - -init_state(Rep) -> - #rep{ - id = {BaseId, _Ext}, - source = Src0, target = Tgt, - options = Options, - type = Type, view = View, - start_time = StartTime, - stats = ArgStats0 - } = Rep, - % Adjust minimum number of http source connections to 2 to avoid deadlock - Src = adjust_maxconn(Src0, BaseId), - {ok, Source} = couch_replicator_api_wrap:db_open(Src), - {CreateTargetParams} = get_value(create_target_params, Options, {[]}), - {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, - get_value(create_target, Options, false), CreateTargetParams), - - {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source), - {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target), - - [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep), - - {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), - - ArgStats1 = couch_replicator_stats:new(ArgStats0), - HistoryStats = case History of - [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); - _ -> couch_replicator_stats:new() - end, - Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), - - StartSeq1 = get_value(since_seq, Options, StartSeq0), - StartSeq = {0, StartSeq1}, - - SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), - - #doc{body={CheckpointHistory}} = SourceLog, - State = #rep_state{ - rep_details = Rep, - source_name = couch_replicator_api_wrap:db_uri(Source), - target_name = couch_replicator_api_wrap:db_uri(Target), - source = Source, - target = Target, - history = History, - checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]}, - start_seq = StartSeq, - current_through_seq = StartSeq, - committed_seq = StartSeq, - source_log = SourceLog, - target_log = TargetLog, - rep_starttime = StartTime, - src_starttime = get_value(<<"instance_start_time">>, SourceInfo), - tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), - session_id = couch_uuids:random(), - source_seq = SourceSeq, - use_checkpoints = 
get_value(use_checkpoints, Options, true), - checkpoint_interval = get_value(checkpoint_interval, Options, - ?DEFAULT_CHECKPOINT_INTERVAL), - type = Type, - view = View, - stats = Stats - }, - State#rep_state{timer = start_timer(State)}. - - -find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) -> - LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId), - fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []). - - -fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) -> - lists:reverse(Acc); - -fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> - case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of - {error, <<"not_found">>} when Vsn > 1 -> - OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1), - fold_replication_logs(Dbs, Vsn - 1, - ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc); - {error, <<"not_found">>} -> - fold_replication_logs( - Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]); - {ok, Doc} when LogId =:= NewId -> - fold_replication_logs( - Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]); - {ok, Doc} -> - MigratedLog = #doc{id = NewId, body = Doc#doc.body}, - maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id), - fold_replication_logs( - Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc]) - end. - - -maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) -> - case get_value(use_checkpoints, Rep#rep.options, true) of - true -> - update_checkpoint(Db, Doc), - Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p", - couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]); - false -> - ok - end. - - -spawn_changes_manager(Parent, ChangesQueue, BatchSize) -> - spawn_link(fun() -> - changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1) - end). - - -changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) -> - receive - {get_changes, From} -> - case couch_work_queue:dequeue(ChangesQueue, BatchSize) of - closed -> - From ! 
{closed, self()}; - {ok, ChangesOrLastSeqs} -> - ReportSeq = case lists:last(ChangesOrLastSeqs) of - {last_seq, Seq} -> - {Ts, Seq}; - #doc_info{high_seq = Seq} -> - {Ts, Seq} - end, - Changes = lists:filter( - fun(#doc_info{}) -> - true; - ({last_seq, _Seq}) -> - false - end, ChangesOrLastSeqs), - ok = gen_server:cast(Parent, {report_seq, ReportSeq}), - From ! {changes, self(), Changes, ReportSeq} - end, - changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1) - end. - - -do_checkpoint(#rep_state{use_checkpoints=false} = State) -> - NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} }, - {ok, NewState}; -do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) -> - update_task(State), - {ok, State}; -do_checkpoint(State) -> - #rep_state{ - source_name=SourceName, - target_name=TargetName, - source = Source, - target = Target, - history = OldHistory, - start_seq = {_, StartSeq}, - current_through_seq = {_Ts, NewSeq} = NewTsSeq, - source_log = SourceLog, - target_log = TargetLog, - rep_starttime = ReplicationStartTime, - src_starttime = SrcInstanceStartTime, - tgt_starttime = TgtInstanceStartTime, - stats = Stats, - rep_details = #rep{options = Options}, - session_id = SessionId - } = State, - case commit_to_both(Source, Target) of - {source_error, Reason} -> - {checkpoint_commit_failure, - <<"Failure on source commit: ", (to_binary(Reason))/binary>>}; - {target_error, Reason} -> - {checkpoint_commit_failure, - <<"Failure on target commit: ", (to_binary(Reason))/binary>>}; - {SrcInstanceStartTime, TgtInstanceStartTime} -> - couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p", - [SourceName, TargetName, NewSeq]), - LocalStartTime = calendar:now_to_local_time(ReplicationStartTime), - StartTime = ?l2b(httpd_util:rfc1123_date(LocalStartTime)), - EndTime = ?l2b(httpd_util:rfc1123_date()), - NewHistoryEntry = {[ - {<<"session_id">>, SessionId}, - {<<"start_time">>, StartTime}, 
- {<<"end_time">>, EndTime}, - {<<"start_last_seq">>, StartSeq}, - {<<"end_last_seq">>, NewSeq}, - {<<"recorded_seq">>, NewSeq}, - {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)}, - {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)}, - {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)}, - {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)}, - {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)} - ]}, - BaseHistory = [ - {<<"session_id">>, SessionId}, - {<<"source_last_seq">>, NewSeq}, - {<<"replication_id_version">>, ?REP_ID_VERSION} - ] ++ case get_value(doc_ids, Options) of - undefined -> - []; - _DocIds -> - % backwards compatibility with the result of a replication by - % doc IDs in versions 0.11.x and 1.0.x - % TODO: deprecate (use same history format, simplify code) - [ - {<<"start_time">>, StartTime}, - {<<"end_time">>, EndTime}, - {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)}, - {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)}, - {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)} - ] - end, - % limit history to 50 entries - NewRepHistory = { - BaseHistory ++ - [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] - }, - - try - {SrcRevPos, SrcRevId} = update_checkpoint( - Source, SourceLog#doc{body = NewRepHistory}, source), - {TgtRevPos, TgtRevId} = update_checkpoint( - Target, TargetLog#doc{body = NewRepHistory}, target), - NewState = State#rep_state{ - checkpoint_history = NewRepHistory, - committed_seq = NewTsSeq, - source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, - target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} - }, - update_task(NewState), - {ok, NewState} - catch throw:{checkpoint_commit_failure, _} = Failure -> - Failure - end; - {SrcInstanceStartTime, _NewTgtInstanceStartTime} -> - {checkpoint_commit_failure, <<"Target database out of sync. 
" - "Try to increase max_dbs_open at the target's server.">>}; - {_NewSrcInstanceStartTime, TgtInstanceStartTime} -> - {checkpoint_commit_failure, <<"Source database out of sync. " - "Try to increase max_dbs_open at the source's server.">>}; - {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} -> - {checkpoint_commit_failure, <<"Source and target databases out of " - "sync. Try to increase max_dbs_open at both servers.">>} - end. - - -update_checkpoint(Db, Doc, DbType) -> - try - update_checkpoint(Db, Doc) - catch throw:{checkpoint_commit_failure, Reason} -> - throw({checkpoint_commit_failure, - <<"Error updating the ", (to_binary(DbType))/binary, - " checkpoint document: ", (to_binary(Reason))/binary>>}) - end. - - -update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> - try - case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of - {ok, PosRevId} -> - PosRevId; - {error, Reason} -> - throw({checkpoint_commit_failure, Reason}) - end - catch throw:conflict -> - case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of - {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} -> - % This means that we were able to update successfully the - % checkpoint doc in a previous attempt but we got a connection - % error (timeout for e.g.) before receiving the success response. - % Therefore the request was retried and we got a conflict, as the - % revision we sent is not the current one. - % We confirm this by verifying the doc body we just got is the same - % that we have just sent. - {Pos, RevId}; - _ -> - throw({checkpoint_commit_failure, conflict}) - end - end. - - -commit_to_both(Source, Target) -> - % commit the src async - ParentPid = self(), - SrcCommitPid = spawn_link( - fun() -> - Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)), - ParentPid ! 
{self(), Result} - end), - - % commit tgt sync - TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)), - - SourceResult = receive - {SrcCommitPid, Result} -> - unlink(SrcCommitPid), - receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end, - Result; - {'EXIT', SrcCommitPid, Reason} -> - {error, Reason} - end, - case TargetResult of - {ok, TargetStartTime} -> - case SourceResult of - {ok, SourceStartTime} -> - {SourceStartTime, TargetStartTime}; - SourceError -> - {source_error, SourceError} - end; - TargetError -> - {target_error, TargetError} - end. - - -compare_replication_logs(SrcDoc, TgtDoc) -> - #doc{body={RepRecProps}} = SrcDoc, - #doc{body={RepRecPropsTgt}} = TgtDoc, - case get_value(<<"session_id">>, RepRecProps) == - get_value(<<"session_id">>, RepRecPropsTgt) of - true -> - % if the records have the same session id, - % then we have a valid replication history - OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ), - OldHistory = get_value(<<"history">>, RepRecProps, []), - {OldSeqNum, OldHistory}; - false -> - SourceHistory = get_value(<<"history">>, RepRecProps, []), - TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []), - couch_log:notice("Replication records differ. " - "Scanning histories to find a common ancestor.", []), - couch_log:debug("Record on source:~p~nRecord on target:~p~n", - [RepRecProps, RepRecPropsTgt]), - compare_rep_history(SourceHistory, TargetHistory) - end. 
- - -compare_rep_history(S, T) when S =:= [] orelse T =:= [] -> - couch_log:notice("no common ancestry -- performing full replication", []), - {?LOWEST_SEQ, []}; -compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) -> - SourceId = get_value(<<"session_id">>, S), - case has_session_id(SourceId, Target) of - true -> - RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ), - couch_log:notice("found a common replication record with source_seq ~p", - [RecordSeqNum]), - {RecordSeqNum, SourceRest}; - false -> - TargetId = get_value(<<"session_id">>, T), - case has_session_id(TargetId, SourceRest) of - true -> - RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ), - couch_log:notice("found a common replication record with source_seq ~p", - [RecordSeqNum]), - {RecordSeqNum, TargetRest}; - false -> - compare_rep_history(SourceRest, TargetRest) - end - end. - - -has_session_id(_SessionId, []) -> - false; -has_session_id(SessionId, [{Props} | Rest]) -> - case get_value(<<"session_id">>, Props, nil) of - SessionId -> - true; - _Else -> - has_session_id(SessionId, Rest) - end. - - -get_pending_count(St) -> - Rep = St#rep_state.rep_details, - Timeout = get_value(connection_timeout, Rep#rep.options), - TimeoutMicro = Timeout * 1000, - case get(pending_count_state) of - {LastUpdate, PendingCount} -> - case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of - true -> - NewPendingCount = get_pending_count_int(St), - put(pending_count_state, {os:timestamp(), NewPendingCount}), - NewPendingCount; - false -> - PendingCount - end; - undefined -> - NewPendingCount = get_pending_count_int(St), - put(pending_count_state, {os:timestamp(), NewPendingCount}), - NewPendingCount - end. 
- - -get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) -> - {_, Seq} = St#rep_state.highest_seq_done, - Db = Db0#httpdb{retries = 3}, - case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of - {ok, Pending} -> - Pending; - _ -> - null - end; -get_pending_count_int(#rep_state{source = Db}=St) -> - {_, Seq} = St#rep_state.highest_seq_done, - {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq), - Pending. - - -update_task(State) -> - #rep_state{ - rep_details = #rep{id = JobId}, - current_through_seq = {_, ThroughSeq}, - highest_seq_done = {_, HighestSeq} - } = State, - Status = rep_stats(State) ++ [ - {source_seq, HighestSeq}, - {through_seq, ThroughSeq} - ], - couch_replicator_scheduler:update_job_stats(JobId, Status), - couch_task_status:update(Status). - - -rep_stats(State) -> - #rep_state{ - committed_seq = {_, CommittedSeq}, - stats = Stats - } = State, - [ - {revisions_checked, couch_replicator_stats:missing_checked(Stats)}, - {missing_revisions_found, couch_replicator_stats:missing_found(Stats)}, - {docs_read, couch_replicator_stats:docs_read(Stats)}, - {docs_written, couch_replicator_stats:docs_written(Stats)}, - {changes_pending, get_pending_count(State)}, - {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)}, - {checkpointed_source_seq, CommittedSeq} - ]. 
- - -replication_start_error({unauthorized, DbUri}) -> - {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>}; -replication_start_error({db_not_found, DbUri}) -> - {db_not_found, <<"could not open ", DbUri/binary>>}; -replication_start_error({http_request_failed, _Method, Url0, - {error, {error, {conn_failed, {error, nxdomain}}}}}) -> - Url = ?l2b(couch_util:url_strip_password(Url0)), - {nxdomain, <<"could not resolve ", Url/binary>>}; -replication_start_error({http_request_failed, Method0, Url0, - {error, {code, Code}}}) when is_integer(Code) -> - Url = ?l2b(couch_util:url_strip_password(Url0)), - Method = ?l2b(Method0), - {http_error_code, Code, <<Method/binary, " ", Url/binary>>}; -replication_start_error(Error) -> - Error. - - -log_replication_start(#rep_state{rep_details = Rep} = RepState) -> - #rep{ - id = {BaseId, Ext}, - doc_id = DocId, - db_name = DbName, - options = Options - } = Rep, - Id = BaseId ++ Ext, - Workers = get_value(worker_processes, Options), - BatchSize = get_value(worker_batch_size, Options), - #rep_state{ - source_name = Source, % credentials already stripped - target_name = Target, % credentials already stripped - session_id = Sid - } = RepState, - From = case DbName of - ShardName when is_binary(ShardName) -> - io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]); - _ -> - "from _replicate endpoint" - end, - Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p" - " worker_batch_size:~p session_id:~s", - couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]). - - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). 
- - -replication_start_error_test() -> - ?assertEqual({unauthorized, <<"unauthorized to access or create database" - " http://x/y">>}, replication_start_error({unauthorized, - <<"http://x/y">>})), - ?assertEqual({db_not_found, <<"could not open http://x/y">>}, - replication_start_error({db_not_found, <<"http://x/y">>})), - ?assertEqual({nxdomain,<<"could not resolve http://x/y">>}, - replication_start_error({http_request_failed, "GET", "http://x/y", - {error, {error, {conn_failed, {error, nxdomain}}}}})), - ?assertEqual({http_error_code,503,<<"GET http://x/y">>}, - replication_start_error({http_request_failed, "GET", "http://x/y", - {error, {code, 503}}})). - - -scheduler_job_format_status_test() -> - Source = <<"http://u:p@h1/d1">>, - Target = <<"http://u:p@h2/d2">>, - Rep = #rep{ - id = {"base", "+ext"}, - source = couch_replicator_docs:parse_rep_db(Source, [], []), - target = couch_replicator_docs:parse_rep_db(Target, [], []), - options = [{create_target, true}], - doc_id = <<"mydoc">>, - db_name = <<"mydb">> - }, - State = #rep_state{ - rep_details = Rep, - source = Rep#rep.source, - target = Rep#rep.target, - session_id = <<"a">>, - start_seq = <<"1">>, - source_seq = <<"2">>, - committed_seq = <<"3">>, - current_through_seq = <<"4">>, - highest_seq_done = <<"5">> - }, - Format = format_status(opts_ignored, [pdict, State]), - ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)), - ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)), - ?assertEqual({"base", "+ext"}, proplists:get_value(rep_id, Format)), - ?assertEqual([{create_target, true}], proplists:get_value(options, Format)), - ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)), - ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)), - ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)), - ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)), - ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)), - 
?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)), - ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)), - ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)). - - --endif. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl deleted file mode 100644 index 8ab55f838..000000000 --- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl +++ /dev/null @@ -1,62 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_scheduler_sup). - --behaviour(supervisor). - -%% public api --export([ - start_link/0, - start_child/1, - terminate_child/1 -]). - -%% supervisor api --export([ - init/1 -]). - - -%% includes --include("couch_replicator.hrl"). - - -%% public functions - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - - -start_child(#rep{} = Rep) -> - supervisor:start_child(?MODULE, [Rep]). - - -terminate_child(Pid) -> - supervisor:terminate_child(?MODULE, Pid). - -%% supervisor functions - -init(_Args) -> - Start = {couch_replicator_scheduler_job, start_link, []}, - Restart = temporary, % A crashed job is not entitled to immediate restart. 
- Shutdown = 5000, - Type = worker, - Modules = [couch_replicator_scheduler_job], - - RestartStrategy = simple_one_for_one, - MaxR = 10, - MaxT = 3, - - ChildSpec = - {undefined, Start, Restart, Shutdown, Type, Modules}, - {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}. diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl deleted file mode 100644 index 997c84863..000000000 --- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl +++ /dev/null @@ -1,455 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_compact_tests). - --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_replicator/src/couch_replicator.hrl"). - --import(couch_replicator_test_helper, [ - db_url/1, - get_pid/1 -]). - --define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])). --define(DELAY, 500). --define(TIMEOUT, 360000). --define(TIMEOUT_WRITER, 100000). --define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 70). --define(WRITE_BATCH_SIZE, 25). - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target}}. 
- -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target}}) -> - teardown(Source), - teardown(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). - -compact_test_() -> - Pairs = [{remote, remote}], - { - "Compaction during replication tests", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] - } - }. - - -should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) -> - {ok, RepPid, RepId} = replicate(Source, Target), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_run_replication(RepPid, RepId, Source, Target), - should_all_processes_be_alive(RepPid, Source, Target), - should_populate_and_compact(RepPid, Source, Target, 50, 3), - should_wait_target_in_sync(Source, Target), - should_ensure_replication_still_running(RepPid, RepId, Source, Target), - should_cancel_replication(RepId, RepPid), - should_compare_databases(Source, Target) - ]}}. - -should_all_processes_be_alive(RepPid, Source, Target) -> - ?_test(begin - {ok, SourceDb} = reopen_db(Source), - {ok, TargetDb} = reopen_db(Target), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(couch_db:get_pid(SourceDb))), - ?assert(is_process_alive(couch_db:get_pid(TargetDb))) - end). - -should_run_replication(RepPid, RepId, Source, Target) -> - ?_test(check_active_tasks(RepPid, RepId, Source, Target)). - -should_ensure_replication_still_running(RepPid, RepId, Source, Target) -> - ?_test(check_active_tasks(RepPid, RepId, Source, Target)). 

% Assert that exactly one replication task is reported and that it describes
% this job: matching pid, replication id, endpoints, continuous flag, and
% integer-valued progress counters.
check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
    ExpectedSource = task_endpoint(Src),
    ExpectedTarget = task_endpoint(Tgt),
    ExpectedRepId = ?l2b(BaseId ++ Ext),
    ExpectedPid = ?l2b(pid_to_list(RepPid)),
    Tasks = wait_for_task_status(),
    ?assertNotEqual(timeout, Tasks),
    [Task] = Tasks,
    Prop = fun(Key) -> couch_util:get_value(Key, Task) end,
    ?assertEqual(ExpectedPid, Prop(pid)),
    ?assertEqual(ExpectedRepId, Prop(replication_id)),
    ?assertEqual(true, Prop(continuous)),
    ?assertEqual(ExpectedSource, Prop(source)),
    ?assertEqual(ExpectedTarget, Prop(target)),
    IntegerKeys = [
        docs_read,
        docs_written,
        doc_write_failures,
        revisions_checked,
        missing_revisions_found,
        checkpointed_source_seq,
        source_seq,
        changes_pending
    ],
    lists:foreach(fun(Key) ->
        ?assert(is_integer(Prop(Key)))
    end, IntegerKeys).

% Endpoint value expected in the task: for `{remote, Name}` the database URL
% with a trailing slash, otherwise the value itself.
task_endpoint({remote, Name}) ->
    <<(db_url(Name))/binary, $/>>;
task_endpoint(Local) ->
    Local.

% All entries of couch_task_status:all/0 whose type is `replication`.
replication_tasks() ->
    [Task || Task <- couch_task_status:all(),
        couch_util:get_value(type, Task) =:= replication].


% Poll via test_util:wait/1 until at least one replication task is listed;
% returns the task list (or whatever test_util:wait/1 returns on giving up).
wait_for_task_status() ->
    Poll = fun() ->
        case replication_tasks() of
            [] -> wait;
            [_ | _] = Tasks -> Tasks
        end
    end,
    test_util:wait(Poll).

% Removing the job from the scheduler must leave the replication process dead.
should_cancel_replication(RepId, RepPid) ->
    ?_test(begin
        ok = couch_replicator_scheduler:remove_job(RepId),
        ?assertNot(is_process_alive(RepPid))
    end).

% Run Rounds rounds in which a background writer adds documents to the source
% while source and target are compacted, asserting after every compaction
% start that the replication and database processes are still alive.
should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
    {timeout, ?TIMEOUT_EUNIT, ?_test(begin
        {ok, WriterDb} = reopen_db(Source),
        Writer = spawn_writer(WriterDb),
        RunRound = fun(Round) ->
            compaction_round(RepPid, Source, Target, Writer, BatchSize * Round)
        end,
        lists:foreach(RunRound, lists:seq(1, Rounds)),
        stop_writer(Writer)
    end)}.

% One round: compact both databases with the writer paused, let the writer
% reach MinDocs written documents, then compact both again, pausing the
% writer only while waiting for each compaction to finish.
compaction_round(RepPid, Source, Target, Writer, MinDocs) ->
    {ok, SourceDb} = reopen_db(Source),
    {ok, TargetDb} = reopen_db(Target),
    pause_writer(Writer),
    compact_paused(RepPid, "source", SourceDb),
    compact_paused(RepPid, "target", TargetDb),
    {ok, SourceDb2} = reopen_db(SourceDb),
    {ok, TargetDb2} = reopen_db(TargetDb),
    resume_writer(Writer),
    wait_writer(Writer, MinDocs),
    compact_live(RepPid, Writer, "source", SourceDb2),
    compact_live(RepPid, Writer, "target", TargetDb2).

% Compact Db while the writer is already paused.
compact_paused(RepPid, Type, Db) ->
    compact_db(Type, Db),
    ?assert(is_process_alive(RepPid)),
    ?assert(is_process_alive(couch_db:get_pid(Db))),
    wait_for_compaction(Type, Db).

% Compact Db while the writer is running; the writer is paused only for the
% final wait and resumed afterwards.
compact_live(RepPid, Writer, Type, Db) ->
    compact_db(Type, Db),
    ?assert(is_process_alive(RepPid)),
    ?assert(is_process_alive(couch_db:get_pid(Db))),
    pause_writer(Writer),
    wait_for_compaction(Type, Db),
    resume_writer(Writer).

% Test step: wait until the target's doc_count equals the source's.
% `{remote, Name}` wrappers on either side are unwrapped first.
should_wait_target_in_sync({remote, Source}, Target) ->
    should_wait_target_in_sync(Source, Target);
should_wait_target_in_sync(Source, {remote, Target}) ->
    should_wait_target_in_sync(Source, Target);
should_wait_target_in_sync(Source, Target) ->
    {timeout, ?TIMEOUT_EUNIT, ?_assert(begin
        {ok, Db} = couch_db:open_int(Source, []),
        {ok, Info} = couch_db:get_db_info(Db),
        ok = couch_db:close(Db),
        ExpectedCount = couch_util:get_value(doc_count, Info),
        wait_target_in_sync_loop(ExpectedCount, Target, 300)
    end)}.

% Poll the target's doc_count every ?DELAY ms until it equals DocCount.
% Returns true on success; raises assertion_failed once RetriesLeft hits 0.
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
    Details = [
        {module, ?MODULE},
        {line, ?LINE},
        {reason, "Could not get source and target databases in sync"}
    ],
    erlang:error({assertion_failed, Details});
wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) ->
    wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft);
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
    {ok, Db} = couch_db:open_int(TargetName, []),
    {ok, Info} = couch_db:get_db_info(Db),
    ok = couch_db:close(Db),
    case couch_util:get_value(doc_count, Info) of
        Count when Count == DocCount ->
            true;
        _Behind ->
            ok = timer:sleep(?DELAY),
            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
    end.

% Test step: every document of the source must exist on the target with an
% identical JSON form (attachments included).
should_compare_databases({remote, Source}, Target) ->
    should_compare_databases(Source, Target);
should_compare_databases(Source, {remote, Target}) ->
    should_compare_databases(Source, Target);
should_compare_databases(Source, Target) ->
    {timeout, 35, ?_test(begin
        {ok, SourceDb} = couch_db:open_int(Source, []),
        {ok, TargetDb} = couch_db:open_int(Target, []),
        CompareDoc = fun(FullDocInfo, Acc) ->
            {ok, SourceDoc} = couch_db:open_doc(SourceDb, FullDocInfo),
            SourceJson = couch_doc:to_json_obj(SourceDoc, [attachments]),
            {SourceProps} = SourceJson,
            DocId = couch_util:get_value(<<"_id">>, SourceProps),
            TargetDoc = open_target_doc(TargetDb, DocId),
            TargetJson = couch_doc:to_json_obj(TargetDoc, [attachments]),
            ?assertEqual(SourceJson, TargetJson),
            {ok, Acc}
        end,
        {ok, _} = couch_db:fold_docs(SourceDb, CompareDoc, [], []),
        ok = couch_db:close(SourceDb),
        ok = couch_db:close(TargetDb)
    end)}.

% Open DocId on the target, or raise assertion_failed describing the error.
open_target_doc(TargetDb, DocId) ->
    case couch_db:open_doc(TargetDb, DocId) of
        {ok, Doc} ->
            Doc;
        Error ->
            Reason = lists:concat(["Error opening document '",
                ?b2l(DocId), "' from target: ",
                couch_util:to_list(Error)]),
            erlang:error({assertion_failed, [
                {module, ?MODULE}, {line, ?LINE}, {reason, Reason}
            ]})
    end.


% Open the database by name and close it again, returning the (closed) handle
% as {ok, Db}. Accepts a `{remote, Name}` wrapper, a binary name, or an
% existing db handle (whose name is looked up via couch_db:name/1).
reopen_db({remote, Db}) ->
    reopen_db(Db);
reopen_db(DbName) when is_binary(DbName) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    ok = couch_db:close(Db),
    {ok, Db};
reopen_db(Db) ->
    reopen_db(couch_db:name(Db)).


% Start a compaction of the database behind Db0 and block until the compactor
% process exits. A `normal` or `noproc` exit counts as success; any other exit
% reason, or no exit within ?TIMEOUT ms, raises assertion_failed. Type is only
% used in the error text ("source" / "target").
%
% The db handle is closed on every path, and on timeout the monitor is removed
% so a late 'DOWN' message cannot linger in the test process mailbox.
compact_db(Type, Db0) ->
    Name = couch_db:name(Db0),
    {ok, Db} = couch_db:open_int(Name, []),
    try
        {ok, CompactPid} = couch_db:start_compact(Db),
        MonRef = erlang:monitor(process, CompactPid),
        receive
            {'DOWN', MonRef, process, CompactPid, normal} ->
                ok;
            {'DOWN', MonRef, process, CompactPid, noproc} ->
                ok;
            {'DOWN', MonRef, process, CompactPid, Reason} ->
                erlang:error(
                    {assertion_failed,
                     [{module, ?MODULE}, {line, ?LINE},
                      {reason,
                       lists:concat(["Error compacting ", Type, " database ",
                                     ?b2l(Name), ": ",
                                     couch_util:to_list(Reason)])}]})
        after ?TIMEOUT ->
            erlang:demonitor(MonRef, [flush]),
            erlang:error(
                {assertion_failed,
                 [{module, ?MODULE}, {line, ?LINE},
                  {reason, lists:concat(["Compaction for ", Type, " database ",
                                         ?b2l(Name), " didn't finish"])}]})
        end
    after
        ok = couch_db:close(Db)
    end.

% Block until couch_db:wait_for_compaction/1 reports completion. `ok` and
% `{error, noproc}` count as success; any other error raises assertion_failed.
wait_for_compaction(Type, Db) ->
    case couch_db:wait_for_compaction(Db) of
        ok ->
            ok;
        {error, noproc} ->
            ok;
        {error, Reason} ->
            % Reason may be any term; lists:concat/1 only accepts atoms,
            % numbers and strings, so convert it the same way compact_db/2
            % does instead of crashing with badarg and hiding the real error.
            erlang:error(
                {assertion_failed,
                 [{module, ?MODULE}, {line, ?LINE},
                  {reason, lists:concat(["Compaction of ", Type,
                                         " database failed with: ",
                                         couch_util:to_list(Reason)])}]})
    end.

% Add a continuous replication job for Source -> Target to the scheduler and
% return {ok, Pid, RepId}. `{remote, Db}` endpoints are turned into URLs with
% db_url/1 first.
replicate({remote, Db}, Target) ->
    replicate(db_url(Db), Target);

replicate(Source, {remote, Db}) ->
    replicate(Source, db_url(Db));

replicate(Source, Target) ->
    RepObject = {[
        {<<"source">>, Source},
        {<<"target">>, Target},
        {<<"continuous">>, true}
    ]},
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
    ok = couch_replicator_scheduler:add_job(Rep),
    couch_replicator_scheduler:reschedule(),
    Pid = get_pid(Rep#rep.id),
    {ok, Pid, Rep#rep.id}.


% Keep asking the writer for its document count until it reports at least
% NumDocs. Each query blocks until the writer answers.
wait_writer(Pid, NumDocs) ->
    case get_writer_num_docs_written(Pid) of
        N when N >= NumDocs ->
            ok;
        _ ->
            wait_writer(Pid, NumDocs)
    end.

% Start the background writer process for Db and return its pid. The calling
% process becomes the writer's Parent, i.e. the address of all its replies.
%
% The writer is linked to the caller so that a writer crash fails the test
% right away instead of surfacing later as a pause/resume timeout, and so the
% writer does not outlive a test process that is killed.
spawn_writer(Db) ->
    Parent = self(),
    spawn_link(fun() -> writer_loop(Db, Parent, 0) end).


% Ask the writer to pause and wait up to ?TIMEOUT_WRITER ms for its
% acknowledgement; raises assertion_failed on timeout.
pause_writer(Pid) ->
    Ref = make_ref(),
    Pid ! {pause, Ref},
    receive
        {paused, Ref} ->
            ok
    after ?TIMEOUT_WRITER ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Failed to pause source database writer"}]})
    end.

% Ask a paused writer to continue and wait up to ?TIMEOUT_WRITER ms for its
% acknowledgement; raises assertion_failed on timeout.
resume_writer(Pid) ->
    Ref = make_ref(),
    Pid ! {continue, Ref},
    receive
        {ok, Ref} ->
            ok
    after ?TIMEOUT_WRITER ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Failed to resume source database writer"}]})
    end.

% Ask the writer how many documents it has written so far. Raises
% assertion_failed if no answer arrives within ?TIMEOUT_WRITER ms.
get_writer_num_docs_written(Pid) ->
    Ref = make_ref(),
    Pid ! {get_count, Ref},
    receive
        {count, Ref, Count} ->
            Count
    after ?TIMEOUT_WRITER ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout getting number of documents written"
                                " from source database writer"}]})
    end.

% Tell the writer to stop, wait for its final count, then wait for the process
% to actually go down. Returns the number of documents written.
stop_writer(Pid) ->
    Ref = make_ref(),
    Pid ! {stop, Ref},
    receive
        {stopped, Ref, DocsWritten} ->
            % Monitoring an already-dead process still yields a 'DOWN'
            % message, so setting the monitor up this late is safe.
            MonRef = erlang:monitor(process, Pid),
            receive
                {'DOWN', MonRef, process, Pid, _Reason} ->
                    DocsWritten
            after ?TIMEOUT ->
                erlang:error({assertion_failed,
                              [{module, ?MODULE},
                               {line, ?LINE},
                               {reason, "Timeout stopping source database writer"}]})
            end
    after ?TIMEOUT_WRITER ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout stopping source database writer"}]})
    end.

% Body of the background writer process. Each iteration writes one batch of
% ?WRITE_BATCH_SIZE documents (ids and values Counter+1 .. Counter+batch, each
% with two attachments built from ?ATTFILE) to the database named by Db0, then
% serves at most one pending `get_count` or `stop` request. With no request
% pending it sleeps ?DELAY ms and loops. Pause requests are honoured before
% and after the batch is built (see maybe_pause/2).
writer_loop(Db0, Parent, Counter) ->
    DbName = couch_db:name(Db0),
    {ok, Data} = file:read_file(?ATTFILE),
    maybe_pause(Parent, Counter),
    % The attachment payloads are the same for every document of the batch,
    % so encode them once instead of once per document.
    Icon1 = base64:encode(Data),
    Icon2 = base64:encode(iolist_to_binary([Data, Data])),
    Docs = [
        couch_doc:from_json_obj({[
            {<<"_id">>, integer_to_binary(Counter + I)},
            {<<"value">>, Counter + I},
            {<<"_attachments">>, {[
                {<<"icon1.png">>, {[
                    {<<"data">>, Icon1},
                    {<<"content_type">>, <<"image/png">>}
                ]}},
                {<<"icon2.png">>, {[
                    {<<"data">>, Icon2},
                    {<<"content_type">>, <<"image/png">>}
                ]}}
            ]}}
        ]})
        || I <- lists:seq(1, ?WRITE_BATCH_SIZE)
    ],
    maybe_pause(Parent, Counter),
    {ok, Db} = couch_db:open_int(DbName, []),
    {ok, _} = couch_db:update_docs(Db, Docs, []),
    ok = couch_db:close(Db),
    Written = Counter + ?WRITE_BATCH_SIZE,
    receive
        {get_count, Ref} ->
            Parent ! {count, Ref, Written},
            writer_loop(Db, Parent, Written);
        {stop, Ref} ->
            Parent ! {stopped, Ref, Written}
    after 0 ->
        timer:sleep(?DELAY),
        writer_loop(Db, Parent, Written)
    end.

% Serve at most one pending control message without blocking:
%   {get_count, Ref} - reply with the current Counter;
%   {pause, Ref}     - acknowledge, then block until a {continue, Ref2}
%                      arrives and acknowledge that as well.
% Returns immediately when the mailbox holds neither.
maybe_pause(Parent, Counter) ->
    receive
        {get_count, Ref} ->
            Parent ! {count, Ref, Counter};
        {pause, Ref} ->
            Parent ! {paused, Ref},
            receive
                {continue, Ref2} ->
                    Parent ! {ok, Ref2}
            end
    after 0 ->
        ok
    end.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl deleted file mode 100644 index 6b4f95c25..000000000 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ /dev/null @@ -1,271 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License.
You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License.

-module(couch_replicator_error_reporting_tests).

-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").


% Suite-level setup: start couch with the replicator, chttpd, mem3 and fabric.
setup_all() ->
    Apps = [couch_replicator, chttpd, mem3, fabric],
    test_util:start_couch(Apps).


% Suite-level cleanup: stop the context returned by setup_all/0.
teardown_all(Ctx) ->
    ok = test_util:stop_couch(Ctx).


% Per-test setup: unload all mocks and create fresh source and target
% databases.
setup() ->
    meck:unload(),
    SourceName = setup_db(),
    TargetName = setup_db(),
    {SourceName, TargetName}.


% Per-test cleanup: unload mocks and delete both databases.
teardown({Source, Target}) ->
    meck:unload(),
    lists:foreach(fun teardown_db/1, [Source, Target]),
    ok.


% EUnit generator: couch is started once for the suite, and every test gets
% its own source/target database pair.
error_reporting_test_() ->
    Tests = [
        fun t_fail_bulk_docs/1,
        fun t_fail_changes_reader/1,
        fun t_fail_revs_diff/1,
        fun t_fail_changes_queue/1,
        fun t_fail_changes_manager/1,
        fun t_fail_changes_reader_proc/1
    ],
    PerTest = {foreach, fun setup/0, fun teardown/1, Tests},
    {setup, fun setup_all/0, fun teardown_all/1, PerTest}.


% A 403 answer to a /_bulk_docs request must be reported by the job as
% {bulk_docs_failed, 403, Body}.
t_fail_bulk_docs({Source, Target}) ->
    ?_test(begin
        populate_db(Source, 1, 5),
        {ok, RepId} = replicate(Source, Target),
        wait_target_in_sync(Source, Target),

        {ok, Listener} = rep_result_listener(RepId),
        FailedResponse = {ok, "403", [], [<<"{\"x\":\"y\"}">>]},
        mock_fail_req("/_bulk_docs", FailedResponse),
        % One more document makes the job issue a new request
        populate_db(Source, 6, 6),

        {error, Reported} = wait_rep_result(RepId),
        Expected = {bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}},
        ?assertEqual(Expected, Reported),

        couch_replicator_notifier:stop(Listener)
    end).


% A 418 answer to a /_changes request must be reported as
% {changes_req_failed, 418, Body}.
t_fail_changes_reader({Source, Target}) ->
    ?_test(assert_http_failure_reported(Source, Target,
        "/_changes", 418, changes_req_failed)).


% A 407 answer to a /_revs_diff request must be reported as
% {revs_diff_failed, 407, Body}.
t_fail_revs_diff({Source, Target}) ->
    ?_test(assert_http_failure_reported(Source, Target,
        "/_revs_diff", 407, revs_diff_failed)).


% Shared scenario: replicate five documents, make every request whose path
% ends in PathSuffix answer with Status, write one more document and expect
% the job to report {ErrorTag, Status, DecodedBody}.
assert_http_failure_reported(Source, Target, PathSuffix, Status, ErrorTag) ->
    populate_db(Source, 1, 5),
    {ok, RepId} = replicate(Source, Target),
    wait_target_in_sync(Source, Target),

    {ok, Listener} = rep_result_listener(RepId),
    Response = {ok, integer_to_list(Status), [], [<<"{\"x\":\"y\"}">>]},
    mock_fail_req(PathSuffix, Response),
    populate_db(Source, 6, 6),

    {error, Reported} = wait_rep_result(RepId),
    ?assertEqual({ErrorTag, Status, {[{<<"x">>, <<"y">>}]}}, Reported),

    couch_replicator_notifier:stop(Listener).


% Killing the process stored in element 20 of the job state (treated here as
% the changes queue) with reason `boom` must be reported as
% {changes_queue_died, boom}.
t_fail_changes_queue({Source, Target}) ->
    ?_test(begin
        populate_db(Source, 1, 5),
        {ok, RepId} = replicate(Source, Target),
        wait_target_in_sync(Source, Target),

        RepPid = couch_replicator_test_helper:get_pid(RepId),
        % NOTE(review): the index is positional in the job's state term and
        % has to follow any change to that term's layout
        Queue = element(20, sys:get_state(RepPid)),
        ?assert(is_process_alive(Queue)),

        {ok, Listener} = rep_result_listener(RepId),
        exit(Queue, boom),

        {error, Reported} = wait_rep_result(RepId),
        ?assertEqual({changes_queue_died, boom}, Reported),
        couch_replicator_notifier:stop(Listener)
    end).


% Killing the process in element 21 of the job state with reason `bam` must
% be reported as {changes_manager_died, bam}.
t_fail_changes_manager({Source, Target}) ->
    ?_test(assert_child_death_reported(Source, Target,
        21, bam, changes_manager_died)).


% Killing the process in element 22 of the job state with reason `kapow` must
% be reported as {changes_reader_died, kapow}.
t_fail_changes_reader_proc({Source, Target}) ->
    ?_test(assert_child_death_reported(Source, Target,
        22, kapow, changes_reader_died)).


% Shared scenario: replicate five documents, take the pid stored at position
% StateIndex of the job's sys:get_state/1 term, kill it with ExitReason and
% expect the job to report {ErrorTag, ExitReason}.
% NOTE(review): StateIndex is positional and must follow any change to the
% layout of the job's state term.
assert_child_death_reported(Source, Target, StateIndex, ExitReason, ErrorTag) ->
    populate_db(Source, 1, 5),
    {ok, RepId} = replicate(Source, Target),
    wait_target_in_sync(Source, Target),

    RepPid = couch_replicator_test_helper:get_pid(RepId),
    Child = element(StateIndex, sys:get_state(RepPid)),
    ?assert(is_process_alive(Child)),

    {ok, Listener} = rep_result_listener(RepId),
    exit(Child, ExitReason),

    {error, Reported} = wait_rep_result(RepId),
    ?assertEqual({ErrorTag, ExitReason}, Reported),
    couch_replicator_notifier:stop(Listener).


% Mock ibrowse:send_req_direct/7 so that any request whose URL path ends with
% Path returns Return; every other request is passed through to ibrowse.
mock_fail_req(Path, Return) ->
    Intercept = fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
        {ok, {_, _, _, _, UrlPath, _}} = http_uri:parse(Url),
        case lists:suffix(Path, UrlPath) of
            true ->
                Return;
            false ->
                meck:passthrough([W, Url, Headers, Meth, Body, Opts, TOut])
        end
    end,
    meck:expect(ibrowse, send_req_direct, Intercept).


% Start a notifier that forwards every 3-tuple event whose second element is
% RepId to the calling process. Returns {ok, ListenerPid}.
rep_result_listener(RepId) ->
    ReplyTo = self(),
    Forward = fun
        ({_, Id, _} = Event) when Id =:= RepId ->
            ReplyTo ! Event;
        (_Other) ->
            ok
    end,
    {ok, _Listener} = couch_replicator_notifier:start_link(Forward).


% Block until the listener forwards the outcome for RepId: {ok, Result} for a
% `finished` event, {error, Reason} for an `error` event.
wait_rep_result(RepId) ->
    receive
        {finished, RepId, Result} ->
            {ok, Result};
        {error, RepId, Reason} ->
            {error, Reason}
    end.



% Create an empty temporary database and return its name.
setup_db() ->
    Name = ?tempdb(),
    {ok, Db} = couch_db:create(Name, [?ADMIN_CTX]),
    ok = couch_db:close(Db),
    Name.


% Delete the database created by setup_db/0.
teardown_db(Name) ->
    ok = couch_server:delete(Name, [?ADMIN_CTX]).


% Write one empty-bodied document per integer in Start..End to DbName, using
% the integer (as a binary) as the document id. The documents are handed to
% couch_db:update_docs/3 in descending id order.
populate_db(DbName, Start, End) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    Ascending = [
        #doc{id = integer_to_binary(N), body = {[]}}
        || N <- lists:seq(Start, End)
    ],
    {ok, _} = couch_db:update_docs(Db, lists:reverse(Ascending), []),
    ok = couch_db:close(Db).


% Block until the target holds as many documents as the source does now.
wait_target_in_sync(Source, Target) ->
    {ok, Db} = couch_db:open_int(Source, []),
    {ok, Info} = couch_db:get_db_info(Db),
    ok = couch_db:close(Db),
    wait_target_in_sync_loop(couch_util:get_value(doc_count, Info), Target, 300).


% Poll the target's doc_count every 500 ms until it equals DocCount. Returns
% true on success; raises assertion_failed once RetriesLeft reaches 0.
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
    Details = [
        {module, ?MODULE}, {line, ?LINE},
        {reason, "Could not get source and target databases in sync"}
    ],
    erlang:error({assertion_failed, Details});

wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
    {ok, Db} = couch_db:open_int(TargetName, []),
    {ok, Info} = couch_db:get_db_info(Db),
    ok = couch_db:close(Db),
    case couch_util:get_value(doc_count, Info) of
        Count when Count == DocCount ->
            true;
        _Behind ->
            ok = timer:sleep(500),
            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
    end.


% Add a continuous replication job between the URLs of Source and Target,
% with one worker and one retry per request. Returns {ok, RepId}.
replicate(Source, Target) ->
    Endpoints = [
        {<<"source">>, couch_replicator_test_helper:db_url(Source)},
        {<<"target">>, couch_replicator_test_helper:db_url(Target)}
    ],
    Options = [
        {<<"continuous">>, true},
        {<<"worker_processes">>, 1},
        {<<"retries_per_request">>, 1},
        % Low connection timeout so _changes feed gets restarted quicker
        {<<"connection_timeout">>, 3000}
    ],
    RepObject = {Endpoints ++ Options},
    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
    ok = couch_replicator_scheduler:add_job(Rep),
    couch_replicator_scheduler:reschedule(),
    {ok, Rep#rep.id}.
|