diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-08-28 04:33:18 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-09-15 16:13:46 -0400 |
commit | 3c9b7540cbb41225b35c89b741e0c5b83cdbf4e1 (patch) | |
tree | be491ab99fc2f3c8b687ea612d91e6e7e3a923fc | |
parent | 276d19731bc5df73838f40efc126f1f709e04fbe (diff) | |
download | couchdb-3c9b7540cbb41225b35c89b741e0c5b83cdbf4e1.tar.gz |
Introduce couch_replicator_jobs abstraction module
This is the `couch_jobs` abstraction module. All replicator calls to
`couch_jobs` should go through it. This module takes care of adding types to
some of the API calls, handles maintenance of the RepId -> JobId mappings when
jobs are added and removed, and some subscription logic.
`fabric2.hrl` include file is updated with the definition of the
`?REPLICATION_IDS` prefix where the RepId -> JobId keyspace lives.
-rw-r--r-- | src/couch_replicator/src/couch_replicator_jobs.erl | 312 | ||||
-rw-r--r-- | src/fabric/include/fabric2.hrl | 1 |
2 files changed, 313 insertions, 0 deletions
diff --git a/src/couch_replicator/src/couch_replicator_jobs.erl b/src/couch_replicator/src/couch_replicator_jobs.erl new file mode 100644 index 000000000..a602b0c62 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_jobs.erl @@ -0,0 +1,312 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_jobs). + + +-export([ + % couch_jobs type timeouts + set_timeout/0, + get_timeout/0, + + % Job creation and querying + new_job/7, + add_job/3, + remove_job/2, + get_job_data/2, + fold_jobs/3, + pending_count/2, + + % Job subscription + wait_running/1, + wait_result/1, + + % Job execution + accept_job/1, + update_job_data/3, + finish_job/3, + reschedule_job/4, + + % (..., ?REPLICATION_IDS) -> JobId handling + try_update_rep_id/3, + update_rep_id/3, + clear_old_rep_id/3, + get_job_id/2, + + % Debug functions + remove_jobs/2, + get_job_ids/1 +]). + + +-include("couch_replicator.hrl"). +-include_lib("fabric/include/fabric2.hrl"). + + +-define(REP_JOBS, <<"rep_jobs">>). +-define(REP_JOBS_TIMEOUT_SEC, 61). 
% Data model
% ----------
%
% State kept in couch_jobs under the ?REP_JOBS type
%
% Job IDs are defined as:
%  * Replicator DB instance UUID + doc ID for persistent replications
%  * Hash(username|source|target|options) for transient replications
%
% To map replication IDs to couch_job jobs, there is a separate index that
% looks like:
%   (?REPLICATION_IDS, RepId) -> JobId
%

% Register the replicator job type with couch_jobs and configure its
% activity timeout.
set_timeout() ->
    couch_jobs:set_type_timeout(?REP_JOBS, ?REP_JOBS_TIMEOUT_SEC).


% The couch_jobs activity timeout, in seconds, for replicator jobs.
get_timeout() ->
    ?REP_JOBS_TIMEOUT_SEC.


% Construct the initial job data map for a replication job. ?REP_ID and
% ?BASE_ID start out as null and are filled in later, once the
% replication ID has been computed. The job history starts with a single
% "added" event stamped with the current time.
new_job(#{} = Rep, DbName, DbUUID, DocId, State, StateInfo, DocState) ->
    Now = erlang:system_time(second),
    Added = #{?HIST_TYPE => ?HIST_ADDED, ?HIST_TIMESTAMP => Now},
    #{
        ?REP => Rep,
        ?REP_ID => null,
        ?BASE_ID => null,
        ?DB_NAME => DbName,
        ?DB_UUID => DbUUID,
        ?DOC_ID => DocId,
        ?ERROR_COUNT => 0,
        ?REP_STATS => #{},
        ?STATE => State,
        ?STATE_INFO => StateInfo,
        ?DOC_STATE => DocState,
        ?LAST_UPDATED => Now,
        ?LAST_START => 0,
        ?LAST_ERROR => null,
        ?REP_NODE => null,
        ?REP_PID => null,
        ?JOB_HISTORY => [Added],
        ?CHECKPOINT_HISTORY => []
    }.


% Add (or replace) a job. If a job with the same ID already exists it is
% removed first, so its RepId -> JobId mapping is cleaned up before the
% new job is added.
add_job(Tx, JobId, JobData) ->
    couch_stats:increment_counter([couch_replicator, jobs, adds]),
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        ok = remove_if_present(JTx, JobId),
        ok = couch_jobs:add(JTx, ?REP_JOBS, JobId, JobData)
    end).


