diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-10 19:23:08 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-10 19:23:08 +0000 |
commit | 38fb7acd2574ee23a3637ba279c5101ef1cb652f (patch) | |
tree | adf8e50087407994ab6b7eea88b7992db9f6db07 | |
parent | 9b3363ed418af986a030d0632a06457cb65410de (diff) | |
parent | ff213ba14eb7921c2de67c005ab84db6094a853e (diff) | |
download | rabbitmq-server-38fb7acd2574ee23a3637ba279c5101ef1cb652f.tar.gz |
merge default into bug25853
32 files changed, 931 insertions, 566 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d19acd00..a7e42503 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -502,6 +502,23 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term> + <listitem> + <para> + Sets the cluster name. The cluster name is announced to + clients on connection, and used by the federation and + shovel plugins to record where a message has been. The + cluster name is by default derived from the hostname of + the first node in the cluster, but can be changed. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_cluster_name london</screen> + <para role="example"> + This sets the cluster name to "london". + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6d117e3d..19eef65a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,7 +60,7 @@ -record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). --record(trie_binding, {exchange_name, node_id, destination}). +-record(trie_binding, {exchange_name, node_id, destination, arguments}). -record(listener, {node, protocol, host, ip_address, port}). @@ -73,7 +73,7 @@ -record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). --record(event, {type, props, timestamp}). +-record(event, {type, props, reference = undefined, timestamp}). -record(message_properties, {expiry, needs_confirming = false}). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..ee82bcb3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -81,6 +81,14 @@ %% process as sys:get_status/1 would). Pass through a function which %% can be invoked on the state, get back the result. The state is not %% modified. +%% +%% 10) an mcall/1 function has been added for performing multiple +%% call/3 in parallel. Unlike multi_call, which sends the same request +%% to same-named processes residing on a supplied list of nodes, it +%% operates on name/request pairs, where name is anything accepted by +%% call/3, i.e. a pid, global name, local name, or local name on a +%% particular node. +%% %% All modifications are (C) 2009-2013 GoPivotal, Inc. @@ -190,6 +198,7 @@ cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, + mcall/1, with_state/2, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). @@ -389,6 +398,85 @@ multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). +%%% ----------------------------------------------------------------- +%%% Make multiple calls to multiple servers, given pairs of servers +%%% and messages. +%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} +%%% +%%% Dest can be pid() | RegName :: atom() | +%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} +%%% +%%% A middleman process is used to avoid clogging up the callers +%%% message queue. +%%% ----------------------------------------------------------------- +mcall(CallSpecs) -> + Tag = make_ref(), + {_, MRef} = spawn_monitor( + fun() -> + Refs = lists:foldl( + fun ({Dest, _Request}=S, Dict) -> + dict:store(do_mcall(S), Dest, Dict) + end, dict:new(), CallSpecs), + collect_replies(Tag, Refs, [], []) + end), + receive + {'DOWN', MRef, _, _, {Tag, Result}} -> Result; + {'DOWN', MRef, _, _, Reason} -> exit(Reason) + end. + +do_mcall({{global,Name}=Dest, Request}) -> + %% whereis_name is simply an ets lookup, and is precisely what + %% global:send/2 does, yet we need a Ref to put in the call to the + %% server, so invoking whereis_name makes a lot more sense here. + case global:whereis_name(Name) of + Pid when is_pid(Pid) -> + MRef = erlang:monitor(process, Pid), + catch msend(Pid, MRef, Request), + MRef; + undefined -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, Dest, noproc}, + Ref + end; +do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> + {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 + catch msend(Dest, MRef, Request), + MRef; +do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> + MRef = erlang:monitor(process, Dest), + catch msend(Dest, MRef, Request), + MRef. + +msend(Dest, MRef, Request) -> + erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). + +collect_replies(Tag, Refs, Replies, Errors) -> + case dict:size(Refs) of + 0 -> exit({Tag, {Replies, Errors}}); + _ -> receive + {MRef, Reply} -> + {Refs1, Replies1} = handle_call_result(MRef, Reply, + Refs, Replies), + collect_replies(Tag, Refs1, Replies1, Errors); + {'DOWN', MRef, _, _, Reason} -> + Reason1 = case Reason of + noconnection -> nodedown; + _ -> Reason + end, + {Refs1, Errors1} = handle_call_result(MRef, Reason1, + Refs, Errors), + collect_replies(Tag, Refs1, Replies, Errors1) + end + end. + +handle_call_result(MRef, Result, Refs, AccList) -> + %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} + %% here, so we must cope with MRefs that we've already seen and erased + case dict:find(MRef, Refs) of + {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; + _ -> {Refs, AccList} + end. + %% ----------------------------------------------------------------- %% Apply a function to a generic server's state. %% ----------------------------------------------------------------- diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index d5f51db0..6c5c09df 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -57,10 +57,10 @@ %% This is basically the same as for supervisor, except that: %% %% 1) start_link(Module, Args) becomes -%% start_link(Group, Module, Args). +%% start_link(Group, TxFun, Module, Args). %% %% 2) start_link({local, Name}, Module, Args) becomes -%% start_link({local, Name}, Group, Module, Args). +%% start_link({local, Name}, Group, TxFun, Module, Args). %% %% 3) start_link({global, Name}, Module, Args) is not available. %% @@ -115,7 +115,7 @@ {attributes, record_info(fields, mirrored_sup_childspec)}]}). -define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}). --export([start_link/3, start_link/4, +-export([start_link/4, start_link/5, start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, count_children/1, check_childspecs/1]). @@ -126,7 +126,7 @@ -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, handle_cast/2]). --export([start_internal/2]). +-export([start_internal/3]). -export([create_tables/0, table_definitions/0]). -record(mirrored_sup_childspec, {key, mirroring_pid, childspec}). @@ -134,6 +134,7 @@ -record(state, {overall, delegate, group, + tx_fun, initial_childspecs}). %%---------------------------------------------------------------------------- @@ -160,19 +161,25 @@ -type group_name() :: any(). --spec start_link(GroupName, Module, Args) -> startlink_ret() when +-type(tx_fun() :: fun((fun(() -> any())) -> any())). + +-spec start_link(GroupName, TxFun, Module, Args) -> startlink_ret() when GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when +-spec start_link(SupName, GroupName, TxFun, Module, Args) -> + startlink_ret() when SupName :: supervisor2:sup_name(), GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_internal(Group, ChildSpecs) -> Result when +-spec start_internal(Group, TxFun, ChildSpecs) -> Result when Group :: group_name(), + TxFun :: tx_fun(), ChildSpecs :: [supervisor2:child_spec()], Result :: {'ok', pid()} | {'error', term()}. @@ -190,18 +197,18 @@ behaviour_info(_Other) -> undefined. %%---------------------------------------------------------------------------- -start_link(Group, Mod, Args) -> - start_link0([], Group, init(Mod, Args)). +start_link(Group, TxFun, Mod, Args) -> + start_link0([], Group, TxFun, init(Mod, Args)). -start_link({local, SupName}, Group, Mod, Args) -> - start_link0([{local, SupName}], Group, init(Mod, Args)); +start_link({local, SupName}, Group, TxFun, Mod, Args) -> + start_link0([{local, SupName}], Group, TxFun, init(Mod, Args)); -start_link({global, _SupName}, _Group, _Mod, _Args) -> +start_link({global, _SupName}, _Group, _TxFun, _Mod, _Args) -> erlang:error(badarg). -start_link0(Prefix, Group, Init) -> +start_link0(Prefix, Group, TxFun, Init) -> case apply(?SUPERVISOR, start_link, - Prefix ++ [?MODULE, {overall, Group, Init}]) of + Prefix ++ [?MODULE, {overall, Group, TxFun, Init}]) of {ok, Pid} -> case catch call(Pid, {init, Pid}) of ok -> {ok, Pid}; E -> E @@ -257,14 +264,14 @@ mirroring(Sup) -> child(Sup, mirroring). %%---------------------------------------------------------------------------- -start_internal(Group, ChildSpecs) -> - ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs}, +start_internal(Group, TxFun, ChildSpecs) -> + ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, TxFun, ChildSpecs}, [{timeout, infinity}]). %%---------------------------------------------------------------------------- -init({overall, _Group, ignore}) -> ignore; -init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> +init({overall, _Group, _TxFun, ignore}) -> ignore; +init({overall, Group, TxFun, {ok, {Restart, ChildSpecs}}}) -> %% Important: Delegate MUST start before Mirroring so that when we %% shut down from above it shuts down last, so Mirroring does not %% see it die. @@ -273,27 +280,30 @@ init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> {ok, {{one_for_all, 0, 1}, [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + {mirroring, {?MODULE, start_internal, [Group, TxFun, ChildSpecs]}, permanent, 16#ffffffff, worker, [?MODULE]}]}}; init({delegate, Restart}) -> {ok, {Restart, []}}; -init({mirroring, Group, ChildSpecs}) -> - {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. +init({mirroring, Group, TxFun, ChildSpecs}) -> + {ok, #state{group = Group, + tx_fun = TxFun, + initial_childspecs = ChildSpecs}}. handle_call({init, Overall}, _From, State = #state{overall = undefined, delegate = undefined, group = Group, + tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), ok = ?PG2:join(Group, Overall), Rest = ?PG2:get_members(Group) -- [Overall], case Rest of - [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); + [] -> TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -303,7 +313,8 @@ handle_call({init, Overall}, _From, Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, TxFun, Overall, Delegate, S) + || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -311,16 +322,18 @@ handle_call({init, Overall}, _From, handle_call({start_child, ChildSpec}, _From, State = #state{overall = Overall, delegate = Delegate, - group = Group}) -> - {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of + group = Group, + tx_fun = TxFun}) -> + {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, - group = Group}) -> - {reply, stop(Group, Delegate, Id), State}; + group = Group, + tx_fun = TxFun}) -> + {reply, stop(Group, TxFun, Delegate, Id), State}; handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; @@ -357,14 +370,15 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group, - overall = O}) -> + State = #state{delegate = Delegate, + group = Group, + tx_fun = TxFun, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [O | _] -> {atomic, ChildSpecs} = - mnesia:transaction( - fun() -> update_all(O, Pid) end), + [O | _] -> ChildSpecs = + TxFun(fun() -> update_all(O, Pid) end), [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, @@ -387,14 +401,14 @@ code_change(_OldVsn, State, _Extra) -> tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:transaction( - fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> already_in_mnesia; - {atomic, Pid} -> {already_in_mnesia, Pid}; +maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of + start -> start(Delegate, ChildSpec); + undefined -> already_in_mnesia; + Pid -> {already_in_mnesia, Pid} + catch %% If we are torn down while in the transaction... - {aborted, E} -> {error, E} + {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> @@ -429,11 +443,12 @@ delete(Group, Id) -> start(Delegate, ChildSpec) -> apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). -stop(Group, Delegate, Id) -> - case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of - {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); - {atomic, running} -> {error, running}; - {aborted, E} -> {error, E} +stop(Group, TxFun, Delegate, Id) -> + try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of + deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); + running -> {error, running} + catch + {error, E} -> {error, E} end. check_stop(Group, Delegate, Id) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index 045c5d58..fd89fd95 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -20,7 +20,7 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, status/0, is_running/0, - is_running/1, environment/0, rotate_logs/1, force_event_refresh/0, + is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1]). @@ -227,7 +227,7 @@ -spec(is_running/1 :: (node()) -> boolean()). -spec(environment/0 :: () -> [{param(), term()}]). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). @@ -696,11 +696,11 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. -force_event_refresh() -> - rabbit_direct:force_event_refresh(), - rabbit_networking:force_connection_event_refresh(), - rabbit_channel:force_event_refresh(), - rabbit_amqqueue:force_event_refresh(). +force_event_refresh(Ref) -> + rabbit_direct:force_event_refresh(Ref), + rabbit_networking:force_connection_event_refresh(Ref), + rabbit_channel:force_event_refresh(Ref), + rabbit_amqqueue:force_event_refresh(Ref). %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c48a4d0a..607b6fc1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,7 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, notify_policy_changed/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). @@ -110,7 +110,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), @@ -355,14 +355,14 @@ with(Name, F, E) -> {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do - %% with the QPid. + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(not_found_or_absent_dirty(Name)); - false -> timer:sleep(25), - with(Name, F, E) - end + fun () -> false = rabbit_misc:is_process_alive(QPid), + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -503,19 +503,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). %% the first place since a node failed). Therefore we keep poking at %% the list of queues until we were able to talk to a live process or %% the queue no longer exists. -force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]). +force_event_refresh(Ref) -> + force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). -force_event_refresh(QNames) -> +force_event_refresh(QNames, Ref) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = rabbit_misc:multi_call( - [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + {_, Bad} = gen_server2:mcall( + [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), FailedPids = [Pid || {Pid, _Reason} <- Bad], Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, lists:member(Pid, FailedPids)], case Failed of [] -> ok; _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed) + force_event_refresh(Failed, Ref) end. notify_policy_changed(#amqqueue{pid = QPid}) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index da8c0607..a1997376 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,8 +20,9 @@ -behaviour(gen_server2). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(SYNC_INTERVAL, 200). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -327,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -411,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -427,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -447,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> +send_mandatory(#delivery{mandatory = false}) -> ok; send_mandatory(#delivery{mandatory = true, sender = SenderPid, msg_seq_no = MsgSeqNo}) -> gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). -discard(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case MsgSeqNo of - undefined -> State; - _ -> confirm_messages([MsgId], State) - end, +discard(#delivery{confirm = Confirm, + sender = SenderPid, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -486,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State#q{backing_queue_state = BQS1}}; + {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, - discard(Delivery, State)} + discard(Delivery, BQ, BQS, MTC)} end, qname(State), State#q.consumers) of - {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, - State1#q{consumers = Consumers})}; + State#q{backing_queue_state = BQS1, + msg_id_to_channel = MTC1, + consumers = Consumers})}; {undelivered, ActiveConsumersChanged, Consumers} -> {undelivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -511,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, State), + Props = message_properties(Message, Confirm, State1), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State2 = State1#q{backing_queue_state = BQS1}, case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, @@ -521,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {delivered, State3} -> State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State3); + {undelivered, State3 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS2, + msg_id_to_channel = MTC}} -> + {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), + State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = @@ -816,14 +824,15 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> rabbit_event:notify(consumer_created, [{consumer_tag, CTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, {queue, QName}, - {arguments, Args}]). + {arguments, Args}], + Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, @@ -833,24 +842,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; + {ack, _AckTags, _ChPid} -> consumer_bias(State); + {notify_sent, _ChPid, _Credit} -> consumer_bias(State); + {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> 0; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1; + {_, _} -> 0 + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; @@ -947,7 +968,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args), + not NoAck, qname(State1), Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1028,19 +1049,19 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); -handle_call(force_event_refresh, _From, +handle_call({force_event_refresh, Ref}, _From, State = #q{consumers = Consumers, exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), QName = qname(State), AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args) || + Ch, CTag, false, AckRequired, QName, Args, Ref) || {Ch, CTag, AckRequired, Args} <- AllConsumers]; {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args) + Ch, CTag, true, AckRequired, QName, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl new file mode 100644 index 00000000..1a3db732 --- /dev/null +++ b/src/rabbit_auth_backend_dummy.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_auth_backend_dummy). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_backend). + +-export([description/0]). +-export([user/0]). +-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). + +-ifdef(use_specs). + +-spec(user/0 :: () -> rabbit_types:user()). + +-endif. + +%% A user to be used by the direct client when permission checks are +%% not needed. This user can do anything AMQPish. +user() -> #user{username = <<"dummy">>, + tags = [], + auth_backend = ?MODULE, + impl = none}. + +%% Implementation of rabbit_auth_backend + +description() -> + [{name, <<"Dummy">>}, + {description, <<"Database for the dummy user">>}]. + +check_user_login(_, _) -> + {refused, "cannot log in conventionally as dummy user", []}. + +check_vhost_access(#user{}, _VHostPath) -> true. +check_resource_access(#user{}, #resource{}, _Permission) -> true. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 61919d05..ebeac1f7 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -22,15 +22,18 @@ -export([description/0]). -export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_tags/2, - list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]). --export([make_salt/0, check_password/2, change_password_hash/2, - hash_password/1]). --export([set_permissions/5, clear_permissions/2, - list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, - list_user_vhost_permissions/2, perms_info_keys/0, - vhost_perms_info_keys/0, user_perms_info_keys/0, - user_vhost_perms_info_keys/0]). +-export([add_user/2, delete_user/1, lookup_user/1, + change_password/2, clear_password/1, + hash_password/1, change_password_hash/2, + set_tags/2, set_permissions/5, clear_permissions/2]). +-export([user_info_keys/0, perms_info_keys/0, + user_perms_info_keys/0, vhost_perms_info_keys/0, + user_vhost_perms_info_keys/0, + list_users/0, list_permissions/0, + list_user_permissions/1, list_vhost_permissions/1, + list_user_vhost_permissions/2]). + +%%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -38,45 +41,39 @@ -spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(delete_user/1 :: (rabbit_types:username()) -> 'ok'). +-spec(lookup_user/1 :: (rabbit_types:username()) + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). -spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(clear_password/1 :: (rabbit_types:username()) -> 'ok'). --spec(make_salt/0 :: () -> binary()). --spec(check_password/2 :: (rabbit_types:password(), - rabbit_types:password_hash()) -> boolean()). --spec(change_password_hash/2 :: (rabbit_types:username(), - rabbit_types:password_hash()) -> 'ok'). -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). +-spec(change_password_hash/2 :: (rabbit_types:username(), + rabbit_types:password_hash()) -> 'ok'). -spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). --spec(list_users/0 :: () -> [rabbit_types:infos()]). --spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(lookup_user/1 :: (rabbit_types:username()) - -> rabbit_types:ok(rabbit_types:internal_user()) - | rabbit_types:error('not_found')). -spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'). +-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(list_users/0 :: () -> [rabbit_types:infos()]). -spec(list_permissions/0 :: () -> [rabbit_types:infos()]). --spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_permissions/1 :: (rabbit_types:username()) -> [rabbit_types:infos()]). +-spec(list_vhost_permissions/1 :: + (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_vhost_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]). --spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). + -endif. %%---------------------------------------------------------------------------- - --define(PERMS_INFO_KEYS, [configure, write, read]). --define(USER_INFO_KEYS, [user, tags]). - %% Implementation of rabbit_auth_backend description() -> @@ -85,11 +82,14 @@ description() -> check_user_login(Username, []) -> internal_check_user_login(Username, fun(_) -> true end); -check_user_login(Username, [{password, Password}]) -> +check_user_login(Username, [{password, Cleartext}]) -> internal_check_user_login( - Username, fun(#internal_user{password_hash = Hash}) -> - check_password(Password, Hash) - end); + Username, + fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) -> + Hash =:= salted_md5(Salt, Cleartext); + (#internal_user{}) -> + false + end); check_user_login(Username, AuthProps) -> exit({unknown_auth_props, Username, AuthProps}). @@ -145,42 +145,43 @@ permission_index(read) -> #permission.read. add_user(Username, Password) -> rabbit_log:info("Creating user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write( - rabbit_user, - #internal_user{username = Username, - password_hash = - hash_password(Password), - tags = []}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end), - R. + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + tags = []}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end). delete_user(Username) -> rabbit_log:info("Deleting user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)), - R. + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)). + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). change_password(Username, Password) -> rabbit_log:info("Changing password for '~s'~n", [Username]), @@ -190,70 +191,44 @@ clear_password(Username) -> rabbit_log:info("Clearing password for '~s'~n", [Username]), change_password_hash(Username, <<"">>). -change_password_hash(Username, PasswordHash) -> - R = update_user(Username, fun(User) -> - User#internal_user{ - password_hash = PasswordHash } - end), - R. - hash_password(Cleartext) -> - Salt = make_salt(), - Hash = salted_md5(Salt, Cleartext), - <<Salt/binary, Hash/binary>>. - -check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> - Hash =:= salted_md5(Salt, Cleartext); -check_password(_Cleartext, _Any) -> - false. - -make_salt() -> {A1,A2,A3} = now(), random:seed(A1, A2, A3), Salt = random:uniform(16#ffffffff), - <<Salt:32>>. + SaltBin = <<Salt:32>>, + Hash = salted_md5(SaltBin, Cleartext), + <<SaltBin/binary, Hash/binary>>. + +change_password_hash(Username, PasswordHash) -> + update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash } + end). salted_md5(Salt, Cleartext) -> Salted = <<Salt/binary, Cleartext/binary>>, erlang:md5(Salted). set_tags(Username, Tags) -> - rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]), - R = update_user(Username, fun(User) -> - User#internal_user{tags = Tags} - end), - R. - -update_user(Username, Fun) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - {ok, User} = lookup_user(Username), - ok = mnesia:write(rabbit_user, Fun(User), write) - end)). - -list_users() -> - [[{user, Username}, {tags, Tags}] || - #internal_user{username = Username, tags = Tags} <- - mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. - -user_info_keys() -> ?USER_INFO_KEYS. - -lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). - -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case re:compile(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) - end. + rabbit_log:info("Setting user tags for user '~s' to ~p~n", + [Username, Tags]), + update_user(Username, fun(User) -> + User#internal_user{tags = Tags} + end). set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n", + rabbit_log:info("Setting permissions for " + "'~s' in '~s' to '~s', '~s', '~s'~n", [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]), - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + lists:map( + fun (RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, + Regexp, Reason}}) + end + end, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -269,7 +244,6 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> write) end)). - clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( @@ -280,32 +254,36 @@ clear_permissions(Username, VHostPath) -> virtual_host = VHostPath}}) end)). +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +%%---------------------------------------------------------------------------- +%% Listing + +-define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). + +user_info_keys() -> ?USER_INFO_KEYS. + perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS]. vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS]. user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS]. user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS. +list_users() -> + [[{user, Username}, {tags, Tags}] || + #internal_user{username = Username, tags = Tags} <- + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + list_permissions() -> list_permissions(perms_info_keys(), match_user_vhost('_', '_')). -list_vhost_permissions(VHostPath) -> - list_permissions( - vhost_perms_info_keys(), - rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). - -list_user_permissions(Username) -> - list_permissions( - user_perms_info_keys(), - rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). - -list_user_vhost_permissions(Username, VHostPath) -> - list_permissions( - user_vhost_perms_info_keys(), - rabbit_misc:with_user_and_vhost( - Username, VHostPath, match_user_vhost(Username, VHostPath))). - -filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. - list_permissions(Keys, QueryThunk) -> [filter_props(Keys, [{user, Username}, {vhost, VHostPath}, @@ -320,6 +298,24 @@ list_permissions(Keys, QueryThunk) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. +filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. + +list_user_permissions(Username) -> + list_permissions( + user_perms_info_keys(), + rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). + +list_vhost_permissions(VHostPath) -> + list_permissions( + vhost_perms_info_keys(), + rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). + +list_user_vhost_permissions(Username, VHostPath) -> + list_permissions( + user_vhost_perms_info_keys(), + rabbit_misc:with_user_and_vhost( + Username, VHostPath, match_user_vhost(Username, VHostPath))). + match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2b561900..3d88be7a 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -209,6 +209,10 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status -callback status(state()) -> [{atom(), any()}]. @@ -236,7 +240,8 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 11e6bd38..bb9c61a8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -169,7 +169,7 @@ add(Binding, InnerFun) -> ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/0 + [_] -> fun () -> ok end end; {error, _} = Err -> rabbit_misc:const(Err) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d866908..7907c96c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -24,7 +24,7 @@ -export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). --export([force_event_refresh/0]). +-export([force_event_refresh/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -106,7 +106,7 @@ -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_local/0 :: () -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -endif. @@ -179,8 +179,8 @@ refresh_config_local() -> ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). -force_event_refresh() -> - [gen_server2:cast(C, force_event_refresh) || C <- list()], +force_event_refresh(Ref) -> + [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. %%--------------------------------------------------------------------------- @@ -335,8 +335,9 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); -handle_cast(force_event_refresh, State) -> - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), +handle_cast({force_event_refresh, Ref}, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), + Ref), noreply(State); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> @@ -496,15 +497,14 @@ check_user_id_header(#'P_basic'{user_id = undefined}, _) -> check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; +check_user_id_header( + #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> + ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> - case lists:member(impersonator, Tags) of - true -> ok; - false -> precondition_failed( - "user_id property set to '~s' but authenticated user was " - "'~s'", [Claimed, Actual]) - end. + #ch{user = #user{username = Actual}}) -> + precondition_failed( + "user_id property set to '~s' but authenticated user was '~s'", + [Claimed, Actual]). check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of @@ -1439,8 +1439,9 @@ notify_limiter(Limiter, Acked) -> end end. -deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - mandatory = false}, +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + confirm = false, + mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 2bd22579..49f7e388 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -51,8 +51,11 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- -intercept_method(#'basic.publish'{} = M, _VHost) -> - M; +intercept_method(#'basic.publish'{} = M, _VHost) -> M; +intercept_method(#'basic.ack'{} = M, _VHost) -> M; +intercept_method(#'basic.nack'{} = M, _VHost) -> M; +intercept_method(#'basic.reject'{} = M, _VHost) -> M; +intercept_method(#'basic.credit'{} = M, _VHost) -> M; intercept_method(M, VHost) -> intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))). @@ -87,5 +90,7 @@ select(Method) -> validate_method(M, M2) -> rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). +%% keep dialyzer happy +-spec internal_error(string(), [any()]) -> no_return(). internal_error(Format, Args) -> - rabbit_misc:protocol_error(internal_error, Format, Args).
\ No newline at end of file + rabbit_misc:protocol_error(internal_error, Format, Args). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f3463286..746f2bdb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -90,6 +90,7 @@ status, environment, report, + set_cluster_name, eval, close_connection, @@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], ok; +action(set_cluster_name, Node, [Name], _Opts, Inform) -> + Inform("Setting cluster name to ~s", [Name]), + rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]); + action(eval, Node, [Expr], _Opts, _Inform) -> case erl_scan:string(Expr) of {ok, Scanned, _} -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 5a004792..c372d5f1 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, force_event_refresh/0, list/0, connect/5, +-export([boot/0, force_event_refresh/1, list/0, connect/5, start_channel/9, disconnect/2]). %% Internal -export([list_local/0]). @@ -28,10 +28,10 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() | +-spec(connect/5 :: (('nouser' | {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> @@ -54,8 +54,8 @@ boot() -> rabbit_sup:start_supervisor_child( [{local, rabbit_direct_client_sup}, {rabbit_channel_sup, start_link, []}]). -force_event_refresh() -> - [Pid ! force_event_refresh || Pid<- list()], +force_event_refresh(Ref) -> + [Pid ! {force_event_refresh, Ref} || Pid <- list()], ok. list_local() -> @@ -67,37 +67,35 @@ list() -> %%---------------------------------------------------------------------------- -connect(User = #user{}, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_event:notify(connection_created, Infos), - {ok, {User, rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = access_refused} -> - {error, access_refused} - end; - connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(fun () -> rabbit_access_control:check_user_pass_login( Username, Password) end, VHost, Protocol, Pid, Infos); -connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login( - Username, []) end, +connect(nouser, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, VHost, Protocol, Pid, Infos). connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> case AuthFun() of {ok, User} -> - connect(User, VHost, Protocol, Pid, Infos); + connect1(User, VHost, Protocol, Pid, Infos); {refused, _M, _A} -> {error, {auth_failure, "Refused"}} end; false -> {error, broker_not_found_on_node} end. +connect1(User, VHost, Protocol, Pid, Infos) -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end. start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index a713d76b..e0226955 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -22,7 +22,7 @@ -export([init_stats_timer/2, init_disabled_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). --export([notify/2, notify_if/3]). +-export([notify/2, notify/3, notify_if/3]). %%---------------------------------------------------------------------------- @@ -41,6 +41,7 @@ -type(event() :: #event { type :: event_type(), props :: event_props(), + reference :: 'none' | reference(), timestamp :: event_timestamp() }). -type(level() :: 'none' | 'coarse' | 'fine'). @@ -58,6 +59,7 @@ -spec(stats_level/2 :: (container(), pos()) -> level()). -spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -140,7 +142,10 @@ if_enabled(C, P, Fun) -> notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. -notify(Type, Props) -> +notify(Type, Props) -> notify(Type, Props, none). + +notify(Type, Props, Ref) -> gen_event:notify(?MODULE, #event{type = Type, props = Props, + reference = Ref, timestamp = os:timestamp()}). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8ba29deb..27b8d1e6 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) -> [begin Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), - trie_remove_binding(X, FinalNode, D), + trie_remove_binding(X, FinalNode, D, Args), remove_path_if_empty(X, Path) - end || #binding{source = X, key = K, destination = D} <- Bs], + end || #binding{source = X, key = K, destination = D, args = Args} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. @@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) -> %%---------------------------------------------------------------------------- -internal_add_binding(#binding{source = X, key = K, destination = D}) -> +internal_add_binding(#binding{source = X, key = K, destination = D, + args = Args}) -> FinalNode = follow_down_create(X, split_topic_key(K)), - trie_add_binding(X, FinalNode, D), + trie_add_binding(X, FinalNode, D, Args), ok. trie_match(X, Words) -> @@ -176,7 +177,8 @@ trie_bindings(X, Node) -> MatchHead = #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = '$1'}}, + destination = '$1', + arguments = '_'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). trie_update_node_counts(X, Node, Field, Delta) -> @@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> node_id = ToNode}, write). -trie_add_binding(X, Node, D) -> +trie_add_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), - trie_binding_op(X, Node, D, fun mnesia:write/3). + trie_binding_op(X, Node, D, Args, fun mnesia:write/3). -trie_remove_binding(X, Node, D) -> +trie_remove_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), - trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3). -trie_binding_op(X, Node, D, Op) -> +trie_binding_op(X, Node, D, Args, Op) -> ok = Op(rabbit_topic_trie_binding, #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = D}}, + destination = D, + arguments = Args}}, write). trie_remove_all_nodes(X) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9ce5afcb..b272c64f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:msg_rates(BQS). + status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:status(BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ca495733..4f77009c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) -> ok. drop_mirror(QName, MirrorNode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid] when SPids =:= [] -> - {error, cannot_drop_only_mirror}; - [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), - exit(Pid, {shutdown, dropped}), - {ok, dropped} - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + {ok, dropped} + end; + {error, not_found} = E -> + E + end. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q, SyncMode); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q, SyncMode) + end + end; + {error, not_found} = E -> + E + end. start_child(Name, MirrorNode, Q, SyncMode) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(down), - fun () -> - rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) - end) of - {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode); - _ -> ok - end. + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index da185dce..1f31b5c8 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, []), + BQS = bq_init(BQ, Q1, new), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 80e160d9..848c4a87 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -53,13 +53,12 @@ -export([parse_arguments/3]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). --export([const_ok/0, const/1]). +-export([const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2, pset/3]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). --export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). -export([version/0, which_applications/0]). @@ -71,6 +70,7 @@ -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). +-export([moving_average/4]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -218,7 +218,6 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). --spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -229,8 +228,6 @@ -spec(pset/3 :: (term(), term(), [term()]) -> term()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). --spec(multi_call/2 :: - ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(version/0 :: () -> string()). @@ -251,6 +248,8 @@ -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). +-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') + -> float()). -endif. %%---------------------------------------------------------------------------- @@ -888,7 +887,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> {error, Reason} end. -const_ok() -> ok. const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see @@ -947,31 +945,6 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). -%% A simplified version of gen_server:multi_call/2 with a sane -%% API. This is not in gen_server2 as there is no useful -%% infrastructure there to share. -multi_call(Pids, Req) -> - MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], - receive_multi_call(MonitorPids, [], []). - -start_multi_call(Pid, Req) when is_pid(Pid) -> - Mref = erlang:monitor(process, Pid), - Pid ! {'$gen_call', {self(), Mref}, Req}, - {Mref, Pid}. - -receive_multi_call([], Good, Bad) -> - {lists:reverse(Good), lists:reverse(Bad)}; -receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> - receive - {Mref, Reply} -> - erlang:demonitor(Mref, [flush]), - receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); - {'DOWN', Mref, _, _, noconnection} -> - receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); - {'DOWN', Mref, _, _, Reason} -> - receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) - end. - os_cmd(Command) -> case os:type() of {win32, _} -> @@ -1088,6 +1061,12 @@ stop_timer(State, Idx) -> store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). +moving_average(_Time, _HalfLife, Next, undefined) -> + Next; +moving_average(Time, HalfLife, Next, Current) -> + Weight = math:exp(Time * math:log(0.5) / HalfLife), + Next * (1 - Weight) + Current * Weight. + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f27f77c6..59873ffc 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -327,6 +327,7 @@ status() -> case is_running() of true -> RunningNodes = cluster_nodes(running), [{running_nodes, RunningNodes}, + {cluster_name, rabbit_nodes:cluster_name()}, {partitions, mnesia_partitions(RunningNodes)}]; false -> [] end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 91be4dcb..42438790 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -22,7 +22,7 @@ connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2, force_connection_event_refresh/0, tcp_host/1]). + close_connection/2, force_connection_event_refresh/1, tcp_host/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/6, @@ -80,7 +80,7 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). --spec(force_connection_event_refresh/0 :: () -> 'ok'). +-spec(force_connection_event_refresh/1 :: (reference()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). @@ -331,8 +331,8 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. -force_connection_event_refresh() -> - [rabbit_reader:force_event_refresh(C) || C <- connections()], +force_connection_event_refresh(Ref) -> + [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. %%-------------------------------------------------------------------- diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5a1613a7..c5aa8473 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,8 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2, fqdn_nodename/0]). + is_running/2, is_process_running/2, + cluster_name/0, set_cluster_name/1]). -include_lib("kernel/include/inet.hrl"). @@ -37,7 +38,8 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). --spec(fqdn_nodename/0 :: () -> binary()). +-spec(cluster_name/0 :: () -> binary()). +-spec(set_cluster_name/1 :: (binary()) -> 'ok'). -endif. @@ -111,8 +113,15 @@ is_process_running(Node, Process) -> P when is_pid(P) -> true end. -fqdn_nodename() -> +cluster_name() -> + rabbit_runtime_parameters:value_global( + cluster_name, cluster_name_default()). + +cluster_name_default() -> {ID, _} = rabbit_nodes:parts(node()), {ok, Host} = inet:gethostname(), {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + +set_cluster_name(Name) -> + rabbit_runtime_parameters:set_global(cluster_name, Name). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index bea7e0d0..c9540da8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -27,6 +27,9 @@ -define(UNSENT_MESSAGE_LIMIT, 200). +%% Utilisation average calculations are all in μs. +-define(USE_AVG_HALF_LIFE, 1000000.0). + -record(state, {consumers, use}). -record(consumer, {tag, ack_required, args}). @@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) -> use_avg(Active, Inactive, Avg) -> Time = Inactive + Active, - Ratio = Active / Time, - Weight = erlang:min(1, Time / 1000000), - case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end. + rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg). now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 29f6f5a7..e00508b4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -358,9 +358,10 @@ start(DurableQueueNames) -> %% Any queue directory we've not been asked to recover is considered garbage QueuesDir = queues_dir(), - [rabbit_file:recursive_delete([QueueDir]) || - QueueDir <- all_queue_directory_names(QueuesDir), - not sets:is_element(filename:basename(QueueDir), DurableDirectories)], + rabbit_file:recursive_delete( + [filename:join(QueuesDir, DirName) || + DirName <- all_queue_directory_names(QueuesDir), + not sets:is_element(DirName, DurableDirectories)]), rabbit_recovery_terms:clear(), @@ -373,9 +374,8 @@ stop() -> rabbit_recovery_terms:stop(). all_queue_directory_names(Dir) -> case rabbit_file:list_dir(Dir) of - {ok, Entries} -> [ Entry || Entry <- Entries, - rabbit_file:is_dir( - filename:join(Dir, Entry)) ]; + {ok, Entries} -> [E || E <- Entries, + rabbit_file:is_dir(filename:join(Dir, E))]; {error, enoent} -> [] end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 64debcab..9ffcd203 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,7 +18,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -77,7 +77,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(force_event_refresh/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -134,8 +134,8 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -force_event_refresh(Pid) -> - gen_server:cast(Pid, force_event_refresh). +force_event_refresh(Pid, Ref) -> + gen_server:cast(Pid, {force_event_refresh, Ref}). conserve_resources(Pid, Source, Conserve) -> Pid ! {conserve_resources, Source, Conserve}, @@ -156,19 +156,23 @@ server_properties(Protocol) -> [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, - list_to_binary(Value)}; + maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]]], + [{product, Product}, + {version, Version}, + {cluster_name, rabbit_nodes:cluster_name()}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +maybe_list_to_binary(V) when is_binary(V) -> V; +maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). + server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, @@ -395,10 +399,11 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> catch Error -> {error, Error} end), State; -handle_other({'$gen_cast', force_event_refresh}, State) +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> - rabbit_event:notify(connection_created, - [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), + rabbit_event:notify( + connection_created, + [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), State; handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. @@ -957,6 +962,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) -> ok end. +%% keep dialyzer happy +-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> + no_return(). fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1, S2} = case MinOrMax of min -> {lower, minimum}; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index bcde0078..18b9fbb8 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -22,6 +22,8 @@ list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). +-export([set_global/2, value_global/1, value_global/2]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -34,6 +36,7 @@ -> ok_or_error_string()). -spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) @@ -48,6 +51,8 @@ -> rabbit_types:infos() | 'not_found'). -spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). -spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). +-spec(value_global/1 :: (atom()) -> term() | 'not_found'). +-spec(value_global/2 :: (atom(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -endif. @@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) -> set(VHost, Component, Name, Term) -> set_any(VHost, Component, Name, Term). +set_global(Name, Term) -> + mnesia_update(Name, Term), + ok. + format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. @@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) -> E end. +mnesia_update(Key, Term) -> + rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)). + mnesia_update(VHost, Comp, Name, Term) -> - F = fun () -> - Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of - [] -> new; - [Params] -> {old, Params#runtime_parameters.value} + rabbit_misc:execute_mnesia_transaction( + rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))). + +mnesia_update_fun(Key, Term) -> + fun () -> + Res = case mnesia:read(?TABLE, Key, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write), - Res - end, - rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)). + ok = mnesia:write(?TABLE, c(Key, Term), write), + Res + end. clear(_, <<"policy">> , _) -> {error_string, "policies may not be cleared using this method"}; @@ -159,43 +174,46 @@ list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. lookup(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of + case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of +value(VHost, Comp, Name) -> value0({VHost, Comp, Name}). +value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def). + +value_global(Key) -> value0(Key). +value_global(Key, Default) -> value0(Key, Default). + +value0(Key) -> + case lookup0(Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Name, Default) -> - Params = lookup0(VHost, Component, Name, - fun () -> - lookup_missing(VHost, Component, Name, Default) - end), +value0(Key, Default) -> + Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Name, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of +lookup0(Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, Key) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Name, Default) -> +lookup_missing(Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Name}, read) of - [] -> Record = c(VHost, Component, Name, Default), + case mnesia:read(?TABLE, Key, read) of + [] -> Record = c(Key, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Name, Default) -> - #runtime_parameters{key = {VHost, Component, Name}, +c(Key, Default) -> + #runtime_parameters{key = Key, value = Default}. p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2d6ff73b..ce7fe451 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -39,7 +39,6 @@ all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), passed = test_version_equivalance(), - passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_rabbit_basic_header_handling(), @@ -66,6 +65,7 @@ all_tests() -> passed = test_amqp_connection_refusal(), passed = test_confirms(), passed = test_with_state(), + passed = test_mcall(), passed = do_if_secondary_node( fun run_cluster_dependent_tests/1, @@ -156,26 +156,6 @@ test_version_equivalance() -> false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"), passed. -test_multi_call() -> - Fun = fun() -> - receive - {'$gen_call', {From, Mref}, request} -> - From ! {Mref, response} - end, - receive - never -> ok - end - end, - Pid1 = spawn(Fun), - Pid2 = spawn(Fun), - Pid3 = spawn(Fun), - exit(Pid2, bang), - {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} = - rabbit_misc:multi_call([Pid1, Pid2, Pid3], request), - exit(Pid1, bang), - exit(Pid3, bang), - passed. - test_rabbit_basic_header_handling() -> passed = write_table_with_invalid_existing_type_test(), passed = invalid_existing_headers_test(), @@ -578,33 +558,38 @@ test_topic_matching() -> key = list_to_binary(Key), destination = #resource{virtual_host = <<"/">>, kind = queue, - name = list_to_binary(Q)}} || - {Key, Q} <- [{"a.b.c", "t1"}, - {"a.*.c", "t2"}, - {"a.#.b", "t3"}, - {"a.b.b.c", "t4"}, - {"#", "t5"}, - {"#.#", "t6"}, - {"#.b", "t7"}, - {"*.*", "t8"}, - {"a.*", "t9"}, - {"*.b.c", "t10"}, - {"a.#", "t11"}, - {"a.#.#", "t12"}, - {"b.b.c", "t13"}, - {"a.b.b", "t14"}, - {"a.b", "t15"}, - {"b.c", "t16"}, - {"", "t17"}, - {"*.*.*", "t18"}, - {"vodka.martini", "t19"}, - {"a.b.c", "t20"}, - {"*.#", "t21"}, - {"#.*.#", "t22"}, - {"*.#.#", "t23"}, - {"#.#.#", "t24"}, - {"*", "t25"}, - {"#.b.#", "t26"}]], + name = list_to_binary(Q)}, + args = Args} || + {Key, Q, Args} <- [{"a.b.c", "t1", []}, + {"a.*.c", "t2", []}, + {"a.#.b", "t3", []}, + {"a.b.b.c", "t4", []}, + {"#", "t5", []}, + {"#.#", "t6", []}, + {"#.b", "t7", []}, + {"*.*", "t8", []}, + {"a.*", "t9", []}, + {"*.b.c", "t10", []}, + {"a.#", "t11", []}, + {"a.#.#", "t12", []}, + {"b.b.c", "t13", []}, + {"a.b.b", "t14", []}, + {"a.b", "t15", []}, + {"b.c", "t16", []}, + {"", "t17", []}, + {"*.*.*", "t18", []}, + {"vodka.martini", "t19", []}, + {"a.b.c", "t20", []}, + {"*.#", "t21", []}, + {"#.*.#", "t22", []}, + {"*.#.#", "t23", []}, + {"#.#.#", "t24", []}, + {"*", "t25", []}, + {"#.b.#", "t26", []}, + {"args-test", "t27", + [{<<"foo">>, longstr, <<"bar">>}]}, + {"args-test", "t27", %% Note aliasing + [{<<"foo">>, longstr, <<"baz">>}]}]], lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, Bindings), @@ -631,12 +616,13 @@ test_topic_matching() -> "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", - "t25"]}]), - + "t25"]}, + {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25", "t27"]}]), %% remove some bindings RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), lists:nth(11, Bindings), lists:nth(19, Bindings), - lists:nth(21, Bindings)], + lists:nth(21, Bindings), lists:nth(28, Bindings)], exchange_op_callback(X, remove_bindings, [RemovedBindings]), RemainingBindings = ordsets:to_list( ordsets:subtract(ordsets:from_list(Bindings), @@ -659,7 +645,8 @@ test_topic_matching() -> {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, - {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}, + {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]), %% remove the entire exchange exchange_op_callback(X, delete, [RemainingBindings]), @@ -1042,6 +1029,9 @@ test_user_management() -> ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), + {new, _} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/testhost">>, queue, <<"test">>), + true, false, [], none), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -1368,6 +1358,82 @@ test_with_state() -> fun (S) -> element(1, S) end), passed. +test_mcall() -> + P1 = spawn(fun gs2_test_listener/0), + register(foo, P1), + global:register_name(gfoo, P1), + + P2 = spawn(fun() -> exit(bang) end), + %% ensure P2 is dead (ignore the race setting up the monitor) + await_exit(P2), + + P3 = spawn(fun gs2_test_crasher/0), + + %% since P2 crashes almost immediately and P3 after receiving its first + %% message, we have to spawn a few more processes to handle the additional + %% cases we're interested in here + register(baz, spawn(fun gs2_test_crasher/0)), + register(bog, spawn(fun gs2_test_crasher/0)), + global:register_name(gbaz, spawn(fun gs2_test_crasher/0)), + + NoNode = rabbit_nodes:make("nonode"), + + Targets = + %% pids + [P1, P2, P3] + ++ + %% registered names + [foo, bar, baz] + ++ + %% {Name, Node} pairs + [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}] + ++ + %% {global, Name} + [{global, gfoo}, {global, gbar}, {global, gbaz}], + + GoodResults = [{D, goodbye} || D <- [P1, foo, + {foo, node()}, + {global, gfoo}]], + + BadResults = [{P2, noproc}, % died before use + {P3, boom}, % died on first use + {bar, noproc}, % never registered + {baz, boom}, % died on first use + {{bar, node()}, noproc}, % never registered + {{bog, node()}, boom}, % died on first use + {{foo, NoNode}, nodedown}, % invalid node + {{global, gbar}, noproc}, % never registered globally + {{global, gbaz}, boom}], % died on first use + + {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]), + true = lists:sort(Replies) == lists:sort(GoodResults), + true = lists:sort(Errors) == lists:sort(BadResults), + + %% cleanup (ignore the race setting up the monitor) + P1 ! stop, + await_exit(P1), + passed. + +await_exit(Pid) -> + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, _, _, _} -> ok + end. + +gs2_test_crasher() -> + receive + {'$gen_call', _From, hello} -> exit(boom) + end. + +gs2_test_listener() -> + receive + {'$gen_call', From, hello} -> + gen_server2:reply(From, goodbye), + gs2_test_listener(); + stop -> + ok + end. + test_statistics_event_receiver(Pid) -> receive Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) @@ -1467,7 +1533,7 @@ test_refresh_events(SecondaryNode) -> expect_events(Tag, Key, Type) -> expect_event(Tag, Key, Type), - rabbit:force_event_refresh(), + rabbit:force_event_refresh(make_ref()), expect_event(Tag, Key, Type). expect_event(Tag, Key, Type) -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 90372461..4cb3cacc 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -47,6 +47,7 @@ -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). +-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). %% ------------------------------------------------------------------- @@ -355,6 +356,32 @@ internal_system_x() -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +cluster_name() -> + {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), + ok. + +cluster_name_tx() -> + %% mnesia:transform_table/4 does not let us delete records + T = rabbit_runtime_parameters, + mnesia:write_lock_table(T), + Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K + <- mnesia:all_keys(T)], + case Ks of + [] -> ok; + [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write), + R = {runtime_parameters, cluster_name, Name}, + mnesia:write(T, R, write), + case Tl of + [] -> ok; + _ -> {VHost, _, _} = K, + error_logger:warning_msg( + "Multiple local-nodenames found, picking '~s' " + "from '~s' for cluster name~n", [Name, VHost]) + end + end, + [mnesia:delete(T, K, write) || K <- Ks], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8711d139..321af4ac 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,8 +21,8 @@ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -277,11 +277,10 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter, - ack_rates + ack_in_counter }). --record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). +-record(rates, { in, out, ack_in, ack_out, timestamp }). -record(msg_status, { seq_id, @@ -322,11 +321,11 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, - ingress :: {timestamp(), non_neg_integer()}, - avg_egress :: float(), - avg_ingress :: float(), - timestamp :: timestamp() }). +-type(rates() :: #rates { in :: float(), + out :: float(), + ack_in :: float(), + ack_out :: float(), + timestamp :: timestamp()}). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer(), @@ -368,8 +367,7 @@ unconfirmed :: gb_set(), confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - ack_rates :: rates() }). + ack_in_counter :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -384,6 +382,18 @@ count = 0, end_seq_id = Z }). +-define(MICROS_PER_SECOND, 1000000.0). + +%% We're sampling every 5s for RAM duration; a half life that is of +%% the same order of magnitude is probably about right. +-define(RATE_AVG_HALF_LIFE, 5.0). + +%% We will recalculate the #rates{} every time we get asked for our +%% RAM duration, or every N messages published, whichever is +%% sooner. We do this since the priority calculations in +%% rabbit_amqqueue_process need fairly fresh rates. +-define(MSGS_PER_RATE_CALC, 100). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -540,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, - PCount1 = PCount + one_if(IsPersistent1), + InCount1 = InCount + 1, + PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use( - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))). + State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 }), + a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of + true -> update_rates(State3); + false -> State3 + end)). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -629,6 +643,31 @@ drop(AckRequired, State) -> ack([], State) -> {[], State}; +%% optimisation: this head is essentially a partial evaluation of the +%% general case below, for the single-ack case. +ack([SeqId], State) -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + remove_pending_ack(SeqId, State), + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgOnDisk of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + PCount1 = PCount - one_if(IsPersistent), + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -696,10 +735,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, @@ -716,29 +755,43 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -ram_duration(State = #vqstate { - rates = #rates { timestamp = Timestamp, - egress = Egress, - ingress = Ingress } = Rates, - ack_rates = #rates { timestamp = AckTimestamp, - egress = AckEgress, - ingress = AckIngress } = ARates, - in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - ram_ack_count_prev = RamAckCountPrev }) -> - Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), - {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - - {AvgAckEgressRate, AckEgress1} = - update_rate(Now, AckTimestamp, AckOutCount, AckEgress), - {AvgAckIngressRate, AckIngress1} = - update_rate(Now, AckTimestamp, AckInCount, AckIngress), +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, + ack_out = AckOutRate, + timestamp = TS }}) -> + Now = erlang:now(), + + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), + ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), + timestamp = Now }, + + State#vqstate{ in_counter = 0, + out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0, + rates = Rates }. + +update_rate(Now, TS, Count, Rate) -> + Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND, + rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate). + +ram_duration(State) -> + State1 = #vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, + ram_msg_count = RamMsgCount, + ram_msg_count_prev = RamMsgCountPrev, + ram_pending_ack = RPA, + ram_ack_count_prev = RamAckCountPrev } = + update_rates(State), RamAckCount = gb_trees:size(RPA), @@ -752,25 +805,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, - {Duration, State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - ram_msg_count_prev = RamMsgCount, - ram_ack_count_prev = RamAckCount }}. + {Duration, State1}. needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> @@ -796,6 +831,10 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate } }) -> + {AvgIngressRate, AvgEgressRate}. + status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -805,10 +844,11 @@ status(#vqstate { ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }}) -> + [ {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, @@ -998,10 +1038,6 @@ expand_delta(SeqId, #delta { count = Count, expand_delta(_SeqId, #delta { count = Count } = Delta) -> d(Delta #delta { count = Count + 1 }). -update_rate(Now, Then, Count, {OThen, OCount}) -> - %% avg over the current period and the previous - {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. - %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -1046,22 +1082,21 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_count_prev = 0, out_counter = 0, in_counter = 0, - rates = blank_rate(Now, DeltaCount1), + rates = blank_rates(Now), msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0, - ack_rates = blank_rate(Now, 0) }, + ack_in_counter = 0 }, a(maybe_deltas_to_betas(State)). -blank_rate(Timestamp, IngressLength) -> - #rates { egress = {Timestamp, 0}, - ingress = {Timestamp, IngressLength}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Timestamp }. +blank_rates(Now) -> + #rates { in = 0.0, + out = 0.0, + ack_in = 0.0, + ack_out = 0.0, + timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> @@ -1535,11 +1570,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, - rates = #rates { avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } - }) -> + rates = #rates { in = AvgIngress, + out = AvgEgress, + ack_in = AvgAckIngress, + ack_out = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 047bce77..9fa4da44 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -83,9 +83,9 @@ delete(VHostPath) -> %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), - [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + [assert_benign(rabbit_amqqueue:delete(Q, false, false)) || Q <- rabbit_amqqueue:list(VHostPath)], - [ok = rabbit_exchange:delete(Name, false) || + [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> @@ -94,6 +94,18 @@ delete(VHostPath) -> ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q}}) -> + %% We have a durable queue on a down node. Removing the mnesia + %% entries here is safe. If/when the down node restarts, it will + %% clear out the on-disk storage of the queue. + case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of + ok -> ok; + {error, not_found} -> ok + end. + internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) |