diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler_job.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler_job.erl | 1182 |
1 files changed, 0 insertions, 1182 deletions
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% gen_server implementing a single replication job. It is started by the
% replicator scheduler, spawns a changes reader, a changes manager and a
% pool of worker processes, and records checkpoints on both source and
% target databases at a configurable interval.
-module(couch_replicator_scheduler_job).

-behaviour(gen_server).

-export([
    start_link/1
]).

-export([
    init/1,
    terminate/2,
    handle_call/3,
    handle_info/2,
    handle_cast/2,
    code_change/3,
    format_status/2
]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").

-import(couch_util, [
    get_value/2,
    get_value/3,
    to_binary/1
]).

-import(couch_replicator_utils, [
    pp_rep_id/1
]).

% Sequence used when no checkpointed sequence exists yet (full replication).
-define(LOWEST_SEQ, 0).
% Default interval, in milliseconds, between checkpoints.
-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
% Default upper bound, in milliseconds, for the random startup delay.
-define(STARTUP_JITTER_DEFAULT, 5000).
% Internal state of a running replication job.
-record(rep_state, {
    % the #rep{} record this job was started from
    rep_details,
    % source URI with credentials stripped (string)
    source_name,
    % target URI with credentials stripped (string)
    target_name,
    source,
    target,
    % replication history from the checkpoint documents
    history,
    checkpoint_history,
    start_seq,
    % last sequence successfully checkpointed
    committed_seq,
    % highest contiguous sequence fully processed so far
    current_through_seq,
    % ordset of sequences handed to workers but not yet reported done
    seqs_in_progress = [],
    highest_seq_done = {0, ?LOWEST_SEQ},
    source_log,
    target_log,
    rep_starttime,
    src_starttime,
    tgt_starttime,
    % checkpoint timer
    timer,
    changes_queue,
    changes_manager,
    changes_reader,
    % list of worker pids; job stops normally when all exit
    workers,
    stats = couch_replicator_stats:new(),
    session_id,
    source_seq = nil,
    use_checkpoints = true,
    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
    type = db,
    view = nil
}).

% Start the replication job gen_server, globally registered under the
% replication id so at most one instance of a job runs cluster-wide.
start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
    RepChildId = BaseId ++ Ext,
    Source = couch_replicator_api_wrap:db_uri(Src),
    Target = couch_replicator_api_wrap:db_uri(Tgt),
    ServerName = {global, {?MODULE, Rep#rep.id}},

    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
        {ok, Pid} ->
            {ok, Pid};
        {error, Reason} ->
            couch_log:warning(
                "failed to start replication `~s` (`~s` -> `~s`)",
                [RepChildId, Source, Target]
            ),
            {error, Reason}
    end.

% Defer the real initialization to handle_info(timeout, ...) via the 0
% timeout, so start_link/1 returns quickly and init errors are handled by
% the try/catch in that clause instead of crashing the supervisor start.
init(InitArgs) ->
    {ok, InitArgs, 0}.

% Perform the actual job setup: open the databases, compare checkpoint
% logs, start the changes reader/manager and the worker pool, and register
% a replication task. Runs in handle_info(timeout, ...).
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) ->
    process_flag(trap_exit, true),

    % spread out job starts to avoid thundering-herd on (re)start
    timer:sleep(startup_jitter()),

    #rep_state{
        source = Source,
        target = Target,
        source_name = SourceName,
        target_name = TargetName,
        start_seq = {_Ts, StartSeq},
        highest_seq_done = {_, HighestSeq},
        checkpoint_interval = CheckpointInterval
    } = State = init_state(Rep),

    NumWorkers = get_value(worker_processes, Options),
    BatchSize = get_value(worker_batch_size, Options),
    {ok, ChangesQueue} = couch_work_queue:new([
        {max_items, BatchSize * NumWorkers * 2},
        {max_size, 100 * 1024 * NumWorkers}
    ]),
    % This starts the _changes reader process. It adds the changes from
    % the source db to the ChangesQueue.
    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
        StartSeq, Source, ChangesQueue, Options
    ),
    % Changes manager - responsible for dequeueing batches from the changes
    % queue and delivering them to the worker processes.
    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
    % This starts the worker processes. They ask the changes queue manager for
    % a batch of _changes rows to process -> check which revs are missing in
    % the target, and for the missing ones, copy them from source to target.
    MaxConns = get_value(http_connections, Options),
    Workers = lists:map(
        fun(_) ->
            couch_stats:increment_counter([couch_replicator, workers_started]),
            {ok, Pid} = couch_replicator_worker:start_link(
                self(), Source, Target, ChangesManager, MaxConns
            ),
            Pid
        end,
        lists:seq(1, NumWorkers)
    ),

    % register with couch_task_status so the job shows up in _active_tasks
    couch_task_status:add_task(
        [
            {type, replication},
            {user, UserCtx#user_ctx.name},
            {replication_id, ?l2b(BaseId ++ Ext)},
            {database, Rep#rep.db_name},
            {doc_id, Rep#rep.doc_id},
            {source, ?l2b(SourceName)},
            {target, ?l2b(TargetName)},
            {continuous, get_value(continuous, Options, false)},
            {source_seq, HighestSeq},
            {checkpoint_interval, CheckpointInterval}
        ] ++ rep_stats(State)
    ),
    couch_task_status:set_update_frequency(1000),

    % Until OTP R14B03:
    %
    % Restarting a temporary supervised child implies that the original arguments
    % (#rep{} record) specified in the MFA component of the supervisor
    % child spec will always be used whenever the child is restarted.
    % This implies the same replication performance tunning parameters will
    % always be used. The solution is to delete the child spec (see
    % cancel_replication/1) and then start the replication again, but this is
    % unfortunately not immune to race conditions.

    log_replication_start(State),
    couch_log:debug("Worker pids are: ~p", [Workers]),

    doc_update_triggered(Rep),

    {ok, State#rep_state{
        changes_queue = ChangesQueue,
        changes_manager = ChangesManager,
        changes_reader = ChangesReader,
        workers = Workers
    }}.

% add_stats: fold a worker's stats increment into the job's totals.
% Reply early so the worker is not blocked on the merge.
handle_call({add_stats, Stats}, From, State) ->
    gen_server:reply(From, ok),
    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
    {noreply, State#rep_state{stats = NewStats}};
% report_seq_done: a worker finished a batch ending at Seq. Recompute the
% highest contiguous "through" sequence: it only advances past Seq when no
% earlier sequence is still in progress.
handle_call(
    {report_seq_done, Seq, StatsInc},
    From,
    #rep_state{
        seqs_in_progress = SeqsInProgress,
        highest_seq_done = HighestDone,
        current_through_seq = ThroughSeq,
        stats = Stats
    } = State
) ->
    gen_server:reply(From, ok),
    {NewThroughSeq0, NewSeqsInProgress} =
        case SeqsInProgress of
            [] ->
                {Seq, []};
            [Seq | Rest] ->
                % Seq was the oldest in-flight sequence; through-seq advances
                {Seq, Rest};
            [_ | _] ->
                % an older sequence is still in flight; just drop Seq
                {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
        end,
    NewHighestDone = lists:max([HighestDone, Seq]),
    NewThroughSeq =
        case NewSeqsInProgress of
            [] ->
                lists:max([NewThroughSeq0, NewHighestDone]);
            _ ->
                NewThroughSeq0
        end,
    couch_log:debug(
        "Worker reported seq ~p, through seq was ~p, "
        "new through seq is ~p, highest seq done was ~p, "
        "new highest seq done is ~p~n"
        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
        [
            Seq,
            ThroughSeq,
            NewThroughSeq,
            HighestDone,
            NewHighestDone,
            SeqsInProgress,
            NewSeqsInProgress
        ]
    ),
    NewState = State#rep_state{
        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
        current_through_seq = NewThroughSeq,
        seqs_in_progress = NewSeqsInProgress,
        highest_seq_done = NewHighestDone
    },
    update_task(NewState),
    {noreply, NewState}.
% checkpoint: periodic checkpoint triggered by the timer started in
% start_timer/1. On success, re-arm the timer; on failure, stop the job.
handle_cast(checkpoint, State) ->
    case do_checkpoint(State) of
        {ok, NewState} ->
            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
            {noreply, NewState#rep_state{timer = start_timer(State)}};
        Error ->
            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
            {stop, Error, State}
    end;
% report_seq: the changes manager announces a batch's ending sequence has
% been handed to a worker; track it until report_seq_done arrives.
handle_cast(
    {report_seq, Seq},
    #rep_state{seqs_in_progress = SeqsInProgress} = State
) ->
    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.

% Plain shutdown message from the scheduler.
handle_info(shutdown, St) ->
    {stop, shutdown, St};
% A linked child reached the maximum HTTP retry backoff; give up and let
% the scheduler reschedule the job with its own backoff.
handle_info({'EXIT', Pid, max_backoff}, State) ->
    couch_log:error("Max backoff reached child process ~p", [Pid]),
    {stop, {shutdown, max_backoff}, State};
handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
    couch_log:error("Max backoff reached child process ~p", [Pid]),
    {stop, {shutdown, max_backoff}, State};
% Changes reader exited normally (e.g. non-continuous feed finished).
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader = Pid} = State) ->
    {noreply, State};
% Changes reader crashed: normalize the various HTTP failure shapes into a
% {changes_req_failed, ...} / {changes_reader_died, ...} stop reason.
handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
    Reason =
        case Reason0 of
            {changes_req_failed, _, _} = HttpFail ->
                HttpFail;
            {http_request_failed, _, _, {error, {code, Code}}} ->
                {changes_req_failed, Code};
            {http_request_failed, _, _, {error, Err}} ->
                {changes_req_failed, Err};
            Other ->
                {changes_reader_died, Other}
        end,
    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
    {stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
    {noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue = Pid} = State) ->
    {noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
% A worker exited normally. When the last one is gone, kill the changes
% manager and perform a final checkpoint.
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
    case Workers -- [Pid] of
        Workers ->
            % Pid was not one of our workers; log and carry on
            couch_log:error("unknown pid bit the dust ~p ~n", [Pid]),
            {noreply, State#rep_state{workers = Workers}};
        %% not clear why a stop was here before
        %%{stop, {unknown_process_died, Pid, normal}, State};
        [] ->
            catch unlink(State#rep_state.changes_manager),
            catch exit(State#rep_state.changes_manager, kill),
            do_last_checkpoint(State);
        Workers2 ->
            {noreply, State#rep_state{workers = Workers2}}
    end;
% A worker (or unknown process) exited abnormally; stop the job.
handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
    State2 = cancel_timer(State),
    case lists:member(Pid, Workers) of
        false ->
            {stop, {unknown_process_died, Pid, Reason}, State2};
        true ->
            couch_stats:increment_counter([couch_replicator, worker_deaths]),
            StopReason =
                case Reason of
                    {shutdown, _} = Err ->
                        Err;
                    Other ->
                        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
                        {worker_died, Pid, Other}
                end,
            {stop, StopReason, State2}
    end;
% Deferred initialization (see init/1). On failure, stop with a shutdown
% reason describing why the start failed.
handle_info(timeout, InitArgs) ->
    try do_init(InitArgs) of
        {ok, State} ->
            {noreply, State}
    catch
        exit:{http_request_failed, _, _, max_backoff} ->
            {stop, {shutdown, max_backoff}, {error, InitArgs}};
        ?STACKTRACE(Class, Error, Stack)
            ShutdownReason = {error, replication_start_error(Error)},
            StackTop2 = lists:sublist(Stack, 2),
            % Shutdown state is a hack as it is not really the state of the
            % gen_server (it failed to initialize, so it doesn't have one).
            % Shutdown state is used to pass extra info about why start failed.
            ShutdownState = {error, Class, StackTop2, InitArgs},
            {stop, {shutdown, ShutdownReason}, ShutdownState}
    end;
handle_info({Ref, Tuple}, State) when is_reference(Ref), is_tuple(Tuple) ->
    % Ignore responses from timed-out or retried ibrowse calls. Aliases in
    % Erlang 24 should help with this problem, so we should revisit this clause
    % when we update our minimum Erlang version to >= 24.
    {noreply, State}.

% Normal termination: replication finished; notify listeners and update the
% replication document (if doc-triggered).
terminate(
    normal,
    #rep_state{
        rep_details = #rep{id = RepId} = Rep,
        checkpoint_history = CheckpointHistory
    } = State
) ->
    terminate_cleanup(State),
    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
    doc_update_completed(Rep, rep_stats(State));
terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
    % Replication stopped via _scheduler_sup:terminate_child/1, which can
    % occur during regular scheduler operation or when job is removed from
    % the scheduler. Attempt one last checkpoint before cleaning up.
    State1 =
        case do_checkpoint(State) of
            {ok, NewState} ->
                NewState;
            Error ->
                LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
                couch_log:error(LogMsg, [?MODULE, RepId, Error]),
                State
        end,
    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
    terminate_cleanup(State1);
% Startup failed with max_backoff before init completed (state is the
% {error, InitArgs} placeholder from handle_info(timeout, ...)).
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
    couch_stats:increment_counter([couch_replicator, failed_starts]),
    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
    couch_replicator_notifier:notify({error, RepId, max_backoff});
% Startup failed with an error; state carries the class/stack captured in
% handle_info(timeout, ...).
terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
    #rep{
        id = {BaseId, Ext} = RepId,
        source = Source0,
        target = Target0,
        doc_id = DocId,
        db_name = DbName
    } = InitArgs,
    Source = couch_replicator_api_wrap:db_uri(Source0),
    Target = couch_replicator_api_wrap:db_uri(Target0),
    RepIdStr = BaseId ++ Ext,
    Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
    couch_log:error(Msg, [
        Class,
        Error,
        RepIdStr,
        Source,
        Target,
        DbName,
        DocId,
        Stack
    ]),
    couch_stats:increment_counter([couch_replicator, failed_starts]),
    couch_replicator_notifier:notify({error, RepId, Error});
% max_backoff reached while the job was already running.
terminate({shutdown, max_backoff}, State) ->
    #rep_state{
        source_name = Source,
        target_name = Target,
        rep_details = #rep{id = {BaseId, Ext} = RepId}
    } = State,
    couch_log:error(
        "Replication `~s` (`~s` -> `~s`) reached max backoff",
        [BaseId ++ Ext, Source, Target]
    ),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, Reason}, State) ->
    % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
    % wrapped around the message
    terminate(Reason, State);
% Any other abnormal termination.
terminate(Reason, State) ->
    #rep_state{
        source_name = Source,
        target_name = Target,
        rep_details = #rep{id = {BaseId, Ext} = RepId}
    } = State,
    couch_log:error(
        "Replication `~s` (`~s` -> `~s`) failed: ~s",
        [BaseId ++ Ext, Source, Target, to_binary(Reason)]
    ),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, Reason}).

