Diffstat (limited to 'src/couch_replicator/src/couch_replicator_doc_processor.erl')
 -rw-r--r--  src/couch_replicator/src/couch_replicator_doc_processor.erl | 524
 1 file changed, 248 insertions(+), 276 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index ed6670615..436d7c44d 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -20,12 +20,12 @@
]).
-export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
]).
-export([
@@ -54,15 +54,15 @@
]).
-define(DEFAULT_UPDATE_DOCS, false).
--define(ERROR_MAX_BACKOFF_EXPONENT, 12). % ~ 1 day on average
+% ~ 1 day on average
+-define(ERROR_MAX_BACKOFF_EXPONENT, 12).
-define(TS_DAY_SEC, 86400).
-define(INITIAL_BACKOFF_EXPONENT, 64).
-define(MIN_FILTER_DELAY_SEC, 60).
--type filter_type() :: nil | view | user | docids | mango.
+-type filter_type() :: nil | view | user | docids | mango.
-type repstate() :: initializing | error | scheduled.
-
-record(rdoc, {
id :: db_doc_id() | '_' | {any(), '_'},
state :: repstate() | '_',
@@ -75,7 +75,6 @@
last_updated :: erlang:timestamp() | '_'
}).
-
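The `'_'` alternatives in the #rdoc field types above exist so that a partially-filled #rdoc can double as an ETS match pattern without upsetting dialyzer. A minimal sketch of that use (rows_for_db/1 is hypothetical, not part of this module):

    %% Match every row belonging to one database; `_ = '_'` sets all
    %% remaining fields to wildcards, which the `'_'` unions in the
    %% field specs permit.
    rows_for_db(DbName) ->
        ets:match_object(?MODULE, #rdoc{id = {DbName, '_'}, _ = '_'}).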
% couch_multidb_changes API callbacks
db_created(DbName, Server) ->
@@ -83,35 +82,31 @@ db_created(DbName, Server) ->
couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
Server.
-
db_deleted(DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
Server.
-
db_found(DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
Server.
-
db_change(DbName, {ChangeProps} = Change, Server) ->
couch_stats:increment_counter([couch_replicator, docs, db_changes]),
try
ok = process_change(DbName, Change)
catch
- exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
- ErrMsg = "~p exited ~p while processing change from db ~p",
- couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
- _Tag:Error ->
- {RepProps} = get_json_value(doc, ChangeProps),
- DocId = get_json_value(<<"_id">>, RepProps),
- couch_replicator_docs:update_failed(DbName, DocId, Error)
+ exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
+ ErrMsg = "~p exited ~p while processing change from db ~p",
+ couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
+ _Tag:Error ->
+ {RepProps} = get_json_value(doc, ChangeProps),
+ DocId = get_json_value(<<"_id">>, RepProps),
+ couch_replicator_docs:update_failed(DbName, DocId, Error)
end,
Server.
-
-spec get_worker_ref(db_doc_id()) -> reference() | nil.
get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
case ets:lookup(?MODULE, {DbName, DocId}) of
@@ -123,46 +118,43 @@ get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
nil
end.
-
% Cluster membership change notification callback
-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
notify_cluster_event(Server, {cluster, _} = Event) ->
gen_server:cast(Server, Event).
-
process_change(DbName, {Change}) ->
{RepProps} = JsonRepDoc = get_json_value(doc, Change),
DocId = get_json_value(<<"_id">>, RepProps),
Owner = couch_replicator_clustering:owner(DbName, DocId),
Id = {DbName, DocId},
case {Owner, get_json_value(deleted, Change, false)} of
- {_, true} ->
- ok = gen_server:call(?MODULE, {removed, Id}, infinity);
- {unstable, false} ->
- couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
- {ThisNode, false} when ThisNode =:= node() ->
- case get_json_value(<<"_replication_state">>, RepProps) of
- undefined ->
- ok = process_updated(Id, JsonRepDoc);
- <<"triggered">> ->
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"completed">> ->
- ok = gen_server:call(?MODULE, {completed, Id}, infinity);
- <<"error">> ->
- % Handle replications started from older versions of replicator
- % which wrote transient errors to replication docs
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, JsonRepDoc);
- <<"failed">> ->
+ {_, true} ->
+ ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+ {unstable, false} ->
+ couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+ {ThisNode, false} when ThisNode =:= node() ->
+ case get_json_value(<<"_replication_state">>, RepProps) of
+ undefined ->
+ ok = process_updated(Id, JsonRepDoc);
+ <<"triggered">> ->
+ maybe_remove_state_fields(DbName, DocId),
+ ok = process_updated(Id, JsonRepDoc);
+ <<"completed">> ->
+ ok = gen_server:call(?MODULE, {completed, Id}, infinity);
+ <<"error">> ->
+ % Handle replications started from older versions of replicator
+ % which wrote transient errors to replication docs
+ maybe_remove_state_fields(DbName, DocId),
+ ok = process_updated(Id, JsonRepDoc);
+ <<"failed">> ->
+ ok
+ end;
+ {Owner, false} ->
ok
- end;
- {Owner, false} ->
- ok
end,
ok.
-
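process_change/2 dispatches on the pair of the document's cluster owner and its `_replication_state`. A hypothetical sketch of the ownership side, assuming owner selection by hashing over the sorted node list (the real logic, including the `unstable` return handled above, lives in couch_replicator_clustering):

    %% Hypothetical: deterministic owner pick by hashing the doc id over
    %% the sorted node list; every node computes the same answer.
    owner_sketch(DbName, DocId) ->
        Nodes = lists:sort([node() | nodes()]),
        lists:nth(1 + erlang:phash2({DbName, DocId}, length(Nodes)), Nodes).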
maybe_remove_state_fields(DbName, DocId) ->
case update_docs() of
true ->
@@ -171,7 +163,6 @@ maybe_remove_state_fields(DbName, DocId) ->
couch_replicator_docs:remove_state_fields(DbName, DocId)
end.
-
process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
% Parsing replication doc (but not calculating the id) could throw an
% exception which would indicate this document is malformed. This exception
@@ -180,53 +171,54 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
% problem.
Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
- Filter = case couch_replicator_filters:parse(Rep#rep.options) of
- {ok, nil} ->
- nil;
- {ok, {user, _FName, _QP}} ->
- user;
- {ok, {view, _FName, _QP}} ->
- view;
- {ok, {docids, _DocIds}} ->
- docids;
- {ok, {mango, _Selector}} ->
- mango;
- {error, FilterError} ->
- throw(FilterError)
- end,
+ Filter =
+ case couch_replicator_filters:parse(Rep#rep.options) of
+ {ok, nil} ->
+ nil;
+ {ok, {user, _FName, _QP}} ->
+ user;
+ {ok, {view, _FName, _QP}} ->
+ view;
+ {ok, {docids, _DocIds}} ->
+ docids;
+ {ok, {mango, _Selector}} ->
+ mango;
+ {error, FilterError} ->
+ throw(FilterError)
+ end,
gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity).
-
% Doc processor gen_server API and callbacks
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
- ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id},
- {read_concurrency, true}, {write_concurrency, true}]),
- couch_replicator_clustering:link_cluster_event_listener(?MODULE,
- notify_cluster_event, [self()]),
+ ?MODULE = ets:new(?MODULE, [
+ named_table,
+ {keypos, #rdoc.id},
+ {read_concurrency, true},
+ {write_concurrency, true}
+ ]),
+ couch_replicator_clustering:link_cluster_event_listener(
+ ?MODULE,
+ notify_cluster_event,
+ [self()]
+ ),
{ok, nil}.
-
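init/1 keys the ETS table on a record field via keypos; Erlang records are tuples with the record name at position 1, so #rdoc.id expands to the field's 1-based tuple index. A self-contained illustration with a hypothetical #kv{} record:

    -record(kv, {key, val}).

    keypos_demo() ->
        T = ets:new(demo_tab, [set, {keypos, #kv.key}]),
        true = ets:insert(T, #kv{key = a, val = 1}),
        [#kv{key = a, val = 1}] = ets:lookup(T, a),
        ok.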
terminate(_Reason, _State) ->
ok.
-
handle_call({updated, Id, Rep, Filter}, _From, State) ->
ok = updated_doc(Id, Rep, Filter),
{reply, ok, State};
-
handle_call({removed, Id}, _From, State) ->
ok = removed_doc(Id),
{reply, ok, State};
-
handle_call({completed, Id}, _From, State) ->
true = ets:delete(?MODULE, Id),
{reply, ok, State};
-
handle_call({clean_up_replications, DbName}, _From, State) ->
ok = removed_db(DbName),
{reply, ok, State}.
@@ -234,29 +226,29 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
handle_cast({cluster, unstable}, State) ->
% Ignoring unstable state transition
{noreply, State};
-
handle_cast({cluster, stable}, State) ->
% Membership changed -- recheck ownership of all replication documents
nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
{noreply, State};
-
handle_cast(Msg, State) ->
{stop, {error, unexpected_message, Msg}, State}.
-
-handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
- result = Res}}, State) ->
+handle_info(
+ {'DOWN', _, _, _, #doc_worker_result{
+ id = Id,
+ wref = Ref,
+ result = Res
+ }},
+ State
+) ->
ok = worker_returned(Ref, Id, Res),
{noreply, State};
-
handle_info(_Msg, State) ->
{noreply, State}.
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
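The 'DOWN' clause above works because each worker is monitored and exits with a #doc_worker_result{} as its exit reason, so the result rides in the monitor message and no separate reply is needed. The bare pattern, as a sketch:

    result_via_down() ->
        {Pid, Ref} = spawn_monitor(fun() -> exit({result, 42}) end),
        receive
            {'DOWN', Ref, process, Pid, {result, N}} -> {ok, N}
        after 5000 ->
            timeout
        end.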
% Doc processor gen_server private helper functions
% Handle doc update -- add to ets, then start a worker to try to turn it into
@@ -289,7 +281,6 @@ updated_doc(Id, Rep, Filter) ->
ok
end.
-
% Return current #rep{} record if any. If replication hasn't been submitted
% to the scheduler yet, #rep{} record will be in the document processor's
% ETS table, otherwise query scheduler for the #rep{} record.
@@ -308,81 +299,81 @@ current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
Rep
end.
-
-spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
worker_returned(Ref, Id, {ok, RepId}) ->
case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref} = Row] ->
- Row0 = Row#rdoc{
- state = scheduled,
- errcnt = 0,
- worker = nil,
- last_updated = os:timestamp()
- },
- NewRow = case Row0 of
- #rdoc{rid = RepId, filter = user} ->
- % Filtered replication id didn't change.
- Row0;
- #rdoc{rid = nil, filter = user} ->
- % Calculated new replication id for a filtered replication. Make
- % sure to schedule another check as filter code could change.
- % Replication starts could have been failing, so also clear
- % error count.
- Row0#rdoc{rid = RepId};
- #rdoc{rid = OldRepId, filter = user} ->
- % Replication id of existing replication job with filter has
- % changed. Remove old replication job from scheduler and
- % schedule check to check for future changes.
- ok = couch_replicator_scheduler:remove_job(OldRepId),
- Msg = io_lib:format("Replication id changed: ~p -> ~p", [
- OldRepId, RepId]),
- Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
- #rdoc{rid = nil} ->
- % Calculated new replication id for non-filtered replication.
- % Remove replication doc body, after this we won't need it
- % anymore.
- Row0#rdoc{rep=nil, rid=RepId, info=nil}
- end,
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
+ [#rdoc{worker = Ref} = Row] ->
+ Row0 = Row#rdoc{
+ state = scheduled,
+ errcnt = 0,
+ worker = nil,
+ last_updated = os:timestamp()
+ },
+ NewRow =
+ case Row0 of
+ #rdoc{rid = RepId, filter = user} ->
+ % Filtered replication id didn't change.
+ Row0;
+ #rdoc{rid = nil, filter = user} ->
+ % Calculated new replication id for a filtered replication. Make
+ % sure to schedule another check as filter code could change.
+ % Replication starts could have been failing, so also clear
+ % error count.
+ Row0#rdoc{rid = RepId};
+ #rdoc{rid = OldRepId, filter = user} ->
+ % Replication id of existing replication job with filter has
+ % changed. Remove old replication job from scheduler and
+ % schedule a check for future changes.
+ ok = couch_replicator_scheduler:remove_job(OldRepId),
+ Msg = io_lib:format("Replication id changed: ~p -> ~p", [
+ OldRepId, RepId
+ ]),
+ Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)};
+ #rdoc{rid = nil} ->
+ % Calculated new replication id for non-filtered replication.
+ % Remove replication doc body, after this we won't need it
+ % anymore.
+ Row0#rdoc{rep = nil, rid = RepId, info = nil}
+ end,
+ true = ets:insert(?MODULE, NewRow),
+ ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
+ ok = maybe_start_worker(Id);
+ _ ->
+ % doc could have been deleted, ignore
+ ok
end,
ok;
-
worker_returned(_Ref, _Id, ignore) ->
ok;
-
worker_returned(Ref, Id, {temporary_error, Reason}) ->
case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
- NewRow = Row#rdoc{
- rid = nil,
- state = error,
- info = Reason,
- errcnt = ErrCnt + 1,
- worker = nil,
- last_updated = os:timestamp()
- },
- true = ets:insert(?MODULE, NewRow),
- ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
- ok = maybe_start_worker(Id);
- _ ->
- ok % doc could have been deleted, ignore
+ [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
+ NewRow = Row#rdoc{
+ rid = nil,
+ state = error,
+ info = Reason,
+ errcnt = ErrCnt + 1,
+ worker = nil,
+ last_updated = os:timestamp()
+ },
+ true = ets:insert(?MODULE, NewRow),
+ ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
+ ok = maybe_start_worker(Id);
+ _ ->
+ % doc could have been deleted, ignore
+ ok
end,
ok;
-
worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
case ets:lookup(?MODULE, Id) of
- [#rdoc{worker = Ref}] ->
- true = ets:delete(?MODULE, Id);
- _ ->
- ok % doc could have been deleted, ignore
+ [#rdoc{worker = Ref}] ->
+ true = ets:delete(?MODULE, Id);
+ _ ->
+ % doc could have been deleted, ignore
+ ok
end,
ok.
-
-spec maybe_update_doc_error(#rep{}, any()) -> ok.
maybe_update_doc_error(Rep, Reason) ->
case update_docs() of
@@ -392,7 +383,6 @@ maybe_update_doc_error(Rep, Reason) ->
ok
end.
-
-spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
maybe_update_doc_triggered(Rep, RepId) ->
case update_docs() of
@@ -402,7 +392,6 @@ maybe_update_doc_triggered(Rep, RepId) ->
ok
end.
-
-spec error_backoff(non_neg_integer()) -> seconds().
error_backoff(ErrCnt) ->
Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT),
@@ -411,7 +400,6 @@ error_backoff(ErrCnt) ->
% on average. Then 1 minute and so on.
couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp).
-
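error_backoff/1 implements randomized exponential backoff: the window doubles with each consecutive error and its exponent is capped. A small sketch of the resulting upper bound (constants inlined from the macros above):

    %% Upper bound in seconds of the uniform draw per error count:
    %% 64, 128, 256, ... capped at 64 bsl 12 = 262144 (~3 days), so the
    %% average draw at the cap is on the order of a day, as the macro
    %% comment says.
    backoff_bound(ErrCnt) ->
        64 bsl min(ErrCnt, 12).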
-spec filter_backoff() -> seconds().
filter_backoff() ->
Total = ets:info(?MODULE, size),
@@ -424,7 +412,6 @@ filter_backoff() ->
Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC),
?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)).
-
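filter_backoff/0 scales the recheck delay with how many documents are tracked, so large installations poll user filters less aggressively. The maximum delay it can produce, as a sketch with the constants inlined:

    %% 60s floor plus a random part whose bound grows with table size
    %% and is capped at one day (86400s).
    max_filter_delay(Total) ->
        60 + round(1 + min(2 * (Total / 10), 86400)).
    %% max_filter_delay(100) =:= 81, max_filter_delay(1000000) =:= 86461.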
% Document removed from db -- clear ets table and remove all scheduled jobs
-spec removed_doc(db_doc_id()) -> ok.
removed_doc({DbName, DocId} = Id) ->
@@ -432,7 +419,6 @@ removed_doc({DbName, DocId} = Id) ->
RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId),
lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
% Whole db shard is gone -- remove all its ets rows and stop jobs
-spec removed_db(binary()) -> ok.
removed_db(DbName) ->
@@ -441,32 +427,30 @@ removed_db(DbName) ->
RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName),
lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds).
-
% Spawn a worker process which will attempt to calculate a replication id, then
% start a replication. Returns a process monitor reference. The worker is
% guaranteed to exit with rep_start_result() type only.
-spec maybe_start_worker(db_doc_id()) -> ok.
maybe_start_worker(Id) ->
case ets:lookup(?MODULE, Id) of
- [] ->
- ok;
- [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
- ok;
- [#rdoc{rep = Rep} = Doc] ->
- % For any replication with a user created filter function, periodically
- % (every `filter_backoff/0` seconds) to try to see if the user filter
- % has changed by using a worker to check for changes. When the worker
- % returns check if replication ID has changed. If it hasn't keep
- % checking (spawn another worker and so on). If it has stop the job
- % with the old ID and continue checking.
- Wait = get_worker_wait(Doc),
- Ref = make_ref(),
- true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
- couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
- ok
+ [] ->
+ ok;
+ [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user ->
+ ok;
+ [#rdoc{rep = Rep} = Doc] ->
+ % For any replication with a user-created filter function, periodically
+ % (every `filter_backoff/0` seconds) spawn a worker to check whether the
+ % user filter has changed. When the worker returns, check if the
+ % replication ID has changed. If it hasn't, keep checking (spawn
+ % another worker and so on). If it has, stop the job with the old ID
+ % and continue checking.
+ Wait = get_worker_wait(Doc),
+ Ref = make_ref(),
+ true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
+ couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
+ ok
end.
-
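maybe_start_worker/1 stores a fresh make_ref() in the row before spawning, and worker_returned/3 only acts when the stored reference still matches, so results from superseded workers are dropped. The correlation pattern in miniature (hypothetical state map, not this module's ETS table):

    start_attempt(Id, Refs) ->
        Ref = make_ref(),
        {Ref, Refs#{Id => Ref}}.

    result_is_current(Id, Ref, Refs) ->
        %% only the most recently issued tag for Id is accepted
        maps:get(Id, Refs, undefined) =:= Ref.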
-spec get_worker_wait(#rdoc{}) -> seconds().
get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
filter_backoff();
@@ -475,45 +459,52 @@ get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
get_worker_wait(#rdoc{state = initializing}) ->
0.
-
-spec update_docs() -> boolean().
update_docs() ->
config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
-
% _scheduler/docs HTTP endpoint helpers
-spec docs([atom()]) -> [{[_]}] | [].
docs(States) ->
HealthThreshold = couch_replicator_scheduler:health_threshold(),
- ets:foldl(fun(RDoc, Acc) ->
- case ejson_doc(RDoc, HealthThreshold) of
- nil ->
- Acc; % Could have been deleted if job just completed
- {Props} = EJson ->
- {state, DocState} = lists:keyfind(state, 1, Props),
- case ejson_doc_state_filter(DocState, States) of
- true ->
- [EJson | Acc];
- false ->
- Acc
- end
- end
- end, [], ?MODULE).
-
+ ets:foldl(
+ fun(RDoc, Acc) ->
+ case ejson_doc(RDoc, HealthThreshold) of
+ nil ->
+ % Could have been deleted if job just completed
+ Acc;
+ {Props} = EJson ->
+ {state, DocState} = lists:keyfind(state, 1, Props),
+ case ejson_doc_state_filter(DocState, States) of
+ true ->
+ [EJson | Acc];
+ false ->
+ Acc
+ end
+ end
+ end,
+ [],
+ ?MODULE
+ ).
-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
doc(Db, DocId) ->
HealthThreshold = couch_replicator_scheduler:health_threshold(),
- Res = (catch ets:foldl(fun(RDoc, nil) ->
- {Shard, RDocId} = RDoc#rdoc.id,
- case {mem3:dbname(Shard), RDocId} of
- {Db, DocId} ->
- throw({found, ejson_doc(RDoc, HealthThreshold)});
- {_OtherDb, _OtherDocId} ->
- nil
- end
- end, nil, ?MODULE)),
+ Res =
+ (catch ets:foldl(
+ fun(RDoc, nil) ->
+ {Shard, RDocId} = RDoc#rdoc.id,
+ case {mem3:dbname(Shard), RDocId} of
+ {Db, DocId} ->
+ throw({found, ejson_doc(RDoc, HealthThreshold)});
+ {_OtherDb, _OtherDocId} ->
+ nil
+ end
+ end,
+ nil,
+ ?MODULE
+ )),
case Res of
{found, DocInfo} ->
{ok, DocInfo};
@@ -521,7 +512,6 @@ doc(Db, DocId) ->
{error, not_found}
end.
-
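doc/2 throws {found, ...} from inside ets:foldl and relies on the surrounding catch to escape, since foldl offers no way to stop a traversal early. The same idiom in isolation, with a hypothetical table and predicate:

    %% Return {ok, Obj} for the first object satisfying Pred, or `none`,
    %% without scanning past the first match.
    find_first(Tab, Pred) ->
        try
            ets:foldl(
                fun(Obj, Acc) ->
                    case Pred(Obj) of
                        true -> throw({found, Obj});
                        false -> Acc
                    end
                end,
                none,
                Tab
            )
        catch
            throw:{found, Found} -> {ok, Found}
        end.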
-spec doc_lookup(binary(), binary(), integer()) ->
{ok, {[_]}} | {error, not_found}.
doc_lookup(Db, DocId, HealthThreshold) ->
@@ -532,14 +522,12 @@ doc_lookup(Db, DocId, HealthThreshold) ->
{error, not_found}
end.
-
-spec ejson_rep_id(rep_id() | nil) -> binary() | null.
ejson_rep_id(nil) ->
null;
ejson_rep_id({BaseId, Ext}) ->
iolist_to_binary([BaseId, Ext]).
-
-spec ejson_doc(#rdoc{}, non_neg_integer()) -> {[_]} | nil.
ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) ->
#rdoc{id = {DbName, DocId}, rid = RepId} = RDoc,
@@ -552,18 +540,18 @@ ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) ->
{doc_id, DocId},
{database, DbName},
{id, ejson_rep_id(RepId)},
- {node, node()} | JobProps
+ {node, node()}
+ | JobProps
]}
end;
-
ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
#rdoc{
- id = {DbName, DocId},
- info = StateInfo,
- rid = RepId,
- errcnt = ErrorCount,
- last_updated = StateTime,
- rep = Rep
+ id = {DbName, DocId},
+ info = StateInfo,
+ rid = RepId,
+ errcnt = ErrorCount,
+ last_updated = StateTime,
+ rep = Rep
} = RDoc,
{[
{doc_id, DocId},
@@ -577,14 +565,12 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
{start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
]}.
-
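The `| JobProps` tail in the scheduled clause above is plain list cons inside a list literal: the fixed properties are prepended to the proplist obtained from the scheduler. For instance:

    %% [a, b | Rest] builds a proper list when Rest is itself a list:
    %% [{doc_id, D}, {node, node()} | [{state, running}]]
    %%     =:= [{doc_id, D}, {node, node()}, {state, running}]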
-spec ejson_doc_state_filter(atom(), [atom()]) -> boolean().
ejson_doc_state_filter(_DocState, []) ->
true;
ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
lists:member(State, States).
-
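ejson_doc_state_filter/2 treats an empty list as "no filter", which is how a _scheduler/docs request without a states parameter returns every document. Expected behavior (the function is module-internal; shown for illustration only):

    %% ejson_doc_state_filter(error, [])                    -> true
    %% ejson_doc_state_filter(error, [initializing, error]) -> true
    %% ejson_doc_state_filter(scheduled, [error])           -> false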
-spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
case couch_replicator_clustering:owner(DbName, DocId) of
@@ -599,7 +585,6 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
nil
end.
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -611,7 +596,6 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
-define(R1, {"1", ""}).
-define(R2, {"2", ""}).
-
doc_processor_test_() ->
{
setup,
@@ -640,7 +624,6 @@ doc_processor_test_() ->
}
}.
-
% Can't parse replication doc, so should write failure state to document.
t_bad_change() ->
?_test(begin
@@ -648,7 +631,6 @@ t_bad_change() ->
?assert(updated_doc_with_failed_state())
end).
-
% Regular change, parse to a #rep{} and then add job.
t_regular_change() ->
?_test(begin
@@ -658,15 +640,13 @@ t_regular_change() ->
?assert(started_worker({?DB, ?DOC1}))
end).
-
% Handle cases where doc processor exits or crashes while processing a change
t_change_with_doc_processor_crash() ->
?_test(begin
mock_existing_jobs_lookup([]),
?assertEqual(acc, db_change(?EXIT_DB, change(), acc)),
?assert(failed_state_not_updated())
- end).
-
+ end).
% Regular change, parse to a #rep{} and then add job but there is already
% a running job with same Id found.
@@ -678,7 +658,6 @@ t_change_with_existing_job() ->
?assert(started_worker({?DB, ?DOC1}))
end).
-
% Change is a deletion, and job is running, so remove job.
t_deleted_change() ->
?_test(begin
@@ -687,7 +666,6 @@ t_deleted_change() ->
?assert(removed_job(?R2))
end).
-
% Change is in `triggered` state. Remove legacy state and add job.
t_triggered_change() ->
?_test(begin
@@ -698,7 +676,6 @@ t_triggered_change() ->
?assert(started_worker({?DB, ?DOC1}))
end).
-
% Change is in `completed` state, so skip over it.
t_completed_change() ->
?_test(begin
@@ -708,7 +685,6 @@ t_completed_change() ->
?assert(did_not_spawn_worker())
end).
-
% Completed change comes for what used to be an active job. In this case
% remove the entry from doc_processor's ets (because there is no linkage or
% callback mechanism for the scheduler to tell doc_processor a replication just
@@ -723,7 +699,6 @@ t_active_replication_completed() ->
?assertNot(ets:member(?MODULE, {?DB, ?DOC1}))
end).
-
% Change is in `error` state. Remove legacy state and retry
% running the job. This state was used for transient errors which are not
% written to the document anymore.
@@ -736,7 +711,6 @@ t_error_change() ->
?assert(started_worker({?DB, ?DOC1}))
end).
-
% Change is in `failed` state. This is a terminal state and it will not
% be tried again, so skip over it.
t_failed_change() ->
@@ -747,27 +721,24 @@ t_failed_change() ->
?assert(did_not_spawn_worker())
end).
-
% Normal change, but according to cluster ownership algorithm, replication
% belongs to a different node, so this node should skip it.
t_change_for_different_node() ->
- ?_test(begin
+ ?_test(begin
meck:expect(couch_replicator_clustering, owner, 2, different_node),
?assertEqual(ok, process_change(?DB, change())),
?assert(did_not_spawn_worker())
- end).
-
+ end).
% Change handled when cluster is unstable (nodes are added or removed), so
% job is not added. A rescan will be triggered soon and the change will be
% evaluated again.
t_change_when_cluster_unstable() ->
- ?_test(begin
- meck:expect(couch_replicator_clustering, owner, 2, unstable),
- ?assertEqual(ok, process_change(?DB, change())),
- ?assert(did_not_spawn_worker())
- end).
-
+ ?_test(begin
+ meck:expect(couch_replicator_clustering, owner, 2, unstable),
+ ?assertEqual(ok, process_change(?DB, change())),
+ ?assert(did_not_spawn_worker())
+ end).
% Check if docs/0 function produces expected ejson after adding a job
t_ejson_docs() ->
@@ -776,12 +747,17 @@ t_ejson_docs() ->
?assertEqual(ok, process_change(?DB, change())),
?assert(ets:member(?MODULE, {?DB, ?DOC1})),
EJsonDocs = docs([]),
- ?assertMatch([{[_|_]}], EJsonDocs),
+ ?assertMatch([{[_ | _]}], EJsonDocs),
[{DocProps}] = EJsonDocs,
- {value, StateTime, DocProps1} = lists:keytake(last_updated, 1,
- DocProps),
- ?assertMatch({last_updated, BinVal1} when is_binary(BinVal1),
- StateTime),
+ {value, StateTime, DocProps1} = lists:keytake(
+ last_updated,
+ 1,
+ DocProps
+ ),
+ ?assertMatch(
+ {last_updated, BinVal1} when is_binary(BinVal1),
+ StateTime
+ ),
{value, StartTime, DocProps2} = lists:keytake(start_time, 1, DocProps1),
?assertMatch({start_time, BinVal2} when is_binary(BinVal2), StartTime),
ExpectedProps = [
@@ -796,11 +772,10 @@ t_ejson_docs() ->
?assertEqual(ExpectedProps, lists:usort(DocProps2))
end).
-
% Check that when cluster membership changes records from doc processor and job
% scheduler get removed
t_cluster_membership_foldl() ->
- ?_test(begin
+ ?_test(begin
mock_existing_jobs_lookup([test_rep(?R1)]),
?assertEqual(ok, process_change(?DB, change())),
meck:expect(couch_replicator_clustering, owner, 2, different_node),
@@ -809,8 +784,7 @@ t_cluster_membership_foldl() ->
meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
?assert(removed_job(?R1))
- end).
-
+ end).
get_worker_ref_test_() ->
{
@@ -830,10 +804,8 @@ get_worker_ref_test_() ->
end)
}.
-
% Test helper functions
-
setup_all() ->
meck:expect(couch_log, info, 2, ok),
meck:expect(couch_log, notice, 2, ok),
@@ -842,8 +814,12 @@ setup_all() ->
meck:expect(config, get, fun(_, _, Default) -> Default end),
meck:expect(config, listen_for_changes, 2, ok),
meck:expect(couch_replicator_clustering, owner, 2, node()),
- meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
- ok),
+ meck:expect(
+ couch_replicator_clustering,
+ link_cluster_event_listener,
+ 3,
+ ok
+ ),
meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun
({?EXIT_DB, _}, _, _, _) -> exit(kapow);
(_, _, _, _) -> pid
@@ -852,11 +828,9 @@ setup_all() ->
meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
meck:expect(couch_replicator_docs, update_failed, 3, ok).
-
teardown_all(_) ->
meck:unload().
-
setup() ->
meck:reset([
config,
@@ -873,30 +847,29 @@ setup() ->
unlink(Pid),
Pid.
-
teardown(Pid) ->
- test_util:stop_sync(Pid, kill, 1000). % 1s wait should suffice
-
+ % 1s wait should suffice
+ test_util:stop_sync(Pid, kill, 1000).
removed_state_fields() ->
meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).
-
started_worker(_Id) ->
1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).
-
removed_job(Id) ->
meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]).
-
did_not_remove_state_fields() ->
0 == meck:num_calls(couch_replicator_docs, remove_state_fields, '_').
-
did_not_spawn_worker() ->
- 0 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker,
- '_').
+ 0 ==
+ meck:num_calls(
+ couch_replicator_doc_processor_worker,
+ spawn_worker,
+ '_'
+ ).
updated_doc_with_failed_state() ->
1 == meck:num_calls(couch_replicator_docs, update_failed, '_').
@@ -910,53 +883,52 @@ mock_existing_jobs_lookup(ExistingJobs) ->
(?DB, ?DOC1) -> ExistingJobs
end).
-
test_rep(Id) ->
- #rep{id = Id, start_time = {0, 0, 0}}.
-
+ #rep{id = Id, start_time = {0, 0, 0}}.
change() ->
{[
{<<"id">>, ?DOC1},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}}
+ {doc,
+ {[
+ {<<"_id">>, ?DOC1},
+ {<<"source">>, <<"http://srchost.local/src">>},
+ {<<"target">>, <<"http://tgthost.local/tgt">>}
+ ]}}
]}.
-
change(State) ->
{[
{<<"id">>, ?DOC1},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>},
- {<<"_replication_state">>, State}
- ]}}
+ {doc,
+ {[
+ {<<"_id">>, ?DOC1},
+ {<<"source">>, <<"http://srchost.local/src">>},
+ {<<"target">>, <<"http://tgthost.local/tgt">>},
+ {<<"_replication_state">>, State}
+ ]}}
]}.
-
deleted_change() ->
{[
{<<"id">>, ?DOC1},
{<<"deleted">>, true},
- {doc, {[
- {<<"_id">>, ?DOC1},
- {<<"source">>, <<"http://srchost.local/src">>},
- {<<"target">>, <<"http://tgthost.local/tgt">>}
- ]}}
+ {doc,
+ {[
+ {<<"_id">>, ?DOC1},
+ {<<"source">>, <<"http://srchost.local/src">>},
+ {<<"target">>, <<"http://tgthost.local/tgt">>}
+ ]}}
]}.
-
bad_change() ->
{[
{<<"id">>, ?DOC2},
- {doc, {[
- {<<"_id">>, ?DOC2},
- {<<"source">>, <<"src">>}
- ]}}
+ {doc,
+ {[
+ {<<"_id">>, ?DOC2},
+ {<<"source">>, <<"src">>}
+ ]}}
]}.
-endif.