- replicate/2,
- replication_states/0,
- job/1,
- doc/3,
- active_doc/2,
- info_from_doc/2,
- restart_job/1
- % Just added to scheduler
- initializing,
- % Could not be turned into a replication job
- error,
- % Scheduled and running
- running,
- % Scheduled and waiting to run
- pending,
- % Scheduled but crashing, backed off by the scheduler
- crashing,
- % Non-continuous (normal) completed replication
- completed,
- % Terminal failure, will not be retried anymore
- failed
--import(couch_util, [
- get_value/2,
- get_value/3
%% Start or cancel a replication described by a request body.
%%
%% PostBody is turned into a #rep{} record by
%% couch_replicator_utils:parse_rep_doc/2 and stamped with the current time.
%%
%% * When the parsed options contain {cancel, true}, the job to cancel is the
%%   one named by the `id' option, falling back to the id computed from the
%%   request itself. The caller must be authorized for that job.
%% * Otherwise the job is handed to the scheduler. Continuous jobs return
%%   right away; other jobs block until a result event arrives.
%%
%% Crashes with badmatch if the body cannot be parsed into {ok, #rep{}}.
-spec replicate({[_]}, any()) ->
    {ok, {continuous, binary()}}
    | {ok, {[_]}}
    | {ok, {cancelled, binary()}}
    | {error, any()}
    | no_return().
replicate(PostBody, Ctx) ->
    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
    Rep = Rep0#rep{start_time = os:timestamp()},
    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
    case get_value(cancel, Options, false) of
        true ->
            % An explicit `id' option wins over the id derived from the body.
            CancelRepId =
                case get_value(id, Options, nil) of
                    nil ->
                        RepId;
                    RepId2 ->
                        RepId2
                end,
            case check_authorization(CancelRepId, UserCtx) of
                ok ->
                    cancel_replication(CancelRepId);
                not_found ->
                    {error, not_found}
            end;
        false ->
            % The result is intentionally unused: `not_found' only means the
            % scheduler has no job with this id yet, which is expected for a
            % new replication. An unauthorized caller does not get past this
            % line (the authorization tests in this file expect a throw).
            check_authorization(RepId, UserCtx),
            {ok, Listener} = rep_result_listener(RepId),
            % Always stop the listener, even when scheduling or waiting
            % raises, so a caller that survives the exception does not keep a
            % stray notifier process around.
            try
                do_replication_loop(Rep)
            after
                couch_replicator_notifier:stop(Listener)
            end
    end.
%% Register the job with the scheduler and build the reply for the caller.
%%
%% The scheduler must acknowledge the job with `ok'. A continuous job is
%% answered immediately with its full id (base id followed by the extension)
%% as a binary; any other job blocks in wait_for_result/1.
-spec do_replication_loop(#rep{}) ->
    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
do_replication_loop(#rep{id = {Base, Suffix} = RepId, options = Opts} = Job) ->
    ok = couch_replicator_scheduler:add_job(Job),
    Continuous = get_value(continuous, Opts, false),
    case Continuous of
        false ->
            wait_for_result(RepId);
        true ->
            {ok, {continuous, ?l2b(Base ++ Suffix)}}
    end.
%% Start a notifier through couch_replicator_notifier:start_link/1 that
%% relays replication events for RepId to the calling process.
%%
%% Only three-element events whose second element equals RepId are forwarded;
%% everything else is acknowledged with `ok' and dropped.
-spec rep_result_listener(rep_id()) -> {ok, pid()}.
rep_result_listener(RepId) ->
    % Captured here, outside the callback, so events go to the caller no
    % matter which process ends up running the fun.
    Caller = self(),
    Relay = fun(Event) ->
        case Event of
            {_, RepId, _} ->
                Caller ! Event;
            _ ->
                ok
        end
    end,
    {ok, _ListenerPid} = couch_replicator_notifier:start_link(Relay).
%% Block until a `finished' or `error' message for RepId is in the mailbox.
%%
%% {finished, RepId, Result} becomes {ok, Result} and {error, RepId, Reason}
%% becomes {error, Reason}. Messages for other ids stay in the mailbox.
%% There is no timeout: the call waits for as long as it takes.
-spec wait_for_result(rep_id()) ->
    {ok, {[_]}} | {error, any()}.
wait_for_result(RepId) ->
    receive
        {Outcome, RepId, Payload} when Outcome =:= finished; Outcome =:= error ->
            case Outcome of
                finished -> {ok, Payload};
                error -> {error, Payload}
            end
    end.
%% Cancel the replication job identified by RepId.
%%
%% The scheduler is asked whether it knows the job. A known job is removed
%% (removal must return `ok') and its full id is returned as a binary; an
%% unknown job yields {error, not_found}. Each step is logged at notice level.
-spec cancel_replication(rep_id()) ->
    {ok, {cancelled, binary()}} | {error, not_found}.
cancel_replication({BaseId, Ext} = RepId) ->
    FullId = BaseId ++ Ext,
    couch_log:notice("Canceling replication '~s' ...", [FullId]),
    Known = couch_replicator_scheduler:rep_state(RepId),
    case Known of
        nil ->
            couch_log:notice("Replication '~s' not found", [FullId]),
            {error, not_found};
        #rep{} ->
            ok = couch_replicator_scheduler:remove_job(RepId),
            couch_log:notice("Replication '~s' cancelled", [FullId]),
            {ok, {cancelled, ?l2b(FullId)}}
    end.
--spec replication_states() -> [atom()].
-replication_states() ->
%% Return a replication endpoint in a form that is safe to show to users.
%%
%% The endpoint is parsed with couch_replicator_docs:parse_rep_db/3:
%%
%% * a parsed #httpdb{} gives its URL, with the password removed by
%%   couch_util:url_strip_password/1, as a binary;
%% * an endpoint rejected as `local_endpoints_not_supported' is returned
%%   unchanged (the tests in this file exercise this with a plain database
%%   name binary);
%% * any `error'-class exception gives `null'.
%%
%% Other thrown terms are not caught and propagate to the caller.
%% The return type includes `null' because that is what the error branch
%% yields.
-spec strip_url_creds(binary() | {[_]}) -> binary() | null.
strip_url_creds(Endpoint) ->
    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
        #httpdb{url = Url} ->
            iolist_to_binary(couch_util:url_strip_password(Url))
    catch
        throw:{error, local_endpoints_not_supported} ->
            Endpoint;
        error:_ ->
            % Avoid exposing any part of the URL in case there is a password
            % in the malformed endpoint URL
            null
    end.
%% Look up a scheduler job by its binary id.
%%
%% The id is normalised with couch_replicator_ids:convert/1, then
%% couch_replicator_scheduler:job/1 is invoked through rpc:multicall/3.
%% The first {ok, Info} reply wins; unreachable nodes and every other reply
%% are ignored. With no {ok, Info} reply the result is {error, not_found}.
-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
job(JobId0) when is_binary(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
    Found = [Info || {ok, Info} <- Replies],
    case Found of
        [] ->
            {error, not_found};
        [First | _] ->
            {ok, First}
    end.
%% Ask the scheduler to restart a job.
%%
%% The id is normalised with couch_replicator_ids:convert/1, then
%% couch_replicator_scheduler:restart_job/1 is invoked through
%% rpc:multicall/3. The first {ok, Info} reply is returned; with no such
%% reply the result is {error, not_found}.
-spec restart_job(binary() | list() | rep_id()) ->
    {ok, {[_]}} | {error, not_found}.
restart_job(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} =
        rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
    Restarted = [Info || {ok, Info} <- Replies],
    case Restarted of
        [] ->
            {error, not_found};
        [First | _] ->
            {ok, First}
    end.
%% Find the doc processor's entry for a replication document.
%%
%% Candidate nodes are the nodes hosting a shard of DbName that are also
%% currently connected (this node included). The node chosen by mem3:owner/3
%% is asked first; only when it answers not_found are the remaining
%% candidates asked.
%%
%% If anything in that sequence raises error:database_does_not_exist, only
%% the local node is queried.
-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
active_doc(DbName, DocId) ->
    try
        ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
        Connected = [node() | nodes()],
        IsConnected = fun(N) -> lists:member(N, Connected) end,
        Nodes = lists:usort(lists:filter(IsConnected, ShardNodes)),
        Owner = mem3:owner(DbName, DocId, Nodes),
        case active_doc_rpc(DbName, DocId, [Owner]) of
            {error, not_found} ->
                active_doc_rpc(DbName, DocId, Nodes -- [Owner]);
            {ok, DocInfo} ->
                {ok, DocInfo}
        end
    catch
        % Might be a local database
        error:database_does_not_exist ->
            active_doc_rpc(DbName, DocId, [node()])
    end.
%% Ask couch_replicator_doc_processor:doc/2 on the given nodes.
%%
%% * No nodes: {error, not_found} without calling anything.
%% * Exactly the local node: a direct call, skipping rpc.
%% * Otherwise: rpc:multicall/4 on the nodes, returning the first {ok, Info}
%%   reply, or {error, not_found} when there is none. Unreachable nodes are
%%   ignored.
-spec active_doc_rpc(binary(), binary(), [node()]) ->
    {ok, {[_]}} | {error, not_found}.
active_doc_rpc(_DbName, _DocId, []) ->
    {error, not_found};
active_doc_rpc(DbName, DocId, Nodes) ->
    case Nodes =:= [node()] of
        true ->
            couch_replicator_doc_processor:doc(DbName, DocId);
        false ->
            {Replies, _BadNodes} = rpc:multicall(
                Nodes,
                couch_replicator_doc_processor,
                doc,
                [DbName, DocId]
            ),
            case [Info || {ok, Info} <- Replies] of
                [] ->
                    {error, not_found};
                [First | _] ->
                    {ok, First}
            end
    end.
%% Return status information for a replication document.
%%
%% The doc processor is consulted first through active_doc/2. Only when it
%% has no entry is the document read from the database via doc_from_db/3.
-spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc(RepDb, DocId, UserCtx) ->
    case active_doc(RepDb, DocId) of
        {error, not_found} ->
            doc_from_db(RepDb, DocId, UserCtx);
        {ok, ActiveInfo} ->
            {ok, ActiveInfo}
    end.
%% Read a replication document with fabric:open_doc/3 and summarise it.
%%
%% UserCtx is passed as-is as the first open option, followed by
%% `ejson_body'. A document that is found is converted with
%% couch_doc:to_json_obj/2 and summarised by info_from_doc/2; a
%% {not_found, _} reply becomes {error, not_found}.
-spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc_from_db(RepDb, DocId, UserCtx) ->
    OpenOpts = [UserCtx, ejson_body],
    case fabric:open_doc(RepDb, DocId, OpenOpts) of
        {not_found, _Reason} ->
            {error, not_found};
        {ok, Doc} ->
            DocJson = couch_doc:to_json_obj(Doc, []),
            {ok, info_from_doc(RepDb, DocJson)}
    end.
%% Build the status object for a replication document from its stored body.
%%
%% Source and target are passed through strip_url_creds/1. State, info,
%% error count and start time come from the `_replication_state' field (see
%% doc_state_summary/3). `id' is always null and `last_updated' is the
%% `_replication_state_time' field, or null when that field is absent.
-spec info_from_doc(binary(), {[_]}) -> {[_]}.
info_from_doc(RepDb, {Props}) ->
    DocId = get_value(<<"_id">>, Props),
    Source = get_value(<<"source">>, Props),
    Target = get_value(<<"target">>, Props),
    DocState = state_atom(get_value(<<"_replication_state">>, Props, null)),
    StateTime = get_value(<<"_replication_state_time">>, Props, null),
    {State, StateInfo, ErrorCount, StartTime} =
        doc_state_summary(DocState, Props, StateTime),
    {[
        {doc_id, DocId},
        {database, RepDb},
        {id, null},
        {source, strip_url_creds(Source)},
        {target, strip_url_creds(Target)},
        {state, State},
        {error_count, ErrorCount},
        {info, StateInfo},
        {start_time, StartTime},
        {last_updated, StateTime}
    ]}.

%% Derive {State, Info, ErrorCount, StartTime} from the document state.
%%
%% * completed: Info is `_replication_stats' with its start time entry
%%   removed, and that entry's value is the start time (null if absent).
%% * failed: Info is the state reason converted by
%%   couch_replicator_utils:ejson_state_info/1, the error count is 1 and the
%%   start time is the state time.
%% * anything else: state, info and start time are null, error count is 0.
doc_state_summary(completed, Props, _StateTime) ->
    {Stats} = get_value(<<"_replication_stats">>, Props, {[]}),
    % The start time may be stored under a binary key or an atom key; the
    % binary key is tried first.
    Taken =
        case lists:keytake(<<"start_time">>, 1, Stats) of
            false -> lists:keytake(start_time, 1, Stats);
            BinaryKeyHit -> BinaryKeyHit
        end,
    case Taken of
        {value, {_, Time}, Rest} ->
            {completed, {Rest}, 0, Time};
        false ->
            {completed, {Stats}, 0, null}
    end;
doc_state_summary(failed, Props, StateTime) ->
    Reason = get_value(<<"_replication_state_reason">>, Props, nil),
    EJsonInfo = couch_replicator_utils:ejson_state_info(Reason),
    {failed, EJsonInfo, 1, StateTime};
doc_state_summary(_OtherState, _Props, _StateTime) ->
    {null, null, 0, null}.
%% Convert a stored replication state to an atom.
%%
%% Atoms pass through unchanged. Binaries are converted only when the atom
%% already exists, so arbitrary document content cannot create new atoms;
%% an unknown binary raises badarg.
state_atom(State) when is_atom(State) ->
    State;
state_atom(<<"triggered">>) ->
    % This handles a legacy case where the document wasn't converted yet
    triggered;
state_atom(State) when is_binary(State) ->
    binary_to_existing_atom(State, utf8).
%% Decide whether the user in Ctx may act on the job identified by RepId.
%%
%% * No such job in the scheduler: not_found.
%% * The job was created by a user with the same name: ok.
%% * Any other job: the decision is delegated to
%%   couch_httpd:verify_is_server_admin/1 (the tests in this file expect it
%%   to throw {unauthorized, _} for a non-admin).
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
    case couch_replicator_scheduler:rep_state(RepId) of
        nil ->
            not_found;
        #rep{user_ctx = #user_ctx{name = Owner}} when Owner =:= Name ->
            ok;
        #rep{} ->
            couch_httpd:verify_is_server_admin(Ctx)
    end.
%% EUnit generator for the check_authorization/2 tests. Every test gets a
%% no-op setup and has all meck mocks unloaded afterwards.
authorization_test_() ->
    Setup = fun() -> ok end,
    Teardown = fun(_) -> meck:unload() end,
    Tests = [
        t_admin_is_always_authorized(),
        t_username_must_match(),
        t_replication_not_found()
    ],
    {foreach, Setup, Teardown, Tests}.
%% A caller with the _admin role is authorized for a job owned by another
%% user.
t_admin_is_always_authorized() ->
    ?_test(begin
        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
        Admin = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
        Verdict = check_authorization(<<"RepId">>, Admin),
        ?assertEqual(ok, Verdict)
    end).
%% A non-admin caller is authorized only for jobs created under the same
%% user name; sharing a role with the owner is not enough.
t_username_must_match() ->
    ?_test(begin
        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
        Roles = [<<"somerole">>],
        Owner = #user_ctx{name = <<"user">>, roles = Roles},
        ?assertEqual(ok, check_authorization(<<"RepId">>, Owner)),
        Stranger = #user_ctx{name = <<"other">>, roles = Roles},
        ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, Stranger))
    end).
%% When the scheduler does not know the job, both regular users and admins
%% get not_found.
t_replication_not_found() ->
    ?_test(begin
        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
        Callers = [
            #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
            #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}
        ],
        lists:foreach(
            fun(Caller) ->
                ?assertEqual(not_found, check_authorization(<<"RepId">>, Caller))
            end,
            Callers
        )
    end).
%% Mock couch_replicator_scheduler:rep_state/1 so that every id resolves to
%% a job owned by a user with the given name and single role.
expect_rep_user_ctx(Name, Role) ->
    Owner = #user_ctx{name = Name, roles = [Role]},
    Job = #rep{user_ctx = Owner},
    meck:expect(couch_replicator_scheduler, rep_state, fun(_Id) -> Job end).
%% EUnit generator for the strip_url_creds/1 tests. config:get/3 is mocked
%% once for the whole group to return the supplied default, and all mocks
%% are unloaded at the end.
strip_url_creds_test_() ->
    Setup = fun() ->
        meck:expect(config, get, fun(_, _, Default) -> Default end)
    end,
    Teardown = fun(_) -> meck:unload() end,
    Tests = [
        t_strip_http_basic_creds(),
        t_strip_http_props_creds(),
        t_strip_local_db_creds(),
        t_strip_url_creds_errors()
    ],
    {setup, Setup, Teardown, Tests}.
%% A plain local database name is returned unchanged.
t_strip_local_db_creds() ->
    ?_test(begin
        LocalDb = <<"localdb">>,
        ?assertEqual(<<"localdb">>, strip_url_creds(LocalDb))
    end).
%% Credentials embedded in a URL binary are removed for http and https, with
%% an explicit port, and with a query string.
t_strip_http_basic_creds() ->
    ?_test(begin
        % {Endpoint, ExpectedStrippedUrl}
        Cases = [
            {<<"http://adm:pass@host/db">>, <<"http://host/db/">>},
            {<<"https://adm:pass@host/db">>, <<"https://host/db/">>},
            {<<"http://adm:pass@host:80/db">>, <<"http://host:80/db/">>},
            {<<"http://adm:pass@host/db?a=b&c=d">>, <<"http://host/db?a=b&c=d">>}
        ],
        lists:foreach(
            fun({Endpoint, Expected}) ->
                ?assertEqual(Expected, strip_url_creds(Endpoint))
            end,
            Cases
        )
    end).
%% Endpoints given as an object are reduced to their URL: embedded
%% credentials are removed and headers do not appear in the result.
t_strip_http_props_creds() ->
    ?_test(begin
        WithUrlCreds = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
        ?assertEqual(<<"http://host/db/">>, strip_url_creds(WithUrlCreds)),
        AuthHeader = {[{<<"Authorization">>, <<"Basic pa55">>}]},
        WithAuthHeader =
            {[
                {<<"url">>, <<"http://host/db">>},
                {<<"headers">>, AuthHeader}
            ]},
        ?assertEqual(<<"http://host/db/">>, strip_url_creds(WithAuthHeader))
    end).
%% Malformed or unsupported endpoints give null, so that no part of a
%% possibly credential-bearing value is returned.
t_strip_url_creds_errors() ->
    ?_test(begin
        BadEndpoints = [
            {[{<<"url">>, <<"http://adm:pass/bad">>}]},
            {[{<<"garbage">>, <<"more garbage">>}]},
            <<"http://a:b:c">>,
            <<"http://adm:pass:pass/bad">>,
            null,
            42,
            [<<"a">>, <<"b">>],
            {[{<<"source_proxy">>, <<"http://adm:pass/bad">>}]}
        ],
        lists:foreach(
            fun(Bad) ->
                ?assertEqual(null, strip_url_creds(Bad))
            end,
            BadEndpoints
        )
    end).