% Push a final task-status update and close both database handles.
terminate_cleanup(State) ->
    update_task(State),
    couch_replicator_api_wrap:db_close(State#rep_state.source),
    couch_replicator_api_wrap:db_close(State#rep_state.target).

code_change(_OldVsn, #rep_state{} = State, _Extra) ->
    {ok, State}.

% Produce a credential-stripped summary of the job state for crash reports
% and sys:get_status/1, so passwords never appear in logs.
format_status(_Opt, [_PDict, State]) ->
    #rep_state{
        source = Source,
        target = Target,
        rep_details = RepDetails,
        start_seq = StartSeq,
        source_seq = SourceSeq,
        committed_seq = CommitedSeq,
        current_through_seq = ThroughSeq,
        highest_seq_done = HighestSeqDone,
        session_id = SessionId
    } = state_strip_creds(State),
    #rep{
        id = RepId,
        options = Options,
        doc_id = DocId,
        db_name = DbName
    } = RepDetails,
    [
        {rep_id, RepId},
        {source, couch_replicator_api_wrap:db_uri(Source)},
        {target, couch_replicator_api_wrap:db_uri(Target)},
        {db_name, DbName},
        {doc_id, DocId},
        {options, Options},
        {session_id, SessionId},
        {start_seq, StartSeq},
        {source_seq, SourceSeq},
        {committed_seq, CommitedSeq},
        {current_through_seq, ThroughSeq},
        {highest_seq_done, HighestSeqDone}
    ].

% Random startup delay in ms, uniform over [1, replicator.startup_jitter].
startup_jitter() ->
    Jitter = config:get_integer(
        "replicator",
        "startup_jitter",
        ?STARTUP_JITTER_DEFAULT
    ),
    couch_rand:uniform(erlang:max(1, Jitter)).

% Replace the value of any Authorization header with "****".
headers_strip_creds([], Acc) ->
    lists:reverse(Acc);
headers_strip_creds([{Key, Value0} | Rest], Acc) ->
    Value =
        case string:to_lower(Key) of
            "authorization" ->
                "****";
            _ ->
                Value0
        end,
        headers_strip_creds(Rest, [{Key, Value} | Acc]).

% Strip password from the URL and credentials from headers of an #httpdb{};
% local db handles pass through unchanged.
httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
    HttpDb#httpdb{
        url = couch_util:url_strip_password(Url),
        headers = headers_strip_creds(Headers, [])
    };