% Remove a job along with its RepId -> JobId mapping. Removing a job
% that does not exist is a no-op.
remove_job(Tx, JobId) ->
    couch_stats:increment_counter([couch_replicator, jobs, removes]),
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        remove_if_present(JTx, JobId)
    end).


% Fetch the job data map for JobId. Returns {ok, JobData} or
% {error, not_found}.
get_job_data(Tx, JobId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        couch_jobs:get_job_data(JTx, ?REP_JOBS, JobId)
    end).


% Shared helper: remove the job with JobId if it exists (including its
% RepId -> JobId mapping); succeed quietly when there is no such job.
remove_if_present(JTx, JobId) ->
    case couch_jobs:get_job_data(JTx, ?REP_JOBS, JobId) of
        {ok, #{} = JobData} ->
            ok = remove_job(JTx, JobId, JobData);
        {error, not_found} ->
            ok
    end.
% UserFun = fun(JTx, JobId, JobState, JobData, UserAcc)
%
% Fold over every replicator job inside a single couch_jobs transaction.
fold_jobs(Tx, Fun, Acc0) when is_function(Fun, 5) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        couch_jobs:fold_jobs(JTx, ?REP_JOBS, Fun, Acc0)
    end).


% Count pending jobs whose scheduled time has already passed, up to
% Limit. A non-positive limit short-circuits to 0 without touching
% couch_jobs.
pending_count(_Tx, Limit) when is_integer(Limit), Limit =< 0 ->
    0;

pending_count(Tx, Limit) when is_integer(Limit), Limit > 0 ->
    couch_jobs:pending_count(Tx, ?REP_JOBS, #{
        max_sched_time => erlang:system_time(second),
        limit => Limit
    }).


