diff options
author | Joan Touzet <wohali@users.noreply.github.com> | 2020-01-16 13:39:09 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-16 13:39:09 -0500 |
commit | 7429f59bb087a152b379108169b7d5ee9ca3b6f2 (patch) | |
tree | f752922656cc30757517fffb8ca0629d68b877af | |
parent | f55bd0006947fa88b8e5c3efd795e7264f8fd3a1 (diff) | |
parent | d7188ba8dea81739e8535b92ede1c6613bb598f8 (diff) | |
download | couchdb-mango_metrics.tar.gz |
Merge branch 'master' into mango_metrics
33 files changed, 888 insertions, 224 deletions
diff --git a/.gitignore b/.gitignore index 5eec70f3e..3fa860c59 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ *~ .venv .DS_Store +.vscode .rebar/ .eunit/ cover/ @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2019 The Apache Foundation + Copyright 2020 The Apache Foundation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -1,5 +1,5 @@ Apache CouchDB -Copyright 2009-2019 The Apache Software Foundation +Copyright 2009-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/build-aux/Jenkinsfile.full b/build-aux/Jenkinsfile.full index 2472d7827..c9327f800 100644 --- a/build-aux/Jenkinsfile.full +++ b/build-aux/Jenkinsfile.full @@ -467,22 +467,57 @@ pipeline { } // post } // stage + stage('Debian Buster arm64v8') { + agent { + docker { + image 'couchdbdev/arm64v8-debian-buster-erlang-20.3.8.24-1:latest' + label 'arm64v8' + alwaysPull true + args "${DOCKER_ARGS}" + } + } + environment { + platform = 'buster' + sm_ver = '60' + } + stages { + stage('Build from tarball & test') { + steps { + unstash 'tarball' + sh( script: build_and_test ) + } + post { + always { + junit '**/.eunit/*.xml, **/_build/*/lib/couchdbtest/*.xml, **/src/mango/nosetests.xml, **/test/javascript/junit.xml' + } + } + } + stage('Build CouchDB packages') { + steps { + sh( script: make_packages ) + sh( script: cleanup_and_save ) + } + post { + success { + archiveArtifacts artifacts: 'pkgs/**', fingerprint: true + } + } + } + } // stages + post { + cleanup { + sh 'rm -rf ${WORKSPACE}/*' + } + } // post + } // stage + + /* - * - * This is just taking too long to run. 
Right now, the khash tests are timing out - * on the IBM servers: - * - * [2019-12-31T20:58:48.704Z] khash randomized test - * [2019-12-31T20:59:04.869Z] khash_test:103: randomized_test_ (State matches dict implementation)...*timed out* - * - * So, this is DISABLED until we get an actual arm builder machine. - * - * ppc64le is actually slower to emulate than arm, so we're not even going to try that. + * Example of how to do a qemu-based run, please leave here */ /* stage('Debian Buster arm64v8') { - // once we have an arm64v8 node again, can restore this to original form that is less ugly // the process is convoluted to ensure we have the latest qemu static binaries on the node first // before trying to run a foreign docker container type. Alternately ensuring the `update_qemu` // container is run on every Jenkins agent *after every restart of the Docker daemon* would work. diff --git a/rebar.config.script b/rebar.config.script index 5d5a6aac3..e39a08228 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -158,7 +158,7 @@ DepDescs = [ {hyper, "hyper", {tag, "CouchDB-2.2.0-4"}}, {ibrowse, "ibrowse", {tag, "CouchDB-4.0.1-1"}}, {jiffy, "jiffy", {tag, "CouchDB-0.14.11-2"}}, -{mochiweb, "mochiweb", {tag, "v2.19.0"}}, +{mochiweb, "mochiweb", {tag, "v2.20.0"}}, {meck, "meck", {tag, "0.8.8"}} ], diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 64265dce4..506d88784 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -245,6 +245,8 @@ iterations = 10 ; iterations for password hashing ; secret = ; users_db_public = false ; cookie_domain = example.com +; Set the SameSite cookie property for the auth cookie. If empty, the SameSite property is not set. 
+; same_site = ; CSP (Content Security Policy) Support for _utils [csp] diff --git a/src/couch/priv/couch_js/60/main.cpp b/src/couch/priv/couch_js/60/main.cpp index e36bc619b..e78dbb46d 100644 --- a/src/couch/priv/couch_js/60/main.cpp +++ b/src/couch/priv/couch_js/60/main.cpp @@ -416,6 +416,9 @@ main(int argc, const char* argv[]) if(cx == NULL) return 1; + JS_SetGlobalJitCompilerOption(cx, JSJITCOMPILER_BASELINE_ENABLE, 0); + JS_SetGlobalJitCompilerOption(cx, JSJITCOMPILER_ION_ENABLE, 0); + if (!JS::InitSelfHostedCode(cx)) return 1; diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index b659719f5..48e751a82 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -994,7 +994,7 @@ upgrade_purge_info(Fd, Header) -> _ -> {ok, PurgedIdsRevs} = couch_file:pread_term(Fd, Ptr), - {Infos, NewSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) -> + {Infos, _} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) -> Info = {PSeq, couch_uuids:random(), Id, Revs}, {[Info | InfoAcc], PSeq + 1} end, {[], PurgeSeq}, PurgedIdsRevs), diff --git a/src/couch/src/couch_httpd_auth.erl b/src/couch/src/couch_httpd_auth.erl index 515ce6132..96de5bf3b 100644 --- a/src/couch/src/couch_httpd_auth.erl +++ b/src/couch/src/couch_httpd_auth.erl @@ -273,7 +273,7 @@ cookie_auth_cookie(Req, User, Secret, TimeStamp) -> Hash = crypto:hmac(sha, Secret, SessionData), mochiweb_cookies:cookie("AuthSession", couch_util:encodeBase64Url(SessionData ++ ":" ++ ?b2l(Hash)), - [{path, "/"}] ++ cookie_scheme(Req) ++ max_age() ++ cookie_domain()). + [{path, "/"}] ++ cookie_scheme(Req) ++ max_age() ++ cookie_domain() ++ same_site()). ensure_cookie_auth_secret() -> case config:get("couch_httpd_auth", "secret", undefined) of @@ -457,6 +457,20 @@ cookie_domain() -> _ -> [{domain, Domain}] end. 
+ +same_site() -> + SameSite = config:get("couch_httpd_auth", "same_site", ""), + case string:to_lower(SameSite) of + "" -> []; + "none" -> [{same_site, none}]; + "lax" -> [{same_site, lax}]; + "strict" -> [{same_site, strict}]; + _ -> + couch_log:error("invalid config value couch_httpd_auth.same_site: ~p ",[SameSite]), + [] + end. + + reject_if_totp(User) -> case get_totp_config(User) of undefined -> diff --git a/src/couch/test/exunit/same_site_cookie_tests.exs b/src/couch/test/exunit/same_site_cookie_tests.exs new file mode 100644 index 000000000..bad32ada4 --- /dev/null +++ b/src/couch/test/exunit/same_site_cookie_tests.exs @@ -0,0 +1,44 @@ +defmodule SameSiteCookieTests do + use CouchTestCase + + @moduletag :authentication + + def get_cookie(user, pass) do + resp = Couch.post("/_session", body: %{:username => user, :password => pass}) + + true = resp.body["ok"] + resp.headers[:"set-cookie"] + end + + @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "None"}] + test "Set same_site None" do + cookie = get_cookie("jan", "apple") + assert cookie =~ "; SameSite=None" + end + + @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", ""}] + test "same_site not set" do + cookie = get_cookie("jan", "apple") + assert cookie + refute cookie =~ "; SameSite=" + end + + @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Strict"}] + test "Set same_site Strict" do + cookie = get_cookie("jan", "apple") + assert cookie =~ "; SameSite=Strict" + end + + @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Lax"}] + test "Set same_site Lax" do + cookie = get_cookie("jan", "apple") + assert cookie =~ "; SameSite=Lax" + end + + @tag config: [{"admins", "jan", "apple"}, {"couch_httpd_auth", "same_site", "Invalid"}] + test "Set same_site invalid" do + cookie = get_cookie("jan", "apple") + assert cookie + refute cookie =~ "; SameSite=" + end +end diff --git 
a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 8542cc63f..c96d87173 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -127,6 +127,12 @@ open(Db, State0) -> NewSt = couch_mrview_util:init_state(Db, Fd, State, Header), ensure_local_purge_doc(Db, NewSt), {ok, NewSt}; + {ok, {WrongSig, _}} -> + couch_log:error("~s has the wrong signature: expected: ~p but got ~p", + [IndexFName, Sig, WrongSig]), + NewSt = couch_mrview_util:reset_index(Db, Fd, State), + ensure_local_purge_doc(Db, NewSt), + {ok, NewSt}; no_valid_header -> NewSt = couch_mrview_util:reset_index(Db, Fd, State), ensure_local_purge_doc(Db, NewSt), diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index ab1de7df9..a21de4242 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) -> ), {Id, MissingRevs, PossibleAncestors} end, - {ok, lists:map(ConvertToNativeFun, Props)} + {ok, lists:map(ConvertToNativeFun, Props)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {revs_diff_failed, ErrCode, ErrMsg}} end). @@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) -> (413, _, _) -> {error, request_body_too_large}; (417, _, Results) when is_list(Results) -> - {ok, bulk_results_to_errors(DocList, Results, remote)} + {ok, bulk_results_to_errors(DocList, Results, remote)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {bulk_docs_failed, ErrCode, ErrMsg}} end). 
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb, end, parse_changes_feed(Options, UserFun2, DataStreamFun2) - end) + end); + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}}) end) catch exit:{http_request_failed, _, _, max_backoff} -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index f84860c6e..53c040e8c 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> update_running_jobs_stats(State#state.stats_pid), {noreply, State}; -handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> +handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> {ok, Job} = job_by_pid(Pid), + Reason = case Reason0 of + {shutdown, ShutdownReason} -> ShutdownReason; + Other -> Other + end, ok = handle_crashed_job(Job, Reason, State), {noreply, State}; @@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) -> % optimize some options to help the job make progress. -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}. 
maybe_optimize_job_for_rate_limiting(Job = #job{history = - [{{crashed, {shutdown, max_backoff}}, _} | _]}) -> + [{{crashed, max_backoff}, _} | _]}) -> Opts = [ {checkpoint_interval, 5000}, {worker_processes, 2}, diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index d69febb81..0b33419e1 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -73,8 +73,6 @@ workers, stats = couch_replicator_stats:new(), session_id, - source_monitor = nil, - target_monitor = nil, source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, @@ -242,14 +240,6 @@ handle_cast({report_seq, Seq}, handle_info(shutdown, St) -> {stop, shutdown, St}; -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> - couch_log:error("Source database is down. Reason: ~p", [Why]), - {stop, source_db_down, St}; - -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> - couch_log:error("Target database is down. 
Reason: ~p", [Why]), - {stop, target_db_down, St}; - handle_info({'EXIT', Pid, max_backoff}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> +handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), + Reason = case Reason0 of + {changes_req_failed, _, _} = HttpFail -> + HttpFail; + {http_request_failed, _, _, {error, {code, Code}}} -> + {changes_req_failed, Code}; + {http_request_failed, _, _, {error, Err}} -> + {changes_req_failed, Err}; + Other -> + {changes_reader_died, Other} + end, couch_log:error("ChangesReader process died with reason: ~p", [Reason]), - {stop, changes_reader_died, cancel_timer(State)}; + {stop, {shutdown, Reason}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> {noreply, State}; @@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), couch_log:error("ChangesManager process died with reason: ~p", [Reason]), - {stop, changes_manager_died, cancel_timer(State)}; + {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> {noreply, State}; @@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), 
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), - {stop, changes_queue_died, cancel_timer(State)}; + {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> case Workers -- [Pid] of @@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {stop, {unknown_process_died, Pid, Reason}, State2}; true -> couch_stats:increment_counter([couch_replicator, worker_deaths]), - couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), - {stop, {worker_died, Pid, Reason}, State2} + StopReason = case Reason of + {shutdown, _} = Err -> + Err; + Other -> + couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), + {worker_died, Pid, Other} + end, + {stop, StopReason, State2} end; handle_info(timeout, InitArgs) -> @@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) -> terminate_cleanup(State), couch_replicator_notifier:notify({error, RepId, max_backoff}); +terminate({shutdown, Reason}, State) -> + % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple + % wrapped around the message + terminate(Reason, State); + terminate(Reason, State) -> #rep_state{ source_name = Source, @@ -554,7 +565,7 @@ init_state(Rep) -> options = Options, type = Type, view = View, start_time = StartTime, - stats = Stats + stats = ArgStats0 } = Rep, % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), @@ -569,6 +580,14 @@ init_state(Rep) -> [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep), {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + + ArgStats1 = couch_replicator_stats:new(ArgStats0), + HistoryStats = case History of + [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); + _ -> couch_replicator_stats:new() + end, + Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), + StartSeq1 = 
get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, @@ -592,15 +611,13 @@ init_state(Rep) -> src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), session_id = couch_uuids:random(), - source_monitor = db_monitor(Source), - target_monitor = db_monitor(Target), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), checkpoint_interval = get_value(checkpoint_interval, Options, ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, view = View, - stats = couch_replicator_stats:new(Stats) + stats = Stats }, State#rep_state{timer = start_timer(State)}. @@ -905,12 +922,6 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#httpdb{}) -> - nil; -db_monitor(Db) -> - couch_db:monitor(Db). - - get_pending_count(St) -> Rep = St#rep_state.rep_details, Timeout = get_value(connection_timeout, Rep#rep.options), diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl index cd62949e9..37848b3ee 100644 --- a/src/couch_replicator/src/couch_replicator_stats.erl +++ b/src/couch_replicator/src/couch_replicator_stats.erl @@ -17,7 +17,8 @@ new/1, get/2, increment/2, - sum_stats/2 + sum_stats/2, + max_stats/2 ]). -export([ @@ -64,14 +65,29 @@ increment(Field, Stats) -> sum_stats(S1, S2) -> orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2). +max_stats(S1, S2) -> + orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2). -% Handle initializing from a status object which uses same values but different -% field names. -fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; -fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; -fmap({missing_checked, _}) -> true; -fmap({missing_found, _}) -> true; -fmap({docs_read, _}) -> true; -fmap({docs_written, _}) -> true; -fmap({doc_write_failures, _}) -> true; -fmap({_, _}) -> false. 
+ +% Handle initializing from a status object, which uses same values but +% different field names, as well as from ejson props from the checkpoint +% history +% +fmap({missing_found, _}) -> true; +fmap({missing_revisions_found, V}) -> {true, {missing_found, V}}; +fmap({<<"missing_found">>, V}) -> {true, {missing_found, V}}; + +fmap({missing_checked, _}) -> true; +fmap({revisions_checked, V}) -> {true, {missing_checked, V}}; +fmap({<<"missing_checked">>, V}) -> {true, {missing_checked, V}}; + +fmap({docs_read, _}) -> true; +fmap({<<"docs_read">>, V}) -> {true, {docs_read, V}}; + +fmap({docs_written, _}) -> true; +fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}}; + +fmap({doc_write_failures, _}) -> true; +fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}}; + +fmap({_, _}) -> false. diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 3d80f5883..eb8beaaa9 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -372,8 +381,9 @@ handle_flush_docs_result({error, request_body_too_large}, Target, DocList) -> " request body is too large. 
Splitting batch into 2 separate batches of" " sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target), length(DocList1), length(DocList2)]), - flush_docs(Target, DocList1), - flush_docs(Target, DocList2); + Stats1 = flush_docs(Target, DocList1), + Stats2 = flush_docs(Target, DocList2), + couch_replicator_stats:sum_stats(Stats1, Stats2); handle_flush_docs_result({ok, Errors}, Target, DocList) -> DbUri = couch_replicator_api_wrap:db_uri(Target), lists:foreach( @@ -386,7 +396,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) -> couch_replicator_stats:new([ {docs_written, length(DocList) - length(Errors)}, {doc_write_failures, length(Errors)} - ]). + ]); +handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) -> + exit(Err). flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> @@ -425,7 +437,10 @@ find_missing(DocInfos, Target) -> end, {[], 0}, DocInfos), - {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs), + Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of + {ok, Result} -> Result; + {error, Error} -> exit(Error) + end, MissingRevsCount = lists:foldl( fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end, 0, Missing), diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl new file mode 100644 index 000000000..6b4f95c25 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -0,0 +1,271 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_error_reporting_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). + + +setup() -> + meck:unload(), + Source = setup_db(), + Target = setup_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + meck:unload(), + teardown_db(Source), + teardown_db(Target), + ok. + + +error_reporting_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_fail_bulk_docs/1, + fun t_fail_changes_reader/1, + fun t_fail_revs_diff/1, + fun t_fail_changes_queue/1, + fun t_fail_changes_manager/1, + fun t_fail_changes_reader_proc/1 + ] + } + }. + + +t_fail_bulk_docs({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). 
+ + +t_fail_changes_reader({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_revs_diff({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_queue({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesQueue = element(20, State), + ?assert(is_process_alive(ChangesQueue)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesQueue, boom), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_queue_died, boom}, Result), + couch_replicator_notifier:stop(Listener) + end). 
+ + +t_fail_changes_manager({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesManager = element(21, State), + ?assert(is_process_alive(ChangesManager)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesManager, bam), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_manager_died, bam}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader_proc({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesReader = element(22, State), + ?assert(is_process_alive(ChangesReader)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesReader, kapow), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_reader_died, kapow}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +mock_fail_req(Path, Return) -> + meck:expect(ibrowse, send_req_direct, + fun(W, Url, Headers, Meth, Body, Opts, TOut) -> + Args = [W, Url, Headers, Meth, Body, Opts, TOut], + {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url), + case lists:suffix(Path, UPath) of + true -> Return; + false -> meck:passthrough(Args) + end + end). + + +rep_result_listener(RepId) -> + ReplyTo = self(), + {ok, _Listener} = couch_replicator_notifier:start_link( + fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> + ReplyTo ! Ev; + (_) -> + ok + end). + + +wait_rep_result(RepId) -> + receive + {finished, RepId, RepResult} -> {ok, RepResult}; + {error, RepId, Reason} -> {error, Reason} + end. + + + +setup_db() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + DbName. 
+ + +teardown_db(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_CTX]). + + +populate_db(DbName, Start, End) -> + {ok, Db} = couch_db:open_int(DbName, []), + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = {[]}}, + [Doc | Acc] + end, + [], lists:seq(Start, End)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + ok = couch_db:close(Db). + + +wait_target_in_sync(Source, Target) -> + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300). + + +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]}); + +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(500), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + + +replicate(Source, Target) -> + SrcUrl = couch_replicator_test_helper:db_url(Source), + TgtUrl = couch_replicator_test_helper:db_url(Target), + RepObject = {[ + {<<"source">>, SrcUrl}, + {<<"target">>, TgtUrl}, + {<<"continuous">>, true}, + {<<"worker_processes">>, 1}, + {<<"retries_per_request">>, 1}, + % Low connection timeout so _changes feed gets restarted quicker + {<<"connection_timeout">>, 3000} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + ok = couch_replicator_scheduler:add_job(Rep), + couch_replicator_scheduler:reschedule(), + {ok, Rep#rep.id}. 
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 9dd86b3ef..037f37191 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -18,48 +18,93 @@ -define(DELAY, 500). -define(TIMEOUT, 60000). --define(i2l(I), integer_to_list(I)). --define(io2b(Io), iolist_to_binary(Io)). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). setup() -> - Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]), Source = setup_db(), Target = setup_db(), - {Ctx, {Source, Target}}. + {Source, Target}. -teardown({Ctx, {Source, Target}}) -> +teardown({Source, Target}) -> teardown_db(Source), teardown_db(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). + ok. stats_retained_test_() -> { setup, - fun setup/0, - fun teardown/1, - fun t_stats_retained/1 + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_stats_retained_by_scheduler/1, + fun t_stats_retained_on_job_removal/1 + ] + } }. 
-t_stats_retained({_Ctx, {Source, Target}}) -> +t_stats_retained_by_scheduler({Source, Target}) -> ?_test(begin - populate_db(Source, 42), + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), {ok, RepPid, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), - wait_target_in_sync(Source, Target), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), stop_job(RepPid), - check_scheduler_jobs(42, 42), + check_scheduler_jobs(10, 5, 5), start_job(), - check_active_tasks(42, 42), - check_scheduler_jobs(42, 42), + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + couch_replicator_scheduler:remove_job(RepId) + end). + + +t_stats_retained_on_job_removal({Source, Target}) -> + ?_test(begin + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), % 5 + 1 vdu + + check_active_tasks(10, 5, 5), + check_scheduler_jobs(10, 5, 5), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 11, 20), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(11, Target), % 6 + 5 + + check_scheduler_jobs(20, 10, 10), + check_active_tasks(20, 10, 10), + + couch_replicator_scheduler:remove_job(RepId), + + populate_db_reject_even_docs(Source, 21, 30), + {ok, _, RepId} = replicate(Source, Target), + wait_target_in_sync(16, Target), % 11 + 5 + + check_scheduler_jobs(30, 15, 15), + check_active_tasks(30, 15, 15), + couch_replicator_scheduler:remove_job(RepId) end). @@ -92,14 +137,16 @@ start_job() -> couch_replicator_scheduler:reschedule(). 
-check_active_tasks(DocsRead, DocsWritten) -> +check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> RepTask = wait_for_task_status(), ?assertNotEqual(timeout, RepTask), ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), - ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)). + ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), + ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures, + RepTask)). -check_scheduler_jobs(DocsRead, DocsWritten) -> +check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> Info = wait_scheduler_info(), ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), @@ -110,7 +157,8 @@ check_scheduler_jobs(DocsRead, DocsWritten) -> ?assert(maps:is_key(<<"source_seq">>, Info)), ?assert(maps:is_key(<<"revisions_checked">>, Info)), ?assertMatch(#{<<"docs_read">> := DocsRead}, Info), - ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info). + ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info), + ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info). replication_tasks() -> @@ -138,25 +186,31 @@ wait_scheduler_info() -> end). -populate_db(DbName, DocCount) -> +populate_db_reject_even_docs(DbName, Start, End) -> + BodyFun = fun(Id) -> + case Id rem 2 == 0 of + true -> {[{<<"nope">>, true}]}; + false -> {[]} + end + end, + populate_db(DbName, Start, End, BodyFun). + + +populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) -> {ok, Db} = couch_db:open_int(DbName, []), Docs = lists:foldl( fun(DocIdCounter, Acc) -> - Id = ?io2b(["doc", ?i2l(DocIdCounter)]), - Doc = #doc{id = Id, body = {[]}}, + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = BodyFun(DocIdCounter)}, [Doc | Acc] end, - [], lists:seq(1, DocCount)), + [], lists:seq(Start, End)), {ok, _} = couch_db:update_docs(Db, Docs, []), ok = couch_db:close(Db). 
-wait_target_in_sync(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, SourceInfo} = couch_db:get_db_info(SourceDb), - ok = couch_db:close(SourceDb), - SourceDocCount = couch_util:get_value(doc_count, SourceInfo), - wait_target_in_sync_loop(SourceDocCount, Target, 300). +wait_target_in_sync(DocCount, Target) when is_integer(DocCount) -> + wait_target_in_sync_loop(DocCount, Target, 300). wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> @@ -170,7 +224,7 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> {ok, TargetInfo} = couch_db:get_db_info(Target), ok = couch_db:close(Target), TargetDocCount = couch_util:get_value(doc_count, TargetInfo), - case TargetDocCount == DocCount of + case TargetDocCount == DocCount of true -> true; false -> @@ -201,3 +255,28 @@ scheduler_jobs() -> {ok, 200, _, Body} = test_request:get(Url, []), Json = jiffy:decode(Body, [return_maps]), maps:get(<<"jobs">>, Json). + + +vdu() -> + <<"function(newDoc, oldDoc, userCtx) { + if(newDoc.nope === true) { + throw({forbidden: 'nope'}); + } else { + return; + } + }">>. + + +add_vdu(DbName) -> + DocProps = [ + {<<"_id">>, <<"_design/vdu">>}, + {<<"language">>, <<"javascript">>}, + {<<"validate_doc_update">>, vdu()} + ], + Doc = couch_doc:from_json_obj({DocProps}, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + try + {ok, _Rev} = couch_db:update_doc(Db, Doc, []) + after + couch_db:close(Db) + end. diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl index a953b6a38..0b25a6cc6 100644 --- a/src/dreyfus/src/dreyfus_fabric.erl +++ b/src/dreyfus/src/dreyfus_fabric.erl @@ -14,7 +14,7 @@ %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -module(dreyfus_fabric). --export([get_json_docs/2, handle_error_message/6]). +-export([get_json_docs/2, handle_error_message/7]). -include_lib("couch/include/couch_db.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -36,40 +36,42 @@ callback(timeout, _Acc) -> {error, timeout}. 
handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> {ok, NewCounters}; error -> {error, {nodedown, <<"progress not possible">>}} end; handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker, - Counters, Replacements, StartFun, StartArgs) -> - handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs); + Counters, Replacements, StartFun, StartArgs, RingOpts) -> + handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs, + RingOpts); handle_error_message({rexi_EXIT, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({error, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({'EXIT', Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error({exit, Reason}, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error({exit, Reason}, Worker, Counters, RingOpts); handle_error_message(Reason, Worker, Counters, - _Replacements, _StartFun, _StartArgs) -> + _Replacements, _StartFun, _StartArgs, RingOpts) -> couch_log:error("Unexpected error during request: ~p", [Reason]), - handle_error(Reason, Worker, Counters). + handle_error(Reason, Worker, Counters, RingOpts). 
-handle_error(Reason, Worker, Counters0) -> +handle_error(Reason, Worker, Counters0, RingOpts) -> Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of + case fabric_ring:is_progress_possible(Counters, RingOpts) of true -> {ok, Counters}; false -> {error, Reason} end. -handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> +handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs, + RingOpts) -> OldCounters = lists:filter(fun({#shard{ref=R}, _}) -> R /= Worker#shard.ref end, OldCntrs0), @@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> NewCounter = start_replacement(StartFun, StartArgs, Repl), fabric_dict:store(NewCounter, nil, CounterAcc) end, OldCounters, Replacements), - true = fabric_view:is_progress_possible(NewCounters), + true = fabric_ring:is_progress_possible(NewCounters, RingOpts), NewRefs = fabric_dict:fetch_keys(NewCounters), {new_refs, NewRefs, NewCounters, NewReplacements}; false -> handle_error({nodedown, <<"progress not possible">>}, - Worker, OldCounters) + Worker, OldCounters, RingOpts) end. start_replacement(StartFun, StartArgs, Shard) -> @@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) -> {dreyfus_rpc, StartFun, [Shard#shard.name|StartArgs1]}), Shard#shard{ref = Ref}. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). 
+ + +node_down_test() -> + [S1, S2, S3] = [ + mk_shard("n1", [0, 4]), + mk_shard("n1", [5, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2, W3] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()}, + S3#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2, W3], nil), + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []), + ?assertEqual({ok, [{W3, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S3#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). + + +worker_error_test() -> + [S1, S2] = [ + mk_shard("n1", [0, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}], + Counters1 = fabric_dict:init([W1, W2], nil), + + Res1 = handle_error(bam, W1, Counters1, []), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])). + + +node_down_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8]) + ], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S2#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). 
+ + +worker_error_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8])], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + Res1 = handle_error(bam, W1, Counters1, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. diff --git a/src/dreyfus/src/dreyfus_fabric_group1.erl b/src/dreyfus/src/dreyfus_fabric_group1.erl index 2d530ca7e..bdae6f040 100644 --- a/src/dreyfus/src/dreyfus_fabric_group1.erl +++ b/src/dreyfus/src/dreyfus_fabric_group1.erl @@ -27,7 +27,8 @@ top_groups, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -39,6 +40,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Replacements = fabric_view:get_shard_replacements(DbName, Workers), @@ -50,7 +52,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> top_groups = [], counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -89,7 +92,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, 
State#state.replacements, - group1, State#state.start_args) of + group1, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_fabric_group2.erl b/src/dreyfus/src/dreyfus_fabric_group2.erl index 1239f8b74..8d864dd0c 100644 --- a/src/dreyfus/src/dreyfus_fabric_group2.erl +++ b/src/dreyfus/src/dreyfus_fabric_group2.erl @@ -29,7 +29,8 @@ top_groups, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -41,6 +42,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Replacements = fabric_view:get_shard_replacements(DbName, Workers), @@ -54,7 +56,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> top_groups = [], counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -102,7 +105,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - group2, State#state.start_args) of + group2, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_fabric_info.erl b/src/dreyfus/src/dreyfus_fabric_info.erl index 27eec8065..e217bc0ef 100644 --- 
a/src/dreyfus/src/dreyfus_fabric_info.erl +++ b/src/dreyfus/src/dreyfus_fabric_info.erl @@ -49,7 +49,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Counters, Acc}) -> handle_message({rexi_EXIT, Reason}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> @@ -74,7 +74,7 @@ handle_message({ok, Info}, Worker, {Counters, Acc}) -> handle_message({error, Reason}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> @@ -82,7 +82,7 @@ handle_message({error, Reason}, Worker, {Counters, Acc}) -> end; handle_message({'EXIT', _}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl index acf7a83ec..c0ebde1d6 100644 --- a/src/dreyfus/src/dreyfus_fabric_search.erl +++ b/src/dreyfus/src/dreyfus_fabric_search.erl @@ -27,7 +27,8 @@ top_docs, counters, start_args, - replacements + replacements, + ring_opts }). 
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Counters = fabric_dict:init(Workers, nil), - go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters); + go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs) @@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Shards = dreyfus_util:get_shards(DbName, QueryArgs), LiveNodes = [node() | nodes()], LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)], + RingOpts = dreyful_util:get_ring_opts(QueryArgs, LiveShards), Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) -> QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ @@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> end end, Bookmark1), Counters = fabric_dict:init(Counters0, nil), + WorkerShards = fabric_dict:fetch_keys(Counters), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), QueryArgs2 = QueryArgs#index_query_args{ bookmark = Bookmark1 }, - go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1); + go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts); go(DbName, DDoc, IndexName, OldArgs) -> go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). 
-go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> +go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> {Workers, _} = lists:unzip(Counters), #index_query_args{ limit = Limit, @@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> top_docs = #top_docs{total_hits=0,hits=[]}, counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, RexiMon = fabric_util:create_monitors(Workers), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - search, State#state.start_args) of + search, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_httpd.erl b/src/dreyfus/src/dreyfus_httpd.erl index 5c9db80d1..e9851639b 100644 --- a/src/dreyfus/src/dreyfus_httpd.erl +++ b/src/dreyfus/src/dreyfus_httpd.erl @@ -73,8 +73,6 @@ handle_search_req(#httpd{method=Method, path_parts=[_, _, _, _, IndexName]}=Req end; _ -> % ensure limit in group query >0 - LimitValue = parse_positive_int_param("limit", QueryArgs#index_query_args.limit, - "max_limit", "200"), UseNewApi = Grouping#grouping.new_api, case dreyfus_fabric_group1:go(DbName, DDoc, IndexName, QueryArgs) of {ok, []} -> diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 3720cb63c..87edef0ad 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -59,7 +59,7 @@ update(IndexPid, Index) -> true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), EnumFun = fun ?MODULE:load_docs/2, [Changes] = couch_task_status:get([changes_done]), - Acc0 = {Changes, 
IndexPid, Db, Proc, TotalChanges, now(), ExcludeIdRevs}, + Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []), ok = clouseau_rpc:commit(IndexPid, NewCurSeq) after @@ -80,7 +80,7 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs}=Acc false -> update_or_delete_index(IndexPid, Db, DI, Proc) end, %% Force a commit every minute - case timer:now_diff(Now = now(), LastCommitTime) >= 60000000 of + case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of true -> ok = clouseau_rpc:commit(IndexPid, Seq), {ok, {I+1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}}; diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl index 6832299db..05ecdb621 100644 --- a/src/dreyfus/src/dreyfus_util.erl +++ b/src/dreyfus/src/dreyfus_util.erl @@ -19,7 +19,7 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --export([get_shards/2, sort/2, upgrade/1, export/1, time/2]). +-export([get_shards/2, get_ring_opts/2, sort/2, upgrade/1, export/1, time/2]). -export([in_black_list/1, in_black_list/3, maybe_deny_index/3]). -export([get_design_docid/1]). -export([ @@ -59,6 +59,15 @@ use_ushards(#index_query_args{stable=true}) -> use_ushards(#index_query_args{}) -> false. + +get_ring_opts(#index_query_args{partition = nil}, _Shards) -> + []; +get_ring_opts(#index_query_args{}, Shards) -> + Shards1 = lists:map(fun(#shard{} = S) -> + S#shard{ref = undefined} + end, Shards), + [{any, Shards1}]. + -spec sort(Order :: relevance | [any()], [#sortable{}]) -> [#sortable{}]. sort(Sort, List0) -> {List1, Stash} = stash_items(List0), @@ -342,7 +351,7 @@ get_signature_from_idxdir(IdxDir) -> false -> undefined end. 
-get_local_purge_doc_body(Db, LocalDocId, PurgeSeq, Index) -> +get_local_purge_doc_body(_, LocalDocId, PurgeSeq, Index) -> #index{ name = IdxName, ddoc_id = DDocId, @@ -418,4 +427,15 @@ stash_test() -> Unstashed = hd(unstash_items(Stashed, Stash)), ?assertEqual(Unstashed#sortable.item, bar). + +ring_opts_test() -> + Shards = [#shard{name = foo, ref = make_ref()}], + + QArgs1 = #index_query_args{partition = nil}, + ?assertEqual([], get_ring_opts(QArgs1, Shards)), + + QArgs2 = #index_query_args{partition = <<"x">>}, + ?assertMatch([{any, [#shard{name = foo, ref = undefined}]}], + get_ring_opts(QArgs2, Shards)). + -endif. diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl index 2978832f0..954c52db2 100644 --- a/src/fabric/src/fabric_db_partition_info.erl +++ b/src/fabric/src/fabric_db_partition_info.erl @@ -17,15 +17,27 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). + +-record(acc, { + counters, + replies, + ring_opts +}). + + go(DbName, Partition) -> - Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>), + Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)), Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]), RexiMon = fabric_util:create_monitors(Shards), Fun = fun handle_message/3, - Acc0 = {fabric_dict:init(Workers, nil), []}, + Acc0 = #acc{ + counters = fabric_dict:init(Workers, nil), + replies = [], + ring_opts = [{any, Shards}] + }, try case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of - {ok, Acc} -> {ok, Acc}; + {ok, Res} -> {ok, Res}; {timeout, {WorkersDict, _}} -> DefunctWorkers = fabric_util:remove_done_workers( WorkersDict, @@ -42,36 +54,39 @@ go(DbName, Partition) -> rexi_monitor:stop(RexiMon) end. 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> +handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, NewCounters = fabric_dict:erase(Shard, Counters), - case fabric_ring:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters, RingOpts) of true -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; false -> {error, Reason} end; -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - Acc2 = [Info | Acc], +handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) -> + #acc{counters = Counters, replies = Replies} = Acc, + Replies1 = [Info | Replies], Counters1 = fabric_dict:erase(Shard, Counters), case fabric_dict:size(Counters1) =:= 0 of true -> - [FirstInfo | RestInfos] = Acc2, + [FirstInfo | RestInfos] = Replies1, PartitionInfo = get_max_partition_size(FirstInfo, RestInfos), {stop, [{db_name, Name} | format_partition(PartitionInfo)]}; false -> - {ok, {Counters1, Acc2}} + {ok, Acc#acc{counters = Counters1, replies = Replies1}} end; -handle_message(_, _, Acc) -> +handle_message(_, _, #acc{} = Acc) -> {ok, Acc}. @@ -97,3 +112,44 @@ format_partition(PartitionInfo) -> {value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo), [{sizes, {Size}} | PartitionInfo1]. + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). 
+ + +node_down_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + N1 = S1#shard.node, + {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + N2 = S2#shard.node, + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, + handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)). + + +worker_exit_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index aaf0623f0..8aa14e73a 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -14,7 +14,7 @@ -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1, update_counter/3, remove_ancestors/2, create_monitors/1, kv/2, - remove_down_workers/2, doc_id_and_rev/1]). + remove_down_workers/2, remove_down_workers/3, doc_id_and_rev/1]). -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]). -export([log_timeout/2, remove_done_workers/2]). -export([is_users_db/1, is_replicator_db/1]). @@ -33,9 +33,12 @@ -include_lib("eunit/include/eunit.hrl"). remove_down_workers(Workers, BadNode) -> + remove_down_workers(Workers, BadNode, []). 
+ +remove_down_workers(Workers, BadNode, RingOpts) -> Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end, NewWorkers = fabric_dict:filter(Filter, Workers), - case fabric_ring:is_progress_possible(NewWorkers) of + case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of true -> {ok, NewWorkers}; false -> diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 55b44e6f7..425f864c4 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -12,7 +12,7 @@ -module(fabric_view). --export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, +-export([remove_overlapping_shards/2, maybe_send_row/1, transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). @@ -46,10 +46,6 @@ handle_worker_exit(Collector, _Worker, Reason) -> {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), {error, Resp}. -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, term()}]) -> boolean(). -is_progress_possible(Counters) -> - fabric_ring:is_progress_possible(Counters). -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> [{#shard{}, any()}]. @@ -416,28 +412,6 @@ fix_skip_and_limit(#mrargs{} = Args) -> remove_finalizer(Args) -> couch_mrview_util:set_extra(Args, finalizer, null). 
-% unit test -is_progress_possible_test() -> - EndPoint = 2 bsl 31, - T1 = [[0, EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T1)),true), - T2 = [[0,10],[11,20],[21,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T2)),true), - % gap - T3 = [[0,10],[12,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T3)),false), - % outside range - T4 = [[1,10],[11,20],[21,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T4)),false), - % outside range - T5 = [[0,10],[11,20],[21,EndPoint]], - ?assertEqual(is_progress_possible(mk_cnts(T5)),false), - T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]], - ?assertEqual(is_progress_possible(mk_cnts(T6)), true), - % not possible, overlap is not exact - T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]], - ?assertEqual(is_progress_possible(mk_cnts(T7)), false). - remove_overlapping_shards_test() -> Cb = undefined, @@ -482,10 +456,6 @@ get_shard_replacements_test() -> ?assertEqual(Expect, Res). -mk_cnts(Ranges) -> - Shards = lists:map(fun mk_shard/1, Ranges), - orddict:from_list([{Shard,nil} || Shard <- Shards]). - mk_cnts(Ranges, NoNodes) -> orddict:from_list([{Shard,nil} || Shard <- @@ -502,10 +472,6 @@ mk_shards(NoNodes,Range,Shards) -> mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]). -mk_shard([B, E]) when is_integer(B), is_integer(E) -> - #shard{range = [B, E]}. - - mk_shard(Name, Range) -> Node = list_to_atom(Name), BName = list_to_binary(Name), diff --git a/src/ken/rebar.config.script b/src/ken/rebar.config.script index 26d6f4caa..3344206e5 100644 --- a/src/ken/rebar.config.script +++ b/src/ken/rebar.config.script @@ -11,7 +11,9 @@ % the License. HaveDreyfus = element(1, file:list_dir("../dreyfus")) == ok. -HaveHastings = element(1, file:list_dir("../hastings")) == ok. + +HastingsHome = os:getenv("HASTINGS_HOME", "../hastings"). +HaveHastings = element(1, file:list_dir(HastingsHome)) == ok. 
CurrOpts = case lists:keyfind(erl_opts, 1, CONFIG) of {erl_opts, Opts} -> Opts; diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 4b75846ca..7fa0fc027 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -749,7 +749,6 @@ targets_map(#shard{name = <<"shards/", _/binary>> = SrcName} = Src, Shards0 = mem3:shards(mem3:dbname(SrcName)), Shards1 = [S || S <- Shards0, not shard_eq(S, Src)], Shards2 = [S || S <- Shards1, check_overlap(SrcRange, TgtNode, S)], - TMap = maps:from_list([{R, S} || #shard{range = R} = S <- Shards2]), case [{R, S} || #shard{range = R} = S <- Shards2] of [] -> % If target map is empty, create a target map with just diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl index cd8a650f1..b6fbe3279 100644 --- a/src/mem3/src/mem3_sync_event_listener.erl +++ b/src/mem3/src/mem3_sync_event_listener.erl @@ -293,11 +293,12 @@ should_terminate(Pid) -> ?assert(is_process_alive(Pid)), EventMgr = whereis(config_event), + EventMgrWasAlive = (catch is_process_alive(EventMgr)), Ref = erlang:monitor(process, Pid), RestartFun = fun() -> exit(EventMgr, kill) end, - test_util:with_process_restart(config_event, RestartFun), + {_, _} = test_util:with_process_restart(config_event, RestartFun), ?assertNot(is_process_alive(EventMgr)), @@ -305,6 +306,9 @@ should_terminate(Pid) -> {'DOWN', Ref, _, _, _} -> ok after 1000 -> + ?debugFmt("~n XKCD should_terminate EventMgrWasAlive:~p MsgQueue:~p PInfo:~p ~n", [ + EventMgrWasAlive, process_info(self(), messages), process_info(Pid) + ]), ?assert(false) end, diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs index 73ceca6a4..bdd683e97 100644 --- a/test/elixir/test/replication_test.exs +++ b/test/elixir/test/replication_test.exs @@ -75,8 +75,8 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert 
history["docs_written"] == 1 - assert history["docs_read"] == 1 + assert history["docs_written"] == 2 + assert history["docs_read"] == 2 assert history["doc_write_failures"] == 0 query = %{ @@ -352,10 +352,10 @@ defmodule ReplicationTest do assert history["session_id"] == result["session_id"] assert is_binary(history["start_time"]) assert is_binary(history["end_time"]) - assert history["missing_checked"] == 6 - assert history["missing_found"] == 6 - assert history["docs_read"] == 6 - assert history["docs_written"] == 6 + assert history["missing_checked"] == 27 + assert history["missing_found"] == 27 + assert history["docs_read"] == 27 + assert history["docs_written"] == 27 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body @@ -414,10 +414,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 3 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 28 + assert history["missing_found"] == 28 + assert history["docs_read"] == 28 + assert history["docs_written"] == 28 assert history["doc_write_failures"] == 0 resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") @@ -446,10 +446,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 4 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 29 + assert history["missing_found"] == 29 + assert history["docs_read"] == 29 + assert history["docs_written"] == 29 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -473,10 +473,10 @@ defmodule ReplicationTest do assert 
is_list(result["history"]) assert length(result["history"]) == 5 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 30 + assert history["missing_found"] == 30 + assert history["docs_read"] == 30 + assert history["docs_written"] == 30 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -502,10 +502,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 6 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 31 + assert history["missing_found"] == 31 + assert history["docs_read"] == 31 + assert history["docs_written"] == 31 assert history["doc_write_failures"] == 0 copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body @@ -534,10 +534,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 7 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 + assert history["missing_checked"] == 34 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 docs = [ @@ -559,10 +559,10 @@ defmodule ReplicationTest do assert is_list(result["history"]) assert length(result["history"]) == 8 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 2 - assert history["missing_found"] == 0 - assert history["docs_read"] == 0 - assert history["docs_written"] == 0 + assert history["missing_checked"] == 
36 + assert history["missing_found"] == 32 + assert history["docs_read"] == 32 + assert history["docs_written"] == 32 assert history["doc_write_failures"] == 0 # Test nothing to replicate @@ -822,10 +822,10 @@ defmodule ReplicationTest do assert length(result["history"]) == 2 history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 3 - assert history["missing_found"] == 3 - assert history["docs_read"] == 3 - assert history["docs_written"] == 3 + assert history["missing_checked"] == 19 + assert history["missing_found"] == 19 + assert history["docs_read"] == 19 + assert history["docs_written"] == 19 assert history["doc_write_failures"] == 0 end @@ -1185,8 +1185,8 @@ defmodule ReplicationTest do result = replicate(repl_src, repl_tgt, body: repl_body) assert result["ok"] - assert result["docs_read"] == 1 - assert result["docs_written"] == 1 + assert result["docs_read"] == 2 + assert result["docs_written"] == 2 assert result["doc_write_failures"] == 0 retry_until(fn -> |