diff options
author | Jan Lehnardt <jan@apache.org> | 2021-10-30 10:49:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-30 10:49:01 +0200 |
commit | f5dedf74d200611bcb257f9dc451bf2f2f1bbaca (patch) | |
tree | dd7f8b1f27e6263b6390d6789626017e7a76e4c6 | |
parent | 4da057a91337590b44dd5ca303b365ebdda65867 (diff) | |
parent | aa6744892a12eadc1421f36e788bd2d9156cf65e (diff) | |
download | couchdb-fix-reduce-collation-bug.tar.gz |
Merge branch '3.x' into fix-reduce-collation-bug
24 files changed, 663 insertions, 158 deletions
diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl index 7486aadfe..e92a1e506 100644 --- a/src/chttpd/src/chttpd_node.erl +++ b/src/chttpd/src/chttpd_node.erl @@ -33,13 +33,20 @@ handle_node_req(#httpd{path_parts=[A, <<"_local">>|Rest]}=Req) -> handle_node_req(Req#httpd{path_parts=[A, node()] ++ Rest}); % GET /_node/$node/_versions handle_node_req(#httpd{method='GET', path_parts=[_, _Node, <<"_versions">>]}=Req) -> - send_json(Req, 200, {[ - {erlang_version, ?l2b(?COUCHDB_ERLANG_VERSION)}, - {javascript_engine, {[ - {name, <<"spidermonkey">>}, - {version, couch_server:get_spidermonkey_version()} - ]}} - ]}); + IcuVer = couch_ejson_compare:get_icu_version(), + UcaVer = couch_ejson_compare:get_uca_version(), + send_json(Req, 200, #{ + erlang_version => ?l2b(?COUCHDB_ERLANG_VERSION), + collation_driver => #{ + name => <<"libicu">>, + library_version => version_tuple_to_str(IcuVer), + collation_algorithm_version => version_tuple_to_str(UcaVer) + }, + javascript_engine => #{ + name => <<"spidermonkey">>, + version => couch_server:get_spidermonkey_version() + } + }); handle_node_req(#httpd{path_parts=[_, _Node, <<"_versions">>]}=Req) -> send_method_not_allowed(Req, "GET"); @@ -322,3 +329,10 @@ run_queues() -> [DCQ | SQs] = lists:reverse(statistics(run_queue_lengths)), {lists:sum(SQs), DCQ} end. + +version_tuple_to_str(Version) when is_tuple(Version) -> + List1 = tuple_to_list(Version), + IsZero = fun(N) -> N == 0 end, + List2 = lists:reverse(lists:dropwhile(IsZero, lists:reverse(List1))), + List3 = [erlang:integer_to_list(N) || N <- List2], + ?l2b(lists:join(".", List3)). 
diff --git a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c index f453a295f..6e200320d 100644 --- a/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c +++ b/src/couch/priv/couch_ejson_compare/couch_ejson_compare.c @@ -166,6 +166,40 @@ compare_strings_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } +ERL_NIF_TERM +get_icu_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + UVersionInfo ver = {0}; + ERL_NIF_TERM tup[U_MAX_VERSION_LENGTH] = {0}; + int i; + + u_getVersion(ver); + + for (i = 0; i < U_MAX_VERSION_LENGTH; i++) { + tup[i] = enif_make_int(env, ver[i]); + } + + return enif_make_tuple_from_array(env, tup, U_MAX_VERSION_LENGTH); +} + + +ERL_NIF_TERM +get_uca_version(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + UVersionInfo ver = {0}; + ERL_NIF_TERM tup[U_MAX_VERSION_LENGTH] = {0}; + int i; + + ucol_getUCAVersion(get_collator(), ver); + + for (i = 0; i < U_MAX_VERSION_LENGTH; i++) { + tup[i] = enif_make_int(env, ver[i]); + } + + return enif_make_tuple_from_array(env, tup, U_MAX_VERSION_LENGTH); +} + + int less_json(int depth, ctx_t* ctx, ERL_NIF_TERM a, ERL_NIF_TERM b) { @@ -531,7 +565,9 @@ on_unload(ErlNifEnv* env, void* priv_data) static ErlNifFunc nif_functions[] = { {"less_nif", 2, less_json_nif}, - {"compare_strings_nif", 2, compare_strings_nif} + {"compare_strings_nif", 2, compare_strings_nif}, + {"get_icu_version", 0, get_icu_version}, + {"get_uca_version", 0, get_uca_version} }; diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl index 8681296f1..b02b9ba7c 100644 --- a/src/couch/src/couch_ejson_compare.erl +++ b/src/couch/src/couch_ejson_compare.erl @@ -12,10 +12,20 @@ -module(couch_ejson_compare). --export([less/2, less_json_ids/2, less_json/2]). +-export([ + less/2, + less_json_ids/2, + less_json/2, + get_icu_version/0, + get_uca_version/0 +]). 
% For testing --export([less_nif/2, less_erl/2, compare_strings_nif/2]). +-export([ + less_nif/2, + less_erl/2, + compare_strings_nif/2 +]). -on_load(init/0). @@ -51,6 +61,14 @@ less_json(A,B) -> less(A, B) < 0. +get_icu_version() -> + erlang:nif_error(get_icu_version). + + +get_uca_version() -> + erlang:nif_error(get_uca_version). + + less_nif(A, B) -> erlang:nif_error(less_nif_load_error, [A, B]). diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 5dc0a05f0..3c72e3357 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -189,7 +189,8 @@ maybe_add_sys_db_callbacks(DbName, Options) -> orelse path_ends_with(DbName, UsersDbSuffix), if DbName == DbsDbName -> - [sys_db | Options]; + [{before_doc_update, fun mem3_bdu:before_doc_update/3}, + sys_db | Options]; DbName == NodesDbName -> [sys_db | Options]; IsReplicatorDb -> diff --git a/src/couch/test/eunit/couch_ejson_compare_tests.erl b/src/couch/test/eunit/couch_ejson_compare_tests.erl index 790f6e54c..1dfbad4ed 100644 --- a/src/couch/test/eunit/couch_ejson_compare_tests.erl +++ b/src/couch/test/eunit/couch_ejson_compare_tests.erl @@ -183,6 +183,26 @@ zero_width_chars() -> % Regular EUnit tests +get_icu_version_test() -> + Ver = couch_ejson_compare:get_icu_version(), + ?assertMatch({_, _, _, _}, Ver), + {V1, V2, V3, V4} = Ver, + ?assert(is_integer(V1) andalso V1 > 0), + ?assert(is_integer(V2) andalso V2 >= 0), + ?assert(is_integer(V3) andalso V3 >= 0), + ?assert(is_integer(V4) andalso V4 >= 0). + + +get_uca_version_test() -> + Ver = couch_ejson_compare:get_uca_version(), + ?assertMatch({_, _, _, _}, Ver), + {V1, V2, V3, V4} = Ver, + ?assert(is_integer(V1) andalso V1 > 0), + ?assert(is_integer(V2) andalso V2 >= 0), + ?assert(is_integer(V3) andalso V3 >= 0), + ?assert(is_integer(V4) andalso V4 >= 0). 
+ + max_depth_error_list_test() -> % NIF can handle terms with depth <= 9 Nested9 = nest_list(<<"val">>, 9), diff --git a/src/couch/test/eunit/couch_util_tests.erl b/src/couch/test/eunit/couch_util_tests.erl index f6d7d958a..44a5cce0a 100644 --- a/src/couch/test/eunit/couch_util_tests.erl +++ b/src/couch/test/eunit/couch_util_tests.erl @@ -15,24 +15,6 @@ -include_lib("couch/include/couch_eunit.hrl"). -setup() -> - %% We cannot start driver from here since it becomes bounded to eunit - %% master process and the next couch_server_sup:start_link call will - %% fail because server couldn't load driver since it already is. - %% - %% On other hand, we cannot unload driver here due to - %% {error, not_loaded_by_this_process} while it is. Any ideas is welcome. - %% - Ctx = test_util:start_couch(), - %% config:start_link(?CONFIG_CHAIN), - Ctx. - -teardown(Ctx) -> - ok = test_util:stop_couch(Ctx), - %% config:stop(), - ok. - - validate_callback_exists_test_() -> { "validate_callback_exists tests", diff --git a/src/couch_prometheus/test/eunit/couch_prometheus_e2e_tests.erl b/src/couch_prometheus/test/eunit/couch_prometheus_e2e_tests.erl index c862b9a9f..5b8adfd1d 100644 --- a/src/couch_prometheus/test/eunit/couch_prometheus_e2e_tests.erl +++ b/src/couch_prometheus/test/eunit/couch_prometheus_e2e_tests.erl @@ -85,17 +85,15 @@ node_call_prometheus_http(_) -> Url = construct_url(?PROM_PORT), {ok, RC1, _, _} = test_request:get( Url, - [?CONTENT_JSON, ?AUTH], - [] + [?CONTENT_JSON, ?AUTH] ), % since this port doesn't require auth, this should work {ok, RC2, _, _} = test_request:get( Url, - [?CONTENT_JSON], - [] + [?CONTENT_JSON] ), delete_db(Url), - ?_assertEqual(200, RC2). + ?_assertEqual({200, 200}, {RC1, RC2}). % we don't start the http server deny_prometheus_http(_) -> @@ -121,8 +119,6 @@ construct_url(Port) -> lists:concat(["http://", Addr, ":", Port, "/_node/_local/_prometheus"]). 
create_db(Url) -> - Addr = config:get("chttpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(chttpd, port), {ok, Status, _, _} = test_request:put(Url, [?CONTENT_JSON, ?AUTH], "{}"), ?assert(Status =:= 201 orelse Status =:= 202). diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 037f37191..a9a0fc943 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -138,7 +138,7 @@ start_job() -> check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> - RepTask = wait_for_task_status(), + RepTask = wait_for_task_status(DocsWritten), ?assertNotEqual(timeout, RepTask), ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), @@ -147,7 +147,7 @@ check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> - Info = wait_scheduler_info(), + Info = wait_scheduler_info(DocsWritten), ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), ?assert(maps:is_key(<<"docs_read">>, Info)), @@ -167,21 +167,29 @@ replication_tasks() -> end, couch_task_status:all()). -wait_for_task_status() -> +wait_for_task_status(DocsWritten) -> test_util:wait(fun() -> case replication_tasks() of [] -> wait; - [RepTask] -> RepTask + [RepTask] -> + case couch_util:get_value(docs_written, RepTask) of + DocsWritten -> RepTask; + _Other -> wait + end end end). 
-wait_scheduler_info() -> +wait_scheduler_info(DocsWritten) -> test_util:wait(fun() -> case scheduler_jobs() of [] -> wait; [#{<<"info">> := null}] -> wait; - [#{<<"info">> := Info}] -> Info + [#{<<"info">> := Info}] -> + case maps:get(<<"docs_written">>, Info, undefined) of + DocsWritten -> Info; + _Other -> wait + end end end). diff --git a/src/custodian/README b/src/custodian/README index 72681f447..ff88373c5 100644 --- a/src/custodian/README +++ b/src/custodian/README @@ -1,6 +1,6 @@ Custodian is responsible for the data stored in CouchDB databases. -Custodian scans the "dbs" database, which details the location of +Custodian scans the shards database, which details the location of every shard of every database and ensures that operators are aware of any shard that is under-replicated (has less than N copies). diff --git a/src/custodian/src/custodian.hrl b/src/custodian/src/custodian.hrl deleted file mode 100644 index bce22cf95..000000000 --- a/src/custodian/src/custodian.hrl +++ /dev/null @@ -1,49 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(CUSTODIAN_ID, <<"_design/custodian">>). 
- --define(CUSTODIAN_VALIDATION, -<<"function(newDoc, oldDoc) { - var i, range, node; - if(newDoc['_id'].substring(0, 8) === \"_design/\") return; - if(newDoc['_deleted'] === true) return; - if (!newDoc.by_node) { - throw({forbidden: \"by_node is mandatory\"}); - } - if (!newDoc.by_range) { - throw({forbidden: \"by_range is mandatory\"}); - } - for (node in newDoc.by_node) { - for (i in newDoc.by_node[node]) { - range = newDoc.by_node[node][i]; - if(!newDoc.by_range[range]) { - throw({forbidden: \"by_range for \" + range + \" is missing\"}); - } - if(newDoc.by_range[range].indexOf(node) === -1) { - throw({forbidden : \"by_range for \" + range + \" is missing \" + node}); - } - } - } - for (range in newDoc.by_range) { - for (i in newDoc.by_range[range]) { - node = newDoc.by_range[range][i]; - if(!newDoc.by_node[node]) { - throw({forbidden: \"by_node for \" + node + \" is missing\"}); - } - if (newDoc.by_node[node].indexOf(range) === -1) { - throw({forbidden: \"by_node for \" + node + \" is missing \" + range}); - } - } - } -} -">>). diff --git a/src/custodian/src/custodian_server.erl b/src/custodian/src/custodian_server.erl index 0a21eed23..0c8b87e87 100644 --- a/src/custodian/src/custodian_server.erl +++ b/src/custodian/src/custodian_server.erl @@ -132,8 +132,9 @@ start_shard_checker(#state{shard_checker=Pid}=State) when is_pid(Pid) -> start_event_listener() -> + DbName = mem3_sync:shards_db(), couch_event:link_listener( - ?MODULE, handle_db_event, nil, [{dbname, <<"dbs">>}] + ?MODULE, handle_db_event, nil, [{dbname, DbName}] ). handle_db_event(_DbName, updated, _St) -> diff --git a/src/custodian/src/custodian_util.erl b/src/custodian/src/custodian_util.erl index ee217108f..6d5a56093 100644 --- a/src/custodian/src/custodian_util.erl +++ b/src/custodian/src/custodian_util.erl @@ -11,7 +11,6 @@ % the License. -module(custodian_util). --include("custodian.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). 
@@ -19,7 +18,10 @@ -export([summary/0, report/0]). -export([ensure_dbs_exists/0]). --record(state, {live, safe, n, callback, db, acc}). +% Old design doc which should be cleaned up +-define(CUSTODIAN_ID, <<"_design/custodian">>). + +-record(state, {live, safe, callback, db, acc}). %% public functions. @@ -45,7 +47,7 @@ report() -> ensure_dbs_exists() -> DbName = mem3_sync:shards_db(), {ok, Db} = mem3_util:ensure_exists(DbName), - ensure_custodian_ddoc_exists(Db), + ensure_custodian_ddoc_is_deleted(Db), {ok, Db}. %% private functions. @@ -53,10 +55,9 @@ ensure_dbs_exists() -> fold_dbs(Acc, Fun) -> Safe = maybe_redirect([node() | nodes()]), Live = Safe -- maintenance_nodes(Safe), - N = cluster_n(), {ok, Db} = ensure_dbs_exists(), try - State0 = #state{live=Live, safe=Safe, n=N, callback=Fun, db=Db, acc=Acc}, + State0 = #state{live=Live, safe=Safe, callback=Fun, db=Db, acc=Acc}, {ok, State1} = couch_db:fold_docs(Db, fun fold_dbs1/2, State0, []), State1#state.acc after @@ -80,9 +81,9 @@ fold_dbs1(#full_doc_info{id = Id} = FDI, State) -> fold_dbs(Id, Shards, State) -> IsSafe = fun(#shard{node = N}) -> lists:member(N, State#state.safe) end, IsLive = fun(#shard{node = N}) -> lists:member(N, State#state.live) end, - TargetN = State#state.n, LiveShards = lists:filter(IsLive, Shards), SafeShards = lists:filter(IsSafe, Shards), + TargetN = mem3_util:calculate_max_n(Shards), Acc0 = State#state.acc, Acc1 = case mem3_util:calculate_max_n(LiveShards) of LiveN when LiveN < TargetN -> @@ -180,41 +181,28 @@ count_conflicts(#full_doc_info{rev_tree = T}) -> Leafs = [1 || {#leaf{deleted=false}, _} <- couch_key_tree:get_all_leafs(T)], length(Leafs) - 1. -ensure_custodian_ddoc_exists(Db) -> + +% Ensure the design doc which was added 3.2.0 is deleted as we switched to using a BDU +% function instead. 
After a few releases this function could be removed as well +% +ensure_custodian_ddoc_is_deleted(Db) -> case couch_db:open_doc(Db, ?CUSTODIAN_ID, [ejson_body]) of {not_found, _Reason} -> - try couch_db:update_doc(Db, custodian_ddoc(), []) of - {ok, _} -> - ok - catch conflict -> - {ok, NewDb} = couch_db:reopen(Db), - ensure_custodian_ddoc_exists(NewDb) - end; + ok; {ok, Doc} -> - {Props} = couch_doc:to_json_obj(Doc, []), - Props1 = lists:keystore(<<"validate_doc_update">>, 1, Props, {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION}), - case Props =:= Props1 of - true -> - ok; - false -> - try couch_db:update_doc(Db, couch_doc:from_json_obj({Props1}), []) of - {ok, _} -> - ok - catch conflict -> - {ok, NewDb} = couch_db:reopen(Db), - ensure_custodian_ddoc_exists(NewDb) - end + DeletedDoc = Doc#doc{deleted = true, body = {[]}}, + try couch_db:update_doc(Db, DeletedDoc, [?ADMIN_CTX]) of + {ok, _} -> + LogMsg = "~p : deleted custodian ddoc ~s", + couch_log:notice(LogMsg, [?MODULE, ?CUSTODIAN_ID]), + ok + catch + conflict -> + {ok, NewDb} = couch_db:reopen(Db), + ensure_custodian_ddoc_is_deleted(NewDb) end end. -custodian_ddoc() -> - Props = [ - {<<"_id">>, ?CUSTODIAN_ID}, - {<<"language">>, <<"javascript">>}, - {<<"validate_doc_update">>, ?CUSTODIAN_VALIDATION} - ], - couch_doc:from_json_obj({Props}). - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/dreyfus/test/dreyfus_blacklist_await_test.erl b/src/dreyfus/test/dreyfus_blacklist_await_test.erl index 28a5e7f30..82665eb02 100644 --- a/src/dreyfus/test/dreyfus_blacklist_await_test.erl +++ b/src/dreyfus/test/dreyfus_blacklist_await_test.erl @@ -62,8 +62,8 @@ do_not_await_1() -> State = create_state(?DBNAME, Index, nil, nil, []), Msg = "Index Blocked from Updating - db: ~p, ddocid: ~p name: ~p", Return = wait_log_message(Msg, fun() -> - {noreply, NewState} = dreyfus_index:handle_call({await, 1}, - self(), State) + {noreply, _NewState} = dreyfus_index:handle_call({await, 1}, + self(), State) end), ?assertEqual(Return, ok). diff --git a/src/dreyfus/test/dreyfus_purge_test.erl b/src/dreyfus/test/dreyfus_purge_test.erl index 5fa4bc90f..9b24d6f64 100644 --- a/src/dreyfus/test/dreyfus_purge_test.erl +++ b/src/dreyfus/test/dreyfus_purge_test.erl @@ -27,6 +27,8 @@ test_local_doc/0, test_delete_local_doc/0, test_purge_search/0]). -compile(export_all). +-compile(nowarn_export_all). + test_all() -> test_purge_single(), @@ -703,10 +705,14 @@ test_purge_search() -> %private API db_name() -> - Nums = tuple_to_list(erlang:now()), - Prefix = "test-db", - Suffix = lists:concat([integer_to_list(Num) || Num <- Nums]), - list_to_binary(Prefix ++ "-" ++ Suffix). + iolist_to_binary([ + "dreyfus-test-db-", [ + integer_to_list(I) || I <- [ + erlang:unique_integer([positive]), + rand:uniform(10000) + ] + ] + ]). purge_docs(DBName, DocIds) -> IdsRevs = [{DocId, [get_rev(DBName, DocId)]} || DocId <- DocIds], diff --git a/src/dreyfus/test/dreyfus_test_util.erl b/src/dreyfus/test/dreyfus_test_util.erl index 631bc1047..79fd9b59d 100644 --- a/src/dreyfus/test/dreyfus_test_util.erl +++ b/src/dreyfus/test/dreyfus_test_util.erl @@ -1,6 +1,8 @@ -module(dreyfus_test_util). --compile(export_all). +-export([ + wait_config_change/2 +]). -include_lib("couch/include/couch_db.hrl"). 
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index eea6a72bb..9fdbf06df 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -146,29 +146,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, Seq} end, WSplitSeqs0), - % For ranges that were not split start sequences from 0 - WReps = lists:map(fun(#shard{name = Name, node = N} = S) -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}), + % For ranges that were not split, look for a replacement on a different node + WReps = lists:map(fun(#shard{name = Name, node = NewNode, range = R} = S) -> + Arg = find_replacement_sequence(Dead, R), + case Arg =/= 0 of true -> ok; false -> + couch_log:warning("~p reset seq for ~p", [?MODULE, S]) + end, + Ref = rexi:cast(NewNode, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, 0} end, Reps1), Seqs = WSeqs ++ WSplitSeqs ++ WReps, {Workers0, _} = lists:unzip(Seqs), Repls = fabric_ring:get_shard_replacements(DbName, Workers0), StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) -> - %% Find the original shard copy in the Seqs array - case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of - [{#shard{}, {replace, _, _, _}} | _] -> - % Don't attempt to replace a replacement - SeqArg = 0; - [{#shard{node = OldNode}, OldSeq} | _] -> - SeqArg = make_replacement_arg(OldNode, OldSeq); - _ -> - % TODO this clause is probably unreachable in the N>2 - % case because we compute replacements only if a shard has one - % in the original set. 
- couch_log:error("Streaming ~s from zero while replacing ~p", - [Name, PackedSeqs]), - SeqArg = 0 + SeqArg = find_replacement_sequence(Seqs, R0), + case SeqArg =/= 0 of true -> ok; false -> + couch_log:warning("~p StartFun reset seq for ~p", [?MODULE, Shard]) end, Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}), Shard#shard{ref = Ref} @@ -670,6 +663,22 @@ find_split_shard_replacements(DeadWorkers, Shards) -> {fabric_dict:from_list(Workers), Available}. +find_replacement_sequence(OriginalSeqs, R0) -> + %% Find the original shard copy in the Seqs array + case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, OriginalSeqs) of + [{#shard{}, {replace, _, _, _}} | _] -> + % Don't attempt to replace a replacement + 0; + [{#shard{node = OldNode}, OldSeq} | _] -> + make_replacement_arg(OldNode, OldSeq); + _ -> + % TODO we don't currently attempt to replace a shard with split + % replicas of that range on other nodes, so it's possible to end + % up with an empty list here. + 0 + end. + + make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> {Num, {split, Uuid}, Node}; make_split_seq(Seq, _) -> @@ -892,3 +901,36 @@ find_split_shard_replacements_test() -> {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3), ?assertEqual([], Workers3), ?assertEqual(Shards3, ShardsLeft3). 
+ + +find_replacement_sequence_test() -> + Shards = [{"n2", 0, 10}, {"n3", 0, 5}], + Uuid = <<"abc1234">>, + Epoch = 'n1', + + % Not safe to use a plain integer sequence number + Dead1 = mk_workers(Shards, 42), + ?assertEqual(0, find_replacement_sequence(Dead1, [0, 10])), + ?assertEqual(0, find_replacement_sequence(Dead1, [0, 5])), + + % {Seq, Uuid} should work + Dead2 = mk_workers(Shards, {43, Uuid}), + ?assertEqual({replace, 'n2', Uuid, 43}, + find_replacement_sequence(Dead2, [0, 10])), + ?assertEqual({replace, 'n3', Uuid, 43}, + find_replacement_sequence(Dead2, [0, 5])), + + % Can't find the range at all + ?assertEqual(0, find_replacement_sequence(Dead2, [0, 4])), + + % {Seq, Uuids, EpochNode} should work + Dead3 = mk_workers(Shards, {44, Uuid, Epoch}), + ?assertEqual({replace, 'n1', Uuid, 44}, + find_replacement_sequence(Dead3, [0, 10])), + ?assertEqual({replace, 'n1', Uuid, 44}, + find_replacement_sequence(Dead3, [0, 5])), + + % Cannot replace a replacement + Dead4 = mk_workers(Shards, {replace, 'n1', Uuid, 45}), + ?assertEqual(0, find_replacement_sequence(Dead4, [0, 10])), + ?assertEqual(0, find_replacement_sequence(Dead4, [0, 5])). diff --git a/src/mem3/rebar.config b/src/mem3/rebar.config new file mode 100644 index 000000000..362c8785e --- /dev/null +++ b/src/mem3/rebar.config @@ -0,0 +1,14 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{cover_enabled, true}. +{cover_print_enabled, true}. 
diff --git a/src/mem3/src/mem3_bdu.erl b/src/mem3/src/mem3_bdu.erl new file mode 100644 index 000000000..bf84d4470 --- /dev/null +++ b/src/mem3/src/mem3_bdu.erl @@ -0,0 +1,112 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_bdu). + + +-export([ + before_doc_update/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +-spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}. +before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) -> + % Skip design docs + Doc; + +before_doc_update(#doc{deleted = true} = Doc, _Db, _UpdateType) -> + % Skip deleted + Doc; + +before_doc_update(#doc{} = Doc, _Db, replicated_changes) -> + % Skip internal replicator updates + Doc; + +before_doc_update(#doc{} = Doc, _Db, _UpdateType) -> + Body1 = couch_util:json_encode(Doc#doc.body), + Body2 = couch_util:json_decode(Body1, [return_maps]), + validate(Body2), + Doc. + + +validate(#{} = Body) -> + validate_key(<<"by_node">>, Body, ["by_node is mandatory"]), + validate_key(<<"by_range">>, Body, ["by_range is mandatory"]), + ByNode = maps:get(<<"by_node">>, Body), + case is_map(ByNode) of + true -> ok; + false -> throw({forbidden, ["by_node not an object"]}) + end, + ByRange = maps:get(<<"by_range">>, Body), + case is_map(ByRange) of + true -> ok; + false -> throw({forbidden, ["by_range not an object"]}) + end, + % "by_node": { + % "node1@xxx.xxx.xxx.xxx": ["00000000-1fffffff",...] 
+ % ]} + maps:map(fun(Node, Ranges) -> + validate_by_node(Node, Ranges, ByRange) + end, ByNode), + % "by_range": { + % "00000000-1fffffff": ["node1@xxx.xxx.xxx.xxx", ...] + % ]} + maps:map(fun(Range, Nodes) -> + validate_by_range(Range, Nodes, ByNode) + end, ByRange). + + +validate_by_node(Node, Ranges, ByRange) -> + validate_array(Ranges, ["by_node", Ranges, "value not an array"]), + lists:foreach(fun(Range) -> + validate_key(Range, ByRange, ["by_range for", Range, "missing"]), + Nodes = maps:get(Range, ByRange), + validate_member(Node, Nodes, ["by_range for", Range, "missing", Node]) + end, Ranges). + + +validate_by_range(Range, Nodes, ByNode) -> + validate_array(Nodes, ["by_range", Nodes, "value not an array"]), + lists:foreach(fun(Node) -> + validate_key(Node, ByNode, ["by_node for", Node, "missing"]), + Ranges = maps:get(Node, ByNode), + validate_member(Range, Ranges, ["by_node for", Node, "missing", Range]) + end, Nodes). + + +validate_array(Val, _ErrMsg) when is_list(Val) -> + ok; +validate_array(_Val, ErrMsg) -> + throw({forbidden, errmsg(ErrMsg)}). + + +validate_key(Key, #{} = Map, ErrMsg) -> + case maps:is_key(Key, Map) of + true -> ok; + false -> throw({forbidden, errmsg(ErrMsg)}) + end. + + +validate_member(Val, Array, ErrMsg) when is_list(Array) -> + case lists:member(Val, Array) of + true -> ok; + false -> throw({forbidden, errmsg(ErrMsg)}) + end; +validate_member(_Val, _Array, ErrMsg) -> + throw({forbidden, errmsg(ErrMsg)}). + + +errmsg(ErrMsg) when is_list(ErrMsg) -> + list_to_binary(lists:join(" ", ErrMsg)). diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 7fa0fc027..2487e6a98 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -788,36 +788,53 @@ reset_remaining(#{} = Targets) -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-define(TDEF(A), {atom_to_list(A), fun A/0}). 
-find_source_seq_unknown_node_test() -> + +find_source_seq_int_test_() -> + { + setup, + fun() -> meck:expect(couch_log, warning, 2, ok) end, + fun(_) -> meck:unload() end, + [ + ?TDEF(t_unknown_node), + ?TDEF(t_unknown_uuid), + ?TDEF(t_ok), + ?TDEF(t_old_ok), + ?TDEF(t_different_node) + ] + }. + + +t_unknown_node() -> ?assertEqual( find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10), 0 ). -find_source_seq_unknown_uuid_test() -> +t_unknown_uuid() -> ?assertEqual( find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10), 0 ). -find_source_seq_ok_test() -> +t_ok() -> ?assertEqual( find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100), 100 ). -find_source_seq_old_ok_test() -> +t_old_ok() -> ?assertEqual( find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84), 50 ). -find_source_seq_different_node_test() -> +t_different_node() -> ?assertEqual( find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92), 31 diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl index cad34225d..5a8d162d2 100644 --- a/src/mem3/src/mem3_sync_event_listener.erl +++ b/src/mem3/src/mem3_sync_event_listener.erl @@ -218,6 +218,10 @@ subscribe_for_config() -> -include_lib("couch/include/couch_eunit.hrl"). setup_all() -> + % couch_log used by config app + ok = meck:expect(couch_log, notice, 2, ok), + ok = meck:expect(couch_log, warning, 2, ok), + application:start(config), ok = meck:new(couch_event, [passthrough]), diff --git a/src/mem3/test/eunit/mem3_bdu_test.erl b/src/mem3/test/eunit/mem3_bdu_test.erl new file mode 100644 index 000000000..ad047f6e9 --- /dev/null +++ b/src/mem3/test/eunit/mem3_bdu_test.erl @@ -0,0 +1,282 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_bdu_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). + +-define(USER, "mem3_bdu_test_admin"). +-define(PASS, "pass"). +-define(AUTH, {basic_auth, {?USER, ?PASS}}). +-define(JSON, {"Content-Type", "application/json"}). + + +setup() -> + Hashed = couch_passwords:hash_admin_password(?PASS), + ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false), + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Db = ?tempdb(), + Port = mochiweb_socket_server:get(chttpd, port), + Url = lists:concat(["http://", Addr, ":", Port, "/"]), + ShardsDb = "_node/_local/" ++ config:get("mem3", "shards_db", "_dbs"), + {Url, Db, ShardsDb}. + + +teardown({Url, Db, _}) -> + sync_delete_db(Url, Db), + ok = config:delete("admins", ?USER, _Persist=false). + + +start_couch() -> + test_util:start_couch([mem3, chttpd]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). 
+ + +mem3_bdu_shard_doc_test_() -> + { + "mem3 bdu shard doc tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + ?TDEF_FE(t_can_insert_shard_map_doc), + ?TDEF_FE(t_missing_by_node_section), + ?TDEF_FE(t_by_node_not_a_map), + ?TDEF_FE(t_missing_by_range_section), + ?TDEF_FE(t_by_range_not_a_map), + ?TDEF_FE(t_missing_range_in_by_range), + ?TDEF_FE(t_missing_node_in_by_range_node_list), + ?TDEF_FE(t_missing_node_in_by_node), + ?TDEF_FE(t_missing_range_in_by_node_range_list), + ?TDEF_FE(t_by_node_val_not_array), + ?TDEF_FE(t_by_range_val_not_array), + ?TDEF_FE(t_design_docs_are_not_validated), + ?TDEF_FE(t_replicated_changes_not_validated) + ] + } + } + }. + + +t_can_insert_shard_map_doc({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + <<"by_range">> => #{Range => [Node]}, + <<"suffix">> => suffix() + }, + {Code, Res} = req(post, Top ++ ShardsDb, ShardMap), + ?assertEqual(201, Code), + ?assertMatch(#{<<"ok">> := true}, Res). + + +t_missing_by_node_section({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_range">> => #{Range => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_by_node_not_a_map({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => 42, + <<"by_range">> => #{Range => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_missing_by_range_section({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). 
+ + +t_by_range_not_a_map({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + <<"by_range">> => 42 + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_missing_range_in_by_range({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + <<"by_range">> => #{<<"xyz">> => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_missing_node_in_by_range_node_list({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + <<"by_range">> => #{Range => [<<"xyz">>]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_missing_node_in_by_node({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{<<"xyz">> => [Range]}, + <<"by_range">> => #{Range => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_missing_range_in_by_node_range_list({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [<<"xyz">>]}, + <<"by_range">> => #{Range => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_by_node_val_not_array({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => 42}, + <<"by_range">> => #{Range => [Node]} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). 
+ + +t_by_range_val_not_array({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + <<"by_range">> => #{Range => 42} + }, + ?assertMatch({403, _}, req(post, Top ++ ShardsDb, ShardMap)). + + +t_design_docs_are_not_validated({Top, _, ShardsDb}) -> + Suffix = integer_to_list(erlang:system_time() + rand:uniform(1000)), + DDocId = list_to_binary("_design/ddoc_bdu_test-" ++ Suffix), + DDoc = #{<<"_id">> => DDocId}, + {Code, Res} = req(post, Top ++ ShardsDb, DDoc), + ?assertEqual(201, Code), + #{<<"rev">> := Rev} = Res, + Deleted = #{ + <<"id">> => DDocId, + <<"_rev">> => Rev, + <<"_deleted">> => true + }, + ?assertMatch({200, _}, req(post, Top ++ ShardsDb, Deleted)). + + +t_replicated_changes_not_validated({Top, Db, ShardsDb}) -> + Node = atom_to_binary(node(), utf8), + Range = <<"00000000-ffffffff">>, + ShardMap = #{ + <<"_id">> => Db, + <<"by_node">> => #{Node => [Range]}, + % missing <<"by_range">>, we can tollerate it + % and not crash the backend + <<"suffix">> => suffix(), + <<"_rev">> => <<"1-abc">>, + <<"_revisions">> => #{ + <<"ids">> => [<<"abc">>], + <<"start">> => 1 + } + }, + Docs = #{ + <<"docs">> => [ShardMap], + <<"new_edits">> => false + }, + {Code, Res} = req(post, Top ++ ShardsDb ++ "/_bulk_docs", Docs), + ?assertEqual(201, Code), + ?assertEqual([], Res), + Deleted = #{ + <<"id">> => Db, + <<"_rev">> => <<"1-abc">>, + <<"_deleted">> => true + }, + ?assertMatch({200, _}, req(post, Top ++ ShardsDb, Deleted)). + + +delete_db(Top, Db) when is_binary(Db) -> + Url = Top ++ binary_to_list(Db), + case test_request:get(Url, [?AUTH]) of + {ok, 404, _, _} -> + not_found; + {ok, 200, _, _} -> + {ok, 200, _, _} = test_request:delete(Url, [?AUTH]), + ok + end. 
+ + +sync_delete_db(Top, Db) when is_binary(Db) -> + delete_db(Top, Db), + try + Shards = mem3:local_shards(Db), + ShardNames = [mem3:name(S) || S <- Shards], + [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames], + ok + catch + error:database_does_not_exist -> + ok + end. + + +req(Method, Url, #{} = Body) -> + req(Method, Url, jiffy:encode(Body)); + +req(Method, Url, Body) -> + Headers = [?JSON, ?AUTH], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body), + {Code, jiffy:decode(Res, [return_maps])}. + + +suffix() -> + integer_to_list(erlang:system_time(second)). diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index 1122590ae..65f2b4bb0 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -519,7 +519,6 @@ target_reset_in_initial_copy(#{db1 := Db}) -> split_an_incomplete_shard_map(#{db1 := Db}) -> {timeout, ?TIMEOUT, ?_test(begin - [#shard{} = Src] = lists:sort(mem3:local_shards(Db)), [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), meck:expect(mem3_util, calculate_max_n, 1, 0), ?assertMatch({error, {not_enough_shard_copies, _}}, diff --git a/test/elixir/test/basics_test.exs b/test/elixir/test/basics_test.exs index e6fb20938..abc66ca40 100644 --- a/test/elixir/test/basics_test.exs +++ b/test/elixir/test/basics_test.exs @@ -58,6 +58,17 @@ defmodule BasicsTest do assert context[:db_name] in Couch.get("/_all_dbs").body, "Db name in _all_dbs" end + @tag :with_db + test "Limit and skip should work in _all_dbs", context do + db = context[:db_name] + db_count = length(Couch.get("/_all_dbs").body) + assert db_count > 0 + assert Couch.get("/_all_dbs?limit=0").body == [] + assert length(Couch.get("/_all_dbs?limit=1").body) >= 1 + assert length(Couch.get("/_all_dbs?skip=1").body) == (db_count - 1) + assert [db] == Couch.get("/_all_dbs?start_key=\"#{db}\"&limit=1").body + end + test "Database name with '+' should encode to '+'", _context do 
set_config({"chttpd", "decode_plus_to_space", "false"}) diff --git a/test/elixir/test/config/suite.elixir b/test/elixir/test/config/suite.elixir index cfb32f2b7..2e97553ee 100644 --- a/test/elixir/test/config/suite.elixir +++ b/test/elixir/test/config/suite.elixir @@ -59,6 +59,7 @@ "Database name with '%2B' should encode to '+'", "Database name with '+' should encode to '+'", "Database should be in _all_dbs", + "Limit and skip should work in _all_dbs", "Default headers are returned for doc with open_revs=all", "Empty database should have zero docs", "Make sure you can do a seq=true option", |