summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-08-28 04:31:04 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-09-15 16:13:46 -0400
commit4fc9a536ec85456ab60085f020548a08dd19ca36 (patch)
tree2f2f0f619ce2e24086243310bfc1d33a1a24552b
parentd2c9dffa3aca3b3e2faed49526b0065ebb845fad (diff)
downloadcouchdb-4fc9a536ec85456ab60085f020548a08dd19ca36.tar.gz
Delete old 2.x-3.x replicator modules
These modules are not used by the new replicator.
-rw-r--r--src/couch_replicator/src/couch_replicator_clustering.erl279
-rw-r--r--src/couch_replicator/src/couch_replicator_db_changes.erl108
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor.erl962
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor_worker.erl284
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric.erl155
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric_rpc.erl97
-rw-r--r--src/couch_replicator/src/couch_replicator_httpd_util.erl201
-rw-r--r--src/couch_replicator/src/couch_replicator_job_sup.erl34
-rw-r--r--src/couch_replicator/src/couch_replicator_js_functions.hrl177
-rw-r--r--src/couch_replicator/src/couch_replicator_notifier.erl58
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl1688
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.hrl15
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl1090
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_sup.erl62
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl455
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl271
16 files changed, 0 insertions, 5936 deletions
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index 18de1e825..000000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,279 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-%
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-%
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
-
-
--module(couch_replicator_clustering).
-
--behaviour(gen_server).
--behaviour(config_listener).
--behaviour(mem3_cluster).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
-]).
-
--export([
- owner/2,
- is_stable/0,
- link_cluster_event_listener/3
-]).
-
-% config_listener callbacks
--export([
- handle_config_change/5,
- handle_config_terminate/3
-]).
-
-% mem3_cluster callbacks
--export([
- cluster_stable/1,
- cluster_unstable/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--define(DEFAULT_QUIET_PERIOD, 60). % seconds
--define(DEFAULT_START_PERIOD, 5). % seconds
--define(RELISTEN_DELAY, 5000).
-
--record(state, {
- mem3_cluster_pid :: pid(),
- cluster_stable :: boolean()
-}).
-
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
-%
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
- case is_stable() of
- false ->
- unstable;
- true ->
- owner_int(DbName, DocId)
- end;
-owner(_DbName, _DocId) ->
- node().
-
-
--spec is_stable() -> true | false.
-is_stable() ->
- gen_server:call(?MODULE, is_stable).
-
-
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args)
- when is_atom(Mod), is_atom(Fun), is_list(Args) ->
- CallbackFun =
- fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
- (_) -> ok
- end,
- {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
- Pid.
-
-
-% Mem3 cluster callbacks
-
-cluster_unstable(Server) ->
- ok = gen_server:call(Server, set_unstable),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : cluster unstable", [?MODULE]),
- Server.
-
-cluster_stable(Server) ->
- ok = gen_server:call(Server, set_stable),
- couch_replicator_notifier:notify({cluster, stable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
- couch_log:notice("~s : cluster stable", [?MODULE]),
- Server.
-
-
-% gen_server callbacks
-
-init([]) ->
- ok = config:listen_for_changes(?MODULE, nil),
- Period = abs(config:get_integer("replicator", "cluster_quiet_period",
- ?DEFAULT_QUIET_PERIOD)),
- StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
- ?DEFAULT_START_PERIOD)),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
- Period),
- {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
- {reply, IsStable, State};
-
-handle_call(set_stable, _From, State) ->
- {reply, ok, State#state{cluster_stable = true}};
-
-handle_call(set_unstable, _From, State) ->
- {reply, ok, State#state{cluster_stable = false}}.
-
-
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
- ok = mem3_cluster:set_period(Pid, Period),
- {noreply, State}.
-
-
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-%% Internal functions
-
-
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
- {ok, S};
-handle_config_change(_, _, _, _, S) ->
- {ok, S}.
-
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_S, _R, _St) ->
- Pid = whereis(?MODULE),
- erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
- DbName = mem3:dbname(ShardName),
- Live = [node() | nodes()],
- Shards = mem3:shards(DbName, DocId),
- Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
- mem3:owner(DbName, DocId, Nodes).
-
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-replicator_clustering_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_stable_callback(),
- t_unstable_callback()
- ]
- }
- }.
-
-
-t_stable_callback() ->
- ?_test(begin
- ?assertEqual(false, is_stable()),
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable())
- end).
-
-
-t_unstable_callback() ->
- ?_test(begin
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable()),
- cluster_unstable(whereis(?MODULE)),
- ?assertEqual(false, is_stable())
- end).
-
-
-setup_all() ->
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(config, listen_for_changes, 2, ok),
- meck:expect(couch_stats, update_gauge, 2, ok),
- meck:expect(couch_replicator_notifier, notify, 1, ok).
-
-
-teardown_all(_) ->
- meck:unload().
-
-
-setup() ->
- meck:reset([
- config,
- couch_log,
- couch_stats,
- couch_replicator_notifier
- ]),
- stop_clustering_process(),
- {ok, Pid} = start_link(),
- Pid.
-
-
-teardown(Pid) ->
- stop_clustering_process(Pid).
-
-
-stop_clustering_process() ->
- stop_clustering_process(whereis(?MODULE)).
-
-
-stop_clustering_process(undefined) ->
- ok;
-
-stop_clustering_process(Pid) when is_pid(Pid) ->
- Ref = erlang:monitor(process, Pid),
- unlink(Pid),
- exit(Pid, kill),
- receive {'DOWN', Ref, _, _, _} -> ok end.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
deleted file mode 100644
index 92b0222c4..000000000
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_db_changes).
-
--behaviour(gen_server).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
-]).
-
--export([
- notify_cluster_event/2
-]).
-
--record(state, {
- event_listener :: pid(),
- mdb_changes :: pid() | nil
-}).
-
-
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
-
-
--spec start_link() ->
- {ok, pid()} | ignore | {error, any()}.
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
-
-
-init([]) ->
- EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
- State = #state{event_listener = EvtPid, mdb_changes = nil},
- case couch_replicator_clustering:is_stable() of
- true ->
- {ok, restart_mdb_changes(State)};
- false ->
- {ok, State}
- end.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-handle_call(_Msg, _From, State) ->
- {reply, {error, invalid_call}, State}.
-
-
-handle_cast({cluster, unstable}, State) ->
- {noreply, stop_mdb_changes(State)};
-
-handle_cast({cluster, stable}, State) ->
- {noreply, restart_mdb_changes(State)}.
-
-
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
--spec restart_mdb_changes(#state{}) -> #state{}.
-restart_mdb_changes(#state{mdb_changes = nil} = State) ->
- Suffix = <<"_replicator">>,
- CallbackMod = couch_replicator_doc_processor,
- Options = [skip_ddocs],
- {ok, Pid} = couch_multidb_changes:start_link(Suffix, CallbackMod, nil,
- Options),
- couch_stats:increment_counter([couch_replicator, db_scans]),
- couch_log:notice("Started replicator db changes listener ~p", [Pid]),
- State#state{mdb_changes = Pid};
-
-restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
- restart_mdb_changes(stop_mdb_changes(State)).
-
-
--spec stop_mdb_changes(#state{}) -> #state{}.
-stop_mdb_changes(#state{mdb_changes = nil} = State) ->
- State;
-stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
- couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
- unlink(Pid),
- exit(Pid, kill),
- State#state{mdb_changes = nil}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
deleted file mode 100644
index 6778d537d..000000000
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ /dev/null
@@ -1,962 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_doc_processor).
-
--behaviour(gen_server).
--behaviour(couch_multidb_changes).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
-]).
-
--export([
- db_created/2,
- db_deleted/2,
- db_found/2,
- db_change/3
-]).
-
--export([
- docs/1,
- doc/2,
- doc_lookup/3,
- update_docs/0,
- get_worker_ref/1,
- notify_cluster_event/2
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--import(couch_replicator_utils, [
- get_json_value/2,
- get_json_value/3
-]).
-
--define(DEFAULT_UPDATE_DOCS, false).
--define(ERROR_MAX_BACKOFF_EXPONENT, 12). % ~ 1 day on average
--define(TS_DAY_SEC, 86400).
--define(INITIAL_BACKOFF_EXPONENT, 64).
--define(MIN_FILTER_DELAY_SEC, 60).
-
--type filter_type() :: nil | view | user | docids | mango.
--type repstate() :: initializing | error | scheduled.
-
-
--record(rdoc, {
- id :: db_doc_id() | '_' | {any(), '_'},
- state :: repstate() | '_',
- rep :: #rep{} | nil | '_',
- rid :: rep_id() | nil | '_',
- filter :: filter_type() | '_',
- info :: binary() | nil | '_',
- errcnt :: non_neg_integer() | '_',
- worker :: reference() | nil | '_',
- last_updated :: erlang:timestamp() | '_'
-}).
-
-
-% couch_multidb_changes API callbacks
-
-db_created(DbName, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
-
-
-db_deleted(DbName, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
- ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
- Server.
-
-
-db_found(DbName, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
- Server.
-
-
-db_change(DbName, {ChangeProps} = Change, Server) ->
- couch_stats:increment_counter([couch_replicator, docs, db_changes]),
- try
- ok = process_change(DbName, Change)
- catch
- exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
- ErrMsg = "~p exited ~p while processing change from db ~p",
- couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
- _Tag:Error ->
- {RepProps} = get_json_value(doc, ChangeProps),
- DocId = get_json_value(<<"_id">>, RepProps),
- couch_replicator_docs:update_failed(DbName, DocId, Error)
- end,
- Server.
-
-
--spec get_worker_ref(db_doc_id()) -> reference() | nil.
-get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [#rdoc{worker = WRef}] when is_reference(WRef) ->
- WRef;
- [#rdoc{worker = nil}] ->
- nil;
- [] ->
- nil
- end.
-
-
-% Cluster membership change notification callback
--spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
-notify_cluster_event(Server, {cluster, _} = Event) ->
- gen_server:cast(Server, Event).
-
-
-process_change(DbName, {Change}) ->
- {RepProps} = JsonRepDoc = get_json_value(doc, Change),
- DocId = get_json_value(<<"_id">>, RepProps),
- Owner = couch_replicator_clustering:owner(DbName, DocId),
- Id = {DbName, DocId},
- case {Owner, get_json_value(deleted, Change, false)} of
- {_, true} ->
- ok = gen_server:call(?MODULE, {removed, Id}, infinity);
- {unstable, false} ->
- couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
- {ThisNode, false} when ThisNode =:= node() ->
- case get_json_value(<<"_replication_state">>, RepProps) of
- undefined ->
- ok = process_updated(Id, JsonRepDoc);
- <<"triggered">> ->
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"completed">> ->
- ok = gen_server:call(?MODULE, {completed, Id}, infinity);
- <<"error">> ->
- % Handle replications started from older versions of replicator
- % which wrote transient errors to replication docs
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"failed">> ->
- ok
- end;
- {Owner, false} ->
- ok
- end,
- ok.
-
-
-maybe_remove_state_fields(DbName, DocId) ->
- case update_docs() of
- true ->
- ok;
- false ->
- couch_replicator_docs:remove_state_fields(DbName, DocId)
- end.
-
-
-process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
- % Parsing replication doc (but not calculating the id) could throw an
- % exception which would indicate this document is malformed. This exception
- % should propagate to db_change function and will be recorded as permanent
- % failure in the document. User will have to update the documet to fix the
- % problem.
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
- Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
- Filter = case couch_replicator_filters:parse(Rep#rep.options) of
- {ok, nil} ->
- nil;
- {ok, {user, _FName, _QP}} ->
- user;
- {ok, {view, _FName, _QP}} ->
- view;
- {ok, {docids, _DocIds}} ->
- docids;
- {ok, {mango, _Selector}} ->
- mango;
- {error, FilterError} ->
- throw(FilterError)
- end,
- gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
-
-
-% Doc processor gen_server API and callbacks
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-init([]) ->
- ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
- {read_concurrency, true}, {write_concurrency, true}]),
- couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
- {ok, nil}.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-handle_call({updated, Id, Rep, Filter}, _From, State) ->
- ok = updated_doc(Id, Rep, Filter),
- {reply, ok, State};
-
-handle_call({removed, Id}, _From, State) ->
- ok = removed_doc(Id),
- {reply, ok, State};
-
-handle_call({completed, Id}, _From, State) ->
- true = ets:delete(?MODULE, Id),
- {reply, ok, State};
-
-handle_call({clean_up_replications, DbName}, _From, State) ->
- ok = removed_db(DbName),
- {reply, ok, State}.
-
-handle_cast({cluster, unstable}, State) ->
- % Ignoring unstable state transition
- {noreply, State};
-
-handle_cast({cluster, stable}, State) ->
- % Membership changed recheck all the replication document ownership
- nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
- {noreply, State};
-
-handle_cast(Msg, State) ->
- {stop, {error, unexpected_message, Msg}, State}.
-
-
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
- result = Res}}, State) ->
- ok = worker_returned(Ref, Id, Res),
- {noreply, State};
-
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-% Doc processor gen_server private helper functions
-
-% Handle doc update -- add to ets, then start a worker to try to turn it into
-% a replication job. In most cases it will succeed quickly but for filtered
-% replications or if there are duplicates, it could take longer
-% (theoretically indefinitely) until a replication could be started. Before
-% adding replication job, make sure to delete all old jobs associated with
-% same document.
--spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
-updated_doc(Id, Rep, Filter) ->
- NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
- NormNewRep = couch_replicator_utils:normalize_rep(Rep),
- case NormCurRep == NormNewRep of
- false ->
- removed_doc(Id),
- Row = #rdoc{
- id = Id,
- state = initializing,
- rep = Rep,
- rid = nil,
- filter = Filter,
- info = nil,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, Row),
- ok = maybe_start_worker(Id);
- true ->
- ok
- end.
-
-
-% Return current #rep{} record if any. If replication hasn't been submitted
-% to the scheduler yet, #rep{} record will be in the document processor's
-% ETS table, otherwise query scheduler for the #rep{} record.
--spec current_rep({binary(), binary()}) -> #rep{} | nil.
-current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
- case ets:lookup(?MODULE, {DbName, DocId}) of
- [] ->
- nil;
- [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
- % When replication is scheduled, #rep{} record which can be quite
- % large compared to other bits in #rdoc is removed in order to avoid
- % having to keep 2 copies of it. So have to fetch it from the
- % scheduler.
- couch_replicator_scheduler:rep_state(JobId);
- [#rdoc{rep = Rep}] ->
- Rep
- end.
-
-
--spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
-worker_returned(Ref, Id, {ok, RepId}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref} = Row] ->
- Row0 = Row#rdoc{
- state = scheduled,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- NewRow = case Row0 of
- #rdoc{rid = RepId, filter = user} ->
- % Filtered replication id didn't change.
- Row0;
- #rdoc{rid = nil, filter = user} ->
- % Calculated new replication id for a filtered replication. Make
- % sure to schedule another check as filter code could change.
- % Replication starts could have been failing, so also clear
- % error count.
- Row0#rdoc{rid = RepId};
- #rdoc{rid = OldRepId, filter = user} ->
- % Replication id of existing replication job with filter has
- % changed. Remove old replication job from scheduler and
- % schedule check to check for future changes.
- ok = couch_replicator_scheduler:remove_job(OldRepId),
- Msg = io_lib:format("Replication id changed: ~p -> ~p", [
- OldRepId, RepId]),
- Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
- #rdoc{rid = nil} ->
- % Calculated new replication id for non-filtered replication.
- % Remove replication doc body, after this we won't need it
- % anymore.
- Row0#rdoc{rep=nil, rid=RepId, info=nil}
- end,
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
-
-worker_returned(_Ref, _Id, ignore) ->
- ok;
-
-worker_returned(Ref, Id, {temporary_error, Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
- NewRow = Row#rdoc{
- rid = nil,
- state = error,
- info = Reason,
- errcnt = ErrCnt + 1,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok;
-
-worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
- case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref}] ->
- true = ets:delete(?MODULE, Id);
- _ ->
- ok % doc could have been deleted, ignore
- end,
- ok.
-
-
--spec maybe_update_doc_error(#rep{}, any()) -> ok.
-maybe_update_doc_error(Rep, Reason) ->
- case update_docs() of
- true ->
- couch_replicator_docs:update_error(Rep, Reason);
- false ->
- ok
- end.
-
-
--spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
-maybe_update_doc_triggered(Rep, RepId) ->
- case update_docs() of
- true ->
- couch_replicator_docs:update_triggered(Rep, RepId);
- false ->
- ok
- end.
-
-
--spec error_backoff(non_neg_integer()) -> seconds().
-error_backoff(ErrCnt) ->
- Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
- % ErrCnt is the exponent here. The reason 64 is used is to start at
- % 64 (about a minute) max range. Then first backoff would be 30 sec
- % on average. Then 1 minute and so on.
- couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
-
-
--spec filter_backoff() -> seconds().
-filter_backoff() ->
- Total = ets:info(?MODULE, size),
- % This value scaled by the number of replications. If the are a lot of them
- % wait is longer, but not more than a day (?TS_DAY_SEC). If there are just
- % few, wait is shorter, starting at about 30 seconds. `2 *` is used since
- % the expected wait would then be 0.5 * Range so it is easier to see the
- % average wait. `1 +` is used because couch_rand:uniform only
- % accepts >= 1 values and crashes otherwise.
- Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
- ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-
-
-% Document removed from db -- clear ets table and remove all scheduled jobs
--spec removed_doc(db_doc_id()) -> ok.
-removed_doc({DbName, DocId} = Id) ->
- ets:delete(?MODULE, Id),
- RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Whole db shard is gone -- remove all its ets rows and stop jobs
--spec removed_db(binary()) -> ok.
-removed_db(DbName) ->
- EtsPat = #rdoc{id = {DbName, '_'}, _ = '_'},
- ets:match_delete(?MODULE, EtsPat),
- RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
- lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
-
-% Spawn a worker process which will attempt to calculate a replication id, then
-% start a replication. Returns a process monitor reference. The worker is
-% guaranteed to exit with rep_start_result() type only.
--spec maybe_start_worker(db_doc_id()) -> ok.
-maybe_start_worker(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- ok;
- [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
- ok;
- [#rdoc{rep = Rep} = Doc] ->
- % For any replication with a user created filter function, periodically
- % (every `filter_backoff/0` seconds) to try to see if the user filter
- % has changed by using a worker to check for changes. When the worker
- % returns check if replication ID has changed. If it hasn't keep
- % checking (spawn another worker and so on). If it has stop the job
- % with the old ID and continue checking.
- Wait = get_worker_wait(Doc),
- Ref = make_ref(),
- true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
- couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
- ok
- end.
-
-
--spec get_worker_wait(#rdoc{}) -> seconds().
-get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
- filter_backoff();
-get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
- error_backoff(ErrCnt);
-get_worker_wait(#rdoc{state = initializing}) ->
- 0.
-
-
--spec update_docs() -> boolean().
-update_docs() ->
- config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
-
-
-% _scheduler/docs HTTP endpoint helpers
-
--spec docs([atom()]) -> [{[_]}] | [].
-docs(States) ->
- HealthThreshold = couch_replicator_scheduler:health_threshold(),
- ets:foldl(fun(RDoc, Acc) ->
- case ejson_doc(RDoc, HealthThreshold) of
- nil ->
- Acc; % Could have been deleted if job just completed
- {Props} = EJson ->
- {state, DocState} = lists:keyfind(state, 1, Props),
- case ejson_doc_state_filter(DocState, States) of
- true ->
- [EJson | Acc];
- false ->
- Acc
- end
- end
- end, [], ?MODULE).
-
-
--spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
-doc(Db, DocId) ->
- HealthThreshold = couch_replicator_scheduler:health_threshold(),
- Res = (catch ets:foldl(fun(RDoc, nil) ->
- {Shard, RDocId} = RDoc#rdoc.id,
- case {mem3:dbname(Shard), RDocId} of
- {Db, DocId} ->
- throw({found, ejson_doc(RDoc, HealthThreshold)});
- {_OtherDb, _OtherDocId} ->
- nil
- end
- end, nil, ?MODULE)),
- case Res of
- {found, DocInfo} ->
- {ok, DocInfo};
- nil ->
- {error, not_found}
- end.
-
-
--spec doc_lookup(binary(), binary(), integer()) ->
- {ok, {[_]}} | {error, not_found}.
-doc_lookup(Db, DocId, HealthThreshold) ->
- case ets:lookup(?MODULE, {Db, DocId}) of
- [#rdoc{} = RDoc] ->
- {ok, ejson_doc(RDoc, HealthThreshold)};
- [] ->
- {error, not_found}
- end.
-
-
--spec ejson_rep_id(rep_id() | nil) -> binary() | null.
-ejson_rep_id(nil) ->
- null;
-ejson_rep_id({BaseId, Ext}) ->
- iolist_to_binary([BaseId, Ext]).
-
-
--spec ejson_doc(#rdoc{}, non_neg_integer()) -> {[_]} | nil.
-ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) ->
- #rdoc{id = {DbName, DocId}, rid = RepId} = RDoc,
- JobProps = couch_replicator_scheduler:job_summary(RepId, HealthThreshold),
- case JobProps of
- nil ->
- nil;
- [{_, _} | _] ->
- {[
- {doc_id, DocId},
- {database, DbName},
- {id, ejson_rep_id(RepId)},
- {node, node()} | JobProps
- ]}
- end;
-
-ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
- #rdoc{
- id = {DbName, DocId},
- info = StateInfo,
- rid = RepId,
- errcnt = ErrorCount,
- last_updated = StateTime,
- rep = Rep
- } = RDoc,
- {[
- {doc_id, DocId},
- {database, DbName},
- {id, ejson_rep_id(RepId)},
- {state, RepState},
- {info, couch_replicator_utils:ejson_state_info(StateInfo)},
- {error_count, ErrorCount},
- {node, node()},
- {last_updated, couch_replicator_utils:iso8601(StateTime)},
- {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
- ]}.
-
-
--spec ejson_doc_state_filter(atom(), [atom()]) -> boolean().
-ejson_doc_state_filter(_DocState, []) ->
- true;
-ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
- lists:member(State, States).
-
-
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
- case couch_replicator_clustering:owner(DbName, DocId) of
- unstable ->
- nil;
- ThisNode when ThisNode =:= node() ->
- nil;
- OtherNode ->
- Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
- couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
- removed_doc(Id),
- nil
- end.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(DB, <<"db">>).
--define(EXIT_DB, <<"exit_db">>).
--define(DOC1, <<"doc1">>).
--define(DOC2, <<"doc2">>).
--define(R1, {"1", ""}).
--define(R2, {"2", ""}).
-
-
-doc_processor_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_bad_change(),
- t_regular_change(),
- t_change_with_doc_processor_crash(),
- t_change_with_existing_job(),
- t_deleted_change(),
- t_triggered_change(),
- t_completed_change(),
- t_active_replication_completed(),
- t_error_change(),
- t_failed_change(),
- t_change_for_different_node(),
- t_change_when_cluster_unstable(),
- t_ejson_docs(),
- t_cluster_membership_foldl()
- ]
- }
- }.
-
-
-% Can't parse replication doc, so should write failure state to document.
-t_bad_change() ->
- ?_test(begin
- ?assertEqual(acc, db_change(?DB, bad_change(), acc)),
- ?assert(updated_doc_with_failed_state())
- end).
-
-
-% Regular change, parse to a #rep{} and then add job.
-t_regular_change() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(started_worker({?DB, ?DOC1}))
- end).
-
-
-% Handle cases where doc processor exits or crashes while processing a change
-t_change_with_doc_processor_crash() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)),
- ?assert(failed_state_not_updated())
- end).
-
-
-% Regular change, parse to a #rep{} and then add job but there is already
-% a running job with same Id found.
-t_change_with_existing_job() ->
- ?_test(begin
- mock_existing_jobs_lookup([test_rep(?R2)]),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(started_worker({?DB, ?DOC1}))
- end).
-
-
-% Change is a deletion, and job is running, so remove job.
-t_deleted_change() ->
- ?_test(begin
- mock_existing_jobs_lookup([test_rep(?R2)]),
- ?assertEqual(ok, process_change(?DB, deleted_change())),
- ?assert(removed_job(?R2))
- end).
-
-
-% Change is in `triggered` state. Remove legacy state and add job.
-t_triggered_change() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(ok, process_change(?DB, change(<<"triggered">>))),
- ?assert(removed_state_fields()),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(started_worker({?DB, ?DOC1}))
- end).
-
-
-% Change is in `completed` state, so skip over it.
-t_completed_change() ->
- ?_test(begin
- ?assertEqual(ok, process_change(?DB, change(<<"completed">>))),
- ?assert(did_not_remove_state_fields()),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(did_not_spawn_worker())
- end).
-
-
-% Completed change comes for what used to be an active job. In this case
-% remove entry from doc_processor's ets (because there is no linkage or
-% callback mechanism for scheduler to tell doc_processsor a replication just
-% completed).
-t_active_replication_completed() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assertEqual(ok, process_change(?DB, change(<<"completed">>))),
- ?assert(did_not_remove_state_fields()),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1}))
- end).
-
-
-% Change is in `error` state. Remove legacy state and retry
-% running the job. This state was used for transient erorrs which are not
-% written to the document anymore.
-t_error_change() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(ok, process_change(?DB, change(<<"error">>))),
- ?assert(removed_state_fields()),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(started_worker({?DB, ?DOC1}))
- end).
-
-
-% Change is in `failed` state. This is a terminal state and it will not
-% be tried again, so skip over it.
-t_failed_change() ->
- ?_test(begin
- ?assertEqual(ok, process_change(?DB, change(<<"failed">>))),
- ?assert(did_not_remove_state_fields()),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(did_not_spawn_worker())
- end).
-
-
-% Normal change, but according to cluster ownership algorithm, replication
-% belongs to a different node, so this node should skip it.
-t_change_for_different_node() ->
- ?_test(begin
- meck:expect(couch_replicator_clustering, owner, 2, different_node),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(did_not_spawn_worker())
- end).
-
-
-% Change handled when cluster is unstable (nodes are added or removed), so
-% job is not added. A rescan will be triggered soon and change will be
-% evaluated again.
-t_change_when_cluster_unstable() ->
- ?_test(begin
- meck:expect(couch_replicator_clustering, owner, 2, unstable),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(did_not_spawn_worker())
- end).
-
-
-% Check if docs/0 function produces expected ejson after adding a job
-t_ejson_docs() ->
- ?_test(begin
- mock_existing_jobs_lookup([]),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- EJsonDocs = docs([]),
- ?assertMatch([{[_|_]}], EJsonDocs),
- [{DocProps}] = EJsonDocs,
- {value, StateTime, DocProps1} = lists:keytake(last_updated, 1,
- DocProps),
- ?assertMatch({last_updated, BinVal1} when is_binary(BinVal1),
- StateTime),
- {value, StartTime, DocProps2} = lists:keytake(start_time, 1, DocProps1),
- ?assertMatch({start_time, BinVal2} when is_binary(BinVal2), StartTime),
- ExpectedProps = [
- {database, ?DB},
- {doc_id, ?DOC1},
- {error_count, 0},
- {id, null},
- {info, null},
- {node, node()},
- {state, initializing}
- ],
- ?assertEqual(ExpectedProps, lists:usort(DocProps2))
- end).
-
-
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
- ?_test(begin
- mock_existing_jobs_lookup([test_rep(?R1)]),
- ?assertEqual(ok, process_change(?DB, change())),
- meck:expect(couch_replicator_clustering, owner, 2, different_node),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- gen_server:cast(?MODULE, {cluster, stable}),
- meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(removed_job(?R1))
- end).
-
-
-get_worker_ref_test_() ->
- {
- setup,
- fun() ->
- ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}])
- end,
- fun(_) -> ets:delete(?MODULE) end,
- ?_test(begin
- Id = {<<"db">>, <<"doc">>},
- ?assertEqual(nil, get_worker_ref(Id)),
- ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
- ?assertEqual(nil, get_worker_ref(Id)),
- Ref = make_ref(),
- ets:insert(?MODULE, #rdoc{id = Id, worker = Ref}),
- ?assertEqual(Ref, get_worker_ref(Id))
- end)
- }.
-
-
-% Test helper functions
-
-
-setup_all() ->
- meck:expect(couch_log, info, 2, ok),
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(couch_log, warning, 2, ok),
- meck:expect(couch_log, error, 2, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(config, listen_for_changes, 2, ok),
- meck:expect(couch_replicator_clustering, owner, 2, node()),
- meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
- ok),
- meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun
- ({?EXIT_DB, _}, _, _, _) -> exit(kapow);
- (_, _, _, _) -> pid
- end),
- meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
- meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
- meck:expect(couch_replicator_docs, update_failed, 3, ok).
-
-
-teardown_all(_) ->
- meck:unload().
-
-
-setup() ->
- meck:reset([
- config,
- couch_log,
- couch_replicator_clustering,
- couch_replicator_doc_processor_worker,
- couch_replicator_docs,
- couch_replicator_scheduler
- ]),
- % Set this expectation back to the default for
- % each test since some tests change it
- meck:expect(couch_replicator_clustering, owner, 2, node()),
- {ok, Pid} = start_link(),
- unlink(Pid),
- Pid.
-
-
-teardown(Pid) ->
- exit(Pid, kill).
-
-
-removed_state_fields() ->
- meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).
-
-
-started_worker(_Id) ->
- 1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).
-
-
-removed_job(Id) ->
- meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]).
-
-
-did_not_remove_state_fields() ->
- 0 == meck:num_calls(couch_replicator_docs, remove_state_fields, '_').
-
-
-did_not_spawn_worker() ->
- 0 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker,
- '_').
-
-updated_doc_with_failed_state() ->
- 1 == meck:num_calls(couch_replicator_docs, update_failed, '_').
-
-failed_state_not_updated() ->
- 0 == meck:num_calls(couch_replicator_docs, update_failed, '_').
-
-mock_existing_jobs_lookup(ExistingJobs) ->
- meck:expect(couch_replicator_scheduler, find_jobs_by_doc, fun
- (?EXIT_DB, ?DOC1) -> [];
- (?DB, ?DOC1) -> ExistingJobs
- end).
-
-
-test_rep(Id) ->
- #rep{id = Id, start_time = {0, 0, 0}}.
-
-
-change() ->
- {[
- {<<"id">>, ?DOC1},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}}
- ]}.
-
-
-change(State) ->
- {[
- {<<"id">>, ?DOC1},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>},
- {<<"_replication_state">>, State}
- ]}}
- ]}.
-
-
-deleted_change() ->
- {[
- {<<"id">>, ?DOC1},
- {<<"deleted">>, true},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}}
- ]}.
-
-
-bad_change() ->
- {[
- {<<"id">>, ?DOC2},
- {doc, {[
- {<<"_id">>, ?DOC2},
- {<<"source">>, <<"src">>}
- ]}}
- ]}.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
deleted file mode 100644
index a4c829323..000000000
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ /dev/null
@@ -1,284 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_doc_processor_worker).
-
--export([
- spawn_worker/4
-]).
-
--include("couch_replicator.hrl").
-
--import(couch_replicator_utils, [
- pp_rep_id/1
-]).
-
-% 61 seconds here because request usually have 10, 15, 30 second
-% timeouts set. We'd want the worker to get a chance to make a few
-% requests (maybe one failing one and a retry) and then fail with its
-% own error (timeout, network error), which would be more specific and
-% informative, before it simply gets killed because of the timeout
-% here. That is, if all fails and the worker is actually blocked then
-% 61 sec is a safety net to brutally kill the worker so doesn't end up
-% hung forever.
--define(WORKER_TIMEOUT_MSEC, 61000).
-
-
-% Spawn a worker which attempts to calculate replication id then add a
-% replication job to scheduler. This function create a monitor to the worker
-% a worker will then exit with the #doc_worker_result{} record within
-% ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a
-%`temporary_error`. Result will be sent as the `Reason` in the {'DOWN',...}
-% message.
--spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
-spawn_worker(Id, Rep, WaitSec, WRef) ->
- {Pid, _Ref} = spawn_monitor(fun() ->
- worker_fun(Id, Rep, WaitSec, WRef)
- end),
- Pid.
-
-
-% Private functions
-
--spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
-worker_fun(Id, Rep, WaitSec, WRef) ->
- timer:sleep(WaitSec * 1000),
- Fun = fun() ->
- try maybe_start_replication(Id, Rep, WRef) of
- Res ->
- exit(Res)
- catch
- throw:{filter_fetch_error, Reason} ->
- exit({temporary_error, Reason});
- _Tag:Reason ->
- exit({temporary_error, Reason})
- end
- end,
- {Pid, Ref} = spawn_monitor(Fun),
- receive
- {'DOWN', Ref, _, Pid, Result} ->
- exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
- after ?WORKER_TIMEOUT_MSEC ->
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- {DbName, DocId} = Id,
- TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
- Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
- "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
- Result = {temporary_error, couch_util:to_binary(Msg)},
- exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
- end.
-
-
-% Try to start a replication. Used by a worker. This function should return
-% rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch
-% filter.It can also block for an indeterminate amount of time while fetching
-% filter.
-maybe_start_replication(Id, RepWithoutId, WRef) ->
- Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
- case maybe_add_job_to_scheduler(Id, Rep, WRef) of
- ignore ->
- ignore;
- {ok, RepId} ->
- {ok, RepId};
- {temporary_error, Reason} ->
- {temporary_error, Reason};
- {permanent_failure, Reason} ->
- {DbName, DocId} = Id,
- couch_replicator_docs:update_failed(DbName, DocId, Reason),
- {permanent_failure, Reason}
- end.
-
-
--spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
- rep_start_result().
-maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
- RepId = Rep#rep.id,
- case couch_replicator_scheduler:rep_state(RepId) of
- nil ->
- % Before adding a job check that this worker is still the current
- % worker. This is to handle a race condition where a worker which was
- % sleeping and then checking a replication filter may inadvertently
- % re-add a replication which was already deleted.
- case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
- WRef ->
- ok = couch_replicator_scheduler:add_job(Rep),
- {ok, RepId};
- _NilOrOtherWRef ->
- ignore
- end;
- #rep{doc_id = DocId} ->
- {ok, RepId};
- #rep{doc_id = null} ->
- Msg = io_lib:format("Replication `~s` specified by document `~s`"
- " already running as a transient replication, started via"
- " `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
- {temporary_error, couch_util:to_binary(Msg)};
- #rep{db_name = OtherDb, doc_id = OtherDocId} ->
- Msg = io_lib:format("Replication `~s` specified by document `~s`"
- " already started, triggered by document `~s` from db `~s`",
- [pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
- {permanent_failure, couch_util:to_binary(Msg)}
- end.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
--define(DB, <<"db">>).
--define(DOC1, <<"doc1">>).
--define(R1, {"ad08e05057046eabe898a2572bbfb573", ""}).
-
-
-doc_processor_worker_test_() ->
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_should_add_job(),
- t_already_running_same_docid(),
- t_already_running_transient(),
- t_already_running_other_db_other_doc(),
- t_spawn_worker(),
- t_ignore_if_doc_deleted(),
- t_ignore_if_worker_ref_does_not_match()
- ]
- }.
-
-
-% Replication is already running, with same doc id. Ignore change.
-t_should_add_job() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
- ?assert(added_job())
- end).
-
-
-% Replication is already running, with same doc id. Ignore change.
-t_already_running_same_docid() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(?DB, ?DOC1),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
- ?assert(did_not_add_job())
- end).
-
-
-% There is a transient replication with same replication id running. Ignore.
-t_already_running_transient() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(null, null),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep,
- nil)),
- ?assert(did_not_add_job())
- end).
-
-
-% There is a duplicate replication potentially from a different db and doc.
-% Write permanent failure to doc.
-t_already_running_other_db_other_doc() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- mock_already_running(<<"otherdb">>, <<"otherdoc">>),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep,
- nil)),
- ?assert(did_not_add_job()),
- 1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
- end).
-
-
-% Should spawn worker
-t_spawn_worker() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- WRef = make_ref(),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
- Pid = spawn_worker(Id, Rep, 0, WRef),
- Res = receive {'DOWN', _Ref, process, Pid, Reason} -> Reason
- after 1000 -> timeout end,
- Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
- ?assertEqual(Expect, Res),
- ?assert(added_job())
- end).
-
-
-% Should not add job if by the time worker got to fetching the filter
-% and getting a replication id, replication doc was deleted
-t_ignore_if_doc_deleted() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
- ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
- ?assertNot(added_job())
- end).
-
-
-% Should not add job if by the time worker got to fetchign the filter
-% and building a replication id, another worker was spawned.
-t_ignore_if_worker_ref_does_not_match() ->
- ?_test(begin
- Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1,
- make_ref()),
- ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
- ?assertNot(added_job())
- end).
-
-
-% Test helper functions
-
-setup() ->
- meck:expect(couch_replicator_scheduler, add_job, 1, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(couch_server, get_uuid, 0, this_is_snek),
- meck:expect(couch_replicator_docs, update_failed, 3, ok),
- meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
- meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
- ok.
-
-
-teardown(_) ->
- meck:unload().
-
-
-mock_already_running(DbName, DocId) ->
- meck:expect(couch_replicator_scheduler, rep_state,
- fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end).
-
-
-added_job() ->
- 1 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-
-
-did_not_add_job() ->
- 0 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
-
-
-change() ->
- {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
deleted file mode 100644
index 1650105b5..000000000
--- a/src/couch_replicator/src/couch_replicator_fabric.erl
+++ /dev/null
@@ -1,155 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_fabric).
-
--export([
- docs/5
-]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-docs(DbName, Options, QueryArgs, Callback, Acc) ->
- Shards = mem3:shards(DbName),
- Workers0 = fabric_util:submit_jobs(
- Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
- RexiMon = fabric_util:create_monitors(Workers0),
- try
- case fabric_streams:start(Workers0, #shard.ref) of
- {ok, Workers} ->
- try
- docs_int(DbName, Workers, QueryArgs, Callback, Acc)
- after
- fabric_streams:cleanup(Workers)
- end;
- {timeout, NewState} ->
- DefunctWorkers = fabric_util:remove_done_workers(
- NewState#stream_acc.workers, waiting
- ),
- fabric_util:log_timeout(
- DefunctWorkers,
- "replicator docs"
- ),
- Callback({error, timeout}, Acc);
- {error, Error} ->
- Callback({error, Error}, Acc)
- end
- after
- rexi_monitor:stop(RexiMon)
- end.
-
-
-docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
- #mrargs{limit = Limit, skip = Skip} = QueryArgs,
- State = #collector{
- db_name = DbName,
- query_args = QueryArgs,
- callback = Callback,
- counters = fabric_dict:init(Workers, 0),
- skip = Skip,
- limit = Limit,
- user_acc = Acc0,
- update_seq = nil
- },
- case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- {timeout, NewState} ->
- Callback({error, timeout}, NewState#collector.user_acc);
- {error, Resp} ->
- {ok, Resp}
- end.
-
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
- fabric_view:check_down_shards(State, NodeRef);
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- fabric_view:handle_worker_exit(State, Worker, Reason);
-
-handle_message({meta, Meta0}, {Worker, From}, State) ->
- Tot = couch_util:get_value(total, Meta0, 0),
- Off = couch_util:get_value(offset, Meta0, 0),
- #collector{
- callback = Callback,
- counters = Counters0,
- total_rows = Total0,
- offset = Offset0,
- user_acc = AccIn
- } = State,
- % Assert that we don't have other messages from this
- % worker when the total_and_offset message arrives.
- 0 = fabric_dict:lookup_element(Worker, Counters0),
- rexi:stream_ack(From),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters1) of
- true ->
- {ok, State#collector{
- counters = Counters1,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- Meta = [{total, Total}, {offset, FinalOffset}],
- {Go, Acc} = Callback({meta, Meta}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters1),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end;
-
-handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
- #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
- case maybe_fetch_and_filter_doc(Id, Doc, State) of
- {[_ | _]} = NewDoc ->
- Row = Row0#view_row{doc = NewDoc},
- Dir = Args#mrargs.direction,
- Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- State1 = State#collector{rows=Rows, counters=Counters1},
- fabric_view:maybe_send_row(State1);
- skip ->
- rexi:stream_ack(From),
- {ok, State}
- end;
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-
-merge_row(fwd, Row, Rows) ->
- lists:keymerge(#view_row.id, [Row], Rows);
-merge_row(rev, Row, Rows) ->
- lists:rkeymerge(#view_row.id, [Row], Rows).
-
-
-maybe_fetch_and_filter_doc(Id, undecided, State) ->
- #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
- FilterStates = proplists:get_value(filter_states, Extra),
- case couch_replicator:active_doc(DbName, Id) of
- {ok, {Props} = DocInfo} ->
- DocState = couch_util:get_value(state, Props),
- couch_replicator_utils:filter_state(DocState, FilterStates, DocInfo);
- {error, not_found} ->
- skip % could have been deleted
- end;
-maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
- Doc.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
deleted file mode 100644
index d67f87548..000000000
--- a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
+++ /dev/null
@@ -1,97 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_fabric_rpc).
-
--export([
- docs/3
-]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-
-docs(DbName, Options, Args0) ->
- set_io_priority(DbName, Options),
- #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
- FilterStates = proplists:get_value(filter_states, Extra),
- Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
- HealthThreshold = couch_replicator_scheduler:health_threshold(),
- {ok, Db} = couch_db:open_int(DbName, Options),
- Acc = {DbName, FilterStates, HealthThreshold},
- couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
-
-
-docs_cb({meta, Meta}, Acc) ->
- ok = rexi:stream2({meta, Meta}),
- {ok, Acc};
-docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
- Id = couch_util:get_value(id, Row),
- Doc = couch_util:get_value(doc, Row),
- ViewRow = #view_row{
- id = Id,
- key = couch_util:get_value(key, Row),
- value = couch_util:get_value(value, Row)
- },
- case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
- skip ->
- ok;
- Other ->
- ok = rexi:stream2(ViewRow#view_row{doc = Other})
- end,
- {ok, Acc};
-docs_cb(complete, Acc) ->
- ok = rexi:stream_last(complete),
- {ok, Acc}.
-
-
-set_io_priority(DbName, Options) ->
- case lists:keyfind(io_priority, 1, Options) of
- {io_priority, Pri} ->
- erlang:put(io_priority, Pri);
- false ->
- erlang:put(io_priority, {interactive, DbName})
- end.
-
-
-%% Get the state of the replication document. If it is found and has a terminal
-%% state then it can be filtered and either included in the results or skipped.
-%% If it is not in a terminal state, look it up in the local doc processor ETS
-%% table. If it is there then filter by state. If it is not found there either
-%% then mark it as `undecided` and let the coordinator try to fetch it. The
-%% The idea is to do as much work as possible locally and leave the minimum
-%% amount of work for the coordinator.
-rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
- skip;
-rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
- DbName = mem3:dbname(Shard),
- DocInfo = couch_replicator:info_from_doc(DbName, Doc),
- case get_doc_state(DocInfo) of
- null ->
- % Fetch from local doc processor. If there, filter by state.
- % If not there, mark as undecided. Let coordinator figure it out.
- case couch_replicator_doc_processor:doc_lookup(Shard, Id,
- HealthThreshold) of
- {ok, EtsInfo} ->
- State = get_doc_state(EtsInfo),
- couch_replicator_utils:filter_state(State, States, EtsInfo);
- {error, not_found} ->
- undecided
- end;
- OtherState when is_atom(OtherState) ->
- couch_replicator_utils:filter_state(OtherState, States, DocInfo)
- end.
-
-
-get_doc_state({Props})->
- couch_util:get_value(state, Props).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
deleted file mode 100644
index 624eddd2f..000000000
--- a/src/couch_replicator/src/couch_replicator_httpd_util.erl
+++ /dev/null
@@ -1,201 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_httpd_util).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
--export([
- validate_rep_props/1,
- parse_int_param/5,
- parse_replication_state_filter/1,
- update_db_name/1,
- docs_acc_new/3,
- docs_acc_response/1,
- docs_cb/2
-]).
-
--import(couch_httpd, [
- send_json/2,
- send_json/3,
- send_method_not_allowed/2
-]).
-
--import(couch_util, [
- to_binary/1
-]).
-
-
-parse_replication_state_filter(undefined) ->
- []; % This is the default (wildcard) filter
-parse_replication_state_filter(States) when is_list(States) ->
- AllStates = couch_replicator:replication_states(),
- StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
- AtomStates = try
- [list_to_existing_atom(S) || S <- StrStates]
- catch error:badarg ->
- Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
- throw({query_parse_error, ?l2b(Msg1)})
- end,
- AllSet = sets:from_list(AllStates),
- StatesSet = sets:from_list(AtomStates),
- Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
- case Diff of
- [] ->
- AtomStates;
- _ ->
- Args = [Diff, AllStates],
- Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
- throw({query_parse_error, ?l2b(Msg2)})
- end.
-
-
-parse_int_param(Req, Param, Default, Min, Max) ->
- IntVal = try
- list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
- catch error:badarg ->
- Msg1 = io_lib:format("~s must be an integer", [Param]),
- throw({query_parse_error, ?l2b(Msg1)})
- end,
- case IntVal >= Min andalso IntVal =< Max of
- true ->
- IntVal;
- false ->
- Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
- throw({query_parse_error, ?l2b(Msg2)})
- end.
-
-
-validate_rep_props([]) ->
- ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
- lists:foreach(fun
- ({_,V}) when is_binary(V) -> ok;
- ({K,_}) -> throw({bad_request,
- <<K/binary," value must be a string.">>})
- end, Params),
- validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
- validate_rep_props(Rest).
-
-
-prepend_val(#vacc{prepend=Prepend}) ->
- case Prepend of
- undefined ->
- "";
- _ ->
- Prepend
- end.
-
-
-maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
- when Size > 0 andalso (Size + Len) > Max ->
- #vacc{buffer = Buffer, resp = Resp} = Acc,
- {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
- {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
-maybe_flush_response(Acc0, Data, Len) ->
- #vacc{buffer = Buf, bufsize = Size} = Acc0,
- Acc = Acc0#vacc{
- prepend = ",\r\n",
- buffer = [Buf | Data],
- bufsize = Size + Len
- },
- {ok, Acc}.
-
-docs_acc_new(Req, Db, Threshold) ->
- #vacc{db=Db, req=Req, threshold=Threshold}.
-
-docs_acc_response(#vacc{resp = Resp}) ->
- Resp.
-
-docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
- {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
- {ok, Acc#vacc{resp=Resp}};
-
-docs_cb(complete, #vacc{resp=undefined}=Acc) ->
- % Nothing in view
- {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
- {ok, Acc#vacc{resp=Resp}};
-
-docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
- %% Start response
- Headers = [],
- {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
- docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
-
-docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
- {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
- {ok, Acc#vacc{resp=Resp1}};
-
-docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
- % Finish view output and possibly end the response
- {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
- case Acc#vacc.should_close of
- true ->
- {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
- {ok, Acc#vacc{resp=Resp2}};
- _ ->
- {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
- prepend=",\r\n", buffer=[], bufsize=0}}
- end;
-
-docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
- % Sending metadata as we've not sent it or any row yet
- Parts = case couch_util:get_value(total, Meta) of
- undefined -> [];
- Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
- end ++ case couch_util:get_value(offset, Meta) of
- undefined -> [];
- Offset -> [io_lib:format("\"offset\":~p", [Offset])]
- end ++ ["\"docs\":["],
- Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
- {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
- {ok, AccOut#vacc{prepend="", meta_sent=true}};
-
-
-docs_cb({meta, _Meta}, #vacc{}=Acc) ->
- %% ignore metadata
- {ok, Acc};
-
-docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
- %% sorted=false and row arrived before meta
- % Adding another row
- Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
- maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
-
-docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
- % Adding another row
- Chunk = [prepend_val(Acc), row_to_json(Row)],
- maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
-
-
-update_db_name({Props}) ->
- {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
- {[{database, normalize_db_name(DbName)} | Props1]}.
-
-normalize_db_name(<<"shards/", _/binary>> = DbName) ->
- mem3:dbname(DbName);
-normalize_db_name(DbName) ->
- DbName.
-
-row_to_json(Row) ->
- Doc0 = couch_util:get_value(doc, Row),
- Doc1 = update_db_name(Doc0),
- ?JSON_ENCODE(Doc1).
-
-
-%% Adjust Total as there is an automatically created validation design doc
-adjust_total(Total) when is_integer(Total), Total > 0 ->
- Total - 1;
-adjust_total(Total) when is_integer(Total) ->
- 0.
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
deleted file mode 100644
index 9ea65e85f..000000000
--- a/src/couch_replicator/src/couch_replicator_job_sup.erl
+++ /dev/null
@@ -1,34 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_job_sup).
-
--behaviour(supervisor).
-
--export([
- init/1,
- start_link/0
-]).
-
-start_link() ->
- supervisor:start_link({local,?MODULE}, ?MODULE, []).
-
-%%=============================================================================
-%% supervisor callbacks
-%%=============================================================================
-
-init([]) ->
- {ok, {{one_for_one, 3, 10}, []}}.
-
-%%=============================================================================
-%% internal functions
-%%=============================================================================
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index d41043309..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,177 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
- function(newDoc, oldDoc, userCtx) {
- function reportError(error_msg) {
- log('Error writing document `' + newDoc._id +
- '\\' to the replicator database: ' + error_msg);
- throw({forbidden: error_msg});
- }
-
- function validateEndpoint(endpoint, fieldName) {
- if ((typeof endpoint !== 'string') &&
- ((typeof endpoint !== 'object') || (endpoint === null))) {
-
- reportError('The `' + fieldName + '\\' property must exist' +
- ' and be either a string or an object.');
- }
-
- if (typeof endpoint === 'object') {
- if ((typeof endpoint.url !== 'string') || !endpoint.url) {
- reportError('The url property must exist in the `' +
- fieldName + '\\' field and must be a non-empty string.');
- }
-
- if ((typeof endpoint.auth !== 'undefined') &&
- ((typeof endpoint.auth !== 'object') ||
- endpoint.auth === null)) {
-
- reportError('`' + fieldName +
- '.auth\\' must be a non-null object.');
- }
-
- if ((typeof endpoint.headers !== 'undefined') &&
- ((typeof endpoint.headers !== 'object') ||
- endpoint.headers === null)) {
-
- reportError('`' + fieldName +
- '.headers\\' must be a non-null object.');
- }
- }
- }
-
- var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
- var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
- if (isReplicator) {
- // Always let replicator update the replication document
- return;
- }
-
- if (newDoc._replication_state === 'failed') {
- // Skip validation in case when we update the document with the
- // failed state. In this case it might be malformed. However,
- // replicator will not pay attention to failed documents so this
- // is safe.
- return;
- }
-
- if (!newDoc._deleted) {
- validateEndpoint(newDoc.source, 'source');
- validateEndpoint(newDoc.target, 'target');
-
- if ((typeof newDoc.create_target !== 'undefined') &&
- (typeof newDoc.create_target !== 'boolean')) {
-
- reportError('The `create_target\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.continuous !== 'undefined') &&
- (typeof newDoc.continuous !== 'boolean')) {
-
- reportError('The `continuous\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- !isArray(newDoc.doc_ids)) {
-
- reportError('The `doc_ids\\' field must be an array of strings.');
- }
-
- if ((typeof newDoc.selector !== 'undefined') &&
- (typeof newDoc.selector !== 'object')) {
-
- reportError('The `selector\\' field must be an object.');
- }
-
- if ((typeof newDoc.filter !== 'undefined') &&
- ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
- reportError('The `filter\\' field must be a non-empty string.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- (typeof newDoc.selector !== 'undefined')) {
-
- reportError('`doc_ids\\' field is incompatible with `selector\\'.');
- }
-
- if ( ((typeof newDoc.doc_ids !== 'undefined') ||
- (typeof newDoc.selector !== 'undefined')) &&
- (typeof newDoc.filter !== 'undefined') ) {
-
- reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
- }
-
- if ((typeof newDoc.query_params !== 'undefined') &&
- ((typeof newDoc.query_params !== 'object') ||
- newDoc.query_params === null)) {
-
- reportError('The `query_params\\' field must be an object.');
- }
-
- if (newDoc.user_ctx) {
- var user_ctx = newDoc.user_ctx;
-
- if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
- reportError('The `user_ctx\\' property must be a ' +
- 'non-null object.');
- }
-
- if (!(user_ctx.name === null ||
- (typeof user_ctx.name === 'undefined') ||
- ((typeof user_ctx.name === 'string') &&
- user_ctx.name.length > 0))) {
-
- reportError('The `user_ctx.name\\' property must be a ' +
- 'non-empty string or null.');
- }
-
- if (!isAdmin && (user_ctx.name !== userCtx.name)) {
- reportError('The given `user_ctx.name\\' is not valid');
- }
-
- if (user_ctx.roles && !isArray(user_ctx.roles)) {
- reportError('The `user_ctx.roles\\' property must be ' +
- 'an array of strings.');
- }
-
- if (!isAdmin && user_ctx.roles) {
- for (var i = 0; i < user_ctx.roles.length; i++) {
- var role = user_ctx.roles[i];
-
- if (typeof role !== 'string' || role.length === 0) {
- reportError('Roles must be non-empty strings.');
- }
- if (userCtx.roles.indexOf(role) === -1) {
- reportError('Invalid role (`' + role +
- '\\') in the `user_ctx\\'');
- }
- }
- }
- } else {
- if (!isAdmin) {
- reportError('The `user_ctx\\' property is missing (it is ' +
- 'optional for admins only).');
- }
- }
- } else {
- if (!isAdmin) {
- if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
- reportError('Replication documents can only be deleted by ' +
- 'admins or by the users who created them.');
- }
- }
- }
- }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_notifier.erl b/src/couch_replicator/src/couch_replicator_notifier.erl
deleted file mode 100644
index f7640a349..000000000
--- a/src/couch_replicator/src/couch_replicator_notifier.erl
+++ /dev/null
@@ -1,58 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_notifier).
-
--behaviour(gen_event).
--vsn(1).
-
-% public API
--export([start_link/1, stop/1, notify/1]).
-
-% gen_event callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_event/2, handle_call/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(FunAcc) ->
- couch_event_sup:start_link(couch_replication,
- {couch_replicator_notifier, make_ref()}, FunAcc).
-
-notify(Event) ->
- gen_event:notify(couch_replication, Event).
-
-stop(Pid) ->
- couch_event_sup:stop(Pid).
-
-
-init(FunAcc) ->
- {ok, FunAcc}.
-
-terminate(_Reason, _State) ->
- ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
- Fun(Event),
- {ok, Fun};
-handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
- Acc2 = Fun(Event, Acc),
- {ok, {Fun, Acc2}}.
-
-handle_call(_Msg, State) ->
- {ok, ok, State}.
-
-handle_info(_Msg, State) ->
- {ok, State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
deleted file mode 100644
index 00a352bee..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ /dev/null
@@ -1,1688 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_scheduler).
-
--behaviour(gen_server).
--behaviour(config_listener).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3,
- format_status/2
-]).
-
--export([
- add_job/1,
- remove_job/1,
- reschedule/0,
- rep_state/1,
- find_jobs_by_dbname/1,
- find_jobs_by_doc/2,
- job_summary/2,
- health_threshold/0,
- jobs/0,
- job/1,
- restart_job/1,
- update_job_stats/2
-]).
-
-%% config_listener callbacks
--export([
- handle_config_change/5,
- handle_config_terminate/3
-]).
-
-%% for status updater process to allow hot code loading
--export([
- stats_updater_loop/1
-]).
-
--include("couch_replicator_scheduler.hrl").
--include("couch_replicator.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-%% types
--type event_type() :: added | started | stopped | {crashed, any()}.
--type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
--type history() :: nonempty_list(event()).
-
-%% definitions
--define(MAX_BACKOFF_EXPONENT, 10).
--define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
--define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
--define(RELISTEN_DELAY, 5000).
--define(STATS_UPDATE_WAIT, 5000).
-
--define(DEFAULT_MAX_JOBS, 500).
--define(DEFAULT_MAX_CHURN, 20).
--define(DEFAULT_MAX_HISTORY, 20).
--define(DEFAULT_SCHEDULER_INTERVAL, 60000).
-
-
--record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
--record(job, {
- id :: job_id() | '$1' | '_',
- rep :: #rep{} | '_',
- pid :: undefined | pid() | '$1' | '_',
- monitor :: undefined | reference() | '_',
- history :: history() | '_'
-}).
-
--record(stats_acc, {
- pending_n = 0 :: non_neg_integer(),
- running_n = 0 :: non_neg_integer(),
- crashed_n = 0 :: non_neg_integer()
-}).
-
-
-%% public functions
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
--spec add_job(#rep{}) -> ok.
-add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
- case existing_replication(Rep) of
- false ->
- Job = #job{
- id = Rep#rep.id,
- rep = Rep,
- history = [{added, os:timestamp()}]
- },
- gen_server:call(?MODULE, {add_job, Job}, infinity);
- true ->
- ok
- end.
-
-
--spec remove_job(job_id()) -> ok.
-remove_job(Id) ->
- gen_server:call(?MODULE, {remove_job, Id}, infinity).
-
-
--spec reschedule() -> ok.
-% Trigger a manual reschedule. Used for testing and/or ops.
-reschedule() ->
- gen_server:call(?MODULE, reschedule, infinity).
-
-
--spec rep_state(rep_id()) -> #rep{} | nil.
-rep_state(RepId) ->
- case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
- {'EXIT',{badarg, _}} ->
- nil;
- Rep ->
- Rep
- end.
-
-
--spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
-job_summary(JobId, HealthThreshold) ->
- case job_by_id(JobId) of
- {ok, #job{pid = Pid, history = History, rep = Rep}} ->
- ErrorCount = consecutive_crashes(History, HealthThreshold),
- {State, Info} = case {Pid, ErrorCount} of
- {undefined, 0} ->
- case History of
- [{{crashed, Error}, _When} | _] ->
- {crashing, crash_reason_json(Error)};
- [_ | _] ->
- {pending, Rep#rep.stats}
- end;
- {undefined, ErrorCount} when ErrorCount > 0 ->
- [{{crashed, Error}, _When} | _] = History,
- {crashing, crash_reason_json(Error)};
- {Pid, ErrorCount} when is_pid(Pid) ->
- {running, Rep#rep.stats}
- end,
- [
- {source, iolist_to_binary(ejson_url(Rep#rep.source))},
- {target, iolist_to_binary(ejson_url(Rep#rep.target))},
- {state, State},
- {info, couch_replicator_utils:ejson_state_info(Info)},
- {error_count, ErrorCount},
- {last_updated, last_updated(History)},
- {start_time,
- couch_replicator_utils:iso8601(Rep#rep.start_time)},
- {source_proxy, job_proxy_url(Rep#rep.source)},
- {target_proxy, job_proxy_url(Rep#rep.target)}
- ];
- {error, not_found} ->
- nil % Job might have just completed
- end.
-
-
-job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
- list_to_binary(couch_util:url_strip_password(ProxyUrl));
-job_proxy_url(_Endpoint) ->
- null.
-
-
-% Health threshold is the minimum amount of time an unhealthy job should run
-% crashing before it is considered to be healthy again. HealtThreashold should
-% not be 0 as jobs could start and immediately crash, and it shouldn't be
-% infinity, since then consecutive crashes would accumulate forever even if
-% job is back to normal.
--spec health_threshold() -> non_neg_integer().
-health_threshold() ->
- config:get_integer("replicator", "health_threshold",
- ?DEFAULT_HEALTH_THRESHOLD_SEC).
-
-
--spec find_jobs_by_dbname(binary()) -> list(#rep{}).
-find_jobs_by_dbname(DbName) ->
- Rep = #rep{db_name = DbName, _ = '_'},
- MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
- [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-
--spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
-find_jobs_by_doc(DbName, DocId) ->
- Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
- MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
- [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-
--spec restart_job(binary() | list() | rep_id()) ->
- {ok, {[_]}} | {error, not_found}.
-restart_job(JobId) ->
- case rep_state(JobId) of
- nil ->
- {error, not_found};
- #rep{} = Rep ->
- ok = remove_job(JobId),
- ok = add_job(Rep),
- job(JobId)
- end.
-
-
--spec update_job_stats(job_id(), term()) -> ok.
-update_job_stats(JobId, Stats) ->
- gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
-
-
-%% gen_server functions
-
-init(_) ->
- % Temporarily disable on FDB, as it's not fully implemented yet
- % config:enable_feature('scheduler'),
- EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
- {write_concurrency, true}],
- ?MODULE = ets:new(?MODULE, EtsOpts),
- ok = config:listen_for_changes(?MODULE, nil),
- Interval = config:get_integer("replicator", "interval",
- ?DEFAULT_SCHEDULER_INTERVAL),
- MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
- MaxChurn = config:get_integer("replicator", "max_churn",
- ?DEFAULT_MAX_CHURN),
- MaxHistory = config:get_integer("replicator", "max_history",
- ?DEFAULT_MAX_HISTORY),
- Timer = erlang:send_after(Interval, self(), reschedule),
- State = #state{
- interval = Interval,
- max_jobs = MaxJobs,
- max_churn = MaxChurn,
- max_history = MaxHistory,
- timer = Timer,
- stats_pid = start_stats_updater()
- },
- {ok, State}.
-
-
-handle_call({add_job, Job}, _From, State) ->
- ok = maybe_remove_job_int(Job#job.id, State),
- true = add_job_int(Job),
- ok = maybe_start_newly_added_job(Job, State),
- couch_stats:increment_counter([couch_replicator, jobs, adds]),
- TotalJobs = ets:info(?MODULE, size),
- couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
- {reply, ok, State};
-
-handle_call({remove_job, Id}, _From, State) ->
- ok = maybe_remove_job_int(Id, State),
- {reply, ok, State};
-
-handle_call(reschedule, _From, State) ->
- ok = reschedule(State),
- {reply, ok, State};
-
-handle_call(_, _From, State) ->
- {noreply, State}.
-
-
-handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs),
- MaxJobs >= 0 ->
- couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
- {noreply, State#state{max_jobs = MaxJobs}};
-
-handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn),
- MaxChurn > 0 ->
- couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
- {noreply, State#state{max_churn = MaxChurn}};
-
-handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory),
- MaxHistory > 0 ->
- couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
- {noreply, State#state{max_history = MaxHistory}};
-
-handle_cast({set_interval, Interval}, State) when is_integer(Interval),
- Interval > 0 ->
- couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
- {noreply, State#state{interval = Interval}};
-
-handle_cast({update_job_stats, JobId, Stats}, State) ->
- case rep_state(JobId) of
- nil ->
- ok;
- #rep{} = Rep ->
- NewRep = Rep#rep{stats = Stats},
- true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
- end,
- {noreply, State};
-
-handle_cast(UnexpectedMsg, State) ->
- couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]),
- {noreply, State}.
-
-
-handle_info(reschedule, State) ->
- ok = reschedule(State),
- erlang:cancel_timer(State#state.timer),
- Timer = erlang:send_after(State#state.interval, self(), reschedule),
- {noreply, State#state{timer = Timer}};
-
-handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
- {ok, Job} = job_by_pid(Pid),
- couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
- remove_job_int(Job),
- update_running_jobs_stats(State#state.stats_pid),
- {noreply, State};
-
-handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
- {ok, Job} = job_by_pid(Pid),
- Reason = case Reason0 of
- {shutdown, ShutdownReason} -> ShutdownReason;
- Other -> Other
- end,
- ok = handle_crashed_job(Job, Reason, State),
- {noreply, State};
-
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State};
-
-handle_info(_, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-format_status(_Opt, [_PDict, State]) ->
- [
- {max_jobs, State#state.max_jobs},
- {running_jobs, running_job_count()},
- {pending_jobs, pending_job_count()}
- ].
-
-
-%% config listener functions
-
-handle_config_change("replicator", "max_jobs", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "max_churn", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "interval", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "max_history", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change(_, _, _, _, S) ->
- {ok, S}.
-
-
-handle_config_terminate(_, stop, _) ->
- ok;
-
-handle_config_terminate(_, _, _) ->
- Pid = whereis(?MODULE),
- erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
-%% Private functions
-
-% Handle crashed jobs. Handling differs between transient and permanent jobs.
-% Transient jobs are those posted to the _replicate endpoint. They don't have a
-% db associated with them. When those jobs crash, they are not restarted. That
-% is also consistent with behavior when the node they run on, crashed and they
-% do not migrate to other nodes. Permanent jobs are those created from
-% replicator documents. Those jobs, once they pass basic validation and end up
-% in the scheduler will be retried indefinitely (with appropriate exponential
-% backoffs).
--spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
-handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
- Msg = "~p : Transient job ~p failed, removing. Error: ~p",
- ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
- couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
- remove_job_int(Job),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
-
-handle_crashed_job(Job, Reason, State) ->
- ok = update_state_crashed(Job, Reason, State),
- case couch_replicator_doc_processor:update_docs() of
- true ->
- couch_replicator_docs:update_error(Job#job.rep, Reason);
- false ->
- ok
- end,
- case ets:info(?MODULE, size) < State#state.max_jobs of
- true ->
- % Starting pending jobs is an O(TotalJobsCount) operation. Only do
- % it if there is a relatively small number of jobs. Otherwise
- % scheduler could be blocked if there is a cascade of lots failing
- % jobs in a row.
- start_pending_jobs(State),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- false ->
- ok
- end.
-
-
-% Attempt to start a newly added job. First quickly check if total jobs
-% already exceed max jobs, then do a more expensive check which runs a
-% select (an O(n) operation) to check pending jobs specifically.
--spec maybe_start_newly_added_job(#job{}, #state{}) -> ok.
-maybe_start_newly_added_job(Job, State) ->
- MaxJobs = State#state.max_jobs,
- TotalJobs = ets:info(?MODULE, size),
- case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
- true ->
- start_job_int(Job, State),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- false ->
- ok
- end.
-
-
-% Return up to a given number of oldest, not recently crashed jobs. Try to be
-% memory efficient and use ets:foldl to accumulate jobs.
--spec pending_jobs(non_neg_integer()) -> [#job{}].
-pending_jobs(0) ->
- % Handle this case as user could set max_churn to 0. If this is passed to
- % other function clause it will crash as gb_sets:largest assumes set is not
- % empty.
- [];
-
-pending_jobs(Count) when is_integer(Count), Count > 0 ->
- Set0 = gb_sets:new(), % [{LastStart, Job},...]
- Now = os:timestamp(),
- Acc0 = {Set0, Now, Count, health_threshold()},
- {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
- [Job || {_Started, Job} <- gb_sets:to_list(Set1)].
-
-
-pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
- Set1 = case {not_recently_crashed(Job, Now, HealthThreshold),
- gb_sets:size(Set) >= Count} of
- {true, true} ->
- % Job is healthy but already reached accumulated limit, so might
- % have to replace one of the accumulated jobs
- pending_maybe_replace(Job, Set);
- {true, false} ->
- % Job is healthy and we haven't reached the limit, so add job
- % to accumulator
- gb_sets:add_element({last_started(Job), Job}, Set);
- {false, _} ->
- % This job is not healthy (has crashed too recently), so skip it.
- Set
- end,
- {Set1, Now, Count, HealthThreshold}.
-
-
-% Replace Job in the accumulator if it is older than youngest job there.
-% "oldest" here means one which has been waiting to run the longest. "youngest"
-% means the one with most recent activity. The goal is to keep up to Count
-% oldest jobs during iteration. For example if there are jobs with these times
-% accumulated so far [5, 7, 11], and start time of current job is 6. Then
-% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in
-% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example.
-pending_maybe_replace(Job, Set) ->
- Started = last_started(Job),
- {Youngest, YoungestJob} = gb_sets:largest(Set),
- case Started < Youngest of
- true ->
- Set1 = gb_sets:delete({Youngest, YoungestJob}, Set),
- gb_sets:add_element({Started, Job}, Set1);
- false ->
- Set
- end.
-
-
-start_jobs(Count, State) ->
- [start_job_int(Job, State) || Job <- pending_jobs(Count)],
- ok.
-
-
--spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
-stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
- 0;
-
-stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
- Running0 = running_jobs(),
- ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
- Running1 = lists:filter(ContinuousPred, Running0),
- Running2 = lists:sort(fun longest_running/2, Running1),
- Running3 = lists:sublist(Running2, Count),
- length([stop_job_int(Job, State) || Job <- Running3]).
-
-
-longest_running(#job{} = A, #job{} = B) ->
- last_started(A) =< last_started(B).
-
-
-not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
- case History of
- [{added, _When}] ->
- true;
- [{stopped, _When} | _] ->
- true;
- _ ->
- LatestCrashT = latest_crash_timestamp(History),
- CrashCount = consecutive_crashes(History, HealthThreshold),
- timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
- end.
-
-
-% Count consecutive crashes. A crash happens when there is a `crashed` event
-% within a short period of time (configurable) after any other event. It could
-% be `crashed, started` for jobs crashing quickly after starting, `crashed,
-% crashed`, `crashed, stopped` if job repeatedly failed to start
-% being stopped. Or it could be `crashed, added` if it crashed immediately after
-% being added during start.
-%
-% A streak of "consecutive crashes" ends when a crashed event is seen starting
-% and running successfully without crashing for a period of time. That period
-% of time is the HealthThreshold.
-%
-
--spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer().
-consecutive_crashes(History, HealthThreshold) when is_list(History) ->
- consecutive_crashes(History, HealthThreshold, 0).
-
-
--spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
- non_neg_integer().
-consecutive_crashes([], _HealthThreashold, Count) ->
- Count;
-
-consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
- HealthThreshold, Count) ->
- case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
- true ->
- Count;
- false ->
- consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
- end;
-
-consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold,
- Count) ->
- Count;
-
-consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
- consecutive_crashes(Rest, HealthThreshold, Count).
-
-
--spec latest_crash_timestamp(history()) -> erlang:timestamp().
-latest_crash_timestamp([]) ->
- {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff
-
-latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
- When;
-
-latest_crash_timestamp([_Event | Rest]) ->
- latest_crash_timestamp(Rest).
-
-
--spec backoff_micros(non_neg_integer()) -> non_neg_integer().
-backoff_micros(CrashCount) ->
- % When calculating the backoff interval treat consecutive crash count as the
- % exponent in Base * 2 ^ CrashCount to achieve an exponential backoff
- % doubling every consecutive failure, starting with the base value of
- % ?BACKOFF_INTERVAL_MICROS.
- BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
- (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
-
-
--spec add_job_int(#job{}) -> boolean().
-add_job_int(#job{} = Job) ->
- ets:insert_new(?MODULE, Job).
-
-
--spec maybe_remove_job_int(job_id(), #state{}) -> ok.
-maybe_remove_job_int(JobId, State) ->
- case job_by_id(JobId) of
- {ok, Job} ->
- ok = stop_job_int(Job, State),
- true = remove_job_int(Job),
- couch_stats:increment_counter([couch_replicator, jobs, removes]),
- TotalJobs = ets:info(?MODULE, size),
- couch_stats:update_gauge([couch_replicator, jobs, total],
- TotalJobs),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- {error, not_found} ->
- ok
- end.
-
-
-start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
- ok;
-
-start_job_int(#job{} = Job0, State) ->
- Job = maybe_optimize_job_for_rate_limiting(Job0),
- case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
- {ok, Child} ->
- Ref = monitor(process, Child),
- ok = update_state_started(Job, Child, Ref, State),
- couch_log:notice("~p: Job ~p started as ~p",
- [?MODULE, Job#job.id, Child]);
- {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
- Ref = monitor(process, OtherPid),
- ok = update_state_started(Job, OtherPid, Ref, State),
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
- " because replicator scheduler was restarted",
- [?MODULE, Job#job.id, OtherPid]);
- {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
- CrashMsg = "Duplicate replication running on another node",
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
- " because a duplicate replication is running on another node",
- [?MODULE, Job#job.id, OtherPid]),
- ok = update_state_crashed(Job, CrashMsg, State);
- {error, Reason} ->
- couch_log:notice("~p: Job ~p failed to start for reason ~p",
- [?MODULE, Job, Reason]),
- ok = update_state_crashed(Job, Reason, State)
- end.
-
-
--spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
-stop_job_int(#job{pid = undefined}, _State) ->
- ok;
-
-stop_job_int(#job{} = Job, State) ->
- ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
- demonitor(Job#job.monitor, [flush]),
- ok = update_state_stopped(Job, State),
- couch_log:notice("~p: Job ~p stopped as ~p",
- [?MODULE, Job#job.id, Job#job.pid]).
-
-
--spec remove_job_int(#job{}) -> true.
-remove_job_int(#job{} = Job) ->
- ets:delete(?MODULE, Job#job.id).
-
-
--spec running_job_count() -> non_neg_integer().
-running_job_count() ->
- ets:info(?MODULE, size) - pending_job_count().
-
-
--spec running_jobs() -> [#job{}].
-running_jobs() ->
- ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
-
-
--spec pending_job_count() -> non_neg_integer().
-pending_job_count() ->
- ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
-
-
--spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
-job_by_pid(Pid) when is_pid(Pid) ->
- case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
- [] ->
- {error, not_found};
- [#job{}=Job] ->
- {ok, Job}
- end.
-
-
--spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
-job_by_id(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- {error, not_found};
- [#job{}=Job] ->
- {ok, Job}
- end.
-
-
--spec update_state_stopped(#job{}, #state{}) -> ok.
-update_state_stopped(Job, State) ->
- Job1 = reset_job_process(Job),
- Job2 = update_history(Job1, stopped, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, stops]),
- ok.
-
-
--spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
-update_state_started(Job, Pid, Ref, State) ->
- Job1 = set_job_process(Job, Pid, Ref),
- Job2 = update_history(Job1, started, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, starts]),
- ok.
-
-
--spec update_state_crashed(#job{}, any(), #state{}) -> ok.
-update_state_crashed(Job, Reason, State) ->
- Job1 = reset_job_process(Job),
- Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, crashes]),
- ok.
-
-
--spec set_job_process(#job{}, pid(), reference()) -> #job{}.
-set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
- Job#job{pid = Pid, monitor = Ref}.
-
-
--spec reset_job_process(#job{}) -> #job{}.
-reset_job_process(#job{} = Job) ->
- Job#job{pid = undefined, monitor = undefined}.
-
-
--spec reschedule(#state{}) -> ok.
-reschedule(State) ->
- StopCount = stop_excess_jobs(State, running_job_count()),
- rotate_jobs(State, StopCount),
- update_running_jobs_stats(State#state.stats_pid).
-
-
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
-stop_excess_jobs(State, Running) ->
- #state{max_jobs=MaxJobs} = State,
- StopCount = max(0, Running - MaxJobs),
- Stopped = stop_jobs(StopCount, true, State),
- OneshotLeft = StopCount - Stopped,
- stop_jobs(OneshotLeft, false, State),
- StopCount.
-
-
-start_pending_jobs(State) ->
- #state{max_jobs=MaxJobs} = State,
- Running = running_job_count(),
- Pending = pending_job_count(),
- if Running < MaxJobs, Pending > 0 ->
- start_jobs(MaxJobs - Running, State);
- true ->
- ok
- end.
-
-
--spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
-rotate_jobs(State, ChurnSoFar) ->
- #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
- Running = running_job_count(),
- Pending = pending_job_count(),
- % Reduce MaxChurn by the number of already stopped jobs in the
- % current rescheduling cycle.
- Churn = max(0, MaxChurn - ChurnSoFar),
- SlotsAvailable = MaxJobs - Running,
- if SlotsAvailable >= 0 ->
- % If there is are enough SlotsAvailable reduce StopCount to avoid
- % unnesessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
- % values so we don't worry about that here.
- StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
- stop_jobs(StopCount, true, State),
- StartCount = max(0, MaxJobs - running_job_count()),
- start_jobs(StartCount, State);
- true ->
- ok
- end.
-
-
--spec last_started(#job{}) -> erlang:timestamp().
-last_started(#job{} = Job) ->
- case lists:keyfind(started, 1, Job#job.history) of
- false ->
- {0, 0, 0};
- {started, When} ->
- When
- end.
-
-
--spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) ->
- #job{}.
-update_history(Job, Type, When, State) ->
- History0 = [{Type, When} | Job#job.history],
- History1 = lists:sublist(History0, State#state.max_history),
- Job#job{history = History1}.
-
-
--spec ejson_url(#httpdb{} | binary()) -> binary().
-ejson_url(#httpdb{}=Httpdb) ->
- couch_util:url_strip_password(Httpdb#httpdb.url);
-ejson_url(DbName) when is_binary(DbName) ->
- DbName.
-
-
--spec job_ejson(#job{}) -> {[_ | _]}.
-job_ejson(Job) ->
- Rep = Job#job.rep,
- Source = ejson_url(Rep#rep.source),
- Target = ejson_url(Rep#rep.target),
- History = lists:map(fun({Type, When}) ->
- EventProps = case Type of
- {crashed, Reason} ->
- [{type, crashed}, {reason, crash_reason_json(Reason)}];
- Type ->
- [{type, Type}]
- end,
- {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
- end, Job#job.history),
- {BaseID, Ext} = Job#job.id,
- Pid = case Job#job.pid of
- undefined ->
- null;
- P when is_pid(P) ->
- ?l2b(pid_to_list(P))
- end,
- {[
- {id, iolist_to_binary([BaseID, Ext])},
- {pid, Pid},
- {source, iolist_to_binary(Source)},
- {target, iolist_to_binary(Target)},
- {database, Rep#rep.db_name},
- {user, (Rep#rep.user_ctx)#user_ctx.name},
- {doc_id, Rep#rep.doc_id},
- {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
- {history, History},
- {node, node()},
- {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
- ]}.
-
-
--spec jobs() -> [[tuple()]].
-jobs() ->
- ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).
-
-
--spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
-job(JobId) ->
- case job_by_id(JobId) of
- {ok, Job} ->
- {ok, job_ejson(Job)};
- Error ->
- Error
- end.
-
-
-crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
- Info;
-crash_reason_json(Reason) when is_binary(Reason) ->
- Reason;
-crash_reason_json(Error) ->
- couch_replicator_utils:rep_error_to_binary(Error).
-
-
--spec last_updated([_]) -> binary().
-last_updated([{_Type, When} | _]) ->
- couch_replicator_utils:iso8601(When).
-
-
--spec is_continuous(#job{}) -> boolean().
-is_continuous(#job{rep = Rep}) ->
- couch_util:get_value(continuous, Rep#rep.options, false).
-
-
-% If job crashed last time because it was rate limited, try to
-% optimize some options to help the job make progress.
--spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
-maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, max_backoff}, _} | _]}) ->
- Opts = [
- {checkpoint_interval, 5000},
- {worker_processes, 2},
- {worker_batch_size, 100},
- {http_connections, 5}
- ],
- Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
- Job#job{rep = Rep};
-maybe_optimize_job_for_rate_limiting(Job) ->
- Job.
-
-
--spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
-optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
- case couch_util:get_value(Key, Options) of
- CurVal when is_integer(CurVal), CurVal > Val ->
- Msg = "~p replication ~p : setting ~p = ~p due to rate limiting",
- couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]),
- Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}),
- Rep#rep{options = Options1};
- _ ->
- Rep
- end.
-
-
-% Updater is a separate process. It receives `update_stats` messages and
-% updates scheduler stats from the scheduler jobs table. Updates are
-% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
-
-update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
- StatsPid ! update_stats,
- ok.
-
-
-start_stats_updater() ->
- erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).
-
-
-stats_updater_loop(Timer) ->
- receive
- update_stats when Timer == undefined ->
- TRef = erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats),
- ?MODULE:stats_updater_loop(TRef);
- update_stats when is_reference(Timer) ->
- ?MODULE:stats_updater_loop(Timer);
- refresh_stats ->
- ok = stats_updater_refresh(),
- ?MODULE:stats_updater_loop(undefined);
- Else ->
- erlang:exit({stats_updater_bad_msg, Else})
- end.
-
-
--spec stats_updater_refresh() -> ok.
-stats_updater_refresh() ->
- #stats_acc{
- pending_n = PendingN,
- running_n = RunningN,
- crashed_n = CrashedN
- } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
- couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
- couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
- couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
- ok.
-
-
--spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
-stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
- Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
-stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
- Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
-stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
- Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
-stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
- Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
-
-
--spec existing_replication(#rep{}) -> boolean().
-existing_replication(#rep{} = NewRep) ->
- case job_by_id(NewRep#rep.id) of
- {ok, #job{rep = CurRep}} ->
- NormCurRep = couch_replicator_utils:normalize_rep(CurRep),
- NormNewRep = couch_replicator_utils:normalize_rep(NewRep),
- NormCurRep == NormNewRep;
- {error, not_found} ->
- false
- end.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-backoff_micros_test_() ->
- BaseInterval = ?BACKOFF_INTERVAL_MICROS,
- [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
- {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
- {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
- ]].
-
-
-consecutive_crashes_test_() ->
- Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
- [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
- {0, []},
- {0, [added()]},
- {0, [stopped()]},
- {0, [crashed()]},
- {1, [crashed(), added()]},
- {1, [crashed(), crashed()]},
- {1, [crashed(), stopped()]},
- {3, [crashed(), crashed(), crashed(), added()]},
- {2, [crashed(), crashed(), stopped()]},
- {1, [crashed(), started(), added()]},
- {2, [crashed(3), started(2), crashed(1), started(0)]},
- {0, [stopped(3), started(2), crashed(1), started(0)]},
- {1, [crashed(3), started(2), stopped(1), started(0)]},
- {0, [crashed(999), started(0)]},
- {1, [crashed(999), started(998), crashed(997), started(0)]}
- ]].
-
-
-consecutive_crashes_non_default_threshold_test_() ->
- [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
- {0, [crashed(11), started(0)], 10},
- {1, [crashed(10), started(0)], 10}
- ]].
-
-
-latest_crash_timestamp_test_() ->
- [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
- {0, [added()]},
- {1, [crashed(1)]},
- {3, [crashed(3), started(2), crashed(1), started(0)]},
- {1, [started(3), stopped(2), crashed(1), started(0)]}
- ]].
-
-
-last_started_test_() ->
- [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
- {0, [added()]},
- {0, [crashed(1)]},
- {1, [started(1)]},
- {1, [added(), started(1)]},
- {2, [started(2), started(1)]},
- {2, [crashed(3), started(2), started(1)]}
- ]].
-
-
-longest_running_test() ->
- J0 = testjob([crashed()]),
- J1 = testjob([started(1)]),
- J2 = testjob([started(2)]),
- Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end,
- ?assertEqual([], Sort([])),
- ?assertEqual([J1], Sort([J1])),
- ?assertEqual([J1, J2], Sort([J2, J1])),
- ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
-
-
-scheduler_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_pending_jobs_simple(),
- t_pending_jobs_skip_crashed(),
- t_one_job_starts(),
- t_no_jobs_start_if_max_is_0(),
- t_one_job_starts_if_max_is_1(),
- t_max_churn_does_not_throttle_initial_start(),
- t_excess_oneshot_only_jobs(),
- t_excess_continuous_only_jobs(),
- t_excess_prefer_continuous_first(),
- t_stop_oldest_first(),
- t_start_oldest_first(),
- t_jobs_churn_even_if_not_all_max_jobs_are_running(),
- t_jobs_dont_churn_if_there_are_available_running_slots(),
- t_start_only_pending_jobs_do_not_churn_existing_ones(),
- t_dont_stop_if_nothing_pending(),
- t_max_churn_limits_number_of_rotated_jobs(),
- t_existing_jobs(),
- t_if_pending_less_than_running_start_all_pending(),
- t_running_less_than_pending_swap_all_running(),
- t_oneshot_dont_get_rotated(),
- t_rotate_continuous_only_if_mixed(),
- t_oneshot_dont_get_starting_priority(),
- t_oneshot_will_hog_the_scheduler(),
- t_if_excess_is_trimmed_rotation_still_happens(),
- t_if_transient_job_crashes_it_gets_removed(),
- t_if_permanent_job_crashes_it_stays_in_ets(),
- t_job_summary_running(),
- t_job_summary_pending(),
- t_job_summary_crashing_once(),
- t_job_summary_crashing_many_times(),
- t_job_summary_proxy_fields()
- ]
- }
- }.
-
-
-t_pending_jobs_simple() ->
- ?_test(begin
- Job1 = oneshot(1),
- Job2 = oneshot(2),
- setup_jobs([Job2, Job1]),
- ?assertEqual([], pending_jobs(0)),
- ?assertEqual([Job1], pending_jobs(1)),
- ?assertEqual([Job1, Job2], pending_jobs(2)),
- ?assertEqual([Job1, Job2], pending_jobs(3))
- end).
-
-
-t_pending_jobs_skip_crashed() ->
- ?_test(begin
- Job = oneshot(1),
- Ts = os:timestamp(),
- History = [crashed(Ts), started(Ts) | Job#job.history],
- Job1 = Job#job{history = History},
- Job2 = oneshot(2),
- Job3 = oneshot(3),
- setup_jobs([Job2, Job1, Job3]),
- ?assertEqual([Job2], pending_jobs(1)),
- ?assertEqual([Job2, Job3], pending_jobs(2)),
- ?assertEqual([Job2, Job3], pending_jobs(3))
- end).
-
-
-t_one_job_starts() ->
- ?_test(begin
- setup_jobs([oneshot(1)]),
- ?assertEqual({0, 1}, run_stop_count()),
- reschedule(mock_state(?DEFAULT_MAX_JOBS)),
- ?assertEqual({1, 0}, run_stop_count())
- end).
-
-
-t_no_jobs_start_if_max_is_0() ->
- ?_test(begin
- setup_jobs([oneshot(1)]),
- reschedule(mock_state(0)),
- ?assertEqual({0, 1}, run_stop_count())
- end).
-
-
-t_one_job_starts_if_max_is_1() ->
- ?_test(begin
- setup_jobs([oneshot(1), oneshot(2)]),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count())
- end).
-
-
-t_max_churn_does_not_throttle_initial_start() ->
- ?_test(begin
- setup_jobs([oneshot(1), oneshot(2)]),
- reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_excess_oneshot_only_jobs() ->
- ?_test(begin
- setup_jobs([oneshot_running(1), oneshot_running(2)]),
- ?assertEqual({2, 0}, run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 2}, run_stop_count())
- end).
-
-
-t_excess_continuous_only_jobs() ->
- ?_test(begin
- setup_jobs([continuous_running(1), continuous_running(2)]),
- ?assertEqual({2, 0}, run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 2}, run_stop_count())
- end).
-
-
-t_excess_prefer_continuous_first() ->
- ?_test(begin
- Jobs = [
- continuous_running(1),
- oneshot_running(2),
- continuous_running(3)
- ],
- setup_jobs(Jobs),
- ?assertEqual({3, 0}, run_stop_count()),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(2)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 1}, oneshot_run_stop_count())
- end).
-
-
-t_stop_oldest_first() ->
- ?_test(begin
- Jobs = [
- continuous_running(7),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2, 1)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual([4], jobs_stopped()),
- reschedule(mock_state(1, 1)),
- ?assertEqual([7], jobs_running())
- end).
-
-
-t_start_oldest_first() ->
- ?_test(begin
- setup_jobs([continuous(7), continuous(2), continuous(5)]),
- reschedule(mock_state(1)),
- ?assertEqual({1, 2}, run_stop_count()),
- ?assertEqual([2], jobs_running()),
- reschedule(mock_state(2)),
- ?assertEqual({2, 1}, run_stop_count()),
- % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
- % be running.
- ?assertEqual([2], jobs_stopped())
- end).
-
-
-t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
- ?_test(begin
- setup_jobs([
- continuous_running(7),
- continuous(2),
- continuous(5)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual([7], jobs_stopped())
- end).
-
-
-t_jobs_dont_churn_if_there_are_available_running_slots() ->
- ?_test(begin
- setup_jobs([
- continuous_running(1),
- continuous_running(2)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual({2, 0}, run_stop_count()),
- ?assertEqual([], jobs_stopped()),
- ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
- end).
-
-
-t_start_only_pending_jobs_do_not_churn_existing_ones() ->
- ?_test(begin
- setup_jobs([
- continuous(1),
- continuous_running(2)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)),
- ?assertEqual([], jobs_stopped()),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_dont_stop_if_nothing_pending() ->
- ?_test(begin
- setup_jobs([continuous_running(1), continuous_running(2)]),
- reschedule(mock_state(2)),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_max_churn_limits_number_of_rotated_jobs() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous(3),
- continuous_running(4)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2, 1)),
- ?assertEqual([2, 3], jobs_stopped())
- end).
-
-
-t_if_pending_less_than_running_start_all_pending() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous(3),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(3)),
- ?assertEqual([1, 2, 5], jobs_running())
- end).
-
-
-t_running_less_than_pending_swap_all_running() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous(2),
- continuous(3),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2)),
- ?assertEqual([3, 4, 5], jobs_stopped())
- end).
-
-
-t_oneshot_dont_get_rotated() ->
- ?_test(begin
- setup_jobs([oneshot_running(1), continuous(2)]),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-t_rotate_continuous_only_if_mixed() ->
- ?_test(begin
- setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
- reschedule(mock_state(2)),
- ?assertEqual([1, 2], jobs_running())
- end).
-
-
-t_oneshot_dont_get_starting_priority() ->
- ?_test(begin
- setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-% This tested in other test cases, it is here to mainly make explicit a property
-% of one-shot replications -- they can starve other jobs if they "take control"
-% of all the available scheduler slots.
-t_oneshot_will_hog_the_scheduler() ->
- ?_test(begin
- Jobs = [
- oneshot_running(1),
- oneshot_running(2),
- oneshot(3),
- continuous(4)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2)),
- ?assertEqual([1, 2], jobs_running())
- end).
-
-
-t_if_excess_is_trimmed_rotation_still_happens() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous_running(3)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-t_if_transient_job_crashes_it_gets_removed() ->
- ?_test(begin
- Pid = mock_pid(),
- Job = #job{
- id = job1,
- pid = Pid,
- history = [added()],
- rep = #rep{db_name = null, options = [{continuous, true}]}
- },
- setup_jobs([Job]),
- ?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_history = 3, stats_pid = self()},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
- ?assertEqual(0, ets:info(?MODULE, size))
- end).
-
-
-t_if_permanent_job_crashes_it_stays_in_ets() ->
- ?_test(begin
- Pid = mock_pid(),
- Job = #job{
- id = job1,
- pid = Pid,
- history = [added()],
- rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
- },
- setup_jobs([Job]),
- ?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
- ?assertEqual(1, ets:info(?MODULE, size)),
- [Job1] = ets:lookup(?MODULE, job1),
- [Latest | _] = Job1#job.history,
- ?assertMatch({{crashed, failed}, _}, Latest)
- end).
-
-
-t_existing_jobs() ->
- ?_test(begin
- Rep = #rep{
- id = job1,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
- setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
- NewRep = #rep{
- id = Rep#rep.id,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
- ?assert(existing_replication(NewRep)),
- ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
- ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
- ?assertNot(existing_replication(NewRep#rep{options = []}))
- end).
-
-
-t_job_summary_running() ->
- ?_test(begin
- Job = #job{
- id = job1,
- pid = mock_pid(),
- history = [added()],
- rep = #rep{
- db_name = <<"db1">>,
- source = <<"s">>,
- target = <<"t">>
- }
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(running, proplists:get_value(state, Summary)),
- ?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
- Stats = [{source_seq, <<"1-abc">>}],
- handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
- Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual({Stats}, proplists:get_value(info, Summary1))
- end).
-
-
-t_job_summary_pending() ->
- ?_test(begin
- Job = #job{
- id = job1,
- pid = undefined,
- history = [stopped(20), started(10), added()],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(pending, proplists:get_value(state, Summary)),
- ?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
- Stats = [{doc_write_failures, 1}],
- handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
- Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual({Stats}, proplists:get_value(info, Summary1))
- end).
-
-
-t_job_summary_crashing_once() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(crashing, proplists:get_value(state, Summary)),
- Info = proplists:get_value(info, Summary),
- ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
- ?assertEqual(0, proplists:get_value(error_count, Summary))
- end).
-
-
-t_job_summary_crashing_many_times() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [crashed(4), started(3), crashed(2), started(1)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(crashing, proplists:get_value(state, Summary)),
- Info = proplists:get_value(info, Summary),
- ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
- ?assertEqual(2, proplists:get_value(error_count, Summary))
- end).
-
-
-t_job_summary_proxy_fields() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [started(10), added()],
- rep = #rep{
- source = #httpdb{
- url = "https://s",
- proxy_url = "http://u:p@sproxy:12"
- },
- target = #httpdb{
- url = "http://t",
- proxy_url = "socks5://u:p@tproxy:34"
- }
- }
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(<<"http://u:*****@sproxy:12">>,
- proplists:get_value(source_proxy, Summary)),
- ?assertEqual(<<"socks5://u:*****@tproxy:34">>,
- proplists:get_value(target_proxy, Summary))
- end).
-
-
-% Test helper functions
-
-setup_all() ->
- catch ets:delete(?MODULE),
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(couch_log, warning, 2, ok),
- meck:expect(couch_log, error, 2, ok),
- meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
- meck:expect(couch_stats, increment_counter, 1, ok),
- meck:expect(couch_stats, update_gauge, 2, ok),
- Pid = mock_pid(),
- meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).
-
-
-teardown_all(_) ->
- catch ets:delete(?MODULE),
- meck:unload().
-
-
-setup() ->
- meck:reset([
- couch_log,
- couch_replicator_scheduler_sup,
- couch_stats
- ]).
-
-
-teardown(_) ->
- ok.
-
-
-setup_jobs(Jobs) when is_list(Jobs) ->
- ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
- ets:insert(?MODULE, Jobs).
-
-
-all_jobs() ->
- lists:usort(ets:tab2list(?MODULE)).
-
-
-jobs_stopped() ->
- [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
-
-
-jobs_running() ->
- [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
-
-
-run_stop_count() ->
- {length(jobs_running()), length(jobs_stopped())}.
-
-
-oneshot_run_stop_count() ->
- Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
- not is_continuous(Job)],
- Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
- not is_continuous(Job)],
- {length(Running), length(Stopped)}.
-
-
-mock_state(MaxJobs) ->
- #state{
- max_jobs = MaxJobs,
- max_churn = ?DEFAULT_MAX_CHURN,
- max_history = ?DEFAULT_MAX_HISTORY,
- stats_pid = self()
- }.
-
-mock_state(MaxJobs, MaxChurn) ->
- #state{
- max_jobs = MaxJobs,
- max_churn = MaxChurn,
- max_history = ?DEFAULT_MAX_HISTORY,
- stats_pid = self()
- }.
-
-
-continuous(Id) when is_integer(Id) ->
- Started = Id,
- Hist = [stopped(Started+1), started(Started), added()],
- #job{
- id = Id,
- history = Hist,
- rep = #rep{options = [{continuous, true}]}
- }.
-
-
-continuous_running(Id) when is_integer(Id) ->
- Started = Id,
- Pid = mock_pid(),
- #job{
- id = Id,
- history = [started(Started), added()],
- rep = #rep{options = [{continuous, true}]},
- pid = Pid,
- monitor = monitor(process, Pid)
- }.
-
-
-oneshot(Id) when is_integer(Id) ->
- Started = Id,
- Hist = [stopped(Started + 1), started(Started), added()],
- #job{id = Id, history = Hist, rep = #rep{options = []}}.
-
-
-oneshot_running(Id) when is_integer(Id) ->
- Started = Id,
- Pid = mock_pid(),
- #job{
- id = Id,
- history = [started(Started), added()],
- rep = #rep{options = []},
- pid = Pid,
- monitor = monitor(process, Pid)
- }.
-
-
-testjob(Hist) when is_list(Hist) ->
- #job{history = Hist}.
-
-
-mock_pid() ->
- list_to_pid("<0.999.999>").
-
-crashed() ->
- crashed(0).
-
-
-crashed(WhenSec) when is_integer(WhenSec)->
- {{crashed, some_reason}, {0, WhenSec, 0}};
-crashed({MSec, Sec, USec}) ->
- {{crashed, some_reason}, {MSec, Sec, USec}}.
-
-
-started() ->
- started(0).
-
-
-started(WhenSec) when is_integer(WhenSec)->
- {started, {0, WhenSec, 0}};
-
-started({MSec, Sec, USec}) ->
- {started, {MSec, Sec, USec}}.
-
-
-stopped() ->
- stopped(0).
-
-
-stopped(WhenSec) ->
- {stopped, {0, WhenSec, 0}}.
-
-
-added() ->
- {added, {0, 0, 0}}.
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl
deleted file mode 100644
index 5203b0caa..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler.hrl
+++ /dev/null
@@ -1,15 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
--type job_id() :: term().
--type job_args() :: term().
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
deleted file mode 100644
index 0b33419e1..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ /dev/null
@@ -1,1090 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_scheduler_job).
-
--behaviour(gen_server).
-
--export([
- start_link/1
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3,
- format_status/2
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include("couch_replicator_scheduler.hrl").
--include("couch_replicator.hrl").
-
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- pp_rep_id/1
-]).
-
-
--define(LOWEST_SEQ, 0).
--define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
--define(STARTUP_JITTER_DEFAULT, 5000).
-
--record(rep_state, {
- rep_details,
- source_name,
- target_name,
- source,
- target,
- history,
- checkpoint_history,
- start_seq,
- committed_seq,
- current_through_seq,
- seqs_in_progress = [],
- highest_seq_done = {0, ?LOWEST_SEQ},
- source_log,
- target_log,
- rep_starttime,
- src_starttime,
- tgt_starttime,
- timer, % checkpoint timer
- changes_queue,
- changes_manager,
- changes_reader,
- workers,
- stats = couch_replicator_stats:new(),
- session_id,
- source_seq = nil,
- use_checkpoints = true,
- checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
- type = db,
- view = nil
-}).
-
-
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
- RepChildId = BaseId ++ Ext,
- Source = couch_replicator_api_wrap:db_uri(Src),
- Target = couch_replicator_api_wrap:db_uri(Tgt),
- ServerName = {global, {?MODULE, Rep#rep.id}},
-
- case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
- {ok, Pid} ->
- {ok, Pid};
- {error, Reason} ->
- couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
- [RepChildId, Source, Target]),
- {error, Reason}
- end.
-
-
-init(InitArgs) ->
- {ok, InitArgs, 0}.
-
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
- process_flag(trap_exit, true),
-
- timer:sleep(startup_jitter()),
-
- #rep_state{
- source = Source,
- target = Target,
- source_name = SourceName,
- target_name = TargetName,
- start_seq = {_Ts, StartSeq},
- highest_seq_done = {_, HighestSeq},
- checkpoint_interval = CheckpointInterval
- } = State = init_state(Rep),
-
- NumWorkers = get_value(worker_processes, Options),
- BatchSize = get_value(worker_batch_size, Options),
- {ok, ChangesQueue} = couch_work_queue:new([
- {max_items, BatchSize * NumWorkers * 2},
- {max_size, 100 * 1024 * NumWorkers}
- ]),
- % This starts the _changes reader process. It adds the changes from
- % the source db to the ChangesQueue.
- {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
- StartSeq, Source, ChangesQueue, Options
- ),
- % Changes manager - responsible for dequeing batches from the changes queue
- % and deliver them to the worker processes.
- ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
- % This starts the worker processes. They ask the changes queue manager for a
- % a batch of _changes rows to process -> check which revs are missing in the
- % target, and for the missing ones, it copies them from the source to the target.
- MaxConns = get_value(http_connections, Options),
- Workers = lists:map(
- fun(_) ->
- couch_stats:increment_counter([couch_replicator, workers_started]),
- {ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns),
- Pid
- end,
- lists:seq(1, NumWorkers)),
-
- couch_task_status:add_task([
- {type, replication},
- {user, UserCtx#user_ctx.name},
- {replication_id, ?l2b(BaseId ++ Ext)},
- {database, Rep#rep.db_name},
- {doc_id, Rep#rep.doc_id},
- {source, ?l2b(SourceName)},
- {target, ?l2b(TargetName)},
- {continuous, get_value(continuous, Options, false)},
- {source_seq, HighestSeq},
- {checkpoint_interval, CheckpointInterval}
- ] ++ rep_stats(State)),
- couch_task_status:set_update_frequency(1000),
-
- % Until OTP R14B03:
- %
- % Restarting a temporary supervised child implies that the original arguments
- % (#rep{} record) specified in the MFA component of the supervisor
- % child spec will always be used whenever the child is restarted.
- % This implies the same replication performance tunning parameters will
- % always be used. The solution is to delete the child spec (see
- % cancel_replication/1) and then start the replication again, but this is
- % unfortunately not immune to race conditions.
-
- log_replication_start(State),
- couch_log:debug("Worker pids are: ~p", [Workers]),
-
- doc_update_triggered(Rep),
-
- {ok, State#rep_state{
- changes_queue = ChangesQueue,
- changes_manager = ChangesManager,
- changes_reader = ChangesReader,
- workers = Workers
- }
- }.
-
-
-handle_call({add_stats, Stats}, From, State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
- {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
- #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
- current_through_seq = ThroughSeq, stats = Stats} = State) ->
- gen_server:reply(From, ok),
- {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
- [] ->
- {Seq, []};
- [Seq | Rest] ->
- {Seq, Rest};
- [_ | _] ->
- {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
- end,
- NewHighestDone = lists:max([HighestDone, Seq]),
- NewThroughSeq = case NewSeqsInProgress of
- [] ->
- lists:max([NewThroughSeq0, NewHighestDone]);
- _ ->
- NewThroughSeq0
- end,
- couch_log:debug("Worker reported seq ~p, through seq was ~p, "
- "new through seq is ~p, highest seq done was ~p, "
- "new highest seq done is ~p~n"
- "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
- [Seq, ThroughSeq, NewThroughSeq, HighestDone,
- NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
- NewState = State#rep_state{
- stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
- current_through_seq = NewThroughSeq,
- seqs_in_progress = NewSeqsInProgress,
- highest_seq_done = NewHighestDone
- },
- update_task(NewState),
- {noreply, NewState}.
-
-
-handle_cast(checkpoint, State) ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
- {stop, Error, State}
- end;
-
-handle_cast({report_seq, Seq},
- #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
- NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
- {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-handle_info(shutdown, St) ->
- {stop, shutdown, St};
-
-handle_info({'EXIT', Pid, max_backoff}, State) ->
- couch_log:error("Max backoff reached child process ~p", [Pid]),
- {stop, {shutdown, max_backoff}, State};
-
-handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
- couch_log:error("Max backoff reached child process ~p", [Pid]),
- {stop, {shutdown, max_backoff}, State};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
- couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
- Reason = case Reason0 of
- {changes_req_failed, _, _} = HttpFail ->
- HttpFail;
- {http_request_failed, _, _, {error, {code, Code}}} ->
- {changes_req_failed, Code};
- {http_request_failed, _, _, {error, Err}} ->
- {changes_req_failed, Err};
- Other ->
- {changes_reader_died, Other}
- end,
- couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, {shutdown, Reason}, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
- couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
- couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
- couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
- couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
- case Workers -- [Pid] of
- Workers ->
- couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
- {noreply, State#rep_state{workers = Workers}};
- %% not clear why a stop was here before
- %%{stop, {unknown_process_died, Pid, normal}, State};
- [] ->
- catch unlink(State#rep_state.changes_manager),
- catch exit(State#rep_state.changes_manager, kill),
- do_last_checkpoint(State);
- Workers2 ->
- {noreply, State#rep_state{workers = Workers2}}
- end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
- State2 = cancel_timer(State),
- case lists:member(Pid, Workers) of
- false ->
- {stop, {unknown_process_died, Pid, Reason}, State2};
- true ->
- couch_stats:increment_counter([couch_replicator, worker_deaths]),
- StopReason = case Reason of
- {shutdown, _} = Err ->
- Err;
- Other ->
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {worker_died, Pid, Other}
- end,
- {stop, StopReason, State2}
- end;
-
-handle_info(timeout, InitArgs) ->
- try do_init(InitArgs) of {ok, State} ->
- {noreply, State}
- catch
- exit:{http_request_failed, _, _, max_backoff} ->
- {stop, {shutdown, max_backoff}, {error, InitArgs}};
- Class:Error ->
- ShutdownReason = {error, replication_start_error(Error)},
- StackTop2 = lists:sublist(erlang:get_stacktrace(), 2),
- % Shutdown state is a hack as it is not really the state of the
- % gen_server (it failed to initialize, so it doesn't have one).
- % Shutdown state is used to pass extra info about why start failed.
- ShutdownState = {error, Class, StackTop2, InitArgs},
- {stop, {shutdown, ShutdownReason}, ShutdownState}
- end.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
- checkpoint_history = CheckpointHistory} = State) ->
- terminate_cleanup(State),
- couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
- doc_update_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
- % Replication stopped via _scheduler_sup:terminate_child/1, which can be
- % occur during regular scheduler operation or when job is removed from
- % the scheduler.
- State1 = case do_checkpoint(State) of
- {ok, NewState} ->
- NewState;
- Error ->
- LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
- couch_log:error(LogMsg, [?MODULE, RepId, Error]),
- State
- end,
- couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
- terminate_cleanup(State1);
-
-terminate({shutdown, max_backoff}, {error, InitArgs}) ->
- #rep{id = {BaseId, Ext} = RepId} = InitArgs,
- couch_stats:increment_counter([couch_replicator, failed_starts]),
- couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
- couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
- #rep{
- id = {BaseId, Ext} = RepId,
- source = Source0,
- target = Target0,
- doc_id = DocId,
- db_name = DbName
- } = InitArgs,
- Source = couch_replicator_api_wrap:db_uri(Source0),
- Target = couch_replicator_api_wrap:db_uri(Target0),
- RepIdStr = BaseId ++ Ext,
- Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
- couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
- DocId, Stack]),
- couch_stats:increment_counter([couch_replicator, failed_starts]),
- couch_replicator_notifier:notify({error, RepId, Error});
-
-terminate({shutdown, max_backoff}, State) ->
- #rep_state{
- source_name = Source,
- target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId}
- } = State,
- couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
- [BaseId ++ Ext, Source, Target]),
- terminate_cleanup(State),
- couch_replicator_notifier:notify({error, RepId, max_backoff});
-
-terminate({shutdown, Reason}, State) ->
- % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
- % wrapped around the message
- terminate(Reason, State);
-
-terminate(Reason, State) ->
-#rep_state{
- source_name = Source,
- target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId}
- } = State,
- couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
- [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
- terminate_cleanup(State),
- couch_replicator_notifier:notify({error, RepId, Reason}).
-
-terminate_cleanup(State) ->
- update_task(State),
- couch_replicator_api_wrap:db_close(State#rep_state.source),
- couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
- {ok, State}.
-
-
-format_status(_Opt, [_PDict, State]) ->
- #rep_state{
- source = Source,
- target = Target,
- rep_details = RepDetails,
- start_seq = StartSeq,
- source_seq = SourceSeq,
- committed_seq = CommitedSeq,
- current_through_seq = ThroughSeq,
- highest_seq_done = HighestSeqDone,
- session_id = SessionId
- } = state_strip_creds(State),
- #rep{
- id = RepId,
- options = Options,
- doc_id = DocId,
- db_name = DbName
- } = RepDetails,
- [
- {rep_id, RepId},
- {source, couch_replicator_api_wrap:db_uri(Source)},
- {target, couch_replicator_api_wrap:db_uri(Target)},
- {db_name, DbName},
- {doc_id, DocId},
- {options, Options},
- {session_id, SessionId},
- {start_seq, StartSeq},
- {source_seq, SourceSeq},
- {committed_seq, CommitedSeq},
- {current_through_seq, ThroughSeq},
- {highest_seq_done, HighestSeqDone}
- ].
-
-
-startup_jitter() ->
- Jitter = config:get_integer("replicator", "startup_jitter",
- ?STARTUP_JITTER_DEFAULT),
- couch_rand:uniform(erlang:max(1, Jitter)).
-
-
-headers_strip_creds([], Acc) ->
- lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
- Value = case string:to_lower(Key) of
- "authorization" ->
- "****";
- _ ->
- Value0
- end,
- headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
- HttpDb#httpdb{
- url = couch_util:url_strip_password(Url),
- headers = headers_strip_creds(Headers, [])
- };
-httpdb_strip_creds(LocalDb) ->
- LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
- Rep#rep{
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
- % #rep_state contains the source and target at the top level and also
- % in the nested #rep_details record
- State#rep_state{
- rep_details = rep_strip_creds(Rep),
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
- Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
- couch_log:notice(Msg, [RepId]),
- Src#httpdb{http_connections = 2};
-adjust_maxconn(Src, _RepId) ->
- Src.
-
-
--spec doc_update_triggered(#rep{}) -> ok.
-doc_update_triggered(#rep{db_name = null}) ->
- ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
- case couch_replicator_doc_processor:update_docs() of
- true ->
- couch_replicator_docs:update_triggered(Rep, RepId);
- false ->
- ok
- end,
- couch_log:notice("Document `~s` triggered replication `~s`",
- [DocId, pp_rep_id(RepId)]),
- ok.
-
-
--spec doc_update_completed(#rep{}, list()) -> ok.
-doc_update_completed(#rep{db_name = null}, _Stats) ->
- ok;
-doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
- start_time = StartTime}, Stats0) ->
- Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
- couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
- couch_log:notice("Replication `~s` completed (triggered by `~s`)",
- [pp_rep_id(RepId), DocId]),
- ok.
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
- {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = Seq} = State) ->
- case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {stop, normal, cancel_timer(NewState)};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
- {stop, Error, State}
- end.
-
-
-start_timer(State) ->
- After = State#rep_state.checkpoint_interval,
- case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
- {ok, Ref} ->
- Ref;
- Error ->
- couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]),
- nil
- end.
-
-
-cancel_timer(#rep_state{timer = nil} = State) ->
- State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
- {ok, cancel} = timer:cancel(Timer),
- State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
- #rep{
- id = {BaseId, _Ext},
- source = Src0, target = Tgt,
- options = Options,
- type = Type, view = View,
- start_time = StartTime,
- stats = ArgStats0
- } = Rep,
- % Adjust minimum number of http source connections to 2 to avoid deadlock
- Src = adjust_maxconn(Src0, BaseId),
- {ok, Source} = couch_replicator_api_wrap:db_open(Src),
- {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
- {ok, Target} = couch_replicator_api_wrap:db_open(Tgt,
- get_value(create_target, Options, false), CreateTargetParams),
-
- {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
- {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
- [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
-
- {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-
- ArgStats1 = couch_replicator_stats:new(ArgStats0),
- HistoryStats = case History of
- [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
- _ -> couch_replicator_stats:new()
- end,
- Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
-
- StartSeq1 = get_value(since_seq, Options, StartSeq0),
- StartSeq = {0, StartSeq1},
-
- SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-
- #doc{body={CheckpointHistory}} = SourceLog,
- State = #rep_state{
- rep_details = Rep,
- source_name = couch_replicator_api_wrap:db_uri(Source),
- target_name = couch_replicator_api_wrap:db_uri(Target),
- source = Source,
- target = Target,
- history = History,
- checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
- start_seq = StartSeq,
- current_through_seq = StartSeq,
- committed_seq = StartSeq,
- source_log = SourceLog,
- target_log = TargetLog,
- rep_starttime = StartTime,
- src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
- tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
- session_id = couch_uuids:random(),
- source_seq = SourceSeq,
- use_checkpoints = get_value(use_checkpoints, Options, true),
- checkpoint_interval = get_value(checkpoint_interval, Options,
- ?DEFAULT_CHECKPOINT_INTERVAL),
- type = Type,
- view = View,
- stats = Stats
- },
- State#rep_state{timer = start_timer(State)}.
-
-
-find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
- LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
- fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
-
-
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
- lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
- case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
- {error, <<"not_found">>} when Vsn > 1 ->
- OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
- fold_replication_logs(Dbs, Vsn - 1,
- ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
- {error, <<"not_found">>} ->
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
- {ok, Doc} when LogId =:= NewId ->
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
- {ok, Doc} ->
- MigratedLog = #doc{id = NewId, body = Doc#doc.body},
- maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id),
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
- end.
-
-
-maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
- case get_value(use_checkpoints, Rep#rep.options, true) of
- true ->
- update_checkpoint(Db, Doc),
- Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
- couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]);
- false ->
- ok
- end.
-
-
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
- spawn_link(fun() ->
- changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
- end).
-
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
- receive
- {get_changes, From} ->
- case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
- closed ->
- From ! {closed, self()};
- {ok, ChangesOrLastSeqs} ->
- ReportSeq = case lists:last(ChangesOrLastSeqs) of
- {last_seq, Seq} ->
- {Ts, Seq};
- #doc_info{high_seq = Seq} ->
- {Ts, Seq}
- end,
- Changes = lists:filter(
- fun(#doc_info{}) ->
- true;
- ({last_seq, _Seq}) ->
- false
- end, ChangesOrLastSeqs),
- ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
- From ! {changes, self(), Changes, ReportSeq}
- end,
- changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
- end.
-
-
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
- NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
- {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
- update_task(State),
- {ok, State};
-do_checkpoint(State) ->
- #rep_state{
- source_name=SourceName,
- target_name=TargetName,
- source = Source,
- target = Target,
- history = OldHistory,
- start_seq = {_, StartSeq},
- current_through_seq = {_Ts, NewSeq} = NewTsSeq,
- source_log = SourceLog,
- target_log = TargetLog,
- rep_starttime = ReplicationStartTime,
- src_starttime = SrcInstanceStartTime,
- tgt_starttime = TgtInstanceStartTime,
- stats = Stats,
- rep_details = #rep{options = Options},
- session_id = SessionId
- } = State,
- case commit_to_both(Source, Target) of
- {source_error, Reason} ->
- {checkpoint_commit_failure,
- <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
- {target_error, Reason} ->
- {checkpoint_commit_failure,
- <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
- {SrcInstanceStartTime, TgtInstanceStartTime} ->
- couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
- [SourceName, TargetName, NewSeq]),
- LocalStartTime = calendar:now_to_local_time(ReplicationStartTime),
- StartTime = ?l2b(httpd_util:rfc1123_date(LocalStartTime)),
- EndTime = ?l2b(httpd_util:rfc1123_date()),
- NewHistoryEntry = {[
- {<<"session_id">>, SessionId},
- {<<"start_time">>, StartTime},
- {<<"end_time">>, EndTime},
- {<<"start_last_seq">>, StartSeq},
- {<<"end_last_seq">>, NewSeq},
- {<<"recorded_seq">>, NewSeq},
- {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
- {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
- {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
- {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
- {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
- ]},
- BaseHistory = [
- {<<"session_id">>, SessionId},
- {<<"source_last_seq">>, NewSeq},
- {<<"replication_id_version">>, ?REP_ID_VERSION}
- ] ++ case get_value(doc_ids, Options) of
- undefined ->
- [];
- _DocIds ->
- % backwards compatibility with the result of a replication by
- % doc IDs in versions 0.11.x and 1.0.x
- % TODO: deprecate (use same history format, simplify code)
- [
- {<<"start_time">>, StartTime},
- {<<"end_time">>, EndTime},
- {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
- {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
- {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
- ]
- end,
- % limit history to 50 entries
- NewRepHistory = {
- BaseHistory ++
- [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
- },
-
- try
- {SrcRevPos, SrcRevId} = update_checkpoint(
- Source, SourceLog#doc{body = NewRepHistory}, source),
- {TgtRevPos, TgtRevId} = update_checkpoint(
- Target, TargetLog#doc{body = NewRepHistory}, target),
- NewState = State#rep_state{
- checkpoint_history = NewRepHistory,
- committed_seq = NewTsSeq,
- source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
- target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
- },
- update_task(NewState),
- {ok, NewState}
- catch throw:{checkpoint_commit_failure, _} = Failure ->
- Failure
- end;
- {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Target database out of sync. "
- "Try to increase max_dbs_open at the target's server.">>};
- {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Source database out of sync. "
- "Try to increase max_dbs_open at the source's server.">>};
- {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Source and target databases out of "
- "sync. Try to increase max_dbs_open at both servers.">>}
- end.
-
-
-update_checkpoint(Db, Doc, DbType) ->
- try
- update_checkpoint(Db, Doc)
- catch throw:{checkpoint_commit_failure, Reason} ->
- throw({checkpoint_commit_failure,
- <<"Error updating the ", (to_binary(DbType))/binary,
- " checkpoint document: ", (to_binary(Reason))/binary>>})
- end.
-
-
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
- try
- case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
- {ok, PosRevId} ->
- PosRevId;
- {error, Reason} ->
- throw({checkpoint_commit_failure, Reason})
- end
- catch throw:conflict ->
- case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
- {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
- % This means that we were able to update successfully the
- % checkpoint doc in a previous attempt but we got a connection
- % error (timeout for e.g.) before receiving the success response.
- % Therefore the request was retried and we got a conflict, as the
- % revision we sent is not the current one.
- % We confirm this by verifying the doc body we just got is the same
- % that we have just sent.
- {Pos, RevId};
- _ ->
- throw({checkpoint_commit_failure, conflict})
- end
- end.
-
-
-commit_to_both(Source, Target) ->
- % commit the src async
- ParentPid = self(),
- SrcCommitPid = spawn_link(
- fun() ->
- Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
- ParentPid ! {self(), Result}
- end),
-
- % commit tgt sync
- TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
-
- SourceResult = receive
- {SrcCommitPid, Result} ->
- unlink(SrcCommitPid),
- receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
- Result;
- {'EXIT', SrcCommitPid, Reason} ->
- {error, Reason}
- end,
- case TargetResult of
- {ok, TargetStartTime} ->
- case SourceResult of
- {ok, SourceStartTime} ->
- {SourceStartTime, TargetStartTime};
- SourceError ->
- {source_error, SourceError}
- end;
- TargetError ->
- {target_error, TargetError}
- end.
-
-
-compare_replication_logs(SrcDoc, TgtDoc) ->
- #doc{body={RepRecProps}} = SrcDoc,
- #doc{body={RepRecPropsTgt}} = TgtDoc,
- case get_value(<<"session_id">>, RepRecProps) ==
- get_value(<<"session_id">>, RepRecPropsTgt) of
- true ->
- % if the records have the same session id,
- % then we have a valid replication history
- OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
- OldHistory = get_value(<<"history">>, RepRecProps, []),
- {OldSeqNum, OldHistory};
- false ->
- SourceHistory = get_value(<<"history">>, RepRecProps, []),
- TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
- couch_log:notice("Replication records differ. "
- "Scanning histories to find a common ancestor.", []),
- couch_log:debug("Record on source:~p~nRecord on target:~p~n",
- [RepRecProps, RepRecPropsTgt]),
- compare_rep_history(SourceHistory, TargetHistory)
- end.
-
-
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
- couch_log:notice("no common ancestry -- performing full replication", []),
- {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
- SourceId = get_value(<<"session_id">>, S),
- case has_session_id(SourceId, Target) of
- true ->
- RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
- couch_log:notice("found a common replication record with source_seq ~p",
- [RecordSeqNum]),
- {RecordSeqNum, SourceRest};
- false ->
- TargetId = get_value(<<"session_id">>, T),
- case has_session_id(TargetId, SourceRest) of
- true ->
- RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
- couch_log:notice("found a common replication record with source_seq ~p",
- [RecordSeqNum]),
- {RecordSeqNum, TargetRest};
- false ->
- compare_rep_history(SourceRest, TargetRest)
- end
- end.
-
-
-has_session_id(_SessionId, []) ->
- false;
-has_session_id(SessionId, [{Props} | Rest]) ->
- case get_value(<<"session_id">>, Props, nil) of
- SessionId ->
- true;
- _Else ->
- has_session_id(SessionId, Rest)
- end.
-
-
-get_pending_count(St) ->
- Rep = St#rep_state.rep_details,
- Timeout = get_value(connection_timeout, Rep#rep.options),
- TimeoutMicro = Timeout * 1000,
- case get(pending_count_state) of
- {LastUpdate, PendingCount} ->
- case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
- true ->
- NewPendingCount = get_pending_count_int(St),
- put(pending_count_state, {os:timestamp(), NewPendingCount}),
- NewPendingCount;
- false ->
- PendingCount
- end;
- undefined ->
- NewPendingCount = get_pending_count_int(St),
- put(pending_count_state, {os:timestamp(), NewPendingCount}),
- NewPendingCount
- end.
-
-
-get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
- {_, Seq} = St#rep_state.highest_seq_done,
- Db = Db0#httpdb{retries = 3},
- case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
- {ok, Pending} ->
- Pending;
- _ ->
- null
- end;
-get_pending_count_int(#rep_state{source = Db}=St) ->
- {_, Seq} = St#rep_state.highest_seq_done,
- {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
- Pending.
-
-
-update_task(State) ->
- #rep_state{
- rep_details = #rep{id = JobId},
- current_through_seq = {_, ThroughSeq},
- highest_seq_done = {_, HighestSeq}
- } = State,
- Status = rep_stats(State) ++ [
- {source_seq, HighestSeq},
- {through_seq, ThroughSeq}
- ],
- couch_replicator_scheduler:update_job_stats(JobId, Status),
- couch_task_status:update(Status).
-
-
-rep_stats(State) ->
- #rep_state{
- committed_seq = {_, CommittedSeq},
- stats = Stats
- } = State,
- [
- {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
- {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
- {docs_read, couch_replicator_stats:docs_read(Stats)},
- {docs_written, couch_replicator_stats:docs_written(Stats)},
- {changes_pending, get_pending_count(State)},
- {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
- {checkpointed_source_seq, CommittedSeq}
- ].
-
-
-replication_start_error({unauthorized, DbUri}) ->
- {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-replication_start_error({db_not_found, DbUri}) ->
- {db_not_found, <<"could not open ", DbUri/binary>>};
-replication_start_error({http_request_failed, _Method, Url0,
- {error, {error, {conn_failed, {error, nxdomain}}}}}) ->
- Url = ?l2b(couch_util:url_strip_password(Url0)),
- {nxdomain, <<"could not resolve ", Url/binary>>};
-replication_start_error({http_request_failed, Method0, Url0,
- {error, {code, Code}}}) when is_integer(Code) ->
- Url = ?l2b(couch_util:url_strip_password(Url0)),
- Method = ?l2b(Method0),
- {http_error_code, Code, <<Method/binary, " ", Url/binary>>};
-replication_start_error(Error) ->
- Error.
-
-
-log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
- #rep{
- id = {BaseId, Ext},
- doc_id = DocId,
- db_name = DbName,
- options = Options
- } = Rep,
- Id = BaseId ++ Ext,
- Workers = get_value(worker_processes, Options),
- BatchSize = get_value(worker_batch_size, Options),
- #rep_state{
- source_name = Source, % credentials already stripped
- target_name = Target, % credentials already stripped
- session_id = Sid
- } = RepState,
- From = case DbName of
- ShardName when is_binary(ShardName) ->
- io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
- _ ->
- "from _replicate endpoint"
- end,
- Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p"
- " worker_batch_size:~p session_id:~s",
- couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-replication_start_error_test() ->
- ?assertEqual({unauthorized, <<"unauthorized to access or create database"
- " http://x/y">>}, replication_start_error({unauthorized,
- <<"http://x/y">>})),
- ?assertEqual({db_not_found, <<"could not open http://x/y">>},
- replication_start_error({db_not_found, <<"http://x/y">>})),
- ?assertEqual({nxdomain,<<"could not resolve http://x/y">>},
- replication_start_error({http_request_failed, "GET", "http://x/y",
- {error, {error, {conn_failed, {error, nxdomain}}}}})),
- ?assertEqual({http_error_code,503,<<"GET http://x/y">>},
- replication_start_error({http_request_failed, "GET", "http://x/y",
- {error, {code, 503}}})).
-
-
-scheduler_job_format_status_test() ->
- Source = <<"http://u:p@h1/d1">>,
- Target = <<"http://u:p@h2/d2">>,
- Rep = #rep{
- id = {"base", "+ext"},
- source = couch_replicator_docs:parse_rep_db(Source, [], []),
- target = couch_replicator_docs:parse_rep_db(Target, [], []),
- options = [{create_target, true}],
- doc_id = <<"mydoc">>,
- db_name = <<"mydb">>
- },
- State = #rep_state{
- rep_details = Rep,
- source = Rep#rep.source,
- target = Rep#rep.target,
- session_id = <<"a">>,
- start_seq = <<"1">>,
- source_seq = <<"2">>,
- committed_seq = <<"3">>,
- current_through_seq = <<"4">>,
- highest_seq_done = <<"5">>
- },
- Format = format_status(opts_ignored, [pdict, State]),
- ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)),
- ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)),
- ?assertEqual({"base", "+ext"}, proplists:get_value(rep_id, Format)),
- ?assertEqual([{create_target, true}], proplists:get_value(options, Format)),
- ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)),
- ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)),
- ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)),
- ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)),
- ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)),
- ?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)),
- ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)),
- ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)).
-
-
--endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
deleted file mode 100644
index 8ab55f838..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ /dev/null
@@ -1,62 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_scheduler_sup).
-
--behaviour(supervisor).
-
-%% public api
--export([
- start_link/0,
- start_child/1,
- terminate_child/1
-]).
-
-%% supervisor api
--export([
- init/1
-]).
-
-
-%% includes
--include("couch_replicator.hrl").
-
-
-%% public functions
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-
-start_child(#rep{} = Rep) ->
- supervisor:start_child(?MODULE, [Rep]).
-
-
-terminate_child(Pid) ->
- supervisor:terminate_child(?MODULE, Pid).
-
-%% supervisor functions
-
-init(_Args) ->
- Start = {couch_replicator_scheduler_job, start_link, []},
- Restart = temporary, % A crashed job is not entitled to immediate restart.
- Shutdown = 5000,
- Type = worker,
- Modules = [couch_replicator_scheduler_job],
-
- RestartStrategy = simple_one_for_one,
- MaxR = 10,
- MaxT = 3,
-
- ChildSpec =
- {undefined, Start, Restart, Shutdown, Type, Modules},
- {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
deleted file mode 100644
index 997c84863..000000000
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ /dev/null
@@ -1,455 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_compact_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/src/couch_replicator.hrl").
-
--import(couch_replicator_test_helper, [
- db_url/1,
- get_pid/1
-]).
-
--define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
--define(DELAY, 500).
--define(TIMEOUT, 360000).
--define(TIMEOUT_WRITER, 100000).
--define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 70).
--define(WRITE_BATCH_SIZE, 25).
-
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-setup(remote) ->
- {remote, setup()};
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
-
-compact_test_() ->
- Pairs = [{remote, remote}],
- {
- "Compaction during replication tests",
- {
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_populate_replicate_compact/2}
- || Pair <- Pairs]
- }
- }.
-
-
-should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
- {ok, RepPid, RepId} = replicate(Source, Target),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_run_replication(RepPid, RepId, Source, Target),
- should_all_processes_be_alive(RepPid, Source, Target),
- should_populate_and_compact(RepPid, Source, Target, 50, 3),
- should_wait_target_in_sync(Source, Target),
- should_ensure_replication_still_running(RepPid, RepId, Source, Target),
- should_cancel_replication(RepId, RepPid),
- should_compare_databases(Source, Target)
- ]}}.
-
-should_all_processes_be_alive(RepPid, Source, Target) ->
- ?_test(begin
- {ok, SourceDb} = reopen_db(Source),
- {ok, TargetDb} = reopen_db(Target),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb)))
- end).
-
-should_run_replication(RepPid, RepId, Source, Target) ->
- ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
-
-should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
- ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
-
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
- Source = case Src of
- {remote, NameSrc} ->
- <<(db_url(NameSrc))/binary, $/>>;
- _ ->
- Src
- end,
- Target = case Tgt of
- {remote, NameTgt} ->
- <<(db_url(NameTgt))/binary, $/>>;
- _ ->
- Tgt
- end,
- FullRepId = ?l2b(BaseId ++ Ext),
- Pid = ?l2b(pid_to_list(RepPid)),
- RepTasks = wait_for_task_status(),
- ?assertNotEqual(timeout, RepTasks),
- [RepTask] = RepTasks,
- ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
- ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
- ?assertEqual(true, couch_util:get_value(continuous, RepTask)),
- ?assertEqual(Source, couch_util:get_value(source, RepTask)),
- ?assertEqual(Target, couch_util:get_value(target, RepTask)),
- ?assert(is_integer(couch_util:get_value(docs_read, RepTask))),
- ?assert(is_integer(couch_util:get_value(docs_written, RepTask))),
- ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))),
- ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))),
- ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
- ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
- ?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
- Pending = couch_util:get_value(changes_pending, RepTask),
- ?assert(is_integer(Pending)).
-
-replication_tasks() ->
- lists:filter(fun(P) ->
- couch_util:get_value(type, P) =:= replication
- end, couch_task_status:all()).
-
-
-wait_for_task_status() ->
- test_util:wait(fun() ->
- case replication_tasks() of
- [] ->
- wait;
- Tasks ->
- Tasks
- end
- end).
-
-should_cancel_replication(RepId, RepPid) ->
- ?_assertNot(begin
- ok = couch_replicator_scheduler:remove_job(RepId),
- is_process_alive(RepPid)
- end).
-
-should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(begin
- {ok, SourceDb0} = reopen_db(Source),
- Writer = spawn_writer(SourceDb0),
- lists:foreach(
- fun(N) ->
- {ok, SourceDb} = reopen_db(Source),
- {ok, TargetDb} = reopen_db(Target),
- pause_writer(Writer),
-
- compact_db("source", SourceDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
- wait_for_compaction("source", SourceDb),
-
- compact_db("target", TargetDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb))),
- wait_for_compaction("target", TargetDb),
-
- {ok, SourceDb2} = reopen_db(SourceDb),
- {ok, TargetDb2} = reopen_db(TargetDb),
-
- resume_writer(Writer),
- wait_writer(Writer, BatchSize * N),
-
- compact_db("source", SourceDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb2))),
- pause_writer(Writer),
- wait_for_compaction("source", SourceDb2),
- resume_writer(Writer),
-
- compact_db("target", TargetDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb2))),
- pause_writer(Writer),
- wait_for_compaction("target", TargetDb2),
- resume_writer(Writer)
- end, lists:seq(1, Rounds)),
- stop_writer(Writer)
- end)}.
-
-should_wait_target_in_sync({remote, Source}, Target) ->
- should_wait_target_in_sync(Source, Target);
-should_wait_target_in_sync(Source, {remote, Target}) ->
- should_wait_target_in_sync(Source, Target);
-should_wait_target_in_sync(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_assert(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300)
- end)}.
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, "Could not get source and target databases in sync"}]});
-wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) ->
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft);
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
- {ok, Target} = couch_db:open_int(TargetName, []),
- {ok, TargetInfo} = couch_db:get_db_info(Target),
- ok = couch_db:close(Target),
- TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
- true ->
- true;
- false ->
- ok = timer:sleep(?DELAY),
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
- end.
-
-should_compare_databases({remote, Source}, Target) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, {remote, Target}) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, Target) ->
- {timeout, 35, ?_test(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
- {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- DocTarget = case couch_db:open_doc(TargetDb, DocId) of
- {ok, DocT} ->
- DocT;
- Error ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Error opening document '",
- ?b2l(DocId), "' from target: ",
- couch_util:to_list(Error)])}]})
- end,
- DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
- ?assertEqual(DocJson, DocTargetJson),
- {ok, Acc}
- end,
- {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb)
- end)}.
-
-
-reopen_db({remote, Db}) ->
- reopen_db(Db);
-reopen_db(DbName) when is_binary(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- ok = couch_db:close(Db),
- {ok, Db};
-reopen_db(Db) ->
- reopen_db(couch_db:name(Db)).
-
-
-compact_db(Type, Db0) ->
- Name = couch_db:name(Db0),
- {ok, Db} = couch_db:open_int(Name, []),
- {ok, CompactPid} = couch_db:start_compact(Db),
- MonRef = erlang:monitor(process, CompactPid),
- receive
- {'DOWN', MonRef, process, CompactPid, normal} ->
- ok;
- {'DOWN', MonRef, process, CompactPid, noproc} ->
- ok;
- {'DOWN', MonRef, process, CompactPid, Reason} ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason,
- lists:concat(["Error compacting ", Type, " database ",
- ?b2l(Name), ": ",
- couch_util:to_list(Reason)])}]})
- after ?TIMEOUT ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction for ", Type, " database ",
- ?b2l(Name), " didn't finish"])}]})
- end,
- ok = couch_db:close(Db).
-
-wait_for_compaction(Type, Db) ->
- case couch_db:wait_for_compaction(Db) of
- ok ->
- ok;
- {error, noproc} ->
- ok;
- {error, Reason} ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction of ", Type,
- " database failed with: ", Reason])}]})
- end.
-
-replicate({remote, Db}, Target) ->
- replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
- replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"continuous">>, true}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- Pid = get_pid(Rep#rep.id),
- {ok, Pid, Rep#rep.id}.
-
-
-wait_writer(Pid, NumDocs) ->
- case get_writer_num_docs_written(Pid) of
- N when N >= NumDocs ->
- ok;
- _ ->
- wait_writer(Pid, NumDocs)
- end.
-
-spawn_writer(Db) ->
- Parent = self(),
- Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
- Pid.
-
-
-pause_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {pause, Ref},
- receive
- {paused, Ref} ->
- ok
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
- end.
-
-resume_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {continue, Ref},
- receive
- {ok, Ref} ->
- ok
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
- end.
-
-get_writer_num_docs_written(Pid) ->
- Ref = make_ref(),
- Pid ! {get_count, Ref},
- receive
- {count, Ref, Count} ->
- Count
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout getting number of documents written"
- " from source database writer"}]})
- end.
-
-stop_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {stop, Ref},
- receive
- {stopped, Ref, DocsWritten} ->
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _Reason} ->
- DocsWritten
- after ?TIMEOUT ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
- end
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
- end.
-
-writer_loop(Db0, Parent, Counter) ->
- DbName = couch_db:name(Db0),
- {ok, Data} = file:read_file(?ATTFILE),
- maybe_pause(Parent, Counter),
- Docs = lists:map(fun(I) ->
- couch_doc:from_json_obj({[
- {<<"_id">>, ?l2b(integer_to_list(Counter + I))},
- {<<"value">>, Counter + I},
- {<<"_attachments">>, {[
- {<<"icon1.png">>, {[
- {<<"data">>, base64:encode(Data)},
- {<<"content_type">>, <<"image/png">>}
- ]}},
- {<<"icon2.png">>, {[
- {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))},
- {<<"content_type">>, <<"image/png">>}
- ]}}
- ]}}
- ]})
- end, lists:seq(1, ?WRITE_BATCH_SIZE)),
- maybe_pause(Parent, Counter),
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- ok = couch_db:close(Db),
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter + ?WRITE_BATCH_SIZE},
- writer_loop(Db, Parent, Counter + ?WRITE_BATCH_SIZE);
- {stop, Ref} ->
- Parent ! {stopped, Ref, Counter + ?WRITE_BATCH_SIZE}
- after 0 ->
- timer:sleep(?DELAY),
- writer_loop(Db, Parent, Counter + ?WRITE_BATCH_SIZE)
- end.
-
-maybe_pause(Parent, Counter) ->
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter};
- {pause, Ref} ->
- Parent ! {paused, Ref},
- receive
- {continue, Ref2} ->
- Parent ! {ok, Ref2}
- end
- after 0 ->
- ok
- end.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
deleted file mode 100644
index 6b4f95c25..000000000
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_error_reporting_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/src/couch_replicator.hrl").
-
-
-setup_all() ->
- test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
-
-
-teardown_all(Ctx) ->
- ok = test_util:stop_couch(Ctx).
-
-
-setup() ->
- meck:unload(),
- Source = setup_db(),
- Target = setup_db(),
- {Source, Target}.
-
-
-teardown({Source, Target}) ->
- meck:unload(),
- teardown_db(Source),
- teardown_db(Target),
- ok.
-
-
-error_reporting_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- fun t_fail_bulk_docs/1,
- fun t_fail_changes_reader/1,
- fun t_fail_revs_diff/1,
- fun t_fail_changes_queue/1,
- fun t_fail_changes_manager/1,
- fun t_fail_changes_reader_proc/1
- ]
- }
- }.
-
-
-t_fail_bulk_docs({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- {ok, Listener} = rep_result_listener(RepId),
- mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
- populate_db(Source, 6, 6),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
-
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-t_fail_changes_reader({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- {ok, Listener} = rep_result_listener(RepId),
- mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
- populate_db(Source, 6, 6),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
-
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-t_fail_revs_diff({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- {ok, Listener} = rep_result_listener(RepId),
- mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
- populate_db(Source, 6, 6),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
-
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-t_fail_changes_queue({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- RepPid = couch_replicator_test_helper:get_pid(RepId),
- State = sys:get_state(RepPid),
- ChangesQueue = element(20, State),
- ?assert(is_process_alive(ChangesQueue)),
-
- {ok, Listener} = rep_result_listener(RepId),
- exit(ChangesQueue, boom),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({changes_queue_died, boom}, Result),
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-t_fail_changes_manager({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- RepPid = couch_replicator_test_helper:get_pid(RepId),
- State = sys:get_state(RepPid),
- ChangesManager = element(21, State),
- ?assert(is_process_alive(ChangesManager)),
-
- {ok, Listener} = rep_result_listener(RepId),
- exit(ChangesManager, bam),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({changes_manager_died, bam}, Result),
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-t_fail_changes_reader_proc({Source, Target}) ->
- ?_test(begin
- populate_db(Source, 1, 5),
- {ok, RepId} = replicate(Source, Target),
- wait_target_in_sync(Source, Target),
-
- RepPid = couch_replicator_test_helper:get_pid(RepId),
- State = sys:get_state(RepPid),
- ChangesReader = element(22, State),
- ?assert(is_process_alive(ChangesReader)),
-
- {ok, Listener} = rep_result_listener(RepId),
- exit(ChangesReader, kapow),
-
- {error, Result} = wait_rep_result(RepId),
- ?assertEqual({changes_reader_died, kapow}, Result),
- couch_replicator_notifier:stop(Listener)
- end).
-
-
-mock_fail_req(Path, Return) ->
- meck:expect(ibrowse, send_req_direct,
- fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
- Args = [W, Url, Headers, Meth, Body, Opts, TOut],
- {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
- case lists:suffix(Path, UPath) of
- true -> Return;
- false -> meck:passthrough(Args)
- end
- end).
-
-
-rep_result_listener(RepId) ->
- ReplyTo = self(),
- {ok, _Listener} = couch_replicator_notifier:start_link(
- fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
- ReplyTo ! Ev;
- (_) ->
- ok
- end).
-
-
-wait_rep_result(RepId) ->
- receive
- {finished, RepId, RepResult} -> {ok, RepResult};
- {error, RepId, Reason} -> {error, Reason}
- end.
-
-
-
-setup_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-
-teardown_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]).
-
-
-populate_db(DbName, Start, End) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Docs = lists:foldl(
- fun(DocIdCounter, Acc) ->
- Id = integer_to_binary(DocIdCounter),
- Doc = #doc{id = Id, body = {[]}},
- [Doc | Acc]
- end,
- [], lists:seq(Start, End)),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- ok = couch_db:close(Db).
-
-
-wait_target_in_sync(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300).
-
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
- erlang:error({assertion_failed, [
- {module, ?MODULE}, {line, ?LINE},
- {reason, "Could not get source and target databases in sync"}
- ]});
-
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
- {ok, Target} = couch_db:open_int(TargetName, []),
- {ok, TargetInfo} = couch_db:get_db_info(Target),
- ok = couch_db:close(Target),
- TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
- true ->
- true;
- false ->
- ok = timer:sleep(500),
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
- end.
-
-
-replicate(Source, Target) ->
- SrcUrl = couch_replicator_test_helper:db_url(Source),
- TgtUrl = couch_replicator_test_helper:db_url(Target),
- RepObject = {[
- {<<"source">>, SrcUrl},
- {<<"target">>, TgtUrl},
- {<<"continuous">>, true},
- {<<"worker_processes">>, 1},
- {<<"retries_per_request">>, 1},
- % Low connection timeout so _changes feed gets restarted quicker
- {<<"connection_timeout">>, 3000}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- {ok, Rep#rep.id}.