diff options
Diffstat (limited to 'src/rabbit_misc.erl')
-rw-r--r-- | src/rabbit_misc.erl | 186 |
1 files changed, 135 insertions, 51 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7d916797..3bbfb1d7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -25,7 +25,7 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/2]). +-export([table_lookup/2, set_table_value/4]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -38,9 +38,9 @@ -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). --export([table_fold/3]). +-export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([read_term_file/1, write_term_file/2]). +-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). @@ -48,24 +48,25 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3, - unlink_and_capture_exit/1]). +-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). --export([const_ok/1, const/1]). +-export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). +-export([pget/2, pget/3, pget_or_die/2]). +-export([format_message_queue/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([resource_name/0, thunk/1, const/1]). +-export_type([resource_name/0, thunk/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). --type(const(T) :: fun((any()) -> T)). -type(resource_name() :: binary()). -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() @@ -104,7 +105,12 @@ ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: (rabbit_framing:amqp_table(), binary()) - -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). + -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). +-spec(set_table_value/4 :: + (rabbit_framing:amqp_table(), binary(), + rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) + -> rabbit_framing:amqp_table()). + -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -145,7 +151,8 @@ -> atom()). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). +-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), + atom()) -> [A]). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -153,6 +160,8 @@ -spec(read_term_file/1 :: (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). +-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). +-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). @@ -177,7 +186,6 @@ -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). --spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). @@ -190,10 +198,15 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). --spec(const_ok/1 :: (any()) -> 'ok'). --spec(const/1 :: (A) -> const(A)). +-spec(const_ok/0 :: () -> 'ok'). +-spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). +-spec(pget/2 :: (term(), [term()]) -> term()). +-spec(pget/3 :: (term(), [term()], term()) -> term()). +-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -endif. @@ -266,6 +279,10 @@ table_lookup(Table, Key) -> false -> undefined end. +set_table_value(Table, Key, Type, Value) -> + sort_field_table( + lists:keystore(Key, 1, Table, {Key, Type, Value})). + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; @@ -350,8 +367,11 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end. @@ -400,17 +420,12 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> end), false). %% Like execute_mnesia_transaction/2, but TxFun is expected to return a -%% TailFun which gets called immediately before and after the tx commit +%% TailFun which gets called (only) immediately after the tx commit execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() end. ensure_ok(ok, _) -> ok; @@ -456,20 +471,23 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% Fold over each entry in a table, executing the cons function in a -%% transaction. This is often far more efficient than wrapping a tx -%% around the lot. +%% Apply a pre-post-commit function to all entries in a table that +%% satisfy a predicate, and return those entries. %% %% We ignore entries that have been modified or removed. -table_fold(F, Acc0, TableName) -> +table_filter(Pred, PrePostCommitFun, TableName) -> lists:foldl( - fun (E, Acc) -> execute_mnesia_transaction( - fun () -> case mnesia:match_object(TableName, E, read) of - [] -> Acc; - _ -> F(E, Acc) - end - end) - end, Acc0, dirty_read_all(TableName)). + fun (E, Acc) -> + case execute_mnesia_transaction( + fun () -> mnesia:match_object(TableName, E, read) =/= [] + andalso Pred(E) end, + fun (false, _Tx) -> false; + (true, Tx) -> PrePostCommitFun(E, Tx), true + end) of + false -> Acc; + true -> [E | Acc] + end + end, [], dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -508,8 +526,42 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> read_term_file(File) -> file:consult(File). write_term_file(File, Terms) -> - file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || - Term <- Terms])). + write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + +write_file(Path, Data) -> + write_file(Path, Data, []). + +%% write_file/3 and make_binary/1 are both based on corresponding +%% functions in the kernel/file.erl module of the Erlang R14B02 +%% release, which is licensed under the EPL. That implementation of +%% write_file/3 does not do an fsync prior to closing the file, hence +%% the existence of this version. APIs are otherwise identical. +write_file(Path, Data, Modes) -> + Modes1 = [binary, write | (Modes -- [binary, write])], + case make_binary(Data) of + Bin when is_binary(Bin) -> + case file:open(Path, Modes1) of + {ok, Hdl} -> try file:write(Hdl, Bin) of + ok -> file:sync(Hdl); + {error, _} = E -> E + after + file:close(Hdl) + end; + {error, _} = E -> E + end; + {error, _} = E -> E + end. + +make_binary(Bin) when is_binary(Bin) -> + Bin; +make_binary(List) -> + try + iolist_to_binary(List) + catch error:Reason -> + {error, Reason} + end. + append_file(File, Suffix) -> case file:read_file_info(File) of @@ -527,7 +579,7 @@ append_file(File, 0, Suffix) -> end; append_file(File, _, Suffix) -> case file:read_file(File) of - {ok, Data} -> file:write_file([File, Suffix], Data, [append]); + {ok, Data} -> write_file([File, Suffix], Data, [append]); Error -> Error end. @@ -744,18 +796,12 @@ dict_cons(Key, Value, Dict) -> orddict_cons(Key, Value, Dict) -> orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). -unlink_and_capture_exit(Pid) -> - unlink(Pid), - receive {'EXIT', Pid, _} -> ok - after 0 -> ok - end. - -% Separate flags and options from arguments. -% get_options([{flag, "-q"}, {option, "-p", "/"}], -% ["set_permissions","-p","/","guest", -% "-q",".*",".*",".*"]) -% == {["set_permissions","guest",".*",".*",".*"], -% [{"-q",true},{"-p","/"}]} +%% Separate flags and options from arguments. +%% get_options([{flag, "-q"}, {option, "-p", "/"}], +%% ["set_permissions","-p","/","guest", +%% "-q",".*",".*",".*"]) +%% == {["set_permissions","guest",".*",".*",".*"], +%% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> {AsOut, Value} = case Def of @@ -842,8 +888,8 @@ lock_file(Path) -> ok = file:close(Lock) end. -const_ok(_) -> ok. -const(X) -> fun (_) -> X end. +const_ok() -> ok. +const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see %% when IPv6 is enabled but not used (i.e. 99% of the time). @@ -858,3 +904,41 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + +pget(K, P) -> proplists:get_value(K, P). +pget(K, P, D) -> proplists:get_value(K, P, D). + +pget_or_die(K, P) -> + case proplists:get_value(K, P) of + undefined -> exit({error, key_missing, K}); + V -> V + end. + +format_message_queue(_Opt, MQ) -> + Len = priority_queue:len(MQ), + {Len, + case Len > 100 of + false -> priority_queue:to_list(MQ); + true -> {summary, + orddict:to_list( + lists:foldl( + fun ({P, V}, Counts) -> + orddict:update_counter( + {P, format_message_queue_entry(V)}, 1, Counts) + end, orddict:new(), priority_queue:to_list(MQ)))} + end}. + +format_message_queue_entry(V) when is_atom(V) -> + V; +format_message_queue_entry(V) when is_tuple(V) -> + list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); +format_message_queue_entry(_V) -> + '_'. |