author     Joan Touzet <wohali@users.noreply.github.com>  2020-01-16 13:39:09 -0500
committer  GitHub <noreply@github.com>  2020-01-16 13:39:09 -0500
commit     7429f59bb087a152b379108169b7d5ee9ca3b6f2 (patch)
tree       f752922656cc30757517fffb8ca0629d68b877af
parent     f55bd0006947fa88b8e5c3efd795e7264f8fd3a1 (diff)
parent     d7188ba8dea81739e8535b92ede1c6613bb598f8 (diff)
download   couchdb-mango_metrics.tar.gz
Merge branch 'master' into mango_metrics
-rw-r--r--  .gitignore | 1
-rw-r--r--  LICENSE | 2
-rw-r--r--  NOTICE | 2
-rw-r--r--  build-aux/Jenkinsfile.full | 57
-rw-r--r--  rebar.config.script | 2
-rw-r--r--  rel/overlay/etc/default.ini | 2
-rw-r--r--  src/couch/priv/couch_js/60/main.cpp | 3
-rw-r--r--  src/couch/src/couch_bt_engine.erl | 2
-rw-r--r--  src/couch/src/couch_httpd_auth.erl | 16
-rw-r--r--  src/couch/test/exunit/same_site_cookie_tests.exs | 44
-rw-r--r--  src/couch_mrview/src/couch_mrview_index.erl | 6
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl | 12
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.erl | 8
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl | 63
-rw-r--r--  src/couch_replicator/src/couch_replicator_stats.erl | 38
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl | 23
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl | 271
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl | 145
-rw-r--r--  src/dreyfus/src/dreyfus_fabric.erl | 133
-rw-r--r--  src/dreyfus/src/dreyfus_fabric_group1.erl | 9
-rw-r--r--  src/dreyfus/src/dreyfus_fabric_group2.erl | 9
-rw-r--r--  src/dreyfus/src/dreyfus_fabric_info.erl | 6
-rw-r--r--  src/dreyfus/src/dreyfus_fabric_search.erl | 18
-rw-r--r--  src/dreyfus/src/dreyfus_httpd.erl | 2
-rw-r--r--  src/dreyfus/src/dreyfus_index_updater.erl | 4
-rw-r--r--  src/dreyfus/src/dreyfus_util.erl | 24
-rw-r--r--  src/fabric/src/fabric_db_partition_info.erl | 84
-rw-r--r--  src/fabric/src/fabric_util.erl | 7
-rw-r--r--  src/fabric/src/fabric_view.erl | 36
-rw-r--r--  src/ken/rebar.config.script | 4
-rw-r--r--  src/mem3/src/mem3_rep.erl | 1
-rw-r--r--  src/mem3/src/mem3_sync_event_listener.erl | 6
-rw-r--r--  test/elixir/test/replication_test.exs | 72
33 files changed, 888 insertions(+), 224 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5eec70f3e..3fa860c59 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@
*~
.venv
.DS_Store
+.vscode
.rebar/
.eunit/
cover/
diff --git a/LICENSE b/LICENSE
index 43602516a..8278d4a2d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2019 The Apache Foundation
+ Copyright 2020 The Apache Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/NOTICE b/NOTICE
index 23cf02ff6..e8674668b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache CouchDB
-Copyright 2009-2019 The Apache Software Foundation
+Copyright 2009-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/build-aux/Jenkinsfile.full b/build-aux/Jenkinsfile.full
index 2472d7827..c9327f800 100644
--- a/build-aux/Jenkinsfile.full
+++ b/build-aux/Jenkinsfile.full
@@ -467,22 +467,57 @@ pipeline {
} // post
} // stage
+ stage('Debian Buster arm64v8') {
+ agent {
+ docker {
+ image 'couchdbdev/arm64v8-debian-buster-erlang-20.3.8.24-1:latest'
+ label 'arm64v8'
+ alwaysPull true
+ args "${DOCKER_ARGS}"
+ }
+ }
+ environment {
+ platform = 'buster'
+ sm_ver = '60'
+ }
+ stages {
+ stage('Build from tarball & test') {
+ steps {
+ unstash 'tarball'
+ sh( script: build_and_test )
+ }
+ post {
+ always {
+ junit '**/.eunit/*.xml, **/_build/*/lib/couchdbtest/*.xml, **/src/mango/nosetests.xml, **/test/javascript/junit.xml'
+ }
+ }
+ }
+ stage('Build CouchDB packages') {
+ steps {
+ sh( script: make_packages )
+ sh( script: cleanup_and_save )
+ }
+ post {
+ success {
+ archiveArtifacts artifacts: 'pkgs/**', fingerprint: true
+ }
+ }
+ }
+ } // stages
+ post {
+ cleanup {
+ sh 'rm -rf ${WORKSPACE}/*'
+ }
+ } // post
+ } // stage
+
+
/*
- *
- * This is just taking too long to run. Right now, the khash tests are timing out
- * on the IBM servers:
- *
- * [2019-12-31T20:58:48.704Z] khash randomized test
- * [2019-12-31T20:59:04.869Z] khash_test:103: randomized_test_ (State matches dict implementation)...*timed out*
- *
- * So, this is DISABLED until we get an actual arm builder machine.
- *
- * ppc64le is actually slower to emulate than arm, so we're not even going to try that.
+ * Example of how to do a qemu-based run, please leave here
*/
/*
stage('Debian Buster arm64v8') {
- // once we have an arm64v8 node again, can restore this to original form that is less ugly
// the process is convoluted to ensure we have the latest qemu static binaries on the node first
// before trying to run a foreign docker container type. Alternately ensuring the `update_qemu`
// container is run on every Jenkins agent *after every restart of the Docker daemon* would work.
diff --git a/rebar.config.script b/rebar.config.script
index 5d5a6aac3..e39a08228 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -158,7 +158,7 @@ DepDescs = [
{hyper, "hyper", {tag, "CouchDB-2.2.0-4"}},
{ibrowse, "ibrowse", {tag, "CouchDB-4.0.1-1"}},
{jiffy, "jiffy", {tag, "CouchDB-0.14.11-2"}},
-{mochiweb, "mochiweb", {tag, "v2.19.0"}},
+{mochiweb, "mochiweb", {tag, "v2.20.0"}},
{meck, "meck", {tag, "0.8.8"}}
],
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 64265dce4..506d88784 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -245,6 +245,8 @@ iterations = 10 ; iterations for password hashing
; secret =
; users_db_public = false
; cookie_domain = example.com
+; Set the SameSite cookie property for the auth cookie. If empty, the SameSite property is not set.
+; same_site =
; CSP (Content Security Policy) Support for _utils
[csp]
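
For reference, the new option lives in the [couch_httpd_auth] section, e.g.:

    [couch_httpd_auth]
    same_site = Strict

With that set, cookie_auth_cookie/4 (see the couch_httpd_auth.erl hunks below) passes {same_site, strict} through to mochiweb. A minimal sketch of the effect in an Erlang shell — assuming the mochiweb v2.20.0 pulled in by the rebar.config.script change above, and noting that the exact attribute order in the header value may differ:

    1> mochiweb_cookies:cookie("AuthSession", "dGVzdA",
           [{path, "/"}, {same_site, strict}]).
    {"Set-Cookie","AuthSession=dGVzdA; Version=1; SameSite=Strict; Path=/"}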
diff --git a/src/couch/priv/couch_js/60/main.cpp b/src/couch/priv/couch_js/60/main.cpp
index e36bc619b..e78dbb46d 100644
--- a/src/couch/priv/couch_js/60/main.cpp
+++ b/src/couch/priv/couch_js/60/main.cpp
@@ -416,6 +416,9 @@ main(int argc, const char* argv[])
if(cx == NULL)
return 1;
+ JS_SetGlobalJitCompilerOption(cx, JSJITCOMPILER_BASELINE_ENABLE, 0);
+ JS_SetGlobalJitCompilerOption(cx, JSJITCOMPILER_ION_ENABLE, 0);
+
if (!JS::InitSelfHostedCode(cx))
return 1;
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index b659719f5..48e751a82 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -994,7 +994,7 @@ upgrade_purge_info(Fd, Header) ->
_ ->
{ok, PurgedIdsRevs} = couch_file:pread_term(Fd, Ptr),
- {Infos, NewSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) ->
+ {Infos, _} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) ->
Info = {PSeq, couch_uuids:random(), Id, Revs},
{[Info | InfoAcc], PSeq + 1}
end, {[], PurgeSeq}, PurgedIdsRevs),
diff --git a/src/couch/src/couch_httpd_auth.erl b/src/couch/src/couch_httpd_auth.erl
index 515ce6132..96de5bf3b 100644
--- a/src/couch/src/couch_httpd_auth.erl
+++ b/src/couch/src/couch_httpd_auth.erl
@@ -273,7 +273,7 @@ cookie_auth_cookie(Req, User, Secret, TimeStamp) ->
Hash = crypto:hmac(sha, Secret, SessionData),
mochiweb_cookies:cookie("AuthSession",
couch_util:encodeBase64Url(SessionData ++ ":" ++ ?b2l(Hash)),
- [{path, "/"}] ++ cookie_scheme(Req) ++ max_age() ++ cookie_domain()).
+ [{path, "/"}] ++ cookie_scheme(Req) ++ max_age() ++ cookie_domain() ++ same_site()).
ensure_cookie_auth_secret() ->
case config:get("couch_httpd_auth", "secret", undefined) of
@@ -457,6 +457,20 @@ cookie_domain() ->
_ -> [{domain, Domain}]
end.
+
+same_site() ->
+ SameSite = config:get("couch_httpd_auth", "same_site", ""),
+ case string:to_lower(SameSite) of
+ "" -> [];
+ "none" -> [{same_site, none}];
+ "lax" -> [{same_site, lax}];
+ "strict" -> [{same_site, strict}];
+ _ ->
+ couch_log:error("invalid config value couch_httpd_auth.same_site: ~p ",[SameSite]),
+ []
+ end.
+
+
reject_if_totp(User) ->
case get_totp_config(User) of
undefined ->
diff --git a/src/couch/test/exunit/same_site_cookie_tests.exs b/src/couch/test/exunit/same_site_cookie_tests.exs
new file mode 100644
index 000000000..bad32ada4
--- /dev/null
+++ b/src/couch/test/exunit/same_site_cookie_tests.exs
@@ -0,0 +1,44 @@
+defmodule SameSiteCookieTests do
+ use CouchTestCase
+
+ @moduletag :authentication
+
+ def get_cookie(user, pass) do
+ resp = Couch.post("/_session", body: %{:username => user, :password => pass})
+
+ true = resp.body["ok"]
+ resp.headers[:"set-cookie"]
+ end
+
+ @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "None"}]
+ test "Set same_site None" do
+ cookie = get_cookie("jan", "apple")
+ assert cookie =~ "; SameSite=None"
+ end
+
+ @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", ""}]
+ test "same_site not set" do
+ cookie = get_cookie("jan", "apple")
+ assert cookie
+ refute cookie =~ "; SameSite="
+ end
+
+ @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Strict"}]
+ test "Set same_site Strict" do
+ cookie = get_cookie("jan", "apple")
+ assert cookie =~ "; SameSite=Strict"
+ end
+
+ @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Lax"}]
+ test "Set same_site Lax" do
+ cookie = get_cookie("jan", "apple")
+ assert cookie =~ "; SameSite=Lax"
+ end
+
+ @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Invalid"}]
+ test "Set same_site invalid" do
+ cookie = get_cookie("jan", "apple")
+ assert cookie
+ refute cookie =~ "; SameSite="
+ end
+end
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 8542cc63f..c96d87173 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -127,6 +127,12 @@ open(Db, State0) ->
NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
ensure_local_purge_doc(Db, NewSt),
{ok, NewSt};
+ {ok, {WrongSig, _}} ->
+ couch_log:error("~s has the wrong signature: expected: ~p but got ~p",
+ [IndexFName, Sig, WrongSig]),
+ NewSt = couch_mrview_util:reset_index(Db, Fd, State),
+ ensure_local_purge_doc(Db, NewSt),
+ {ok, NewSt};
no_valid_header ->
NewSt = couch_mrview_util:reset_index(Db, Fd, State),
ensure_local_purge_doc(Db, NewSt),
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7df9..a21de4242 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
),
{Id, MissingRevs, PossibleAncestors}
end,
- {ok, lists:map(ConvertToNativeFun, Props)}
+ {ok, lists:map(ConvertToNativeFun, Props)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {revs_diff_failed, ErrCode, ErrMsg}}
end).
@@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
(413, _, _) ->
{error, request_body_too_large};
(417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {bulk_docs_failed, ErrCode, ErrMsg}}
end).
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
end,
parse_changes_feed(Options, UserFun2,
DataStreamFun2)
- end)
+ end);
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
end)
catch
exit:{http_request_failed, _, _, max_backoff} ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c6e..53c040e8c 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{ok, Job} = job_by_pid(Pid),
+ Reason = case Reason0 of
+ {shutdown, ShutdownReason} -> ShutdownReason;
+ Other -> Other
+ end,
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+ [{{crashed, max_backoff}, _} | _]}) ->
Opts = [
{checkpoint_interval, 5000},
{worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb81..0b33419e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
workers,
stats = couch_replicator_stats:new(),
session_id,
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
+ {stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, Reason}, State) ->
+ % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+ % wrapped around the message
+ terminate(Reason, State);
+
terminate(Reason, State) ->
#rep_state{
source_name = Source,
@@ -554,7 +565,7 @@ init_state(Rep) ->
options = Options,
type = Type, view = View,
start_time = StartTime,
- stats = Stats
+ stats = ArgStats0
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
@@ -569,6 +580,14 @@ init_state(Rep) ->
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ ArgStats1 = couch_replicator_stats:new(ArgStats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
@@ -592,15 +611,13 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
- stats = couch_replicator_stats:new(Stats)
+ stats = Stats
},
State#rep_state{timer = start_timer(State)}.
@@ -905,12 +922,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#httpdb{}) ->
- nil;
-db_monitor(Db) ->
- couch_db:monitor(Db).
-
-
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index cd62949e9..37848b3ee 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -17,7 +17,8 @@
new/1,
get/2,
increment/2,
- sum_stats/2
+ sum_stats/2,
+ max_stats/2
]).
-export([
@@ -64,14 +65,29 @@ increment(Field, Stats) ->
sum_stats(S1, S2) ->
orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+max_stats(S1, S2) ->
+ orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2).
-% Handle initializing from a status object which uses same values but different
-% field names.
-fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
-fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
-fmap({missing_checked, _}) -> true;
-fmap({missing_found, _}) -> true;
-fmap({docs_read, _}) -> true;
-fmap({docs_written, _}) -> true;
-fmap({doc_write_failures, _}) -> true;
-fmap({_, _}) -> false.
+
+% Handle initializing from a status object, which uses same values but
+% different field names, as well as from ejson props from the checkpoint
+% history
+%
+fmap({missing_found, _}) -> true;
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}};
+
+fmap({missing_checked, _}) -> true;
+fmap({revisions_checked, V}) -> {true, {missing_checked, V}};
+fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}};
+
+fmap({docs_read, _}) -> true;
+fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}};
+
+fmap({docs_written, _}) -> true;
+fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}};
+
+fmap({doc_write_failures, _}) -> true;
+fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}};
+
+fmap({_, _}) -> false.
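
Taken together, max_stats/2 and the extended fmap/1 clauses let stats recovered from checkpoint history (ejson props with binary keys) be merged with whatever the job record already carried, without ever regressing a counter. A rough shell sketch of the intended behavior, not part of the patch:

    1> S1 = couch_replicator_stats:new([{docs_read, 10}, {<<"docs_written">>, 7}]).
    2> S2 = couch_replicator_stats:new([{docs_read, 4}, {docs_written, 9}]).
    3> couch_replicator_stats:max_stats(S1, S2).
    [{docs_read,10},{docs_written,9}]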
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f5883..eb8beaaa9 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -372,8 +381,9 @@ handle_flush_docs_result({error, request_body_too_large}, Target, DocList) ->
" request body is too large. Splitting batch into 2 separate batches of"
" sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target),
length(DocList1), length(DocList2)]),
- flush_docs(Target, DocList1),
- flush_docs(Target, DocList2);
+ Stats1 = flush_docs(Target, DocList1),
+ Stats2 = flush_docs(Target, DocList2),
+ couch_replicator_stats:sum_stats(Stats1, Stats2);
handle_flush_docs_result({ok, Errors}, Target, DocList) ->
DbUri = couch_replicator_api_wrap:db_uri(Target),
lists:foreach(
@@ -386,7 +396,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
{doc_write_failures, length(Errors)}
- ]).
+ ]);
+handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) ->
+ exit(Err).
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
@@ -425,7 +437,10 @@ find_missing(DocInfos, Target) ->
end, {[], 0}, DocInfos),
- {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+ Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
+ {ok, Result} -> Result;
+ {error, Error} -> exit(Error)
+ end,
MissingRevsCount = lists:foldl(
fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
0, Missing),
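
The handlers above rely on a standard OTP mechanic: a helper calls exit/1 with a structured reason, the linked worker (which traps exits) receives it as a message, and stops with a {shutdown, Err} tuple that the scheduler job later unwraps when reporting. A self-contained sketch of that mechanic — module name and values are hypothetical, not from the patch:

    -module(exit_reason_sketch).
    -export([demo/0]).

    %% A linked child exiting with a structured reason delivers it to a
    %% trapping parent as an {'EXIT', Pid, Reason} message; the parent can
    %% then stop gracefully with {shutdown, Reason} instead of crashing.
    demo() ->
        process_flag(trap_exit, true),
        Pid = spawn_link(fun() ->
            exit({bulk_docs_failed, 403, <<"{\"error\":\"forbidden\"}">>})
        end),
        receive
            {'EXIT', Pid, {bulk_docs_failed, _, _} = Err} ->
                {stop, {shutdown, Err}}
        end.

(In the real worker this arrives via handle_info/2, as in the hunk above.)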
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 000000000..6b4f95c25
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
+
+
+setup() ->
+ meck:unload(),
+ Source = setup_db(),
+ Target = setup_db(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ meck:unload(),
+ teardown_db(Source),
+ teardown_db(Target),
+ ok.
+
+
+error_reporting_test_() ->
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_fail_bulk_docs/1,
+ fun t_fail_changes_reader/1,
+ fun t_fail_revs_diff/1,
+ fun t_fail_changes_queue/1,
+ fun t_fail_changes_manager/1,
+ fun t_fail_changes_reader_proc/1
+ ]
+ }
+ }.
+
+
+t_fail_bulk_docs({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_revs_diff({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_queue({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesQueue = element(20, State),
+ ?assert(is_process_alive(ChangesQueue)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesQueue, boom),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_queue_died, boom}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_manager({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesManager = element(21, State),
+ ?assert(is_process_alive(ChangesManager)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesManager, bam),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_manager_died, bam}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader_proc({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesReader = element(22, State),
+ ?assert(is_process_alive(ChangesReader)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesReader, kapow),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_reader_died, kapow}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+mock_fail_req(Path, Return) ->
+ meck:expect(ibrowse, send_req_direct,
+ fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
+ Args = [W, Url, Headers, Meth, Body, Opts, TOut],
+ {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
+ case lists:suffix(Path, UPath) of
+ true -> Return;
+ false -> meck:passthrough(Args)
+ end
+ end).
+
+
+rep_result_listener(RepId) ->
+ ReplyTo = self(),
+ {ok, _Listener} = couch_replicator_notifier:start_link(
+ fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+ ReplyTo ! Ev;
+ (_) ->
+ ok
+ end).
+
+
+wait_rep_result(RepId) ->
+ receive
+ {finished, RepId, RepResult} -> {ok, RepResult};
+ {error, RepId, Reason} -> {error, Reason}
+ end.
+
+
+
+setup_db() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ DbName.
+
+
+teardown_db(DbName) ->
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+populate_db(DbName, Start, End) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Docs = lists:foldl(
+ fun(DocIdCounter, Acc) ->
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = {[]}},
+ [Doc | Acc]
+ end,
+ [], lists:seq(Start, End)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []),
+ ok = couch_db:close(Db).
+
+
+wait_target_in_sync(Source, Target) ->
+ {ok, SourceDb} = couch_db:open_int(Source, []),
+ {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+ ok = couch_db:close(SourceDb),
+ SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+ wait_target_in_sync_loop(SourceDocCount, Target, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+ erlang:error({assertion_failed, [
+ {module, ?MODULE}, {line, ?LINE},
+ {reason, "Could not get source and target databases in sync"}
+ ]});
+
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+ {ok, Target} = couch_db:open_int(TargetName, []),
+ {ok, TargetInfo} = couch_db:get_db_info(Target),
+ ok = couch_db:close(Target),
+ TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+ case TargetDocCount == DocCount of
+ true ->
+ true;
+ false ->
+ ok = timer:sleep(500),
+ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+ end.
+
+
+replicate(Source, Target) ->
+ SrcUrl = couch_replicator_test_helper:db_url(Source),
+ TgtUrl = couch_replicator_test_helper:db_url(Target),
+ RepObject = {[
+ {<<"source">>, SrcUrl},
+ {<<"target">>, TgtUrl},
+ {<<"continuous">>, true},
+ {<<"worker_processes">>, 1},
+ {<<"retries_per_request">>, 1},
+ % Low connection timeout so _changes feed gets restarted quicker
+ {<<"connection_timeout">>, 3000}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ ok = couch_replicator_scheduler:add_job(Rep),
+ couch_replicator_scheduler:reschedule(),
+ {ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 9dd86b3ef..037f37191 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -18,48 +18,93 @@
-define(DELAY, 500).
-define(TIMEOUT, 60000).
--define(i2l(I), integer_to_list(I)).
--define(io2b(Io), iolist_to_binary(Io)).
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
setup() ->
- Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
Source = setup_db(),
Target = setup_db(),
- {Ctx, {Source, Target}}.
+ {Source, Target}.
-teardown({Ctx, {Source, Target}}) ->
+teardown({Source, Target}) ->
teardown_db(Source),
teardown_db(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
+ ok.
stats_retained_test_() ->
{
setup,
- fun setup/0,
- fun teardown/1,
- fun t_stats_retained/1
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_stats_retained_by_scheduler/1,
+ fun t_stats_retained_on_job_removal/1
+ ]
+ }
}.
-t_stats_retained({_Ctx, {Source, Target}}) ->
+t_stats_retained_by_scheduler({Source, Target}) ->
?_test(begin
- populate_db(Source, 42),
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
{ok, RepPid, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target),
- wait_target_in_sync(Source, Target),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
stop_job(RepPid),
- check_scheduler_jobs(42, 42),
+ check_scheduler_jobs(10, 5, 5),
start_job(),
- check_active_tasks(42, 42),
- check_scheduler_jobs(42, 42),
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+ couch_replicator_scheduler:remove_job(RepId)
+ end).
+
+
+t_stats_retained_on_job_removal({Source, Target}) ->
+ ?_test(begin
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target), % 5 + 1 vdu
+
+ check_active_tasks(10, 5, 5),
+ check_scheduler_jobs(10, 5, 5),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 11, 20),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(11, Target), % 6 + 5
+
+ check_scheduler_jobs(20, 10, 10),
+ check_active_tasks(20, 10, 10),
+
+ couch_replicator_scheduler:remove_job(RepId),
+
+ populate_db_reject_even_docs(Source, 21, 30),
+ {ok, _, RepId} = replicate(Source, Target),
+ wait_target_in_sync(16, Target), % 11 + 5
+
+ check_scheduler_jobs(30, 15, 15),
+ check_active_tasks(30, 15, 15),
+
couch_replicator_scheduler:remove_job(RepId)
end).
@@ -92,14 +137,16 @@ start_job() ->
couch_replicator_scheduler:reschedule().
-check_active_tasks(DocsRead, DocsWritten) ->
+check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
RepTask = wait_for_task_status(),
?assertNotEqual(timeout, RepTask),
?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
- ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
+ ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
+ ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures,
+ RepTask)).
-check_scheduler_jobs(DocsRead, DocsWritten) ->
+check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
Info = wait_scheduler_info(),
?assert(maps:is_key(<<"changes_pending">>, Info)),
?assert(maps:is_key(<<"doc_write_failures">>, Info)),
@@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) ->
?assert(maps:is_key(<<"source_seq">>, Info)),
?assert(maps:is_key(<<"revisions_checked">>, Info)),
?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
- ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+ ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
+ ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
replication_tasks() ->
@@ -138,25 +186,31 @@ wait_scheduler_info() ->
end).
-populate_db(DbName, DocCount) ->
+populate_db_reject_even_docs(DbName, Start, End) ->
+ BodyFun = fun(Id) ->
+ case Id rem 2 == 0 of
+ true -> {[{<<"nope">>, true}]};
+ false -> {[]}
+ end
+ end,
+ populate_db(DbName, Start, End, BodyFun).
+
+
+populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
- Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
- Doc = #doc{id = Id, body = {[]}},
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
[Doc | Acc]
end,
- [], lists:seq(1, DocCount)),
+ [], lists:seq(Start, End)),
{ok, _} = couch_db:update_docs(Db, Docs, []),
ok = couch_db:close(Db).
-wait_target_in_sync(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300).
+wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
+ wait_target_in_sync_loop(DocCount, Target, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
@@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
+ case TargetDocCount == DocCount of
true ->
true;
false ->
@@ -201,3 +255,28 @@ scheduler_jobs() ->
{ok, 200, _, Body} = test_request:get(Url, []),
Json = jiffy:decode(Body, [return_maps]),
maps:get(<<"jobs">>, Json).
+
+
+vdu() ->
+ <<"function(newDoc, oldDoc, userCtx) {
+ if(newDoc.nope === true) {
+ throw({forbidden: 'nope'});
+ } else {
+ return;
+ }
+ }">>.
+
+
+add_vdu(DbName) ->
+ DocProps = [
+ {<<"_id">>, <<"_design/vdu">>},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, vdu()}
+ ],
+ Doc = couch_doc:from_json_obj({DocProps}, []),
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ try
+ {ok, _Rev} = couch_db:update_doc(Db, Doc, [])
+ after
+ couch_db:close(Db)
+ end.
diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl
index a953b6a38..0b25a6cc6 100644
--- a/src/dreyfus/src/dreyfus_fabric.erl
+++ b/src/dreyfus/src/dreyfus_fabric.erl
@@ -14,7 +14,7 @@
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(dreyfus_fabric).
--export([get_json_docs/2, handle_error_message/6]).
+-export([get_json_docs/2, handle_error_message/7]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -36,40 +36,42 @@ callback(timeout, _Acc) ->
{error, timeout}.
handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
{ok, NewCounters} ->
{ok, NewCounters};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker,
- Counters, Replacements, StartFun, StartArgs) ->
- handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
+ Counters, Replacements, StartFun, StartArgs, RingOpts) ->
+ handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs,
+ RingOpts);
handle_error_message({rexi_EXIT, Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error(Reason, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({error, Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error(Reason, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({'EXIT', Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error({exit, Reason}, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error({exit, Reason}, Worker, Counters, RingOpts);
handle_error_message(Reason, Worker, Counters,
- _Replacements, _StartFun, _StartArgs) ->
+ _Replacements, _StartFun, _StartArgs, RingOpts) ->
couch_log:error("Unexpected error during request: ~p", [Reason]),
- handle_error(Reason, Worker, Counters).
+ handle_error(Reason, Worker, Counters, RingOpts).
-handle_error(Reason, Worker, Counters0) ->
+handle_error(Reason, Worker, Counters0, RingOpts) ->
Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
+ case fabric_ring:is_progress_possible(Counters, RingOpts) of
true ->
{ok, Counters};
false ->
{error, Reason}
end.
-handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
+handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs,
+ RingOpts) ->
OldCounters = lists:filter(fun({#shard{ref=R}, _}) ->
R /= Worker#shard.ref
end, OldCntrs0),
@@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
NewCounter = start_replacement(StartFun, StartArgs, Repl),
fabric_dict:store(NewCounter, nil, CounterAcc)
end, OldCounters, Replacements),
- true = fabric_view:is_progress_possible(NewCounters),
+ true = fabric_ring:is_progress_possible(NewCounters, RingOpts),
NewRefs = fabric_dict:fetch_keys(NewCounters),
{new_refs, NewRefs, NewCounters, NewReplacements};
false ->
handle_error({nodedown, <<"progress not possible">>},
- Worker, OldCounters)
+ Worker, OldCounters, RingOpts)
end.
start_replacement(StartFun, StartArgs, Shard) ->
@@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) ->
{dreyfus_rpc, StartFun,
[Shard#shard.name|StartArgs1]}),
Shard#shard{ref = Ref}.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+node_down_test() ->
+ [S1, S2, S3] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n1", [5, ?RING_END]),
+ mk_shard("n2", [0, ?RING_END])
+ ],
+ [W1, W2, W3] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()},
+ S3#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2, W3], nil),
+
+ N1 = S1#shard.node,
+ Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+ Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []),
+ ?assertEqual({ok, [{W3, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ N2 = S3#shard.node,
+ Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+ Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []),
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, ?RING_END]),
+ mk_shard("n2", [0, ?RING_END])
+ ],
+ [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+
+ Res1 = handle_error(bam, W1, Counters1, []),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])).
+
+
+node_down_with_partitions_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n2", [0, 8])
+ ],
+ [W1, W2] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+ RingOpts = [{any, [S1, S2]}],
+
+ N1 = S1#shard.node,
+ Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+ Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ N2 = S2#shard.node,
+ Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+ Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts),
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_with_partitions_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n2", [0, 8])],
+ [W1, W2] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+ RingOpts = [{any, [S1, S2]}],
+
+ Res1 = handle_error(bam, W1, Counters1, RingOpts),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)).
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
+
+-endif.
diff --git a/src/dreyfus/src/dreyfus_fabric_group1.erl b/src/dreyfus/src/dreyfus_fabric_group1.erl
index 2d530ca7e..bdae6f040 100644
--- a/src/dreyfus/src/dreyfus_fabric_group1.erl
+++ b/src/dreyfus/src/dreyfus_fabric_group1.erl
@@ -27,7 +27,8 @@
top_groups,
counters,
start_args,
- replacements
+ replacements,
+ ring_opts
}).
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -39,6 +40,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc,
IndexName, dreyfus_util:export(QueryArgs)]),
Replacements = fabric_view:get_shard_replacements(DbName, Workers),
@@ -50,7 +52,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
top_groups = [],
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
try
rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -89,7 +92,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
- group1, State#state.start_args) of
+ group1, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_fabric_group2.erl b/src/dreyfus/src/dreyfus_fabric_group2.erl
index 1239f8b74..8d864dd0c 100644
--- a/src/dreyfus/src/dreyfus_fabric_group2.erl
+++ b/src/dreyfus/src/dreyfus_fabric_group2.erl
@@ -29,7 +29,8 @@
top_groups,
counters,
start_args,
- replacements
+ replacements,
+ ring_opts
}).
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -41,6 +42,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2,
[DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
Replacements = fabric_view:get_shard_replacements(DbName, Workers),
@@ -54,7 +56,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
top_groups = [],
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
try
rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -102,7 +105,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
- group2, State#state.start_args) of
+ group2, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_fabric_info.erl b/src/dreyfus/src/dreyfus_fabric_info.erl
index 27eec8065..e217bc0ef 100644
--- a/src/dreyfus/src/dreyfus_fabric_info.erl
+++ b/src/dreyfus/src/dreyfus_fabric_info.erl
@@ -49,7 +49,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Counters, Acc}) ->
handle_message({rexi_EXIT, Reason}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
@@ -74,7 +74,7 @@ handle_message({ok, Info}, Worker, {Counters, Acc}) ->
handle_message({error, Reason}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
@@ -82,7 +82,7 @@ handle_message({error, Reason}, Worker, {Counters, Acc}) ->
end;
handle_message({'EXIT', _}, Worker, {Counters, Acc}) ->
NewCounters = fabric_dict:erase(Worker, Counters),
- case fabric_view:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
false ->
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl
index acf7a83ec..c0ebde1d6 100644
--- a/src/dreyfus/src/dreyfus_fabric_search.erl
+++ b/src/dreyfus/src/dreyfus_fabric_search.erl
@@ -27,7 +27,8 @@
top_docs,
counters,
start_args,
- replacements
+ replacements,
+ ring_opts
}).
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search,
[DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
Counters = fabric_dict:init(Workers, nil),
- go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters);
+ go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts);
go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs)
@@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
LiveNodes = [node() | nodes()],
LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)],
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, LiveShards),
Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards),
Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) ->
QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{
@@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
end
end, Bookmark1),
Counters = fabric_dict:init(Counters0, nil),
+ WorkerShards = fabric_dict:fetch_keys(Counters),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards),
QueryArgs2 = QueryArgs#index_query_args{
bookmark = Bookmark1
},
- go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1);
+ go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts);
go(DbName, DDoc, IndexName, OldArgs) ->
go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).
-go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
+go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) ->
{Workers, _} = lists:unzip(Counters),
#index_query_args{
limit = Limit,
@@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
top_docs = #top_docs{total_hits=0,hits=[]},
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
RexiMon = fabric_util:create_monitors(Workers),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
- search, State#state.start_args) of
+ search, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_httpd.erl b/src/dreyfus/src/dreyfus_httpd.erl
index 5c9db80d1..e9851639b 100644
--- a/src/dreyfus/src/dreyfus_httpd.erl
+++ b/src/dreyfus/src/dreyfus_httpd.erl
@@ -73,8 +73,6 @@ handle_search_req(#httpd{method=Method, path_parts=[_, _, _, _, IndexName]}=Req
end;
_ ->
% ensure limit in group query >0
- LimitValue = parse_positive_int_param("limit", QueryArgs#index_query_args.limit,
- "max_limit", "200"),
UseNewApi = Grouping#grouping.new_api,
case dreyfus_fabric_group1:go(DbName, DDoc, IndexName, QueryArgs) of
{ok, []} ->
diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl
index 3720cb63c..87edef0ad 100644
--- a/src/dreyfus/src/dreyfus_index_updater.erl
+++ b/src/dreyfus/src/dreyfus_index_updater.erl
@@ -59,7 +59,7 @@ update(IndexPid, Index) ->
true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
EnumFun = fun ?MODULE:load_docs/2,
[Changes] = couch_task_status:get([changes_done]),
- Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, now(), ExcludeIdRevs},
+ Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs},
{ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []),
ok = clouseau_rpc:commit(IndexPid, NewCurSeq)
after
@@ -80,7 +80,7 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs}=Acc
false -> update_or_delete_index(IndexPid, Db, DI, Proc)
end,
%% Force a commit every minute
- case timer:now_diff(Now = now(), LastCommitTime) >= 60000000 of
+ case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of
true ->
ok = clouseau_rpc:commit(IndexPid, Seq),
{ok, {I+1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}};
diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl
index 6832299db..05ecdb621 100644
--- a/src/dreyfus/src/dreyfus_util.erl
+++ b/src/dreyfus/src/dreyfus_util.erl
@@ -19,7 +19,7 @@
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
--export([get_shards/2, sort/2, upgrade/1, export/1, time/2]).
+-export([get_shards/2, get_ring_opts/2, sort/2, upgrade/1, export/1, time/2]).
-export([in_black_list/1, in_black_list/3, maybe_deny_index/3]).
-export([get_design_docid/1]).
-export([
@@ -59,6 +59,15 @@ use_ushards(#index_query_args{stable=true}) ->
use_ushards(#index_query_args{}) ->
false.
+
+get_ring_opts(#index_query_args{partition = nil}, _Shards) ->
+ [];
+get_ring_opts(#index_query_args{}, Shards) ->
+ Shards1 = lists:map(fun(#shard{} = S) ->
+ S#shard{ref = undefined}
+ end, Shards),
+ [{any, Shards1}].
+
-spec sort(Order :: relevance | [any()], [#sortable{}]) -> [#sortable{}].
sort(Sort, List0) ->
{List1, Stash} = stash_items(List0),
@@ -342,7 +351,7 @@ get_signature_from_idxdir(IdxDir) ->
false -> undefined
end.
-get_local_purge_doc_body(Db, LocalDocId, PurgeSeq, Index) ->
+get_local_purge_doc_body(_, LocalDocId, PurgeSeq, Index) ->
#index{
name = IdxName,
ddoc_id = DDocId,
@@ -418,4 +427,15 @@ stash_test() ->
Unstashed = hd(unstash_items(Stashed, Stash)),
?assertEqual(Unstashed#sortable.item, bar).
+
+ring_opts_test() ->
+ Shards = [#shard{name = foo, ref = make_ref()}],
+
+ QArgs1 = #index_query_args{partition = nil},
+ ?assertEqual([], get_ring_opts(QArgs1, Shards)),
+
+ QArgs2 = #index_query_args{partition = <<"x">>},
+ ?assertMatch([{any, [#shard{name = foo, ref = undefined}]}],
+ get_ring_opts(QArgs2, Shards)).
+
-endif.
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl
index 2978832f0..954c52db2 100644
--- a/src/fabric/src/fabric_db_partition_info.erl
+++ b/src/fabric/src/fabric_db_partition_info.erl
@@ -17,15 +17,27 @@
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
+
+-record(acc, {
+ counters,
+ replies,
+ ring_opts
+}).
+
+
go(DbName, Partition) ->
- Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>),
+ Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]),
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
- Acc0 = {fabric_dict:init(Workers, nil), []},
+ Acc0 = #acc{
+ counters = fabric_dict:init(Workers, nil),
+ replies = [],
+ ring_opts = [{any, Shards}]
+ },
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
- {ok, Acc} -> {ok, Acc};
+ {ok, Res} -> {ok, Res};
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(
WorkersDict,
@@ -42,36 +54,39 @@ go(DbName, Partition) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, ring_opts = RingOpts} = Acc,
+ case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
{ok, NewCounters} ->
- {ok, {NewCounters, Acc}};
+ {ok, Acc#acc{counters = NewCounters}};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, ring_opts = RingOpts} = Acc,
NewCounters = fabric_dict:erase(Shard, Counters),
- case fabric_ring:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters, RingOpts) of
true ->
- {ok, {NewCounters, Acc}};
+ {ok, Acc#acc{counters = NewCounters}};
false ->
{error, Reason}
end;
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
- Acc2 = [Info | Acc],
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, replies = Replies} = Acc,
+ Replies1 = [Info | Replies],
Counters1 = fabric_dict:erase(Shard, Counters),
case fabric_dict:size(Counters1) =:= 0 of
true ->
- [FirstInfo | RestInfos] = Acc2,
+ [FirstInfo | RestInfos] = Replies1,
PartitionInfo = get_max_partition_size(FirstInfo, RestInfos),
{stop, [{db_name, Name} | format_partition(PartitionInfo)]};
false ->
- {ok, {Counters1, Acc2}}
+ {ok, Acc#acc{counters = Counters1, replies = Replies1}}
end;
-handle_message(_, _, Acc) ->
+handle_message(_, _, #acc{} = Acc) ->
{ok, Acc}.
@@ -97,3 +112,44 @@ format_partition(PartitionInfo) ->
{value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo),
[{sizes, {Size}} | PartitionInfo1].
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
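+% Simulate a node going down: losing one copy still satisfies the
+% {any, Shards} ring, losing the second makes progress impossible.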
+node_down_test() ->
+ [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])],
+ Acc1 = #acc{
+ counters = fabric_dict:init([S1, S2], nil),
+ ring_opts = [{any, [S1, S2]}]
+ },
+
+ N1 = S1#shard.node,
+ {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1),
+ ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+ N2 = S2#shard.node,
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}},
+ handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)).
+
+
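+% Worker exits count against ring coverage the same way: the first
+% exit is tolerated, the second turns the exit reason into the error.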
+worker_exit_test() ->
+ [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])],
+ Acc1 = #acc{
+ counters = fabric_dict:init([S1, S2], nil),
+ ring_opts = [{any, [S1, S2]}]
+ },
+
+ {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1),
+ ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+ ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)).
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
+
+-endif.
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index aaf0623f0..8aa14e73a 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -14,7 +14,7 @@
-export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
- remove_down_workers/2, doc_id_and_rev/1]).
+ remove_down_workers/2, remove_down_workers/3, doc_id_and_rev/1]).
-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]).
-export([log_timeout/2, remove_done_workers/2]).
-export([is_users_db/1, is_replicator_db/1]).
@@ -33,9 +33,12 @@
-include_lib("eunit/include/eunit.hrl").
remove_down_workers(Workers, BadNode) ->
+ remove_down_workers(Workers, BadNode, []).
+
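+% As remove_down_workers/2, but with explicit ring options (e.g.
+% [{any, Shards}]) forwarded to fabric_ring:is_progress_possible/2.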
+remove_down_workers(Workers, BadNode, RingOpts) ->
Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
NewWorkers = fabric_dict:filter(Filter, Workers),
- case fabric_ring:is_progress_possible(NewWorkers) of
+ case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of
true ->
{ok, NewWorkers};
false ->
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 55b44e6f7..425f864c4 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -12,7 +12,7 @@
-module(fabric_view).
--export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+-export([remove_overlapping_shards/2, maybe_send_row/1,
transform_row/1, keydict/1, extract_view/4, get_shards/2,
check_down_shards/2, handle_worker_exit/3,
get_shard_replacements/2, maybe_update_others/5]).
@@ -46,10 +46,6 @@ handle_worker_exit(Collector, _Worker, Reason) ->
{ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
{error, Resp}.
-%% @doc looks for a fully covered keyrange in the list of counters
--spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible(Counters) ->
- fabric_ring:is_progress_possible(Counters).
-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
[{#shard{}, any()}].
@@ -416,28 +412,6 @@ fix_skip_and_limit(#mrargs{} = Args) ->
remove_finalizer(Args) ->
couch_mrview_util:set_extra(Args, finalizer, null).
-% unit test
-is_progress_possible_test() ->
- EndPoint = 2 bsl 31,
- T1 = [[0, EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T1)),true),
- T2 = [[0,10],[11,20],[21,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T2)),true),
- % gap
- T3 = [[0,10],[12,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T3)),false),
- % outside range
- T4 = [[1,10],[11,20],[21,EndPoint-1]],
- ?assertEqual(is_progress_possible(mk_cnts(T4)),false),
- % outside range
- T5 = [[0,10],[11,20],[21,EndPoint]],
- ?assertEqual(is_progress_possible(mk_cnts(T5)),false),
- T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]],
- ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
- % not possible, overlap is not exact
- T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]],
- ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
-
remove_overlapping_shards_test() ->
Cb = undefined,
@@ -482,10 +456,6 @@ get_shard_replacements_test() ->
?assertEqual(Expect, Res).
-mk_cnts(Ranges) ->
- Shards = lists:map(fun mk_shard/1, Ranges),
- orddict:from_list([{Shard,nil} || Shard <- Shards]).
-
mk_cnts(Ranges, NoNodes) ->
orddict:from_list([{Shard,nil}
|| Shard <-
@@ -502,10 +472,6 @@ mk_shards(NoNodes,Range,Shards) ->
mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
-mk_shard([B, E]) when is_integer(B), is_integer(E) ->
- #shard{range = [B, E]}.
-
-
mk_shard(Name, Range) ->
Node = list_to_atom(Name),
BName = list_to_binary(Name),
diff --git a/src/ken/rebar.config.script b/src/ken/rebar.config.script
index 26d6f4caa..3344206e5 100644
--- a/src/ken/rebar.config.script
+++ b/src/ken/rebar.config.script
@@ -11,7 +11,9 @@
% the License.
HaveDreyfus = element(1, file:list_dir("../dreyfus")) == ok.
-HaveHastings = element(1, file:list_dir("../hastings")) == ok.
+
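+% Allow overriding the hastings checkout location via the HASTINGS_HOME
+% environment variable, defaulting to the sibling ../hastings directory.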
+HastingsHome = os:getenv("HASTINGS_HOME", "../hastings").
+HaveHastings = element(1, file:list_dir(HastingsHome)) == ok.
CurrOpts = case lists:keyfind(erl_opts, 1, CONFIG) of
{erl_opts, Opts} -> Opts;
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 4b75846ca..7fa0fc027 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -749,7 +749,6 @@ targets_map(#shard{name = <<"shards/", _/binary>> = SrcName} = Src,
Shards0 = mem3:shards(mem3:dbname(SrcName)),
Shards1 = [S || S <- Shards0, not shard_eq(S, Src)],
Shards2 = [S || S <- Shards1, check_overlap(SrcRange, TgtNode, S)],
- TMap = maps:from_list([{R, S} || #shard{range = R} = S <- Shards2]),
case [{R, S} || #shard{range = R} = S <- Shards2] of
[] ->
% If target map is empty, create a target map with just
diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl
index cd8a650f1..b6fbe3279 100644
--- a/src/mem3/src/mem3_sync_event_listener.erl
+++ b/src/mem3/src/mem3_sync_event_listener.erl
@@ -293,11 +293,12 @@ should_terminate(Pid) ->
?assert(is_process_alive(Pid)),
EventMgr = whereis(config_event),
+ EventMgrWasAlive = (catch is_process_alive(EventMgr)),
Ref = erlang:monitor(process, Pid),
RestartFun = fun() -> exit(EventMgr, kill) end,
- test_util:with_process_restart(config_event, RestartFun),
+ {_, _} = test_util:with_process_restart(config_event, RestartFun),
?assertNot(is_process_alive(EventMgr)),
@@ -305,6 +306,9 @@ should_terminate(Pid) ->
{'DOWN', Ref, _, _, _} ->
ok
after 1000 ->
+ ?debugFmt("~n XKCD should_terminate EventMgrWasAlive:~p MsgQueue:~p PInfo:~p ~n", [
+ EventMgrWasAlive, process_info(self(), messages), process_info(Pid)
+ ]),
?assert(false)
end,
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 73ceca6a4..bdd683e97 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -75,8 +75,8 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["docs_written"] == 1
- assert history["docs_read"] == 1
+ assert history["docs_written"] == 2
+ assert history["docs_read"] == 2
assert history["doc_write_failures"] == 0
query = %{
@@ -352,10 +352,10 @@ defmodule ReplicationTest do
assert history["session_id"] == result["session_id"]
assert is_binary(history["start_time"])
assert is_binary(history["end_time"])
- assert history["missing_checked"] == 6
- assert history["missing_found"] == 6
- assert history["docs_read"] == 6
- assert history["docs_written"] == 6
+ assert history["missing_checked"] == 27
+ assert history["missing_found"] == 27
+ assert history["docs_read"] == 27
+ assert history["docs_written"] == 27
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body
@@ -414,10 +414,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 3
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 28
+ assert history["missing_found"] == 28
+ assert history["docs_read"] == 28
+ assert history["docs_written"] == 28
assert history["doc_write_failures"] == 0
resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}")
@@ -446,10 +446,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 4
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 29
+ assert history["missing_found"] == 29
+ assert history["docs_read"] == 29
+ assert history["docs_written"] == 29
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -473,10 +473,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 5
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 30
+ assert history["missing_found"] == 30
+ assert history["docs_read"] == 30
+ assert history["docs_written"] == 30
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -502,10 +502,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 6
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 31
+ assert history["missing_found"] == 31
+ assert history["docs_read"] == 31
+ assert history["docs_written"] == 31
assert history["doc_write_failures"] == 0
copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -534,10 +534,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 7
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
+ assert history["missing_checked"] == 34
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
docs = [
@@ -559,10 +559,10 @@ defmodule ReplicationTest do
assert is_list(result["history"])
assert length(result["history"]) == 8
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 2
- assert history["missing_found"] == 0
- assert history["docs_read"] == 0
- assert history["docs_written"] == 0
+ assert history["missing_checked"] == 36
+ assert history["missing_found"] == 32
+ assert history["docs_read"] == 32
+ assert history["docs_written"] == 32
assert history["doc_write_failures"] == 0
# Test nothing to replicate
@@ -822,10 +822,10 @@ defmodule ReplicationTest do
assert length(result["history"]) == 2
history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 3
- assert history["missing_found"] == 3
- assert history["docs_read"] == 3
- assert history["docs_written"] == 3
+ assert history["missing_checked"] == 19
+ assert history["missing_found"] == 19
+ assert history["docs_read"] == 19
+ assert history["docs_written"] == 19
assert history["doc_write_failures"] == 0
end
@@ -1185,8 +1185,8 @@ defmodule ReplicationTest do
result = replicate(repl_src, repl_tgt, body: repl_body)
assert result["ok"]
- assert result["docs_read"] == 1
- assert result["docs_written"] == 1
+ assert result["docs_read"] == 2
+ assert result["docs_written"] == 2
assert result["doc_write_failures"] == 0
retry_until(fn ->