summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2022-06-27 10:54:36 +0200
committerJan Lehnardt <jan@apache.org>2022-12-16 16:56:18 +0100
commitbcd3ccec14ebf26a0a6ac60e557b1b4071100b5e (patch)
tree5547006a8533372883d46efcabee8f2ed235d4e2
parent9af77d9915bb2f5666f7f7d385bf48aa84627e74 (diff)
downloadcouchdb-bcd3ccec14ebf26a0a6ac60e557b1b4071100b5e.tar.gz
feat(access): add access handling to replicator
-rw-r--r--src/couch_replicator/src/couch_replicator.erl8
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl31
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl6
3 files changed, 34 insertions, 11 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 935daaa80..ac3807d11 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -77,7 +77,13 @@ replicate(PostBody, Ctx) ->
false ->
check_authorization(RepId, UserCtx),
{ok, Listener} = rep_result_listener(RepId),
- Result = do_replication_loop(Rep),
+ Result = case do_replication_loop(Rep) of % TODO: review why we need this
+ {ok, {ResultJson}} ->
+ {PublicRepId, _} = couch_replicator_ids:replication_id(Rep), % TODO: check with options
+ {ok, {[{<<"replication_id">>, ?l2b(PublicRepId)} | ResultJson]}};
+ Else ->
+ Else
+ end,
couch_replicator_notifier:stop(Listener),
Result
end.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 416220efd..ac979d37c 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -66,6 +66,8 @@
rep_starttime,
src_starttime,
tgt_starttime,
+ src_access,
+ tgt_access,
% checkpoint timer
timer,
changes_queue,
@@ -676,6 +678,8 @@ init_state(Rep) ->
rep_starttime = StartTime,
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ src_access = get_value(<<"access">>, SourceInfo),
+ tgt_access = get_value(<<"access">>, TargetInfo),
session_id = couch_uuids:random(),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
@@ -788,8 +792,10 @@ do_checkpoint(State) ->
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
+ src_access = SrcAccess,
+ tgt_access = TgtAccess,
stats = Stats,
- rep_details = #rep{options = Options},
+ rep_details = #rep{options = Options, user_ctx = UserCtx},
session_id = SessionId
} = State,
case commit_to_both(Source, Target) of
@@ -853,11 +859,9 @@ do_checkpoint(State) ->
try
{SrcRevPos, SrcRevId} = update_checkpoint(
- Source, SourceLog#doc{body = NewRepHistory}, source
- ),
+ Source, SourceLog#doc{body = NewRepHistory}, SrcAccess, UserCtx, source),
{TgtRevPos, TgtRevId} = update_checkpoint(
- Target, TargetLog#doc{body = NewRepHistory}, target
- ),
+ Target, TargetLog#doc{body = NewRepHistory}, TgtAccess, UserCtx, target),
NewState = State#rep_state{
checkpoint_history = NewRepHistory,
committed_seq = NewTsSeq,
@@ -885,8 +889,12 @@ do_checkpoint(State) ->
end.
update_checkpoint(Db, Doc, DbType) ->
+ update_checkpoint(Db, Doc, false, #user_ctx{}, DbType).
+update_checkpoint(Db, Doc) ->
+ update_checkpoint(Db, Doc, false, #user_ctx{}).
+update_checkpoint(Db, Doc, Access, UserCtx, DbType) ->
try
- update_checkpoint(Db, Doc)
+ update_checkpoint(Db, Doc, Access, UserCtx)
catch
throw:{checkpoint_commit_failure, Reason} ->
throw(
@@ -896,7 +904,14 @@ update_checkpoint(Db, Doc, DbType) ->
)
end.
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+update_checkpoint(Db, #doc{id = LogId} = Doc0, Access, UserCtx) ->
+ % if db has _access, then:
+ % get userCtx from replication and splice into doc _access
+ Doc = case Access of
+ true -> Doc0#doc{access = [UserCtx#user_ctx.name]};
+ _False -> Doc0
+ end,
+
try
case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
{ok, PosRevId} ->
@@ -906,6 +921,8 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
end
catch
throw:conflict ->
+ % TODO: An admin could have changed the access on the checkpoint doc.
+ % However unlikely, we can handle this gracefully here.
case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
{ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
% This means that we were able to update successfully the
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index dd6609941..618e3013a 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -108,7 +108,7 @@ t_fail_changes_queue({_Ctx, {Source, Target}}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesQueue = element(20, State),
+ ChangesQueue = element(22, State),
?assert(is_process_alive(ChangesQueue)),
{ok, Listener} = rep_result_listener(RepId),
@@ -125,7 +125,7 @@ t_fail_changes_manager({_Ctx, {Source, Target}}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesManager = element(21, State),
+ ChangesManager = element(23, State),
?assert(is_process_alive(ChangesManager)),
{ok, Listener} = rep_result_listener(RepId),
@@ -142,7 +142,7 @@ t_fail_changes_reader_proc({_Ctx, {Source, Target}}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesReader = element(22, State),
+ ChangesReader = element(24, State),
?assert(is_process_alive(ChangesReader)),
{ok, Listener} = rep_result_listener(RepId),