httpdb_strip_creds(LocalDb) ->
    LocalDb.

% Strip credentials from both endpoints of a #rep{} record.
rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
    Rep#rep{
        source = httpdb_strip_creds(Source),
        target = httpdb_strip_creds(Target)
    }.
% Strip credentials from all copies of the endpoints held in the job state.
state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
    % #rep_state contains the source and target at the top level and also
    % in the nested #rep_details record
    State#rep_state{
        rep_details = rep_strip_creds(Rep),
        source = httpdb_strip_creds(Source),
        target = httpdb_strip_creds(Target)
    }.

% Raise a single-connection HTTP source to two connections; one connection
% can deadlock (changes feed + doc fetches share the pool).
adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
    couch_log:notice(Msg, [RepId]),
    Src#httpdb{http_connections = 2};
adjust_maxconn(Src, _RepId) ->
    Src.

% For doc-triggered replications, mark the replication document as
% triggered. No-op for _replicate-endpoint jobs (db_name = null).
-spec doc_update_triggered(#rep{}) -> ok.
doc_update_triggered(#rep{db_name = null}) ->
    ok;
doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
    case couch_replicator_doc_processor:update_docs() of
        true ->
            couch_replicator_docs:update_triggered(Rep, RepId);
        false ->
            ok
    end,
    couch_log:notice(
        "Document `~s` triggered replication `~s`",
        [DocId, pp_rep_id(RepId)]
    ),
    ok.

% For doc-triggered replications, record completion stats (plus start time)
% back into the replication document. No-op for _replicate-endpoint jobs.
-spec doc_update_completed(#rep{}, list()) -> ok.
doc_update_completed(#rep{db_name = null}, _Stats) ->
    ok;
doc_update_completed(
    #rep{
        id = RepId,
        doc_id = DocId,
        db_name = DbName,
        start_time = StartTime
    },
    Stats0
) ->
    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
    couch_log:notice(
        "Replication `~s` completed (triggered by `~s`)",
        [pp_rep_id(RepId), DocId]
    ),
    ok.
% Take a final checkpoint after the last worker exits. If nothing was
% replicated since the last checkpoint (highest_seq_done is still the
% lowest sequence), just stop normally.
do_last_checkpoint(
    #rep_state{
        seqs_in_progress = [],
        highest_seq_done = {_Ts, ?LOWEST_SEQ}
    } = State
) ->
    {stop, normal, cancel_timer(State)};
do_last_checkpoint(
    #rep_state{
        seqs_in_progress = [],
        highest_seq_done = Seq
    } = State
) ->
    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
        {ok, NewState} ->
            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
            {stop, normal, cancel_timer(NewState)};
        Error ->
            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
            {stop, Error, State}
    end.

% Arm the checkpoint timer: casts 'checkpoint' to this server after the
% configured interval. Returns the timer ref, or nil if scheduling failed.
start_timer(State) ->
    After = State#rep_state.checkpoint_interval,
    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
        {ok, Ref} ->
            Ref;
        Error ->
            couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]),
            nil
    end.

