diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator.erl | 419 |
1 files changed, 0 insertions, 419 deletions
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl deleted file mode 100644 index 39b3903ea..000000000 --- a/src/couch_replicator/src/couch_replicator.erl +++ /dev/null @@ -1,419 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator). - --export([ - replicate/2, - replication_states/0, - job/1, - doc/3, - active_doc/2, - info_from_doc/2, - restart_job/1 -]). - --include_lib("couch/include/couch_db.hrl"). --include("couch_replicator.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). --include_lib("mem3/include/mem3.hrl"). - --define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000). --define(REPLICATION_STATES, [ - % Just added to scheduler - initializing, - % Could not be turned into a replication job - error, - % Scheduled and running - running, - % Scheduled and waiting to run - pending, - % Scheduled but crashing, backed off by the scheduler - crashing, - % Non-continuous (normal) completed replication - completed, - % Terminal failure, will not be retried anymore - failed -]). - --import(couch_util, [ - get_value/2, - get_value/3 -]). - --spec replicate({[_]}, any()) -> - {ok, {continuous, binary()}} - | {ok, {[_]}} - | {ok, {cancelled, binary()}} - | {error, any()} - | no_return(). 
% Start a new replication, or cancel an existing one, described by the
% request body `PostBody` on behalf of the user context `Ctx`.
%
% When the `cancel` option is true the targeted job is removed, provided
% check_authorization/2 returns ok; {error, not_found} is returned when the
% scheduler does not know the job. Otherwise the job is added to the scheduler
% and the result of do_replication_loop/1 is returned.
%
% Crashes with badmatch if couch_replicator_utils:parse_rep_doc/2 does not
% return {ok, #rep{}}.
replicate(PostBody, Ctx) ->
    {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
    Rep = Rep0#rep{start_time = os:timestamp()},
    #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
    case couch_util:get_value(cancel, Options, false) of
        true ->
            % An explicit `id` option takes precedence over the id computed
            % from the request body.
            CancelRepId =
                case couch_util:get_value(id, Options, nil) of
                    nil -> RepId;
                    ExplicitId -> ExplicitId
                end,
            case check_authorization(CancelRepId, UserCtx) of
                ok ->
                    cancel_replication(CancelRepId);
                not_found ->
                    {error, not_found}
            end;
        false ->
            % The return value is intentionally ignored: not_found just means
            % no job with this id exists yet. Any rejection can only surface
            % as an exception raised by check_authorization/2.
            check_authorization(RepId, UserCtx),
            {ok, Listener} = rep_result_listener(RepId),
            % Always stop the listener, even when scheduling or waiting
            % raises; otherwise the linked notifier outlives this call in any
            % caller that catches the exception.
            try
                do_replication_loop(Rep)
            after
                couch_replicator_notifier:stop(Listener)
            end
    end.

% Hand the job to the scheduler. Continuous replications return immediately
% with their full id as a binary; one-shot replications block in
% wait_for_result/1 until a result message arrives.
-spec do_replication_loop(#rep{}) ->
    {ok, {continuous, binary()}} | {ok, tuple()} | {error, any()}.
do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
    ok = couch_replicator_scheduler:add_job(Rep),
    case couch_util:get_value(continuous, Options, false) of
        true ->
            {ok, {continuous, ?l2b(BaseId ++ Ext)}};
        false ->
            wait_for_result(Id)
    end.

% Start a linked notifier that forwards every 3-tuple event whose second
% element is exactly `RepId` to the calling process; all other events are
% dropped.
-spec rep_result_listener(rep_id()) -> {ok, pid()}.
rep_result_listener(RepId) ->
    ReplyTo = self(),
    Forward =
        fun
            ({_, EventRepId, _} = Event) when EventRepId =:= RepId ->
                ReplyTo ! Event;
            (_) ->
                ok
        end,
    {ok, _Listener} = couch_replicator_notifier:start_link(Forward).

% Block until a `finished` or `error` message for `RepId` is received.
% There is no timeout: the caller waits for as long as the job takes.
-spec wait_for_result(rep_id()) ->
    {ok, {[_]}} | {error, any()}.
wait_for_result(RepId) ->
    receive
        {finished, RepId, RepResult} ->
            {ok, RepResult};
        {error, RepId, Reason} ->
            {error, Reason}
    end.

-spec cancel_replication(rep_id()) ->
    {ok, {cancelled, binary()}} | {error, not_found}.
% Remove the job identified by `RepId` from the scheduler.
% Returns {ok, {cancelled, FullId}} when the scheduler knew the job and
% {error, not_found} otherwise; both outcomes are logged at notice level.
cancel_replication({BaseId, Extension} = RepId) ->
    FullRepId = BaseId ++ Extension,
    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
    case couch_replicator_scheduler:rep_state(RepId) of
        #rep{} ->
            ok = couch_replicator_scheduler:remove_job(RepId),
            couch_log:notice("Replication '~s' cancelled", [FullRepId]),
            {ok, {cancelled, ?l2b(FullRepId)}};
        nil ->
            couch_log:notice("Replication '~s' not found", [FullRepId]),
            {error, not_found}
    end.

% The list of states a replication can be reported in.
-spec replication_states() -> [atom()].
replication_states() ->
    ?REPLICATION_STATES.

% Return `Endpoint` as a URL binary with the password removed.
%
% Returns the endpoint unchanged when parsing throws
% {error, local_endpoints_not_supported}, and `null` when parsing fails with
% any `error` class exception, so that no fragment of a malformed URL (which
% might embed a password) is ever exposed.
-spec strip_url_creds(binary() | {[_]}) -> binary() | null.
strip_url_creds(Endpoint) ->
    try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
        #httpdb{url = Url} ->
            iolist_to_binary(couch_util:url_strip_password(Url))
    catch
        throw:{error, local_endpoints_not_supported} ->
            Endpoint;
        error:_ ->
            % Avoid exposing any part of the URL in case there is a password in
            % the malformed endpoint URL
            null
    end.

% Look up a scheduler job on every node of the cluster and return the first
% successful answer.
-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
job(JobId0) when is_binary(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
    first_ok_reply(Replies).

% Ask every node of the cluster to restart the job and return the first
% successful answer.
-spec restart_job(binary() | list() | rep_id()) ->
    {ok, {[_]}} | {error, not_found}.
restart_job(JobId0) ->
    JobId = couch_replicator_ids:convert(JobId0),
    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, restart_job, [JobId]),
    first_ok_reply(Replies).

% Pick the first {ok, Info} out of a list of multicall replies; anything else
% (errors, badrpc tuples) is skipped.
-spec first_ok_reply([term()]) -> {ok, term()} | {error, not_found}.
first_ok_reply(Replies) ->
    case [Reply || {ok, _} = Reply <- Replies] of
        [FirstOk | _] ->
            FirstOk;
        [] ->
            {error, not_found}
    end.

-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
% Fetch the state of a replication document from the doc processor.
%
% The owner node (as chosen by mem3:owner/3 among the live nodes that host a
% shard of `DbName`) is asked first; the remaining live shard nodes are asked
% only if the owner does not know the document. If mem3 reports
% database_does_not_exist, only the local node is consulted.
active_doc(DbName, DocId) ->
    try
        LiveNodes = [node() | nodes()],
        ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
        Candidates = lists:usort([N || N <- ShardNodes, lists:member(N, LiveNodes)]),
        Owner = mem3:owner(DbName, DocId, Candidates),
        case active_doc_rpc(DbName, DocId, [Owner]) of
            {error, not_found} ->
                active_doc_rpc(DbName, DocId, Candidates -- [Owner]);
            {ok, _DocInfo} = Found ->
                Found
        end
    catch
        % Might be a local database
        error:database_does_not_exist ->
            active_doc_rpc(DbName, DocId, [node()])
    end.

% Ask the doc processor on `Nodes` about the document.
% An empty node list yields {error, not_found}; a list holding only the local
% node is served by a direct call; anything else goes through rpc:multicall/4
% and the first {ok, _} reply wins.
-spec active_doc_rpc(binary(), binary(), [node()]) ->
    {ok, {[_]}} | {error, not_found}.
active_doc_rpc(_DbName, _DocId, []) ->
    {error, not_found};
active_doc_rpc(DbName, DocId, [Node]) when Node =:= node() ->
    couch_replicator_doc_processor:doc(DbName, DocId);
active_doc_rpc(DbName, DocId, Nodes) ->
    Args = [DbName, DocId],
    {Replies, _BadNodes} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc, Args),
    case [Reply || {ok, _} = Reply <- Replies] of
        [FirstOk | _] ->
            FirstOk;
        [] ->
            {error, not_found}
    end.

% Return replication document info, preferring the doc processor's view and
% falling back to reading the document from the database.
-spec doc(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc(RepDb, DocId, UserCtx) ->
    case active_doc(RepDb, DocId) of
        {error, not_found} ->
            doc_from_db(RepDb, DocId, UserCtx);
        {ok, _DocInfo} = Active ->
            Active
    end.

% Open the document through fabric and summarise it with info_from_doc/2.
-spec doc_from_db(binary(), binary(), any()) -> {ok, {[_]}} | {error, not_found}.
doc_from_db(RepDb, DocId, UserCtx) ->
    OpenOpts = [UserCtx, ejson_body],
    case fabric:open_doc(RepDb, DocId, OpenOpts) of
        {not_found, _Reason} ->
            {error, not_found};
        {ok, Doc} ->
            EJson = couch_doc:to_json_obj(Doc, []),
            {ok, info_from_doc(RepDb, EJson)}
    end.

-spec info_from_doc(binary(), {[_]}) -> {[_]}.
% Build the info object for a replication document from its EJSON body.
% Only `completed` and `failed` documents carry state details; every other
% state is reported as null. Source and target are passed through
% strip_url_creds/1 before being returned.
info_from_doc(RepDb, {Props}) ->
    DocId = couch_util:get_value(<<"_id">>, Props),
    Source = couch_util:get_value(<<"source">>, Props),
    Target = couch_util:get_value(<<"target">>, Props),
    RawState = couch_util:get_value(<<"_replication_state">>, Props, null),
    StateTime = couch_util:get_value(<<"_replication_state_time">>, Props, null),
    {State, StateInfo, ErrorCount, StartTime} =
        doc_state_summary(state_atom(RawState), Props, StateTime),
    {[
        {doc_id, DocId},
        {database, RepDb},
        {id, null},
        {source, strip_url_creds(Source)},
        {target, strip_url_creds(Target)},
        {state, State},
        {error_count, ErrorCount},
        {info, StateInfo},
        {start_time, StartTime},
        {last_updated, StateTime}
    ]}.

% Compute {State, Info, ErrorCount, StartTime} for a document state.
doc_state_summary(completed, Props, _StateTime) ->
    {Stats} = couch_util:get_value(<<"_replication_stats">>, Props, {[]}),
    {StartTime, StatsRest} = take_start_time([<<"start_time">>, start_time], Stats),
    {completed, {StatsRest}, 0, StartTime};
doc_state_summary(failed, Props, StateTime) ->
    Reason = couch_util:get_value(<<"_replication_state_reason">>, Props, nil),
    {failed, couch_replicator_utils:ejson_state_info(Reason), 1, StateTime};
doc_state_summary(_OtherState, _Props, _StateTime) ->
    {null, null, 0, null}.

% Remove the start time from the stats, trying each key in order (the binary
% key first, then the atom key). Yields {null, Stats} if neither is present.
take_start_time([], Stats) ->
    {null, Stats};
take_start_time([Key | MoreKeys], Stats) ->
    case lists:keytake(Key, 1, Stats) of
        {value, {_, Time}, StatsRest} ->
            {Time, StatsRest};
        false ->
            take_start_time(MoreKeys, Stats)
    end.

% Convert a stored state to an atom. Binaries other than <<"triggered">> must
% name an atom that already exists, otherwise this raises badarg.
state_atom(<<"triggered">>) ->
    % This handles a legacy case where the document wasn't converted yet
    triggered;
state_atom(State) when is_binary(State) ->
    binary_to_existing_atom(State, utf8);
state_atom(State) when is_atom(State) ->
    State.

% Decide whether `Ctx` may act on the job `RepId`.
% Returns ok when the job was created under the same user name, not_found when
% the scheduler has no such job, and otherwise returns whatever
% couch_httpd:verify_is_server_admin/1 returns (or propagates what it raises).
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
    case couch_replicator_scheduler:rep_state(RepId) of
        nil ->
            not_found;
        #rep{user_ctx = #user_ctx{name = Owner}} when Owner =:= Name ->
            ok;
        #rep{} ->
            couch_httpd:verify_is_server_admin(Ctx)
    end.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

% check_authorization/2 tests; every case installs its own scheduler mock and
% all mocks are unloaded afterwards.
authorization_test_() ->
    Setup = fun() -> ok end,
    Teardown = fun(_) -> meck:unload() end,
    {
        foreach,
        Setup,
        Teardown,
        [
            t_admin_is_always_authorized(),
            t_username_must_match(),
            t_replication_not_found()
        ]
    }.

% An admin may act on a job owned by a different user.
t_admin_is_always_authorized() ->
    ?_test(begin
        expect_rep_user_ctx(<<"someuser">>, <<"_admin">>),
        Admin = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
        ?assertEqual(ok, check_authorization(<<"RepId">>, Admin))
    end).

% A non-admin is authorized only for jobs created under the same name.
t_username_must_match() ->
    ?_test(begin
        expect_rep_user_ctx(<<"user">>, <<"somerole">>),
        Owner = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
        ?assertEqual(ok, check_authorization(<<"RepId">>, Owner)),
        Stranger = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
        ?assertThrow(
            {unauthorized, _},
            check_authorization(<<"RepId">>, Stranger)
        )
    end).

% An unknown job is reported as not_found regardless of who asks.
t_replication_not_found() ->
    ?_test(begin
        meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
        User = #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
        ?assertEqual(not_found, check_authorization(<<"RepId">>, User)),
        Admin = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
        ?assertEqual(not_found, check_authorization(<<"RepId">>, Admin))
    end).

% Make the scheduler report, for any id, a job owned by `Name` with `Role`.
expect_rep_user_ctx(Name, Role) ->
    Rep = #rep{user_ctx = #user_ctx{name = Name, roles = [Role]}},
    meck:expect(
        couch_replicator_scheduler,
        rep_state,
        fun(_Id) -> Rep end
    ).

% strip_url_creds/1 tests; config:get/3 is mocked to return its default.
strip_url_creds_test_() ->
    Setup = fun() ->
        meck:expect(config, get, fun(_, _, Default) -> Default end)
    end,
    Teardown = fun(_) -> meck:unload() end,
    {
        setup,
        Setup,
        Teardown,
        [
            t_strip_http_basic_creds(),
            t_strip_http_props_creds(),
            t_strip_local_db_creds(),
            t_strip_url_creds_errors()
        ]
    }.

% A local database name is returned unchanged.
t_strip_local_db_creds() ->
    ?_assertEqual(<<"localdb">>, strip_url_creds(<<"localdb">>)).

% user:password in the URL authority is removed for plain URL endpoints.
t_strip_http_basic_creds() ->
    ?_test(begin
        Cases = [
            {<<"http://adm:pass@host/db">>, <<"http://host/db/">>},
            {<<"https://adm:pass@host/db">>, <<"https://host/db/">>},
            {<<"http://adm:pass@host:80/db">>, <<"http://host:80/db/">>},
            {<<"http://adm:pass@host/db?a=b&c=d">>, <<"http://host/db?a=b&c=d">>}
        ],
        lists:foreach(
            fun({Url, Stripped}) ->
                ?assertEqual(Stripped, strip_url_creds(Url))
            end,
            Cases
        )
    end).

% Object-style endpoints yield only the password-free URL.
t_strip_http_props_creds() ->
    ?_test(begin
        WithUrlCreds = {[{<<"url">>, <<"http://adm:pass@host/db">>}]},
        WithAuthHeader =
            {[
                {<<"url">>, <<"http://host/db">>},
                {<<"headers">>, {[{<<"Authorization">>, <<"Basic pa55">>}]}}
            ]},
        lists:foreach(
            fun(Props) ->
                ?assertEqual(<<"http://host/db/">>, strip_url_creds(Props))
            end,
            [WithUrlCreds, WithAuthHeader]
        )
    end).

% Malformed or unsupported endpoints collapse to null.
t_strip_url_creds_errors() ->
    ?_test(begin
        BadEndpoints = [
            {[{<<"url">>, <<"http://adm:pass/bad">>}]},
            {[{<<"garbage">>, <<"more garbage">>}]},
            <<"http://a:b:c">>,
            <<"http://adm:pass:pass/bad">>,
            null,
            42,
            [<<"a">>, <<"b">>],
            {[{<<"source_proxy">>, <<"http://adm:pass/bad">>}]}
        ],
        lists:foreach(
            fun(Bad) ->
                ?assertEqual(null, strip_url_creds(Bad))
            end,
            BadEndpoints
        )
    end).

-endif.