Diffstat (limited to 'src/fabric/src/fabric_util.erl')
-rw-r--r--  src/fabric/src/fabric_util.erl  477
1 file changed, 0 insertions(+), 477 deletions(-)
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
deleted file mode 100644
index 30e82c29a..000000000
--- a/src/fabric/src/fabric_util.erl
+++ /dev/null
@@ -1,477 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_util).
-
--export([
- submit_jobs/3, submit_jobs/4,
- cleanup/1,
- recv/4,
- get_db/1, get_db/2,
- error_info/1,
- update_counter/3,
- remove_ancestors/2,
- create_monitors/1,
- kv/2,
- remove_down_workers/2, remove_down_workers/3,
- doc_id_and_rev/1
-]).
--export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]).
--export([log_timeout/2, remove_done_workers/2]).
--export([is_users_db/1, is_replicator_db/1]).
--export([open_cluster_db/1, open_cluster_db/2]).
--export([is_partitioned/1]).
--export([validate_all_docs_args/2, validate_args/3]).
--export([upgrade_mrargs/1]).
--export([worker_ranges/1]).
--export([get_uuid_prefix_len/0]).
--export([isolate/1, isolate/2]).
-
--compile({inline, [{doc_id_and_rev, 1}]}).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
--include_lib("eunit/include/eunit.hrl").
-
-remove_down_workers(Workers, BadNode) ->
- remove_down_workers(Workers, BadNode, []).
-
-remove_down_workers(Workers, BadNode, RingOpts) ->
- Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
- NewWorkers = fabric_dict:filter(Filter, Workers),
- case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of
- true ->
- {ok, NewWorkers};
- false ->
- error
- end.
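-
-% Illustration (editor's sketch, not part of the original module): if Workers
-% holds copies of every range on nodes n1 and n2, remove_down_workers(Workers, n2)
-% drops n2's workers and returns {ok, Remaining} as long as every range is still
-% covered; once any range loses its last worker, the ring is broken and the call
-% returns 'error'.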
-
-submit_jobs(Shards, EndPoint, ExtraArgs) ->
- submit_jobs(Shards, fabric_rpc, EndPoint, ExtraArgs).
-
-submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
- lists:map(
- fun(#shard{node = Node, name = ShardName} = Shard) ->
- Ref = rexi:cast(Node, {Module, EndPoint, [ShardName | ExtraArgs]}),
- Shard#shard{ref = Ref}
- end,
- Shards
- ).
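-
-% Usage sketch (the endpoint name is illustrative): a caller typically does
-%
-%   Workers = submit_jobs(mem3:shards(DbName), get_all_security, [[]]),
-%
-% which casts {fabric_rpc, get_all_security, [ShardName | ExtraArgs]} on each
-% shard's node and returns the shards with their rexi refs filled in, ready to
-% be passed to recv/4 and eventually cleanup/1.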
-
-cleanup(Workers) ->
- rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers]).
-
-recv(Workers, Keypos, Fun, Acc0) ->
- rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
-
-request_timeout() ->
- timeout("request", "60000").
-
-all_docs_timeout() ->
- timeout("all_docs", "10000").
-
-attachments_timeout() ->
- timeout("attachments", "600000").
-
-view_timeout(Args) ->
- PartitionQuery = couch_mrview_util:get_extra(Args, partition, false),
- case PartitionQuery of
- false -> timeout("view", "infinity");
- _ -> timeout("partition_view", "infinity")
- end.
-
-timeout(Type, Default) ->
- case config:get("fabric", Type ++ "_timeout", Default) of
- "infinity" -> infinity;
- N -> list_to_integer(N)
- end.
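-
-% Example configuration (illustrative values; the defaults are the strings
-% passed in above). Each key is "fabric" plus the Type argument:
-%
-%   [fabric]
-%   request_timeout = 60000     ; msec
-%   all_docs_timeout = 10000    ; msec
-%   view_timeout = infinity     ; the string "infinity" maps to the atom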
-
-log_timeout(Workers, EndPoint) ->
- CounterKey = [fabric, worker, timeouts],
- couch_stats:increment_counter(CounterKey),
- lists:map(
- fun(#shard{node = Dest, name = Name}) ->
- Fmt = "fabric_worker_timeout ~s,~p,~p",
- couch_log:error(Fmt, [EndPoint, Dest, Name])
- end,
- Workers
- ).
-
-remove_done_workers(Workers, WaitingIndicator) ->
- [W || {W, WI} <- fabric_dict:to_list(Workers), WI == WaitingIndicator].
-
-get_db(DbName) ->
- get_db(DbName, []).
-
-get_db(DbName, Options) ->
- {Local, SameZone, DifferentZone} = mem3:group_by_proximity(mem3:shards(DbName)),
-    % Prefer shards on the same node over other nodes, prefer shards in the same zone
-    % over other zones, and sort each remote list by name so that we don't repeatedly
-    % try the same node.
- Shards =
- Local ++ lists:keysort(#shard.name, SameZone) ++ lists:keysort(#shard.name, DifferentZone),
- % suppress shards from down nodes
- Nodes = [node() | erlang:nodes()],
- Live = [S || #shard{node = N} = S <- Shards, lists:member(N, Nodes)],
- % Only accept factors > 1, otherwise our math breaks further down
- Factor = max(2, config:get_integer("fabric", "shard_timeout_factor", 2)),
- MinTimeout = config:get_integer("fabric", "shard_timeout_min_msec", 100),
- MaxTimeout = request_timeout(),
- Timeout = get_db_timeout(length(Live), Factor, MinTimeout, MaxTimeout),
- get_shard(Live, Options, Timeout, Factor).
-
-get_shard([], _Opts, _Timeout, _Factor) ->
- erlang:error({internal_server_error, "No DB shards could be opened."});
-get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
- Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
- MFA = {fabric_rpc, open_shard, [Name, [{timeout, Timeout} | Opts]]},
- Ref = rexi:cast(Node, self(), MFA, [sync]),
- try
- receive
- {Ref, {ok, Db}} ->
- {ok, Db};
- {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
- throw(Error);
- {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} ->
- throw(Error);
- {Ref, Reason} ->
- couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
- get_shard(Rest, Opts, Timeout, Factor)
- after Timeout ->
- couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
- get_shard(Rest, Opts, Factor * Timeout, Factor)
- end
- after
- rexi_monitor:stop(Mon)
- end.
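-
-% Retry schedule (editor's note): every attempt that times out moves on to the
-% next live shard with Factor * Timeout, so with Timeout = 472 and Factor = 2
-% successive attempts wait 472, 944, 1888, ... msec. get_db_timeout/4 below picks
-% the initial value so that the whole series fits inside request_timeout().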
-
-get_db_timeout(N, Factor, MinTimeout, infinity) ->
-    % MaxTimeout may be infinity, so substitute a very large integer (close to the
-    % largest Erlang small int on a 64-bit system) to avoid blowing up the arithmetic
- get_db_timeout(N, Factor, MinTimeout, 1 bsl 59);
-get_db_timeout(N, Factor, MinTimeout, MaxTimeout) ->
- %
- % The progression of timeouts forms a geometric series:
- %
- % MaxTimeout = T + T*F + T*F^2 + T*F^3 ...
- %
- % Where T is the initial timeout and F is the factor. The formula for
- % the sum is:
- %
- % Sum[T * F^I, I <- 0..N] = T * (1 - F^(N + 1)) / (1 - F)
- %
- % Then, for a given sum and factor we can calculate the initial timeout T:
- %
- % T = Sum / ((1 - F^(N+1)) / (1 - F))
- %
- Timeout = MaxTimeout / ((1 - math:pow(Factor, N + 1)) / (1 - Factor)),
- % Apply a minimum timeout value
- max(MinTimeout, trunc(Timeout)).
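-
-% Worked example (same numbers as the Q=2, N=3 case in get_db_timeout_test/0
-% below): with MaxTimeout = 60000, Factor = 2 and N = 6 live shards,
-%
-%   T = 60000 / ((1 - 2^7) / (1 - 2)) = 60000 / 127 ≈ 472 msec
-%
-% and the full retry series 472 + 944 + ... + 472 * 2^6 = 59944 msec stays just
-% under the 60000 msec budget.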
-
-error_info({{timeout, _} = Error, _Stack}) ->
- Error;
-error_info({{Error, Reason}, Stack}) ->
- {Error, Reason, Stack};
-error_info({Error, Stack}) ->
- {Error, nil, Stack}.
-
-update_counter(Item, Incr, D) ->
- UpdateFun = fun({Old, Count}) -> {Old, Count + Incr} end,
- orddict:update(make_key(Item), UpdateFun, {Item, Incr}, D).
-
-make_key({ok, L}) when is_list(L) ->
- make_key(L);
-make_key([]) ->
- [];
-make_key([{ok, #doc{revs = {Pos, [RevId | _]}}} | Rest]) ->
- [{ok, {Pos, RevId}} | make_key(Rest)];
-make_key([{{not_found, missing}, Rev} | Rest]) ->
- [{not_found, Rev} | make_key(Rest)];
-make_key({ok, #doc{id = Id, revs = Revs}}) ->
- {Id, Revs};
-make_key(Else) ->
- Else.
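-
-% Illustration: make_key/1 keeps only the parts of a reply that identify the
-% revision, so replies that differ only in body or attachments count together:
-%
-%   make_key({ok, #doc{id = <<"a">>, revs = {1, [<<"r">>]}}})
-%   %=> {<<"a">>, {1, [<<"r">>]}}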
-
-% this presumes the incoming list is sorted, i.e. shorter revlists come first
-remove_ancestors([], Acc) ->
- lists:reverse(Acc);
-remove_ancestors([{_, {{not_found, _}, Count}} = Head | Tail], Acc) ->
-    % any available document is treated as a descendant of a not_found rev
- case
- lists:filter(
- fun
- ({_, {{ok, #doc{}}, _}}) -> true;
- (_) -> false
- end,
- Tail
- )
- of
- [{_, {{ok, #doc{}} = Descendant, _}} | _] ->
- remove_ancestors(update_counter(Descendant, Count, Tail), Acc);
- [] ->
- remove_ancestors(Tail, [Head | Acc])
- end;
-remove_ancestors([{_, {{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Acc) ->
- Descendants = lists:dropwhile(
- fun({_, {{ok, #doc{revs = {Pos2, Revs2}}}, _}}) ->
- case lists:nthtail(erlang:min(Pos2 - Pos, length(Revs2)), Revs2) of
- [] ->
- % impossible to tell if Revs2 is a descendant - assume no
- true;
- History ->
- % if Revs2 is a descendant, History is a prefix of Revs
- not lists:prefix(History, Revs)
- end
- end,
- Tail
- ),
- case Descendants of
- [] ->
- remove_ancestors(Tail, [Head | Acc]);
- [{Descendant, _} | _] ->
- remove_ancestors(update_counter(Descendant, Count, Tail), Acc)
- end;
-remove_ancestors([Error | Tail], Acc) ->
- remove_ancestors(Tail, [Error | Acc]).
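-
-% Worked example (mirrors remove_ancestors_test/0 below): given Foo1 with revs
-% {1, [<<"foo">>]} and Foo2 with revs {2, [<<"foo2">>, <<"foo">>]}, dropping the
-% Pos2 - Pos = 1 newest rev from Foo2 leaves [<<"foo">>], which is a prefix of
-% Foo1's rev list, so Foo2 descends from Foo1: Foo1's count is folded into Foo2
-% and only Foo2 survives.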
-
-create_monitors(Shards) ->
- MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]),
- rexi_monitor:start(MonRefs).
-
-%% Verify that only the doc id and rev end up in the counter key.
-update_counter_test() ->
- Reply =
- {ok, #doc{
- id = <<"id">>,
- revs = <<"rev">>,
- body = <<"body">>,
- atts = <<"atts">>
- }},
- ?assertEqual(
- [{{<<"id">>, <<"rev">>}, {Reply, 1}}],
- update_counter(Reply, 1, [])
- ).
-
-remove_ancestors_test() ->
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
- Bar2 = {not_found, {1, <<"bar">>}},
- ?assertEqual(
- [kv(Bar1, 1), kv(Foo1, 1)],
- remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], [])
- ),
- ?assertEqual(
- [kv(Bar1, 1), kv(Foo2, 2)],
- remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], [])
- ),
- ?assertEqual(
- [kv(Bar1, 2)],
- remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], [])
- ).
-
-is_replicator_db(DbName) ->
- path_ends_with(DbName, <<"_replicator">>).
-
-is_users_db(DbName) ->
- ConfigName = list_to_binary(
- config:get(
- "chttpd_auth", "authentication_db", "_users"
- )
- ),
- DbName == ConfigName orelse path_ends_with(DbName, <<"_users">>).
-
-path_ends_with(Path, Suffix) ->
- Suffix =:= couch_db:dbname_suffix(Path).
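-
-% Illustration (assuming couch_db:dbname_suffix/1 returns the final path
-% segment of a normalized db name):
-%
-%   path_ends_with(<<"prod/_replicator">>, <<"_replicator">>) %=> true
-%   path_ends_with(<<"prod/users_db">>, <<"_users">>)         %=> false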
-
-open_cluster_db(#shard{dbname = DbName, opts = Options}) ->
- case couch_util:get_value(props, Options) of
- Props when is_list(Props) ->
- {ok, Db} = couch_db:clustered_db(DbName, [{props, Props}]),
- Db;
- _ ->
- {ok, Db} = couch_db:clustered_db(DbName, []),
- Db
- end.
-
-open_cluster_db(DbName, Opts) ->
- % as admin
- {SecProps} = fabric:get_security(DbName),
- UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
- {ok, Db} = couch_db:clustered_db(DbName, UserCtx, SecProps),
- Db.
-
-%% test helper
-kv(Item, Count) ->
- {make_key(Item), {Item, Count}}.
-
-doc_id_and_rev(#doc{id = DocId, revs = {RevNum, [RevHash | _]}}) ->
- {DocId, {RevNum, RevHash}}.
-
-is_partitioned(DbName0) when is_binary(DbName0) ->
- Shards = mem3:shards(fabric:dbname(DbName0)),
- is_partitioned(open_cluster_db(hd(Shards)));
-is_partitioned(Db) ->
- couch_db:is_partitioned(Db).
-
-validate_all_docs_args(DbName, Args) when is_binary(DbName) ->
- Shards = mem3:shards(fabric:dbname(DbName)),
- Db = open_cluster_db(hd(Shards)),
- validate_all_docs_args(Db, Args);
-validate_all_docs_args(Db, Args) ->
- true = couch_db:is_clustered(Db),
- couch_mrview_util:validate_all_docs_args(Db, Args).
-
-validate_args(DbName, DDoc, Args) when is_binary(DbName) ->
- Shards = mem3:shards(fabric:dbname(DbName)),
- Db = open_cluster_db(hd(Shards)),
- validate_args(Db, DDoc, Args);
-validate_args(Db, DDoc, Args) ->
- true = couch_db:is_clustered(Db),
- couch_mrview_util:validate_args(Db, DDoc, Args).
-
-upgrade_mrargs(#mrargs{} = Args) ->
- Args;
-upgrade_mrargs(
- {mrargs, ViewType, Reduce, PreflightFun, StartKey, StartKeyDocId, EndKey, EndKeyDocId, Keys,
- Direction, Limit, Skip, GroupLevel, Group, Stale, MultiGet, InclusiveEnd, IncludeDocs,
- DocOptions, UpdateSeq, Conflicts, Callback, Sorted, Extra}
-) ->
- {Stable, Update} =
- case Stale of
- ok -> {true, false};
- update_after -> {true, lazy};
- _ -> {false, true}
- end,
- #mrargs{
- view_type = ViewType,
- reduce = Reduce,
- preflight_fun = PreflightFun,
- start_key = StartKey,
- start_key_docid = StartKeyDocId,
- end_key = EndKey,
- end_key_docid = EndKeyDocId,
- keys = Keys,
- direction = Direction,
- limit = Limit,
- skip = Skip,
- group_level = GroupLevel,
- group = Group,
- stable = Stable,
- update = Update,
- multi_get = MultiGet,
- inclusive_end = InclusiveEnd,
- include_docs = IncludeDocs,
- doc_options = DocOptions,
- update_seq = UpdateSeq,
- conflicts = Conflicts,
- callback = Callback,
- sorted = Sorted,
- extra = Extra
- }.
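-
-% The interesting part of the upgrade is splitting the legacy 'stale' field
-% into 'stable' and 'update':
-%
-%   stale = ok           -> stable = true,  update = false  (serve the index as-is)
-%   stale = update_after -> stable = true,  update = lazy   (serve, then refresh)
-%   anything else        -> stable = false, update = true   (refresh, then serve)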
-
-worker_ranges(Workers) ->
- Ranges = fabric_dict:fold(
- fun(#shard{range = [X, Y]}, _, Acc) ->
- [{X, Y} | Acc]
- end,
- [],
- Workers
- ),
- lists:usort(Ranges).
-
-get_uuid_prefix_len() ->
- config:get_integer("fabric", "uuid_prefix_len", 7).
-
-% If we issue multiple fabric calls from the same process, we have to isolate
-% them so that, in case of an error, they don't pollute the process dictionary
-% or the mailbox
-
-isolate(Fun) ->
- isolate(Fun, infinity).
-
-isolate(Fun, Timeout) ->
- {Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
- receive
- {'DOWN', Ref, _, _, {'$isolres', Res}} ->
- Res;
- {'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
- erlang:raise(Tag, Reason, Stack)
- after Timeout ->
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- erlang:error(timeout)
- end.
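-
-% Usage sketch (hypothetical caller): run a fabric call in a throwaway process
-% so stray rexi messages never reach the caller's mailbox:
-%
-%   Res = fabric_util:isolate(fun() -> fabric:get_security(DbName) end, 5000),
-%
-% A normal result is returned as-is, an exception is re-raised in the caller
-% with its original class, reason and stacktrace, and after 5000 msec the
-% spawned process is killed and erlang:error(timeout) is raised instead.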
-
-% OTP_RELEASE is defined in OTP 21+ only
--ifdef(OTP_RELEASE).
-
-do_isolate(Fun) ->
- try
- {'$isolres', Fun()}
- catch
- Tag:Reason:Stack ->
- {'$isolerr', Tag, Reason, Stack}
- end.
-
--else.
-
-do_isolate(Fun) ->
- try
- {'$isolres', Fun()}
- catch ?STACKTRACE(Tag, Reason, Stack)
- {'$isolerr', Tag, Reason, Stack}
- end.
-
--endif.
-
-get_db_timeout_test() ->
- % Q=1, N=1
- ?assertEqual(20000, get_db_timeout(1, 2, 100, 60000)),
-
- % Q=2, N=1
- ?assertEqual(8571, get_db_timeout(2, 2, 100, 60000)),
-
- % Q=2, N=3 (default)
- ?assertEqual(472, get_db_timeout(2 * 3, 2, 100, 60000)),
-
- % Q=3, N=3
- ?assertEqual(100, get_db_timeout(3 * 3, 2, 100, 60000)),
-
- % Q=4, N=1
- ?assertEqual(1935, get_db_timeout(4, 2, 100, 60000)),
-
- % Q=8, N=1
- ?assertEqual(117, get_db_timeout(8, 2, 100, 60000)),
-
- % Q=8, N=3 (default in 2.x)
- ?assertEqual(100, get_db_timeout(8 * 3, 2, 100, 60000)),
-
- % Q=256, N=3
- ?assertEqual(100, get_db_timeout(256 * 3, 2, 100, 60000)),
-
- % Large factor = 100
- ?assertEqual(100, get_db_timeout(2 * 3, 100, 100, 60000)),
-
- % Small total request timeout = 1 sec
- ?assertEqual(100, get_db_timeout(2 * 3, 2, 100, 1000)),
-
- % Large total request timeout
- ?assertEqual(28346, get_db_timeout(2 * 3, 2, 100, 3600000)),
-
- % No shards at all
- ?assertEqual(60000, get_db_timeout(0, 2, 100, 60000)),
-
-    % request_timeout was set to infinity; with enough shards the initial
-    % timeout still bottoms out at the 100 msec minimum via the exponential logic
- ?assertEqual(100, get_db_timeout(64, 2, 100, infinity)).