% Cancel the pending checkpoint timer, if one is armed.
cancel_timer(#rep_state{timer = nil} = State) ->
    State;
cancel_timer(#rep_state{timer = Timer} = State) ->
    {ok, cancel} = timer:cancel(Timer),
    State#rep_state{timer = nil}.
% Build the initial #rep_state{}: open both databases, find/migrate the
% replication checkpoint logs, determine the start sequence and seed stats
% from argument/history values. Also arms the first checkpoint timer.
init_state(Rep) ->
    #rep{
        id = {BaseId, _Ext},
        source = Src0,
        target = Tgt,
        options = Options,
        type = Type,
        view = View,
        start_time = StartTime,
        stats = ArgStats0
    } = Rep,
    % Adjust minimum number of http source connections to 2 to avoid deadlock
    Src = adjust_maxconn(Src0, BaseId),
    {ok, Source} = couch_replicator_api_wrap:db_open(Src),
    {CreateTargetParams} = get_value(create_target_params, Options, {[]}),
    {ok, Target} = couch_replicator_api_wrap:db_open(
        Tgt,
        get_value(create_target, Options, false),
        CreateTargetParams
    ),

    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),

    [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),

    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),

    % take the max of stats passed in as arguments and those from history
    ArgStats1 = couch_replicator_stats:new(ArgStats0),
    HistoryStats =
        case History of
            [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
            _ -> couch_replicator_stats:new()
        end,
    Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),

    % an explicit since_seq option overrides the checkpointed start seq
    StartSeq1 = get_value(since_seq, Options, StartSeq0),
    StartSeq = {0, StartSeq1},

    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),

    #doc{body = {CheckpointHistory}} = SourceLog,
    State = #rep_state{
        rep_details = Rep,
        source_name = couch_replicator_api_wrap:db_uri(Source),
        target_name = couch_replicator_api_wrap:db_uri(Target),
        source = Source,
        target = Target,
        history = History,
        checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
        start_seq = StartSeq,
        current_through_seq = StartSeq,
        committed_seq = StartSeq,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = StartTime,
        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
        session_id = couch_uuids:random(),
        source_seq = SourceSeq,
        use_checkpoints = get_value(use_checkpoints, Options, true),
        checkpoint_interval = get_value(
            checkpoint_interval,
            Options,
            ?DEFAULT_CHECKPOINT_INTERVAL
        ),
        type = Type,
        view = View,
        stats = Stats
    },
    State#rep_state{timer = start_timer(State)}.

