diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_doc_processor.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_doc_processor.erl | 524 |
1 files changed, 248 insertions, 276 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index ed6670615..436d7c44d 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -20,12 +20,12 @@ ]). -export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3 ]). -export([ @@ -54,15 +54,15 @@ ]). -define(DEFAULT_UPDATE_DOCS, false). --define(ERROR_MAX_BACKOFF_EXPONENT, 12). % ~ 1 day on average +% ~ 1 day on average +-define(ERROR_MAX_BACKOFF_EXPONENT, 12). -define(TS_DAY_SEC, 86400). -define(INITIAL_BACKOFF_EXPONENT, 64). -define(MIN_FILTER_DELAY_SEC, 60). --type filter_type() :: nil | view | user | docids | mango. +-type filter_type() :: nil | view | user | docids | mango. -type repstate() :: initializing | error | scheduled. - -record(rdoc, { id :: db_doc_id() | '_' | {any(), '_'}, state :: repstate() | '_', @@ -75,7 +75,6 @@ last_updated :: erlang:timestamp() | '_' }). - % couch_multidb_changes API callbacks db_created(DbName, Server) -> @@ -83,35 +82,31 @@ db_created(DbName, Server) -> couch_replicator_docs:ensure_rep_ddoc_exists(DbName), Server. - db_deleted(DbName, Server) -> couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity), Server. - db_found(DbName, Server) -> couch_stats:increment_counter([couch_replicator, docs, dbs_found]), couch_replicator_docs:ensure_rep_ddoc_exists(DbName), Server. - db_change(DbName, {ChangeProps} = Change, Server) -> couch_stats:increment_counter([couch_replicator, docs, db_changes]), try ok = process_change(DbName, Change) catch - exit:{Error, {gen_server, call, [?MODULE, _, _]}} -> - ErrMsg = "~p exited ~p while processing change from db ~p", - couch_log:error(ErrMsg, [?MODULE, Error, DbName]); - _Tag:Error -> - {RepProps} = get_json_value(doc, ChangeProps), - DocId = get_json_value(<<"_id">>, RepProps), - couch_replicator_docs:update_failed(DbName, DocId, Error) + exit:{Error, {gen_server, call, [?MODULE, _, _]}} -> + ErrMsg = "~p exited ~p while processing change from db ~p", + couch_log:error(ErrMsg, [?MODULE, Error, DbName]); + _Tag:Error -> + {RepProps} = get_json_value(doc, ChangeProps), + DocId = get_json_value(<<"_id">>, RepProps), + couch_replicator_docs:update_failed(DbName, DocId, Error) end, Server. - -spec get_worker_ref(db_doc_id()) -> reference() | nil. get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> case ets:lookup(?MODULE, {DbName, DocId}) of @@ -123,46 +118,43 @@ get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> nil end. - % Cluster membership change notification callback -spec notify_cluster_event(pid(), {cluster, any()}) -> ok. notify_cluster_event(Server, {cluster, _} = Event) -> gen_server:cast(Server, Event). - process_change(DbName, {Change}) -> {RepProps} = JsonRepDoc = get_json_value(doc, Change), DocId = get_json_value(<<"_id">>, RepProps), Owner = couch_replicator_clustering:owner(DbName, DocId), Id = {DbName, DocId}, case {Owner, get_json_value(deleted, Change, false)} of - {_, true} -> - ok = gen_server:call(?MODULE, {removed, Id}, infinity); - {unstable, false} -> - couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]); - {ThisNode, false} when ThisNode =:= node() -> - case get_json_value(<<"_replication_state">>, RepProps) of - undefined -> - ok = process_updated(Id, JsonRepDoc); - <<"triggered">> -> - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"completed">> -> - ok = gen_server:call(?MODULE, {completed, Id}, infinity); - <<"error">> -> - % Handle replications started from older versions of replicator - % which wrote transient errors to replication docs - maybe_remove_state_fields(DbName, DocId), - ok = process_updated(Id, JsonRepDoc); - <<"failed">> -> + {_, true} -> + ok = gen_server:call(?MODULE, {removed, Id}, infinity); + {unstable, false} -> + couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]); + {ThisNode, false} when ThisNode =:= node() -> + case get_json_value(<<"_replication_state">>, RepProps) of + undefined -> + ok = process_updated(Id, JsonRepDoc); + <<"triggered">> -> + maybe_remove_state_fields(DbName, DocId), + ok = process_updated(Id, JsonRepDoc); + <<"completed">> -> + ok = gen_server:call(?MODULE, {completed, Id}, infinity); + <<"error">> -> + % Handle replications started from older versions of replicator + % which wrote transient errors to replication docs + maybe_remove_state_fields(DbName, DocId), + ok = process_updated(Id, JsonRepDoc); + <<"failed">> -> + ok + end; + {Owner, false} -> ok - end; - {Owner, false} -> - ok end, ok. - maybe_remove_state_fields(DbName, DocId) -> case update_docs() of true -> @@ -171,7 +163,6 @@ maybe_remove_state_fields(DbName, DocId) -> couch_replicator_docs:remove_state_fields(DbName, DocId) end. - process_updated({DbName, _DocId} = Id, JsonRepDoc) -> % Parsing replication doc (but not calculating the id) could throw an % exception which would indicate this document is malformed. This exception @@ -180,53 +171,54 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) -> % problem. Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc), Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, - Filter = case couch_replicator_filters:parse(Rep#rep.options) of - {ok, nil} -> - nil; - {ok, {user, _FName, _QP}} -> - user; - {ok, {view, _FName, _QP}} -> - view; - {ok, {docids, _DocIds}} -> - docids; - {ok, {mango, _Selector}} -> - mango; - {error, FilterError} -> - throw(FilterError) - end, + Filter = + case couch_replicator_filters:parse(Rep#rep.options) of + {ok, nil} -> + nil; + {ok, {user, _FName, _QP}} -> + user; + {ok, {view, _FName, _QP}} -> + view; + {ok, {docids, _DocIds}} -> + docids; + {ok, {mango, _Selector}} -> + mango; + {error, FilterError} -> + throw(FilterError) + end, gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity). - % Doc processor gen_server API and callbacks start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> - ?MODULE = ets:new(?MODULE, [named_table, {keypos, #rdoc.id}, - {read_concurrency, true}, {write_concurrency, true}]), - couch_replicator_clustering:link_cluster_event_listener(?MODULE, - notify_cluster_event, [self()]), + ?MODULE = ets:new(?MODULE, [ + named_table, + {keypos, #rdoc.id}, + {read_concurrency, true}, + {write_concurrency, true} + ]), + couch_replicator_clustering:link_cluster_event_listener( + ?MODULE, + notify_cluster_event, + [self()] + ), {ok, nil}. - terminate(_Reason, _State) -> ok. - handle_call({updated, Id, Rep, Filter}, _From, State) -> ok = updated_doc(Id, Rep, Filter), {reply, ok, State}; - handle_call({removed, Id}, _From, State) -> ok = removed_doc(Id), {reply, ok, State}; - handle_call({completed, Id}, _From, State) -> true = ets:delete(?MODULE, Id), {reply, ok, State}; - handle_call({clean_up_replications, DbName}, _From, State) -> ok = removed_db(DbName), {reply, ok, State}. @@ -234,29 +226,29 @@ handle_call({clean_up_replications, DbName}, _From, State) -> handle_cast({cluster, unstable}, State) -> % Ignoring unstable state transition {noreply, State}; - handle_cast({cluster, stable}, State) -> % Membership changed recheck all the replication document ownership nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE), {noreply, State}; - handle_cast(Msg, State) -> {stop, {error, unexpected_message, Msg}, State}. - -handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref, - result = Res}}, State) -> +handle_info( + {'DOWN', _, _, _, #doc_worker_result{ + id = Id, + wref = Ref, + result = Res + }}, + State +) -> ok = worker_returned(Ref, Id, Res), {noreply, State}; - handle_info(_Msg, State) -> {noreply, State}. - code_change(_OldVsn, State, _Extra) -> {ok, State}. - % Doc processor gen_server private helper functions % Handle doc update -- add to ets, then start a worker to try to turn it into @@ -289,7 +281,6 @@ updated_doc(Id, Rep, Filter) -> ok end. - % Return current #rep{} record if any. If replication hasn't been submitted % to the scheduler yet, #rep{} record will be in the document processor's % ETS table, otherwise query scheduler for the #rep{} record. @@ -308,81 +299,81 @@ current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> Rep end. - -spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. worker_returned(Ref, Id, {ok, RepId}) -> case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref} = Row] -> - Row0 = Row#rdoc{ - state = scheduled, - errcnt = 0, - worker = nil, - last_updated = os:timestamp() - }, - NewRow = case Row0 of - #rdoc{rid = RepId, filter = user} -> - % Filtered replication id didn't change. - Row0; - #rdoc{rid = nil, filter = user} -> - % Calculated new replication id for a filtered replication. Make - % sure to schedule another check as filter code could change. - % Replication starts could have been failing, so also clear - % error count. - Row0#rdoc{rid = RepId}; - #rdoc{rid = OldRepId, filter = user} -> - % Replication id of existing replication job with filter has - % changed. Remove old replication job from scheduler and - % schedule check to check for future changes. - ok = couch_replicator_scheduler:remove_job(OldRepId), - Msg = io_lib:format("Replication id changed: ~p -> ~p", [ - OldRepId, RepId]), - Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; - #rdoc{rid = nil} -> - % Calculated new replication id for non-filtered replication. - % Remove replication doc body, after this we won't need it - % anymore. - Row0#rdoc{rep=nil, rid=RepId, info=nil} - end, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore + [#rdoc{worker = Ref} = Row] -> + Row0 = Row#rdoc{ + state = scheduled, + errcnt = 0, + worker = nil, + last_updated = os:timestamp() + }, + NewRow = + case Row0 of + #rdoc{rid = RepId, filter = user} -> + % Filtered replication id didn't change. + Row0; + #rdoc{rid = nil, filter = user} -> + % Calculated new replication id for a filtered replication. Make + % sure to schedule another check as filter code could change. + % Replication starts could have been failing, so also clear + % error count. + Row0#rdoc{rid = RepId}; + #rdoc{rid = OldRepId, filter = user} -> + % Replication id of existing replication job with filter has + % changed. Remove old replication job from scheduler and + % schedule check to check for future changes. + ok = couch_replicator_scheduler:remove_job(OldRepId), + Msg = io_lib:format("Replication id changed: ~p -> ~p", [ + OldRepId, RepId + ]), + Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; + #rdoc{rid = nil} -> + % Calculated new replication id for non-filtered replication. + % Remove replication doc body, after this we won't need it + % anymore. + Row0#rdoc{rep = nil, rid = RepId, info = nil} + end, + true = ets:insert(?MODULE, NewRow), + ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId), + ok = maybe_start_worker(Id); + _ -> + % doc could have been deleted, ignore + ok end, ok; - worker_returned(_Ref, _Id, ignore) -> ok; - worker_returned(Ref, Id, {temporary_error, Reason}) -> case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] -> - NewRow = Row#rdoc{ - rid = nil, - state = error, - info = Reason, - errcnt = ErrCnt + 1, - worker = nil, - last_updated = os:timestamp() - }, - true = ets:insert(?MODULE, NewRow), - ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason), - ok = maybe_start_worker(Id); - _ -> - ok % doc could have been deleted, ignore + [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] -> + NewRow = Row#rdoc{ + rid = nil, + state = error, + info = Reason, + errcnt = ErrCnt + 1, + worker = nil, + last_updated = os:timestamp() + }, + true = ets:insert(?MODULE, NewRow), + ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason), + ok = maybe_start_worker(Id); + _ -> + % doc could have been deleted, ignore + ok end, ok; - worker_returned(Ref, Id, {permanent_failure, _Reason}) -> case ets:lookup(?MODULE, Id) of - [#rdoc{worker = Ref}] -> - true = ets:delete(?MODULE, Id); - _ -> - ok % doc could have been deleted, ignore + [#rdoc{worker = Ref}] -> + true = ets:delete(?MODULE, Id); + _ -> + % doc could have been deleted, ignore + ok end, ok. - -spec maybe_update_doc_error(#rep{}, any()) -> ok. maybe_update_doc_error(Rep, Reason) -> case update_docs() of @@ -392,7 +383,6 @@ maybe_update_doc_error(Rep, Reason) -> ok end. - -spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok. maybe_update_doc_triggered(Rep, RepId) -> case update_docs() of @@ -402,7 +392,6 @@ maybe_update_doc_triggered(Rep, RepId) -> ok end. - -spec error_backoff(non_neg_integer()) -> seconds(). error_backoff(ErrCnt) -> Exp = min(ErrCnt, ?ERROR_MAX_BACKOFF_EXPONENT), @@ -411,7 +400,6 @@ error_backoff(ErrCnt) -> % on average. Then 1 minute and so on. couch_rand:uniform(?INITIAL_BACKOFF_EXPONENT bsl Exp). - -spec filter_backoff() -> seconds(). filter_backoff() -> Total = ets:info(?MODULE, size), @@ -424,7 +412,6 @@ filter_backoff() -> Range = 1 + min(2 * (Total / 10), ?TS_DAY_SEC), ?MIN_FILTER_DELAY_SEC + couch_rand:uniform(round(Range)). - % Document removed from db -- clear ets table and remove all scheduled jobs -spec removed_doc(db_doc_id()) -> ok. removed_doc({DbName, DocId} = Id) -> @@ -432,7 +419,6 @@ removed_doc({DbName, DocId} = Id) -> RepIds = couch_replicator_scheduler:find_jobs_by_doc(DbName, DocId), lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - % Whole db shard is gone -- remove all its ets rows and stop jobs -spec removed_db(binary()) -> ok. removed_db(DbName) -> @@ -441,32 +427,30 @@ removed_db(DbName) -> RepIds = couch_replicator_scheduler:find_jobs_by_dbname(DbName), lists:foreach(fun couch_replicator_scheduler:remove_job/1, RepIds). - % Spawn a worker process which will attempt to calculate a replication id, then % start a replication. Returns a process monitor reference. The worker is % guaranteed to exit with rep_start_result() type only. -spec maybe_start_worker(db_doc_id()) -> ok. maybe_start_worker(Id) -> case ets:lookup(?MODULE, Id) of - [] -> - ok; - [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user -> - ok; - [#rdoc{rep = Rep} = Doc] -> - % For any replication with a user created filter function, periodically - % (every `filter_backoff/0` seconds) to try to see if the user filter - % has changed by using a worker to check for changes. When the worker - % returns check if replication ID has changed. If it hasn't keep - % checking (spawn another worker and so on). If it has stop the job - % with the old ID and continue checking. - Wait = get_worker_wait(Doc), - Ref = make_ref(), - true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), - couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), - ok + [] -> + ok; + [#rdoc{state = scheduled, filter = Filter}] when Filter =/= user -> + ok; + [#rdoc{rep = Rep} = Doc] -> + % For any replication with a user created filter function, periodically + % (every `filter_backoff/0` seconds) to try to see if the user filter + % has changed by using a worker to check for changes. When the worker + % returns check if replication ID has changed. If it hasn't keep + % checking (spawn another worker and so on). If it has stop the job + % with the old ID and continue checking. + Wait = get_worker_wait(Doc), + Ref = make_ref(), + true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}), + couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref), + ok end. - -spec get_worker_wait(#rdoc{}) -> seconds(). get_worker_wait(#rdoc{state = scheduled, filter = user}) -> filter_backoff(); @@ -475,45 +459,52 @@ get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) -> get_worker_wait(#rdoc{state = initializing}) -> 0. - -spec update_docs() -> boolean(). update_docs() -> config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS). - % _scheduler/docs HTTP endpoint helpers -spec docs([atom()]) -> [{[_]}] | []. docs(States) -> HealthThreshold = couch_replicator_scheduler:health_threshold(), - ets:foldl(fun(RDoc, Acc) -> - case ejson_doc(RDoc, HealthThreshold) of - nil -> - Acc; % Could have been deleted if job just completed - {Props} = EJson -> - {state, DocState} = lists:keyfind(state, 1, Props), - case ejson_doc_state_filter(DocState, States) of - true -> - [EJson | Acc]; - false -> - Acc - end - end - end, [], ?MODULE). - + ets:foldl( + fun(RDoc, Acc) -> + case ejson_doc(RDoc, HealthThreshold) of + nil -> + % Could have been deleted if job just completed + Acc; + {Props} = EJson -> + {state, DocState} = lists:keyfind(state, 1, Props), + case ejson_doc_state_filter(DocState, States) of + true -> + [EJson | Acc]; + false -> + Acc + end + end + end, + [], + ?MODULE + ). -spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}. doc(Db, DocId) -> HealthThreshold = couch_replicator_scheduler:health_threshold(), - Res = (catch ets:foldl(fun(RDoc, nil) -> - {Shard, RDocId} = RDoc#rdoc.id, - case {mem3:dbname(Shard), RDocId} of - {Db, DocId} -> - throw({found, ejson_doc(RDoc, HealthThreshold)}); - {_OtherDb, _OtherDocId} -> - nil - end - end, nil, ?MODULE)), + Res = + (catch ets:foldl( + fun(RDoc, nil) -> + {Shard, RDocId} = RDoc#rdoc.id, + case {mem3:dbname(Shard), RDocId} of + {Db, DocId} -> + throw({found, ejson_doc(RDoc, HealthThreshold)}); + {_OtherDb, _OtherDocId} -> + nil + end + end, + nil, + ?MODULE + )), case Res of {found, DocInfo} -> {ok, DocInfo}; @@ -521,7 +512,6 @@ doc(Db, DocId) -> {error, not_found} end. - -spec doc_lookup(binary(), binary(), integer()) -> {ok, {[_]}} | {error, not_found}. doc_lookup(Db, DocId, HealthThreshold) -> @@ -532,14 +522,12 @@ doc_lookup(Db, DocId, HealthThreshold) -> {error, not_found} end. - -spec ejson_rep_id(rep_id() | nil) -> binary() | null. ejson_rep_id(nil) -> null; ejson_rep_id({BaseId, Ext}) -> iolist_to_binary([BaseId, Ext]). - -spec ejson_doc(#rdoc{}, non_neg_integer()) -> {[_]} | nil. ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) -> #rdoc{id = {DbName, DocId}, rid = RepId} = RDoc, @@ -552,18 +540,18 @@ ejson_doc(#rdoc{state = scheduled} = RDoc, HealthThreshold) -> {doc_id, DocId}, {database, DbName}, {id, ejson_rep_id(RepId)}, - {node, node()} | JobProps + {node, node()} + | JobProps ]} end; - ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) -> #rdoc{ - id = {DbName, DocId}, - info = StateInfo, - rid = RepId, - errcnt = ErrorCount, - last_updated = StateTime, - rep = Rep + id = {DbName, DocId}, + info = StateInfo, + rid = RepId, + errcnt = ErrorCount, + last_updated = StateTime, + rep = Rep } = RDoc, {[ {doc_id, DocId}, @@ -577,14 +565,12 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) -> {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)} ]}. - -spec ejson_doc_state_filter(atom(), [atom()]) -> boolean(). ejson_doc_state_filter(_DocState, []) -> true; ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> lists:member(State, States). - -spec cluster_membership_foldl(#rdoc{}, nil) -> nil. cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> case couch_replicator_clustering:owner(DbName, DocId) of @@ -599,7 +585,6 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> nil end. - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -611,7 +596,6 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> -define(R1, {"1", ""}). -define(R2, {"2", ""}). - doc_processor_test_() -> { setup, @@ -640,7 +624,6 @@ doc_processor_test_() -> } }. - % Can't parse replication doc, so should write failure state to document. t_bad_change() -> ?_test(begin @@ -648,7 +631,6 @@ t_bad_change() -> ?assert(updated_doc_with_failed_state()) end). - % Regular change, parse to a #rep{} and then add job. t_regular_change() -> ?_test(begin @@ -658,15 +640,13 @@ t_regular_change() -> ?assert(started_worker({?DB, ?DOC1})) end). - % Handle cases where doc processor exits or crashes while processing a change t_change_with_doc_processor_crash() -> ?_test(begin mock_existing_jobs_lookup([]), ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)), ?assert(failed_state_not_updated()) - end). - + end). % Regular change, parse to a #rep{} and then add job but there is already % a running job with same Id found. @@ -678,7 +658,6 @@ t_change_with_existing_job() -> ?assert(started_worker({?DB, ?DOC1})) end). - % Change is a deletion, and job is running, so remove job. t_deleted_change() -> ?_test(begin @@ -687,7 +666,6 @@ t_deleted_change() -> ?assert(removed_job(?R2)) end). - % Change is in `triggered` state. Remove legacy state and add job. t_triggered_change() -> ?_test(begin @@ -698,7 +676,6 @@ t_triggered_change() -> ?assert(started_worker({?DB, ?DOC1})) end). - % Change is in `completed` state, so skip over it. t_completed_change() -> ?_test(begin @@ -708,7 +685,6 @@ t_completed_change() -> ?assert(did_not_spawn_worker()) end). - % Completed change comes for what used to be an active job. In this case % remove entry from doc_processor's ets (because there is no linkage or % callback mechanism for scheduler to tell doc_processsor a replication just @@ -723,7 +699,6 @@ t_active_replication_completed() -> ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})) end). - % Change is in `error` state. Remove legacy state and retry % running the job. This state was used for transient erorrs which are not % written to the document anymore. @@ -736,7 +711,6 @@ t_error_change() -> ?assert(started_worker({?DB, ?DOC1})) end). - % Change is in `failed` state. This is a terminal state and it will not % be tried again, so skip over it. t_failed_change() -> @@ -747,27 +721,24 @@ t_failed_change() -> ?assert(did_not_spawn_worker()) end). - % Normal change, but according to cluster ownership algorithm, replication % belongs to a different node, so this node should skip it. t_change_for_different_node() -> - ?_test(begin + ?_test(begin meck:expect(couch_replicator_clustering, owner, 2, different_node), ?assertEqual(ok, process_change(?DB, change())), ?assert(did_not_spawn_worker()) - end). - + end). % Change handled when cluster is unstable (nodes are added or removed), so % job is not added. A rescan will be triggered soon and change will be % evaluated again. t_change_when_cluster_unstable() -> - ?_test(begin - meck:expect(couch_replicator_clustering, owner, 2, unstable), - ?assertEqual(ok, process_change(?DB, change())), - ?assert(did_not_spawn_worker()) - end). - + ?_test(begin + meck:expect(couch_replicator_clustering, owner, 2, unstable), + ?assertEqual(ok, process_change(?DB, change())), + ?assert(did_not_spawn_worker()) + end). % Check if docs/0 function produces expected ejson after adding a job t_ejson_docs() -> @@ -776,12 +747,17 @@ t_ejson_docs() -> ?assertEqual(ok, process_change(?DB, change())), ?assert(ets:member(?MODULE, {?DB, ?DOC1})), EJsonDocs = docs([]), - ?assertMatch([{[_|_]}], EJsonDocs), + ?assertMatch([{[_ | _]}], EJsonDocs), [{DocProps}] = EJsonDocs, - {value, StateTime, DocProps1} = lists:keytake(last_updated, 1, - DocProps), - ?assertMatch({last_updated, BinVal1} when is_binary(BinVal1), - StateTime), + {value, StateTime, DocProps1} = lists:keytake( + last_updated, + 1, + DocProps + ), + ?assertMatch( + {last_updated, BinVal1} when is_binary(BinVal1), + StateTime + ), {value, StartTime, DocProps2} = lists:keytake(start_time, 1, DocProps1), ?assertMatch({start_time, BinVal2} when is_binary(BinVal2), StartTime), ExpectedProps = [ @@ -796,11 +772,10 @@ t_ejson_docs() -> ?assertEqual(ExpectedProps, lists:usort(DocProps2)) end). - % Check that when cluster membership changes records from doc processor and job % scheduler get removed t_cluster_membership_foldl() -> - ?_test(begin + ?_test(begin mock_existing_jobs_lookup([test_rep(?R1)]), ?assertEqual(ok, process_change(?DB, change())), meck:expect(couch_replicator_clustering, owner, 2, different_node), @@ -809,8 +784,7 @@ t_cluster_membership_foldl() -> meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000), ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})), ?assert(removed_job(?R1)) - end). - + end). get_worker_ref_test_() -> { @@ -830,10 +804,8 @@ get_worker_ref_test_() -> end) }. - % Test helper functions - setup_all() -> meck:expect(couch_log, info, 2, ok), meck:expect(couch_log, notice, 2, ok), @@ -842,8 +814,12 @@ setup_all() -> meck:expect(config, get, fun(_, _, Default) -> Default end), meck:expect(config, listen_for_changes, 2, ok), meck:expect(couch_replicator_clustering, owner, 2, node()), - meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3, - ok), + meck:expect( + couch_replicator_clustering, + link_cluster_event_listener, + 3, + ok + ), meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun ({?EXIT_DB, _}, _, _, _) -> exit(kapow); (_, _, _, _) -> pid @@ -852,11 +828,9 @@ setup_all() -> meck:expect(couch_replicator_docs, remove_state_fields, 2, ok), meck:expect(couch_replicator_docs, update_failed, 3, ok). - teardown_all(_) -> meck:unload(). - setup() -> meck:reset([ config, @@ -873,30 +847,29 @@ setup() -> unlink(Pid), Pid. - teardown(Pid) -> - test_util:stop_sync(Pid, kill, 1000). % 1s wait should suffice - + % 1s wait should suffice + test_util:stop_sync(Pid, kill, 1000). removed_state_fields() -> meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]). - started_worker(_Id) -> 1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4). - removed_job(Id) -> meck:called(couch_replicator_scheduler, remove_job, [test_rep(Id)]). - did_not_remove_state_fields() -> 0 == meck:num_calls(couch_replicator_docs, remove_state_fields, '_'). - did_not_spawn_worker() -> - 0 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, - '_'). + 0 == + meck:num_calls( + couch_replicator_doc_processor_worker, + spawn_worker, + '_' + ). updated_doc_with_failed_state() -> 1 == meck:num_calls(couch_replicator_docs, update_failed, '_'). @@ -910,53 +883,52 @@ mock_existing_jobs_lookup(ExistingJobs) -> (?DB, ?DOC1) -> ExistingJobs end). - test_rep(Id) -> - #rep{id = Id, start_time = {0, 0, 0}}. - + #rep{id = Id, start_time = {0, 0, 0}}. change() -> {[ {<<"id">>, ?DOC1}, - {doc, {[ - {<<"_id">>, ?DOC1}, - {<<"source">>, <<"http://srchost.local/src">>}, - {<<"target">>, <<"http://tgthost.local/tgt">>} - ]}} + {doc, + {[ + {<<"_id">>, ?DOC1}, + {<<"source">>, <<"http://srchost.local/src">>}, + {<<"target">>, <<"http://tgthost.local/tgt">>} + ]}} ]}. - change(State) -> {[ {<<"id">>, ?DOC1}, - {doc, {[ - {<<"_id">>, ?DOC1}, - {<<"source">>, <<"http://srchost.local/src">>}, - {<<"target">>, <<"http://tgthost.local/tgt">>}, - {<<"_replication_state">>, State} - ]}} + {doc, + {[ + {<<"_id">>, ?DOC1}, + {<<"source">>, <<"http://srchost.local/src">>}, + {<<"target">>, <<"http://tgthost.local/tgt">>}, + {<<"_replication_state">>, State} + ]}} ]}. - deleted_change() -> {[ {<<"id">>, ?DOC1}, {<<"deleted">>, true}, - {doc, {[ - {<<"_id">>, ?DOC1}, - {<<"source">>, <<"http://srchost.local/src">>}, - {<<"target">>, <<"http://tgthost.local/tgt">>} - ]}} + {doc, + {[ + {<<"_id">>, ?DOC1}, + {<<"source">>, <<"http://srchost.local/src">>}, + {<<"target">>, <<"http://tgthost.local/tgt">>} + ]}} ]}. - bad_change() -> {[ {<<"id">>, ?DOC2}, - {doc, {[ - {<<"_id">>, ?DOC2}, - {<<"source">>, <<"src">>} - ]}} + {doc, + {[ + {<<"_id">>, ?DOC2}, + {<<"source">>, <<"src">>} + ]}} ]}. -endif. |