diff options
author | Jan Lehnardt <jan@apache.org> | 2020-07-26 20:06:38 +0200 |
---|---|---|
committer | Jan Lehnardt <jan@apache.org> | 2020-07-26 20:09:36 +0200 |
commit | 4de5f6616864e5812b3046987c34ef45d2ada554 (patch) | |
tree | 5892c93a628cf2c80b6fb8961f26dc65b13b80e3 | |
parent | 5650b03693c43015825ccd831652a58add83f0a3 (diff) | |
download | couchdb-4de5f6616864e5812b3046987c34ef45d2ada554.tar.gz |
feat(replicator): add access support to replicator
3 files changed, 35 insertions, 9 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index b38f31b59..a50967412 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -72,7 +72,13 @@ replicate(PostBody, Ctx) -> false -> check_authorization(RepId, UserCtx), {ok, Listener} = rep_result_listener(RepId), - Result = do_replication_loop(Rep), + Result = case do_replication_loop(Rep) of + {ok, {ResultJson}} -> + {PublicRepId, _} = couch_replicator_ids:replication_id(Rep), % TODO: check with options + {ok, {[{<<"replication_id">>, ?l2b(PublicRepId)} | ResultJson]}}; + Else -> + Else + end, couch_replicator_notifier:stop(Listener), Result end. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 0b33419e1..c18fe2018 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -66,6 +66,8 @@ rep_starttime, src_starttime, tgt_starttime, + src_access, + tgt_access, timer, % checkpoint timer changes_queue, changes_manager, @@ -610,6 +612,8 @@ init_state(Rep) -> rep_starttime = StartTime, src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), + src_access = get_value(<<"access">>, SourceInfo), + tgt_access = get_value(<<"access">>, TargetInfo), session_id = couch_uuids:random(), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), @@ -713,8 +717,10 @@ do_checkpoint(State) -> rep_starttime = ReplicationStartTime, src_starttime = SrcInstanceStartTime, tgt_starttime = TgtInstanceStartTime, + src_access = SrcAccess, + tgt_access = TgtAccess, stats = Stats, - rep_details = #rep{options = Options}, + rep_details = #rep{options = Options, user_ctx = UserCtx}, session_id = SessionId } = State, case commit_to_both(Source, Target) of @@ -770,9 +776,9 @@ do_checkpoint(State) -> try {SrcRevPos, SrcRevId} = update_checkpoint( - Source, SourceLog#doc{body = NewRepHistory}, source), + Source, SourceLog#doc{body = NewRepHistory}, SrcAccess, UserCtx, source), {TgtRevPos, TgtRevId} = update_checkpoint( - Target, TargetLog#doc{body = NewRepHistory}, target), + Target, TargetLog#doc{body = NewRepHistory}, TgtAccess, UserCtx, target), NewState = State#rep_state{ checkpoint_history = NewRepHistory, committed_seq = NewTsSeq, @@ -797,16 +803,28 @@ do_checkpoint(State) -> update_checkpoint(Db, Doc, DbType) -> + update_checkpoint(Db, Doc, false, #user_ctx{}, DbType). + +update_checkpoint(Db, Doc) -> + update_checkpoint(Db, Doc, false, #user_ctx{}). + +update_checkpoint(Db, Doc, Access, UserCtx, DbType) -> try - update_checkpoint(Db, Doc) + update_checkpoint(Db, Doc, Access, UserCtx) catch throw:{checkpoint_commit_failure, Reason} -> throw({checkpoint_commit_failure, <<"Error updating the ", (to_binary(DbType))/binary, " checkpoint document: ", (to_binary(Reason))/binary>>}) end. +update_checkpoint(Db, #doc{id = LogId} = Doc0, Access, UserCtx) -> + % if db has _access, then: + % get userCtx from replication and splice into doc _access + Doc = case Access of + true -> Doc0#doc{access = [UserCtx#user_ctx.name]}; + _False -> Doc0 + end, -update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> try case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of {ok, PosRevId} -> @@ -815,6 +833,8 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> throw({checkpoint_commit_failure, Reason}) end catch throw:conflict -> + % TODO: An admin could have changed the access on the checkpoint doc. + % However unlikely, we can handle this gracefully here. case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} -> % This means that we were able to update successfully the diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index 6b4f95c25..be15bd3c8 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -119,7 +119,7 @@ t_fail_changes_queue({Source, Target}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesQueue = element(20, State), + ChangesQueue = element(22, State), ?assert(is_process_alive(ChangesQueue)), {ok, Listener} = rep_result_listener(RepId), @@ -139,7 +139,7 @@ t_fail_changes_manager({Source, Target}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesManager = element(21, State), + ChangesManager = element(23, State), ?assert(is_process_alive(ChangesManager)), {ok, Listener} = rep_result_listener(RepId), @@ -159,7 +159,7 @@ t_fail_changes_reader_proc({Source, Target}) -> RepPid = couch_replicator_test_helper:get_pid(RepId), State = sys:get_state(RepPid), - ChangesReader = element(22, State), + ChangesReader = element(24, State), ?assert(is_process_alive(ChangesReader)), {ok, Listener} = rep_result_listener(RepId), |