% Locate the replication log (_local checkpoint doc) in each database,
% migrating logs written under older replication-id versions if needed.
find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).

% Walk each db, trying the current log id first, then ids computed with
% progressively older replication-id algorithm versions. Found logs under
% an old id are migrated (rewritten) to the new id.
fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
    lists:reverse(Acc);
fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
        {error, <<"not_found">>} when Vsn > 1 ->
            % retry the same db with the previous id-version's log id
            OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
            fold_replication_logs(
                Dbs,
                Vsn - 1,
                ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId),
                NewId,
                Rep,
                Acc
            );
        {error, <<"not_found">>} ->
            % no log at any version; start with a fresh empty doc
            fold_replication_logs(
                Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]
            );
        {ok, Doc} when LogId =:= NewId ->
            fold_replication_logs(
                Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]
            );
        {ok, Doc} ->
            % found under an old id; migrate the body to the new id
            MigratedLog = #doc{id = NewId, body = Doc#doc.body},
            maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id),
            fold_replication_logs(
                Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc]
            )
    end.

% Persist a migrated checkpoint log, unless checkpoints are disabled.
maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
    case get_value(use_checkpoints, Rep#rep.options, true) of
        true ->
            update_checkpoint(Db, Doc),
            Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
            couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]);
        false ->
            ok
    end.

% Spawn the linked changes-manager process (see changes_manager_loop_open/4).
spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
    spawn_link(fun() ->
        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
    end).
% Changes-manager loop: serves {get_changes, From} requests from workers by
% dequeueing a batch from the changes queue, reporting the batch's ending
% sequence to the parent job, and sending the rows to the worker. Ts is a
% monotonically increasing tick paired with each reported sequence.
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
    receive
        {get_changes, From} ->
            case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
                closed ->
                    From ! {closed, self()};
                {ok, ChangesOrLastSeqs} ->
                    % the queue may end a batch with a {last_seq, Seq} marker
                    ReportSeq =
                        case lists:last(ChangesOrLastSeqs) of
                            {last_seq, Seq} ->
                                {Ts, Seq};
                            #doc_info{high_seq = Seq} ->
                                {Ts, Seq}
                        end,
                    % strip the marker; workers only see #doc_info{} rows
                    Changes = lists:filter(
                        fun
                            (#doc_info{}) ->
                                true;
                            ({last_seq, _Seq}) ->
                                false
                        end,
                        ChangesOrLastSeqs
                    ),
                    ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
                    From ! {changes, self(), Changes, ReportSeq}
            end,
            changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
    end.

% Record a checkpoint on both source and target. No-ops when checkpoints
% are disabled or when nothing new has been processed. Fails with
% {checkpoint_commit_failure, _} if either side cannot be committed or if
% an instance_start_time changed (db restarted/recreated since last time).
do_checkpoint(#rep_state{use_checkpoints = false} = State) ->
    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]}},
    {ok, NewState};
