diff options
author | Jan Lehnardt <jan@apache.org> | 2018-03-05 21:42:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-05 21:42:11 +0100 |
commit | b3216354003089ba58d4c178ec33f68d2e5accf6 (patch) | |
tree | fdfac5e086a667dc44192e37dc60434a33ff3574 | |
parent | b884abc6f9762e4ffc65d85a0c655cb1ac0d917a (diff) | |
parent | 2c43e6201e1c67418ed2dc945ce04945e764da0a (diff) | |
download | couchdb-fix/compaction-daemon.tar.gz |
Merge branch 'master' into fix/compaction-daemonfix/compaction-daemon
18 files changed, 1017 insertions, 139 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index ec3e22314..9762536ac 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -430,6 +430,21 @@ ssl_certificate_max_depth = 3 ; Re-check cluster state at least every cluster_quiet_period seconds ; cluster_quiet_period = 60 +; List of replicator client authentication plugins to try. Plugins will be +; tried in order. The first to initialize successfully will be used for that +; particular endpoint (source or target). Normally couch_replicator_auth_noop +; would be used at the end of the list as a "catch-all". It doesn't do anything +; and effectively implements the previous behavior of using basic auth. +; There are currently two plugins available: +; couch_replicator_auth_session - use _session cookie authentication +; couch_replicator_auth_noop - use basic authentication (previous default) +; Currently previous default behavior is still the default. To start using +; session auth, use this as the list of plugins: +; `couch_replicator_auth_session,couch_replicator_auth_noop`. +; In a future release the session plugin might be used by default. +;auth_plugins = couch_replicator_auth_noop + + [compaction_daemon] ; The delay, in seconds, between each check for which database and view indexes ; need to be compacted. diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 347c9318f..940689708 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -464,7 +464,10 @@ fold_docs(St, UserFun, UserAcc, Options) -> fold_local_docs(St, UserFun, UserAcc, Options) -> - fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options). + case fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options) of + {ok, _Reds, FinalAcc} -> {ok, null, FinalAcc}; + {ok, FinalAcc} -> {ok, FinalAcc} + end. fold_changes(St, SinceSeq, UserFun, UserAcc, Options) -> diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/include/couch_replicator_api_wrap.hrl index d2e0fdff5..0f8213c51 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl +++ b/src/couch_replicator/include/couch_replicator_api_wrap.hrl @@ -14,7 +14,7 @@ -record(httpdb, { url, - oauth = nil, + auth_props = [], headers = [ {"Accept", "application/json"}, {"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()} @@ -26,13 +26,6 @@ httpc_pool = nil, http_connections, first_error_timestamp = nil, - proxy_url -}). - --record(oauth, { - consumer_key, - token, - token_secret, - consumer_secret, - signature_method + proxy_url, + auth_context = nil }). diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 8b7cd5cb1..39141c301 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -25,7 +25,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -include_lib("mem3/include/mem3.hrl"). diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index b5ea57c3c..44c290d33 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -142,7 +142,8 @@ db_open(DbName, Options, Create, _CreateParams) -> throw({unauthorized, DbName}) end. -db_close(#httpdb{httpc_pool = Pool}) -> +db_close(#httpdb{httpc_pool = Pool} = HttpDb) -> + couch_replicator_auth:cleanup(HttpDb), unlink(Pool), ok = couch_replicator_httpc_pool:stop(Pool); db_close(DbName) -> @@ -1009,7 +1010,7 @@ header_value(Key, Headers, Default) -> normalize_db(#httpdb{} = HttpDb) -> #httpdb{ url = HttpDb#httpdb.url, - oauth = HttpDb#httpdb.oauth, + auth_props = lists:sort(HttpDb#httpdb.auth_props), headers = lists:keysort(1, HttpDb#httpdb.headers), timeout = HttpDb#httpdb.timeout, ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options), @@ -1037,7 +1038,7 @@ maybe_append_create_query_params(Db, CreateParams) -> normalize_http_db_test() -> HttpDb = #httpdb{ url = "http://host/db", - oauth = #oauth{}, + auth_props = [{"key", "val"}], headers = [{"k2","v2"}, {"k1","v1"}], timeout = 30000, ibrowse_options = [{k2, v2}, {k1, v1}], diff --git a/src/couch_replicator/src/couch_replicator_auth.erl b/src/couch_replicator/src/couch_replicator_auth.erl new file mode 100644 index 000000000..1c9a49723 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth.erl @@ -0,0 +1,99 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_auth). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + + +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). + + +-define(DEFAULT_PLUGINS, "couch_replicator_auth_noop"). + + +% Behavior API + +-callback initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. + +-callback update_headers(term(), headers()) -> {headers(), term()}. + +-callback handle_response(term(), code(), headers()) -> + {continue | retry, term()}. + +-callback cleanup(term()) -> ok. + + +% Main API + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}} | {error, term()}. +initialize(#httpdb{auth_context = nil} = HttpDb) -> + case try_initialize(get_plugin_modules(), HttpDb) of + {ok, Mod, HttpDb1, Context} -> + {ok, HttpDb1#httpdb{auth_context = {Mod, Context}}}; + {error, Error} -> + {error, Error} + end. + + +-spec update_headers(#httpdb{}, headers()) -> {headers(), #httpdb{}}. +update_headers(#httpdb{auth_context = {Mod, Context}} = HttpDb, Headers) -> + {Headers1, Context1} = Mod:update_headers(Context, Headers), + {Headers1, HttpDb#httpdb{auth_context = {Mod, Context1}}}. + + +-spec handle_response(#httpdb{}, code(), headers()) -> + {continue | retry, term()}. +handle_response(#httpdb{} = HttpDb, Code, Headers) -> + {Mod, Context} = HttpDb#httpdb.auth_context, + {Res, Context1} = Mod:handle_response(Context, Code, Headers), + {Res, HttpDb#httpdb{auth_context = {Mod, Context1}}}. + + +-spec cleanup(#httpdb{}) -> #httpdb{}. +cleanup(#httpdb{auth_context = {Module, Context}} = HttpDb) -> + ok = Module:cleanup(Context), + HttpDb#httpdb{auth_context = nil}. + + +% Private helper functions + +-spec get_plugin_modules() -> [atom()]. +get_plugin_modules() -> + Plugins1 = config:get("replicator", "auth_plugins", ?DEFAULT_PLUGINS), + [list_to_atom(Plugin) || Plugin <- string:tokens(Plugins1, ",")]. + + +try_initialize([], _HttpDb) -> + {error, no_more_auth_plugins_left_to_try}; +try_initialize([Mod | Modules], HttpDb) -> + try Mod:initialize(HttpDb) of + {ok, HttpDb1, Context} -> + {ok, Mod, HttpDb1, Context}; + ignore -> + try_initialize(Modules, HttpDb); + {error, Error} -> + {error, Error} + catch + error:undef -> + {error, {could_not_load_plugin_module, Mod}} + end. diff --git a/src/couch_replicator/src/couch_replicator_auth_noop.erl b/src/couch_replicator/src/couch_replicator_auth_noop.erl new file mode 100644 index 000000000..5dbf13335 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth_noop.erl @@ -0,0 +1,52 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_auth_noop). + + +-behavior(couch_replicator_auth). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + + +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). + + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. +initialize(#httpdb{} = HttpDb) -> + {ok, HttpDb, nil}. + + +-spec update_headers(term(), headers()) -> {headers(), term()}. +update_headers(Context, Headers) -> + {Headers, Context}. + + +-spec handle_response(term(), code(), headers()) -> + {continue | retry, term()}. +handle_response(Context, _Code, _Headers) -> + {continue, Context}. + + +-spec cleanup(term()) -> ok. +cleanup(_Context) -> + ok. diff --git a/src/couch_replicator/src/couch_replicator_auth_session.erl b/src/couch_replicator/src/couch_replicator_auth_session.erl new file mode 100644 index 000000000..3fff29572 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_auth_session.erl @@ -0,0 +1,692 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% This is the replicator session auth plugin. It implements session based +% authentication for the replicator. The only public API are the functions from +% the couch_replicator_auth behaviour. Most of the logic and state is in the +% gen_server. An instance of a gen_server could be spawned for the source and +% target endpoints of each replication jobs. +% +% The workflow is roughly this: +% +% * On initialization, try to get a cookie in `refresh/1` If an error occurs, +% the crash. If `_session` endpoint fails with a 404 (not found), return +% `ignore` assuming session authentication is not support or we simply hit a +% non-CouchDb server. +% +% * Before each request, auth framework calls `update_headers` API function. +% Before updating the headers and returning, check if need to refresh again. +% The check looks `next_refresh` time. If that time is set (not `infinity`) +% and just expired, then obtain a new cookie, then update headers and +% return. +% +% * After each request, auth framework calls `handle_response` function. If +% request was successful check if a new cookie was sent by the server in the +% `Set-Cookie` header. If it was then then that becomes the current cookie. +% +% * If last request has an auth failure, check if request used a stale cookie +% In this case nothing is done, and the client is told to retry. Next time +% it updates its headers befor the request it should pick up the latest +% cookie. +% +% * If last request failed and cookie was the latest known cookie, schedule a +% refresh and tell client to retry. However, if the cookie was just updated, +% tell the client to continue such that it will handle the auth failure on +% its own via a set of retries with exponential backoffs. This is it to +% ensure if something goes wrong and one of the endpoints issues invalid +% cookies, replicator won't be stuck in a busy loop refreshing them. + + +-module(couch_replicator_auth_session). + + +-behaviour(couch_replicator_auth). +-behaviour(gen_server). + + +-export([ + initialize/1, + update_headers/2, + handle_response/3, + cleanup/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + format_status/2 +]). + + +-include_lib("ibrowse/include/ibrowse.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). + + +-type headers() :: [{string(), string()}]. +-type code() :: non_neg_integer(). +-type creds() :: {string() | undefined, string() | undefined}. + + +-define(MIN_UPDATE_INTERVAL, 5). + + +-record(state, { + epoch = 0 :: non_neg_integer(), + cookie :: string() | undefined, + user :: string() | undefined, + pass :: string() | undefined, + httpdb_timeout :: integer(), + httpdb_pool :: pid(), + httpdb_ibrowse_options = [] :: list(), + session_url :: string(), + next_refresh = infinity :: infinity | non_neg_integer(), + refresh_tstamp = 0 :: non_neg_integer() +}). + + +% Behavior API callbacks + +-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore. +initialize(#httpdb{} = HttpDb) -> + case init_state(HttpDb) of + {ok, HttpDb1, State} -> + {ok, Pid} = gen_server:start_link(?MODULE, [State], []), + Epoch = State#state.epoch, + Timeout = State#state.httpdb_timeout, + {ok, HttpDb1, {Pid, Epoch, Timeout}}; + {error, Error} -> + {error, Error}; + ignore -> + ignore + end. + + +-spec update_headers(term(), headers()) -> {headers(), term()}. +update_headers({Pid, Epoch, Timeout}, Headers) -> + Args = {update_headers, Headers, Epoch}, + {Headers1, Epoch1} = gen_server:call(Pid, Args, Timeout * 10), + {Headers1, {Pid, Epoch1, Timeout}}. + + +-spec handle_response(term(), code(), headers()) -> + {continue | retry, term()}. +handle_response({Pid, Epoch, Timeout}, Code, Headers) -> + Args = {handle_response, Code, Headers, Epoch}, + {Retry, Epoch1} = gen_server:call(Pid, Args, Timeout * 10), + {Retry, {Pid, Epoch1, Timeout}}. + + +-spec cleanup(term()) -> ok. +cleanup({Pid, _Epoch, Timeout}) -> + gen_server:call(Pid, stop, Timeout * 10). + + +%% gen_server functions + +init([#state{} = State]) -> + {ok, State}. + + +terminate(_Reason, _State) -> + ok. + + +handle_call({update_headers, Headers, _Epoch}, _From, State) -> + case maybe_refresh(State) of + {ok, State1} -> + Cookie = "AuthSession=" ++ State1#state.cookie, + Headers1 = [{"Cookie", Cookie} | Headers], + {reply, {Headers1, State1#state.epoch}, State1}; + {error, Error} -> + LogMsg = "~p: Stopping session auth plugin because of error ~p", + couch_log:error(LogMsg, [?MODULE, Error]), + {stop, Error, State} + end; + +handle_call({handle_response, Code, Headers, Epoch}, _From, State) -> + {Retry, State1} = process_response(Code, Headers, Epoch, State), + {reply, {Retry, State1#state.epoch}, State1}; + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + + +handle_cast(Msg, State) -> + couch_log:error("~p: Received un-expected cast ~p", [?MODULE, Msg]), + {noreply, State}. + + +handle_info(Msg, State) -> + couch_log:error("~p : Received un-expected message ~p", [?MODULE, Msg]), + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +format_status(_Opt, [_PDict, State]) -> + [ + {epoch, State#state.epoch}, + {user, State#state.user}, + {session_url, State#state.session_url}, + {refresh_tstamp, State#state.refresh_tstamp} + ]. + + +%% Private helper functions + + +-spec init_state(#httpdb{}) -> + {ok, #httpdb{}, #state{}} | {error, term()} | ignore. +init_state(#httpdb{} = HttpDb) -> + case extract_creds(HttpDb) of + {ok, User, Pass, HttpDb1} -> + State = #state{ + user = User, + pass = Pass, + session_url = get_session_url(HttpDb1#httpdb.url), + httpdb_pool = HttpDb1#httpdb.httpc_pool, + httpdb_timeout = HttpDb1#httpdb.timeout, + httpdb_ibrowse_options = HttpDb1#httpdb.ibrowse_options + }, + case refresh(State) of + {ok, State1} -> + {ok, HttpDb1, State1}; + {error, {session_not_supported, _, _}} -> + ignore; + {error, Error} -> + {error, Error} + end; + {error, missing_credentials} -> + ignore; + {error, Error} -> + {error, Error} + end. + + +-spec extract_creds(#httpdb{}) -> + {ok, string(), string(), #httpdb{}} | {error, term()}. +extract_creds(#httpdb{url = Url, headers = Headers} = HttpDb) -> + {{HeadersUser, HeadersPass}, HeadersNoCreds} = + couch_replicator_utils:remove_basic_auth_from_headers(Headers), + case extract_creds_from_url(Url) of + {ok, UrlUser, UrlPass, UrlNoCreds} -> + case pick_creds({UrlUser, UrlPass}, {HeadersUser, HeadersPass}) of + {ok, User, Pass} -> + HttpDb1 = HttpDb#httpdb{ + url = UrlNoCreds, + headers = HeadersNoCreds + }, + {ok, User, Pass, HttpDb1}; + {error, Error} -> + {error, Error} + end; + {error, Error} -> + {error, Error} + end. + + +% Credentials could be specified in the url and/or in the headers. +% * If no credentials specified return error. +% * If specified in url but not in headers, pick url creds. +% * Otherwise pick headers creds. +% +-spec pick_creds(creds(), creds()) -> + {ok, string(), string()} | {error, missing_credentials}. +pick_creds({undefined, _}, {undefined, _}) -> + {error, missing_credentials}; +pick_creds({UrlUser, UrlPass}, {undefined, _}) -> + {ok, UrlUser, UrlPass}; +pick_creds({_, _}, {HeadersUser, HeadersPass}) -> + {ok, HeadersUser, HeadersPass}. + + +-spec extract_creds_from_url(string()) -> + {ok, string() | undefined, string() | undefined, string()} | + {error, term()}. +extract_creds_from_url(Url) -> + case ibrowse_lib:parse_url(Url) of + {error, Error} -> + {error, Error}; + #url{username = undefined, password = undefined} -> + {ok, undefined, undefined, Url}; + #url{protocol = Proto, username = User, password = Pass} -> + % Excise user and pass parts from the url. Try to keep the host, + % port and path as they were in the original. + Prefix = lists:concat([Proto, "://", User, ":", Pass, "@"]), + Suffix = lists:sublist(Url, length(Prefix) + 1, length(Url) + 1), + NoCreds = lists:concat([Proto, "://", Suffix]), + {ok, User, Pass, NoCreds} + end. + + +-spec process_response(non_neg_integer(), headers(), + non_neg_integer(), #state{}) -> {retry | continue, #state{}}. +process_response(403, _Headers, Epoch, State) -> + process_auth_failure(Epoch, State); +process_response(401, _Headers, Epoch, State) -> + process_auth_failure(Epoch, State); +process_response(Code, Headers, _Epoch, State) when Code >= 200, Code < 300 -> + % If server noticed cookie is about to time out it can send a new cookie in + % the response headers. Take advantage of that and refresh the cookie. + State1 = case maybe_update_cookie(Headers, State) of + {ok, UpdatedState} -> + UpdatedState; + {error, cookie_not_found} -> + State; + {error, Other} -> + LogMsg = "~p : Could not parse cookie from response headers ~p", + couch_log:error(LogMsg, [?MODULE, Other]), + State + end, + {continue, State1}; +process_response(_Code, _Headers, _Epoch, State) -> + {continue, State}. + + +-spec process_auth_failure(non_neg_integer(), #state{}) -> + {retry | continue, #state{}}. +process_auth_failure(Epoch, #state{epoch = StateEpoch} = State) + when StateEpoch > Epoch -> + % This request used an outdated cookie, tell it to immediately retry + % and it will pick up the current cookie when its headers are updated + {retry, State}; +process_auth_failure(Epoch, #state{epoch = Epoch} = State) -> + MinInterval = min_update_interval(), + case cookie_age_sec(State, now_sec()) of + AgeSec when AgeSec < MinInterval -> + % A recently acquired cookie failed. Schedule a refresh and + % return `continue` to let httpc's retry apply a backoff + {continue, schedule_refresh(now_sec() + MinInterval, State)}; + _AgeSec -> + % Current cookie failed auth. Schedule refresh and ask + % httpc to retry the request. + {retry, schedule_refresh(now_sec(), State)} + end. + + +-spec get_session_url(string()) -> string(). +get_session_url(Url) -> + #url{ + protocol = Proto, + host = Host, + port = Port + } = ibrowse_lib:parse_url(Url), + WithPort = lists:concat([Proto, "://", Host, ":", Port]), + case lists:prefix(WithPort, Url) of + true -> + % Explicit port specified in the original url + WithPort ++ "/_session"; + false -> + % Implicit proto default port was used + lists:concat([Proto, "://", Host, "/_session"]) + end. + + +-spec schedule_refresh(non_neg_integer(), #state{}) -> #state{}. +schedule_refresh(T, #state{next_refresh = Tc} = State) when T < Tc -> + State#state{next_refresh = T}; +schedule_refresh(_, #state{} = State) -> + State. + + +-spec maybe_refresh(#state{}) -> {ok, #state{}} | {error, term()}. +maybe_refresh(#state{next_refresh = T} = State) -> + case now_sec() >= T of + true -> + refresh(State#state{next_refresh = infinity}); + false -> + {ok, State} + end. + + +-spec refresh(#state{}) -> {ok, #state{}} | {error, term()}. +refresh(#state{session_url = Url, user = User, pass = Pass} = State) -> + Body = mochiweb_util:urlencode([{name, User}, {password, Pass}]), + Headers = [{"Content-Type", "application/x-www-form-urlencoded"}], + Result = http_request(State, Url, Headers, post, Body), + http_response(Result, State). + + +-spec http_request(#state{}, string(), headers(), atom(), iolist()) -> + {ok, string(), headers(), binary()} | {error, term()}. +http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) -> + Timeout = State#state.httpdb_timeout, + Opts = [ + {response_format, binary}, + {inactivity_timeout, Timeout} + | State#state.httpdb_ibrowse_options + ], + {ok, Wrk} = couch_replicator_httpc_pool:get_worker(Pool), + try + ibrowse:send_req_direct(Wrk, Url, Headers, Method, Body, Opts, Timeout) + after + ok = couch_replicator_httpc_pool:release_worker(Pool, Wrk) + end. + + +-spec http_response({ok, string(), headers(), binary()} | {error, term()}, + #state{}) -> {ok, #state{}} | {error, term()}. +http_response({ok, "200", Headers, _}, State) -> + maybe_update_cookie(Headers, State); +http_response({ok, "401", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_request_unauthorized, Url, User}}; +http_response({ok, "403", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_request_forbidden, Url, User}}; +http_response({ok, "404", _, _}, #state{session_url = Url, user = User}) -> + {error, {session_not_supported, Url, User}}; +http_response({ok, Code, _, _}, #state{session_url = Url, user = User}) -> + {error, {session_unexpected_result, Code, Url, User}}; +http_response({error, Error}, #state{session_url = Url, user = User}) -> + {error, {session_request_failed, Url, User, Error}}. + + +-spec parse_cookie(list()) -> {ok, string()} | {error, term()}. +parse_cookie(Headers0) -> + Headers = mochiweb_headers:make(Headers0), + case mochiweb_headers:get_value("Set-Cookie", Headers) of + undefined -> + {error, cookie_not_found}; + CookieHeader -> + CookieKVs = mochiweb_cookies:parse_cookie(CookieHeader), + CaseInsKVs = mochiweb_headers:make(CookieKVs), + case mochiweb_headers:get_value("AuthSession", CaseInsKVs) of + undefined -> + {error, cookie_format_invalid}; + Cookie -> + {ok, Cookie} + end + end. + + +-spec maybe_update_cookie(headers(), #state{}) -> + {ok, string()} | {error, term()}. +maybe_update_cookie(ResponseHeaders, State) -> + case parse_cookie(ResponseHeaders) of + {ok, Cookie} -> + {ok, update_cookie(State, Cookie, now_sec())}; + {error, Error} -> + {error, Error} + end. + + +-spec update_cookie(#state{}, string(), non_neg_integer()) -> #state{}. +update_cookie(#state{cookie = Cookie} = State, Cookie, _) -> + State; +update_cookie(#state{epoch = Epoch} = State, Cookie, NowSec) -> + State#state{ + epoch = Epoch + 1, + cookie = Cookie, + refresh_tstamp = NowSec + }. + + +-spec cookie_age_sec(#state{}, non_neg_integer()) -> non_neg_integer(). +cookie_age_sec(#state{refresh_tstamp = RefreshTs}, Now) -> + max(0, Now - RefreshTs). + + +-spec now_sec() -> non_neg_integer(). +now_sec() -> + {Mega, Sec, _Micro} = os:timestamp(), + Mega * 1000000 + Sec. + + +-spec min_update_interval() -> non_neg_integer(). +min_update_interval() -> + config:get_integer("replicator", "session_min_update_interval", + ?MIN_UPDATE_INTERVAL). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +get_session_url_test_() -> + [?_assertEqual(SessionUrl, get_session_url(Url)) || {Url, SessionUrl} <- [ + {"http://host/db", "http://host/_session"}, + {"http://127.0.0.1/db", "http://127.0.0.1/_session"}, + {"http://host/x/y/z", "http://host/_session"}, + {"http://host:5984/db", "http://host:5984/_session"}, + {"https://host/db?q=1", "https://host/_session"} + ]]. + + +extract_creds_success_test_() -> + DefaultHeaders = (#httpdb{})#httpdb.headers, + [?_assertEqual({ok, User, Pass, HttpDb2}, extract_creds(HttpDb1)) || + {HttpDb1, {User, Pass, HttpDb2}} <- [ + { + #httpdb{url = "http://u:p@x.y/db"}, + {"u", "p", #httpdb{url = "http://x.y/db"}} + }, + { + #httpdb{url = "http://u:p@h:80/db"}, + {"u", "p", #httpdb{url = "http://h:80/db"}} + }, + { + #httpdb{url = "https://u:p@h/db"}, + {"u", "p", #httpdb{url = "https://h/db"}} + }, + { + #httpdb{url = "http://u:p@127.0.0.1:5984/db"}, + {"u", "p", #httpdb{url = "http://127.0.0.1:5984/db"}} + }, + { + #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]/db"}, + {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]/db"}} + }, + { + #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]:81/db"}, + {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]:81/db"}} + }, + { + #httpdb{url = "http://u:p@x.y/db/other?query=Z&query=w"}, + {"u", "p", #httpdb{url = "http://x.y/db/other?query=Z&query=w"}} + }, + { + #httpdb{ + url = "http://h/db", + headers = DefaultHeaders ++ [ + {"Authorization", "Basic " ++ b64creds("u", "p")} + ] + }, + {"u", "p", #httpdb{url = "http://h/db"}} + }, + { + #httpdb{ + url = "http://h/db", + headers = DefaultHeaders ++ [ + {"aUthoriZation", "bASIC " ++ b64creds("U", "p")} + ] + }, + {"U", "p", #httpdb{url = "http://h/db"}} + }, + { + #httpdb{ + url = "http://u1:p1@h/db", + headers = DefaultHeaders ++ [ + {"Authorization", "Basic " ++ b64creds("u2", "p2")} + ] + }, + {"u2", "p2", #httpdb{url = "http://h/db"}} + } + ]]. + + +cookie_update_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_do_refresh(), + t_dont_refresh(), + t_process_auth_failure(), + t_process_auth_failure_stale_epoch(), + t_process_auth_failure_too_frequent(), + t_process_ok_update_cookie(), + t_process_ok_no_cookie(), + t_init_state_fails_on_401(), + t_init_state_404(), + t_init_state_no_creds(), + t_init_state_http_error() + ] + }. + + +t_do_refresh() -> + ?_test(begin + State = #state{next_refresh = 0}, + {ok, State1} = maybe_refresh(State), + ?assertMatch(#state{ + next_refresh = infinity, + epoch = 1, + cookie = "Abc" + }, State1) + end). + + +t_dont_refresh() -> + ?_test(begin + State = #state{next_refresh = now_sec() + 100}, + {ok, State1} = maybe_refresh(State), + ?assertMatch(State, State1), + State2 = #state{next_refresh = infinity}, + {ok, State3} = maybe_refresh(State2), + ?assertMatch(State2, State3) + end). + + +t_process_auth_failure() -> + ?_test(begin + State = #state{epoch = 1, refresh_tstamp = 0}, + {retry, State1} = process_auth_failure(1, State), + NextRefresh = State1#state.next_refresh, + ?assert(NextRefresh =< now_sec()) + end). + + +t_process_auth_failure_stale_epoch() -> + ?_test(begin + State = #state{epoch = 3}, + ?assertMatch({retry, State}, process_auth_failure(2, State)) + end). + + +t_process_auth_failure_too_frequent() -> + ?_test(begin + State = #state{epoch = 4, refresh_tstamp = now_sec()}, + ?assertMatch({continue, _}, process_auth_failure(4, State)) + end). + + +t_process_ok_update_cookie() -> + ?_test(begin + Headers = [{"set-CookiE", "AuthSession=xyz; Path=/;"}, {"X", "y"}], + Res = process_response(200, Headers, 1, #state{}), + ?assertMatch({continue, #state{cookie = "xyz", epoch = 1}}, Res), + State = #state{cookie = "xyz", refresh_tstamp = 42, epoch = 2}, + Res2 = process_response(200, Headers, 1, State), + ?assertMatch({continue, #state{cookie = "xyz", epoch = 2}}, Res2) + end). + + +t_process_ok_no_cookie() -> + ?_test(begin + Headers = [{"X", "y"}], + State = #state{cookie = "old", epoch = 3, refresh_tstamp = 42}, + Res = process_response(200, Headers, 1, State), + ?assertMatch({continue, State}, Res) + end). + + +t_init_state_fails_on_401() -> + ?_test(begin + mock_http_401_response(), + {error, Error} = init_state(#httpdb{url = "http://u:p@h"}), + SessionUrl = "http://h/_session", + ?assertEqual({session_request_unauthorized, SessionUrl, "u"}, Error) + end). + + +t_init_state_404() -> + ?_test(begin + mock_http_404_response(), + ?assertEqual(ignore, init_state(#httpdb{url = "http://u:p@h"})) + end). + + +t_init_state_no_creds() -> + ?_test(begin + ?_assertEqual(ignore, init_state(#httpdb{url = "http://h"})) + end). + + +t_init_state_http_error() -> + ?_test(begin + mock_http_error_response(), + {error, Error} = init_state(#httpdb{url = "http://u:p@h"}), + SessionUrl = "http://h/_session", + ?assertEqual({session_request_failed, SessionUrl, "u", x}, Error) + end). + + +setup() -> + meck:expect(couch_replicator_httpc_pool, get_worker, 1, {ok, worker}), + meck:expect(couch_replicator_httpc_pool, release_worker, 2, ok), + meck:expect(config, get, fun(_, _, Default) -> Default end), + mock_http_cookie_response("Abc"), + ok. + + +teardown(_) -> + meck:unload(). + + +mock_http_cookie_response(Cookie) -> + Resp = {ok, "200", [{"Set-Cookie", "AuthSession=" ++ Cookie}], []}, + meck:expect(ibrowse, send_req_direct, 7, Resp). + + +mock_http_401_response() -> + meck:expect(ibrowse, send_req_direct, 7, {ok, "401", [], []}). + + +mock_http_404_response() -> + meck:expect(ibrowse, send_req_direct, 7, {ok, "404", [], []}). + + +mock_http_error_response() -> + meck:expect(ibrowse, send_req_direct, 7, {error, x}). + + +extract_creds_error_test_() -> + [?_assertMatch({error, Error}, extract_creds(HttpDb)) || + {HttpDb, Error} <- [ + {#httpdb{url = "some_junk"}, invalid_uri}, + {#httpdb{url = "http://h/db"}, missing_credentials} + ]]. + + +b64creds(User, Pass) -> + base64:encode_to_string(User ++ ":" ++ Pass). + + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl index 3659d9592..2e4df5365 100644 --- a/src/couch_replicator/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl @@ -19,7 +19,7 @@ -export([read_changes/5]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -import(couch_util, [ diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index 1fe91eca4..62d21fe12 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -35,7 +35,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -include_lib("mem3/include/mem3.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). -include("couch_replicator_js_functions.hrl"). @@ -396,28 +396,9 @@ parse_rep_db({Props}, Proxy, Options) -> {BinHeaders} = get_value(<<"headers">>, Props, {[]}), Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]), DefaultHeaders = (#httpdb{})#httpdb.headers, - OAuth = case get_value(<<"oauth">>, AuthProps) of - undefined -> - nil; - {OauthProps} -> - #oauth{ - consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)), - token = ?b2l(get_value(<<"token">>, OauthProps)), - token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)), - consumer_secret = ?b2l(get_value(<<"consumer_secret">>, - OauthProps)), - signature_method = - case get_value(<<"signature_method">>, OauthProps) of - undefined -> hmac_sha1; - <<"PLAINTEXT">> -> plaintext; - <<"HMAC-SHA1">> -> hmac_sha1; - <<"RSA-SHA1">> -> rsa_sha1 - end - } - end, #httpdb{ url = Url, - oauth = OAuth, + auth_props = AuthProps, headers = lists:ukeymerge(1, Headers, DefaultHeaders), ibrowse_options = lists:keysort(1, [{socket_options, get_value(socket_options, Options)} | @@ -695,8 +676,7 @@ strip_credentials(Url) when is_binary(Url) -> "http\\1://\\2", [{return, binary}]); strip_credentials({Props}) -> - Props1 = lists:keydelete(<<"oauth">>, 1, Props), - {lists:keydelete(<<"headers">>, 1, Props1)}. + {lists:keydelete(<<"headers">>, 1, Props)}. error_reason({shutdown, Error}) -> @@ -774,10 +754,6 @@ check_strip_credentials_test() -> }, { {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}]} - }, - { - {[{<<"_id">>, <<"foo">>}]}, {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]} }, { @@ -786,8 +762,7 @@ check_strip_credentials_test() -> }, { {[{<<"_id">>, <<"foo">>}]}, - {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}, - {<<"headers">>, <<"baz">>}]} + {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]} } ]]. diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index 45472f431..6e787514b 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -14,7 +14,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -export([setup/1]). -export([send_req/3]). @@ -51,8 +51,17 @@ setup(Db) -> undefined -> Url; _ when is_list(ProxyURL) -> ProxyURL end, - {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]), - {ok, Db#httpdb{httpc_pool = Pid}}. + {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, + [{max_connections, MaxConns}]), + case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of + {ok, Db1} -> + {ok, Db1}; + {error, Error} -> + LogMsg = "~p: auth plugin initialization failed ~p ~p", + LogUrl = couch_util:url_strip_password(Url), + couch_log:error(LogMsg, [?MODULE, LogUrl, Error]), + throw({replication_auth_error, Error}) + end. send_req(HttpDb, Params1, Callback) -> @@ -86,11 +95,11 @@ send_req(HttpDb, Params1, Callback) -> end. -send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) -> +send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) -> Method = get_value(method, Params, get), UserHeaders = lists:keysort(1, get_value(headers, Params, [])), Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders), - Headers2 = oauth_header(HttpDb, Params) ++ Headers1, + {Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1), Url = full_url(HttpDb, Params), Body = get_value(body, Params, []), case get_value(path, Params) == "_changes" of @@ -157,6 +166,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> Json -> ?JSON_DECODE(Json) end, + process_auth_response(HttpDb, Ok, Headers, Params), Callback(Ok, Headers, EJson); R when R =:= 301 ; R =:= 302 ; R =:= 303 -> backoff_success(HttpDb, Params), @@ -179,8 +189,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> backoff(HttpDb#httpdb{timeout = Timeout}, Params); Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) -> backoff_success(HttpDb, Params), + HttpDb1 = process_auth_response(HttpDb, Ok, Headers, Params), StreamDataFun = fun() -> - stream_data_self(HttpDb, Params, Worker, ReqId, Callback) + stream_data_self(HttpDb1, Params, Worker, ReqId, Callback) end, put(?STREAM_STATUS, {streaming, Worker}), ibrowse:stream_next(ReqId), @@ -190,9 +201,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> catch throw:{maybe_retry_req, connection_closed} -> maybe_retry({connection_closed, mid_stream}, - Worker, HttpDb, Params); + Worker, HttpDb1, Params); throw:{maybe_retry_req, Err} -> - maybe_retry(Err, Worker, HttpDb, Params) + maybe_retry(Err, Worker, HttpDb1, Params) end; R when R =:= 301 ; R =:= 302 ; R =:= 303 -> backoff_success(HttpDb, Params), @@ -216,6 +227,16 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> end. +process_auth_response(HttpDb, Code, Headers, Params) -> + case couch_replicator_auth:handle_response(HttpDb, Code, Headers) of + {continue, HttpDb1} -> + HttpDb1; + {retry, HttpDb1} -> + log_retry_error(Params, HttpDb1, 0, Code), + throw({retry, HttpDb1, Params}) + end. + + % Only streaming HTTP requests send messages back from % the ibrowse worker process. We can detect that based % on the ibrowse_req_id format. This just drops all @@ -397,28 +418,6 @@ query_args_to_string([{K, V} | Rest], Acc) -> query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]). -oauth_header(#httpdb{oauth = nil}, _ConnParams) -> - []; -oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) -> - Consumer = { - OAuth#oauth.consumer_key, - OAuth#oauth.consumer_secret, - OAuth#oauth.signature_method - }, - Method = case get_value(method, ConnParams, get) of - get -> "GET"; - post -> "POST"; - put -> "PUT"; - head -> "HEAD" - end, - QSL = get_value(qs, ConnParams, []), - OAuthParams = oauth:sign(Method, - BaseUrl ++ get_value(path, ConnParams, []), - QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL, - [{"Authorization", - "OAuth " ++ oauth:header_params_encode(OAuthParams)}]. - - do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) -> RedirectUrl = redirect_url(Headers, Url), {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params), diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl index e7067622b..e8faf8ea3 100644 --- a/src/couch_replicator/src/couch_replicator_ids.erl +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -21,7 +21,7 @@ -include_lib("ibrowse/include/ibrowse.hrl"). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). % replication_id/1 and replication_id/2 will attempt to fetch @@ -127,62 +127,25 @@ maybe_append_options(Options, RepOptions) -> end, [], Options). -get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) -> +get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers}) -> DefaultHeaders = (#httpdb{})#httpdb.headers, - case OAuth of - nil -> - {remote, Url, Headers -- DefaultHeaders}; - #oauth{} -> - {remote, Url, Headers -- DefaultHeaders, OAuth} - end; + {remote, Url, Headers -- DefaultHeaders}; get_rep_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) -> - {Url, Headers, OAuth} = case get_rep_endpoint(UserCtx, HttpDb) of - {remote, U, Hds} -> - {U, Hds, undefined}; - {remote, U, Hds, OA} -> - {U, Hds, OA} - end, - {UserFromHeaders, HeadersWithoutBasicAuth} = remove_basic_auth(Headers), + {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb), + {{UserFromHeaders, _}, HeadersWithoutBasicAuth} = + couch_replicator_utils:remove_basic_auth_from_headers(Headers), {UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url), User = pick_defined_value([UserFromUrl, UserFromHeaders]), + OAuth = undefined, % Keep this to ensure checkpoints don't change {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}; get_v4_endpoint(UserCtx, <<DbName/binary>>) -> {local, DbName, UserCtx}. -remove_basic_auth(Headers) -> - case lists:partition(fun is_basic_auth/1, Headers) of - {[], HeadersWithoutBasicAuth} -> - {undefined, HeadersWithoutBasicAuth}; - {[{_, "Basic " ++ Base64} | _], HeadersWithoutBasicAuth} -> - User = get_basic_auth_user(Base64), - {User, HeadersWithoutBasicAuth} - end. - - -is_basic_auth({"Authorization", "Basic " ++ _Base64}) -> - true; -is_basic_auth(_) -> - false. - - -get_basic_auth_user(Base64) -> - try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of - [User, _Pass] -> - User; - _ -> - undefined - catch - % Tolerate invalid B64 values here to avoid crashing replicator - error:function_clause -> - undefined - end. - - pick_defined_value(Values) -> case [V || V <- Values, V /= undefined] of [] -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index be956b6a7..0b396346a 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -56,7 +56,7 @@ -include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include_lib("couch/include/couch_db.hrl"). %% types diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 0438249be..1467d9f30 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -29,7 +29,7 @@ ]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 01881e423..218fcf501 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -27,7 +27,8 @@ get_json_value/3, pp_rep_id/1, iso8601/1, - filter_state/3 + filter_state/3, + remove_basic_auth_from_headers/1 ]). -export([ @@ -36,7 +37,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -import(couch_util, [ get_value/2, @@ -174,3 +175,88 @@ filter_state(State, States, Info) -> false -> skip end. + + +remove_basic_auth_from_headers(Headers) -> + Headers1 = mochiweb_headers:make(Headers), + case mochiweb_headers:get_value("Authorization", Headers1) of + undefined -> + {{undefined, undefined}, Headers}; + Auth -> + {Basic, Base64} = lists:splitwith(fun(X) -> X =/= $\s end, Auth), + maybe_remove_basic_auth(string:to_lower(Basic), Base64, Headers1) + end. + + +maybe_remove_basic_auth("basic", " " ++ Base64, Headers) -> + Headers1 = mochiweb_headers:delete_any("Authorization", Headers), + {decode_basic_creds(Base64), mochiweb_headers:to_list(Headers1)}; +maybe_remove_basic_auth(_, _, Headers) -> + {{undefined, undefined}, mochiweb_headers:to_list(Headers)}. + + +decode_basic_creds(Base64) -> + try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of + [User, Pass] -> + {User, Pass}; + _ -> + {undefined, undefined} + catch + % Tolerate invalid B64 values here to avoid crashing replicator + error:function_clause -> + {undefined, undefined} + end. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +remove_basic_auth_from_headers_test_() -> + [?_assertMatch({{User, Pass}, NoAuthHeaders}, + remove_basic_auth_from_headers(Headers)) || + {{User, Pass, NoAuthHeaders}, Headers} <- [ + { + {undefined, undefined, []}, + [] + }, + { + {undefined, undefined, [{"h", "v"}]}, + [{"h", "v"}] + }, + { + {undefined, undefined, [{"Authorization", "junk"}]}, + [{"Authorization", "junk"}] + }, + { + {undefined, undefined, []}, + [{"Authorization", "basic X"}] + }, + { + {"user", "pass", []}, + [{"Authorization", "Basic " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", []}, + [{"AuThorization", "Basic " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", []}, + [{"Authorization", "bAsIc " ++ b64creds("user", "pass")}] + }, + { + {"user", "pass", [{"h", "v"}]}, + [ + {"Authorization", "Basic " ++ b64creds("user", "pass")}, + {"h", "v"} + ] + } + ] + ]. + + +b64creds(User, Pass) -> + base64:encode_to_string(User ++ ":" ++ Pass). + + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index db6b72b2e..e51565866 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -22,7 +22,7 @@ -export([handle_call/3, handle_cast/2, handle_info/2]). -include_lib("couch/include/couch_db.hrl"). --include("couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). % TODO: maybe make both buffer max sizes configurable diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl index a40e5b166..4f545bcb5 100644 --- a/src/couch_replicator/test/couch_replicator_proxy_tests.erl +++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl @@ -14,7 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). --include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl"). +-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). setup() -> diff --git a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl index 6f3308c39..af3a285f5 100644 --- a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl +++ b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl @@ -61,8 +61,8 @@ reduce_max_request_size_test_() -> % attachment which exceed maximum request size are simply % closed instead of returning a 413 request. That makes these % tests flaky. - % ++ [{Pair, fun should_replicate_one_with_attachment/2} - % || Pair <- Pairs] + ++ [{Pair, fun should_replicate_one_with_attachment/2} + || Pair <- Pairs] } }. @@ -90,12 +90,12 @@ should_replicate_one({From, To}, {_Ctx, {Source, Target}}) -> % POST-ing individual documents directly and skip bulk_docs. Test that case % separately % See note in main test function why this was disabled. -% should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) -> -% {lists:flatten(io_lib:format("~p -> ~p", [From, To])), -% {inorder, [should_populate_source_one_large_attachment(Source), -% should_populate_source(Source), -% should_replicate(Source, Target), -% should_compare_databases(Source, Target, [<<"doc0">>])]}}. +should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) -> + {lists:flatten(io_lib:format("~p -> ~p", [From, To])), + {inorder, [should_populate_source_one_large_attachment(Source), + should_populate_source(Source), + should_replicate(Source, Target), + should_compare_databases(Source, Target, [<<"doc0">>])]}}. should_populate_source({remote, Source}) -> @@ -112,11 +112,11 @@ should_populate_source_one_large_one_small(Source) -> {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}. -% should_populate_source_one_large_attachment({remote, Source}) -> -% should_populate_source_one_large_attachment(Source); +should_populate_source_one_large_attachment({remote, Source}) -> + should_populate_source_one_large_attachment(Source); -% should_populate_source_one_large_attachment(Source) -> -% {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}. +should_populate_source_one_large_attachment(Source) -> + {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}. should_replicate({remote, Source}, Target) -> @@ -156,8 +156,8 @@ one_large_one_small(DbName, Large, Small) -> add_doc(DbName, <<"doc1">>, Small, 0). -% one_large_attachment(DbName, Size, AttSize) -> -% add_doc(DbName, <<"doc0">>, Size, AttSize). +one_large_attachment(DbName, Size, AttSize) -> + add_doc(DbName, <<"doc0">>, Size, AttSize). add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) -> |