summaryrefslogtreecommitdiff
path: root/src/rabbit_misc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_misc.erl')
-rw-r--r--src/rabbit_misc.erl186
1 files changed, 135 insertions, 51 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7d916797..3bbfb1d7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -25,7 +25,7 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/2]).
+-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -38,9 +38,9 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
--export([table_fold/3]).
+-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([read_term_file/1, write_term_file/2]).
+-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
@@ -48,24 +48,25 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
- unlink_and_capture_exit/1]).
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
--export([const_ok/1, const/1]).
+-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
+-export([pget/2, pget/3, pget_or_die/2]).
+-export([format_message_queue/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1, const/1]).
+-export_type([resource_name/0, thunk/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
--type(const(T) :: fun((any()) -> T)).
-type(resource_name() :: binary()).
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
@@ -104,7 +105,12 @@
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
- -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+ -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+-spec(set_table_value/4 ::
+ (rabbit_framing:amqp_table(), binary(),
+ rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
+ -> rabbit_framing:amqp_table()).
+
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -145,7 +151,8 @@
-> atom()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
+-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
+ atom()) -> [A]).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
@@ -153,6 +160,8 @@
-spec(read_term_file/1 ::
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
+-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
@@ -177,7 +186,6 @@
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
--spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -190,10 +198,15 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
--spec(const_ok/1 :: (any()) -> 'ok').
--spec(const/1 :: (A) -> const(A)).
+-spec(const_ok/0 :: () -> 'ok').
+-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
+-spec(pget/2 :: (term(), [term()]) -> term()).
+-spec(pget/3 :: (term(), [term()], term()) -> term()).
+-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
+-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-endif.
@@ -266,6 +279,10 @@ table_lookup(Table, Key) ->
false -> undefined
end.
+set_table_value(Table, Key, Type, Value) ->
+ sort_field_table(
+ lists:keystore(Key, 1, Table, {Key, Type, Value})).
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
@@ -350,8 +367,11 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
+ catch
+ exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
+ Handler();
+ exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
Handler()
end.
@@ -400,17 +420,12 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
end), false).
%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
-%% TailFun which gets called immediately before and after the tx commit
+%% TailFun which gets called (only) immediately after the tx commit
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> TailFun = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- TailFun1(true),
- TailFun1
- end),
- TailFun(false)
+ false -> TailFun = execute_mnesia_transaction(TxFun),
+ TailFun()
end.
ensure_ok(ok, _) -> ok;
@@ -456,20 +471,23 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% Fold over each entry in a table, executing the cons function in a
-%% transaction. This is often far more efficient than wrapping a tx
-%% around the lot.
+%% Apply a pre-post-commit function to all entries in a table that
+%% satisfy a predicate, and return those entries.
%%
%% We ignore entries that have been modified or removed.
-table_fold(F, Acc0, TableName) ->
+table_filter(Pred, PrePostCommitFun, TableName) ->
lists:foldl(
- fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
- end, Acc0, dirty_read_all(TableName)).
+ fun (E, Acc) ->
+ case execute_mnesia_transaction(
+ fun () -> mnesia:match_object(TableName, E, read) =/= []
+ andalso Pred(E) end,
+ fun (false, _Tx) -> false;
+ (true, Tx) -> PrePostCommitFun(E, Tx), true
+ end) of
+ false -> Acc;
+ true -> [E | Acc]
+ end
+ end, [], dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -508,8 +526,42 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
read_term_file(File) -> file:consult(File).
write_term_file(File, Terms) ->
- file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
- Term <- Terms])).
+ write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
+write_file(Path, Data) ->
+ write_file(Path, Data, []).
+
+%% write_file/3 and make_binary/1 are both based on corresponding
+%% functions in the kernel/file.erl module of the Erlang R14B02
+%% release, which is licensed under the EPL. That implementation of
+%% write_file/3 does not do an fsync prior to closing the file, hence
+%% the existence of this version. APIs are otherwise identical.
+write_file(Path, Data, Modes) ->
+ Modes1 = [binary, write | (Modes -- [binary, write])],
+ case make_binary(Data) of
+ Bin when is_binary(Bin) ->
+ case file:open(Path, Modes1) of
+ {ok, Hdl} -> try file:write(Hdl, Bin) of
+ ok -> file:sync(Hdl);
+ {error, _} = E -> E
+ after
+ file:close(Hdl)
+ end;
+ {error, _} = E -> E
+ end;
+ {error, _} = E -> E
+ end.
+
+make_binary(Bin) when is_binary(Bin) ->
+ Bin;
+make_binary(List) ->
+ try
+ iolist_to_binary(List)
+ catch error:Reason ->
+ {error, Reason}
+ end.
+
append_file(File, Suffix) ->
case file:read_file_info(File) of
@@ -527,7 +579,7 @@ append_file(File, 0, Suffix) ->
end;
append_file(File, _, Suffix) ->
case file:read_file(File) of
- {ok, Data} -> file:write_file([File, Suffix], Data, [append]);
+ {ok, Data} -> write_file([File, Suffix], Data, [append]);
Error -> Error
end.
@@ -744,18 +796,12 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
-unlink_and_capture_exit(Pid) ->
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok
- after 0 -> ok
- end.
-
-% Separate flags and options from arguments.
-% get_options([{flag, "-q"}, {option, "-p", "/"}],
-% ["set_permissions","-p","/","guest",
-% "-q",".*",".*",".*"])
-% == {["set_permissions","guest",".*",".*",".*"],
-% [{"-q",true},{"-p","/"}]}
+%% Separate flags and options from arguments.
+%% get_options([{flag, "-q"}, {option, "-p", "/"}],
+%% ["set_permissions","-p","/","guest",
+%% "-q",".*",".*",".*"])
+%% == {["set_permissions","guest",".*",".*",".*"],
+%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
{AsOut, Value} = case Def of
@@ -842,8 +888,8 @@ lock_file(Path) ->
ok = file:close(Lock)
end.
-const_ok(_) -> ok.
-const(X) -> fun (_) -> X end.
+const_ok() -> ok.
+const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
@@ -858,3 +904,41 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
+pget(K, P) -> proplists:get_value(K, P).
+pget(K, P, D) -> proplists:get_value(K, P, D).
+
+pget_or_die(K, P) ->
+ case proplists:get_value(K, P) of
+ undefined -> exit({error, key_missing, K});
+ V -> V
+ end.
+
+format_message_queue(_Opt, MQ) ->
+ Len = priority_queue:len(MQ),
+ {Len,
+ case Len > 100 of
+ false -> priority_queue:to_list(MQ);
+ true -> {summary,
+ orddict:to_list(
+ lists:foldl(
+ fun ({P, V}, Counts) ->
+ orddict:update_counter(
+ {P, format_message_queue_entry(V)}, 1, Counts)
+ end, orddict:new(), priority_queue:to_list(MQ)))}
+ end}.
+
+format_message_queue_entry(V) when is_atom(V) ->
+ V;
+format_message_queue_entry(V) when is_tuple(V) ->
+ list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]);
+format_message_queue_entry(_V) ->
+ '_'.