do_checkpoint(#rep_state{current_through_seq = Seq, committed_seq = Seq} = State) ->
    % nothing new since last checkpoint; just refresh the task status
    update_task(State),
    {ok, State};
do_checkpoint(State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        source = Source,
        target = Target,
        history = OldHistory,
        start_seq = {_, StartSeq},
        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats,
        rep_details = #rep{options = Options},
        session_id = SessionId
    } = State,
    case commit_to_both(Source, Target) of
        {source_error, Reason} ->
            {checkpoint_commit_failure,
                <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
        {target_error, Reason} ->
            {checkpoint_commit_failure,
                <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
        {SrcInstanceStartTime, TgtInstanceStartTime} ->
            % both instance start times unchanged; safe to record checkpoint
            couch_log:notice(
                "recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
                [SourceName, TargetName, NewSeq]
            ),
            LocalStartTime = calendar:now_to_local_time(ReplicationStartTime),
            StartTime = ?l2b(httpd_util:rfc1123_date(LocalStartTime)),
            EndTime = ?l2b(httpd_util:rfc1123_date()),
            NewHistoryEntry =
                {[
                    {<<"session_id">>, SessionId},
                    {<<"start_time">>, StartTime},
                    {<<"end_time">>, EndTime},
                    {<<"start_last_seq">>, StartSeq},
                    {<<"end_last_seq">>, NewSeq},
                    {<<"recorded_seq">>, NewSeq},
                    {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
                    {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
                    {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
                    {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
                    {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
                ]},
            BaseHistory =
                [
                    {<<"session_id">>, SessionId},
                    {<<"source_last_seq">>, NewSeq},
                    {<<"replication_id_version">>, ?REP_ID_VERSION}
                ] ++
                    case get_value(doc_ids, Options) of
                        undefined ->
                            [];
                        _DocIds ->
                            % backwards compatibility with the result of a replication by
                            % doc IDs in versions 0.11.x and 1.0.x
                            % TODO: deprecate (use same history format, simplify code)
                            [
                                {<<"start_time">>, StartTime},
                                {<<"end_time">>, EndTime},
                                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
                                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
                                {<<"doc_write_failures">>,
                                    couch_replicator_stats:doc_write_failures(Stats)}
                            ]
                    end,
            % limit history to 50 entries
            NewRepHistory = {
                BaseHistory ++
                    [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
            },

            try
                {SrcRevPos, SrcRevId} = update_checkpoint(
                    Source, SourceLog#doc{body = NewRepHistory}, source
                ),
                {TgtRevPos, TgtRevId} = update_checkpoint(
                    Target, TargetLog#doc{body = NewRepHistory}, target
                ),
                NewState = State#rep_state{
                    checkpoint_history = NewRepHistory,
                    committed_seq = NewTsSeq,
                    source_log = SourceLog#doc{revs = {SrcRevPos, [SrcRevId]}},
                    target_log = TargetLog#doc{revs = {TgtRevPos, [TgtRevId]}}
                },
                update_task(NewState),
                {ok, NewState}
            catch
                throw:{checkpoint_commit_failure, _} = Failure ->
                    Failure
            end;
        {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on target database has changed since last checkpoint."
            >>};
        {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on source database has changed since last checkpoint."
            >>};
        {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on source and target database has changed since last checkpoint."
            >>}
    end.

% Wrapper that tags checkpoint-update failures with which side (source or
% target) failed, for a clearer error message.
update_checkpoint(Db, Doc, DbType) ->
    try
        update_checkpoint(Db, Doc)
    catch
        throw:{checkpoint_commit_failure, Reason} ->
            throw(
                {checkpoint_commit_failure,
                    <<"Error updating the ", (to_binary(DbType))/binary, " checkpoint document: ",
                        (to_binary(Reason))/binary>>}
            )
    end.

% Write the checkpoint doc. A conflict may just mean an earlier attempt
% actually succeeded but its response was lost; confirm by re-reading.
update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
    try
        case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
            {ok, PosRevId} ->
                PosRevId;
            {error, Reason} ->
                throw({checkpoint_commit_failure, Reason})
        end
    catch
        throw:conflict ->
            case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
                {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
                    % This means that we were able to update successfully the
                    % checkpoint doc in a previous attempt but we got a connection
                    % error (timeout for e.g.) before receiving the success response.
                    % Therefore the request was retried and we got a conflict, as the
                    % revision we sent is not the current one.
                    % We confirm this by verifying the doc body we just got is the same
                    % that we have just sent.
                    {Pos, RevId};
                _ ->
                    throw({checkpoint_commit_failure, conflict})
            end
    end.
% Ensure a full commit on both databases, source asynchronously and target
% synchronously, and return {SourceStartTime, TargetStartTime} on success,
% or a {source_error, _} / {target_error, _} tuple.
commit_to_both(Source, Target) ->
    % commit the src async
    ParentPid = self(),
    SrcCommitPid = spawn_link(
        fun() ->
            Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
            ParentPid ! {self(), Result}
        end
    ),

    % commit tgt sync
    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),

    SourceResult =
        receive
            {SrcCommitPid, Result} ->
                unlink(SrcCommitPid),
                % drain a possible trailing EXIT from the linked helper
                receive
                    {'EXIT', SrcCommitPid, _} -> ok
                after 0 -> ok
                end,
                Result;
            {'EXIT', SrcCommitPid, Reason} ->
                {error, Reason}
        end,
    case TargetResult of
        {ok, TargetStartTime} ->
            case SourceResult of
                {ok, SourceStartTime} ->
                    {SourceStartTime, TargetStartTime};
                SourceError ->
                    {source_error, SourceError}
            end;
        TargetError ->
            {target_error, TargetError}
    end.

% Compare the source and target checkpoint logs. Same session id means a
% valid shared history; otherwise scan both histories for a common ancestor
% session to resume from.
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body = {RepRecProps}} = SrcDoc,
    #doc{body = {RepRecPropsTgt}} = TgtDoc,
    case
        get_value(<<"session_id">>, RepRecProps) ==
            get_value(<<"session_id">>, RepRecPropsTgt)
    of
        true ->
            % if the records have the same session id,
            % then we have a valid replication history
            OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
            OldHistory = get_value(<<"history">>, RepRecProps, []),
            {OldSeqNum, OldHistory};
        false ->
            SourceHistory = get_value(<<"history">>, RepRecProps, []),
            TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
            couch_log:notice(
                "Replication records differ. "
                "Scanning histories to find a common ancestor.",
                []
            ),
            couch_log:debug(
                "Record on source:~p~nRecord on target:~p~n",
                [RepRecProps, RepRecPropsTgt]
            ),
            compare_rep_history(SourceHistory, TargetHistory)
    end.
% Walk both history lists in lock-step looking for a session id present in
% both; on a match, resume from that entry's recorded_seq. If either list
% is exhausted, there is no common ancestor: full replication from seq 0.
compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
    couch_log:notice("no common ancestry -- performing full replication", []),
    {?LOWEST_SEQ, []};
compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
    SourceId = get_value(<<"session_id">>, S),
    case has_session_id(SourceId, Target) of
        true ->
            RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
            couch_log:notice(
                "found a common replication record with source_seq ~p",
                [RecordSeqNum]
            ),
            {RecordSeqNum, SourceRest};
        false ->
            TargetId = get_value(<<"session_id">>, T),
            case has_session_id(TargetId, SourceRest) of
                true ->
                    RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
                    couch_log:notice(
                        "found a common replication record with source_seq ~p",
                        [RecordSeqNum]
                    ),
                    {RecordSeqNum, TargetRest};
                false ->
                    compare_rep_history(SourceRest, TargetRest)
            end
    end.

% true if any history entry in the list carries the given session_id.
has_session_id(_SessionId, []) ->
    false;
has_session_id(SessionId, [{Props} | Rest]) ->
    case get_value(<<"session_id">>, Props, nil) of
        SessionId ->
            true;
        _Else ->
            has_session_id(SessionId, Rest)
    end.

% Number of changes still pending on the source. The value is cached in the
% process dictionary and only refreshed when older than the connection
% timeout, to avoid hammering the source with _changes?limit=0 requests.
get_pending_count(St) ->
    Rep = St#rep_state.rep_details,
    Timeout = get_value(connection_timeout, Rep#rep.options),
    TimeoutMicro = Timeout * 1000,
    case get(pending_count_state) of
        {LastUpdate, PendingCount} ->
            case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
                true ->
                    NewPendingCount = get_pending_count_int(St),
                    put(pending_count_state, {os:timestamp(), NewPendingCount}),
                    NewPendingCount;
                false ->
                    PendingCount
            end;
        undefined ->
            NewPendingCount = get_pending_count_int(St),
            put(pending_count_state, {os:timestamp(), NewPendingCount}),
            NewPendingCount
    end.
% Fetch the pending count from the source. For remote (HTTP) sources a
% failure yields null (best effort, limited retries); for local sources
% failures crash the caller.
get_pending_count_int(#rep_state{source = #httpdb{} = Db0} = St) ->
    {_, Seq} = St#rep_state.highest_seq_done,
    Db = Db0#httpdb{retries = 3},
    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
        {ok, Pending} ->
            Pending;
        _ ->
            null
    end;
get_pending_count_int(#rep_state{source = Db} = St) ->
    {_, Seq} = St#rep_state.highest_seq_done,
    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
    Pending.

% Push current progress to both the scheduler job stats and _active_tasks.
update_task(State) ->
    #rep_state{
        rep_details = #rep{id = JobId},
        current_through_seq = {_, ThroughSeq},
        highest_seq_done = {_, HighestSeq}
    } = State,
    Status =
        rep_stats(State) ++
            [
                {source_seq, HighestSeq},
                {through_seq, ThroughSeq}
            ],
    couch_replicator_scheduler:update_job_stats(JobId, Status),
    couch_task_status:update(Status).

% Proplist of replication statistics as exposed in task status.
rep_stats(State) ->
    #rep_state{
        committed_seq = {_, CommittedSeq},
        stats = Stats
    } = State,
    [
        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
        {docs_read, couch_replicator_stats:docs_read(Stats)},
        {docs_written, couch_replicator_stats:docs_written(Stats)},
        {changes_pending, get_pending_count(State)},
        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
        {checkpointed_source_seq, CommittedSeq}
    ].
%% Translate low-level connection/HTTP failures raised during
%% replication startup into friendlier tagged error tuples that are
%% surfaced to the user. URLs have their passwords stripped before
%% being embedded in the message. Unrecognized errors pass through
%% unchanged.
replication_start_error({unauthorized, DbUri}) ->
    {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
replication_start_error({db_not_found, DbUri}) ->
    {db_not_found, <<"could not open ", DbUri/binary>>};
replication_start_error(
    {http_request_failed, _Method, Url0, {error, {error, {conn_failed, {error, nxdomain}}}}}
) ->
    Url = ?l2b(couch_util:url_strip_password(Url0)),
    {nxdomain, <<"could not resolve ", Url/binary>>};
replication_start_error({http_request_failed, Method0, Url0, {error, {code, Code}}}) when
    is_integer(Code)
->
    Url = ?l2b(couch_util:url_strip_password(Url0)),
    Method = ?l2b(Method0),
    {http_error_code, Code, <<Method/binary, " ", Url/binary>>};
replication_start_error(Error) ->
    Error.

%% Emit the "Starting replication" notice, including the replication id,
%% credential-stripped endpoints, where the job came from (a _replicator
%% doc or the _replicate endpoint), and the worker configuration.
log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
    #rep{
        id = {BaseId, Ext},
        doc_id = DocId,
        db_name = DbName,
        options = Options
    } = Rep,
    Id = BaseId ++ Ext,
    Workers = get_value(worker_processes, Options),
    BatchSize = get_value(worker_batch_size, Options),
    #rep_state{
        % credentials already stripped
        source_name = Source,
        % credentials already stripped
        target_name = Target,
        session_id = Sid
    } = RepState,
    From =
        case DbName of
            ShardName when is_binary(ShardName) ->
                io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
            _ ->
                "from _replicate endpoint"
        end,
    % Fixed typo in the log format string: "worker_procesess" ->
    % "worker_processes".
    Msg =
        "Starting replication ~s (~s -> ~s) ~s worker_processes:~p"
        " worker_batch_size:~p session_id:~s",
    couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
%% Exercise each recognized startup-failure shape handled by
%% replication_start_error/1: unauthorized, db_not_found, nxdomain
%% connection failures and explicit HTTP status codes.
replication_start_error_test() ->
    ?assertEqual(
        {unauthorized, <<"unauthorized to access or create database http://x/y">>},
        replication_start_error({unauthorized, <<"http://x/y">>})
    ),
    ?assertEqual(
        {db_not_found, <<"could not open http://x/y">>},
        replication_start_error({db_not_found, <<"http://x/y">>})
    ),
    NxDomainErr =
        {http_request_failed, "GET", "http://x/y",
            {error, {error, {conn_failed, {error, nxdomain}}}}},
    ?assertEqual(
        {nxdomain, <<"could not resolve http://x/y">>},
        replication_start_error(NxDomainErr)
    ),
    CodeErr = {http_request_failed, "GET", "http://x/y", {error, {code, 503}}},
    ?assertEqual(
        {http_error_code, 503, <<"GET http://x/y">>},
        replication_start_error(CodeErr)
    ).

%% Verify that format_status/2 strips credentials from the endpoint
%% URLs and exposes the expected state fields as a proplist.
scheduler_job_format_status_test() ->
    SrcUrl = <<"http://u:p@h1/d1">>,
    TgtUrl = <<"http://u:p@h2/d2">>,
    Rep = #rep{
        id = {"base", "+ext"},
        source = couch_replicator_docs:parse_rep_db(SrcUrl, [], []),
        target = couch_replicator_docs:parse_rep_db(TgtUrl, [], []),
        options = [{create_target, true}],
        doc_id = <<"mydoc">>,
        db_name = <<"mydb">>
    },
    RepState = #rep_state{
        rep_details = Rep,
        source = Rep#rep.source,
        target = Rep#rep.target,
        session_id = <<"a">>,
        start_seq = <<"1">>,
        source_seq = <<"2">>,
        committed_seq = <<"3">>,
        current_through_seq = <<"4">>,
        highest_seq_done = <<"5">>
    },
    Props = format_status(opts_ignored, [pdict, RepState]),
    % Table-driven check: every key must be present with the expected
    % value (note the password-free source/target URLs).
    Expected = [
        {source, "http://h1/d1/"},
        {target, "http://h2/d2/"},
        {rep_id, {"base", "+ext"}},
        {options, [{create_target, true}]},
        {doc_id, <<"mydoc">>},
        {db_name, <<"mydb">>},
        {session_id, <<"a">>},
        {start_seq, <<"1">>},
        {source_seq, <<"2">>},
        {committed_seq, <<"3">>},
        {current_through_seq, <<"4">>},
        {highest_seq_done, <<"5">>}
    ],
    lists:foreach(
        fun({Key, Want}) ->
            ?assertEqual(Want, proplists:get_value(Key, Props))
        end,
        Expected
    ).

-endif.