summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2020-07-26 20:06:38 +0200
committerJan Lehnardt <jan@apache.org>2020-07-26 20:09:36 +0200
commit4de5f6616864e5812b3046987c34ef45d2ada554 (patch)
tree5892c93a628cf2c80b6fb8961f26dc65b13b80e3
parent5650b03693c43015825ccd831652a58add83f0a3 (diff)
downloadcouchdb-4de5f6616864e5812b3046987c34ef45d2ada554.tar.gz
feat(replicator): add access support to replicator
-rw-r--r--src/couch_replicator/src/couch_replicator.erl8
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl30
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl6
3 files changed, 35 insertions, 9 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index b38f31b59..a50967412 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -72,7 +72,13 @@ replicate(PostBody, Ctx) ->
false ->
check_authorization(RepId, UserCtx),
{ok, Listener} = rep_result_listener(RepId),
- Result = do_replication_loop(Rep),
+ Result = case do_replication_loop(Rep) of
+ {ok, {ResultJson}} ->
+ {PublicRepId, _} = couch_replicator_ids:replication_id(Rep), % TODO: check with options
+ {ok, {[{<<"replication_id">>, ?l2b(PublicRepId)} | ResultJson]}};
+ Else ->
+ Else
+ end,
couch_replicator_notifier:stop(Listener),
Result
end.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 0b33419e1..c18fe2018 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -66,6 +66,8 @@
rep_starttime,
src_starttime,
tgt_starttime,
+ src_access,
+ tgt_access,
timer, % checkpoint timer
changes_queue,
changes_manager,
@@ -610,6 +612,8 @@ init_state(Rep) ->
rep_starttime = StartTime,
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ src_access = get_value(<<"access">>, SourceInfo),
+ tgt_access = get_value(<<"access">>, TargetInfo),
session_id = couch_uuids:random(),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
@@ -713,8 +717,10 @@ do_checkpoint(State) ->
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
+ src_access = SrcAccess,
+ tgt_access = TgtAccess,
stats = Stats,
- rep_details = #rep{options = Options},
+ rep_details = #rep{options = Options, user_ctx = UserCtx},
session_id = SessionId
} = State,
case commit_to_both(Source, Target) of
@@ -770,9 +776,9 @@ do_checkpoint(State) ->
try
{SrcRevPos, SrcRevId} = update_checkpoint(
- Source, SourceLog#doc{body = NewRepHistory}, source),
+ Source, SourceLog#doc{body = NewRepHistory}, SrcAccess, UserCtx, source),
{TgtRevPos, TgtRevId} = update_checkpoint(
- Target, TargetLog#doc{body = NewRepHistory}, target),
+ Target, TargetLog#doc{body = NewRepHistory}, TgtAccess, UserCtx, target),
NewState = State#rep_state{
checkpoint_history = NewRepHistory,
committed_seq = NewTsSeq,
@@ -797,16 +803,28 @@ do_checkpoint(State) ->
update_checkpoint(Db, Doc, DbType) ->
+ update_checkpoint(Db, Doc, false, #user_ctx{}, DbType).
+
+update_checkpoint(Db, Doc) ->
+ update_checkpoint(Db, Doc, false, #user_ctx{}).
+
+update_checkpoint(Db, Doc, Access, UserCtx, DbType) ->
try
- update_checkpoint(Db, Doc)
+ update_checkpoint(Db, Doc, Access, UserCtx)
catch throw:{checkpoint_commit_failure, Reason} ->
throw({checkpoint_commit_failure,
<<"Error updating the ", (to_binary(DbType))/binary,
" checkpoint document: ", (to_binary(Reason))/binary>>})
end.
+update_checkpoint(Db, #doc{id = LogId} = Doc0, Access, UserCtx) ->
+ % if db has _access, then:
+ % get userCtx from replication and splice into doc _access
+ Doc = case Access of
+ true -> Doc0#doc{access = [UserCtx#user_ctx.name]};
+ _False -> Doc0
+ end,
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
try
case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
{ok, PosRevId} ->
@@ -815,6 +833,8 @@ update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
throw({checkpoint_commit_failure, Reason})
end
catch throw:conflict ->
+ % TODO: An admin could have changed the access on the checkpoint doc.
+ % However unlikely, we can handle this gracefully here.
case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
{ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
% This means that we were able to update successfully the
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6b4f95c25..be15bd3c8 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -119,7 +119,7 @@ t_fail_changes_queue({Source, Target}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesQueue = element(20, State),
+ ChangesQueue = element(22, State),
?assert(is_process_alive(ChangesQueue)),
{ok, Listener} = rep_result_listener(RepId),
@@ -139,7 +139,7 @@ t_fail_changes_manager({Source, Target}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesManager = element(21, State),
+ ChangesManager = element(23, State),
?assert(is_process_alive(ChangesManager)),
{ok, Listener} = rep_result_listener(RepId),
@@ -159,7 +159,7 @@ t_fail_changes_reader_proc({Source, Target}) ->
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
- ChangesReader = element(22, State),
+ ChangesReader = element(24, State),
?assert(is_process_alive(ChangesReader)),
{ok, Listener} = rep_result_listener(RepId),