% Block until the job is actually running (or already finished). A job
% reported as running but whose data is still ?ST_PENDING is treated as
% pending and waited on further. Returns {ok, JobData} or {error, Reason}.
wait_running(JobId) ->
    case couch_jobs:subscribe(?REP_JOBS, JobId) of
        {ok, finished, Data} ->
            {ok, Data};
        {ok, Sub, running, #{?STATE := ?ST_PENDING}} ->
            wait_running(JobId, Sub);
        {ok, Sub, running, Data} ->
            ok = couch_jobs:unsubscribe(Sub),
            {ok, Data};
        {ok, Sub, pending, _} ->
            wait_running(JobId, Sub);
        {error, _} = Error ->
            Error
    end.


% Helper for wait_running/1: keep waiting on the subscription until the
% job is running with a non-pending state, or has finished.
wait_running(JobId, Sub) ->
    case couch_jobs:wait(Sub, running, infinity) of
        {?REP_JOBS, _, running, #{?STATE := ?ST_PENDING}} ->
            wait_running(JobId, Sub);
        {?REP_JOBS, _, running, Data} ->
            ok = couch_jobs:unsubscribe(Sub),
            {ok, Data};
        {?REP_JOBS, _, finished, Data} ->
            ok = couch_jobs:unsubscribe(Sub),
            {ok, Data}
    end.


% Block until the job finishes and return its final job data, or
% {error, Reason} if the subscription could not be established.
wait_result(JobId) ->
    case couch_jobs:subscribe(?REP_JOBS, JobId) of
        {ok, finished, Data} ->
            {ok, Data};
        {ok, Sub, _, _} ->
            {?REP_JOBS, _, finished, Data} =
                couch_jobs:wait(Sub, finished, infinity),
            {ok, Data};
        {error, _} = Error ->
            Error
    end.


% Accept the next pending replicator job scheduled at or before
% MaxSchedTime.
accept_job(MaxSchedTime) when is_integer(MaxSchedTime) ->
    couch_jobs:accept(?REP_JOBS, #{max_sched_time => MaxSchedTime}).


% Update a running job's data map from inside a transaction.
update_job_data(Tx, #{} = Job, #{} = JobData) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        couch_jobs:update(JTx, Job, JobData)
    end).
% Mark the job as finished with the given final job data.
finish_job(Tx, #{} = Job, #{} = JobData) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        couch_jobs:finish(JTx, Job, JobData)
    end).


% Resubmit the job to run again at Time, then finish the current
% execution. Both steps happen in the same transaction.
reschedule_job(Tx, #{} = Job, #{} = JobData, Time) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        {ok, Resubmitted} = couch_jobs:resubmit(JTx, Job, Time),
        ok = couch_jobs:finish(JTx, Resubmitted, JobData)
    end).


% Try to claim the RepId -> JobId mapping for this job. Succeeds when
% the mapping is unset or already points at JobId; returns
% {error, {replication_job_conflict, OtherJobId}} when another job owns
% the replication ID.
try_update_rep_id(Tx, JobId, RepId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        #{tx := FdbTx, layer_prefix := Prefix} = JTx,
        case get_job_id(JTx, RepId) of
            {error, not_found} ->
                ok = erlfdb:set(FdbTx, rep_id_key(Prefix, RepId), JobId);
            {ok, JobId} ->
                ok;
            {ok, Other} when is_binary(Other) ->
                {error, {replication_job_conflict, Other}}
        end
    end).


% Unconditionally point the RepId -> JobId mapping at JobId.
update_rep_id(Tx, JobId, RepId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        #{tx := FdbTx, layer_prefix := Prefix} = JTx,
        ok = erlfdb:set(FdbTx, rep_id_key(Prefix, RepId), JobId)
    end).


% Clear the RepId -> JobId mapping, but only if it still points at this
% JobId. A null RepId or a mapping owned by another job is left alone.
clear_old_rep_id(_, _, null) ->
    ok;

clear_old_rep_id(Tx, JobId, RepId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        #{tx := FdbTx, layer_prefix := Prefix} = JTx,
        case get_job_id(JTx, RepId) of
            {error, not_found} ->
                ok;
            {ok, JobId} ->
                ok = erlfdb:clear(FdbTx, rep_id_key(Prefix, RepId));
            {ok, Other} when is_binary(Other) ->
                ok
        end
    end).


% Look up the job ID mapped to RepId in the ?REPLICATION_IDS index.
% Returns {ok, JobId} or {error, not_found}.
get_job_id(Tx, RepId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        #{tx := FdbTx, layer_prefix := Prefix} = JTx,
        case erlfdb:wait(erlfdb:get(FdbTx, rep_id_key(Prefix, RepId))) of
            not_found ->
                {error, not_found};
            <<_/binary>> = JobId ->
                {ok, JobId}
        end
    end).


% Pack the (?REPLICATION_IDS, RepId) index key under the layer prefix.
rep_id_key(Prefix, RepId) ->
    erlfdb_tuple:pack({?REPLICATION_IDS, RepId}, Prefix).
% Debug functions

% Remove each job in JobIds inside a single transaction. Always returns
% [] (debug helper; the return value carries no information).
remove_jobs(Tx, JobIds) when is_list(JobIds) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        lists:foreach(fun(JobId) -> remove_job(JTx, JobId) end, JobIds)
    end),
    [].


% List every (RepId, JobId) pair in the ?REPLICATION_IDS index (debug
% helper).
get_job_ids(Tx) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
        #{tx := ErlFdbTx, layer_prefix := LayerPrefix} = JTx,
        Prefix = erlfdb_tuple:pack({?REPLICATION_IDS}, LayerPrefix),
        KVs = erlfdb:wait(erlfdb:get_range_startswith(ErlFdbTx, Prefix)),
        lists:map(fun({K, JobId}) ->
            {RepId} = erlfdb_tuple:unpack(K, Prefix),
            {RepId, JobId}
        end, KVs)
    end).


% Private functions

% Remove a job and clean up its RepId -> JobId index entry. The entry is
% cleared only when it still points at this JobId; a mapping owned by a
% different job is left in place.
remove_job(#{jtx := true} = JTx, JobId, OldJobData) ->
    #{tx := Tx, layer_prefix := LayerPrefix} = JTx,
    case OldJobData of
        #{?REP_ID := null} ->
            % Job never got a replication ID, so there is no mapping to
            % clear.
            couch_jobs:remove(JTx, ?REP_JOBS, JobId);
        #{?REP_ID := RepId} when is_binary(RepId) ->
            Key = erlfdb_tuple:pack({?REPLICATION_IDS, RepId}, LayerPrefix),
            case erlfdb:wait(erlfdb:get(Tx, Key)) of
                not_found -> ok;
                % JobId here is already bound, so this clause matches
                % (and clears) only our own mapping.
                JobId -> erlfdb:clear(Tx, Key);
                <<_/binary>> -> ok
            end,
            couch_jobs:remove(JTx, ?REP_JOBS, JobId)
    end. diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 2eba4d5eb..ebbb7c7c5 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -26,6 +26,7 @@ -define(DELETED_DBS, 3). -define(DBS, 15). -define(EXPIRING_CACHE, 53). +-define(REPLICATION_IDS, 54). -define(TX_IDS, 255). % Cluster Level: (LayerPrefix, ?CLUSTER_CONFIG, X, ...) |