% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3).

-export([
    start/0,
    stop/0,
    restart/0,
    nodes/0,
    node_info/2,
    shards/1,
    shards/2,
    choose_shards/2,
    n/1,
    n/2,
    dbname/1,
    ushards/1,
    ushards/2
]).
-export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
-export([sync_security/0, sync_security/1]).
-export([compare_nodelists/0, compare_shards/1]).
-export([quorum/1, group_by_proximity/1]).
-export([live_shards/2]).
-export([belongs/2, owner/3]).
-export([get_placement/1]).
-export([ping/1, ping/2]).
-export([db_is_current/1]).
-export([shard_creation_time/1]).
-export([generate_shard_suffix/0]).

%% For mem3 use only.
-export([name/1, node/1, range/1, engine/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-define(PING_TIMEOUT_IN_MS, 60000).

start() ->
    application:start(mem3).

stop() ->
    application:stop(mem3).

restart() ->
    stop(),
    start().

%% @doc Detailed report of cluster-wide membership state. Queries the state
%% on all member nodes and builds a dictionary with unique states as the
%% key and the nodes holding that state as the value. Also reports member
%% nodes which fail to respond and nodes which are connected but are not
%% cluster members. Useful for debugging.
-spec compare_nodelists() ->
    [
        {
            {cluster_nodes, [node()]} | bad_nodes | non_member_nodes,
            [node()]
        }
    ].
compare_nodelists() ->
    Nodes = mem3:nodes(),
    AllNodes = erlang:nodes([this, visible]),
    {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
    Dict = lists:foldl(
        fun({Node, Nodelist}, D) ->
            orddict:append({cluster_nodes, Nodelist}, Node, D)
        end,
        orddict:new(),
        Replies
    ),
    [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].

-spec compare_shards(DbName :: iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
compare_shards(DbName) when is_list(DbName) ->
    compare_shards(list_to_binary(DbName));
compare_shards(DbName) ->
    Nodes = mem3:nodes(),
    {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
    GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
    Dict = lists:foldl(
        fun({Shards, Node}, D) ->
            orddict:append(Shards, Node, D)
        end,
        orddict:new(),
        lists:zip(Replies, GoodNodes)
    ),
    [{bad_nodes, BadNodes} | Dict].

-spec n(DbName :: iodata()) -> integer().
n(DbName) ->
    % Use _design to avoid issues with
    % partition validation
    n(DbName, <<"_design/foo">>).

n(DbName, DocId) ->
    length(mem3:shards(DbName, DocId)).

-spec nodes() -> [node()].
nodes() ->
    mem3_nodes:get_nodelist().

node_info(Node, Key) ->
    mem3_nodes:get_node_info(Node, Key).

-spec shards(DbName :: iodata()) -> [#shard{}].
shards(DbName) ->
    shards_int(DbName, []).
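
%% Illustration (hypothetical values, not taken from a live cluster): for a
%% database <<"db1">> created with q=2 and n=3, mem3:shards(<<"db1">>) returns
%% six #shard{} records, three replicas for the range [0, 16#7FFFFFFF] and
%% three for [16#80000000, 16#FFFFFFFF], each tagged with the node hosting it.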

shards_int(DbName, Options) when is_list(DbName) ->
    shards_int(list_to_binary(DbName), Options);
shards_int(DbName, Options) ->
    Ordered = lists:member(ordered, Options),
    ShardDbName =
        list_to_binary(config:get("mem3", "shards_db", "_dbs")),
    case DbName of
        ShardDbName when Ordered ->
            %% shard_db is treated as a single sharded db to support calls to db_info
            %% and view_all_docs
            [
                #ordered_shard{
                    node = node(),
                    name = ShardDbName,
                    dbname = ShardDbName,
                    range = [0, (2 bsl 31) - 1],
                    order = undefined,
                    opts = []
                }
            ];
        ShardDbName ->
            %% shard_db is treated as a single sharded db to support calls to db_info
            %% and view_all_docs
            [
                #shard{
                    node = node(),
                    name = ShardDbName,
                    dbname = ShardDbName,
                    range = [0, (2 bsl 31) - 1],
                    opts = []
                }
            ];
        _ ->
            mem3_shards:for_db(DbName, Options)
    end.

-spec shards(DbName :: iodata(), DocId :: binary()) -> [#shard{}].
shards(DbName, DocId) ->
    shards_int(DbName, DocId, []).

shards_int(DbName, DocId, Options) when is_list(DbName) ->
    shards_int(list_to_binary(DbName), DocId, Options);
shards_int(DbName, DocId, Options) when is_list(DocId) ->
    shards_int(DbName, list_to_binary(DocId), Options);
shards_int(DbName, DocId, Options) ->
    mem3_shards:for_docid(DbName, DocId, Options).

-spec ushards(DbName :: iodata()) -> [#shard{}].
ushards(DbName) ->
    Nodes = [node() | erlang:nodes()],
    ZoneMap = zone_map(Nodes),
    Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
    mem3_util:downcast(Shards).

-spec ushards(DbName :: iodata(), DocId :: binary()) -> [#shard{}].
ushards(DbName, DocId) ->
    Shards = shards_int(DbName, DocId, [ordered]),
    Shard = hd(Shards),
    mem3_util:downcast([Shard]).

ushards(DbName, Shards0, ZoneMap) ->
    {L, S, D} = group_by_proximity(Shards0, ZoneMap),
    % Prefer shards in the local zone over shards in a different zone,
    % but sort each zone separately to ensure a consistent choice between
    % nodes in the same zone.
    Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D),
    OverlappedShards = lists:ukeysort(#shard.range, Shards),
    mem3_util:non_overlapping_shards(OverlappedShards).

get_shard(DbName, Node, Range) ->
    mem3_shards:get(DbName, Node, Range).

local_shards(DbName) ->
    mem3_shards:local(DbName).

shard_suffix(DbName0) when is_binary(DbName0) ->
    Shard = hd(shards(DbName0)),
    <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> =
        Shard#shard.name,
    filename:extension(binary_to_list(DbName));
shard_suffix(Db) ->
    shard_suffix(couch_db:name(Db)).

shard_creation_time(DbName0) ->
    Shard = hd(shards(DbName0)),
    case Shard#shard.name of
        <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> ->
            case filename:extension(DbName) of
                <<".", Time/binary>> ->
                    Time;
                _ ->
                    <<"0">>
            end;
        _ ->
            <<"0">>
    end.

fold_shards(Fun, Acc) ->
    mem3_shards:fold(Fun, Acc).

sync_security() ->
    mem3_sync_security:go().

sync_security(Db) ->
    mem3_sync_security:go(dbname(Db)).

-spec choose_shards(DbName :: iodata(), Options :: list()) -> [#shard{}].
choose_shards(DbName, Options) when is_list(DbName) ->
    choose_shards(list_to_binary(DbName), Options);
choose_shards(DbName, Options) ->
    try
        shards(DbName)
    catch
        error:E when E == database_does_not_exist; E == badarg ->
            Nodes = allowed_nodes(),
            case get_placement(Options) of
                undefined ->
                    choose_shards(DbName, Nodes, Options);
                Placement ->
                    lists:flatmap(
                        fun({Zone, N}) ->
                            NodesInZone = nodes_in_zone(Nodes, Zone),
                            Options1 = lists:keymerge(1, [{n, N}], Options),
                            choose_shards(DbName, NodesInZone, Options1)
                        end,
                        Placement
                    )
            end
    end.
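
%% Illustration (hypothetical config and option values): with "cluster.placement"
%% set to "zone-a:2,zone-b:1", choose_shards/2 runs the three-argument clause
%% below once per zone, so choose_shards(<<"db1">>, [{q, 4}]) places two copies
%% of each of the four ranges on zone-a nodes and one copy on zone-b nodes.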

choose_shards(DbName, Nodes, Options) ->
    NodeCount = length(Nodes),
    Suffix = couch_util:get_value(shard_suffix, Options, ""),
    N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
    if
        N =:= 0 -> erlang:error(no_nodes_in_zone);
        true -> ok
    end,
    Q = mem3_util:q_val(
        couch_util:get_value(
            q,
            Options,
            config:get_integer("cluster", "q", 2)
        )
    ),
    %% rotate to a random entry in the nodelist for even distribution
    RotatedNodes = rotate_rand(Nodes),
    mem3_util:create_partition_map(DbName, N, Q, RotatedNodes, Suffix).

rotate_rand(Nodes) ->
    {A, B} = lists:split(couch_rand:uniform(length(Nodes)), Nodes),
    B ++ A.

get_placement(Options) ->
    case couch_util:get_value(placement, Options) of
        undefined ->
            case config:get("cluster", "placement") of
                undefined ->
                    undefined;
                PlacementStr ->
                    decode_placement_string(PlacementStr)
            end;
        PlacementStr ->
            decode_placement_string(PlacementStr)
    end.

decode_placement_string(PlacementStr) ->
    [
        begin
            [Zone, N] = string:tokens(Rule, ":"),
            {list_to_binary(Zone), list_to_integer(N)}
        end
     || Rule <- string:tokens(PlacementStr, ",")
    ].

-spec dbname(#shard{} | iodata()) -> binary().
dbname(#shard{dbname = DbName}) ->
    DbName;
dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) ->
    strip_shard_suffix(DbName);
dbname(DbName) when is_list(DbName) ->
    dbname(list_to_binary(DbName));
dbname(DbName) when is_binary(DbName) ->
    DbName;
dbname(_) ->
    erlang:error(badarg).

%% @doc Determine if DocId belongs in shard (identified by record or filename)
belongs(#shard{} = Shard, DocId) when is_binary(DocId) ->
    [Begin, End] = range(Shard),
    belongs(Begin, End, Shard, DocId);
belongs(<<"shards/", _/binary>> = ShardName, DocId) when is_binary(DocId) ->
    [Begin, End] = range(ShardName),
    belongs(Begin, End, ShardName, DocId);
belongs(DbName, DocId) when is_binary(DbName), is_binary(DocId) ->
    true.

belongs(Begin, End, Shard, DocId) ->
    HashKey = mem3_hash:calculate(Shard, DocId),
    Begin =< HashKey andalso HashKey =< End.

range(#shard{range = Range}) ->
    Range;
range(#ordered_shard{range = Range}) ->
    Range;
range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) ->
    [
        binary_to_integer(Start, 16),
        binary_to_integer(End, 16)
    ].

allowed_nodes() ->
    lists:filter(
        fun(Node) ->
            Decom = mem3:node_info(Node, <<"decom">>),
            (Decom =/= true) andalso (Decom =/= <<"true">>)
        end,
        mem3:nodes()
    ).

nodes_in_zone(Nodes, Zone) ->
    [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].

live_shards(DbName, Nodes) ->
    live_shards(DbName, Nodes, []).

live_shards(DbName, Nodes, Options) ->
    [S || S <- shards_int(DbName, Options), lists:member(mem3:node(S), Nodes)].

zone_map(Nodes) ->
    [{Node, node_info(Node, <<"zone">>)} || Node <- Nodes].

group_by_proximity(Shards) ->
    Nodes = [mem3:node(S) || S <- lists:ukeysort(#shard.node, Shards)],
    group_by_proximity(Shards, zone_map(Nodes)).

group_by_proximity(Shards, ZoneMap) ->
    {Local, Remote} = lists:partition(
        fun(S) -> mem3:node(S) =:= node() end,
        Shards
    ),
    LocalZone = proplists:get_value(node(), ZoneMap),
    Fun = fun(S) -> proplists:get_value(mem3:node(S), ZoneMap) =:= LocalZone end,
    {SameZone, DifferentZone} = lists:partition(Fun, Remote),
    {Local, SameZone, DifferentZone}.

choose_ushards(DbName, Shards) ->
    Groups0 = group_by_range(Shards),
    Groups1 = [
        mem3_util:rotate_list({DbName, R}, order_shards(G))
     || {R, G} <- Groups0
    ],
    [hd(G) || G <- Groups1].

order_shards([#ordered_shard{} | _] = OrderedShards) ->
    lists:keysort(#ordered_shard.order, OrderedShards);
order_shards(UnorderedShards) ->
    UnorderedShards.
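
%% Note on determinism: choose_ushards/2 above groups the candidate shards by
%% range (group_by_range/1 below) and rotates each group on the {DbName, Range}
%% key, so any node performing the same computation over the same live shards
%% picks the same preferred replica for a given range.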

group_by_range(Shards) ->
    lists:foldl(
        fun(Shard, Dict) -> orddict:append(mem3:range(Shard), Shard, Dict) end,
        orddict:new(),
        Shards
    ).

% quorum functions
quorum(DbName) when is_binary(DbName) ->
    n(DbName) div 2 + 1;
quorum(Db) ->
    quorum(couch_db:name(Db)).

node(#shard{node = Node}) ->
    Node;
node(#ordered_shard{node = Node}) ->
    Node.

name(#shard{name = Name}) ->
    Name;
name(#ordered_shard{name = Name}) ->
    Name.

% Direct calculation of node membership. This is the algorithm part. It
% doesn't read the shard map; it just picks an owner based on a hash.
-spec owner(binary(), binary(), [node()]) -> node().
owner(DbName, DocId, Nodes) ->
    hd(mem3_util:rotate_list({DbName, DocId}, lists:usort(Nodes))).

engine(#shard{opts = Opts}) ->
    engine(Opts);
engine(#ordered_shard{opts = Opts}) ->
    engine(Opts);
engine(Opts) when is_list(Opts) ->
    case couch_util:get_value(engine, Opts) of
        Engine when is_binary(Engine) ->
            [{engine, Engine}];
        _ ->
            []
    end.

%% Check whether a node is up or down.
%% Side effect: sets up a connection to Node if there isn't one yet.
-spec ping(Node :: atom()) -> pong | pang.
ping(Node) ->
    ping(Node, ?PING_TIMEOUT_IN_MS).

-spec ping(Node :: atom(), Timeout :: pos_integer()) -> pong | pang.
ping(Node, Timeout) when is_atom(Node) ->
    %% The implementation of the function is copied from
    %% lib/kernel/src/net_adm.erl with the addition of a Timeout
    case
        catch gen:call(
            {net_kernel, Node},
            '$gen_call',
            {is_auth, node()},
            Timeout
        )
    of
        {ok, yes} ->
            pong;
        _ ->
            erlang:disconnect_node(Node),
            pang
    end.

db_is_current(#shard{name = Name}) ->
    db_is_current(Name);
db_is_current(<<"shards/", _/binary>> = Name) ->
    try
        Shards = mem3:shards(mem3:dbname(Name)),
        lists:keyfind(Name, #shard.name, Shards) =/= false
    catch
        error:database_does_not_exist ->
            false
    end;
db_is_current(Name) when is_binary(Name) ->
    % This accounts for local (non-sharded) dbs, and is mostly
    % for unit tests that either test or use mem3_rep logic
    couch_server:exists(Name).

generate_shard_suffix() ->
    UnixSeconds = os:system_time(second),
    "." ++ integer_to_list(UnixSeconds).

strip_shard_suffix(DbName) when is_binary(DbName) ->
    % length(".1684269710") = 11. On 2286-11-20 the timestamp would flip to 11
    % digits so we'd have to increase the length to 12 then.
    case DbName of
        <<Prefix:(byte_size(DbName) - 11)/binary, ".", Ts/binary>> ->
            try
                _ = binary_to_integer(Ts),
                Prefix
            catch
                error:badarg ->
                    filename:rootname(DbName)
            end;
        _ ->
            filename:rootname(DbName)
    end.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

-define(ALLOWED_NODE, 'node1@127.0.0.1').

allowed_nodes_test_() ->
    {"allowed_nodes test", [
        {
            setup,
            fun() ->
                Props = [
                    {?ALLOWED_NODE, []},
                    {'node2@127.0.0.1', [{<<"decom">>, <<"true">>}]},
                    {'node3@127.0.0.1', [{<<"decom">>, true}]}
                ],
                ok = meck:expect(
                    mem3_nodes,
                    get_nodelist,
                    fun() -> proplists:get_keys(Props) end
                ),
                ok = meck:expect(
                    mem3_nodes,
                    get_node_info,
                    fun(Node, Key) ->
                        couch_util:get_value(Key, proplists:get_value(Node, Props))
                    end
                )
            end,
            fun(_) -> meck:unload() end,
            [
                ?_assertMatch([?ALLOWED_NODE], allowed_nodes())
            ]
        }
    ]}.

rotate_rand_degenerate_test() ->
    ?assertEqual([1], rotate_rand([1])).

rotate_rand_distribution_test() ->
    Cases = [rotate_rand([1, 2, 3]) || _ <- lists:seq(1, 100)],
    ?assertEqual(3, length(lists:usort(Cases))).
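
% Minimal sketch of a placement-string check: decode_placement_string/1 turns a
% "zone:n,zone:n" value into {Zone, N} pairs; the zone names here are placeholders.
decode_placement_string_test() ->
    ?assertEqual(
        [{<<"zone-a">>, 2}, {<<"zone-b">>, 1}],
        decode_placement_string("zone-a:2,zone-b:1")
    ).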

strip_shard_suffix_test_() ->
    Prefix = <<"shards/c0000000-ffffffff/">>,
    [
        {DbName, ?_assertEqual(Res, dbname(<<Prefix/binary, DbName/binary>>))}
     || {Res, DbName} <- [
            {<<"foo">>, <<"foo.1684269710">>},
            {<<"foo">>, <<"foo.168426971z">>},
            {<<"foo/bar">>, <<"foo/bar.1684269710">>},
            {<<"foo">>, <<"foo.1">>},
            {<<"foo">>, <<"foo.abc">>},
            {<<"foo">>, <<"foo.1111111111111111">>},
            {<<"">>, <<"">>},
            {<<"/">>, <<"/">>},
            {<<"//">>, <<"//">>},
            {<<"/.foo">>, <<"/.foo">>},
            {<<".">>, <<"..">>},
            {<<".">>, <<"..foo">>}
        ]
    ].

shard_suffix_test() ->
    % Assert a few basic things about the db suffixes we generate. If we change
    % this scheme make sure to update strip_shard_suffix/1 and other places
    % which assume the suffix is a 10 digit unix timestamp.
    Suffix = generate_shard_suffix(),
    ?assertEqual($., hd(Suffix)),
    ?assertEqual(11, length(Suffix)),
    [$. | Timestamp] = Suffix,
    ?assert(is_integer(list_to_integer(Timestamp))).

-endif.