author    Jan Lehnardt <jan@apache.org>    2018-03-05 21:42:11 +0100
committer GitHub <noreply@github.com>      2018-03-05 21:42:11 +0100
commit    b3216354003089ba58d4c178ec33f68d2e5accf6 (patch)
tree      fdfac5e086a667dc44192e37dc60434a33ff3574
parent    b884abc6f9762e4ffc65d85a0c655cb1ac0d917a (diff)
parent    2c43e6201e1c67418ed2dc945ce04945e764da0a (diff)
download  couchdb-fix/compaction-daemon.tar.gz

Merge branch 'master' into fix/compaction-daemon
-rw-r--r--  rel/overlay/etc/default.ini                                                   |  15
-rw-r--r--  src/couch/src/couch_bt_engine.erl                                             |   5
-rw-r--r--  src/couch_replicator/include/couch_replicator_api_wrap.hrl (renamed from src/couch_replicator/src/couch_replicator_api_wrap.hrl) | 13
-rw-r--r--  src/couch_replicator/src/couch_replicator.erl                                 |   2
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl                        |   7
-rw-r--r--  src/couch_replicator/src/couch_replicator_auth.erl                            |  99
-rw-r--r--  src/couch_replicator/src/couch_replicator_auth_noop.erl                       |  52
-rw-r--r--  src/couch_replicator/src/couch_replicator_auth_session.erl                    | 692
-rw-r--r--  src/couch_replicator/src/couch_replicator_changes_reader.erl                  |   2
-rw-r--r--  src/couch_replicator/src/couch_replicator_docs.erl                            |  33
-rw-r--r--  src/couch_replicator/src/couch_replicator_httpc.erl                           |  59
-rw-r--r--  src/couch_replicator/src/couch_replicator_ids.erl                             |  51
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.erl                       |   2
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl                   |   2
-rw-r--r--  src/couch_replicator/src/couch_replicator_utils.erl                           |  90
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl                          |   2
-rw-r--r--  src/couch_replicator/test/couch_replicator_proxy_tests.erl                    |   2
-rw-r--r--  src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl |  28
18 files changed, 1017 insertions(+), 139 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index ec3e22314..9762536ac 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -430,6 +430,21 @@ ssl_certificate_max_depth = 3
; Re-check cluster state at least every cluster_quiet_period seconds
; cluster_quiet_period = 60
+; List of replicator client authentication plugins to try. Plugins will be
+; tried in order. The first to initialize successfully will be used for that
+; particular endpoint (source or target). Normally couch_replicator_auth_noop
+; would be used at the end of the list as a "catch-all". It doesn't do anything
+; and effectively implements the previous behavior of using basic auth.
+; There are currently two plugins available:
+; couch_replicator_auth_session - use _session cookie authentication
+; couch_replicator_auth_noop - use basic authentication (previous default)
+; The previous default behavior is currently still the default. To start using
+; session auth, use this as the list of plugins:
+; `couch_replicator_auth_session,couch_replicator_auth_noop`.
+; In a future release the session plugin might be used by default.
+;auth_plugins = couch_replicator_auth_noop
+
+
[compaction_daemon]
; The delay, in seconds, between each check for which database and view indexes
; need to be compacted.
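
As a sketch of how the new option above would be used, session authentication can be enabled by listing the session plugin first and keeping the noop plugin as the fallback. The `auth_plugins` key belongs in the `[replicator]` section of local.ini (section name inferred from the `config:get("replicator", "auth_plugins", ...)` call in couch_replicator_auth.erl below):

[replicator]
auth_plugins = couch_replicator_auth_session,couch_replicator_auth_noop
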
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 347c9318f..940689708 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -464,7 +464,10 @@ fold_docs(St, UserFun, UserAcc, Options) ->
fold_local_docs(St, UserFun, UserAcc, Options) ->
- fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options).
+ case fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options) of
+ {ok, _Reds, FinalAcc} -> {ok, null, FinalAcc};
+ {ok, FinalAcc} -> {ok, FinalAcc}
+ end.
fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/include/couch_replicator_api_wrap.hrl
index d2e0fdff5..0f8213c51 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator/include/couch_replicator_api_wrap.hrl
@@ -14,7 +14,7 @@
-record(httpdb, {
url,
- oauth = nil,
+ auth_props = [],
headers = [
{"Accept", "application/json"},
{"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()}
@@ -26,13 +26,6 @@
httpc_pool = nil,
http_connections,
first_error_timestamp = nil,
- proxy_url
-}).
-
--record(oauth, {
- consumer_key,
- token,
- token_secret,
- consumer_secret,
- signature_method
+ proxy_url,
+ auth_context = nil
}).
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 8b7cd5cb1..39141c301 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,7 +25,7 @@
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("mem3/include/mem3.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index b5ea57c3c..44c290d33 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -142,7 +142,8 @@ db_open(DbName, Options, Create, _CreateParams) ->
throw({unauthorized, DbName})
end.
-db_close(#httpdb{httpc_pool = Pool}) ->
+db_close(#httpdb{httpc_pool = Pool} = HttpDb) ->
+ couch_replicator_auth:cleanup(HttpDb),
unlink(Pool),
ok = couch_replicator_httpc_pool:stop(Pool);
db_close(DbName) ->
@@ -1009,7 +1010,7 @@ header_value(Key, Headers, Default) ->
normalize_db(#httpdb{} = HttpDb) ->
#httpdb{
url = HttpDb#httpdb.url,
- oauth = HttpDb#httpdb.oauth,
+ auth_props = lists:sort(HttpDb#httpdb.auth_props),
headers = lists:keysort(1, HttpDb#httpdb.headers),
timeout = HttpDb#httpdb.timeout,
ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
@@ -1037,7 +1038,7 @@ maybe_append_create_query_params(Db, CreateParams) ->
normalize_http_db_test() ->
HttpDb = #httpdb{
url = "http://host/db",
- oauth = #oauth{},
+ auth_props = [{"key", "val"}],
headers = [{"k2","v2"}, {"k1","v1"}],
timeout = 30000,
ibrowse_options = [{k2, v2}, {k1, v1}],
diff --git a/src/couch_replicator/src/couch_replicator_auth.erl b/src/couch_replicator/src/couch_replicator_auth.erl
new file mode 100644
index 000000000..1c9a49723
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth.erl
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_auth).
+
+
+-export([
+ initialize/1,
+ update_headers/2,
+ handle_response/3,
+ cleanup/1
+]).
+
+
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+
+
+-define(DEFAULT_PLUGINS, "couch_replicator_auth_noop").
+
+
+% Behavior API
+
+-callback initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+
+-callback update_headers(term(), headers()) -> {headers(), term()}.
+
+-callback handle_response(term(), code(), headers()) ->
+ {continue | retry, term()}.
+
+-callback cleanup(term()) -> ok.
+
+
+% Main API
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}} | {error, term()}.
+initialize(#httpdb{auth_context = nil} = HttpDb) ->
+ case try_initialize(get_plugin_modules(), HttpDb) of
+ {ok, Mod, HttpDb1, Context} ->
+ {ok, HttpDb1#httpdb{auth_context = {Mod, Context}}};
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+-spec update_headers(#httpdb{}, headers()) -> {headers(), #httpdb{}}.
+update_headers(#httpdb{auth_context = {Mod, Context}} = HttpDb, Headers) ->
+ {Headers1, Context1} = Mod:update_headers(Context, Headers),
+ {Headers1, HttpDb#httpdb{auth_context = {Mod, Context1}}}.
+
+
+-spec handle_response(#httpdb{}, code(), headers()) ->
+ {continue | retry, term()}.
+handle_response(#httpdb{} = HttpDb, Code, Headers) ->
+ {Mod, Context} = HttpDb#httpdb.auth_context,
+ {Res, Context1} = Mod:handle_response(Context, Code, Headers),
+ {Res, HttpDb#httpdb{auth_context = {Mod, Context1}}}.
+
+
+-spec cleanup(#httpdb{}) -> #httpdb{}.
+cleanup(#httpdb{auth_context = {Module, Context}} = HttpDb) ->
+ ok = Module:cleanup(Context),
+ HttpDb#httpdb{auth_context = nil}.
+
+
+% Private helper functions
+
+-spec get_plugin_modules() -> [atom()].
+get_plugin_modules() ->
+ Plugins1 = config:get("replicator", "auth_plugins", ?DEFAULT_PLUGINS),
+ [list_to_atom(Plugin) || Plugin <- string:tokens(Plugins1, ",")].
+
+
+try_initialize([], _HttpDb) ->
+ {error, no_more_auth_plugins_left_to_try};
+try_initialize([Mod | Modules], HttpDb) ->
+ try Mod:initialize(HttpDb) of
+ {ok, HttpDb1, Context} ->
+ {ok, Mod, HttpDb1, Context};
+ ignore ->
+ try_initialize(Modules, HttpDb);
+ {error, Error} ->
+ {error, Error}
+ catch
+ error:undef ->
+ {error, {could_not_load_plugin_module, Mod}}
+ end.
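
For illustration, a hypothetical third-party plugin implementing the four behaviour callbacks defined above might look like the sketch below. The module name, the "X-Auth-Token" header, and the "static_auth_token" config key are invented for this example and are not part of this change; the couch_replicator_auth_noop module added next is the real minimal implementation shipped here.

% Hypothetical plugin sketch: injects a static token header into every request.
-module(my_static_token_auth).

-behaviour(couch_replicator_auth).

-export([
    initialize/1,
    update_headers/2,
    handle_response/3,
    cleanup/1
]).

-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").


% Only claim the endpoint when a token is configured; returning `ignore`
% lets the framework try the next plugin in the auth_plugins list.
initialize(#httpdb{} = HttpDb) ->
    case config:get("replicator", "static_auth_token", undefined) of
        undefined -> ignore;
        Token -> {ok, HttpDb, Token}
    end.

% Add the token header before each request; the context (the token itself)
% is threaded back unchanged.
update_headers(Token, Headers) ->
    {[{"X-Auth-Token", Token} | Headers], Token}.

% Nothing to refresh on auth failures, so always tell the caller to continue.
handle_response(Token, _Code, _Headers) ->
    {continue, Token}.

cleanup(_Token) ->
    ok.
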
diff --git a/src/couch_replicator/src/couch_replicator_auth_noop.erl b/src/couch_replicator/src/couch_replicator_auth_noop.erl
new file mode 100644
index 000000000..5dbf13335
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth_noop.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_auth_noop).
+
+
+-behavior(couch_replicator_auth).
+
+
+-export([
+ initialize/1,
+ update_headers/2,
+ handle_response/3,
+ cleanup/1
+]).
+
+
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+initialize(#httpdb{} = HttpDb) ->
+ {ok, HttpDb, nil}.
+
+
+-spec update_headers(term(), headers()) -> {headers(), term()}.
+update_headers(Context, Headers) ->
+ {Headers, Context}.
+
+
+-spec handle_response(term(), code(), headers()) ->
+ {continue | retry, term()}.
+handle_response(Context, _Code, _Headers) ->
+ {continue, Context}.
+
+
+-spec cleanup(term()) -> ok.
+cleanup(_Context) ->
+ ok.
diff --git a/src/couch_replicator/src/couch_replicator_auth_session.erl b/src/couch_replicator/src/couch_replicator_auth_session.erl
new file mode 100644
index 000000000..3fff29572
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_auth_session.erl
@@ -0,0 +1,692 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% This is the replicator session auth plugin. It implements session-based
+% authentication for the replicator. The only public API is the set of functions
+% from the couch_replicator_auth behaviour. Most of the logic and state is in the
+% gen_server. An instance of the gen_server may be spawned for the source and
+% target endpoints of each replication job.
+%
+% The workflow is roughly this:
+%
+% * On initialization, try to get a cookie in `refresh/1`. If an error occurs,
+% then crash. If the `_session` endpoint fails with a 404 (not found), return
+% `ignore`, assuming session authentication is not supported or we simply hit a
+% non-CouchDB server.
+%
+% * Before each request, the auth framework calls the `update_headers` function.
+% Before updating the headers and returning, check whether a refresh is needed.
+% The check looks at the `next_refresh` time. If that time is set (not `infinity`)
+% and has just expired, obtain a new cookie, then update the headers and
+% return.
+%
+% * After each request, the auth framework calls the `handle_response` function.
+% If the request was successful, check whether the server sent a new cookie in
+% the `Set-Cookie` header. If it did, that cookie becomes the current cookie.
+%
+% * If the last request had an auth failure, check whether it used a stale cookie.
+% In that case nothing is done, and the client is told to retry. The next time
+% it updates its headers before a request it should pick up the latest
+% cookie.
+%
+% * If the last request failed and the cookie was the latest known cookie,
+% schedule a refresh and tell the client to retry. However, if the cookie was
+% just updated, tell the client to continue so that it handles the auth failure
+% on its own via a set of retries with exponential backoffs. This is to ensure
+% that if something goes wrong and one of the endpoints issues invalid
+% cookies, the replicator won't be stuck in a busy loop refreshing them.
+
+
+-module(couch_replicator_auth_session).
+
+
+-behaviour(couch_replicator_auth).
+-behaviour(gen_server).
+
+
+-export([
+ initialize/1,
+ update_headers/2,
+ handle_response/3,
+ cleanup/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ format_status/2
+]).
+
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+
+
+-type headers() :: [{string(), string()}].
+-type code() :: non_neg_integer().
+-type creds() :: {string() | undefined, string() | undefined}.
+
+
+-define(MIN_UPDATE_INTERVAL, 5).
+
+
+-record(state, {
+ epoch = 0 :: non_neg_integer(),
+ cookie :: string() | undefined,
+ user :: string() | undefined,
+ pass :: string() | undefined,
+ httpdb_timeout :: integer(),
+ httpdb_pool :: pid(),
+ httpdb_ibrowse_options = [] :: list(),
+ session_url :: string(),
+ next_refresh = infinity :: infinity | non_neg_integer(),
+ refresh_tstamp = 0 :: non_neg_integer()
+}).
+
+
+% Behavior API callbacks
+
+-spec initialize(#httpdb{}) -> {ok, #httpdb{}, term()} | ignore.
+initialize(#httpdb{} = HttpDb) ->
+ case init_state(HttpDb) of
+ {ok, HttpDb1, State} ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [State], []),
+ Epoch = State#state.epoch,
+ Timeout = State#state.httpdb_timeout,
+ {ok, HttpDb1, {Pid, Epoch, Timeout}};
+ {error, Error} ->
+ {error, Error};
+ ignore ->
+ ignore
+ end.
+
+
+-spec update_headers(term(), headers()) -> {headers(), term()}.
+update_headers({Pid, Epoch, Timeout}, Headers) ->
+ Args = {update_headers, Headers, Epoch},
+ {Headers1, Epoch1} = gen_server:call(Pid, Args, Timeout * 10),
+ {Headers1, {Pid, Epoch1, Timeout}}.
+
+
+-spec handle_response(term(), code(), headers()) ->
+ {continue | retry, term()}.
+handle_response({Pid, Epoch, Timeout}, Code, Headers) ->
+ Args = {handle_response, Code, Headers, Epoch},
+ {Retry, Epoch1} = gen_server:call(Pid, Args, Timeout * 10),
+ {Retry, {Pid, Epoch1, Timeout}}.
+
+
+-spec cleanup(term()) -> ok.
+cleanup({Pid, _Epoch, Timeout}) ->
+ gen_server:call(Pid, stop, Timeout * 10).
+
+
+%% gen_server functions
+
+init([#state{} = State]) ->
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_call({update_headers, Headers, _Epoch}, _From, State) ->
+ case maybe_refresh(State) of
+ {ok, State1} ->
+ Cookie = "AuthSession=" ++ State1#state.cookie,
+ Headers1 = [{"Cookie", Cookie} | Headers],
+ {reply, {Headers1, State1#state.epoch}, State1};
+ {error, Error} ->
+ LogMsg = "~p: Stopping session auth plugin because of error ~p",
+ couch_log:error(LogMsg, [?MODULE, Error]),
+ {stop, Error, State}
+ end;
+
+handle_call({handle_response, Code, Headers, Epoch}, _From, State) ->
+ {Retry, State1} = process_response(Code, Headers, Epoch, State),
+ {reply, {Retry, State1#state.epoch}, State1};
+
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+
+handle_cast(Msg, State) ->
+ couch_log:error("~p: Received un-expected cast ~p", [?MODULE, Msg]),
+ {noreply, State}.
+
+
+handle_info(Msg, State) ->
+ couch_log:error("~p : Received un-expected message ~p", [?MODULE, Msg]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+ [
+ {epoch, State#state.epoch},
+ {user, State#state.user},
+ {session_url, State#state.session_url},
+ {refresh_tstamp, State#state.refresh_tstamp}
+ ].
+
+
+%% Private helper functions
+
+
+-spec init_state(#httpdb{}) ->
+ {ok, #httpdb{}, #state{}} | {error, term()} | ignore.
+init_state(#httpdb{} = HttpDb) ->
+ case extract_creds(HttpDb) of
+ {ok, User, Pass, HttpDb1} ->
+ State = #state{
+ user = User,
+ pass = Pass,
+ session_url = get_session_url(HttpDb1#httpdb.url),
+ httpdb_pool = HttpDb1#httpdb.httpc_pool,
+ httpdb_timeout = HttpDb1#httpdb.timeout,
+ httpdb_ibrowse_options = HttpDb1#httpdb.ibrowse_options
+ },
+ case refresh(State) of
+ {ok, State1} ->
+ {ok, HttpDb1, State1};
+ {error, {session_not_supported, _, _}} ->
+ ignore;
+ {error, Error} ->
+ {error, Error}
+ end;
+ {error, missing_credentials} ->
+ ignore;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+-spec extract_creds(#httpdb{}) ->
+ {ok, string(), string(), #httpdb{}} | {error, term()}.
+extract_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+ {{HeadersUser, HeadersPass}, HeadersNoCreds} =
+ couch_replicator_utils:remove_basic_auth_from_headers(Headers),
+ case extract_creds_from_url(Url) of
+ {ok, UrlUser, UrlPass, UrlNoCreds} ->
+ case pick_creds({UrlUser, UrlPass}, {HeadersUser, HeadersPass}) of
+ {ok, User, Pass} ->
+ HttpDb1 = HttpDb#httpdb{
+ url = UrlNoCreds,
+ headers = HeadersNoCreds
+ },
+ {ok, User, Pass, HttpDb1};
+ {error, Error} ->
+ {error, Error}
+ end;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+% Credentials can be specified in the url and/or in the headers.
+% * If no credentials are specified, return an error.
+% * If specified in the url but not in the headers, pick the url credentials.
+% * Otherwise pick the headers credentials.
+%
+-spec pick_creds(creds(), creds()) ->
+ {ok, string(), string()} | {error, missing_credentials}.
+pick_creds({undefined, _}, {undefined, _}) ->
+ {error, missing_credentials};
+pick_creds({UrlUser, UrlPass}, {undefined, _}) ->
+ {ok, UrlUser, UrlPass};
+pick_creds({_, _}, {HeadersUser, HeadersPass}) ->
+ {ok, HeadersUser, HeadersPass}.
+
+
+-spec extract_creds_from_url(string()) ->
+ {ok, string() | undefined, string() | undefined, string()} |
+ {error, term()}.
+extract_creds_from_url(Url) ->
+ case ibrowse_lib:parse_url(Url) of
+ {error, Error} ->
+ {error, Error};
+ #url{username = undefined, password = undefined} ->
+ {ok, undefined, undefined, Url};
+ #url{protocol = Proto, username = User, password = Pass} ->
+ % Excise user and pass parts from the url. Try to keep the host,
+ % port and path as they were in the original.
+ Prefix = lists:concat([Proto, "://", User, ":", Pass, "@"]),
+ Suffix = lists:sublist(Url, length(Prefix) + 1, length(Url) + 1),
+ NoCreds = lists:concat([Proto, "://", Suffix]),
+ {ok, User, Pass, NoCreds}
+ end.
+
+
+-spec process_response(non_neg_integer(), headers(),
+ non_neg_integer(), #state{}) -> {retry | continue, #state{}}.
+process_response(403, _Headers, Epoch, State) ->
+ process_auth_failure(Epoch, State);
+process_response(401, _Headers, Epoch, State) ->
+ process_auth_failure(Epoch, State);
+process_response(Code, Headers, _Epoch, State) when Code >= 200, Code < 300 ->
+ % If the server notices the cookie is about to time out, it can send a new
+ % cookie in the response headers. Take advantage of that and refresh the cookie.
+ State1 = case maybe_update_cookie(Headers, State) of
+ {ok, UpdatedState} ->
+ UpdatedState;
+ {error, cookie_not_found} ->
+ State;
+ {error, Other} ->
+ LogMsg = "~p : Could not parse cookie from response headers ~p",
+ couch_log:error(LogMsg, [?MODULE, Other]),
+ State
+ end,
+ {continue, State1};
+process_response(_Code, _Headers, _Epoch, State) ->
+ {continue, State}.
+
+
+-spec process_auth_failure(non_neg_integer(), #state{}) ->
+ {retry | continue, #state{}}.
+process_auth_failure(Epoch, #state{epoch = StateEpoch} = State)
+ when StateEpoch > Epoch ->
+ % This request used an outdated cookie, tell it to immediately retry
+ % and it will pick up the current cookie when its headers are updated
+ {retry, State};
+process_auth_failure(Epoch, #state{epoch = Epoch} = State) ->
+ MinInterval = min_update_interval(),
+ case cookie_age_sec(State, now_sec()) of
+ AgeSec when AgeSec < MinInterval ->
+ % A recently acquired cookie failed. Schedule a refresh and
+ % return `continue` to let httpc's retry apply a backoff
+ {continue, schedule_refresh(now_sec() + MinInterval, State)};
+ _AgeSec ->
+ % Current cookie failed auth. Schedule refresh and ask
+ % httpc to retry the request.
+ {retry, schedule_refresh(now_sec(), State)}
+ end.
+
+
+-spec get_session_url(string()) -> string().
+get_session_url(Url) ->
+ #url{
+ protocol = Proto,
+ host = Host,
+ port = Port
+ } = ibrowse_lib:parse_url(Url),
+ WithPort = lists:concat([Proto, "://", Host, ":", Port]),
+ case lists:prefix(WithPort, Url) of
+ true ->
+ % Explicit port specified in the original url
+ WithPort ++ "/_session";
+ false ->
+ % Implicit proto default port was used
+ lists:concat([Proto, "://", Host, "/_session"])
+ end.
+
+
+-spec schedule_refresh(non_neg_integer(), #state{}) -> #state{}.
+schedule_refresh(T, #state{next_refresh = Tc} = State) when T < Tc ->
+ State#state{next_refresh = T};
+schedule_refresh(_, #state{} = State) ->
+ State.
+
+
+-spec maybe_refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+maybe_refresh(#state{next_refresh = T} = State) ->
+ case now_sec() >= T of
+ true ->
+ refresh(State#state{next_refresh = infinity});
+ false ->
+ {ok, State}
+ end.
+
+
+-spec refresh(#state{}) -> {ok, #state{}} | {error, term()}.
+refresh(#state{session_url = Url, user = User, pass = Pass} = State) ->
+ Body = mochiweb_util:urlencode([{name, User}, {password, Pass}]),
+ Headers = [{"Content-Type", "application/x-www-form-urlencoded"}],
+ Result = http_request(State, Url, Headers, post, Body),
+ http_response(Result, State).
+
+
+-spec http_request(#state{}, string(), headers(), atom(), iolist()) ->
+ {ok, string(), headers(), binary()} | {error, term()}.
+http_request(#state{httpdb_pool = Pool} = State, Url, Headers, Method, Body) ->
+ Timeout = State#state.httpdb_timeout,
+ Opts = [
+ {response_format, binary},
+ {inactivity_timeout, Timeout}
+ | State#state.httpdb_ibrowse_options
+ ],
+ {ok, Wrk} = couch_replicator_httpc_pool:get_worker(Pool),
+ try
+ ibrowse:send_req_direct(Wrk, Url, Headers, Method, Body, Opts, Timeout)
+ after
+ ok = couch_replicator_httpc_pool:release_worker(Pool, Wrk)
+ end.
+
+
+-spec http_response({ok, string(), headers(), binary()} | {error, term()},
+ #state{}) -> {ok, #state{}} | {error, term()}.
+http_response({ok, "200", Headers, _}, State) ->
+ maybe_update_cookie(Headers, State);
+http_response({ok, "401", _, _}, #state{session_url = Url, user = User}) ->
+ {error, {session_request_unauthorized, Url, User}};
+http_response({ok, "403", _, _}, #state{session_url = Url, user = User}) ->
+ {error, {session_request_forbidden, Url, User}};
+http_response({ok, "404", _, _}, #state{session_url = Url, user = User}) ->
+ {error, {session_not_supported, Url, User}};
+http_response({ok, Code, _, _}, #state{session_url = Url, user = User}) ->
+ {error, {session_unexpected_result, Code, Url, User}};
+http_response({error, Error}, #state{session_url = Url, user = User}) ->
+ {error, {session_request_failed, Url, User, Error}}.
+
+
+-spec parse_cookie(list()) -> {ok, string()} | {error, term()}.
+parse_cookie(Headers0) ->
+ Headers = mochiweb_headers:make(Headers0),
+ case mochiweb_headers:get_value("Set-Cookie", Headers) of
+ undefined ->
+ {error, cookie_not_found};
+ CookieHeader ->
+ CookieKVs = mochiweb_cookies:parse_cookie(CookieHeader),
+ CaseInsKVs = mochiweb_headers:make(CookieKVs),
+ case mochiweb_headers:get_value("AuthSession", CaseInsKVs) of
+ undefined ->
+ {error, cookie_format_invalid};
+ Cookie ->
+ {ok, Cookie}
+ end
+ end.
+
+
+-spec maybe_update_cookie(headers(), #state{}) ->
+ {ok, #state{}} | {error, term()}.
+maybe_update_cookie(ResponseHeaders, State) ->
+ case parse_cookie(ResponseHeaders) of
+ {ok, Cookie} ->
+ {ok, update_cookie(State, Cookie, now_sec())};
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+-spec update_cookie(#state{}, string(), non_neg_integer()) -> #state{}.
+update_cookie(#state{cookie = Cookie} = State, Cookie, _) ->
+ State;
+update_cookie(#state{epoch = Epoch} = State, Cookie, NowSec) ->
+ State#state{
+ epoch = Epoch + 1,
+ cookie = Cookie,
+ refresh_tstamp = NowSec
+ }.
+
+
+-spec cookie_age_sec(#state{}, non_neg_integer()) -> non_neg_integer().
+cookie_age_sec(#state{refresh_tstamp = RefreshTs}, Now) ->
+ max(0, Now - RefreshTs).
+
+
+-spec now_sec() -> non_neg_integer().
+now_sec() ->
+ {Mega, Sec, _Micro} = os:timestamp(),
+ Mega * 1000000 + Sec.
+
+
+-spec min_update_interval() -> non_neg_integer().
+min_update_interval() ->
+ config:get_integer("replicator", "session_min_update_interval",
+ ?MIN_UPDATE_INTERVAL).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+get_session_url_test_() ->
+ [?_assertEqual(SessionUrl, get_session_url(Url)) || {Url, SessionUrl} <- [
+ {"http://host/db", "http://host/_session"},
+ {"http://127.0.0.1/db", "http://127.0.0.1/_session"},
+ {"http://host/x/y/z", "http://host/_session"},
+ {"http://host:5984/db", "http://host:5984/_session"},
+ {"https://host/db?q=1", "https://host/_session"}
+ ]].
+
+
+extract_creds_success_test_() ->
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ [?_assertEqual({ok, User, Pass, HttpDb2}, extract_creds(HttpDb1)) ||
+ {HttpDb1, {User, Pass, HttpDb2}} <- [
+ {
+ #httpdb{url = "http://u:p@x.y/db"},
+ {"u", "p", #httpdb{url = "http://x.y/db"}}
+ },
+ {
+ #httpdb{url = "http://u:p@h:80/db"},
+ {"u", "p", #httpdb{url = "http://h:80/db"}}
+ },
+ {
+ #httpdb{url = "https://u:p@h/db"},
+ {"u", "p", #httpdb{url = "https://h/db"}}
+ },
+ {
+ #httpdb{url = "http://u:p@127.0.0.1:5984/db"},
+ {"u", "p", #httpdb{url = "http://127.0.0.1:5984/db"}}
+ },
+ {
+ #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]/db"},
+ {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]/db"}}
+ },
+ {
+ #httpdb{url = "http://u:p@[2001:db8:a1b:12f9::1]:81/db"},
+ {"u", "p", #httpdb{url = "http://[2001:db8:a1b:12f9::1]:81/db"}}
+ },
+ {
+ #httpdb{url = "http://u:p@x.y/db/other?query=Z&query=w"},
+ {"u", "p", #httpdb{url = "http://x.y/db/other?query=Z&query=w"}}
+ },
+ {
+ #httpdb{
+ url = "http://h/db",
+ headers = DefaultHeaders ++ [
+ {"Authorization", "Basic " ++ b64creds("u", "p")}
+ ]
+ },
+ {"u", "p", #httpdb{url = "http://h/db"}}
+ },
+ {
+ #httpdb{
+ url = "http://h/db",
+ headers = DefaultHeaders ++ [
+ {"aUthoriZation", "bASIC " ++ b64creds("U", "p")}
+ ]
+ },
+ {"U", "p", #httpdb{url = "http://h/db"}}
+ },
+ {
+ #httpdb{
+ url = "http://u1:p1@h/db",
+ headers = DefaultHeaders ++ [
+ {"Authorization", "Basic " ++ b64creds("u2", "p2")}
+ ]
+ },
+ {"u2", "p2", #httpdb{url = "http://h/db"}}
+ }
+ ]].
+
+
+cookie_update_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_do_refresh(),
+ t_dont_refresh(),
+ t_process_auth_failure(),
+ t_process_auth_failure_stale_epoch(),
+ t_process_auth_failure_too_frequent(),
+ t_process_ok_update_cookie(),
+ t_process_ok_no_cookie(),
+ t_init_state_fails_on_401(),
+ t_init_state_404(),
+ t_init_state_no_creds(),
+ t_init_state_http_error()
+ ]
+ }.
+
+
+t_do_refresh() ->
+ ?_test(begin
+ State = #state{next_refresh = 0},
+ {ok, State1} = maybe_refresh(State),
+ ?assertMatch(#state{
+ next_refresh = infinity,
+ epoch = 1,
+ cookie = "Abc"
+ }, State1)
+ end).
+
+
+t_dont_refresh() ->
+ ?_test(begin
+ State = #state{next_refresh = now_sec() + 100},
+ {ok, State1} = maybe_refresh(State),
+ ?assertMatch(State, State1),
+ State2 = #state{next_refresh = infinity},
+ {ok, State3} = maybe_refresh(State2),
+ ?assertMatch(State2, State3)
+ end).
+
+
+t_process_auth_failure() ->
+ ?_test(begin
+ State = #state{epoch = 1, refresh_tstamp = 0},
+ {retry, State1} = process_auth_failure(1, State),
+ NextRefresh = State1#state.next_refresh,
+ ?assert(NextRefresh =< now_sec())
+ end).
+
+
+t_process_auth_failure_stale_epoch() ->
+ ?_test(begin
+ State = #state{epoch = 3},
+ ?assertMatch({retry, State}, process_auth_failure(2, State))
+ end).
+
+
+t_process_auth_failure_too_frequent() ->
+ ?_test(begin
+ State = #state{epoch = 4, refresh_tstamp = now_sec()},
+ ?assertMatch({continue, _}, process_auth_failure(4, State))
+ end).
+
+
+t_process_ok_update_cookie() ->
+ ?_test(begin
+ Headers = [{"set-CookiE", "AuthSession=xyz; Path=/;"}, {"X", "y"}],
+ Res = process_response(200, Headers, 1, #state{}),
+ ?assertMatch({continue, #state{cookie = "xyz", epoch = 1}}, Res),
+ State = #state{cookie = "xyz", refresh_tstamp = 42, epoch = 2},
+ Res2 = process_response(200, Headers, 1, State),
+ ?assertMatch({continue, #state{cookie = "xyz", epoch = 2}}, Res2)
+ end).
+
+
+t_process_ok_no_cookie() ->
+ ?_test(begin
+ Headers = [{"X", "y"}],
+ State = #state{cookie = "old", epoch = 3, refresh_tstamp = 42},
+ Res = process_response(200, Headers, 1, State),
+ ?assertMatch({continue, State}, Res)
+ end).
+
+
+t_init_state_fails_on_401() ->
+ ?_test(begin
+ mock_http_401_response(),
+ {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+ SessionUrl = "http://h/_session",
+ ?assertEqual({session_request_unauthorized, SessionUrl, "u"}, Error)
+ end).
+
+
+t_init_state_404() ->
+ ?_test(begin
+ mock_http_404_response(),
+ ?assertEqual(ignore, init_state(#httpdb{url = "http://u:p@h"}))
+ end).
+
+
+t_init_state_no_creds() ->
+ ?_test(begin
+ ?assertEqual(ignore, init_state(#httpdb{url = "http://h"}))
+ end).
+
+
+t_init_state_http_error() ->
+ ?_test(begin
+ mock_http_error_response(),
+ {error, Error} = init_state(#httpdb{url = "http://u:p@h"}),
+ SessionUrl = "http://h/_session",
+ ?assertEqual({session_request_failed, SessionUrl, "u", x}, Error)
+ end).
+
+
+setup() ->
+ meck:expect(couch_replicator_httpc_pool, get_worker, 1, {ok, worker}),
+ meck:expect(couch_replicator_httpc_pool, release_worker, 2, ok),
+ meck:expect(config, get, fun(_, _, Default) -> Default end),
+ mock_http_cookie_response("Abc"),
+ ok.
+
+
+teardown(_) ->
+ meck:unload().
+
+
+mock_http_cookie_response(Cookie) ->
+ Resp = {ok, "200", [{"Set-Cookie", "AuthSession=" ++ Cookie}], []},
+ meck:expect(ibrowse, send_req_direct, 7, Resp).
+
+
+mock_http_401_response() ->
+ meck:expect(ibrowse, send_req_direct, 7, {ok, "401", [], []}).
+
+
+mock_http_404_response() ->
+ meck:expect(ibrowse, send_req_direct, 7, {ok, "404", [], []}).
+
+
+mock_http_error_response() ->
+ meck:expect(ibrowse, send_req_direct, 7, {error, x}).
+
+
+extract_creds_error_test_() ->
+ [?_assertMatch({error, Error}, extract_creds(HttpDb)) ||
+ {HttpDb, Error} <- [
+ {#httpdb{url = "some_junk"}, invalid_uri},
+ {#httpdb{url = "http://h/db"}, missing_credentials}
+ ]].
+
+
+b64creds(User, Pass) ->
+ base64:encode_to_string(User ++ ":" ++ Pass).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 3659d9592..2e4df5365 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -19,7 +19,7 @@
-export([read_changes/5]).
-include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-import(couch_util, [
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 1fe91eca4..62d21fe12 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -35,7 +35,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
-include_lib("mem3/include/mem3.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-include("couch_replicator_js_functions.hrl").
@@ -396,28 +396,9 @@ parse_rep_db({Props}, Proxy, Options) ->
{BinHeaders} = get_value(<<"headers">>, Props, {[]}),
Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
DefaultHeaders = (#httpdb{})#httpdb.headers,
- OAuth = case get_value(<<"oauth">>, AuthProps) of
- undefined ->
- nil;
- {OauthProps} ->
- #oauth{
- consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
- token = ?b2l(get_value(<<"token">>, OauthProps)),
- token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
- consumer_secret = ?b2l(get_value(<<"consumer_secret">>,
- OauthProps)),
- signature_method =
- case get_value(<<"signature_method">>, OauthProps) of
- undefined -> hmac_sha1;
- <<"PLAINTEXT">> -> plaintext;
- <<"HMAC-SHA1">> -> hmac_sha1;
- <<"RSA-SHA1">> -> rsa_sha1
- end
- }
- end,
#httpdb{
url = Url,
- oauth = OAuth,
+ auth_props = AuthProps,
headers = lists:ukeymerge(1, Headers, DefaultHeaders),
ibrowse_options = lists:keysort(1,
[{socket_options, get_value(socket_options, Options)} |
@@ -695,8 +676,7 @@ strip_credentials(Url) when is_binary(Url) ->
"http\\1://\\2",
[{return, binary}]);
strip_credentials({Props}) ->
- Props1 = lists:keydelete(<<"oauth">>, 1, Props),
- {lists:keydelete(<<"headers">>, 1, Props1)}.
+ {lists:keydelete(<<"headers">>, 1, Props)}.
error_reason({shutdown, Error}) ->
@@ -774,10 +754,6 @@ check_strip_credentials_test() ->
},
{
{[{<<"_id">>, <<"foo">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>}]}
- },
- {
- {[{<<"_id">>, <<"foo">>}]},
{[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"bar">>}]}
},
{
@@ -786,8 +762,7 @@ check_strip_credentials_test() ->
},
{
{[{<<"_id">>, <<"foo">>}]},
- {[{<<"_id">>, <<"foo">>}, {<<"oauth">>, <<"bar">>},
- {<<"headers">>, <<"baz">>}]}
+ {[{<<"_id">>, <<"foo">>}, {<<"headers">>, <<"baz">>}]}
}
]].
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 45472f431..6e787514b 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -14,7 +14,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-export([setup/1]).
-export([send_req/3]).
@@ -51,8 +51,17 @@ setup(Db) ->
undefined -> Url;
_ when is_list(ProxyURL) -> ProxyURL
end,
- {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
- {ok, Db#httpdb{httpc_pool = Pid}}.
+ {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL,
+ [{max_connections, MaxConns}]),
+ case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of
+ {ok, Db1} ->
+ {ok, Db1};
+ {error, Error} ->
+ LogMsg = "~p: auth plugin initialization failed ~p ~p",
+ LogUrl = couch_util:url_strip_password(Url),
+ couch_log:error(LogMsg, [?MODULE, LogUrl, Error]),
+ throw({replication_auth_error, Error})
+ end.
send_req(HttpDb, Params1, Callback) ->
@@ -86,11 +95,11 @@ send_req(HttpDb, Params1, Callback) ->
end.
-send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
Method = get_value(method, Params, get),
UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
- Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
+ {Headers2, HttpDb} = couch_replicator_auth:update_headers(HttpDb0, Headers1),
Url = full_url(HttpDb, Params),
Body = get_value(body, Params, []),
case get_value(path, Params) == "_changes" of
@@ -157,6 +166,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
Json ->
?JSON_DECODE(Json)
end,
+ process_auth_response(HttpDb, Ok, Headers, Params),
Callback(Ok, Headers, EJson);
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
backoff_success(HttpDb, Params),
@@ -179,8 +189,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
backoff(HttpDb#httpdb{timeout = Timeout}, Params);
Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
backoff_success(HttpDb, Params),
+ HttpDb1 = process_auth_response(HttpDb, Ok, Headers, Params),
StreamDataFun = fun() ->
- stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+ stream_data_self(HttpDb1, Params, Worker, ReqId, Callback)
end,
put(?STREAM_STATUS, {streaming, Worker}),
ibrowse:stream_next(ReqId),
@@ -190,9 +201,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
catch
throw:{maybe_retry_req, connection_closed} ->
maybe_retry({connection_closed, mid_stream},
- Worker, HttpDb, Params);
+ Worker, HttpDb1, Params);
throw:{maybe_retry_req, Err} ->
- maybe_retry(Err, Worker, HttpDb, Params)
+ maybe_retry(Err, Worker, HttpDb1, Params)
end;
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
backoff_success(HttpDb, Params),
@@ -216,6 +227,16 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
end.
+process_auth_response(HttpDb, Code, Headers, Params) ->
+ case couch_replicator_auth:handle_response(HttpDb, Code, Headers) of
+ {continue, HttpDb1} ->
+ HttpDb1;
+ {retry, HttpDb1} ->
+ log_retry_error(Params, HttpDb1, 0, Code),
+ throw({retry, HttpDb1, Params})
+ end.
+
+
% Only streaming HTTP requests send messages back from
% the ibrowse worker process. We can detect that based
% on the ibrowse_req_id format. This just drops all
@@ -397,28 +418,6 @@ query_args_to_string([{K, V} | Rest], Acc) ->
query_args_to_string(Rest, [K ++ "=" ++ couch_httpd:quote(V) | Acc]).
-oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
- [];
-oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
- Consumer = {
- OAuth#oauth.consumer_key,
- OAuth#oauth.consumer_secret,
- OAuth#oauth.signature_method
- },
- Method = case get_value(method, ConnParams, get) of
- get -> "GET";
- post -> "POST";
- put -> "PUT";
- head -> "HEAD"
- end,
- QSL = get_value(qs, ConnParams, []),
- OAuthParams = oauth:sign(Method,
- BaseUrl ++ get_value(path, ConnParams, []),
- QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
- [{"Authorization",
- "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
-
-
do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) ->
RedirectUrl = redirect_url(Headers, Url),
{HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index e7067622b..e8faf8ea3 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -21,7 +21,7 @@
-include_lib("ibrowse/include/ibrowse.hrl").
-include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
% replication_id/1 and replication_id/2 will attempt to fetch
@@ -127,62 +127,25 @@ maybe_append_options(Options, RepOptions) ->
end, [], Options).
-get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
+get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers}) ->
DefaultHeaders = (#httpdb{})#httpdb.headers,
- case OAuth of
- nil ->
- {remote, Url, Headers -- DefaultHeaders};
- #oauth{} ->
- {remote, Url, Headers -- DefaultHeaders, OAuth}
- end;
+ {remote, Url, Headers -- DefaultHeaders};
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
{local, DbName, UserCtx}.
get_v4_endpoint(UserCtx, #httpdb{} = HttpDb) ->
- {Url, Headers, OAuth} = case get_rep_endpoint(UserCtx, HttpDb) of
- {remote, U, Hds} ->
- {U, Hds, undefined};
- {remote, U, Hds, OA} ->
- {U, Hds, OA}
- end,
- {UserFromHeaders, HeadersWithoutBasicAuth} = remove_basic_auth(Headers),
+ {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb),
+ {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
+ couch_replicator_utils:remove_basic_auth_from_headers(Headers),
{UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url),
User = pick_defined_value([UserFromUrl, UserFromHeaders]),
+ OAuth = undefined, % Keep this to ensure checkpoints don't change
{remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth};
get_v4_endpoint(UserCtx, <<DbName/binary>>) ->
{local, DbName, UserCtx}.
-remove_basic_auth(Headers) ->
- case lists:partition(fun is_basic_auth/1, Headers) of
- {[], HeadersWithoutBasicAuth} ->
- {undefined, HeadersWithoutBasicAuth};
- {[{_, "Basic " ++ Base64} | _], HeadersWithoutBasicAuth} ->
- User = get_basic_auth_user(Base64),
- {User, HeadersWithoutBasicAuth}
- end.
-
-
-is_basic_auth({"Authorization", "Basic " ++ _Base64}) ->
- true;
-is_basic_auth(_) ->
- false.
-
-
-get_basic_auth_user(Base64) ->
- try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of
- [User, _Pass] ->
- User;
- _ ->
- undefined
- catch
- % Tolerate invalid B64 values here to avoid crashing replicator
- error:function_clause ->
- undefined
- end.
-
-
pick_defined_value(Values) ->
case [V || V <- Values, V /= undefined] of
[] ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index be956b6a7..0b396346a 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -56,7 +56,7 @@
-include("couch_replicator_scheduler.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("couch/include/couch_db.hrl").
%% types
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 0438249be..1467d9f30 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -29,7 +29,7 @@
]).
-include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator_scheduler.hrl").
-include("couch_replicator.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 01881e423..218fcf501 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -27,7 +27,8 @@
get_json_value/3,
pp_rep_id/1,
iso8601/1,
- filter_state/3
+ filter_state/3,
+ remove_basic_auth_from_headers/1
]).
-export([
@@ -36,7 +37,7 @@
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-import(couch_util, [
get_value/2,
@@ -174,3 +175,88 @@ filter_state(State, States, Info) ->
false ->
skip
end.
+
+
+remove_basic_auth_from_headers(Headers) ->
+ Headers1 = mochiweb_headers:make(Headers),
+ case mochiweb_headers:get_value("Authorization", Headers1) of
+ undefined ->
+ {{undefined, undefined}, Headers};
+ Auth ->
+ {Basic, Base64} = lists:splitwith(fun(X) -> X =/= $\s end, Auth),
+ maybe_remove_basic_auth(string:to_lower(Basic), Base64, Headers1)
+ end.
+
+
+maybe_remove_basic_auth("basic", " " ++ Base64, Headers) ->
+ Headers1 = mochiweb_headers:delete_any("Authorization", Headers),
+ {decode_basic_creds(Base64), mochiweb_headers:to_list(Headers1)};
+maybe_remove_basic_auth(_, _, Headers) ->
+ {{undefined, undefined}, mochiweb_headers:to_list(Headers)}.
+
+
+decode_basic_creds(Base64) ->
+ try re:split(base64:decode(Base64), ":", [{return, list}, {parts, 2}]) of
+ [User, Pass] ->
+ {User, Pass};
+ _ ->
+ {undefined, undefined}
+ catch
+ % Tolerate invalid B64 values here to avoid crashing replicator
+ error:function_clause ->
+ {undefined, undefined}
+ end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+remove_basic_auth_from_headers_test_() ->
+ [?_assertMatch({{User, Pass}, NoAuthHeaders},
+ remove_basic_auth_from_headers(Headers)) ||
+ {{User, Pass, NoAuthHeaders}, Headers} <- [
+ {
+ {undefined, undefined, []},
+ []
+ },
+ {
+ {undefined, undefined, [{"h", "v"}]},
+ [{"h", "v"}]
+ },
+ {
+ {undefined, undefined, [{"Authorization", "junk"}]},
+ [{"Authorization", "junk"}]
+ },
+ {
+ {undefined, undefined, []},
+ [{"Authorization", "basic X"}]
+ },
+ {
+ {"user", "pass", []},
+ [{"Authorization", "Basic " ++ b64creds("user", "pass")}]
+ },
+ {
+ {"user", "pass", []},
+ [{"AuThorization", "Basic " ++ b64creds("user", "pass")}]
+ },
+ {
+ {"user", "pass", []},
+ [{"Authorization", "bAsIc " ++ b64creds("user", "pass")}]
+ },
+ {
+ {"user", "pass", [{"h", "v"}]},
+ [
+ {"Authorization", "Basic " ++ b64creds("user", "pass")},
+ {"h", "v"}
+ ]
+ }
+ ]
+ ].
+
+
+b64creds(User, Pass) ->
+ base64:encode_to_string(User ++ ":" ++ Pass).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index db6b72b2e..e51565866 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -22,7 +22,7 @@
-export([handle_call/3, handle_cast/2, handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
% TODO: maybe make both buffer max sizes configurable
diff --git a/src/couch_replicator/test/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
index a40e5b166..4f545bcb5 100644
--- a/src/couch_replicator/test/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_proxy_tests.erl
@@ -14,7 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
--include_lib("couch_replicator/src/couch_replicator_api_wrap.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
setup() ->
diff --git a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl
index 6f3308c39..af3a285f5 100644
--- a/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl
+++ b/src/couch_replicator/test/couch_replicator_small_max_request_size_target.erl
@@ -61,8 +61,8 @@ reduce_max_request_size_test_() ->
% attachment which exceed maximum request size are simply
% closed instead of returning a 413 request. That makes these
% tests flaky.
- % ++ [{Pair, fun should_replicate_one_with_attachment/2}
- % || Pair <- Pairs]
+ ++ [{Pair, fun should_replicate_one_with_attachment/2}
+ || Pair <- Pairs]
}
}.
@@ -90,12 +90,12 @@ should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
% POST-ing individual documents directly and skip bulk_docs. Test that case
% separately
% See note in main test function why this was disabled.
-% should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
-% {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
-% {inorder, [should_populate_source_one_large_attachment(Source),
-% should_populate_source(Source),
-% should_replicate(Source, Target),
-% should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
+ {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+ {inorder, [should_populate_source_one_large_attachment(Source),
+ should_populate_source(Source),
+ should_replicate(Source, Target),
+ should_compare_databases(Source, Target, [<<"doc0">>])]}}.
should_populate_source({remote, Source}) ->
@@ -112,11 +112,11 @@ should_populate_source_one_large_one_small(Source) ->
{timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.
-% should_populate_source_one_large_attachment({remote, Source}) ->
-% should_populate_source_one_large_attachment(Source);
+should_populate_source_one_large_attachment({remote, Source}) ->
+ should_populate_source_one_large_attachment(Source);
-% should_populate_source_one_large_attachment(Source) ->
-% {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
+should_populate_source_one_large_attachment(Source) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
should_replicate({remote, Source}, Target) ->
@@ -156,8 +156,8 @@ one_large_one_small(DbName, Large, Small) ->
add_doc(DbName, <<"doc1">>, Small, 0).
-% one_large_attachment(DbName, Size, AttSize) ->
-% add_doc(DbName, <<"doc0">>, Size, AttSize).
+one_large_attachment(DbName, Size, AttSize) ->
+ add_doc(DbName, <<"doc0">>, Size, AttSize).
add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->